Skip to content

Commit

Permalink
Merge pull request cylc#5074 from oliver-sanders/upperism
Browse files Browse the repository at this point in the history
play: check Cylc version on restart
  • Loading branch information
hjoliver authored Oct 25, 2022
2 parents 855b2bc f7f6144 commit b0ff549
Show file tree
Hide file tree
Showing 9 changed files with 520 additions and 60 deletions.
190 changes: 159 additions & 31 deletions cylc/flow/scheduler_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 18,15 @@
from ansimarkup import parse as cparse
import asyncio
from functools import lru_cache
from itertools import zip_longest
from pathlib import Path
from shlex import quote
import sys
from typing import TYPE_CHECKING

from cylc.flow import LOG
from pkg_resources import parse_version

from cylc.flow import LOG, __version__
from cylc.flow.exceptions import ServiceFileError
import cylc.flow.flags
from cylc.flow.id import upgrade_legacy_ids
Expand All @@ -44,11 48,17 @@
from cylc.flow.remote import cylc_server_cmd
from cylc.flow.scheduler import Scheduler, SchedulerError
from cylc.flow.scripts.common import cylc_header
from cylc.flow.workflow_db_mgr import WorkflowDatabaseManager
from cylc.flow.workflow_files import (
SUITERC_DEPR_MSG,
detect_old_contact_file,
SUITERC_DEPR_MSG
get_workflow_srv_dir,
)
from cylc.flow.terminal import (
cli_function,
is_terminal,
prompt,
)
from cylc.flow.terminal import cli_function

if TYPE_CHECKING:
from optparse import Values
Expand Down Expand Up @@ -231,6 241,28 @@ def get_option_parser(add_std_opts: bool = False) -> COP:
action="store_true", default=False, dest="abort_if_any_task_fails"
)

parser.add_option(
'--downgrade',
help=(
'Allow the workflow to be restarted with an older version of Cylc,'
' NOT RECOMMENDED.'
' By default Cylc prevents you from restarting a workflow with an'
' older version of Cylc than it was previously run with.'
' Use this flag to disable this check.'
),
action='store_true',
default=False
)

parser.add_option(
'--upgrade',
help=(
'Allow the workflow to be restarted with an newer version of Cylc.'
),
action='store_true',
default=False
)

if add_std_opts:
# This is for the API wrapper for integration tests. Otherwise (CLI
# use) "standard options" are added later in options.parse_args().
Expand Down Expand Up @@ -290,39 322,24 @@ def scheduler_cli(options: 'Values', workflow_id_raw: str) -> None:
max_workflows=1,
# warn_depr=False, # TODO
)
try:
detect_old_contact_file(workflow_id)
except ServiceFileError as exc:
print(f"Resuming already-running workflow\n\n{exc}")
pclient = WorkflowRuntimeClient(
workflow_id,
timeout=options.comms_timeout,
)
mutation_kwargs = {
'request_string': RESUME_MUTATION,
'variables': {
'wFlows': [workflow_id]
}
}
pclient('graphql', mutation_kwargs)
sys.exit(0)

# resume the workflow if it is already running
_resume(workflow_id, options)

# check the workflow can be safely restarted with this version of Cylc
db_file = Path(get_workflow_srv_dir(workflow_id), 'db')
if not _version_check(
db_file,
options.upgrade,
options.downgrade,
):
sys.exit(1)

# re-execute on another host if required
_distribute(options.host, workflow_id_raw, workflow_id)

# print the start message
if (
cylc.flow.flags.verbosity > -1
and (options.no_detach or options.format == 'plain')
):
print(
cparse(
cylc_header()
)
)

if cylc.flow.flags.cylc7_back_compat:
LOG.warning(SUITERC_DEPR_MSG)
_print_startup_message(options)

# setup the scheduler
# NOTE: asyncio.run opens an event loop, runs your coro,
Expand Down Expand Up @@ -360,6 377,117 @@ def scheduler_cli(options: 'Values', workflow_id_raw: str) -> None:
sys.exit(ret)


def _resume(workflow_id, options):
    """Resume the workflow if it is already running.

    If ``detect_old_contact_file`` raises ``ServiceFileError`` the workflow
    is treated as already running: the resume mutation is sent to it and
    this process exits with status 0. Otherwise this function returns
    normally without doing anything.

    Args:
        workflow_id: ID of the workflow to check / resume.
        options: Parsed CLI options; ``comms_timeout`` is used as the
            client timeout.
    """
    try:
        detect_old_contact_file(workflow_id)
    except ServiceFileError as exc:
        print(f"Resuming already-running workflow\n\n{exc}")
        client = WorkflowRuntimeClient(
            workflow_id, timeout=options.comms_timeout
        )
        client(
            'graphql',
            {
                'request_string': RESUME_MUTATION,
                'variables': {'wFlows': [workflow_id]},
            },
        )
        # nothing more to do, the running scheduler takes it from here
        sys.exit(0)


