Really Complex Workflows with Batches
Sidekiq Pro's Batches feature can handle job workflows of any complexity. This page shows how to implement a complex workflow given to me by one Sidekiq Pro customer.
The workflow looks like this, where jobs are blue circles and orange boxes hold jobs which can execute in parallel. All jobs within an orange box must succeed before the workflow can move "down".
Let's call this workflow the Order workflow. Perhaps it represents the series of steps necessary to ship a customer's order.
First of all, we're going to create an overall Batch to represent the entire workflow and then create a child batch to represent the first step in the workflow. That child batch will use a success callback to schedule step 2 in the workflow:
order = ...
overall = Sidekiq::Batch.new
overall.on(:success, "FulfillmentCallbacks#shipped", "oid" => order.id)
overall.description = "Fulfillment for Order #{order.id}"
overall.jobs do
  StartWorkflow.perform_async(order.id)
end
class StartWorkflow
  include Sidekiq::Job

  def perform(order_id)
    batch.jobs do
      step1 = Sidekiq::Batch.new
      step1.on(:success, "FulfillmentCallbacks#step1_done", "oid" => order_id)
      step1.jobs do
        A.perform_async(order_id)
      end
    end
  end
end
Note that we create a StartWorkflow job in the overall batch, and that job creates the Step 1 batch. This is because every batch must contain at least one job to be valid; otherwise its behavior is undefined. That one job can create further child batches with their own jobs.
class FulfillmentCallbacks
  def step1_done(status, options)
    oid = options["oid"]
    overall = Sidekiq::Batch.new(status.parent_bid)
    overall.jobs do
      step2 = Sidekiq::Batch.new
      step2.on(:success, "FulfillmentCallbacks#step2_done", "oid" => oid)
      step2.jobs do
        B.perform_async
        C.perform_async
        D.perform_async
        E.perform_async
        F.perform_async
      end
    end
  end

  def step2_done(status, options)
    oid = options["oid"]
    overall = Sidekiq::Batch.new(status.parent_bid)
    overall.jobs do
      step3 = Sidekiq::Batch.new
      step3.on(:success, "FulfillmentCallbacks#step3_done", "oid" => oid)
      step3.jobs do
        G.perform_async(oid)
      end
    end
  end

  def step3_done(status, options)
    oid = options["oid"]
    overall = Sidekiq::Batch.new(status.parent_bid)
    overall.jobs do
      step4 = Sidekiq::Batch.new
      step4.on(:success, "FulfillmentCallbacks#step4_done", "oid" => oid)
      step4.jobs do
        H.perform_async(oid)
        I.perform_async(oid)
      end
    end
  end

  def step4_done(status, options)
    oid = options["oid"]
    overall = Sidekiq::Batch.new(status.parent_bid)
    overall.jobs do
      J.perform_async(oid)
      K.perform_async(oid)
      L.perform_async(oid)
    end
  end

  def shipped(status, options)
    # this callback will fire once M has succeeded
    oid = options["oid"]
    puts "Order #{oid} has shipped!"
  end
end
class L
  include Sidekiq::Job

  def perform(oid)
    # do stuff
    if bid
      # if we belong to a batch, assume we're within the fulfillment workflow
      # and need to kick off job M
      batch.jobs do
        M.perform_async(oid)
      end
    end
  end
end
In this manner, we can implement serial steps in the workflow, with the jobs in each step executing in parallel.
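The step callbacks above all share one shape: reopen the parent batch, create a child batch, register the next callback, enqueue the step's jobs. As a rough sketch, the same serial steps could be driven from a table instead of one method per step. Everything named here (`ORDER_STEPS`, `jobs_for_step`, `WorkflowCallbacks`, `step_done`, the `"step"` option) is hypothetical and not part of Sidekiq Pro. The sketch assumes every job class takes the order id as its only argument, uses only the Batch calls already shown on this page, and leaves out the final J/K/L and M handoff.

```ruby
# Hypothetical table of workflow steps. Each inner array lists the job class
# names that may run in parallel within that step.
ORDER_STEPS = [
  %w[A],
  %w[B C D E F],
  %w[G],
  %w[H I]
].freeze

# Returns the job class names for the given zero-based step index,
# or nil when the table has no such step.
def jobs_for_step(index)
  return nil if index.negative?

  ORDER_STEPS[index]
end

class WorkflowCallbacks
  # Generic success callback (hypothetical): reopens the parent batch and
  # adds the next step as a child batch. Needs Sidekiq Pro loaded when a
  # next step actually exists.
  def step_done(status, options)
    next_index = options["step"] + 1
    job_names = jobs_for_step(next_index)
    return if job_names.nil? # no further steps in the table to add

    overall = Sidekiq::Batch.new(status.parent_bid)
    overall.jobs do
      step = Sidekiq::Batch.new
      step.description = "Step #{next_index + 1}"
      step.on(:success, "WorkflowCallbacks#step_done",
              "oid" => options["oid"], "step" => next_index)
      step.jobs do
        # Assumption: each job class accepts the order id as its only argument.
        job_names.each do |name|
          Object.const_get(name).perform_async(options["oid"])
        end
      end
    end
  end
end
```

With this variant, StartWorkflow would register `WorkflowCallbacks#step_done` on the first child batch with `"step" => 0` alongside `"oid"`. The trade-off is that the workflow's shape lives in one table, at the cost of steps that need special handling (such as L kicking off M) no longer fitting the generic callback.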
Using the Batch API can be complex. These tips can help you avoid some of the more insidious gotchas.
- Once a batch is created, you cannot edit it except to add more jobs to it. Jobs can only be dynamically added to a Batch within another job in that Batch.
- Callbacks should never, ever change the associated batch. Notably, you cannot add more jobs to the Batch once its callbacks are running. Callbacks are fired when all jobs have executed; what would it mean to add more jobs now?
Adding more jobs to the current batch:
def perform(...)
  if some_condition
    # Sidekiq::Job#bid and #batch give you access to the current BID
    # and batch for this job. The safe navigation ("lonely") operator
    # skips the block when the job is not running within a batch.
    batch&.jobs do
      AnotherWorker.perform_async(...)
    end
  end
end
Once a step in a workflow is complete, it's idiomatic to reopen the parent batch within a callback and add another step to that workflow. This is how we get sequential execution ("don't execute this job until these 3 jobs are complete").
def step1_done(status, options)
  parent = Sidekiq::Batch.new(status.parent_bid)
  parent.jobs do
    # add a new child batch which represents all the jobs
    # necessary for Step 2; the batch picks up its parent
    # automatically since it is defined within the parent's `jobs`.
    b = Sidekiq::Batch.new
    b.description = "Step 2"
    b.on(:success, "SomeClass#step2_done", options)
    b.jobs do
      MoreWork.perform_async(...)
    end
  end
end
Rule of thumb: jobs only open their own batch, callbacks only open their parent batch.
Complete and success callbacks are ordered only against callbacks of the same type; there is no causality guarantee between the two types.
- Child `success` always runs before parent `success`.
- Child `complete` always runs before parent `complete`.
- There is no guarantee that child `success` always runs before parent `complete`.
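One way to picture the first guarantee, and why reopening the parent inside a child's callback keeps the workflow serial, is a toy model. The sketch below is plain Ruby and is not Sidekiq's implementation: `ToyBatch`, `run_jobs`, `on_success` and `run_toy_workflow` are made up for illustration, everything runs synchronously in one thread, and only `success` is modelled. It assumes a child's callback runs before the parent is told that the child has finished.

```ruby
# Toy, in-memory model of nested batches. NOT Sidekiq's implementation:
# every name here is hypothetical and everything runs synchronously.
class ToyBatch
  def initialize(name, log, parent: nil)
    @name = name
    @log = log        # shared array recording the order of events
    @parent = parent
    @pending = 0      # outstanding jobs plus outstanding child batches
    @on_success = nil
    parent&.add_pending
  end

  def add_pending
    @pending += 1
  end

  def on_success(&block)
    @on_success = block
  end

  # Registers all jobs first, then "runs" them one by one, so the batch
  # cannot succeed until every job has finished.
  def run_jobs(*labels)
    @pending += labels.size
    labels.each do |label|
      @log << "job #{label}"
      finish_one
    end
  end

  # Called when one job or one child batch has succeeded.
  def finish_one
    @pending -= 1
    return unless @pending.zero?

    @log << "success #{@name}"
    @on_success&.call     # toy assumption: callback runs before the parent is notified
    @parent&.finish_one
  end
end

# Two serial steps under one overall batch; step 2 is added from
# step 1's success callback by attaching a new child to the parent.
def run_toy_workflow
  log = []
  overall = ToyBatch.new("overall", log)
  step1 = ToyBatch.new("step1", log, parent: overall)
  step1.on_success do
    step2 = ToyBatch.new("step2", log, parent: overall)
    step2.run_jobs("B", "C")
  end
  step1.run_jobs("A")
  log
end

p run_toy_workflow
# => ["job A", "success step1", "job B", "job C", "success step2", "success overall"]
```

In this model the overall batch records its `success` last because step 2 was attached to it before step 1 reported back, which mirrors the "callbacks only open their parent batch" rule of thumb.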