Move suicide logic into its own file

pull/6/head
Dr. Stefan Schimanski 2015-09-17 17:59:48 +02:00
parent 83fc0b783f
commit a9d7acea39
4 changed files with 262 additions and 214 deletions

View File

@ -88,15 +88,6 @@ type kuberTask struct {
podName string
}
// jumper is a func that attempts suicide. It is handed the executor driver and
// a receive-only channel; suicideTimer.Next passes a nil channel when it
// invokes the jumper after the timeout expires.
type jumper func(bindings.ExecutorDriver, <-chan struct{})
// suicideWatcher manages a countdown after which a jumper is invoked.
// The method notes below describe the suicideTimer implementation.
type suicideWatcher interface {
// Next returns a watcher whose countdown runs for the given duration and
// then calls the jumper with the given driver.
Next(time.Duration, bindings.ExecutorDriver, jumper) suicideWatcher
// Reset re-arms the countdown with the given duration and reports whether
// there was a timer to re-arm.
Reset(time.Duration) bool
// Stop halts the countdown and reports the result of stopping the timer
// (false when there is no timer).
Stop() bool
}
// podStatusFunc is a no-argument func that yields a pod status or an error.
type podStatusFunc func() (*api.PodStatus, error)
// KubeletInterface consists of the kubelet.Kubelet API's that we actually use
@ -338,38 +329,6 @@ func (k *KubernetesExecutor) LaunchTask(driver bindings.ExecutorDriver, taskInfo
go k.launchTask(driver, taskId, pod)
}
// TODO(jdef) add metrics for this?

// suicideTimer is a suicideWatcher backed by a time.Timer.
type suicideTimer struct {
// timer is set by Next on the watcher it returns; Stop and Reset do
// nothing and report false while it is nil.
timer *time.Timer
}
// Next returns a new suicideTimer whose timer fires after d. When it fires, a
// warning is logged and f is called with the driver and a nil channel. The
// receiver itself is not read or modified.
func (w *suicideTimer) Next(d time.Duration, driver bindings.ExecutorDriver, f jumper) suicideWatcher {
	onExpire := func() {
		log.Warningf("Suicide timeout (%v) expired", d)
		f(driver, nil)
	}
	return &suicideTimer{timer: time.AfterFunc(d, onExpire)}
}
// Stop halts the underlying timer and returns what time.Timer.Stop reports.
// A nil receiver or a nil timer is tolerated and yields false.
func (w *suicideTimer) Stop() (result bool) {
	if w == nil || w.timer == nil {
		return false
	}
	log.Infoln("stopping suicide watch") //TODO(jdef) debug
	return w.timer.Stop()
}
// Reset re-arms the underlying timer to fire after d. It returns true when a
// timer existed and was re-armed, and false when the receiver or its timer is
// nil.
func (w *suicideTimer) Reset(d time.Duration) bool {
	if w == nil || w.timer == nil {
		return false
	}
	log.Infoln("resetting suicide watch") //TODO(jdef) debug
	w.timer.Reset(d)
	return true
}
// determine whether we need to start a suicide countdown. if so, then start
// a timer that, upon expiration, causes this executor to commit suicide.
// this implementation runs asynchronously. callers that wish to wait for the

View File

