-
Notifications
You must be signed in to change notification settings - Fork 1.2k
Standalone activity heartbeating #8730
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: standalone-activity
Are you sure you want to change the base?
Conversation
0527002 to
6ffe4da
Compare
chasm/lib/activity/activity.go
Outdated
| func (a *Activity) HandleStarted( | ||
| ctx chasm.MutableContext, | ||
| request *historyservice.RecordActivityTaskStartedRequest, | ||
| ) (*historyservice.RecordActivityTaskStartedResponse, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ignore: formatting only
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like @fretz12 and you have different ideas on how to format these signatures. I personally like the style in this PR better.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like this formatting better too, but ideally we should have the linter take care of it.
I'm running gofumpt locally, it seems to do a good job... maybe introduce it as part of the lint-code target?
chasm/lib/activity/activity.go
Outdated
| ctx chasm.Context, | ||
| key chasm.EntityKey, | ||
| response *historyservice.RecordActivityTaskStartedResponse, | ||
| ) error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ignore: formatting only
chasm/lib/activity/activity.go
Outdated
| func (a *Activity) handleCancellationRequested( | ||
| ctx chasm.MutableContext, | ||
| req *activitypb.CancelActivityExecutionRequest, | ||
| ) (*activitypb.CancelActivityExecutionResponse, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ignore: formatting only
chasm/lib/activity/activity.go
Outdated
| func (a *Activity) HandleStarted( | ||
| ctx chasm.MutableContext, | ||
| request *historyservice.RecordActivityTaskStartedRequest, | ||
| ) (*historyservice.RecordActivityTaskStartedResponse, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like @fretz12 and you have different ideas on how to format these signatures. I personally like the style in this PR better.
| } | ||
|
|
||
| // Validate validates a HeartbeatTimeoutTask. | ||
| func (e *heartbeatTimeoutTaskExecutor) Validate( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You will want to validate that the schedule time in the attributes is still relevant here not in the execute function. It should deterministic function of the last heartbeat time and the attempt start time (hbDeadline below should be equal to the schedule time).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
See reply below
chasm/lib/activity/activity_tasks.go
Outdated
| hbTimeout := activity.GetHeartbeatTimeout().AsDuration() | ||
| attemptStartTime := attempt.GetStartedTime().AsTime() | ||
| lastHbTime := lastHb.GetRecordedTime().AsTime() // could be from a previous attempt | ||
| // No heartbeats in the attempt so far is equivalent to a heartbeat having been sent at attempt | ||
| // start time. | ||
| hbDeadline := util.MaxTime(lastHbTime, attemptStartTime).Add(hbTimeout) | ||
|
|
||
| if ctx.Now(activity).Before(hbDeadline) { | ||
| // Deadline has not expired; schedule a new task. | ||
| ctx.AddTask( | ||
| activity, | ||
| chasm.TaskAttributes{ | ||
| ScheduledTime: hbDeadline, | ||
| }, | ||
| &activitypb.HeartbeatTimeoutTask{ | ||
| Attempt: attempt.GetCount(), | ||
| }, | ||
| ) | ||
| return nil | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This shouldn't happen here. Validate in the Validate function. Schedule a new task any time a heartbeat is received.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Discussed offline -- design B incurs extra persistence writes that aren't balanced out by the changes to numbers of logical tasks created. I've switched the PR over to the design that @bergundy proposes.
chasm/lib/activity/library.go
Outdated
| l.activityDispatchTaskExecutor, | ||
| l.activityDispatchTaskExecutor, | ||
| ), | ||
| // TODO(dan): why are the task names "FooTimer" but "FooTimeoutTask" in the struct names? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We agreed not to add Task to the task names. Not sure if we want to use TaskExecutor vs. just Executor for the struct names but that is not critical and can change easily. The string names cannot be changed easily OTOH because they affect how tasks are represented in persistence.
| Details: input.Request.HeartbeatRequest.GetDetails(), | ||
| }) | ||
| return nil, nil | ||
| return &historyservice.RecordActivityTaskHeartbeatResponse{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just before returning here you want to generate a new heartbeat task if the heartbeat timeout is set.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
See reply above; I currently still think the proposed design is preferable.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is done now -- switched back to your design.
| require.Error(t, err) | ||
| statusErr := serviceerror.ToStatus(err) | ||
| require.NotNil(t, statusErr) | ||
| require.Equal(t, codes.InvalidArgument, statusErr.Code()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Alternatively, you can use require.ErrorAs(err, &invalidArgumentErr)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Leaving as is for now
| ) error { | ||
| if a.Status != activitystatepb.ACTIVITY_EXECUTION_STATUS_STARTED && | ||
| a.Status != activitystatepb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED { | ||
| return serviceerror.NewNotFound("activity task not found") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wrong error message? Also NotFound doesn't feel like the right error type, even though I get it's checking if a token "matches". Perhaps FailPrecondition?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I used NotFound because it'd what the workflow implementation uses (see IsActivityTaskNotFoundForToken)
| return err | ||
| } | ||
| if token.Attempt != attempt.GetCount() { | ||
| return serviceerror.NewNotFound("activity task not found") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same comment as above
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I used NotFound because it'd what the workflow implementation uses (see IsActivityTaskNotFoundForToken)
| if err := ValidateActivityTaskToken(ctx, a, input.Token); err != nil { | ||
| return nil, err | ||
| } | ||
| a.LastHeartbeat = chasm.NewDataField(ctx, &activitypb.ActivityHeartbeatState{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@bergundy is there any performance pentalty if we create a new field on every hearbeat?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, no penalty.
chasm/lib/activity/activity.go
Outdated
| a.LastHeartbeat = chasm.NewDataField(ctx, &activitypb.ActivityHeartbeatState{ | ||
| RecordedTime: timestamppb.New(ctx.Now(a)), | ||
| Details: details, | ||
| Details: input.Request.HeartbeatRequest.GetDetails(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| Details: input.Request.HeartbeatRequest.GetDetails(), | |
| Details: input.Request.GetHeartbeatRequest().GetDetails(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, taken
chasm/lib/activity/activity_tasks.go
Outdated
| func (e *heartbeatTimeoutTaskExecutor) Execute( | ||
| ctx chasm.MutableContext, | ||
| activity *Activity, | ||
| taskAttrs chasm.TaskAttributes, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: taskAttrs, task are unused
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, marked as _ unused parameters
chasm/lib/activity/activity_tasks.go
Outdated
| ctx.AddTask( | ||
| activity, | ||
| chasm.TaskAttributes{ | ||
| ScheduledTime: hbDeadline, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If hbDeadline happens to be attemptStartTime + hbTimeout (i.e., no hearbeat recorded yet), won't the next timer basically pop the same time as the current one being executed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That should not happen because of this condition
if ctx.Now(activity).Before(hbDeadline)the heartbeat task will not execute before time hbDeadline since the first task is scheduled for hbTimeout.
81b2516 to
c8e5800
Compare
938ff87 to
4ba7671
Compare
|
|
||
| // Set if activity cancelation was requested. | ||
| ActivityCancelState cancel_state = 11; | ||
| ActivityCancelState cancel_state = 12; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Bug: Proto field numbers changed breaking wire format compatibility
The new field last_heartbeat_task_scheduled_time was inserted at field number 7, causing all subsequent fields to be renumbered: retry_policy moved from 7→8, status from 8→9, schedule_time from 9→10, priority from 10→11, and cancel_state from 11→12. This breaks protobuf wire format backward compatibility. If any ActivityState messages were previously serialized and stored, deserializing them with the new schema will cause data corruption (e.g., the old retry_policy data would be interpreted as last_heartbeat_task_scheduled_time). New fields should be added with the next available field number (12) without renumbering existing fields.
Additional Locations (1)
| if err := TransitionStarted.Apply(a, ctx, nil); err != nil { | ||
| return nil, err | ||
| } | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The movement here was required to fix a bug that I encountered when implementing the other proposed design. We needed attempt.StartedTime to be available in TransitionStarted so that we could use it in scheduling the first heartbeat task, as well as the base for the start-to-close timeout. Before this PR we were using an ad-hoc ctx.Now() that was close to but not equal to StartedTime.
Reverting it doesn't cause a test failure with the hreartbeating design in this PR, but it's better to set the field state before calling the transition function.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reverting this would now cause a test failure since the PR is back to design A.
| activitypb.ACTIVITY_EXECUTION_STATUS_STARTED, | ||
| func(a *Activity, ctx chasm.MutableContext, _ any) error { | ||
| attempt := a.LastAttempt.Get(ctx) | ||
| startTime := attempt.GetStartedTime().AsTime() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is where we use StartedTime that I referred to above; it wasn't possible before because we were calling the transition function before setting the attempt field state.
Reverting this doesn't cause a test failure in the current PR. But it did with the alternative design, and in any case it's better to use StartedTime rather than an arbitrary timestamp that's close to it in time.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reverting this would now cause a test failure since the PR is back to design A.
This reverts commit 534685045cdb01abd7d6191126c25204cc6493ec.
This reverts commit 3796405.

What changed?
Implement heartbeating for CHASM activities (standalone activity)
Why?
Required feature
How did you test it?
Note
Implements heartbeating for standalone activities, adding heartbeat timer tasks, token validation, API wiring, and comprehensive tests for timeout/retry behavior.
RecordHeartbeatwith task-token validation, recording details, and schedulingHeartbeatTimeoutTask; heartbeat response includesCancelRequested.WithToken[T]wrapper and require it forHandleCompleted/Failed/Canceled; validate via newValidateActivityTaskToken.TransitionStarted; compute timers from attempt start time; rename helper togetOrCreateLastHeartbeat.TransitionTimedOut.heartbeatTimeoutTaskExecutorwith precise validation (attempt match, latest hb) and execution (retry or heartbeat timeout).library.Tasks()and register viafx.RecordActivityTaskHeartbeat,RespondActivityTask{Completed,Failed,Canceled}to CHASM component when token hasComponentRef, passingWithToken.proto/v1/tasks.protowithHeartbeatTimeoutTask(+ generated helpers).Written by Cursor Bugbot for commit 988cc7f. This will update automatically on new commits. Configure here.