mirror of https://github.com/k3s-io/k3s
Pin images instead of locking layers with lease
Layer leases never did what we wanted anyways, and this is the new approved interface for ensuring that images do not get GCd
Signed-off-by: Brad Davidson <brad.davidson@rancher.com>
(cherry picked from commit 5c99bdd9bd
)
Signed-off-by: Brad Davidson <brad.davidson@rancher.com>
pull/9216/head
parent
3b863906e0
commit
ace1714e0c
|
@ -14,9 +14,9 @@ import (
|
|||
"github.com/containerd/containerd"
|
||||
"github.com/containerd/containerd/errdefs"
|
||||
"github.com/containerd/containerd/images"
|
||||
"github.com/containerd/containerd/leases"
|
||||
"github.com/containerd/containerd/namespaces"
|
||||
"github.com/containerd/containerd/pkg/cri/constants"
|
||||
"github.com/containerd/containerd/pkg/cri/labels"
|
||||
"github.com/containerd/containerd/reference/docker"
|
||||
"github.com/k3s-io/k3s/pkg/agent/cri"
|
||||
util2 "github.com/k3s-io/k3s/pkg/agent/util"
|
||||
|
@ -27,10 +27,17 @@ import (
|
|||
"github.com/rancher/wharfie/pkg/tarfile"
|
||||
"github.com/rancher/wrangler/pkg/merr"
|
||||
"github.com/sirupsen/logrus"
|
||||
"google.golang.org/grpc"
|
||||
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
|
||||
)
|
||||
|
||||
var (
|
||||
// In addition to using the CRI pinned label, we add our own label to indicate that
|
||||
// the image was pinned by the import process, so that we can clear the pin on subsequent startups.
|
||||
// ref: https://github.com/containerd/containerd/blob/release/1.7/pkg/cri/labels/labels.go
|
||||
k3sPinnedImageLabelKey = "io.cattle." + version.Program + ".pinned"
|
||||
k3sPinnedImageLabelValue = "pinned"
|
||||
)
|
||||
|
||||
// Run configures and starts containerd as a child process. Once it is up, images are preloaded
|
||||
// or pulled from files found in the agent images directory.
|
||||
func Run(ctx context.Context, cfg *config.Node) error {
|
||||
|
@ -134,38 +141,29 @@ func preloadImages(ctx context.Context, cfg *config.Node) error {
|
|||
}
|
||||
defer client.Close()
|
||||
|
||||
// Image pulls must be done using the CRI client, not the containerd client.
|
||||
// Repository mirrors and rewrites are handled by the CRI service; if you pull directly
|
||||
// using the containerd image service it will ignore the configured settings.
|
||||
criConn, err := cri.Connection(ctx, cfg.Containerd.Address)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer criConn.Close()
|
||||
imageClient := runtimeapi.NewImageServiceClient(criConn)
|
||||
|
||||
// Ensure that our images are imported into the correct namespace
|
||||
ctx = namespaces.WithNamespace(ctx, constants.K8sContainerdNamespace)
|
||||
|
||||
// At startup all leases from k3s are cleared
|
||||
ls := client.LeasesService()
|
||||
existingLeases, err := ls.List(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
// At startup all leases from k3s are cleared; we no longer use leases to lock content
|
||||
if err := clearLeases(ctx, client); err != nil {
|
||||
return errors.Wrap(err, "failed to clear leases")
|
||||
}
|
||||
|
||||
for _, lease := range existingLeases {
|
||||
if lease.ID == version.Program {
|
||||
logrus.Debugf("Deleting existing lease: %v", lease)
|
||||
ls.Delete(ctx, lease)
|
||||
}
|
||||
// Clear the pinned labels on all images previously pinned by k3s
|
||||
if err := clearLabels(ctx, client); err != nil {
|
||||
return errors.Wrap(err, "failed to clear pinned labels")
|
||||
}
|
||||
|
||||
// Any images found on import are given a lease that never expires
|
||||
lease, err := ls.Create(ctx, leases.WithID(version.Program))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Ensure that our images are locked by the lease
|
||||
ctx = leases.WithLease(ctx, lease.ID)
|
||||
|
||||
for _, fileInfo := range fileInfos {
|
||||
if fileInfo.IsDir() {
|
||||
continue
|
||||
|
@ -174,7 +172,7 @@ func preloadImages(ctx context.Context, cfg *config.Node) error {
|
|||
start := time.Now()
|
||||
filePath := filepath.Join(cfg.Images, fileInfo.Name())
|
||||
|
||||
if err := preloadFile(ctx, cfg, client, criConn, filePath); err != nil {
|
||||
if err := preloadFile(ctx, cfg, client, imageClient, filePath); err != nil {
|
||||
logrus.Errorf("Error encountered while importing %s: %v", filePath, err)
|
||||
continue
|
||||
}
|
||||
|
@ -186,7 +184,8 @@ func preloadImages(ctx context.Context, cfg *config.Node) error {
|
|||
// preloadFile handles loading images from a single tarball or pre-pull image list.
|
||||
// This is in its own function so that we can ensure that the various readers are properly closed, as some
|
||||
// decompressing readers need to be explicitly closed and others do not.
|
||||
func preloadFile(ctx context.Context, cfg *config.Node, client *containerd.Client, criConn *grpc.ClientConn, filePath string) error {
|
||||
func preloadFile(ctx context.Context, cfg *config.Node, client *containerd.Client, imageClient runtimeapi.ImageServiceClient, filePath string) error {
|
||||
var images []images.Image
|
||||
if util2.HasSuffixI(filePath, ".txt") {
|
||||
file, err := os.Open(filePath)
|
||||
if err != nil {
|
||||
|
@ -194,28 +193,102 @@ func preloadFile(ctx context.Context, cfg *config.Node, client *containerd.Clien
|
|||
}
|
||||
defer file.Close()
|
||||
logrus.Infof("Pulling images from %s", filePath)
|
||||
return prePullImages(ctx, criConn, file)
|
||||
images, err = prePullImages(ctx, client, imageClient, file)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "failed to pull images from "+filePath)
|
||||
}
|
||||
} else {
|
||||
opener, err := tarfile.GetOpener(filePath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
imageReader, err := opener()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer imageReader.Close()
|
||||
|
||||
logrus.Infof("Importing images from %s", filePath)
|
||||
images, err = client.Import(ctx, imageReader, containerd.WithAllPlatforms(true))
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "failed to import images from "+filePath)
|
||||
}
|
||||
}
|
||||
|
||||
opener, err := tarfile.GetOpener(filePath)
|
||||
if err := labelImages(ctx, client, images); err != nil {
|
||||
return errors.Wrap(err, "failed to add pinned label to images")
|
||||
}
|
||||
if err := retagImages(ctx, client, images, cfg.AgentConfig.AirgapExtraRegistry); err != nil {
|
||||
return errors.Wrap(err, "failed to retag images")
|
||||
}
|
||||
|
||||
for _, image := range images {
|
||||
logrus.Infof("Imported %s", image.Name)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// clearLeases deletes any leases left by previous versions of k3s.
|
||||
// We no longer use leases to lock content; they only locked the
|
||||
// blobs, not the actual images.
|
||||
func clearLeases(ctx context.Context, client *containerd.Client) error {
|
||||
ls := client.LeasesService()
|
||||
existingLeases, err := ls.List(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, lease := range existingLeases {
|
||||
if lease.ID == version.Program {
|
||||
logrus.Debugf("Deleting existing lease: %v", lease)
|
||||
ls.Delete(ctx, lease)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
imageReader, err := opener()
|
||||
// clearLabels removes the pinned labels on all images in the image store that were previously pinned by k3s
|
||||
func clearLabels(ctx context.Context, client *containerd.Client) error {
|
||||
var errs []error
|
||||
imageService := client.ImageService()
|
||||
images, err := imageService.List(ctx, fmt.Sprintf("labels.%q==%s", k3sPinnedImageLabelKey, k3sPinnedImageLabelValue))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer imageReader.Close()
|
||||
|
||||
logrus.Infof("Importing images from %s", filePath)
|
||||
|
||||
images, err := client.Import(ctx, imageReader, containerd.WithAllPlatforms(true))
|
||||
if err != nil {
|
||||
return err
|
||||
for _, image := range images {
|
||||
delete(image.Labels, k3sPinnedImageLabelKey)
|
||||
delete(image.Labels, labels.PinnedImageLabelKey)
|
||||
if _, err := imageService.Update(ctx, image, "labels"); err != nil {
|
||||
errs = append(errs, errors.Wrap(err, "failed to delete labels from image "+image.Name))
|
||||
}
|
||||
}
|
||||
return merr.NewErrors(errs...)
|
||||
}
|
||||
|
||||
return retagImages(ctx, client, images, cfg.AgentConfig.AirgapExtraRegistry)
|
||||
// labelImages adds labels to the listed images, indicating that they
|
||||
// are pinned by k3s and should not be pruned.
|
||||
func labelImages(ctx context.Context, client *containerd.Client, images []images.Image) error {
|
||||
var errs []error
|
||||
imageService := client.ImageService()
|
||||
for i, image := range images {
|
||||
if image.Labels[k3sPinnedImageLabelKey] == k3sPinnedImageLabelValue &&
|
||||
image.Labels[labels.PinnedImageLabelKey] == labels.PinnedImageLabelValue {
|
||||
continue
|
||||
}
|
||||
|
||||
if image.Labels == nil {
|
||||
image.Labels = map[string]string{}
|
||||
}
|
||||
image.Labels[k3sPinnedImageLabelKey] = k3sPinnedImageLabelValue
|
||||
image.Labels[labels.PinnedImageLabelKey] = labels.PinnedImageLabelValue
|
||||
updatedImage, err := imageService.Update(ctx, image, "labels")
|
||||
if err != nil {
|
||||
errs = append(errs, errors.Wrap(err, "failed to add labels to image "+image.Name))
|
||||
} else {
|
||||
images[i] = updatedImage
|
||||
}
|
||||
}
|
||||
return merr.NewErrors(errs...)
|
||||
}
|
||||
|
||||
// retagImages retags all listed images as having been pulled from the given remote registries.
|
||||
|
@ -227,24 +300,27 @@ func retagImages(ctx context.Context, client *containerd.Client, images []images
|
|||
for _, image := range images {
|
||||
name, err := parseNamedTagged(image.Name)
|
||||
if err != nil {
|
||||
errs = append(errs, errors.Wrap(err, "failed to parse image name"))
|
||||
errs = append(errs, errors.Wrap(err, "failed to parse tags for image "+image.Name))
|
||||
continue
|
||||
}
|
||||
logrus.Infof("Imported %s", image.Name)
|
||||
for _, registry := range registries {
|
||||
image.Name = fmt.Sprintf("%s/%s:%s", registry, docker.Path(name), name.Tag())
|
||||
newName := fmt.Sprintf("%s/%s:%s", registry, docker.Path(name), name.Tag())
|
||||
if newName == image.Name {
|
||||
continue
|
||||
}
|
||||
image.Name = newName
|
||||
if _, err = imageService.Create(ctx, image); err != nil {
|
||||
if errdefs.IsAlreadyExists(err) {
|
||||
if err = imageService.Delete(ctx, image.Name); err != nil {
|
||||
errs = append(errs, errors.Wrap(err, "failed to delete existing image"))
|
||||
errs = append(errs, errors.Wrap(err, "failed to delete existing image "+image.Name))
|
||||
continue
|
||||
}
|
||||
if _, err = imageService.Create(ctx, image); err != nil {
|
||||
errs = append(errs, errors.Wrap(err, "failed to tag after deleting existing image"))
|
||||
errs = append(errs, errors.Wrap(err, "failed to tag after deleting existing image "+image.Name))
|
||||
continue
|
||||
}
|
||||
} else {
|
||||
errs = append(errs, errors.Wrap(err, "failed to tag image"))
|
||||
errs = append(errs, errors.Wrap(err, "failed to tag image "+image.Name))
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
@ -269,30 +345,43 @@ func parseNamedTagged(name string) (docker.NamedTagged, error) {
|
|||
}
|
||||
|
||||
// prePullImages asks containerd to pull images in a given list, so that they
|
||||
// are ready when the containers attempt to start later.
|
||||
func prePullImages(ctx context.Context, conn *grpc.ClientConn, images io.Reader) error {
|
||||
imageClient := runtimeapi.NewImageServiceClient(conn)
|
||||
scanner := bufio.NewScanner(images)
|
||||
// are ready when the containers attempt to start later. If the image already exists,
|
||||
// or is successfully pulled, information about the image is retrieved from the image store.
|
||||
// NOTE: Pulls MUST be done via CRI API, not containerd API, in order to use mirrors and rewrites.
|
||||
func prePullImages(ctx context.Context, client *containerd.Client, imageClient runtimeapi.ImageServiceClient, imageList io.Reader) ([]images.Image, error) {
|
||||
errs := []error{}
|
||||
images := []images.Image{}
|
||||
imageService := client.ImageService()
|
||||
scanner := bufio.NewScanner(imageList)
|
||||
for scanner.Scan() {
|
||||
line := strings.TrimSpace(scanner.Text())
|
||||
resp, err := imageClient.ImageStatus(ctx, &runtimeapi.ImageStatusRequest{
|
||||
name := strings.TrimSpace(scanner.Text())
|
||||
if _, err := imageClient.ImageStatus(ctx, &runtimeapi.ImageStatusRequest{
|
||||
Image: &runtimeapi.ImageSpec{
|
||||
Image: line,
|
||||
Image: name,
|
||||
},
|
||||
})
|
||||
if err == nil && resp.Image != nil {
|
||||
}); err == nil {
|
||||
logrus.Infof("Image %s has already been pulled", name)
|
||||
if image, err := imageService.Get(ctx, name); err != nil {
|
||||
errs = append(errs, err)
|
||||
} else {
|
||||
images = append(images, image)
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
logrus.Infof("Pulling image %s...", line)
|
||||
_, err = imageClient.PullImage(ctx, &runtimeapi.PullImageRequest{
|
||||
logrus.Infof("Pulling image %s", name)
|
||||
if _, err := imageClient.PullImage(ctx, &runtimeapi.PullImageRequest{
|
||||
Image: &runtimeapi.ImageSpec{
|
||||
Image: line,
|
||||
Image: name,
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
logrus.Errorf("Failed to pull %s: %v", line, err)
|
||||
}); err != nil {
|
||||
errs = append(errs, err)
|
||||
} else {
|
||||
if image, err := imageService.Get(ctx, name); err != nil {
|
||||
errs = append(errs, err)
|
||||
} else {
|
||||
images = append(images, image)
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
return images, merr.NewErrors(errs...)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue