diff --git a/cylc/flow/config.py b/cylc/flow/config.py index a085c08df13..5eaac921bee 100644 --- a/cylc/flow/config.py +++ b/cylc/flow/config.py @@ -26,9 +26,6 @@ structures. """ -# TODO NOCYCLE GRAPHS: -# - graph and check-circular for startup and shutdown - from contextlib import suppress from copy import copy from fnmatch import fnmatchcase @@ -58,9 +55,9 @@ INTEGER_CYCLING_TYPE, ISO8601_CYCLING_TYPE ) from cylc.flow.cycling.nocycle import ( + NocycleSequence, NOCYCLE_SEQ_ALPHA, - NOCYCLE_SEQ_OMEGA, - NocycleSequence + NOCYCLE_SEQ_OMEGA ) from cylc.flow.id import Tokens from cylc.flow.cycling.integer import IntegerInterval @@ -69,7 +66,6 @@ CylcError, WorkflowConfigError, IntervalParsingError, - SequenceParsingError, TaskDefError, ParamExpandError, InputError @@ -274,8 +270,8 @@ def __init__( self.xtrigger_mgr = xtrigger_mgr self.workflow_polling_tasks = {} # type: ignore # TODO figure out type - self.initial_point: Optional['PointBase'] = None - self.start_point: Optional['PointBase'] = None + self.initial_point: 'PointBase' + self.start_point: 'PointBase' self.stop_point: Optional['PointBase'] = None self.final_point: Optional['PointBase'] = None self.nocycle_sequences: Set['NocycleSequence'] = set() @@ -593,7 +589,11 @@ def prelim_process_graph(self) -> None: 'cycling mode' not in self.cfg['scheduling'] and self.cfg['scheduling'].get('initial cycle point', '1') == '1' and all( - item in ['graph', '1', 'R1', 'startup', 'shutdown'] + item in [ + 'graph', '1', 'R1', + str(NOCYCLE_SEQ_ALPHA), + str(NOCYCLE_SEQ_OMEGA) + ] for item in graphdict ) ): diff --git a/cylc/flow/cycling/integer.py b/cylc/flow/cycling/integer.py index 749c651fc08..b6bc0e2555c 100644 --- a/cylc/flow/cycling/integer.py +++ b/cylc/flow/cycling/integer.py @@ -21,7 +21,12 @@ import re from cylc.flow.cycling import ( - PointBase, IntervalBase, SequenceBase, ExclusionBase, parse_exclusion, cmp + PointBase, + IntervalBase, + SequenceBase, + ExclusionBase, + parse_exclusion, + cmp ) from cylc.flow.exceptions import ( CylcMissingContextPointError, diff --git a/cylc/flow/cycling/nocycle.py b/cylc/flow/cycling/nocycle.py index a5da0c1671b..05c2b184d96 100644 --- a/cylc/flow/cycling/nocycle.py +++ b/cylc/flow/cycling/nocycle.py @@ -18,12 +18,21 @@ Cycling logic for isolated non-cycling startup and shutdown graphs. """ +from cylc.flow.cycling import PointBase, SequenceBase, cmp + +# TODO: scheduler check DB to be sure alpha and omega sections have run or not. + # cycle point values NOCYCLE_PT_ALPHA = "alpha" NOCYCLE_PT_OMEGA = "omega" +NOCYCLE_POINTS = ( + NOCYCLE_PT_ALPHA, + NOCYCLE_PT_OMEGA +) + -class NocyclePoint: +class NocyclePoint(PointBase): """A string-valued point.""" def __init__(self, value: str) -> None: @@ -52,8 +61,27 @@ def __gt__(self, other): def __str__(self): return self.value + def _cmp(self, other): + return cmp(int(self), int(other)) + + def add(self, other): + # NOT USED + return None + + def sub(self, other): + # NOT USED + return None + + def TYPE(self) -> str: + # NOT USED + return self.__class__.__name__ + + def TYPE_SORT_KEY(self) -> int: + # NOT USED + return 0 + -class NocycleSequence: +class NocycleSequence(SequenceBase): """A single point sequence.""" def __init__(self, dep_section, p_context_start=None, p_context_stop=None): @@ -89,6 +117,43 @@ def __eq__(self, other): def __str__(self): return str(self.point) + def TYPE(self) -> str: + raise NotImplementedError + + def TYPE_SORT_KEY(self) -> int: + raise NotImplementedError + + def get_async_expr(cls, start_point=0): + raise NotImplementedError + + def get_interval(self): + """Return the cycling interval of this sequence.""" + raise NotImplementedError + + def get_offset(self): + """Deprecated: return the offset used for this sequence.""" + raise NotImplementedError + + def set_offset(self, i_offset): + """Deprecated: alter state to offset the entire sequence.""" + raise NotImplementedError + + def is_on_sequence(self, point): + """Is point on-sequence, disregarding bounds?""" + raise NotImplementedError + + def get_prev_point(self, point): + """Return the previous point < point, or None if out of bounds.""" + raise NotImplementedError + + def get_nearest_prev_point(self, point): + """Return the largest point < some arbitrary point.""" + raise NotImplementedError + + def get_stop_point(self): + """Return the last point in this sequence, or None if unbounded.""" + raise NotImplementedError + NOCYCLE_SEQ_ALPHA = NocycleSequence(NOCYCLE_PT_ALPHA) NOCYCLE_SEQ_OMEGA = NocycleSequence(NOCYCLE_PT_OMEGA) diff --git a/cylc/flow/scheduler.py b/cylc/flow/scheduler.py index b7ff145a61a..e291c96b038 100644 --- a/cylc/flow/scheduler.py +++ b/cylc/flow/scheduler.py @@ -19,6 +19,7 @@ from contextlib import suppress from collections import deque from dataclasses import dataclass +from functools import partial import logging from optparse import Values import os @@ -55,7 +56,9 @@ from cylc.flow.cfgspec.glbl_cfg import glbl_cfg from cylc.flow.config import WorkflowConfig from cylc.flow.cycling.nocycle import ( + NOCYCLE_POINTS, NOCYCLE_PT_ALPHA, + NOCYCLE_PT_OMEGA, NOCYCLE_SEQ_ALPHA, NOCYCLE_SEQ_OMEGA ) @@ -296,7 +299,7 @@ def __init__(self, reg: str, options: Values) -> None: pub_d=os.path.join(self.workflow_run_dir, 'log') ) self.is_restart = Path(self.workflow_db_mgr.pri_path).is_file() - self.next_graphs: List[str] = [] + self.graph_loaders: List[str] = [] async def install(self): """Get the filesystem in the right state to run the flow. @@ -460,40 +463,6 @@ async def configure(self): self.data_store_mgr.initiate_data_model() - self.next_graphs = [] - if self.is_restart: - self.task_job_mgr.task_remote_mgr.is_restart = True - self._load_pool_from_db() - if self.restored_stop_task_id is not None: - self.pool.set_stop_task(self.restored_stop_task_id) - self.next_graphs = self._get_next_graphs() - # self.restart_remote_init() # poll orphaned tasks - - elif self.options.starttask: - self._load_pool_from_tasks() - self.next_graphs = self._get_next_graphs() - - else: - if ( - self.config.start_point == "omega" and - NOCYCLE_SEQ_OMEGA in self.config.nocycle_sequences - ): - self.pool.load_nocycle_graph(NOCYCLE_SEQ_OMEGA) - elif ( - self.config.start_point in - [self.config.initial_point, "alpha"] and - NOCYCLE_SEQ_ALPHA in self.config.nocycle_sequences - ): - self.pool.load_nocycle_graph(NOCYCLE_SEQ_ALPHA) - if self.config.sequences: - self.next_graphs.append("normal") - if NOCYCLE_SEQ_OMEGA in self.config.nocycle_sequences: - self.next_graphs.append("omega") - else: - self._load_pool_from_point() - if NOCYCLE_SEQ_OMEGA in self.config.nocycle_sequences: - self.next_graphs.append("omega") - self.workflow_db_mgr.put_workflow_params(self) self.workflow_db_mgr.put_workflow_template_vars(self.template_vars) self.workflow_db_mgr.put_runtime_inheritance(self.config) @@ -624,28 +593,56 @@ def log_start(self) -> None: extra=RotatingLogFileHandler.header_extra ) - def _get_next_graphs(self): + def _get_graph_loaders(self) -> None: """Get next graphs base on current pool content.""" + # Check pool points in case this is a restart. + # TODO REALLY NEED TO CHECK DB FOR SECTIONS THAT RAN ALREADY. + points = [p.value for p in self.pool.get_points()] - nxt = [] - if points == [NOCYCLE_PT_ALPHA]: - if self.config.sequences: - nxt.append("normal") - if NOCYCLE_SEQ_OMEGA in self.config.nocycle_sequences: - nxt.append("omega") - elif ( - "omega" not in points and + if self.is_restart and not points: + # Restart with empty pool: only unfinished event handlers. + # No graph to load. + return + + if ( NOCYCLE_SEQ_OMEGA in self.config.nocycle_sequences + and (not points or NOCYCLE_PT_OMEGA not in points) + ): + # Omega section exists and hasn't started yet. + self.graph_loaders.append( + partial(self.pool.load_nocycle_graph, NOCYCLE_SEQ_OMEGA) + ) + if ( + self.config.sequences + and ( + not points + or ( + not any(p not in NOCYCLE_POINTS for p in points) + and NOCYCLE_PT_OMEGA not in points + ) + ) ): - nxt.append("omega") - return nxt + # Normal graph exists and hasn't started yet. + if self.options.starttask: + # Cold start from specified tasks. + self.graph_loaders.append(self._load_pool_from_tasks) + else: + # Cold start from cycle point. + self.graph_loaders.append(self._load_pool_from_point) + + if ( + NOCYCLE_SEQ_ALPHA in self.config.nocycle_sequences + and not self.is_restart + ): + # Alpha section exists and hasn't started yet. + # (Never in a restart). + self.graph_loaders.append( + partial(self.pool.load_nocycle_graph, NOCYCLE_SEQ_ALPHA) + ) async def run_scheduler(self): """Start the scheduler main loop.""" try: - if self.is_restart: - self.restart_remote_init() - self.task_job_mgr.task_remote_mgr.is_restart = True self.run_event_handlers(self.EVENT_STARTUP, 'workflow starting') await asyncio.gather( *main_loop.get_runners( @@ -654,42 +651,26 @@ async def run_scheduler(self): self ) ) - self.server.publish_queue.put( - self.data_store_mgr.publish_deltas) + self.server.publish_queue.put(self.data_store_mgr.publish_deltas) + # Non-async sleep - yield to other threads rather than event loop sleep(0) self.profiler.start() - # a = startup; b = cycles; c = shutdown - # - # Load the task pool and start the main loop. - # If cold start from ICP, run [a, b, c]: - # - # If cold start from point: - # - check point and start in [a,b,c] - # - # If restart or cold start from tasks, check all points - # - if multiple points, load as-is - # - # If multiple points: - # - run concurrently - # - control by pausing flows - - await self.main_loop() - if ( - "normal" in self.next_graphs and - self.config.sequences - ): - self.next_graphs.remove("normal") - self._load_pool_from_point() + self.graph_loaders = [] + if self.is_restart: + # Restart from DB. + self.task_job_mgr.task_remote_mgr.is_restart = True + self._load_pool_from_db() + self.restart_remote_init() + # next graphs depends on content of restart pool + self._get_graph_loaders() await self.main_loop() + else: + self._get_graph_loaders() - if ( - "omega" in self.next_graphs and - NOCYCLE_SEQ_OMEGA in self.config.nocycle_sequences - ): - self.next_graphs.remove("omega") - self.pool.load_nocycle_graph(NOCYCLE_SEQ_OMEGA) + while self.graph_loaders: + (self.graph_loaders.pop())() await self.main_loop() except SchedulerStop as exc: @@ -780,10 +761,7 @@ def _load_pool_from_point(self): """ LOG.info("LOADING MAIN GRAPH") msg = f"start from {self.config.start_point}" - if ( - self.config.start_point - in ["alpha" or self.config.initial_point] - ): + if self.config.start_point == self.config.initial_point: msg = "Cold " + msg LOG.info(msg) self.pool.load_from_point() @@ -808,6 +786,9 @@ def _load_pool_from_db(self): self.pool.load_db_tasks_to_hold() self.pool.update_flow_mgr() + if self.restored_stop_task_id is not None: + self.pool.set_stop_task(self.restored_stop_task_id) + def restart_remote_init(self): """Remote init for all submitted/running tasks in the pool.""" self.task_job_mgr.task_remote_mgr.is_restart = True @@ -849,8 +830,8 @@ def restart_remote_init(self): # Poll all pollable tasks self.command_poll_tasks(['*/*']) # TODO - WHY DOESN'T '*/*' MATCH THE FOLLOWING? - self.command_poll_tasks(['startup/*']) - self.command_poll_tasks(['shutdown/*']) + self.command_poll_tasks([f"{NOCYCLE_PT_ALPHA}/*"]) + self.command_poll_tasks([f"{NOCYCLE_PT_OMEGA}/*"]) def _load_task_run_times(self, row_idx, row): """Load run times of previously succeeded task jobs.""" @@ -1696,7 +1677,8 @@ async def main_loop(self) -> None: # Shutdown workflow if timeouts have occurred self.timeout_check() - if self.graph_finished() and self.next_graphs: + if self.graph_finished() and self.graph_loaders: + # Return control to load the next graph. break # Does the workflow need to shutdown on task failure? diff --git a/cylc/flow/scheduler_cli.py b/cylc/flow/scheduler_cli.py index 51865f518e6..5091a1a1400 100644 --- a/cylc/flow/scheduler_cli.py +++ b/cylc/flow/scheduler_cli.py @@ -427,7 +427,6 @@ async def _run(scheduler: Scheduler) -> int: @cli_function(get_option_parser) def play(parser: COP, options: 'Values', id_: str): """Implement cylc play.""" - #from pudebug import go; go() if options.starttask: options.starttask = upgrade_legacy_ids( *options.starttask, diff --git a/cylc/flow/scripts/graph.py b/cylc/flow/scripts/graph.py index 3488177ac17..d9971135950 100644 --- a/cylc/flow/scripts/graph.py +++ b/cylc/flow/scripts/graph.py @@ -43,6 +43,10 @@ from typing import Dict, List, Optional, TYPE_CHECKING, Tuple, Callable from cylc.flow.config import WorkflowConfig +from cylc.flow.cycling.nocycle import ( + NOCYCLE_PT_ALPHA, + NOCYCLE_PT_OMEGA +) from cylc.flow.exceptions import InputError, CylcError from cylc.flow.id import Tokens from cylc.flow.id_cli import parse_id @@ -64,10 +68,20 @@ def sort_integer_node(id_): Example: >>> sort_integer_node('11/foo') ('foo', 11) - + >>> sort_integer_node('alpha/foo') + ('foo', 0) + >>> sort_integer_node('omega/foo') + ('foo', 1) """ tokens = Tokens(id_, relative=True) - return (tokens['task'], int(tokens['cycle'])) + try: + return (tokens['task'], int(tokens['cycle'])) + except ValueError: + # nocycle point + if tokens['cycle'] == NOCYCLE_PT_ALPHA: + return (tokens['task'], 0) + else: + return (tokens['task'], 1) def sort_integer_edge(id_): @@ -87,6 +101,28 @@ def sort_integer_edge(id_): ) +def sort_datetime_node(id_): + """Return sort tokens for nodes with cyclepoints in datetime format. + + Lexicological sort, but tweaked for nocycle graphs. + + Example: + >>> sort_datetime_node('2001/foo') + ('foo', '2001') + >>> sort_datetime_node('alpha/foo') + ('foo', '0') + >>> sort_datetime_node('omega/foo') + ('foo', '9') + """ + tokens = Tokens(id_, relative=True) + if tokens['cycle'] == NOCYCLE_PT_ALPHA: + return (tokens['task'], '0') + elif tokens['cycle'] == NOCYCLE_PT_OMEGA: + return (tokens['task'], '9') + else: + return (tokens['task'], tokens['cycle']) + + def sort_datetime_edge(item): """Return sort tokens for edges with cyclepoints in ISO8601 format. @@ -150,7 +186,7 @@ def _get_graph_nodes_edges( edge_sort = sort_integer_edge else: # datetime sorting - node_sort = None # lexicographically sortable + node_sort = sort_datetime_node edge_sort = sort_datetime_edge # get nodes diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py index cb95a29da9f..5c4cd4bdf90 100644 --- a/cylc/flow/task_pool.py +++ b/cylc/flow/task_pool.py @@ -26,7 +26,7 @@ import cylc.flow.flags from cylc.flow import LOG from cylc.flow.cycling.loader import get_point, standardise_point_string -from cylc.flow.cycling.nocycle import NocyclePoint +from cylc.flow.cycling.nocycle import NocyclePoint, NOCYCLE_POINTS from cylc.flow.exceptions import WorkflowConfigError, PointParsingError from cylc.flow.id import Tokens, detokenise from cylc.flow.id_cli import contains_fnmatch @@ -444,7 +444,7 @@ def load_db_task_pool_for_restart(self, row_idx, row): (cycle, name, flow_nums, is_late, status, is_held, submit_num, _, platform_name, time_submit, time_run, timeout, outputs_str) = row - if cycle in ("alpha", "omega"): + if cycle in NOCYCLE_POINTS: point = NocyclePoint(cycle) else: point = get_point(cycle) @@ -1278,7 +1278,7 @@ def spawn_on_output(self, itask, output, forced=False): if ( t.point <= self.runahead_limit_point - or str(t.point) in ["alpha", "omega"] + or str(t.point) in NOCYCLE_POINTS ): self.rh_release_and_queue(t) @@ -1875,7 +1875,7 @@ def match_future_tasks( try: point_str = standardise_point_string(point_str) except PointParsingError as exc: - if point_str in ["alpha", "omega"]: + if point_str in NOCYCLE_POINTS: point = NocyclePoint(point_str) else: LOG.warning(