From 124929e373b638721d80c6f29e81c4e16adb07c7 Mon Sep 17 00:00:00 2001 From: James DeFelice Date: Fri, 28 Aug 2015 07:19:54 +0000 Subject: [PATCH] refactoring of child process handling code - tasks subpackage responsible for managing system process lifecycle - minion propagates SIGTERM to child procs - child procs will be SIGKILL'd upon parent process death --- contrib/mesos/pkg/minion/server.go | 204 ++++++------ contrib/mesos/pkg/minion/tasks/doc.go | 20 ++ contrib/mesos/pkg/minion/tasks/events.go | 98 ++++++ contrib/mesos/pkg/minion/tasks/task.go | 351 ++++++++++++++++++++ contrib/mesos/pkg/minion/tasks/task_test.go | 222 +++++++++++++ 5 files changed, 790 insertions(+), 105 deletions(-) create mode 100644 contrib/mesos/pkg/minion/tasks/doc.go create mode 100644 contrib/mesos/pkg/minion/tasks/events.go create mode 100644 contrib/mesos/pkg/minion/tasks/task.go create mode 100644 contrib/mesos/pkg/minion/tasks/task_test.go diff --git a/contrib/mesos/pkg/minion/server.go b/contrib/mesos/pkg/minion/server.go index 7a6adff4cc..1ec81f7cb4 100644 --- a/contrib/mesos/pkg/minion/server.go +++ b/contrib/mesos/pkg/minion/server.go @@ -21,15 +21,15 @@ import ( "io" "io/ioutil" "os" - "os/exec" + "os/signal" "path" "strings" - "time" + "syscall" exservice "k8s.io/kubernetes/contrib/mesos/pkg/executor/service" "k8s.io/kubernetes/contrib/mesos/pkg/hyperkube" "k8s.io/kubernetes/contrib/mesos/pkg/minion/config" - "k8s.io/kubernetes/contrib/mesos/pkg/runtime" + "k8s.io/kubernetes/contrib/mesos/pkg/minion/tasks" "k8s.io/kubernetes/pkg/api/resource" client "k8s.io/kubernetes/pkg/client/unversioned" @@ -39,6 +39,11 @@ import ( "gopkg.in/natefinch/lumberjack.v2" ) +const ( + proxyLogFilename = "proxy.log" + executorLogFilename = "executor.log" +) + type MinionServer struct { // embed the executor server to be able to use its flags // TODO(sttts): get rid of this mixing of the minion and the executor server with a multiflags implementation for km @@ -48,8 +53,7 @@ type MinionServer struct { hks hyperkube.Interface clientConfig *client.Config kmBinary string - done chan struct{} // closed when shutting down - exit chan error // to signal fatal errors + tasks []*tasks.Task pathOverride string // the PATH environment for the sub-processes cgroupPrefix string // e.g. mesos @@ -69,15 +73,11 @@ func NewMinionServer() *MinionServer { s := &MinionServer{ KubeletExecutorServer: exservice.NewKubeletExecutorServer(), privateMountNS: false, // disabled until Docker supports customization of the parent mount namespace - done: make(chan struct{}), - exit: make(chan error), - - cgroupPrefix: config.DefaultCgroupPrefix, - logMaxSize: config.DefaultLogMaxSize(), - logMaxBackups: config.DefaultLogMaxBackups, - logMaxAgeInDays: config.DefaultLogMaxAgeInDays, - - runProxy: true, + cgroupPrefix: config.DefaultCgroupPrefix, + logMaxSize: config.DefaultLogMaxSize(), + logMaxBackups: config.DefaultLogMaxBackups, + logMaxAgeInDays: config.DefaultLogMaxAgeInDays, + runProxy: true, } // cache this for later use @@ -141,10 +141,13 @@ func (ms *MinionServer) launchProxyServer() { args = append(args, fmt.Sprintf("--hostname-override=%s", ms.KubeletExecutorServer.HostnameOverride)) } - ms.launchHyperkubeServer(hyperkube.CommandProxy, &args, "proxy.log") + ms.launchHyperkubeServer(hyperkube.CommandProxy, args, proxyLogFilename, nil) } -func (ms *MinionServer) launchExecutorServer() { +// launchExecutorServer returns a chan that closes upon kubelet-executor death. 
since the kubelet- +// executor doesn't support failover right now, the right thing to do is to fail completely since all +// pods will be lost upon restart and we want mesos to recover the resources from them. +func (ms *MinionServer) launchExecutorServer() <-chan struct{} { allArgs := os.Args[1:] // filter out minion flags, leaving those for the executor @@ -159,111 +162,65 @@ func (ms *MinionServer) launchExecutorServer() { } // run executor and quit minion server when this exits cleanly - err := ms.launchHyperkubeServer(hyperkube.CommandExecutor, &executorArgs, "executor.log") - if err != nil { - // just return, executor will be restarted on error - log.Error(err) - return + execDied := make(chan struct{}) + decorator := func(t *tasks.Task) *tasks.Task { + t.Finished = func(_ bool) bool { + // this func implements the task.finished spec, so when the executor exits + // we return false to indicate that it should not be restarted. we also + // close execDied to signal interested listeners. + close(execDied) + return false + } + // since we only expect to die once, and there is no restart; don't delay any longer than needed + t.RestartDelay = 0 + return t } - - log.Info("Executor exited cleanly, stopping the minion") - ms.exit <- nil + ms.launchHyperkubeServer(hyperkube.CommandExecutor, executorArgs, executorLogFilename, decorator) + return execDied } -func (ms *MinionServer) launchHyperkubeServer(server string, args *[]string, logFileName string) error { +func (ms *MinionServer) launchHyperkubeServer(server string, args []string, logFileName string, decorator func(*tasks.Task) *tasks.Task) { log.V(2).Infof("Spawning hyperkube %v with args '%+v'", server, args) - // prepare parameters - kmArgs := []string{server} - for _, arg := range *args { - kmArgs = append(kmArgs, arg) - } - - // create command - cmd := exec.Command(ms.kmBinary, kmArgs...) - if _, err := cmd.StdoutPipe(); err != nil { - // fatal error => terminate minion - err = fmt.Errorf("error getting stdout of %v: %v", server, err) - ms.exit <- err - return err - } - stderrLogs, err := cmd.StderrPipe() - if err != nil { - // fatal error => terminate minion - err = fmt.Errorf("error getting stderr of %v: %v", server, err) - ms.exit <- err - return err - } - - ch := make(chan struct{}) - go func() { - defer func() { - select { - case <-ch: - log.Infof("killing %v process...", server) - if err = cmd.Process.Kill(); err != nil { - log.Errorf("failed to kill %v process: %v", server, err) - } - default: - } - }() - - maxSize := ms.logMaxSize.Value() - if maxSize > 0 { - // convert to MB - maxSize = maxSize / 1024 / 1024 - if maxSize == 0 { - log.Warning("maximal log file size is rounded to 1 MB") - maxSize = 1 - } + kmArgs := append([]string{server}, args...) 
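// Illustration only, not part of this patch: the decorator hook above is how a
// restartable Task becomes a one-shot process. The same pattern in isolation,
// with hypothetical names runOnce and devNull:
//
//	func runOnce(name, bin string, args []string) <-chan struct{} {
//		died := make(chan struct{})
//		// devNull is any func() io.WriteCloser; server.go builds a lumberjack writer here.
//		t := tasks.New(name, bin, args, nil, devNull)
//		t.RestartDelay = 0 // we never respawn, so don't delay the state machine
//		t.Finished = func(restarting bool) bool {
//			close(died)  // signal listeners that the process exited
//			return false // and never restart it, regardless of `restarting`
//		}
//		t.Start() // Start spawns the supervising goroutine itself
//		return died
//	}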
+ maxSize := ms.logMaxSize.Value() + if maxSize > 0 { + // convert to MB + maxSize = maxSize / 1024 / 1024 + if maxSize == 0 { + log.Warning("maximal log file size is rounded to 1 MB") + maxSize = 1 } - writer := &lumberjack.Logger{ + } + + writerFunc := func() io.WriteCloser { + return &lumberjack.Logger{ Filename: logFileName, MaxSize: int(maxSize), MaxBackups: ms.logMaxBackups, MaxAge: ms.logMaxAgeInDays, } - defer writer.Close() - - log.V(2).Infof("Starting logging for %v: max log file size %d MB, keeping %d backups, for %d days", server, maxSize, ms.logMaxBackups, ms.logMaxAgeInDays) - - <-ch - written, err := io.Copy(writer, stderrLogs) - if err != nil { - log.Errorf("error writing data to %v: %v", logFileName, err) - } - - log.Infof("wrote %d bytes to %v", written, logFileName) - }() + } // use given environment, but add /usr/sbin to the path for the iptables binary used in kube-proxy + var kmEnv []string if ms.pathOverride != "" { env := os.Environ() - cmd.Env = make([]string, 0, len(env)) + kmEnv = make([]string, 0, len(env)) for _, e := range env { if !strings.HasPrefix(e, "PATH=") { - cmd.Env = append(cmd.Env, e) + kmEnv = append(kmEnv, e) } } - cmd.Env = append(cmd.Env, "PATH="+ms.pathOverride) + kmEnv = append(kmEnv, "PATH="+ms.pathOverride) } - // if the server fails to start then we exit the executor, otherwise - // wait for the proxy process to end (and release resources after). - if err := cmd.Start(); err != nil { - // fatal error => terminate minion - err = fmt.Errorf("error starting %v: %v", server, err) - ms.exit <- err - return err + t := tasks.New(server, ms.kmBinary, kmArgs, kmEnv, writerFunc) + if decorator != nil { + t = decorator(t) } - close(ch) - if err := cmd.Wait(); err != nil { - log.Errorf("%v exited with error: %v", server, err) - err = fmt.Errorf("%v exited with error: %v", server, err) - return err - } - - return nil + go t.Start() + ms.tasks = append(ms.tasks, t) } // runs the main kubelet loop, closing the kubeletFinished chan when the loop exits. @@ -295,15 +252,52 @@ func (ms *MinionServer) Run(hks hyperkube.Interface, _ []string) error { cgroupLogger("using cgroup-root %q", ms.cgroupRoot) // run subprocesses until ms.done is closed on return of this function - defer close(ms.done) if ms.runProxy { - go runtime.Until(ms.launchProxyServer, 5*time.Second, ms.done) + ms.launchProxyServer() } - go runtime.Until(ms.launchExecutorServer, 5*time.Second, ms.done) - // wait until minion exit is requested - // don't close ms.exit here to avoid panics of go routines writing an error to it - return <-ms.exit + // abort closes when the kubelet-executor dies + abort := ms.launchExecutorServer() + shouldQuit := termSignalListener(abort) + te := tasks.MergeOutput(ms.tasks, shouldQuit) + + // TODO(jdef) do something fun here, such as reporting task completion to the apiserver + + <-te.Close().Done() // we don't listen for any specific events yet; wait for all tasks to finish + return nil +} + +// termSignalListener returns a signal chan that closes when either (a) the process receives a termination +// signal: SIGTERM, SIGINT, or SIGHUP; or (b) the abort chan closes. 
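// Illustration only, not part of this patch: signal.Notify with no signal
// arguments (as in the implementation below) subscribes to every incoming
// signal, which is why the loop has to swallow SIGCHLD and log anything
// unexpected. A narrower subscription that would make those cases unnecessary:
//
//	sigCh := make(chan os.Signal, 1)
//	signal.Notify(sigCh, syscall.SIGTERM, syscall.SIGINT, syscall.SIGHUP)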
+func termSignalListener(abort <-chan struct{}) <-chan struct{} { + shouldQuit := make(chan struct{}) + sigCh := make(chan os.Signal, 1) + signal.Notify(sigCh) + + go func() { + defer close(shouldQuit) + for { + select { + case <-abort: + log.Infof("executor died, aborting") + return + case s, ok := <-sigCh: + if !ok { + return + } + switch s { + case os.Interrupt, os.Signal(syscall.SIGTERM), os.Signal(syscall.SIGINT), os.Signal(syscall.SIGHUP): + log.Infof("received signal %q, aborting", s) + return + case os.Signal(syscall.SIGCHLD): // who cares? + default: + log.Errorf("unexpected signal: %T %#v", s, s) + } + + } + } + }() + return shouldQuit } func (ms *MinionServer) AddExecutorFlags(fs *pflag.FlagSet) { diff --git a/contrib/mesos/pkg/minion/tasks/doc.go b/contrib/mesos/pkg/minion/tasks/doc.go new file mode 100644 index 0000000000..51ad8ac5ed --- /dev/null +++ b/contrib/mesos/pkg/minion/tasks/doc.go @@ -0,0 +1,20 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package tasks provides an API for supervising system processes as Task's. +// It provides stronger guarantees with respect to process lifecycle than a +// standalone kubelet running static pods. +package tasks diff --git a/contrib/mesos/pkg/minion/tasks/events.go b/contrib/mesos/pkg/minion/tasks/events.go new file mode 100644 index 0000000000..aff85af01f --- /dev/null +++ b/contrib/mesos/pkg/minion/tasks/events.go @@ -0,0 +1,98 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package tasks + +type Events interface { + // Close stops delivery of events in the completion and errors channels; callers must close this when they intend to no longer read from completion() or errors() + Close() Events + + // Completion reports Completion events as they happen + Completion() <-chan *Completion + + // Done returns a signal chan that closes when all tasks have completed and there are no more events to deliver + Done() <-chan struct{} +} + +type eventsImpl struct { + tc chan *Completion + stopForwarding chan struct{} + done <-chan struct{} +} + +func newEventsImpl(tcin <-chan *Completion, done <-chan struct{}) *eventsImpl { + ei := &eventsImpl{ + tc: make(chan *Completion), + stopForwarding: make(chan struct{}), + done: done, + } + go func() { + defer close(ei.tc) + forwardCompletionUntil(tcin, ei.tc, ei.stopForwarding, done, nil) + }() + return ei +} + +func (e *eventsImpl) Close() Events { close(e.stopForwarding); return e } +func (e *eventsImpl) Completion() <-chan *Completion { return e.tc } +func (e *eventsImpl) Done() <-chan struct{} { return e.done } + +// forwardCompletionUntil is a generic pipe that forwards objects between channels. +// if discard is closed, objects are silently dropped. +// if tap != nil then it's invoked for each object as it's read from tin, but before it's written to tch. +// returns when either reading from tin completes (no more objects, and is closed), or else +// abort is closed, which ever happens first. +func forwardCompletionUntil(tin <-chan *Completion, tch chan<- *Completion, discard <-chan struct{}, abort <-chan struct{}, tap func(*Completion, bool)) { + var tc *Completion + var ok bool +forwardLoop: + for { + select { + case tc, ok = <-tin: + if !ok { + return + } + if tap != nil { + tap(tc, false) + } + select { + case <-abort: + break forwardLoop + case <-discard: + case tch <- tc: + } + case <-abort: + // best effort + select { + case tc, ok = <-tin: + if ok { + if tap != nil { + tap(tc, true) + } + break forwardLoop + } + default: + } + return + } + } + // best effort + select { + case tch <- tc: + case <-discard: + default: + } +} diff --git a/contrib/mesos/pkg/minion/tasks/task.go b/contrib/mesos/pkg/minion/tasks/task.go new file mode 100644 index 0000000000..51dfcaf07f --- /dev/null +++ b/contrib/mesos/pkg/minion/tasks/task.go @@ -0,0 +1,351 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package tasks + +import ( + "fmt" + "io" + "os/exec" + "sync" + "sync/atomic" + "syscall" + "time" + + log "github.com/golang/glog" + "k8s.io/kubernetes/contrib/mesos/pkg/runtime" +) + +const defaultTaskRestartDelay = 5 * time.Second + +// Completion represents the termination of a Task process. Each process execution should +// yield (barring drops because of an abort signal) exactly one Completion. 
+type Completion struct { + name string // name of the task + code int // exit code that the task process completed with + err error // process management errors are reported here +} + +// systemProcess is a useful abstraction for testing +type systemProcess interface { + // Wait works like exec.Cmd.Wait() + Wait() error + + // Kill returns the pid of the process that was killed + Kill() (int, error) +} + +type cmdProcess struct { + delegate *exec.Cmd +} + +func (cp *cmdProcess) Wait() error { + return cp.delegate.Wait() +} + +func (cp *cmdProcess) Kill() (int, error) { + // kill the entire process group, not just the one process + pid := cp.delegate.Process.Pid + processGroup := 0 - pid + + // we send a SIGTERM here for a graceful stop. users of this package should + // wait for tasks to complete normally. as a fallback/safeguard, child procs + // are spawned in notStartedTask to receive a SIGKILL when this process dies. + rc := syscall.Kill(processGroup, syscall.SIGTERM) + return pid, rc +} + +// task is a specification for running a system process; it provides hooks for customizing +// logging and restart handling as well as provides event channels for communicating process +// termination and errors related to process management. +type Task struct { + Finished func(restarting bool) bool // callback invoked when a task process has completed; if `restarting` then it will be restarted if it returns true + RestartDelay time.Duration // interval between repeated task restarts + + name string // required: unique name for this task + bin string // required: path to executable + args []string // optional: process arguments + env []string // optional: process environment override + createLogger func() io.WriteCloser // factory func that builds a log writer + cmd systemProcess // process that we started + completedCh chan *Completion // reports exit codes encountered when task processes exit, or errors during process management + shouldQuit chan struct{} // shouldQuit is closed to indicate that the task should stop its running process, if any + done chan struct{} // done closes when all processes related to the task have terminated + initialState taskStateFn // prepare and start a new live process, defaults to notStartedTask; should be set by run() + runLatch int32 // guard against multiple Task.run calls +} + +// New builds a newly initialized task object but does not start any processes for it. callers +// are expected to invoke task.run(...) on their own. +func New(name, bin string, args, env []string, cl func() io.WriteCloser) *Task { + return &Task{ + name: name, + bin: bin, + args: args, + env: env, + createLogger: cl, + completedCh: make(chan *Completion), + shouldQuit: make(chan struct{}), + done: make(chan struct{}), + RestartDelay: defaultTaskRestartDelay, + Finished: func(restarting bool) bool { return restarting }, + } +} + +// Start spawns a goroutine to execute the Task. Panics if invoked more than once. +func (t *Task) Start() { + go t.run(notStartedTask) +} + +// run executes the state machine responsible for starting, monitoring, and possibly restarting +// a system process for the task. The initialState func is the entry point of the state machine. +// Upon returning the done and completedCh chans are all closed. 
+func (t *Task) run(initialState taskStateFn) { + if !atomic.CompareAndSwapInt32(&t.runLatch, 0, 1) { + panic("Task.run() may only be invoked once") + } + t.initialState = initialState + + defer close(t.done) + defer close(t.completedCh) + + state := initialState + for state != nil { + next := state(t) + state = next + } +} + +func (t *Task) tryComplete(tc *Completion) { + select { + case <-t.shouldQuit: + // best effort + select { + case t.completedCh <- tc: + default: + } + case t.completedCh <- tc: + } +} + +// tryError is a convenience func that invokes tryComplete with a completion error +func (t *Task) tryError(err error) { + t.tryComplete(&Completion{err: err}) +} + +type taskStateFn func(*Task) taskStateFn + +func taskShouldRestart(t *Task) taskStateFn { + // make our best effort to stop here if signalled (shouldQuit). not doing so here + // could add cost later (a process might be launched). + + // sleep for a bit; then return t.initialState + tm := time.NewTimer(t.RestartDelay) + defer tm.Stop() + select { + case <-tm.C: + select { + case <-t.shouldQuit: + default: + if t.Finished(true) { + select { + case <-t.shouldQuit: + // the world has changed, die + return nil + default: + } + return t.initialState + } + // finish call decided not to respawn, so die + return nil + } + case <-t.shouldQuit: + } + + // we're quitting, tell the Finished callback and then die + t.Finished(false) + return nil +} + +func (t *Task) initLogging(r io.Reader) { + writer := t.createLogger() + go func() { + defer writer.Close() + _, err := io.Copy(writer, r) + if err != nil && err != io.EOF { + // using tryComplete is racy because the state machine closes completedCh and + // so we don't want to attempt to write to a closed/closing chan. so + // just log this for now. + log.Errorf("logger for task %q crashed: %v", t.bin, err) + } + }() +} + +// notStartedTask spawns the given task and transitions to a startedTask state +func notStartedTask(t *Task) taskStateFn { + log.Infof("starting task process %q with args '%+v'", t.bin, t.args) + + // create command + cmd := exec.Command(t.bin, t.args...) + if _, err := cmd.StdoutPipe(); err != nil { + t.tryError(fmt.Errorf("error getting stdout of %v: %v", t.name, err)) + return taskShouldRestart + } + stderrLogs, err := cmd.StderrPipe() + if err != nil { + t.tryError(fmt.Errorf("error getting stderr of %v: %v", t.name, err)) + return taskShouldRestart + } + + t.initLogging(stderrLogs) + if len(t.env) > 0 { + cmd.Env = t.env + } + cmd.SysProcAttr = &syscall.SysProcAttr{ + Setpgid: true, + Pdeathsig: syscall.SIGKILL, // see cmdProcess.Kill + } + + // last min check for shouldQuit here + select { + case <-t.shouldQuit: + t.tryError(fmt.Errorf("task execution canceled, aborting process launch")) + return taskShouldRestart + default: + } + + if err := cmd.Start(); err != nil { + t.tryError(fmt.Errorf("failed to start task process %q: %v", t.bin, err)) + return taskShouldRestart + } + log.Infoln("task started", t.name) + t.cmd = &cmdProcess{delegate: cmd} + return taskRunning +} + +type exitError interface { + error + + // see os.ProcessState.Sys: returned value can be converted to something like syscall.WaitStatus + Sys() interface{} +} + +func taskRunning(t *Task) taskStateFn { + waiter := t.cmd.Wait + var sendOnce sync.Once + trySend := func(wr *Completion) { + // guarded with once because we're only allowed to send a single "result" for each + // process termination. 
for example, if Kill() results in an error because Wait() + // already completed we only want to return a single result for the process. + sendOnce.Do(func() { + t.tryComplete(wr) + }) + } + // listen for normal process completion in a goroutine; don't block because we need to listen for shouldQuit + waitCh := make(chan *Completion, 1) + go func() { + wr := &Completion{} + defer func() { + waitCh <- wr + close(waitCh) + }() + + if err := waiter(); err != nil { + if exitError, ok := err.(exitError); ok { + if waitStatus, ok := exitError.Sys().(syscall.WaitStatus); ok { + wr.name = t.name + wr.code = waitStatus.ExitStatus() + return + } + } + wr.err = fmt.Errorf("task wait ended strangely for %q: %v", t.bin, err) + } else { + wr.name = t.name + } + }() + + // wait for the process to complete, or else for a "quit" signal on the task (at which point we'll attempt to kill manually) + select { + case <-t.shouldQuit: + // check for tie + select { + case wr := <-waitCh: + // we got a signal to quit, but we're already finished; attempt best effort delvery + trySend(wr) + default: + // Wait() has not exited yet, kill the process + log.Infof("killing %s : %s", t.name, t.bin) + pid, err := t.cmd.Kill() + if err != nil { + trySend(&Completion{err: fmt.Errorf("failed to kill process: %q pid %d: %v", t.bin, pid, err)}) + } + // else, Wait() should complete and send a completion event + } + case wr := <-waitCh: + // task has completed before we were told to quit, pass along completion and error information + trySend(wr) + } + return taskShouldRestart +} + +// forwardUntil forwards task process completion status and errors to the given output +// chans until either the task terminates or abort is closed. +func (t *Task) forwardUntil(tch chan<- *Completion, abort <-chan struct{}) { + // merge task completion and error until we're told to die, then + // tell the task to stop + defer close(t.shouldQuit) + forwardCompletionUntil(t.completedCh, tch, nil, abort, nil) +} + +// MergeOutput waits for the given tasks to complete. meanwhile it logs each time a task +// process completes or generates an error. when shouldQuit closes, tasks are canceled and this +// func eventually returns once all ongoing event handlers have completed running. +func MergeOutput(tasks []*Task, shouldQuit <-chan struct{}) Events { + tc := make(chan *Completion) + + var waitForTasks sync.WaitGroup + waitForTasks.Add(len(tasks)) + + for _, t := range tasks { + t := t + // translate task dead signal into Done + go func() { + <-t.done + waitForTasks.Done() + }() + // fan-in task completion and error events to tc, ec + go t.forwardUntil(tc, shouldQuit) + } + + tclistener := make(chan *Completion) + done := runtime.After(func() { + completionFinished := runtime.After(func() { + defer close(tclistener) + forwardCompletionUntil(tc, tclistener, nil, shouldQuit, func(tt *Completion, shutdown bool) { + prefix := "" + if shutdown { + prefix = "(shutdown) " + } + log.Infof(prefix+"task %q exited with status %d", tt.name, tt.code) + }) + }) + waitForTasks.Wait() + close(tc) + <-completionFinished + }) + ei := newEventsImpl(tclistener, done) + return ei +} diff --git a/contrib/mesos/pkg/minion/tasks/task_test.go b/contrib/mesos/pkg/minion/tasks/task_test.go new file mode 100644 index 0000000000..255e63489b --- /dev/null +++ b/contrib/mesos/pkg/minion/tasks/task_test.go @@ -0,0 +1,222 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. 
+ +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package tasks + +import ( + "bytes" + "errors" + "fmt" + "io" + "sync" + "syscall" + "testing" + + log "github.com/golang/glog" +) + +type badWriteCloser struct { + err error +} + +func (b *badWriteCloser) Write(_ []byte) (int, error) { return 0, b.err } +func (b *badWriteCloser) Close() error { return b.err } + +type discardCloser int + +func (d discardCloser) Write(b []byte) (int, error) { return len(b), nil } +func (d discardCloser) Close() error { return nil } + +var devNull = func() io.WriteCloser { return discardCloser(0) } + +type fakeExitError uint32 + +func (f fakeExitError) Sys() interface{} { return syscall.WaitStatus(f << 8) } +func (f fakeExitError) Error() string { return fmt.Sprintf("fake-exit-error: %d", f) } + +type fakeProcess struct { + done chan struct{} + pid int + err error +} + +func (f *fakeProcess) Wait() error { + <-f.done + return f.err +} +func (f *fakeProcess) Kill() (int, error) { + close(f.done) + return f.pid, f.err +} +func (f *fakeProcess) exit(code int) { + f.err = fakeExitError(code) + close(f.done) +} + +func newFakeProcess() *fakeProcess { + return &fakeProcess{ + done: make(chan struct{}), + } +} + +func TestBadLogger(t *testing.T) { + err := errors.New("qux") + fp := newFakeProcess() + tt := New("foo", "bar", nil, nil, func() io.WriteCloser { + defer func() { + fp.pid = 123 // sanity check + fp.Kill() // this causes Wait() to return + }() + return &badWriteCloser{err} + }) + + tt.RestartDelay = 0 // don't slow the test down for no good reason + + finishCalled := make(chan struct{}) + tt.Finished = func(ok bool) bool { + log.Infof("tt.Finished: ok %t", ok) + if ok { + close(finishCalled) + } + return false // never respawn, this causes t.done to close + } + + // abuse eventsImpl: we're not going to listen on the task completion or event chans, + // and we don't want to block the state machine, so discard all task events as they happen + ei := newEventsImpl(tt.completedCh, tt.done) + ei.Close() + + go tt.run(func(_ *Task) taskStateFn { + log.Infof("tt initialized") + tt.initLogging(bytes.NewBuffer(([]byte)("unlogged bytes"))) + tt.cmd = fp + return taskRunning + }) + + // if the logger fails the task will be killed + // badWriteLogger generates an error immediately and results in a task kill + <-finishCalled + <-tt.done + + // this should never data race since the state machine is dead at this point + if fp.pid != 123 { + t.Fatalf("incorrect pid, expected 123 not %d", fp.pid) + } + + // TODO(jdef) would be nice to check for a specific error that indicates the logger died +} + +func TestMergeOutput(t *testing.T) { + var tasksStarted, tasksDone sync.WaitGroup + tasksDone.Add(2) + tasksStarted.Add(2) + + t1 := New("foo", "", nil, nil, devNull) + t1exited := make(chan struct{}) + t1.RestartDelay = 0 // don't slow the test down for no good reason + t1.Finished = func(ok bool) bool { + // we expect each of these cases to happen exactly once + if !ok { + tasksDone.Done() + } else { + close(t1exited) + } + return ok + } + go 
t1.run(func(t *Task) taskStateFn { + defer tasksStarted.Done() + t.initLogging(bytes.NewBuffer([]byte{})) + t.cmd = newFakeProcess() + return taskRunning + }) + + t2 := New("bar", "", nil, nil, devNull) + t2exited := make(chan struct{}) + t2.RestartDelay = 0 // don't slow the test down for no good reason + t2.Finished = func(ok bool) bool { + // we expect each of these cases to happen exactly once + if !ok { + tasksDone.Done() + } else { + close(t2exited) + } + return ok + } + go t2.run(func(t *Task) taskStateFn { + defer tasksStarted.Done() + t.initLogging(bytes.NewBuffer([]byte{})) + t.cmd = newFakeProcess() + return taskRunning + }) + + shouldQuit := make(chan struct{}) + te := MergeOutput([]*Task{t1, t2}, shouldQuit) + + tasksStarted.Wait() + tasksStarted.Add(2) // recycle the barrier + + // kill each task once, let it restart; make sure that we get the completion status? + t1.cmd.(*fakeProcess).exit(1) + t2.cmd.(*fakeProcess).exit(2) + + codes := map[int]struct{}{} + for i := 0; i < 2; i++ { + switch tc := <-te.Completion(); tc.code { + case 1, 2: + codes[tc.code] = struct{}{} + default: + if tc.err != nil { + t.Errorf("unexpected task completion error: %v", tc.err) + } else { + t.Errorf("unexpected task completion code: %d", tc.code) + } + } + } + + te.Close() // we're not going to read any other completion or error events + + if len(codes) != 2 { + t.Fatalf("expected each task process to exit once") + } + + // each task invokes Finished() once + <-t1exited + <-t2exited + + log.Infoln("each task process has completed one round") + tasksStarted.Wait() // tasks will auto-restart their exited procs + + // assert that the tasks are not dead; TODO(jdef) not sure that these checks are useful + select { + case <-t1.done: + t.Fatalf("t1 is unexpectedly dead") + default: + } + select { + case <-t2.done: + t.Fatalf("t2 is unexpectedly dead") + default: + } + + log.Infoln("firing quit signal") + close(shouldQuit) // fire shouldQuit, and everything should terminate gracefully + + log.Infoln("waiting for tasks to die") + tasksDone.Wait() // our tasks should die + + log.Infoln("waiting for merge to complete") + <-te.Done() // wait for the merge to complete +}
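
A minimal end-to-end sketch (illustration only, not part of this patch) of how the new tasks package is meant to be driven, mirroring what MinionServer.Run now does: start supervised tasks, translate termination signals into a quit channel, and wait for MergeOutput to drain. The km binary path, the devNull logger, and package main are placeholders; server.go wires in a lumberjack writer and hyperkube arguments instead.

package main

import (
	"io"
	"io/ioutil"
	"os"
	"os/signal"
	"syscall"

	"k8s.io/kubernetes/contrib/mesos/pkg/minion/tasks"
)

// nopCloser adapts ioutil.Discard to the io.WriteCloser factory the tasks package expects.
type nopCloser struct{ io.Writer }

func (nopCloser) Close() error { return nil }

func devNull() io.WriteCloser { return nopCloser{ioutil.Discard} }

func main() {
	// Build two supervised tasks; Start() spawns each task's state machine
	// goroutine, which launches, logs, and (by default) restarts the process.
	proxy := tasks.New("proxy", "/path/to/km", []string{"proxy"}, nil, devNull)
	executor := tasks.New("executor", "/path/to/km", []string{"executor"}, nil, devNull)
	proxy.Start()
	executor.Start()

	// Translate SIGTERM/SIGINT into the shouldQuit signal that MergeOutput
	// uses to stop all tasks (each child process group receives SIGTERM).
	shouldQuit := make(chan struct{})
	go func() {
		sig := make(chan os.Signal, 1)
		signal.Notify(sig, syscall.SIGTERM, syscall.SIGINT)
		<-sig
		close(shouldQuit)
	}()

	// MergeOutput fans in completion events from all tasks and logs them;
	// Close() discards further events and Done() closes once every task's
	// goroutines have wound down, mirroring MinionServer.Run.
	events := tasks.MergeOutput([]*tasks.Task{proxy, executor}, shouldQuit)
	<-events.Close().Done()
}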