Skip to content

Tasks

Big part of the Asyncz are the Tasks. Those are special objects with instructions and parameters that are created/sent to the scheduler and then executed.

Importing a task is as simple as:

from asyncz.tasks import Task

Parameters

  • id - The unique identifier of this task.
  • name - The description of this task.
  • fn - The callable function to execute.
  • args - Positional arguments to the callable.
  • kwargs - Keyword arguments to the callable.
  • coalesce - Whether to only run the task once when several run times are due.
  • trigger - The trigger object that controls the schedule of this task.
  • executor - The name of the executor that will run this task.
  • mistrigger_grace_time - The time (in seconds) how much this task's execution is allowed to be late (None means "allow the task to run no matter how late it is").
  • max_instances - The maximum number of concurrently executing instances allowed for this task.
  • next_run_time - The next scheduled run time of this task. (Note: this has not the pause mode effect like in add_task)

Create a task

Creating a task is as simple as:

from asyncz.schedulers import AsyncIOScheduler
from asyncz.tasks import Task
from asyncz.triggers import CronTrigger

# Create a scheduler
scheduler = AsyncIOScheduler()


def check_status():
    # Logic to check statuses
    ...


Task(
    id="my-task",
    fn=check_status,
    name="my-func",
    scheduler=scheduler,
    trigger=CronTrigger(day_of_week="mon,tue,wed,thu,fri,sat,sun", hour=8, minute=1),
    max_instances=3,
    coalesce=True,
)

scheduler.start()

# or manually submit one with a changed trigger

t = Task(
    id="my-task2",
    fn=check_status,
    max_instances=1,
    coalesce=True,
)

scheduler.add_task(
    t, trigger=CronTrigger(day_of_week="mon,tue,wed,thu,fri,sat,sun", hour=8, minute=1)
)

Update a task

You can also update a specific task and its properties directly.

from asyncz.schedulers import AsyncIOScheduler
from asyncz.tasks import Task
from asyncz.triggers import CronTrigger

# Create a scheduler
scheduler = AsyncIOScheduler()


def check_status():
    # Logic to check statuses
    ...


# Create a task
task = Task(
    id="my-task",
    fn=check_status,
    name="my-func",
    scheduler=scheduler,
    trigger=CronTrigger(day_of_week="mon,tue,wed,thu,fri,sat,sun", hour=8, minute=1),
    max_instances=3,
    coalesce=True,
)

# Update the task
task.update(
    name="my-new-task-id",
    max_instances=5,
    coalesce=False,
)

Internally the task is using the given scheduler to be updated and then executed.

Warning

All attributes can be updated but the id as this is immutable.

Reschedule a task

You can also reschedule a task when need and by that what it means is changing its trigger only.

The trigger must be the alias of the trigger object.

from asyncz.schedulers import AsyncIOScheduler
from asyncz.tasks import Task
from asyncz.triggers import CronTrigger

# Create a scheduler
scheduler = AsyncIOScheduler()


def check_status():
    # Logic to check statuses
    ...


# Create a task
task = Task(
    id="my-task",
    fn=check_status,
    name="my-func",
    scheduler=scheduler,
    trigger=CronTrigger(day_of_week="mon,tue,wed,thu,fri,sat,sun", hour=8, minute=1),
    max_instances=3,
    coalesce=True,
)

# Reschedule the task
task.reschedule("my-task", trigger="cron", hour=10, minute=5)

Tasks with lifecyle

Sometimes tasks need a setup and a cleanup routine. This is possible to implement via generators, no matter if asynchronous or synchronous.

Generators have two methods: send, throw (for asynchronous generators there are asend and athrow). The send methods take a parameter which is returned from yield:

def generator():
    print(yield)

g = generator()
# start generator, it is  required to be None
g.send(None)
# raises StopIteration
g.send("hello world")

Now let's adapt this for tasks with lifecycle:

from asyncz.schedulers import AsyncIOScheduler
from asyncz.tasks import Task
from asyncz.utils import make_function

# Create a scheduler
scheduler = AsyncIOScheduler()


def lifecycle_task():
    # setup
    ...
    # we have to mask generator send so it could be set to a task
    scheduler.add_task(make_function(generator.send), args=[False], trigger="shutdown")
    running = yield
    while running:
        # do something
        running = yield
    # cleanup


# setup task
generator = lifecycle_task()
generator.send(None)

# Run every 5 minutes
scheduler.add_task(make_function(generator.send), args=[True], trigger="interval", minutes=5)

scheduler.start()
...
# Now the shutdown task is executed and the generator progresses in the cleanup
scheduler.stop()

Note the make_function and make_async_function decorators. They are required because the generator methods have no signature. It is especially required for asend/athrow.

This is also possible with asynchronous generators.