Skip to content

Commit

Permalink
Switch to alpha and omega.
Browse files Browse the repository at this point in the history
  • Loading branch information
hjoliver committed Sep 23, 2022
1 parent 129c2e3 commit 7a62450
Show file tree
Hide file tree
Showing 5 changed files with 86 additions and 71 deletions.
35 changes: 19 additions & 16 deletions cylc/flow/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 28,6 @@

# TODO NOCYCLE GRAPHS:
# - graph and check-circular for startup and shutdown
# - validation should disallow reuse of task names in the 3 sections

from contextlib import suppress
from copy import copy
Expand Down Expand Up @@ -58,7 57,11 @@
get_sequence, get_sequence_cls, init_cyclers, get_dump_format,
INTEGER_CYCLING_TYPE, ISO8601_CYCLING_TYPE
)
from cylc.flow.cycling.nocycle import NOCYCLE_GRAPHS, NocycleSequence
from cylc.flow.cycling.nocycle import (
NOCYCLE_SEQ_ALPHA,
NOCYCLE_SEQ_OMEGA,
NocycleSequence
)
from cylc.flow.id import Tokens
from cylc.flow.cycling.integer import IntegerInterval
from cylc.flow.cycling.iso8601 import ingest_time, ISO8601Interval
Expand Down Expand Up @@ -2064,22 2067,22 @@ def load_graph(self):
for section, graph in sections:
try:
seq = get_sequence(section, icp, fcp)
except SequenceParsingError:
if section in NOCYCLE_GRAPHS:
except (AttributeError, TypeError, ValueError, CylcError) as exc:
try:
seq = NocycleSequence(section)
self.nocycle_sequences.add(seq)
except ValueError:
if cylc.flow.flags.verbosity > 1:
traceback.print_exc()
msg = (
f"Cannot process recurrence {section}"
f" (initial cycle point={icp}"
f" (final cycle point={fcp}"
)
if isinstance(exc, CylcError):
msg = ' %s' % exc.args[0]
raise WorkflowConfigError(msg)
else:
raise
except (AttributeError, TypeError, ValueError, CylcError) as exc:
if cylc.flow.flags.verbosity > 1:
traceback.print_exc()
msg = 'Cannot process recurrence %s' % section
msg = ' (initial cycle point=%s)' % icp
msg = ' (final cycle point=%s)' % fcp
if isinstance(exc, CylcError):
msg = ' %s' % exc.args[0]
raise WorkflowConfigError(msg)

self.nocycle_sequences.add(seq)
else:
self.sequences.append(seq)

Expand Down
55 changes: 31 additions & 24 deletions cylc/flow/cycling/nocycle.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,73 15,80 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.

"""
No-cycle logic for isolated start-up and shutdown graphs.
Cycling logic for isolated non-cycling startup and shutdown graphs.
"""

NOCYCLE_STARTUP = "startup"
NOCYCLE_SHUTDOWN = "shutdown"

NOCYCLE_GRAPHS = (
NOCYCLE_STARTUP,
NOCYCLE_SHUTDOWN
)
# cycle point values
NOCYCLE_PT_ALPHA = "alpha"
NOCYCLE_PT_OMEGA = "omega"


class NocyclePoint:
"""A string-valued no-cycle point."""
"""A string-valued point."""

def __init__(self, value: str) -> None:
if value not in NOCYCLE_GRAPHS:
if value not in [NOCYCLE_PT_ALPHA, NOCYCLE_PT_OMEGA]:
raise ValueError(f"Illegal Nocycle value {value}")
self.value = value

def __hash__(self):
return hash(self.value)

def __eq__(self, other):
return other.value == self.value
return str(other) == self.value

def __le__(self, other):
return str(other) == str(self.value)
"""less than or equal only if equal."""
return str(other) == self.value

def __lt__(self, other):
"""never less than."""
return False

def __gt__(self, other):
"""never greater than."""
return False

def __str__(self):
return self.value


