diff --git a/pkg/agent/containerd/containerd.go b/pkg/agent/containerd/containerd.go index c18b335ae1..9e542f8fcd 100644 --- a/pkg/agent/containerd/containerd.go +++ b/pkg/agent/containerd/containerd.go @@ -14,9 +14,9 @@ import ( "github.com/containerd/containerd" "github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/images" - "github.com/containerd/containerd/leases" "github.com/containerd/containerd/namespaces" "github.com/containerd/containerd/pkg/cri/constants" + "github.com/containerd/containerd/pkg/cri/labels" "github.com/containerd/containerd/reference/docker" "github.com/k3s-io/k3s/pkg/agent/cri" util2 "github.com/k3s-io/k3s/pkg/agent/util" @@ -27,10 +27,17 @@ import ( "github.com/rancher/wharfie/pkg/tarfile" "github.com/rancher/wrangler/pkg/merr" "github.com/sirupsen/logrus" - "google.golang.org/grpc" runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1" ) +var ( + // In addition to using the CRI pinned label, we add our own label to indicate that + // the image was pinned by the import process, so that we can clear the pin on subsequent startups. + // ref: https://github.com/containerd/containerd/blob/release/1.7/pkg/cri/labels/labels.go + k3sPinnedImageLabelKey = "io.cattle." + version.Program + ".pinned" + k3sPinnedImageLabelValue = "pinned" +) + // Run configures and starts containerd as a child process. Once it is up, images are preloaded // or pulled from files found in the agent images directory. func Run(ctx context.Context, cfg *config.Node) error { @@ -134,38 +141,29 @@ func preloadImages(ctx context.Context, cfg *config.Node) error { } defer client.Close() + // Image pulls must be done using the CRI client, not the containerd client. + // Repository mirrors and rewrites are handled by the CRI service; if you pull directly + // using the containerd image service it will ignore the configured settings. criConn, err := cri.Connection(ctx, cfg.Containerd.Address) if err != nil { return err } defer criConn.Close() + imageClient := runtimeapi.NewImageServiceClient(criConn) // Ensure that our images are imported into the correct namespace ctx = namespaces.WithNamespace(ctx, constants.K8sContainerdNamespace) - // At startup all leases from k3s are cleared - ls := client.LeasesService() - existingLeases, err := ls.List(ctx) - if err != nil { - return err + // At startup all leases from k3s are cleared; we no longer use leases to lock content + if err := clearLeases(ctx, client); err != nil { + return errors.Wrap(err, "failed to clear leases") } - for _, lease := range existingLeases { - if lease.ID == version.Program { - logrus.Debugf("Deleting existing lease: %v", lease) - ls.Delete(ctx, lease) - } + // Clear the pinned labels on all images previously pinned by k3s + if err := clearLabels(ctx, client); err != nil { + return errors.Wrap(err, "failed to clear pinned labels") } - // Any images found on import are given a lease that never expires - lease, err := ls.Create(ctx, leases.WithID(version.Program)) - if err != nil { - return err - } - - // Ensure that our images are locked by the lease - ctx = leases.WithLease(ctx, lease.ID) - for _, fileInfo := range fileInfos { if fileInfo.IsDir() { continue @@ -174,7 +172,7 @@ func preloadImages(ctx context.Context, cfg *config.Node) error { start := time.Now() filePath := filepath.Join(cfg.Images, fileInfo.Name()) - if err := preloadFile(ctx, cfg, client, criConn, filePath); err != nil { + if err := preloadFile(ctx, cfg, client, imageClient, filePath); err != nil { logrus.Errorf("Error encountered while importing %s: %v", filePath, err) continue } @@ -186,7 +184,8 @@ func preloadImages(ctx context.Context, cfg *config.Node) error { // preloadFile handles loading images from a single tarball or pre-pull image list. // This is in its own function so that we can ensure that the various readers are properly closed, as some // decompressing readers need to be explicitly closed and others do not. -func preloadFile(ctx context.Context, cfg *config.Node, client *containerd.Client, criConn *grpc.ClientConn, filePath string) error { +func preloadFile(ctx context.Context, cfg *config.Node, client *containerd.Client, imageClient runtimeapi.ImageServiceClient, filePath string) error { + var images []images.Image if util2.HasSuffixI(filePath, ".txt") { file, err := os.Open(filePath) if err != nil { @@ -194,28 +193,102 @@ func preloadFile(ctx context.Context, cfg *config.Node, client *containerd.Clien } defer file.Close() logrus.Infof("Pulling images from %s", filePath) - return prePullImages(ctx, criConn, file) + images, err = prePullImages(ctx, client, imageClient, file) + if err != nil { + return errors.Wrap(err, "failed to pull images from "+filePath) + } + } else { + opener, err := tarfile.GetOpener(filePath) + if err != nil { + return err + } + + imageReader, err := opener() + if err != nil { + return err + } + defer imageReader.Close() + + logrus.Infof("Importing images from %s", filePath) + images, err = client.Import(ctx, imageReader, containerd.WithAllPlatforms(true)) + if err != nil { + return errors.Wrap(err, "failed to import images from "+filePath) + } } - opener, err := tarfile.GetOpener(filePath) + if err := labelImages(ctx, client, images); err != nil { + return errors.Wrap(err, "failed to add pinned label to images") + } + if err := retagImages(ctx, client, images, cfg.AgentConfig.AirgapExtraRegistry); err != nil { + return errors.Wrap(err, "failed to retag images") + } + + for _, image := range images { + logrus.Infof("Imported %s", image.Name) + } + return nil +} + +// clearLeases deletes any leases left by previous versions of k3s. +// We no longer use leases to lock content; they only locked the +// blobs, not the actual images. +func clearLeases(ctx context.Context, client *containerd.Client) error { + ls := client.LeasesService() + existingLeases, err := ls.List(ctx) if err != nil { return err } + for _, lease := range existingLeases { + if lease.ID == version.Program { + logrus.Debugf("Deleting existing lease: %v", lease) + ls.Delete(ctx, lease) + } + } + return nil +} - imageReader, err := opener() +// clearLabels removes the pinned labels on all images in the image store that were previously pinned by k3s +func clearLabels(ctx context.Context, client *containerd.Client) error { + var errs []error + imageService := client.ImageService() + images, err := imageService.List(ctx, fmt.Sprintf("labels.%q==%s", k3sPinnedImageLabelKey, k3sPinnedImageLabelValue)) if err != nil { return err } - defer imageReader.Close() - - logrus.Infof("Importing images from %s", filePath) - - images, err := client.Import(ctx, imageReader, containerd.WithAllPlatforms(true)) - if err != nil { - return err + for _, image := range images { + delete(image.Labels, k3sPinnedImageLabelKey) + delete(image.Labels, labels.PinnedImageLabelKey) + if _, err := imageService.Update(ctx, image, "labels"); err != nil { + errs = append(errs, errors.Wrap(err, "failed to delete labels from image "+image.Name)) + } } + return merr.NewErrors(errs...) +} - return retagImages(ctx, client, images, cfg.AgentConfig.AirgapExtraRegistry) +// labelImages adds labels to the listed images, indicating that they +// are pinned by k3s and should not be pruned. +func labelImages(ctx context.Context, client *containerd.Client, images []images.Image) error { + var errs []error + imageService := client.ImageService() + for i, image := range images { + if image.Labels[k3sPinnedImageLabelKey] == k3sPinnedImageLabelValue && + image.Labels[labels.PinnedImageLabelKey] == labels.PinnedImageLabelValue { + continue + } + + if image.Labels == nil { + image.Labels = map[string]string{} + } + image.Labels[k3sPinnedImageLabelKey] = k3sPinnedImageLabelValue + image.Labels[labels.PinnedImageLabelKey] = labels.PinnedImageLabelValue + updatedImage, err := imageService.Update(ctx, image, "labels") + if err != nil { + errs = append(errs, errors.Wrap(err, "failed to add labels to image "+image.Name)) + } else { + images[i] = updatedImage + } + } + return merr.NewErrors(errs...) } // retagImages retags all listed images as having been pulled from the given remote registries. @@ -227,24 +300,27 @@ func retagImages(ctx context.Context, client *containerd.Client, images []images for _, image := range images { name, err := parseNamedTagged(image.Name) if err != nil { - errs = append(errs, errors.Wrap(err, "failed to parse image name")) + errs = append(errs, errors.Wrap(err, "failed to parse tags for image "+image.Name)) continue } - logrus.Infof("Imported %s", image.Name) for _, registry := range registries { - image.Name = fmt.Sprintf("%s/%s:%s", registry, docker.Path(name), name.Tag()) + newName := fmt.Sprintf("%s/%s:%s", registry, docker.Path(name), name.Tag()) + if newName == image.Name { + continue + } + image.Name = newName if _, err = imageService.Create(ctx, image); err != nil { if errdefs.IsAlreadyExists(err) { if err = imageService.Delete(ctx, image.Name); err != nil { - errs = append(errs, errors.Wrap(err, "failed to delete existing image")) + errs = append(errs, errors.Wrap(err, "failed to delete existing image "+image.Name)) continue } if _, err = imageService.Create(ctx, image); err != nil { - errs = append(errs, errors.Wrap(err, "failed to tag after deleting existing image")) + errs = append(errs, errors.Wrap(err, "failed to tag after deleting existing image "+image.Name)) continue } } else { - errs = append(errs, errors.Wrap(err, "failed to tag image")) + errs = append(errs, errors.Wrap(err, "failed to tag image "+image.Name)) continue } } @@ -269,30 +345,43 @@ func parseNamedTagged(name string) (docker.NamedTagged, error) { } // prePullImages asks containerd to pull images in a given list, so that they -// are ready when the containers attempt to start later. -func prePullImages(ctx context.Context, conn *grpc.ClientConn, images io.Reader) error { - imageClient := runtimeapi.NewImageServiceClient(conn) - scanner := bufio.NewScanner(images) +// are ready when the containers attempt to start later. If the image already exists, +// or is successfully pulled, information about the image is retrieved from the image store. +// NOTE: Pulls MUST be done via CRI API, not containerd API, in order to use mirrors and rewrites. +func prePullImages(ctx context.Context, client *containerd.Client, imageClient runtimeapi.ImageServiceClient, imageList io.Reader) ([]images.Image, error) { + errs := []error{} + images := []images.Image{} + imageService := client.ImageService() + scanner := bufio.NewScanner(imageList) for scanner.Scan() { - line := strings.TrimSpace(scanner.Text()) - resp, err := imageClient.ImageStatus(ctx, &runtimeapi.ImageStatusRequest{ + name := strings.TrimSpace(scanner.Text()) + if _, err := imageClient.ImageStatus(ctx, &runtimeapi.ImageStatusRequest{ Image: &runtimeapi.ImageSpec{ - Image: line, + Image: name, }, - }) - if err == nil && resp.Image != nil { + }); err == nil { + logrus.Infof("Image %s has already been pulled", name) + if image, err := imageService.Get(ctx, name); err != nil { + errs = append(errs, err) + } else { + images = append(images, image) + } continue } - - logrus.Infof("Pulling image %s...", line) - _, err = imageClient.PullImage(ctx, &runtimeapi.PullImageRequest{ + logrus.Infof("Pulling image %s", name) + if _, err := imageClient.PullImage(ctx, &runtimeapi.PullImageRequest{ Image: &runtimeapi.ImageSpec{ - Image: line, + Image: name, }, - }) - if err != nil { - logrus.Errorf("Failed to pull %s: %v", line, err) + }); err != nil { + errs = append(errs, err) + } else { + if image, err := imageService.Get(ctx, name); err != nil { + errs = append(errs, err) + } else { + images = append(images, image) + } } } - return nil + return images, merr.NewErrors(errs...) }