Skip to content

Commit ceb66a2

Browse files
committed
Move embed into separate package from executor
Better isolates the K3s implementation from the interface, and aligns the package path with other projects executors. This should also remove the indirect flannel dep from other projects that don't use the embedded executor. Signed-off-by: Brad Davidson <[email protected]>
1 parent 5adfed8 commit ceb66a2

File tree

9 files changed

+153
-63
lines changed

9 files changed

+153
-63
lines changed

cmd/server/main.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@ import (
2525
"github.com/sirupsen/logrus"
2626
"github.com/urfave/cli/v2"
2727
crictl2 "sigs.k8s.io/cri-tools/cmd/crictl"
28+
29+
_ "github.com/k3s-io/k3s/pkg/executor/embed"
2830
)
2931

3032
func init() {

main.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ import (
1717
"github.com/k3s-io/k3s/pkg/configfilearg"
1818
"github.com/sirupsen/logrus"
1919
"github.com/urfave/cli/v2"
20+
21+
_ "github.com/k3s-io/k3s/pkg/executor/embed"
2022
)
2123

2224
func main() {

pkg/daemons/executor/executor.go

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package executor
22

33
import (
44
"context"
5+
"errors"
56
"net/http"
67
"os"
78
"path/filepath"
@@ -19,6 +20,8 @@ import (
1920

2021
var (
2122
executor Executor
23+
24+
ErrNotInitialized = errors.New("executor not initialized")
2225
)
2326

2427
// TestFunc is the signature of a function that returns nil error when the component is ready.
@@ -152,54 +155,93 @@ func Set(driver Executor) {
152155
}
153156

154157
func Bootstrap(ctx context.Context, nodeConfig *daemonconfig.Node, cfg cmds.Agent) error {
158+
if executor == nil {
159+
return ErrNotInitialized
160+
}
155161
return executor.Bootstrap(ctx, nodeConfig, cfg)
156162
}
157163

158164
func Kubelet(ctx context.Context, args []string) error {
165+
if executor == nil {
166+
return ErrNotInitialized
167+
}
159168
return executor.Kubelet(ctx, args)
160169
}
161170

162171
func KubeProxy(ctx context.Context, args []string) error {
172+
if executor == nil {
173+
return ErrNotInitialized
174+
}
163175
return executor.KubeProxy(ctx, args)
164176
}
165177

166178
func APIServerHandlers(ctx context.Context) (authenticator.Request, http.Handler, error) {
179+
if executor == nil {
180+
return nil, nil, ErrNotInitialized
181+
}
167182
return executor.APIServerHandlers(ctx)
168183
}
169184

170185
func APIServer(ctx context.Context, args []string) error {
186+
if executor == nil {
187+
return ErrNotInitialized
188+
}
171189
return executor.APIServer(ctx, args)
172190
}
173191

174192
func Scheduler(ctx context.Context, nodeReady <-chan struct{}, args []string) error {
193+
if executor == nil {
194+
return ErrNotInitialized
195+
}
175196
return executor.Scheduler(ctx, nodeReady, args)
176197
}
177198

178199
func ControllerManager(ctx context.Context, args []string) error {
200+
if executor == nil {
201+
return ErrNotInitialized
202+
}
179203
return executor.ControllerManager(ctx, args)
180204
}
181205

182206
func CurrentETCDOptions() (InitialOptions, error) {
207+
if executor == nil {
208+
return InitialOptions{}, ErrNotInitialized
209+
}
183210
return executor.CurrentETCDOptions()
184211
}
185212

186213
func ETCD(ctx context.Context, wg *sync.WaitGroup, args *ETCDConfig, extraArgs []string, test TestFunc) error {
214+
if executor == nil {
215+
return ErrNotInitialized
216+
}
187217
return executor.ETCD(ctx, wg, args, extraArgs, test)
188218
}
189219

190220
func CloudControllerManager(ctx context.Context, ccmRBACReady <-chan struct{}, args []string) error {
221+
if executor == nil {
222+
return ErrNotInitialized
223+
}
191224
return executor.CloudControllerManager(ctx, ccmRBACReady, args)
192225
}
193226

194227
func Containerd(ctx context.Context, config *daemonconfig.Node) error {
228+
if executor == nil {
229+
return ErrNotInitialized
230+
}
195231
return executor.Containerd(ctx, config)
196232
}
197233

198234
func Docker(ctx context.Context, config *daemonconfig.Node) error {
235+
if executor == nil {
236+
return ErrNotInitialized
237+
}
199238
return executor.Docker(ctx, config)
200239
}
201240

202241
func CRI(ctx context.Context, config *daemonconfig.Node) error {
242+
if executor == nil {
243+
return ErrNotInitialized
244+
}
203245
return executor.CRI(ctx, config)
204246
}
205247

@@ -208,18 +250,30 @@ func CNI(ctx context.Context, wg *sync.WaitGroup, config *daemonconfig.Node) err
208250
}
209251

210252
func APIServerReadyChan() <-chan struct{} {
253+
if executor == nil {
254+
return nil
255+
}
211256
return executor.APIServerReadyChan()
212257
}
213258

214259
func ETCDReadyChan() <-chan struct{} {
260+
if executor == nil {
261+
return nil
262+
}
215263
return executor.ETCDReadyChan()
216264
}
217265

218266
func CRIReadyChan() <-chan struct{} {
267+
if executor == nil {
268+
return nil
269+
}
219270
return executor.CRIReadyChan()
220271
}
221272

222273
func IsSelfHosted() bool {
274+
if executor == nil {
275+
return false
276+
}
223277
return executor.IsSelfHosted()
224278
}
225279

pkg/etcd/etcd.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
"github.com/k3s-io/k3s/pkg/daemons/executor"
2929
"github.com/k3s-io/k3s/pkg/etcd/s3"
3030
"github.com/k3s-io/k3s/pkg/etcd/snapshot"
31+
embedded "github.com/k3s-io/k3s/pkg/executor/embed/etcd"
3132
"github.com/k3s-io/k3s/pkg/server/auth"
3233
"github.com/k3s-io/k3s/pkg/signals"
3334
"github.com/k3s-io/k3s/pkg/util"
@@ -1125,8 +1126,7 @@ func (e *ETCD) StartEmbeddedTemporary(ctx context.Context, wg *sync.WaitGroup) e
11251126
return err
11261127
}
11271128

1128-
embedded := executor.Embedded{}
1129-
return embedded.ETCD(ctx, wg, &executor.ETCDConfig{
1129+
return embedded.StartETCD(ctx, wg, &executor.ETCDConfig{
11301130
InitialOptions: executor.InitialOptions{AdvertisePeerURL: peerURL},
11311131
DataDir: tmpDataDir,
11321132
ForceNewCluster: true,
@@ -1146,7 +1146,7 @@ func (e *ETCD) StartEmbeddedTemporary(ctx context.Context, wg *sync.WaitGroup) e
11461146
},
11471147
ExperimentalInitialCorruptCheck: true,
11481148
ExperimentalWatchProgressNotifyInterval: e.config.Datastore.NotifyInterval,
1149-
}, append(e.config.ExtraEtcdArgs, "--max-snapshots=0", "--max-wals=0"), e.Test)
1149+
}, append(e.config.ExtraEtcdArgs, "--max-snapshots=0", "--max-wals=0"))
11501150
}
11511151

