Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Flow-specific hold/release #5698

Draft
wants to merge 18 commits into
base: master
Choose a base branch
from
Prev Previous commit
Next Next commit
hold mgr: clear not needed.
  • Loading branch information
hjoliver committed Aug 31, 2023
commit b2e744041ddd5b2e34597fda77a48a9f311d61a0
14 changes: 6 additions & 8 deletions cylc/flow/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 108,7 @@ def __init__(self, workflow_db_mgr, data_store_mgr):
self.db_mgr = workflow_db_mgr

def _flatten(self):
# possibly temporary interfacing to existing flat set
# possibly-temporary conversion to old-style flat set
result = set()
for (name, point), flow in self.hold.items():
result.add((name, point, flow))
Expand All @@ -119,7 119,6 @@ def _update_stores(
):
"""Update datastore and database."""
self.db_mgr.put_tasks_to_hold(self._flatten())
# TODO ADAPT TO PROXY AND FUTURE HOLD
self.data_store_mgr.delta_task_held(itask)
LOG.debug(f"Tasks to hold {self.hold}")

Expand Down Expand Up @@ -172,7 171,6 @@ def release_future_task(self, name, point, flow_num=None) -> None:
"""Un-flag point/name if flow matches or flow is None."""
if (name, point) not in self.hold.keys():
return

if (
flow_num is None
or flow_num == self.hold[(name, point)]
Expand Down Expand Up @@ -207,9 205,6 @@ def is_held(self, name, point) -> bool:
"""Is point/name held, regardless of flow."""
return (name, point) in self.hold

def clear(self):
self.hold.clear()


class TaskPool:
"""Task pool of a workflow."""
Expand Down Expand Up @@ -1342,11 1337,14 @@ def release_held_tasks(self, items: Iterable[str], flow_num=None) -> int:
return len(unmatched)

def release_hold_point(self) -> None:
"""Unset the workflow hold point and release all held active tasks."""
"""Release all held active tasks and unset the hold-after point.

Note the CLI does not currently have an option to just release tasks
after the hold point.
"""
self.hold_point = None
for itask in self.get_all_tasks():
self.hold_mgr.release_active_task(itask, self.queue_task)
self.hold_mgr.clear() # TODO: NOT NECESSARY?
self.workflow_db_mgr.put_workflow_hold_cycle_point(None)

def check_abort_on_task_fails(self):
Expand Down