From ace1714e0cfcc1b87aa7c8fd7c686c9a21dc9994 Mon Sep 17 00:00:00 2001 From: Brad Davidson Date: Fri, 8 Dec 2023 19:51:28 +0000 Subject: [PATCH] Pin images instead of locking layers with lease Layer leases never did what we wanted anyways, and this is the new approved interface for ensuring that images do not get GCd Signed-off-by: Brad Davidson (cherry picked from commit 5c99bdd9bd1368778d90a912cd0e3e990c39eec1) Signed-off-by: Brad Davidson --- pkg/agent/containerd/containerd.go | 203 +++++++++++++++++++++-------- 1 file changed, 146 insertions(+), 57 deletions(-) diff --git a/pkg/agent/containerd/containerd.go b/pkg/agent/containerd/containerd.go index c18b335ae1..9e542f8fcd 100644 --- a/pkg/agent/containerd/containerd.go +++ b/pkg/agent/containerd/containerd.go @@ -14,9 +14,9 @@ import ( "github.com/containerd/containerd" "github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/images" - "github.com/containerd/containerd/leases" "github.com/containerd/containerd/namespaces" "github.com/containerd/containerd/pkg/cri/constants" + "github.com/containerd/containerd/pkg/cri/labels" "github.com/containerd/containerd/reference/docker" "github.com/k3s-io/k3s/pkg/agent/cri" util2 "github.com/k3s-io/k3s/pkg/agent/util" @@ -27,10 +27,17 @@ import ( "github.com/rancher/wharfie/pkg/tarfile" "github.com/rancher/wrangler/pkg/merr" "github.com/sirupsen/logrus" - "google.golang.org/grpc" runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1" ) +var ( + // In addition to using the CRI pinned label, we add our own label to indicate that + // the image was pinned by the import process, so that we can clear the pin on subsequent startups. + // ref: https://github.com/containerd/containerd/blob/release/1.7/pkg/cri/labels/labels.go + k3sPinnedImageLabelKey = "io.cattle." + version.Program + ".pinned" + k3sPinnedImageLabelValue = "pinned" +) + // Run configures and starts containerd as a child process. Once it is up, images are preloaded // or pulled from files found in the agent images directory. func Run(ctx context.Context, cfg *config.Node) error { @@ -134,38 +141,29 @@ func preloadImages(ctx context.Context, cfg *config.Node) error { } defer client.Close() + // Image pulls must be done using the CRI client, not the containerd client. + // Repository mirrors and rewrites are handled by the CRI service; if you pull directly + // using the containerd image service it will ignore the configured settings. criConn, err := cri.Connection(ctx, cfg.Containerd.Address) if err != nil { return err } defer criConn.Close() + imageClient := runtimeapi.NewImageServiceClient(criConn) // Ensure that our images are imported into the correct namespace ctx = namespaces.WithNamespace(ctx, constants.K8sContainerdNamespace) - // At startup all leases from k3s are cleared - ls := client.LeasesService() - existingLeases, err := ls.List(ctx) - if err != nil { - return err + // At startup all leases from k3s are cleared; we no longer use leases to lock content + if err := clearLeases(ctx, client); err != nil { + return errors.Wrap(err, "failed to clear leases") } - for _, lease := range existingLeases { - if lease.ID == version.Program { - logrus.Debugf("Deleting existing lease: %v", lease) - ls.Delete(ctx, lease) - } + // Clear the pinned labels on all images previously pinned by k3s + if err := clearLabels(ctx, client); err != nil { + return errors.Wrap(err, "failed to clear pinned labels") } - // Any images found on import are given a lease that never expires - lease, err := ls.Create(ctx, leases.WithID(version.Program)) - if err != nil { - return err - } - - // Ensure that our images are locked by the lease - ctx = leases.WithLease(ctx, lease.ID) - for _, fileInfo := range fileInfos { if fileInfo.IsDir() { continue @@ -174,7 +172,7 @@ func preloadImages(ctx context.Context, cfg *config.Node) error { start := time.Now() filePath := filepath.Join(cfg.Images, fileInfo.Name()) - if err := preloadFile(ctx, cfg, client, criConn, filePath); err != nil { + if err := preloadFile(ctx, cfg, client, imageClient, filePath); err != nil { logrus.Errorf("Error encountered while importing %s: %v", filePath, err) continue } @@ -186,7 +184,8 @@ func preloadImages(ctx context.Context, cfg *config.Node) error { // preloadFile handles loading images from a single tarball or pre-pull image list. // This is in its own function so that we can ensure that the various readers are properly closed, as some // decompressing readers need to be explicitly closed and others do not. -func preloadFile(ctx context.Context, cfg *config.Node, client *containerd.Client, criConn *grpc.ClientConn, filePath string) error { +func preloadFile(ctx context.Context, cfg *config.Node, client *containerd.Client, imageClient runtimeapi.ImageServiceClient, filePath string) error { + var images []images.Image if util2.HasSuffixI(filePath, ".txt") { file, err := os.Open(filePath) if err != nil { @@ -194,28 +193,102 @@ func preloadFile(ctx context.Context, cfg *config.Node, client *containerd.Clien } defer file.Close() logrus.Infof("Pulling images from %s", filePath) - return prePullImages(ctx, criConn, file) + images, err = prePullImages(ctx, client, imageClient, file) + if err != nil { + return errors.Wrap(err, "failed to pull images from "+filePath) + } + } else { + opener, err := tarfile.GetOpener(filePath) + if err != nil { + return err + } + + imageReader, err := opener() + if err != nil { + return err + } + defer imageReader.Close() + + logrus.Infof("Importing images from %s", filePath) + images, err = client.Import(ctx, imageReader, containerd.WithAllPlatforms(true)) + if err != nil { + return errors.Wrap(err, "failed to import images from "+filePath) + } } - opener, err := tarfile.GetOpener(filePath) + if err := labelImages(ctx, client, images); err != nil { + return errors.Wrap(err, "failed to add pinned label to images") + } + if err := retagImages(ctx, client, images, cfg.AgentConfig.AirgapExtraRegistry); err != nil { + return errors.Wrap(err, "failed to retag images") + } + + for _, image := range images { + logrus.Infof("Imported %s", image.Name) + } + return nil +} + +// clearLeases deletes any leases left by previous versions of k3s. +// We no longer use leases to lock content; they only locked the +// blobs, not the actual images. +func clearLeases(ctx context.Context, client *containerd.Client) error { + ls := client.LeasesService() + existingLeases, err := ls.List(ctx) if err != nil { return err } + for _, lease := range existingLeases { + if lease.ID == version.Program { + logrus.Debugf("Deleting existing lease: %v", lease) + ls.Delete(ctx, lease) + } + } + return nil +} - imageReader, err := opener() +// clearLabels removes the pinned labels on all images in the image store that were previously pinned by k3s +func clearLabels(ctx context.Context, client *containerd.Client) error { + var errs []error + imageService := client.ImageService() + images, err := imageService.List(ctx, fmt.Sprintf("labels.%q==%s", k3sPinnedImageLabelKey, k3sPinnedImageLabelValue)) if err != nil { return err } - defer imageReader.Close() - - logrus.Infof("Importing images from %s", filePath) - - images, err := client.Import(ctx, imageReader, containerd.WithAllPlatforms(true)) - if err != nil { - return err + for _, image := range images { + delete(image.Labels, k3sPinnedImageLabelKey) + delete(image.Labels, labels.PinnedImageLabelKey) + if _, err := imageService.Update(ctx, image, "labels"); err != nil { + errs = append(errs, errors.Wrap(err, "failed to delete labels from image "+image.Name)) + } } + return merr.NewErrors(errs...) +} - return retagImages(ctx, client, images, cfg.AgentConfig.AirgapExtraRegistry) +// labelImages adds labels to the listed images, indicating that they +// are pinned by k3s and should not be pruned. +func labelImages(ctx context.Context, client *containerd.Client, images []images.Image) error { + var errs []error + imageService := client.ImageService() + for i, image := range images { + if image.Labels[k3sPinnedImageLabelKey] == k3sPinnedImageLabelValue && + image.Labels[labels.PinnedImageLabelKey] == labels.PinnedImageLabelValue { + continue + } + + if image.Labels == nil { + image.Labels = map[string]string{} + } + image.Labels[k3sPinnedImageLabelKey] = k3sPinnedImageLabelValue + image.Labels[labels.PinnedImageLabelKey] = labels.PinnedImageLabelValue + updatedImage, err := imageService.Update(ctx, image, "labels") + if err != nil { + errs = append(errs, errors.Wrap(err, "failed to add labels to image "+image.Name)) + } else { + images[i] = updatedImage + } + } + return merr.NewErrors(errs...) } // retagImages retags all listed images as having been pulled from the given remote registries. @@ -227,24 +300,27 @@ func retagImages(ctx context.Context, client *containerd.Client, images []images for _, image := range images { name, err := parseNamedTagged(image.Name) if err != nil { - errs = append(errs, errors.Wrap(err, "failed to parse image name")) + errs = append(errs, errors.Wrap(err, "failed to parse tags for image "+image.Name)) continue } - logrus.Infof("Imported %s", image.Name) for _, registry := range registries { - image.Name = fmt.Sprintf("%s/%s:%s", registry, docker.Path(name), name.Tag()) + newName := fmt.Sprintf("%s/%s:%s", registry, docker.Path(name), name.Tag()) + if newName == image.Name { + continue + } + image.Name = newName if _, err = imageService.Create(ctx, image); err != nil { if errdefs.IsAlreadyExists(err) { if err = imageService.Delete(ctx, image.Name); err != nil { - errs = append(errs, errors.Wrap(err, "failed to delete existing image")) + errs = append(errs, errors.Wrap(err, "failed to delete existing image "+image.Name)) continue } if _, err = imageService.Create(ctx, image); err != nil { - errs = append(errs, errors.Wrap(err, "failed to tag after deleting existing image")) + errs = append(errs, errors.Wrap(err, "failed to tag after deleting existing image "+image.Name)) continue } } else { - errs = append(errs, errors.Wrap(err, "failed to tag image")) + errs = append(errs, errors.Wrap(err, "failed to tag image "+image.Name)) continue } } @@ -269,30 +345,43 @@ func parseNamedTagged(name string) (docker.NamedTagged, error) { } // prePullImages asks containerd to pull images in a given list, so that they -// are ready when the containers attempt to start later. -func prePullImages(ctx context.Context, conn *grpc.ClientConn, images io.Reader) error { - imageClient := runtimeapi.NewImageServiceClient(conn) - scanner := bufio.NewScanner(images) +// are ready when the containers attempt to start later. If the image already exists, +// or is successfully pulled, information about the image is retrieved from the image store. +// NOTE: Pulls MUST be done via CRI API, not containerd API, in order to use mirrors and rewrites. +func prePullImages(ctx context.Context, client *containerd.Client, imageClient runtimeapi.ImageServiceClient, imageList io.Reader) ([]images.Image, error) { + errs := []error{} + images := []images.Image{} + imageService := client.ImageService() + scanner := bufio.NewScanner(imageList) for scanner.Scan() { - line := strings.TrimSpace(scanner.Text()) - resp, err := imageClient.ImageStatus(ctx, &runtimeapi.ImageStatusRequest{ + name := strings.TrimSpace(scanner.Text()) + if _, err := imageClient.ImageStatus(ctx, &runtimeapi.ImageStatusRequest{ Image: &runtimeapi.ImageSpec{ - Image: line, + Image: name, }, - }) - if err == nil && resp.Image != nil { + }); err == nil { + logrus.Infof("Image %s has already been pulled", name) + if image, err := imageService.Get(ctx, name); err != nil { + errs = append(errs, err) + } else { + images = append(images, image) + } continue } - - logrus.Infof("Pulling image %s...", line) - _, err = imageClient.PullImage(ctx, &runtimeapi.PullImageRequest{ + logrus.Infof("Pulling image %s", name) + if _, err := imageClient.PullImage(ctx, &runtimeapi.PullImageRequest{ Image: &runtimeapi.ImageSpec{ - Image: line, + Image: name, }, - }) - if err != nil { - logrus.Errorf("Failed to pull %s: %v", line, err) + }); err != nil { + errs = append(errs, err) + } else { + if image, err := imageService.Get(ctx, name); err != nil { + errs = append(errs, err) + } else { + images = append(images, image) + } } } - return nil + return images, merr.NewErrors(errs...) }