diff --git a/contrib/mesos/pkg/minion/tasks/task.go b/contrib/mesos/pkg/minion/tasks/task.go index 1203bdcba9..c45ccb62ad 100644 --- a/contrib/mesos/pkg/minion/tasks/task.go +++ b/contrib/mesos/pkg/minion/tasks/task.go @@ -281,7 +281,7 @@ func taskRunning(t *Task) taskStateFn { select { case <-t.shouldQuit: - t.tryComplete(t.awaitDeath(defaultKillGracePeriod, waitCh)) + t.tryComplete(t.awaitDeath(&realTimer{}, defaultKillGracePeriod, waitCh)) case wr := <-waitCh: t.tryComplete(wr) } @@ -290,7 +290,9 @@ func taskRunning(t *Task) taskStateFn { // awaitDeath waits for the process to complete, or else for a "quit" signal on the task- // at which point we'll attempt to kill manually. -func (t *Task) awaitDeath(gracePeriod time.Duration, waitCh <-chan *Completion) *Completion { +func (t *Task) awaitDeath(timer timer, gracePeriod time.Duration, waitCh <-chan *Completion) *Completion { + defer timer.discard() + select { case wr := <-waitCh: // got a signal to quit, but we're already finished @@ -318,10 +320,11 @@ waitLoop: } // Wait for the kill to be processed, and child proc resources cleaned up; try to avoid zombies! + timer.set(gracePeriod) select { case wr = <-waitCh: break waitLoop - case <-time.After(gracePeriod): + case <-timer.await(): // want a timeout, but a shorter one than we used initially. // using /= 2 is deterministic and yields the desirable effect. gracePeriod /= 2 diff --git a/contrib/mesos/pkg/minion/tasks/task_test.go b/contrib/mesos/pkg/minion/tasks/task_test.go index a9d1803c33..a5717f391a 100644 --- a/contrib/mesos/pkg/minion/tasks/task_test.go +++ b/contrib/mesos/pkg/minion/tasks/task_test.go @@ -24,6 +24,7 @@ import ( "sync" "syscall" "testing" + "time" log "github.com/golang/glog" "github.com/stretchr/testify/assert" @@ -222,15 +223,28 @@ func TestMergeOutput(t *testing.T) { <-te.Done() // wait for the merge to complete } +type fakeTimer struct { + ch chan time.Time +} + +func (t *fakeTimer) set(d time.Duration) {} +func (t *fakeTimer) discard() {} +func (t *fakeTimer) await() <-chan time.Time { return t.ch } +func (t *fakeTimer) expire() { t.ch = make(chan time.Time); close(t.ch) } +func (t *fakeTimer) reset() { t.ch = nil } + func TestAfterDeath(t *testing.T) { // test kill escalation since that's not covered by other unit tests t1 := New("foo", "", nil, nil, devNull) kills := 0 waitCh := make(chan *Completion, 1) + timer := &fakeTimer{} + timer.expire() t1.killFunc = func(force bool) (int, error) { // > 0 is intentional, multiple calls to close() should panic if kills > 0 { assert.True(t, force) + timer.reset() // don't want to race w/ waitCh waitCh <- &Completion{name: t1.name, code: 123} close(waitCh) } else { @@ -239,7 +253,7 @@ func TestAfterDeath(t *testing.T) { kills++ return 0, nil } - wr := t1.awaitDeath(0, waitCh) + wr := t1.awaitDeath(timer, 0, waitCh) assert.Equal(t, "foo", wr.name) assert.Equal(t, 123, wr.code) assert.NoError(t, wr.err) @@ -252,7 +266,9 @@ func TestAfterDeath(t *testing.T) { t.Fatalf("should not attempt to kill a task that has already reported completion") return 0, nil } - wr = t1.awaitDeath(0, waitCh) + + timer.reset() // don't race w/ waitCh + wr = t1.awaitDeath(timer, 0, waitCh) assert.Equal(t, 456, wr.code) assert.NoError(t, wr.err) @@ -270,7 +286,8 @@ func TestAfterDeath(t *testing.T) { kills++ return 0, nil } - wr = t1.awaitDeath(0, nil) + timer.expire() + wr = t1.awaitDeath(timer, 0, nil) assert.Equal(t, "foo", wr.name) assert.Error(t, wr.err) @@ -287,7 +304,8 @@ func TestAfterDeath(t *testing.T) { kills++ return 0, killFailed } - wr = t1.awaitDeath(0, nil) + timer.expire() + wr = t1.awaitDeath(timer, 0, nil) assert.Equal(t, "foo", wr.name) assert.Error(t, wr.err) } diff --git a/contrib/mesos/pkg/minion/tasks/timer.go b/contrib/mesos/pkg/minion/tasks/timer.go new file mode 100644 index 0000000000..78595763a1 --- /dev/null +++ b/contrib/mesos/pkg/minion/tasks/timer.go @@ -0,0 +1,52 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package tasks + +import ( + "time" +) + +type timer interface { + set(time.Duration) + discard() + await() <-chan time.Time +} + +type realTimer struct { + *time.Timer +} + +func (t *realTimer) set(d time.Duration) { + if t.Timer == nil { + t.Timer = time.NewTimer(d) + } else { + t.Reset(d) + } +} + +func (t *realTimer) await() <-chan time.Time { + if t.Timer == nil { + return nil + } + return t.C +} + +func (t *realTimer) discard() { + if t.Timer != nil { + t.Stop() + } +}