Skip to content

Commit

Permalink
Adapt integration tests.
Browse files Browse the repository at this point in the history
  • Loading branch information
hjoliver committed Sep 24, 2022
1 parent 1b685a0 commit 75c1763
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 16 deletions.
38 changes: 22 additions & 16 deletions cylc/flow/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 299,7 @@ def __init__(self, reg: str, options: Values) -> None:
pub_d=os.path.join(self.workflow_run_dir, 'log')
)
self.is_restart = Path(self.workflow_db_mgr.pri_path).is_file()
self.graph_loaders: List[str] = []
self.graph_loaders: List[Callable] = []

async def install(self):
"""Get the filesystem in the right state to run the flow.
Expand Down Expand Up @@ -640,6 640,25 @@ def _get_graph_loaders(self) -> None:
partial(self.pool.load_nocycle_graph, NOCYCLE_SEQ_ALPHA)
)

async def run_graphs(self):
self.graph_loaders = []
if self.is_restart:
# Restart from DB.
self.task_job_mgr.task_remote_mgr.is_restart = True
self._load_pool_from_db()
self.restart_remote_init()
# next graphs depends on content of restart pool
self._get_graph_loaders()
await self.main_loop()
elif self.pool.main_pool:
# pool loaded for integration test!
await self.main_loop()
else:
self._get_graph_loaders()
while self.graph_loaders:
(self.graph_loaders.pop())()
await self.main_loop()

async def run_scheduler(self):
"""Start the scheduler main loop."""
try:
Expand All @@ -657,21 676,8 @@ async def run_scheduler(self):
sleep(0)
self.profiler.start()

self.graph_loaders = []
if self.is_restart:
# Restart from DB.
self.task_job_mgr.task_remote_mgr.is_restart = True
self._load_pool_from_db()
self.restart_remote_init()
# next graphs depends on content of restart pool
self._get_graph_loaders()
await self.main_loop()
else:
self._get_graph_loaders()

while self.graph_loaders:
(self.graph_loaders.pop())()
await self.main_loop()
await self.run_graphs()
LOG.critical("DONE")

except SchedulerStop as exc:
# deliberate stop
Expand Down
18 changes: 18 additions & 0 deletions tests/integration/utils/flow_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 58,14 @@ def _make_flow(
return reg


def _load_graph(sched):
"""Get scheduler to load the main graph."""
if sched.is_restart:
sched._load_pool_from_db()
else:
sched._load_pool_from_point()


@contextmanager
def _make_scheduler():
"""Return a scheduler object for a flow registration."""
Expand Down Expand Up @@ -95,6 103,7 @@ async def _start_flow(
# exception occurs in Scheduler
try:
await schd.start()
_load_graph(schd)
finally:
# After this `yield`, the `with` block of the context manager
# is executed:
Expand Down Expand Up @@ -126,6 135,7 @@ async def _run_flow(
# exception occurs in Scheduler
try:
await schd.start()
_load_graph(schd)
# Do not await as we need to yield control to the main loop:
task = asyncio.create_task(schd.run_scheduler())
finally:
Expand All @@ -137,16 147,24 @@ async def _run_flow(
# context manager.
# Need to shut down Scheduler, but time out in case something
# goes wrong:
print("ONE")
async with timeout(5):
print("TWAO")
if task:
print("THREE")
# ask the scheduler to shut down nicely,
# let main loop handle it:
schd._set_stop(StopMode.REQUEST_NOW_NOW)
await task
print("FOUR")
if schd.contact_data:
print("FIVE")
async with timeout(5):
# Scheduler still running... try more forceful tear down:
await schd.shutdown(SchedulerStop("integration test teardown"))
print("SIX")
if task:
# Brute force cleanup if something went wrong:
print("SEVEN")
task.cancel()
print("EIGHT")

0 comments on commit 75c1763

Please sign in to comment.