class NocycleSequence:
"""A no-cycle sequence is just a point."""
"""A single point sequence."""

def __init__(self, dep_section, p_context_start=None, p_context_stop=None):
"""blah"""
"""Workflow cycling context is ignored."""
self.point = NocyclePoint(dep_section)

def __hash__(self):
return hash(str(self.point))

def is_valid(self, point):
"""Is point on-sequence and in-bounds?"""
return True
return str(point) == self.point

def get_first_point(self, point):
"""blah"""
"""First point is the only point"""
return self.point

def get_next_point(self, point):
"""There is no next point"""
return None

def get_next_point_on_sequence(self, point):
"""blah"""
return self.point

def __hash__(self):
return hash(str(self.point))
"""There is no next point"""
return None

def __eq__(self, other):
return other.point == self.point
try:
return other.point == self.point
except AttributeError:
# (other is not a nocycle sequence)
return False

def __str__(self):
return str(self.point)


NOCYCLE_STARTUP_SEQUENCE = NocycleSequence(NOCYCLE_STARTUP)
NOCYCLE_SHUTDOWN_SEQUENCE = NocycleSequence(NOCYCLE_SHUTDOWN)
NOCYCLE_SEQ_ALPHA = NocycleSequence(NOCYCLE_PT_ALPHA)
NOCYCLE_SEQ_OMEGA = NocycleSequence(NOCYCLE_PT_OMEGA)
48 changes: 24 additions & 24 deletions cylc/flow/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 55,9 @@
from cylc.flow.cfgspec.glbl_cfg import glbl_cfg
from cylc.flow.config import WorkflowConfig
from cylc.flow.cycling.nocycle import (
NOCYCLE_STARTUP,
NOCYCLE_STARTUP_SEQUENCE,
NOCYCLE_SHUTDOWN_SEQUENCE
NOCYCLE_PT_ALPHA,
NOCYCLE_SEQ_ALPHA,
NOCYCLE_SEQ_OMEGA
)
from cylc.flow.data_store_mgr import DataStoreMgr
from cylc.flow.id import Tokens
Expand Down Expand Up @@ -475,24 475,24 @@ async def configure(self):

else:
if (
self.config.start_point == "shutdown" and
NOCYCLE_SHUTDOWN_SEQUENCE in self.config.nocycle_sequences
self.config.start_point == "omega" and
NOCYCLE_SEQ_OMEGA in self.config.nocycle_sequences
):
self.pool.load_nocycle_graph(NOCYCLE_SHUTDOWN_SEQUENCE)
self.pool.load_nocycle_graph(NOCYCLE_SEQ_OMEGA)
elif (
self.config.start_point in
[self.config.initial_point, "startup"] and
NOCYCLE_STARTUP_SEQUENCE in self.config.nocycle_sequences
[self.config.initial_point, "alpha"] and
NOCYCLE_SEQ_ALPHA in self.config.nocycle_sequences
):
self.pool.load_nocycle_graph(NOCYCLE_STARTUP_SEQUENCE)
self.pool.load_nocycle_graph(NOCYCLE_SEQ_ALPHA)
if self.config.sequences:
self.next_graphs.append("normal")
if NOCYCLE_SHUTDOWN_SEQUENCE in self.config.nocycle_sequences:
self.next_graphs.append("shutdown")
if NOCYCLE_SEQ_OMEGA in self.config.nocycle_sequences:
self.next_graphs.append("omega")
else:
self._load_pool_from_point()
if NOCYCLE_SHUTDOWN_SEQUENCE in self.config.nocycle_sequences:
self.next_graphs.append("shutdown")
if NOCYCLE_SEQ_OMEGA in self.config.nocycle_sequences:
self.next_graphs.append("omega")

