diff --git a/cylc/flow/scheduler.py b/cylc/flow/scheduler.py index e291c96b038..b826e178585 100644 --- a/cylc/flow/scheduler.py +++ b/cylc/flow/scheduler.py @@ -299,7 +299,7 @@ def __init__(self, reg: str, options: Values) -> None: pub_d=os.path.join(self.workflow_run_dir, 'log') ) self.is_restart = Path(self.workflow_db_mgr.pri_path).is_file() - self.graph_loaders: List[str] = [] + self.graph_loaders: List[Callable] = [] async def install(self): """Get the filesystem in the right state to run the flow. @@ -640,6 +640,25 @@ def _get_graph_loaders(self) -> None: partial(self.pool.load_nocycle_graph, NOCYCLE_SEQ_ALPHA) ) + async def run_graphs(self): + self.graph_loaders = [] + if self.is_restart: + # Restart from DB. + self.task_job_mgr.task_remote_mgr.is_restart = True + self._load_pool_from_db() + self.restart_remote_init() + # next graphs depends on content of restart pool + self._get_graph_loaders() + await self.main_loop() + elif self.pool.main_pool: + # pool loaded for integration test! + await self.main_loop() + else: + self._get_graph_loaders() + while self.graph_loaders: + (self.graph_loaders.pop())() + await self.main_loop() + async def run_scheduler(self): """Start the scheduler main loop.""" try: @@ -657,21 +676,8 @@ async def run_scheduler(self): sleep(0) self.profiler.start() - self.graph_loaders = [] - if self.is_restart: - # Restart from DB. - self.task_job_mgr.task_remote_mgr.is_restart = True - self._load_pool_from_db() - self.restart_remote_init() - # next graphs depends on content of restart pool - self._get_graph_loaders() - await self.main_loop() - else: - self._get_graph_loaders() - - while self.graph_loaders: - (self.graph_loaders.pop())() - await self.main_loop() + await self.run_graphs() + LOG.critical("DONE") except SchedulerStop as exc: # deliberate stop diff --git a/tests/integration/utils/flow_tools.py b/tests/integration/utils/flow_tools.py index dcd683023bb..8a880e211b9 100644 --- a/tests/integration/utils/flow_tools.py +++ b/tests/integration/utils/flow_tools.py @@ -58,6 +58,14 @@ def _make_flow( return reg +def _load_graph(sched): + """Get scheduler to load the main graph.""" + if sched.is_restart: + sched._load_pool_from_db() + else: + sched._load_pool_from_point() + + @contextmanager def _make_scheduler(): """Return a scheduler object for a flow registration.""" @@ -95,6 +103,7 @@ async def _start_flow( # exception occurs in Scheduler try: await schd.start() + _load_graph(schd) finally: # After this `yield`, the `with` block of the context manager # is executed: @@ -126,6 +135,7 @@ async def _run_flow( # exception occurs in Scheduler try: await schd.start() + _load_graph(schd) # Do not await as we need to yield control to the main loop: task = asyncio.create_task(schd.run_scheduler()) finally: @@ -137,16 +147,24 @@ async def _run_flow( # context manager. # Need to shut down Scheduler, but time out in case something # goes wrong: + print("ONE") async with timeout(5): + print("TWAO") if task: + print("THREE") # ask the scheduler to shut down nicely, # let main loop handle it: schd._set_stop(StopMode.REQUEST_NOW_NOW) await task + print("FOUR") if schd.contact_data: + print("FIVE") async with timeout(5): # Scheduler still running... try more forceful tear down: await schd.shutdown(SchedulerStop("integration test teardown")) + print("SIX") if task: # Brute force cleanup if something went wrong: + print("SEVEN") task.cancel() + print("EIGHT")