back off restarts of crashlooping containers

Signed-off-by: Sam Abed <samabed@gmail.com>
Sam Abed 2015-08-13 22:59:15 +10:00
parent c92cc62145
commit 995cb15bb6
14 changed files with 505 additions and 34 deletions

View File

@ -77,7 +77,7 @@ More detailed information about the current (and previous) container statuses ca
## RestartPolicy
The possible values for RestartPolicy are `Always`, `OnFailure`, or `Never`. If RestartPolicy is not set, the default value is `Always`. RestartPolicy applies to all containers in the pod. RestartPolicy only refers to restarts of the containers by the Kubelet on the same node. As discussed in the [pods document](pods.md#durability-of-pods-or-lack-thereof), once bound to a node, a pod will never be rebound to another node. This means that some kind of controller is necessary in order for a pod to survive node failure, even if just a single pod at a time is desired.
The possible values for RestartPolicy are `Always`, `OnFailure`, or `Never`. If RestartPolicy is not set, the default value is `Always`. RestartPolicy applies to all containers in the pod. RestartPolicy only refers to restarts of the containers by the Kubelet on the same node. Failed containers that are restarted by the Kubelet are restarted with an exponential back-off delay in multiples of the sync-frequency (0, 1x, 2x, 4x, 8x, ...), capped at 5 minutes; the delay is reset after 10 minutes of successful execution. As discussed in the [pods document](pods.md#durability-of-pods-or-lack-thereof), once bound to a node, a pod will never be rebound to another node. This means that some kind of controller is necessary in order for a pod to survive node failure, even if just a single pod at a time is desired.
The only controller we have today is [`ReplicationController`](replication-controller.md). `ReplicationController` is *only* appropriate for pods with `RestartPolicy = Always`. `ReplicationController` should refuse to instantiate any pod that has a different restart policy.
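
As a rough sketch of the schedule described above (illustrative only, assuming a 10-second sync frequency; the actual logic lives in pkg/util/backoff.go later in this commit):

package main

import (
	"fmt"
	"time"
)

// delayForRestart returns the back-off delay before the n-th restart (n starting
// at 0): 0, 1x, 2x, 4x, ... of syncFrequency, capped at max. Illustrative only.
func delayForRestart(n int, syncFrequency, max time.Duration) time.Duration {
	if n == 0 {
		return 0
	}
	d := syncFrequency << uint(n-1) // 1x, 2x, 4x, ...
	if d > max || d < 0 {           // d < 0 guards against shift overflow
		return max
	}
	return d
}

func main() {
	for n := 0; n < 8; n++ {
		fmt.Printf("restart %d: wait %s\n", n, delayForRestart(n, 10*time.Second, 5*time.Minute))
	}
	// prints 0s, 10s, 20s, 40s, 1m20s, 2m40s, 5m0s, 5m0s
}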

View File

@ -24,6 +24,7 @@ import (
"time"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/volume"
)
@ -151,7 +152,7 @@ func (f *FakeRuntime) GetPods(all bool) ([]*Pod, error) {
return f.PodList, f.Err
}
func (f *FakeRuntime) SyncPod(pod *api.Pod, _ Pod, _ api.PodStatus, _ []api.Secret) error {
func (f *FakeRuntime) SyncPod(pod *api.Pod, _ Pod, _ api.PodStatus, _ []api.Secret, backOff *util.Backoff) error {
f.Lock()
defer f.Unlock()

View File

@ -17,6 +17,7 @@ limitations under the License.
package container
import (
"errors"
"fmt"
"io"
"reflect"
@ -24,9 +25,13 @@ import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/volume"
)
// ErrCrashLoopBackOff is returned when a container has terminated and the Kubelet is backing off the restart.
var ErrCrashLoopBackOff = errors.New("CrashLoopBackOff")
type Version interface {
// Compare compares two versions of the runtime. On success it returns -1
// if the version is less than the other, 1 if it is greater than the other,
@ -53,7 +58,7 @@ type Runtime interface {
// exited and dead containers (used for garbage collection).
GetPods(all bool) ([]*Pod, error)
// Syncs the running pod into the desired pod.
SyncPod(pod *api.Pod, runningPod Pod, podStatus api.PodStatus, pullSecrets []api.Secret) error
SyncPod(pod *api.Pod, runningPod Pod, podStatus api.PodStatus, pullSecrets []api.Secret, backOff *util.Backoff) error
// KillPod kills all the containers of a pod. Pod may be nil, running pod must not be.
KillPod(pod *api.Pod, runningPod Pod) error
// GetPodStatus retrieves the status of the pod, including the information of

View File

@ -233,14 +233,15 @@ func (c DockerContainers) FindPodContainer(podFullName string, uid types.UID, co
const containerNamePrefix = "k8s"
// Creates a name which can be reversed to identify both full pod name and container name.
func BuildDockerName(dockerName KubeletContainerName, container *api.Container) string {
func BuildDockerName(dockerName KubeletContainerName, container *api.Container) (string, string) {
containerName := dockerName.ContainerName + "." + strconv.FormatUint(kubecontainer.HashContainer(container), 16)
return fmt.Sprintf("%s_%s_%s_%s_%08x",
stableName := fmt.Sprintf("%s_%s_%s_%s",
containerNamePrefix,
containerName,
dockerName.PodFullName,
dockerName.PodUID,
rand.Uint32())
dockerName.PodUID)
return stableName, fmt.Sprintf("%s_%08x", stableName, rand.Uint32())
}
// Unpacks a container name, returning the pod full name and container name we would have used to
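
For illustration, a standalone sketch of the two name forms this split produces; the hash and pod identifiers are made-up values mirroring the test fixtures later in this commit:

package main

import (
	"fmt"
	"math/rand"
)

func main() {
	const prefix = "k8s"
	// <container-name>.<config-hash>; the hash value here is made up.
	containerName := fmt.Sprintf("%s.%x", "bad", uint64(0xa1b2c3d4))
	stable := fmt.Sprintf("%s_%s_%s_%s", prefix, containerName, "podfoo_nsnew", "12345678")
	unique := fmt.Sprintf("%s_%08x", stable, rand.Uint32())
	fmt.Println(stable) // k8s_bad.a1b2c3d4_podfoo_nsnew_12345678 (stable: survives restarts, used as the backoff key)
	fmt.Println(unique) // same plus a random 8-hex-digit suffix (unique: the actual docker container name)
}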

View File

@ -100,7 +100,7 @@ func verifyPackUnpack(t *testing.T, podNamespace, podUID, podName, containerName
util.DeepHashObject(hasher, *container)
computedHash := uint64(hasher.Sum32())
podFullName := fmt.Sprintf("%s_%s", podName, podNamespace)
name := BuildDockerName(KubeletContainerName{podFullName, types.UID(podUID), container.Name}, container)
_, name := BuildDockerName(KubeletContainerName{podFullName, types.UID(podUID), container.Name}, container)
returned, hash, err := ParseDockerName(name)
if err != nil {
t.Errorf("Failed to parse Docker container name %q: %v", name, err)

View File

@ -468,10 +468,15 @@ func (dm *DockerManager) GetPodStatus(pod *api.Pod) (*api.PodStatus, error) {
}
}
// Handle the containers for which we cannot find any associated active or
// dead docker containers.
// Handle the containers for which we cannot find any associated active or dead docker containers, or which are in restart backoff
for _, container := range manifest.Containers {
if _, found := statuses[container.Name]; found {
if containerStatus, found := statuses[container.Name]; found {
reason, ok := dm.reasonCache.Get(uid, container.Name)
if ok && reason == kubecontainer.ErrCrashLoopBackOff.Error() {
containerStatus.LastTerminationState = containerStatus.State
containerStatus.State.Waiting = &api.ContainerStateWaiting{Reason: reason}
containerStatus.State.Running = nil
}
continue
}
var containerStatus api.ContainerStatus
@ -634,8 +639,9 @@ func (dm *DockerManager) runContainer(
// of CPU shares.
cpuShares = milliCPUToShares(cpuRequest.MilliValue())
}
_, containerName := BuildDockerName(dockerName, container)
dockerOpts := docker.CreateContainerOptions{
Name: BuildDockerName(dockerName, container),
Name: containerName,
Config: &docker.Config{
Env: makeEnvList(opts.Envs),
ExposedPorts: exposedPorts,
@ -1641,7 +1647,7 @@ func (dm *DockerManager) clearReasonCache(pod *api.Pod, container *api.Container
}
// Sync the running pod to match the specified desired pod.
func (dm *DockerManager) SyncPod(pod *api.Pod, runningPod kubecontainer.Pod, podStatus api.PodStatus, pullSecrets []api.Secret) error {
func (dm *DockerManager) SyncPod(pod *api.Pod, runningPod kubecontainer.Pod, podStatus api.PodStatus, pullSecrets []api.Secret, backOff *util.Backoff) error {
start := time.Now()
defer func() {
metrics.ContainerManagerLatency.WithLabelValues("SyncPod").Observe(metrics.SinceInMicroseconds(start))
@ -1707,6 +1713,13 @@ func (dm *DockerManager) SyncPod(pod *api.Pod, runningPod kubecontainer.Pod, pod
// Start everything
for idx := range containerChanges.ContainersToStart {
container := &pod.Spec.Containers[idx]
// containerChanges.StartInfraContainer causes the containers to be restarted for config reasons,
// so ignore the backoff in that case
if !containerChanges.StartInfraContainer && dm.doBackOff(pod, container, podStatus, backOff) {
glog.V(4).Infof("Backing Off restarting container %+v in pod %v", container, podFullName)
continue
}
glog.V(4).Infof("Creating container %+v in pod %v", container, podFullName)
err := dm.imagePuller.PullImage(pod, container, pullSecrets)
dm.updateReasonCache(pod, container, err)
@ -1799,3 +1812,43 @@ func getUidFromUser(id string) string {
// no gid, just return the id
return id
}
func (dm *DockerManager) doBackOff(pod *api.Pod, container *api.Container, podStatus api.PodStatus, backOff *util.Backoff) bool {
var ts util.Time
for _, containerStatus := range podStatus.ContainerStatuses {
if containerStatus.Name != container.Name {
continue
}
// first failure
if containerStatus.State.Terminated != nil {
ts = containerStatus.State.Terminated.FinishedAt
break
}
// state is waiting and the failure timestamp is in LastTerminationState
if (containerStatus.State.Waiting != nil) && (containerStatus.LastTerminationState.Terminated != nil) {
ts = containerStatus.LastTerminationState.Terminated.FinishedAt
break
}
}
// a termination timestamp was found; check whether the container is inside its backoff window
if !ts.IsZero() {
dockerName := KubeletContainerName{
PodFullName: kubecontainer.GetPodFullName(pod),
PodUID: pod.UID,
ContainerName: container.Name,
}
stableName, _ := BuildDockerName(dockerName, container)
if backOff.IsInBackOffSince(stableName, ts.Time) {
if ref, err := kubecontainer.GenerateContainerRef(pod, container); err == nil {
dm.recorder.Eventf(ref, "Backoff", "Back-off restarting failed docker container")
}
dm.updateReasonCache(pod, container, kubecontainer.ErrCrashLoopBackOff)
glog.Infof("Back-off %s restarting failed container=%s pod=%s", backOff.Get(stableName), container.Name, kubecontainer.GetPodFullName(pod))
return true
}
backOff.Next(stableName, ts.Time)
}
dm.clearReasonCache(pod, container)
return false
}
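
A minimal timeline of the gate implemented by doBackOff, using the util.Backoff and FakeClock types introduced in this commit; "ctr" stands in for the stable container name:

package main

import (
	"fmt"
	"time"

	"k8s.io/kubernetes/pkg/util"
)

func main() {
	clock := &util.FakeClock{Time: time.Now()}
	b := util.NewBackOff(time.Second, time.Minute)
	b.Clock = clock

	died := clock.Now() // the failed container's FinishedAt timestamp
	b.Next("ctr", died) // first sync after the failure: window becomes 1s, restart proceeds

	clock.Step(500 * time.Millisecond)
	fmt.Println(b.IsInBackOffSince("ctr", died)) // true: 0.5s elapsed < 1s window, restart skipped

	clock.Step(time.Second)
	fmt.Println(b.IsInBackOffSince("ctr", died)) // false: window has passed, restart allowed
}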

View File

@ -844,7 +844,7 @@ func generatePodInfraContainerHash(pod *api.Pod) uint64 {
// runSyncPod is a helper function to retrieve the running pods from the fake
// docker client and runs SyncPod for the given pod.
func runSyncPod(t *testing.T, dm *DockerManager, fakeDocker *FakeDockerClient, pod *api.Pod) {
func runSyncPod(t *testing.T, dm *DockerManager, fakeDocker *FakeDockerClient, pod *api.Pod, backOff *util.Backoff) {
runningPods, err := dm.GetPods(false)
if err != nil {
t.Fatalf("unexpected error: %v", err)
@ -855,7 +855,10 @@ func runSyncPod(t *testing.T, dm *DockerManager, fakeDocker *FakeDockerClient, p
t.Errorf("unexpected error: %v", err)
}
fakeDocker.ClearCalls()
err = dm.SyncPod(pod, runningPod, *podStatus, []api.Secret{})
if backOff == nil {
backOff = util.NewBackOff(time.Second, time.Minute)
}
err = dm.SyncPod(pod, runningPod, *podStatus, []api.Secret{}, backOff)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
@ -878,7 +881,7 @@ func TestSyncPodCreateNetAndContainer(t *testing.T) {
},
}
runSyncPod(t, dm, fakeDocker, pod)
runSyncPod(t, dm, fakeDocker, pod, nil)
verifyCalls(t, fakeDocker, []string{
// Create pod infra container.
"create", "start", "inspect_container",
@ -926,7 +929,7 @@ func TestSyncPodCreatesNetAndContainerPullsImage(t *testing.T) {
},
}
runSyncPod(t, dm, fakeDocker, pod)
runSyncPod(t, dm, fakeDocker, pod, nil)
verifyCalls(t, fakeDocker, []string{
// Create pod infra container.
@ -978,7 +981,7 @@ func TestSyncPodWithPodInfraCreatesContainer(t *testing.T) {
},
}
runSyncPod(t, dm, fakeDocker, pod)
runSyncPod(t, dm, fakeDocker, pod, nil)
verifyCalls(t, fakeDocker, []string{
// Inspect pod infra container (but does not create)
@ -1017,7 +1020,7 @@ func TestSyncPodDeletesWithNoPodInfraContainer(t *testing.T) {
},
}
runSyncPod(t, dm, fakeDocker, pod)
runSyncPod(t, dm, fakeDocker, pod, nil)
verifyCalls(t, fakeDocker, []string{
// Kill the container since pod infra container is not running.
@ -1090,7 +1093,7 @@ func TestSyncPodDeletesDuplicate(t *testing.T) {
},
}
runSyncPod(t, dm, fakeDocker, pod)
runSyncPod(t, dm, fakeDocker, pod, nil)
verifyCalls(t, fakeDocker, []string{
// Check the pod infra container.
@ -1144,7 +1147,7 @@ func TestSyncPodBadHash(t *testing.T) {
},
}
runSyncPod(t, dm, fakeDocker, pod)
runSyncPod(t, dm, fakeDocker, pod, nil)
verifyCalls(t, fakeDocker, []string{
// Check the pod infra container.
@ -1202,7 +1205,7 @@ func TestSyncPodsUnhealthy(t *testing.T) {
},
}
runSyncPod(t, dm, fakeDocker, pod)
runSyncPod(t, dm, fakeDocker, pod, nil)
verifyCalls(t, fakeDocker, []string{
// Check the pod infra container.
@ -1259,7 +1262,7 @@ func TestSyncPodsDoesNothing(t *testing.T) {
},
}
runSyncPod(t, dm, fakeDocker, pod)
runSyncPod(t, dm, fakeDocker, pod, nil)
verifyCalls(t, fakeDocker, []string{
// Check the pod infra container.
@ -1292,7 +1295,7 @@ func TestSyncPodWithPullPolicy(t *testing.T) {
},
}
runSyncPod(t, dm, fakeDocker, pod)
runSyncPod(t, dm, fakeDocker, pod, nil)
fakeDocker.Lock()
@ -1456,7 +1459,7 @@ func TestSyncPodWithRestartPolicy(t *testing.T) {
fakeDocker.ContainerMap = containerMap
pod.Spec.RestartPolicy = tt.policy
runSyncPod(t, dm, fakeDocker, pod)
runSyncPod(t, dm, fakeDocker, pod, nil)
// 'stop' is because the pod infra container is killed when no container is running.
verifyCalls(t, fakeDocker, tt.calls)
@ -1575,7 +1578,7 @@ func TestGetPodStatusWithLastTermination(t *testing.T) {
},
}
runSyncPod(t, dm, fakeDocker, pod)
runSyncPod(t, dm, fakeDocker, pod, nil)
// Check if we can retrieve the pod status.
status, err := dm.GetPodStatus(pod)
@ -1603,6 +1606,123 @@ func TestGetPodStatusWithLastTermination(t *testing.T) {
}
}
func TestSyncPodBackoff(t *testing.T) {
var fakeClock = &util.FakeClock{Time: time.Now()}
startTime := fakeClock.Now()
dm, fakeDocker := newTestDockerManager()
containers := []api.Container{
{Name: "good"},
{Name: "bad"},
}
pod := &api.Pod{
ObjectMeta: api.ObjectMeta{
UID: "12345678",
Name: "podfoo",
Namespace: "nsnew",
},
Spec: api.PodSpec{
Containers: containers,
},
}
containerList := []docker.APIContainers{
// format is k8s_<container-id>_<pod-fullname>_<pod-uid>_<random>
{
// pod infra container
Names: []string{"/k8s_POD." + strconv.FormatUint(generatePodInfraContainerHash(pod), 16) + "_podfoo_nsnew_12345678_0"},
ID: "9876",
},
{
Names: []string{"/k8s_good." + strconv.FormatUint(kubecontainer.HashContainer(&containers[0]), 16) + "_podfoo_nsnew_12345678_0"},
ID: "1234",
},
}
exitedAPIContainers := []docker.APIContainers{
{
// format is k8s_<container-id>_<pod-fullname>_<pod-uid>_<random>
Names: []string{"/k8s_bad." + strconv.FormatUint(kubecontainer.HashContainer(&containers[1]), 16) + "_podfoo_nsnew_12345678_0"},
ID: "5678",
},
}
stableId := "k8s_bad." + strconv.FormatUint(kubecontainer.HashContainer(&containers[1]), 16) + "_podfoo_nsnew_12345678"
containerMap := map[string]*docker.Container{
"9876": {
ID: "9876",
Name: "POD",
Config: &docker.Config{},
HostConfig: &docker.HostConfig{},
State: docker.State{
StartedAt: startTime,
Running: true,
},
},
"1234": {
ID: "1234",
Name: "good",
Config: &docker.Config{},
HostConfig: &docker.HostConfig{},
State: docker.State{
StartedAt: startTime,
Running: true,
},
},
"5678": {
ID: "5678",
Name: "bad",
Config: &docker.Config{},
HostConfig: &docker.HostConfig{},
State: docker.State{
ExitCode: 42,
StartedAt: startTime,
FinishedAt: fakeClock.Now(),
},
},
}
startCalls := []string{"inspect_container", "create", "start", "inspect_container"}
backOffCalls := []string{"inspect_container"}
tests := []struct {
tick int
backoff int
killDelay int
result []string
}{
{1, 1, 1, startCalls},
{2, 2, 2, startCalls},
{3, 2, 3, backOffCalls},
{4, 4, 4, startCalls},
{5, 4, 5, backOffCalls},
{6, 4, 6, backOffCalls},
{7, 4, 7, backOffCalls},
{8, 8, 129, startCalls},
{130, 1, 0, startCalls},
}
backOff := util.NewBackOff(time.Second, time.Minute)
backOff.Clock = fakeClock
for _, c := range tests {
fakeDocker.ContainerMap = containerMap
fakeDocker.ExitedContainerList = exitedAPIContainers
fakeDocker.ContainerList = containerList
fakeClock.Time = startTime.Add(time.Duration(c.tick) * time.Second)
runSyncPod(t, dm, fakeDocker, pod, backOff)
verifyCalls(t, fakeDocker, c.result)
if backOff.Get(stableId) != time.Duration(c.backoff)*time.Second {
t.Errorf("At tick %s expected backoff=%s got=%s", time.Duration(c.tick)*time.Second, time.Duration(c.backoff)*time.Second, backOff.Get(stableId))
}
if len(fakeDocker.Created) > 0 {
// pretend kill the container
fakeDocker.Created = nil
containerMap["5678"].State.FinishedAt = startTime.Add(time.Duration(c.killDelay) * time.Second)
}
}
}
func TestGetPodCreationFailureReason(t *testing.T) {
dm, fakeDocker := newTestDockerManager()
@ -1639,7 +1759,7 @@ func TestGetPodCreationFailureReason(t *testing.T) {
},
}
runSyncPod(t, dm, fakeDocker, pod)
runSyncPod(t, dm, fakeDocker, pod, nil)
// Check if we can retrieve the pod status.
status, err := dm.GetPodStatus(pod)
if err != nil {
@ -1695,7 +1815,7 @@ func TestGetPodPullImageFailureReason(t *testing.T) {
},
}
runSyncPod(t, dm, fakeDocker, pod)
runSyncPod(t, dm, fakeDocker, pod, nil)
// Check if we can retrieve the pod status.
status, err := dm.GetPodStatus(pod)
if err != nil {
@ -1839,7 +1959,7 @@ func TestSyncPodWithPodInfraCreatesContainerCallsHandler(t *testing.T) {
},
}
runSyncPod(t, dm, fakeDocker, pod)
runSyncPod(t, dm, fakeDocker, pod, nil)
verifyCalls(t, fakeDocker, []string{
// Check the pod infra container.
@ -1902,7 +2022,7 @@ func TestSyncPodEventHandlerFails(t *testing.T) {
},
}
runSyncPod(t, dm, fakeDocker, pod)
runSyncPod(t, dm, fakeDocker, pod, nil)
verifyCalls(t, fakeDocker, []string{
// Check the pod infra container.
@ -1969,7 +2089,7 @@ func TestSyncPodWithTerminationLog(t *testing.T) {
},
}
runSyncPod(t, dm, fakeDocker, pod)
runSyncPod(t, dm, fakeDocker, pod, nil)
verifyCalls(t, fakeDocker, []string{
// Create pod infra container.
"create", "start", "inspect_container",
@ -2007,7 +2127,7 @@ func TestSyncPodWithHostNetwork(t *testing.T) {
},
}
runSyncPod(t, dm, fakeDocker, pod)
runSyncPod(t, dm, fakeDocker, pod, nil)
verifyCalls(t, fakeDocker, []string{
// Create pod infra container.

View File

@ -79,6 +79,9 @@ const (
// Location of container logs.
containerLogsDir = "/var/log/containers"
// max backoff period
maxContainerBackOff = 300 * time.Second
)
var (
@ -373,6 +376,7 @@ func NewMainKubelet(
}
}
klet.backOff = util.NewBackOff(resyncInterval, maxContainerBackOff)
return klet, nil
}
@ -525,6 +529,9 @@ type Kubelet struct {
// Monitor Kubelet's sync loop
syncLoopMonitor util.AtomicValue
// Container restart Backoff
backOff *util.Backoff
}
// getRootDir returns the full path to the directory under which kubelet can
@ -1258,7 +1265,7 @@ func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecont
return err
}
err = kl.containerRuntime.SyncPod(pod, runningPod, podStatus, pullSecrets)
err = kl.containerRuntime.SyncPod(pod, runningPod, podStatus, pullSecrets, kl.backOff)
if err != nil {
return err
}
@ -1563,6 +1570,7 @@ func (kl *Kubelet) cleanupPods(allPods []*api.Pod, admittedPods []*api.Pod) erro
glog.Errorf("Failed to cleanup terminated pods: %v", err)
}
kl.backOff.GC()
return err
}

View File

@ -129,6 +129,9 @@ func newTestKubelet(t *testing.T) *TestKubelet {
kubelet.volumeManager = newVolumeManager()
kubelet.containerManager, _ = newContainerManager(mockCadvisor, "", "", "")
kubelet.networkConfigured = true
fakeClock := &util.FakeClock{Time: time.Now()}
kubelet.backOff = util.NewBackOff(time.Second, time.Minute)
kubelet.backOff.Clock = fakeClock
return &TestKubelet{kubelet, fakeRuntime, mockCadvisor, fakeKubeClient, fakeMirrorClient}
}

View File

@ -45,6 +45,7 @@ import (
"k8s.io/kubernetes/pkg/probe"
"k8s.io/kubernetes/pkg/securitycontext"
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/errors"
)
@ -910,7 +911,7 @@ func (r *runtime) IsImagePresent(image kubecontainer.ImageSpec) (bool, error) {
}
// SyncPod syncs the running pod to match the specified desired pod.
func (r *runtime) SyncPod(pod *api.Pod, runningPod kubecontainer.Pod, podStatus api.PodStatus, pullSecrets []api.Secret) error {
func (r *runtime) SyncPod(pod *api.Pod, runningPod kubecontainer.Pod, podStatus api.PodStatus, pullSecrets []api.Secret, backOff *util.Backoff) error {
podFullName := kubecontainer.GetPodFullName(pod)
// Add references to all containers.

pkg/util/backoff.go Normal file
View File

@ -0,0 +1,111 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"math"
"sync"
"time"
)
type backoffEntry struct {
backoff time.Duration
lastUpdate time.Time
}
type Backoff struct {
sync.Mutex
Clock Clock
defaultDuration time.Duration
maxDuration time.Duration
perItemBackoff map[string]*backoffEntry
}
func NewBackOff(initial, max time.Duration) *Backoff {
return &Backoff{
perItemBackoff: map[string]*backoffEntry{},
Clock: RealClock{},
defaultDuration: initial,
maxDuration: max,
}
}
// Get returns the current backoff Duration for id
func (p *Backoff) Get(id string) time.Duration {
p.Lock()
defer p.Unlock()
var delay time.Duration
entry, ok := p.perItemBackoff[id]
if ok {
delay = entry.backoff
}
return delay
}
// Next moves the backoff for id to the next mark, capping at maxDuration
func (p *Backoff) Next(id string, eventTime time.Time) {
p.Lock()
defer p.Unlock()
entry, ok := p.perItemBackoff[id]
if !ok || hasExpired(eventTime, entry.lastUpdate, p.maxDuration) {
entry = p.initEntryUnsafe(id)
} else {
delay := entry.backoff * 2 // exponential
entry.backoff = time.Duration(math.Min(float64(delay), float64(p.maxDuration)))
}
entry.lastUpdate = p.Clock.Now()
}
// IsInBackOffSince returns true if the elapsed time since eventTime is smaller than the current backoff window
func (p *Backoff) IsInBackOffSince(id string, eventTime time.Time) bool {
p.Lock()
defer p.Unlock()
entry, ok := p.perItemBackoff[id]
if !ok {
return false
}
if hasExpired(eventTime, entry.lastUpdate, p.maxDuration) {
return false
}
return p.Clock.Now().Sub(eventTime) < entry.backoff
}
// GC deletes records that have not been updated for 2*maxDuration. Backoff users are
// expected to invoke this periodically.
func (p *Backoff) GC() {
p.Lock()
defer p.Unlock()
now := p.Clock.Now()
for id, entry := range p.perItemBackoff {
if now.Sub(entry.lastUpdate) > p.maxDuration*2 {
// GC when entry has not been updated for 2*maxDuration
delete(p.perItemBackoff, id)
}
}
}
// initEntryUnsafe must be called with the lock on *Backoff held
func (p *Backoff) initEntryUnsafe(id string) *backoffEntry {
entry := &backoffEntry{backoff: p.defaultDuration}
p.perItemBackoff[id] = entry
return entry
}
// After a quiet period of 2*maxDuration, the backoff factor restarts from the beginning
func hasExpired(eventTime time.Time, lastUpdate time.Time, maxDuration time.Duration) bool {
return eventTime.Sub(lastUpdate) > maxDuration*2 // consider the container stable after twice the maxDuration
}
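
A short usage sketch of the type above, showing the doubling, the cap, and the expiry-based reset (entries also become eligible for GC() after the same 2*maxDuration quiet period):

package main

import (
	"fmt"
	"time"

	"k8s.io/kubernetes/pkg/util"
)

func main() {
	clock := &util.FakeClock{Time: time.Now()}
	b := util.NewBackOff(time.Second, 4*time.Second) // initial 1s, cap 4s
	b.Clock = clock

	// Repeated failures double the window up to the cap.
	for i := 0; i < 4; i++ {
		b.Next("ctr", clock.Now())
		clock.Step(time.Second)
	}
	fmt.Println(b.Get("ctr")) // 4s (1s -> 2s -> 4s, capped)

	// After a quiet period longer than 2*maxDuration the entry is
	// considered stale and the next failure starts over at 1s.
	clock.Step(9 * time.Second) // > 2*4s since the last update
	b.Next("ctr", clock.Now())
	fmt.Println(b.Get("ctr")) // 1s again
}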

pkg/util/backoff_test.go Normal file
View File

@ -0,0 +1,125 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"testing"
"time"
)
func NewFakeBackOff(initial, max time.Duration, tc *FakeClock) *Backoff {
return &Backoff{
perItemBackoff: map[string]*backoffEntry{},
Clock: tc,
defaultDuration: initial,
maxDuration: max,
}
}
func TestSlowBackoff(t *testing.T) {
id := "_idSlow"
tc := &FakeClock{Time: time.Now()}
step := time.Second
maxDuration := 50 * step
b := NewFakeBackOff(step, maxDuration, tc)
cases := []time.Duration{0, 1, 2, 4, 8, 16, 32, 50, 50, 50}
for ix, c := range cases {
tc.Step(step)
w := b.Get(id)
if w != c*step {
t.Errorf("input: '%d': expected %s, got %s", ix, c*step, w)
}
b.Next(id, tc.Now())
}
}
func TestBackoffReset(t *testing.T) {
id := "_idReset"
tc := &FakeClock{Time: time.Now()}
step := time.Second
maxDuration := step * 5
b := NewFakeBackOff(step, maxDuration, tc)
startTime := tc.Now()
// get to backoff = maxDuration
for i := 0; i <= int(maxDuration/step); i++ {
tc.Step(step)
b.Next(id, tc.Now())
}
// backoff should be capped at maxDuration
if !b.IsInBackOffSince(id, tc.Now()) {
t.Errorf("expected to be in Backoff got %s", b.Get(id))
}
lastUpdate := tc.Now()
tc.Step(2*maxDuration + step) // time += 11s, 11 > 2*maxDuration
if b.IsInBackOffSince(id, lastUpdate) {
t.Errorf("now=%s lastUpdate=%s (%s) expected Backoff reset got %s b.lastUpdate=%s", tc.Now(), startTime, tc.Now().Sub(lastUpdate), b.Get(id))
}
}
func TestBackoffHighWaterMark(t *testing.T) {
id := "_idHiWaterMark"
tc := &FakeClock{Time: time.Now()}
step := time.Second
maxDuration := 5 * step
b := NewFakeBackOff(step, maxDuration, tc)
// get to backoff = maxDuration
for i := 0; i <= int(maxDuration/step); i++ {
tc.Step(step)
b.Next(id, tc.Now())
}
// backoff high watermark expires after 2*maxDuration
tc.Step(maxDuration + step)
b.Next(id, tc.Now())
if b.Get(id) != maxDuration {
t.Errorf("expected Backoff to stay at high watermark %s got %s", maxDuration, b.Get(id))
}
}
func TestBackoffGC(t *testing.T) {
id := "_idGC"
tc := &FakeClock{Time: time.Now()}
step := time.Second
maxDuration := 5 * step
b := NewFakeBackOff(step, maxDuration, tc)
for i := 0; i <= int(maxDuration/step); i++ {
tc.Step(step)
b.Next(id, tc.Now())
}
lastUpdate := tc.Now()
tc.Step(maxDuration + step)
b.GC()
_, found := b.perItemBackoff[id]
if !found {
t.Errorf("expected GC to skip entry, elapsed time=%s maxDuration=%s", tc.Now().Sub(lastUpdate), maxDuration)
}
tc.Step(maxDuration + step)
b.GC()
r, found := b.perItemBackoff[id]
if found {
t.Errorf("expected GC of entry after %s got entry %v", tc.Now().Sub(lastUpdate), r)
}
}

View File

@ -54,3 +54,8 @@ func (f *FakeClock) Now() time.Time {
func (f *FakeClock) Since(ts time.Time) time.Duration {
return f.Time.Sub(ts)
}
// Step moves the clock forward by Duration d
func (f *FakeClock) Step(d time.Duration) {
f.Time = f.Time.Add(d)
}

pkg/util/clock_test.go Normal file
View File

@ -0,0 +1,38 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"testing"
"time"
)
func TestFakeClock(t *testing.T) {
startTime := time.Now()
tc := &FakeClock{Time: startTime}
tc.Step(time.Second)
now := tc.Now()
if now.Sub(startTime) != time.Second {
t.Errorf("input: %s now=%s gap=%s expected=%s", startTime, now, now.Sub(startTime), time.Second)
}
tt := tc.Now()
tc.Time = tt.Add(time.Hour)
if tc.Now().Sub(tt) != time.Hour {
t.Errorf("input: %s now=%s gap=%s expected=%s", tt, tc.Now(), tc.Now().Sub(tt), time.Hour)
}
}