Skip to content

Commit

Permalink
Merge pull request #6279 from cylc/8.3.x-sync
Browse files Browse the repository at this point in the history
🤖 Merge 8.3.x-sync into master
  • Loading branch information
MetRonnie authored Aug 1, 2024
2 parents 2143d66 e2da477 commit 00a1fd2
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 35 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/test_functional.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 8,7 @@ on:
- '!.github/workflows/test_functional.yml'
- 'cylc/flow/etc/syntax/**'
- 'etc/syntax/**'
- 'tests/conftest.py'
- 'tests/unit/**'
- 'tests/integration/**'
- '**.md'
Expand All @@ -21,6 22,7 @@ on:
- '!.github/workflows/test_functional.yml'
- 'cylc/flow/etc/syntax/**'
- 'etc/syntax/**'
- 'tests/conftest.py'
- 'tests/unit/**'
- 'tests/integration/**'
- '**.md'
Expand Down
7 changes: 5 additions & 2 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,25 14,28 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from pathlib import Path
import re
from pathlib import Path
from shutil import rmtree
from typing import List, Optional, Tuple

import pytest

from cylc.flow import flags
from cylc.flow.cfgspec.glbl_cfg import glbl_cfg
from cylc.flow.cfgspec.globalcfg import SPEC
from cylc.flow.graphnode import GraphNodeParser
from cylc.flow.parsec.config import ParsecConfig
from cylc.flow.parsec.validate import cylc_config_validate
from cylc.flow import flags


@pytest.fixture(autouse=True)
def test_reset():
"""Reset global state before all tests."""
flags.verbosity = 0
flags.cylc7_back_compat = False
# Reset graph node parser singleton:
GraphNodeParser.get_inst().clear()


@pytest.fixture(scope='module')
Expand Down
69 changes: 36 additions & 33 deletions tests/integration/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -559,8 559,8 @@ def _submit_task_jobs(*args, **kwargs):


async def _complete(
schd,
*tokens_list: Union[Tokens, str],
schd: 'Scheduler',
*wait_tokens: Union[Tokens, str],
stop_mode=StopMode.AUTO,
timeout: int = 60,
) -> None:
Expand All @@ -569,7 569,7 @@ async def _complete(
Args:
schd:
The scheduler to await.
tokens_list:
wait_tokens:
If specified, this will wait for the tasks represented by these
tokens to be marked as completed by the task pool. Can use
relative task ids as strings (e.g. '1/a') rather than tokens for
Expand All @@ -590,56 590,59 @@ async def _complete(
"""
start_time = time()

_tokens_list: List[Tokens] = []
for tokens in tokens_list:
tokens_list: List[Tokens] = []
for tokens in wait_tokens:
if isinstance(tokens, str):
tokens = Tokens(tokens, relative=True)
_tokens_list.append(tokens.task)
tokens_list.append(tokens.task)

# capture task completion
remove_if_complete = schd.pool.remove_if_complete

def _remove_if_complete(itask, output=None):
nonlocal _tokens_list
nonlocal tokens_list
ret = remove_if_complete(itask)
if ret and itask.tokens.task in _tokens_list:
_tokens_list.remove(itask.tokens.task)
if ret and itask.tokens.task in tokens_list:
tokens_list.remove(itask.tokens.task)
return ret

schd.pool.remove_if_complete = _remove_if_complete

# capture workflow shutdown
# capture workflow shutdown request
set_stop = schd._set_stop
has_shutdown = False
stop_requested = False

def _set_stop(mode=None):
nonlocal has_shutdown, stop_mode
nonlocal stop_requested, stop_mode
if mode == stop_mode:
has_shutdown = True
stop_requested = True
return set_stop(mode)
else:
set_stop(mode)
raise Exception(f'Workflow bailed with stop mode = {mode}')

schd._set_stop = _set_stop

# determine the completion condition
if _tokens_list:
condition = lambda: bool(_tokens_list)
else:
condition = lambda: bool(not has_shutdown)

# wait for the condition to be met
while condition():
# allow the main loop to advance
await asyncio.sleep(0)
if (time() - start_time) > timeout:
raise Exception(
f'Timeout waiting for {", ".join(map(str, _tokens_list))}'
)

# restore regular shutdown logic
schd._set_stop = set_stop
def done():
if wait_tokens:
return not tokens_list
# otherwise wait for the scheduler to shut down
if not schd.contact_data:
return True
return stop_requested

with pytest.MonkeyPatch.context() as mp:
mp.setattr(schd.pool, 'remove_if_complete', _remove_if_complete)
mp.setattr(schd, '_set_stop', _set_stop)

# wait for the condition to be met
while not done():
# allow the main loop to advance
await asyncio.sleep(0)
if (time() - start_time) > timeout:
msg = "Timeout waiting for "
if wait_tokens:
msg = ", ".join(map(str, tokens_list))
else:
msg = "workflow to shut down"
raise Exception(msg)


@pytest.fixture
Expand Down

0 comments on commit 00a1fd2

Please sign in to comment.