Add flow control pkg

Refactor pkg names in flow control related files
pull/6/head
harry 2016-03-09 10:58:24 +08:00 committed by Harry Zhang
parent 7d7ca5ab72
commit 5fe773d37c
19 changed files with 57 additions and 49 deletions

View File

@ -27,7 +27,6 @@ import (
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/flowcontrol" "k8s.io/kubernetes/pkg/util/flowcontrol"
) )
@ -105,7 +104,7 @@ func readExpBackoffConfig() BackoffManager {
return &NoBackoff{} return &NoBackoff{}
} }
return &URLBackoff{ return &URLBackoff{
Backoff: util.NewBackOff( Backoff: flowcontrol.NewBackOff(
time.Duration(backoffBaseInt)*time.Second, time.Duration(backoffBaseInt)*time.Second,
time.Duration(backoffDurationInt)*time.Second)} time.Duration(backoffDurationInt)*time.Second)}
} }

View File

@ -38,6 +38,7 @@ import (
"k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/flowcontrol"
"k8s.io/kubernetes/pkg/util/httpstream" "k8s.io/kubernetes/pkg/util/httpstream"
"k8s.io/kubernetes/pkg/util/intstr" "k8s.io/kubernetes/pkg/util/intstr"
utiltesting "k8s.io/kubernetes/pkg/util/testing" utiltesting "k8s.io/kubernetes/pkg/util/testing"
@ -770,7 +771,7 @@ func TestBackoffLifecycle(t *testing.T) {
clock := util.FakeClock{} clock := util.FakeClock{}
request.backoffMgr = &URLBackoff{ request.backoffMgr = &URLBackoff{
// Use a fake backoff here to avoid flakes and speed the test up. // Use a fake backoff here to avoid flakes and speed the test up.
Backoff: util.NewFakeBackOff( Backoff: flowcontrol.NewFakeBackOff(
time.Duration(1)*time.Second, time.Duration(1)*time.Second,
time.Duration(200)*time.Second, time.Duration(200)*time.Second,
&clock, &clock,

View File

@ -21,7 +21,7 @@ import (
"time" "time"
"github.com/golang/glog" "github.com/golang/glog"
"k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/util/flowcontrol"
"k8s.io/kubernetes/pkg/util/sets" "k8s.io/kubernetes/pkg/util/sets"
) )
@ -42,7 +42,7 @@ type BackoffManager interface {
// we need for URL specific exponential backoff. // we need for URL specific exponential backoff.
type URLBackoff struct { type URLBackoff struct {
// Uses backoff as underlying implementation. // Uses backoff as underlying implementation.
Backoff *util.Backoff Backoff *flowcontrol.Backoff
} }
// NoBackoff is a stub implementation, can be used for mocking or else as a default. // NoBackoff is a stub implementation, can be used for mocking or else as a default.
@ -63,7 +63,7 @@ func (n *NoBackoff) Sleep(d time.Duration) {
// by tests which want to run 1000s of mock requests without slowing down. // by tests which want to run 1000s of mock requests without slowing down.
func (b *URLBackoff) Disable() { func (b *URLBackoff) Disable() {
glog.V(4).Infof("Disabling backoff strategy") glog.V(4).Infof("Disabling backoff strategy")
b.Backoff = util.NewBackOff(0*time.Second, 0*time.Second) b.Backoff = flowcontrol.NewBackOff(0*time.Second, 0*time.Second)
} }
// baseUrlKey returns the key which urls will be mapped to. // baseUrlKey returns the key which urls will be mapped to.

View File

@ -21,7 +21,7 @@ import (
"testing" "testing"
"time" "time"
"k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/util/flowcontrol"
) )
func parse(raw string) *url.URL { func parse(raw string) *url.URL {
@ -31,7 +31,7 @@ func parse(raw string) *url.URL {
func TestURLBackoffFunctionalityCollisions(t *testing.T) { func TestURLBackoffFunctionalityCollisions(t *testing.T) {
myBackoff := &URLBackoff{ myBackoff := &URLBackoff{
Backoff: util.NewBackOff(1*time.Second, 60*time.Second), Backoff: flowcontrol.NewBackOff(1*time.Second, 60*time.Second),
} }
// Add some noise and make sure backoff for a clean URL is zero. // Add some noise and make sure backoff for a clean URL is zero.
@ -47,7 +47,7 @@ func TestURLBackoffFunctionalityCollisions(t *testing.T) {
// TestURLBackoffFunctionality generally tests the URLBackoff wrapper. We avoid duplicating tests from backoff and request. // TestURLBackoffFunctionality generally tests the URLBackoff wrapper. We avoid duplicating tests from backoff and request.
func TestURLBackoffFunctionality(t *testing.T) { func TestURLBackoffFunctionality(t *testing.T) {
myBackoff := &URLBackoff{ myBackoff := &URLBackoff{
Backoff: util.NewBackOff(1*time.Second, 60*time.Second), Backoff: flowcontrol.NewBackOff(1*time.Second, 60*time.Second),
} }
// Now test that backoff increases, then recovers. // Now test that backoff increases, then recovers.

View File

@ -22,7 +22,7 @@ import (
"github.com/golang/glog" "github.com/golang/glog"
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/record" "k8s.io/kubernetes/pkg/client/record"
"k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/util/flowcontrol"
) )
// imagePuller pulls the image using Runtime.PullImage(). // imagePuller pulls the image using Runtime.PullImage().
@ -31,7 +31,7 @@ import (
type imagePuller struct { type imagePuller struct {
recorder record.EventRecorder recorder record.EventRecorder
runtime Runtime runtime Runtime
backOff *util.Backoff backOff *flowcontrol.Backoff
} }
// enforce compatibility. // enforce compatibility.
@ -39,7 +39,7 @@ var _ ImagePuller = &imagePuller{}
// NewImagePuller takes an event recorder and container runtime to create a // NewImagePuller takes an event recorder and container runtime to create a
// image puller that wraps the container runtime's PullImage interface. // image puller that wraps the container runtime's PullImage interface.
func NewImagePuller(recorder record.EventRecorder, runtime Runtime, imageBackOff *util.Backoff) ImagePuller { func NewImagePuller(recorder record.EventRecorder, runtime Runtime, imageBackOff *flowcontrol.Backoff) ImagePuller {
return &imagePuller{ return &imagePuller{
recorder: recorder, recorder: recorder,
runtime: runtime, runtime: runtime,

View File

@ -27,6 +27,7 @@ import (
. "k8s.io/kubernetes/pkg/kubelet/container" . "k8s.io/kubernetes/pkg/kubelet/container"
ctest "k8s.io/kubernetes/pkg/kubelet/container/testing" ctest "k8s.io/kubernetes/pkg/kubelet/container/testing"
"k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/flowcontrol"
) )
func TestPuller(t *testing.T) { func TestPuller(t *testing.T) {
@ -99,7 +100,7 @@ func TestPuller(t *testing.T) {
ImagePullPolicy: c.policy, ImagePullPolicy: c.policy,
} }
backOff := util.NewBackOff(time.Second, time.Minute) backOff := flowcontrol.NewBackOff(time.Second, time.Minute)
fakeClock := util.NewFakeClock(time.Now()) fakeClock := util.NewFakeClock(time.Now())
backOff.Clock = fakeClock backOff.Clock = fakeClock

View File

@ -26,7 +26,7 @@ import (
"github.com/golang/glog" "github.com/golang/glog"
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/util/flowcontrol"
"k8s.io/kubernetes/pkg/volume" "k8s.io/kubernetes/pkg/volume"
) )
@ -68,7 +68,7 @@ type Runtime interface {
// GarbageCollect removes dead containers using the specified container gc policy // GarbageCollect removes dead containers using the specified container gc policy
GarbageCollect(gcPolicy ContainerGCPolicy) error GarbageCollect(gcPolicy ContainerGCPolicy) error
// Syncs the running pod into the desired pod. // Syncs the running pod into the desired pod.
SyncPod(pod *api.Pod, apiPodStatus api.PodStatus, podStatus *PodStatus, pullSecrets []api.Secret, backOff *util.Backoff) PodSyncResult SyncPod(pod *api.Pod, apiPodStatus api.PodStatus, podStatus *PodStatus, pullSecrets []api.Secret, backOff *flowcontrol.Backoff) PodSyncResult
// KillPod kills all the containers of a pod. Pod may be nil, running pod must not be. // KillPod kills all the containers of a pod. Pod may be nil, running pod must not be.
// TODO(random-liu): Return PodSyncResult in KillPod. // TODO(random-liu): Return PodSyncResult in KillPod.
KillPod(pod *api.Pod, runningPod Pod) error KillPod(pod *api.Pod, runningPod Pod) error

View File

@ -23,7 +23,7 @@ import (
"github.com/golang/glog" "github.com/golang/glog"
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/record" "k8s.io/kubernetes/pkg/client/record"
"k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/util/flowcontrol"
"k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/pkg/util/wait"
) )
@ -42,7 +42,7 @@ type imagePullRequest struct {
type serializedImagePuller struct { type serializedImagePuller struct {
recorder record.EventRecorder recorder record.EventRecorder
runtime Runtime runtime Runtime
backOff *util.Backoff backOff *flowcontrol.Backoff
pullRequests chan *imagePullRequest pullRequests chan *imagePullRequest
} }
@ -53,7 +53,7 @@ var _ ImagePuller = &serializedImagePuller{}
// image puller that wraps the container runtime's PullImage interface. // image puller that wraps the container runtime's PullImage interface.
// Pulls one image at a time. // Pulls one image at a time.
// Issue #10959 has the rationale behind serializing image pulls. // Issue #10959 has the rationale behind serializing image pulls.
func NewSerializedImagePuller(recorder record.EventRecorder, runtime Runtime, imageBackOff *util.Backoff) ImagePuller { func NewSerializedImagePuller(recorder record.EventRecorder, runtime Runtime, imageBackOff *flowcontrol.Backoff) ImagePuller {
imagePuller := &serializedImagePuller{ imagePuller := &serializedImagePuller{
recorder: recorder, recorder: recorder,
runtime: runtime, runtime: runtime,

View File

@ -27,6 +27,7 @@ import (
. "k8s.io/kubernetes/pkg/kubelet/container" . "k8s.io/kubernetes/pkg/kubelet/container"
ctest "k8s.io/kubernetes/pkg/kubelet/container/testing" ctest "k8s.io/kubernetes/pkg/kubelet/container/testing"
"k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/flowcontrol"
) )
func TestSerializedPuller(t *testing.T) { func TestSerializedPuller(t *testing.T) {
@ -99,7 +100,7 @@ func TestSerializedPuller(t *testing.T) {
ImagePullPolicy: c.policy, ImagePullPolicy: c.policy,
} }
backOff := util.NewBackOff(time.Second, time.Minute) backOff := flowcontrol.NewBackOff(time.Second, time.Minute)
fakeClock := util.NewFakeClock(time.Now()) fakeClock := util.NewFakeClock(time.Now())
backOff.Clock = fakeClock backOff.Clock = fakeClock

View File

@ -26,7 +26,7 @@ import (
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
. "k8s.io/kubernetes/pkg/kubelet/container" . "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/util/flowcontrol"
"k8s.io/kubernetes/pkg/volume" "k8s.io/kubernetes/pkg/volume"
) )
@ -189,7 +189,7 @@ func (f *FakeRuntime) GetPods(all bool) ([]*Pod, error) {
return f.PodList, f.Err return f.PodList, f.Err
} }
func (f *FakeRuntime) SyncPod(pod *api.Pod, _ api.PodStatus, _ *PodStatus, _ []api.Secret, backOff *util.Backoff) (result PodSyncResult) { func (f *FakeRuntime) SyncPod(pod *api.Pod, _ api.PodStatus, _ *PodStatus, _ []api.Secret, backOff *flowcontrol.Backoff) (result PodSyncResult) {
f.Lock() f.Lock()
defer f.Unlock() defer f.Unlock()

View File

@ -23,7 +23,7 @@ import (
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
. "k8s.io/kubernetes/pkg/kubelet/container" . "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/util/flowcontrol"
"k8s.io/kubernetes/pkg/volume" "k8s.io/kubernetes/pkg/volume"
) )
@ -63,7 +63,7 @@ func (r *Mock) GetPods(all bool) ([]*Pod, error) {
return args.Get(0).([]*Pod), args.Error(1) return args.Get(0).([]*Pod), args.Error(1)
} }
func (r *Mock) SyncPod(pod *api.Pod, apiStatus api.PodStatus, status *PodStatus, secrets []api.Secret, backOff *util.Backoff) PodSyncResult { func (r *Mock) SyncPod(pod *api.Pod, apiStatus api.PodStatus, status *PodStatus, secrets []api.Secret, backOff *flowcontrol.Backoff) PodSyncResult {
args := r.Called(pod, apiStatus, status, secrets, backOff) args := r.Called(pod, apiStatus, status, secrets, backOff)
return args.Get(0).(PodSyncResult) return args.Get(0).(PodSyncResult)
} }

View File

@ -25,7 +25,7 @@ import (
proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results" proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types" kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/util/flowcontrol"
"k8s.io/kubernetes/pkg/util/oom" "k8s.io/kubernetes/pkg/util/oom"
"k8s.io/kubernetes/pkg/util/procfs" "k8s.io/kubernetes/pkg/util/procfs"
) )
@ -43,7 +43,7 @@ func NewFakeDockerManager(
osInterface kubecontainer.OSInterface, osInterface kubecontainer.OSInterface,
networkPlugin network.NetworkPlugin, networkPlugin network.NetworkPlugin,
runtimeHelper kubecontainer.RuntimeHelper, runtimeHelper kubecontainer.RuntimeHelper,
httpClient kubetypes.HttpGetter, imageBackOff *util.Backoff) *DockerManager { httpClient kubetypes.HttpGetter, imageBackOff *flowcontrol.Backoff) *DockerManager {
fakeOOMAdjuster := oom.NewFakeOOMAdjuster() fakeOOMAdjuster := oom.NewFakeOOMAdjuster()
fakeProcFs := procfs.NewFakeProcFS() fakeProcFs := procfs.NewFakeProcFS()

View File

@ -50,7 +50,7 @@ import (
"k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/securitycontext" "k8s.io/kubernetes/pkg/securitycontext"
"k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/util/flowcontrol"
"k8s.io/kubernetes/pkg/util/oom" "k8s.io/kubernetes/pkg/util/oom"
"k8s.io/kubernetes/pkg/util/procfs" "k8s.io/kubernetes/pkg/util/procfs"
utilruntime "k8s.io/kubernetes/pkg/util/runtime" utilruntime "k8s.io/kubernetes/pkg/util/runtime"
@ -194,7 +194,7 @@ func NewDockerManager(
oomAdjuster *oom.OOMAdjuster, oomAdjuster *oom.OOMAdjuster,
procFs procfs.ProcFSInterface, procFs procfs.ProcFSInterface,
cpuCFSQuota bool, cpuCFSQuota bool,
imageBackOff *util.Backoff, imageBackOff *flowcontrol.Backoff,
serializeImagePulls bool, serializeImagePulls bool,
enableCustomMetrics bool, enableCustomMetrics bool,
hairpinMode bool, hairpinMode bool,
@ -1768,7 +1768,7 @@ func (dm *DockerManager) computePodContainerChanges(pod *api.Pod, podStatus *kub
} }
// Sync the running pod to match the specified desired pod. // Sync the running pod to match the specified desired pod.
func (dm *DockerManager) SyncPod(pod *api.Pod, _ api.PodStatus, podStatus *kubecontainer.PodStatus, pullSecrets []api.Secret, backOff *util.Backoff) (result kubecontainer.PodSyncResult) { func (dm *DockerManager) SyncPod(pod *api.Pod, _ api.PodStatus, podStatus *kubecontainer.PodStatus, pullSecrets []api.Secret, backOff *flowcontrol.Backoff) (result kubecontainer.PodSyncResult) {
start := time.Now() start := time.Now()
defer func() { defer func() {
metrics.ContainerManagerLatency.WithLabelValues("SyncPod").Observe(metrics.SinceInMicroseconds(start)) metrics.ContainerManagerLatency.WithLabelValues("SyncPod").Observe(metrics.SinceInMicroseconds(start))
@ -2018,7 +2018,7 @@ func getUidFromUser(id string) string {
// If all instances of a container are garbage collected, doBackOff will also return false, which means the container may be restarted before the // If all instances of a container are garbage collected, doBackOff will also return false, which means the container may be restarted before the
// backoff deadline. However, because that won't cause error and the chance is really slim, we can just ignore it for now. // backoff deadline. However, because that won't cause error and the chance is really slim, we can just ignore it for now.
// If a container is still in backoff, the function will return a brief backoff error and a detailed error message. // If a container is still in backoff, the function will return a brief backoff error and a detailed error message.
func (dm *DockerManager) doBackOff(pod *api.Pod, container *api.Container, podStatus *kubecontainer.PodStatus, backOff *util.Backoff) (bool, error, string) { func (dm *DockerManager) doBackOff(pod *api.Pod, container *api.Container, podStatus *kubecontainer.PodStatus, backOff *flowcontrol.Backoff) (bool, error, string) {
var cStatus *kubecontainer.ContainerStatus var cStatus *kubecontainer.ContainerStatus
// Use the finished time of the latest exited container as the start point to calculate whether to do back-off. // Use the finished time of the latest exited container as the start point to calculate whether to do back-off.
// TODO(random-liu): Better define backoff start point; add unit and e2e test after we finalize this. (See github issue #22240) // TODO(random-liu): Better define backoff start point; add unit and e2e test after we finalize this. (See github issue #22240)

View File

@ -45,6 +45,7 @@ import (
"k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/util"
uexec "k8s.io/kubernetes/pkg/util/exec" uexec "k8s.io/kubernetes/pkg/util/exec"
"k8s.io/kubernetes/pkg/util/flowcontrol"
"k8s.io/kubernetes/pkg/util/intstr" "k8s.io/kubernetes/pkg/util/intstr"
"k8s.io/kubernetes/pkg/util/sets" "k8s.io/kubernetes/pkg/util/sets"
) )
@ -106,7 +107,7 @@ func newTestDockerManagerWithHTTPClientWithVersion(fakeHTTPClient *fakeHTTP, ver
networkPlugin, networkPlugin,
&fakeRuntimeHelper{}, &fakeRuntimeHelper{},
fakeHTTPClient, fakeHTTPClient,
util.NewBackOff(time.Second, 300*time.Second)) flowcontrol.NewBackOff(time.Second, 300*time.Second))
return dockerManager, fakeDocker return dockerManager, fakeDocker
} }
@ -586,14 +587,14 @@ func generatePodInfraContainerHash(pod *api.Pod) uint64 {
// runSyncPod is a helper function to retrieve the running pods from the fake // runSyncPod is a helper function to retrieve the running pods from the fake
// docker client and runs SyncPod for the given pod. // docker client and runs SyncPod for the given pod.
func runSyncPod(t *testing.T, dm *DockerManager, fakeDocker *FakeDockerClient, pod *api.Pod, backOff *util.Backoff, expectErr bool) kubecontainer.PodSyncResult { func runSyncPod(t *testing.T, dm *DockerManager, fakeDocker *FakeDockerClient, pod *api.Pod, backOff *flowcontrol.Backoff, expectErr bool) kubecontainer.PodSyncResult {
podStatus, err := dm.GetPodStatus(pod.UID, pod.Name, pod.Namespace) podStatus, err := dm.GetPodStatus(pod.UID, pod.Name, pod.Namespace)
if err != nil { if err != nil {
t.Errorf("unexpected error: %v", err) t.Errorf("unexpected error: %v", err)
} }
fakeDocker.ClearCalls() fakeDocker.ClearCalls()
if backOff == nil { if backOff == nil {
backOff = util.NewBackOff(time.Second, time.Minute) backOff = flowcontrol.NewBackOff(time.Second, time.Minute)
} }
// api.PodStatus is not used in SyncPod now, pass in an empty one. // api.PodStatus is not used in SyncPod now, pass in an empty one.
result := dm.SyncPod(pod, api.PodStatus{}, podStatus, []api.Secret{}, backOff) result := dm.SyncPod(pod, api.PodStatus{}, podStatus, []api.Secret{}, backOff)
@ -1089,7 +1090,7 @@ func TestSyncPodBackoff(t *testing.T) {
{130, 1, 0, startCalls, false}, {130, 1, 0, startCalls, false},
} }
backOff := util.NewBackOff(time.Second, time.Minute) backOff := flowcontrol.NewBackOff(time.Second, time.Minute)
backOff.Clock = fakeClock backOff.Clock = fakeClock
for _, c := range tests { for _, c := range tests {
fakeDocker.SetFakeContainers(dockerContainers) fakeDocker.SetFakeContainers(dockerContainers)

View File

@ -72,6 +72,7 @@ import (
"k8s.io/kubernetes/pkg/util/atomic" "k8s.io/kubernetes/pkg/util/atomic"
"k8s.io/kubernetes/pkg/util/bandwidth" "k8s.io/kubernetes/pkg/util/bandwidth"
utilerrors "k8s.io/kubernetes/pkg/util/errors" utilerrors "k8s.io/kubernetes/pkg/util/errors"
"k8s.io/kubernetes/pkg/util/flowcontrol"
kubeio "k8s.io/kubernetes/pkg/util/io" kubeio "k8s.io/kubernetes/pkg/util/io"
"k8s.io/kubernetes/pkg/util/mount" "k8s.io/kubernetes/pkg/util/mount"
utilnet "k8s.io/kubernetes/pkg/util/net" utilnet "k8s.io/kubernetes/pkg/util/net"
@ -363,7 +364,7 @@ func NewMainKubelet(
} }
procFs := procfs.NewProcFS() procFs := procfs.NewProcFS()
imageBackOff := util.NewBackOff(backOffPeriod, MaxContainerBackOff) imageBackOff := flowcontrol.NewBackOff(backOffPeriod, MaxContainerBackOff)
klet.livenessManager = proberesults.NewManager() klet.livenessManager = proberesults.NewManager()
@ -475,7 +476,7 @@ func NewMainKubelet(
klet.workQueue = queue.NewBasicWorkQueue() klet.workQueue = queue.NewBasicWorkQueue()
klet.podWorkers = newPodWorkers(klet.syncPod, recorder, klet.workQueue, klet.resyncInterval, backOffPeriod, klet.podCache) klet.podWorkers = newPodWorkers(klet.syncPod, recorder, klet.workQueue, klet.resyncInterval, backOffPeriod, klet.podCache)
klet.backOff = util.NewBackOff(backOffPeriod, MaxContainerBackOff) klet.backOff = flowcontrol.NewBackOff(backOffPeriod, MaxContainerBackOff)
klet.podKillingCh = make(chan *kubecontainer.PodPair, podKillingChannelCapacity) klet.podKillingCh = make(chan *kubecontainer.PodPair, podKillingChannelCapacity)
klet.sourcesSeen = sets.NewString() klet.sourcesSeen = sets.NewString()
klet.setNodeStatusFuncs = klet.defaultNodeStatusFuncs() klet.setNodeStatusFuncs = klet.defaultNodeStatusFuncs()
@ -691,7 +692,7 @@ type Kubelet struct {
syncLoopMonitor atomic.Value syncLoopMonitor atomic.Value
// Container restart Backoff // Container restart Backoff
backOff *util.Backoff backOff *flowcontrol.Backoff
// Channel for sending pods to kill. // Channel for sending pods to kill.
podKillingCh chan *kubecontainer.PodPair podKillingCh chan *kubecontainer.PodPair

View File

@ -62,6 +62,7 @@ import (
"k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/bandwidth" "k8s.io/kubernetes/pkg/util/bandwidth"
"k8s.io/kubernetes/pkg/util/diff" "k8s.io/kubernetes/pkg/util/diff"
"k8s.io/kubernetes/pkg/util/flowcontrol"
"k8s.io/kubernetes/pkg/util/mount" "k8s.io/kubernetes/pkg/util/mount"
utilruntime "k8s.io/kubernetes/pkg/util/runtime" utilruntime "k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/util/sets" "k8s.io/kubernetes/pkg/util/sets"
@ -184,7 +185,7 @@ func newTestKubelet(t *testing.T) *TestKubelet {
} }
kubelet.imageManager, err = newImageManager(fakeRuntime, mockCadvisor, fakeRecorder, fakeNodeRef, fakeImageGCPolicy) kubelet.imageManager, err = newImageManager(fakeRuntime, mockCadvisor, fakeRecorder, fakeNodeRef, fakeImageGCPolicy)
fakeClock := util.NewFakeClock(time.Now()) fakeClock := util.NewFakeClock(time.Now())
kubelet.backOff = util.NewBackOff(time.Second, time.Minute) kubelet.backOff = flowcontrol.NewBackOff(time.Second, time.Minute)
kubelet.backOff.Clock = fakeClock kubelet.backOff.Clock = fakeClock
kubelet.podKillingCh = make(chan *kubecontainer.PodPair, 20) kubelet.podKillingCh = make(chan *kubecontainer.PodPair, 20)
kubelet.resyncInterval = 10 * time.Second kubelet.resyncInterval = 10 * time.Second

View File

@ -47,6 +47,7 @@ import (
"k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/util"
utilexec "k8s.io/kubernetes/pkg/util/exec" utilexec "k8s.io/kubernetes/pkg/util/exec"
"k8s.io/kubernetes/pkg/util/flowcontrol"
"k8s.io/kubernetes/pkg/util/sets" "k8s.io/kubernetes/pkg/util/sets"
utilstrings "k8s.io/kubernetes/pkg/util/strings" utilstrings "k8s.io/kubernetes/pkg/util/strings"
) )
@ -141,7 +142,7 @@ func New(
containerRefManager *kubecontainer.RefManager, containerRefManager *kubecontainer.RefManager,
livenessManager proberesults.Manager, livenessManager proberesults.Manager,
volumeGetter volumeGetter, volumeGetter volumeGetter,
imageBackOff *util.Backoff, imageBackOff *flowcontrol.Backoff,
serializeImagePulls bool, serializeImagePulls bool,
) (*Runtime, error) { ) (*Runtime, error) {
// Create dbus connection. // Create dbus connection.
@ -1124,7 +1125,7 @@ func (r *Runtime) Status() error {
} }
// SyncPod syncs the running pod to match the specified desired pod. // SyncPod syncs the running pod to match the specified desired pod.
func (r *Runtime) SyncPod(pod *api.Pod, podStatus api.PodStatus, internalPodStatus *kubecontainer.PodStatus, pullSecrets []api.Secret, backOff *util.Backoff) (result kubecontainer.PodSyncResult) { func (r *Runtime) SyncPod(pod *api.Pod, podStatus api.PodStatus, internalPodStatus *kubecontainer.PodStatus, pullSecrets []api.Secret, backOff *flowcontrol.Backoff) (result kubecontainer.PodSyncResult) {
var err error var err error
defer func() { defer func() {
if err != nil { if err != nil {

View File

@ -14,12 +14,13 @@ See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
*/ */
package util package flowcontrol
import ( import (
"sync" "sync"
"time" "time"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/integer" "k8s.io/kubernetes/pkg/util/integer"
) )
@ -30,13 +31,13 @@ type backoffEntry struct {
type Backoff struct { type Backoff struct {
sync.Mutex sync.Mutex
Clock Clock Clock util.Clock
defaultDuration time.Duration defaultDuration time.Duration
maxDuration time.Duration maxDuration time.Duration
perItemBackoff map[string]*backoffEntry perItemBackoff map[string]*backoffEntry
} }
func NewFakeBackOff(initial, max time.Duration, tc *FakeClock) *Backoff { func NewFakeBackOff(initial, max time.Duration, tc *util.FakeClock) *Backoff {
return &Backoff{ return &Backoff{
perItemBackoff: map[string]*backoffEntry{}, perItemBackoff: map[string]*backoffEntry{},
Clock: tc, Clock: tc,
@ -48,7 +49,7 @@ func NewFakeBackOff(initial, max time.Duration, tc *FakeClock) *Backoff {
func NewBackOff(initial, max time.Duration) *Backoff { func NewBackOff(initial, max time.Duration) *Backoff {
return &Backoff{ return &Backoff{
perItemBackoff: map[string]*backoffEntry{}, perItemBackoff: map[string]*backoffEntry{},
Clock: RealClock{}, Clock: util.RealClock{},
defaultDuration: initial, defaultDuration: initial,
maxDuration: max, maxDuration: max,
} }

View File

@ -14,16 +14,17 @@ See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
*/ */
package util package flowcontrol
import ( import (
"k8s.io/kubernetes/pkg/util"
"testing" "testing"
"time" "time"
) )
func TestSlowBackoff(t *testing.T) { func TestSlowBackoff(t *testing.T) {
id := "_idSlow" id := "_idSlow"
tc := NewFakeClock(time.Now()) tc := util.NewFakeClock(time.Now())
step := time.Second step := time.Second
maxDuration := 50 * step maxDuration := 50 * step
@ -49,7 +50,7 @@ func TestSlowBackoff(t *testing.T) {
func TestBackoffReset(t *testing.T) { func TestBackoffReset(t *testing.T) {
id := "_idReset" id := "_idReset"
tc := NewFakeClock(time.Now()) tc := util.NewFakeClock(time.Now())
step := time.Second step := time.Second
maxDuration := step * 5 maxDuration := step * 5
b := NewFakeBackOff(step, maxDuration, tc) b := NewFakeBackOff(step, maxDuration, tc)
@ -75,7 +76,7 @@ func TestBackoffReset(t *testing.T) {
func TestBackoffHightWaterMark(t *testing.T) { func TestBackoffHightWaterMark(t *testing.T) {
id := "_idHiWaterMark" id := "_idHiWaterMark"
tc := NewFakeClock(time.Now()) tc := util.NewFakeClock(time.Now())
step := time.Second step := time.Second
maxDuration := 5 * step maxDuration := 5 * step
b := NewFakeBackOff(step, maxDuration, tc) b := NewFakeBackOff(step, maxDuration, tc)
@ -97,7 +98,7 @@ func TestBackoffHightWaterMark(t *testing.T) {
func TestBackoffGC(t *testing.T) { func TestBackoffGC(t *testing.T) {
id := "_idGC" id := "_idGC"
tc := NewFakeClock(time.Now()) tc := util.NewFakeClock(time.Now())
step := time.Second step := time.Second
maxDuration := 5 * step maxDuration := 5 * step
@ -125,7 +126,7 @@ func TestBackoffGC(t *testing.T) {
func TestIsInBackOffSinceUpdate(t *testing.T) { func TestIsInBackOffSinceUpdate(t *testing.T) {
id := "_idIsInBackOffSinceUpdate" id := "_idIsInBackOffSinceUpdate"
tc := NewFakeClock(time.Now()) tc := util.NewFakeClock(time.Now())
step := time.Second step := time.Second
maxDuration := 10 * step maxDuration := 10 * step
b := NewFakeBackOff(step, maxDuration, tc) b := NewFakeBackOff(step, maxDuration, tc)