def _version_check(
    db_file: Path,
    can_upgrade: bool,
    can_downgrade: bool
) -> bool:
    """Check the workflow can be safely restarted with this version of Cylc.

    The release components of this Cylc version are compared with those of
    the version returned by the workflow database compatibility check, most
    significant component first. The first component that differs decides
    the outcome:

    * Lower than the last run: refuse, unless ``can_downgrade`` is set.
    * Higher in one of the first two components: allow if ``can_upgrade``
      is set, otherwise ask the user (interactive) or refuse
      (non-interactive).
    * Higher in any later component: allow.

    Args:
        db_file: Path to the workflow database. If this is not an existing
            file this is not a restart and no check is made.
        can_upgrade: Permission to upgrade (the ``--upgrade`` CLI flag).
        can_downgrade: Permission to downgrade (the ``--downgrade`` CLI
            flag).

    Returns:
        True if the run may proceed, False if it should be aborted.
    """
    if not db_file.is_file():
        # not a restart
        return True
    wdbm = WorkflowDatabaseManager(db_file.parent)
    this_version = parse_version(__version__)
    last_run_version = wdbm.check_workflow_db_compatibility()

    for itt, (this, that) in enumerate(zip_longest(
        this_version.release,
        last_run_version.release,
        fillvalue=-1,
    )):
        if this < that:
            # restart would REDUCE the Cylc version
            if can_downgrade:
                # permission to downgrade given in CLI flags
                LOG.warning(
                    'Restarting with an older version of Cylc'
                    f' ({last_run_version} -> {__version__})'
                )
                return True
            print(cparse(
                '<red>'
                'It is not advisible to restart a workflow with an older'
                ' version of Cylc than it was previously run with.'
                '</red>'

                '\n* This workflow was previously run with'
                f' <green>{last_run_version}</green>.'
                f'\n* This version of Cylc is <red>{__version__}</red>.'

                '\nUse --downgrade to disable this check (NOT RECOMMENDED!) or'
                ' use a more recent version e.g:'
                '<blue>'
                f'\n$ CYLC_VERSION={last_run_version} {" ".join(sys.argv[1:])}'
                '</blue>'
            ))
            return False
        elif itt < 2 and this > that:
            # restart would INCREASE the Cylc version in a big way
            if can_upgrade:
                # permission to upgrade given in CLI flags
                LOG.warning(
                    'Restarting with a newer version of Cylc'
                    f' ({last_run_version} -> {__version__})'
                )
                return True
            print(cparse(
                'This workflow was previously run with'
                f' <yellow>{last_run_version}</yellow>.'
                f'\nThis version of Cylc is <green>{__version__}</green>.'
            ))
            if is_terminal():
                # we are in interactive mode, ask the user if this is ok
                # (the question contains markup so must be rendered with
                # cparse, and every interpolated line needs its f-prefix)
                return prompt(
                    cparse(
                        'Are you sure you want to upgrade from'
                        f' <yellow>{last_run_version}</yellow>'
                        f' to <green>{__version__}</green>?'
                    ),
                    {'y': True, 'n': False},
                    process=str.lower,
                )
            # we are in non-interactive mode, abort abort abort
            return False
        elif this > that:
            # restart would INCREASE the Cylc version in a little way
            # (third component onwards); stop here so that less significant
            # components cannot be mistaken for a downgrade
            return True
    # the versions are the same
    return True


def _print_startup_message(options):
    """Print the Cylc header and any start-up deprecation warning.

    The header is printed when verbosity is above -1 and either
    ``options.no_detach`` is set or ``options.format`` is ``'plain'``.
    Independently of that, the suite.rc deprecation message is logged as a
    warning when the Cylc 7 back-compat flag is set.
    """
    show_header = (
        cylc.flow.flags.verbosity > -1
        and (options.no_detach or options.format == 'plain')
    )
    if show_header:
        header = cylc_header()
        print(cparse(header))

    if cylc.flow.flags.cylc7_back_compat:
        LOG.warning(SUITERC_DEPR_MSG)


