Add containerd ready channel to delay etcd node join

Signed-off-by: Brad Davidson <brad.davidson@rancher.com>
(cherry picked from commit 88178ae65e)
pull/4287/head
Brad Davidson 2021-10-11 23:13:10 -07:00 committed by Brad Davidson
parent edde820e89
commit 1a8bd3156f
13 changed files with 162 additions and 67 deletions

View File

@ -84,6 +84,14 @@ func run(ctx context.Context, cfg cmds.Agent, proxy proxy.Proxy) error {
} }
} }
// the agent runtime is ready to host workloads when containerd is up and the airgap
// images have finished loading, as that portion of startup may block for an arbitrary
// amount of time depending on how long it takes to import whatever the user has placed
// in the images directory.
if cfg.AgentReady != nil {
close(cfg.AgentReady)
}
notifySocket := os.Getenv("NOTIFY_SOCKET") notifySocket := os.Getenv("NOTIFY_SOCKET")
os.Unsetenv("NOTIFY_SOCKET") os.Unsetenv("NOTIFY_SOCKET")

View File

@ -53,6 +53,9 @@ func Setup(ctx context.Context, config *config.Node, proxy proxy.Proxy) error {
return err return err
} }
// Do an immediate fill of proxy addresses from the server endpoint list, before going into the
// watch loop. This will fail on the first server, as the apiserver won't be started yet - but
// that's fine because the local server is already seeded into the proxy address list.
endpoint, _ := client.CoreV1().Endpoints("default").Get(ctx, "kubernetes", metav1.GetOptions{}) endpoint, _ := client.CoreV1().Endpoints("default").Get(ctx, "kubernetes", metav1.GetOptions{})
if endpoint != nil { if endpoint != nil {
addresses := util.GetAddresses(endpoint) addresses := util.GetAddresses(endpoint)
@ -61,8 +64,9 @@ func Setup(ctx context.Context, config *config.Node, proxy proxy.Proxy) error {
} }
} }
// Attempt to connect to supervisors, storing their cancellation function for later when we
// need to disconnect.
disconnect := map[string]context.CancelFunc{} disconnect := map[string]context.CancelFunc{}
wg := &sync.WaitGroup{} wg := &sync.WaitGroup{}
for _, address := range proxy.SupervisorAddresses() { for _, address := range proxy.SupervisorAddresses() {
if _, ok := disconnect[address]; !ok { if _, ok := disconnect[address]; !ok {
@ -70,7 +74,11 @@ func Setup(ctx context.Context, config *config.Node, proxy proxy.Proxy) error {
} }
} }
// Once the apiserver is up, go into a watch loop, adding and removing tunnels as endpoints come
// and go from the cluster. We go into a faster but noisier connect loop if the watch fails
// following a successful connection.
go func() { go func() {
util.WaitForAPIServerReady(client, 30*time.Second)
connect: connect:
for { for {
time.Sleep(5 * time.Second) time.Sleep(5 * time.Second)

View File

@ -46,6 +46,7 @@ type Agent struct {
Taints cli.StringSlice Taints cli.StringSlice
ImageCredProvBinDir string ImageCredProvBinDir string
ImageCredProvConfig string ImageCredProvConfig string
AgentReady chan<- struct{}
AgentShared AgentShared
} }

View File

@ -16,6 +16,7 @@ import (
"github.com/rancher/k3s/pkg/agent/loadbalancer" "github.com/rancher/k3s/pkg/agent/loadbalancer"
"github.com/rancher/k3s/pkg/cli/cmds" "github.com/rancher/k3s/pkg/cli/cmds"
"github.com/rancher/k3s/pkg/clientaccess" "github.com/rancher/k3s/pkg/clientaccess"
"github.com/rancher/k3s/pkg/daemons/config"
"github.com/rancher/k3s/pkg/datadir" "github.com/rancher/k3s/pkg/datadir"
"github.com/rancher/k3s/pkg/etcd" "github.com/rancher/k3s/pkg/etcd"
"github.com/rancher/k3s/pkg/netutil" "github.com/rancher/k3s/pkg/netutil"
@ -83,8 +84,11 @@ func run(app *cli.Context, cfg *cmds.Server, leaderControllers server.CustomCont
cfg.Token = cfg.ClusterSecret cfg.Token = cfg.ClusterSecret
} }
agentReady := make(chan struct{})
serverConfig := server.Config{} serverConfig := server.Config{}
serverConfig.DisableAgent = cfg.DisableAgent serverConfig.DisableAgent = cfg.DisableAgent
serverConfig.ControlConfig.Runtime = &config.ControlRuntime{AgentReady: agentReady}
serverConfig.ControlConfig.Token = cfg.Token serverConfig.ControlConfig.Token = cfg.Token
serverConfig.ControlConfig.AgentToken = cfg.AgentToken serverConfig.ControlConfig.AgentToken = cfg.AgentToken
serverConfig.ControlConfig.JoinURL = cfg.ServerURL serverConfig.ControlConfig.JoinURL = cfg.ServerURL
@ -415,6 +419,7 @@ func run(app *cli.Context, cfg *cmds.Server, leaderControllers server.CustomCont
}() }()
if cfg.DisableAgent { if cfg.DisableAgent {
close(agentReady)
<-ctx.Done() <-ctx.Done()
return nil return nil
} }
@ -431,6 +436,7 @@ func run(app *cli.Context, cfg *cmds.Server, leaderControllers server.CustomCont
} }
agentConfig := cmds.AgentConfig agentConfig := cmds.AgentConfig
agentConfig.AgentReady = agentReady
agentConfig.Debug = app.GlobalBool("debug") agentConfig.Debug = app.GlobalBool("debug")
agentConfig.DataDir = filepath.Dir(serverConfig.ControlConfig.DataDir) agentConfig.DataDir = filepath.Dir(serverConfig.ControlConfig.DataDir)
agentConfig.ServerURL = url agentConfig.ServerURL = url

View File

@ -199,6 +199,7 @@ type ControlRuntime struct {
HTTPBootstrap bool HTTPBootstrap bool
APIServerReady <-chan struct{} APIServerReady <-chan struct{}
AgentReady <-chan struct{}
ETCDReady <-chan struct{} ETCDReady <-chan struct{}
ClusterControllerStart func(ctx context.Context) error ClusterControllerStart func(ctx context.Context) error
LeaderElectedClusterControllerStart func(ctx context.Context) error LeaderElectedClusterControllerStart func(ctx context.Context) error
@ -218,6 +219,7 @@ type ControlRuntime struct {
ServingKubeletKey string ServingKubeletKey string
ServerToken string ServerToken string
AgentToken string AgentToken string
APIServer http.Handler
Handler http.Handler Handler http.Handler
Tunnel http.Handler Tunnel http.Handler
Authenticator authenticator.Request Authenticator authenticator.Request

View File

@ -4,7 +4,6 @@ import (
"context" "context"
"math/rand" "math/rand"
"net" "net"
"net/http"
"os" "os"
"path/filepath" "path/filepath"
"strconv" "strconv"
@ -23,7 +22,6 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors" apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/authentication/authenticator"
"k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd" "k8s.io/client-go/tools/clientcmd"
"k8s.io/kubernetes/pkg/kubeapiserver/authorizer/modes" "k8s.io/kubernetes/pkg/kubeapiserver/authorizer/modes"
@ -37,9 +35,7 @@ var localhostIP = net.ParseIP("127.0.0.1")
func Server(ctx context.Context, cfg *config.Control) error { func Server(ctx context.Context, cfg *config.Control) error {
rand.Seed(time.Now().UTC().UnixNano()) rand.Seed(time.Now().UTC().UnixNano())
runtime := cfg.Runtime
runtime := &config.ControlRuntime{}
cfg.Runtime = runtime
if err := prepare(ctx, cfg, runtime); err != nil { if err := prepare(ctx, cfg, runtime); err != nil {
return errors.Wrap(err, "preparing server") return errors.Wrap(err, "preparing server")
@ -48,13 +44,16 @@ func Server(ctx context.Context, cfg *config.Control) error {
cfg.Runtime.Tunnel = setupTunnel() cfg.Runtime.Tunnel = setupTunnel()
proxyutil.DisableProxyHostnameCheck = true proxyutil.DisableProxyHostnameCheck = true
var auth authenticator.Request basicAuth, err := basicAuthenticator(runtime.PasswdFile)
var handler http.Handler if err != nil {
var err error return err
}
runtime.Authenticator = basicAuth
if !cfg.DisableAPIServer { if !cfg.DisableAPIServer {
auth, handler, err = apiServer(ctx, cfg, runtime) go waitForAPIServerHandlers(ctx, runtime)
if err != nil {
if err := apiServer(ctx, cfg, runtime); err != nil {
return err return err
} }
@ -62,13 +61,6 @@ func Server(ctx context.Context, cfg *config.Control) error {
return err return err
} }
} }
basicAuth, err := basicAuthenticator(runtime.PasswdFile)
if err != nil {
return err
}
runtime.Authenticator = combineAuthenticators(basicAuth, auth)
runtime.Handler = handler
if !cfg.DisableScheduler { if !cfg.DisableScheduler {
if err := scheduler(cfg, runtime); err != nil { if err := scheduler(cfg, runtime); err != nil {
@ -144,7 +136,7 @@ func scheduler(cfg *config.Control, runtime *config.ControlRuntime) error {
return executor.Scheduler(runtime.APIServerReady, args) return executor.Scheduler(runtime.APIServerReady, args)
} }
func apiServer(ctx context.Context, cfg *config.Control, runtime *config.ControlRuntime) (authenticator.Request, http.Handler, error) { func apiServer(ctx context.Context, cfg *config.Control, runtime *config.ControlRuntime) error {
argsMap := make(map[string]string) argsMap := make(map[string]string)
setupStorageBackend(argsMap, cfg) setupStorageBackend(argsMap, cfg)
@ -363,6 +355,15 @@ func checkForCloudControllerPrivileges(runtime *config.ControlRuntime, timeout t
return nil return nil
} }
func waitForAPIServerHandlers(ctx context.Context, runtime *config.ControlRuntime) {
auth, handler, err := executor.APIServerHandlers()
if err != nil {
logrus.Fatalf("Failed to get request handlers from apiserver: %v", err)
}
runtime.Authenticator = combineAuthenticators(runtime.Authenticator, auth)
runtime.APIServer = handler
}
func waitForAPIServerInBackground(ctx context.Context, runtime *config.ControlRuntime) error { func waitForAPIServerInBackground(ctx context.Context, runtime *config.ControlRuntime) error {
restConfig, err := clientcmd.BuildConfigFromFlags("", runtime.KubeConfigAdmin) restConfig, err := clientcmd.BuildConfigFromFlags("", runtime.KubeConfigAdmin)
if err != nil { if err != nil {

View File

@ -60,17 +60,21 @@ func (Embedded) KubeProxy(args []string) error {
return nil return nil
} }
func (Embedded) APIServer(ctx context.Context, etcdReady <-chan struct{}, args []string) (authenticator.Request, http.Handler, error) { func (Embedded) APIServerHandlers() (authenticator.Request, http.Handler, error) {
<-etcdReady startupConfig := <-app.StartupConfig
return startupConfig.Authenticator, startupConfig.Handler, nil
}
func (Embedded) APIServer(ctx context.Context, etcdReady <-chan struct{}, args []string) error {
command := app.NewAPIServerCommand(ctx.Done()) command := app.NewAPIServerCommand(ctx.Done())
command.SetArgs(args) command.SetArgs(args)
go func() { go func() {
<-etcdReady
logrus.Fatalf("apiserver exited: %v", command.Execute()) logrus.Fatalf("apiserver exited: %v", command.Execute())
}() }()
startupConfig := <-app.StartupConfig return nil
return startupConfig.Authenticator, startupConfig.Handler, nil
} }
func (Embedded) Scheduler(apiReady <-chan struct{}, args []string) error { func (Embedded) Scheduler(apiReady <-chan struct{}, args []string) error {

View File

@ -29,7 +29,7 @@ func (e Embedded) ETCD(ctx context.Context, args ETCDConfig) error {
} }
etcd, err := embed.StartEtcd(cfg) etcd, err := embed.StartEtcd(cfg)
if err != nil { if err != nil {
return nil return err
} }
go func() { go func() {

View File

@ -22,7 +22,8 @@ type Executor interface {
Bootstrap(ctx context.Context, nodeConfig *daemonconfig.Node, cfg cmds.Agent) error Bootstrap(ctx context.Context, nodeConfig *daemonconfig.Node, cfg cmds.Agent) error
Kubelet(args []string) error Kubelet(args []string) error
KubeProxy(args []string) error KubeProxy(args []string) error
APIServer(ctx context.Context, etcdReady <-chan struct{}, args []string) (authenticator.Request, http.Handler, error) APIServerHandlers() (authenticator.Request, http.Handler, error)
APIServer(ctx context.Context, etcdReady <-chan struct{}, args []string) error
Scheduler(apiReady <-chan struct{}, args []string) error Scheduler(apiReady <-chan struct{}, args []string) error
ControllerManager(apiReady <-chan struct{}, args []string) error ControllerManager(apiReady <-chan struct{}, args []string) error
CurrentETCDOptions() (InitialOptions, error) CurrentETCDOptions() (InitialOptions, error)
@ -97,7 +98,11 @@ func KubeProxy(args []string) error {
return executor.KubeProxy(args) return executor.KubeProxy(args)
} }
func APIServer(ctx context.Context, etcdReady <-chan struct{}, args []string) (authenticator.Request, http.Handler, error) { func APIServerHandlers() (authenticator.Request, http.Handler, error) {
return executor.APIServerHandlers()
}
func APIServer(ctx context.Context, etcdReady <-chan struct{}, args []string) error {
return executor.APIServer(ctx, etcdReady, args) return executor.APIServer(ctx, etcdReady, args)
} }

View File

@ -186,6 +186,7 @@ func (e *ETCD) IsInitialized(ctx context.Context, config *config.Control) (bool,
func (e *ETCD) Reset(ctx context.Context, rebootstrap func() error) error { func (e *ETCD) Reset(ctx context.Context, rebootstrap func() error) error {
// Wait for etcd to come up as a new single-node cluster, then exit // Wait for etcd to come up as a new single-node cluster, then exit
go func() { go func() {
<-e.runtime.AgentReady
t := time.NewTicker(5 * time.Second) t := time.NewTicker(5 * time.Second)
defer t.Stop() defer t.Stop()
for range t.C { for range t.C {
@ -285,8 +286,14 @@ func (e *ETCD) Start(ctx context.Context, clientAccessInfo *clientaccess.Info) e
return e.newCluster(ctx, false) return e.newCluster(ctx, false)
} }
err = e.join(ctx, clientAccessInfo) go func() {
return errors.Wrap(err, "joining etcd cluster") <-e.runtime.AgentReady
if err := e.join(ctx, clientAccessInfo); err != nil {
logrus.Fatalf("ETCD join failed: %v", err)
}
}()
return nil
} }
// join attempts to add a member to an existing cluster // join attempts to add a member to an existing cluster
@ -329,9 +336,9 @@ func (e *ETCD) join(ctx context.Context, clientAccessInfo *clientaccess.Info) er
// make sure to remove the name file if a duplicate node name is used // make sure to remove the name file if a duplicate node name is used
nameFile := nameFile(e.config) nameFile := nameFile(e.config)
if err := os.Remove(nameFile); err != nil { if err := os.Remove(nameFile); err != nil {
return err logrus.Errorf("Failed to remove etcd name file %s: %v", nameFile, err)
} }
return errors.New("Failed to join etcd cluster due to duplicate node names, please use unique node name for the server") return errors.New("duplicate node name found, please use a unique name for this node")
} }
for _, peer := range member.PeerURLs { for _, peer := range member.PeerURLs {
u, err := url.Parse(peer) u, err := url.Parse(peer)
@ -352,7 +359,7 @@ func (e *ETCD) join(ctx context.Context, clientAccessInfo *clientaccess.Info) er
} }
if add { if add {
logrus.Infof("Adding %s to etcd cluster %v", e.peerURL(), cluster) logrus.Infof("Adding member %s=%s to etcd cluster %v", e.name, e.peerURL(), cluster)
if _, err = client.MemberAddAsLearner(clientCtx, []string{e.peerURL()}); err != nil { if _, err = client.MemberAddAsLearner(clientCtx, []string{e.peerURL()}); err != nil {
return err return err
} }
@ -438,7 +445,7 @@ func (e *ETCD) handler(next http.Handler) http.Handler {
return mux return mux
} }
// infoHandler returns etcd cluster information. This is used by new members when joining the custer. // infoHandler returns etcd cluster information. This is used by new members when joining the cluster.
func (e *ETCD) infoHandler() http.Handler { func (e *ETCD) infoHandler() http.Handler {
return http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { return http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
ctx, cancel := context.WithTimeout(req.Context(), 2*time.Second) ctx, cancel := context.WithTimeout(req.Context(), 2*time.Second)
@ -494,6 +501,10 @@ func getClientConfig(ctx context.Context, runtime *config.ControlRuntime, endpoi
// toTLSConfig converts the ControlRuntime configuration to TLS configuration suitable // toTLSConfig converts the ControlRuntime configuration to TLS configuration suitable
// for use by etcd. // for use by etcd.
func toTLSConfig(runtime *config.ControlRuntime) (*tls.Config, error) { func toTLSConfig(runtime *config.ControlRuntime) (*tls.Config, error) {
if runtime.ClientETCDCert == "" || runtime.ClientETCDKey == "" || runtime.ETCDServerCA == "" {
return nil, errors.New("runtime is not ready yet")
}
clientCert, err := tls.LoadX509KeyPair(runtime.ClientETCDCert, runtime.ClientETCDKey) clientCert, err := tls.LoadX509KeyPair(runtime.ClientETCDCert, runtime.ClientETCDKey)
if err != nil { if err != nil {
return nil, err return nil, err
@ -527,8 +538,8 @@ func GetAdvertiseAddress(advertiseIP string) (string, error) {
// newCluster returns options to set up etcd for a new cluster // newCluster returns options to set up etcd for a new cluster
func (e *ETCD) newCluster(ctx context.Context, reset bool) error { func (e *ETCD) newCluster(ctx context.Context, reset bool) error {
return e.cluster(ctx, reset, executor.InitialOptions{ return e.cluster(ctx, reset, executor.InitialOptions{
AdvertisePeerURL: fmt.Sprintf("https://%s:2380", e.address), AdvertisePeerURL: e.peerURL(),
Cluster: fmt.Sprintf("%s=https://%s:2380", e.name, e.address), Cluster: fmt.Sprintf("%s=%s", e.name, e.peerURL()),
State: "new", State: "new",
}) })
} }
@ -621,6 +632,7 @@ func (e *ETCD) RemovePeer(ctx context.Context, name, address string, allowSelfRe
// being promoted to full voting member. The checks only run on the cluster member that is // being promoted to full voting member. The checks only run on the cluster member that is
// the etcd leader. // the etcd leader.
func (e *ETCD) manageLearners(ctx context.Context) error { func (e *ETCD) manageLearners(ctx context.Context) error {
<-e.runtime.AgentReady
t := time.NewTicker(manageTickerTime) t := time.NewTicker(manageTickerTime)
defer t.Stop() defer t.Stop()
@ -1316,9 +1328,6 @@ func backupDirWithRetention(dir string, maxBackupRetention int) (string, error)
// GetAPIServerURLFromETCD will try to fetch the version.Program/apiaddresses key from etcd // GetAPIServerURLFromETCD will try to fetch the version.Program/apiaddresses key from etcd
// when it succeed it will parse the first address in the list and return back an address // when it succeed it will parse the first address in the list and return back an address
func GetAPIServerURLFromETCD(ctx context.Context, cfg *config.Control) (string, error) { func GetAPIServerURLFromETCD(ctx context.Context, cfg *config.Control) (string, error) {
if cfg.Runtime == nil {
return "", fmt.Errorf("runtime is not ready yet")
}
cl, err := GetClient(ctx, cfg.Runtime, endpoint) cl, err := GetClient(ctx, cfg.Runtime, endpoint)
if err != nil { if err != nil {
return "", err return "", err

View File

@ -7,25 +7,35 @@ import (
"os" "os"
"path/filepath" "path/filepath"
"testing" "testing"
"time"
"github.com/rancher/k3s/pkg/clientaccess" "github.com/rancher/k3s/pkg/clientaccess"
"github.com/rancher/k3s/pkg/daemons/config" "github.com/rancher/k3s/pkg/daemons/config"
testutil "github.com/rancher/k3s/tests/util" testutil "github.com/rancher/k3s/tests/util"
"github.com/robfig/cron/v3" "github.com/robfig/cron/v3"
etcd "go.etcd.io/etcd/clientv3" clientv3 "go.etcd.io/etcd/clientv3"
utilnet "k8s.io/apimachinery/pkg/util/net"
) )
func generateTestConfig() *config.Control { func mustGetAddress() string {
_, clusterIPNet, _ := net.ParseCIDR("10.42.0.0/16") ipAddr, err := utilnet.ChooseHostInterface()
_, serviceIPNet, _ := net.ParseCIDR("10.43.0.0/16") if err != nil {
panic(err)
}
return ipAddr.String()
}
func generateTestConfig() *config.Control {
agentReady := make(chan struct{})
close(agentReady)
return &config.Control{ return &config.Control{
Runtime: &config.ControlRuntime{AgentReady: agentReady},
HTTPSPort: 6443, HTTPSPort: 6443,
SupervisorPort: 6443, SupervisorPort: 6443,
AdvertisePort: 6443, AdvertisePort: 6443,
ClusterDomain: "cluster.local", ClusterDomain: "cluster.local",
ClusterDNS: net.ParseIP("10.43.0.10"), ClusterDNS: net.ParseIP("10.43.0.10"),
ClusterIPRange: clusterIPNet, ClusterIPRange: testutil.ClusterIPNet(),
DataDir: "/tmp/k3s/", // Different than the default value DataDir: "/tmp/k3s/", // Different than the default value
FlannelBackend: "vxlan", FlannelBackend: "vxlan",
EtcdSnapshotName: "etcd-snapshot", EtcdSnapshotName: "etcd-snapshot",
@ -34,7 +44,7 @@ func generateTestConfig() *config.Control {
EtcdS3Endpoint: "s3.amazonaws.com", EtcdS3Endpoint: "s3.amazonaws.com",
EtcdS3Region: "us-east-1", EtcdS3Region: "us-east-1",
SANs: []string{"127.0.0.1"}, SANs: []string{"127.0.0.1"},
ServiceIPRange: serviceIPNet, ServiceIPRange: testutil.ServiceIPNet(),
} }
} }
@ -192,42 +202,48 @@ func Test_UnitETCD_Register(t *testing.T) {
} }
func Test_UnitETCD_Start(t *testing.T) { func Test_UnitETCD_Start(t *testing.T) {
type contextInfo struct {
ctx context.Context
cancel context.CancelFunc
}
type fields struct { type fields struct {
client *etcd.Client context contextInfo
client *clientv3.Client
config *config.Control config *config.Control
name string name string
runtime *config.ControlRuntime
address string address string
cron *cron.Cron cron *cron.Cron
s3 *S3 s3 *S3
} }
type args struct { type args struct {
ctx context.Context
clientAccessInfo *clientaccess.Info clientAccessInfo *clientaccess.Info
} }
tests := []struct { tests := []struct {
name string name string
fields fields fields fields
args args args args
setup func(cnf *config.Control) error setup func(cnf *config.Control, ctxInfo *contextInfo) error
teardown func(cnf *config.Control) error teardown func(cnf *config.Control, ctxInfo *contextInfo) error
wantErr bool wantErr bool
}{ }{
{ {
name: "Start etcd without clientAccessInfo and without snapshots", name: "Start etcd without clientAccessInfo and without snapshots",
fields: fields{ fields: fields{
config: generateTestConfig(), config: generateTestConfig(),
address: "192.168.1.123", // Local IP address address: mustGetAddress(),
name: "default",
}, },
args: args{ args: args{
ctx: context.TODO(),
clientAccessInfo: nil, clientAccessInfo: nil,
}, },
setup: func(cnf *config.Control) error { setup: func(cnf *config.Control, ctxInfo *contextInfo) error {
ctxInfo.ctx, ctxInfo.cancel = context.WithCancel(context.Background())
cnf.EtcdDisableSnapshots = true cnf.EtcdDisableSnapshots = true
return testutil.GenerateRuntime(cnf) return testutil.GenerateRuntime(cnf)
}, },
teardown: func(cnf *config.Control) error { teardown: func(cnf *config.Control, ctxInfo *contextInfo) error {
ctxInfo.cancel()
time.Sleep(5 * time.Second)
testutil.CleanupDataDir(cnf) testutil.CleanupDataDir(cnf)
return nil return nil
}, },
@ -236,17 +252,20 @@ func Test_UnitETCD_Start(t *testing.T) {
name: "Start etcd without clientAccessInfo on", name: "Start etcd without clientAccessInfo on",
fields: fields{ fields: fields{
config: generateTestConfig(), config: generateTestConfig(),
address: "192.168.1.123", // Local IP address address: mustGetAddress(),
name: "default",
cron: cron.New(), cron: cron.New(),
}, },
args: args{ args: args{
ctx: context.TODO(),
clientAccessInfo: nil, clientAccessInfo: nil,
}, },
setup: func(cnf *config.Control) error { setup: func(cnf *config.Control, ctxInfo *contextInfo) error {
ctxInfo.ctx, ctxInfo.cancel = context.WithCancel(context.Background())
return testutil.GenerateRuntime(cnf) return testutil.GenerateRuntime(cnf)
}, },
teardown: func(cnf *config.Control) error { teardown: func(cnf *config.Control, ctxInfo *contextInfo) error {
ctxInfo.cancel()
time.Sleep(5 * time.Second)
testutil.CleanupDataDir(cnf) testutil.CleanupDataDir(cnf)
return nil return nil
}, },
@ -255,20 +274,23 @@ func Test_UnitETCD_Start(t *testing.T) {
name: "Start etcd with an existing cluster", name: "Start etcd with an existing cluster",
fields: fields{ fields: fields{
config: generateTestConfig(), config: generateTestConfig(),
address: "192.168.1.123", // Local IP address address: mustGetAddress(),
name: "default",
cron: cron.New(), cron: cron.New(),
}, },
args: args{ args: args{
ctx: context.TODO(),
clientAccessInfo: nil, clientAccessInfo: nil,
}, },
setup: func(cnf *config.Control) error { setup: func(cnf *config.Control, ctxInfo *contextInfo) error {
ctxInfo.ctx, ctxInfo.cancel = context.WithCancel(context.Background())
if err := testutil.GenerateRuntime(cnf); err != nil { if err := testutil.GenerateRuntime(cnf); err != nil {
return err return err
} }
return os.MkdirAll(walDir(cnf), 0700) return os.MkdirAll(walDir(cnf), 0700)
}, },
teardown: func(cnf *config.Control) error { teardown: func(cnf *config.Control, ctxInfo *contextInfo) error {
ctxInfo.cancel()
time.Sleep(5 * time.Second)
testutil.CleanupDataDir(cnf) testutil.CleanupDataDir(cnf)
os.Remove(walDir(cnf)) os.Remove(walDir(cnf))
return nil return nil
@ -281,17 +303,17 @@ func Test_UnitETCD_Start(t *testing.T) {
client: tt.fields.client, client: tt.fields.client,
config: tt.fields.config, config: tt.fields.config,
name: tt.fields.name, name: tt.fields.name,
runtime: tt.fields.runtime, runtime: tt.fields.config.Runtime,
address: tt.fields.address, address: tt.fields.address,
cron: tt.fields.cron, cron: tt.fields.cron,
s3: tt.fields.s3, s3: tt.fields.s3,
} }
defer tt.teardown(e.config) defer tt.teardown(e.config, &tt.fields.context)
if err := tt.setup(e.config); err != nil { if err := tt.setup(e.config, &tt.fields.context); err != nil {
t.Errorf("Setup for ETCD.Start() failed = %v", err) t.Errorf("Setup for ETCD.Start() failed = %v", err)
return return
} }
if err := e.Start(tt.args.ctx, tt.args.clientAccessInfo); (err != nil) != tt.wantErr { if err := e.Start(tt.fields.context.ctx, tt.args.clientAccessInfo); (err != nil) != tt.wantErr {
t.Errorf("ETCD.Start() error = %v, wantErr %v", err, tt.wantErr) t.Errorf("ETCD.Start() error = %v, wantErr %v", err, tt.wantErr)
} }
}) })

View File

@ -39,7 +39,7 @@ func router(ctx context.Context, config *Config, cfg *cmds.Server) http.Handler
prefix := "/v1-" + version.Program prefix := "/v1-" + version.Program
authed := mux.NewRouter() authed := mux.NewRouter()
authed.Use(authMiddleware(serverConfig, version.Program+":agent")) authed.Use(authMiddleware(serverConfig, version.Program+":agent"))
authed.NotFoundHandler = serverConfig.Runtime.Handler authed.NotFoundHandler = apiserver(serverConfig.Runtime)
authed.Path(prefix + "/serving-kubelet.crt").Handler(servingKubeletCert(serverConfig, serverConfig.Runtime.ServingKubeletKey, nodeAuth)) authed.Path(prefix + "/serving-kubelet.crt").Handler(servingKubeletCert(serverConfig, serverConfig.Runtime.ServingKubeletKey, nodeAuth))
authed.Path(prefix + "/client-kubelet.crt").Handler(clientKubeletCert(serverConfig, serverConfig.Runtime.ClientKubeletKey, nodeAuth)) authed.Path(prefix + "/client-kubelet.crt").Handler(clientKubeletCert(serverConfig, serverConfig.Runtime.ClientKubeletKey, nodeAuth))
authed.Path(prefix + "/client-kube-proxy.crt").Handler(fileHandler(serverConfig.Runtime.ClientKubeProxyCert, serverConfig.Runtime.ClientKubeProxyKey)) authed.Path(prefix + "/client-kube-proxy.crt").Handler(fileHandler(serverConfig.Runtime.ClientKubeProxyCert, serverConfig.Runtime.ClientKubeProxyKey))
@ -72,6 +72,20 @@ func router(ctx context.Context, config *Config, cfg *cmds.Server) http.Handler
return router return router
} }
func apiserver(runtime *config.ControlRuntime) http.Handler {
return http.HandlerFunc(func(resp http.ResponseWriter, req *http.Request) {
if runtime != nil && runtime.APIServer != nil {
runtime.APIServer.ServeHTTP(resp, req)
} else {
data := []byte("apiserver not ready")
resp.WriteHeader(http.StatusInternalServerError)
resp.Header().Set("Content-Type", "text/plain")
resp.Header().Set("Content-length", strconv.Itoa(len(data)))
resp.Write(data)
}
})
}
func cacerts(serverCA string) http.Handler { func cacerts(serverCA string) http.Handler {
var ca []byte var ca []byte
return http.HandlerFunc(func(resp http.ResponseWriter, req *http.Request) { return http.HandlerFunc(func(resp http.ResponseWriter, req *http.Request) {
@ -269,7 +283,10 @@ func configHandler(server *config.Control, cfg *cmds.Server) http.Handler {
// At this time we don't sync all the fields, just those known to be touched by startup hooks. // At this time we don't sync all the fields, just those known to be touched by startup hooks.
server.DisableKubeProxy = cfg.DisableKubeProxy server.DisableKubeProxy = cfg.DisableKubeProxy
resp.Header().Set("content-type", "application/json") resp.Header().Set("content-type", "application/json")
json.NewEncoder(resp).Encode(server) if err := json.NewEncoder(resp).Encode(server); err != nil {
logrus.Errorf("Failed to encode agent config: %v", err)
resp.WriteHeader(http.StatusInternalServerError)
}
}) })
} }

View File

@ -1,6 +1,7 @@
package util package util
import ( import (
"net"
"os" "os"
"path/filepath" "path/filepath"
@ -33,8 +34,9 @@ func GenerateDataDir(cnf *config.Control) error {
} }
// CleanupDataDir removes the associated "/tmp/k3s/<RANDOM_STRING>" // CleanupDataDir removes the associated "/tmp/k3s/<RANDOM_STRING>"
// directory. // directory along with the 'latest' symlink that points at it.
func CleanupDataDir(cnf *config.Control) { func CleanupDataDir(cnf *config.Control) {
os.Remove(filepath.Join(cnf.DataDir, "..", "latest"))
os.RemoveAll(cnf.DataDir) os.RemoveAll(cnf.DataDir)
} }
@ -57,3 +59,13 @@ func GenerateRuntime(cnf *config.Control) error {
cnf.Runtime = runtime cnf.Runtime = runtime
return nil return nil
} }
func ClusterIPNet() *net.IPNet {
_, clusterIPNet, _ := net.ParseCIDR("10.42.0.0/16")
return clusterIPNet
}
func ServiceIPNet() *net.IPNet {
_, serviceIPNet, _ := net.ParseCIDR("10.43.0.0/16")
return serviceIPNet
}