-
Notifications
You must be signed in to change notification settings - Fork 1.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support orchestration id reuse policy #7308
Conversation
Signed-off-by: kaibocai <[email protected]>
Signed-off-by: kaibocai <[email protected]>
8d85b62
to
e927c1b
Compare
Signed-off-by: kaibocai <[email protected]>
tag @cgillum , @ItalyPaleAle , @RyanLettieri for review. |
Signed-off-by: kaibocai <[email protected]>
for _, opt := range GetTestOptions() { | ||
suffix := opt(engine) | ||
t.Run(opt(engine), func(t *testing.T) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we be using suffix
here?
t.Run(opt(engine), func(t *testing.T) { | |
t.Run(suffix, func(t *testing.T) { |
pkg/runtime/wfengine/workflow.go
Outdated
@@ -62,6 62,11 @@ type recoverableError struct { | |||
cause error | |||
} | |||
|
|||
type PolicyAndEventData struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's give this a more specific name, like CreateWorkflowInstanceRequest
pkg/runtime/wfengine/workflow.go
Outdated
@@ -62,6 62,11 @@ type recoverableError struct { | |||
cause error | |||
} | |||
|
|||
type PolicyAndEventData struct { | |||
Policy *api.OrchestrationIdReusePolicy `json:"policy"` | |||
EventData []byte `json:"eventData"` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For consistency with existing code, I suggest StartEventData
.
EventData []byte `json:"eventData"` | |
StartEventBytes []byte `json:"startEventBytes"` |
pkg/runtime/wfengine/workflow.go
Outdated
// orchestration already exists, apply reuse id policy | ||
runtimeState := getRuntimeState(actorID, state) | ||
runtimeStatus := runtimeState.RuntimeStatus() | ||
targetStatusValues := backend.BuildStatusSet(reuseIDPolicy.GetOperationStatus()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This goes back to my comment on the durabletask-go repo, but I don't think we need to expose a BuildStatusSet
utility function for this simple purpose. I think a simple for-loop over reuseIDPolicy.GetOperationStatus()
and comparing to runtimeStatus
on each loop iteration is good enough since we only have a small number of possible status values. This might even be more performant than creating a map and doing hashing operations for the key lookup.
pkg/runtime/wfengine/workflow.go
Outdated
|
||
func (wf *workflowActor) createInstance(ctx context.Context, actorID string, startEvent *backend.HistoryEvent, state *workflowState) error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm a little confused about the function name here. It's called createInstance
but the only thing it does is create a reminder? Can we give this a more specific name, like scheduleWorkflowStart
?
pkg/runtime/wfengine/workflow.go
Outdated
@@ -209,6 255,36 @@ func (wf *workflowActor) createWorkflowInstance(ctx context.Context, actorID str | |||
return wf.saveInternalState(ctx, actorID, state) | |||
} | |||
|
|||
// This method cleans up a workflow associated with the given actorID | |||
func (wf *workflowActor) cleanupOrchestrationStateInternal(ctx context.Context, actorID string, state *workflowState, requireCompleted bool) error { | |||
runtimeState := getRuntimeState(actorID, state) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We've already loaded the runtime state in createWorkflowInstance
. Should we reuse that and pass it as a parameter rather than getting the runtime state again?
pkg/runtime/wfengine/workflow.go
Outdated
func (wf *workflowActor) cleanupOrchestrationStateInternal(ctx context.Context, actorID string, state *workflowState, requireCompleted bool) error { | ||
runtimeState := getRuntimeState(actorID, state) | ||
runtimeStatus := runtimeState.RuntimeStatus() | ||
isCompleted := runtimeStatus == api.COMPLETED || runtimeStatus == api.TERMINATED || runtimeStatus == api.FAILED |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This requires a change in durabletask-go, but I wonder if we should prefix these constants with RUNTIME_STATUS_
so that it's clear what these values are for.
pkg/runtime/wfengine/workflow.go
Outdated
func (wf *workflowActor) cleanupOrchestrationStateInternal(ctx context.Context, actorID string, state *workflowState, requireCompleted bool) error { | ||
runtimeState := getRuntimeState(actorID, state) | ||
runtimeStatus := runtimeState.RuntimeStatus() | ||
isCompleted := runtimeStatus == api.COMPLETED || runtimeStatus == api.TERMINATED || runtimeStatus == api.FAILED |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can't we just use the existing runtimeStatus.IsCompleted()
for this instead of checking specific status values? That would be more maintainable.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The reason I am not using runtimeStatus.IsCompleted()
is that I am not quite sure if it equals [COMPLETED , TERMINATED , FAILED]
. From the code, the completedEvent
is assigned when the orchestration completed at https://github.com/microsoft/durabletask-go/blob/aa335e20a192395649eb9ede97b57c9f647238ea/backend/runtimestate.go#L134, but when orchestration is terminated we don't assign a completedEvent
, instead we create a ExecutionTerminatedEvent
at https://github.com/microsoft/durabletask-go/blob/aa335e20a192395649eb9ede97b57c9f647238ea/backend/runtimestate.go#L197, so it seems runtimeStatus.IsCompleted()
only check if orchestration is completed but not check if it's terminated/failed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
According to this integration test code, a terminated orchestration results in runtimeStatus.IsCompleted()
being true
.
The second code block you're referencing is specifically for handling cascading terminate. It's not directly related to how we represent a currently terminated orchestration.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually, hang on - I need to double check. The code I pointed to is for orchestration metadata, not the orchestration runtime state object...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Okay, so confirming this wasn't as straightforward as I thought. But the short answer is that a terminated orchestration should always be completed. When an orchestration processes an ExecutionTerminated event, the SDK will generate a completedEvent
with a status of "Terminated". This can be seen in the SDK code here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Got it. Updated!
Signed-off-by: kaibocai <[email protected]>
Codecov ReportAttention:
Additional details and impacted files@@ Coverage Diff @@
## master #7308 /- ##
==========================================
Coverage 64.59% 64.60% 0.01%
==========================================
Files 226 226
Lines 21191 21223 32
==========================================
Hits 13689 13712 23
- Misses 6324 6329 5
- Partials 1178 1182 4 ☔ View full report in Codecov by Sentry. |
0cecc57
to
ee6e8f7
Compare
Signed-off-by: kaibocai <[email protected]> simplify complete check Signed-off-by: kaibocai <[email protected]>
ee6e8f7
to
2517e86
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just some minor things.
pkg/runtime/wfengine/workflow.go
Outdated
} | ||
wf.states.Delete(actorID) | ||
return nil | ||
return wf.cleanupOrchestrationStateInternal(ctx, actorID, state, true && !runtimeState.IsCompleted()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
return wf.cleanupOrchestrationStateInternal(ctx, actorID, state, true && !runtimeState.IsCompleted()) | |
return wf.cleanupOrchestrationStateInternal(ctx, actorID, state, !runtimeState.IsCompleted()) |
pkg/runtime/wfengine/workflow.go
Outdated
switch reuseIDPolicy.GetAction() { | ||
case api.REUSE_ID_ACTION_IGNORE: | ||
// Log an warning message and ignore creating new instance | ||
wfLogger.Warnf("An instance with ID '%s' already exists; dropping duplicate create request", actorID) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For consistency, we should follow the logging format that's used elsewhere in this file. For example:
wfLogger.Warnf("An instance with ID '%s' already exists; dropping duplicate create request", actorID) | |
wfLogger.Warnf("Workflow actor '%s': ignoring request to recreate the current workflow instance", actorID) |
pkg/runtime/wfengine/workflow.go
Outdated
@@ -209,6 263,33 @@ func (wf *workflowActor) createWorkflowInstance(ctx context.Context, actorID str | |||
return wf.saveInternalState(ctx, actorID, state) | |||
} | |||
|
|||
// This method cleans up a workflow associated with the given actorID | |||
func (wf *workflowActor) cleanupOrchestrationStateInternal(ctx context.Context, actorID string, state *workflowState, requiredAndNotCompleted bool) error { | |||
// Only purge orchestration in the ['COMPLETED', 'FAILED', 'TERMINATED'] statuses, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This comment should probably be updated.
Signed-off-by: kaibocai <[email protected]> update method name Signed-off-by: kaibocai <[email protected]>
e6ecaf9
to
ab1d3f1
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM! I'll go ahead and merge the durabletask-go
changes and create a release so that this PR can be unblocked.
53ca9db
to
88df935
Compare
sorry that I didn't see this one and created the release myself. |
Hi @cgillum, @ItalyPaleAle and @RyanLettieri, the duraletask-go is updated to version v0.4.0 and I think this PR should be ready for final review and merge. Please help to take another look. Thanks! |
Signed-off-by: kaibocai <[email protected]>
914f7ee
to
0280640
Compare
Signed-off-by: kaibocai <[email protected]> update test time compare Signed-off-by: kaibocai <[email protected]> fix assert format Signed-off-by: kaibocai <[email protected]>
0280640
to
57bf384
Compare
tag @mukundansundar for review as well. |
Signed-off-by: Alessandro (Ale) Segala <43508 [email protected]>
return wf.createIfCompleted(ctx, runtimeState, actorID, state, startEvent) | ||
} | ||
|
||
switch reuseIDPolicy.GetAction() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The PR LGTM, I just wanted to understand how are we skipping the creation of a new orchestration here, (i.e. the option SkipIfExists
from microsoft/durabletask-go#42 (comment))
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
case api.REUSE_ID_ACTION_IGNORE:
// Log an warning message and ignore creating new instance
wfLogger.Warnf("Workflow actor '%s': ignoring request to recreate the current workflow instance", actorID)
return nil
So if users choose REUSE_ID_ACTION_IGNORE
and if there is an existing instance with the same instance ID, then we will enter the above block and just log a message and directly return.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
my bad, I missed that Ignore is Skip, thanks for clarification
Hi @artursouza, @yaron2, @mukundansundar. This workflow PR has been signed off and is ready to be merged. |
* support orchestration id reuse policy Signed-off-by: kaibocai <[email protected]> * clean up Signed-off-by: kaibocai <[email protected]> * lint fix Signed-off-by: kaibocai <[email protected]> * fix nit Signed-off-by: kaibocai <[email protected]> * minor updates Signed-off-by: kaibocai <[email protected]> * minor updates Signed-off-by: kaibocai <[email protected]> simplify complete check Signed-off-by: kaibocai <[email protected]> * minor updates on log message, comments, and request type Signed-off-by: kaibocai <[email protected]> update method name Signed-off-by: kaibocai <[email protected]> * update durabletask-go to v0.4.0 Signed-off-by: kaibocai <[email protected]> * fix time compare in test Signed-off-by: kaibocai <[email protected]> update test time compare Signed-off-by: kaibocai <[email protected]> fix assert format Signed-off-by: kaibocai <[email protected]> * 💄 Signed-off-by: Alessandro (Ale) Segala <43508 [email protected]> --------- Signed-off-by: kaibocai <[email protected]> Signed-off-by: Alessandro (Ale) Segala <43508 [email protected]> Co-authored-by: Alessandro (Ale) Segala <43508 [email protected]> Signed-off-by: Cassandra Coyle <[email protected]>
* support orchestration id reuse policy Signed-off-by: kaibocai <[email protected]> * clean up Signed-off-by: kaibocai <[email protected]> * lint fix Signed-off-by: kaibocai <[email protected]> * fix nit Signed-off-by: kaibocai <[email protected]> * minor updates Signed-off-by: kaibocai <[email protected]> * minor updates Signed-off-by: kaibocai <[email protected]> simplify complete check Signed-off-by: kaibocai <[email protected]> * minor updates on log message, comments, and request type Signed-off-by: kaibocai <[email protected]> update method name Signed-off-by: kaibocai <[email protected]> * update durabletask-go to v0.4.0 Signed-off-by: kaibocai <[email protected]> * fix time compare in test Signed-off-by: kaibocai <[email protected]> update test time compare Signed-off-by: kaibocai <[email protected]> fix assert format Signed-off-by: kaibocai <[email protected]> * 💄 Signed-off-by: Alessandro (Ale) Segala <43508 [email protected]> --------- Signed-off-by: kaibocai <[email protected]> Signed-off-by: Alessandro (Ale) Segala <43508 [email protected]> Co-authored-by: Alessandro (Ale) Segala <43508 [email protected]>
Description
Issue reference
This PR tries to update the logic to support reuse orchestration ID, more details can be found microsoft/durabletask-go#42, #7101
Corresponding protobuf updates can be found microsoft/durabletask-protobuf#19
[Completed, Failed, Terminated]
will be purged.Please reference the issue this PR will close: #7101
Checklist
Please make sure you've completed the relevant tasks for this PR, out of the following list: