Draft
Changes from 5 commits (24 commits total)
be61432
add code for auto enabling priority and fairness
majiru Nov 14, 2025
87ae80e
use reasonable timeout for newTaskQueuePartitionManager in tests instead
majiru Nov 14, 2025
36864fa
fix linting issues
majiru Nov 15, 2025
b96901b
Refactor OnChange to be set after initialization through a seperate
majiru Nov 17, 2025
9a41e9a
Push taskQueuePartitionManager initialization into the background
majiru Nov 17, 2025
994cedb
Address comments made in review and simplify implementation
majiru Nov 18, 2025
dd19b25
Fix physicalTaskQueueManager
majiru Nov 18, 2025
0d9eb51
More lint fixes
majiru Nov 18, 2025
7a81262
Guard the defaultQueue behind the future
majiru Nov 19, 2025
f525e0f
We need to wait on ready before checking the cancellation of our
majiru Nov 19, 2025
1bdd110
make linter happy w.r.t deprecated functions
majiru Nov 19, 2025
7b38daf
Rework how we signal and handle waiting on initialization
majiru Nov 21, 2025
c18ce5f
Small linting issue
majiru Nov 21, 2025
b285444
Use an autoenable rpc and address other comments
majiru Nov 26, 2025
1ecb597
Be careful about updating UserData w.r.t. nill members in the path
majiru Dec 2, 2025
03054e9
address comments regarding matching engine handler
majiru Dec 2, 2025
9130ffa
Move matching engine handler to organize a bit better
majiru Dec 2, 2025
26f3636
Address small comments in partition manager
majiru Dec 2, 2025
d28dee6
add GetIfReady() to our future and use it in partition manager
majiru Dec 2, 2025
247c307
address rpc comments
majiru Dec 3, 2025
3a8ff9f
fix generated files
majiru Dec 3, 2025
d0f1b17
add new rpc to quotas
majiru Dec 4, 2025
a533b20
remove duplicate config setting for priority and fairness
majiru Dec 5, 2025
0f93485
Merge branch 'main' into fairness-and-priority-auto-enable
majiru Dec 9, 2025
204 changes: 139 additions & 65 deletions api/persistence/v1/task_queues.pb.go

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions common/dynamicconfig/constants.go
@@ -1382,6 +1382,11 @@ second per poller by one physical queue manager`,
The metric has 2 dimensions: namespace_id and plugin_name. Disabled by default as this is
an optional feature and also requires a metrics collection system that can handle higher cardinalities.`,
)
MatchingAutoEnable = NewTaskQueueBoolSetting(
"matching.autoEnable",
false,
`MatchingAutoEnable automatically enables priority/fairness when a fairness key or non-default priority is seen`,
)

// keys for history

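For context, here is a minimal sketch (not part of this PR) of how the new setting can be subscribed to for a single task queue, based on the dynamicconfig patterns visible elsewhere in this diff; the collection wiring, namespace, and queue names are illustrative assumptions.

// Illustrative only: subscribe to matching.autoEnable for one task queue.
// `dc` is assumed to be the service's dynamicconfig collection; names are placeholders.
func watchAutoEnable(dc *dynamicconfig.Collection) {
	sub := dynamicconfig.MatchingAutoEnable.Subscribe(dc)
	enabled, cancel := sub("my-namespace", "my-task-queue", enumspb.TASK_QUEUE_TYPE_WORKFLOW, func(newValue bool) {
		// Called when the dynamic config value changes; the partition manager
		// reacts to such changes by unloading itself and reloading.
	})
	defer cancel() // stop receiving updates
	_ = enabled    // current value at subscription time
}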
@@ -134,6 +134,13 @@ message TaskQueueTypeUserData {
DeploymentData deployment_data = 1;

temporal.api.taskqueue.v1.TaskQueueConfig config = 2;

enum FairnessState {
FAIRNESS_STATE_UNSPECIFIED = 0;
FAIRNESS_STATE_V1 = 1;
FAIRNESS_STATE_V2 = 2;
};
FairnessState fairness_state = 3;
}

// Container for all persistent user provided data for a task queue family.
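For reference, a hedged sketch of how the generated Go types for this new field are used later in the PR; the userData argument and the task type chosen here are illustrative.

// Illustrative only. Generated getters are nil-safe, so a missing per-type
// entry simply reads as FAIRNESS_STATE_UNSPECIFIED.
func describeFairnessState(userData *persistencespb.TaskQueueUserData) string {
	switch userData.GetPerType()[int32(enumspb.TASK_QUEUE_TYPE_WORKFLOW)].GetFairnessState() {
	case persistencespb.TaskQueueTypeUserData_FAIRNESS_STATE_V1:
		return "new matcher, fairness off"
	case persistencespb.TaskQueueTypeUserData_FAIRNESS_STATE_V2:
		return "new matcher, fairness on (except sticky queues)"
	default:
		return "unspecified: fall back to dynamic config"
	}
}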
7 changes: 7 additions & 0 deletions service/matching/config.go
@@ -82,6 +82,7 @@ type (
MembershipUnloadDelay dynamicconfig.DurationPropertyFn
TaskQueueInfoByBuildIdTTL dynamicconfig.DurationPropertyFnWithTaskQueueFilter
PriorityLevels dynamicconfig.IntPropertyFnWithTaskQueueFilter
AutoEnableSub dynamicconfig.TypedSubscribableWithTaskQueueFilter[bool]

RateLimiterRefreshInterval time.Duration
FairnessKeyRateLimitCacheSize dynamicconfig.IntPropertyFnWithTaskQueueFilter
@@ -149,6 +150,8 @@ type (
EnableFairness bool
EnableFairnessSub func(func(bool)) (bool, func())
EnableMigration func() bool
AutoEnable bool
AutoEnableSub func(func(bool)) (bool, func())
GetTasksBatchSize func() int
GetTasksReloadAt func() int
UpdateAckInterval func() time.Duration
@@ -308,6 +311,7 @@ func NewConfig(
FairnessKeyRateLimitCacheSize: dynamicconfig.MatchingFairnessKeyRateLimitCacheSize.Get(dc),
MaxFairnessKeyWeightOverrides: dynamicconfig.MatchingMaxFairnessKeyWeightOverrides.Get(dc),
MaxIDLengthLimit: dynamicconfig.MaxIDLengthLimit.Get(dc),
AutoEnableSub: dynamicconfig.MatchingAutoEnable.Subscribe(dc),

AdminNamespaceToPartitionDispatchRate: dynamicconfig.AdminMatchingNamespaceToPartitionDispatchRate.Get(dc),
AdminNamespaceToPartitionRateSub: dynamicconfig.AdminMatchingNamespaceToPartitionDispatchRate.Subscribe(dc),
@@ -355,6 +359,9 @@ func newTaskQueueConfig(tq *tqid.TaskQueue, config *Config, ns namespace.Name) *
EnableMigration: func() bool {
return config.EnableMigration(ns.String(), taskQueueName, taskType)
},
AutoEnableSub: func(cb func(bool)) (bool, func()) {
return config.AutoEnableSub(ns.String(), taskQueueName, taskType, cb)
},
GetTasksBatchSize: func() int {
return config.GetTasksBatchSize(ns.String(), taskQueueName, taskType)
},
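A short sketch of how a per-queue subscription built this way is consumed, mirroring the existing EnableFairnessSub/NewMatcherSub usage; it assumes the *taskQueueConfig type produced by newTaskQueueConfig, and the onChange body is illustrative.

// Illustrative only: wire up the per-queue auto-enable subscription.
func wireAutoEnable(tqConfig *taskQueueConfig) (cancel func()) {
	// A change to matching.autoEnable unloads the partition so it reloads
	// with the new effective configuration.
	onChange := func(bool) { /* unload the partition */ }
	tqConfig.AutoEnable, cancel = tqConfig.AutoEnableSub(onChange)
	return cancel // the partition manager stores this and calls it from Stop()
}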
13 changes: 10 additions & 3 deletions service/matching/matching_engine.go
@@ -430,18 +430,18 @@ func (e *matchingEngineImpl) getTaskQueuePartitionManager(
tqConfig.loadCause = loadCause
logger, throttledLogger, metricsHandler := e.loggerAndMetricsForPartition(namespaceEntry, partition, tqConfig)
onFatalErr := func(cause unloadCause) { newPM.unloadFromEngine(cause) }
onUserDataChanged := func() { newPM.userDataChanged() }
userDataManager := newUserDataManager(
e.taskManager,
e.matchingRawClient,
onFatalErr,
onUserDataChanged,
partition,
tqConfig,
logger,
e.namespaceRegistry,
)
child, cancel := context.WithCancel(ctx)
newPM, err = newTaskQueuePartitionManager(
child,
e,
namespaceEntry,
partition,
@@ -452,6 +452,7 @@ func (e *matchingEngineImpl) getTaskQueuePartitionManager(
userDataManager,
)
if err != nil {
cancel()
return nil, false, err
}

@@ -460,13 +461,19 @@ func (e *matchingEngineImpl) getTaskQueuePartitionManager(
pm, ok = e.partitions[key]
if ok {
e.partitionsLock.Unlock()
cancel()
return pm, false, nil
}

e.partitions[key] = newPM
e.partitionsLock.Unlock()

newPM.Start()
err = newPM.Start()
cancel()
if err != nil {
return nil, false, err
}
userDataManager.SetOnChange(newPM.userDataChanged)
return newPM, true, nil
}

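The engine now has to treat Start as fallible because partition-manager initialization runs in the background. Below is a self-contained sketch of the underlying pattern with hypothetical names (not Temporal APIs): the constructor kicks off initialization in an errgroup, Start blocks on it, and the caller cancels the init context once Start returns either way.

package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

type manager struct {
	initGroup errgroup.Group
	state     string
}

func newManager(ctx context.Context) *manager {
	m := &manager{}
	// Initialization runs in the background, bounded by ctx.
	m.initGroup.Go(func() error {
		if err := ctx.Err(); err != nil {
			return err
		}
		m.state = "initialized"
		return nil
	})
	return m
}

// Start blocks until background initialization has finished (or failed).
func (m *manager) Start() error {
	return m.initGroup.Wait()
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	m := newManager(ctx)
	err := m.Start()
	cancel() // initialization is over either way; release the context
	fmt.Println(m.state, err)
}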
18 changes: 12 additions & 6 deletions service/matching/matching_engine_test.go
@@ -267,8 +267,10 @@ func newMatchingEngine(
func (s *matchingEngineSuite) newPartitionManager(prtn tqid.Partition, config *Config) taskQueuePartitionManager {
tqConfig := newTaskQueueConfig(prtn.TaskQueue(), config, matchingTestNamespace)
logger, _, metricsHandler := s.matchingEngine.loggerAndMetricsForPartition(s.ns, prtn, tqConfig)
pm, err := newTaskQueuePartitionManager(s.matchingEngine, s.ns, prtn, tqConfig, logger, logger, metricsHandler, &mockUserDataManager{})
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
pm, err := newTaskQueuePartitionManager(ctx, s.matchingEngine, s.ns, prtn, tqConfig, logger, logger, metricsHandler, &mockUserDataManager{})
s.Require().NoError(err)
cancel()
return pm
}

@@ -1022,7 +1024,8 @@ func (s *matchingEngineSuite) TestSyncMatchActivities() {
mgr.GetRateLimitManager().SetAdminRateForTesting(25000.0)

s.matchingEngine.updateTaskQueue(dbq.partition, mgr)
mgr.Start()
err := mgr.Start()
s.NoError(err)

taskQueue := &taskqueuepb.TaskQueue{
Name: tl,
@@ -1209,7 +1212,8 @@ func (s *matchingEngineSuite) TestRateLimiterAcrossVersionedQueues() {
s.True(ok)

s.matchingEngine.updateTaskQueue(dbq.partition, mgr)
mgr.Start()
err := mgr.Start()
s.NoError(err)

taskQueue := &taskqueuepb.TaskQueue{
Name: tl,
@@ -1281,7 +1285,7 @@ func (s *matchingEngineSuite) TestRateLimiterAcrossVersionedQueues() {
// Independent activity tasks are those if the task queue the task is scheduled on is not part of the workflow's pinned
// deployment.
updateOptions := UserDataUpdateOptions{Source: "SyncDeploymentUserData"}
_, err := tqPTM.GetUserDataManager().UpdateUserData(context.Background(), updateOptions, func(data *persistencespb.TaskQueueUserData) (*persistencespb.TaskQueueUserData, bool, error) {
_, err = tqPTM.GetUserDataManager().UpdateUserData(context.Background(), updateOptions, func(data *persistencespb.TaskQueueUserData) (*persistencespb.TaskQueueUserData, bool, error) {

newData := &persistencespb.TaskQueueUserData{
PerType: map[int32]*persistencespb.TaskQueueTypeUserData{
@@ -1412,7 +1416,8 @@ func (s *matchingEngineSuite) concurrentPublishConsumeActivities(

mgr := s.newPartitionManager(dbq.partition, s.matchingEngine.config)
s.matchingEngine.updateTaskQueue(dbq.partition, mgr)
mgr.Start()
err := mgr.Start()
s.NoError(err)

taskQueue := &taskqueuepb.TaskQueue{
Name: tl,
@@ -2191,7 +2196,8 @@ func (s *matchingEngineSuite) TestTaskQueueManager_CyclingBehavior() {
prevGetTasksCount := s.taskManager.getGetTasksCount(dbq)

mgr := s.newPartitionManager(dbq.partition, config)
mgr.Start()
err := mgr.Start()
s.NoError(err)

// tlMgr.taskWriter startup is async so give it time to complete
time.Sleep(100 * time.Millisecond)
13 changes: 11 additions & 2 deletions service/matching/physical_task_queue_manager_test.go
@@ -85,9 +85,15 @@ func (s *PhysicalTaskQueueManagerTestSuite) SetupTest() {
prtn := s.physicalTaskQueueKey.Partition()
tqConfig := newTaskQueueConfig(prtn.TaskQueue(), engine.config, nsName)
onFatalErr := func(unloadCause) { s.T().Fatal("user data manager called onFatalErr") }
udMgr := newUserDataManager(engine.taskManager, engine.matchingRawClient, onFatalErr, nil, prtn, tqConfig, engine.logger, engine.namespaceRegistry)
udMgr := newUserDataManager(engine.taskManager, engine.matchingRawClient, onFatalErr, prtn, tqConfig, engine.logger, engine.namespaceRegistry)

prtnMgr, err := newTaskQueuePartitionManager(engine, ns, prtn, tqConfig, engine.logger, nil, metrics.NoopMetricsHandler, udMgr)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
prtnMgr, err := newTaskQueuePartitionManager(ctx, engine, ns, prtn, tqConfig, engine.logger, nil, metrics.NoopMetricsHandler, udMgr)
s.NoError(err)
// Bit of a hack: we need to wait until initialization is done, but we don't
// want to start the default queue it sets because we overwrite it later.
err = prtnMgr.initGroup.Wait()
cancel()
s.NoError(err)
engine.partitions[prtn.Key()] = prtnMgr

@@ -334,7 +340,10 @@ func (s *PhysicalTaskQueueManagerTestSuite) TestAddTaskStandby() {
s.tqMgr.namespaceRegistry = mockNamespaceCache

s.tqMgr.Start()
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
[Review comment from a Contributor on the 10ms timeout above: "maybe give 100ms even for a unit test"]

s.tqMgr.WaitUntilInitialized(ctx)
defer s.tqMgr.Stop(unloadCauseShuttingDown)
cancel()

// stop taskWriter so that we can check if there's any call to it
// otherwise the task persist process is async and hard to test
111 changes: 96 additions & 15 deletions service/matching/task_queue_partition_manager.go
@@ -15,6 +15,7 @@ import (
"go.temporal.io/server/api/matchingservice/v1"
persistencespb "go.temporal.io/server/api/persistence/v1"
taskqueuespb "go.temporal.io/server/api/taskqueue/v1"
"go.temporal.io/server/common"
"go.temporal.io/server/common/cache"
"go.temporal.io/server/common/headers"
"go.temporal.io/server/common/log"
@@ -24,6 +25,7 @@ import (
serviceerrors "go.temporal.io/server/common/serviceerror"
"go.temporal.io/server/common/tqid"
"go.temporal.io/server/common/worker_versioning"
"golang.org/x/sync/errgroup"
"google.golang.org/protobuf/types/known/timestamppb"
)

@@ -63,8 +65,12 @@ type (
// TODO(stephanos): move cache out of partition manager
cache cache.Cache // non-nil for root-partition

fairnessState persistencespb.TaskQueueTypeUserData_FairnessState // Set once on initialization and read only after
initGroup *errgroup.Group

cancelNewMatcherSub func()
cancelFairnessSub func()
cancelAutoEnableSub func()

// rateLimitManager is used to manage the rate limit for task queues.
rateLimitManager *rateLimitManager
@@ -82,6 +88,7 @@ func (pm *taskQueuePartitionManagerImpl) GetCache(key any) any {
var _ taskQueuePartitionManager = (*taskQueuePartitionManagerImpl)(nil)

func newTaskQueuePartitionManager(
ctx context.Context,
e *matchingEngineImpl,
ns *namespace.Namespace,
partition tqid.Partition,
@@ -107,6 +114,7 @@ func newTaskQueuePartitionManager(
versionedQueues: make(map[PhysicalTaskQueueVersion]physicalTaskQueueManager),
userDataManager: userDataManager,
rateLimitManager: rateLimitManager,
initGroup: &errgroup.Group{},
}

if pm.partition.IsRoot() {
@@ -115,32 +123,72 @@ func newTaskQueuePartitionManager(
)
}

pm.initGroup.Go(func() error {
return pm.initialize(ctx)
})

return pm, nil
}

func (pm *taskQueuePartitionManagerImpl) initialize(ctx context.Context) error {
unload := func(bool) {
pm.unloadFromEngine(unloadCauseConfigChange)
}

var fairness bool
fairness, pm.cancelFairnessSub = tqConfig.EnableFairnessSub(unload)
// Fairness is disabled for sticky queues for now so that we can still use TTLs.
tqConfig.EnableFairness = fairness && partition.Kind() != enumspb.TASK_QUEUE_KIND_STICKY
if fairness {
tqConfig.NewMatcher = true
} else {
tqConfig.NewMatcher, pm.cancelNewMatcherSub = tqConfig.NewMatcherSub(unload)
pm.userDataManager.Start()
err := pm.userDataManager.WaitUntilInitialized(ctx)
if err != nil {
return err
}
// todo(moody): do we need to be more cautious loading this? Do we need to ensure that we have PerType()? Check for a nil return?
data, _, err := pm.getPerTypeUserData()
if err != nil {
return err
}

pm.config.AutoEnable, pm.cancelAutoEnableSub = pm.config.AutoEnableSub(unload)
pm.fairnessState = data.GetFairnessState()
switch {
case !pm.config.AutoEnable || pm.fairnessState == persistencespb.TaskQueueTypeUserData_FAIRNESS_STATE_UNSPECIFIED:
var fairness bool
fairness, pm.cancelFairnessSub = pm.config.EnableFairnessSub(unload)
// Fairness is disabled for sticky queues for now so that we can still use TTLs.
pm.config.EnableFairness = fairness && pm.partition.Kind() != enumspb.TASK_QUEUE_KIND_STICKY
if fairness {
pm.config.NewMatcher = true
} else {
pm.config.NewMatcher, pm.cancelNewMatcherSub = pm.config.NewMatcherSub(unload)
}
case pm.fairnessState == persistencespb.TaskQueueTypeUserData_FAIRNESS_STATE_V1:
pm.config.NewMatcher = true
pm.config.EnableFairness = false
case pm.fairnessState == persistencespb.TaskQueueTypeUserData_FAIRNESS_STATE_V2:
pm.config.NewMatcher = true
if pm.partition.Kind() == enumspb.TASK_QUEUE_KIND_STICKY {
pm.config.EnableFairness = false
} else {
pm.config.EnableFairness = true
}
default:
return serviceerror.NewInternal("Unknown FairnessState in UserData")
}

defaultQ, err := newPhysicalTaskQueueManager(pm, UnversionedQueueKey(partition))
defaultQ, err := newPhysicalTaskQueueManager(pm, UnversionedQueueKey(pm.partition))
if err != nil {
return nil, err
return err
}
pm.defaultQueue = defaultQ
return pm, nil
return nil
}

func (pm *taskQueuePartitionManagerImpl) Start() {
func (pm *taskQueuePartitionManagerImpl) Start() error {
err := pm.initGroup.Wait()
if err != nil {
return err
}
pm.engine.updateTaskQueuePartitionGauge(pm.Namespace(), pm.partition, 1)
pm.userDataManager.Start()
pm.defaultQueue.Start()
return nil
}

func (pm *taskQueuePartitionManagerImpl) GetRateLimitManager() *rateLimitManager {
@@ -159,6 +207,9 @@ func (pm *taskQueuePartitionManagerImpl) Stop(unloadCause unloadCause) {
if pm.cancelNewMatcherSub != nil {
pm.cancelNewMatcherSub()
}
if pm.cancelAutoEnableSub != nil {
pm.cancelAutoEnableSub()
}

// First, stop all queues to wrap up ongoing operations.
for _, vq := range pm.versionedQueues {
@@ -184,7 +235,7 @@ func (pm *taskQueuePartitionManagerImpl) MarkAlive() {
}

func (pm *taskQueuePartitionManagerImpl) WaitUntilInitialized(ctx context.Context) error {
err := pm.userDataManager.WaitUntilInitialized(ctx)
err := pm.initGroup.Wait()
if err != nil {
return err
}
@@ -197,6 +248,24 @@ func (pm *taskQueuePartitionManagerImpl) AddTask(
) (buildId string, syncMatched bool, err error) {
var spoolQueue, syncMatchQueue physicalTaskQueueManager
directive := params.taskInfo.GetVersionDirective()

if pm.Partition().IsRoot() && pm.config.AutoEnable && pm.fairnessState == persistencespb.TaskQueueTypeUserData_FAIRNESS_STATE_UNSPECIFIED {
// TODO(moody): unsure about this check, what is the correct way to check to see if PriorityKey is set?
// TODO(moody): This was originally discussed to be a separate API, but is just exposing this through the generic UpdateUserData sufficient?
// What are the pros/cons of adding the new API and perhaps invoking that instead? We're going to unload either way...
if params.taskInfo.Priority != nil && (params.taskInfo.Priority.FairnessKey != "" || params.taskInfo.Priority.PriorityKey != int32(pm.config.DefaultPriorityKey)) {
updateFn := func(old *persistencespb.TaskQueueUserData) (*persistencespb.TaskQueueUserData, bool, error) {
data := common.CloneProto(old)
perType := data.GetPerType()[int32(pm.Partition().TaskType())]
perType.FairnessState = persistencespb.TaskQueueTypeUserData_FAIRNESS_STATE_V2
return data, true, nil
}
_, err := pm.userDataManager.UpdateUserData(ctx, UserDataUpdateOptions{Source: "Matching auto enable"}, updateFn)
if err != nil {
pm.logger.Error("could not update userdata for autoenable: " + err.Error())
}
}
}
// spoolQueue will be nil iff task is forwarded.
reredirectTask:
spoolQueue, syncMatchQueue, _, taskDispatchRevisionNumber, err := pm.getPhysicalQueuesForAdd(ctx, directive, params.forwardInfo, params.taskInfo.GetRunId(), params.taskInfo.GetWorkflowId(), false)
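Related to the TODOs above and to the earlier "Be careful about updating UserData w.r.t. nil members" commit: a hedged alternative updateFn (illustrative only, not the PR's implementation) that also tolerates a missing user data snapshot or per-type entry, since assigning through a nil *TaskQueueTypeUserData would panic.

// Illustrative only: defensive variant of the update function above.
updateFn := func(old *persistencespb.TaskQueueUserData) (*persistencespb.TaskQueueUserData, bool, error) {
	var data *persistencespb.TaskQueueUserData
	if old != nil {
		data = common.CloneProto(old)
	} else {
		data = &persistencespb.TaskQueueUserData{}
	}
	if data.PerType == nil {
		data.PerType = make(map[int32]*persistencespb.TaskQueueTypeUserData)
	}
	taskType := int32(pm.Partition().TaskType())
	if data.PerType[taskType] == nil {
		data.PerType[taskType] = &persistencespb.TaskQueueTypeUserData{}
	}
	data.PerType[taskType].FairnessState = persistencespb.TaskQueueTypeUserData_FAIRNESS_STATE_V2
	return data, true, nil
}

Whether the extra guards are needed depends on whether getPerTypeUserData already guarantees the per-type entry exists by the time AddTask runs.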
@@ -1303,7 +1372,19 @@ func (pm *taskQueuePartitionManagerImpl) getPerTypeUserData() (*persistencespb.T
return perType, userDataChanged, nil
}

func (pm *taskQueuePartitionManagerImpl) userDataChanged() {
func (pm *taskQueuePartitionManagerImpl) userDataChanged(from, to *persistencespb.VersionedTaskQueueUserData) {
// TODO(moody): this stinks, do we need this more verbose, is this interface bad?
// TODO(moody): we get calls into this callback quite a bit with data being nil (sampled from unit tests); do we want to go through the full update
// even in those cases? Should we pass the inner data and avoid a callback when that is nil? I am not totally sure about the implications...
if from != nil && from.GetData() != nil && from.GetData().GetPerType() != nil && to != nil && to.GetData() != nil && to.GetData().GetPerType() != nil {
taskType := int32(pm.Partition().TaskType())
if from.GetData().GetPerType()[taskType] != nil && to.GetData().GetPerType()[taskType] != nil {
if from.GetData().GetPerType()[taskType].FairnessState != to.GetData().GetPerType()[taskType].FairnessState {
pm.unloadFromEngine(unloadCauseConfigChange)
return
}
}
}
// Update rateLimits if any change is userData.
pm.rateLimitManager.UserDataChanged()

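On the first TODO in userDataChanged: because the generated getters are nil-safe and indexing a nil map is legal in Go, the comparison could in principle collapse to the sketch below (illustrative only, not part of the PR).

// Illustrative only: nil-safe form of the fairness-state comparison.
taskType := int32(pm.Partition().TaskType())
if from.GetData().GetPerType()[taskType].GetFairnessState() !=
	to.GetData().GetPerType()[taskType].GetFairnessState() {
	pm.unloadFromEngine(unloadCauseConfigChange)
	return
}

Note the behavioral difference: the PR's version only unloads when both sides have a per-type entry, whereas this collapsed form treats a missing entry as FAIRNESS_STATE_UNSPECIFIED and would therefore also unload when the entry first appears or disappears.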