diff --git a/CHANGES.md b/CHANGES.md index 9f3698432c2..00539f6490f 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -24,6 +24,11 @@ Cylc to crash when preparing the job script. ------------------------------------------------------------------------------- ## __cylc-8.2.0 (Coming Soon)__ +### Enhancements + +[#5090](https://github.com/cylc/cylc-flow/pull/5090) - Implement initial and +final graphs, distinct from the main cycling graph. + ### Fixes [#5328](https://github.com/cylc/cylc-flow/pull/5328) - Efficiency improvements to reduce task management overheads on the Scheduler. @@ -73,19 +78,20 @@ workflows with many-to-many dependencies (e.g. ` => `). ### Enhancements +[#5184](https://github.com/cylc/cylc-flow/pull/5184) - Scan for active +runs of the same workflow at install time. + +[#5032](https://github.com/cylc/cylc-flow/pull/5032) - Set a default limit of +100 for the "default" queue. + [#5229](https://github.com/cylc/cylc-flow/pull/5229) - - Added a single command to validate a previously run workflow against changes to its source and reinstall a workflow. - Allows Cylc commands (including validate, list, view, config, and graph) to load template variables configured by `cylc install` and `cylc play`. -[#5184](https://github.com/cylc/cylc-flow/pull/5184) - scan for active -runs of the same workflow at install time. - [#5121](https://github.com/cylc/cylc-flow/pull/5121) - Added a single command to validate, install and play a workflow. -[#5032](https://github.com/cylc/cylc-flow/pull/5032) - set a default limit of -100 for the "default" queue. 
[#5055](https://github.com/cylc/cylc-flow/pull/5055) and diff --git a/cylc/flow/config.py b/cylc/flow/config.py index a549c9ed771..fa1c773264d 100644 --- a/cylc/flow/config.py +++ b/cylc/flow/config.py @@ -54,6 +54,11 @@ get_sequence, get_sequence_cls, init_cyclers, get_dump_format, INTEGER_CYCLING_TYPE, ISO8601_CYCLING_TYPE ) +from cylc.flow.cycling.nocycle import ( + NocycleSequence, + NOCYCLE_SEQ_ALPHA, + NOCYCLE_SEQ_OMEGA +) from cylc.flow.id import Tokens from cylc.flow.cycling.integer import IntegerInterval from cylc.flow.cycling.iso8601 import ingest_time, ISO8601Interval @@ -270,6 +275,7 @@ def __init__( self.start_point: 'PointBase' self.stop_point: Optional['PointBase'] = None self.final_point: Optional['PointBase'] = None + self.nocycle_sequences: Set['NocycleSequence'] = set() self.sequences: List['SequenceBase'] = [] self.actual_first_point: Optional['PointBase'] = None self._start_point_for_actual_first_point: Optional['PointBase'] = None @@ -618,9 +624,16 @@ def prelim_process_graph(self) -> None: if ( 'cycling mode' not in self.cfg['scheduling'] and self.cfg['scheduling'].get('initial cycle point', '1') == '1' and - all(item in ['graph', '1', 'R1'] for item in graphdict) + all( + item in [ + 'graph', '1', 'R1', + str(NOCYCLE_SEQ_ALPHA), + str(NOCYCLE_SEQ_OMEGA) + ] + for item in graphdict + ) ): - # Pure acyclic graph, assume integer cycling mode with '1' cycle + # Non-cycling graph, assume integer cycling mode with '1' cycle self.cfg['scheduling']['cycling mode'] = INTEGER_CYCLING_TYPE for key in ('initial cycle point', 'final cycle point'): if key not in self.cfg['scheduling']: @@ -2087,15 +2100,24 @@ def load_graph(self): try: seq = get_sequence(section, icp, fcp) except (AttributeError, TypeError, ValueError, CylcError) as exc: - if cylc.flow.flags.verbosity > 1: - traceback.print_exc() - msg = 'Cannot process recurrence %s' % section - msg += ' (initial cycle point=%s)' % icp - msg += ' (final cycle point=%s)' % fcp - if isinstance(exc, 
CylcError): - msg += ' %s' % exc.args[0] - raise WorkflowConfigError(msg) - self.sequences.append(seq) + try: + seq = NocycleSequence(section) + except ValueError: + if cylc.flow.flags.verbosity > 1: + traceback.print_exc() + msg = ( + f"Cannot process recurrence {section}" + f" (initial cycle point={icp})" + f" (final cycle point={fcp})" + ) + if isinstance(exc, CylcError): + msg += ' %s' % exc.args[0] + raise WorkflowConfigError(msg) + else: + self.nocycle_sequences.add(seq) + else: + self.sequences.append(seq) + parser = GraphParser( family_map, self.parameters, diff --git a/cylc/flow/cycling/__init__.py b/cylc/flow/cycling/__init__.py index 1bb60f916dc..8191fba036b 100644 --- a/cylc/flow/cycling/__init__.py +++ b/cylc/flow/cycling/__init__.py @@ -345,7 +345,11 @@ def TYPE_SORT_KEY(self) -> int: @classmethod @abstractmethod # Note: stacked decorator not strictly enforced in Py2.x def get_async_expr(cls, start_point=0): - """Express a one-off sequence at the initial cycle point.""" + """Express a one-off sequence at the initial cycle point. + + Note "async" has nothing to do with asyncio. It was a (bad) + name for one-off (non-cycling) graphs in early Cylc versions. + """ pass @abstractmethod diff --git a/cylc/flow/cycling/integer.py b/cylc/flow/cycling/integer.py index 749c651fc08..b6bc0e2555c 100644 --- a/cylc/flow/cycling/integer.py +++ b/cylc/flow/cycling/integer.py @@ -21,7 +21,12 @@ import re from cylc.flow.cycling import ( - PointBase, IntervalBase, SequenceBase, ExclusionBase, parse_exclusion, cmp + PointBase, + IntervalBase, + SequenceBase, + ExclusionBase, + parse_exclusion, + cmp ) from cylc.flow.exceptions import ( CylcMissingContextPointError, diff --git a/cylc/flow/cycling/nocycle.py b/cylc/flow/cycling/nocycle.py new file mode 100644 index 00000000000..f5a46bf54c0 --- /dev/null +++ b/cylc/flow/cycling/nocycle.py @@ -0,0 +1,159 @@ +# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE. +# Copyright (C) NIWA & British Crown (Met Office) & Contributors. 
+# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +""" +Cycling logic for isolated non-cycling startup and shutdown graphs. +""" + +from cylc.flow.cycling import PointBase, SequenceBase, cmp + +# TODO: scheduler check DB to be sure alpha and omega sections have run or not. + +# cycle point values +NOCYCLE_PT_ALPHA = "alpha" +NOCYCLE_PT_OMEGA = "omega" + +NOCYCLE_POINTS = ( + NOCYCLE_PT_ALPHA, + NOCYCLE_PT_OMEGA +) + +CYCLER_TYPE_NOCYCLE = "nocycle" +CYCLER_TYPE_SORT_KEY_NOCYCLE = 1 + + +class NocyclePoint(PointBase): + """A string-valued point.""" + + TYPE = CYCLER_TYPE_NOCYCLE + TYPE_SORT_KEY = CYCLER_TYPE_SORT_KEY_NOCYCLE + + __slots__ = ('value') + + def __init__(self, value: str) -> None: + if value not in [NOCYCLE_PT_ALPHA, NOCYCLE_PT_OMEGA]: + raise ValueError(f"Illegal Nocycle value {value}") + self.value = value + + def __hash__(self): + return hash(self.value) + + def __eq__(self, other): + return str(other) == self.value + + def __le__(self, other): + """less than or equal only if equal.""" + return str(other) == self.value + + def __lt__(self, other): + """never less than.""" + return False + + def __gt__(self, other): + """never greater than.""" + return False + + def __str__(self): + return self.value + + def _cmp(self, other): + return cmp(int(self), int(other)) + + def add(self, other): + # NOT USED + return None + + def sub(self, other): + # NOT USED + return None + + +class 
NocycleSequence(SequenceBase): + """A single point sequence.""" + + def __init__(self, dep_section, p_context_start=None, p_context_stop=None): + """Workflow cycling context is ignored.""" + self.point = NocyclePoint(dep_section) + + def __hash__(self): + return hash(str(self.point)) + + def is_valid(self, point): + """Is point on-sequence and in-bounds?""" + return str(point) == self.point + + def get_first_point(self, point): + """First point is the only point""" + return self.point + + def get_next_point(self, point): + """There is no next point""" + return None + + def get_next_point_on_sequence(self, point): + """There is no next point""" + return None + + def __eq__(self, other): + try: + return other.point == self.point + except AttributeError: + # (other is not a nocycle sequence) + return False + + def __str__(self): + return str(self.point) + + def TYPE(self): + raise NotImplementedError + + def TYPE_SORT_KEY(self): + raise NotImplementedError + + def get_async_expr(cls, start_point=0): + raise NotImplementedError + + def get_interval(self): + """Return the cycling interval of this sequence.""" + raise NotImplementedError + + def get_offset(self): + """Deprecated: return the offset used for this sequence.""" + raise NotImplementedError + + def set_offset(self, i_offset): + """Deprecated: alter state to offset the entire sequence.""" + raise NotImplementedError + + def is_on_sequence(self, point): + """Is point on-sequence, disregarding bounds?""" + raise NotImplementedError + + def get_prev_point(self, point): + """Return the previous point < point, or None if out of bounds.""" + raise NotImplementedError + + def get_nearest_prev_point(self, point): + """Return the largest point < some arbitrary point.""" + raise NotImplementedError + + def get_stop_point(self): + """Return the last point in this sequence, or None if unbounded.""" + raise NotImplementedError + + +NOCYCLE_SEQ_ALPHA = NocycleSequence(NOCYCLE_PT_ALPHA) +NOCYCLE_SEQ_OMEGA = 
NocycleSequence(NOCYCLE_PT_OMEGA) diff --git a/cylc/flow/scheduler.py b/cylc/flow/scheduler.py index 4fa19c81870..5a2f620db1a 100644 --- a/cylc/flow/scheduler.py +++ b/cylc/flow/scheduler.py @@ -19,6 +19,7 @@ from contextlib import suppress from collections import deque from dataclasses import dataclass +from functools import partial from optparse import Values import os from pathlib import Path @@ -54,6 +55,13 @@ from cylc.flow.broadcast_mgr import BroadcastMgr from cylc.flow.cfgspec.glbl_cfg import glbl_cfg from cylc.flow.config import WorkflowConfig +from cylc.flow.cycling.nocycle import ( + NOCYCLE_POINTS, + NOCYCLE_PT_ALPHA, + NOCYCLE_PT_OMEGA, + NOCYCLE_SEQ_ALPHA, + NOCYCLE_SEQ_OMEGA +) from cylc.flow.data_store_mgr import DataStoreMgr from cylc.flow.id import Tokens from cylc.flow.flow_mgr import FLOW_NONE, FlowMgr, FLOW_NEW @@ -306,6 +314,9 @@ def __init__(self, reg: str, options: Values) -> None: pub_d=os.path.join(self.workflow_run_dir, 'log') ) self.is_restart = Path(self.workflow_db_mgr.pri_path).is_file() + + self.graph_loaders: List[Callable] = [] + # Map used to track incomplete remote inits for restart # {install_target: platform} self.incomplete_ri_map: Dict[str, Dict] = {} @@ -473,17 +484,6 @@ async def configure(self): self.data_store_mgr.initiate_data_model() - self.profiler.log_memory("scheduler.py: before load_tasks") - if self.is_restart: - self._load_pool_from_db() - if self.restored_stop_task_id is not None: - self.pool.set_stop_task(self.restored_stop_task_id) - elif self.options.starttask: - self._load_pool_from_tasks() - else: - self._load_pool_from_point() - self.profiler.log_memory("scheduler.py: after load_tasks") - self.workflow_db_mgr.put_workflow_params(self) self.workflow_db_mgr.put_workflow_template_vars(self.template_vars) self.workflow_db_mgr.put_runtime_inheritance(self.config) @@ -610,16 +610,86 @@ def log_start(self) -> None: extra=RotatingLogFileHandler.header_extra ) + def _get_graph_loaders(self) -> None: + """Get next 
graphs base on current pool content.""" + # Check pool points in case this is a restart. + # TODO REALLY NEED TO CHECK DB FOR SECTIONS THAT RAN ALREADY. + + points = [p.value for p in self.pool.get_points()] + if self.is_restart and not points: + # Restart with empty pool: only unfinished event handlers. + # No graph to load. + return + + if ( + NOCYCLE_SEQ_OMEGA in self.config.nocycle_sequences + and (not points or NOCYCLE_PT_OMEGA not in points) + ): + # Omega section exists and hasn't started yet. + self.graph_loaders.append( + partial(self.pool.load_nocycle_graph, NOCYCLE_SEQ_OMEGA) + ) + if ( + self.config.sequences + and ( + not points + or ( + not any(p not in NOCYCLE_POINTS for p in points) + and NOCYCLE_PT_OMEGA not in points + ) + ) + ): + # Normal graph exists and hasn't started yet. + if self.options.starttask: + # Cold start from specified tasks. + self.graph_loaders.append(self._load_pool_from_tasks) + else: + # Cold start from cycle point. + self.graph_loaders.append(self._load_pool_from_point) + + if ( + NOCYCLE_SEQ_ALPHA in self.config.nocycle_sequences + and not self.is_restart + ): + # Alpha section exists and hasn't started yet. + # (Never in a restart). + self.graph_loaders.append( + partial(self.pool.load_nocycle_graph, NOCYCLE_SEQ_ALPHA) + ) + + async def run_graphs(self): + self.graph_loaders = [] + if self.is_restart: + # Restart from DB. + self.task_job_mgr.task_remote_mgr.is_restart = True + self.task_job_mgr.task_remote_mgr.rsync_includes = ( + self.config.get_validated_rsync_includes()) + self._load_pool_from_db() + self.restart_remote_init() + # Poll all pollable tasks + self.command_poll_tasks(['*/*']) + # TODO - WHY DOESN'T '*/*' MATCH THE FOLLOWING? 
+ self.command_poll_tasks([f"{NOCYCLE_PT_ALPHA}/*"]) + self.command_poll_tasks([f"{NOCYCLE_PT_OMEGA}/*"]) + + self._get_graph_loaders() + await self.main_loop() + # next graphs depends on content of restart pool + while self.graph_loaders: + (self.graph_loaders.pop())() + await self.main_loop() + elif self.pool.main_pool: + # pool loaded for integration test! + await self.main_loop() + else: + self._get_graph_loaders() + while self.graph_loaders: + (self.graph_loaders.pop())() + await self.main_loop() + async def run_scheduler(self) -> None: """Start the scheduler main loop.""" try: - if self.is_restart: - self.task_job_mgr.task_remote_mgr.is_restart = True - self.task_job_mgr.task_remote_mgr.rsync_includes = ( - self.config.get_validated_rsync_includes()) - self.restart_remote_init() - self.command_poll_tasks(['*/*']) - self.run_event_handlers(self.EVENT_STARTUP, 'workflow starting') await asyncio.gather( *main_loop.get_runners( @@ -628,12 +698,14 @@ async def run_scheduler(self) -> None: self ) ) - self.server.publish_queue.put( - self.data_store_mgr.publish_deltas) + self.server.publish_queue.put(self.data_store_mgr.publish_deltas) + # Non-async sleep - yield to other threads rather than event loop sleep(0) self.profiler.start() - await self.main_loop() + + await self.run_graphs() + LOG.critical("DONE") except SchedulerStop as exc: # deliberate stop @@ -660,7 +732,6 @@ async def run_scheduler(self) -> None: await self.handle_exception(exc) else: - # main loop ends (not used?) await self.shutdown(SchedulerStop(StopMode.AUTO.value)) finally: @@ -703,7 +774,7 @@ async def run(self): def _load_pool_from_tasks(self): """Load task pool with specified tasks, for a new run.""" - LOG.info(f"Start task: {self.options.starttask}") + LOG.info(f"LOADING START TASKS: {self.options.starttask}") # flow number set in this call: self.pool.force_trigger_tasks( self.options.starttask, @@ -722,15 +793,16 @@ def _load_pool_from_point(self): released from runhead.) 
""" - start_type = ( - "Warm" if self.config.start_point > self.config.initial_point - else "Cold" - ) - LOG.info(f"{start_type} start from {self.config.start_point}") + LOG.info("LOADING MAIN GRAPH") + msg = f"start from {self.config.start_point}" + if self.config.start_point == self.config.initial_point: + msg = "Cold " + msg + LOG.info(msg) self.pool.load_from_point() def _load_pool_from_db(self): """Load task pool from DB, for a restart.""" + LOG.info("LOADING DB FOR RESTART") self.workflow_db_mgr.pri_dao.select_broadcast_states( self.broadcast_mgr.load_db_broadcast_states) self.broadcast_mgr.post_load_db_coerce() @@ -749,6 +821,9 @@ def _load_pool_from_db(self): self.pool.load_db_tasks_to_hold() self.pool.update_flow_mgr() + if self.restored_stop_task_id is not None: + self.pool.set_stop_task(self.restored_stop_task_id) + def restart_remote_init(self): """Remote init for all submitted/running tasks in the pool.""" self.task_job_mgr.task_remote_mgr.is_restart = True @@ -796,11 +871,11 @@ def manage_remote_init(self): install_target] if status == REMOTE_INIT_DONE: self.task_job_mgr.task_remote_mgr.file_install(platform) - if status in [REMOTE_FILE_INSTALL_DONE, - REMOTE_INIT_255, - REMOTE_FILE_INSTALL_255, - REMOTE_INIT_FAILED, - REMOTE_FILE_INSTALL_FAILED]: + elif status in [REMOTE_FILE_INSTALL_DONE, + REMOTE_INIT_255, + REMOTE_FILE_INSTALL_255, + REMOTE_INIT_FAILED, + REMOTE_FILE_INSTALL_FAILED]: # Remove install target self.incomplete_ri_map.pop(install_target) @@ -1645,6 +1720,10 @@ async def main_loop(self) -> None: # Shutdown workflow if timeouts have occurred self.timeout_check() + if self.graph_finished() and self.graph_loaders: + # Return control to load the next graph. + break + # Does the workflow need to shutdown on task failure? 
await self.workflow_shutdown() @@ -1876,6 +1955,21 @@ def set_stop_clock(self, unix_time): self.workflow_db_mgr.put_workflow_stop_clock_time(self.stop_clock_time) self.update_data_store() + def graph_finished(self): + """Nothing left to run.""" + return not any( + itask for itask in self.pool.get_tasks() + if itask.state( + TASK_STATUS_PREPARING, + TASK_STATUS_SUBMITTED, + TASK_STATUS_RUNNING + ) + or ( + itask.state(TASK_STATUS_WAITING) + and not itask.state.is_runahead + ) + ) + def stop_clock_done(self): """Return True if wall clock stop time reached.""" if self.stop_clock_time is None: @@ -1893,24 +1987,10 @@ def stop_clock_done(self): def check_auto_shutdown(self): """Check if we should shut down now.""" - if self.is_paused: - # Don't if paused. - return False - - if self.check_workflow_stalled(): - return False - - if any( - itask for itask in self.pool.get_tasks() - if itask.state( - TASK_STATUS_PREPARING, - TASK_STATUS_SUBMITTED, - TASK_STATUS_RUNNING - ) - or ( - itask.state(TASK_STATUS_WAITING) - and not itask.state.is_runahead - ) + if ( + self.is_paused or + self.check_workflow_stalled() or + not self.graph_finished() ): # Don't if there are more tasks to run (if waiting and not # runahead, then held, queued, or xtriggered). 
diff --git a/cylc/flow/scripts/graph.py b/cylc/flow/scripts/graph.py index ce22cfdc2ed..258f5389644 100644 --- a/cylc/flow/scripts/graph.py +++ b/cylc/flow/scripts/graph.py @@ -44,6 +44,7 @@ from typing import Dict, List, Optional, TYPE_CHECKING, Tuple, Callable from cylc.flow.config import WorkflowConfig +from cylc.flow.cycling.nocycle import NOCYCLE_PT_ALPHA from cylc.flow.exceptions import InputError, CylcError from cylc.flow.id import Tokens from cylc.flow.id_cli import parse_id_async @@ -66,10 +67,20 @@ def sort_integer_node(id_): Example: >>> sort_integer_node('11/foo') ('foo', 11) - + >>> sort_integer_node('alpha/foo') + ('foo', 0) + >>> sort_integer_node('omega/foo') + ('foo', 1) """ tokens = Tokens(id_, relative=True) - return (tokens['task'], int(tokens['cycle'])) + try: + return (tokens['task'], int(tokens['cycle'])) + except ValueError: + # nocycle point + if tokens['cycle'] == NOCYCLE_PT_ALPHA: + return (tokens['task'], 0) + else: + return (tokens['task'], 1) def sort_integer_edge(id_): @@ -153,7 +164,7 @@ def _get_graph_nodes_edges( edge_sort = sort_integer_edge else: # datetime sorting - node_sort = None # lexicographically sortable + node_sort = None edge_sort = sort_datetime_edge # get nodes diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py index 1dd0d5e6c19..b1e69e9b4d5 100644 --- a/cylc/flow/task_pool.py +++ b/cylc/flow/task_pool.py @@ -29,6 +29,7 @@ import cylc.flow.flags from cylc.flow import LOG from cylc.flow.cycling.loader import get_point, standardise_point_string +from cylc.flow.cycling.nocycle import NocyclePoint, NOCYCLE_POINTS from cylc.flow.exceptions import WorkflowConfigError, PointParsingError from cylc.flow.id import Tokens, detokenise from cylc.flow.id_cli import contains_fnmatch @@ -167,6 +168,23 @@ def _swap_out(self, itask): self.main_pool[itask.point][itask.identity] = itask self.main_pool_changed = True + def load_nocycle_graph(self, seq): + """blah """ + LOG.info(f"LOADING {seq} GRAPH") + flow_num = 
self.flow_mgr.get_new_flow(f"original {seq} flow") + self.runahead_limit_point = None + for name in self.config.get_task_name_list(): + tdef = self.config.get_taskdef(name) + if str(seq) not in [str(s) for s in tdef.sequences]: + continue + if tdef.is_parentless(seq.point, seq): + ntask = self._get_spawned_or_merged_task( + seq.point, tdef.name, {flow_num} + ) + if ntask is not None: + self.add_to_pool(ntask) + self.rh_release_and_queue(ntask) + def load_from_point(self): """Load the task pool for the workflow start point. @@ -174,6 +192,8 @@ def load_from_point(self): """ flow_num = self.flow_mgr.get_new_flow( f"original flow from {self.config.start_point}") + + # self.runahead_limit_point = None # reset from nocycle self.compute_runahead() for name in self.config.get_task_name_list(): tdef = self.config.get_taskdef(name) @@ -258,7 +278,7 @@ def release_runahead_tasks(self): Return True if any tasks are released, else False. Call when RH limit changes. """ - if not self.main_pool or not self.runahead_limit_point: + if not self.main_pool: # (At start-up main pool might not exist yet) return False @@ -270,7 +290,12 @@ def release_runahead_tasks(self): itask for point, itask_id_map in self.main_pool.items() for itask in itask_id_map.values() - if point <= self.runahead_limit_point + if ( + self.runahead_limit_point and + point <= self.runahead_limit_point + or + str(point) in NOCYCLE_POINTS + ) if itask.state.is_runahead ] @@ -334,6 +359,10 @@ def compute_runahead(self, force=False) -> bool: ) ): points.append(point) + points = [ + p for p in points + if type(p) is not NocyclePoint # type: ignore + ] if not points: return False base_point = min(points) @@ -436,11 +465,17 @@ def load_db_task_pool_for_restart(self, row_idx, row): (cycle, name, flow_nums, flow_wait, is_manual_submit, is_late, status, is_held, submit_num, _, platform_name, time_submit, time_run, timeout, outputs_str) = row + + if cycle in NOCYCLE_POINTS: + point = NocyclePoint(cycle) + else: + point = 
get_point(cycle) + try: itask = TaskProxy( self.tokens, self.config.get_taskdef(name), - get_point(cycle), + point, deserialise(flow_nums), status=status, is_held=is_held, @@ -831,6 +866,10 @@ def release_queued_tasks(self): # Note: released and pre_prep_tasks can overlap return list(set(released + pre_prep_tasks)) + def get_points(self): + """Return current list of cycle points in the pool.""" + return list(self.main_pool) + def get_min_point(self): """Return the minimum cycle point currently in the pool.""" cycles = list(self.main_pool) @@ -1269,7 +1308,10 @@ def spawn_on_output(self, itask, output, forced=False): # Add it to the hidden pool or move it to the main pool. self.add_to_pool(t) - if t.point <= self.runahead_limit_point: + if ( + t.point <= self.runahead_limit_point + or str(t.point) in NOCYCLE_POINTS + ): self.rh_release_and_queue(t) # Event-driven suicide. @@ -1383,6 +1425,7 @@ def spawn_on_all_outputs( self.data_store_mgr.delta_task_prerequisite(c_task) self.add_to_pool(c_task) if ( + # TODO NOCYCLE self.runahead_limit_point is not None and c_task.point <= self.runahead_limit_point ): @@ -1875,11 +1918,15 @@ def match_future_tasks( try: point_str = standardise_point_string(point_str) except PointParsingError as exc: - LOG.warning( - f"{id_} - invalid cycle point: {point_str} ({exc})") - unmatched_tasks.append(id_) - continue - point = get_point(point_str) + if point_str in NOCYCLE_POINTS: + point = NocyclePoint(point_str) + else: + LOG.warning( + f"{id_} - invalid cycle point: {point_str} ({exc})") + unmatched_tasks.append(id_) + continue + else: + point = get_point(point_str) taskdef = self.config.taskdefs[name_str] if taskdef.is_valid_point(point): matched_tasks.add((taskdef.name, point)) diff --git a/cylc/flow/taskdef.py b/cylc/flow/taskdef.py index 29e0f7d0de0..61f8062c43c 100644 --- a/cylc/flow/taskdef.py +++ b/cylc/flow/taskdef.py @@ -133,6 +133,7 @@ def __init__(self, name, rtcfg, run_mode, start_point, initial_point): self.initial_point = 
initial_point self.sequences = [] + self.used_in_offset_trigger = False # some defaults @@ -264,15 +265,19 @@ def check_for_explicit_cycling(self): raise TaskDefError( "No cycling sequences defined for %s" % self.name) - def get_parent_points(self, point): + def get_parent_points(self, point, seq=None): """Return the cycle points of my parents, at point.""" parent_points = set() - for seq in self.sequences: - if not seq.is_valid(point): + if seq: + sequences = [seq] + else: + sequences = self.sequences + for sequence in sequences: + if not sequence.is_valid(point): continue - if seq in self.dependencies: + if sequence in self.dependencies: # task has prereqs in this sequence - for dep in self.dependencies[seq]: + for dep in self.dependencies[sequence]: if dep.suicide: continue for trig in dep.task_triggers: @@ -309,9 +314,12 @@ def is_valid_point(self, point: 'PointBase') -> bool: def first_point(self, icp): """Return the first point for this task.""" + from cylc.flow.cycling.nocycle import NocycleSequence point = None adjusted = [] for seq in self.sequences: + if type(seq) is NocycleSequence: + continue pt = seq.get_first_point(icp) if pt: # may be None if beyond the sequence bounds @@ -333,7 +341,7 @@ def next_point(self, point): p_next = min(adjusted) return p_next - def is_parentless(self, point): + def is_parentless(self, point, seq=None): """Return True if task has no parents at point. 
Tasks are considered parentless if they have: @@ -352,7 +360,8 @@ def is_parentless(self, point): if self.sequential: # Implicit parents return False - parent_points = self.get_parent_points(point) + + parent_points = self.get_parent_points(point, seq) return ( not parent_points or all(x < self.start_point for x in parent_points) diff --git a/tests/functional/alpha-omega/00-basic.t b/tests/functional/alpha-omega/00-basic.t new file mode 100644 index 00000000000..c753503bf82 --- /dev/null +++ b/tests/functional/alpha-omega/00-basic.t @@ -0,0 +1,30 @@ +#!/usr/bin/env bash +# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE. +# Copyright (C) NIWA & British Crown (Met Office) & Contributors. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +#------------------------------------------------------------------------------- + +# Check basic separate triggering of alpha, omega, and main graphs. +. 
"$(dirname "$0")/test_header" +set_test_number 3 + +install_and_validate + +reftest_run + +graph_workflow "${WORKFLOW_NAME}" "${WORKFLOW_NAME}.graph" +cmp_ok "${WORKFLOW_NAME}.graph" "$TEST_SOURCE_DIR/${TEST_NAME_BASE}/reference.graph" + +purge diff --git a/tests/functional/alpha-omega/00-basic/.cylcignore b/tests/functional/alpha-omega/00-basic/.cylcignore new file mode 100644 index 00000000000..6ded72ca000 --- /dev/null +++ b/tests/functional/alpha-omega/00-basic/.cylcignore @@ -0,0 +1 @@ +twat diff --git a/tests/functional/alpha-omega/00-basic/flow.cylc b/tests/functional/alpha-omega/00-basic/flow.cylc new file mode 100644 index 00000000000..6e3b45ff681 --- /dev/null +++ b/tests/functional/alpha-omega/00-basic/flow.cylc @@ -0,0 +1,12 @@ + +[scheduling] + cycling mode = integer + final cycle point = 2 + [[graph]] + alpha = "a => b" + omega = "x => y" + R1 = "foo => bar" + P1 = "bar => baz" +[runtime] + [[a, b, x, y]] + [[foo, bar, baz]] diff --git a/tests/functional/alpha-omega/00-basic/reference.graph b/tests/functional/alpha-omega/00-basic/reference.graph new file mode 100644 index 00000000000..8bfc40edead --- /dev/null +++ b/tests/functional/alpha-omega/00-basic/reference.graph @@ -0,0 +1,13 @@ +edge "alpha/a" "alpha/b" +edge "1/bar" "1/baz" +edge "2/bar" "2/baz" +edge "1/foo" "1/bar" +graph +node "alpha/a" "a\nalpha" +node "alpha/b" "b\nalpha" +node "1/bar" "bar\n1" +node "2/bar" "bar\n2" +node "1/baz" "baz\n1" +node "2/baz" "baz\n2" +node "1/foo" "foo\n1" +stop diff --git a/tests/functional/alpha-omega/00-basic/reference.log b/tests/functional/alpha-omega/00-basic/reference.log new file mode 100644 index 00000000000..5d44fb3261f --- /dev/null +++ b/tests/functional/alpha-omega/00-basic/reference.log @@ -0,0 +1,9 @@ +alpha/a -triggered off [] in flow 1 +alpha/b -triggered off ['alpha/a'] in flow 1 +1/foo -triggered off [] in flow 2 +2/bar -triggered off [] in flow 2 +1/bar -triggered off ['1/foo'] in flow 2 +2/baz -triggered off ['2/bar'] in flow 2 +1/baz 
-triggered off ['1/bar'] in flow 2 +omega/x -triggered off [] in flow 3 +omega/y -triggered off ['omega/x'] in flow 3 diff --git a/tests/functional/alpha-omega/01-restart.t b/tests/functional/alpha-omega/01-restart.t new file mode 100644 index 00000000000..5a40023cb4d --- /dev/null +++ b/tests/functional/alpha-omega/01-restart.t @@ -0,0 +1,34 @@ +#!/usr/bin/env bash +# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE. +# Copyright (C) NIWA & British Crown (Met Office) & Contributors. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +#------------------------------------------------------------------------------- + +# Check restart in alpha, main, and omega graphs. + +. 
"$(dirname "$0")/test_header" +set_test_number 5 + +install_and_validate + +SRC_DIR="$TEST_SOURCE_DIR/${TEST_NAME_BASE}" +RUN_DIR="$WORKFLOW_RUN_DIR" + +for RUN in 1 2 3 4; do + cp "${SRC_DIR}/reference.log.$RUN" "${RUN_DIR}/reference.log" + reftest_run "${TEST_NAME_BASE}-${RUN}" +done + +purge diff --git a/tests/functional/alpha-omega/01-restart/flow.cylc b/tests/functional/alpha-omega/01-restart/flow.cylc new file mode 100644 index 00000000000..2140de2eb37 --- /dev/null +++ b/tests/functional/alpha-omega/01-restart/flow.cylc @@ -0,0 +1,23 @@ +[scheduling] + cycling mode = integer + final cycle point = 2 + [[graph]] + alpha = "a => b" + omega = "x => y" + R1 = "foo => bar" + P1 = "bar[-P1] => bar => baz" +[runtime] + [[a]] + script = cylc stop $CYLC_WORKFLOW_ID + [[b]] + [[x]] + script = cylc stop $CYLC_WORKFLOW_ID + [[y]] + [[foo]] + [[bar]] + script = """ + if ((CYLC_TASK_CYCLE_POINT == 1)); then + cylc stop $CYLC_WORKFLOW_ID + fi + """ + [[baz]] diff --git a/tests/functional/alpha-omega/01-restart/reference.log.1 b/tests/functional/alpha-omega/01-restart/reference.log.1 new file mode 100644 index 00000000000..7554e7516e0 --- /dev/null +++ b/tests/functional/alpha-omega/01-restart/reference.log.1 @@ -0,0 +1 @@ +alpha/a -triggered off [] in flow 1 diff --git a/tests/functional/alpha-omega/01-restart/reference.log.2 b/tests/functional/alpha-omega/01-restart/reference.log.2 new file mode 100644 index 00000000000..ae6c028f2fc --- /dev/null +++ b/tests/functional/alpha-omega/01-restart/reference.log.2 @@ -0,0 +1,3 @@ +alpha/b -triggered off ['alpha/a'] in flow 1 +1/foo -triggered off [] in flow 2 +1/bar -triggered off ['0/bar', '1/foo'] in flow 2 diff --git a/tests/functional/alpha-omega/01-restart/reference.log.3 b/tests/functional/alpha-omega/01-restart/reference.log.3 new file mode 100644 index 00000000000..cc9e663fd41 --- /dev/null +++ b/tests/functional/alpha-omega/01-restart/reference.log.3 @@ -0,0 +1,4 @@ +1/baz -triggered off ['1/bar'] in flow 2 +2/bar 
-triggered off ['1/bar'] in flow 2 +2/baz -triggered off ['2/bar'] in flow 2 +omega/x -triggered off [] in flow 3 diff --git a/tests/functional/alpha-omega/01-restart/reference.log.4 b/tests/functional/alpha-omega/01-restart/reference.log.4 new file mode 100644 index 00000000000..c47f0b3c77f --- /dev/null +++ b/tests/functional/alpha-omega/01-restart/reference.log.4 @@ -0,0 +1 @@ +omega/y -triggered off ['omega/x'] in flow 3 diff --git a/tests/functional/alpha-omega/02-retrigger.t b/tests/functional/alpha-omega/02-retrigger.t new file mode 100644 index 00000000000..1eecf2c5cbd --- /dev/null +++ b/tests/functional/alpha-omega/02-retrigger.t @@ -0,0 +1,24 @@ +#!/usr/bin/env bash +# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE. +# Copyright (C) NIWA & British Crown (Met Office) & Contributors. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +#------------------------------------------------------------------------------- + +# Check manual triggering of alpha and omega graph tasks. + +. "$(dirname "$0")/test_header" +set_test_number 2 +reftest +exit diff --git a/tests/functional/alpha-omega/02-retrigger/flow.cylc b/tests/functional/alpha-omega/02-retrigger/flow.cylc new file mode 100644 index 00000000000..7f1889a9f19 --- /dev/null +++ b/tests/functional/alpha-omega/02-retrigger/flow.cylc @@ -0,0 +1,24 @@ + +# 1. alpha graph completes +# 2. 
then main graph starts, and triggers alpha/a and omega/x with --flow=none +# 3. then when all tasks finish, omega graph runs + +# TODO alpha and omega graphs need special flow number treatment? + +[scheduling] + [[graph]] + alpha = "a => b" + omega = "x => y" + R1 = "foo => bar => baz" +[runtime] + [[a]] + [[b]] + [[x]] + [[y]] + [[foo]] + [[bar]] + script = """ + cylc trigger --flow=none $CYLC_WORKFLOW_ID//alpha/a + cylc trigger --flow=none $CYLC_WORKFLOW_ID//omega/x + """ + [[baz]] diff --git a/tests/functional/alpha-omega/02-retrigger/reference.log b/tests/functional/alpha-omega/02-retrigger/reference.log new file mode 100644 index 00000000000..807771322ac --- /dev/null +++ b/tests/functional/alpha-omega/02-retrigger/reference.log @@ -0,0 +1,9 @@ +alpha/a -triggered off [] in flow 1 +alpha/b -triggered off ['alpha/a'] in flow 1 +1/foo -triggered off [] in flow 2 +1/bar -triggered off ['1/foo'] in flow 2 +alpha/a -triggered off [] in flow none +omega/x -triggered off [] in flow none +1/baz -triggered off ['1/bar'] in flow 2 +omega/x -triggered off [] in flow 3 +omega/y -triggered off ['omega/x'] in flow 3 diff --git a/tests/functional/alpha-omega/test_header b/tests/functional/alpha-omega/test_header new file mode 120000 index 00000000000..90bd5a36f92 --- /dev/null +++ b/tests/functional/alpha-omega/test_header @@ -0,0 +1 @@ +../lib/bash/test_header \ No newline at end of file diff --git a/tests/integration/utils/flow_tools.py b/tests/integration/utils/flow_tools.py index 2a172a09140..2d86beb48de 100644 --- a/tests/integration/utils/flow_tools.py +++ b/tests/integration/utils/flow_tools.py @@ -69,6 +69,14 @@ def _make_flow( return reg +def _load_graph(sched): + """Get scheduler to load the main graph.""" + if sched.is_restart: + sched._load_pool_from_db() + else: + sched._load_pool_from_point() + + @contextmanager def _make_scheduler(): """Return a scheduler object for a flow registration.""" @@ -106,6 +114,7 @@ async def _start_flow( # exception occurs in 
Scheduler try: await schd.start() + _load_graph(schd) finally: # After this `yield`, the `with` block of the context manager # is executed: @@ -137,6 +146,7 @@ async def _run_flow( # exception occurs in Scheduler try: await schd.start() + _load_graph(schd) # Do not await as we need to yield control to the main loop: task = asyncio.create_task(schd.run_scheduler()) finally: