Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Flow-specific hold/release #5698

Draft
wants to merge 18 commits into
base: master
Choose a base branch
from
Prev Previous commit
Next Next commit
Update flow hold integration tests.
  • Loading branch information
hjoliver committed Sep 1, 2023
commit f554c96b2d48539383bf172ff10b1588c95e24a3
36 changes: 28 additions & 8 deletions tests/integration/test_task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 16,15 @@

from copy import deepcopy
import logging
from typing import AsyncGenerator, Callable, Iterable, List, Tuple, Union
from typing import (
AsyncGenerator,
Callable,
Iterable,
List,
Tuple,
Union,
Optional
)

import pytest
from pytest import param
Expand Down Expand Up @@ -336,33 344,44 @@ async def test_match_taskdefs(


@pytest.mark.parametrize(
'items, expected_tasks_to_hold_ids, expected_warnings',
'items, flow_num, expected_tasks_to_hold_ids, expected_warnings',
[
param(
['1/foo', '3/asd'], ['1/foo', '3/asd'], [],
['1/foo', '3/asd'], None, ['1/foo', '3/asd'], [],
id="Active & future tasks"
),
param(
['1/*', '2/*', '3/*', '6/*'],
['1/*', '2/*', '3/*', '6/*'], None,
['1/foo', '1/bar', '2/foo', '2/bar', '2/pub', '3/foo', '3/bar'],
["No active tasks matching: 6/*"],
id="Name globs hold active tasks only" # (active means n=0 here)
),
param(
['1/FAM', '2/FAM', '6/FAM'], ['1/bar', '2/bar'],
['1/*', '2/*', '3/*', '6/*'], 1,
['1/foo', '1/bar', '2/foo', '2/bar', '2/pub', '3/foo', '3/bar'],
["No active tasks matching: 6/*"],
id="Flow match"
),
param(
['1/*', '2/*', '3/*', '6/*'], 2, [],
["No active tasks matching: 6/*"],
id="No flow match"
),
param(
['1/FAM', '2/FAM', '6/FAM'], None, ['1/bar', '2/bar'],
["No active tasks in the family FAM matching: 6/FAM"],
id="Family names hold active tasks only"
),
param(
['1/grogu', 'H/foo', '20/foo', '1/pub'], [],
['1/grogu', 'H/foo', '20/foo', '1/pub'], None, [],
["No matching tasks found: grogu",
"H/foo - invalid cycle point: H",
"Invalid cycle point for task: foo, 20",
"Invalid cycle point for task: pub, 1"],
id="Non-existent task name or invalid cycle point"
),
param(
['1/foo:waiting', '1/foo:failed', '6/bar:waiting'], ['1/foo'],
['1/foo:waiting', '1/foo:failed', '6/bar:waiting'], None, ['1/foo'],
["No active tasks matching: 1/foo:failed",
"No active tasks matching: 6/bar:waiting"],
id="Specifying task state works for active tasks, not future tasks"
Expand All @@ -371,6 390,7 @@ async def test_match_taskdefs(
)
async def test_hold_tasks(
items: List[str],
flow_num: Optional[int],
expected_tasks_to_hold_ids: List[str],
expected_warnings: List[str],
example_flow: Scheduler, caplog: pytest.LogCaptureFixture,
Expand All @@ -390,7 410,7 @@ async def test_hold_tasks(
expected_tasks_to_hold_ids = sorted(expected_tasks_to_hold_ids)
caplog.set_level(logging.WARNING, CYLC_LOG)
task_pool = example_flow.pool
n_warnings = task_pool.hold_tasks(items)
n_warnings = task_pool.hold_tasks(items, flow_num)

for itask in task_pool.get_all_tasks():
hold_expected = itask.identity in expected_tasks_to_hold_ids
Expand Down