
Commit c7290f8

feat: [x] refactor leases
1 parent ae51f8a commit c7290f8


11 files changed (+629, -157 lines)

Makefile

Lines changed: 1 addition & 1 deletion
@@ -373,7 +373,7 @@ integration-tests: build-init build-toolkit ## Run integration tests (only tests
	STORAGE_ACCESSKEYID=$(ROOT_MINIO_USER) \
	STORAGE_SECRETACCESSKEY=$(ROOT_MINIO_PASSWORD) \
	$(GOTESTSUM) --format short-verbose --junitfile integration-tests.xml --jsonfile integration-tests.json -- \
-		-coverprofile=integration-coverage.out -covermode=atomic -run "_Integration$$" ./internal/... ./pkg/... ./test/integration/components/...
+		-coverprofile=integration-coverage.out -covermode=atomic -run "_Integration$$" ./internal/... ./pkg/... ./test/integration/... ./test/e2e/...

.PHONY: cover
cover: unit-tests ## Generate and open test coverage report
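
Note: the flags after -- are passed to go test, so only tests whose names end in "_Integration" run from the listed packages (the Makefile's $$ escapes to a single $ in the regex). This change widens the selection from ./test/integration/components/... to all of ./test/integration/... plus ./test/e2e/.... A minimal sketch of a test shape this filter would pick up; the file path, package, and test name below are hypothetical, not part of the commit:

// test/e2e/example_integration_test.go — hypothetical file, for illustration only.
package e2e

import "testing"

// The trailing "_Integration" makes the name match the -run "_Integration$"
// filter, so this test is selected by `make integration-tests` alongside the
// matching tests under ./internal/... and ./pkg/....
func TestSomething_Integration(t *testing.T) {
	if testing.Short() {
		t.Skip("integration test; requires external services (e.g. MinIO via the STORAGE_* variables above)")
	}
	// ... exercise real dependencies here ...
}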

cmd/api-server/main.go

Lines changed: 31 additions & 44 deletions
@@ -217,8 +217,6 @@ func main() {
		)
	}

-	leaderTasks := make([]leader.Task, 0)
-
	// If we don't have an API key but we do have a token for registration then attempt to register the runner.
	if cfg.TestkubeProAPIKey == "" && cfg.TestkubeProAgentRegToken != "" {
		runnerName := cfg.RunnerName
@@ -302,6 +300,22 @@ func main() {
	clusterId, _ := configMapConfig.GetUniqueClusterId(ctx)
	telemetryEnabled, _ := configMapConfig.GetTelemetryEnabled(ctx)

+	leaderIdentifier := resolveLeaderIdentifier()
+	leaderClusterID := clusterId
+	if leaderClusterID == "" {
+		leaderClusterID = "testkube-core"
+	} else {
+		leaderClusterID = fmt.Sprintf("%s-core", leaderClusterID)
+	}
+
+	coordinatorLogger := log.DefaultLogger.With("component", "leader-coordinator")
+	leaderCoordinator := leader.New(leaderLeaseBackend, leaderIdentifier, leaderClusterID, coordinatorLogger)
+	hasLeaderTasks := false
+	registerLeaderTask := func(task leader.Task) {
+		hasLeaderTasks = true
+		leaderCoordinator.Register(task)
+	}
+
	// k8s clients
	webhooksClient := executorsclientv1.NewWebhooksClient(kubeClient, cfg.TestkubeNamespace)
	webhookTemplatesClient := executorsclientv1.NewWebhookTemplatesClient(kubeClient, cfg.TestkubeNamespace)
@@ -508,9 +522,12 @@ func main() {
	// Update TestWorkflowExecution Kubernetes resource objects on status change
	eventsEmitter.RegisterLoader(testworkflowexecutions.NewLoader(ctx, cfg.TestkubeNamespace, kubeClient))

-	g.Go(func() error {
-		eventsEmitter.Listen(ctx)
-		return nil
+	registerLeaderTask(leader.Task{
+		Name: "event-emitter",
+		Start: func(taskCtx context.Context) error {
+			eventsEmitter.RunLeader(taskCtx)
+			return nil
+		},
	})

	/////////////////////////////////
@@ -641,7 +658,7 @@ func main() {
			eventsEmitter,
		)
		commons.ExitOnError("starting agent", err)
-		leaderTasks = append(leaderTasks, leader.Task{
+		registerLeaderTask(leader.Task{
			Name: "agent",
			Start: func(taskCtx context.Context) error {
				err := agentHandle.Run(taskCtx)
@@ -662,26 +679,12 @@ func main() {
		// TODO: Check why this simpler options is not working
		// testkubeClientset := testkubeclientset.New(clientset.RESTClient())

-		var triggersLeaseBackend leasebackend.Repository
-		if controlPlane != nil {
-			triggersLeaseBackend = controlPlane.GetRepositoryManager().LeaseBackend()
-		} else {
-			// Fallback: Kubernetes Lease-based coordination (no external DB required)
-			triggersLeaseBackend = leasebackendk8s.NewK8sLeaseBackend(
-				clientset,
-				"testkube-triggers-lease",
-				cfg.TestkubeNamespace,
-				leasebackendk8s.WithLeaseName(cfg.TestkubeLeaseName),
-			)
-		}
-
-		triggerService := triggers.NewService(
+		triggerTasks := triggers.NewService(
			cfg.RunnerName,
			clientset,
			testkubeClientset,
			testWorkflowsClient,
			testTriggersClient,
-			triggersLeaseBackend,
			log.DefaultLogger,
			eventBus,
			metrics,
@@ -695,17 +698,16 @@ func main() {
			triggers.WithTestTriggerControlPlane(cfg.TestTriggerControlPlane),
			triggers.WithEventLabels(cfg.EventLabels),
		)
-		log.DefaultLogger.Info("starting trigger service")
-		g.Go(func() error {
-			triggerService.Run(ctx)
-			return nil
-		})
+		log.DefaultLogger.Info("registering trigger tasks with shared leader coordinator")
+		for _, task := range triggerTasks {
+			registerLeaderTask(task)
+		}
	} else {
		log.DefaultLogger.Info("test triggers are disabled")
	}

	// telemetry based functions
-	leaderTasks = append(leaderTasks, leader.Task{
+	registerLeaderTask(leader.Task{
		Name: "telemetry-heartbeat",
		Start: func(taskCtx context.Context) error {
			services.HandleTelemetryHeartbeat(taskCtx, clusterId, configMapConfig)
@@ -755,7 +757,7 @@ func main() {
		schedulableResourceWatcher.WatchTestWorkflowTemplates,
	)
	// Start the new scheduler.
-	leaderTasks = append(leaderTasks, leader.Task{
+	registerLeaderTask(leader.Task{
		Name: "cron-scheduler",
		Start: func(taskCtx context.Context) error {
			go func() {
@@ -779,22 +781,7 @@ func main() {
		return httpServer.Run(ctx)
	})

-	if len(leaderTasks) > 0 {
-		leaderIdentifier := resolveLeaderIdentifier()
-
-		leaderClusterID := clusterId
-		if leaderClusterID == "" {
-			leaderClusterID = "testkube-core"
-		} else {
-			leaderClusterID = fmt.Sprintf("%s-core", leaderClusterID)
-		}
-
-		coordinatorLogger := log.DefaultLogger.With("component", "leader-coordinator")
-		leaderCoordinator := leader.New(leaderLeaseBackend, leaderIdentifier, leaderClusterID, coordinatorLogger)
-		for _, task := range leaderTasks {
-			leaderCoordinator.Register(task)
-		}
-
+	if hasLeaderTasks {
		g.Go(func() error {
			return leaderCoordinator.Run(ctx)
		})
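
Note: the leader package itself is not part of the hunks shown here. Inferring from the call sites above (leader.New, Register, Run, and Task{Name, Start}), its surface is presumably close to the sketch below; every name, signature, and the polling interval are assumptions drawn from this usage, not code from the commit:

// Package leader: a minimal sketch of the coordinator surface main.go relies on,
// inferred from its call sites; the real package may differ in lease renewal,
// backoff, and error handling.
package leader

import (
	"context"
	"sync"
	"time"

	"go.uber.org/zap"
)

// LeaseBackend is assumed to expose a TryAcquire-style check; the concrete
// backend passed as leaderLeaseBackend in main.go is not shown in this diff.
type LeaseBackend interface {
	TryAcquire(ctx context.Context, id, clusterID string) (bool, error)
}

// Task is a leader-only unit of work; Start is expected to block until its
// context is canceled (as the event-emitter, agent, and cron-scheduler tasks do).
type Task struct {
	Name  string
	Start func(ctx context.Context) error
}

type Coordinator struct {
	backend   LeaseBackend
	id        string
	clusterID string
	log       *zap.SugaredLogger
	tasks     []Task
}

func New(backend LeaseBackend, id, clusterID string, log *zap.SugaredLogger) *Coordinator {
	return &Coordinator{backend: backend, id: id, clusterID: clusterID, log: log}
}

func (c *Coordinator) Register(task Task) { c.tasks = append(c.tasks, task) }

// Run polls the lease and runs every registered task only while this instance
// holds it; tasks are stopped by canceling their shared context on lease loss.
func (c *Coordinator) Run(ctx context.Context) error {
	ticker := time.NewTicker(5 * time.Second) // illustrative interval
	defer ticker.Stop()
	var cancel context.CancelFunc
	var wg sync.WaitGroup
	stop := func() {
		if cancel != nil {
			cancel()
			wg.Wait()
			cancel = nil
		}
	}
	defer stop()
	for {
		leading, err := c.backend.TryAcquire(ctx, c.id, c.clusterID)
		if err != nil {
			c.log.Errorw("lease check failed", "error", err)
		}
		if leading && cancel == nil {
			// Became leader: start all tasks with one cancellable context.
			var taskCtx context.Context
			taskCtx, cancel = context.WithCancel(ctx)
			for _, t := range c.tasks {
				t := t
				wg.Add(1)
				go func() {
					defer wg.Done()
					if err := t.Start(taskCtx); err != nil {
						c.log.Errorw("leader task failed", "task", t.Name, "error", err)
					}
				}()
			}
		} else if !leading && cancel != nil {
			// Lost the lease: cancel the task context and wait for tasks to unwind.
			stop()
		}
		select {
		case <-ctx.Done():
			return nil
		case <-ticker.C:
		}
	}
}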

pkg/event/emitter.go

Lines changed: 23 additions & 11 deletions
@@ -126,6 +126,28 @@ const (
	leaseClusterID string = "event-emitters"
)

+// RunLeader executes the leader-only loop under an externally managed coordinator.
+// It does not perform lease acquisition itself; it assumes the provided context
+// is canceled when leadership is lost or when shutdown is requested.
+func (e *Emitter) RunLeader(ctx context.Context) {
+	e.log.Info("event emitter starting under external coordinator")
+	e.closeBusOnContext(ctx)
+	e.leaderLoop(ctx)
+}
+
+func (e *Emitter) closeBusOnContext(ctx context.Context) {
+	go func() {
+		<-ctx.Done()
+		e.log.Info("event emitter closing event bus")
+		err := e.bus.Close()
+		if err != nil {
+			e.log.Errorw("error while closing event bus", "error", err)
+		} else {
+			e.log.Info("event emitter closed event bus")
+		}
+	}()
+}
+
// TODO(emil): convert to using new common coordinator package for lease acquisition
func (e *Emitter) leaseCheckLoop(ctx context.Context, leaseChan chan<- bool) {
	e.log.Info("event emitter waiting for lease")
@@ -154,17 +176,7 @@ func (e *Emitter) leaseCheck(ctx context.Context, leaseChan chan<- bool) {
// notifications.
func (e *Emitter) Listen(ctx context.Context) {
	e.log.Info("event emitter starting")
-	// Clean up
-	go func() {
-		<-ctx.Done()
-		e.log.Info("event emitter closing event bus")
-		err := e.bus.Close()
-		if err != nil {
-			e.log.Errorw("error while closing event bus", "error", err)
-		} else {
-			e.log.Info("event emitter closed event bus")
-		}
-	}()
+	e.closeBusOnContext(ctx)

	// Start lease check loop
	leaseChan := make(chan bool)
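
Note: RunLeader deliberately leaves lease handling to the caller and only honors the context it is given, while Listen keeps the pre-existing self-managed lease loop. A short wiring sketch that mirrors the main.go change above; the wiring package, the wireEmitter helper, and the leader import path are illustrative, not from the commit:

package wiring // illustrative only; this file is not part of the commit

import (
	"context"

	"github.com/kubeshop/testkube/pkg/event"
	"github.com/kubeshop/testkube/pkg/leader" // assumed import path for the coordinator package
)

// wireEmitter contrasts the two entry points; register is assumed to behave
// like registerLeaderTask in cmd/api-server/main.go above.
func wireEmitter(ctx context.Context, emitter *event.Emitter, register func(leader.Task), external bool) {
	if !external {
		// Legacy path: Listen polls the emitter's own lease backend and runs
		// the leader loop only while this instance holds the lease.
		go emitter.Listen(ctx)
		return
	}
	// New path: the shared coordinator owns leadership and cancels the task
	// context when it is lost; RunLeader simply runs until that happens.
	register(leader.Task{
		Name: "event-emitter",
		Start: func(taskCtx context.Context) error {
			emitter.RunLeader(taskCtx)
			return nil
		},
	})
}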

pkg/event/emitter_test.go

Lines changed: 95 additions & 0 deletions
@@ -3,10 +3,12 @@ package event
import (
	"context"
	"os"
+	"sync"
	"testing"
	"time"

	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
	"go.uber.org/mock/gomock"

	"github.com/kubeshop/testkube/pkg/api/v1/testkube"
@@ -294,6 +296,99 @@ func TestEmitter_Listen_reconciliation(t *testing.T) {

}

+type trackingBus struct {
+	subscribeTopicCalls []struct {
+		Topic string
+		Queue string
+	}
+	unsubscribeCalls []string
+	closeCalls       int
+	mu               sync.Mutex
+}
+
+func (b *trackingBus) Publish(event testkube.Event) error {
+	return nil
+}
+
+func (b *trackingBus) Subscribe(queue string, handler bus.Handler) error {
+	return nil
+}
+
+func (b *trackingBus) Unsubscribe(queue string) error {
+	b.mu.Lock()
+	defer b.mu.Unlock()
+	b.unsubscribeCalls = append(b.unsubscribeCalls, queue)
+	return nil
+}
+
+func (b *trackingBus) PublishTopic(topic string, event testkube.Event) error {
+	return nil
+}
+
+func (b *trackingBus) SubscribeTopic(topic string, queue string, handler bus.Handler) error {
+	b.mu.Lock()
+	defer b.mu.Unlock()
+	b.subscribeTopicCalls = append(b.subscribeTopicCalls, struct {
+		Topic string
+		Queue string
+	}{Topic: topic, Queue: queue})
+	return nil
+}
+
+func (b *trackingBus) Close() error {
+	b.mu.Lock()
+	defer b.mu.Unlock()
+	b.closeCalls++
+	return nil
+}
+
+type noopLeaseBackend struct{}
+
+func (noopLeaseBackend) TryAcquire(ctx context.Context, id, clusterID string) (bool, error) {
+	return false, nil
+}
+
+func TestEmitter_RunLeader_UsesExternalCoordinator(t *testing.T) {
+	t.Parallel()
+
+	tb := &trackingBus{}
+	emitter := NewEmitter(tb, noopLeaseBackend{}, "agentevents", "")
+
+	ctx, cancel := context.WithCancel(context.Background())
+	done := make(chan struct{})
+	go func() {
+		emitter.RunLeader(ctx)
+		close(done)
+	}()
+
+	// Allow leader loop to subscribe then cancel.
+	time.Sleep(20 * time.Millisecond)
+	cancel()
+
+	select {
+	case <-done:
+	case <-time.After(time.Second):
+		t.Fatal("expected RunLeader to exit after context cancellation")
+	}
+
+	assert.Len(t, tb.subscribeTopicCalls, 1, "expected one topic subscription")
+	assert.Equal(t, "agentevents.>", tb.subscribeTopicCalls[0].Topic)
+	assert.Equal(t, "emitter", tb.subscribeTopicCalls[0].Queue)
+
+	require.Eventually(t, func() bool {
+		tb.mu.Lock()
+		defer tb.mu.Unlock()
+		return len(tb.unsubscribeCalls) == 1
+	}, time.Second, 10*time.Millisecond, "expected unsubscribe on shutdown")
+
+	assert.Equal(t, "emitter", tb.unsubscribeCalls[0])
+
+	tb.mu.Lock()
+	closeCalls := tb.closeCalls
+	tb.mu.Unlock()
+	assert.Equal(t, 1, closeCalls, "expected bus close on shutdown")
+}
+
func newExampleTestEvent1() testkube.Event {
	return testkube.Event{
		Id: "eventID1",