@ -45,185 +45,12 @@ import (
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/watch"
"github.com/golang/glog"
bindings "github.com/mesos/mesos-go/executor"
"github.com/mesos/mesos-go/mesosproto"
"github.com/mesos/mesos-go/mesosutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
)
// suicideTracker wraps a suicideWatcher and counts the calls made through it
// so that tests can assert on them. Next copies the counters into the tracker
// it returns.
type suicideTracker struct {
suicideWatcher
stops uint32 // incremented by every Stop call
resets uint32 // incremented by every Reset call
timers uint32 // incremented by Next in the tracker it returns
jumps *uint32 // optional shared counter, incremented atomically by the jumper from makeJumper
}
// Reset forwards to the wrapped watcher and records the call. The counter is
// bumped in a defer, so it is updated only after the wrapped Reset returns.
func (t *suicideTracker) Reset(d time.Duration) bool {
	defer func() {
		t.resets++
	}()
	wasReset := t.suicideWatcher.Reset(d)
	return wasReset
}
// Stop forwards to the wrapped watcher and records the call. The counter is
// bumped in a defer, so it is updated only after the wrapped Stop returns.
func (t *suicideTracker) Stop() bool {
	defer func() {
		t.stops++
	}()
	wasStopped := t.suicideWatcher.Stop()
	return wasStopped
}
// Next builds a successor tracker that inherits the current counters with
// timers incremented by one, then wraps the watcher returned by the underlying
// Next. The underlying watcher is given the tracking jumper from makeJumper
// rather than f itself.
func (t *suicideTracker) Next(d time.Duration, driver bindings.ExecutorDriver, f jumper) suicideWatcher {
	next := &suicideTracker{
		stops:  t.stops,
		resets: t.resets,
		timers: t.timers + 1,
		jumps:  t.jumps,
	}
	next.suicideWatcher = t.suicideWatcher.Next(d, driver, next.makeJumper(f))
	return next
}
// makeJumper returns a jumper that only logs and, when a jumps counter is
// present, atomically increments it. The jumper passed in is deliberately
// ignored, as are the arguments the returned jumper receives.
func (t *suicideTracker) makeJumper(_ jumper) jumper {
	return func(_ bindings.ExecutorDriver, _ <-chan struct{}) {
		glog.Warningln("jumping?!")
		if t.jumps == nil {
			return
		}
		atomic.AddUint32(t.jumps, 1)
	}
}
// TestSuicide_zeroTimeout resets the suicide watch on an executor built from
// an empty Config and checks that the tracker saw no Stop, Reset or Next calls.
func TestSuicide_zeroTimeout(t *testing.T) {
	defer glog.Flush()

	k := New(Config{})
	tracker := &suicideTracker{suicideWatcher: k.suicideWatch}
	k.suicideWatch = tracker

	// wait for the reset to signal completion, but not forever
	select {
	case <-k.resetSuicideWatch(nil):
	case <-time.After(2 * time.Second):
		t.Fatalf("timeout waiting for reset of suicide watch")
	}

	// Fatalf ends the test, so only the first non-zero counter is reported
	switch {
	case tracker.stops != 0:
		t.Fatalf("expected no stops since suicideWatchTimeout was never set")
	case tracker.resets != 0:
		t.Fatalf("expected no resets since suicideWatchTimeout was never set")
	case tracker.timers != 0:
		t.Fatalf("expected no timers since suicideWatchTimeout was never set")
	}
}
// TestSuicide_WithTasks walks resetSuicideWatch through four phases that
// alternate between one registered task and none, checking the tracker's
// stop/reset/timer counters after each phase. It then waits until 3s have
// passed since the first zero-task phase began and verifies that exactly one
// jump was recorded.
func TestSuicide_WithTasks(t *testing.T) {
	defer glog.Flush()

	k := New(Config{
		SuicideTimeout: 50 * time.Millisecond,
	})

	jumps := uint32(0)
	tracker := &suicideTracker{suicideWatcher: k.suicideWatch, jumps: &jumps}
	k.suicideWatch = tracker

	// NOTE(review): tasks is mutated without k.lock here and in phase 2, unlike
	// phases 3 and 4 — confirm nothing else can touch the map at these points.
	k.tasks["foo"] = &kuberTask{} // prevent suicide attempts from succeeding

	// phase 1: call reset with a nil timer
	glog.Infoln("resetting suicide watch with 1 task")
	select {
	case <-k.resetSuicideWatch(nil):
		// Re-read the watcher after every reset: suicideTracker.Next hands back
		// a fresh tracker, so k.suicideWatch may no longer be the one installed.
		tracker = k.suicideWatch.(*suicideTracker)
		if tracker.stops != 1 {
			t.Fatalf("expected suicide attempt to Stop() since there are registered tasks")
		}
		if tracker.resets != 0 {
			t.Fatalf("expected 0 resets instead of %d", tracker.resets)
		}
		if tracker.timers != 0 {
			t.Fatalf("expected 0 timers instead of %d", tracker.timers)
		}
	case <-time.After(1 * time.Second):
		t.Fatalf("initial suicide watch setup failed")
	}

	delete(k.tasks, "foo") // zero remaining tasks
	k.suicideTimeout = 1500 * time.Millisecond
	suicideStart := time.Now()

	// phase 2: reset the suicide watch, which should actually start a timer now
	glog.Infoln("resetting suicide watch with 0 tasks")
	select {
	case <-k.resetSuicideWatch(nil):
		tracker = k.suicideWatch.(*suicideTracker)
		// stops must be unchanged from phase 1
		if tracker.stops != 1 {
			t.Fatalf("did not expect suicide attempt to Stop() since there are no registered tasks")
		}
		if tracker.resets != 1 {
			t.Fatalf("expected 1 resets instead of %d", tracker.resets)
		}
		if tracker.timers != 1 {
			t.Fatalf("expected 1 timers instead of %d", tracker.timers)
		}
	case <-time.After(1 * time.Second):
		t.Fatalf("2nd suicide watch setup failed")
	}

	k.lock.Lock()
	k.tasks["foo"] = &kuberTask{} // prevent suicide attempts from succeeding
	k.lock.Unlock()

	// phase 3: reset the suicide watch, which should stop the existing timer
	glog.Infoln("resetting suicide watch with 1 task")
	select {
	case <-k.resetSuicideWatch(nil):
		tracker = k.suicideWatch.(*suicideTracker)
		if tracker.stops != 2 {
			t.Fatalf("expected 2 stops instead of %d since there are registered tasks", tracker.stops)
		}
		if tracker.resets != 1 {
			t.Fatalf("expected 1 resets instead of %d", tracker.resets)
		}
		if tracker.timers != 1 {
			t.Fatalf("expected 1 timers instead of %d", tracker.timers)
		}
	case <-time.After(1 * time.Second):
		t.Fatalf("3rd suicide watch setup failed")
	}

	k.lock.Lock()
	delete(k.tasks, "foo") // allow suicide attempts to schedule
	k.lock.Unlock()

	// phase 4: reset the suicide watch, which should reset a stopped timer
	glog.Infoln("resetting suicide watch with 0 tasks")
	select {
	case <-k.resetSuicideWatch(nil):
		tracker = k.suicideWatch.(*suicideTracker)
		if tracker.stops != 2 {
			t.Fatalf("expected 2 stops instead of %d since there are no registered tasks", tracker.stops)
		}
		if tracker.resets != 2 {
			t.Fatalf("expected 2 resets instead of %d", tracker.resets)
		}
		if tracker.timers != 1 {
			t.Fatalf("expected 1 timers instead of %d", tracker.timers)
		}
	case <-time.After(1 * time.Second):
		t.Fatalf("4th suicide watch setup failed")
	}

	sinceWatch := time.Since(suicideStart)
	time.Sleep(3*time.Second - sinceWatch) // give the first timer to misfire (it shouldn't since Stop() was called)

	// presumably the single expected jump comes from the timer re-armed in
	// phase 4 — confirm against resetSuicideWatch
	if j := atomic.LoadUint32(&jumps); j != 1 {
		t.Fatalf("expected 1 jumps instead of %d since stop was called", j)
	}
	glog.Infoln("jumps verified") // glog so we get a timestamp
}
// TestExecutorRegister ensures that the executor thinks it is connected
// after Register is called.
func TestExecutorRegister(t *testing.T) {

View File

@ -0,0 +1,65 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package executor
import (
"time"
log "github.com/golang/glog"
bindings "github.com/mesos/mesos-go/executor"
)
// jumper is a func that attempts suicide. It is handed the executor driver and
// a receive-only channel; suicideTimer.Next passes a nil channel when it
// invokes the jumper after the timeout expires.
type jumper func(bindings.ExecutorDriver, <-chan struct{})
// suicideWatcher manages a countdown after which a jumper is invoked.
// The method notes below describe the suicideTimer implementation.
type suicideWatcher interface {
// Next returns a watcher whose countdown runs for the given duration and
// then calls the jumper with the given driver.
Next(time.Duration, bindings.ExecutorDriver, jumper) suicideWatcher
// Reset re-arms the countdown with the given duration and reports whether
// there was a timer to re-arm.
Reset(time.Duration) bool
// Stop halts the countdown and reports the result of stopping the timer
// (false when there is no timer).
Stop() bool
}
// TODO(jdef) add metrics for this?

// suicideTimer is a suicideWatcher backed by a time.Timer.
type suicideTimer struct {
// timer is set by Next on the watcher it returns; Stop and Reset do
// nothing and report false while it is nil.
timer *time.Timer
}
// Next returns a new suicideTimer whose timer fires after d. When it fires, a
// warning is logged and f is called with the driver and a nil channel. The
// receiver itself is not read or modified.
func (w *suicideTimer) Next(d time.Duration, driver bindings.ExecutorDriver, f jumper) suicideWatcher {
	onExpire := func() {
		log.Warningf("Suicide timeout (%v) expired", d)
		f(driver, nil)
	}
	return &suicideTimer{timer: time.AfterFunc(d, onExpire)}
}
// Stop halts the underlying timer and returns what time.Timer.Stop reports.
// A nil receiver or a nil timer is tolerated and yields false.
func (w *suicideTimer) Stop() (result bool) {
	if w == nil || w.timer == nil {
		return false
	}
	log.Infoln("stopping suicide watch") //TODO(jdef) debug
	return w.timer.Stop()
}
// Reset re-arms the underlying timer to fire after d. It returns true when a
// timer existed and was re-armed, and false when the receiver or its timer is
// nil.
func (w *suicideTimer) Reset(d time.Duration) bool {
	if w == nil || w.timer == nil {
		return false
	}
	log.Infoln("resetting suicide watch") //TODO(jdef) debug
	w.timer.Reset(d)
	return true
}

View File

@ -0,0 +1,197 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package executor
import (
"sync/atomic"
"testing"
"time"
"github.com/golang/glog"
bindings "github.com/mesos/mesos-go/executor"
)
// suicideTracker wraps a suicideWatcher and counts the calls made through it
// so that tests can assert on them. Next copies the counters into the tracker
// it returns.
type suicideTracker struct {
suicideWatcher
stops uint32 // incremented by every Stop call
resets uint32 // incremented by every Reset call
timers uint32 // incremented by Next in the tracker it returns
jumps *uint32 // optional shared counter, incremented atomically by the jumper from makeJumper
}
// Reset forwards to the wrapped watcher and records the call. The counter is
// bumped in a defer, so it is updated only after the wrapped Reset returns.
func (t *suicideTracker) Reset(d time.Duration) bool {
	defer func() {
		t.resets++
	}()
	wasReset := t.suicideWatcher.Reset(d)
	return wasReset
}
// Stop forwards to the wrapped watcher and records the call. The counter is
// bumped in a defer, so it is updated only after the wrapped Stop returns.
func (t *suicideTracker) Stop() bool {
	defer func() {
		t.stops++
	}()
	wasStopped := t.suicideWatcher.Stop()
	return wasStopped
}
// Next builds a successor tracker that inherits the current counters with
// timers incremented by one, then wraps the watcher returned by the underlying
// Next. The underlying watcher is given the tracking jumper from makeJumper
// rather than f itself.
func (t *suicideTracker) Next(d time.Duration, driver bindings.ExecutorDriver, f jumper) suicideWatcher {
	next := &suicideTracker{
		stops:  t.stops,
		resets: t.resets,
		timers: t.timers + 1,
		jumps:  t.jumps,
	}
	next.suicideWatcher = t.suicideWatcher.Next(d, driver, next.makeJumper(f))
	return next
}
// makeJumper returns a jumper that only logs and, when a jumps counter is
// present, atomically increments it. The jumper passed in is deliberately
// ignored, as are the arguments the returned jumper receives.
func (t *suicideTracker) makeJumper(_ jumper) jumper {
	return func(_ bindings.ExecutorDriver, _ <-chan struct{}) {
		glog.Warningln("jumping?!")
		if t.jumps == nil {
			return
		}
		atomic.AddUint32(t.jumps, 1)
	}
}
// TestSuicide_zeroTimeout resets the suicide watch on an executor built from
// an empty Config and checks that the tracker saw no Stop, Reset or Next calls.
func TestSuicide_zeroTimeout(t *testing.T) {
	defer glog.Flush()

	k := New(Config{})
	tracker := &suicideTracker{suicideWatcher: k.suicideWatch}
	k.suicideWatch = tracker

	// wait for the reset to signal completion, but not forever
	select {
	case <-k.resetSuicideWatch(nil):
	case <-time.After(2 * time.Second):
		t.Fatalf("timeout waiting for reset of suicide watch")
	}

	// Fatalf ends the test, so only the first non-zero counter is reported
	switch {
	case tracker.stops != 0:
		t.Fatalf("expected no stops since suicideWatchTimeout was never set")
	case tracker.resets != 0:
		t.Fatalf("expected no resets since suicideWatchTimeout was never set")
	case tracker.timers != 0:
		t.Fatalf("expected no timers since suicideWatchTimeout was never set")
	}
}
// TestSuicide_WithTasks walks resetSuicideWatch through four phases that
// alternate between one registered task and none, checking the tracker's
// stop/reset/timer counters after each phase. It then waits until 3s have
// passed since the first zero-task phase began and verifies that exactly one
// jump was recorded.
func TestSuicide_WithTasks(t *testing.T) {
	defer glog.Flush()

	k := New(Config{
		SuicideTimeout: 50 * time.Millisecond,
	})

	jumps := uint32(0)
	tracker := &suicideTracker{suicideWatcher: k.suicideWatch, jumps: &jumps}
	k.suicideWatch = tracker

	// NOTE(review): tasks is mutated without k.lock here and in phase 2, unlike
	// phases 3 and 4 — confirm nothing else can touch the map at these points.
	k.tasks["foo"] = &kuberTask{} // prevent suicide attempts from succeeding

	// phase 1: call reset with a nil timer
	glog.Infoln("resetting suicide watch with 1 task")
	select {
	case <-k.resetSuicideWatch(nil):
		// Re-read the watcher after every reset: suicideTracker.Next hands back
		// a fresh tracker, so k.suicideWatch may no longer be the one installed.
		tracker = k.suicideWatch.(*suicideTracker)
		if tracker.stops != 1 {
			t.Fatalf("expected suicide attempt to Stop() since there are registered tasks")
		}
		if tracker.resets != 0 {
			t.Fatalf("expected 0 resets instead of %d", tracker.resets)
		}
		if tracker.timers != 0 {
			t.Fatalf("expected 0 timers instead of %d", tracker.timers)
		}
	case <-time.After(1 * time.Second):
		t.Fatalf("initial suicide watch setup failed")
	}

	delete(k.tasks, "foo") // zero remaining tasks
	k.suicideTimeout = 1500 * time.Millisecond
	suicideStart := time.Now()

	// phase 2: reset the suicide watch, which should actually start a timer now
	glog.Infoln("resetting suicide watch with 0 tasks")
	select {
	case <-k.resetSuicideWatch(nil):
		tracker = k.suicideWatch.(*suicideTracker)
		// stops must be unchanged from phase 1
		if tracker.stops != 1 {
			t.Fatalf("did not expect suicide attempt to Stop() since there are no registered tasks")
		}
		if tracker.resets != 1 {
			t.Fatalf("expected 1 resets instead of %d", tracker.resets)
		}
		if tracker.timers != 1 {
			t.Fatalf("expected 1 timers instead of %d", tracker.timers)
		}
	case <-time.After(1 * time.Second):
		t.Fatalf("2nd suicide watch setup failed")
	}

	k.lock.Lock()
	k.tasks["foo"] = &kuberTask{} // prevent suicide attempts from succeeding
	k.lock.Unlock()

	// phase 3: reset the suicide watch, which should stop the existing timer
	glog.Infoln("resetting suicide watch with 1 task")
	select {
	case <-k.resetSuicideWatch(nil):
		tracker = k.suicideWatch.(*suicideTracker)
		if tracker.stops != 2 {
			t.Fatalf("expected 2 stops instead of %d since there are registered tasks", tracker.stops)
		}
		if tracker.resets != 1 {
			t.Fatalf("expected 1 resets instead of %d", tracker.resets)
		}
		if tracker.timers != 1 {
			t.Fatalf("expected 1 timers instead of %d", tracker.timers)
		}
	case <-time.After(1 * time.Second):
		t.Fatalf("3rd suicide watch setup failed")
	}

	k.lock.Lock()
	delete(k.tasks, "foo") // allow suicide attempts to schedule
	k.lock.Unlock()

	// phase 4: reset the suicide watch, which should reset a stopped timer
	glog.Infoln("resetting suicide watch with 0 tasks")
	select {
	case <-k.resetSuicideWatch(nil):
		tracker = k.suicideWatch.(*suicideTracker)
		if tracker.stops != 2 {
			t.Fatalf("expected 2 stops instead of %d since there are no registered tasks", tracker.stops)
		}
		if tracker.resets != 2 {
			t.Fatalf("expected 2 resets instead of %d", tracker.resets)
		}
		if tracker.timers != 1 {
			t.Fatalf("expected 1 timers instead of %d", tracker.timers)
		}
	case <-time.After(1 * time.Second):
		t.Fatalf("4th suicide watch setup failed")
	}

	sinceWatch := time.Since(suicideStart)
	time.Sleep(3*time.Second - sinceWatch) // give the first timer to misfire (it shouldn't since Stop() was called)

	// presumably the single expected jump comes from the timer re-armed in
	// phase 4 — confirm against resetSuicideWatch
	if j := atomic.LoadUint32(&jumps); j != 1 {
		t.Fatalf("expected 1 jumps instead of %d since stop was called", j)
	}
	glog.Infoln("jumps verified") // glog so we get a timestamp
}