[release-1.19] containerd: v1.4.3-k3s1 (#2628)

Signed-off-by: Jacob Blain Christen <jacob@rancher.com>
pull/2633/head v1.19.4-rc1+k3s2
Jacob Blain Christen 2020-12-04 14:49:19 -07:00 committed by GitHub
parent 4eba6a7201
commit 4a8915348b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
24 changed files with 316 additions and 75 deletions

2
go.mod
View File

@ -8,7 +8,7 @@ replace (
github.com/containerd/btrfs => github.com/containerd/btrfs v0.0.0-20181101203652-af5082808c83
github.com/containerd/cgroups => github.com/containerd/cgroups v0.0.0-20200531161412-0dbf7f05ba59
github.com/containerd/console => github.com/containerd/console v0.0.0-20181022165439-0650fd9eeb50
github.com/containerd/containerd => github.com/rancher/containerd v1.4.1-k3s1
github.com/containerd/containerd => github.com/k3s-io/containerd v1.4.3-k3s1
github.com/containerd/continuity => github.com/containerd/continuity v0.0.0-20190815185530-f2a389ac0a02
github.com/containerd/cri => github.com/rancher/cri v1.4.0-k3s.2 // k3s-release/1.4
github.com/containerd/fifo => github.com/containerd/fifo v0.0.0-20190816180239-bda0ff6ed73c

4
go.sum
View File

@ -468,6 +468,8 @@ github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/X
github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
github.com/jung-kurt/gofpdf v1.0.3-0.20190309125859-24315acbbda5/go.mod h1:7Id9E/uU8ce6rXgefFLlgrJj/GYY22cpxn+r32jIOes=
github.com/k3s-io/containerd v1.4.3-k3s1 h1:4n8gpBDXa6IpinjzSl+uJsTPfg0l0Dus6zZBJLl3lDg=
github.com/k3s-io/containerd v1.4.3-k3s1/go.mod h1:SQt4M1HhVTRsIIYHCfaelw8OGnk1FAkZbfiCx1WpkvE=
github.com/k3s-io/kubernetes v1.19.4-k3s1 h1:6ehWJYzSUobbOoKaSnt9O0vbpunkVKeH7kU38ZtWEno=
github.com/k3s-io/kubernetes v1.19.4-k3s1/go.mod h1:yhT1/ltQajQsha3tnYc9QPFYSumGM45nlZdjf7WqE1A=
github.com/k3s-io/kubernetes/staging/src/k8s.io/api v1.19.4-k3s1 h1:Zl0AMZ6Ni5j900hFRt1Z7Q8tbuB0C+BddAyiHkUn94w=
@ -696,8 +698,6 @@ github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40T
github.com/quobyte/api v0.1.2/go.mod h1:jL7lIHrmqQ7yh05OJ+eEEdHr0u/kmT1Ff9iHd+4H6VI=
github.com/rakelkar/gonetsh v0.0.0-20190930180311-e5c5ffe4bdf0 h1:iXE9kmlAqhusXxzkXictdNgWS7p4ZBnmv9SdyMgTf6E=
github.com/rakelkar/gonetsh v0.0.0-20190930180311-e5c5ffe4bdf0/go.mod h1:4XHkfaUj+URzGO9sohoAgt2V9Y8nIW7fugpu0E6gShk=
github.com/rancher/containerd v1.4.1-k3s1 h1:LjLaRPbmYma4wvYS1m0MLa77VV/FbqI6sjhREn1K5Ls=
github.com/rancher/containerd v1.4.1-k3s1/go.mod h1:sp2cwOHbS1O/EQtz0uNbkkkqiWvkyboQRooFTzNN34A=
github.com/rancher/cri v1.4.0-k3s.2 h1:fX9dGTD9xu6eKB2EDgla2DZHlyusZZzS/hVHvQd3UaQ=
github.com/rancher/cri v1.4.0-k3s.2/go.mod h1:Ht5T1dIKzm+4NExmb7wDVG6qR+j0xeXIjjhCv1d9geY=
github.com/rancher/cri-tools v1.19.0-k3s1 h1:c6lqNWyoAB5+NaUREbpZxKXCuYl9he24/DZEgHywg+A=

View File

@ -12,6 +12,7 @@ Arnaud Porterie <icecrime@gmail.com>
Arnaud Porterie <icecrime@gmail.com> <arnaud.porterie@docker.com>
Bob Mader <swapdisk@users.noreply.github.com>
Boris Popovschi <zyqsempai@mail.ru>
Bowen Yan <loneybw@gmail.com>
Brent Baude <bbaude@redhat.com>
Cao Zhihao <caozhihao@163.com>
Cao Zhihao <caozhihao@163.com> <caozhihao.xd@bytedance.com>

View File

@ -15,7 +15,7 @@ os:
- linux
go:
- "1.13.15"
- "1.15.5"
env:
- TRAVIS_GOOS=linux TEST_RUNTIME=io.containerd.runc.v1 TRAVIS_CGO_ENABLED=1 TRAVIS_DISTRO=bionic GOPROXY=direct

View File

@ -77,7 +77,7 @@ Vagrant.configure("2") do |config|
config.vm.provision "install-golang", type: "shell", run: "once" do |sh|
sh.upload_path = "/tmp/vagrant-install-golang"
sh.env = {
'GO_VERSION': ENV['GO_VERSION'] || "1.13.15",
'GO_VERSION': ENV['GO_VERSION'] || "1.15.5",
}
sh.inline = <<~SHELL
#!/usr/bin/env bash

View File

@ -71,7 +71,7 @@ var (
func init() {
flag.BoolVar(&debugFlag, "debug", false, "enable debug output in logs")
flag.StringVar(&namespaceFlag, "namespace", "", "namespace that owns the shim")
flag.StringVar(&socketFlag, "socket", "", "abstract socket path to serve")
flag.StringVar(&socketFlag, "socket", "", "socket path to serve")
flag.StringVar(&addressFlag, "address", "", "grpc address back to main containerd")
flag.StringVar(&workdirFlag, "workdir", "", "path used to storge large temporary data")
flag.StringVar(&runtimeRootFlag, "runtime-root", process.RuncRoot, "root directory for the runtime")
@ -202,10 +202,18 @@ func serve(ctx context.Context, server *ttrpc.Server, path string) error {
f.Close()
path = "[inherited from parent]"
} else {
if len(path) > 106 {
return errors.Errorf("%q: unix socket path too long (> 106)", path)
const (
abstractSocketPrefix = "\x00"
socketPathLimit = 106
)
p := strings.TrimPrefix(path, "unix://")
if len(p) == len(path) {
p = abstractSocketPrefix + p
}
l, err = net.Listen("unix", "\x00"+path)
if len(p) > socketPathLimit {
return errors.Errorf("%q: unix socket path too long (> %d)", p, socketPathLimit)
}
l, err = net.Listen("unix", p)
}
if err != nil {
return err

View File

@ -323,7 +323,7 @@ Loop:
func initPanicFile(path string) error {
var err error
panicFile, err = os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0)
panicFile, err = os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
if err != nil {
return err
}
@ -347,8 +347,8 @@ func initPanicFile(path string) error {
// Update STD_ERROR_HANDLE to point to the panic file so that Go writes to
// it when it panics. Remember the old stderr to restore it before removing
// the panic file.
sh := windows.STD_ERROR_HANDLE
h, err := windows.GetStdHandle(uint32(sh))
sh := uint32(windows.STD_ERROR_HANDLE)
h, err := windows.GetStdHandle(sh)
if err != nil {
return err
}
@ -372,7 +372,7 @@ func initPanicFile(path string) error {
func removePanicFile() {
if st, err := panicFile.Stat(); err == nil {
if st.Size() == 0 {
sh := windows.STD_ERROR_HANDLE
sh := uint32(windows.STD_ERROR_HANDLE)
setStdHandle.Call(uintptr(sh), uintptr(oldStderr))
panicFile.Close()
os.Remove(panicFile.Name())

View File

@ -24,6 +24,7 @@ import (
"io/ioutil"
"net"
"path/filepath"
"strings"
"github.com/containerd/console"
"github.com/containerd/containerd/cmd/ctr/commands"
@ -240,10 +241,11 @@ func getTaskService(context *cli.Context) (task.TaskService, error) {
s1 := filepath.Join(string(filepath.Separator), "containerd-shim", ns, id, "shim.sock")
// this should not error, ctr always get a default ns
ctx := namespaces.WithNamespace(gocontext.Background(), ns)
s2, _ := shim.SocketAddress(ctx, id)
s2, _ := shim.SocketAddress(ctx, context.GlobalString("address"), id)
s2 = strings.TrimPrefix(s2, "unix://")
for _, socket := range []string{s1, s2} {
conn, err := net.Dial("unix", "\x00"+socket)
for _, socket := range []string{s2, "\x00" + s1} {
conn, err := net.Dial("unix", socket)
if err == nil {
client := ttrpc.NewClient(conn)

View File

@ -87,21 +87,21 @@ func WithRestoreRuntime(ctx context.Context, id string, client *Client, checkpoi
return err
}
}
var options *ptypes.Any
var options ptypes.Any
if m != nil {
store := client.ContentStore()
data, err := content.ReadBlob(ctx, store, *m)
if err != nil {
return errors.Wrap(err, "unable to read checkpoint runtime")
}
if err := proto.Unmarshal(data, options); err != nil {
if err := proto.Unmarshal(data, &options); err != nil {
return err
}
}
c.Runtime = containers.RuntimeInfo{
Name: name,
Options: options,
Options: &options,
}
return nil
}

View File

@ -229,9 +229,47 @@ func seekReader(r io.Reader, offset, size int64) (io.Reader, error) {
return r, nil
}
// copyWithBuffer is very similar to io.CopyBuffer https://golang.org/pkg/io/#CopyBuffer
// but instead of using Read to read from the src, we use ReadAtLeast to make sure we have
// a full buffer before we do a write operation to dst to reduce overheads associated
// with the write operations of small buffers.
func copyWithBuffer(dst io.Writer, src io.Reader) (written int64, err error) {
buf := bufPool.Get().(*[]byte)
written, err = io.CopyBuffer(dst, src, *buf)
bufPool.Put(buf)
// If the reader has a WriteTo method, use it to do the copy.
// Avoids an allocation and a copy.
if wt, ok := src.(io.WriterTo); ok {
return wt.WriteTo(dst)
}
// Similarly, if the writer has a ReadFrom method, use it to do the copy.
if rt, ok := dst.(io.ReaderFrom); ok {
return rt.ReadFrom(src)
}
bufRef := bufPool.Get().(*[]byte)
defer bufPool.Put(bufRef)
buf := *bufRef
for {
nr, er := io.ReadAtLeast(src, buf, len(buf))
if nr > 0 {
nw, ew := dst.Write(buf[0:nr])
if nw > 0 {
written += int64(nw)
}
if ew != nil {
err = ew
break
}
if nr != nw {
err = io.ErrShortWrite
break
}
}
if er != nil {
// If an EOF happens after reading fewer than the requested bytes,
// ReadAtLeast returns ErrUnexpectedEOF.
if er != io.EOF && er != io.ErrUnexpectedEOF {
err = er
}
break
}
}
return
}

View File

@ -232,6 +232,8 @@ func DefaultProfile(sp *specs.Spec) *specs.LinuxSeccomp {
"openat",
"openat2",
"pause",
"pidfd_open",
"pidfd_send_signal",
"pipe",
"pipe2",
"poll",
@ -571,6 +573,7 @@ func DefaultProfile(sp *specs.Spec) *specs.LinuxSeccomp {
s.Syscalls = append(s.Syscalls, specs.LinuxSyscall{
Names: []string{
"kcmp",
"pidfd_getfd",
"process_vm_readv",
"process_vm_writev",
"ptrace",

View File

@ -3,7 +3,7 @@ module github.com/containerd/containerd
go 1.14
replace (
github.com/containerd/cri => github.com/rancher/cri v1.11.1-0.20201105233134-0486eceb9306 // k3s-release/1.4
github.com/containerd/cri => github.com/k3s-io/cri v1.4.0-k3s.2 // k3s-release/1.4
k8s.io/api => k8s.io/api v0.19.0
k8s.io/apiextensions-apiserver => k8s.io/apiextensions-apiserver v0.19.0
k8s.io/apimachinery => k8s.io/apimachinery v0.19.0

View File

@ -417,6 +417,8 @@ github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/X
github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
github.com/jung-kurt/gofpdf v1.0.3-0.20190309125859-24315acbbda5/go.mod h1:7Id9E/uU8ce6rXgefFLlgrJj/GYY22cpxn+r32jIOes=
github.com/k3s-io/cri v1.4.0-k3s.2 h1:HiJLH0P7k6sSJwbzjPwIN0CeY0iA6bKlb7OyThMiaEo=
github.com/k3s-io/cri v1.4.0-k3s.2/go.mod h1:fGPUUHMKQik/vIegSe05DtX/m4miovdtvVLqRUFAkK0=
github.com/karrick/godirwalk v1.7.5/go.mod h1:2c9FRhkDxdIbgkOnCEvnSWs71Bhugbl46shStcFDJ34=
github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q=
github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQLJ+jE2L00=
@ -577,8 +579,6 @@ github.com/prometheus/procfs v0.1.3 h1:F0+tqvhOksq22sc6iCHF5WGlWjdwj92p0udFh1VFB
github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU=
github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU=
github.com/quobyte/api v0.1.2/go.mod h1:jL7lIHrmqQ7yh05OJ+eEEdHr0u/kmT1Ff9iHd+4H6VI=
github.com/rancher/cri v1.11.1-0.20201105233134-0486eceb9306 h1:rgUBUm01ocN+oSQr/XInMPzUHG5aZu3dHSFBb84vBaA=
github.com/rancher/cri v1.11.1-0.20201105233134-0486eceb9306/go.mod h1:Ht5T1dIKzm+4NExmb7wDVG6qR+j0xeXIjjhCv1d9geY=
github.com/remyoudompheng/bigfft v0.0.0-20170806203942-52369c62f446/go.mod h1:uYEyJGbgTkfkS4+E/PavXkNJcbFIpEtjt2B0KDQ5+9M=
github.com/robfig/cron v1.1.0/go.mod h1:JGuDeoQd7Z6yL4zQhZ3OPEVHB7fL6Ka6skscFHfmt2k=
github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=

View File

@ -91,7 +91,7 @@ func ShimRemote(c *Config, daemonAddress, cgroup string, exitHandler func()) Shi
return func(b *bundle, ns string, ropts *runctypes.RuncOptions) (shim.Config, client.Opt) {
config := b.shimConfig(ns, c, ropts)
return config,
client.WithStart(c.Shim, b.shimAddress(ns), daemonAddress, cgroup, c.ShimDebug, exitHandler)
client.WithStart(c.Shim, b.shimAddress(ns, daemonAddress), daemonAddress, cgroup, c.ShimDebug, exitHandler)
}
}
@ -117,6 +117,11 @@ func (b *bundle) NewShimClient(ctx context.Context, namespace string, getClientO
// Delete deletes the bundle from disk
func (b *bundle) Delete() error {
address, _ := b.loadAddress()
if address != "" {
// we don't care about errors here
client.RemoveSocket(address)
}
err := atomicDelete(b.path)
if err == nil {
return atomicDelete(b.workDir)
@ -133,9 +138,11 @@ func (b *bundle) legacyShimAddress(namespace string) string {
return filepath.Join(string(filepath.Separator), "containerd-shim", namespace, b.id, "shim.sock")
}
func (b *bundle) shimAddress(namespace string) string {
d := sha256.Sum256([]byte(filepath.Join(namespace, b.id)))
return filepath.Join(string(filepath.Separator), "containerd-shim", fmt.Sprintf("%x.sock", d))
const socketRoot = "/run/containerd"
func (b *bundle) shimAddress(namespace, socketPath string) string {
d := sha256.Sum256([]byte(filepath.Join(socketPath, namespace, b.id)))
return fmt.Sprintf("unix://%s/%x", filepath.Join(socketRoot, "s"), d)
}
func (b *bundle) loadAddress() (string, error) {

View File

@ -59,9 +59,17 @@ func WithStart(binary, address, daemonAddress, cgroup string, debug bool, exitHa
return func(ctx context.Context, config shim.Config) (_ shimapi.ShimService, _ io.Closer, err error) {
socket, err := newSocket(address)
if err != nil {
return nil, nil, err
if !eaddrinuse(err) {
return nil, nil, err
}
if err := RemoveSocket(address); err != nil {
return nil, nil, errors.Wrap(err, "remove already used socket")
}
if socket, err = newSocket(address); err != nil {
return nil, nil, err
}
}
defer socket.Close()
f, err := socket.File()
if err != nil {
return nil, nil, errors.Wrapf(err, "failed to get fd for socket %s", address)
@ -108,6 +116,8 @@ func WithStart(binary, address, daemonAddress, cgroup string, debug bool, exitHa
if stderrLog != nil {
stderrLog.Close()
}
socket.Close()
RemoveSocket(address)
}()
log.G(ctx).WithFields(logrus.Fields{
"pid": cmd.Process.Pid,
@ -142,6 +152,26 @@ func WithStart(binary, address, daemonAddress, cgroup string, debug bool, exitHa
}
}
func eaddrinuse(err error) bool {
cause := errors.Cause(err)
netErr, ok := cause.(*net.OpError)
if !ok {
return false
}
if netErr.Op != "listen" {
return false
}
syscallErr, ok := netErr.Err.(*os.SyscallError)
if !ok {
return false
}
errno, ok := syscallErr.Err.(syscall.Errno)
if !ok {
return false
}
return errno == syscall.EADDRINUSE
}
// setupOOMScore gets containerd's oom score and adds +1 to it
// to ensure a shim has a lower* score than the daemons
func setupOOMScore(shimPid int) error {
@ -214,31 +244,73 @@ func writeFile(path, address string) error {
return os.Rename(tempPath, path)
}
func newSocket(address string) (*net.UnixListener, error) {
if len(address) > 106 {
return nil, errors.Errorf("%q: unix socket path too long (> 106)", address)
const (
abstractSocketPrefix = "\x00"
socketPathLimit = 106
)
type socket string
func (s socket) isAbstract() bool {
return !strings.HasPrefix(string(s), "unix://")
}
func (s socket) path() string {
path := strings.TrimPrefix(string(s), "unix://")
// if there was no trim performed, we assume an abstract socket
if len(path) == len(s) {
path = abstractSocketPrefix + path
}
l, err := net.Listen("unix", "\x00"+address)
return path
}
func newSocket(address string) (*net.UnixListener, error) {
if len(address) > socketPathLimit {
return nil, errors.Errorf("%q: unix socket path too long (> %d)", address, socketPathLimit)
}
var (
sock = socket(address)
path = sock.path()
)
if !sock.isAbstract() {
if err := os.MkdirAll(filepath.Dir(path), 0600); err != nil {
return nil, errors.Wrapf(err, "%s", path)
}
}
l, err := net.Listen("unix", path)
if err != nil {
return nil, errors.Wrapf(err, "failed to listen to abstract unix socket %q", address)
return nil, errors.Wrapf(err, "failed to listen to unix socket %q (abstract: %t)", address, sock.isAbstract())
}
if err := os.Chmod(path, 0600); err != nil {
l.Close()
return nil, err
}
return l.(*net.UnixListener), nil
}
// RemoveSocket removes the socket at the specified address if
// it exists on the filesystem
func RemoveSocket(address string) error {
sock := socket(address)
if !sock.isAbstract() {
return os.Remove(sock.path())
}
return nil
}
func connect(address string, d func(string, time.Duration) (net.Conn, error)) (net.Conn, error) {
return d(address, 100*time.Second)
}
func annonDialer(address string, timeout time.Duration) (net.Conn, error) {
address = strings.TrimPrefix(address, "unix://")
return dialer.Dialer("\x00"+address, timeout)
func anonDialer(address string, timeout time.Duration) (net.Conn, error) {
return dialer.Dialer(socket(address).path(), timeout)
}
// WithConnect connects to an existing shim
func WithConnect(address string, onClose func()) Opt {
return func(ctx context.Context, config shim.Config) (shimapi.ShimService, io.Closer, error) {
conn, err := connect(address, annonDialer)
conn, err := connect(address, anonDialer)
if err != nil {
return nil, nil, err
}

View File

@ -25,7 +25,6 @@ import (
"os"
"os/exec"
"path/filepath"
"strings"
"sync"
"syscall"
"time"
@ -105,6 +104,10 @@ func New(ctx context.Context, id string, publisher shim.Publisher, shutdown func
return nil, errors.Wrap(err, "failed to initialized platform behavior")
}
go s.forward(ctx, publisher)
if address, err := shim.ReadAddress("address"); err == nil {
s.shimAddress = address
}
return s, nil
}
@ -124,7 +127,8 @@ type service struct {
containers map[string]*runc.Container
cancel func()
shimAddress string
cancel func()
}
func newCommand(ctx context.Context, id, containerdBinary, containerdAddress, containerdTTRPCAddress string) (*exec.Cmd, error) {
@ -183,30 +187,48 @@ func (s *service) StartShim(ctx context.Context, id, containerdBinary, container
break
}
}
address, err := shim.SocketAddress(ctx, grouping)
address, err := shim.SocketAddress(ctx, containerdAddress, grouping)
if err != nil {
return "", err
}
socket, err := shim.NewSocket(address)
if err != nil {
if strings.Contains(err.Error(), "address already in use") {
// the only time where this would happen is if there is a bug and the socket
// was not cleaned up in the cleanup method of the shim or we are using the
// grouping functionality where the new process should be run with the same
// shim as an existing container
if !shim.SocketEaddrinuse(err) {
return "", errors.Wrap(err, "create new shim socket")
}
if shim.CanConnect(address) {
if err := shim.WriteAddress("address", address); err != nil {
return "", err
return "", errors.Wrap(err, "write existing socket for shim")
}
return address, nil
}
return "", err
if err := shim.RemoveSocket(address); err != nil {
return "", errors.Wrap(err, "remove pre-existing socket")
}
if socket, err = shim.NewSocket(address); err != nil {
return "", errors.Wrap(err, "try create new shim socket 2x")
}
}
defer socket.Close()
defer func() {
if retErr != nil {
socket.Close()
_ = shim.RemoveSocket(address)
}
}()
f, err := socket.File()
if err != nil {
return "", err
}
defer f.Close()
cmd.ExtraFiles = append(cmd.ExtraFiles, f)
if err := cmd.Start(); err != nil {
f.Close()
return "", err
}
defer func() {
@ -273,7 +295,6 @@ func (s *service) Cleanup(ctx context.Context) (*taskAPI.DeleteResponse, error)
if err != nil {
return nil, err
}
runtime, err := runc.ReadRuntime(path)
if err != nil {
return nil, err
@ -652,7 +673,9 @@ func (s *service) Shutdown(ctx context.Context, r *taskAPI.ShutdownRequest) (*pt
if s.platform != nil {
s.platform.Close()
}
if s.shimAddress != "" {
_ = shim.RemoveSocket(s.shimAddress)
}
return empty, nil
}

View File

@ -433,10 +433,14 @@ func (s *shim) Stats(ctx context.Context) (*ptypes.Any, error) {
}
func (s *shim) Process(ctx context.Context, id string) (runtime.Process, error) {
return &process{
p := &process{
id: id,
shim: s,
}, nil
}
if _, err := p.State(ctx); err != nil {
return nil, err
}
return p, nil
}
func (s *shim) State(ctx context.Context) (runtime.State, error) {

View File

@ -104,7 +104,7 @@ func parseFlags() {
flag.BoolVar(&versionFlag, "v", false, "show the shim version and exit")
flag.StringVar(&namespaceFlag, "namespace", "", "namespace that owns the shim")
flag.StringVar(&idFlag, "id", "", "id of the task")
flag.StringVar(&socketFlag, "socket", "", "abstract socket path to serve")
flag.StringVar(&socketFlag, "socket", "", "socket path to serve")
flag.StringVar(&bundlePath, "bundle", "", "path to the bundle if not workdir")
flag.StringVar(&addressFlag, "address", "", "grpc address back to main containerd")
@ -195,7 +195,6 @@ func run(id string, initFunc Init, config Config) error {
ctx = context.WithValue(ctx, OptsKey{}, Opts{BundlePath: bundlePath, Debug: debugFlag})
ctx = log.WithLogger(ctx, log.G(ctx).WithField("runtime", id))
ctx, cancel := context.WithCancel(ctx)
service, err := initFunc(ctx, idFlag, publisher, cancel)
if err != nil {
return err
@ -300,11 +299,15 @@ func serve(ctx context.Context, server *ttrpc.Server, path string) error {
return err
}
go func() {
defer l.Close()
if err := server.Serve(ctx, l); err != nil &&
!strings.Contains(err.Error(), "use of closed network connection") {
logrus.WithError(err).Fatal("containerd-shim: ttrpc server failure")
}
l.Close()
if address, err := ReadAddress("address"); err == nil {
_ = RemoveSocket(address)
}
}()
return nil
}

View File

@ -58,15 +58,15 @@ func serveListener(path string) (net.Listener, error) {
l, err = net.FileListener(os.NewFile(3, "socket"))
path = "[inherited from parent]"
} else {
if len(path) > 106 {
return nil, errors.Errorf("%q: unix socket path too long (> 106)", path)
if len(path) > socketPathLimit {
return nil, errors.Errorf("%q: unix socket path too long (> %d)", path, socketPathLimit)
}
l, err = net.Listen("unix", "\x00"+path)
l, err = net.Listen("unix", path)
}
if err != nil {
return nil, err
}
logrus.WithField("socket", path).Debug("serving api on abstract socket")
logrus.WithField("socket", path).Debug("serving api on socket")
return l, nil
}

View File

@ -169,7 +169,7 @@ func WriteAddress(path, address string) error {
// ErrNoAddress is returned when the address file has no content
var ErrNoAddress = errors.New("no shim address")
// ReadAddress returns the shim's abstract socket address from the path
// ReadAddress returns the shim's socket address from the path
func ReadAddress(path string) (string, error) {
path, err := filepath.Abs(path)
if err != nil {

View File

@ -35,7 +35,10 @@ import (
"github.com/pkg/errors"
)
const shimBinaryFormat = "containerd-shim-%s-%s"
const (
shimBinaryFormat = "containerd-shim-%s-%s"
socketPathLimit = 106
)
func getSysProcAttr() *syscall.SysProcAttr {
return &syscall.SysProcAttr{
@ -63,20 +66,21 @@ func AdjustOOMScore(pid int) error {
return nil
}
// SocketAddress returns an abstract socket address
func SocketAddress(ctx context.Context, id string) (string, error) {
const socketRoot = "/run/containerd"
// SocketAddress returns a socket address
func SocketAddress(ctx context.Context, socketPath, id string) (string, error) {
ns, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return "", err
}
d := sha256.Sum256([]byte(filepath.Join(ns, id)))
return filepath.Join(string(filepath.Separator), "containerd-shim", fmt.Sprintf("%x.sock", d)), nil
d := sha256.Sum256([]byte(filepath.Join(socketPath, ns, id)))
return fmt.Sprintf("unix://%s/%x", filepath.Join(socketRoot, "s"), d), nil
}
// AnonDialer returns a dialer for an abstract socket
// AnonDialer returns a dialer for a socket
func AnonDialer(address string, timeout time.Duration) (net.Conn, error) {
address = strings.TrimPrefix(address, "unix://")
return dialer.Dialer("\x00"+address, timeout)
return dialer.Dialer(socket(address).path(), timeout)
}
func AnonReconnectDialer(address string, timeout time.Duration) (net.Conn, error) {
@ -85,12 +89,82 @@ func AnonReconnectDialer(address string, timeout time.Duration) (net.Conn, error
// NewSocket returns a new socket
func NewSocket(address string) (*net.UnixListener, error) {
if len(address) > 106 {
return nil, errors.Errorf("%q: unix socket path too long (> 106)", address)
var (
sock = socket(address)
path = sock.path()
)
if !sock.isAbstract() {
if err := os.MkdirAll(filepath.Dir(path), 0600); err != nil {
return nil, errors.Wrapf(err, "%s", path)
}
}
l, err := net.Listen("unix", "\x00"+address)
l, err := net.Listen("unix", path)
if err != nil {
return nil, errors.Wrapf(err, "failed to listen to abstract unix socket %q", address)
return nil, err
}
if err := os.Chmod(path, 0600); err != nil {
os.Remove(sock.path())
l.Close()
return nil, err
}
return l.(*net.UnixListener), nil
}
const abstractSocketPrefix = "\x00"
type socket string
func (s socket) isAbstract() bool {
return !strings.HasPrefix(string(s), "unix://")
}
func (s socket) path() string {
path := strings.TrimPrefix(string(s), "unix://")
// if there was no trim performed, we assume an abstract socket
if len(path) == len(s) {
path = abstractSocketPrefix + path
}
return path
}
// RemoveSocket removes the socket at the specified address if
// it exists on the filesystem
func RemoveSocket(address string) error {
sock := socket(address)
if !sock.isAbstract() {
return os.Remove(sock.path())
}
return nil
}
// SocketEaddrinuse returns true if the provided error is caused by the
// EADDRINUSE error number
func SocketEaddrinuse(err error) bool {
netErr, ok := err.(*net.OpError)
if !ok {
return false
}
if netErr.Op != "listen" {
return false
}
syscallErr, ok := netErr.Err.(*os.SyscallError)
if !ok {
return false
}
errno, ok := syscallErr.Err.(syscall.Errno)
if !ok {
return false
}
return errno == syscall.EADDRINUSE
}
// CanConnect returns true if the socket provided at the address
// is accepting new connections
func CanConnect(address string) bool {
conn, err := AnonDialer(address, 100*time.Millisecond)
if err != nil {
return false
}
conn.Close()
return true
}

View File

@ -79,3 +79,9 @@ func AnonDialer(address string, timeout time.Duration) (net.Conn, error) {
return c, nil
}
}
// RemoveSocket removes the socket at the specified address if
// it exists on the filesystem
func RemoveSocket(address string) error {
return nil
}

View File

@ -23,7 +23,7 @@ var (
Package = "github.com/containerd/containerd"
// Version holds the complete version number. Filled in at linking time.
Version = "1.4.1+unknown"
Version = "1.4.3+unknown"
// Revision is filled with the VCS (e.g. git) revision being used to build
// the program at linking time.

4
vendor/modules.txt vendored
View File

@ -172,7 +172,7 @@ github.com/containerd/cgroups/v2
github.com/containerd/cgroups/v2/stats
# github.com/containerd/console v1.0.0 => github.com/containerd/console v0.0.0-20181022165439-0650fd9eeb50
github.com/containerd/console
# github.com/containerd/containerd v1.4.0 => github.com/rancher/containerd v1.4.1-k3s1
# github.com/containerd/containerd v1.4.0 => github.com/k3s-io/containerd v1.4.3-k3s1
## explicit
github.com/containerd/containerd
github.com/containerd/containerd/api/events
@ -2927,7 +2927,7 @@ vbom.ml/util/sortorder
# github.com/containerd/btrfs => github.com/containerd/btrfs v0.0.0-20181101203652-af5082808c83
# github.com/containerd/cgroups => github.com/containerd/cgroups v0.0.0-20200531161412-0dbf7f05ba59
# github.com/containerd/console => github.com/containerd/console v0.0.0-20181022165439-0650fd9eeb50
# github.com/containerd/containerd => github.com/rancher/containerd v1.4.1-k3s1
# github.com/containerd/containerd => github.com/k3s-io/containerd v1.4.3-k3s1
# github.com/containerd/continuity => github.com/containerd/continuity v0.0.0-20190815185530-f2a389ac0a02
# github.com/containerd/cri => github.com/rancher/cri v1.4.0-k3s.2
# github.com/containerd/fifo => github.com/containerd/fifo v0.0.0-20190816180239-bda0ff6ed73c