From 5a51a8de45cdefe3e80a6faf16fa71a3d4200c6b Mon Sep 17 00:00:00 2001 From: Akihiro Suda Date: Wed, 29 May 2019 15:01:38 +0900 Subject: [PATCH] rootless: use built-in port driver Signed-off-by: Akihiro Suda --- pkg/rootless/rootless.go | 5 +- .../rootlesskit/pkg/port/builtin/builtin.go | 487 ++++++++++++++++++ .../rootlesskit/pkg/port/socat/socat.go | 218 -------- 3 files changed, 490 insertions(+), 220 deletions(-) create mode 100644 vendor/github.com/rootless-containers/rootlesskit/pkg/port/builtin/builtin.go delete mode 100644 vendor/github.com/rootless-containers/rootlesskit/pkg/port/socat/socat.go diff --git a/pkg/rootless/rootless.go b/pkg/rootless/rootless.go index 398475cba1..7ab1813d9a 100644 --- a/pkg/rootless/rootless.go +++ b/pkg/rootless/rootless.go @@ -13,7 +13,7 @@ import ( "github.com/rootless-containers/rootlesskit/pkg/copyup/tmpfssymlink" "github.com/rootless-containers/rootlesskit/pkg/network/slirp4netns" "github.com/rootless-containers/rootlesskit/pkg/parent" - "github.com/rootless-containers/rootlesskit/pkg/port/socat" + portbuiltin "github.com/rootless-containers/rootlesskit/pkg/port/builtin" "github.com/sirupsen/logrus" ) @@ -103,7 +103,7 @@ func createParentOpt(stateDir string) (*parent.Opt, error) { return nil, err } opt.NetworkDriver = slirp4netns.NewParentDriver(binary, mtu, ipnet, disableHostLoopback, "") - opt.PortDriver, err = socat.NewParentDriver(&logrusDebugWriter{}) + opt.PortDriver, err = portbuiltin.NewParentDriver(&logrusDebugWriter{}, stateDir) if err != nil { return nil, err } @@ -127,6 +127,7 @@ func createChildOpt() (*child.Opt, error) { opt.TargetCmd = os.Args opt.PipeFDEnvKey = pipeFD opt.NetworkDriver = slirp4netns.NewChildDriver() + opt.PortDriver = portbuiltin.NewChildDriver(&logrusDebugWriter{}) opt.CopyUpDirs = []string{"/etc", "/run"} opt.CopyUpDriver = tmpfssymlink.NewChildDriver() return opt, nil diff --git a/vendor/github.com/rootless-containers/rootlesskit/pkg/port/builtin/builtin.go b/vendor/github.com/rootless-containers/rootlesskit/pkg/port/builtin/builtin.go new file mode 100644 index 0000000000..a098714c0a --- /dev/null +++ b/vendor/github.com/rootless-containers/rootlesskit/pkg/port/builtin/builtin.go @@ -0,0 +1,487 @@ +package builtin + +import ( + "context" + "fmt" + "io" + "io/ioutil" + "net" + "os" + "path/filepath" + "sync" + "syscall" + "time" + + "github.com/pkg/errors" + "golang.org/x/sys/unix" + + "github.com/rootless-containers/rootlesskit/pkg/msgutil" + "github.com/rootless-containers/rootlesskit/pkg/port" + "github.com/rootless-containers/rootlesskit/pkg/port/portutil" +) + +const ( + opaqueKeySocketPath = "builtin.socketpath" + opaqueKeyChildReadyPipePath = "builtin.readypipepath" +) + +// NewParentDriver for builtin driver. +func NewParentDriver(logWriter io.Writer, stateDir string) (port.ParentDriver, error) { + // TODO: consider using socketpair FD instead of socket file + socketPath := filepath.Join(stateDir, ".bp.sock") + childReadyPipePath := filepath.Join(stateDir, ".bp-ready.pipe") + // remove the path just incase the previous rootlesskit instance crashed + if err := os.RemoveAll(childReadyPipePath); err != nil { + return nil, errors.Wrapf(err, "cannot remove %s", childReadyPipePath) + } + if err := syscall.Mkfifo(childReadyPipePath, 0600); err != nil { + return nil, errors.Wrapf(err, "cannot mkfifo %s", childReadyPipePath) + } + d := driver{ + logWriter: logWriter, + socketPath: socketPath, + childReadyPipePath: childReadyPipePath, + ports: make(map[int]*port.Status, 0), + stoppers: make(map[int]func() error, 0), + nextID: 1, + } + return &d, nil +} + +type driver struct { + logWriter io.Writer + socketPath string + childReadyPipePath string + mu sync.Mutex + ports map[int]*port.Status + stoppers map[int]func() error + nextID int +} + +func (d *driver) OpaqueForChild() map[string]string { + return map[string]string{ + opaqueKeySocketPath: d.socketPath, + opaqueKeyChildReadyPipePath: d.childReadyPipePath, + } +} + +func (d *driver) RunParentDriver(initComplete chan struct{}, quit <-chan struct{}, _ *port.ChildContext) error { + childReadyPipeR, err := os.OpenFile(d.childReadyPipePath, os.O_RDONLY, os.ModeNamedPipe) + if err != nil { + return err + } + if _, err = ioutil.ReadAll(childReadyPipeR); err != nil { + return err + } + childReadyPipeR.Close() + var dialer net.Dialer + conn, err := dialer.Dial("unix", d.socketPath) + if err != nil { + return err + } + err = initiate(conn.(*net.UnixConn)) + conn.Close() + if err != nil { + return err + } + initComplete <- struct{}{} + <-quit + return nil +} + +func (d *driver) AddPort(ctx context.Context, spec port.Spec) (*port.Status, error) { + d.mu.Lock() + err := portutil.ValidatePortSpec(spec, d.ports) + d.mu.Unlock() + if err != nil { + return nil, err + } + routineStopCh := make(chan struct{}) + routineStop := func() error { + close(routineStopCh) + return nil // FIXME + } + switch spec.Proto { + case "tcp": + err = startTCPRoutines(d.socketPath, spec, routineStopCh, d.logWriter) + case "udp": + err = startUDPRoutines(d.socketPath, spec, routineStopCh, d.logWriter) + default: + // NOTREACHED + return nil, errors.New("spec was not validated?") + } + if err != nil { + return nil, err + } + d.mu.Lock() + id := d.nextID + st := port.Status{ + ID: id, + Spec: spec, + } + d.ports[id] = &st + d.stoppers[id] = routineStop + d.nextID++ + d.mu.Unlock() + return &st, nil +} + +func (d *driver) ListPorts(ctx context.Context) ([]port.Status, error) { + var ports []port.Status + d.mu.Lock() + for _, p := range d.ports { + ports = append(ports, *p) + } + d.mu.Unlock() + return ports, nil +} + +func (d *driver) RemovePort(ctx context.Context, id int) error { + d.mu.Lock() + defer d.mu.Unlock() + stop, ok := d.stoppers[id] + if !ok { + return errors.Errorf("unknown id: %d", id) + } + err := stop() + delete(d.stoppers, id) + delete(d.ports, id) + return err +} + +func initiate(c *net.UnixConn) error { + req := request{ + Type: requestTypeInit, + } + if _, err := msgutil.MarshalToWriter(c, &req); err != nil { + return err + } + if err := c.CloseWrite(); err != nil { + return err + } + var rep reply + if _, err := msgutil.UnmarshalFromReader(c, &rep); err != nil { + return err + } + return c.CloseRead() +} + +func connectToChild(socketPath string, spec port.Spec) (int, error) { + var dialer net.Dialer + conn, err := dialer.Dial("unix", socketPath) + if err != nil { + return 0, err + } + defer conn.Close() + c := conn.(*net.UnixConn) + req := request{ + Type: requestTypeConnect, + Proto: spec.Proto, + Port: spec.ChildPort, + } + if _, err := msgutil.MarshalToWriter(c, &req); err != nil { + return 0, err + } + if err := c.CloseWrite(); err != nil { + return 0, err + } + oobSpace := unix.CmsgSpace(4) + oob := make([]byte, oobSpace) + _, oobN, _, _, err := c.ReadMsgUnix(nil, oob) + if err != nil { + return 0, err + } + if oobN != oobSpace { + return 0, errors.Errorf("expected OOB space %d, got %d", oobSpace, oobN) + } + oob = oob[:oobN] + fd, err := parseFDFromOOB(oob) + if err != nil { + return 0, err + } + if err := c.CloseRead(); err != nil { + return 0, err + } + return fd, nil +} + +func connectToChildWithRetry(socketPath string, spec port.Spec, retries int) (int, error) { + for i := 0; i < retries; i++ { + fd, err := connectToChild(socketPath, spec) + if i == retries-1 && err != nil { + return 0, err + } + if err == nil { + return fd, err + } + // TODO: backoff + time.Sleep(time.Duration(i*5) * time.Millisecond) + } + // NOT REACHED + return 0, errors.New("reached max retry") +} + +func parseFDFromOOB(oob []byte) (int, error) { + scms, err := unix.ParseSocketControlMessage(oob) + if err != nil { + return 0, err + } + if len(scms) != 1 { + return 0, errors.Errorf("unexpected scms: %v", scms) + } + scm := scms[0] + fds, err := unix.ParseUnixRights(&scm) + if err != nil { + return 0, err + } + if len(fds) != 1 { + return 0, errors.Errorf("unexpected fds: %v", fds) + } + return fds[0], nil +} + +func startTCPRoutines(socketPath string, spec port.Spec, stopCh <-chan struct{}, logWriter io.Writer) error { + ln, err := net.Listen("tcp", fmt.Sprintf("%s:%d", spec.ParentIP, spec.ParentPort)) + if err != nil { + fmt.Fprintf(logWriter, "listen: %v\n", err) + return err + } + newConns := make(chan net.Conn) + go func() { + for { + c, err := ln.Accept() + if err != nil { + fmt.Fprintf(logWriter, "accept: %v\n", err) + close(newConns) + return + } + newConns <- c + } + }() + go func() { + defer ln.Close() + for { + select { + case c, ok := <-newConns: + if !ok { + return + } + go func() { + if err := copyConnToChild(c, socketPath, spec, stopCh); err != nil { + fmt.Fprintf(logWriter, "copyConnToChild: %v\n", err) + return + } + }() + case <-stopCh: + return + } + } + }() + // no wait + return nil +} + +func startUDPRoutines(socketPath string, spec port.Spec, stopCh <-chan struct{}, logWriter io.Writer) error { + addr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", spec.ParentIP, spec.ParentPort)) + if err != nil { + return err + } + c, err := net.ListenUDP("udp", addr) + if err != nil { + return err + } + go func() { + if err := copyConnToChild(c, socketPath, spec, stopCh); err != nil { + fmt.Fprintf(logWriter, "copyConnToChild: %v\n", err) + return + } + }() + // no wait + return nil +} + +func copyConnToChild(c net.Conn, socketPath string, spec port.Spec, stopCh <-chan struct{}) error { + defer c.Close() + // get fd from the child as an SCM_RIGHTS cmsg + fd, err := connectToChildWithRetry(socketPath, spec, 10) + if err != nil { + return err + } + f := os.NewFile(uintptr(fd), "") + defer f.Close() + fc, err := net.FileConn(f) + if err != nil { + return err + } + defer fc.Close() + bicopy(c, fc, stopCh) + return nil +} + +// bicopy is based on libnetwork/cmd/proxy/tcp_proxy.go . +// NOTE: sendfile(2) cannot be used for sockets +func bicopy(x, y net.Conn, quit <-chan struct{}) { + var wg sync.WaitGroup + var broker = func(to, from net.Conn) { + io.Copy(to, from) + if fromTCP, ok := from.(*net.TCPConn); ok { + fromTCP.CloseRead() + } + if toTCP, ok := to.(*net.TCPConn); ok { + toTCP.CloseWrite() + } + wg.Done() + } + + wg.Add(2) + go broker(x, y) + go broker(y, x) + finish := make(chan struct{}) + go func() { + wg.Wait() + close(finish) + }() + + select { + case <-quit: + case <-finish: + } + x.Close() + y.Close() + <-finish +} + +const ( + requestTypeInit = "init" + requestTypeConnect = "connect" +) + +// request and response are encoded as JSON with uint32le length header. +type request struct { + Type string // "init" or "connect" + Proto string // "tcp" or "udp" + Port int +} + +// may contain FD as OOB +type reply struct { + Error string +} + +func NewChildDriver(logWriter io.Writer) port.ChildDriver { + return &childDriver{ + logWriter: logWriter, + } +} + +type childDriver struct { + logWriter io.Writer +} + +func (d *childDriver) RunChildDriver(opaque map[string]string, quit <-chan struct{}) error { + socketPath := opaque[opaqueKeySocketPath] + if socketPath == "" { + return errors.New("socket path not set") + } + childReadyPipePath := opaque[opaqueKeyChildReadyPipePath] + if childReadyPipePath == "" { + return errors.New("child ready pipe path not set") + } + childReadyPipeW, err := os.OpenFile(childReadyPipePath, os.O_WRONLY, os.ModeNamedPipe) + if err != nil { + return err + } + ln, err := net.ListenUnix("unix", &net.UnixAddr{ + Name: socketPath, + Net: "unix", + }) + if err != nil { + return err + } + // write nothing, just close + if err = childReadyPipeW.Close(); err != nil { + return err + } + stopAccept := make(chan struct{}, 1) + go func() { + <-quit + stopAccept <- struct{}{} + ln.Close() + }() + for { + c, err := ln.AcceptUnix() + if err != nil { + select { + case <-stopAccept: + return nil + default: + } + return err + } + go func() { + if rerr := d.routine(c); rerr != nil { + rep := reply{ + Error: rerr.Error(), + } + msgutil.MarshalToWriter(c, &rep) + } + c.Close() + }() + } + return nil +} + +func (d *childDriver) routine(c *net.UnixConn) error { + var req request + if _, err := msgutil.UnmarshalFromReader(c, &req); err != nil { + return err + } + switch req.Type { + case requestTypeInit: + return d.handleConnectInit(c, &req) + case requestTypeConnect: + return d.handleConnectRequest(c, &req) + default: + return errors.Errorf("unknown request type %q", req.Type) + } +} + +func (d *childDriver) handleConnectInit(c *net.UnixConn, req *request) error { + _, err := msgutil.MarshalToWriter(c, nil) + return err +} + +func (d *childDriver) handleConnectRequest(c *net.UnixConn, req *request) error { + switch req.Proto { + case "tcp": + case "udp": + default: + return errors.Errorf("unknown proto: %q", req.Proto) + } + var dialer net.Dialer + targetConn, err := dialer.Dial(req.Proto, fmt.Sprintf("127.0.0.1:%d", req.Port)) + if err != nil { + return err + } + defer targetConn.Close() // no effect on duplicated FD + targetConnFiler, ok := targetConn.(filer) + if !ok { + return errors.Errorf("unknown target connection: %+v", targetConn) + } + targetConnFile, err := targetConnFiler.File() + if err != nil { + return err + } + oob := unix.UnixRights(int(targetConnFile.Fd())) + f, err := c.File() + if err != nil { + return err + } + err = unix.Sendmsg(int(f.Fd()), []byte("dummy"), oob, nil, 0) + return err +} + +// filer is implemented by *net.TCPConn and *net.UDPConn +type filer interface { + File() (f *os.File, err error) +} diff --git a/vendor/github.com/rootless-containers/rootlesskit/pkg/port/socat/socat.go b/vendor/github.com/rootless-containers/rootlesskit/pkg/port/socat/socat.go deleted file mode 100644 index 9dbb0a5c3a..0000000000 --- a/vendor/github.com/rootless-containers/rootlesskit/pkg/port/socat/socat.go +++ /dev/null @@ -1,218 +0,0 @@ -package socat - -import ( - "context" - "fmt" - "io" - "net" - "os" - "os/exec" - "sync" - "syscall" - "time" - - "github.com/pkg/errors" - - "github.com/rootless-containers/rootlesskit/pkg/port" - "github.com/rootless-containers/rootlesskit/pkg/port/portutil" -) - -func NewParentDriver(logWriter io.Writer) (port.ParentDriver, error) { - if _, err := exec.LookPath("socat"); err != nil { - return nil, err - } - if _, err := exec.LookPath("nsenter"); err != nil { - return nil, err - } - d := driver{ - logWriter: logWriter, - ports: make(map[int]*port.Status, 0), - stoppers: make(map[int]func() error, 0), - nextID: 1, - } - return &d, nil -} - -type driver struct { - logWriter io.Writer - mu sync.Mutex - childPID int - ports map[int]*port.Status - stoppers map[int]func() error - nextID int -} - -func (d *driver) OpaqueForChild() map[string]string { - // NOP, as this driver does not have child-side logic. - return nil -} - -func (d *driver) RunParentDriver(initComplete chan struct{}, quit <-chan struct{}, cctx *port.ChildContext) error { - if cctx == nil || cctx.PID <= 0 { - return errors.New("child PID not set") - } - d.childPID = cctx.PID - initComplete <- struct{}{} - <-quit - return nil -} - -func (d *driver) AddPort(ctx context.Context, spec port.Spec) (*port.Status, error) { - if d.childPID <= 0 { - return nil, errors.New("child PID not set") - } - d.mu.Lock() - err := portutil.ValidatePortSpec(spec, d.ports) - d.mu.Unlock() - if err != nil { - return nil, err - } - cf := func() (*exec.Cmd, error) { - return createSocatCmd(ctx, spec, d.logWriter, d.childPID) - } - routineErrorCh := make(chan error) - routineStopCh := make(chan struct{}) - routineStop := func() error { - close(routineStopCh) - return <-routineErrorCh - } - go portRoutine(cf, routineStopCh, routineErrorCh, d.logWriter) - d.mu.Lock() - id := d.nextID - st := port.Status{ - ID: id, - Spec: spec, - } - d.ports[id] = &st - d.stoppers[id] = routineStop - d.nextID++ - d.mu.Unlock() - return &st, nil -} - -func (d *driver) ListPorts(ctx context.Context) ([]port.Status, error) { - var ports []port.Status - d.mu.Lock() - for _, p := range d.ports { - ports = append(ports, *p) - } - d.mu.Unlock() - return ports, nil -} - -func (d *driver) RemovePort(ctx context.Context, id int) error { - d.mu.Lock() - defer d.mu.Unlock() - stop, ok := d.stoppers[id] - if !ok { - return errors.Errorf("unknown port id: %d", id) - } - err := stop() - delete(d.stoppers, id) - delete(d.ports, id) - return err -} - -func createSocatCmd(ctx context.Context, spec port.Spec, logWriter io.Writer, childPID int) (*exec.Cmd, error) { - if spec.Proto != "tcp" && spec.Proto != "udp" { - return nil, errors.Errorf("unsupported proto: %s", spec.Proto) - } - ipStr := "0.0.0.0" - if spec.ParentIP != "" { - ip := net.ParseIP(spec.ParentIP) - if ip == nil { - return nil, errors.Errorf("unsupported parentIP: %s", spec.ParentIP) - } - ip = ip.To4() - if ip == nil { - return nil, errors.Errorf("unsupported parentIP (v6?): %s", spec.ParentIP) - } - ipStr = ip.String() - } - if spec.ParentPort < 1 || spec.ParentPort > 65535 { - return nil, errors.Errorf("unsupported parentPort: %d", spec.ParentPort) - } - if spec.ChildPort < 1 || spec.ChildPort > 65535 { - return nil, errors.Errorf("unsupported childPort: %d", spec.ChildPort) - } - var cmd *exec.Cmd - switch spec.Proto { - case "tcp": - cmd = exec.CommandContext(ctx, - "socat", - fmt.Sprintf("TCP-LISTEN:%d,bind=%s,reuseaddr,fork,rcvbuf=65536,sndbuf=65536", spec.ParentPort, ipStr), - fmt.Sprintf("EXEC:\"%s\",nofork", - fmt.Sprintf("nsenter -U -n --preserve-credentials -t %d socat STDIN TCP4:127.0.0.1:%d", childPID, spec.ChildPort))) - case "udp": - cmd = exec.CommandContext(ctx, - "socat", - fmt.Sprintf("UDP-LISTEN:%d,bind=%s,reuseaddr,fork,rcvbuf=65536,sndbuf=65536", spec.ParentPort, ipStr), - fmt.Sprintf("EXEC:\"%s\",nofork", - fmt.Sprintf("nsenter -U -n --preserve-credentials -t %d socat STDIN UDP4:127.0.0.1:%d", childPID, spec.ChildPort))) - } - cmd.Env = os.Environ() - cmd.Stdout = logWriter - cmd.Stderr = logWriter - cmd.SysProcAttr = &syscall.SysProcAttr{ - Pdeathsig: syscall.SIGKILL, - } - return cmd, nil -} - -type cmdFactory func() (*exec.Cmd, error) - -func portRoutine(cf cmdFactory, stopCh <-chan struct{}, errWCh chan error, logWriter io.Writer) { - retry := 0 - doneCh := make(chan error) - for { - cmd, err := cf() - if err != nil { - errWCh <- err - return - } - cmdDesc := fmt.Sprintf("%s %v", cmd.Path, cmd.Args) - fmt.Fprintf(logWriter, "[exec] starting cmd %s\n", cmdDesc) - if err := cmd.Start(); err != nil { - errWCh <- err - return - } - pid := cmd.Process.Pid - go func() { - err := cmd.Wait() - doneCh <- err - }() - select { - case err := <-doneCh: - // even if err == nil (unexpected for socat), continue the loop - retry++ - sleepDuration := time.Duration((retry*100)%(30*1000)) * time.Millisecond - fmt.Fprintf(logWriter, "[exec] retrying cmd %s after sleeping %v, count=%d, err=%v\n", - cmdDesc, sleepDuration, retry, err) - select { - case <-time.After(sleepDuration): - case <-stopCh: - errWCh <- err - return - } - case <-stopCh: - fmt.Fprintf(logWriter, "[exec] killing cmd %s pid %d\n", cmdDesc, pid) - syscall.Kill(pid, syscall.SIGKILL) - fmt.Fprintf(logWriter, "[exec] killed cmd %s pid %d\n", cmdDesc, pid) - close(errWCh) - return - } - } -} - -func NewChildDriver() port.ChildDriver { - return &childDriver{} -} - -type childDriver struct { -} - -func (d *childDriver) RunChildDriver(opaque map[string]string, quit <-chan struct{}) error { - // NOP - <-quit - return nil -}