Update vendor

pull/921/head
Erik Wilson 2019-10-18 12:31:18 -07:00
parent 1df72d14b8
commit e22eb82d61
3 changed files with 131 additions and 68 deletions

View File

@ -19,10 +19,10 @@
package main package main
import ( import (
v1 "github.com/containerd/containerd/runtime/v2/runc/v1" v2 "github.com/containerd/containerd/runtime/v2/runc/v2"
"github.com/containerd/containerd/runtime/v2/shim" "github.com/containerd/containerd/runtime/v2/shim"
) )
func main() { func main() {
shim.Run("io.containerd.runc.v1", v1.New) shim.Run("io.containerd.runc.v2", v2.New)
} }

View File

@ -16,7 +16,7 @@
limitations under the License. limitations under the License.
*/ */
package v1 package v2
import ( import (
"context" "context"
@ -25,6 +25,7 @@ import (
"os" "os"
"os/exec" "os/exec"
"path/filepath" "path/filepath"
"strings"
"sync" "sync"
"syscall" "syscall"
"time" "time"
@ -60,6 +61,18 @@ var (
empty = &ptypes.Empty{} empty = &ptypes.Empty{}
) )
// group labels specifies how the shim groups services.
// currently supports a runc.v2 specific .group label and the
// standard k8s pod label. Order matters in this list
var groupLabels = []string{
"io.containerd.runc.v2.group",
"io.kubernetes.cri.sandbox-id",
}
type spec struct {
Annotations map[string]string `json:"annotations,omitempty"`
}
// New returns a new shim service that can be used via GRPC // New returns a new shim service that can be used via GRPC
func New(ctx context.Context, id string, publisher shim.Publisher, shutdown func()) (shim.Shim, error) { func New(ctx context.Context, id string, publisher shim.Publisher, shutdown func()) (shim.Shim, error) {
ep, err := oom.New(publisher) ep, err := oom.New(publisher)
@ -68,12 +81,13 @@ func New(ctx context.Context, id string, publisher shim.Publisher, shutdown func
} }
go ep.Run(ctx) go ep.Run(ctx)
s := &service{ s := &service{
id: id, id: id,
context: ctx, context: ctx,
events: make(chan interface{}, 128), events: make(chan interface{}, 128),
ec: reaper.Default.Subscribe(), ec: reaper.Default.Subscribe(),
ep: ep, ep: ep,
cancel: shutdown, cancel: shutdown,
containers: make(map[string]*runc.Container),
} }
go s.processExits() go s.processExits()
runcC.Monitor = reaper.Default runcC.Monitor = reaper.Default
@ -96,8 +110,10 @@ type service struct {
ec chan runcC.Exit ec chan runcC.Exit
ep *oom.Epoller ep *oom.Epoller
id string // id only used in cleanup case
container *runc.Container id string
containers map[string]*runc.Container
cancel func() cancel func()
} }
@ -122,24 +138,54 @@ func newCommand(ctx context.Context, id, containerdBinary, containerdAddress, co
} }
cmd := exec.Command(self, args...) cmd := exec.Command(self, args...)
cmd.Dir = cwd cmd.Dir = cwd
cmd.Env = append(os.Environ(), "GOMAXPROCS=2") cmd.Env = append(os.Environ(), "GOMAXPROCS=4")
cmd.SysProcAttr = &syscall.SysProcAttr{ cmd.SysProcAttr = &syscall.SysProcAttr{
Setpgid: true, Setpgid: true,
} }
return cmd, nil return cmd, nil
} }
func readSpec() (*spec, error) {
f, err := os.Open("config.json")
if err != nil {
return nil, err
}
defer f.Close()
var s spec
if err := json.NewDecoder(f).Decode(&s); err != nil {
return nil, err
}
return &s, nil
}
func (s *service) StartShim(ctx context.Context, id, containerdBinary, containerdAddress, containerdTTRPCAddress string) (string, error) { func (s *service) StartShim(ctx context.Context, id, containerdBinary, containerdAddress, containerdTTRPCAddress string) (string, error) {
cmd, err := newCommand(ctx, id, containerdBinary, containerdAddress, containerdTTRPCAddress) cmd, err := newCommand(ctx, id, containerdBinary, containerdAddress, containerdTTRPCAddress)
if err != nil { if err != nil {
return "", err return "", err
} }
address, err := shim.SocketAddress(ctx, id) grouping := id
spec, err := readSpec()
if err != nil {
return "", err
}
for _, group := range groupLabels {
if groupID, ok := spec.Annotations[group]; ok {
grouping = groupID
break
}
}
address, err := shim.SocketAddress(ctx, grouping)
if err != nil { if err != nil {
return "", err return "", err
} }
socket, err := shim.NewSocket(address) socket, err := shim.NewSocket(address)
if err != nil { if err != nil {
if strings.Contains(err.Error(), "address already in use") {
if err := shim.WriteAddress("address", address); err != nil {
return "", err
}
return address, nil
}
return "", err return "", err
} }
defer socket.Close() defer socket.Close()
@ -161,9 +207,6 @@ func (s *service) StartShim(ctx context.Context, id, containerdBinary, container
}() }()
// make sure to wait after start // make sure to wait after start
go cmd.Wait() go cmd.Wait()
if err := shim.WritePidFile("shim.pid", cmd.Process.Pid); err != nil {
return "", err
}
if err := shim.WriteAddress("address", address); err != nil { if err := shim.WriteAddress("address", address); err != nil {
return "", err return "", err
} }
@ -199,14 +242,16 @@ func (s *service) StartShim(ctx context.Context, id, containerdBinary, container
} }
func (s *service) Cleanup(ctx context.Context) (*taskAPI.DeleteResponse, error) { func (s *service) Cleanup(ctx context.Context) (*taskAPI.DeleteResponse, error) {
path, err := os.Getwd() cwd, err := os.Getwd()
if err != nil { if err != nil {
return nil, err return nil, err
} }
path := filepath.Join(filepath.Dir(cwd), s.id)
ns, err := namespaces.NamespaceRequired(ctx) ns, err := namespaces.NamespaceRequired(ctx)
if err != nil { if err != nil {
return nil, err return nil, err
} }
runtime, err := runc.ReadRuntime(path) runtime, err := runc.ReadRuntime(path)
if err != nil { if err != nil {
return nil, err return nil, err
@ -236,7 +281,7 @@ func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ *
return nil, err return nil, err
} }
s.container = container s.containers[r.ID] = container
s.send(&eventstypes.TaskCreate{ s.send(&eventstypes.TaskCreate{
ContainerID: r.ID, ContainerID: r.ID,
@ -259,7 +304,7 @@ func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ *
// Start a process // Start a process
func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.StartResponse, error) { func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.StartResponse, error) {
container, err := s.getContainer() container, err := s.getContainer(r.ID)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -295,7 +340,7 @@ func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.
// Delete the initial process and container // Delete the initial process and container
func (s *service) Delete(ctx context.Context, r *taskAPI.DeleteRequest) (*taskAPI.DeleteResponse, error) { func (s *service) Delete(ctx context.Context, r *taskAPI.DeleteRequest) (*taskAPI.DeleteResponse, error) {
container, err := s.getContainer() container, err := s.getContainer(r.ID)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -305,7 +350,11 @@ func (s *service) Delete(ctx context.Context, r *taskAPI.DeleteRequest) (*taskAP
} }
// if we deleted our init task, close the platform and send the task delete event // if we deleted our init task, close the platform and send the task delete event
if r.ExecID == "" { if r.ExecID == "" {
if s.platform != nil { s.mu.Lock()
delete(s.containers, r.ID)
hasContainers := len(s.containers) > 0
s.mu.Unlock()
if s.platform != nil && !hasContainers {
s.platform.Close() s.platform.Close()
} }
s.send(&eventstypes.TaskDelete{ s.send(&eventstypes.TaskDelete{
@ -324,7 +373,7 @@ func (s *service) Delete(ctx context.Context, r *taskAPI.DeleteRequest) (*taskAP
// Exec an additional process inside the container // Exec an additional process inside the container
func (s *service) Exec(ctx context.Context, r *taskAPI.ExecProcessRequest) (*ptypes.Empty, error) { func (s *service) Exec(ctx context.Context, r *taskAPI.ExecProcessRequest) (*ptypes.Empty, error) {
container, err := s.getContainer() container, err := s.getContainer(r.ID)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -339,7 +388,7 @@ func (s *service) Exec(ctx context.Context, r *taskAPI.ExecProcessRequest) (*pty
} }
s.send(&eventstypes.TaskExecAdded{ s.send(&eventstypes.TaskExecAdded{
ContainerID: s.container.ID, ContainerID: container.ID,
ExecID: process.ID(), ExecID: process.ID(),
}) })
return empty, nil return empty, nil
@ -347,7 +396,7 @@ func (s *service) Exec(ctx context.Context, r *taskAPI.ExecProcessRequest) (*pty
// ResizePty of a process // ResizePty of a process
func (s *service) ResizePty(ctx context.Context, r *taskAPI.ResizePtyRequest) (*ptypes.Empty, error) { func (s *service) ResizePty(ctx context.Context, r *taskAPI.ResizePtyRequest) (*ptypes.Empty, error) {
container, err := s.getContainer() container, err := s.getContainer(r.ID)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -359,7 +408,11 @@ func (s *service) ResizePty(ctx context.Context, r *taskAPI.ResizePtyRequest) (*
// State returns runtime state information for a process // State returns runtime state information for a process
func (s *service) State(ctx context.Context, r *taskAPI.StateRequest) (*taskAPI.StateResponse, error) { func (s *service) State(ctx context.Context, r *taskAPI.StateRequest) (*taskAPI.StateResponse, error) {
p, err := s.getProcess(r.ExecID) container, err := s.getContainer(r.ID)
if err != nil {
return nil, err
}
p, err := container.Process(r.ExecID)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -383,7 +436,7 @@ func (s *service) State(ctx context.Context, r *taskAPI.StateRequest) (*taskAPI.
sio := p.Stdio() sio := p.Stdio()
return &taskAPI.StateResponse{ return &taskAPI.StateResponse{
ID: p.ID(), ID: p.ID(),
Bundle: s.container.Bundle, Bundle: container.Bundle,
Pid: uint32(p.Pid()), Pid: uint32(p.Pid()),
Status: status, Status: status,
Stdin: sio.Stdin, Stdin: sio.Stdin,
@ -397,7 +450,7 @@ func (s *service) State(ctx context.Context, r *taskAPI.StateRequest) (*taskAPI.
// Pause the container // Pause the container
func (s *service) Pause(ctx context.Context, r *taskAPI.PauseRequest) (*ptypes.Empty, error) { func (s *service) Pause(ctx context.Context, r *taskAPI.PauseRequest) (*ptypes.Empty, error) {
container, err := s.getContainer() container, err := s.getContainer(r.ID)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -412,7 +465,7 @@ func (s *service) Pause(ctx context.Context, r *taskAPI.PauseRequest) (*ptypes.E
// Resume the container // Resume the container
func (s *service) Resume(ctx context.Context, r *taskAPI.ResumeRequest) (*ptypes.Empty, error) { func (s *service) Resume(ctx context.Context, r *taskAPI.ResumeRequest) (*ptypes.Empty, error) {
container, err := s.getContainer() container, err := s.getContainer(r.ID)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -427,7 +480,7 @@ func (s *service) Resume(ctx context.Context, r *taskAPI.ResumeRequest) (*ptypes
// Kill a process with the provided signal // Kill a process with the provided signal
func (s *service) Kill(ctx context.Context, r *taskAPI.KillRequest) (*ptypes.Empty, error) { func (s *service) Kill(ctx context.Context, r *taskAPI.KillRequest) (*ptypes.Empty, error) {
container, err := s.getContainer() container, err := s.getContainer(r.ID)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -439,7 +492,7 @@ func (s *service) Kill(ctx context.Context, r *taskAPI.KillRequest) (*ptypes.Emp
// Pids returns all pids inside the container // Pids returns all pids inside the container
func (s *service) Pids(ctx context.Context, r *taskAPI.PidsRequest) (*taskAPI.PidsResponse, error) { func (s *service) Pids(ctx context.Context, r *taskAPI.PidsRequest) (*taskAPI.PidsResponse, error) {
container, err := s.getContainer() container, err := s.getContainer(r.ID)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -474,7 +527,7 @@ func (s *service) Pids(ctx context.Context, r *taskAPI.PidsRequest) (*taskAPI.Pi
// CloseIO of a process // CloseIO of a process
func (s *service) CloseIO(ctx context.Context, r *taskAPI.CloseIORequest) (*ptypes.Empty, error) { func (s *service) CloseIO(ctx context.Context, r *taskAPI.CloseIORequest) (*ptypes.Empty, error) {
container, err := s.getContainer() container, err := s.getContainer(r.ID)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -486,7 +539,7 @@ func (s *service) CloseIO(ctx context.Context, r *taskAPI.CloseIORequest) (*ptyp
// Checkpoint the container // Checkpoint the container
func (s *service) Checkpoint(ctx context.Context, r *taskAPI.CheckpointTaskRequest) (*ptypes.Empty, error) { func (s *service) Checkpoint(ctx context.Context, r *taskAPI.CheckpointTaskRequest) (*ptypes.Empty, error) {
container, err := s.getContainer() container, err := s.getContainer(r.ID)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -498,7 +551,7 @@ func (s *service) Checkpoint(ctx context.Context, r *taskAPI.CheckpointTaskReque
// Update a running container // Update a running container
func (s *service) Update(ctx context.Context, r *taskAPI.UpdateTaskRequest) (*ptypes.Empty, error) { func (s *service) Update(ctx context.Context, r *taskAPI.UpdateTaskRequest) (*ptypes.Empty, error) {
container, err := s.getContainer() container, err := s.getContainer(r.ID)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -510,7 +563,7 @@ func (s *service) Update(ctx context.Context, r *taskAPI.UpdateTaskRequest) (*pt
// Wait for a process to exit // Wait for a process to exit
func (s *service) Wait(ctx context.Context, r *taskAPI.WaitRequest) (*taskAPI.WaitResponse, error) { func (s *service) Wait(ctx context.Context, r *taskAPI.WaitRequest) (*taskAPI.WaitResponse, error) {
container, err := s.getContainer() container, err := s.getContainer(r.ID)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -529,8 +582,8 @@ func (s *service) Wait(ctx context.Context, r *taskAPI.WaitRequest) (*taskAPI.Wa
// Connect returns shim information such as the shim's pid // Connect returns shim information such as the shim's pid
func (s *service) Connect(ctx context.Context, r *taskAPI.ConnectRequest) (*taskAPI.ConnectResponse, error) { func (s *service) Connect(ctx context.Context, r *taskAPI.ConnectRequest) (*taskAPI.ConnectResponse, error) {
var pid int var pid int
if s.container != nil { if container, err := s.getContainer(r.ID); err == nil {
pid = s.container.Pid() pid = container.Pid()
} }
return &taskAPI.ConnectResponse{ return &taskAPI.ConnectResponse{
ShimPid: uint32(os.Getpid()), ShimPid: uint32(os.Getpid()),
@ -539,13 +592,23 @@ func (s *service) Connect(ctx context.Context, r *taskAPI.ConnectRequest) (*task
} }
func (s *service) Shutdown(ctx context.Context, r *taskAPI.ShutdownRequest) (*ptypes.Empty, error) { func (s *service) Shutdown(ctx context.Context, r *taskAPI.ShutdownRequest) (*ptypes.Empty, error) {
s.mu.Lock()
// return out if the shim is still servicing containers
if len(s.containers) > 0 {
s.mu.Unlock()
return empty, nil
}
s.cancel() s.cancel()
close(s.events) close(s.events)
return empty, nil return empty, nil
} }
func (s *service) Stats(ctx context.Context, r *taskAPI.StatsRequest) (*taskAPI.StatsResponse, error) { func (s *service) Stats(ctx context.Context, r *taskAPI.StatsRequest) (*taskAPI.StatsResponse, error) {
cg := s.container.Cgroup() container, err := s.getContainer(r.ID)
if err != nil {
return nil, err
}
cg := container.Cgroup()
if cg == nil { if cg == nil {
return nil, errdefs.ToGRPCf(errdefs.ErrNotFound, "cgroup does not exist") return nil, errdefs.ToGRPCf(errdefs.ErrNotFound, "cgroup does not exist")
} }
@ -579,27 +642,34 @@ func (s *service) sendL(evt interface{}) {
} }
func (s *service) checkProcesses(e runcC.Exit) { func (s *service) checkProcesses(e runcC.Exit) {
container, err := s.getContainer() s.mu.Lock()
if err != nil { defer s.mu.Unlock()
return
}
shouldKillAll, err := shouldKillAllOnExit(container.Bundle) for _, container := range s.containers {
if err != nil { if !container.HasPid(e.Pid) {
log.G(s.context).WithError(err).Error("failed to check shouldKillAll") continue
} }
for _, p := range container.All() { for _, p := range container.All() {
if p.Pid() == e.Pid { if p.Pid() != e.Pid {
if shouldKillAll { continue
if ip, ok := p.(*process.Init); ok { }
// Ensure all children are killed
if ip, ok := p.(*process.Init); ok {
shouldKillAll, err := shouldKillAllOnExit(container.Bundle)
if err != nil {
log.G(s.context).WithError(err).Error("failed to check shouldKillAll")
}
// Ensure all children are killed
if shouldKillAll {
if err := ip.KillAll(s.context); err != nil { if err := ip.KillAll(s.context); err != nil {
logrus.WithError(err).WithField("id", ip.ID()). logrus.WithError(err).WithField("id", ip.ID()).
Error("failed to kill init's children") Error("failed to kill init's children")
} }
} }
} }
p.SetExited(e.Status) p.SetExited(e.Status)
s.sendL(&eventstypes.TaskExit{ s.sendL(&eventstypes.TaskExit{
ContainerID: container.ID, ContainerID: container.ID,
@ -610,6 +680,7 @@ func (s *service) checkProcesses(e runcC.Exit) {
}) })
return return
} }
return
} }
} }
@ -623,7 +694,7 @@ func shouldKillAllOnExit(bundlePath string) (bool, error) {
if bundleSpec.Linux != nil { if bundleSpec.Linux != nil {
for _, ns := range bundleSpec.Linux.Namespaces { for _, ns := range bundleSpec.Linux.Namespaces {
if ns.Type == specs.PIDNamespace { if ns.Type == specs.PIDNamespace && ns.Path == "" {
return false, nil return false, nil
} }
} }
@ -633,7 +704,11 @@ func shouldKillAllOnExit(bundlePath string) (bool, error) {
} }
func (s *service) getContainerPids(ctx context.Context, id string) ([]uint32, error) { func (s *service) getContainerPids(ctx context.Context, id string) ([]uint32, error) {
p, err := s.container.Process("") container, err := s.getContainer(id)
if err != nil {
return nil, err
}
p, err := container.Process("")
if err != nil { if err != nil {
return nil, errdefs.ToGRPC(err) return nil, errdefs.ToGRPC(err)
} }
@ -662,9 +737,9 @@ func (s *service) forward(ctx context.Context, publisher shim.Publisher) {
publisher.Close() publisher.Close()
} }
func (s *service) getContainer() (*runc.Container, error) { func (s *service) getContainer(id string) (*runc.Container, error) {
s.mu.Lock() s.mu.Lock()
container := s.container container := s.containers[id]
s.mu.Unlock() s.mu.Unlock()
if container == nil { if container == nil {
return nil, errdefs.ToGRPCf(errdefs.ErrNotFound, "container not created") return nil, errdefs.ToGRPCf(errdefs.ErrNotFound, "container not created")
@ -672,18 +747,6 @@ func (s *service) getContainer() (*runc.Container, error) {
return container, nil return container, nil
} }
func (s *service) getProcess(execID string) (process.Process, error) {
container, err := s.getContainer()
if err != nil {
return nil, err
}
p, err := container.Process(execID)
if err != nil {
return nil, errdefs.ToGRPC(err)
}
return p, nil
}
// initialize a single epoll fd to manage our consoles. `initPlatform` should // initialize a single epoll fd to manage our consoles. `initPlatform` should
// only be called once. // only be called once.
func (s *service) initPlatform() error { func (s *service) initPlatform() error {

4
vendor/modules.txt vendored
View File

@ -172,7 +172,7 @@ github.com/containerd/containerd/archive
github.com/containerd/containerd/archive/compression github.com/containerd/containerd/archive/compression
github.com/containerd/containerd/cio github.com/containerd/containerd/cio
github.com/containerd/containerd/cmd/containerd-shim github.com/containerd/containerd/cmd/containerd-shim
github.com/containerd/containerd/cmd/containerd-shim-runc-v1 github.com/containerd/containerd/cmd/containerd-shim-runc-v2
github.com/containerd/containerd/cmd/containerd/command github.com/containerd/containerd/cmd/containerd/command
github.com/containerd/containerd/cmd/ctr/app github.com/containerd/containerd/cmd/ctr/app
github.com/containerd/containerd/cmd/ctr/commands github.com/containerd/containerd/cmd/ctr/commands
@ -251,7 +251,7 @@ github.com/containerd/containerd/runtime/v1/shim/v1
github.com/containerd/containerd/runtime/v2 github.com/containerd/containerd/runtime/v2
github.com/containerd/containerd/runtime/v2/runc github.com/containerd/containerd/runtime/v2/runc
github.com/containerd/containerd/runtime/v2/runc/options github.com/containerd/containerd/runtime/v2/runc/options
github.com/containerd/containerd/runtime/v2/runc/v1 github.com/containerd/containerd/runtime/v2/runc/v2
github.com/containerd/containerd/runtime/v2/shim github.com/containerd/containerd/runtime/v2/shim
github.com/containerd/containerd/runtime/v2/task github.com/containerd/containerd/runtime/v2/task
github.com/containerd/containerd/services github.com/containerd/containerd/services