|
|
|
@ -15,7 +15,11 @@ import (
|
|
|
|
|
"time"
|
|
|
|
|
|
|
|
|
|
"github.com/containerd/containerd"
|
|
|
|
|
"github.com/containerd/containerd/errdefs"
|
|
|
|
|
"github.com/containerd/containerd/images"
|
|
|
|
|
"github.com/containerd/containerd/namespaces"
|
|
|
|
|
"github.com/containerd/containerd/reference/docker"
|
|
|
|
|
"github.com/klauspost/compress/zstd"
|
|
|
|
|
"github.com/natefinch/lumberjack"
|
|
|
|
|
"github.com/opencontainers/runc/libcontainer/system"
|
|
|
|
|
"github.com/pierrec/lz4"
|
|
|
|
@ -24,6 +28,7 @@ import (
|
|
|
|
|
util2 "github.com/rancher/k3s/pkg/agent/util"
|
|
|
|
|
"github.com/rancher/k3s/pkg/daemons/config"
|
|
|
|
|
"github.com/rancher/k3s/pkg/version"
|
|
|
|
|
"github.com/rancher/wrangler/pkg/merr"
|
|
|
|
|
"github.com/sirupsen/logrus"
|
|
|
|
|
"google.golang.org/grpc"
|
|
|
|
|
yaml "gopkg.in/yaml.v2"
|
|
|
|
@ -35,6 +40,8 @@ const (
|
|
|
|
|
maxMsgSize = 1024 * 1024 * 16
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
// Run configures and starts containerd as a child process. Once it is up, images are preloaded
|
|
|
|
|
// or pulled from files found in the agent images directory.
|
|
|
|
|
func Run(ctx context.Context, cfg *config.Node) error {
|
|
|
|
|
args := []string{
|
|
|
|
|
"containerd",
|
|
|
|
@ -110,6 +117,7 @@ func Run(ctx context.Context, cfg *config.Node) error {
|
|
|
|
|
return preloadImages(ctx, cfg)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// criConnection connects to a CRI socket at the given path.
|
|
|
|
|
func criConnection(ctx context.Context, address string) (*grpc.ClientConn, error) {
|
|
|
|
|
addr, dialer, err := util.GetAddressAndDialer("unix://" + address)
|
|
|
|
|
if err != nil {
|
|
|
|
@ -133,6 +141,10 @@ func criConnection(ctx context.Context, address string) (*grpc.ClientConn, error
|
|
|
|
|
return conn, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// preloadImages reads the contents of the agent images directory, and attempts to
|
|
|
|
|
// import into containerd any files found there. Supported compressed types are decompressed, and
|
|
|
|
|
// any .txt files are processed as a list of images that should be pre-pulled from remote registries.
|
|
|
|
|
// If configured, imported images are retagged as being pulled from additional registries.
|
|
|
|
|
func preloadImages(ctx context.Context, cfg *config.Node) error {
|
|
|
|
|
fileInfo, err := os.Stat(cfg.Images)
|
|
|
|
|
if os.IsNotExist(err) {
|
|
|
|
@ -164,55 +176,132 @@ func preloadImages(ctx context.Context, cfg *config.Node) error {
|
|
|
|
|
}
|
|
|
|
|
defer criConn.Close()
|
|
|
|
|
|
|
|
|
|
ctxContainerD := namespaces.WithNamespace(context.Background(), "k8s.io")
|
|
|
|
|
// Ensure that nothing else can modify the image store while we're importing,
|
|
|
|
|
// and that our images are imported into the k8s.io namespace
|
|
|
|
|
ctx, done, err := client.WithLease(namespaces.WithNamespace(ctx, "k8s.io"))
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
defer done(ctx)
|
|
|
|
|
|
|
|
|
|
for _, fileInfo := range fileInfos {
|
|
|
|
|
if fileInfo.IsDir() {
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
start := time.Now()
|
|
|
|
|
filePath := filepath.Join(cfg.Images, fileInfo.Name())
|
|
|
|
|
|
|
|
|
|
file, err := os.Open(filePath)
|
|
|
|
|
if err != nil {
|
|
|
|
|
logrus.Errorf("Unable to read %s: %v", filePath, err)
|
|
|
|
|
if err := preloadFile(ctx, cfg, client, criConn, filePath); err != nil {
|
|
|
|
|
logrus.Errorf("Error encountered while importing %s: %v", filePath, err)
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if strings.HasSuffix(fileInfo.Name(), ".txt") {
|
|
|
|
|
prePullImages(ctx, criConn, file)
|
|
|
|
|
file.Close()
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
logrus.Debugf("Import %s", filePath)
|
|
|
|
|
var imageReader io.Reader
|
|
|
|
|
imageReader = file
|
|
|
|
|
if strings.HasSuffix(fileInfo.Name(), ".tar.bz2") {
|
|
|
|
|
imageReader = bzip2.NewReader(file)
|
|
|
|
|
}
|
|
|
|
|
if strings.HasSuffix(fileInfo.Name(), ".tar.lz4") {
|
|
|
|
|
imageReader = lz4.NewReader(file)
|
|
|
|
|
}
|
|
|
|
|
if strings.HasSuffix(fileInfo.Name(), ".tar.gz") {
|
|
|
|
|
// WARNING: gzip reader close does not close the underlying image
|
|
|
|
|
imageReader, err = gzip.NewReader(file)
|
|
|
|
|
if err != nil {
|
|
|
|
|
logrus.Errorf("Unable to import %s: %v", filePath, err)
|
|
|
|
|
file.Close()
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
_, err = client.Import(ctxContainerD, imageReader, containerd.WithAllPlatforms(true))
|
|
|
|
|
file.Close()
|
|
|
|
|
if err != nil {
|
|
|
|
|
logrus.Errorf("Unable to import %s: %v", filePath, err)
|
|
|
|
|
}
|
|
|
|
|
logrus.Debugf("Imported images from %s in %s", filePath, time.Since(start))
|
|
|
|
|
}
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func prePullImages(ctx context.Context, conn *grpc.ClientConn, images io.Reader) {
|
|
|
|
|
// preloadFile handles loading images from a single tarball or pre-pull image list.
|
|
|
|
|
// This is in its own function so that we can ensure that the various readers are properly closed, as some
|
|
|
|
|
// decompressing readers need to be explicitly closed and others do not.
|
|
|
|
|
func preloadFile(ctx context.Context, cfg *config.Node, client *containerd.Client, criConn *grpc.ClientConn, filePath string) error {
|
|
|
|
|
file, err := os.Open(filePath)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
defer file.Close()
|
|
|
|
|
|
|
|
|
|
var imageReader io.Reader
|
|
|
|
|
switch {
|
|
|
|
|
case util2.HasSuffixI(filePath, ".txt"):
|
|
|
|
|
return prePullImages(ctx, criConn, file)
|
|
|
|
|
case util2.HasSuffixI(filePath, ".tar"):
|
|
|
|
|
imageReader = file
|
|
|
|
|
case util2.HasSuffixI(filePath, ".tar.lz4"):
|
|
|
|
|
imageReader = lz4.NewReader(file)
|
|
|
|
|
case util2.HasSuffixI(filePath, ".tar.bz2", ".tbz"):
|
|
|
|
|
imageReader = bzip2.NewReader(file)
|
|
|
|
|
case util2.HasSuffixI(filePath, ".tar.gz", ".tgz"):
|
|
|
|
|
zr, err := gzip.NewReader(file)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
defer zr.Close()
|
|
|
|
|
imageReader = zr
|
|
|
|
|
case util2.HasSuffixI(filePath, "tar.zst", ".tzst"):
|
|
|
|
|
zr, err := zstd.NewReader(file)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
defer zr.Close()
|
|
|
|
|
imageReader = zr
|
|
|
|
|
default:
|
|
|
|
|
return errors.New("unhandled file type")
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
logrus.Infof("Importing images from %s", filePath)
|
|
|
|
|
|
|
|
|
|
images, err := client.Import(ctx, imageReader, containerd.WithAllPlatforms(true))
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return retagImages(ctx, client, images, cfg.AgentConfig.AirgapExtraRegistry)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// retagImages retags all listed images as having been pulled from the given remote registries.
|
|
|
|
|
// If duplicate images exist, they are overwritten. This is most useful when using a private registry
|
|
|
|
|
// for all images, as can be configured by the RKE2/Rancher system-default-registry setting.
|
|
|
|
|
func retagImages(ctx context.Context, client *containerd.Client, images []images.Image, registries []string) error {
|
|
|
|
|
var errs []error
|
|
|
|
|
imageService := client.ImageService()
|
|
|
|
|
for _, image := range images {
|
|
|
|
|
name, err := parseNamedTagged(image.Name)
|
|
|
|
|
if err != nil {
|
|
|
|
|
errs = append(errs, errors.Wrap(err, "failed to parse image name"))
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
logrus.Infof("Imported %s", image.Name)
|
|
|
|
|
for _, registry := range registries {
|
|
|
|
|
image.Name = fmt.Sprintf("%s/%s:%s", registry, docker.Path(name), name.Tag())
|
|
|
|
|
if _, err = imageService.Create(ctx, image); err != nil {
|
|
|
|
|
if errdefs.IsAlreadyExists(err) {
|
|
|
|
|
if err = imageService.Delete(ctx, image.Name); err != nil {
|
|
|
|
|
errs = append(errs, errors.Wrap(err, "failed to delete existing image"))
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
if _, err = imageService.Create(ctx, image); err != nil {
|
|
|
|
|
errs = append(errs, errors.Wrap(err, "failed to tag after deleting existing image"))
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
errs = append(errs, errors.Wrap(err, "failed to tag image"))
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
logrus.Infof("Tagged %s", image.Name)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return merr.NewErrors(errs...)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// parseNamedTagged parses and normalizes an image name, and converts the resulting reference
|
|
|
|
|
// to a type that exposes the tag.
|
|
|
|
|
func parseNamedTagged(name string) (docker.NamedTagged, error) {
|
|
|
|
|
ref, err := docker.ParseNormalizedNamed(name)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
tagged, ok := ref.(docker.NamedTagged)
|
|
|
|
|
if !ok {
|
|
|
|
|
return nil, fmt.Errorf("can't cast %T to NamedTagged", ref)
|
|
|
|
|
}
|
|
|
|
|
return tagged, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// prePullImages asks containerd to pull images in a given list, so that they
|
|
|
|
|
// are ready when the containers attempt to start later.
|
|
|
|
|
func prePullImages(ctx context.Context, conn *grpc.ClientConn, images io.Reader) error {
|
|
|
|
|
imageClient := runtimeapi.NewImageServiceClient(conn)
|
|
|
|
|
scanner := bufio.NewScanner(images)
|
|
|
|
|
for scanner.Scan() {
|
|
|
|
@ -236,8 +325,11 @@ func prePullImages(ctx context.Context, conn *grpc.ClientConn, images io.Reader)
|
|
|
|
|
logrus.Errorf("Failed to pull %s: %v", line, err)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// setupContainerdConfig generates the containerd.toml, using a template combined with various
|
|
|
|
|
// runtime configurations and registry mirror settings provided by the administrator.
|
|
|
|
|
func setupContainerdConfig(ctx context.Context, cfg *config.Node) error {
|
|
|
|
|
privRegistries, err := getPrivateRegistries(ctx, cfg)
|
|
|
|
|
if err != nil {
|
|
|
|
@ -278,6 +370,7 @@ func setupContainerdConfig(ctx context.Context, cfg *config.Node) error {
|
|
|
|
|
return util2.WriteFile(cfg.Containerd.Config, parsedTemplate)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// getPrivateRegistries loads the registry mirror configuration from registries.yaml
|
|
|
|
|
func getPrivateRegistries(ctx context.Context, cfg *config.Node) (*templates.Registry, error) {
|
|
|
|
|
privRegistries := &templates.Registry{}
|
|
|
|
|
privRegistryFile, err := ioutil.ReadFile(cfg.AgentConfig.PrivateRegistry)
|
|
|
|
|