Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Flow-specific hold/release #5698

Draft
wants to merge 18 commits into
base: master
Choose a base branch
from
Prev Previous commit
Next Next commit
Style fixes.
  • Loading branch information
hjoliver committed Aug 21, 2023
commit b5f83a4a289cdaaeded3eb26e810679953814b0d
2 changes: 1 addition & 1 deletion cylc/flow/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -905,7 905,7 @@ def get_command_method(self, command_name: str) -> Callable:
def queue_command(self, command: str, kwargs: dict) -> None:
self.command_queue.put((
command,
[],
(),
kwargs,
))

Expand Down
156 changes: 90 additions & 66 deletions cylc/flow/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 16,6 @@

"""Wrangle task proxies to manage the workflow."""

from copy import copy
from contextlib import suppress
from collections import Counter
import json
Expand Down Expand Up @@ -88,100 87,125 @@


class TaskHoldMgr:
"""Keep track of current and future tasks to hold."""
"""Keep track of current and future tasks to hold.

Active task instances (i.e., task proxies in the pool):
- can be held/released if the given flow (flow=n) matches any of the task's
flow numbers, or regardless of flow number (flow=None).

Future tasks (point/name):
- can be flagged for future hold, or forgotten, by specific flow (flow=n)
or regardless of flow number (flow=None)

"""

def __init__(self, workflow_db_mgr, data_store_mgr):
self.hold: Set[Tuple[str, 'PointBase', Optional[int]]] = set()
# (name, point): flow
self.hold: Dict[Tuple[str, 'PointBase'], Optional[int]] = {}
# flow may be None: hold future task regardless of its flow number.
# NOTE: RHS could be a set of flow numbers, meaning future-hold same
# task in multiple specific flows. But those instances can't coexist in
# the pool so serially holding them is probably fine.
self.data_store_mgr = data_store_mgr
self.db_mgr = workflow_db_mgr

def _flatten(self):
# possibly temporary interfacing to existing flat set
result = set()
for (name, point), flow in self.hold.items():
result.add((name, point, flow))
return result

def _update_stores(
self, itask: Union[TaskProxy, Tuple[str, 'PointBase', bool]]
):
self, itask: Union[TaskProxy, Tuple[str, 'PointBase', bool]]
):
"""Update datastore and database."""
self.db_mgr.put_tasks_to_hold(self.hold)
self.db_mgr.put_tasks_to_hold(self._flatten())
# TODO ADAPT TO PROXY AND FUTURE HOLD
self.data_store_mgr.delta_task_held(itask)
LOG.debug(f"Tasks to hold {self.hold}")

def load_from_db(self):
# Note this doesn't need to hold any active tasks in the list - they're
# automatically held at creation via their internal is_held attribute.
self.hold.update(
(name, get_point(cycle), flow_num)
for name, cycle in self.db_mgr.pri_dao.select_tasks_to_hold()
)
# Note this doesn't need to reset-to-held active tasks - they're
# automatically held at creation via their is_held attribute.
for name, cycle, flow_num in (
self.db_mgr.pri_dao.select_tasks_to_hold()
):
self.hold[(name, get_point(cycle))] = flow_num

def hold_active_task(
self, itask: TaskProxy, flow_num: Optional[int]=None
) -> bool:
"""Hold task if flow is None, or if flow matches task."""
self, itask: TaskProxy, flow_num: Optional[int] = None
) -> bool:
"""Hold itask if the specified flow_num matches or is None."""
if flow_num is not None and flow_num not in itask.flow_nums:
# specified flow does not match this task
return False
if not itask.state_reset(is_held=True):
# this task is already held
return False
self.hold.add((itask.tdef.name, itask.point, flow_num))

self.hold[(itask.tdef.name, itask.point)] = flow_num
self._update_stores(itask)
return True

def hold_future_task(self, name, point, flow_num=None) -> None:
"""Hold future task if flow is None, or if flow matches task."""
self.hold.add((name, point, flow_num))
def flag_future_task(self, name, point, flow_num=None) -> None:
"""Flag that we should hold a future task."""
self.hold[(name, point)] = flow_num
self._update_stores((name, point, True))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm concerned that this means that you can only flag a future task for one flow at a time:

> cylc hold flows-hold//10100101T0000Z/a --flow 1
Done
> sqlite3 ~/cylc-run/flows-hold/runN/log/db 'SELECT * FROM tasks_to_hold'
a|10100101T0000Z|1
> cylc hold flows-hold//10100101T0000Z/a --flow 2
Done
> sqlite3 ~/cylc-run/flows-hold/runN/log/db 'SELECT * FROM tasks_to_hold'
a|10100101T0000Z|2

I would expect to get either

a|10100101T0000Z|1,2
# or
a|10100101T0000Z|1
a|10100101T0000Z|2

I also think that this means that:

