kubelet: independent pod syncs and backoff on error

Currently the kubelet syncs all pods every 10s. This is undesirable because
 * Some pods may have been sync'd recently.
 * All pods may end up being sync'd at once, causing undesirable
   CPU spikes.

This PR replaces the global syncs with independent, periodic pod syncs. At the
end of a sync, each pod worker enqueues itself with a future timestamp
(current time + sync interval) at which it will be due for another sync.
 * If the pod worker encounters a sync error, it may requeue with an earlier
   timestamp to retry sooner.
 * If a sync is triggered by the update channel (events or spec changes), the
   pod worker enqueues a new sync time upon completion.

This change is necessary for moving to a longer (or no) periodic sync period
once the pod lifecycle event generator is completed. We will still rely on
this mechanism to requeue a pod after a sync error.

This change also ensures that if a sync does not succeed (whether due to an
actual error or the per-container backoff mechanism), the error is propagated
back to the pod worker, which is responsible for requeuing.
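
To make the mechanism concrete, below is a minimal, self-contained Go sketch
of the scheduling behavior described above. The names here (workQueue,
requeue) are illustrative only; the actual implementation is keyed by
types.UID, guards its map with a mutex, uses a pluggable clock, and is added
in the new queue package at the end of this diff.

package main

import (
	"errors"
	"fmt"
	"time"
)

// workQueue mirrors the PR's WorkQueue semantics: items carry a due
// timestamp, and Enqueue keeps the later of the old and new timestamps.
type workQueue struct {
	queue map[string]time.Time
}

func (q *workQueue) Enqueue(item string, delay time.Duration) {
	due := time.Now().Add(delay)
	if existing, ok := q.queue[item]; !ok || existing.Before(due) {
		q.queue[item] = due
	}
}

// GetWork dequeues and returns all items whose timestamps have expired.
func (q *workQueue) GetWork() []string {
	now := time.Now()
	var ready []string
	for item, due := range q.queue {
		if due.Before(now) {
			ready = append(ready, item)
			delete(q.queue, item)
		}
	}
	return ready
}

// requeue mirrors the pod worker's wrap-up step: retry sooner after a
// sync error, otherwise wait a full sync interval.
func requeue(q *workQueue, uid string, syncErr error, backOff, resync time.Duration) {
	if syncErr != nil {
		q.Enqueue(uid, backOff)
	} else {
		q.Enqueue(uid, resync)
	}
}

func main() {
	q := &workQueue{queue: map[string]time.Time{}}
	// A negative delay marks the item immediately due, for demonstration.
	requeue(q, "pod-a", errors.New("sync failed"), -time.Second, 10*time.Second)
	requeue(q, "pod-b", nil, -time.Second, 10*time.Second)
	fmt.Println(q.GetWork()) // only "pod-a" is due immediately
}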
Yu-Ju Hong 2015-09-02 10:18:11 -07:00
parent 1407fd2071
commit 2eb17df46b
9 changed files with 272 additions and 58 deletions


@@ -1827,8 +1827,7 @@ func (dm *DockerManager) SyncPod(pod *api.Pod, runningPod kubecontainer.Pod, pod
}
// Killing phase: if we want to start new infra container, or nothing is running kill everything (including infra container)
err = dm.KillPod(pod, runningPod)
if err != nil {
if err := dm.KillPod(pod, runningPod); err != nil {
return err
}
} else {
@@ -1845,9 +1844,9 @@ func (dm *DockerManager) SyncPod(pod *api.Pod, runningPod kubecontainer.Pod, pod
break
}
}
err = dm.KillContainerInPod(container.ID, podContainer, pod)
if err != nil {
if err := dm.KillContainerInPod(container.ID, podContainer, pod); err != nil {
glog.Errorf("Error killing container: %v", err)
return err
}
}
}
@@ -1893,6 +1892,7 @@ func (dm *DockerManager) SyncPod(pod *api.Pod, runningPod kubecontainer.Pod, pod
pod.Status.PodIP = dm.determineContainerIP(pod.Name, pod.Namespace, podInfraContainer)
}
containersStarted := 0
// Start everything
for idx := range containerChanges.ContainersToStart {
container := &pod.Spec.Containers[idx]
@@ -1946,11 +1946,15 @@ func (dm *DockerManager) SyncPod(pod *api.Pod, runningPod kubecontainer.Pod, pod
glog.Errorf("Error running pod %q container %q: %v", kubecontainer.GetPodFullName(pod), container.Name, err)
continue
}
containersStarted++
// Successfully started the container; clear the entry in the failure
// reason cache.
dm.clearReasonCache(pod, container)
}
if containersStarted != len(containerChanges.ContainersToStart) {
return fmt.Errorf("not all containers have started: %d != %d", containersStarted, len(containerChanges.ContainersToStart))
}
return nil
}


@@ -538,7 +538,7 @@ func generatePodInfraContainerHash(pod *api.Pod) uint64 {
// runSyncPod is a helper function to retrieve the running pods from the fake
// docker client and runs SyncPod for the given pod.
func runSyncPod(t *testing.T, dm *DockerManager, fakeDocker *FakeDockerClient, pod *api.Pod, backOff *util.Backoff) {
func runSyncPod(t *testing.T, dm *DockerManager, fakeDocker *FakeDockerClient, pod *api.Pod, backOff *util.Backoff, expectErr bool) {
runningPods, err := dm.GetPods(false)
if err != nil {
t.Fatalf("unexpected error: %v", err)
@@ -554,8 +554,10 @@ func runSyncPod(t *testing.T, dm *DockerManager, fakeDocker *FakeDockerClient, p
backOff = util.NewBackOff(time.Second, time.Minute)
}
err = dm.SyncPod(pod, runningPod, *podStatus, []api.Secret{}, backOff)
if err != nil {
if err != nil && !expectErr {
t.Errorf("unexpected error: %v", err)
} else if err == nil && expectErr {
t.Errorf("expected error didn't occur")
}
}
@@ -576,7 +578,7 @@ func TestSyncPodCreateNetAndContainer(t *testing.T) {
},
}
runSyncPod(t, dm, fakeDocker, pod, nil)
runSyncPod(t, dm, fakeDocker, pod, nil, false)
verifyCalls(t, fakeDocker, []string{
// Create pod infra container.
"create", "start", "inspect_container", "inspect_container",
@@ -623,7 +625,7 @@ func TestSyncPodCreatesNetAndContainerPullsImage(t *testing.T) {
},
}
runSyncPod(t, dm, fakeDocker, pod, nil)
runSyncPod(t, dm, fakeDocker, pod, nil, false)
verifyCalls(t, fakeDocker, []string{
// Create pod infra container.
@@ -675,7 +677,7 @@ func TestSyncPodWithPodInfraCreatesContainer(t *testing.T) {
},
}
runSyncPod(t, dm, fakeDocker, pod, nil)
runSyncPod(t, dm, fakeDocker, pod, nil, false)
verifyCalls(t, fakeDocker, []string{
// Inspect pod infra container (but does not create)
@@ -722,7 +724,7 @@ func TestSyncPodDeletesWithNoPodInfraContainer(t *testing.T) {
},
}
runSyncPod(t, dm, fakeDocker, pod, nil)
runSyncPod(t, dm, fakeDocker, pod, nil, false)
verifyCalls(t, fakeDocker, []string{
// Kill the container since pod infra container is not running.
@@ -795,7 +797,7 @@ func TestSyncPodDeletesDuplicate(t *testing.T) {
},
}
runSyncPod(t, dm, fakeDocker, pod, nil)
runSyncPod(t, dm, fakeDocker, pod, nil, false)
verifyCalls(t, fakeDocker, []string{
// Check the pod infra container.
@@ -849,7 +851,7 @@ func TestSyncPodBadHash(t *testing.T) {
},
}
runSyncPod(t, dm, fakeDocker, pod, nil)
runSyncPod(t, dm, fakeDocker, pod, nil, false)
verifyCalls(t, fakeDocker, []string{
// Check the pod infra container.
@@ -906,7 +908,7 @@ func TestSyncPodsUnhealthy(t *testing.T) {
}
dm.livenessManager.Set(kubetypes.DockerID(unhealthyContainerID).ContainerID(), proberesults.Failure, nil)
runSyncPod(t, dm, fakeDocker, pod, nil)
runSyncPod(t, dm, fakeDocker, pod, nil, false)
verifyCalls(t, fakeDocker, []string{
// Check the pod infra container.
@@ -963,7 +965,7 @@ func TestSyncPodsDoesNothing(t *testing.T) {
},
}
runSyncPod(t, dm, fakeDocker, pod, nil)
runSyncPod(t, dm, fakeDocker, pod, nil, false)
verifyCalls(t, fakeDocker, []string{
// Check the pod infra container.
@@ -1004,7 +1006,7 @@ func TestSyncPodWithPullPolicy(t *testing.T) {
Message: "Container image \"pull_never_image\" is not present with pull policy of Never"}},
}
runSyncPod(t, dm, fakeDocker, pod, nil)
runSyncPod(t, dm, fakeDocker, pod, nil, true)
statuses, err := dm.GetPodStatus(pod)
if err != nil {
t.Errorf("unable to get pod status")
@@ -1147,7 +1149,7 @@ func TestSyncPodWithRestartPolicy(t *testing.T) {
fakeDocker.ContainerMap = containerMap
pod.Spec.RestartPolicy = tt.policy
runSyncPod(t, dm, fakeDocker, pod, nil)
runSyncPod(t, dm, fakeDocker, pod, nil, false)
// 'stop' is because the pod infra container is killed when no container is running.
verifyCalls(t, fakeDocker, tt.calls)
@@ -1267,7 +1269,7 @@ func TestGetPodStatusWithLastTermination(t *testing.T) {
},
}
runSyncPod(t, dm, fakeDocker, pod, nil)
runSyncPod(t, dm, fakeDocker, pod, nil, false)
// Check if we can retrieve the pod status.
status, err := dm.GetPodStatus(pod)
@@ -1377,16 +1379,17 @@ func TestSyncPodBackoff(t *testing.T) {
backoff int
killDelay int
result []string
expectErr bool
}{
{1, 1, 1, startCalls},
{2, 2, 2, startCalls},
{3, 2, 3, backOffCalls},
{4, 4, 4, startCalls},
{5, 4, 5, backOffCalls},
{6, 4, 6, backOffCalls},
{7, 4, 7, backOffCalls},
{8, 8, 129, startCalls},
{130, 1, 0, startCalls},
{1, 1, 1, startCalls, false},
{2, 2, 2, startCalls, false},
{3, 2, 3, backOffCalls, true},
{4, 4, 4, startCalls, false},
{5, 4, 5, backOffCalls, true},
{6, 4, 6, backOffCalls, true},
{7, 4, 7, backOffCalls, true},
{8, 8, 129, startCalls, false},
{130, 1, 0, startCalls, false},
}
backOff := util.NewBackOff(time.Second, time.Minute)
@@ -1397,7 +1400,7 @@ func TestSyncPodBackoff(t *testing.T) {
fakeDocker.ContainerList = containerList
fakeClock.Time = startTime.Add(time.Duration(c.tick) * time.Second)
runSyncPod(t, dm, fakeDocker, pod, backOff)
runSyncPod(t, dm, fakeDocker, pod, backOff, c.expectErr)
verifyCalls(t, fakeDocker, c.result)
if backOff.Get(stableId) != time.Duration(c.backoff)*time.Second {
@@ -1448,7 +1451,7 @@ func TestGetPodCreationFailureReason(t *testing.T) {
},
}
runSyncPod(t, dm, fakeDocker, pod, nil)
runSyncPod(t, dm, fakeDocker, pod, nil, true)
// Check if we can retrieve the pod status.
status, err := dm.GetPodStatus(pod)
if err != nil {
@@ -1504,7 +1507,7 @@ func TestGetPodPullImageFailureReason(t *testing.T) {
},
}
runSyncPod(t, dm, fakeDocker, pod, nil)
runSyncPod(t, dm, fakeDocker, pod, nil, true)
// Check if we can retrieve the pod status.
status, err := dm.GetPodStatus(pod)
if err != nil {
@@ -1544,7 +1547,7 @@ func TestGetRestartCount(t *testing.T) {
// Helper function for verifying the restart count.
verifyRestartCount := func(pod *api.Pod, expectedCount int) api.PodStatus {
runSyncPod(t, dm, fakeDocker, pod, nil)
runSyncPod(t, dm, fakeDocker, pod, nil, false)
status, err := dm.GetPodStatus(pod)
if err != nil {
t.Fatalf("unexpected error %v", err)
@@ -1621,7 +1624,7 @@ func TestGetTerminationMessagePath(t *testing.T) {
fakeDocker.ContainerMap = map[string]*docker.Container{}
runSyncPod(t, dm, fakeDocker, pod, nil)
runSyncPod(t, dm, fakeDocker, pod, nil, false)
containerList := fakeDocker.ContainerList
if len(containerList) != 2 {
@@ -1680,7 +1683,7 @@ func TestSyncPodWithPodInfraCreatesContainerCallsHandler(t *testing.T) {
},
}
runSyncPod(t, dm, fakeDocker, pod, nil)
runSyncPod(t, dm, fakeDocker, pod, nil, false)
verifyCalls(t, fakeDocker, []string{
// Check the pod infra container.
@@ -1743,7 +1746,7 @@ func TestSyncPodEventHandlerFails(t *testing.T) {
},
}
runSyncPod(t, dm, fakeDocker, pod, nil)
runSyncPod(t, dm, fakeDocker, pod, nil, true)
verifyCalls(t, fakeDocker, []string{
// Check the pod infra container.
@@ -1817,7 +1820,7 @@ func TestSyncPodWithTerminationLog(t *testing.T) {
},
}
runSyncPod(t, dm, fakeDocker, pod, nil)
runSyncPod(t, dm, fakeDocker, pod, nil, false)
verifyCalls(t, fakeDocker, []string{
// Create pod infra container.
"create", "start", "inspect_container", "inspect_container",
@@ -1857,7 +1860,7 @@ func TestSyncPodWithHostNetwork(t *testing.T) {
},
}
runSyncPod(t, dm, fakeDocker, pod, nil)
runSyncPod(t, dm, fakeDocker, pod, nil, false)
verifyCalls(t, fakeDocker, []string{
// Create pod infra container.


@@ -61,6 +61,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/status"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
kubeletutil "k8s.io/kubernetes/pkg/kubelet/util"
"k8s.io/kubernetes/pkg/kubelet/util/queue"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/securitycontext"
@@ -432,7 +433,10 @@ func NewMainKubelet(
return nil, err
}
klet.runtimeCache = runtimeCache
klet.podWorkers = newPodWorkers(runtimeCache, klet.syncPod, recorder)
klet.workQueue = queue.NewBasicWorkQueue()
// TODO(yujuhong): backoff and resync interval should be set differently
// once we switch to using pod event generator.
klet.podWorkers = newPodWorkers(runtimeCache, klet.syncPod, recorder, klet.workQueue, klet.resyncInterval, klet.resyncInterval)
metrics.Register(runtimeCache)
@@ -468,13 +472,14 @@ type nodeLister interface {
// Kubelet is the main kubelet implementation.
type Kubelet struct {
hostname string
nodeName string
dockerClient dockertools.DockerInterface
runtimeCache kubecontainer.RuntimeCache
kubeClient client.Interface
rootDirectory string
podWorkers PodWorkers
hostname string
nodeName string
dockerClient dockertools.DockerInterface
runtimeCache kubecontainer.RuntimeCache
kubeClient client.Interface
rootDirectory string
podWorkers PodWorkers
resyncInterval time.Duration
resyncTicker *time.Ticker
sourcesReady SourcesReadyFn
@@ -642,6 +647,9 @@ type Kubelet struct {
// Information about the ports which are opened by daemons on Node running this Kubelet server.
daemonEndpoints *api.NodeDaemonEndpoints
// A queue used to trigger pod workers.
workQueue queue.WorkQueue
}
func (kl *Kubelet) allSourcesReady() bool {
@@ -1417,7 +1425,7 @@ func (kl *Kubelet) makePodDataDirs(pod *api.Pod) error {
return nil
}
func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecontainer.Pod, updateType kubetypes.SyncPodType) error {
func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecontainer.Pod, updateType kubetypes.SyncPodType) (syncErr error) {
podFullName := kubecontainer.GetPodFullName(pod)
uid := pod.UID
start := time.Now()
@@ -1438,6 +1446,8 @@ func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecont
status, err := kl.generatePodStatus(pod)
if err != nil {
glog.Errorf("Unable to generate status for pod with name %q and uid %q info with error(%v)", podFullName, uid, err)
// Propagate the error upstream.
syncErr = err
} else {
podToUpdate := pod
if mirrorPod != nil {
@@ -2073,7 +2083,10 @@ func (kl *Kubelet) canAdmitPod(pods []*api.Pod, pod *api.Pod) (bool, string, str
// state every sync-frequency seconds. Never returns.
func (kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHandler) {
glog.Info("Starting kubelet main sync loop.")
kl.resyncTicker = time.NewTicker(kl.resyncInterval)
// The resyncTicker wakes up the kubelet to check if there are any pod
// workers that need to be sync'd. A one-second period is sufficient because
// the sync interval is defaulted to 10s.
kl.resyncTicker = time.NewTicker(time.Second)
var housekeepingTimestamp time.Time
for {
if !kl.containerRuntimeUp() {
@@ -2138,9 +2151,15 @@ func (kl *Kubelet) syncLoopIteration(updates <-chan kubetypes.PodUpdate, handler
glog.Errorf("Kubelet does not support snapshot update")
}
case <-kl.resyncTicker.C:
// Periodically syncs all the pods and performs cleanup tasks.
glog.V(4).Infof("SyncLoop (periodic sync)")
handler.HandlePodSyncs(kl.podManager.GetPods())
podUIDs := kl.workQueue.GetWork()
var podsToSync []*api.Pod
for _, uid := range podUIDs {
if pod, ok := kl.podManager.GetPodByUID(uid); ok {
podsToSync = append(podsToSync, pod)
}
}
glog.V(2).Infof("SyncLoop (SYNC): %d pods", len(podsToSync))
kl.HandlePodSyncs(podsToSync)
case update := <-kl.livenessManager.Updates():
// We only care about failures (signalling container death) here.
if update.Result == proberesults.Failure {


@@ -50,6 +50,7 @@ import (
proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results"
"k8s.io/kubernetes/pkg/kubelet/status"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/kubernetes/pkg/kubelet/util/queue"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util"
@@ -145,6 +146,7 @@ func newTestKubelet(t *testing.T) *TestKubelet {
kubelet.backOff.Clock = fakeClock
kubelet.podKillingCh = make(chan *kubecontainer.Pod, 20)
kubelet.resyncInterval = 10 * time.Second
kubelet.workQueue = queue.NewBasicWorkQueue()
return &TestKubelet{kubelet, fakeRuntime, mockCadvisor, fakeKubeClient, fakeMirrorClient}
}


@@ -43,6 +43,7 @@ type Manager interface {
GetPods() []*api.Pod
GetPodByFullName(podFullName string) (*api.Pod, bool)
GetPodByName(namespace, name string) (*api.Pod, bool)
GetPodByUID(types.UID) (*api.Pod, bool)
GetPodByMirrorPod(*api.Pod) (*api.Pod, bool)
GetMirrorPodByPod(*api.Pod) (*api.Pod, bool)
GetPodsAndMirrorPods() ([]*api.Pod, []*api.Pod)
@@ -177,6 +178,15 @@ func (pm *basicManager) GetPodByName(namespace, name string) (*api.Pod, bool) {
return pm.GetPodByFullName(podFullName)
}
// GetPodByUID provides the (non-mirror) pod that matches pod UID as well as
// whether the pod was found.
func (pm *basicManager) GetPodByUID(uid types.UID) (*api.Pod, bool) {
pm.lock.RLock()
defer pm.lock.RUnlock()
pod, ok := pm.podByUID[uid]
return pod, ok
}
// GetPodByFullName returns the (non-mirror) pod that matches full name, as well as
// whether the pod was found.
func (pm *basicManager) GetPodByFullName(podFullName string) (*api.Pod, bool) {


@@ -25,6 +25,7 @@ import (
"k8s.io/kubernetes/pkg/client/record"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/kubernetes/pkg/kubelet/util/queue"
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util"
)
@@ -55,6 +56,8 @@ type podWorkers struct {
// runtimeCache is used for listing running containers.
runtimeCache kubecontainer.RuntimeCache
workQueue queue.WorkQueue
// This function is run to sync the desired state of the pod.
// NOTE: This function has to be thread-safe - it can be called for
// different pods at the same time.
@@ -62,6 +65,12 @@ type podWorkers struct {
// The EventRecorder to use
recorder record.EventRecorder
// backOffPeriod is the duration to back off when there is a sync error.
backOffPeriod time.Duration
// resyncInterval is the duration to wait until the next sync.
resyncInterval time.Duration
}
type workUpdate struct {
@@ -79,7 +88,7 @@ type workUpdate struct {
}
func newPodWorkers(runtimeCache kubecontainer.RuntimeCache, syncPodFn syncPodFnType,
recorder record.EventRecorder) *podWorkers {
recorder record.EventRecorder, workQueue queue.WorkQueue, resyncInterval, backOffPeriod time.Duration) *podWorkers {
return &podWorkers{
podUpdates: map[types.UID]chan workUpdate{},
isWorking: map[types.UID]bool{},
@@ -87,37 +96,40 @@ func newPodWorkers(runtimeCache kubecontainer.RuntimeCache, syncPodFn syncPodFnT
runtimeCache: runtimeCache,
syncPodFn: syncPodFn,
recorder: recorder,
workQueue: workQueue,
resyncInterval: resyncInterval,
backOffPeriod: backOffPeriod,
}
}
func (p *podWorkers) managePodLoop(podUpdates <-chan workUpdate) {
var minRuntimeCacheTime time.Time
for newWork := range podUpdates {
func() {
defer p.checkForUpdates(newWork.pod.UID, newWork.updateCompleteFn)
err := func() (err error) {
// We would like to have the state of the containers from at least
// the moment when we finished the previous processing of that pod.
if err := p.runtimeCache.ForceUpdateIfOlder(minRuntimeCacheTime); err != nil {
glog.Errorf("Error updating the container runtime cache: %v", err)
return
return err
}
pods, err := p.runtimeCache.GetPods()
if err != nil {
glog.Errorf("Error getting pods while syncing pod: %v", err)
return
return err
}
err = p.syncPodFn(newWork.pod, newWork.mirrorPod,
kubecontainer.Pods(pods).FindPodByID(newWork.pod.UID), newWork.updateType)
minRuntimeCacheTime = time.Now()
if err != nil {
glog.Errorf("Error syncing pod %s, skipping: %v", newWork.pod.UID, err)
p.recorder.Eventf(newWork.pod, "FailedSync", "Error syncing pod, skipping: %v", err)
return
return err
}
minRuntimeCacheTime = time.Now()
newWork.updateCompleteFn()
return nil
}()
p.wrapUp(newWork.pod.UID, err)
}
}
@@ -192,7 +204,17 @@ func (p *podWorkers) ForgetNonExistingPodWorkers(desiredPods map[types.UID]empty
}
}
func (p *podWorkers) checkForUpdates(uid types.UID, updateComplete func()) {
func (p *podWorkers) wrapUp(uid types.UID, syncErr error) {
// Requeue the last update if the last sync returned error.
if syncErr != nil {
p.workQueue.Enqueue(uid, p.backOffPeriod)
} else {
p.workQueue.Enqueue(uid, p.resyncInterval)
}
p.checkForUpdates(uid)
}
func (p *podWorkers) checkForUpdates(uid types.UID) {
p.podLock.Lock()
defer p.podLock.Unlock()
if workUpdate, exists := p.lastUndeliveredWorkUpdate[uid]; exists {


@@ -31,6 +31,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/dockertools"
"k8s.io/kubernetes/pkg/kubelet/network"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/kubernetes/pkg/kubelet/util/queue"
"k8s.io/kubernetes/pkg/types"
)
@@ -66,6 +67,9 @@ func createPodWorkers() (*podWorkers, map[types.UID][]string) {
return nil
},
fakeRecorder,
queue.NewBasicWorkQueue(),
time.Second,
time.Second,
)
return podWorkers, processed
}
@@ -200,7 +204,7 @@ func TestFakePodWorkers(t *testing.T) {
kubeletForRealWorkers := &simpleFakeKubelet{}
kubeletForFakeWorkers := &simpleFakeKubelet{}
realPodWorkers := newPodWorkers(fakeRuntimeCache, kubeletForRealWorkers.syncPodWithWaitGroup, fakeRecorder)
realPodWorkers := newPodWorkers(fakeRuntimeCache, kubeletForRealWorkers.syncPodWithWaitGroup, fakeRecorder, queue.NewBasicWorkQueue(), time.Second, time.Second)
fakePodWorkers := &fakePodWorkers{kubeletForFakeWorkers.syncPod, fakeRuntimeCache, t}
tests := []struct {


@@ -0,0 +1,73 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package queue
import (
"sync"
"time"
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util"
)
// WorkQueue allows queueing items with a timestamp. An item is
// considered ready to process if the timestamp has expired.
type WorkQueue interface {
// GetWork dequeues and returns all ready items.
GetWork() []types.UID
// Enqueue inserts a new item or overwrites an existing item with the
// new timestamp (time.Now() + delay) if it is greater.
Enqueue(item types.UID, delay time.Duration)
}
type basicWorkQueue struct {
clock util.Clock
lock sync.Mutex
queue map[types.UID]time.Time
}
var _ WorkQueue = &basicWorkQueue{}
func NewBasicWorkQueue() WorkQueue {
queue := make(map[types.UID]time.Time)
return &basicWorkQueue{queue: queue, clock: util.RealClock{}}
}
func (q *basicWorkQueue) GetWork() []types.UID {
q.lock.Lock()
defer q.lock.Unlock()
now := q.clock.Now()
var items []types.UID
for k, v := range q.queue {
if v.Before(now) {
items = append(items, k)
delete(q.queue, k)
}
}
return items
}
func (q *basicWorkQueue) Enqueue(item types.UID, delay time.Duration) {
q.lock.Lock()
defer q.lock.Unlock()
now := q.clock.Now()
timestamp := now.Add(delay)
existing, ok := q.queue[item]
if !ok || (ok && existing.Before(timestamp)) {
q.queue[item] = timestamp
}
}


@@ -0,0 +1,77 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package queue
import (
"testing"
"time"
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/sets"
)
func newTestBasicWorkQueue() (*basicWorkQueue, *util.FakeClock) {
fakeClock := &util.FakeClock{Time: time.Now()}
wq := &basicWorkQueue{
clock: fakeClock,
queue: make(map[types.UID]time.Time),
}
return wq, fakeClock
}
func compareResults(t *testing.T, expected, actual []types.UID) {
expectedSet := sets.NewString()
for _, u := range expected {
expectedSet.Insert(string(u))
}
actualSet := sets.NewString()
for _, u := range actual {
actualSet.Insert(string(u))
}
if !expectedSet.Equal(actualSet) {
t.Errorf("Expected %#v, got %#v", expectedSet.List(), actualSet.List())
}
}
func TestGetWork(t *testing.T) {
q, clock := newTestBasicWorkQueue()
q.Enqueue(types.UID("foo1"), -1*time.Minute)
q.Enqueue(types.UID("foo2"), -1*time.Minute)
q.Enqueue(types.UID("foo3"), 1*time.Minute)
q.Enqueue(types.UID("foo4"), 1*time.Minute)
expected := []types.UID{types.UID("foo1"), types.UID("foo2")}
compareResults(t, expected, q.GetWork())
compareResults(t, []types.UID{}, q.GetWork())
// Advance the clock by one hour.
clock.Step(time.Hour)
expected = []types.UID{types.UID("foo3"), types.UID("foo4")}
compareResults(t, expected, q.GetWork())
compareResults(t, []types.UID{}, q.GetWork())
}
func TestEnqueueKeepGreaterTimestamp(t *testing.T) {
q, _ := newTestBasicWorkQueue()
item := types.UID("foo")
q.Enqueue(item, -7*time.Hour)
q.Enqueue(item, 3*time.Hour)
compareResults(t, []types.UID{}, q.GetWork())
q.Enqueue(item, 3*time.Hour)
q.Enqueue(item, -7*time.Hour)
compareResults(t, []types.UID{}, q.GetWork())
}