package containerd import ( "bufio" "context" "fmt" "io" "os" "os/exec" "path/filepath" "strings" "time" "github.com/containerd/containerd" "github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/images" "github.com/containerd/containerd/namespaces" "github.com/containerd/containerd/pkg/cri/constants" "github.com/containerd/containerd/pkg/cri/labels" "github.com/containerd/containerd/reference/docker" "github.com/k3s-io/k3s/pkg/agent/cri" util2 "github.com/k3s-io/k3s/pkg/agent/util" "github.com/k3s-io/k3s/pkg/daemons/config" "github.com/k3s-io/k3s/pkg/version" "github.com/natefinch/lumberjack" "github.com/pkg/errors" "github.com/rancher/wharfie/pkg/tarfile" "github.com/rancher/wrangler/pkg/merr" "github.com/sirupsen/logrus" runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1" ) var ( // In addition to using the CRI pinned label, we add our own label to indicate that // the image was pinned by the import process, so that we can clear the pin on subsequent startups. // ref: https://github.com/containerd/containerd/blob/release/1.7/pkg/cri/labels/labels.go k3sPinnedImageLabelKey = "io.cattle." + version.Program + ".pinned" k3sPinnedImageLabelValue = "pinned" ) // Run configures and starts containerd as a child process. Once it is up, images are preloaded // or pulled from files found in the agent images directory. func Run(ctx context.Context, cfg *config.Node) error { args := getContainerdArgs(cfg) stdOut := io.Writer(os.Stdout) stdErr := io.Writer(os.Stderr) if cfg.Containerd.Log != "" { logrus.Infof("Logging containerd to %s", cfg.Containerd.Log) fileOut := &lumberjack.Logger{ Filename: cfg.Containerd.Log, MaxSize: 50, MaxBackups: 3, MaxAge: 28, Compress: true, } // If k3s is started with --debug, write logs to both the log file and stdout/stderr, // even if a log path is set. if cfg.Containerd.Debug { stdOut = io.MultiWriter(stdOut, fileOut) stdErr = io.MultiWriter(stdErr, fileOut) } else { stdOut = fileOut stdErr = fileOut } } go func() { env := []string{} cenv := []string{} for _, e := range os.Environ() { pair := strings.SplitN(e, "=", 2) switch { case pair[0] == "NOTIFY_SOCKET": // elide NOTIFY_SOCKET to prevent spurious notifications to systemd case pair[0] == "CONTAINERD_LOG_LEVEL": // Turn CONTAINERD_LOG_LEVEL variable into log-level flag args = append(args, "--log-level", pair[1]) case strings.HasPrefix(pair[0], "CONTAINERD_"): // Strip variables with CONTAINERD_ prefix before passing through // This allows doing things like setting a proxy for image pulls by setting // CONTAINERD_https_proxy=http://proxy.example.com:8080 pair[0] = strings.TrimPrefix(pair[0], "CONTAINERD_") cenv = append(cenv, strings.Join(pair, "=")) default: env = append(env, strings.Join(pair, "=")) } } logrus.Infof("Running containerd %s", config.ArgString(args[1:])) cmd := exec.CommandContext(ctx, args[0], args[1:]...) cmd.Stdout = stdOut cmd.Stderr = stdErr cmd.Env = append(env, cenv...) addDeathSig(cmd) err := cmd.Run() if err != nil && !errors.Is(err, context.Canceled) { logrus.Errorf("containerd exited: %s", err) os.Exit(1) } os.Exit(0) }() if err := cri.WaitForService(ctx, cfg.Containerd.Address, "containerd"); err != nil { return err } return PreloadImages(ctx, cfg) } // PreloadImages reads the contents of the agent images directory, and attempts to // import into containerd any files found there. Supported compressed types are decompressed, and // any .txt files are processed as a list of images that should be pre-pulled from remote registries. // If configured, imported images are retagged as being pulled from additional registries. func PreloadImages(ctx context.Context, cfg *config.Node) error { fileInfo, err := os.Stat(cfg.Images) if os.IsNotExist(err) { return nil } else if err != nil { logrus.Errorf("Unable to find images in %s: %v", cfg.Images, err) return nil } if !fileInfo.IsDir() { return nil } fileInfos, err := os.ReadDir(cfg.Images) if err != nil { logrus.Errorf("Unable to read images in %s: %v", cfg.Images, err) return nil } client, err := Client(cfg.Containerd.Address) if err != nil { return err } defer client.Close() // Image pulls must be done using the CRI client, not the containerd client. // Repository mirrors and rewrites are handled by the CRI service; if you pull directly // using the containerd image service it will ignore the configured settings. criConn, err := cri.Connection(ctx, cfg.Containerd.Address) if err != nil { return err } defer criConn.Close() imageClient := runtimeapi.NewImageServiceClient(criConn) // Ensure that our images are imported into the correct namespace ctx = namespaces.WithNamespace(ctx, constants.K8sContainerdNamespace) // At startup all leases from k3s are cleared; we no longer use leases to lock content if err := clearLeases(ctx, client); err != nil { return errors.Wrap(err, "failed to clear leases") } // Clear the pinned labels on all images previously pinned by k3s if err := clearLabels(ctx, client); err != nil { return errors.Wrap(err, "failed to clear pinned labels") } for _, fileInfo := range fileInfos { if fileInfo.IsDir() { continue } start := time.Now() filePath := filepath.Join(cfg.Images, fileInfo.Name()) if err := preloadFile(ctx, cfg, client, imageClient, filePath); err != nil { logrus.Errorf("Error encountered while importing %s: %v", filePath, err) continue } logrus.Infof("Imported images from %s in %s", filePath, time.Since(start)) } return nil } // preloadFile handles loading images from a single tarball or pre-pull image list. // This is in its own function so that we can ensure that the various readers are properly closed, as some // decompressing readers need to be explicitly closed and others do not. func preloadFile(ctx context.Context, cfg *config.Node, client *containerd.Client, imageClient runtimeapi.ImageServiceClient, filePath string) error { var images []images.Image if util2.HasSuffixI(filePath, ".txt") { file, err := os.Open(filePath) if err != nil { return err } defer file.Close() logrus.Infof("Pulling images from %s", filePath) images, err = prePullImages(ctx, client, imageClient, file) if err != nil { return errors.Wrap(err, "failed to pull images from "+filePath) } } else { opener, err := tarfile.GetOpener(filePath) if err != nil { return err } imageReader, err := opener() if err != nil { return err } defer imageReader.Close() logrus.Infof("Importing images from %s", filePath) images, err = client.Import(ctx, imageReader, containerd.WithAllPlatforms(true)) if err != nil { return errors.Wrap(err, "failed to import images from "+filePath) } } if err := labelImages(ctx, client, images); err != nil { return errors.Wrap(err, "failed to add pinned label to images") } if err := retagImages(ctx, client, images, cfg.AgentConfig.AirgapExtraRegistry); err != nil { return errors.Wrap(err, "failed to retag images") } for _, image := range images { logrus.Infof("Imported %s", image.Name) } return nil } // clearLeases deletes any leases left by previous versions of k3s. // We no longer use leases to lock content; they only locked the // blobs, not the actual images. func clearLeases(ctx context.Context, client *containerd.Client) error { ls := client.LeasesService() existingLeases, err := ls.List(ctx) if err != nil { return err } for _, lease := range existingLeases { if lease.ID == version.Program { logrus.Debugf("Deleting existing lease: %v", lease) ls.Delete(ctx, lease) } } return nil } // clearLabels removes the pinned labels on all images in the image store that were previously pinned by k3s func clearLabels(ctx context.Context, client *containerd.Client) error { var errs []error imageService := client.ImageService() images, err := imageService.List(ctx, fmt.Sprintf("labels.%q==%s", k3sPinnedImageLabelKey, k3sPinnedImageLabelValue)) if err != nil { return err } for _, image := range images { delete(image.Labels, k3sPinnedImageLabelKey) delete(image.Labels, labels.PinnedImageLabelKey) if _, err := imageService.Update(ctx, image, "labels"); err != nil { errs = append(errs, errors.Wrap(err, "failed to delete labels from image "+image.Name)) } } return merr.NewErrors(errs...) } // labelImages adds labels to the listed images, indicating that they // are pinned by k3s and should not be pruned. func labelImages(ctx context.Context, client *containerd.Client, images []images.Image) error { var errs []error imageService := client.ImageService() for i, image := range images { if image.Labels[k3sPinnedImageLabelKey] == k3sPinnedImageLabelValue && image.Labels[labels.PinnedImageLabelKey] == labels.PinnedImageLabelValue { continue } if image.Labels == nil { image.Labels = map[string]string{} } image.Labels[k3sPinnedImageLabelKey] = k3sPinnedImageLabelValue image.Labels[labels.PinnedImageLabelKey] = labels.PinnedImageLabelValue updatedImage, err := imageService.Update(ctx, image, "labels") if err != nil { errs = append(errs, errors.Wrap(err, "failed to add labels to image "+image.Name)) } else { images[i] = updatedImage } } return merr.NewErrors(errs...) } // retagImages retags all listed images as having been pulled from the given remote registries. // If duplicate images exist, they are overwritten. This is most useful when using a private registry // for all images, as can be configured by the RKE2/Rancher system-default-registry setting. func retagImages(ctx context.Context, client *containerd.Client, images []images.Image, registries []string) error { var errs []error imageService := client.ImageService() for _, image := range images { name, err := parseNamedTagged(image.Name) if err != nil { errs = append(errs, errors.Wrap(err, "failed to parse tags for image "+image.Name)) continue } for _, registry := range registries { newName := fmt.Sprintf("%s/%s:%s", registry, docker.Path(name), name.Tag()) if newName == image.Name { continue } image.Name = newName if _, err = imageService.Create(ctx, image); err != nil { if errdefs.IsAlreadyExists(err) { if err = imageService.Delete(ctx, image.Name); err != nil { errs = append(errs, errors.Wrap(err, "failed to delete existing image "+image.Name)) continue } if _, err = imageService.Create(ctx, image); err != nil { errs = append(errs, errors.Wrap(err, "failed to tag after deleting existing image "+image.Name)) continue } } else { errs = append(errs, errors.Wrap(err, "failed to tag image "+image.Name)) continue } } logrus.Infof("Tagged %s", image.Name) } } return merr.NewErrors(errs...) } // parseNamedTagged parses and normalizes an image name, and converts the resulting reference // to a type that exposes the tag. func parseNamedTagged(name string) (docker.NamedTagged, error) { ref, err := docker.ParseNormalizedNamed(name) if err != nil { return nil, err } tagged, ok := ref.(docker.NamedTagged) if !ok { return nil, fmt.Errorf("can't cast %T to NamedTagged", ref) } return tagged, nil } // prePullImages asks containerd to pull images in a given list, so that they // are ready when the containers attempt to start later. If the image already exists, // or is successfully pulled, information about the image is retrieved from the image store. // NOTE: Pulls MUST be done via CRI API, not containerd API, in order to use mirrors and rewrites. func prePullImages(ctx context.Context, client *containerd.Client, imageClient runtimeapi.ImageServiceClient, imageList io.Reader) ([]images.Image, error) { errs := []error{} images := []images.Image{} imageService := client.ImageService() scanner := bufio.NewScanner(imageList) for scanner.Scan() { name := strings.TrimSpace(scanner.Text()) if status, err := imageClient.ImageStatus(ctx, &runtimeapi.ImageStatusRequest{ Image: &runtimeapi.ImageSpec{ Image: name, }, }); err == nil && status.Image != nil && len(status.Image.RepoTags) > 0 { logrus.Infof("Image %s has already been pulled", name) for _, tag := range status.Image.RepoTags { if image, err := imageService.Get(ctx, tag); err != nil { errs = append(errs, err) } else { images = append(images, image) } } continue } logrus.Infof("Pulling image %s", name) if _, err := imageClient.PullImage(ctx, &runtimeapi.PullImageRequest{ Image: &runtimeapi.ImageSpec{ Image: name, }, }); err != nil { errs = append(errs, err) } else { if image, err := imageService.Get(ctx, name); err != nil { errs = append(errs, err) } else { images = append(images, image) } } } return images, merr.NewErrors(errs...) }