> cylc hold flows-hold//10100101T0000Z/a --flow 1 --flow 2
Done
> sqlite3 ~/cylc-run/flows-hold/runN/log/db 'SELECT * FROM tasks_to_hold'
a|10100101T0000Z|2

Is not what I as a user would expect: I would expect this to either return an error ("don't use --flow twice please") or set that task as held in both flows as above.

Copy link
Member

@oliver-sanders oliver-sanders Oct 12, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking at cylc trigger --help:

  --flow=FLOW           Assign the triggered task to all active flows (all);
                        no flow (none); a new flow (new); or a specific flow
                        (e.g. 2). The default is all. Reuse the option to
                        assign multiple specific flows.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, in that case it's only the Reuse the option that I have a problem with.


def hold_if_requested(self, itask: TaskProxy) -> None:
"""Hold this task if it is in the hold list."""
for (h_name, h_point, h_flow) in self.hold:
if (
h_name == itask.tdef.name
and h_point == itask.point
and (h_flow is None or h_flow in itask.flow_nums)
):
LOG.info(f"[{itask}] holding (as requested earlier)")
# (it's already in self.hold)
itask.state_reset(is_held=True)
break
def hold_if_flagged(self, itask: TaskProxy) -> None:
"""Hold a newly-spawned task if flagged in the future hold list.

flow None: a future-held specific task regardless of flow.
"""
if (itask.tdef.name, itask.point) not in self.hold.keys():
return

if (
self.hold[(itask.tdef.name, itask.point)] is None
or self.hold[(itask.tdef.name, itask.point)] in itask.flow_nums
):
LOG.info(f"[{itask}] holding (as requested earlier)")
itask.state_reset(is_held=True)

def release_future_task(self, name, point, flow_num=None) -> None:
"""Release if point/name match and flow matches or flow is None."""
for h_name, h_point, h_flow_num in copy(self.hold):
if(
h_name == name and
h_point == point and
(flow_num is None or flow_num == h_flow_num)
):
self.hold.remove((h_name, h_point, h_flow_num))
"""Release point/name if flow matches or flow is None."""
if (name, point) not in self.hold.keys():
return

if (
flow_num is None
or flow_num == self.hold[(name, point)]
):
self.hold[(name, point)]

self._update_stores((name, point, True))

def release_active_task(
self, itask: TaskProxy, queue_func: Callable, flow_num=None,
) -> None:
"""Release a held task and queue it if ready to run."""
for h_name, h_point, h_flow_num in copy(self.hold):
if(
h_name == itask.tdef.name and
h_point == itask.point and
(flow_num is None or h_flow_num in itask.flow_nums)
):
self.hold.remove((h_name, h_point, h_flow_num))
itask.state_reset(is_held=False)
self._update_stores(itask)
if (
not itask.state.is_runahead
and all(itask.is_ready_to_run())
):
queue_func(itask)
break
"""Release a held task if flow matches, and queue it if ready."""
if (
flow_num is not None
and flow_num not in itask.flow_nums
):
return

if not itask.state_reset(is_held=False):
# not held
return

del self.hold[(itask.tdef.name, itask.point)]
self._update_stores(itask)
if (
not itask.state.is_runahead
and all(itask.is_ready_to_run())
):
queue_func(itask)

def is_held(self, name, point) -> bool:
"""Is point/name held, regardless of flow."""
for h_name, h_point, _ in self.hold:
if (
h_name == name and
h_point == point
):
return True
return False
return (name, point) in self.hold

def clear(self):
self.hold.clear()
Expand Down Expand Up @@ -1288,11 1312,11 @@ def hold_tasks(self, items: Iterable[str], flow_num=None) -> int:
)
# Hold active tasks:
for itask in itasks:
self.hold_mgr.hold_task(itask, flow_num)
self.hold_mgr.hold_active_task(itask, flow_num)

# Set future tasks to be held:
for name, cycle in future_tasks:
self.hold_mgr.hold_future_task(name, cycle, flow_num)
self.hold_mgr.flag_future_task(name, cycle, flow_num)

return len(unmatched)

Expand All @@ -1313,7 1337,7 @@ def release_held_tasks(self, items: Iterable[str], flow_num=None) -> int:

# Unhold future tasks:
for name, cycle in future_tasks:
self.hold_mgr.release_future_task((name, cycle, flow_num))
self.hold_mgr.release_future_task(name, cycle, flow_num)

return len(unmatched)

Expand Down Expand Up @@ -1627,7 1651,7 @@ def spawn_task(
self.hold_mgr.hold_active_task(itask)
else:
# Or if in the future hold list.
self.hold_mgr.hold_if_requested(itask)
self.hold_mgr.hold_if_flagged(itask)

if self.stop_point and itask.point <= self.stop_point:
future_trigger_overrun = False
Expand Down
Loading