def _distribute(host, workflow_id_raw, workflow_id):
"""Re-invoke this command on a different host if requested.
Expand Down
43 changes: 43 additions & 0 deletions cylc/flow/terminal.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 43,9 @@
# default grey colour (do not use "dim", it is not sufficiently portable)
DIM = 'fg 248'

# re-bind the builtin input() as a module-level global so tests can replace it
input = input # noqa


def is_terminal():
"""Determine if running in (and printing to) a terminal."""
Expand Down Expand Up @@ -285,3 288,43 @@ def wrapper(*api_args: str) -> None:
raise exc from None
return wrapper
return inner


def prompt(message, options, default=None, process=None):
    """Dead simple CLI textual prompting.

    The user is asked repeatedly until their (optionally processed) input
    matches one of the options.

    Args:
        message:
            The message to put before the user, don't end this with
            punctuation.
        options:
            The choices the user can pick:
            * If this is a list the option selected will be returned.
            * If this is a dict the keys are options, the corresponding value
              will be returned.
        default:
            A value to be chosen if the user presses <return> without first
            typing anything.
        process:
            A function to run the user's input through before comparison.
            E.G. str.lower.

    Returns:
        The selected option (if options is a list) else the corresponding
        value (if options is a dict).

    """
    default_ = ''
    if default:
        default_ = f'[{default}] '
    # append the choices to the caller's message (do not replace it,
    # otherwise the user is shown the options but never the question)
    message += f': {default_}{",".join(options)}?'
    usr = None
    while usr not in options:
        usr = input(message)
        if default is not None and usr == '':
            usr = default
        if process:
            usr = process(usr)
    if isinstance(options, dict):
        return options[usr]
    return usr
15 changes: 7 additions & 8 deletions cylc/flow/workflow_db_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -680,16 680,10 @@ def restart_check(self) -> None:
"""Check & vacuum the runtime DB for a restart.
Increments the restart number in the DB. Sets self.n_restart.
Raises ServiceFileError if DB is incompatible.
"""
if self.n_restart != 0:
# This will not raise unless the method is mistakenly called twice
raise RuntimeError("restart check must only happen once")
try:
self.check_workflow_db_compatibility()
except ServiceFileError as exc:
raise ServiceFileError(f"Cannot restart - {exc}")
with self.get_pri_dao() as pri_dao:
pri_dao.vacuum()
self.n_restart = pri_dao.select_workflow_params_restart_count() 1
Expand Down Expand Up @@ -728,6 722,10 @@ def upgrade_pre_803(self, pri_dao: CylcWorkflowDAO) -> None:
)
conn.commit()

def upgrade(self, last_run_ver, pri_dao):
    """Run the DB upgrade routines required by the last-run Cylc version.

    Args:
        last_run_ver: The Cylc version the workflow was last run with, in a
            form comparable with the result of ``parse_version``.
        pri_dao: The DAO handed on to the upgrade routine.
    """
    needs_pre_803_upgrade = last_run_ver < parse_version("8.0.3.dev")
    if needs_pre_803_upgrade:
        self.upgrade_pre_803(pri_dao)

def check_workflow_db_compatibility(self):
"""Raises ServiceFileError if the existing workflow database is
incompatible with the current version of Cylc."""
Expand Down Expand Up @@ -757,5 755,6 @@ def check_workflow_db_compatibility(self):
f"Cylc {last_run_ver})."
f"\n{manual_rm_msg}"
)
if last_run_ver < parse_version("8.0.3.dev"):
self.upgrade_pre_803(pri_dao)
self.upgrade(last_run_ver, pri_dao)

return last_run_ver
2 changes: 1 addition & 1 deletion tests/functional/job-submission/01-job-nn-localhost.t
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 26,7 @@ mkdir -p "${WORKFLOW_RUN_DIR}/.service/"
sqlite3 "${WORKFLOW_RUN_DIR}/.service/db" <'db.sqlite3'

workflow_run_ok "${TEST_NAME_BASE}-restart" \
cylc play --reference-test --debug --no-detach "${WORKFLOW_NAME}"
cylc play --reference-test --upgrade --debug --no-detach "${WORKFLOW_NAME}"

purge
exit
2 changes: 1 addition & 1 deletion tests/functional/job-submission/02-job-nn-remote-host.t
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 25,7 @@ run_ok "${TEST_NAME_BASE}-validate" cylc validate "${WORKFLOW_NAME}"
mkdir -p "${WORKFLOW_RUN_DIR}/.service/"
sqlite3 "${WORKFLOW_RUN_DIR}/.service/db" <'db.sqlite3'
workflow_run_ok "${TEST_NAME_BASE}-restart" \
cylc play --reference-test --debug --no-detach \
cylc play --upgrade --reference-test --debug --no-detach \
-s "CYLC_TEST_PLATFORM='${CYLC_TEST_PLATFORM}'" "${WORKFLOW_NAME}"

purge
Expand Down
4 changes: 2 additions & 2 deletions tests/functional/restart/57-ghost-job.t
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 54,7 @@ _args_

run_ok "${TEST_NAME_BASE}-validate" cylc validate "${WORKFLOW_NAME}"

workflow_run_ok "${TEST_NAME_BASE}-restart" cylc play -vv "${WORKFLOW_NAME}" --pause
workflow_run_ok "${TEST_NAME_BASE}-restart" cylc play --upgrade -vv "${WORKFLOW_NAME}" --pause

# There will be 1 ghost job in DB:
TEST_NAME="${TEST_NAME_BASE}-db-query-1"
Expand All @@ -79,7 79,7 @@ cmp_json "${TEST_NAME}-cmp" "${TEST_NAME}.stdout" << EOF
}
EOF

workflow_run_ok "${TEST_NAME_BASE}-resume" cylc play "${WORKFLOW_NAME}"
workflow_run_ok "${TEST_NAME_BASE}-resume" cylc play --upgrade "${WORKFLOW_NAME}"
poll_workflow_stopped

# Job should have been replaced in DB with same submit num:
Expand Down
Loading

0 comments on commit b0ff549

Please sign in to comment.