Merge pull request #13295 from mesosphere/minion-signals

MESOS: refactor process management in the minion controller
Abhi Shah 2015-09-03 16:37:46 -07:00
commit c28b68d254
5 changed files with 790 additions and 105 deletions


@@ -21,15 +21,15 @@ import (
 	"io"
 	"io/ioutil"
 	"os"
-	"os/exec"
+	"os/signal"
 	"path"
 	"strings"
-	"time"
+	"syscall"
 
 	exservice "k8s.io/kubernetes/contrib/mesos/pkg/executor/service"
 	"k8s.io/kubernetes/contrib/mesos/pkg/hyperkube"
 	"k8s.io/kubernetes/contrib/mesos/pkg/minion/config"
-	"k8s.io/kubernetes/contrib/mesos/pkg/runtime"
+	"k8s.io/kubernetes/contrib/mesos/pkg/minion/tasks"
 	"k8s.io/kubernetes/pkg/api/resource"
 	client "k8s.io/kubernetes/pkg/client/unversioned"
@@ -39,6 +39,11 @@ import (
 	"gopkg.in/natefinch/lumberjack.v2"
 )
 
+const (
+	proxyLogFilename    = "proxy.log"
+	executorLogFilename = "executor.log"
+)
+
 type MinionServer struct {
 	// embed the executor server to be able to use its flags
 	// TODO(sttts): get rid of this mixing of the minion and the executor server with a multiflags implementation for km
@@ -48,8 +53,7 @@ type MinionServer struct {
 	hks          hyperkube.Interface
 	clientConfig *client.Config
 	kmBinary     string
-	done         chan struct{} // closed when shutting down
-	exit         chan error    // to signal fatal errors
+	tasks        []*tasks.Task
 
 	pathOverride string // the PATH environment for the sub-processes
 	cgroupPrefix string // e.g. mesos
@@ -69,14 +73,10 @@ func NewMinionServer() *MinionServer {
 	s := &MinionServer{
 		KubeletExecutorServer: exservice.NewKubeletExecutorServer(),
 		privateMountNS:        false, // disabled until Docker supports customization of the parent mount namespace
-		done:                  make(chan struct{}),
-		exit:                  make(chan error),
 		cgroupPrefix:          config.DefaultCgroupPrefix,
 		logMaxSize:            config.DefaultLogMaxSize(),
 		logMaxBackups:         config.DefaultLogMaxBackups,
 		logMaxAgeInDays:       config.DefaultLogMaxAgeInDays,
 		runProxy:              true,
 	}
@@ -141,10 +141,13 @@ func (ms *MinionServer) launchProxyServer() {
 		args = append(args, fmt.Sprintf("--hostname-override=%s", ms.KubeletExecutorServer.HostnameOverride))
 	}
-	ms.launchHyperkubeServer(hyperkube.CommandProxy, &args, "proxy.log")
+	ms.launchHyperkubeServer(hyperkube.CommandProxy, args, proxyLogFilename, nil)
 }
 
-func (ms *MinionServer) launchExecutorServer() {
+// launchExecutorServer returns a chan that closes upon kubelet-executor death. since the kubelet-
+// executor doesn't support failover right now, the right thing to do is to fail completely since all
+// pods will be lost upon restart and we want mesos to recover the resources from them.
+func (ms *MinionServer) launchExecutorServer() <-chan struct{} {
 	allArgs := os.Args[1:]
 
 	// filter out minion flags, leaving those for the executor
@@ -159,55 +162,27 @@
 	}
 
 	// run executor and quit minion server when this exits cleanly
-	err := ms.launchHyperkubeServer(hyperkube.CommandExecutor, &executorArgs, "executor.log")
-	if err != nil {
-		// just return, executor will be restarted on error
-		log.Error(err)
-		return
-	}
-
-	log.Info("Executor exited cleanly, stopping the minion")
-	ms.exit <- nil
+	execDied := make(chan struct{})
+	decorator := func(t *tasks.Task) *tasks.Task {
+		t.Finished = func(_ bool) bool {
+			// this func implements the task.finished spec, so when the executor exits
+			// we return false to indicate that it should not be restarted. we also
+			// close execDied to signal interested listeners.
+			close(execDied)
+			return false
+		}
+		// since we only expect to die once, and there is no restart; don't delay any longer than needed
+		t.RestartDelay = 0
+		return t
+	}
+	ms.launchHyperkubeServer(hyperkube.CommandExecutor, executorArgs, executorLogFilename, decorator)
+	return execDied
 }
 
-func (ms *MinionServer) launchHyperkubeServer(server string, args *[]string, logFileName string) error {
+func (ms *MinionServer) launchHyperkubeServer(server string, args []string, logFileName string, decorator func(*tasks.Task) *tasks.Task) {
 	log.V(2).Infof("Spawning hyperkube %v with args '%+v'", server, args)
 
-	// prepare parameters
-	kmArgs := []string{server}
-	for _, arg := range *args {
-		kmArgs = append(kmArgs, arg)
-	}
-
-	// create command
-	cmd := exec.Command(ms.kmBinary, kmArgs...)
-	if _, err := cmd.StdoutPipe(); err != nil {
-		// fatal error => terminate minion
-		err = fmt.Errorf("error getting stdout of %v: %v", server, err)
-		ms.exit <- err
-		return err
-	}
-	stderrLogs, err := cmd.StderrPipe()
-	if err != nil {
-		// fatal error => terminate minion
-		err = fmt.Errorf("error getting stderr of %v: %v", server, err)
-		ms.exit <- err
-		return err
-	}
-
-	ch := make(chan struct{})
-	go func() {
-		defer func() {
-			select {
-			case <-ch:
-				log.Infof("killing %v process...", server)
-				if err = cmd.Process.Kill(); err != nil {
-					log.Errorf("failed to kill %v process: %v", server, err)
-				}
-			default:
-			}
-		}()
+	kmArgs := append([]string{server}, args...)
 
 	maxSize := ms.logMaxSize.Value()
 	if maxSize > 0 {
 		// convert to MB
@@ -217,53 +192,35 @@ func (ms *MinionServer) launchHyperkubeServer(server string, args *[]string, logFileName string) error {
 			maxSize = 1
 		}
 	}
-		writer := &lumberjack.Logger{
-			Filename:   logFileName,
-			MaxSize:    int(maxSize),
-			MaxBackups: ms.logMaxBackups,
-			MaxAge:     ms.logMaxAgeInDays,
-		}
-		defer writer.Close()
-
-		log.V(2).Infof("Starting logging for %v: max log file size %d MB, keeping %d backups, for %d days", server, maxSize, ms.logMaxBackups, ms.logMaxAgeInDays)
-		<-ch
-		written, err := io.Copy(writer, stderrLogs)
-		if err != nil {
-			log.Errorf("error writing data to %v: %v", logFileName, err)
-		}
-		log.Infof("wrote %d bytes to %v", written, logFileName)
-	}()
+	writerFunc := func() io.WriteCloser {
+		return &lumberjack.Logger{
+			Filename:   logFileName,
+			MaxSize:    int(maxSize),
+			MaxBackups: ms.logMaxBackups,
+			MaxAge:     ms.logMaxAgeInDays,
+		}
+	}
 
 	// use given environment, but add /usr/sbin to the path for the iptables binary used in kube-proxy
+	var kmEnv []string
 	if ms.pathOverride != "" {
 		env := os.Environ()
-		cmd.Env = make([]string, 0, len(env))
+		kmEnv = make([]string, 0, len(env))
 		for _, e := range env {
 			if !strings.HasPrefix(e, "PATH=") {
-				cmd.Env = append(cmd.Env, e)
+				kmEnv = append(kmEnv, e)
 			}
 		}
-		cmd.Env = append(cmd.Env, "PATH="+ms.pathOverride)
+		kmEnv = append(kmEnv, "PATH="+ms.pathOverride)
 	}
 
-	// if the server fails to start then we exit the executor, otherwise
-	// wait for the proxy process to end (and release resources after).
-	if err := cmd.Start(); err != nil {
-		// fatal error => terminate minion
-		err = fmt.Errorf("error starting %v: %v", server, err)
-		ms.exit <- err
-		return err
-	}
-	close(ch)
-	if err := cmd.Wait(); err != nil {
-		log.Errorf("%v exited with error: %v", server, err)
-		err = fmt.Errorf("%v exited with error: %v", server, err)
-		return err
-	}
-	return nil
+	t := tasks.New(server, ms.kmBinary, kmArgs, kmEnv, writerFunc)
+	if decorator != nil {
+		t = decorator(t)
+	}
+	go t.Start()
+	ms.tasks = append(ms.tasks, t)
 }
 
 // runs the main kubelet loop, closing the kubeletFinished chan when the loop exits.
@@ -295,15 +252,52 @@ func (ms *MinionServer) Run(hks hyperkube.Interface, _ []string) error {
 	cgroupLogger("using cgroup-root %q", ms.cgroupRoot)
 
-	// run subprocesses until ms.done is closed on return of this function
-	defer close(ms.done)
 	if ms.runProxy {
-		go runtime.Until(ms.launchProxyServer, 5*time.Second, ms.done)
+		ms.launchProxyServer()
 	}
-	go runtime.Until(ms.launchExecutorServer, 5*time.Second, ms.done)
 
-	// wait until minion exit is requested
-	// don't close ms.exit here to avoid panics of go routines writing an error to it
-	return <-ms.exit
+	// abort closes when the kubelet-executor dies
+	abort := ms.launchExecutorServer()
+	shouldQuit := termSignalListener(abort)
+	te := tasks.MergeOutput(ms.tasks, shouldQuit)
+
+	// TODO(jdef) do something fun here, such as reporting task completion to the apiserver
+	<-te.Close().Done() // we don't listen for any specific events yet; wait for all tasks to finish
+	return nil
+}
+
+// termSignalListener returns a signal chan that closes when either (a) the process receives a termination
+// signal: SIGTERM, SIGINT, or SIGHUP; or (b) the abort chan closes.
+func termSignalListener(abort <-chan struct{}) <-chan struct{} {
+	shouldQuit := make(chan struct{})
+	sigCh := make(chan os.Signal, 1)
+	signal.Notify(sigCh)
+
+	go func() {
+		defer close(shouldQuit)
+		for {
+			select {
+			case <-abort:
+				log.Infof("executor died, aborting")
+				return
+			case s, ok := <-sigCh:
+				if !ok {
+					return
+				}
+				switch s {
+				case os.Interrupt, os.Signal(syscall.SIGTERM), os.Signal(syscall.SIGINT), os.Signal(syscall.SIGHUP):
+					log.Infof("received signal %q, aborting", s)
+					return
+				case os.Signal(syscall.SIGCHLD): // who cares?
+				default:
+					log.Errorf("unexpected signal: %T %#v", s, s)
+				}
+			}
+		}
+	}()
+	return shouldQuit
 }
 
 func (ms *MinionServer) AddExecutorFlags(fs *pflag.FlagSet) {
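
For readers skimming the diff: the old exit/done channel plumbing is gone, and shutdown is now driven by a chan that closes on executor death merged with OS termination signals. Below is a rough standalone sketch of that pattern (illustrative only, not code from this PR; unlike termSignalListener above, it subscribes to specific signals rather than filtering everything).

package main

import (
	"log"
	"os"
	"os/signal"
	"syscall"
)

// termListener mirrors the shape of termSignalListener: the returned chan closes
// when abort closes or when a termination signal arrives.
func termListener(abort <-chan struct{}) <-chan struct{} {
	quit := make(chan struct{})
	sigCh := make(chan os.Signal, 1)
	// simplification: subscribe only to the signals we act on, instead of
	// receiving all signals and filtering as the PR's code does.
	signal.Notify(sigCh, syscall.SIGTERM, syscall.SIGINT, syscall.SIGHUP)

	go func() {
		defer close(quit)
		select {
		case <-abort:
			log.Println("abort chan closed, shutting down")
		case s := <-sigCh:
			log.Printf("received signal %v, shutting down", s)
		}
	}()
	return quit
}

func main() {
	abort := make(chan struct{}) // would be closed when the supervised executor dies
	<-termListener(abort)        // block until a signal arrives or abort closes
}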


@@ -0,0 +1,20 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package tasks provides an API for supervising system processes as Task's.
// It provides stronger guarantees with respect to process lifecycle than a
// standalone kubelet running static pods.
package tasks
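
To make the package doc concrete, here is a hedged usage sketch built only on the exported API added in this PR (New, Task.Start, MergeOutput, and the Events accessors); the binary path and timings are illustrative, not part of the change.

package main

import (
	"io"
	"os"
	"time"

	"k8s.io/kubernetes/contrib/mesos/pkg/minion/tasks"
)

// stdoutCloser adapts os.Stdout to the io.WriteCloser the task's logger factory must return.
type stdoutCloser struct{ io.Writer }

func (stdoutCloser) Close() error { return nil }

func main() {
	// supervise a child process; its stderr is copied to our stdout.
	// "/bin/sleep" is just an illustrative binary path.
	t := tasks.New("sleeper", "/bin/sleep", []string{"30"}, nil, func() io.WriteCloser {
		return stdoutCloser{os.Stdout}
	})
	t.Start()

	// merge completion events from all supervised tasks; closing shouldQuit
	// asks every task to stop and lets the merge wind down.
	shouldQuit := make(chan struct{})
	events := tasks.MergeOutput([]*tasks.Task{t}, shouldQuit)

	go func() {
		time.Sleep(5 * time.Second)
		close(shouldQuit)
	}()

	<-events.Close().Done() // wait until all tasks have terminated
}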


@@ -0,0 +1,98 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package tasks
type Events interface {
// Close stops delivery of events in the completion and errors channels; callers must close this when they intend to no longer read from completion() or errors()
Close() Events
// Completion reports Completion events as they happen
Completion() <-chan *Completion
// Done returns a signal chan that closes when all tasks have completed and there are no more events to deliver
Done() <-chan struct{}
}
type eventsImpl struct {
tc chan *Completion
stopForwarding chan struct{}
done <-chan struct{}
}
func newEventsImpl(tcin <-chan *Completion, done <-chan struct{}) *eventsImpl {
ei := &eventsImpl{
tc: make(chan *Completion),
stopForwarding: make(chan struct{}),
done: done,
}
go func() {
defer close(ei.tc)
forwardCompletionUntil(tcin, ei.tc, ei.stopForwarding, done, nil)
}()
return ei
}
func (e *eventsImpl) Close() Events { close(e.stopForwarding); return e }
func (e *eventsImpl) Completion() <-chan *Completion { return e.tc }
func (e *eventsImpl) Done() <-chan struct{} { return e.done }
// forwardCompletionUntil is a generic pipe that forwards objects between channels.
// if discard is closed, objects are silently dropped.
// if tap != nil then it's invoked for each object as it's read from tin, but before it's written to tch.
// returns when either reading from tin completes (no more objects, and is closed), or else
// abort is closed, whichever happens first.
func forwardCompletionUntil(tin <-chan *Completion, tch chan<- *Completion, discard <-chan struct{}, abort <-chan struct{}, tap func(*Completion, bool)) {
var tc *Completion
var ok bool
forwardLoop:
for {
select {
case tc, ok = <-tin:
if !ok {
return
}
if tap != nil {
tap(tc, false)
}
select {
case <-abort:
break forwardLoop
case <-discard:
case tch <- tc:
}
case <-abort:
// best effort
select {
case tc, ok = <-tin:
if ok {
if tap != nil {
tap(tc, true)
}
break forwardLoop
}
default:
}
return
}
}
// best effort
select {
case tch <- tc:
case <-discard:
default:
}
}
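
forwardCompletionUntil above is specialized to *Completion and supports discard/tap hooks; its core shape is a forward-until-aborted pipe. A minimal, self-contained sketch of that shape follows (not code from this PR), using string payloads and no hooks.

package main

import "fmt"

// forwardUntil copies values from in to out until in is closed or abort closes,
// mirroring the shape of forwardCompletionUntil (minus the tap/discard hooks).
func forwardUntil(in <-chan string, out chan<- string, abort <-chan struct{}) {
	for {
		select {
		case v, ok := <-in:
			if !ok {
				return // producer finished
			}
			select {
			case out <- v:
			case <-abort:
				return // consumer no longer interested
			}
		case <-abort:
			return
		}
	}
}

func main() {
	in, out, abort := make(chan string), make(chan string), make(chan struct{})
	go func() {
		defer close(in)
		in <- "a"
		in <- "b"
	}()
	go func() {
		defer close(out)
		forwardUntil(in, out, abort)
	}()
	for v := range out {
		fmt.Println(v)
	}
	close(abort) // no-op here; shown for completeness
}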


@@ -0,0 +1,351 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package tasks
import (
"fmt"
"io"
"os/exec"
"sync"
"sync/atomic"
"syscall"
"time"
log "github.com/golang/glog"
"k8s.io/kubernetes/contrib/mesos/pkg/runtime"
)
const defaultTaskRestartDelay = 5 * time.Second
// Completion represents the termination of a Task process. Each process execution should
// yield (barring drops because of an abort signal) exactly one Completion.
type Completion struct {
name string // name of the task
code int // exit code that the task process completed with
err error // process management errors are reported here
}
// systemProcess is a useful abstraction for testing
type systemProcess interface {
// Wait works like exec.Cmd.Wait()
Wait() error
// Kill returns the pid of the process that was killed
Kill() (int, error)
}
type cmdProcess struct {
delegate *exec.Cmd
}
func (cp *cmdProcess) Wait() error {
return cp.delegate.Wait()
}
func (cp *cmdProcess) Kill() (int, error) {
// kill the entire process group, not just the one process
pid := cp.delegate.Process.Pid
processGroup := 0 - pid
// we send a SIGTERM here for a graceful stop. users of this package should
// wait for tasks to complete normally. as a fallback/safeguard, child procs
// are spawned in notStartedTask to receive a SIGKILL when this process dies.
rc := syscall.Kill(processGroup, syscall.SIGTERM)
return pid, rc
}
// task is a specification for running a system process; it provides hooks for customizing
// logging and restart handling as well as provides event channels for communicating process
// termination and errors related to process management.
type Task struct {
Finished func(restarting bool) bool // callback invoked when a task process has completed; if `restarting` then it will be restarted if it returns true
RestartDelay time.Duration // interval between repeated task restarts
name string // required: unique name for this task
bin string // required: path to executable
args []string // optional: process arguments
env []string // optional: process environment override
createLogger func() io.WriteCloser // factory func that builds a log writer
cmd systemProcess // process that we started
completedCh chan *Completion // reports exit codes encountered when task processes exit, or errors during process management
shouldQuit chan struct{} // shouldQuit is closed to indicate that the task should stop its running process, if any
done chan struct{} // done closes when all processes related to the task have terminated
initialState taskStateFn // prepare and start a new live process, defaults to notStartedTask; should be set by run()
runLatch int32 // guard against multiple Task.run calls
}
// New builds a newly initialized task object but does not start any processes for it. callers
// are expected to invoke task.run(...) on their own.
func New(name, bin string, args, env []string, cl func() io.WriteCloser) *Task {
return &Task{
name: name,
bin: bin,
args: args,
env: env,
createLogger: cl,
completedCh: make(chan *Completion),
shouldQuit: make(chan struct{}),
done: make(chan struct{}),
RestartDelay: defaultTaskRestartDelay,
Finished: func(restarting bool) bool { return restarting },
}
}
// Start spawns a goroutine to execute the Task. Panics if invoked more than once.
func (t *Task) Start() {
go t.run(notStartedTask)
}
// run executes the state machine responsible for starting, monitoring, and possibly restarting
// a system process for the task. The initialState func is the entry point of the state machine.
// Upon returning the done and completedCh chans are all closed.
func (t *Task) run(initialState taskStateFn) {
if !atomic.CompareAndSwapInt32(&t.runLatch, 0, 1) {
panic("Task.run() may only be invoked once")
}
t.initialState = initialState
defer close(t.done)
defer close(t.completedCh)
state := initialState
for state != nil {
next := state(t)
state = next
}
}
func (t *Task) tryComplete(tc *Completion) {
select {
case <-t.shouldQuit:
// best effort
select {
case t.completedCh <- tc:
default:
}
case t.completedCh <- tc:
}
}
// tryError is a convenience func that invokes tryComplete with a completion error
func (t *Task) tryError(err error) {
t.tryComplete(&Completion{err: err})
}
type taskStateFn func(*Task) taskStateFn
func taskShouldRestart(t *Task) taskStateFn {
// make our best effort to stop here if signalled (shouldQuit). not doing so here
// could add cost later (a process might be launched).
// sleep for a bit; then return t.initialState
tm := time.NewTimer(t.RestartDelay)
defer tm.Stop()
select {
case <-tm.C:
select {
case <-t.shouldQuit:
default:
if t.Finished(true) {
select {
case <-t.shouldQuit:
// the world has changed, die
return nil
default:
}
return t.initialState
}
// finish call decided not to respawn, so die
return nil
}
case <-t.shouldQuit:
}
// we're quitting, tell the Finished callback and then die
t.Finished(false)
return nil
}
func (t *Task) initLogging(r io.Reader) {
writer := t.createLogger()
go func() {
defer writer.Close()
_, err := io.Copy(writer, r)
if err != nil && err != io.EOF {
// using tryComplete is racy because the state machine closes completedCh and
// so we don't want to attempt to write to a closed/closing chan. so
// just log this for now.
log.Errorf("logger for task %q crashed: %v", t.bin, err)
}
}()
}
// notStartedTask spawns the given task and transitions to a startedTask state
func notStartedTask(t *Task) taskStateFn {
log.Infof("starting task process %q with args '%+v'", t.bin, t.args)
// create command
cmd := exec.Command(t.bin, t.args...)
if _, err := cmd.StdoutPipe(); err != nil {
t.tryError(fmt.Errorf("error getting stdout of %v: %v", t.name, err))
return taskShouldRestart
}
stderrLogs, err := cmd.StderrPipe()
if err != nil {
t.tryError(fmt.Errorf("error getting stderr of %v: %v", t.name, err))
return taskShouldRestart
}
t.initLogging(stderrLogs)
if len(t.env) > 0 {
cmd.Env = t.env
}
cmd.SysProcAttr = &syscall.SysProcAttr{
Setpgid: true,
Pdeathsig: syscall.SIGKILL, // see cmdProcess.Kill
}
// last min check for shouldQuit here
select {
case <-t.shouldQuit:
t.tryError(fmt.Errorf("task execution canceled, aborting process launch"))
return taskShouldRestart
default:
}
if err := cmd.Start(); err != nil {
t.tryError(fmt.Errorf("failed to start task process %q: %v", t.bin, err))
return taskShouldRestart
}
log.Infoln("task started", t.name)
t.cmd = &cmdProcess{delegate: cmd}
return taskRunning
}
type exitError interface {
error
// see os.ProcessState.Sys: returned value can be converted to something like syscall.WaitStatus
Sys() interface{}
}
func taskRunning(t *Task) taskStateFn {
waiter := t.cmd.Wait
var sendOnce sync.Once
trySend := func(wr *Completion) {
// guarded with once because we're only allowed to send a single "result" for each
// process termination. for example, if Kill() results in an error because Wait()
// already completed we only want to return a single result for the process.
sendOnce.Do(func() {
t.tryComplete(wr)
})
}
// listen for normal process completion in a goroutine; don't block because we need to listen for shouldQuit
waitCh := make(chan *Completion, 1)
go func() {
wr := &Completion{}
defer func() {
waitCh <- wr
close(waitCh)
}()
if err := waiter(); err != nil {
if exitError, ok := err.(exitError); ok {
if waitStatus, ok := exitError.Sys().(syscall.WaitStatus); ok {
wr.name = t.name
wr.code = waitStatus.ExitStatus()
return
}
}
wr.err = fmt.Errorf("task wait ended strangely for %q: %v", t.bin, err)
} else {
wr.name = t.name
}
}()
// wait for the process to complete, or else for a "quit" signal on the task (at which point we'll attempt to kill manually)
select {
case <-t.shouldQuit:
// check for tie
select {
case wr := <-waitCh:
// we got a signal to quit, but we're already finished; attempt best effort delivery
trySend(wr)
default:
// Wait() has not exited yet, kill the process
log.Infof("killing %s : %s", t.name, t.bin)
pid, err := t.cmd.Kill()
if err != nil {
trySend(&Completion{err: fmt.Errorf("failed to kill process: %q pid %d: %v", t.bin, pid, err)})
}
// else, Wait() should complete and send a completion event
}
case wr := <-waitCh:
// task has completed before we were told to quit, pass along completion and error information
trySend(wr)
}
return taskShouldRestart
}
// forwardUntil forwards task process completion status and errors to the given output
// chans until either the task terminates or abort is closed.
func (t *Task) forwardUntil(tch chan<- *Completion, abort <-chan struct{}) {
// merge task completion and error until we're told to die, then
// tell the task to stop
defer close(t.shouldQuit)
forwardCompletionUntil(t.completedCh, tch, nil, abort, nil)
}
// MergeOutput waits for the given tasks to complete. meanwhile it logs each time a task
// process completes or generates an error. when shouldQuit closes, tasks are canceled and this
// func eventually returns once all ongoing event handlers have completed running.
func MergeOutput(tasks []*Task, shouldQuit <-chan struct{}) Events {
tc := make(chan *Completion)
var waitForTasks sync.WaitGroup
waitForTasks.Add(len(tasks))
for _, t := range tasks {
t := t
// translate task dead signal into Done
go func() {
<-t.done
waitForTasks.Done()
}()
// fan-in task completion and error events to tc, ec
go t.forwardUntil(tc, shouldQuit)
}
tclistener := make(chan *Completion)
done := runtime.After(func() {
completionFinished := runtime.After(func() {
defer close(tclistener)
forwardCompletionUntil(tc, tclistener, nil, shouldQuit, func(tt *Completion, shutdown bool) {
prefix := ""
if shutdown {
prefix = "(shutdown) "
}
log.Infof(prefix+"task %q exited with status %d", tt.name, tt.code)
})
})
waitForTasks.Wait()
close(tc)
<-completionFinished
})
ei := newEventsImpl(tclistener, done)
return ei
}
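
cmdProcess.Kill and notStartedTask together describe the intended shutdown ladder: the child runs in its own process group, graceful stop is a SIGTERM sent to that group (negative pid), and Pdeathsig has the kernel deliver SIGKILL if the supervisor itself dies first. A condensed, Linux-only sketch of that exec.Cmd setup (illustrative, not code from this PR):

package main

import (
	"log"
	"os/exec"
	"syscall"
)

func main() {
	cmd := exec.Command("/bin/sleep", "60") // illustrative child process
	cmd.SysProcAttr = &syscall.SysProcAttr{
		Setpgid:   true,            // child gets its own process group...
		Pdeathsig: syscall.SIGKILL, // ...and is killed if this process dies first (Linux only)
	}
	if err := cmd.Start(); err != nil {
		log.Fatalf("start failed: %v", err)
	}

	// graceful stop: signal the whole process group (negative pid) with SIGTERM,
	// then reap the child with Wait, as cmdProcess.Kill / taskRunning do above.
	if err := syscall.Kill(-cmd.Process.Pid, syscall.SIGTERM); err != nil {
		log.Printf("kill failed: %v", err)
	}
	if err := cmd.Wait(); err != nil {
		log.Printf("child exited: %v", err) // expect "signal: terminated"
	}
}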


@@ -0,0 +1,222 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package tasks
import (
"bytes"
"errors"
"fmt"
"io"
"sync"
"syscall"
"testing"
log "github.com/golang/glog"
)
type badWriteCloser struct {
err error
}
func (b *badWriteCloser) Write(_ []byte) (int, error) { return 0, b.err }
func (b *badWriteCloser) Close() error { return b.err }
type discardCloser int
func (d discardCloser) Write(b []byte) (int, error) { return len(b), nil }
func (d discardCloser) Close() error { return nil }
var devNull = func() io.WriteCloser { return discardCloser(0) }
type fakeExitError uint32
func (f fakeExitError) Sys() interface{} { return syscall.WaitStatus(f << 8) }
func (f fakeExitError) Error() string { return fmt.Sprintf("fake-exit-error: %d", f) }
type fakeProcess struct {
done chan struct{}
pid int
err error
}
func (f *fakeProcess) Wait() error {
<-f.done
return f.err
}
func (f *fakeProcess) Kill() (int, error) {
close(f.done)
return f.pid, f.err
}
func (f *fakeProcess) exit(code int) {
f.err = fakeExitError(code)
close(f.done)
}
func newFakeProcess() *fakeProcess {
return &fakeProcess{
done: make(chan struct{}),
}
}
func TestBadLogger(t *testing.T) {
err := errors.New("qux")
fp := newFakeProcess()
tt := New("foo", "bar", nil, nil, func() io.WriteCloser {
defer func() {
fp.pid = 123 // sanity check
fp.Kill() // this causes Wait() to return
}()
return &badWriteCloser{err}
})
tt.RestartDelay = 0 // don't slow the test down for no good reason
finishCalled := make(chan struct{})
tt.Finished = func(ok bool) bool {
log.Infof("tt.Finished: ok %t", ok)
if ok {
close(finishCalled)
}
return false // never respawn, this causes t.done to close
}
// abuse eventsImpl: we're not going to listen on the task completion or event chans,
// and we don't want to block the state machine, so discard all task events as they happen
ei := newEventsImpl(tt.completedCh, tt.done)
ei.Close()
go tt.run(func(_ *Task) taskStateFn {
log.Infof("tt initialized")
tt.initLogging(bytes.NewBuffer(([]byte)("unlogged bytes")))
tt.cmd = fp
return taskRunning
})
// if the logger fails the task will be killed
// badWriteCloser generates an error immediately and results in a task kill
<-finishCalled
<-tt.done
// this should never data race since the state machine is dead at this point
if fp.pid != 123 {
t.Fatalf("incorrect pid, expected 123 not %d", fp.pid)
}
// TODO(jdef) would be nice to check for a specific error that indicates the logger died
}
func TestMergeOutput(t *testing.T) {
var tasksStarted, tasksDone sync.WaitGroup
tasksDone.Add(2)
tasksStarted.Add(2)
t1 := New("foo", "", nil, nil, devNull)
t1exited := make(chan struct{})
t1.RestartDelay = 0 // don't slow the test down for no good reason
t1.Finished = func(ok bool) bool {
// we expect each of these cases to happen exactly once
if !ok {
tasksDone.Done()
} else {
close(t1exited)
}
return ok
}
go t1.run(func(t *Task) taskStateFn {
defer tasksStarted.Done()
t.initLogging(bytes.NewBuffer([]byte{}))
t.cmd = newFakeProcess()
return taskRunning
})
t2 := New("bar", "", nil, nil, devNull)
t2exited := make(chan struct{})
t2.RestartDelay = 0 // don't slow the test down for no good reason
t2.Finished = func(ok bool) bool {
// we expect each of these cases to happen exactly once
if !ok {
tasksDone.Done()
} else {
close(t2exited)
}
return ok
}
go t2.run(func(t *Task) taskStateFn {
defer tasksStarted.Done()
t.initLogging(bytes.NewBuffer([]byte{}))
t.cmd = newFakeProcess()
return taskRunning
})
shouldQuit := make(chan struct{})
te := MergeOutput([]*Task{t1, t2}, shouldQuit)
tasksStarted.Wait()
tasksStarted.Add(2) // recycle the barrier
// kill each task once, let it restart; make sure that we get the completion status?
t1.cmd.(*fakeProcess).exit(1)
t2.cmd.(*fakeProcess).exit(2)
codes := map[int]struct{}{}
for i := 0; i < 2; i++ {
switch tc := <-te.Completion(); tc.code {
case 1, 2:
codes[tc.code] = struct{}{}
default:
if tc.err != nil {
t.Errorf("unexpected task completion error: %v", tc.err)
} else {
t.Errorf("unexpected task completion code: %d", tc.code)
}
}
}
te.Close() // we're not going to read any other completion or error events
if len(codes) != 2 {
t.Fatalf("expected each task process to exit once")
}
// each task invokes Finished() once
<-t1exited
<-t2exited
log.Infoln("each task process has completed one round")
tasksStarted.Wait() // tasks will auto-restart their exited procs
// assert that the tasks are not dead; TODO(jdef) not sure that these checks are useful
select {
case <-t1.done:
t.Fatalf("t1 is unexpectedly dead")
default:
}
select {
case <-t2.done:
t.Fatalf("t2 is unexpectedly dead")
default:
}
log.Infoln("firing quit signal")
close(shouldQuit) // fire shouldQuit, and everything should terminate gracefully
log.Infoln("waiting for tasks to die")
tasksDone.Wait() // our tasks should die
log.Infoln("waiting for merge to complete")
<-te.Done() // wait for the merge to complete
}
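
A side note on the test fakes: fakeExitError.Sys() returns syscall.WaitStatus(f << 8) because the wait status keeps the exit code in bits 8-15, which is what WaitStatus.ExitStatus() in taskRunning reads back. A tiny Linux-only check of that round trip (a sketch, not part of this PR):

package main

import (
	"fmt"
	"syscall"
)

func main() {
	for _, code := range []uint32{0, 1, 2, 255} {
		ws := syscall.WaitStatus(code << 8) // same encoding fakeExitError.Sys() uses
		fmt.Printf("code %d -> Exited=%v ExitStatus=%d\n", code, ws.Exited(), ws.ExitStatus())
	}
}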