Conversation

@dandavison (Contributor) commented Dec 6, 2025

This PR contains all changes needed to make tests compile and pass after merging main into standalone-activity.

The PR targets a branch named standalone-activity-with-main-merged, which points at the merge commit. That branch will become the new standalone-activity.

The changes from the merge commit itself (ce0afe4) are in the following diff:

git show ce0afe4 --cc
commit ce0afe449bb488eda3116d9d4237c7e821aab61b (origin/standalone-activity-with-main-merged, z-standalone-activity-3, standalone-activity-with-main-merged)
Merge: 799f4968d 0679c8a43
Author: Dan Davison <[email protected]>
Date:   22 hours ago

    Merge main into standalone-activity

diff --cc chasm/context.go
index c18b34f57,6f13f6aff..14f6e08e7
--- a/chasm/context.go
+++ b/chasm/context.go
@@@ -4,36 -4,35 +4,36 @@@ import 
  	"context"
  	"time"
  )
  
  type Context interface {
  	// Context is not bound to any component,
  	// so all methods needs to take in component as a parameter
  
  	// NOTE: component created in the current transaction won't have a ref
  	// this is a Ref to the component state at the start of the transition
  	Ref(Component) ([]byte, error)
  	// Now returns the current time in the context of the given component.
  	// In a context of a transaction, this time must be used to allow for framework support of pause and time skipping.
  	Now(Component) time.Time
  	// ExecutionKey returns the execution key for the execution the context is operating on.
- 	ExecutionKey() EntityKey
+ 	ExecutionKey() ExecutionKey
  
  	// Intent() OperationIntent
  	// ComponentOptions(Component) []ComponentOption
  
 +	structuredRef(Component) (ComponentRef, error)
  	getContext() context.Context
  }
  
  type MutableContext interface {
  	Context
  
  	// AddTask adds a task to be emitted as part of the current transaction.
  	// The task is associated with the given component and will be invoked via the registered executor for the given task
  	// referencing the component.
  	AddTask(Component, TaskAttributes, any)
  
  	// Add more methods here for other storage commands/primitives.
  	// e.g. HistoryEvent
  
  	// Get a Ref for the component
@@@ -71,62 -70,58 +71,62 @@@ func NewContext
  	node *Node,
  ) Context {
  	return newContext(ctx, node)
  }
  
  // newContext creates a new immutableCtx from an existing Context and root Node.
  // This is similar to NewContext, but returns *immutableCtx instead of Context interface.
  func newContext(
  	ctx context.Context,
  	node *Node,
  ) *immutableCtx {
  	workflowKey := node.backend.GetWorkflowKey()
  	return &immutableCtx{
  		ctx:  ctx,
  		root: node.root(),
- 		executionKey: EntityKey{
+ 		executionKey: ExecutionKey{
  			NamespaceID: workflowKey.NamespaceID,
  			BusinessID:  workflowKey.WorkflowID,
- 			EntityID:    workflowKey.RunID,
+ 			RunID:       workflowKey.RunID,
  		},
  	}
  }
  
  func (c *immutableCtx) Ref(component Component) ([]byte, error) {
  	return c.root.Ref(component)
  }
  
 +func (c *immutableCtx) structuredRef(component Component) (ComponentRef, error) {
 +	return c.root.structuredRef(component)
 +}
 +
  func (c *immutableCtx) Now(component Component) time.Time {
  	return c.root.Now(component)
  }
  
- func (c *immutableCtx) ExecutionKey() EntityKey {
+ func (c *immutableCtx) ExecutionKey() ExecutionKey {
  	return c.executionKey
  }
  
  func (c *immutableCtx) getContext() context.Context {
  	return c.ctx
  }
  
  // NewMutableContext creates a new MutableContext from an existing Context and root Node.
  //
  // NOTE: Library authors should not invoke this constructor directly, and instead use the [UpdateComponent],
- // [UpdateWithNewEntity], or [NewEntity] APIs.
+ // [UpdateWithNewExecution], or [NewExecution] APIs.
  func NewMutableContext(
  	ctx context.Context,
  	root *Node,
  ) MutableContext {
  	return &mutableCtx{
  		immutableCtx: newContext(ctx, root),
  	}
  }
  
  func (c *mutableCtx) AddTask(
  	component Component,
  	attributes TaskAttributes,
  	payload any,
  ) {
  	c.root.AddTask(component, attributes, payload)
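
For reference, a minimal sketch of how transition code consumes the renamed key and the framework clock after this change. Only the chasm methods shown in the hunk above (ExecutionKey, Now, AddTask, Ref) come from the diff; the package, describeAndSchedule, and the payload argument are illustrative assumptions, and TaskAttributes is left at its zero value.

package example

import (
    "fmt"
    "time"

    "go.temporal.io/server/chasm"
)

// describeAndSchedule exercises only the Context/MutableContext methods shown above;
// c is assumed to be a loaded, registered CHASM component.
func describeAndSchedule(ctx chasm.MutableContext, c chasm.Component, payload any) ([]byte, error) {
    // ExecutionKey (previously EntityKey) now exposes RunID instead of EntityID.
    key := ctx.ExecutionKey()
    fmt.Printf("transition on %s / %s / %s\n", key.NamespaceID, key.BusinessID, key.RunID)

    // Use ctx.Now instead of time.Now so pause and time-skipping keep working.
    deadline := ctx.Now(c).Add(time.Minute)
    _ = deadline

    // Emit a task in this transaction; TaskAttributes fields are elided (zero value).
    ctx.AddTask(c, chasm.TaskAttributes{}, payload)

    // Ref returns a serialized token for the component state at the start of the transition.
    return ctx.Ref(c)
}
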
diff --cc chasm/engine.go
index 152f0a3bf,7711967c5..35f5e8024
--- a/chasm/engine.go
+++ b/chasm/engine.go
@@@ -30,89 -30,89 +30,91 @@@ type Engine interface 
  		context.Context,
  		ComponentRef,
  		func(MutableContext, Component) error,
  		...TransitionOption,
  	) ([]byte, error)
  	ReadComponent(
  		context.Context,
  		ComponentRef,
  		func(Context, Component) error,
  		...TransitionOption,
  	) error
  
  	PollComponent(
  		context.Context,
  		ComponentRef,
 -		func(Context, Component) (any, bool, error),
 -		func(MutableContext, Component, any) error,
 +		func(Context, Component) (bool, error),
  		...TransitionOption,
  	) ([]byte, error)
 +
 +	// NotifyExecution notifies any PollComponent callers waiting on the execution.
 +	NotifyExecution(EntityKey)
  }
  
  type BusinessIDReusePolicy int
  
  const (
  	BusinessIDReusePolicyAllowDuplicate BusinessIDReusePolicy = iota
  	BusinessIDReusePolicyAllowDuplicateFailedOnly
  	BusinessIDReusePolicyRejectDuplicate
  )
  
  type BusinessIDConflictPolicy int
  
  const (
  	BusinessIDConflictPolicyFail BusinessIDConflictPolicy = iota
- 	BusinessIDConflictPolicyTermiateExisting
- 	// TODO: Do we want to support UseExisting conflict policy?
- 	// BusinessIDConflictPolicyUseExisting
+ 	BusinessIDConflictPolicyTerminateExisting
+ 	BusinessIDConflictPolicyUseExisting
  )
  
  type TransitionOptions struct {
  	ReusePolicy    BusinessIDReusePolicy
  	ConflictPolicy BusinessIDConflictPolicy
  	RequestID      string
  	Speculative    bool
  }
  
  type TransitionOption func(*TransitionOptions)
  
  // (only) this transition will not be persisted
  // The next non-speculative transition will persist this transition as well.
- // Compared to the EntityEphemeral() operation on RegistrableComponent,
+ // Compared to the ExecutionEphemeral() operation on RegistrableComponent,
  // the scope of this operation is limited to a certain transition,
- // while the EntityEphemeral() applies to all transitions.
+ // while the ExecutionEphemeral() applies to all transitions.
  // TODO: we need to figure out a way to run the tasks
  // generated in a speculative transition
  func WithSpeculative() TransitionOption {
  	return func(opts *TransitionOptions) {
  		opts.Speculative = true
  	}
  }
  
- // this only applies to NewEntity and UpdateWithNewEntity
+ // WithBusinessIDPolicy sets the businessID reuse and conflict policy
+ // used in the transition when creating a new execution.
+ // This option only applies to NewExecution() and UpdateWithNewExecution().
  func WithBusinessIDPolicy(
  	reusePolicy BusinessIDReusePolicy,
  	conflictPolicy BusinessIDConflictPolicy,
  ) TransitionOption {
  	return func(opts *TransitionOptions) {
  		opts.ReusePolicy = reusePolicy
  		opts.ConflictPolicy = conflictPolicy
  	}
  }
  
- // this only applies to NewEntity and UpdateWithNewEntity
+ // WithRequestID sets the requestID used when creating a new execution.
+ // This option only applies to NewExecution() and UpdateWithNewExecution().
  func WithRequestID(
  	requestID string,
  ) TransitionOption {
  	return func(opts *TransitionOptions) {
  		opts.RequestID = requestID
  	}
  }
  
  // Not needed for V1
  // func WithEagerLoading(
  // 	paths []ComponentPath,
  // ) OperationOption {
  // 	panic("not implemented")
  // }
  
@@@ -156,134 -156,123 +158,134 @@@ func UpdateWithNewExecution[C Component
  		NewComponentRef[C](key),
  		func(ctx MutableContext) (Component, error) {
  			var c C
  			var err error
  			c, output1, err = newFn(ctx, input)
  			return c, err
  		},
  		func(ctx MutableContext, c Component) error {
  			var err error
  			output2, err = updateFn(c.(C), ctx, input)
  			return err
  		},
  		opts...,
  	)
  	if err != nil {
- 		return output1, output2, EntityKey{}, nil, err
+ 		return output1, output2, ExecutionKey{}, nil, err
  	}
- 	return output1, output2, entityKey, serializedRef, err
+ 	return output1, output2, executionKey, serializedRef, err
  }
  
  // TODO:
  //   - consider merge with ReadComponent
  //   - consider remove ComponentRef from the return value and allow components to get
  //     the ref in the transition function. There are some caveats there, check the
  //     comment of the NewRef method in MutableContext.
 +//
 +// UpdateComponent applies updateFn to the component identified by the supplied component reference.
 +// It returns the result, along with the new component reference. opts are currently ignored.
- func UpdateComponent[C Component, R []byte | ComponentRef, I any, O any](
+ func UpdateComponent[C any, R []byte | ComponentRef, I any, O any](
  	ctx context.Context,
  	r R,
  	updateFn func(C, MutableContext, I) (O, error),
  	input I,
  	opts ...TransitionOption,
  ) (O, []byte, error) {
  	var output O
  
  	ref, err := convertComponentRef(r)
  	if err != nil {
  		return output, nil, err
  	}
  
  	newSerializedRef, err := engineFromContext(ctx).UpdateComponent(
  		ctx,
  		ref,
  		func(ctx MutableContext, c Component) error {
  			var err error
  			output, err = updateFn(c.(C), ctx, input)
  			return err
  		},
  		opts...,
  	)
  
  	if err != nil {
  		return output, nil, err
  	}
  	return output, newSerializedRef, err
  }
  
 +// ReadComponent returns the result of evaluating readFn against the component identified by the
 +// component reference. opts are currently ignored.
- func ReadComponent[C Component, R []byte | ComponentRef, I any, O any](
+ func ReadComponent[C any, R []byte | ComponentRef, I any, O any](
  	ctx context.Context,
  	r R,
  	readFn func(C, Context, I) (O, error),
  	input I,
  	opts ...TransitionOption,
  ) (O, error) {
  	var output O
  
  	ref, err := convertComponentRef(r)
  	if err != nil {
  		return output, err
  	}
  
  	err = engineFromContext(ctx).ReadComponent(
  		ctx,
  		ref,
  		func(ctx Context, c Component) error {
  			var err error
  			output, err = readFn(c.(C), ctx, input)
  			return err
  		},
  		opts...,
  	)
  	return output, err
  }
  
 -func PollComponent[C any, R []byte | ComponentRef, I any, O any, T any](
 +// PollComponent waits until the predicate is true when evaluated against the component identified
 +// by the supplied component reference. If this times out due to a server-imposed long-poll timeout
 +// then it returns (nil, nil, nil), as an indication that the caller should continue long-polling.
 +// Otherwise it returns (output, ref, err), where output is the output of the predicate function,
 +// and ref is a component reference identifying the state at which the predicate was satisfied. The
 +// predicate must be monotonic: if it returns true at execution state transition s then it must
 +// return true at all transitions t > s. If the predicate is true at the outset then PollComponent
 +// returns immediately. opts are currently ignored.
- func PollComponent[C Component, R []byte | ComponentRef, I any, O any](
++func PollComponent[C any, R []byte | ComponentRef, I any, O any](
  	ctx context.Context,
  	r R,
 -	predicateFn func(C, Context, I) (T, bool, error),
 -	operationFn func(C, MutableContext, I, T) (O, error),
 +	monotonicPredicate func(C, Context, I) (O, bool, error),
  	input I,
  	opts ...TransitionOption,
  ) (O, []byte, error) {
  	var output O
  
  	ref, err := convertComponentRef(r)
  	if err != nil {
  		return output, nil, err
  	}
  
  	newSerializedRef, err := engineFromContext(ctx).PollComponent(
  		ctx,
  		ref,
 -		func(ctx Context, c Component) (any, bool, error) {
 -			return predicateFn(c.(C), ctx, input)
 -		},
 -		func(ctx MutableContext, c Component, t any) error {
 -			var err error
 -			output, err = operationFn(c.(C), ctx, input, t.(T))
 -			return err
 +		func(ctx Context, c Component) (bool, error) {
 +			out, satisfied, err := monotonicPredicate(c.(C), ctx, input)
 +			if satisfied {
 +				output = out
 +			}
 +			return satisfied, err
  		},
  		opts...,
  	)
  	if err != nil {
  		return output, nil, err
  	}
  	return output, newSerializedRef, err
  }
  
  func convertComponentRef[R []byte | ComponentRef](
  	r R,
  ) (ComponentRef, error) {
  	if refToken, ok := any(r).([]byte); ok {
  		return DeserializeComponentRef(refToken)
  	}
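
The PollComponent helper now takes a single monotonic predicate returning (output, satisfied, error), replacing the separate predicate and operation callbacks, and its engine counterpart drops the operation function. A sketch of a caller under assumed names: ActivityInfo and its Done/CompletedAt fields are hypothetical stand-ins for a registered component.

package example

import (
    "context"
    "time"

    "go.temporal.io/server/chasm"
)

// ActivityInfo is a hypothetical stand-in for a registered CHASM component type.
type ActivityInfo struct {
    Done        bool
    CompletedAt time.Time
}

// waitForDone long-polls until the component reports Done. On a server-imposed
// long-poll timeout PollComponent returns (nil, nil, nil), telling the caller to
// poll again with the same token.
func waitForDone(ctx context.Context, refToken []byte) (*time.Time, []byte, error) {
    return chasm.PollComponent(
        ctx,
        refToken,
        // The predicate must be monotonic: once it returns true it must stay
        // true for every later execution state transition.
        func(a *ActivityInfo, _ chasm.Context, _ chasm.NoValue) (*time.Time, bool, error) {
            if !a.Done {
                return nil, false, nil
            }
            t := a.CompletedAt
            return &t, true, nil
        },
        nil,
    )
}
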
diff --cc chasm/engine_mock.go
index c8cf2f2b2,cafdd48ef..cb0f90f8d
--- a/chasm/engine_mock.go
+++ b/chasm/engine_mock.go
@@@ -28,80 -28,68 +28,80 @@@ type MockEngineMockRecorder struct 
  	mock *MockEngine
  }
  
  // NewMockEngine creates a new mock instance.
  func NewMockEngine(ctrl *gomock.Controller) *MockEngine {
  	mock := &MockEngine{ctrl: ctrl}
  	mock.recorder = &MockEngineMockRecorder{mock}
  	return mock
  }
  
  // EXPECT returns an object that allows the caller to indicate expected use.
  func (m *MockEngine) EXPECT() *MockEngineMockRecorder {
  	return m.recorder
  }
  
- // NewEntity mocks base method.
- func (m *MockEngine) NewEntity(arg0 context.Context, arg1 ComponentRef, arg2 func(MutableContext) (Component, error), arg3 ...TransitionOption) (EntityKey, []byte, error) {
+ // NewExecution mocks base method.
+ func (m *MockEngine) NewExecution(arg0 context.Context, arg1 ComponentRef, arg2 func(MutableContext) (Component, error), arg3 ...TransitionOption) (ExecutionKey, []byte, error) {
  	m.ctrl.T.Helper()
  	varargs := []any{arg0, arg1, arg2}
  	for _, a := range arg3 {
  		varargs = append(varargs, a)
  	}
- 	ret := m.ctrl.Call(m, "NewEntity", varargs...)
- 	ret0, _ := ret[0].(EntityKey)
+ 	ret := m.ctrl.Call(m, "NewExecution", varargs...)
+ 	ret0, _ := ret[0].(ExecutionKey)
  	ret1, _ := ret[1].([]byte)
  	ret2, _ := ret[2].(error)
  	return ret0, ret1, ret2
  }
  
- // NewEntity indicates an expected call of NewEntity.
- func (mr *MockEngineMockRecorder) NewEntity(arg0, arg1, arg2 any, arg3 ...any) *gomock.Call {
+ // NewExecution indicates an expected call of NewExecution.
+ func (mr *MockEngineMockRecorder) NewExecution(arg0, arg1, arg2 any, arg3 ...any) *gomock.Call {
  	mr.mock.ctrl.T.Helper()
  	varargs := append([]any{arg0, arg1, arg2}, arg3...)
- 	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NewEntity", reflect.TypeOf((*MockEngine)(nil).NewEntity), varargs...)
+ 	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NewExecution", reflect.TypeOf((*MockEngine)(nil).NewExecution), varargs...)
  }
  
 +// NotifyExecution mocks base method.
 +func (m *MockEngine) NotifyExecution(arg0 EntityKey) {
 +	m.ctrl.T.Helper()
 +	m.ctrl.Call(m, "NotifyExecution", arg0)
 +}
 +
 +// NotifyExecution indicates an expected call of NotifyExecution.
 +func (mr *MockEngineMockRecorder) NotifyExecution(arg0 any) *gomock.Call {
 +	mr.mock.ctrl.T.Helper()
 +	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NotifyExecution", reflect.TypeOf((*MockEngine)(nil).NotifyExecution), arg0)
 +}
 +
  // PollComponent mocks base method.
 -func (m *MockEngine) PollComponent(arg0 context.Context, arg1 ComponentRef, arg2 func(Context, Component) (any, bool, error), arg3 func(MutableContext, Component, any) error, arg4 ...TransitionOption) ([]byte, error) {
 +func (m *MockEngine) PollComponent(arg0 context.Context, arg1 ComponentRef, arg2 func(Context, Component) (bool, error), arg3 ...TransitionOption) ([]byte, error) {
  	m.ctrl.T.Helper()
 -	varargs := []any{arg0, arg1, arg2, arg3}
 -	for _, a := range arg4 {
 +	varargs := []any{arg0, arg1, arg2}
 +	for _, a := range arg3 {
  		varargs = append(varargs, a)
  	}
  	ret := m.ctrl.Call(m, "PollComponent", varargs...)
  	ret0, _ := ret[0].([]byte)
  	ret1, _ := ret[1].(error)
  	return ret0, ret1
  }
  
  // PollComponent indicates an expected call of PollComponent.
 -func (mr *MockEngineMockRecorder) PollComponent(arg0, arg1, arg2, arg3 any, arg4 ...any) *gomock.Call {
 +func (mr *MockEngineMockRecorder) PollComponent(arg0, arg1, arg2 any, arg3 ...any) *gomock.Call {
  	mr.mock.ctrl.T.Helper()
 -	varargs := append([]any{arg0, arg1, arg2, arg3}, arg4...)
 +	varargs := append([]any{arg0, arg1, arg2}, arg3...)
  	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PollComponent", reflect.TypeOf((*MockEngine)(nil).PollComponent), varargs...)
  }
  
  // ReadComponent mocks base method.
  func (m *MockEngine) ReadComponent(arg0 context.Context, arg1 ComponentRef, arg2 func(Context, Component) error, arg3 ...TransitionOption) error {
  	m.ctrl.T.Helper()
  	varargs := []any{arg0, arg1, arg2}
  	for _, a := range arg3 {
  		varargs = append(varargs, a)
  	}
  	ret := m.ctrl.Call(m, "ReadComponent", varargs...)
  	ret0, _ := ret[0].(error)
  	return ret0
  }
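
Tests built against the old mock need to adjust to the new arity and the added NotifyExecution method. A sketch of the expectation shape; the go.uber.org/mock import path and the test body are assumptions, and AnyTimes keeps the example self-contained since the code under test that would drive these calls is not shown.

package example

import (
    "testing"

    "go.temporal.io/server/chasm"
    "go.uber.org/mock/gomock"
)

func TestEngineExpectations(t *testing.T) {
    ctrl := gomock.NewController(t)
    engine := chasm.NewMockEngine(ctrl)

    // PollComponent now records three fixed arguments (ctx, ref, predicate) plus options.
    engine.EXPECT().
        PollComponent(gomock.Any(), gomock.Any(), gomock.Any()).
        Return([]byte("serialized-ref"), nil).
        AnyTimes()

    // NotifyExecution wakes PollComponent callers waiting on the execution.
    engine.EXPECT().NotifyExecution(gomock.Any()).AnyTimes()
}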
  
diff --cc chasm/lib/callback/component.go
index f0da60c4c,9c6aa4fac..a10734e19
--- a/chasm/lib/callback/component.go
+++ b/chasm/lib/callback/component.go
@@@ -53,81 -62,104 +62,104 @@@ func (c *Callback) LifecycleState(_ cha
  func (c *Callback) StateMachineState() callbackspb.CallbackStatus {
  	return c.Status
  }
  
  func (c *Callback) SetStateMachineState(status callbackspb.CallbackStatus) {
  	c.Status = status
  }
  
  func (c *Callback) recordAttempt(ts time.Time) {
  	c.Attempt++
  	c.LastAttemptCompleteTime = timestamppb.New(ts)
  }
  
  //nolint:revive // context.Context is an input parameter for chasm.ReadComponent, not a function parameter
  func (c *Callback) loadInvocationArgs(
- 	chasmCtx chasm.Context,
- 	ctx context.Context,
+ 	ctx chasm.Context,
+ 	_ chasm.NoValue,
  ) (callbackInvokable, error) {
- 	target, err := c.CompletionSource.Get(chasmCtx)
- 	if err != nil {
- 		return nil, err
- 	}
+ 	target := c.CompletionSource.Get(ctx)
  
- 	// TODO (seankane,yichao): we should be able to use the chasm context here.
  	completion, err := target.GetNexusCompletion(ctx, c.RequestId)
  	if err != nil {
  		return nil, err
  	}
  
- 	switch variant := c.GetCallback().GetVariant().(type) {
- 	case *callbackspb.Callback_Nexus_:
- 		if variant.Nexus.Url == chasm.NexusCompletionHandlerURL {
- 			return chasmInvocation{
- 				nexus:      variant.Nexus,
- 				attempt:    c.Attempt,
- 				completion: completion,
- 				requestID:  c.RequestId,
- 			}, nil
- 		}
- 		return nexusInvocation{
- 			nexus:      variant.Nexus,
- 			completion: completion,
- 			// workflowID: c.WorkflowId,
- 			workflowID: chasmCtx.ExecutionKey().BusinessID,
- 			runID:      chasmCtx.ExecutionKey().EntityID,
- 			attempt:    c.Attempt,
- 		}, nil
- 	default:
- 		return nil, queues.NewUnprocessableTaskError(
+ 	variant := c.GetCallback().GetNexus()
+ 	if variant == nil {
+ 		return nil, queueserrors.NewUnprocessableTaskError(
  			fmt.Sprintf("unprocessable callback variant: %v", variant),
  		)
  	}
+ 
+ 	if variant.Url == chasm.NexusCompletionHandlerURL {
+ 		return chasmInvocation{
+ 			nexus:      variant,
+ 			attempt:    c.Attempt,
+ 			completion: completion,
+ 			requestID:  c.RequestId,
+ 		}, nil
+ 	}
+ 	return nexusInvocation{
+ 		nexus:      variant,
+ 		completion: completion,
+ 		workflowID: ctx.ExecutionKey().BusinessID,
+ 		runID:      ctx.ExecutionKey().RunID,
+ 		attempt:    c.Attempt,
+ 	}, nil
+ }
+ 
+ type saveResultInput struct {
+ 	result      invocationResult
+ 	retryPolicy backoff.RetryPolicy
  }
  
  func (c *Callback) saveResult(
  	ctx chasm.MutableContext,
- 	result invocationResult,
+ 	input saveResultInput,
  ) (chasm.NoValue, error) {
- 	switch r := result.(type) {
+ 	switch r := input.result.(type) {
  	case invocationResultOK:
 -		err := TransitionSucceeded.Apply(ctx, c, EventSucceeded{Time: ctx.Now(c)})
 +		err := TransitionSucceeded.Apply(c, ctx, EventSucceeded{Time: ctx.Now(c)})
  		return nil, err
  	case invocationResultRetry:
 -		err := TransitionAttemptFailed.Apply(ctx, c, EventAttemptFailed{
 +		err := TransitionAttemptFailed.Apply(c, ctx, EventAttemptFailed{
  			Time:        ctx.Now(c),
  			Err:         r.err,
- 			RetryPolicy: r.retryPolicy,
+ 			RetryPolicy: input.retryPolicy,
  		})
  		return nil, err
  	case invocationResultFail:
 -		err := TransitionFailed.Apply(ctx, c, EventFailed{
 +		err := TransitionFailed.Apply(c, ctx, EventFailed{
  			Time: ctx.Now(c),
  			Err:  r.err,
  		})
  		return nil, err
  	default:
- 		return nil, queues.NewUnprocessableTaskError(
- 			fmt.Sprintf("unrecognized callback result %v", result),
+ 		return nil, queueserrors.NewUnprocessableTaskError(
+ 			fmt.Sprintf("unrecognized callback result %v", input.result),
  		)
  	}
  }
+ 
+ // ToAPICallback converts a CHASM callback to API callback proto.
+ func (c *Callback) ToAPICallback() (*commonpb.Callback, error) {
+ 	// Convert CHASM callback proto to API callback proto
+ 	chasmCB := c.GetCallback()
+ 	res := &commonpb.Callback{
+ 		Links: chasmCB.GetLinks(),
+ 	}
+ 
+ 	// CHASM currently only supports Nexus callbacks
+ 	if variant, ok := chasmCB.Variant.(*callbackspb.Callback_Nexus_); ok {
+ 		res.Variant = &commonpb.Callback_Nexus_{
+ 			Nexus: &commonpb.Callback_Nexus{
+ 				Url:    variant.Nexus.GetUrl(),
+ 				Header: variant.Nexus.GetHeader(),
+ 			},
+ 		}
+ 		return res, nil
+ 	}
+ 
+ 	// This should not happen as CHASM only supports Nexus callbacks currently
+ 	return nil, serviceerror.NewInternal("unsupported CHASM callback type")
+ }
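
saveResult now receives a single saveResultInput value because the chasm update helper threads exactly one input argument through to the component method (see UpdateComponent above); the retry policy therefore travels inside the input rather than inside the result. A sketch of the same pattern for any component that needs several update arguments, with Order, setStatusInput, and updateStatus as assumed names:

package example

import (
    "context"

    "go.temporal.io/server/chasm"
)

// Order is a hypothetical CHASM component; setStatusInput bundles the update's
// arguments into one value, mirroring saveResultInput above.
type Order struct {
    Status string
    Reason string
}

type setStatusInput struct {
    status string
    reason string
}

func (o *Order) setStatus(_ chasm.MutableContext, in setStatusInput) (chasm.NoValue, error) {
    o.Status = in.status
    o.Reason = in.reason
    return nil, nil
}

// updateStatus shows the call-site shape: a method expression plus a single input value.
func updateStatus(ctx context.Context, refToken []byte, status, reason string) ([]byte, error) {
    _, newRef, err := chasm.UpdateComponent(
        ctx,
        refToken,
        (*Order).setStatus,
        setStatusInput{status: status, reason: reason},
    )
    return newRef, err
}
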
diff --cc chasm/lib/callback/executors.go
index 0b25d9d62,dac4042b9..9ab9e199e
--- a/chasm/lib/callback/executors.go
+++ b/chasm/lib/callback/executors.go
@@@ -94,84 -103,91 +103,91 @@@ func (r invocationResultRetry) error() 
  
  type callbackInvokable interface {
  	// Invoke executes the callback logic and returns the invocation result.
  	Invoke(ctx context.Context, ns *namespace.Namespace, e InvocationTaskExecutor, task *callbackspb.InvocationTask, taskAttr chasm.TaskAttributes) invocationResult
  	// WrapError provides each variant the opportunity to wrap the error returned by the task executor for, e.g. to
  	// trigger the circuit breaker.
  	WrapError(result invocationResult, err error) error
  }
  
  func (e InvocationTaskExecutor) Invoke(
  	ctx context.Context,
  	ref chasm.ComponentRef,
  	taskAttr chasm.TaskAttributes,
  	task *callbackspb.InvocationTask,
  ) error {
- 	ns, err := e.NamespaceRegistry.GetNamespaceByID(namespace.ID(ref.NamespaceID))
+ 	ns, err := e.namespaceRegistry.GetNamespaceByID(namespace.ID(ref.NamespaceID))
  	if err != nil {
  		return fmt.Errorf("failed to get namespace by ID: %w", err)
  	}
  
  	invokable, err := chasm.ReadComponent(
  		ctx,
  		ref,
  		(*Callback).loadInvocationArgs,
- 		ctx,
+ 		nil,
  	)
  	if err != nil {
  		return err
  	}
  
  	callCtx, cancel := context.WithTimeout(
  		ctx,
- 		e.Config.RequestTimeout(ns.Name().String(), taskAttr.Destination),
+ 		e.config.RequestTimeout(ns.Name().String(), taskAttr.Destination),
  	)
  	defer cancel()
  
  	result := invokable.Invoke(callCtx, ns, e, task, taskAttr)
  	_, _, saveErr := chasm.UpdateComponent(
  		ctx,
  		ref,
  		(*Callback).saveResult,
- 		result,
+ 		saveResultInput{
+ 			result:      result,
+ 			retryPolicy: e.config.RetryPolicy(),
+ 		},
  	)
  	return invokable.WrapError(result, saveErr)
  }
  
  type BackoffTaskExecutor struct {
- 	BackoffTaskExecutorOptions
+ 	config         *Config
+ 	metricsHandler metrics.Handler
+ 	logger         log.Logger
  }
  
  type BackoffTaskExecutorOptions struct {
  	fx.In
  
  	Config         *Config
  	MetricsHandler metrics.Handler
  	Logger         log.Logger
  }
  
  func NewBackoffTaskExecutor(opts BackoffTaskExecutorOptions) *BackoffTaskExecutor {
  	return &BackoffTaskExecutor{
- 		BackoffTaskExecutorOptions: opts,
+ 		config:         opts.Config,
+ 		metricsHandler: opts.MetricsHandler,
+ 		logger:         opts.Logger,
  	}
  }
  
  // Execute transitions the callback from BACKING_OFF to SCHEDULED state
  // and generates an InvocationTask for the next attempt.
  func (e *BackoffTaskExecutor) Execute(
  	ctx chasm.MutableContext,
  	callback *Callback,
  	taskAttrs chasm.TaskAttributes,
  	task *callbackspb.BackoffTask,
  ) error {
 -	return TransitionRescheduled.Apply(ctx, callback, EventRescheduled{})
 +	return TransitionRescheduled.Apply(callback, ctx, EventRescheduled{})
  }
  
  func (e *BackoffTaskExecutor) Validate(
  	ctx chasm.Context,
  	callback *Callback,
  	taskAttr chasm.TaskAttributes,
  	task *callbackspb.BackoffTask,
  ) (bool, error) {
  	// Validate that the callback is in BACKING_OFF state
  	return callback.Status == callbackspb.CALLBACK_STATUS_BACKING_OFF && callback.Attempt == task.Attempt, nil
  }
diff --cc chasm/lib/callback/statemachine_test.go
index d031e5fcf,fbb659508..dbefaf5d9
--- a/chasm/lib/callback/statemachine_test.go
+++ b/chasm/lib/callback/statemachine_test.go
@@@ -36,77 -36,77 +36,77 @@@ func TestValidTransitions(t *testing.T
  		RetryPolicy: backoff.NewExponentialRetryPolicy(time.Second),
  	})
  	require.NoError(t, err)
  
  	// Assert info object is updated
  	require.Equal(t, callbackspb.CALLBACK_STATUS_BACKING_OFF, callback.StateMachineState())
  	require.Equal(t, int32(1), callback.Attempt)
  	require.Equal(t, "test", callback.LastAttemptFailure.Message)
  	require.False(t, callback.LastAttemptFailure.GetApplicationFailureInfo().NonRetryable)
  	require.Equal(t, currentTime, callback.LastAttemptCompleteTime.AsTime())
  	dt := currentTime.Add(time.Second).Sub(callback.NextAttemptScheduleTime.AsTime())
  	require.Less(t, dt, time.Millisecond*200)
  
  	// Assert backoff task is generated
  	require.Len(t, mctx.Tasks, 1)
- 	require.IsType(t, &callbackspb.InvocationTask{}, mctx.Tasks[0].Payload)
+ 	require.IsType(t, &callbackspb.BackoffTask{}, mctx.Tasks[0].Payload)
  
  	// Rescheduled
  	mctx = &chasm.MockMutableContext{}
 -	err = TransitionRescheduled.Apply(mctx, callback, EventRescheduled{})
 +	err = TransitionRescheduled.Apply(callback, mctx, EventRescheduled{})
  	require.NoError(t, err)
  
  	// Assert info object is updated only where needed
  	require.Equal(t, callbackspb.CALLBACK_STATUS_SCHEDULED, callback.StateMachineState())
  	require.Equal(t, int32(1), callback.Attempt)
  	require.Equal(t, "test", callback.LastAttemptFailure.Message)
  	// Remains unmodified
  	require.Equal(t, currentTime, callback.LastAttemptCompleteTime.AsTime())
  	require.Nil(t, callback.NextAttemptScheduleTime)
  
  	// Assert callback task is generated
  	require.Len(t, mctx.Tasks, 1)
  	require.IsType(t, &callbackspb.InvocationTask{}, mctx.Tasks[0].Payload)
  
  	// Store the pre-succeeded state to test Failed later
  	dup := &Callback{
  		CallbackState: proto.Clone(callback.CallbackState).(*callbackspb.CallbackState),
  	}
  	dup.Status = callback.StateMachineState()
  
  	// Succeeded
  	currentTime = currentTime.Add(time.Second)
  	mctx = &chasm.MockMutableContext{}
 -	err = TransitionSucceeded.Apply(mctx, callback, EventSucceeded{Time: currentTime})
 +	err = TransitionSucceeded.Apply(callback, mctx, EventSucceeded{Time: currentTime})
  	require.NoError(t, err)
  
  	// Assert info object is updated only where needed
  	require.Equal(t, callbackspb.CALLBACK_STATUS_SUCCEEDED, callback.StateMachineState())
  	require.Equal(t, int32(2), callback.Attempt)
  	require.Nil(t, callback.LastAttemptFailure)
  	require.Equal(t, currentTime, callback.LastAttemptCompleteTime.AsTime())
  	require.Nil(t, callback.NextAttemptScheduleTime)
  
- 	// Assert task is generated (success transitions also add tasks in chasm)
- 	require.Len(t, mctx.Tasks, 1)
+ 	// Assert no task is generated on success transition
+ 	require.Empty(t, mctx.Tasks)
  
  	// Reset back to scheduled
  	callback = dup
  	// Increment the time to ensure it's updated in the transition
  	currentTime = currentTime.Add(time.Second)
  
  	// failed
  	mctx = &chasm.MockMutableContext{}
 -	err = TransitionFailed.Apply(mctx, callback, EventFailed{Time: currentTime, Err: errors.New("failed")})
 +	err = TransitionFailed.Apply(callback, mctx, EventFailed{Time: currentTime, Err: errors.New("failed")})
  	require.NoError(t, err)
  
  	// Assert info object is updated only where needed
  	require.Equal(t, callbackspb.CALLBACK_STATUS_FAILED, callback.StateMachineState())
  	require.Equal(t, int32(2), callback.Attempt)
  	require.Equal(t, "failed", callback.LastAttemptFailure.Message)
  	require.True(t, callback.LastAttemptFailure.GetApplicationFailureInfo().NonRetryable)
  	require.Equal(t, currentTime, callback.LastAttemptCompleteTime.AsTime())
  	require.Nil(t, callback.NextAttemptScheduleTime)
  
- 	// Assert task is generated (failed transitions also add tasks in chasm)
- 	require.Len(t, mctx.Tasks, 1)
+ 	// Assert task is not generated, failed is terminal
+ 	require.Empty(t, mctx.Tasks)
  }
diff --cc chasm/ref.go
index efdeea18d,d4a91466d..673180802
--- a/chasm/ref.go
+++ b/chasm/ref.go
@@@ -1,168 -1,154 +1,165 @@@
  package chasm
  
  import (
  	"reflect"
  
  	"go.temporal.io/api/serviceerror"
  	persistencespb "go.temporal.io/server/api/persistence/v1"
  )
  
 -var (
 -	defaultShardingFn = func(key ExecutionKey) string { return key.NamespaceID + "_" + key.BusinessID }
 -)
 +// ErrMalformedComponentRef is returned when component ref bytes cannot be deserialized.
 +var ErrMalformedComponentRef = serviceerror.NewInvalidArgument("malformed component ref")
 +
 +// ErrInvalidComponentRef is returned when component ref bytes deserialize to an invalid component ref.
 +var ErrInvalidComponentRef = serviceerror.NewInvalidArgument("invalid component ref")
 +
- var defaultShardingFn = func(key EntityKey) string { return key.NamespaceID + "_" + key.BusinessID }
++var defaultShardingFn = func(key ExecutionKey) string { return key.NamespaceID + "_" + key.BusinessID }
  
- // EntityKey uniquely identifies a CHASM execution in the system.
- // TODO: Rename to ExecutionKey.
- type EntityKey struct {
+ // ExecutionKey uniquely identifies a CHASM execution in the system.
+ type ExecutionKey struct {
  	NamespaceID string
- 	// TODO: Rename to EntityID.
- 	BusinessID string
- 	// TODO: Rename to RunID.
- 	EntityID string
+ 	BusinessID  string
+ 	RunID       string
  }
  
  type ComponentRef struct {
- 	EntityKey
- 
- 	// archetype is the fully qualified type name of the root component.
- 	// It is used to look up the component's registered sharding function,
- 	// which determines the shardID of the entity that contains the referenced component.
- 	// It is also used to validate if a given entity has the right archetype.
- 	// E.g. The EntityKey can be empty and the current run of the BusinessID may have a different archetype.
- 	archetype Archetype
- 	// entityGoType is used for determining the ComponetRef's archetype.
- 	// When CHASM deverloper needs to create a ComponentRef, they will only provide this information,
- 	// and leave the work of determining archetype to the CHASM engine.
- 	entityGoType reflect.Type
- 
- 	// entityLastUpdateVT is the consistency token for the entire entity.
- 	entityLastUpdateVT *persistencespb.VersionedTransition
+ 	ExecutionKey
+ 
+ 	// archetypeID is CHASM framework's internal ID for the type of the root component of the CHASM execution.
+ 	//
+ 	// It is used to find and validate the loaded execution has the right archetype, especially when runID
+ 	// is not specified in the ExecutionKey.
+ 	archetypeID ArchetypeID
+ 	// executionGoType is used for determining the ComponetRef's archetype.
+ 	// When CHASM deverloper needs to create a ComponentRef, they will only provide the component type,
+ 	// and leave the work of determining archetypeID to the CHASM framework.
+ 	executionGoType reflect.Type
+ 
+ 	// executionLastUpdateVT is the consistency token for the entire execution.
+ 	executionLastUpdateVT *persistencespb.VersionedTransition
  
  	// componentType is the fully qualified component type name.
  	// It is for performing partial loading more efficiently in future versions of CHASM.
  	//
  	// From the componentType, we can find the registered component struct definition,
  	// then use reflection to find sub-components and understand if those sub-components
  	// need to be loaded or not.
  	// We only need to do this for sub-components, path for parent/ancenstor components
  	// can be inferred from the current component path and they always needs to be loaded.
  	//
  	// componentType string
  
  	// componentPath and componentInitialVT are used to identify a component.
  	componentPath      []string
  	componentInitialVT *persistencespb.VersionedTransition
  
  	validationFn func(NodeBackend, Context, Component) error
  }
  
  // NewComponentRef creates a new ComponentRef with a registered root component go type.
  //
  // In V1, if you don't have a ref,
- // then you can only interact with the (top level) entity.
+ // then you can only interact with the (top level) execution.
  func NewComponentRef[C Component](
- 	entityKey EntityKey,
+ 	executionKey ExecutionKey,
  ) ComponentRef {
  	return ComponentRef{
- 		EntityKey:    entityKey,
- 		entityGoType: reflect.TypeFor[C](),
+ 		ExecutionKey:    executionKey,
+ 		executionGoType: reflect.TypeFor[C](),
  	}
  }
  
- func (r *ComponentRef) Archetype(
+ func (r *ComponentRef) ArchetypeID(
  	registry *Registry,
- ) (Archetype, error) {
- 	if r.archetype != "" {
- 		return r.archetype, nil
+ ) (ArchetypeID, error) {
+ 	if r.archetypeID != UnspecifiedArchetypeID {
+ 		return r.archetypeID, nil
  	}
  
- 	rc, ok := registry.componentOf(r.entityGoType)
+ 	rc, ok := registry.componentOf(r.executionGoType)
  	if !ok {
- 		return "", serviceerror.NewInternal("unknown chasm component type: " + r.entityGoType.String())
+ 		return 0, serviceerror.NewInternal("unknown chasm component type: " + r.executionGoType.String())
  	}
- 	r.archetype = Archetype(rc.fqType())
+ 	r.archetypeID = rc.componentID
  
- 	return r.archetype, nil
+ 	return r.archetypeID, nil
  }
  
  // ShardingKey returns the sharding key used for determining the shardID of the run
  // that contains the referenced component.
+ // TODO: remove this method and ShardingKey concept, we don't need this functionality.
  func (r *ComponentRef) ShardingKey(
  	registry *Registry,
  ) (string, error) {
  
- 	archetype, err := r.Archetype(registry)
+ 	archetypeID, err := r.ArchetypeID(registry)
  	if err != nil {
  		return "", err
  	}
  
- 	rc, ok := registry.component(archetype.String())
+ 	rc, ok := registry.ComponentByID(archetypeID)
  	if !ok {
- 		return "", serviceerror.NewInternal("unknown chasm component type: " + archetype.String())
+ 		return "", serviceerror.NewInternalf("unknown chasm component type id: %d", archetypeID)
  	}
  
- 	return rc.shardingFn(r.EntityKey), nil
+ 	return rc.shardingFn(r.ExecutionKey), nil
  }
  
  func (r *ComponentRef) Serialize(
  	registry *Registry,
  ) ([]byte, error) {
  	if r == nil {
  		return nil, nil
  	}
  
- 	archetype, err := r.Archetype(registry)
+ 	archetypeID, err := r.ArchetypeID(registry)
  	if err != nil {
  		return nil, err
  	}
  
  	pRef := persistencespb.ChasmComponentRef{
  		NamespaceId:                         r.NamespaceID,
  		BusinessId:                          r.BusinessID,
- 		EntityId:                            r.EntityID,
- 		Archetype:                           archetype.String(),
- 		EntityVersionedTransition:           r.entityLastUpdateVT,
+ 		RunId:                               r.RunID,
+ 		ArchetypeId:                         archetypeID,
+ 		ExecutionVersionedTransition:        r.executionLastUpdateVT,
  		ComponentPath:                       r.componentPath,
  		ComponentInitialVersionedTransition: r.componentInitialVT,
  	}
  	return pRef.Marshal()
  }
  
  // DeserializeComponentRef deserializes a byte slice into a ComponentRef.
- // Provides caller the access to information including EntityKey, Archetype, and ShardingKey.
+ // Provides caller the access to information including ExecutionKey, Archetype, and ShardingKey.
  func DeserializeComponentRef(data []byte) (ComponentRef, error) {
 +	if len(data) == 0 {
 +		return ComponentRef{}, ErrInvalidComponentRef
 +	}
  	var pRef persistencespb.ChasmComponentRef
  	if err := pRef.Unmarshal(data); err != nil {
 -		return ComponentRef{}, err
 +		return ComponentRef{}, ErrMalformedComponentRef
  	}
  
 -	return ProtoRefToComponentRef(&pRef), nil
 +	ref := ProtoRefToComponentRef(&pRef)
 +	if ref.BusinessID == "" || ref.NamespaceID == "" {
 +		return ComponentRef{}, ErrInvalidComponentRef
 +	}
 +	return ref, nil
  }
  
  // ProtoRefToComponentRef converts a persistence ChasmComponentRef reference to a
  // ComponentRef. This is useful for situations where the protobuf ComponentRef has
  // already been deserialized as part of an enclosing message.
  func ProtoRefToComponentRef(pRef *persistencespb.ChasmComponentRef) ComponentRef {
  	return ComponentRef{
- 		EntityKey: EntityKey{
+ 		ExecutionKey: ExecutionKey{
  			NamespaceID: pRef.NamespaceId,
  			BusinessID:  pRef.BusinessId,
- 			EntityID:    pRef.EntityId,
+ 			RunID:       pRef.RunId,
  		},
- 		archetype:          Archetype(pRef.Archetype),
- 		entityLastUpdateVT: pRef.EntityVersionedTransition,
- 		componentPath:      pRef.ComponentPath,
- 		componentInitialVT: pRef.ComponentInitialVersionedTransition,
+ 		archetypeID:           pRef.ArchetypeId,
+ 		executionLastUpdateVT: pRef.ExecutionVersionedTransition,
+ 		componentPath:         pRef.ComponentPath,
+ 		componentInitialVT:    pRef.ComponentInitialVersionedTransition,
  	}
  }
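
DeserializeComponentRef is stricter after the merge: undecodable bytes surface as ErrMalformedComponentRef, while empty tokens and refs missing NamespaceID or BusinessID surface as ErrInvalidComponentRef, both invalid-argument errors rather than raw unmarshal failures. A sketch of caller-side handling; describeRef and the log wording are illustrative, while the sentinel errors and the embedded ExecutionKey fields come from the diff above.

package example

import (
    "errors"
    "fmt"

    "go.temporal.io/server/chasm"
)

func describeRef(token []byte) error {
    ref, err := chasm.DeserializeComponentRef(token)
    switch {
    case errors.Is(err, chasm.ErrMalformedComponentRef):
        return fmt.Errorf("token does not decode: %w", err)
    case errors.Is(err, chasm.ErrInvalidComponentRef):
        return fmt.Errorf("token decoded but is unusable: %w", err)
    case err != nil:
        return err
    }
    // ExecutionKey is embedded, so the renamed fields are available directly.
    fmt.Println("namespace:", ref.NamespaceID, "business ID:", ref.BusinessID, "run:", ref.RunID)
    return nil
}
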
diff --cc chasm/ref_test.go
index dc268fe3b,98e252c6f..07f3e3af8
--- a/chasm/ref_test.go
+++ b/chasm/ref_test.go
@@@ -27,93 -28,85 +28,90 @@@ func TestComponentRefSuite(t *testing.T
  	suite.Run(t, new(componentRefSuite))
  }
  
  func (s *componentRefSuite) SetupTest() {
  	// Do this in SetupSubTest() as well, if we have sub tests in this suite.
  	s.Assertions = require.New(s.T())
  	s.ProtoAssertions = protorequire.New(s.T())
  
  	s.controller = gomock.NewController(s.T())
  
  	s.registry = NewRegistry(log.NewTestLogger())
  	err := s.registry.Register(newTestLibrary(s.controller))
  	s.NoError(err)
  }
  
- func (s *componentRefSuite) TestArchetype() {
- 	tv := testvars.New(s.T())
- 	entityKey := EntityKey{
- 		tv.NamespaceID().String(),
- 		tv.WorkflowID(),
- 		tv.RunID(),
+ func (s *componentRefSuite) TestArchetypeID() {
+ 	executionKey := ExecutionKey{
+ 		NamespaceID: primitives.NewUUID().String(),
+ 		BusinessID:  primitives.NewUUID().String(),
+ 		RunID:       primitives.NewUUID().String(),
  	}
- 	ref := NewComponentRef[*TestComponent](entityKey)
+ 	ref := NewComponentRef[*TestComponent](executionKey)
  
- 	archetype, err := ref.Archetype(s.registry)
+ 	archetypeID, err := ref.ArchetypeID(s.registry)
  	s.NoError(err)
  
- 	rc, ok := s.registry.ComponentOf(reflect.TypeFor[*TestComponent]())
+ 	rc, ok := s.registry.componentOf(reflect.TypeFor[*TestComponent]())
  	s.True(ok)
  
- 	s.Equal(rc.FqType(), archetype.String())
+ 	s.Equal(rc.componentID, archetypeID)
  }
  
  func (s *componentRefSuite) TestShardingKey() {
- 	tv := testvars.New(s.T())
- 	entityKey := EntityKey{
- 		tv.NamespaceID().String(),
- 		tv.WorkflowID(),
- 		tv.RunID(),
+ 	executionKey := ExecutionKey{
+ 		NamespaceID: primitives.NewUUID().String(),
+ 		BusinessID:  primitives.NewUUID().String(),
+ 		RunID:       primitives.NewUUID().String(),
  	}
- 	ref := NewComponentRef[*TestComponent](entityKey)
+ 	ref := NewComponentRef[*TestComponent](executionKey)
  
  	shardingKey, err := ref.ShardingKey(s.registry)
  	s.NoError(err)
  
- 	rc, ok := s.registry.ComponentOf(reflect.TypeFor[*TestComponent]())
+ 	rc, ok := s.registry.componentOf(reflect.TypeFor[*TestComponent]())
  	s.True(ok)
  
- 	s.Equal(rc.shardingFn(entityKey), shardingKey)
+ 	s.Equal(rc.shardingFn(executionKey), shardingKey)
  }
  
  func (s *componentRefSuite) TestSerializeDeserialize() {
 +	_, err := DeserializeComponentRef(nil)
 +	s.ErrorIs(err, ErrInvalidComponentRef)
 +	_, err = DeserializeComponentRef([]byte{})
 +	s.ErrorIs(err, ErrInvalidComponentRef)
 +
- 	tv := testvars.New(s.T())
- 	entityKey := EntityKey{
- 		tv.NamespaceID().String(),
- 		tv.WorkflowID(),
- 		tv.RunID(),
+ 	executionKey := ExecutionKey{
+ 		NamespaceID: primitives.NewUUID().String(),
+ 		BusinessID:  primitives.NewUUID().String(),
+ 		RunID:       primitives.NewUUID().String(),
  	}
  	ref := ComponentRef{
- 		EntityKey:    entityKey,
- 		entityGoType: reflect.TypeFor[*TestComponent](),
- 		entityLastUpdateVT: &persistencespb.VersionedTransition{
- 			NamespaceFailoverVersion: tv.Namespace().FailoverVersion(),
- 			TransitionCount:          tv.Any().Int64(),
+ 		ExecutionKey:    executionKey,
+ 		executionGoType: reflect.TypeFor[*TestComponent](),
+ 		executionLastUpdateVT: &persistencespb.VersionedTransition{
+ 			NamespaceFailoverVersion: rand.Int63(),
+ 			TransitionCount:          rand.Int63(),
  		},
- 		componentPath: []string{tv.Any().String(), tv.Any().String()},
+ 		componentPath: []string{primitives.NewUUID().String(), primitives.NewUUID().String()},
  		componentInitialVT: &persistencespb.VersionedTransition{
- 			NamespaceFailoverVersion: tv.Namespace().FailoverVersion(),
- 			TransitionCount:          tv.Any().Int64(),
+ 			NamespaceFailoverVersion: rand.Int63(),
+ 			TransitionCount:          rand.Int63(),
  		},
  	}
  
  	serializedRef, err := ref.Serialize(s.registry)
  	s.NoError(err)
  
  	deserializedRef, err := DeserializeComponentRef(serializedRef)
  	s.NoError(err)
  
- 	s.ProtoEqual(ref.entityLastUpdateVT, deserializedRef.entityLastUpdateVT)
+ 	s.ProtoEqual(ref.executionLastUpdateVT, deserializedRef.executionLastUpdateVT)
  	s.ProtoEqual(ref.componentInitialVT, deserializedRef.componentInitialVT)
  
  	rootRc, ok := s.registry.ComponentFor(&TestComponent{})
  	s.True(ok)
- 	s.Equal(rootRc.FqType(), deserializedRef.archetype.String())
+ 	s.Equal(rootRc.componentID, deserializedRef.archetypeID)
  
- 	s.Equal(ref.EntityKey, deserializedRef.EntityKey)
+ 	s.Equal(ref.ExecutionKey, deserializedRef.ExecutionKey)
  	s.Equal(ref.componentPath, deserializedRef.componentPath)
  }
diff --cc chasm/tree.go
index 61daa27b2,132d85ebb..a881ace1e
--- a/chasm/tree.go
+++ b/chasm/tree.go
@@@ -1178,64 -1210,54 +1210,64 @@@ func unmarshalProto
  			Data:         []byte{},
  		}
  	}
  
  	if err := serialization.Decode(dataBlob, value.Interface().(proto.Message)); err != nil {
  		return reflect.Value{}, err
  	}
  
  	return value, nil
  }
  
  // Ref implements the CHASM Context interface
  func (n *Node) Ref(
  	component Component,
  ) ([]byte, error) {
 +	ref, err := n.structuredRef(component)
 +	if err != nil {
 +		return nil, err
 +	}
 +	return ref.Serialize(n.registry)
 +}
 +
 +// structuredRef returns a ComponentRef for the node.
 +func (n *Node) structuredRef(
 +	component Component,
 +) (ComponentRef, error) {
  	// No need to update tree structure here. If a Component can only be found after
  	// syncSubComponents() is called, it means the component is created in the
  	// current transition and don't have a reference yet.
  
  	for path, node := range n.andAllChildren() {
  		if node.value == component {
  			workflowKey := node.backend.GetWorkflowKey()
 -			ref := ComponentRef{
 -				ExecutionKey: ExecutionKey{
 +			return ComponentRef{
 +				EntityKey: EntityKey{
  					NamespaceID: workflowKey.NamespaceID,
  					BusinessID:  workflowKey.WorkflowID,
- 					EntityID:    workflowKey.RunID,
+ 					RunID:       workflowKey.RunID,
  				},
- 				archetype: n.Archetype(),
+ 				archetypeID: n.ArchetypeID(),
  				// TODO: Consider using node's LastUpdateVersionedTransition for checking staleness here.
  				// Using VersionedTransition of the entire tree might be too strict.
 -				executionLastUpdateVT: transitionhistory.CopyVersionedTransition(node.backend.CurrentVersionedTransition()),
 -				componentPath:         path,
 -				componentInitialVT:    node.serializedNode.GetMetadata().GetInitialVersionedTransition(),
 -			}
 -			return ref.Serialize(n.registry)
 +				entityLastUpdateVT: transitionhistory.CopyVersionedTransition(node.backend.CurrentVersionedTransition()),
 +				componentPath:      path,
 +				componentInitialVT: node.serializedNode.GetMetadata().GetInitialVersionedTransition(),
 +			}, nil
  		}
  	}
 -	return nil, errComponentNotFound
 +	return ComponentRef{}, errComponentNotFound
  }
  
  // componentNodePath implements the CHASM Context interface
  func (n *Node) componentNodePath(
  	component Component,
  ) ([]string, error) {
  	// It's unnecessary to deserialize entire tree as calling this method means
  	// caller already have the deserialized value.
  	for path, node := range n.andAllChildren() {
  		if !node.isComponent() {
  			continue
  		}
  
  		if node.value == component {
  			return path, nil
diff --cc proto/internal/temporal/server/api/historyservice/v1/request_response.proto
index 2704142a4,af2d853b4..92d7c2472
--- a/proto/internal/temporal/server/api/historyservice/v1/request_response.proto
+++ b/proto/internal/temporal/server/api/historyservice/v1/request_response.proto
@@@ -81,30 -81,36 +81,36 @@@ message StartWorkflowExecutionRequest 
      // Deprecated. use `inherited_build_id`
      temporal.api.common.v1.WorkerVersionStamp source_version_stamp = 10;
      // The root execution info of the new workflow.
      // For top-level workflows (ie., without parent), this field must be nil.
      temporal.server.api.workflow.v1.RootExecutionInfo root_execution_info = 11;
      // inherited build ID from parent/previous execution
      // Deprecated. Use behavior, version, and task queue fields in `parent_execution_info`.
      string inherited_build_id = 12;
      // If set, takes precedence over the Versioning Behavior sent by the SDK on Workflow Task completion.
      // To unset the override after the workflow is running, use UpdateWorkflowExecutionOptions.
      temporal.api.workflow.v1.VersioningOverride versioning_override = 13;
      // If set, we verify the parent-child relationship before applying ID conflict policy WORKFLOW_ID_CONFLICT_POLICY_TERMINATE_EXISTING
      bool child_workflow_only = 14;
      // If present, the new workflow should start on this version with pinned base behavior.
      temporal.api.deployment.v1.WorkerDeploymentVersion inherited_pinned_version = 15;
+     // Passes deployment version and revision number from a parent/previous workflow with AutoUpgrade behavior
+     // to its child/continued-as-new workflow. The first workflow task of the child/CAN workflow is dispatched to
 -    // either this deployment version or the current version of the task queue, depending on which is the more recent version. 
++    // either this deployment version or the current version of the task queue, depending on which is the more recent version.
+     // After the first workflow task, the effective behavior of the workflow is determined by worker-sent values in
+     // subsequent workflow tasks.
+     temporal.api.deployment.v1.InheritedAutoUpgradeInfo inherited_auto_upgrade_info = 16;
  }
  
  message StartWorkflowExecutionResponse {
      string run_id = 1;
      temporal.server.api.clock.v1.VectorClock clock = 2;
      // Set if request_eager_execution is set on the start request
      temporal.api.workflowservice.v1.PollWorkflowTaskQueueResponse eager_workflow_task = 3;
      bool started = 4;
      temporal.api.enums.v1.WorkflowExecutionStatus status = 5;
      temporal.api.common.v1.Link link = 6;
  }
  
  message GetMutableStateRequest {
      option (routing).workflow_id = "execution.workflow_id";
  
@@@ -299,34 -308,33 +308,37 @@@ message RecordActivityTaskStartedReques
      reserved 4;
      // Unique id of each poll request. Used to ensure at most once delivery of tasks.
      string request_id = 5;
      temporal.api.workflowservice.v1.PollActivityTaskQueueRequest poll_request = 6;
      temporal.server.api.clock.v1.VectorClock clock = 7;
      temporal.server.api.taskqueue.v1.BuildIdRedirectInfo build_id_redirect_info = 8;
  
      // Stamp represents the internal “version” of the activity options and can/will be changed with Activity API.
      int32 stamp = 9;
      // The deployment passed by History when the task was scheduled.
      // Deprecated. use `version_directive.deployment`.
      temporal.api.deployment.v1.Deployment scheduled_deployment = 10;
      reserved 11;
      // Versioning directive that was sent by history when scheduling the task.
      temporal.server.api.taskqueue.v1.TaskVersionDirective version_directive = 12;
+     // Revision number that was sent by matching when the task was dispatched. Used to resolve eventual consistency issues
+     // that may arise due to stale routing configs in task queue partitions.
+     int64 task_dispatch_revision_number = 13;
 +    // Reference to the Chasm component for activity execution (if applicable). For standalone activities, all necessary
 +    // start information is carried within this component, obviating the need to use the fields that apply to embedded
 +    // activities with the exception of version_directive.
-     bytes component_ref = 13;
++    bytes component_ref = 14;
  }
  
  message RecordActivityTaskStartedResponse {
      temporal.api.history.v1.HistoryEvent scheduled_event = 1;
      google.protobuf.Timestamp started_time = 2;
      int32 attempt = 3;
      google.protobuf.Timestamp current_attempt_scheduled_time = 4;
      temporal.api.common.v1.Payloads heartbeat_details = 5;
      temporal.api.common.v1.WorkflowType workflow_type = 6;
      string workflow_namespace = 7;
      temporal.server.api.clock.v1.VectorClock clock = 8;
      int64 version = 9;
      temporal.api.common.v1.Priority priority = 10;
      temporal.api.common.v1.RetryPolicy retry_policy = 11;
      int64 start_version = 12;
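
The component_ref bytes moved to field 14 because main took field 13 for task_dispatch_revision_number. A sketch of consuming the new field on the receiving side; the handler flow and refFromStartedRequest are assumptions, only the proto field and chasm.DeserializeComponentRef come from this diff.

package example

import (
    "go.temporal.io/server/api/historyservice/v1"
    "go.temporal.io/server/chasm"
)

// refFromStartedRequest distinguishes a standalone activity (component_ref set) from an
// embedded one (component_ref empty, so the existing fields apply).
func refFromStartedRequest(req *historyservice.RecordActivityTaskStartedRequest) (*chasm.ComponentRef, error) {
    token := req.GetComponentRef()
    if len(token) == 0 {
        return nil, nil
    }
    ref, err := chasm.DeserializeComponentRef(token)
    if err != nil {
        return nil, err
    }
    return &ref, nil
}
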
diff --cc service/frontend/configs/quotas.go
index b0c231aa5,11f131173..4b806e110
--- a/service/frontend/configs/quotas.go
+++ b/service/frontend/configs/quotas.go
@@@ -114,33 -102,32 +114,35 @@@ var 
  		"/temporal.api.workflowservice.v1.WorkflowService/UpdateWorkflowExecutionOptions":        2,
  		"/temporal.api.workflowservice.v1.WorkflowService/SetCurrentDeployment":                  2, // [cleanup-wv-pre-release]
  		"/temporal.api.workflowservice.v1.WorkflowService/SetCurrentDeploymentVersion":           2, // [cleanup-wv-pre-release]
  		"/temporal.api.workflowservice.v1.WorkflowService/SetWorkerDeploymentCurrentVersion":     2,
  		"/temporal.api.workflowservice.v1.WorkflowService/SetWorkerDeploymentRampingVersion":     2,
  		"/temporal.api.workflowservice.v1.WorkflowService/SetWorkerDeploymentManager":            2,
  		"/temporal.api.workflowservice.v1.WorkflowService/DeleteWorkerDeployment":                2,
  		"/temporal.api.workflowservice.v1.WorkflowService/DeleteWorkerDeploymentVersion":         2,
  		"/temporal.api.workflowservice.v1.WorkflowService/UpdateWorkerDeploymentVersionMetadata": 2,
  		"/temporal.api.workflowservice.v1.WorkflowService/CreateWorkflowRule":                    2,
  		"/temporal.api.workflowservice.v1.WorkflowService/DescribeWorkflowRule":                  2,
  		"/temporal.api.workflowservice.v1.WorkflowService/DeleteWorkflowRule":                    2,
  		"/temporal.api.workflowservice.v1.WorkflowService/ListWorkflowRules":                     2,
  		"/temporal.api.workflowservice.v1.WorkflowService/TriggerWorkflowRule":                   2,
  		"/temporal.api.workflowservice.v1.WorkflowService/UpdateTaskQueueConfig":                 2,
 +		"/temporal.api.workflowservice.v1.WorkflowService/RequestCancelActivityExecution":        2,
 +		"/temporal.api.workflowservice.v1.WorkflowService/TerminateActivityExecution":            2,
 +		"/temporal.api.workflowservice.v1.WorkflowService/DeleteActivityExecution":               2,
+ 		"/temporal.api.workflowservice.v1.WorkflowService/PauseWorkflowExecution":                2,
+ 		"/temporal.api.workflowservice.v1.WorkflowService/UnpauseWorkflowExecution":              2,
  
  		// P3: Status Querying APIs
  		"/temporal.api.workflowservice.v1.WorkflowService/DescribeWorkflowExecution":       3,
  		"/temporal.api.workflowservice.v1.WorkflowService/DescribeTaskQueue":               3,
  		"/temporal.api.workflowservice.v1.WorkflowService/GetWorkerBuildIdCompatibility":   3,
  		"/temporal.api.workflowservice.v1.WorkflowService/GetWorkerVersioningRules":        3,
  		"/temporal.api.workflowservice.v1.WorkflowService/ListTaskQueuePartitions":         3,
  		"/temporal.api.workflowservice.v1.WorkflowService/QueryWorkflow":                   3,
  		"/temporal.api.workflowservice.v1.WorkflowService/DescribeSchedule":                3,
  		"/temporal.api.workflowservice.v1.WorkflowService/ListScheduleMatchingTimes":       3,
  		"/temporal.api.workflowservice.v1.WorkflowService/DescribeBatchOperation":          3,
  		"/temporal.api.workflowservice.v1.WorkflowService/DescribeDeployment":              3, // [cleanup-wv-pre-release]
  		"/temporal.api.workflowservice.v1.WorkflowService/GetCurrentDeployment":            3, // [cleanup-wv-pre-release]
  		"/temporal.api.workflowservice.v1.WorkflowService/DescribeWorkerDeploymentVersion": 3,
  		"/temporal.api.workflowservice.v1.WorkflowService/DescribeWorkerDeployment":        3,
diff --cc service/frontend/fx.go
index d51a4c6cc,fe9d75408..f0ff43fa1
--- a/service/frontend/fx.go
+++ b/service/frontend/fx.go
@@@ -1,25 -1,25 +1,26 @@@
  package frontend
  
  import (
  	"fmt"
  	"net"
  
  	"github.com/gorilla/mux"
  	"go.temporal.io/server/api/adminservice/v1"
  	"go.temporal.io/server/chasm"
 +	"go.temporal.io/server/chasm/lib/activity"
+ 	schedulerpb "go.temporal.io/server/chasm/lib/scheduler/gen/schedulerpb/v1"
  	"go.temporal.io/server/client"
  	"go.temporal.io/server/common"
  	"go.temporal.io/server/common/archiver"
  	"go.temporal.io/server/common/archiver/provider"
  	"go.temporal.io/server/common/authorization"
  	"go.temporal.io/server/common/clock"
  	"go.temporal.io/server/common/cluster"
  	"go.temporal.io/server/common/config"
  	"go.temporal.io/server/common/dynamicconfig"
  	"go.temporal.io/server/common/log"
  	"go.temporal.io/server/common/log/tag"
  	"go.temporal.io/server/common/membership"
  	"go.temporal.io/server/common/metrics"
  	"go.temporal.io/server/common/namespace"
  	"go.temporal.io/server/common/nexus"
@@@ -99,32 -99,32 +100,33 @@@ var Module = fx.Options
  	fx.Provide(NamespaceCheckerProvider),
  	fx.Provide(func(so GrpcServerOptions) *grpc.Server { return grpc.NewServer(so.Options...) }),
  	fx.Provide(HandlerProvider),
  	fx.Provide(AdminHandlerProvider),
  	fx.Provide(OperatorHandlerProvider),
  	fx.Provide(NewVersionChecker),
  	fx.Provide(ServiceResolverProvider),
  	fx.Invoke(RegisterNexusHTTPHandler),
  	fx.Invoke(RegisterOpenAPIHTTPHandler),
  	fx.Provide(HTTPAPIServerProvider),
  	fx.Provide(NewServiceProvider),
  	fx.Provide(NexusEndpointClientProvider),
  	fx.Provide(NexusEndpointRegistryProvider),
  	fx.Invoke(ServiceLifetimeHooks),
  	fx.Invoke(EndpointRegistryLifetimeHooks),
+ 	fx.Provide(schedulerpb.NewSchedulerServiceLayeredClient),
  	nexusfrontend.Module,
 +	activity.FrontendModule,
  )
  
  func NewServiceProvider(
  	serviceConfig *Config,
  	server *grpc.Server,
  	healthServer *health.Server,
  	httpAPIServer *HTTPAPIServer,
  	handler Handler,
  	adminHandler *AdminHandler,
  	operatorHandler *OperatorHandlerImpl,
  	versionChecker *VersionChecker,
  	visibilityMgr manager.VisibilityManager,
  	logger log.SnTaggedLogger,
  	grpcListener net.Listener,
  	metricsHandler metrics.Handler,
@@@ -729,74 -734,72 +736,76 @@@ func HandlerProvider
  	dcRedirectionPolicy config.DCRedirectionPolicy,
  	serviceConfig *Config,
  	versionChecker *VersionChecker,
  	namespaceReplicationQueue FEReplicatorNamespaceReplicationQueue,
  	visibilityMgr manager.VisibilityManager,
  	logger log.SnTaggedLogger,
  	throttledLogger log.ThrottledLogger,
  	persistenceExecutionManager persistence.ExecutionManager,
  	clusterMetadataManager persistence.ClusterMetadataManager,
  	persistenceMetadataManager persistence.MetadataManager,
  	clientBean client.Bean,
  	historyClient resource.HistoryClient,
  	matchingClient resource.MatchingClient,
  	deploymentStoreClient deployment.DeploymentStoreClient,
  	workerDeploymentStoreClient workerdeployment.Client,
+ 	schedulerClient schedulerpb.SchedulerServiceClient,
  	archiverProvider provider.ArchiverProvider,
  	metricsHandler metrics.Handler,
  	payloadSerializer serialization.Serializer,
  	timeSource clock.TimeSource,
  	namespaceRegistry namespace.Registry,
  	saMapperProvider searchattribute.MapperProvider,
  	saProvider searchattribute.Provider,
  	clusterMetadata cluster.Metadata,
  	archivalMetadata archiver.ArchivalMetadata,
  	healthServer *health.Server,
  	membershipMonitor membership.Monitor,
  	healthInterceptor *interceptor.HealthInterceptor,
  	scheduleSpecBuilder *scheduler.SpecBuilder,
 +	activityHandler activity.FrontendHandler,
 +	registry *chasm.Registry,
  ) Handler {
  	wfHandler := NewWorkflowHandler(
  		serviceConfig,
  		namespaceReplicationQueue,
  		visibilityMgr,
  		logger,
  		throttledLogger,
  		persistenceExecutionManager.GetName(),
  		clusterMetadataManager,
  		persistenceMetadataManager,
  		historyClient,
  		matchingClient,
  		deploymentStoreClient,
  		workerDeploymentStoreClient,
+ 		schedulerClient,
  		archiverProvider,
  		payloadSerializer,
  		namespaceRegistry,
  		saMapperProvider,
  		saProvider,
  		clusterMetadata,
  		archivalMetadata,
  		healthServer,
  		timeSource,
  		membershipMonitor,
  		healthInterceptor,
  		scheduleSpecBuilder,
  		httpEnabled(cfg, serviceName),
 +		activityHandler,
 +		registry,
  	)
  	return wfHandler
  }
  
  func RegisterNexusHTTPHandler(
  	serviceConfig *Config,
  	serviceName primitives.ServiceName,
  	matchingClient resource.MatchingClient,
  	metricsHandler metrics.Handler,
  	clusterMetadata cluster.Metadata,
  	clientCache *cluster.FrontendHTTPClientCache,
  	namespaceRegistry namespace.Registry,
  	endpointRegistry nexus.EndpointRegistry,
  	authInterceptor *authorization.Interceptor,
  	telemetryInterceptor *interceptor.TelemetryInterceptor,
diff --cc service/frontend/workflow_handler.go
index a0e30731b,eaa27f65f..d35c8cdc6
--- a/service/frontend/workflow_handler.go
+++ b/service/frontend/workflow_handler.go
@@@ -20,32 -20,31 +20,33 @@@ import 
  	historypb "go.temporal.io/api/history/v1"
  	querypb "go.temporal.io/api/query/v1"
  	schedulepb "go.temporal.io/api/schedule/v1"
  	"go.temporal.io/api/serviceerror"
  	taskqueuepb "go.temporal.io/api/taskqueue/v1"
  	updatepb "go.temporal.io/api/update/v1"
  	workerpb "go.temporal.io/api/worker/v1"
  	workflowpb "go.temporal.io/api/workflow/v1"
  	"go.temporal.io/api/workflowservice/v1"
  	batchspb "go.temporal.io/server/api/batch/v1"
  	deploymentspb "go.temporal.io/server/api/deployment/v1"
  	"go.temporal.io/server/api/historyservice/v1"
  	"go.temporal.io/server/api/matchingservice/v1"
  	schedulespb "go.temporal.io/server/api/schedule/v1"
  	taskqueuespb "go.temporal.io/server/api/taskqueue/v1"
 +	"go.temporal.io/server/chasm"
 +	"go.temporal.io/server/chasm/lib/activity"
+ 	schedulerpb "go.temporal.io/server/chasm/lib/scheduler/gen/schedulerpb/v1"
  	"go.temporal.io/server/client/frontend"
  	"go.temporal.io/server/common"
  	"go.temporal.io/server/common/archiver"
  	"go.temporal.io/server/common/archiver/provider"
  	"go.temporal.io/server/common/backoff"
  	"go.temporal.io/server/common/clock"
  	"go.temporal.io/server/common/cluster"
  	"go.temporal.io/server/common/collection"
  	"go.temporal.io/server/common/dynamicconfig"
  	"go.temporal.io/server/common/enums"
  	"go.temporal.io/server/common/failure"
  	"go.temporal.io/server/common/headers"
  	"go.temporal.io/server/common/log"
  	"go.temporal.io/server/common/log/tag"
  	"go.temporal.io/server/common/membership"
@@@ -120,80 -118,78 +122,82 @@@ type 
  		tokenSerializer                 *tasktoken.Serializer
  		config                          *Config
  		versionChecker                  headers.VersionChecker
  		namespaceHandler                *namespaceHandler
  		getDefaultWorkflowRetrySettings dynamicconfig.TypedPropertyFnWithNamespaceFilter[retrypolicy.DefaultRetrySettings]
  		visibilityMgr                   manager.VisibilityManager
  		logger                          log.Logger
  		throttledLogger                 log.Logger
  		persistenceExecutionName        string
  		clusterMetadataManager          persistence.ClusterMetadataManager
  		clusterMetadata                 cluster.Metadata
  		historyClient                   historyservice.HistoryServiceClient
  		matchingClient                  matchingservice.MatchingServiceClient
  		deploymentStoreClient           deployment.DeploymentStoreClient
  		workerDeploymentClient          workerdeployment.Client
+ 		schedulerClient                 schedulerpb.SchedulerServiceClient
  		archiverProvider                provider.ArchiverProvider
  		payloadSerializer               serialization.Serializer
  		namespaceRegistry               namespace.Registry
  		saMapperProvider                searchattribute.MapperProvider
  		saProvider                      searchattribute.Provider
  		saValidator                     *searchattribute.Validator
  		archivalMetadata                archiver.ArchivalMetadata
  		healthServer                    *health.Server
  		overrides                       *Overrides
  		membershipMonitor               membership.Monitor
  		healthInterceptor               *interceptor.HealthInterceptor
  		scheduleSpecBuilder             *scheduler.SpecBuilder
  		outstandingPollers              collection.SyncMap[string, collection.SyncMap[string, context.CancelFunc]]
  		httpEnabled                     bool
 +		registry                        *chasm.Registry
  	}
  )
  
  // NewWorkflowHandler creates a gRPC handler for workflowservice
  func NewWorkflowHandler(
  	config *Config,
  	namespaceReplicationQueue persistence.NamespaceReplicationQueue,
  	visibilityMgr manager.VisibilityManager,
  	logger log.Logger,
  	throttledLogger log.Logger,
  	persistenceExecutionName string,
  	clusterMetadataManager persistence.ClusterMetadataManager,
  	persistenceMetadataManager persistence.MetadataManager,
  	historyClient historyservice.HistoryServiceClient,
  	matchingClient matchingservice.MatchingServiceClient,
  	deploymentStoreClient deployment.DeploymentStoreClient,
  	workerDeploymentClient workerdeployment.Client,
+ 	schedulerClient schedulerpb.SchedulerServiceClient,
  	archiverProvider provider.ArchiverProvider,
  	payloadSerializer serialization.Serializer,
  	namespaceRegistry namespace.Registry,
  	saMapperProvider searchattribute.MapperProvider,
  	saProvider searchattribute.Provider,
  	clusterMetadata cluster.Metadata,
  	archivalMetadata archiver.ArchivalMetadata,
  	healthServer *health.Server,
  	timeSource clock.TimeSource,
  	membershipMonitor membership.Monitor,
  	healthInterceptor *interceptor.HealthInterceptor,
  	scheduleSpecBuilder *scheduler.SpecBuilder,
  	httpEnabled bool,
 +	activityHandler activity.FrontendHandler,
 +	registry *chasm.Registry,
  ) *WorkflowHandler {
  	handler := &WorkflowHandler{
 +		FrontendHandler: activityHandler,
  		status:          common.DaemonStatusInitialized,
  		config:          config,
  		tokenSerializer: tasktoken.NewSerializer(),
  		versionChecker:  headers.NewDefaultVersionChecker(),
  		namespaceHandler: newNamespaceHandler(
  			logger,
  			persistenceMetadataManager,
  			clusterMetadata,
  			nsreplication.NewReplicator(namespaceReplicationQueue, logger),
  			archivalMetadata,
  			archiverProvider,
  			timeSource,
  			config,
  		),
  		getDefaultWorkflowRetrySettings: config.DefaultWorkflowRetryPolicy,
@@@ -6141,50 -6391,76 +6443,110 @@@ func (wh *WorkflowHandler) DescribeWork
  
  	resp, err := wh.matchingClient.DescribeWorker(ctx, &matchingservice.DescribeWorkerRequest{
  		NamespaceId: namespaceID.String(),
  		Request:     request,
  	})
  
  	if err != nil {
  		return nil, err
  	}
  
  	return &workflowservice.DescribeWorkerResponse{
  		WorkerInfo: resp.GetWorkerInfo(),
  	}, nil
  }
  
 +func (wh *WorkflowHandler) TriggerWorkflowRule(context.Context, *workflowservice.TriggerWorkflowRuleRequest) (*workflowservice.TriggerWorkflowRuleResponse, error) {
 +	return nil, serviceerror.NewUnimplemented("method TriggerWorkflowRule not supported")
 +}
 +
 +// PauseActivityExecution pauses a standalone activity execution
 +func (wh *WorkflowHandler) PauseActivityExecution(
 +	ctx context.Context,
 +	request *workflowservice.PauseActivityExecutionRequest,
 +) (*workflowservice.PauseActivityExecutionResponse, error) {
 +	return nil, serviceerror.NewUnimplemented("temporary stub during Standalone Activity prototyping")
 +}
 +
 +// UnpauseActivityExecution unpauses a standalone activity execution
 +func (wh *WorkflowHandler) UnpauseActivityExecution(
 +	ctx context.Context,
 +	request *workflowservice.UnpauseActivityExecutionRequest,
 +) (*workflowservice.UnpauseActivityExecutionResponse, error) {
 +	return nil, serviceerror.NewUnimplemented("temporary stub during Standalone Activity prototyping")
 +}
 +
 +// ResetActivityExecution resets a standalone activity execution
 +func (wh *WorkflowHandler) ResetActivityExecution(
 +	ctx context.Context,
 +	request *workflowservice.ResetActivityExecutionRequest,
 +) (*workflowservice.ResetActivityExecutionResponse, error) {
 +	return nil, serviceerror.NewUnimplemented("temporary stub during Standalone Activity prototyping")
 +}
 +
 +// UpdateActivityExecutionOptions updates options for a standalone activity execution
 +func (wh *WorkflowHandler) UpdateActivityExecutionOptions(
 +	ctx context.Context,
 +	request *workflowservice.UpdateActivityExecutionOptionsRequest,
 +) (*workflowservice.UpdateActivityExecutionOptionsResponse, error) {
 +	return nil, serviceerror.NewUnimplemented("temporary stub during Standalone Activity prototyping")
+ // PauseWorkflowExecution pauses a workflow execution.
+ func (wh *WorkflowHandler) PauseWorkflowExecution(ctx context.Context, request *workflowservice.PauseWorkflowExecutionRequest) (_ *workflowservice.PauseWorkflowExecutionResponse, retError error) {
+ 	defer log.CapturePanic(wh.logger, &retError)
+ 
+ 	if request == nil {
+ 		return nil, errRequestNotSet
+ 	}
+ 
+ 	if !wh.config.WorkflowPauseEnabled(request.GetNamespace()) {
+ 		return nil, serviceerror.NewUnimplementedf("workflow pause is not enabled for namespace: %s", request.GetNamespace())
+ 	}
+ 
+ 	namespaceID, err := wh.namespaceRegistry.GetNamespaceID(namespace.Name(request.GetNamespace()))
+ 	if err != nil {
+ 		return nil, err
+ 	}
+ 
+ 	_, historyErr := wh.historyClient.PauseWorkflowExecution(ctx, &historyservice.PauseWorkflowExecutionRequest{
+ 		NamespaceId:  namespaceID.String(),
+ 		PauseRequest: request,
+ 	})
+ 	if historyErr != nil {
+ 		return nil, historyErr
+ 	}
+ 
+ 	return &workflowservice.PauseWorkflowExecutionResponse{}, nil
+ }
+ 
+ func (wh *WorkflowHandler) UnpauseWorkflowExecution(ctx context.Context, request *workflowservice.UnpauseWorkflowExecutionRequest) (_ *workflowservice.UnpauseWorkflowExecutionResponse, retError error) {
+ 	defer log.CapturePanic(wh.logger, &retError)
+ 
+ 	if request == nil {
+ 		return nil, errRequestNotSet
+ 	}
+ 
+ 	// verify size limits of reason, request id and identity.
+ 	if len(request.GetReason()) > wh.config.MaxIDLengthLimit() {
+ 		return nil, serviceerror.NewInvalidArgument("reason is too long.")
+ 	}
+ 	if len(request.GetRequestId()) > wh.config.MaxIDLengthLimit() {
+ 		return nil, serviceerror.NewInvalidArgument("request id is too long.")
+ 	}
+ 	if len(request.GetIdentity()) > wh.config.MaxIDLengthLimit() {
+ 		return nil, serviceerror.NewInvalidArgument("identity is too long.")
+ 	}
+ 
+ 	namespaceID, err := wh.namespaceRegistry.GetNamespaceID(namespace.Name(request.GetNamespace()))
+ 	if err != nil {
+ 		return nil, err
+ 	}
+ 
+ 	_, historyErr := wh.historyClient.UnpauseWorkflowExecution(ctx, &historyservice.UnpauseWorkflowExecutionRequest{
+ 		NamespaceId:    namespaceID.String(),
+ 		UnpauseRequest: request,
+ 	})
+ 	if historyErr != nil {
+ 		return nil, historyErr
+ 	}
+ 
+ 	return &workflowservice.UnpauseWorkflowExecutionResponse{}, nil
  }
diff --cc service/frontend/workflow_handler_test.go
index 7fe487fdf,061ecda3e..bbc7395de
--- a/service/frontend/workflow_handler_test.go
+++ b/service/frontend/workflow_handler_test.go
@@@ -165,45 -167,44 +167,46 @@@ func (s *WorkflowHandlerSuite) getWorkf
  	healthInterceptor := interceptor.NewHealthInterceptor()
  	healthInterceptor.SetHealthy(true)
  	return NewWorkflowHandler(
  		config,
  		s.mockProducer,
  		s.mockResource.GetVisibilityManager(),
  		s.mockResource.GetLogger(),
  		s.mockResource.GetThrottledLogger(),
  		s.mockResource.GetExecutionManager().GetName(),
  		s.mockResource.GetClusterMetadataManager(),
  		s.mockResource.GetMetadataManager(),
  		s.mockResource.GetHistoryClient(),
  		s.mockResource.GetMatchingClient(),
  		nil,
  		nil,
+ 		nil, // Not initializing the scheduler client here.
  		s.mockResource.GetArchiverProvider(),
  		s.mockResource.GetPayloadSerializer(),
  		s.mockResource.GetNamespaceRegistry(),
  		s.mockResource.GetSearchAttributesMapperProvider(),
  		s.mockResource.GetSearchAttributesProvider(),
  		s.mockResource.GetClusterMetadata(),
  		s.mockResource.GetArchivalMetadata(),
  		health.NewServer(),
  		clock.NewRealTimeSource(),
  		s.mockResource.GetMembershipMonitor(),
  		healthInterceptor,
  		scheduler.NewSpecBuilder(),
  		true,
 +		nil, // Not testing activity handler here
 +		nil,
  	)
  }
  
  func (s *WorkflowHandlerSuite) TestDisableListVisibilityByFilter() {
  	testNamespace := namespace.Name("test-namespace")
  	namespaceID := namespace.ID(uuid.NewString())
  	config := s.newConfig()
  	config.DisableListVisibilityByFilter = dc.GetBoolPropertyFnFilteredByNamespace(true)
  
  	wh := s.getWorkflowHandler(config)
  
  	s.mockNamespaceCache.EXPECT().GetNamespaceID(gomock.Any()).Return(namespaceID, nil).AnyTimes()
  	s.mockVisibilityMgr.EXPECT().GetReadStoreName(testNamespace).Return("").AnyTimes()
  
  	// test list open by wid
diff --cc service/history/api/command_attr_validator.go
index 2c2dec208,73f26447e..b7fdd3f07
--- a/service/history/api/command_attr_validator.go
+++ b/service/history/api/command_attr_validator.go
@@@ -1,30 -1,28 +1,30 @@@
  package api
  
  import (
  	"fmt"
  	"strings"
  
- 	"github.com/pborman/uuid"
+ 	"github.com/google/uuid"
 +	activitypb "go.temporal.io/api/activity/v1"
  	commandpb "go.temporal.io/api/command/v1"
  	commonpb "go.temporal.io/api/common/v1"
  	enumspb "go.temporal.io/api/enums/v1"
  	"go.temporal.io/api/serviceerror"
  	taskqueuepb "go.temporal.io/api/taskqueue/v1"
  	persistencespb "go.temporal.io/server/api/persistence/v1"
 +	"go.temporal.io/server/chasm/lib/activity"
  	"go.temporal.io/server/common/backoff"
  	"go.temporal.io/server/common/dynamicconfig"
  	"go.temporal.io/server/common/namespace"
  	"go.temporal.io/server/common/primitives/timestamp"
  	"go.temporal.io/server/common/priorities"
  	"go.temporal.io/server/common/retrypolicy"
  	"go.temporal.io/server/common/searchattribute"
  	"go.temporal.io/server/common/tqid"
  	"go.temporal.io/server/service/history/configs"
  	"google.golang.org/protobuf/types/known/durationpb"
  )
  
  type (
  	CommandAttrValidator struct {
  		namespaceRegistry               namespace.Registry
diff --cc service/history/chasm_engine.go
index aa5adffec,ed4292681..9ee1aa248
--- a/service/history/chasm_engine.go
+++ b/service/history/chasm_engine.go
@@@ -16,169 -16,164 +16,176 @@@ import 
  	"go.temporal.io/server/common/namespace"
  	"go.temporal.io/server/common/persistence"
  	"go.temporal.io/server/common/primitives"
  	"go.temporal.io/server/service/history/api"
  	"go.temporal.io/server/service/history/configs"
  	"go.temporal.io/server/service/history/consts"
  	historyi "go.temporal.io/server/service/history/interfaces"
  	"go.temporal.io/server/service/history/shard"
  	"go.temporal.io/server/service/history/workflow"
  	"go.temporal.io/server/service/history/workflow/cache"
  	"go.uber.org/fx"
  )
  
  type (
  	ChasmEngine struct {
- 		entityCache     cache.Cache
+ 		executionCache  cache.Cache
  		shardController shard.Controller
  		registry        *chasm.Registry
  		config          *configs.Config
 +		notifier        *ChasmNotifier
  	}
  
- 	newEntityParams struct {
- 		entityRef     chasm.ComponentRef
- 		entityContext historyi.WorkflowContext
- 		mutableState  historyi.MutableState
- 		snapshot      *persistence.WorkflowSnapshot
- 		events        []*persistence.WorkflowEvents
+ 	newExecutionParams struct {
+ 		executionRef     chasm.ComponentRef
+ 		executionContext historyi.WorkflowContext
+ 		mutableState     historyi.MutableState
+ 		snapshot         *persistence.WorkflowSnapshot
+ 		events           []*persistence.WorkflowEvents
  	}
  
- 	currentRunInfo struct {
+ 	currentExecutionInfo struct {
  		createRequestID string
  		*persistence.CurrentWorkflowConditionFailedError
  	}
  )
  
  var defaultTransitionOptions = chasm.TransitionOptions{
  	ReusePolicy:    chasm.BusinessIDReusePolicyAllowDuplicate,
  	ConflictPolicy: chasm.BusinessIDConflictPolicyFail,
  	RequestID:      "",
  	Speculative:    false,
  }
  
  var ChasmEngineModule = fx.Options(
 +	fx.Provide(NewChasmNotifier),
  	fx.Provide(newChasmEngine),
  	fx.Provide(func(impl *ChasmEngine) chasm.Engine { return impl }),
  	fx.Invoke(func(impl *ChasmEngine, shardController shard.Controller) {
  		impl.SetShardController(shardController)
  	}),
  )
  
  func newChasmEngine(
- 	entityCache cache.Cache,
+ 	executionCache cache.Cache,
  	registry *chasm.Registry,
  	config *configs.Config,
 +	notifier *ChasmNotifier,
  ) *ChasmEngine {
  	return &ChasmEngine{
 -		executionCache: executionCache,
 -		registry:       registry,
 -		config:         config,
 +		entityCache: entityCache,
 +		registry:    registry,
 +		config:      config,
 +		notifier:    notifier,
  	}
  }
  
  // This is for breaking fx cycle dependency.
  // ChasmEngine -> ShardController -> ShardContextFactory -> HistoryEngineFactory -> QueueFactory -> ChasmEngine
  func (e *ChasmEngine) SetShardController(
  	shardController shard.Controller,
  ) {
  	e.shardController = shardController
  }
  
 +func (e *ChasmEngine) NotifyExecution(key chasm.EntityKey) {
 +	e.notifier.Notify(key)
 +}
 +
- func (e *ChasmEngine) NewEntity(
+ func (e *ChasmEngine) NewExecution(
  	ctx context.Context,
- 	entityRef chasm.ComponentRef,
+ 	executionRef chasm.ComponentRef,
  	newFn func(chasm.MutableContext) (chasm.Component, error),
  	opts ...chasm.TransitionOption,
- ) (entityKey chasm.EntityKey, newEntityRef []byte, retErr error) {
+ ) (executionKey chasm.ExecutionKey, newExecutionRef []byte, retErr error) {
  	options := e.constructTransitionOptions(opts...)
  
- 	shardContext, err := e.getShardContext(entityRef)
+ 	shardContext, err := e.getShardContext(executionRef)
  	if err != nil {
- 		return chasm.EntityKey{}, nil, err
+ 		return chasm.ExecutionKey{}, nil, err
  	}
  
- 	currentEntityReleaseFn, err := e.lockCurrentEntity(
+ 	archetypeID, err := executionRef.ArchetypeID(e.registry)
+ 	if err != nil {
+ 		return chasm.ExecutionKey{}, nil, err
+ 	}
+ 
+ 	currentExecutionReleaseFn, err := e.lockCurrentExecution(
  		ctx,
  		shardContext,
- 		namespace.ID(entityRef.NamespaceID),
- 		entityRef.BusinessID,
+ 		namespace.ID(executionRef.NamespaceID),
+ 		executionRef.BusinessID,
+ 		archetypeID,
  	)
  	if err != nil {
- 		return chasm.EntityKey{}, nil, err
+ 		return chasm.ExecutionKey{}, nil, err
  	}
  	defer func() {
- 		currentEntityReleaseFn(retErr)
+ 		currentExecutionReleaseFn(retErr)
  	}()
  
- 	newEntityParams, err := e.createNewEntity(
+ 	newExecutionParams, err := e.createNewExecution(
  		ctx,
  		shardContext,
- 		entityRef,
+ 		executionRef,
+ 		archetypeID,
  		newFn,
  		options,
  	)
  	if err != nil {
- 		return chasm.EntityKey{}, nil, err
+ 		return chasm.ExecutionKey{}, nil, err
  	}
  
  	currentRunInfo, hasCurrentRun, err := e.persistAsBrandNew(
  		ctx,
  		shardContext,
- 		newEntityParams,
+ 		newExecutionParams,
  	)
  	if err != nil {
- 		return chasm.EntityKey{}, nil, err
+ 		return chasm.ExecutionKey{}, nil, err
  	}
  	if !hasCurrentRun {
- 		serializedRef, err := newEntityParams.entityRef.Serialize(e.registry)
+ 		serializedRef, err := newExecutionParams.executionRef.Serialize(e.registry)
  		if err != nil {
- 			return chasm.EntityKey{}, nil, err
+ 			return chasm.ExecutionKey{}, nil, err
  		}
- 		return newEntityParams.entityRef.EntityKey, serializedRef, nil
+ 		return newExecutionParams.executionRef.ExecutionKey, serializedRef, nil
  	}
  
- 	return e.handleEntityConflict(
+ 	return e.handleExecutionConflict(
  		ctx,
  		shardContext,
- 		newEntityParams,
+ 		newExecutionParams,
  		currentRunInfo,
  		options,
  	)
  }
  
- func (e *ChasmEngine) UpdateWithNewEntity(
+ func (e *ChasmEngine) UpdateWithNewExecution(
  	ctx context.Context,
- 	entityRef chasm.ComponentRef,
+ 	executionRef chasm.ComponentRef,
  	newFn func(chasm.MutableContext) (chasm.Component, error),
  	updateFn func(chasm.MutableContext, chasm.Component) error,
  	opts ...chasm.TransitionOption,
- ) (newEntityKey chasm.EntityKey, newEntityRef []byte, retError error) {
- 	return chasm.EntityKey{}, nil, serviceerror.NewUnimplemented("UpdateWithNewEntity is not yet supported")
+ ) (newExecutionKey chasm.ExecutionKey, newExecutionRef []byte, retError error) {
+ 	return chasm.ExecutionKey{}, nil, serviceerror.NewUnimplemented("UpdateWithNewExecution is not yet supported")
  }
  
 +// UpdateComponent applies updateFn to the component identified by the supplied component reference,
 +// returning the new component reference corresponding to the transition. An error is returned if
 +// the state transition specified by the supplied component reference is inconsistent with execution
 +// transition history. opts are currently ignored.
  func (e *ChasmEngine) UpdateComponent(
  	ctx context.Context,
  	ref chasm.ComponentRef,
  	updateFn func(chasm.MutableContext, chasm.Component) error,
  	opts ...chasm.TransitionOption,
  ) (updatedRef []byte, retError error) {
  
  	shardContext, executionLease, err := e.getExecutionLease(ctx, ref)
  	if err != nil {
  		return nil, err
  	}
  	defer func() {
  		executionLease.GetReleaseFn()(retError)
  	}()
  
@@@ -655,78 -561,70 +681,78 @@@ func (e *ChasmEngine) handleReusePolicy
  func (e *ChasmEngine) getShardContext(
  	ref chasm.ComponentRef,
  ) (historyi.ShardContext, error) {
  	shardingKey, err := ref.ShardingKey(e.registry)
  	if err != nil {
  		return nil, err
  	}
  	shardID := common.ShardingKeyToShard(
  		shardingKey,
  		e.config.NumberOfShards,
  	)
  
  	return e.shardController.GetShardByID(shardID)
  }
  
 +// getExecutionLease returns shard context and mutable state for the execution identified by the
 +// supplied component reference, with the lock held. An error is returned if the state transition
 +// specified by the component reference is inconsistent with mutable state transition history. If
 +// the state transition specified by the component reference is consistent with mutable state being
 +// stale, then mutable state is reloaded from persistence before returning. It does not check that
 +// mutable state is non-stale after reload.
 +// TODO(dan): if mutable state is stale after reload, return an error (retryable iff the failover
 +// version is stale since that is expected under some multi-cluster scenarios).
  func (e *ChasmEngine) getExecutionLease(
  	ctx context.Context,
  	ref chasm.ComponentRef,
  ) (historyi.ShardContext, api.WorkflowLease, error) {
  	shardContext, err := e.getShardContext(ref)
  	if err != nil {
  		return nil, nil, err
  	}
  
  	consistencyChecker := api.NewWorkflowConsistencyChecker(
  		shardContext,
- 		e.entityCache,
+ 		e.executionCache,
  	)
  
  	lockPriority := locks.PriorityHigh
  	callerType := headers.GetCallerInfo(ctx).CallerType
  	if callerType == headers.CallerTypeBackgroundHigh || callerType == headers.CallerTypeBackgroundLow || callerType == headers.CallerTypePreemptable {
  		lockPriority = locks.PriorityLow
  	}
  
- 	archetype, err := ref.Archetype(e.registry)
+ 	archetypeID, err := ref.ArchetypeID(e.registry)
  	if err != nil {
  		return nil, nil, err
  	}
  
  	var staleReferenceErr error
- 	entityLease, err := consistencyChecker.GetChasmLeaseWithConsistencyCheck(
+ 	executionLease, err := consistencyChecker.GetChasmLeaseWithConsistencyCheck(
  		ctx,
  		nil,
  		func(mutableState historyi.MutableState) bool {
  			err := mutableState.ChasmTree().IsStale(ref)
  			if errors.Is(err, consts.ErrStaleState) {
  				return false
  			}
  
  			// Reference itself might be stale.
  			// No need to reload mutable state in this case, but request should be failed.
  			staleReferenceErr = err
  			return true
  		},
  		definition.NewWorkflowKey(
- 			ref.EntityKey.NamespaceID,
- 			ref.EntityKey.BusinessID,
- 			ref.EntityKey.EntityID,
+ 			ref.NamespaceID,
+ 			ref.BusinessID,
+ 			ref.RunID,
  		),
- 		archetype,
+ 		archetypeID,
  		lockPriority,
  	)
  	if err == nil && staleReferenceErr != nil {
- 		entityLease.GetReleaseFn()(nil)
+ 		executionLease.GetReleaseFn()(nil)
  		err = staleReferenceErr
  	}
  
- 	return shardContext, entityLease, err
+ 	return shardContext, executionLease, err
  }
diff --cc service/history/chasm_engine_test.go
index b1c782aa3,8f0e42a6e..f5278f526
--- a/service/history/chasm_engine_test.go
+++ b/service/history/chasm_engine_test.go
@@@ -95,143 -96,146 +96,147 @@@ func (s *chasmEngineSuite) SetupTest() 
  	s.mockClusterMetadata.EXPECT().GetAllClusterInfo().Return(cluster.TestAllClusterInfo).AnyTimes()
  	s.mockClusterMetadata.EXPECT().ClusterNameForFailoverVersion(true, tests.Version).Return(cluster.TestCurrentClusterName).AnyTimes()
  	s.mockNamespaceRegistry.EXPECT().GetNamespaceByID(s.namespaceEntry.ID()).Return(s.namespaceEntry, nil).AnyTimes()
  	s.mockNamespaceRegistry.EXPECT().GetNamespace(s.namespaceEntry.Name()).Return(s.namespaceEntry, nil).AnyTimes()
  
  	reg := hsm.NewRegistry()
  	err := workflow.RegisterStateMachine(reg)
  	s.NoError(err)
  	s.mockShard.SetStateMachineRegistry(reg)
  
  	s.registry = chasm.NewRegistry(s.mockShard.GetLogger())
  	err = s.registry.Register(&testChasmLibrary{})
  	s.NoError(err)
  	s.mockShard.SetChasmRegistry(s.registry)
  
+ 	var ok bool
+ 	s.archetypeID, ok = s.registry.ComponentIDFor(&testComponent{})
+ 	s.True(ok)
+ 
  	s.mockShard.SetEngineForTesting(s.mockEngine)
  	s.mockEngine.EXPECT().NotifyNewTasks(gomock.Any()).AnyTimes()
  	s.mockEngine.EXPECT().NotifyNewHistoryEvent(gomock.Any()).AnyTimes()
  
  	s.engine = newChasmEngine(
- 		s.entityCache,
+ 		s.executionCache,
  		s.registry,
  		s.config,
 +		NewChasmNotifier(),
  	)
  	s.engine.SetShardController(s.mockShardController)
  }
  
  func (s *chasmEngineSuite) SetupSubTest() {
  	s.initAssertions()
  }
  
  func (s *chasmEngineSuite) initAssertions() {
  	s.Assertions = require.New(s.T())
  	s.ProtoAssertions = protorequire.New(s.T())
  }
  
- func (s *chasmEngineSuite) TestNewEntity_BrandNew() {
+ func (s *chasmEngineSuite) TestNewExecution_BrandNew() {
  	tv := testvars.New(s.T())
  
  	ref := chasm.NewComponentRef[*testComponent](
- 		chasm.EntityKey{
+ 		chasm.ExecutionKey{
  			NamespaceID: string(tests.NamespaceID),
  			BusinessID:  tv.WorkflowID(),
- 			EntityID:    "",
+ 			RunID:       "",
  		},
  	)
  	newActivityID := tv.ActivityID()
  
  	var runID string
  	s.mockExecutionManager.EXPECT().CreateWorkflowExecution(gomock.Any(), gomock.Any()).DoAndReturn(
  		func(
  			_ context.Context,
  			request *persistence.CreateWorkflowExecutionRequest,
  		) (*persistence.CreateWorkflowExecutionResponse, error) {
- 			s.validateCreateRequest(request, newActivityID, "", 0)
+ 			s.validateCreateRequest(request, s.archetypeID, newActivityID, "", 0)
  			runID = request.NewWorkflowSnapshot.ExecutionState.RunId
  			return tests.CreateWorkflowExecutionResponse, nil
  		},
  	).Times(1)
  
- 	entityKey, serializedRef, err := s.engine.NewEntity(
+ 	executionKey, serializedRef, err := s.engine.NewExecution(
  		context.Background(),
  		ref,
- 		s.newTestEntityFn(newActivityID),
+ 		s.newTestExecutionFn(newActivityID),
  		chasm.WithBusinessIDPolicy(
  			chasm.BusinessIDReusePolicyRejectDuplicate,
  			chasm.BusinessIDConflictPolicyFail,
  		),
  	)
  	s.NoError(err)
- 	expectedEntityKey := chasm.EntityKey{
+ 	expectedExecutionKey := chasm.ExecutionKey{
  		NamespaceID: string(tests.NamespaceID),
  		BusinessID:  tv.WorkflowID(),
- 		EntityID:    runID,
+ 		RunID:       runID,
  	}
- 	s.Equal(expectedEntityKey, entityKey)
- 	s.validateNewEntityResponseRef(serializedRef, expectedEntityKey)
+ 	s.Equal(expectedExecutionKey, executionKey)
+ 	s.validateNewExecutionResponseRef(serializedRef, expectedExecutionKey)
  }
  
- func (s *chasmEngineSuite) TestNewEntity_RequestIDDedup() {
+ func (s *chasmEngineSuite) TestNewExecution_RequestIDDedup() {
  	tv := testvars.New(s.T())
  	tv = tv.WithRunID(tv.Any().RunID())
  
  	ref := chasm.NewComponentRef[*testComponent](
- 		chasm.EntityKey{
+ 		chasm.ExecutionKey{
  			NamespaceID: string(tests.NamespaceID),
  			BusinessID:  tv.WorkflowID(),
- 			EntityID:    "",
+ 			RunID:       "",
  		},
  	)
  	newActivityID := tv.ActivityID()
  
  	s.mockExecutionManager.EXPECT().CreateWorkflowExecution(gomock.Any(), gomock.Any()).Return(
  		nil,
  		s.currentRunConditionFailedErr(
  			tv,
  			enumsspb.WORKFLOW_EXECUTION_STATE_RUNNING,
  			enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING,
  		),
  	).Times(1)
  
- 	entityKey, serializedRef, err := s.engine.NewEntity(
+ 	executionKey, serializedRef, err := s.engine.NewExecution(
  		context.Background(),
  		ref,
- 		s.newTestEntityFn(newActivityID),
+ 		s.newTestExecutionFn(newActivityID),
  		chasm.WithRequestID(tv.RequestID()),
  	)
  	s.NoError(err)
  
- 	expectedEntityKey := chasm.EntityKey{
+ 	expectedExecutionKey := chasm.ExecutionKey{
  		NamespaceID: string(tests.NamespaceID),
  		BusinessID:  tv.WorkflowID(),
- 		EntityID:    tv.RunID(),
+ 		RunID:       tv.RunID(),
  	}
- 	s.Equal(expectedEntityKey, entityKey)
- 	s.validateNewEntityResponseRef(serializedRef, expectedEntityKey)
+ 	s.Equal(expectedExecutionKey, executionKey)
+ 	s.validateNewExecutionResponseRef(serializedRef, expectedExecutionKey)
  }
  
- func (s *chasmEngineSuite) TestNewEntity_ReusePolicy_AllowDuplicate() {
+ func (s *chasmEngineSuite) TestNewExecution_ReusePolicy_AllowDuplicate() {
  	tv := testvars.New(s.T())
  	tv = tv.WithRunID(tv.Any().RunID())
  
  	ref := chasm.NewComponentRef[*testComponent](
- 		chasm.EntityKey{
+ 		chasm.ExecutionKey{
  			NamespaceID: string(tests.NamespaceID),
  			BusinessID:  tv.WorkflowID(),
- 			EntityID:    "",
+ 			RunID:       "",
  		},
  	)
  	newActivityID := tv.ActivityID()
  	currentRunConditionFailedErr := s.currentRunConditionFailedErr(
  		tv,
  		enumsspb.WORKFLOW_EXECUTION_STATE_COMPLETED,
  		enumspb.WORKFLOW_EXECUTION_STATUS_COMPLETED,
  	)
  
  	var runID string
  	s.mockExecutionManager.EXPECT().CreateWorkflowExecution(gomock.Any(), gomock.Any()).Return(
  		nil,
  		currentRunConditionFailedErr,
  	).Times(1)
  	s.mockExecutionManager.EXPECT().CreateWorkflowExecution(gomock.Any(), gomock.Any()).DoAndReturn(
@@@ -533,314 -621,124 +623,320 @@@ func (s *chasmEngineSuite) TestReadComp
  		context.Background(),
  		ref,
  		func(
  			ctx chasm.Context,
  			component chasm.Component,
  		) error {
  			tc, ok := component.(*testComponent)
  			s.True(ok)
  			s.Equal(expectedActivityID, tc.ActivityInfo.ActivityId)
  			return nil
  		},
  	)
  	s.NoError(err)
  }
  
 +// TestPollComponent_Success_NoWait tests the behavior of PollComponent when the predicate is
 +// satisfied at the outset.
 +func (s *chasmEngineSuite) TestPollComponent_Success_NoWait() {
 +	tv := testvars.New(s.T())
 +	tv = tv.WithRunID(tv.Any().RunID())
 +
 +	ref := chasm.NewComponentRef[*testComponent](
 +		chasm.EntityKey{
 +			NamespaceID: string(tests.NamespaceID),
 +			BusinessID:  tv.WorkflowID(),
 +			EntityID:    tv.RunID(),
 +		},
 +	)
 +	expectedActivityID := tv.ActivityID()
 +
 +	s.mockExecutionManager.EXPECT().GetWorkflowExecution(gomock.Any(), gomock.Any()).
 +		Return(&persistence.GetWorkflowExecutionResponse{
 +			State: s.buildPersistenceMutableState(ref.EntityKey, &persistencespb.ActivityInfo{
 +				ActivityId: expectedActivityID,
 +			}),
 +		}, nil).Times(1)
 +
 +	newSerializedRef, err := s.engine.PollComponent(
 +		context.Background(),
 +		ref,
 +		func(ctx chasm.Context, component chasm.Component) (bool, error) {
 +			return true, nil
 +		},
 +	)
 +	s.NoError(err)
 +
 +	newRef, err := chasm.DeserializeComponentRef(newSerializedRef)
 +	s.NoError(err)
 +	s.Equal(ref.BusinessID, newRef.BusinessID)
 +}
 +
 +// TestPollComponent_Success_Wait tests the waiting behavior of PollComponent.
 +func (s *chasmEngineSuite) TestPollComponent_Success_Wait() {
 +	// The predicate is not satisfied at the outset, so the call blocks waiting for notifications.
 +	// UpdateComponent is used twice to update the execution in a way which does not satisfy the
 +	// predicate, and a final third time in a way that does satisfy the predicate, causing the
 +	// long-poll to return.
 +	const numUpdatesTotal = 3
 +	const updateAtWhichSatisfied = 2 // 0-indexed, so 3rd update
 +
 +	tv := testvars.New(s.T())
 +	tv = tv.WithRunID(tv.Any().RunID())
 +
 +	activityID := tv.ActivityID()
 +	ref := chasm.NewComponentRef[*testComponent](
 +		chasm.EntityKey{
 +			NamespaceID: string(tests.NamespaceID),
 +			BusinessID:  tv.WorkflowID(),
 +			EntityID:    tv.RunID(),
 +		},
 +	)
 +	s.mockExecutionManager.EXPECT().GetWorkflowExecution(gomock.Any(), gomock.Any()).
 +		Return(&persistence.GetWorkflowExecutionResponse{
 +			State: s.buildPersistenceMutableState(ref.EntityKey, &persistencespb.ActivityInfo{}),
 +		}, nil).
 +		Times(1) // subsequent reads during UpdateComponent and PollComponent are from cache
 +	s.mockExecutionManager.EXPECT().UpdateWorkflowExecution(gomock.Any(), gomock.Any()).
 +		Return(tests.UpdateWorkflowExecutionResponse, nil).
 +		Times(numUpdatesTotal)
 +	s.mockEngine.EXPECT().NotifyChasmExecution(ref.EntityKey, gomock.Any()).DoAndReturn(
 +		func(key chasm.EntityKey, ref []byte) {
 +			s.engine.notifier.Notify(key)
 +		},
 +	).Times(numUpdatesTotal)
 +
 +	pollErr := make(chan error)
 +	pollResult := make(chan []byte)
 +	pollComponent := func() {
 +		newSerializedRef, err := s.engine.PollComponent(
 +			context.Background(),
 +			ref,
 +			func(ctx chasm.Context, component chasm.Component) (bool, error) {
 +				tc, ok := component.(*testComponent)
 +				s.True(ok)
 +				satisfied := tc.ActivityInfo.ActivityId == activityID
 +				return satisfied, nil
 +			},
 +		)
 +		pollErr <- err
 +		pollResult <- newSerializedRef
 +	}
 +	updateComponent := func(satisfyPredicate bool) {
 +		_, err := s.engine.UpdateComponent(
 +			context.Background(),
 +			ref,
 +			func(ctx chasm.MutableContext, component chasm.Component) error {
 +				tc, ok := component.(*testComponent)
 +				s.True(ok)
 +				if satisfyPredicate {
 +					tc.ActivityInfo.ActivityId = activityID
 +				}
 +				return nil
 +			},
 +		)
 +		s.NoError(err)
 +	}
 +	assertEmptyChan := func(ch chan []byte) {
 +		select {
 +		case <-ch:
 +			s.FailNow("expected channel to be empty")
 +		default:
 +		}
 +	}
 +
 +	// Start a PollComponent call. It will not return until the third execution update.
 +	go pollComponent()
 +
 +	// Perform two execution updates that do not satisfy the predicate followed by one that does.
 +	for range 2 {
 +		updateComponent(false)
 +		time.Sleep(100 * time.Millisecond) //nolint:forbidigo
 +		assertEmptyChan(pollResult)
 +	}
 +	updateComponent(true)
 +	// The poll call has returned.
 +	s.NoError(<-pollErr)
 +	newSerializedRef := <-pollResult
 +	s.NotNil(newSerializedRef)
 +
 +	newRef, err := chasm.DeserializeComponentRef(newSerializedRef)
 +	s.NoError(err)
 +	s.Equal(tests.NamespaceID.String(), newRef.NamespaceID)
 +	s.Equal(tv.WorkflowID(), newRef.BusinessID)
 +	s.Equal(tv.RunID(), newRef.EntityID)
 +
 +	newActivityID := make(chan string, 1)
 +	err = s.engine.ReadComponent(
 +		context.Background(),
 +		newRef,
 +		func(
 +			ctx chasm.Context,
 +			component chasm.Component,
 +		) error {
 +			tc, ok := component.(*testComponent)
 +			s.True(ok)
 +			newActivityID <- tc.ActivityInfo.ActivityId
 +			return nil
 +		},
 +	)
 +	s.NoError(err)
 +	s.Equal(activityID, <-newActivityID)
 +}
 +
 +// TestPollComponent_StaleState tests that PollComponent returns a user-friendly Unavailable error
 +// when the submitted component reference is ahead of persisted state (e.g. due to namespace
 +// failover).
 +func (s *chasmEngineSuite) TestPollComponent_StaleState() {
 +	tv := testvars.New(s.T())
 +	tv = tv.WithRunID(tv.Any().RunID())
 +
 +	entityKey := chasm.EntityKey{
 +		NamespaceID: string(tests.NamespaceID),
 +		BusinessID:  tv.WorkflowID(),
 +		EntityID:    tv.RunID(),
 +	}
 +
 +	s.mockExecutionManager.EXPECT().GetWorkflowExecution(gomock.Any(), gomock.Any()).
 +		Return(&persistence.GetWorkflowExecutionResponse{
 +			State: s.buildPersistenceMutableState(entityKey, &persistencespb.ActivityInfo{}),
 +		}, nil).AnyTimes()
 +
 +	pRef := &persistencespb.ChasmComponentRef{
 +		NamespaceId: entityKey.NamespaceID,
 +		BusinessId:  entityKey.BusinessID,
 +		EntityId:    entityKey.EntityID,
 +		Archetype:   "TestLibrary.test_component",
 +		EntityVersionedTransition: &persistencespb.VersionedTransition{
 +			NamespaceFailoverVersion: s.namespaceEntry.FailoverVersion() + 1, // ahead of persisted state
 +			TransitionCount:          testTransitionCount,
 +		},
 +	}
 +	staleToken, err := pRef.Marshal()
 +	s.NoError(err)
 +	staleRef, err := chasm.DeserializeComponentRef(staleToken)
 +	s.NoError(err)
 +
 +	_, err = s.engine.PollComponent(
 +		context.Background(),
 +		staleRef,
 +		func(ctx chasm.Context, component chasm.Component) (bool, error) {
 +			s.Fail("predicate should not be called with stale ref")
 +			return false, nil
 +		},
 +	)
 +	s.Error(err)
 +	var unavailable *serviceerror.Unavailable
 +	s.ErrorAs(err, &unavailable)
 +	s.Equal("please retry", unavailable.Message)
 +}
 +
  func (s *chasmEngineSuite) buildPersistenceMutableState(
- 	key chasm.EntityKey,
+ 	key chasm.ExecutionKey,
  	componentState proto.Message,
  ) *persistencespb.WorkflowMutableState {
+ 
+ 	testComponentTypeID, ok := s.mockShard.ChasmRegistry().ComponentIDFor(&testComponent{})
+ 	s.True(ok)
+ 
  	return &persistencespb.WorkflowMutableState{
  		ExecutionInfo: &persistencespb.WorkflowExecutionInfo{
  			NamespaceId: key.NamespaceID,
  			WorkflowId:  key.BusinessID,
  			VersionHistories: &historyspb.VersionHistories{
  				CurrentVersionHistoryIndex: 0,
  				Histories: []*historyspb.VersionHistory{
  					{},
  				},
  			},
  			TransitionHistory: []*persistencespb.VersionedTransition{
  				{
  					NamespaceFailoverVersion: s.namespaceEntry.FailoverVersion(),
 -					TransitionCount:          10,
 +					TransitionCount:          testTransitionCount,
  				},
  			},
  			ExecutionStats: &persistencespb.ExecutionStats{},
  		},
  		ExecutionState: &persistencespb.WorkflowExecutionState{
- 			RunId:     key.EntityID,
+ 			RunId:     key.RunID,
  			State:     enumsspb.WORKFLOW_EXECUTION_STATE_RUNNING,
  			Status:    enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING,
  			StartTime: timestamppb.New(s.mockShard.GetTimeSource().Now().Add(-1 * time.Minute)),
  		},
  		ChasmNodes: map[string]*persistencespb.ChasmNode{
  			"": {
  				Metadata: &persistencespb.ChasmNodeMetadata{
  					InitialVersionedTransition: &persistencespb.VersionedTransition{
  						NamespaceFailoverVersion: s.namespaceEntry.FailoverVersion(),
  						TransitionCount:          1,
  					},
  					LastUpdateVersionedTransition: &persistencespb.VersionedTransition{
  						NamespaceFailoverVersion: s.namespaceEntry.FailoverVersion(),
 -						TransitionCount:          10,
 +						TransitionCount:          testTransitionCount,
  					},
  					Attributes: &persistencespb.ChasmNodeMetadata_ComponentAttributes{
  						ComponentAttributes: &persistencespb.ChasmComponentAttributes{
- 							Type: "TestLibrary.test_component",
+ 							TypeId: testComponentTypeID,
  						},
  					},
  				},
  				Data: s.serializeComponentState(componentState),
  			},
  		},
  	}
  }
  
  func (s *chasmEngineSuite) serializeComponentState(
  	state proto.Message,
  ) *commonpb.DataBlob {
  	blob, err := serialization.ProtoEncode(state)
  	s.NoError(err)
  	return blob
  }
  
  const (
  	testComponentPausedSAName   = "PausedSA"
  	testComponentPausedMemoName = "PausedMemo"
 +	testTransitionCount         = 10
  )
  
  var (
+ 	testComponentPausedSearchAttribute = chasm.NewSearchAttributeBool(testComponentPausedSAName, chasm.SearchAttributeFieldBool01)
+ 
  	_ chasm.VisibilitySearchAttributesProvider = (*testComponent)(nil)
  	_ chasm.VisibilityMemoProvider             = (*testComponent)(nil)
  )
  
  type testComponent struct {
  	chasm.UnimplementedComponent
  
  	ActivityInfo *persistencespb.ActivityInfo
  }
  
  func (l *testComponent) LifecycleState(_ chasm.Context) chasm.LifecycleState {
  	return chasm.LifecycleStateRunning
  }
  
- func (l *testComponent) SearchAttributes(_ chasm.Context) map[string]chasm.VisibilityValue {
- 	return map[string]chasm.VisibilityValue{
- 		testComponentPausedSAName: chasm.VisibilityValueBool(l.ActivityInfo.Paused),
+ func (l *testComponent) SearchAttributes(_ chasm.Context) []chasm.SearchAttributeKeyValue {
+ 	return []chasm.SearchAttributeKeyValue{
+ 		testComponentPausedSearchAttribute.Value(l.ActivityInfo.Paused),
  	}
  }
  
- func (l *testComponent) Memo(_ chasm.Context) map[string]chasm.VisibilityValue {
- 	return map[string]chasm.VisibilityValue{
- 		testComponentPausedMemoName: chasm.VisibilityValueBool(l.ActivityInfo.Paused),
+ func (l *testComponent) Memo(_ chasm.Context) proto.Message {
+ 	return &persistencespb.WorkflowExecutionState{
+ 		RunId: l.ActivityInfo.ActivityId,
  	}
  }
  
  func newTestComponentStateBlob(info *persistencespb.ActivityInfo) *commonpb.DataBlob {
  	data, _ := info.Marshal()
  	return &commonpb.DataBlob{
  		EncodingType: enumspb.ENCODING_TYPE_PROTO3,
  		Data:         data,
  	}
  }
  
  type testChasmLibrary struct {
  	chasm.UnimplementedLibrary
  }
  
diff --cc service/history/fx.go
index 7d64ebedf,205b6c7c4..6252e9390
--- a/service/history/fx.go
+++ b/service/history/fx.go
@@@ -1,23 -1,20 +1,21 @@@
  package history
  
  import (
  	"go.temporal.io/server/api/historyservice/v1"
  	"go.temporal.io/server/chasm"
 +	"go.temporal.io/server/chasm/lib/activity"
- 	chasmscheduler "go.temporal.io/server/chasm/lib/scheduler"
- 	chasmworkflow "go.temporal.io/server/chasm/lib/workflow"
  	"go.temporal.io/server/common"
  	"go.temporal.io/server/common/clock"
  	"go.temporal.io/server/common/config"
  	"go.temporal.io/server/common/dynamicconfig"
  	"go.temporal.io/server/common/log"
  	"go.temporal.io/server/common/log/tag"
  	"go.temporal.io/server/common/membership"
  	"go.temporal.io/server/common/metrics"
  	"go.temporal.io/server/common/namespace"
  	persistenceClient "go.temporal.io/server/common/persistence/client"
  	"go.temporal.io/server/common/persistence/visibility"
  	"go.temporal.io/server/common/persistence/visibility/manager"
  	"go.temporal.io/server/common/persistence/visibility/store/elasticsearch"
  	"go.temporal.io/server/common/primitives"
  	"go.temporal.io/server/common/quotas/calculator"
@@@ -70,33 -68,30 +69,31 @@@ var Module = fx.Options
  	fx.Provide(ThrottledLoggerRpsFnProvider),
  	fx.Provide(PersistenceRateLimitingParamsProvider),
  	service.PersistenceLazyLoadedServiceResolverModule,
  	fx.Provide(ServiceResolverProvider),
  	fx.Provide(EventNotifierProvider),
  	fx.Provide(HistoryEngineFactoryProvider),
  	fx.Provide(HandlerProvider),
  	fx.Provide(ServerProvider),
  	fx.Provide(NewService),
  	fx.Provide(ReplicationProgressCacheProvider),
  	fx.Invoke(ServiceLifetimeHooks),
  
  	callbacks.Module,
  	nexusoperations.Module,
  	fx.Invoke(nexusworkflow.RegisterCommandHandlers),
- 	chasmscheduler.Module,
- 	chasmworkflow.Module,
 +	activity.HistoryModule,
  )
  
  func ServerProvider(grpcServerOptions []grpc.ServerOption) *grpc.Server {
  	return grpc.NewServer(grpcServerOptions...)
  }
  
  func ServiceResolverProvider(
  	membershipMonitor membership.Monitor,
  ) (membership.ServiceResolver, error) {
  	return membershipMonitor.GetResolver(primitives.HistoryService)
  }
  
  func HandlerProvider(args NewHandlerArgs) *Handler {
  	handler := &Handler{
  		status:                       common.DaemonStatusInitialized,
diff --cc service/history/interfaces/engine.go
index 1a120757e,417c0e368..cf91b5507
--- a/service/history/interfaces/engine.go
+++ b/service/history/interfaces/engine.go
@@@ -84,34 -84,35 +84,36 @@@ type 
  		GenerateLastHistoryReplicationTasks(ctx context.Context, request *historyservice.GenerateLastHistoryReplicationTasksRequest) (*historyservice.GenerateLastHistoryReplicationTasksResponse, error)
  		GetReplicationStatus(ctx context.Context, request *historyservice.GetReplicationStatusRequest) (*historyservice.ShardReplicationStatus, error)
  		UpdateWorkflowExecution(ctx context.Context, request *historyservice.UpdateWorkflowExecutionRequest) (*historyservice.UpdateWorkflowExecutionResponse, error)
  		PollWorkflowExecutionUpdate(ctx context.Context, request *historyservice.PollWorkflowExecutionUpdateRequest) (*historyservice.PollWorkflowExecutionUpdateResponse, error)
  		GetWorkflowExecutionHistory(ctx context.Context, request *historyservice.GetWorkflowExecutionHistoryRequest) (*historyservice.GetWorkflowExecutionHistoryResponseWithRaw, error)
  		GetWorkflowExecutionHistoryReverse(ctx context.Context, request *historyservice.GetWorkflowExecutionHistoryReverseRequest) (*historyservice.GetWorkflowExecutionHistoryReverseResponse, error)
  		GetWorkflowExecutionRawHistory(ctx context.Context, request *historyservice.GetWorkflowExecutionRawHistoryRequest) (*historyservice.GetWorkflowExecutionRawHistoryResponse, error)
  		GetWorkflowExecutionRawHistoryV2(ctx context.Context, request *historyservice.GetWorkflowExecutionRawHistoryV2Request) (*historyservice.GetWorkflowExecutionRawHistoryV2Response, error)
  		AddTasks(ctx context.Context, request *historyservice.AddTasksRequest) (*historyservice.AddTasksResponse, error)
  		ListTasks(ctx context.Context, request *historyservice.ListTasksRequest) (*historyservice.ListTasksResponse, error)
  		SyncWorkflowState(ctx context.Context, request *historyservice.SyncWorkflowStateRequest) (*historyservice.SyncWorkflowStateResponse, error)
  		UpdateActivityOptions(ctx context.Context, request *historyservice.UpdateActivityOptionsRequest) (*historyservice.UpdateActivityOptionsResponse, error)
  		PauseActivity(ctx context.Context, request *historyservice.PauseActivityRequest) (*historyservice.PauseActivityResponse, error)
  		UnpauseActivity(ctx context.Context, request *historyservice.UnpauseActivityRequest) (*historyservice.UnpauseActivityResponse, error)
  		ResetActivity(ctx context.Context, request *historyservice.ResetActivityRequest) (*historyservice.ResetActivityResponse, error)
+ 		PauseWorkflowExecution(ctx context.Context, request *historyservice.PauseWorkflowExecutionRequest) (*historyservice.PauseWorkflowExecutionResponse, error)
+ 		UnpauseWorkflowExecution(ctx context.Context, request *historyservice.UnpauseWorkflowExecutionRequest) (*historyservice.UnpauseWorkflowExecutionResponse, error)
  
  		NotifyNewHistoryEvent(event *events.Notification)
  		NotifyNewTasks(tasks map[tasks.Category][]tasks.Task)
 +		NotifyChasmExecution(executionKey chasm.EntityKey, componentRef []byte)
  		// TODO(bergundy): This Environment should be host level once shard level workflow cache is deprecated.
  		StateMachineEnvironment(operationTag metrics.Tag) hsm.Environment
  
  		ReplicationStream
  		Start()
  		Stop()
  	}
  )
  
  type (
  	SyncHSMRequest struct {
  		definition.WorkflowKey
  
  		StateMachineNode    *persistencespb.StateMachineNode
  		EventVersionHistory *historyspb.VersionHistory
diff --cc service/history/workflow/activity.go
index baa0f519a,fa6002682..de13209e2
--- a/service/history/workflow/activity.go
+++ b/service/history/workflow/activity.go
@@@ -48,37 -47,48 +48,39 @@@ import 
  	"google.golang.org/protobuf/types/known/durationpb"
  	"google.golang.org/protobuf/types/known/emptypb"
  	"google.golang.org/protobuf/types/known/timestamppb"
  )
  
  func GetActivityState(ai *persistencespb.ActivityInfo) enumspb.PendingActivityState {
  	if ai.CancelRequested {
  		return enumspb.PENDING_ACTIVITY_STATE_CANCEL_REQUESTED
  	}
  	if ai.StartedEventId != common.EmptyEventID {
  		return enumspb.PENDING_ACTIVITY_STATE_STARTED
  	}
  	return enumspb.PENDING_ACTIVITY_STATE_SCHEDULED
  }
  
 -func makeBackoffAlgorithm(requestedDelay *time.Duration) BackoffCalculatorAlgorithmFunc {
 -	return func(duration *durationpb.Duration, coefficient float64, currentAttempt int32) time.Duration {
 -		if requestedDelay != nil {
 -			return *requestedDelay
 -		}
 -		return ExponentialBackoffAlgorithm(duration, coefficient, currentAttempt)
 -	}
 -}
 -
  func UpdateActivityInfoForRetries(
  	ai *persistencespb.ActivityInfo,
  	version int64,
  	attempt int32,
  	failure *failurepb.Failure,
  	nextScheduledTime *timestamppb.Timestamp,
+ 	isActivityRetryStampIncrementEnabled bool,
  ) *persistencespb.ActivityInfo {
+ 	previousAttempt := ai.Attempt
  	ai.Attempt = attempt
  	ai.Version = version
  	ai.ScheduledTime = nextScheduledTime
  	ai.StartedEventId = common.EmptyEventID
  	ai.StartVersion = common.EmptyVersion
  	ai.RequestId = ""
  	ai.StartedTime = nil
  	ai.TimerTaskStatus = TimerTaskStatusNone
  	ai.RetryLastWorkerIdentity = ai.StartedIdentity
  	ai.RetryLastFailure = failure
  	// this flag means the user resets the activity with "--reset-heartbeat" flag
  	// server sends heartbeat details to the worker with the new activity attempt
  	// if the current attempt was still running - worker can still send new heartbeats, and even complete the activity
  	// so for the current activity attempt server continue to accept the heartbeats, but reset it for the new attempt
  	if ai.ResetHeartbeats {
diff --cc service/history/workflow/mutable_state_impl_restart_activity_test.go
index caafea0ad,dcb59720d..2f96d2aa4
--- a/service/history/workflow/mutable_state_impl_restart_activity_test.go
+++ b/service/history/workflow/mutable_state_impl_restart_activity_test.go
@@@ -1,34 -1,33 +1,34 @@@
  package workflow
  
  import (
  	"errors"
  	"fmt"
  	"testing"
  	"time"
  
- 	"github.com/pborman/uuid"
+ 	"github.com/google/uuid"
  	"github.com/stretchr/testify/suite"
  	"github.com/uber-go/tally/v4"
  	commandpb "go.temporal.io/api/command/v1"
  	commonpb "go.temporal.io/api/common/v1"
  	enumspb "go.temporal.io/api/enums/v1"
  	failurepb "go.temporal.io/api/failure/v1"
  	taskqueuepb "go.temporal.io/api/taskqueue/v1"
  	enumsspb "go.temporal.io/server/api/enums/v1"
  	persistencespb "go.temporal.io/server/api/persistence/v1"
 +	"go.temporal.io/server/common/backoff"
  	commonclock "go.temporal.io/server/common/clock"
  	"go.temporal.io/server/common/log"
  	"go.temporal.io/server/common/primitives/timestamp"
  	"go.temporal.io/server/service/history/configs"
  	"go.temporal.io/server/service/history/events"
  	"go.temporal.io/server/service/history/hsm"
  	"go.temporal.io/server/service/history/shard"
  	"go.temporal.io/server/service/history/tests"
  	"go.uber.org/mock/gomock"
  	"google.golang.org/protobuf/types/known/durationpb"
  	"google.golang.org/protobuf/types/known/timestamppb"
  )
  
  type (
  	snapshot struct {
diff --cc service/history/workflow/retry.go
index 3363b2928,aa3fa9527..66f8348f9
--- a/service/history/workflow/retry.go
+++ b/service/history/workflow/retry.go
@@@ -1,23 -1,24 +1,23 @@@
  package workflow
  
  import (
  	"context"
 -	"math"
  	"slices"
  	"time"
  
- 	"github.com/pborman/uuid"
+ 	"github.com/google/uuid"
  	commonpb "go.temporal.io/api/common/v1"
  	deploymentpb "go.temporal.io/api/deployment/v1"
  	enumspb "go.temporal.io/api/enums/v1"
  	failurepb "go.temporal.io/api/failure/v1"
  	historypb "go.temporal.io/api/history/v1"
  	"go.temporal.io/api/serviceerror"
  	taskqueuepb "go.temporal.io/api/taskqueue/v1"
  	workflowpb "go.temporal.io/api/workflow/v1"
  	"go.temporal.io/api/workflowservice/v1"
  	clockspb "go.temporal.io/server/api/clock/v1"
  	"go.temporal.io/server/api/historyservice/v1"
  	workflowspb "go.temporal.io/server/api/workflow/v1"
  	"go.temporal.io/server/common/backoff"
  	"go.temporal.io/server/common/primitives/timestamp"
  	"go.temporal.io/server/common/retrypolicy"
diff --cc service/history/workflow/transaction_impl.go
index 017868cee,6301e4a59..998e00811
--- a/service/history/workflow/transaction_impl.go
+++ b/service/history/workflow/transaction_impl.go
@@@ -169,61 -174,41 +174,62 @@@ func (t *TransactionImpl) UpdateWorkflo
  ) (int64, int64, error) {
  
  	engine, err := t.shard.GetEngine(ctx)
  	if err != nil {
  		return 0, 0, err
  	}
  	resp, err := updateWorkflowExecution(
  		ctx,
  		t.shard,
  		currentWorkflowFailoverVersion,
  		newWorkflowFailoverVersion,
  		&persistence.UpdateWorkflowExecutionRequest{
  			ShardID: t.shard.GetShardID(),
  			// RangeID , this is set by shard context
  			Mode:                   updateMode,
+ 			ArchetypeID:            archetypeID,
  			UpdateWorkflowMutation: *currentWorkflowMutation,
  			UpdateWorkflowEvents:   currentWorkflowEventsSeq,
  			NewWorkflowSnapshot:    newWorkflowSnapshot,
  			NewWorkflowEvents:      newWorkflowEventsSeq,
  		},
  		isWorkflow,
  	)
  	if persistence.OperationPossiblySucceeded(err) {
  		NotifyWorkflowMutationTasks(engine, currentWorkflowMutation)
  		NotifyWorkflowSnapshotTasks(engine, newWorkflowSnapshot)
 +
 +		// TODO(dan): there is no test coverage for on-delete or on-create CHASM notifications.
 +
 +		// Notify for current workflow if it has CHASM updates
 +		if len(currentWorkflowMutation.UpsertChasmNodes) > 0 ||
 +			len(currentWorkflowMutation.DeleteChasmNodes) > 0 {
 +			engine.NotifyChasmExecution(chasm.EntityKey{
 +				NamespaceID: currentWorkflowMutation.ExecutionInfo.NamespaceId,
 +				BusinessID:  currentWorkflowMutation.ExecutionInfo.WorkflowId,
 +				EntityID:    currentWorkflowMutation.ExecutionState.RunId,
 +			}, nil)
 +		}
 +
 +		// Notify for new workflow if it has CHASM nodes
 +		if newWorkflowSnapshot != nil && len(newWorkflowSnapshot.ChasmNodes) > 0 {
 +			engine.NotifyChasmExecution(chasm.EntityKey{
 +				NamespaceID: newWorkflowSnapshot.ExecutionInfo.NamespaceId,
 +				BusinessID:  newWorkflowSnapshot.ExecutionInfo.WorkflowId,
 +				EntityID:    newWorkflowSnapshot.ExecutionState.RunId,
 +			}, nil)
 +		}
  	}
  	if err != nil {
  		return 0, 0, err
  	}
  
  	if err := NotifyNewHistoryMutationEvent(engine, currentWorkflowMutation); err != nil {
  		t.logger.Error("unable to notify workflow mutation", tag.Error(err))
  	}
  	if err := NotifyNewHistorySnapshotEvent(engine, newWorkflowSnapshot); err != nil {
  		t.logger.Error("unable to notify workflow creation", tag.Error(err))
  	}
  	updateHistorySizeDiff := int64(resp.UpdateMutableStateStats.HistoryStatistics.SizeDiff)
  	newHistorySizeDiff := int64(0)
  	if resp.NewMutableStateStats != nil {
  		newHistorySizeDiff = int64(resp.NewMutableStateStats.HistoryStatistics.SizeDiff)
diff --cc service/matching/forwarder.go
index c72a5c572,c74edb363..38cff771e
--- a/service/matching/forwarder.go
+++ b/service/matching/forwarder.go
@@@ -135,34 -135,33 +135,34 @@@ func (fwdr *Forwarder) ForwardTask(ctx 
  			},
  		)
  	case enumspb.TASK_QUEUE_TYPE_ACTIVITY:
  		_, err = fwdr.client.AddActivityTask(
  			ctx, &matchingservice.AddActivityTaskRequest{
  				NamespaceId: task.event.Data.GetNamespaceId(),
  				Execution:   task.workflowExecution(),
  				TaskQueue: &taskqueuepb.TaskQueue{
  					Name: target.RpcName(),
  					Kind: fwdr.partition.Kind(),
  				},
  				ScheduledEventId:       task.event.Data.GetScheduledEventId(),
  				Clock:                  task.event.Data.GetClock(),
  				ScheduleToStartTimeout: expirationDuration,
  				ForwardInfo:            fwdr.getForwardInfo(task),
- 				Stamp:                  task.event.Data.GetStamp(),
  				VersionDirective:       task.event.Data.GetVersionDirective(),
+ 				Stamp:                  task.event.Data.GetStamp(),
  				Priority:               task.event.Data.GetPriority(),
 +				ComponentRef:           task.event.Data.GetComponentRef(),
  			},
  		)
  	default:
  		return errInvalidTaskQueueType
  	}
  
  	return fwdr.handleErr(err)
  }
  
  func (fwdr *Forwarder) getForwardInfo(task *internalTask) *taskqueuespb.TaskForwardInfo {
  	if task.isForwarded() {
  		// task is already forwarded from a child partition, only overwrite SourcePartition
  		clone := common.CloneProto(task.forwardInfo)
  		clone.SourcePartition = fwdr.partition.RpcName()
  		return clone
diff --cc service/matching/matching_engine.go
index 673448385,333da88f6..63b304057
--- a/service/matching/matching_engine.go
+++ b/service/matching/matching_engine.go
@@@ -2910,38 -3035,38 +3036,39 @@@ func (e *matchingEngineImpl) recordActi
  			CallerType: headers.CallerTypeAPI,
  		})
  		if err != nil {
  			return nil, err
  		}
  	}
  
  	ctx, cancel := newRecordTaskStartedContext(ctx, task)
  	defer cancel()
  
  	recordStartedRequest := &historyservice.RecordActivityTaskStartedRequest{
  		NamespaceId:         task.event.Data.GetNamespaceId(),
  		WorkflowExecution:   task.workflowExecution(),
  		ScheduledEventId:    task.event.Data.GetScheduledEventId(),
  		Clock:               task.event.Data.GetClock(),
- 		RequestId:           uuid.New(),
+ 		RequestId:           uuid.NewString(),
  		PollRequest:         pollReq,
  		BuildIdRedirectInfo: task.redirectInfo,
  		Stamp:               task.event.Data.GetStamp(),
  		// TODO: stop sending ScheduledDeployment. [cleanup-old-wv]
- 		ScheduledDeployment: worker_versioning.DirectiveDeployment(task.event.Data.VersionDirective),
- 		VersionDirective:    task.event.Data.VersionDirective,
- 		ComponentRef:        task.event.Data.GetComponentRef(),
+ 		ScheduledDeployment:        worker_versioning.DirectiveDeployment(task.event.Data.VersionDirective),
+ 		VersionDirective:           task.event.Data.VersionDirective,
+ 		TaskDispatchRevisionNumber: task.taskDispatchRevisionNumber,
++		ComponentRef:               task.event.Data.GetComponentRef(),
  	}
  
  	return e.historyClient.RecordActivityTaskStarted(ctx, recordStartedRequest)
  }
  
  // newRecordTaskStartedContext creates a context for recording
  // activity or workflow task started. The parentCtx from
  // pollActivity/WorkflowTaskQueue endpoint (which is a long poll
  // API) has long timeout and unsuitable for recording task started,
  // especially if the task is doing sync match and has caller
  // (history transfer queue) waiting for response.
  func newRecordTaskStartedContext(
  	parentCtx context.Context,
  	task *internalTask,
  ) (context.Context, context.CancelFunc) {
diff --cc service/matching/pri_forwarder.go
index 1efc94e4c,186a1c005..9fab8088a
--- a/service/matching/pri_forwarder.go
+++ b/service/matching/pri_forwarder.go
@@@ -92,34 -93,33 +93,34 @@@ func (f *priForwarder) ForwardTask(ctx 
  			},
  		)
  	case enumspb.TASK_QUEUE_TYPE_ACTIVITY:
  		_, err = f.client.AddActivityTask(
  			ctx, &matchingservice.AddActivityTaskRequest{
  				NamespaceId: task.event.Data.GetNamespaceId(),
  				Execution:   task.workflowExecution(),
  				TaskQueue: &taskqueuepb.TaskQueue{
  					Name: target.RpcName(),
  					Kind: f.partition.Kind(),
  				},
  				ScheduledEventId:       task.event.Data.GetScheduledEventId(),
  				Clock:                  task.event.Data.GetClock(),
  				ScheduleToStartTimeout: expirationDuration,
  				ForwardInfo:            f.getForwardInfo(task),
- 				Stamp:                  task.event.Data.GetStamp(),
  				VersionDirective:       task.event.Data.GetVersionDirective(),
+ 				Stamp:                  task.event.Data.GetStamp(),
  				Priority:               task.event.Data.GetPriority(),
 +				ComponentRef:           task.event.Data.GetComponentRef(),
  			},
  		)
  	default:
  		return errInvalidTaskQueueType
  	}
  
  	return err
  }
  
  func (f *priForwarder) getForwardInfo(task *internalTask) *taskqueuespb.TaskForwardInfo {
  	if task.isForwarded() {
  		// task is already forwarded from a child partition, only overwrite SourcePartition
  		clone := common.CloneProto(task.forwardInfo)
  		clone.SourcePartition = f.partition.RpcName()
  		return clone

Note

Migrates CHASM to ExecutionKey and adds component_ref support for standalone activities, wiring new activity-execution APIs through frontend/history/matching and clients, plus related proto, notifier, and engine updates.

  • CHASM/Core:
    • Rename EntityKey → ExecutionKey; update engine interfaces (NotifyExecution, polling), transition-history comparisons, and notifier/subscription to use execution keys (a brief sketch follows this list).
    • Add structured component refs; ref/serialization validation; tree/ref paths and VT fields renamed (execution*).
  • Activity:
    • Use NewExecution and ExecutionKey throughout; introduce StoreOrSelf; simplify getters (TryGet), heartbeat/outcome handling.
    • Frontend module registers component-only library; service routing switches to business_id.
  • Frontend/History/Matching:
    • Add/wire standalone-activity endpoints (start/poll/terminate/cancel/reset/pause/unpause/update/list/count/delete); frontend stubs and logging tags; quotas updated.
    • Matching forwards component_ref; History adds execution notifier and polling logic with execution keys.
    • Frontend history handlers set component_ref for standalone activity tasks; implement workflow pause/unpause; add scheduler client wiring.
  • Protobuf/API:
    • RecordActivityTaskStartedRequest adds task_dispatch_revision_number and component_ref (field 14); generated code updated.
    • Bump go.temporal.io/api dependency.
  • Clients/Tests:
    • Client, metric, and retryable wrappers generated for new activity-execution APIs.
    • Tests and mocks updated for ExecutionKey, new fields, and assertions.
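
A minimal sketch of the renamed key, for orientation. The import path and the helper function below are assumptions for illustration; only the field names and the EntityKey → ExecutionKey, EntityID → RunID renames come from the changes in this PR:

```go
package example

import "go.temporal.io/server/chasm" // import path assumed

// executionKeyFor builds the key that replaces the old EntityKey.
func executionKeyFor(namespaceID, businessID, runID string) chasm.ExecutionKey {
	return chasm.ExecutionKey{
		NamespaceID: namespaceID, // unchanged
		BusinessID:  businessID,  // the workflow/business ID
		RunID:       runID,       // previously EntityID
	}
}
```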

Written by Cursor Bugbot for commit 92907df.

@dandavison dandavison requested review from a team as code owners December 6, 2025 22:01
return []*chasm.RegistrableComponent{
chasm.NewRegistrableComponent[*Activity]("activity"),
}
}
Contributor Author

@bergundy I am not sure that what I've done here is the right fix. I need to read the code a bit more to understand fx and ChasmLibraryOptions better but I haven't had time yet. The problem being addressed is that after merging main, the FooByID tests fail with:

unknown chasm component type: *activity.Activity

It's related to the changes in #8704.

e.g. TestStandaloneActivityTestSuite/TestActivityCancelledByID fails without this.

Member

Commented above.

) (*workflowservice.UpdateActivityExecutionOptionsResponse, error) {
return nil, serviceerror.NewUnimplemented("temporary stub during Standalone Activity prototyping")
}

Contributor Author

This is at least partly Git's fault, I believe; it gets confused about shared lines around merge conflict markers. I guess it would technically be possible to squash this into the merge commit, but the code prior to this PR (after the merge commit) didn't compile anyway, so maybe let's just leave it.

info := activityResp.GetInfo()
require.Equal(t, enumspb.ACTIVITY_EXECUTION_STATUS_CANCELED, info.GetStatus())
require.Equal(t, enumspb.ACTIVITY_EXECUTION_STATUS_CANCELED, info.GetStatus(),
"expected Canceled but is %s", info.GetStatus())
Contributor Author

Maybe there's a helper to make helpful error messages in this situation, or if not maybe we can create one. But for now these error messages are much more helpful than the integer codes in test output.
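
As an illustration, a hypothetical helper could centralize the readable message; the name requireActivityStatus and the ActivityExecutionStatus type name are assumptions, not something added in this PR:

```go
package activity_test

import (
	"testing"

	"github.com/stretchr/testify/require"
	enumspb "go.temporal.io/api/enums/v1"
)

// requireActivityStatus asserts on the status enum and reports enum names
// (via the proto Stringer) instead of integer codes in the failure message.
func requireActivityStatus(t *testing.T, want, got enumspb.ActivityExecutionStatus) {
	t.Helper()
	require.Equal(t, want, got, "expected activity status %s but got %s", want, got)
}
```

A call site would then read requireActivityStatus(t, enumspb.ACTIVITY_EXECUTION_STATUS_CANCELED, info.GetStatus()).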

StartToCloseTimeout: durationpb.New(1 * time.Second),
RetryPolicy: &commonpb.RetryPolicy{
MaximumAttempts: 1,
},
Contributor Author

#8723 fixed things so that we do schedule retries when schedule-to-close is missing. But this test was expecting the activity to fail on a start-to-close timeout, so it needed to be changed to limit the activity to one attempt. This was actually broken on pre-merge standalone-activity.

Member

Put a comment in the code?

Contributor Author

Done
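
Presumably the added comment reads roughly like this sketch (wording assumed, mirroring the excerpt above rather than quoting the PR):

```go
StartToCloseTimeout: durationpb.New(1 * time.Second),
RetryPolicy: &commonpb.RetryPolicy{
	// Limit to a single attempt: since #8723, retries are scheduled even when
	// schedule-to-close is unset, and this test expects a start-to-close timeout failure.
	MaximumAttempts: 1,
},
```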

}
func (a *Activity) PopulateRecordStartedResponse(ctx chasm.Context, key chasm.ExecutionKey, response *historyservice.RecordActivityTaskStartedResponse) error {
attempt := a.LastAttempt.Get(ctx)
lastHeartbeat, _ := a.LastHeartbeat.TryGet(ctx)
Contributor

Curious why you switched to TryGet here?

Contributor Author

LastHeartbeat may be missing, and Get has been changed to panic when the value is missing (#8669).
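
A minimal sketch of the pattern, assuming TryGet returns the value plus an ok boolean (as the later comment about the ok boolean suggests):

```go
// LastHeartbeat is optional, so TryGet is used; Get would now panic on a missing field.
lastHeartbeat, ok := a.LastHeartbeat.TryGet(ctx)
if !ok {
	// Not recorded yet: lastHeartbeat is the zero value (nil), which the
	// downstream proto getters tolerate.
}
```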

@fretz12
Contributor

fretz12 commented Dec 8, 2025

Seems like the misc. checks is failing with Error: cmd/tools/getproto/files.go:53:60: undefined: enums.File_temporal_api_enums_v1_activity_proto
Do we know what's causing this @dandavison @bergundy?

@dandavison
Contributor Author

Seems like the misc. checks is failing with Error: cmd/tools/getproto/files.go:53:60: undefined: enums.File_temporal_api_enums_v1_activity_proto
Do we know what's causing this @dandavison @bergundy?

That GitHub Actions job compares the PR branch with the branch the PR is merging into, to check whether the PR makes invalid proto changes. But in this case, the branch we're merging into doesn't compile at all (and is missing that file), so no such check can be made. We should just ignore that CI check for this PR.

Member

@bergundy bergundy left a comment

Approved with comments, overall LGTM.

}

if heartbeat == nil {
func (a *Activity) getLastHeartbeat(ctx chasm.MutableContext) *activitypb.ActivityHeartbeatState {
Member

Suggested change
- func (a *Activity) getLastHeartbeat(ctx chasm.MutableContext) *activitypb.ActivityHeartbeatState {
+ func (a *Activity) LastHeartbeatOrSetDefault(ctx chasm.MutableContext) *activitypb.ActivityHeartbeatState {

Contributor Author

Agreed. I'm already changing the name in the heartbeat PR, so I'll leave it as-is here.


requestData := a.RequestData.Get(ctx)
attempt := a.LastAttempt.Get(ctx)
heartbeat, _ := a.LastHeartbeat.TryGet(ctx)
Member

nit: It's more idiomatic to use the ok boolean instead of doing a nil check but both are valid because the zero value of a pointer is nil.

Contributor Author

I don't quite understand this comment -- there's no nil check done in this function; we defer to the proto getters for that.

Member

Disregard, what you have is fine.


// GetStore returns the store for the activity. If the store is not set as a field (e.g. standalone
// activities), it returns the activity itself.
func (a *Activity) GetStore(ctx chasm.MutableContext) ActivityStore {
Member

It should be accessible with an immutable context, and please avoid using Get for getters in Go.

Suggested change
- func (a *Activity) GetStore(ctx chasm.MutableContext) ActivityStore {
+ func (a *Activity) StoreOrSelf(ctx chasm.Context) ActivityStore {

Contributor Author

Good call, done. I like that name -- not hiding implementation seems helpful in this case. (I'll remember not to use Get now...)
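
A minimal sketch of the renamed accessor, assuming the store lives in an optional chasm field and that Activity itself satisfies ActivityStore (as the doc comment above says); the Store field name and the TryGet call are assumptions:

```go
// StoreOrSelf returns the configured store, or the activity itself for
// standalone activities where no store field is set.
func (a *Activity) StoreOrSelf(ctx chasm.Context) ActivityStore {
	if store, ok := a.Store.TryGet(ctx); ok { // field name assumed
		return store
	}
	return a // standalone activities act as their own store
}
```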


valid := activity.Status == activitypb.ACTIVITY_EXECUTION_STATUS_STARTED && task.Attempt == attempt.Count
return valid, nil
return (activity.Status == activitypb.ACTIVITY_EXECUTION_STATUS_STARTED &&
Member

nit: I think the valid boolean helped readability here.

Contributor Author

Yes, I don't disagree. Reinstated the named valid variable.
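
That is, roughly back to the form in the excerpt above:

```go
valid := activity.Status == activitypb.ACTIVITY_EXECUTION_STATUS_STARTED &&
	task.Attempt == attempt.Count
return valid, nil
```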


// componentOnlyLibrary only registers the Activity component. Used by frontend which needs to
// serialize ComponentRefs but doesn't need task executors.
type componentOnlyLibrary struct {
Member

This can be the same library with two separate constructors. We still need to iron out the library registration experience for CHASM on the frontend, but I'm thinking we would not redefine the libraries.

I would define this library in library.go and embed this in the full library struct.

Contributor Author

Thanks, done
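
A sketch of the embedding approach described above; the exact chasm.Library method set and the Name method are assumptions, while the Components body mirrors the snippet quoted earlier in the conversation:

```go
// componentOnlyLibrary registers only the Activity component. The frontend uses
// it to serialize ComponentRefs without pulling in task executors.
type componentOnlyLibrary struct{}

func (componentOnlyLibrary) Name() string { return "activity" } // method assumed

func (componentOnlyLibrary) Components() []*chasm.RegistrableComponent {
	return []*chasm.RegistrableComponent{
		chasm.NewRegistrableComponent[*Activity]("activity"),
	}
}

// The full library embeds the component-only one and layers task registrations on top.
type library struct {
	componentOnlyLibrary
	// Tasks(), executors, etc. registered here.
}
```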

return []*chasm.RegistrableComponent{
chasm.NewRegistrableComponent[*Activity]("activity"),
}
}
Member

Commented above.

}

testComponentTypeID, ok := s.mockShard.ChasmRegistry().ComponentIDFor(&testComponent{})
s.Require().True(ok)
Member

Suggested change
- s.Require().True(ok)
+ s.True(ok)

Contributor Author

Done

StartToCloseTimeout: durationpb.New(1 * time.Second),
RetryPolicy: &commonpb.RetryPolicy{
MaximumAttempts: 1,
},
Member

Put a comment in the code?

@dandavison dandavison merged commit b778f07 into standalone-activity-with-main-merged Dec 8, 2025
56 of 57 checks passed
@dandavison dandavison deleted the standalone-activity-post-main-merged branch December 8, 2025 20:58