From e63613c61df3b22ea164b69b1463a7ecdfcd6578 Mon Sep 17 00:00:00 2001 From: Ronnie Dutta <61982285+MetRonnie@users.noreply.github.com> Date: Tue, 23 Jul 2024 18:03:04 +0100 Subject: [PATCH 1/3] Integration reftests: handle unexpected shutdown better --- tests/integration/conftest.py | 69 ++++++++++++++++++----------------- 1 file changed, 36 insertions(+), 33 deletions(-) diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index 518ef40f018..bce6ea64e9f 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -559,8 +559,8 @@ def _submit_task_jobs(*args, **kwargs): async def _complete( - schd, - *tokens_list: Union[Tokens, str], + schd: 'Scheduler', + *wait_tokens: Union[Tokens, str], stop_mode=StopMode.AUTO, timeout: int = 60, ) -> None: @@ -569,7 +569,7 @@ async def _complete( Args: schd: The scheduler to await. - tokens_list: + wait_tokens: If specified, this will wait for the tasks represented by these tokens to be marked as completed by the task pool. Can use relative task ids as strings (e.g. '1/a') rather than tokens for @@ -590,56 +590,59 @@ async def _complete( """ start_time = time() - _tokens_list: List[Tokens] = [] - for tokens in tokens_list: + tokens_list: List[Tokens] = [] + for tokens in wait_tokens: if isinstance(tokens, str): tokens = Tokens(tokens, relative=True) - _tokens_list.append(tokens.task) + tokens_list.append(tokens.task) # capture task completion remove_if_complete = schd.pool.remove_if_complete def _remove_if_complete(itask, output=None): - nonlocal _tokens_list + nonlocal tokens_list ret = remove_if_complete(itask) - if ret and itask.tokens.task in _tokens_list: - _tokens_list.remove(itask.tokens.task) + if ret and itask.tokens.task in tokens_list: + tokens_list.remove(itask.tokens.task) return ret - schd.pool.remove_if_complete = _remove_if_complete - - # capture workflow shutdown + # capture workflow shutdown request set_stop = schd._set_stop - has_shutdown = False + stop_requested = False def _set_stop(mode=None): - nonlocal has_shutdown, stop_mode + nonlocal stop_requested, stop_mode if mode == stop_mode: - has_shutdown = True + stop_requested = True return set_stop(mode) else: set_stop(mode) raise Exception(f'Workflow bailed with stop mode = {mode}') - schd._set_stop = _set_stop - # determine the completion condition - if _tokens_list: - condition = lambda: bool(_tokens_list) - else: - condition = lambda: bool(not has_shutdown) - - # wait for the condition to be met - while condition(): - # allow the main loop to advance - await asyncio.sleep(0) - if (time() - start_time) > timeout: - raise Exception( - f'Timeout waiting for {", ".join(map(str, _tokens_list))}' - ) - - # restore regular shutdown logic - schd._set_stop = set_stop + def done(): + if wait_tokens: + return not tokens_list + # otherwise wait for the scheduler to shut down + if not schd.contact_data: + return True + return stop_requested + + with pytest.MonkeyPatch.context() as mp: + mp.setattr(schd.pool, 'remove_if_complete', _remove_if_complete) + mp.setattr(schd, '_set_stop', _set_stop) + + # wait for the condition to be met + while not done(): + # allow the main loop to advance + await asyncio.sleep(0) + if (time() - start_time) > timeout: + msg = "Timeout waiting for " + if wait_tokens: + msg += ", ".join(map(str, tokens_list)) + else: + msg += "workflow to shut down" + raise Exception(msg) @pytest.fixture From e37d27277988f0ddc55784bf57d08af63602752f Mon Sep 17 00:00:00 2001 From: Ronnie Dutta <61982285+MetRonnie@users.noreply.github.com> Date: Wed, 31 Jul 2024 18:07:50 +0100 Subject: [PATCH 2/3] Fix state leakage between integration tests --- tests/conftest.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/tests/conftest.py b/tests/conftest.py index b9f794c19ed..a7b1d19ca2c 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -14,18 +14,19 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . -from pathlib import Path import re +from pathlib import Path from shutil import rmtree from typing import List, Optional, Tuple import pytest +from cylc.flow import flags from cylc.flow.cfgspec.glbl_cfg import glbl_cfg from cylc.flow.cfgspec.globalcfg import SPEC +from cylc.flow.graphnode import GraphNodeParser from cylc.flow.parsec.config import ParsecConfig from cylc.flow.parsec.validate import cylc_config_validate -from cylc.flow import flags @pytest.fixture(autouse=True) @@ -33,6 +34,8 @@ def test_reset(): """Reset global state before all tests.""" flags.verbosity = 0 flags.cylc7_back_compat = False + # Reset graph node parser singleton: + GraphNodeParser.get_inst().clear() @pytest.fixture(scope='module') From 1e89d73641f250b7f69e0c9d56ce1c97330d1e35 Mon Sep 17 00:00:00 2001 From: Ronnie Dutta <61982285+MetRonnie@users.noreply.github.com> Date: Wed, 31 Jul 2024 18:11:45 +0100 Subject: [PATCH 3/3] GH Actions: avoid running functional tests when not needed --- .github/workflows/test_functional.yml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.github/workflows/test_functional.yml b/.github/workflows/test_functional.yml index 7831fef64f0..116620cdf35 100644 --- a/.github/workflows/test_functional.yml +++ b/.github/workflows/test_functional.yml @@ -8,6 +8,7 @@ on: - '!.github/workflows/test_functional.yml' - 'cylc/flow/etc/syntax/**' - 'etc/syntax/**' + - 'tests/conftest.py' - 'tests/unit/**' - 'tests/integration/**' - '**.md' @@ -21,6 +22,7 @@ on: - '!.github/workflows/test_functional.yml' - 'cylc/flow/etc/syntax/**' - 'etc/syntax/**' + - 'tests/conftest.py' - 'tests/unit/**' - 'tests/integration/**' - '**.md'