diff --git a/contrib/mesos/pkg/executor/executor.go b/contrib/mesos/pkg/executor/executor.go index 455c3927f5..a3ec909992 100644 --- a/contrib/mesos/pkg/executor/executor.go +++ b/contrib/mesos/pkg/executor/executor.go @@ -88,15 +88,6 @@ type kuberTask struct { podName string } -// func that attempts suicide -type jumper func(bindings.ExecutorDriver, <-chan struct{}) - -type suicideWatcher interface { - Next(time.Duration, bindings.ExecutorDriver, jumper) suicideWatcher - Reset(time.Duration) bool - Stop() bool -} - type podStatusFunc func() (*api.PodStatus, error) // KubeletInterface consists of the kubelet.Kubelet API's that we actually use @@ -338,38 +329,6 @@ func (k *KubernetesExecutor) LaunchTask(driver bindings.ExecutorDriver, taskInfo go k.launchTask(driver, taskId, pod) } -// TODO(jdef) add metrics for this? -type suicideTimer struct { - timer *time.Timer -} - -func (w *suicideTimer) Next(d time.Duration, driver bindings.ExecutorDriver, f jumper) suicideWatcher { - return &suicideTimer{ - timer: time.AfterFunc(d, func() { - log.Warningf("Suicide timeout (%v) expired", d) - f(driver, nil) - }), - } -} - -func (w *suicideTimer) Stop() (result bool) { - if w != nil && w.timer != nil { - log.Infoln("stopping suicide watch") //TODO(jdef) debug - result = w.timer.Stop() - } - return -} - -// return true if the timer was successfully reset -func (w *suicideTimer) Reset(d time.Duration) bool { - if w != nil && w.timer != nil { - log.Infoln("resetting suicide watch") //TODO(jdef) debug - w.timer.Reset(d) - return true - } - return false -} - // determine whether we need to start a suicide countdown. if so, then start // a timer that, upon expiration, causes this executor to commit suicide. // this implementation runs asynchronously. callers that wish to wait for the diff --git a/contrib/mesos/pkg/executor/executor_test.go b/contrib/mesos/pkg/executor/executor_test.go index 3a6dd5f068..965f7170af 100644 --- a/contrib/mesos/pkg/executor/executor_test.go +++ b/contrib/mesos/pkg/executor/executor_test.go @@ -45,185 +45,12 @@ import ( "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/watch" - "github.com/golang/glog" - bindings "github.com/mesos/mesos-go/executor" "github.com/mesos/mesos-go/mesosproto" "github.com/mesos/mesos-go/mesosutil" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" ) -type suicideTracker struct { - suicideWatcher - stops uint32 - resets uint32 - timers uint32 - jumps *uint32 -} - -func (t *suicideTracker) Reset(d time.Duration) bool { - defer func() { t.resets++ }() - return t.suicideWatcher.Reset(d) -} - -func (t *suicideTracker) Stop() bool { - defer func() { t.stops++ }() - return t.suicideWatcher.Stop() -} - -func (t *suicideTracker) Next(d time.Duration, driver bindings.ExecutorDriver, f jumper) suicideWatcher { - tracker := &suicideTracker{ - stops: t.stops, - resets: t.resets, - jumps: t.jumps, - timers: t.timers + 1, - } - jumper := tracker.makeJumper(f) - tracker.suicideWatcher = t.suicideWatcher.Next(d, driver, jumper) - return tracker -} - -func (t *suicideTracker) makeJumper(_ jumper) jumper { - return jumper(func(driver bindings.ExecutorDriver, cancel <-chan struct{}) { - glog.Warningln("jumping?!") - if t.jumps != nil { - atomic.AddUint32(t.jumps, 1) - } - }) -} - -func TestSuicide_zeroTimeout(t *testing.T) { - defer glog.Flush() - - k := New(Config{}) - tracker := &suicideTracker{suicideWatcher: k.suicideWatch} - k.suicideWatch = tracker - - ch := k.resetSuicideWatch(nil) - - select { - case <-ch: - case <-time.After(2 * time.Second): - t.Fatalf("timeout waiting for reset of suicide watch") - } - if tracker.stops != 0 { - t.Fatalf("expected no stops since suicideWatchTimeout was never set") - } - if tracker.resets != 0 { - t.Fatalf("expected no resets since suicideWatchTimeout was never set") - } - if tracker.timers != 0 { - t.Fatalf("expected no timers since suicideWatchTimeout was never set") - } -} - -func TestSuicide_WithTasks(t *testing.T) { - defer glog.Flush() - - k := New(Config{ - SuicideTimeout: 50 * time.Millisecond, - }) - - jumps := uint32(0) - tracker := &suicideTracker{suicideWatcher: k.suicideWatch, jumps: &jumps} - k.suicideWatch = tracker - - k.tasks["foo"] = &kuberTask{} // prevent suicide attempts from succeeding - - // call reset with a nil timer - glog.Infoln("resetting suicide watch with 1 task") - select { - case <-k.resetSuicideWatch(nil): - tracker = k.suicideWatch.(*suicideTracker) - if tracker.stops != 1 { - t.Fatalf("expected suicide attempt to Stop() since there are registered tasks") - } - if tracker.resets != 0 { - t.Fatalf("expected no resets since") - } - if tracker.timers != 0 { - t.Fatalf("expected no timers since") - } - case <-time.After(1 * time.Second): - t.Fatalf("initial suicide watch setup failed") - } - - delete(k.tasks, "foo") // zero remaining tasks - k.suicideTimeout = 1500 * time.Millisecond - suicideStart := time.Now() - - // reset the suicide watch, which should actually start a timer now - glog.Infoln("resetting suicide watch with 0 tasks") - select { - case <-k.resetSuicideWatch(nil): - tracker = k.suicideWatch.(*suicideTracker) - if tracker.stops != 1 { - t.Fatalf("did not expect suicide attempt to Stop() since there are no registered tasks") - } - if tracker.resets != 1 { - t.Fatalf("expected 1 resets instead of %d", tracker.resets) - } - if tracker.timers != 1 { - t.Fatalf("expected 1 timers instead of %d", tracker.timers) - } - case <-time.After(1 * time.Second): - t.Fatalf("2nd suicide watch setup failed") - } - - k.lock.Lock() - k.tasks["foo"] = &kuberTask{} // prevent suicide attempts from succeeding - k.lock.Unlock() - - // reset the suicide watch, which should stop the existing timer - glog.Infoln("resetting suicide watch with 1 task") - select { - case <-k.resetSuicideWatch(nil): - tracker = k.suicideWatch.(*suicideTracker) - if tracker.stops != 2 { - t.Fatalf("expected 2 stops instead of %d since there are registered tasks", tracker.stops) - } - if tracker.resets != 1 { - t.Fatalf("expected 1 resets instead of %d", tracker.resets) - } - if tracker.timers != 1 { - t.Fatalf("expected 1 timers instead of %d", tracker.timers) - } - case <-time.After(1 * time.Second): - t.Fatalf("3rd suicide watch setup failed") - } - - k.lock.Lock() - delete(k.tasks, "foo") // allow suicide attempts to schedule - k.lock.Unlock() - - // reset the suicide watch, which should reset a stopped timer - glog.Infoln("resetting suicide watch with 0 tasks") - select { - case <-k.resetSuicideWatch(nil): - tracker = k.suicideWatch.(*suicideTracker) - if tracker.stops != 2 { - t.Fatalf("expected 2 stops instead of %d since there are no registered tasks", tracker.stops) - } - if tracker.resets != 2 { - t.Fatalf("expected 2 resets instead of %d", tracker.resets) - } - if tracker.timers != 1 { - t.Fatalf("expected 1 timers instead of %d", tracker.timers) - } - case <-time.After(1 * time.Second): - t.Fatalf("4th suicide watch setup failed") - } - - sinceWatch := time.Since(suicideStart) - time.Sleep(3*time.Second - sinceWatch) // give the first timer to misfire (it shouldn't since Stop() was called) - - if j := atomic.LoadUint32(&jumps); j != 1 { - t.Fatalf("expected 1 jumps instead of %d since stop was called", j) - } else { - glog.Infoln("jumps verified") // glog so we get a timestamp - } -} - // TestExecutorRegister ensures that the executor thinks it is connected // after Register is called. func TestExecutorRegister(t *testing.T) { diff --git a/contrib/mesos/pkg/executor/suicide.go b/contrib/mesos/pkg/executor/suicide.go new file mode 100644 index 0000000000..40b597fd1c --- /dev/null +++ b/contrib/mesos/pkg/executor/suicide.go @@ -0,0 +1,65 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package executor + +import ( + "time" + + log "github.com/golang/glog" + bindings "github.com/mesos/mesos-go/executor" +) + +// func that attempts suicide +type jumper func(bindings.ExecutorDriver, <-chan struct{}) + +type suicideWatcher interface { + Next(time.Duration, bindings.ExecutorDriver, jumper) suicideWatcher + Reset(time.Duration) bool + Stop() bool +} + +// TODO(jdef) add metrics for this? +type suicideTimer struct { + timer *time.Timer +} + +func (w *suicideTimer) Next(d time.Duration, driver bindings.ExecutorDriver, f jumper) suicideWatcher { + return &suicideTimer{ + timer: time.AfterFunc(d, func() { + log.Warningf("Suicide timeout (%v) expired", d) + f(driver, nil) + }), + } +} + +func (w *suicideTimer) Stop() (result bool) { + if w != nil && w.timer != nil { + log.Infoln("stopping suicide watch") //TODO(jdef) debug + result = w.timer.Stop() + } + return +} + +// return true if the timer was successfully reset +func (w *suicideTimer) Reset(d time.Duration) bool { + if w != nil && w.timer != nil { + log.Infoln("resetting suicide watch") //TODO(jdef) debug + w.timer.Reset(d) + return true + } + return false +} diff --git a/contrib/mesos/pkg/executor/suicide_test.go b/contrib/mesos/pkg/executor/suicide_test.go new file mode 100644 index 0000000000..cdb99f584d --- /dev/null +++ b/contrib/mesos/pkg/executor/suicide_test.go @@ -0,0 +1,197 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package executor + +import ( + "sync/atomic" + "testing" + "time" + + "github.com/golang/glog" + bindings "github.com/mesos/mesos-go/executor" +) + +type suicideTracker struct { + suicideWatcher + stops uint32 + resets uint32 + timers uint32 + jumps *uint32 +} + +func (t *suicideTracker) Reset(d time.Duration) bool { + defer func() { t.resets++ }() + return t.suicideWatcher.Reset(d) +} + +func (t *suicideTracker) Stop() bool { + defer func() { t.stops++ }() + return t.suicideWatcher.Stop() +} + +func (t *suicideTracker) Next(d time.Duration, driver bindings.ExecutorDriver, f jumper) suicideWatcher { + tracker := &suicideTracker{ + stops: t.stops, + resets: t.resets, + jumps: t.jumps, + timers: t.timers + 1, + } + jumper := tracker.makeJumper(f) + tracker.suicideWatcher = t.suicideWatcher.Next(d, driver, jumper) + return tracker +} + +func (t *suicideTracker) makeJumper(_ jumper) jumper { + return jumper(func(driver bindings.ExecutorDriver, cancel <-chan struct{}) { + glog.Warningln("jumping?!") + if t.jumps != nil { + atomic.AddUint32(t.jumps, 1) + } + }) +} + +func TestSuicide_zeroTimeout(t *testing.T) { + defer glog.Flush() + + k := New(Config{}) + tracker := &suicideTracker{suicideWatcher: k.suicideWatch} + k.suicideWatch = tracker + + ch := k.resetSuicideWatch(nil) + + select { + case <-ch: + case <-time.After(2 * time.Second): + t.Fatalf("timeout waiting for reset of suicide watch") + } + if tracker.stops != 0 { + t.Fatalf("expected no stops since suicideWatchTimeout was never set") + } + if tracker.resets != 0 { + t.Fatalf("expected no resets since suicideWatchTimeout was never set") + } + if tracker.timers != 0 { + t.Fatalf("expected no timers since suicideWatchTimeout was never set") + } +} + +func TestSuicide_WithTasks(t *testing.T) { + defer glog.Flush() + + k := New(Config{ + SuicideTimeout: 50 * time.Millisecond, + }) + + jumps := uint32(0) + tracker := &suicideTracker{suicideWatcher: k.suicideWatch, jumps: &jumps} + k.suicideWatch = tracker + + k.tasks["foo"] = &kuberTask{} // prevent suicide attempts from succeeding + + // call reset with a nil timer + glog.Infoln("resetting suicide watch with 1 task") + select { + case <-k.resetSuicideWatch(nil): + tracker = k.suicideWatch.(*suicideTracker) + if tracker.stops != 1 { + t.Fatalf("expected suicide attempt to Stop() since there are registered tasks") + } + if tracker.resets != 0 { + t.Fatalf("expected no resets since") + } + if tracker.timers != 0 { + t.Fatalf("expected no timers since") + } + case <-time.After(1 * time.Second): + t.Fatalf("initial suicide watch setup failed") + } + + delete(k.tasks, "foo") // zero remaining tasks + k.suicideTimeout = 1500 * time.Millisecond + suicideStart := time.Now() + + // reset the suicide watch, which should actually start a timer now + glog.Infoln("resetting suicide watch with 0 tasks") + select { + case <-k.resetSuicideWatch(nil): + tracker = k.suicideWatch.(*suicideTracker) + if tracker.stops != 1 { + t.Fatalf("did not expect suicide attempt to Stop() since there are no registered tasks") + } + if tracker.resets != 1 { + t.Fatalf("expected 1 resets instead of %d", tracker.resets) + } + if tracker.timers != 1 { + t.Fatalf("expected 1 timers instead of %d", tracker.timers) + } + case <-time.After(1 * time.Second): + t.Fatalf("2nd suicide watch setup failed") + } + + k.lock.Lock() + k.tasks["foo"] = &kuberTask{} // prevent suicide attempts from succeeding + k.lock.Unlock() + + // reset the suicide watch, which should stop the existing timer + glog.Infoln("resetting suicide watch with 1 task") + select { + case <-k.resetSuicideWatch(nil): + tracker = k.suicideWatch.(*suicideTracker) + if tracker.stops != 2 { + t.Fatalf("expected 2 stops instead of %d since there are registered tasks", tracker.stops) + } + if tracker.resets != 1 { + t.Fatalf("expected 1 resets instead of %d", tracker.resets) + } + if tracker.timers != 1 { + t.Fatalf("expected 1 timers instead of %d", tracker.timers) + } + case <-time.After(1 * time.Second): + t.Fatalf("3rd suicide watch setup failed") + } + + k.lock.Lock() + delete(k.tasks, "foo") // allow suicide attempts to schedule + k.lock.Unlock() + + // reset the suicide watch, which should reset a stopped timer + glog.Infoln("resetting suicide watch with 0 tasks") + select { + case <-k.resetSuicideWatch(nil): + tracker = k.suicideWatch.(*suicideTracker) + if tracker.stops != 2 { + t.Fatalf("expected 2 stops instead of %d since there are no registered tasks", tracker.stops) + } + if tracker.resets != 2 { + t.Fatalf("expected 2 resets instead of %d", tracker.resets) + } + if tracker.timers != 1 { + t.Fatalf("expected 1 timers instead of %d", tracker.timers) + } + case <-time.After(1 * time.Second): + t.Fatalf("4th suicide watch setup failed") + } + + sinceWatch := time.Since(suicideStart) + time.Sleep(3*time.Second - sinceWatch) // give the first timer to misfire (it shouldn't since Stop() was called) + + if j := atomic.LoadUint32(&jumps); j != 1 { + t.Fatalf("expected 1 jumps instead of %d since stop was called", j) + } else { + glog.Infoln("jumps verified") // glog so we get a timestamp + } +}