k3s/pkg/agent/containerd/containerd.go

388 lines
13 KiB
Go

package containerd
import (
"bufio"
"context"
"fmt"
"io"
"os"
"os/exec"
"path/filepath"
"strings"
"time"
"github.com/containerd/containerd"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/namespaces"
"github.com/containerd/containerd/pkg/cri/constants"
"github.com/containerd/containerd/pkg/cri/labels"
"github.com/containerd/containerd/reference/docker"
"github.com/k3s-io/k3s/pkg/agent/cri"
util2 "github.com/k3s-io/k3s/pkg/agent/util"
"github.com/k3s-io/k3s/pkg/daemons/config"
"github.com/k3s-io/k3s/pkg/version"
"github.com/natefinch/lumberjack"
"github.com/pkg/errors"
"github.com/rancher/wharfie/pkg/tarfile"
"github.com/rancher/wrangler/pkg/merr"
"github.com/sirupsen/logrus"
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
)
// Labels applied by k3s to images it pins while importing them.
//
// The CRI pinned label is set on these images as well; this additional k3s-specific label
// records that the pin came from the import process, so that it can be cleared again on a
// subsequent startup.
// ref: https://github.com/containerd/containerd/blob/release/1.7/pkg/cri/labels/labels.go
var (
	// k3sPinnedImageLabelKey is the label key, namespaced by the program name.
	k3sPinnedImageLabelKey = fmt.Sprintf("io.cattle.%s.pinned", version.Program)
	// k3sPinnedImageLabelValue is the value stored under k3sPinnedImageLabelKey.
	k3sPinnedImageLabelValue = "pinned"
)
// Run configures and starts containerd as a child process. Once it is up, images are preloaded
// or pulled from files found in the agent images directory.
//
// The containerd configuration is prepared by setupContainerdConfig, the child process is
// launched from a background goroutine, and Run then waits on cri.WaitForService for
// cfg.Containerd.Address before handing off to preloadImages.
//
// An error is returned if the configuration cannot be set up, if waiting for the service
// fails, or if preloadImages fails. When the containerd child process exits, for any reason,
// the current process is terminated with exit status 1.
func Run(ctx context.Context, cfg *config.Node) error {
	if err := setupContainerdConfig(ctx, cfg); err != nil {
		return err
	}

	args := getContainerdArgs(cfg)
	stdOut := io.Writer(os.Stdout)
	stdErr := io.Writer(os.Stderr)

	if cfg.Containerd.Log != "" {
		logrus.Infof("Logging containerd to %s", cfg.Containerd.Log)
		// Rotating log file for containerd output.
		fileOut := &lumberjack.Logger{
			Filename:   cfg.Containerd.Log,
			MaxSize:    50,
			MaxBackups: 3,
			MaxAge:     28,
			Compress:   true,
		}

		// If k3s is started with --debug, write logs to both the log file and stdout/stderr,
		// even if a log path is set.
		if cfg.Containerd.Debug {
			stdOut = io.MultiWriter(stdOut, fileOut)
			stdErr = io.MultiWriter(stdErr, fileOut)
		} else {
			stdOut = fileOut
			stdErr = fileOut
		}
	}

	go func() {
		// env holds variables passed through unchanged; cenv holds variables whose
		// CONTAINERD_ prefix has been stripped. cenv is appended last.
		// Both are deliberately non-nil: a nil cmd.Env would make the child inherit the
		// full parent environment, including the variables filtered out below.
		env := []string{}
		cenv := []string{}

		for _, e := range os.Environ() {
			pair := strings.SplitN(e, "=", 2)
			switch {
			case pair[0] == "NOTIFY_SOCKET":
				// elide NOTIFY_SOCKET to prevent spurious notifications to systemd
			case pair[0] == "CONTAINERD_LOG_LEVEL":
				// Turn CONTAINERD_LOG_LEVEL variable into log-level flag.
				// An environment entry is not guaranteed to contain "="; in that case
				// there is no value to pass and indexing pair[1] would panic.
				if len(pair) == 2 {
					args = append(args, "--log-level", pair[1])
				}
			case strings.HasPrefix(pair[0], "CONTAINERD_"):
				// Strip variables with CONTAINERD_ prefix before passing through
				// This allows doing things like setting a proxy for image pulls by setting
				// CONTAINERD_https_proxy=http://proxy.example.com:8080
				pair[0] = strings.TrimPrefix(pair[0], "CONTAINERD_")
				cenv = append(cenv, strings.Join(pair, "="))
			default:
				env = append(env, strings.Join(pair, "="))
			}
		}

		logrus.Infof("Running containerd %s", config.ArgString(args[1:]))
		cmd := exec.CommandContext(ctx, args[0], args[1:]...)
		cmd.Stdout = stdOut
		cmd.Stderr = stdErr
		cmd.Env = append(env, cenv...)

		// addDeathSig is defined elsewhere in this package.
		// NOTE(review): presumably it ties the child's lifetime to this process - confirm.
		addDeathSig(cmd)
		if err := cmd.Run(); err != nil {
			logrus.Errorf("containerd exited: %s", err)
		}
		// containerd is not restarted; its exit always takes this process down with it.
		os.Exit(1)
	}()

	if err := cri.WaitForService(ctx, cfg.Containerd.Address, "containerd"); err != nil {
		return err
	}

	return preloadImages(ctx, cfg)
}
// preloadImages scans the top level of the agent images directory (cfg.Images) and passes
// every non-directory entry to preloadFile, which imports image archives and treats .txt
// files as lists of images to pre-pull from remote registries.
//
// Before any file is processed, leases left behind by k3s and the pinned labels previously
// set by k3s are cleared.
//
// A missing images directory, a path that is not a directory, or a directory that cannot be
// stat'ed or read is not treated as a failure: the problem is logged where applicable and nil
// is returned. Failures for individual files are logged and the remaining files are still
// processed. A non-nil error is returned only if the containerd client or CRI connection
// cannot be created, or if clearing leases or labels fails.
func preloadImages(ctx context.Context, cfg *config.Node) error {
	imagesDir := cfg.Images

	dirInfo, err := os.Stat(imagesDir)
	switch {
	case os.IsNotExist(err):
		// Nothing to preload.
		return nil
	case err != nil:
		logrus.Errorf("Unable to find images in %s: %v", imagesDir, err)
		return nil
	case !dirInfo.IsDir():
		return nil
	}

	entries, err := os.ReadDir(imagesDir)
	if err != nil {
		logrus.Errorf("Unable to read images in %s: %v", imagesDir, err)
		return nil
	}

	client, err := Client(cfg.Containerd.Address)
	if err != nil {
		return err
	}
	defer client.Close()

	// Image pulls must be done using the CRI client, not the containerd client.
	// Repository mirrors and rewrites are handled by the CRI service; if you pull directly
	// using the containerd image service it will ignore the configured settings.
	criConn, err := cri.Connection(ctx, cfg.Containerd.Address)
	if err != nil {
		return err
	}
	defer criConn.Close()
	imageClient := runtimeapi.NewImageServiceClient(criConn)

	// All containerd operations below run in the Kubernetes namespace so that the
	// images end up where the CRI plugin looks for them.
	ctx = namespaces.WithNamespace(ctx, constants.K8sContainerdNamespace)

	// Leases are no longer used to lock content; drop any that k3s created earlier.
	if err := clearLeases(ctx, client); err != nil {
		return errors.Wrap(err, "failed to clear leases")
	}

	// Start from a clean slate: unpin everything k3s pinned on a previous run.
	if err := clearLabels(ctx, client); err != nil {
		return errors.Wrap(err, "failed to clear pinned labels")
	}

	for _, entry := range entries {
		if entry.IsDir() {
			continue
		}

		began := time.Now()
		path := filepath.Join(imagesDir, entry.Name())

		err := preloadFile(ctx, cfg, client, imageClient, path)
		if err != nil {
			logrus.Errorf("Error encountered while importing %s: %v", path, err)
			continue
		}
		logrus.Infof("Imported images from %s in %s", path, time.Since(began))
	}
	return nil
}
// preloadFile loads the images described by a single file: either an image archive that is
// imported into containerd, or (for names ending in .txt, case-insensitively) a list of image
// names that are pulled through the CRI image service.
//
// The resulting images are then given the k3s pinned labels and retagged for each registry in
// cfg.AgentConfig.AirgapExtraRegistry. The first failing step aborts the file and its error
// is returned.
//
// This lives in its own function so that the deferred Close calls on the file or archive
// reader run as soon as this one file has been handled.
func preloadFile(ctx context.Context, cfg *config.Node, client *containerd.Client, imageClient runtimeapi.ImageServiceClient, filePath string) error {
	var loaded []images.Image

	if !util2.HasSuffixI(filePath, ".txt") {
		// Anything that is not a .txt list is treated as an image archive.
		open, err := tarfile.GetOpener(filePath)
		if err != nil {
			return err
		}

		archive, err := open()
		if err != nil {
			return err
		}
		defer archive.Close()

		logrus.Infof("Importing images from %s", filePath)
		loaded, err = client.Import(ctx, archive, containerd.WithAllPlatforms(true))
		if err != nil {
			return errors.Wrap(err, "failed to import images from "+filePath)
		}
	} else {
		list, err := os.Open(filePath)
		if err != nil {
			return err
		}
		defer list.Close()

		logrus.Infof("Pulling images from %s", filePath)
		loaded, err = prePullImages(ctx, client, imageClient, list)
		if err != nil {
			return errors.Wrap(err, "failed to pull images from "+filePath)
		}
	}

	if err := labelImages(ctx, client, loaded); err != nil {
		return errors.Wrap(err, "failed to add pinned label to images")
	}

	if err := retagImages(ctx, client, loaded, cfg.AgentConfig.AirgapExtraRegistry); err != nil {
		return errors.Wrap(err, "failed to retag images")
	}

	for i := range loaded {
		logrus.Infof("Imported %s", loaded[i].Name)
	}
	return nil
}
// clearLeases deletes any leases left by previous versions of k3s.
// We no longer use leases to lock content; they only locked the
// blobs, not the actual images.
//
// Only leases whose ID equals version.Program are touched. An error is returned if the
// leases cannot be listed. Deleting an individual lease is best-effort: a failure is logged
// as a warning and the remaining leases are still processed.
func clearLeases(ctx context.Context, client *containerd.Client) error {
	ls := client.LeasesService()
	existingLeases, err := ls.List(ctx)
	if err != nil {
		return err
	}

	for _, lease := range existingLeases {
		if lease.ID != version.Program {
			continue
		}
		logrus.Debugf("Deleting existing lease: %v", lease)
		// Surface the failure instead of dropping it silently, but do not fail the
		// caller over a stale lease.
		if err := ls.Delete(ctx, lease); err != nil {
			logrus.Warnf("Failed to delete existing lease %s: %v", lease.ID, err)
		}
	}
	return nil
}
// clearLabels strips the pin from every image in the image store that carries the k3s pinned
// label: both the k3s label and the CRI pinned label are removed and the image's labels are
// written back.
//
// A failure to list images is returned immediately. Failures to update individual images are
// collected and returned together once all images have been attempted.
func clearLabels(ctx context.Context, client *containerd.Client) error {
	store := client.ImageService()

	// Select only the images that k3s itself pinned.
	filter := fmt.Sprintf("labels.%q==%s", k3sPinnedImageLabelKey, k3sPinnedImageLabelValue)
	pinned, err := store.List(ctx, filter)
	if err != nil {
		return err
	}

	var failures []error
	for i := range pinned {
		img := pinned[i]
		delete(img.Labels, k3sPinnedImageLabelKey)
		delete(img.Labels, labels.PinnedImageLabelKey)

		// Restrict the update to the "labels" field.
		_, err := store.Update(ctx, img, "labels")
		if err != nil {
			failures = append(failures, errors.Wrap(err, "failed to delete labels from image "+img.Name))
		}
	}
	return merr.NewErrors(failures...)
}
// labelImages marks each of the given images as pinned by setting both the k3s pinned label
// and the CRI pinned label, so that they should not be pruned.
//
// Images that already carry both labels are skipped. On a successful update the corresponding
// element of images is replaced with the image returned by the image store. Update failures
// are collected and returned together after all images have been attempted.
func labelImages(ctx context.Context, client *containerd.Client, images []images.Image) error {
	var failures []error
	store := client.ImageService()

	for idx := range images {
		// Work on a copy; the slice element is only replaced after a successful update.
		img := images[idx]

		k3sPinned := img.Labels[k3sPinnedImageLabelKey] == k3sPinnedImageLabelValue
		criPinned := img.Labels[labels.PinnedImageLabelKey] == labels.PinnedImageLabelValue
		if k3sPinned && criPinned {
			continue
		}

		if img.Labels == nil {
			img.Labels = make(map[string]string)
		}
		img.Labels[k3sPinnedImageLabelKey] = k3sPinnedImageLabelValue
		img.Labels[labels.PinnedImageLabelKey] = labels.PinnedImageLabelValue

		updated, err := store.Update(ctx, img, "labels")
		if err != nil {
			failures = append(failures, errors.Wrap(err, "failed to add labels to image "+img.Name))
			continue
		}
		images[idx] = updated
	}
	return merr.NewErrors(failures...)
}
// retagImages retags all listed images as having been pulled from the given remote registries.
// If duplicate images exist, they are overwritten. This is most useful when using a private registry
// for all images, as can be configured by the RKE2/Rancher system-default-registry setting.
//
// For every image and every registry, a new image record named
// "<registry>/<path>:<tag>" is created with the same contents as the source image. A registry
// that would produce the image's own name is skipped. If a record with the new name already
// exists it is deleted and created again.
//
// The images slice is not modified. Errors are collected per image/registry and returned
// together after everything has been attempted; an image whose name cannot be parsed as a
// tagged reference is skipped with an error.
func retagImages(ctx context.Context, client *containerd.Client, images []images.Image, registries []string) error {
	var errs []error
	imageService := client.ImageService()
	for _, image := range images {
		name, err := parseNamedTagged(image.Name)
		if err != nil {
			errs = append(errs, errors.Wrap(err, "failed to parse tags for image "+image.Name))
			continue
		}
		for _, registry := range registries {
			newName := fmt.Sprintf("%s/%s:%s", registry, docker.Path(name), name.Tag())
			// image.Name must remain the source image's own name for this check to be
			// correct on every registry, not just the first one; otherwise a later
			// registry matching the original name would delete and recreate the source.
			if newName == image.Name {
				continue
			}

			// Rename a copy so the source image record is left untouched.
			retagged := image
			retagged.Name = newName

			if _, err := imageService.Create(ctx, retagged); err != nil {
				if !errdefs.IsAlreadyExists(err) {
					errs = append(errs, errors.Wrap(err, "failed to tag image "+newName))
					continue
				}
				// Overwrite the existing record: delete it, then create it again.
				if err := imageService.Delete(ctx, newName); err != nil {
					errs = append(errs, errors.Wrap(err, "failed to delete existing image "+newName))
					continue
				}
				if _, err := imageService.Create(ctx, retagged); err != nil {
					errs = append(errs, errors.Wrap(err, "failed to tag after deleting existing image "+newName))
					continue
				}
			}
			logrus.Infof("Tagged %s", newName)
		}
	}
	return merr.NewErrors(errs...)
}
// parseNamedTagged normalizes the given image name and returns it as a reference that exposes
// its tag. An error is returned if the name cannot be parsed, or if the parsed reference does
// not implement docker.NamedTagged.
func parseNamedTagged(name string) (docker.NamedTagged, error) {
	normalized, err := docker.ParseNormalizedNamed(name)
	if err != nil {
		return nil, err
	}

	if withTag, isTagged := normalized.(docker.NamedTagged); isTagged {
		return withTag, nil
	}
	return nil, fmt.Errorf("can't cast %T to NamedTagged", normalized)
}
// prePullImages asks containerd to pull images in a given list, so that they
// are ready when the containers attempt to start later. If the image already exists,
// or is successfully pulled, information about the image is retrieved from the image store.
// NOTE: Pulls MUST be done via CRI API, not containerd API, in order to use mirrors and rewrites.
//
// imageList is read line by line; surrounding whitespace is trimmed and blank lines are
// ignored. An image counts as already present only when the CRI ImageStatus call succeeds AND
// reports a non-nil image: CRI signals "not present" with a nil Image and a nil error.
//
// The returned slice holds every image that could be looked up in the image store. Per-image
// failures, and any error encountered while reading the list, are collected and returned
// together; processing continues with the next line after a failure.
func prePullImages(ctx context.Context, client *containerd.Client, imageClient runtimeapi.ImageServiceClient, imageList io.Reader) ([]images.Image, error) {
	errs := []error{}
	pulled := []images.Image{}
	imageService := client.ImageService()

	scanner := bufio.NewScanner(imageList)
	for scanner.Scan() {
		name := strings.TrimSpace(scanner.Text())
		if name == "" {
			// Blank line; there is nothing to look up or pull.
			continue
		}

		status, err := imageClient.ImageStatus(ctx, &runtimeapi.ImageStatusRequest{
			Image: &runtimeapi.ImageSpec{
				Image: name,
			},
		})
		if err == nil && status != nil && status.Image != nil {
			logrus.Infof("Image %s has already been pulled", name)
		} else {
			// Either the status call failed or the image is not present; try to pull it.
			logrus.Infof("Pulling image %s", name)
			if _, err := imageClient.PullImage(ctx, &runtimeapi.PullImageRequest{
				Image: &runtimeapi.ImageSpec{
					Image: name,
				},
			}); err != nil {
				errs = append(errs, err)
				continue
			}
		}

		image, err := imageService.Get(ctx, name)
		if err != nil {
			errs = append(errs, err)
			continue
		}
		pulled = append(pulled, image)
	}

	// Scan returning false may mean a read error rather than end of input.
	if err := scanner.Err(); err != nil {
		errs = append(errs, errors.Wrap(err, "failed to read image list"))
	}

	return pulled, merr.NewErrors(errs...)
}