11521152
func addPort(address string, offset int) (string, error) {

pkg/daemons/executor/embed.go renamed to pkg/executor/embed/embed.go

Lines changed: 46 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
//go:build !no_embedded_executor
22
// +build !no_embedded_executor
33

4-
package executor
4+
package embed
55

66
import (
77
"context"
@@ -24,6 +24,8 @@ import (
2424
"github.com/k3s-io/k3s/pkg/agent/netpol"
2525
"github.com/k3s-io/k3s/pkg/cli/cmds"
2626
daemonconfig "github.com/k3s-io/k3s/pkg/daemons/config"
27+
"github.com/k3s-io/k3s/pkg/daemons/executor"
28+
"github.com/k3s-io/k3s/pkg/executor/embed/etcd"
2729
"github.com/k3s-io/k3s/pkg/signals"
2830
"github.com/k3s-io/k3s/pkg/util"
2931
"github.com/k3s-io/k3s/pkg/version"
@@ -52,7 +54,17 @@ import (
5254
var once sync.Once
5355

5456
func init() {
55-
executor = &Embedded{}
57+
executor.Set(&Embedded{})
58+
}
59+
60+
// explicit type check
61+
var _ executor.Executor = &Embedded{}
62+
63+
type Embedded struct {
64+
apiServerReady <-chan struct{}
65+
etcdReady chan struct{}
66+
criReady chan struct{}
67+
nodeConfig *daemonconfig.Node
5668
}
5769

5870
func (e *Embedded) Bootstrap(ctx context.Context, nodeConfig *daemonconfig.Node, cfg cmds.Agent) error {
@@ -314,24 +326,50 @@ func (*Embedded) CloudControllerManager(ctx context.Context, ccmRBACReady <-chan
314326
return nil
315327
}
316328

317-
func (e *Embedded) CurrentETCDOptions() (InitialOptions, error) {
318-
return InitialOptions{}, nil
329+
func (e *Embedded) CurrentETCDOptions() (executor.InitialOptions, error) {
330+
return executor.InitialOptions{}, nil
331+
}
332+
333+
func (e *Embedded) ETCD(ctx context.Context, wg *sync.WaitGroup, args *executor.ETCDConfig, extraArgs []string, test executor.TestFunc) error {
334+
// Start a goroutine to call the provided test function until it returns true.
335+
// The test function is reponsible for ensuring that the etcd server is up
336+
// and ready to accept client requests.
337+
if e.etcdReady != nil {
338+
go func() {
339+
for {
340+
if err := test(ctx, true); err != nil {
341+
logrus.Infof("Failed to test etcd connection: %v", err)
342+
} else {
343+
logrus.Info("Connection to etcd is ready")
344+
close(e.etcdReady)
345+
return
346+
}
347+
348+
select {
349+
case <-time.After(5 * time.Second):
350+
case <-ctx.Done():
351+
return
352+
}
353+
}
354+
}()
355+
}
356+
return etcd.StartETCD(ctx, wg, args, extraArgs)
319357
}
320358

321359
func (e *Embedded) Containerd(ctx context.Context, cfg *daemonconfig.Node) error {
322-
return CloseIfNilErr(containerd.Run(ctx, cfg), e.criReady)
360+
return executor.CloseIfNilErr(containerd.Run(ctx, cfg), e.criReady)
323361
}
324362

325363
func (e *Embedded) Docker(ctx context.Context, cfg *daemonconfig.Node) error {
326-
return CloseIfNilErr(cridockerd.Run(ctx, cfg), e.criReady)
364+
return executor.CloseIfNilErr(cridockerd.Run(ctx, cfg), e.criReady)
327365
}
328366

329367
func (e *Embedded) CRI(ctx context.Context, cfg *daemonconfig.Node) error {
330368
// agentless sets cri socket path to /dev/null to indicate no CRI is needed
331369
if cfg.ContainerRuntimeEndpoint != "/dev/null" {
332-
return CloseIfNilErr(cri.WaitForService(ctx, cfg.ContainerRuntimeEndpoint, "CRI"), e.criReady)
370+
return executor.CloseIfNilErr(cri.WaitForService(ctx, cfg.ContainerRuntimeEndpoint, "CRI"), e.criReady)
333371
}
334-
return CloseIfNilErr(nil, e.criReady)
372+
return executor.CloseIfNilErr(nil, e.criReady)
335373
}
336374

337375
func (e *Embedded) CNI(ctx context.Context, wg *sync.WaitGroup, cfg *daemonconfig.Node) error {

pkg/daemons/executor/embed_linux.go renamed to pkg/executor/embed/embed_linux.go

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,10 @@
11
//go:build linux && !no_embedded_executor
22
// +build linux,!no_embedded_executor
33

4-
package executor
4+
package embed
55

66
import (
77
daemonconfig "github.com/k3s-io/k3s/pkg/daemons/config"
8-
9-
// registering k3s cloud provider
10-
_ "github.com/k3s-io/k3s/pkg/cloudprovider"
118
)
129

1310
func platformKubeProxyArgs(nodeConfig *daemonconfig.Node) map[string]string {

pkg/daemons/executor/embed_windows.go renamed to pkg/executor/embed/embed_windows.go

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
//go:build windows && !no_embedded_executor
22
// +build windows,!no_embedded_executor
33

4-
package executor
4+
package embed
55

66
import (
77
"encoding/json"
@@ -11,11 +11,8 @@ import (
1111
"time"
1212

1313
"github.com/Microsoft/hcsshim"
14-
"github.com/sirupsen/logrus"
15-
16-
// registering k3s cloud provider
17-
_ "github.com/k3s-io/k3s/pkg/cloudprovider"
1814
daemonconfig "github.com/k3s-io/k3s/pkg/daemons/config"
15+
"github.com/sirupsen/logrus"
1916
)
2017

2118
const (

pkg/daemons/executor/etcd.go renamed to pkg/executor/embed/etcd/etcd.go

Lines changed: 7 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1,54 +1,24 @@
1-
package executor
1+
package etcd
22

33
import (
44
"context"
55
"errors"
66
"os"
77
"path/filepath"
88
"sync"
9-
"time"
109

11-
daemonconfig "github.com/k3s-io/k3s/pkg/daemons/config"
10+
"github.com/k3s-io/k3s/pkg/daemons/executor"
1211
"github.com/k3s-io/k3s/pkg/version"
1312
"github.com/sirupsen/logrus"
1413
"go.etcd.io/etcd/server/v3/embed"
1514
"go.etcd.io/etcd/server/v3/etcdserver/api/rafthttp"
1615
)
1716

18-
// Embedded is defined here so that we can use embedded.ETCD even when the rest
19-
// of the embedded execututor is disabled by build flags
20-
type Embedded struct {
21-
apiServerReady <-chan struct{}
22-
etcdReady chan struct{}
23-
criReady chan struct{}
24-
nodeConfig *daemonconfig.Node
25-
}
26-
27-
func (e *Embedded) ETCD(ctx context.Context, wg *sync.WaitGroup, args *ETCDConfig, extraArgs []string, test TestFunc) error {
28-
// An unbootstrapped executor is used to start up a temporary embedded etcd when reconciling.
29-
// This temporary executor doesn't have any ready channels set up, so don't bother testing.
30-
if e.etcdReady != nil {
31-
go func() {
32-
for {
33-
if err := test(ctx, true); err != nil {
34-
logrus.Infof("Failed to test etcd connection: %v", err)
35-
} else {
36-
logrus.Info("Connection to etcd is ready")
37-
close(e.etcdReady)
38-
return
39-
}
40-
41-
select {
42-
case <-time.After(5 * time.Second):
43-
case <-ctx.Done():
44-
return
45-
}
46-
}
47-
}()
48-
}
49-
50-
// nil args indicates a no-op start; all we need to do is wait for the test
51-
// func to indicate readiness and close the channel.
17+
// StartETCD runs an embedded etcd server instance with the provided config.
18+
// This function will return if the server has been successfully started.
19+
// The server will continue to run until the context is cancelled or some internal error occurs.
20+
func StartETCD(ctx context.Context, wg *sync.WaitGroup, args *executor.ETCDConfig, extraArgs []string) error {
21+
// nil args indicates a no-op start
5222
if args == nil {
5323
return nil
5424
}

0 commit comments

Comments
 (0)