From a53ac82f8fa919fa42a542a503cfa2a15c454a96 Mon Sep 17 00:00:00 2001 From: Hilary James Oliver Date: Thu, 22 Sep 2022 16:23:16 +1200 Subject: [PATCH] startup and shutdown graphs; needs tidying --- cylc/flow/config.py | 18 ++++- cylc/flow/cycling/__init__.py | 6 +- cylc/flow/cycling/nocycle.py | 87 +++++++++++++++++++++ cylc/flow/scheduler.py | 143 +++++++++++++++++++++++++++------- cylc/flow/task_pool.py | 41 +++++++++- cylc/flow/taskdef.py | 23 ++++-- 6 files changed, 279 insertions(+), 39 deletions(-) create mode 100644 cylc/flow/cycling/nocycle.py diff --git a/cylc/flow/config.py b/cylc/flow/config.py index b52c8fcdb59..2d77f721a5b 100644 --- a/cylc/flow/config.py +++ b/cylc/flow/config.py @@ -26,6 +26,10 @@ structures. """ +# TODO NOCYCLE GRAPHS: +# - graph and check-circular for startup and shutdown +# - validation should disallow reuse of task names in the 3 sections + from contextlib import suppress from copy import copy from fnmatch import fnmatchcase @@ -54,6 +58,7 @@ get_sequence, get_sequence_cls, init_cyclers, get_dump_format, INTEGER_CYCLING_TYPE, ISO8601_CYCLING_TYPE ) +from cylc.flow.cycling.nocycle import NOCYCLE_GRAPHS, NocycleSequence from cylc.flow.id import Tokens from cylc.flow.cycling.integer import IntegerInterval from cylc.flow.cycling.iso8601 import ingest_time, ISO8601Interval @@ -61,6 +66,7 @@ CylcError, WorkflowConfigError, IntervalParsingError, + SequenceParsingError, TaskDefError, ParamExpandError, InputError @@ -269,6 +275,7 @@ def __init__( self.start_point: 'PointBase' self.stop_point: Optional['PointBase'] = None self.final_point: Optional['PointBase'] = None + self.nocycle_sequences: Set['NocycleSequence'] = set() self.sequences: List['SequenceBase'] = [] self.actual_first_point: Optional['PointBase'] = None self._start_point_for_actual_first_point: Optional['PointBase'] = None @@ -2054,6 +2061,12 @@ def load_graph(self): for section, graph in sections: try: seq = get_sequence(section, icp, fcp) + except SequenceParsingError: + if section in NOCYCLE_GRAPHS: + seq = NocycleSequence(section) + self.nocycle_sequences.add(seq) + else: + raise except (AttributeError, TypeError, ValueError, CylcError) as exc: if cylc.flow.flags.verbosity > 1: traceback.print_exc() @@ -2063,7 +2076,10 @@ def load_graph(self): if isinstance(exc, CylcError): msg += ' %s' % exc.args[0] raise WorkflowConfigError(msg) - self.sequences.append(seq) + + else: + self.sequences.append(seq) + parser = GraphParser( family_map, self.parameters, diff --git a/cylc/flow/cycling/__init__.py b/cylc/flow/cycling/__init__.py index 8fb08866607..e85a3689e2f 100644 --- a/cylc/flow/cycling/__init__.py +++ b/cylc/flow/cycling/__init__.py @@ -342,7 +342,11 @@ def TYPE_SORT_KEY(self) -> int: @classmethod @abstractmethod # Note: stacked decorator not strictly enforced in Py2.x def get_async_expr(cls, start_point=0): - """Express a one-off sequence at the initial cycle point.""" + """Express a one-off sequence at the initial cycle point. + + Note "async" has nothing to do with asyncio. It was a (bad) + name for one-off (non-cycling) graphs in early Cylc versions. + """ pass @abstractmethod diff --git a/cylc/flow/cycling/nocycle.py b/cylc/flow/cycling/nocycle.py new file mode 100644 index 00000000000..846984ad9c4 --- /dev/null +++ b/cylc/flow/cycling/nocycle.py @@ -0,0 +1,87 @@ +# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE. +# Copyright (C) NIWA & British Crown (Met Office) & Contributors. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +""" +No-cycle logic for isolated start-up and shutdown graphs. +""" + +NOCYCLE_STARTUP = "startup" +NOCYCLE_SHUTDOWN = "shutdown" + +NOCYCLE_GRAPHS = ( + NOCYCLE_STARTUP, + NOCYCLE_SHUTDOWN +) + + +class NocyclePoint: + """A string-valued no-cycle point.""" + + def __init__(self, value: str) -> None: + if value not in NOCYCLE_GRAPHS: + raise ValueError(f"Illegal Nocycle value {value}") + self.value = value + + def __hash__(self): + return hash(self.value) + + def __eq__(self, other): + return other.value == self.value + + def __le__(self, other): + return str(other) == str(self.value) + + def __lt__(self, other): + return False + + def __gt__(self, other): + return False + + def __str__(self): + return self.value + + +class NocycleSequence: + """A no-cycle sequence is just a point.""" + + def __init__(self, dep_section, p_context_start=None, p_context_stop=None): + """blah""" + self.point = NocyclePoint(dep_section) + + def is_valid(self, point): + """Is point on-sequence and in-bounds?""" + return True + + def get_first_point(self, point): + """blah""" + return None + + def get_next_point_on_sequence(self, point): + """blah""" + return self.point + + def __hash__(self): + return hash(str(self.point)) + + def __eq__(self, other): + return other.point == self.point + + def __str__(self): + return str(self.point) + + +NOCYCLE_STARTUP_SEQUENCE = NocycleSequence(NOCYCLE_STARTUP) +NOCYCLE_SHUTDOWN_SEQUENCE = NocycleSequence(NOCYCLE_SHUTDOWN) diff --git a/cylc/flow/scheduler.py b/cylc/flow/scheduler.py index f9501eb8d81..09e9c0e8bb4 100644 --- a/cylc/flow/scheduler.py +++ b/cylc/flow/scheduler.py @@ -54,6 +54,11 @@ from cylc.flow.broadcast_mgr import BroadcastMgr from cylc.flow.cfgspec.glbl_cfg import glbl_cfg from cylc.flow.config import WorkflowConfig +from cylc.flow.cycling.nocycle import ( + NOCYCLE_STARTUP, + NOCYCLE_STARTUP_SEQUENCE, + NOCYCLE_SHUTDOWN_SEQUENCE +) from cylc.flow.data_store_mgr import DataStoreMgr from cylc.flow.id import Tokens from cylc.flow.flow_mgr import FLOW_NONE, FlowMgr, FLOW_NEW @@ -291,6 +296,7 @@ def __init__(self, reg: str, options: Values) -> None: pub_d=os.path.join(self.workflow_run_dir, 'log') ) self.is_restart = Path(self.workflow_db_mgr.pri_path).is_file() + self.next_graphs: List[str] = [] async def install(self): """Get the filesystem in the right state to run the flow. @@ -454,16 +460,39 @@ async def configure(self): self.data_store_mgr.initiate_data_model() - self.profiler.log_memory("scheduler.py: before load_tasks") + self.next_graphs = [] if self.is_restart: + self.task_job_mgr.task_remote_mgr.is_restart = True self._load_pool_from_db() if self.restored_stop_task_id is not None: self.pool.set_stop_task(self.restored_stop_task_id) + self.next_graphs = self._get_next_graphs() + # self.restart_remote_init() # poll orphaned tasks + elif self.options.starttask: self._load_pool_from_tasks() + self.next_graphs = self._get_next_graphs() + else: - self._load_pool_from_point() - self.profiler.log_memory("scheduler.py: after load_tasks") + if ( + self.config.start_point == "shutdown" and + NOCYCLE_SHUTDOWN_SEQUENCE in self.config.nocycle_sequences + ): + self.pool.load_nocycle_graph(NOCYCLE_SHUTDOWN_SEQUENCE) + elif ( + self.config.start_point in + [self.config.initial_point, "startup"] and + NOCYCLE_STARTUP_SEQUENCE in self.config.nocycle_sequences + ): + self.pool.load_nocycle_graph(NOCYCLE_STARTUP_SEQUENCE) + if self.config.sequences: + self.next_graphs.append("normal") + if NOCYCLE_SHUTDOWN_SEQUENCE in self.config.nocycle_sequences: + self.next_graphs.append("shutdown") + else: + self._load_pool_from_point() + if NOCYCLE_SHUTDOWN_SEQUENCE in self.config.nocycle_sequences: + self.next_graphs.append("shutdown") self.workflow_db_mgr.put_workflow_params(self) self.workflow_db_mgr.put_workflow_template_vars(self.template_vars) @@ -595,6 +624,22 @@ def log_start(self) -> None: extra=RotatingLogFileHandler.header_extra ) + def _get_next_graphs(self): + """Get next graphs base on current pool content.""" + points = [p.value for p in self.pool.get_points()] + nxt = [] + if points == [NOCYCLE_STARTUP]: + if self.config.sequences: + nxt.append("normal") + if NOCYCLE_SHUTDOWN_SEQUENCE in self.config.nocycle_sequences: + nxt.append("shutdown") + elif ( + "shutdown" not in points and + NOCYCLE_SHUTDOWN_SEQUENCE in self.config.nocycle_sequences + ): + nxt.append("shutdown") + return nxt + async def run_scheduler(self): """Start the scheduler main loop.""" try: @@ -614,7 +659,38 @@ async def run_scheduler(self): # Non-async sleep - yield to other threads rather than event loop sleep(0) self.profiler.start() + + # a = startup; b = cycles; c = shutdown + # + # Load the task pool and start the main loop. + # If cold start from ICP, run [a, b, c]: + # + # If cold start from point: + # - check point and start in [a,b,c] + # + # If restart or cold start from tasks, check all points + # - if multiple points, load as-is + # + # If multiple points: + # - run concurrently + # - control by pausing flows + await self.main_loop() + if ( + "normal" in self.next_graphs and + self.config.sequences + ): + self.next_graphs.remove("normal") + self._load_pool_from_point() + await self.main_loop() + + if ( + "shutdown" in self.next_graphs and + NOCYCLE_SHUTDOWN_SEQUENCE in self.config.nocycle_sequences + ): + self.next_graphs.remove("shutdown") + self.pool.load_nocycle_graph(NOCYCLE_SHUTDOWN_SEQUENCE) + await self.main_loop() except SchedulerStop as exc: # deliberate stop @@ -641,7 +717,6 @@ async def run_scheduler(self): await self.handle_exception(exc) else: - # main loop ends (not used?) await self.shutdown(SchedulerStop(StopMode.AUTO.value)) finally: @@ -684,7 +759,7 @@ async def run(self): def _load_pool_from_tasks(self): """Load task pool with specified tasks, for a new run.""" - LOG.info(f"Start task: {self.options.starttask}") + LOG.info(f"LOADING START TASKS: {self.options.starttask}") # flow number set in this call: self.pool.force_trigger_tasks( self.options.starttask, @@ -703,15 +778,19 @@ def _load_pool_from_point(self): released from runhead.) """ - start_type = ( - "Warm" if self.config.start_point > self.config.initial_point - else "Cold" - ) - LOG.info(f"{start_type} start from {self.config.start_point}") + LOG.info("LOADING MAIN GRAPH") + msg = f"start from {self.config.start_point}" + if ( + self.config.start_point + in ["startup" or self.config.initial_point] + ): + msg = "Cold " + msg + LOG.info(msg) self.pool.load_from_point() def _load_pool_from_db(self): """Load task pool from DB, for a restart.""" + LOG.info("LOADING DB FOR RESTART") self.workflow_db_mgr.pri_dao.select_broadcast_states( self.broadcast_mgr.load_db_broadcast_states) self.workflow_db_mgr.pri_dao.select_task_job_run_times( @@ -767,7 +846,11 @@ def restart_remote_init(self): sleep(1.0) # Remote init/file-install is done via process pool self.proc_pool.process() + # Poll all pollable tasks self.command_poll_tasks(['*/*']) + # TODO - WHY DOESN'T '*/*' MATCH THE FOLLOWING? + self.command_poll_tasks(['startup/*']) + self.command_poll_tasks(['shutdown/*']) def _load_task_run_times(self, row_idx, row): """Load run times of previously succeeded task jobs.""" @@ -1613,6 +1696,9 @@ async def main_loop(self) -> None: # Shutdown workflow if timeouts have occurred self.timeout_check() + if self.graph_finished() and self.next_graphs: + break + # Does the workflow need to shutdown on task failure? await self.workflow_shutdown() @@ -1837,6 +1923,21 @@ def set_stop_clock(self, unix_time): self.workflow_db_mgr.put_workflow_stop_clock_time(self.stop_clock_time) self.update_data_store() + def graph_finished(self): + """Nothing left to run.""" + return not any( + itask for itask in self.pool.get_tasks() + if itask.state( + TASK_STATUS_PREPARING, + TASK_STATUS_SUBMITTED, + TASK_STATUS_RUNNING + ) + or ( + itask.state(TASK_STATUS_WAITING) + and not itask.state.is_runahead + ) + ) + def stop_clock_done(self): """Return True if wall clock stop time reached.""" if self.stop_clock_time is None: @@ -1854,24 +1955,10 @@ def stop_clock_done(self): def check_auto_shutdown(self): """Check if we should shut down now.""" - if self.is_paused: - # Don't if paused. - return False - - if self.check_workflow_stalled(): - return False - - if any( - itask for itask in self.pool.get_tasks() - if itask.state( - TASK_STATUS_PREPARING, - TASK_STATUS_SUBMITTED, - TASK_STATUS_RUNNING - ) - or ( - itask.state(TASK_STATUS_WAITING) - and not itask.state.is_runahead - ) + if ( + self.is_paused or + self.check_workflow_stalled() or + not self.graph_finished() ): # Don't if there are more tasks to run (if waiting and not # runahead, then held, queued, or xtriggered). diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py index 2a30b978fd6..5a7453bb059 100644 --- a/cylc/flow/task_pool.py +++ b/cylc/flow/task_pool.py @@ -26,6 +26,7 @@ import cylc.flow.flags from cylc.flow import LOG from cylc.flow.cycling.loader import get_point, standardise_point_string +from cylc.flow.cycling.nocycle import NocyclePoint from cylc.flow.exceptions import WorkflowConfigError, PointParsingError from cylc.flow.id import Tokens, detokenise from cylc.flow.id_cli import contains_fnmatch @@ -161,6 +162,23 @@ def _swap_out(self, itask): self.main_pool[itask.point][itask.identity] = itask self.main_pool_changed = True + def load_nocycle_graph(self, seq): + """blah """ + LOG.info(f"LOADING {seq} GRAPH") + flow_num = self.flow_mgr.get_new_flow(f"original {seq} flow") + self.runahead_limit_point = None + for name in self.config.get_task_name_list(): + tdef = self.config.get_taskdef(name) + if str(seq) not in [str(s) for s in tdef.sequences]: + continue + if tdef.is_parentless(seq.point, seq): + ntask = self._get_spawned_or_merged_task( + seq.point, tdef.name, {flow_num} + ) + if ntask is not None: + self.add_to_pool(ntask) + self.rh_release_and_queue(ntask) + def load_from_point(self): """Load the task pool for the workflow start point. @@ -168,6 +186,8 @@ def load_from_point(self): """ flow_num = self.flow_mgr.get_new_flow( f"original flow from {self.config.start_point}") + + # self.runahead_limit_point = None # reset from nocycle self.compute_runahead() for name in self.config.get_task_name_list(): tdef = self.config.get_taskdef(name) @@ -318,6 +338,10 @@ def compute_runahead(self, force=False) -> bool: ) ): points.append(point) + points = [ + p for p in points + if type(p) is not NocyclePoint # type: ignore + ] if not points: return False base_point = min(points) @@ -420,10 +444,15 @@ def load_db_task_pool_for_restart(self, row_idx, row): (cycle, name, flow_nums, is_late, status, is_held, submit_num, _, platform_name, time_submit, time_run, timeout, outputs_str) = row + if cycle in ("startup", "shutdown"): + point = NocyclePoint(cycle) + else: + point = get_point(cycle) + try: itask = TaskProxy( self.config.get_taskdef(name), - get_point(cycle), + point, deserialise(flow_nums), status=status, is_held=is_held, @@ -801,6 +830,10 @@ def release_queued_tasks(self): # Note: released and pre_prep_tasks can overlap return list(set(released + pre_prep_tasks)) + def get_points(self): + """Return current list of cycle points in the pool.""" + return list(self.main_pool) + def get_min_point(self): """Return the minimum cycle point currently in the pool.""" cycles = list(self.main_pool) @@ -1243,7 +1276,10 @@ def spawn_on_output(self, itask, output, forced=False): # Add it to the hidden pool or move it to the main pool. self.add_to_pool(t) - if t.point <= self.runahead_limit_point: + if ( + t.point <= self.runahead_limit_point + or str(t.point) in ["startup", "shutdown"] + ): self.rh_release_and_queue(t) # Event-driven suicide. @@ -1357,6 +1393,7 @@ def spawn_on_all_outputs( self.data_store_mgr.delta_task_prerequisite(c_task) self.add_to_pool(c_task) if ( + # TODO NOCYCLE self.runahead_limit_point is not None and c_task.point <= self.runahead_limit_point ): diff --git a/cylc/flow/taskdef.py b/cylc/flow/taskdef.py index 29e0f7d0de0..61f8062c43c 100644 --- a/cylc/flow/taskdef.py +++ b/cylc/flow/taskdef.py @@ -133,6 +133,7 @@ def __init__(self, name, rtcfg, run_mode, start_point, initial_point): self.initial_point = initial_point self.sequences = [] + self.used_in_offset_trigger = False # some defaults @@ -264,15 +265,19 @@ def check_for_explicit_cycling(self): raise TaskDefError( "No cycling sequences defined for %s" % self.name) - def get_parent_points(self, point): + def get_parent_points(self, point, seq=None): """Return the cycle points of my parents, at point.""" parent_points = set() - for seq in self.sequences: - if not seq.is_valid(point): + if seq: + sequences = [seq] + else: + sequences = self.sequences + for sequence in sequences: + if not sequence.is_valid(point): continue - if seq in self.dependencies: + if sequence in self.dependencies: # task has prereqs in this sequence - for dep in self.dependencies[seq]: + for dep in self.dependencies[sequence]: if dep.suicide: continue for trig in dep.task_triggers: @@ -309,9 +314,12 @@ def is_valid_point(self, point: 'PointBase') -> bool: def first_point(self, icp): """Return the first point for this task.""" + from cylc.flow.cycling.nocycle import NocycleSequence point = None adjusted = [] for seq in self.sequences: + if type(seq) is NocycleSequence: + continue pt = seq.get_first_point(icp) if pt: # may be None if beyond the sequence bounds @@ -333,7 +341,7 @@ def next_point(self, point): p_next = min(adjusted) return p_next - def is_parentless(self, point): + def is_parentless(self, point, seq=None): """Return True if task has no parents at point. Tasks are considered parentless if they have: @@ -352,7 +360,8 @@ def is_parentless(self, point): if self.sequential: # Implicit parents return False - parent_points = self.get_parent_points(point) + + parent_points = self.get_parent_points(point, seq) return ( not parent_points or all(x < self.start_point for x in parent_points)