self.workflow_db_mgr.put_workflow_params(self)
self.workflow_db_mgr.put_workflow_template_vars(self.template_vars)
Expand Down Expand Up @@ -628,16 628,16 @@ def _get_next_graphs(self):
"""Get next graphs base on current pool content."""
points = [p.value for p in self.pool.get_points()]
nxt = []
if points == [NOCYCLE_STARTUP]:
if points == [NOCYCLE_PT_ALPHA]:
if self.config.sequences:
nxt.append("normal")
if NOCYCLE_SHUTDOWN_SEQUENCE in self.config.nocycle_sequences:
nxt.append("shutdown")
if NOCYCLE_SEQ_OMEGA in self.config.nocycle_sequences:
nxt.append("omega")
elif (
"shutdown" not in points and
NOCYCLE_SHUTDOWN_SEQUENCE in self.config.nocycle_sequences
"omega" not in points and
NOCYCLE_SEQ_OMEGA in self.config.nocycle_sequences
):
nxt.append("shutdown")
nxt.append("omega")
return nxt

async def run_scheduler(self):
Expand Down Expand Up @@ -685,11 685,11 @@ async def run_scheduler(self):
await self.main_loop()

if (
"shutdown" in self.next_graphs and
NOCYCLE_SHUTDOWN_SEQUENCE in self.config.nocycle_sequences
"omega" in self.next_graphs and
NOCYCLE_SEQ_OMEGA in self.config.nocycle_sequences
):
self.next_graphs.remove("shutdown")
self.pool.load_nocycle_graph(NOCYCLE_SHUTDOWN_SEQUENCE)
self.next_graphs.remove("omega")
self.pool.load_nocycle_graph(NOCYCLE_SEQ_OMEGA)
await self.main_loop()

except SchedulerStop as exc:
Expand Down Expand Up @@ -782,7 782,7 @@ def _load_pool_from_point(self):
msg = f"start from {self.config.start_point}"
if (
self.config.start_point
in ["startup" or self.config.initial_point]
in ["alpha" or self.config.initial_point]
):
msg = "Cold " msg
LOG.info(msg)
Expand Down
1 change: 1 addition & 0 deletions cylc/flow/scheduler_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 427,7 @@ async def _run(scheduler: Scheduler) -> int:
@cli_function(get_option_parser)
def play(parser: COP, options: 'Values', id_: str):
"""Implement cylc play."""
#from pudebug import go; go()
if options.starttask:
options.starttask = upgrade_legacy_ids(
*options.starttask,
Expand Down
18 changes: 11 additions & 7 deletions cylc/flow/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -444,7 444,7 @@ def load_db_task_pool_for_restart(self, row_idx, row):
(cycle, name, flow_nums, is_late, status, is_held, submit_num, _,
platform_name, time_submit, time_run, timeout, outputs_str) = row

if cycle in ("startup", "shutdown"):
if cycle in ("alpha", "omega"):
point = NocyclePoint(cycle)
else:
point = get_point(cycle)
Expand Down Expand Up @@ -1278,7 1278,7 @@ def spawn_on_output(self, itask, output, forced=False):

if (
t.point <= self.runahead_limit_point
or str(t.point) in ["startup", "shutdown"]
or str(t.point) in ["alpha", "omega"]
):
self.rh_release_and_queue(t)

Expand Down Expand Up @@ -1875,11 1875,15 @@ def match_future_tasks(
try:
point_str = standardise_point_string(point_str)
except PointParsingError as exc:
LOG.warning(
f"{id_} - invalid cycle point: {point_str} ({exc})")
unmatched_tasks.append(id_)
continue
point = get_point(point_str)
if point_str in ["alpha", "omega"]:
point = NocyclePoint(point_str)
else:
LOG.warning(
f"{id_} - invalid cycle point: {point_str} ({exc})")
unmatched_tasks.append(id_)
continue
else:
point = get_point(point_str)
taskdef = self.config.taskdefs[name_str]
if taskdef.is_valid_point(point):
matched_tasks.add((taskdef.name, point))
Expand Down

0 comments on commit 7a62450

Please sign in to comment.