From be8d8bb412eed588c277e0924e187d7d13af3cfc Mon Sep 17 00:00:00 2001 From: Derek Nola Date: Tue, 31 Jan 2023 09:43:23 -0800 Subject: [PATCH] Wait for cri-dockerd socket (#6853) * Wait for cri-dockerd socket * Consolidate cri utility functions Signed-off-by: Derek Nola --- pkg/agent/containerd/config_linux.go | 27 ------------------ pkg/agent/containerd/config_windows.go | 27 ------------------ pkg/agent/containerd/containerd.go | 35 ++--------------------- pkg/agent/cri/cri.go | 36 ++++++++++++++++++++++++ pkg/agent/cri/cri_linux.go | 39 ++++++++++++++++++++++++++ pkg/agent/cri/cri_windows.go | 37 ++++++++++++++++++++++++ pkg/agent/cridockerd/cridockerd.go | 5 +++- 7 files changed, 119 insertions(+), 87 deletions(-) create mode 100644 pkg/agent/cri/cri.go create mode 100644 pkg/agent/cri/cri_linux.go create mode 100644 pkg/agent/cri/cri_windows.go diff --git a/pkg/agent/containerd/config_linux.go b/pkg/agent/containerd/config_linux.go index f09ad85695..2930b9a045 100644 --- a/pkg/agent/containerd/config_linux.go +++ b/pkg/agent/containerd/config_linux.go @@ -6,7 +6,6 @@ package containerd import ( "context" "os" - "time" "github.com/containerd/containerd" "github.com/docker/docker/pkg/parsers/kernel" @@ -20,8 +19,6 @@ import ( "github.com/rancher/wharfie/pkg/registries" "github.com/sirupsen/logrus" "golang.org/x/sys/unix" - "google.golang.org/grpc" - runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1alpha2" "k8s.io/kubernetes/pkg/kubelet/util" ) @@ -99,30 +96,6 @@ func setupContainerdConfig(ctx context.Context, cfg *config.Node) error { return util2.WriteFile(cfg.Containerd.Config, parsedTemplate) } -// criConnection connects to a CRI socket at the given path. -func CriConnection(ctx context.Context, address string) (*grpc.ClientConn, error) { - addr, dialer, err := util.GetAddressAndDialer(socketPrefix + address) - if err != nil { - return nil, err - } - - conn, err := grpc.Dial(addr, grpc.WithInsecure(), grpc.WithTimeout(3*time.Second), grpc.WithContextDialer(dialer), grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(maxMsgSize))) - if err != nil { - return nil, err - } - - c := runtimeapi.NewRuntimeServiceClient(conn) - _, err = c.Version(ctx, &runtimeapi.VersionRequest{ - Version: "0.1.0", - }) - if err != nil { - conn.Close() - return nil, err - } - - return conn, nil -} - func Client(address string) (*containerd.Client, error) { addr, _, err := util.GetAddressAndDialer(socketPrefix + address) if err != nil { diff --git a/pkg/agent/containerd/config_windows.go b/pkg/agent/containerd/config_windows.go index 5445edc28a..62be14c0b4 100644 --- a/pkg/agent/containerd/config_windows.go +++ b/pkg/agent/containerd/config_windows.go @@ -6,7 +6,6 @@ package containerd import ( "context" "os" - "time" "github.com/containerd/containerd" "github.com/k3s-io/k3s/pkg/agent/templates" @@ -14,8 +13,6 @@ import ( "github.com/k3s-io/k3s/pkg/daemons/config" "github.com/rancher/wharfie/pkg/registries" "github.com/sirupsen/logrus" - "google.golang.org/grpc" - runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1alpha2" "k8s.io/kubernetes/pkg/kubelet/util" ) @@ -66,30 +63,6 @@ func setupContainerdConfig(ctx context.Context, cfg *config.Node) error { return util2.WriteFile(cfg.Containerd.Config, parsedTemplate) } -// criConnection connects to a CRI socket at the given path. -func CriConnection(ctx context.Context, address string) (*grpc.ClientConn, error) { - addr, dialer, err := util.GetAddressAndDialer(address) - if err != nil { - return nil, err - } - - conn, err := grpc.Dial(addr, grpc.WithInsecure(), grpc.WithTimeout(3*time.Second), grpc.WithContextDialer(dialer), grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(maxMsgSize))) - if err != nil { - return nil, err - } - - c := runtimeapi.NewRuntimeServiceClient(conn) - _, err = c.Version(ctx, &runtimeapi.VersionRequest{ - Version: "0.1.0", - }) - if err != nil { - conn.Close() - return nil, err - } - - return conn, nil -} - func Client(address string) (*containerd.Client, error) { addr, _, err := util.GetAddressAndDialer(address) if err != nil { diff --git a/pkg/agent/containerd/containerd.go b/pkg/agent/containerd/containerd.go index 0b2190072d..51a825a7cf 100644 --- a/pkg/agent/containerd/containerd.go +++ b/pkg/agent/containerd/containerd.go @@ -18,6 +18,7 @@ import ( "github.com/containerd/containerd/namespaces" "github.com/containerd/containerd/pkg/cri/constants" "github.com/containerd/containerd/reference/docker" + "github.com/k3s-io/k3s/pkg/agent/cri" util2 "github.com/k3s-io/k3s/pkg/agent/util" "github.com/k3s-io/k3s/pkg/daemons/config" "github.com/k3s-io/k3s/pkg/version" @@ -30,10 +31,6 @@ import ( runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1alpha2" ) -const ( - maxMsgSize = 1024 * 1024 * 16 -) - // Run configures and starts containerd as a child process. Once it is up, images are preloaded // or pulled from files found in the agent images directory. func Run(ctx context.Context, cfg *config.Node) error { @@ -101,39 +98,13 @@ func Run(ctx context.Context, cfg *config.Node) error { os.Exit(1) }() - if err := WaitForContainerd(ctx, cfg.Containerd.Address); err != nil { + if err := cri.WaitForService(ctx, cfg.Containerd.Address, "containerd"); err != nil { return err } return preloadImages(ctx, cfg) } -// WaitForContainerd blocks in a retry loop until the Containerd CRI service -// is functional at the provided socket address. It will return only on success, -// or when the context is cancelled. -func WaitForContainerd(ctx context.Context, address string) error { - first := true - for { - conn, err := CriConnection(ctx, address) - if err == nil { - conn.Close() - break - } - if first { - first = false - } else { - logrus.Infof("Waiting for containerd startup: %v", err) - } - select { - case <-ctx.Done(): - return ctx.Err() - case <-time.After(time.Second): - } - } - logrus.Info("Containerd is now running") - return nil -} - // preloadImages reads the contents of the agent images directory, and attempts to // import into containerd any files found there. Supported compressed types are decompressed, and // any .txt files are processed as a list of images that should be pre-pulled from remote registries. @@ -163,7 +134,7 @@ func preloadImages(ctx context.Context, cfg *config.Node) error { } defer client.Close() - criConn, err := CriConnection(ctx, cfg.Containerd.Address) + criConn, err := cri.Connection(ctx, cfg.Containerd.Address) if err != nil { return err } diff --git a/pkg/agent/cri/cri.go b/pkg/agent/cri/cri.go new file mode 100644 index 0000000000..cd92f6390a --- /dev/null +++ b/pkg/agent/cri/cri.go @@ -0,0 +1,36 @@ +package cri + +import ( + "context" + "time" + + "github.com/sirupsen/logrus" +) + +const maxMsgSize = 1024 * 1024 * 16 + +// WaitForService blocks in a retry loop until the CRI service +// is functional at the provided socket address. It will return only on success, +// or when the context is cancelled. +func WaitForService(ctx context.Context, address string, service string) error { + first := true + for { + conn, err := Connection(ctx, address) + if err == nil { + conn.Close() + break + } + if first { + first = false + } else { + logrus.Infof("Waiting for %s startup: %v", service, err) + } + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(time.Second): + } + } + logrus.Infof("%s is now running", service) + return nil +} diff --git a/pkg/agent/cri/cri_linux.go b/pkg/agent/cri/cri_linux.go new file mode 100644 index 0000000000..7d4774aace --- /dev/null +++ b/pkg/agent/cri/cri_linux.go @@ -0,0 +1,39 @@ +//go:build linux +// +build linux + +package cri + +import ( + "context" + "time" + + "google.golang.org/grpc" + runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1" + k8sutil "k8s.io/kubernetes/pkg/kubelet/util" +) + +const socketPrefix = "unix://" + +// Connection connects to a CRI socket at the given path. +func Connection(ctx context.Context, address string) (*grpc.ClientConn, error) { + addr, dialer, err := k8sutil.GetAddressAndDialer(socketPrefix + address) + if err != nil { + return nil, err + } + + conn, err := grpc.Dial(addr, grpc.WithInsecure(), grpc.WithTimeout(3*time.Second), grpc.WithContextDialer(dialer), grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(maxMsgSize))) + if err != nil { + return nil, err + } + + c := runtimeapi.NewRuntimeServiceClient(conn) + _, err = c.Version(ctx, &runtimeapi.VersionRequest{ + Version: "0.1.0", + }) + if err != nil { + conn.Close() + return nil, err + } + + return conn, nil +} diff --git a/pkg/agent/cri/cri_windows.go b/pkg/agent/cri/cri_windows.go new file mode 100644 index 0000000000..ab700b6b51 --- /dev/null +++ b/pkg/agent/cri/cri_windows.go @@ -0,0 +1,37 @@ +//go:build windows +// +build windows + +package cri + +import ( + "context" + "time" + + "google.golang.org/grpc" + runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1" + "k8s.io/kubernetes/pkg/kubelet/util" +) + +// Connection connects to a CRI socket at the given path. +func Connection(ctx context.Context, address string) (*grpc.ClientConn, error) { + addr, dialer, err := util.GetAddressAndDialer(address) + if err != nil { + return nil, err + } + + conn, err := grpc.Dial(addr, grpc.WithInsecure(), grpc.WithTimeout(3*time.Second), grpc.WithContextDialer(dialer), grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(maxMsgSize))) + if err != nil { + return nil, err + } + + c := runtimeapi.NewRuntimeServiceClient(conn) + _, err = c.Version(ctx, &runtimeapi.VersionRequest{ + Version: "0.1.0", + }) + if err != nil { + conn.Close() + return nil, err + } + + return conn, nil +} diff --git a/pkg/agent/cridockerd/cridockerd.go b/pkg/agent/cridockerd/cridockerd.go index fd5d3bd1de..fd3e284f1f 100644 --- a/pkg/agent/cridockerd/cridockerd.go +++ b/pkg/agent/cridockerd/cridockerd.go @@ -8,9 +8,12 @@ import ( "github.com/Mirantis/cri-dockerd/cmd" "github.com/Mirantis/cri-dockerd/cmd/version" + + "github.com/k3s-io/k3s/pkg/agent/cri" "github.com/k3s-io/k3s/pkg/cgroups" "github.com/k3s-io/k3s/pkg/daemons/config" "github.com/sirupsen/logrus" + utilsnet "k8s.io/utils/net" ) @@ -34,7 +37,7 @@ func Run(ctx context.Context, cfg *config.Node) error { logrus.Fatalf("cri-dockerd exited: %v", command.ExecuteContext(ctx)) }() - return nil + return cri.WaitForService(ctx, cfg.CRIDockerd.Address, "cri-dockerd") } func getDockerCRIArgs(cfg *config.Node) []string {