- forward updated labels/annotations for downward API compat

- refactor queue.Pod construction to take functional options, privatize Pod fields
- refactor DelayFIFO and HistoricalFIFO to offer consistent, more useful Pop() funcs
- refactor pod update processing; long term we should somehow combine this with the special pod config source that we are using for mirror pods
- task launch timer cleanup
pull/6/head
James DeFelice 2015-11-29 19:34:27 +00:00
parent 5885728318
commit af95e3fe0e
14 changed files with 454 additions and 169 deletions

View File

@ -20,6 +20,7 @@ import (
"bytes" "bytes"
"encoding/json" "encoding/json"
"fmt" "fmt"
"math"
"strings" "strings"
"sync" "sync"
"sync/atomic" "sync/atomic"
@ -34,13 +35,12 @@ import (
"k8s.io/kubernetes/contrib/mesos/pkg/executor/messages" "k8s.io/kubernetes/contrib/mesos/pkg/executor/messages"
"k8s.io/kubernetes/contrib/mesos/pkg/node" "k8s.io/kubernetes/contrib/mesos/pkg/node"
"k8s.io/kubernetes/contrib/mesos/pkg/podutil" "k8s.io/kubernetes/contrib/mesos/pkg/podutil"
"k8s.io/kubernetes/contrib/mesos/pkg/runtime"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/executorinfo" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/executorinfo"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/meta" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/meta"
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
unversionedapi "k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/client/cache" "k8s.io/kubernetes/pkg/client/cache"
client "k8s.io/kubernetes/pkg/client/unversioned" client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/controller/framework"
"k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/dockertools" "k8s.io/kubernetes/pkg/kubelet/dockertools"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types" kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
@ -89,16 +89,12 @@ func (s *stateType) transitionTo(to stateType, unless ...stateType) bool {
type kuberTask struct { type kuberTask struct {
mesosTaskInfo *mesos.TaskInfo mesosTaskInfo *mesos.TaskInfo
podName string // empty until pod is sent to kubelet and registered in KubernetesExecutor.pods launchTimer *time.Timer // launchTimer expires when the launch-task process duration exceeds launchGracePeriod
podName string // empty until pod is sent to kubelet and registered in KubernetesExecutor.pods
} }
type podStatusFunc func() (*api.PodStatus, error) type podStatusFunc func() (*api.PodStatus, error)
type NodeInfo struct {
Cores int
Mem int64 // in bytes
}
// KubernetesExecutor is a mesos executor that runs pods // KubernetesExecutor is a mesos executor that runs pods
// in a minion machine. // in a minion machine.
type Executor struct { type Executor struct {
@ -118,9 +114,9 @@ type Executor struct {
exitFunc func(int) exitFunc func(int)
podStatusFunc func(*api.Pod) (*api.PodStatus, error) podStatusFunc func(*api.Pod) (*api.PodStatus, error)
staticPodsConfigPath string staticPodsConfigPath string
podController *framework.Controller
launchGracePeriod time.Duration launchGracePeriod time.Duration
nodeInfos chan<- NodeInfo nodeInfos chan<- NodeInfo
initCompleted chan struct{} // closes upon completion of Init()
} }
type Config struct { type Config struct {
@ -144,6 +140,13 @@ func (k *Executor) isConnected() bool {
// New creates a new kubernetes executor. // New creates a new kubernetes executor.
func New(config Config) *Executor { func New(config Config) *Executor {
launchGracePeriod := config.LaunchGracePeriod
if launchGracePeriod == 0 {
// this is the equivalent of saying "the timer never expires" and simplifies nil
// timer checks elsewhere in the code. it's a little hacky but less code to
// maintain than alternative approaches.
launchGracePeriod = time.Duration(math.MaxInt64)
}
k := &Executor{ k := &Executor{
updateChan: config.Updates, updateChan: config.Updates,
state: disconnectedState, state: disconnectedState,
@ -160,41 +163,22 @@ func New(config Config) *Executor {
exitFunc: config.ExitFunc, exitFunc: config.ExitFunc,
podStatusFunc: config.PodStatusFunc, podStatusFunc: config.PodStatusFunc,
staticPodsConfigPath: config.StaticPodsConfigPath, staticPodsConfigPath: config.StaticPodsConfigPath,
launchGracePeriod: config.LaunchGracePeriod, launchGracePeriod: launchGracePeriod,
nodeInfos: config.NodeInfos, nodeInfos: config.NodeInfos,
initCompleted: make(chan struct{}),
} }
runtime.On(k.initCompleted, k.runSendLoop)
// watch pods from the given pod ListWatch po := newPodObserver(config.PodLW, k.updateTask, k.terminate)
if config.PodLW == nil { runtime.On(k.initCompleted, po.run)
// fail early to make debugging easier
panic("cannot create executor with nil PodLW")
}
_, k.podController = framework.NewInformer(config.PodLW, &api.Pod{}, podRelistPeriod, &framework.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
pod := obj.(*api.Pod)
log.V(4).Infof("pod %s/%s created on apiserver", pod.Namespace, pod.Name)
k.handleChangedApiserverPod(pod)
},
UpdateFunc: func(oldObj, newObj interface{}) {
pod := newObj.(*api.Pod)
log.V(4).Infof("pod %s/%s updated on apiserver", pod.Namespace, pod.Name)
k.handleChangedApiserverPod(pod)
},
DeleteFunc: func(obj interface{}) {
pod := obj.(*api.Pod)
log.V(4).Infof("pod %s/%s deleted on apiserver", pod.Namespace, pod.Name)
},
})
return k return k
} }
func (k *Executor) Init(driver bindings.ExecutorDriver) { func (k *Executor) Init(driver bindings.ExecutorDriver) {
defer close(k.initCompleted)
k.killKubeletContainers() k.killKubeletContainers()
k.resetSuicideWatch(driver) k.resetSuicideWatch(driver)
go k.podController.Run(k.terminate)
go k.sendLoop()
//TODO(jdef) monitor kubeletFinished and shutdown if it happens //TODO(jdef) monitor kubeletFinished and shutdown if it happens
} }
@ -251,7 +235,7 @@ func (k *Executor) Registered(
} }
} }
annotations, err := executorInfoToAnnotations(executorInfo) annotations, err := annotationsFor(executorInfo)
if err != nil { if err != nil {
log.Errorf( log.Errorf(
"cannot get node annotations from executor info %v error %v", "cannot get node annotations from executor info %v error %v",
@ -374,10 +358,10 @@ func (k *Executor) LaunchTask(driver bindings.ExecutorDriver, taskInfo *mesos.Ta
return return
} }
taskId := taskInfo.GetTaskId().GetValue()
k.lock.Lock() k.lock.Lock()
defer k.lock.Unlock() defer k.lock.Unlock()
taskId := taskInfo.GetTaskId().GetValue()
if _, found := k.tasks[taskId]; found { if _, found := k.tasks[taskId]; found {
log.Errorf("task already launched\n") log.Errorf("task already launched\n")
// Not to send back TASK_RUNNING here, because // Not to send back TASK_RUNNING here, because
@ -390,51 +374,13 @@ func (k *Executor) LaunchTask(driver bindings.ExecutorDriver, taskInfo *mesos.Ta
// (c) we're leaving podName == "" for now, indicates we don't need to delete containers // (c) we're leaving podName == "" for now, indicates we don't need to delete containers
k.tasks[taskId] = &kuberTask{ k.tasks[taskId] = &kuberTask{
mesosTaskInfo: taskInfo, mesosTaskInfo: taskInfo,
launchTimer: time.NewTimer(k.launchGracePeriod),
} }
k.resetSuicideWatch(driver) k.resetSuicideWatch(driver)
go k.launchTask(driver, taskId, pod) go k.launchTask(driver, taskId, pod)
} }
func (k *Executor) handleChangedApiserverPod(pod *api.Pod) {
// exclude "pre-scheduled" pods which have a NodeName set to this node without being scheduled already
taskId := pod.Annotations[meta.TaskIdKey]
if taskId == "" {
log.V(5).Infof("ignoring pod update for %s/%s because %s annotation is missing", pod.Namespace, pod.Name, meta.TaskIdKey)
return
}
k.lock.Lock()
defer k.lock.Unlock()
// exclude tasks which are already deleted from our task registry
task := k.tasks[taskId]
if task == nil {
log.Warningf("task %s for pod %s/%s not found", taskId, pod.Namespace, pod.Name)
return
}
oldPod := k.pods[task.podName]
// terminating pod?
if oldPod != nil && pod.Status.Phase == api.PodRunning {
timeModified := differentTime(oldPod.DeletionTimestamp, pod.DeletionTimestamp)
graceModified := differentPeriod(oldPod.DeletionGracePeriodSeconds, pod.DeletionGracePeriodSeconds)
if timeModified || graceModified {
log.Infof("pod %s/%s is terminating at %v with %vs grace period, telling kubelet", pod.Namespace, pod.Name, *pod.DeletionTimestamp, *pod.DeletionGracePeriodSeconds)
// modify the pod in our registry instead of sending the new pod. The later
// would allow that other changes bleed into the kubelet. For now we are
// very conservative changing this behaviour.
// TODO(sttts): check whether we can and should send all changes down to the kubelet
oldPod.DeletionTimestamp = pod.DeletionTimestamp
oldPod.DeletionGracePeriodSeconds = pod.DeletionGracePeriodSeconds
k.sendPodsSnapshot()
}
}
}
// determine whether we need to start a suicide countdown. if so, then start // determine whether we need to start a suicide countdown. if so, then start
// a timer that, upon expiration, causes this executor to commit suicide. // a timer that, upon expiration, causes this executor to commit suicide.
// this implementation runs asynchronously. callers that wish to wait for the // this implementation runs asynchronously. callers that wish to wait for the
@ -619,17 +565,10 @@ func (k *Executor) launchTask(driver bindings.ExecutorDriver, taskId string, pod
psf := podStatusFunc(func() (*api.PodStatus, error) { psf := podStatusFunc(func() (*api.PodStatus, error) {
return k.podStatusFunc(pod) return k.podStatusFunc(pod)
}) })
go k._launchTask(driver, taskId, podFullName, psf) go k._launchTask(driver, taskId, podFullName, psf, task.launchTimer.C)
} }
func (k *Executor) _launchTask(driver bindings.ExecutorDriver, taskId, podFullName string, psf podStatusFunc) { func (k *Executor) _launchTask(driver bindings.ExecutorDriver, taskId, podFullName string, psf podStatusFunc, expired <-chan time.Time) {
expired := make(chan struct{})
if k.launchGracePeriod > 0 {
time.AfterFunc(k.launchGracePeriod, func() { close(expired) })
}
getMarshalledInfo := func() (data []byte, cancel bool) { getMarshalledInfo := func() (data []byte, cancel bool) {
// potentially long call.. // potentially long call..
if podStatus, err := psf(); err == nil && podStatus != nil { if podStatus, err := psf(); err == nil && podStatus != nil {
@ -677,7 +616,8 @@ waitForRunningPod:
} else { } else {
k.lock.Lock() k.lock.Lock()
defer k.lock.Unlock() defer k.lock.Unlock()
if _, found := k.tasks[taskId]; !found { task, found := k.tasks[taskId]
if !found {
goto reportLost goto reportLost
} }
@ -689,6 +629,7 @@ waitForRunningPod:
} }
k.sendStatus(driver, statusUpdate) k.sendStatus(driver, statusUpdate)
task.launchTimer.Stop()
// continue to monitor the health of the pod // continue to monitor the health of the pod
go k.__launchTask(driver, taskId, podFullName, psf) go k.__launchTask(driver, taskId, podFullName, psf)
@ -946,7 +887,7 @@ func (k *Executor) sendFrameworkMessage(driver bindings.ExecutorDriver, msg stri
} }
} }
func (k *Executor) sendLoop() { func (k *Executor) runSendLoop() {
defer log.V(1).Info("sender loop exiting") defer log.V(1).Info("sender loop exiting")
for { for {
select { select {
@ -982,53 +923,7 @@ func (k *Executor) sendLoop() {
} }
} }
func differentTime(a, b *unversionedapi.Time) bool { func annotationsFor(ei *mesos.ExecutorInfo) (annotations map[string]string, err error) {
return (a == nil) != (b == nil) || (a != nil && b != nil && *a != *b)
}
func differentPeriod(a, b *int64) bool {
return (a == nil) != (b == nil) || (a != nil && b != nil && *a != *b)
}
func nodeInfo(si *mesos.SlaveInfo, ei *mesos.ExecutorInfo) NodeInfo {
var executorCPU, executorMem float64
// get executor resources
if ei != nil {
for _, r := range ei.GetResources() {
if r == nil || r.GetType() != mesos.Value_SCALAR {
continue
}
switch r.GetName() {
case "cpus":
executorCPU = r.GetScalar().GetValue()
case "mem":
executorMem = r.GetScalar().GetValue()
}
}
}
// get resource capacity of the node
ni := NodeInfo{}
for _, r := range si.GetResources() {
if r == nil || r.GetType() != mesos.Value_SCALAR {
continue
}
switch r.GetName() {
case "cpus":
// We intentionally take the floor of executorCPU because cores are integers
// and we would loose a complete cpu here if the value is <1.
// TODO(sttts): switch to float64 when "Machine Allocables" are implemented
ni.Cores = int(r.GetScalar().GetValue() - float64(int(executorCPU)))
case "mem":
ni.Mem = int64(r.GetScalar().GetValue()-executorMem) * 1024 * 1024
}
}
return ni
}
func executorInfoToAnnotations(ei *mesos.ExecutorInfo) (annotations map[string]string, err error) {
annotations = map[string]string{} annotations = map[string]string{}
if ei == nil { if ei == nil {
return return
@ -1044,3 +939,36 @@ func executorInfoToAnnotations(ei *mesos.ExecutorInfo) (annotations map[string]s
return return
} }
// updateTask executes some mutating operation for the given task/pod, blocking until the update is either
// attempted or discarded. uses the executor state lock to synchronize concurrent invocation. returns true
// only if the specified update operation was attempted and itself returned true. a result of true also
// indicates that changes have been sent upstream to the kubelet.
func (k *Executor) updateTask(taskId string, f func(*kuberTask, *api.Pod) bool) (changed bool, err error) {
k.lock.Lock()
defer k.lock.Unlock()
// exclude tasks which are already deleted from our task registry
task := k.tasks[taskId]
if task == nil {
// the pod has completed the launch-task-binding phase because it's been annotated with
// the task-id, but we don't have a record of it; it's best to let the scheduler reconcile.
// it's also possible that our update queue is backed up and hasn't caught up with the state
// of the world yet.
// TODO(jdef) should we hint to the scheduler (via TASK_FAILED, reason=PodRefersToUnknownTask)?
err = fmt.Errorf("task %s not found", taskId)
return
}
oldPod := k.pods[task.podName]
changed = f(task, oldPod)
// TODO(jdef) this abstraction isn't perfect since changes that only impact the task struct,
// and not the pod, don't require a new pod snapshot sent back to the kubelet.
if changed {
k.sendPodsSnapshot()
}
return
}
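The launch-timer cleanup above replaces the per-launch time.AfterFunc/expired channel with a time.Timer stored on each task: a zero grace period is mapped to math.MaxInt64 so there is never a nil timer to special-case, and the timer is stopped once the task reports running. A minimal, self-contained sketch of that pattern follows; the watchLaunch/launched names are illustrative and not part of the executor.

```go
// Sketch of the launch-timer pattern: a "never expires" timer for a zero grace
// period, stopped as soon as the launch succeeds.
package main

import (
	"fmt"
	"math"
	"time"
)

func watchLaunch(gracePeriod time.Duration, launched <-chan struct{}) {
	if gracePeriod == 0 {
		// effectively "the timer never expires"; avoids a nil timer and the
		// checks that would come with it
		gracePeriod = time.Duration(math.MaxInt64)
	}
	t := time.NewTimer(gracePeriod)
	defer t.Stop()

	select {
	case <-t.C:
		fmt.Println("launch grace period expired; the task would be reported lost/failed")
	case <-launched:
		fmt.Println("pod reported running before the grace period elapsed")
	}
}

func main() {
	launched := make(chan struct{})
	go func() {
		time.Sleep(10 * time.Millisecond) // pretend the kubelet launches the pod
		close(launched)
	}()
	watchLaunch(5*time.Second, launched)
}
```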

View File

@ -608,3 +608,77 @@ func TestExecutorsendFrameworkMessage(t *testing.T) {
} }
mockDriver.AssertExpectations(t) mockDriver.AssertExpectations(t)
} }
func TestExecutor_updateMetaMap(t *testing.T) {
for i, tc := range []struct {
oldmap map[string]string
newmap map[string]string
wants bool
}{
{
oldmap: nil,
newmap: nil,
wants: false,
},
{
oldmap: nil,
newmap: map[string]string{},
wants: false,
},
{
oldmap: map[string]string{},
newmap: nil,
wants: false,
},
{
oldmap: nil,
newmap: map[string]string{
"foo": "bar",
},
wants: true,
},
{
oldmap: map[string]string{},
newmap: map[string]string{
"foo": "bar",
},
wants: true,
},
{
oldmap: map[string]string{
"baz": "qax",
},
newmap: map[string]string{
"foo": "bar",
},
wants: true,
},
{
oldmap: map[string]string{
"baz": "qax",
},
newmap: nil,
wants: true,
},
{
oldmap: map[string]string{
"baz": "qax",
"qwe": "iop",
},
newmap: map[string]string{
"foo": "bar",
"qwe": "iop",
},
wants: true,
},
} {
// exercise updateMetaMap and verify both the reported change and the resulting map state
actual := updateMetaMap(&tc.oldmap, tc.newmap)
if actual != tc.wants {
t.Fatalf("test case %d failed, expected %v but got %v instead", i, tc.wants, actual)
}
if len(tc.oldmap) != len(tc.newmap) || (len(tc.oldmap) > 0 && !reflect.DeepEqual(tc.oldmap, tc.newmap)) {
t.Fatalf("test case %d failed, expected %v but got %v instead", i, tc.newmap, tc.oldmap)
}
}
}

View File

@ -0,0 +1,64 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package executor
import (
mesos "github.com/mesos/mesos-go/mesosproto"
)
type NodeInfo struct {
Cores int
Mem int64 // in bytes
}
func nodeInfo(si *mesos.SlaveInfo, ei *mesos.ExecutorInfo) NodeInfo {
var executorCPU, executorMem float64
// get executor resources
if ei != nil {
for _, r := range ei.GetResources() {
if r == nil || r.GetType() != mesos.Value_SCALAR {
continue
}
switch r.GetName() {
case "cpus":
executorCPU = r.GetScalar().GetValue()
case "mem":
executorMem = r.GetScalar().GetValue()
}
}
}
// get resource capacity of the node
ni := NodeInfo{}
for _, r := range si.GetResources() {
if r == nil || r.GetType() != mesos.Value_SCALAR {
continue
}
switch r.GetName() {
case "cpus":
// We intentionally take the floor of executorCPU because cores are integers
// and we would lose a complete cpu here if the value is <1.
// TODO(sttts): switch to float64 when "Machine Allocables" are implemented
ni.Cores = int(r.GetScalar().GetValue() - float64(int(executorCPU)))
case "mem":
ni.Mem = int64(r.GetScalar().GetValue()-executorMem) * 1024 * 1024
}
}
return ni
}
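For reference, a small sketch of the arithmetic nodeInfo performs, using hard-coded numbers instead of mesos protobuf resources (the values are made up): the executor's own reservation is subtracted from the slave totals, CPU is floored to whole cores, and memory is converted from MB to bytes.

```go
// Illustrative only: mirrors the nodeInfo math with plain values rather than
// *mesos.SlaveInfo / *mesos.ExecutorInfo resources.
package main

import "fmt"

func main() {
	// slave capacity as advertised by Mesos (cpus in cores, mem in MB)
	slaveCPU, slaveMem := 4.0, 2048.0
	// resources reserved for the executor itself
	executorCPU, executorMem := 0.25, 128.0

	// floor executorCPU before subtracting, as nodeInfo does: cores are integers,
	// so a fractional executor reservation does not cost a whole core
	cores := int(slaveCPU - float64(int(executorCPU))) // 4
	mem := int64(slaveMem-executorMem) * 1024 * 1024   // 1920 MB expressed in bytes

	fmt.Printf("Cores=%d Mem=%d bytes\n", cores, mem)
}
```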

View File

@ -0,0 +1,171 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package executor
import (
log "github.com/golang/glog"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/meta"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/client/cache"
"k8s.io/kubernetes/pkg/controller/framework"
)
// taskUpdateTx executes a task update transaction f for the task identified by
// taskId. if no such task exists then f is not invoked and an error is
// returned. if f is invoked then taskUpdateTx returns the bool result of f.
type taskUpdateTx func(taskId string, f func(*kuberTask, *api.Pod) bool) (changed bool, err error)
// podObserver receives callbacks for every pod state change on the apiserver and
// for each decides whether to execute a task update transaction.
type podObserver struct {
podController *framework.Controller
terminate <-chan struct{}
taskUpdateTx taskUpdateTx
}
func newPodObserver(podLW cache.ListerWatcher, taskUpdateTx taskUpdateTx, terminate <-chan struct{}) *podObserver {
// watch pods from the given pod ListWatch
if podLW == nil {
// fail early to make debugging easier
panic("cannot create executor with nil PodLW")
}
p := &podObserver{
terminate: terminate,
taskUpdateTx: taskUpdateTx,
}
_, p.podController = framework.NewInformer(podLW, &api.Pod{}, podRelistPeriod, &framework.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
pod := obj.(*api.Pod)
log.V(4).Infof("pod %s/%s created on apiserver", pod.Namespace, pod.Name)
p.handleChangedApiserverPod(pod)
},
UpdateFunc: func(oldObj, newObj interface{}) {
pod := newObj.(*api.Pod)
log.V(4).Infof("pod %s/%s updated on apiserver", pod.Namespace, pod.Name)
p.handleChangedApiserverPod(pod)
},
DeleteFunc: func(obj interface{}) {
pod := obj.(*api.Pod)
log.V(4).Infof("pod %s/%s deleted on apiserver", pod.Namespace, pod.Name)
},
})
return p
}
// run begins observing pod state changes; blocks until the terminate chan closes.
func (p *podObserver) run() {
p.podController.Run(p.terminate)
}
// handleChangedApiserverPod is invoked for pod add/update state changes and decides whether
// task updates are necessary. if so, a task update is executed via taskUpdateTx.
func (p *podObserver) handleChangedApiserverPod(pod *api.Pod) {
// Don't do anything for pods without a task annotation, which means:
// - "pre-scheduled" pods which have a NodeName set to this node without being scheduled already.
// - static/mirror pods: they'll never have a TaskID annotation, and we don't expect them to ever change.
// - all other pods that haven't passed through the launch-task-binding phase, which would set annotations.
taskId := pod.Annotations[meta.TaskIdKey]
if taskId == "" {
// There also could be a race between the overall launch-task process and this update, but here we
// will never be able to process such a stale update because the "update pod" that we're receiving
// in this func won't yet have a task ID annotation. It follows that we can safely drop such a stale
// update on the floor because we'll get another update later that, in addition to the changes that
// we're dropping now, will also include the changes from the binding process.
log.V(5).Infof("ignoring pod update for %s/%s because %s annotation is missing", pod.Namespace, pod.Name, meta.TaskIdKey)
return
}
_, err := p.taskUpdateTx(taskId, func(_ *kuberTask, relatedPod *api.Pod) (sendSnapshot bool) {
if relatedPod == nil {
// should never happen because:
// (a) the update pod record has already passed through the binding phase in launchTasks()
// (b) all remaining updates to executor.{pods,tasks} are sync'd in unison
log.Errorf("internal state error: pod not found for task %s", taskId)
return
}
// TODO(sttts): check whether we can and should send all "semantic" changes down to the kubelet
// see kubelet/config/config.go for semantic change detection
// check for updated labels/annotations: need to forward these for the downward API
sendSnapshot = sendSnapshot || updateMetaMap(&relatedPod.Labels, pod.Labels)
sendSnapshot = sendSnapshot || updateMetaMap(&relatedPod.Annotations, pod.Annotations)
// terminating pod?
if pod.Status.Phase == api.PodRunning {
timeModified := differentTime(relatedPod.DeletionTimestamp, pod.DeletionTimestamp)
graceModified := differentPeriod(relatedPod.DeletionGracePeriodSeconds, pod.DeletionGracePeriodSeconds)
if timeModified || graceModified {
log.Infof("pod %s/%s is terminating at %v with %vs grace period, telling kubelet",
pod.Namespace, pod.Name, *pod.DeletionTimestamp, *pod.DeletionGracePeriodSeconds)
// modify the pod in our registry instead of sending the new pod. The latter
// would allow other changes to bleed into the kubelet. For now we are
// very conservative about changing this behaviour.
relatedPod.DeletionTimestamp = pod.DeletionTimestamp
relatedPod.DeletionGracePeriodSeconds = pod.DeletionGracePeriodSeconds
sendSnapshot = true
}
}
return
})
if err != nil {
log.Errorf("failed to update pod %s/%s: %+v", pod.Namespace, pod.Name, err)
}
}
// updateMetaMap looks for differences between src and dest; if there are differences
// then dest is changed (possibly to point to src) and this func returns true.
func updateMetaMap(dest *map[string]string, src map[string]string) (changed bool) {
// check for things in dest that are missing in src
for k := range *dest {
if _, ok := src[k]; !ok {
changed = true
break
}
}
if !changed {
if len(*dest) == 0 {
if len(src) > 0 {
changed = true
goto finished
}
// no update needed
return
}
// check for things in src that are missing/different in dest
for k, v := range src {
if vv, ok := (*dest)[k]; !ok || vv != v {
changed = true
break
}
}
}
finished:
*dest = src
return
}
func differentTime(a, b *unversioned.Time) bool {
return (a == nil) != (b == nil) || (a != nil && b != nil && *a != *b)
}
func differentPeriod(a, b *int64) bool {
return (a == nil) != (b == nil) || (a != nil && b != nil && *a != *b)
}
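One subtlety of updateMetaMap worth spelling out: when it reports a change it points *dest at src rather than copying the entries, which the reflect.DeepEqual check in the test earlier in this change also relies on. Below is a sketch of a test that could sit alongside that one (same package, so updateMetaMap and the testing import are already in scope); the test name is illustrative.

```go
// Demonstrates that a reported change makes dest alias src rather than copy it.
func TestUpdateMetaMapAliasesSource(t *testing.T) {
	oldLabels := map[string]string{"baz": "qax"}
	newLabels := map[string]string{"foo": "bar"}

	if !updateMetaMap(&oldLabels, newLabels) {
		t.Fatal("expected a change to be reported")
	}

	// after the call, oldLabels refers to the very same map object as newLabels
	newLabels["extra"] = "value"
	if oldLabels["extra"] != "value" {
		t.Fatal("expected dest to alias src after a reported change")
	}
}
```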

View File

@ -78,7 +78,7 @@ func (r *clientRegistrator) Run(terminate <-chan struct{}) error {
loop := func() { loop := func() {
RegistrationLoop: RegistrationLoop:
for { for {
obj := r.queue.CancelablePop(terminate) obj := r.queue.Pop(terminate)
if obj == nil { if obj == nil {
break RegistrationLoop break RegistrationLoop
} }

View File

@ -441,7 +441,7 @@ func (s *offerStorage) ageOffers() {
} }
func (s *offerStorage) nextListener() *offerListener { func (s *offerStorage) nextListener() *offerListener {
obj := s.listeners.Pop() obj := s.listeners.Pop(nil)
if listen, ok := obj.(*offerListener); !ok { if listen, ok := obj.(*offerListener); !ok {
//programming error //programming error
panic(fmt.Sprintf("unexpected listener object %v", obj)) panic(fmt.Sprintf("unexpected listener object %v", obj))

View File

@ -302,12 +302,18 @@ func (f *DelayFIFO) Get(id string) (UniqueID, bool) {
// Variant of DelayQueue.Pop() for UniqueDelayed items // Variant of DelayQueue.Pop() for UniqueDelayed items
func (q *DelayFIFO) Await(timeout time.Duration) UniqueID { func (q *DelayFIFO) Await(timeout time.Duration) UniqueID {
cancel := make(chan struct{}) var (
ch := make(chan interface{}, 1) cancel = make(chan struct{})
ch = make(chan interface{}, 1)
t = time.NewTimer(timeout)
)
defer t.Stop()
go func() { ch <- q.pop(cancel) }() go func() { ch <- q.pop(cancel) }()
var x interface{} var x interface{}
select { select {
case <-time.After(timeout): case <-t.C:
close(cancel) close(cancel)
x = <-ch x = <-ch
case x = <-ch: case x = <-ch:
@ -319,13 +325,19 @@ func (q *DelayFIFO) Await(timeout time.Duration) UniqueID {
return nil return nil
} }
// Variant of DelayQueue.Pop() for UniqueDelayed items // Pop blocks until either there is an item available to dequeue or else the specified
func (q *DelayFIFO) Pop() UniqueID { // cancel chan is closed. Callers that have no interest in providing a cancel chan
return q.pop(nil).(UniqueID) // should specify nil, or else WithoutCancel() (for readability).
func (q *DelayFIFO) Pop(cancel <-chan struct{}) UniqueID {
x := q.pop(cancel)
if x == nil {
return nil
}
return x.(UniqueID)
} }
// variant of DelayQueue.Pop that implements optional cancellation // variant of DelayQueue.Pop that implements optional cancellation
func (q *DelayFIFO) pop(cancel chan struct{}) interface{} { func (q *DelayFIFO) pop(cancel <-chan struct{}) interface{} {
next := func() *qitem { next := func() *qitem {
q.lock() q.lock()
defer q.unlock() defer q.unlock()
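The Await rewrites here and in the HistoricalFIFO change below swap time.After for an explicit time.NewTimer plus defer t.Stop(), so the timer is released as soon as a result arrives instead of lingering until the timeout fires. A generic, runnable sketch of that timeout-or-result shape follows; awaitResult and doWork are illustrative names, not part of this package.

```go
// Timeout-or-result with a cancellable worker and a promptly released timer.
package main

import (
	"fmt"
	"time"
)

func awaitResult(timeout time.Duration, doWork func(cancel <-chan struct{}) interface{}) interface{} {
	cancel := make(chan struct{})
	ch := make(chan interface{}, 1)
	t := time.NewTimer(timeout)
	defer t.Stop() // release the timer even when the work finishes first

	go func() { ch <- doWork(cancel) }()

	select {
	case <-t.C:
		close(cancel) // tell the worker to give up
		return <-ch   // the worker always sends exactly one value
	case x := <-ch:
		return x
	}
}

func main() {
	out := awaitResult(50*time.Millisecond, func(cancel <-chan struct{}) interface{} {
		time.Sleep(10 * time.Millisecond) // pretend to do work
		select {
		case <-cancel:
			return nil
		default:
			return "done"
		}
	})
	fmt.Println(out)
}
```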

View File

@ -358,7 +358,7 @@ func TestDFIFO_sanity_check(t *testing.T) {
// pop last // pop last
before := time.Now() before := time.Now()
x := df.Pop() x := df.Pop(nil)
assert.Equal(a.(*testjob).instance, 2) assert.Equal(a.(*testjob).instance, 2)
now := time.Now() now := time.Now()
@ -395,7 +395,7 @@ func TestDFIFO_Offer(t *testing.T) {
} }
before := time.Now() before := time.Now()
x := dq.Pop() x := dq.Pop(nil)
now := time.Now() now := time.Now()
waitPeriod := now.Sub(before) waitPeriod := now.Sub(before)

View File

@ -220,22 +220,28 @@ func (f *HistoricalFIFO) Poll(id string, t EventType) bool {
// Variant of DelayQueue.Pop() for UniqueDelayed items // Variant of DelayQueue.Pop() for UniqueDelayed items
func (q *HistoricalFIFO) Await(timeout time.Duration) interface{} { func (q *HistoricalFIFO) Await(timeout time.Duration) interface{} {
cancel := make(chan struct{}) var (
ch := make(chan interface{}, 1) cancel = make(chan struct{})
go func() { ch <- q.CancelablePop(cancel) }() ch = make(chan interface{}, 1)
t = time.NewTimer(timeout)
)
defer t.Stop()
go func() { ch <- q.Pop(cancel) }()
select { select {
case <-time.After(timeout): case <-t.C:
close(cancel) close(cancel)
return <-ch return <-ch
case x := <-ch: case x := <-ch:
return x return x
} }
} }
func (f *HistoricalFIFO) Pop() interface{} {
return f.CancelablePop(nil)
}
func (f *HistoricalFIFO) CancelablePop(cancel <-chan struct{}) interface{} { // Pop blocks until either there is an item available to dequeue or else the specified
// cancel chan is closed. Callers that have no interest in providing a cancel chan
// should specify nil, or else WithoutCancel() (for readability).
func (f *HistoricalFIFO) Pop(cancel <-chan struct{}) interface{} {
popEvent := (Entry)(nil) popEvent := (Entry)(nil)
defer func() { defer func() {
f.carrier(popEvent) f.carrier(popEvent)

View File

@ -75,7 +75,7 @@ func TestFIFO_basic(t *testing.T) {
lastInt := _int(0) lastInt := _int(0)
lastUint := _uint(0) lastUint := _uint(0)
for i := 0; i < amount*2; i++ { for i := 0; i < amount*2; i++ {
switch obj := f.Pop().(type) { switch obj := f.Pop(nil).(type) {
case _int: case _int:
if obj <= lastInt { if obj <= lastInt {
t.Errorf("got %v (int) out of order, last was %v", obj, lastInt) t.Errorf("got %v (int) out of order, last was %v", obj, lastInt)
@ -100,7 +100,7 @@ func TestFIFO_addUpdate(t *testing.T) {
got := make(chan *testObj, 2) got := make(chan *testObj, 2)
go func() { go func() {
for { for {
got <- f.Pop().(*testObj) got <- f.Pop(nil).(*testObj)
} }
}() }()
@ -126,7 +126,7 @@ func TestFIFO_addReplace(t *testing.T) {
got := make(chan *testObj, 2) got := make(chan *testObj, 2)
go func() { go func() {
for { for {
got <- f.Pop().(*testObj) got <- f.Pop(nil).(*testObj)
} }
}() }()
@ -158,24 +158,24 @@ func TestFIFO_detectLineJumpers(t *testing.T) {
done := make(chan struct{}) done := make(chan struct{})
go func() { go func() {
defer close(done) defer close(done)
if e, a := 13, f.Pop().(*testObj).value; a != e { if e, a := 13, f.Pop(nil).(*testObj).value; a != e {
err = fmt.Errorf("expected %d, got %d", e, a) err = fmt.Errorf("expected %d, got %d", e, a)
return return
} }
f.Add(&testObj{"foo", 14}) // ensure foo doesn't jump back in line f.Add(&testObj{"foo", 14}) // ensure foo doesn't jump back in line
if e, a := 1, f.Pop().(*testObj).value; a != e { if e, a := 1, f.Pop(nil).(*testObj).value; a != e {
err = fmt.Errorf("expected %d, got %d", e, a) err = fmt.Errorf("expected %d, got %d", e, a)
return return
} }
if e, a := 30, f.Pop().(*testObj).value; a != e { if e, a := 30, f.Pop(nil).(*testObj).value; a != e {
err = fmt.Errorf("expected %d, got %d", e, a) err = fmt.Errorf("expected %d, got %d", e, a)
return return
} }
if e, a := 14, f.Pop().(*testObj).value; a != e { if e, a := 14, f.Pop(nil).(*testObj).value; a != e {
err = fmt.Errorf("expected %d, got %d", e, a) err = fmt.Errorf("expected %d, got %d", e, a)
return return
} }

View File

@ -59,7 +59,7 @@ type FIFO interface {
// ready, they are returned in the order in which they were added/updated. // ready, they are returned in the order in which they were added/updated.
// The item is removed from the queue (and the store) before it is returned, // The item is removed from the queue (and the store) before it is returned,
// so if you don't successfully process it, you need to add it back with Add(). // so if you don't successfully process it, you need to add it back with Add().
Pop() interface{} Pop(cancel <-chan struct{}) interface{}
// Await attempts to Pop within the given interval; upon success the non-nil // Await attempts to Pop within the given interval; upon success the non-nil
// item is returned, otherwise nil // item is returned, otherwise nil
@ -101,3 +101,5 @@ type UniqueDeadlined interface {
UniqueID UniqueID
Deadlined Deadlined
} }
func WithoutCancel() <-chan struct{} { return nil }
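With Pop now taking a cancel chan across the board, consumers can follow the same shape as the registrator loop above: pass a terminate chan and treat a nil result as the signal to stop. A hedged sketch, assuming it lives in a package that imports this queue package; drain and handle are placeholder names.

```go
// drain consumes items until terminate closes. Callers that never cancel can
// pass queue.WithoutCancel() (i.e. nil) instead of a real chan.
func drain(q queue.FIFO, terminate <-chan struct{}, handle func(interface{})) {
	for {
		item := q.Pop(terminate)
		if item == nil {
			return // Pop was cancelled; stop consuming
		}
		handle(item)
	}
}
```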

View File

@ -89,7 +89,7 @@ func (k *errorHandler) Error(pod *api.Pod, schedulingErr error) {
} }
delay := k.backoff.Get(podKey) delay := k.backoff.Get(podKey)
log.V(3).Infof("requeuing pod %v with delay %v", podKey, delay) log.V(3).Infof("requeuing pod %v with delay %v", podKey, delay)
k.qr.Requeue(&queuer.Pod{Pod: pod, Delay: &delay, Notify: breakoutEarly}) k.qr.Requeue(queuer.NewPod(pod, queuer.Delay(delay), queuer.Notify(breakoutEarly)))
default: default:
log.V(2).Infof("Task is no longer pending, aborting reschedule for pod %v", podKey) log.V(2).Infof("Task is no longer pending, aborting reschedule for pod %v", podKey)

View File

@ -104,7 +104,7 @@ func (s *podReconciler) Reconcile(t *podtask.T) {
now := time.Now() now := time.Now()
log.V(3).Infof("reoffering pod %v", podKey) log.V(3).Infof("reoffering pod %v", podKey)
s.qr.Reoffer(queuer.NewPodWithDeadline(pod, &now)) s.qr.Reoffer(queuer.NewPod(pod, queuer.Deadline(now)))
} else { } else {
// pod is scheduled. // pod is scheduled.
// not sure how this happened behind our backs. attempt to reconstruct // not sure how this happened behind our backs. attempt to reconstruct

View File

@ -25,16 +25,44 @@ import (
"k8s.io/kubernetes/pkg/client/cache" "k8s.io/kubernetes/pkg/client/cache"
) )
// functional Pod option
type PodOpt func(*Pod)
// wrapper for the k8s pod type so that we can define additional methods on a "pod" // wrapper for the k8s pod type so that we can define additional methods on a "pod"
type Pod struct { type Pod struct {
*api.Pod *api.Pod
deadline *time.Time deadline *time.Time
Delay *time.Duration delay *time.Duration
Notify queue.BreakChan notify queue.BreakChan
} }
func NewPodWithDeadline(pod *api.Pod, deadline *time.Time) *Pod { func NewPod(pod *api.Pod, opt ...PodOpt) *Pod {
return &Pod{Pod: pod, deadline: deadline} p := &Pod{Pod: pod}
for _, f := range opt {
f(p)
}
return p
}
// Deadline sets the deadline for a Pod
func Deadline(deadline time.Time) PodOpt {
return func(pod *Pod) {
pod.deadline = &deadline
}
}
// Delay sets the delay for a Pod
func Delay(delay time.Duration) PodOpt {
return func(pod *Pod) {
pod.delay = &delay
}
}
// Notify sets the breakout notification channel for a Pod
func Notify(notify queue.BreakChan) PodOpt {
return func(pod *Pod) {
pod.notify = notify
}
} }
// implements Copyable // implements Copyable
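As used in the error handler and pod reconciler above, these options compose freely at construction time. A hedged sketch from a caller's point of view; apiPod and breakout are assumed to be in scope as an *api.Pod and a queue.BreakChan, and the durations are made up.

```go
// Construct a queuer.Pod with several functional options at once.
p := queuer.NewPod(apiPod,
	queuer.Delay(5*time.Second),                     // requeue only after this backoff delay
	queuer.Deadline(time.Now().Add(30*time.Second)), // reoffer/drop after this point in time
	queuer.Notify(breakout),                         // breakout chan watched by the delay queue
)
_ = p
```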
@ -65,14 +93,14 @@ func (dp *Pod) Deadline() (time.Time, bool) {
} }
func (dp *Pod) GetDelay() time.Duration { func (dp *Pod) GetDelay() time.Duration {
if dp.Delay != nil { if dp.delay != nil {
return *(dp.Delay) return *(dp.delay)
} }
return 0 return 0
} }
func (p *Pod) Breaker() queue.BreakChan { func (p *Pod) Breaker() queue.BreakChan {
return p.Notify return p.notify
} }
func (p *Pod) String() string { func (p *Pod) String() string {