Skip to content

Commit 6ffe4da

Browse files
committed
Test heartbeating with stale token (non-current attempt)
1 parent 32270ef commit 6ffe4da

File tree

1 file changed

+70
-0
lines changed

1 file changed

+70
-0
lines changed

tests/standalone_activity_test.go

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1490,6 +1490,76 @@ func (s *standaloneActivityTestSuite) TestHeartbeat() {
14901490
require.Contains(t, statusErr.Message(), "activity task not found")
14911491
})
14921492

1493+
t.Run("StaleAttemptToken", func(t *testing.T) {
1494+
// Start an activity with retries, fail first attempt, then try to heartbeat with old token.
1495+
// Use NextRetryDelay=1s to ensure the retry dispatch happens within test timeout.
1496+
activityID := testcore.RandomizeStr(t.Name())
1497+
taskQueue := testcore.RandomizeStr(t.Name())
1498+
1499+
_, err := s.FrontendClient().StartActivityExecution(ctx, &workflowservice.StartActivityExecutionRequest{
1500+
Namespace: s.Namespace().String(),
1501+
ActivityId: activityID,
1502+
ActivityType: s.tv.ActivityType(),
1503+
Options: &activitypb.ActivityOptions{
1504+
TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueue},
1505+
ScheduleToCloseTimeout: durationpb.New(1 * time.Minute),
1506+
RetryPolicy: &commonpb.RetryPolicy{
1507+
MaximumAttempts: 3,
1508+
},
1509+
},
1510+
})
1511+
require.NoError(t, err)
1512+
1513+
// Poll and get task token for attempt 1
1514+
attempt1Resp, err := s.FrontendClient().PollActivityTaskQueue(ctx, &workflowservice.PollActivityTaskQueueRequest{
1515+
Namespace: s.Namespace().String(),
1516+
TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueue, Kind: enumspb.TASK_QUEUE_KIND_NORMAL},
1517+
})
1518+
require.NoError(t, err)
1519+
require.EqualValues(t, 1, attempt1Resp.Attempt)
1520+
1521+
// Fail the task with NextRetryDelay to control retry timing
1522+
_, err = s.FrontendClient().RespondActivityTaskFailed(ctx, &workflowservice.RespondActivityTaskFailedRequest{
1523+
Namespace: s.Namespace().String(),
1524+
TaskToken: attempt1Resp.TaskToken,
1525+
Failure: &failurepb.Failure{
1526+
Message: "retryable failure",
1527+
FailureInfo: &failurepb.Failure_ApplicationFailureInfo{ApplicationFailureInfo: &failurepb.ApplicationFailureInfo{
1528+
NonRetryable: false,
1529+
NextRetryDelay: durationpb.New(1 * time.Second),
1530+
}},
1531+
},
1532+
})
1533+
require.NoError(t, err)
1534+
1535+
// Poll to get attempt 2 (ensures retry has happened)
1536+
attempt2Resp, err := s.FrontendClient().PollActivityTaskQueue(ctx, &workflowservice.PollActivityTaskQueueRequest{
1537+
Namespace: s.Namespace().String(),
1538+
TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueue, Kind: enumspb.TASK_QUEUE_KIND_NORMAL},
1539+
})
1540+
require.NoError(t, err)
1541+
require.EqualValues(t, 2, attempt2Resp.Attempt)
1542+
1543+
// Heartbeat with the attempt 2 token
1544+
_, err = s.FrontendClient().RecordActivityTaskHeartbeat(ctx, &workflowservice.RecordActivityTaskHeartbeatRequest{
1545+
Namespace: s.Namespace().String(),
1546+
TaskToken: attempt2Resp.TaskToken,
1547+
Details: heartbeatDetails,
1548+
})
1549+
require.NoError(t, err)
1550+
1551+
// Try to heartbeat with the old attempt 1 token - should fail with NotFound
1552+
_, err = s.FrontendClient().RecordActivityTaskHeartbeat(ctx, &workflowservice.RecordActivityTaskHeartbeatRequest{
1553+
Namespace: s.Namespace().String(),
1554+
TaskToken: attempt1Resp.TaskToken,
1555+
Details: heartbeatDetails,
1556+
})
1557+
require.Error(t, err)
1558+
statusErr := serviceerror.ToStatus(err)
1559+
require.Equal(t, codes.NotFound, statusErr.Code())
1560+
require.Contains(t, statusErr.Message(), "activity task not found")
1561+
})
1562+
14931563
t.Run("ResponseIncludesCancelRequested", func(t *testing.T) {
14941564
// Start activity, worker accepts task, request cancellation, worker heartbeats.
14951565
// Verify: heartbeat response has cancel_requested=true.

0 commit comments

Comments
 (0)