diff --git a/vendor/github.com/containerd/containerd/cmd/containerd-shim-runc-v1/main.go b/vendor/github.com/containerd/containerd/cmd/containerd-shim-runc-v2/main.go
similarity index 87%
rename from vendor/github.com/containerd/containerd/cmd/containerd-shim-runc-v1/main.go
rename to vendor/github.com/containerd/containerd/cmd/containerd-shim-runc-v2/main.go
index 1b1b106aac..4f5d804d9a 100644
--- a/vendor/github.com/containerd/containerd/cmd/containerd-shim-runc-v1/main.go
+++ b/vendor/github.com/containerd/containerd/cmd/containerd-shim-runc-v2/main.go
@@ -19,10 +19,10 @@ package main
 
 import (
-	v1 "github.com/containerd/containerd/runtime/v2/runc/v1"
+	v2 "github.com/containerd/containerd/runtime/v2/runc/v2"
 
 	"github.com/containerd/containerd/runtime/v2/shim"
 )
 
 func main() {
-	shim.Run("io.containerd.runc.v1", v1.New)
+	shim.Run("io.containerd.runc.v2", v2.New)
 }
diff --git a/vendor/github.com/containerd/containerd/runtime/v2/runc/v1/service.go b/vendor/github.com/containerd/containerd/runtime/v2/runc/v2/service.go
similarity index 83%
rename from vendor/github.com/containerd/containerd/runtime/v2/runc/v1/service.go
rename to vendor/github.com/containerd/containerd/runtime/v2/runc/v2/service.go
index 05c4d66466..9bdef0e24d 100644
--- a/vendor/github.com/containerd/containerd/runtime/v2/runc/v1/service.go
+++ b/vendor/github.com/containerd/containerd/runtime/v2/runc/v2/service.go
@@ -16,7 +16,7 @@
    limitations under the License.
 */
 
-package v1
+package v2
 
 import (
 	"context"
@@ -25,6 +25,7 @@ import (
 	"os"
 	"os/exec"
 	"path/filepath"
+	"strings"
 	"sync"
 	"syscall"
 	"time"
@@ -60,6 +61,18 @@ var (
 	empty = &ptypes.Empty{}
 )
 
+// groupLabels specifies how the shim groups services.
+// It currently supports a runc.v2-specific .group label and the
+// standard k8s pod sandbox label. Order matters in this list.
+var groupLabels = []string{
+	"io.containerd.runc.v2.group",
+	"io.kubernetes.cri.sandbox-id",
+}
+
+type spec struct {
+	Annotations map[string]string `json:"annotations,omitempty"`
+}
+
 // New returns a new shim service that can be used via GRPC
 func New(ctx context.Context, id string, publisher shim.Publisher, shutdown func()) (shim.Shim, error) {
 	ep, err := oom.New(publisher)
@@ -68,12 +81,13 @@ func New(ctx context.Context, id string, publisher shim.Publisher, shutdown func
 	}
 	go ep.Run(ctx)
 	s := &service{
-		id:      id,
-		context: ctx,
-		events:  make(chan interface{}, 128),
-		ec:      reaper.Default.Subscribe(),
-		ep:      ep,
-		cancel:  shutdown,
+		id:         id,
+		context:    ctx,
+		events:     make(chan interface{}, 128),
+		ec:         reaper.Default.Subscribe(),
+		ep:         ep,
+		cancel:     shutdown,
+		containers: make(map[string]*runc.Container),
 	}
 	go s.processExits()
 	runcC.Monitor = reaper.Default
@@ -96,8 +110,10 @@ type service struct {
 	ec chan runcC.Exit
 	ep *oom.Epoller
 
-	id        string
-	container *runc.Container
+	// id only used in cleanup case
+	id string
+
+	containers map[string]*runc.Container
 
 	cancel func()
 }
@@ -122,24 +138,54 @@ func newCommand(ctx context.Context, id, containerdBinary, containerdAddress, co
 	}
 	cmd := exec.Command(self, args...)
 	cmd.Dir = cwd
-	cmd.Env = append(os.Environ(), "GOMAXPROCS=2")
+	cmd.Env = append(os.Environ(), "GOMAXPROCS=4")
 	cmd.SysProcAttr = &syscall.SysProcAttr{
 		Setpgid: true,
 	}
 	return cmd, nil
 }
 
+func readSpec() (*spec, error) {
+	f, err := os.Open("config.json")
+	if err != nil {
+		return nil, err
+	}
+	defer f.Close()
+	var s spec
+	if err := json.NewDecoder(f).Decode(&s); err != nil {
+		return nil, err
+	}
+	return &s, nil
+}
+
 func (s *service) StartShim(ctx context.Context, id, containerdBinary, containerdAddress, containerdTTRPCAddress string) (string, error) {
 	cmd, err := newCommand(ctx, id, containerdBinary, containerdAddress, containerdTTRPCAddress)
 	if err != nil {
 		return "", err
 	}
-	address, err := shim.SocketAddress(ctx, id)
+	grouping := id
+	spec, err := readSpec()
+	if err != nil {
+		return "", err
+	}
+	for _, group := range groupLabels {
+		if groupID, ok := spec.Annotations[group]; ok {
+			grouping = groupID
+			break
+		}
+	}
+	address, err := shim.SocketAddress(ctx, grouping)
 	if err != nil {
 		return "", err
 	}
 	socket, err := shim.NewSocket(address)
 	if err != nil {
+		if strings.Contains(err.Error(), "address already in use") {
+			if err := shim.WriteAddress("address", address); err != nil {
+				return "", err
+			}
+			return address, nil
+		}
 		return "", err
 	}
 	defer socket.Close()
@@ -161,9 +207,6 @@ func (s *service) StartShim(ctx context.Context, id, containerdBinary, container
 	}()
 	// make sure to wait after start
 	go cmd.Wait()
-	if err := shim.WritePidFile("shim.pid", cmd.Process.Pid); err != nil {
-		return "", err
-	}
 	if err := shim.WriteAddress("address", address); err != nil {
 		return "", err
 	}
@@ -199,14 +242,16 @@ func (s *service) StartShim(ctx context.Context, id, containerdBinary, container
 }
 
 func (s *service) Cleanup(ctx context.Context) (*taskAPI.DeleteResponse, error) {
-	path, err := os.Getwd()
+	cwd, err := os.Getwd()
 	if err != nil {
 		return nil, err
 	}
+	path := filepath.Join(filepath.Dir(cwd), s.id)
 	ns, err := namespaces.NamespaceRequired(ctx)
 	if err != nil {
 		return nil, err
 	}
+
 	runtime, err := runc.ReadRuntime(path)
 	if err != nil {
 		return nil, err
 	}
@@ -236,7 +281,7 @@ func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ *
 		return nil, err
 	}
 
-	s.container = container
+	s.containers[r.ID] = container
 
 	s.send(&eventstypes.TaskCreate{
 		ContainerID: r.ID,
@@ -259,7 +304,7 @@ func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ *
 
 // Start a process
 func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.StartResponse, error) {
-	container, err := s.getContainer()
+	container, err := s.getContainer(r.ID)
 	if err != nil {
 		return nil, err
 	}
@@ -295,7 +340,7 @@ func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.
 
 // Delete the initial process and container
 func (s *service) Delete(ctx context.Context, r *taskAPI.DeleteRequest) (*taskAPI.DeleteResponse, error) {
-	container, err := s.getContainer()
+	container, err := s.getContainer(r.ID)
 	if err != nil {
 		return nil, err
 	}
@@ -305,7 +350,11 @@ func (s *service) Delete(ctx context.Context, r *taskAPI.DeleteRequest) (*taskAP
 	}
 	// if we deleted our init task, close the platform and send the task delete event
 	if r.ExecID == "" {
-		if s.platform != nil {
+		s.mu.Lock()
+		delete(s.containers, r.ID)
+		hasContainers := len(s.containers) > 0
+		s.mu.Unlock()
+		if s.platform != nil && !hasContainers {
 			s.platform.Close()
 		}
 		s.send(&eventstypes.TaskDelete{
@@ -324,7 +373,7 @@ func (s *service) Delete(ctx context.Context, r *taskAPI.DeleteRequest) (*taskAP
 
 // Exec an additional process inside the container
 func (s *service) Exec(ctx context.Context, r *taskAPI.ExecProcessRequest) (*ptypes.Empty, error) {
-	container, err := s.getContainer()
+	container, err := s.getContainer(r.ID)
 	if err != nil {
 		return nil, err
 	}
@@ -339,7 +388,7 @@ func (s *service) Exec(ctx context.Context, r *taskAPI.ExecProcessRequest) (*pty
 	}
 	s.send(&eventstypes.TaskExecAdded{
-		ContainerID: s.container.ID,
+		ContainerID: container.ID,
 		ExecID:      process.ID(),
 	})
 	return empty, nil
 }
@@ -347,7 +396,7 @@ func (s *service) Exec(ctx context.Context, r *taskAPI.ExecProcessRequest) (*pty
 
 // ResizePty of a process
 func (s *service) ResizePty(ctx context.Context, r *taskAPI.ResizePtyRequest) (*ptypes.Empty, error) {
-	container, err := s.getContainer()
+	container, err := s.getContainer(r.ID)
 	if err != nil {
 		return nil, err
 	}
@@ -359,7 +408,11 @@ func (s *service) ResizePty(ctx context.Context, r *taskAPI.ResizePtyRequest) (*
 
 // State returns runtime state information for a process
 func (s *service) State(ctx context.Context, r *taskAPI.StateRequest) (*taskAPI.StateResponse, error) {
-	p, err := s.getProcess(r.ExecID)
+	container, err := s.getContainer(r.ID)
+	if err != nil {
+		return nil, err
+	}
+	p, err := container.Process(r.ExecID)
 	if err != nil {
 		return nil, err
 	}
@@ -383,7 +436,7 @@ func (s *service) State(ctx context.Context, r *taskAPI.StateRequest) (*taskAPI.
 	sio := p.Stdio()
 	return &taskAPI.StateResponse{
 		ID:     p.ID(),
-		Bundle: s.container.Bundle,
+		Bundle: container.Bundle,
 		Pid:    uint32(p.Pid()),
 		Status: status,
 		Stdin:  sio.Stdin,
@@ -397,7 +450,7 @@ func (s *service) State(ctx context.Context, r *taskAPI.StateRequest) (*taskAPI.
 
 // Pause the container
 func (s *service) Pause(ctx context.Context, r *taskAPI.PauseRequest) (*ptypes.Empty, error) {
-	container, err := s.getContainer()
+	container, err := s.getContainer(r.ID)
 	if err != nil {
 		return nil, err
 	}
@@ -412,7 +465,7 @@ func (s *service) Pause(ctx context.Context, r *taskAPI.PauseRequest) (*ptypes.E
 
 // Resume the container
 func (s *service) Resume(ctx context.Context, r *taskAPI.ResumeRequest) (*ptypes.Empty, error) {
-	container, err := s.getContainer()
+	container, err := s.getContainer(r.ID)
 	if err != nil {
 		return nil, err
 	}
@@ -427,7 +480,7 @@ func (s *service) Resume(ctx context.Context, r *taskAPI.ResumeRequest) (*ptypes
 
 // Kill a process with the provided signal
 func (s *service) Kill(ctx context.Context, r *taskAPI.KillRequest) (*ptypes.Empty, error) {
-	container, err := s.getContainer()
+	container, err := s.getContainer(r.ID)
 	if err != nil {
 		return nil, err
 	}
@@ -439,7 +492,7 @@ func (s *service) Kill(ctx context.Context, r *taskAPI.KillRequest) (*ptypes.Emp
 
 // Pids returns all pids inside the container
 func (s *service) Pids(ctx context.Context, r *taskAPI.PidsRequest) (*taskAPI.PidsResponse, error) {
-	container, err := s.getContainer()
+	container, err := s.getContainer(r.ID)
 	if err != nil {
 		return nil, err
 	}
@@ -474,7 +527,7 @@ func (s *service) Pids(ctx context.Context, r *taskAPI.PidsRequest) (*taskAPI.Pi
 
 // CloseIO of a process
 func (s *service) CloseIO(ctx context.Context, r *taskAPI.CloseIORequest) (*ptypes.Empty, error) {
-	container, err := s.getContainer()
+	container, err := s.getContainer(r.ID)
 	if err != nil {
 		return nil, err
 	}
@@ -486,7 +539,7 @@ func (s *service) CloseIO(ctx context.Context, r *taskAPI.CloseIORequest) (*ptyp
 
 // Checkpoint the container
 func (s *service) Checkpoint(ctx context.Context, r *taskAPI.CheckpointTaskRequest) (*ptypes.Empty, error) {
-	container, err := s.getContainer()
+	container, err := s.getContainer(r.ID)
 	if err != nil {
 		return nil, err
 	}
@@ -498,7 +551,7 @@ func (s *service) Checkpoint(ctx context.Context, r *taskAPI.CheckpointTaskReque
 
 // Update a running container
 func (s *service) Update(ctx context.Context, r *taskAPI.UpdateTaskRequest) (*ptypes.Empty, error) {
-	container, err := s.getContainer()
+	container, err := s.getContainer(r.ID)
 	if err != nil {
 		return nil, err
 	}
@@ -510,7 +563,7 @@ func (s *service) Update(ctx context.Context, r *taskAPI.UpdateTaskRequest) (*pt
 
 // Wait for a process to exit
 func (s *service) Wait(ctx context.Context, r *taskAPI.WaitRequest) (*taskAPI.WaitResponse, error) {
-	container, err := s.getContainer()
+	container, err := s.getContainer(r.ID)
 	if err != nil {
 		return nil, err
 	}
@@ -529,8 +582,8 @@ func (s *service) Wait(ctx context.Context, r *taskAPI.WaitRequest) (*taskAPI.Wa
 // Connect returns shim information such as the shim's pid
 func (s *service) Connect(ctx context.Context, r *taskAPI.ConnectRequest) (*taskAPI.ConnectResponse, error) {
 	var pid int
-	if s.container != nil {
-		pid = s.container.Pid()
+	if container, err := s.getContainer(r.ID); err == nil {
+		pid = container.Pid()
 	}
 	return &taskAPI.ConnectResponse{
 		ShimPid: uint32(os.Getpid()),
@@ -539,13 +592,23 @@ func (s *service) Connect(ctx context.Context, r *taskAPI.ConnectRequest) (*task
 }
 
 func (s *service) Shutdown(ctx context.Context, r *taskAPI.ShutdownRequest) (*ptypes.Empty, error) {
+	s.mu.Lock()
+	// return early if the shim is still servicing containers
+	if len(s.containers) > 0 {
+		s.mu.Unlock()
+		return empty, nil
+	}
 	s.cancel()
 	close(s.events)
 	return empty, nil
 }
 
 func (s *service) Stats(ctx context.Context, r *taskAPI.StatsRequest) (*taskAPI.StatsResponse, error) {
-	cg := s.container.Cgroup()
+	container, err := s.getContainer(r.ID)
+	if err != nil {
+		return nil, err
+	}
+	cg := container.Cgroup()
 	if cg == nil {
 		return nil, errdefs.ToGRPCf(errdefs.ErrNotFound, "cgroup does not exist")
 	}
@@ -579,27 +642,34 @@ func (s *service) sendL(evt interface{}) {
 }
 
 func (s *service) checkProcesses(e runcC.Exit) {
-	container, err := s.getContainer()
-	if err != nil {
-		return
-	}
+	s.mu.Lock()
+	defer s.mu.Unlock()
 
-	shouldKillAll, err := shouldKillAllOnExit(container.Bundle)
-	if err != nil {
-		log.G(s.context).WithError(err).Error("failed to check shouldKillAll")
-	}
+	for _, container := range s.containers {
+		if !container.HasPid(e.Pid) {
+			continue
+		}
 
-	for _, p := range container.All() {
-		if p.Pid() == e.Pid {
-			if shouldKillAll {
-				if ip, ok := p.(*process.Init); ok {
-					// Ensure all children are killed
+		for _, p := range container.All() {
+			if p.Pid() != e.Pid {
+				continue
+			}
+
+			if ip, ok := p.(*process.Init); ok {
+				shouldKillAll, err := shouldKillAllOnExit(container.Bundle)
+				if err != nil {
+					log.G(s.context).WithError(err).Error("failed to check shouldKillAll")
+				}
+
+				// Ensure all children are killed
+				if shouldKillAll {
 					if err := ip.KillAll(s.context); err != nil {
 						logrus.WithError(err).WithField("id", ip.ID()).
 							Error("failed to kill init's children")
 					}
 				}
 			}
+
 			p.SetExited(e.Status)
 			s.sendL(&eventstypes.TaskExit{
 				ContainerID: container.ID,
@@ -610,6 +680,7 @@ func (s *service) checkProcesses(e runcC.Exit) {
 			})
 			return
 		}
+		return
 	}
 }
 
@@ -623,7 +694,7 @@ func shouldKillAllOnExit(bundlePath string) (bool, error) {
 	if bundleSpec.Linux != nil {
 		for _, ns := range bundleSpec.Linux.Namespaces {
-			if ns.Type == specs.PIDNamespace {
+			if ns.Type == specs.PIDNamespace && ns.Path == "" {
 				return false, nil
 			}
 		}
 	}
@@ -633,7 +704,11 @@ func shouldKillAllOnExit(bundlePath string) (bool, error) {
 }
 
 func (s *service) getContainerPids(ctx context.Context, id string) ([]uint32, error) {
-	p, err := s.container.Process("")
+	container, err := s.getContainer(id)
+	if err != nil {
+		return nil, err
+	}
+	p, err := container.Process("")
 	if err != nil {
 		return nil, errdefs.ToGRPC(err)
 	}
@@ -662,9 +737,9 @@ func (s *service) forward(ctx context.Context, publisher shim.Publisher) {
 	publisher.Close()
 }
 
-func (s *service) getContainer() (*runc.Container, error) {
+func (s *service) getContainer(id string) (*runc.Container, error) {
 	s.mu.Lock()
-	container := s.container
+	container := s.containers[id]
 	s.mu.Unlock()
 	if container == nil {
 		return nil, errdefs.ToGRPCf(errdefs.ErrNotFound, "container not created")
@@ -672,18 +747,6 @@
 	}
 	return container, nil
 }
-func (s *service) getProcess(execID string) (process.Process, error) {
-	container, err := s.getContainer()
-	if err != nil {
-		return nil, err
-	}
-	p, err := container.Process(execID)
-	if err != nil {
-		return nil, errdefs.ToGRPC(err)
-	}
-	return p, nil
-}
-
 // initialize a single epoll fd to manage our consoles. `initPlatform` should
 // only be called once.
 func (s *service) initPlatform() error {
diff --git a/vendor/modules.txt b/vendor/modules.txt
index b7b18185ef..fde7bc5a36 100644
--- a/vendor/modules.txt
+++ b/vendor/modules.txt
@@ -172,7 +172,7 @@ github.com/containerd/containerd/archive
 github.com/containerd/containerd/archive/compression
 github.com/containerd/containerd/cio
 github.com/containerd/containerd/cmd/containerd-shim
-github.com/containerd/containerd/cmd/containerd-shim-runc-v1
+github.com/containerd/containerd/cmd/containerd-shim-runc-v2
 github.com/containerd/containerd/cmd/containerd/command
 github.com/containerd/containerd/cmd/ctr/app
 github.com/containerd/containerd/cmd/ctr/commands
@@ -251,7 +251,7 @@ github.com/containerd/containerd/runtime/v1/shim/v1
 github.com/containerd/containerd/runtime/v2
 github.com/containerd/containerd/runtime/v2/runc
 github.com/containerd/containerd/runtime/v2/runc/options
-github.com/containerd/containerd/runtime/v2/runc/v1
+github.com/containerd/containerd/runtime/v2/runc/v2
 github.com/containerd/containerd/runtime/v2/shim
 github.com/containerd/containerd/runtime/v2/task
 github.com/containerd/containerd/services
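
Note on shim grouping: StartShim now derives the shim's socket address from a grouping key rather than from the task ID. The key defaults to the task ID, but the first matching annotation in groupLabels overrides it, so every container carrying the same group value (or the same Kubernetes sandbox ID) resolves to the same socket address and therefore the same shim process. A minimal standalone sketch of that lookup, mirroring the loop added in StartShim (groupingFor is an illustrative helper name, not part of the patch):

package main

import "fmt"

// Mirrors the groupLabels ordering in service.go: the runc.v2 group
// label takes precedence over the Kubernetes sandbox ID.
var groupLabels = []string{
	"io.containerd.runc.v2.group",
	"io.kubernetes.cri.sandbox-id",
}

// groupingFor restates the annotation lookup added in StartShim.
func groupingFor(taskID string, annotations map[string]string) string {
	grouping := taskID // default: one shim per task
	for _, label := range groupLabels {
		if v, ok := annotations[label]; ok {
			grouping = v
			break // first match wins
		}
	}
	return grouping
}

func main() {
	fmt.Println(groupingFor("c1", nil)) // "c1": ungrouped, gets its own shim
	fmt.Println(groupingFor("c1", map[string]string{
		"io.kubernetes.cri.sandbox-id": "pod-abc",
	})) // "pod-abc": shares the pod sandbox's shim
}

The "address already in use" branch in StartShim is the other half of this design: the second container in a group spawns a shim that fails to bind the shared socket, writes the existing address for containerd to connect to, and exits, leaving the first shim to service both tasks.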
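For a non-Kubernetes client, opting several containers into one shim is a matter of setting the .group annotation to the same value when creating their specs. A hedged sketch against the containerd Go client of roughly this vintage; the socket path, image ref, container IDs, and group name are placeholders, and error handling is elided:

package main

import (
	"context"

	"github.com/containerd/containerd"
	"github.com/containerd/containerd/namespaces"
	"github.com/containerd/containerd/oci"
)

func main() {
	client, _ := containerd.New("/run/containerd/containerd.sock")
	defer client.Close()
	ctx := namespaces.WithNamespace(context.Background(), "default")
	image, _ := client.GetImage(ctx, "docker.io/library/alpine:latest")

	for _, id := range []string{"c1", "c2"} {
		// Both containers carry the same group annotation, so the second
		// StartShim hits "address already in use" and reuses c1's shim.
		client.NewContainer(ctx, id,
			containerd.WithNewSnapshot(id+"-snap", image),
			containerd.WithRuntime("io.containerd.runc.v2", nil),
			containerd.WithNewSpec(
				oci.WithImageConfig(image),
				oci.WithAnnotations(map[string]string{
					"io.containerd.runc.v2.group": "my-group",
				}),
			),
		)
	}
}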
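The shouldKillAllOnExit tweak is easy to miss: the early "return false" now fires only for a PID namespace with an empty Path, i.e. a namespace the container creates for itself, where the kernel already kills every member once init dies. A spec that joins an existing PID namespace by path yields true, so the shim still calls KillAll explicitly. A standalone restatement of the predicate over an already-parsed spec (the real function reads the bundle's config.json):

package main

import (
	"fmt"

	specs "github.com/opencontainers/runtime-spec/specs-go"
)

// shouldKillAll restates the patched check against an in-memory spec.
func shouldKillAll(spec *specs.Spec) bool {
	if spec.Linux != nil {
		for _, ns := range spec.Linux.Namespaces {
			if ns.Type == specs.PIDNamespace && ns.Path == "" {
				// Private PID namespace: the kernel reaps every member
				// when init exits, so no explicit KillAll is needed.
				return false
			}
		}
	}
	return true
}

func main() {
	private := &specs.Spec{Linux: &specs.Linux{
		Namespaces: []specs.LinuxNamespace{{Type: specs.PIDNamespace}},
	}}
	shared := &specs.Spec{Linux: &specs.Linux{
		Namespaces: []specs.LinuxNamespace{{Type: specs.PIDNamespace, Path: "/proc/1234/ns/pid"}},
	}}
	fmt.Println(shouldKillAll(private)) // false
	fmt.Println(shouldKillAll(shared))  // true
}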