Merge pull request #22733 from resouer/flow-control

Automatic merge from submit-queue

Add flow control pkg

Minor fix; ref #15634.
Refactor pkg names in backoff-related files.
k8s-merge-robot 2016-04-11 06:18:51 -07:00
commit 6a87dba0b8
19 changed files with 57 additions and 49 deletions
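
Reviewer note: every hunk below is the same mechanical rename, moving the backoff helpers from pkg/util into the new pkg/util/flowcontrol package. A minimal sketch of a call site after this PR (the 1s/1m durations mirror the test files below; the helper name is hypothetical):

    import (
    	"time"

    	"k8s.io/kubernetes/pkg/util/flowcontrol"
    )

    // Before this PR: backOff := util.NewBackOff(time.Second, time.Minute)
    // After: the same constructor, now exported from flowcontrol.
    func newRestartBackOff() *flowcontrol.Backoff {
    	return flowcontrol.NewBackOff(time.Second, time.Minute)
    }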

View File

@@ -27,7 +27,6 @@ import (
 	"k8s.io/kubernetes/pkg/api"
 	"k8s.io/kubernetes/pkg/api/unversioned"
 	"k8s.io/kubernetes/pkg/runtime"
-	"k8s.io/kubernetes/pkg/util"
+	"k8s.io/kubernetes/pkg/util/flowcontrol"
 )
@@ -105,7 +104,7 @@ func readExpBackoffConfig() BackoffManager {
 		return &NoBackoff{}
 	}
 	return &URLBackoff{
-		Backoff: util.NewBackOff(
+		Backoff: flowcontrol.NewBackOff(
 			time.Duration(backoffBaseInt)*time.Second,
 			time.Duration(backoffDurationInt)*time.Second)}
 }
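
readExpBackoffConfig keeps its behavior: it builds the URL backoff from backoffBaseInt and backoffDurationInt (parsed from the client's environment in lines not shown here) and falls back to NoBackoff when backoff is not configured; only the constructor's package changes. A self-contained sketch of that read-or-fall-back pattern, with a hypothetical env var name:

    package main

    import (
    	"fmt"
    	"os"
    	"strconv"
    	"time"
    )

    // Sketch of the pattern in readExpBackoffConfig above: read an integer
    // number of seconds, fall back when unset or malformed. The variable
    // name KUBE_CLIENT_BACKOFF_BASE is hypothetical, not from the diff.
    func backoffBaseFromEnv() (time.Duration, bool) {
    	n, err := strconv.ParseInt(os.Getenv("KUBE_CLIENT_BACKOFF_BASE"), 10, 64)
    	if err != nil {
    		return 0, false // the caller would return &NoBackoff{}
    	}
    	return time.Duration(n) * time.Second, true
    }

    func main() {
    	os.Setenv("KUBE_CLIENT_BACKOFF_BASE", "2")
    	if base, ok := backoffBaseFromEnv(); ok {
    		fmt.Println("exponential backoff base:", base)
    	}
    }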

View File

@@ -38,6 +38,7 @@ import (
 	"k8s.io/kubernetes/pkg/labels"
 	"k8s.io/kubernetes/pkg/runtime"
 	"k8s.io/kubernetes/pkg/util"
+	"k8s.io/kubernetes/pkg/util/flowcontrol"
 	"k8s.io/kubernetes/pkg/util/httpstream"
 	"k8s.io/kubernetes/pkg/util/intstr"
 	utiltesting "k8s.io/kubernetes/pkg/util/testing"
@@ -770,7 +771,7 @@ func TestBackoffLifecycle(t *testing.T) {
 	clock := util.FakeClock{}
 	request.backoffMgr = &URLBackoff{
 		// Use a fake backoff here to avoid flakes and speed the test up.
-		Backoff: util.NewFakeBackOff(
+		Backoff: flowcontrol.NewFakeBackOff(
 			time.Duration(1)*time.Second,
 			time.Duration(200)*time.Second,
 			&clock,

View File

@@ -21,7 +21,7 @@ import (
 	"time"

 	"github.com/golang/glog"
-	"k8s.io/kubernetes/pkg/util"
+	"k8s.io/kubernetes/pkg/util/flowcontrol"
 	"k8s.io/kubernetes/pkg/util/sets"
 )
@@ -42,7 +42,7 @@ type BackoffManager interface {
 // we need for URL specific exponential backoff.
 type URLBackoff struct {
 	// Uses backoff as underlying implementation.
-	Backoff *util.Backoff
+	Backoff *flowcontrol.Backoff
 }

 // NoBackoff is a stub implementation, can be used for mocking or else as a default.
@@ -63,7 +63,7 @@ func (n *NoBackoff) Sleep(d time.Duration) {
 // by tests which want to run 1000s of mock requests without slowing down.
 func (b *URLBackoff) Disable() {
 	glog.V(4).Infof("Disabling backoff strategy")
-	b.Backoff = util.NewBackOff(0*time.Second, 0*time.Second)
+	b.Backoff = flowcontrol.NewBackOff(0*time.Second, 0*time.Second)
 }

 // baseUrlKey returns the key which urls will be mapped to.
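
Disable() swaps in a Backoff whose base and cap are both zero, so every computed delay collapses to 0s; that is why it is safe for tests issuing thousands of mock requests. A runnable sketch of the flowcontrol half of that trick (Next and Get are methods defined in the moved backoff.go, not shown in these hunks; the URL string is only used as a map key):

    package main

    import (
    	"fmt"
    	"time"

    	"k8s.io/kubernetes/pkg/util/flowcontrol"
    )

    func main() {
    	// What Disable() installs: zero base, zero cap.
    	b := flowcontrol.NewBackOff(0*time.Second, 0*time.Second)
    	b.Next("https://example.com:443", time.Now()) // record a failure
    	fmt.Println(b.Get("https://example.com:443")) // prints 0s
    }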

View File

@@ -21,7 +21,7 @@ import (
 	"testing"
 	"time"

-	"k8s.io/kubernetes/pkg/util"
+	"k8s.io/kubernetes/pkg/util/flowcontrol"
 )

 func parse(raw string) *url.URL {
@@ -31,7 +31,7 @@ func parse(raw string) *url.URL {

 func TestURLBackoffFunctionalityCollisions(t *testing.T) {
 	myBackoff := &URLBackoff{
-		Backoff: util.NewBackOff(1*time.Second, 60*time.Second),
+		Backoff: flowcontrol.NewBackOff(1*time.Second, 60*time.Second),
 	}

 	// Add some noise and make sure backoff for a clean URL is zero.
@@ -47,7 +47,7 @@ func TestURLBackoffFunctionalityCollisions(t *testing.T) {
 // TestURLBackoffFunctionality generally tests the URLBackoff wrapper. We avoid duplicating tests from backoff and request.
 func TestURLBackoffFunctionality(t *testing.T) {
 	myBackoff := &URLBackoff{
-		Backoff: util.NewBackOff(1*time.Second, 60*time.Second),
+		Backoff: flowcontrol.NewBackOff(1*time.Second, 60*time.Second),
 	}

 	// Now test that backoff increases, then recovers.

View File

@@ -22,7 +22,7 @@ import (
 	"github.com/golang/glog"
 	"k8s.io/kubernetes/pkg/api"
 	"k8s.io/kubernetes/pkg/client/record"
-	"k8s.io/kubernetes/pkg/util"
+	"k8s.io/kubernetes/pkg/util/flowcontrol"
 )

 // imagePuller pulls the image using Runtime.PullImage().
@@ -31,7 +31,7 @@ import (
 type imagePuller struct {
 	recorder record.EventRecorder
 	runtime  Runtime
-	backOff  *util.Backoff
+	backOff  *flowcontrol.Backoff
 }

 // enforce compatibility.
@@ -39,7 +39,7 @@ var _ ImagePuller = &imagePuller{}
 // NewImagePuller takes an event recorder and container runtime to create a
 // image puller that wraps the container runtime's PullImage interface.
-func NewImagePuller(recorder record.EventRecorder, runtime Runtime, imageBackOff *util.Backoff) ImagePuller {
+func NewImagePuller(recorder record.EventRecorder, runtime Runtime, imageBackOff *flowcontrol.Backoff) ImagePuller {
 	return &imagePuller{
 		recorder: recorder,
 		runtime:  runtime,
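
Only the type of the third parameter changes for callers. A sketch of a hypothetical convenience wrapper next to this file showing the post-refactor wiring (the 300-second cap is borrowed from the docker manager tests later in this PR):

    package container // sketch: would sit alongside image_puller.go

    import (
    	"time"

    	"k8s.io/kubernetes/pkg/client/record"
    	"k8s.io/kubernetes/pkg/util/flowcontrol"
    )

    // newDefaultImagePuller is a hypothetical helper, not part of this PR.
    func newDefaultImagePuller(recorder record.EventRecorder, runtime Runtime) ImagePuller {
    	return NewImagePuller(recorder, runtime,
    		flowcontrol.NewBackOff(time.Second, 300*time.Second))
    }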

View File

@@ -27,6 +27,7 @@ import (
 	. "k8s.io/kubernetes/pkg/kubelet/container"
 	ctest "k8s.io/kubernetes/pkg/kubelet/container/testing"
 	"k8s.io/kubernetes/pkg/util"
+	"k8s.io/kubernetes/pkg/util/flowcontrol"
 )

 func TestPuller(t *testing.T) {
@@ -99,7 +100,7 @@ func TestPuller(t *testing.T) {
 		ImagePullPolicy: c.policy,
 	}

-	backOff := util.NewBackOff(time.Second, time.Minute)
+	backOff := flowcontrol.NewBackOff(time.Second, time.Minute)
 	fakeClock := util.NewFakeClock(time.Now())
 	backOff.Clock = fakeClock

View File

@@ -26,7 +26,7 @@ import (
 	"github.com/golang/glog"
 	"k8s.io/kubernetes/pkg/api"
 	"k8s.io/kubernetes/pkg/types"
-	"k8s.io/kubernetes/pkg/util"
+	"k8s.io/kubernetes/pkg/util/flowcontrol"
 	"k8s.io/kubernetes/pkg/volume"
 )
@@ -68,7 +68,7 @@ type Runtime interface {
 	// GarbageCollect removes dead containers using the specified container gc policy
 	GarbageCollect(gcPolicy ContainerGCPolicy) error
 	// Syncs the running pod into the desired pod.
-	SyncPod(pod *api.Pod, apiPodStatus api.PodStatus, podStatus *PodStatus, pullSecrets []api.Secret, backOff *util.Backoff) PodSyncResult
+	SyncPod(pod *api.Pod, apiPodStatus api.PodStatus, podStatus *PodStatus, pullSecrets []api.Secret, backOff *flowcontrol.Backoff) PodSyncResult
 	// KillPod kills all the containers of a pod. Pod may be nil, running pod must not be.
 	// TODO(random-liu): Return PodSyncResult in KillPod.
 	KillPod(pod *api.Pod, runningPod Pod) error

View File

@@ -23,7 +23,7 @@ import (
 	"github.com/golang/glog"
 	"k8s.io/kubernetes/pkg/api"
 	"k8s.io/kubernetes/pkg/client/record"
-	"k8s.io/kubernetes/pkg/util"
+	"k8s.io/kubernetes/pkg/util/flowcontrol"
 	"k8s.io/kubernetes/pkg/util/wait"
 )
@@ -42,7 +42,7 @@ type imagePullRequest struct {
 type serializedImagePuller struct {
 	recorder     record.EventRecorder
 	runtime      Runtime
-	backOff      *util.Backoff
+	backOff      *flowcontrol.Backoff
 	pullRequests chan *imagePullRequest
 }
@@ -53,7 +53,7 @@ var _ ImagePuller = &serializedImagePuller{}
 // image puller that wraps the container runtime's PullImage interface.
 // Pulls one image at a time.
 // Issue #10959 has the rationale behind serializing image pulls.
-func NewSerializedImagePuller(recorder record.EventRecorder, runtime Runtime, imageBackOff *util.Backoff) ImagePuller {
+func NewSerializedImagePuller(recorder record.EventRecorder, runtime Runtime, imageBackOff *flowcontrol.Backoff) ImagePuller {
 	imagePuller := &serializedImagePuller{
 		recorder: recorder,
 		runtime:  runtime,
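
The serialized variant exists because overlapping pulls can starve each other on slow networks (see #10959): every request is funneled into the pullRequests channel and drained by a single goroutine. A runnable sketch of that pattern with stand-in types:

    package main

    import "fmt"

    // pullRequest is a stand-in for the imagePullRequest struct above.
    type pullRequest struct{ image string }

    func main() {
    	requests := make(chan pullRequest)
    	done := make(chan struct{})
    	go func() { // single worker, so pulls never overlap
    		for req := range requests {
    			fmt.Println("pulling", req.image) // stand-in for Runtime.PullImage
    		}
    		close(done)
    	}()
    	requests <- pullRequest{image: "nginx"}
    	requests <- pullRequest{image: "redis"}
    	close(requests)
    	<-done
    }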

View File

@@ -27,6 +27,7 @@ import (
 	. "k8s.io/kubernetes/pkg/kubelet/container"
 	ctest "k8s.io/kubernetes/pkg/kubelet/container/testing"
 	"k8s.io/kubernetes/pkg/util"
+	"k8s.io/kubernetes/pkg/util/flowcontrol"
 )

 func TestSerializedPuller(t *testing.T) {
@@ -99,7 +100,7 @@ func TestSerializedPuller(t *testing.T) {
 		ImagePullPolicy: c.policy,
 	}

-	backOff := util.NewBackOff(time.Second, time.Minute)
+	backOff := flowcontrol.NewBackOff(time.Second, time.Minute)
 	fakeClock := util.NewFakeClock(time.Now())
 	backOff.Clock = fakeClock

View File

@@ -26,7 +26,7 @@ import (
 	"k8s.io/kubernetes/pkg/api"
 	. "k8s.io/kubernetes/pkg/kubelet/container"
 	"k8s.io/kubernetes/pkg/types"
-	"k8s.io/kubernetes/pkg/util"
+	"k8s.io/kubernetes/pkg/util/flowcontrol"
 	"k8s.io/kubernetes/pkg/volume"
 )
@@ -189,7 +189,7 @@ func (f *FakeRuntime) GetPods(all bool) ([]*Pod, error) {
 	return f.PodList, f.Err
 }

-func (f *FakeRuntime) SyncPod(pod *api.Pod, _ api.PodStatus, _ *PodStatus, _ []api.Secret, backOff *util.Backoff) (result PodSyncResult) {
+func (f *FakeRuntime) SyncPod(pod *api.Pod, _ api.PodStatus, _ *PodStatus, _ []api.Secret, backOff *flowcontrol.Backoff) (result PodSyncResult) {
 	f.Lock()
 	defer f.Unlock()

View File

@@ -23,7 +23,7 @@ import (
 	"k8s.io/kubernetes/pkg/api"
 	. "k8s.io/kubernetes/pkg/kubelet/container"
 	"k8s.io/kubernetes/pkg/types"
-	"k8s.io/kubernetes/pkg/util"
+	"k8s.io/kubernetes/pkg/util/flowcontrol"
 	"k8s.io/kubernetes/pkg/volume"
 )
@@ -63,7 +63,7 @@ func (r *Mock) GetPods(all bool) ([]*Pod, error) {
 	return args.Get(0).([]*Pod), args.Error(1)
 }

-func (r *Mock) SyncPod(pod *api.Pod, apiStatus api.PodStatus, status *PodStatus, secrets []api.Secret, backOff *util.Backoff) PodSyncResult {
+func (r *Mock) SyncPod(pod *api.Pod, apiStatus api.PodStatus, status *PodStatus, secrets []api.Secret, backOff *flowcontrol.Backoff) PodSyncResult {
 	args := r.Called(pod, apiStatus, status, secrets, backOff)
 	return args.Get(0).(PodSyncResult)
 }

View File

@@ -25,7 +25,7 @@ import (
 	proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results"
 	kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
 	"k8s.io/kubernetes/pkg/types"
-	"k8s.io/kubernetes/pkg/util"
+	"k8s.io/kubernetes/pkg/util/flowcontrol"
 	"k8s.io/kubernetes/pkg/util/oom"
 	"k8s.io/kubernetes/pkg/util/procfs"
 )
@@ -43,7 +43,7 @@ func NewFakeDockerManager(
 	osInterface kubecontainer.OSInterface,
 	networkPlugin network.NetworkPlugin,
 	runtimeHelper kubecontainer.RuntimeHelper,
-	httpClient kubetypes.HttpGetter, imageBackOff *util.Backoff) *DockerManager {
+	httpClient kubetypes.HttpGetter, imageBackOff *flowcontrol.Backoff) *DockerManager {
 	fakeOOMAdjuster := oom.NewFakeOOMAdjuster()
 	fakeProcFs := procfs.NewFakeProcFS()

View File

@@ -50,7 +50,7 @@ import (
 	"k8s.io/kubernetes/pkg/runtime"
 	"k8s.io/kubernetes/pkg/securitycontext"
 	"k8s.io/kubernetes/pkg/types"
-	"k8s.io/kubernetes/pkg/util"
+	"k8s.io/kubernetes/pkg/util/flowcontrol"
 	"k8s.io/kubernetes/pkg/util/oom"
 	"k8s.io/kubernetes/pkg/util/procfs"
 	utilruntime "k8s.io/kubernetes/pkg/util/runtime"
@@ -194,7 +194,7 @@ func NewDockerManager(
 	oomAdjuster *oom.OOMAdjuster,
 	procFs procfs.ProcFSInterface,
 	cpuCFSQuota bool,
-	imageBackOff *util.Backoff,
+	imageBackOff *flowcontrol.Backoff,
 	serializeImagePulls bool,
 	enableCustomMetrics bool,
 	hairpinMode bool,
@@ -1732,7 +1732,7 @@ func (dm *DockerManager) computePodContainerChanges(pod *api.Pod, podStatus *kub
 }

 // Sync the running pod to match the specified desired pod.
-func (dm *DockerManager) SyncPod(pod *api.Pod, _ api.PodStatus, podStatus *kubecontainer.PodStatus, pullSecrets []api.Secret, backOff *util.Backoff) (result kubecontainer.PodSyncResult) {
+func (dm *DockerManager) SyncPod(pod *api.Pod, _ api.PodStatus, podStatus *kubecontainer.PodStatus, pullSecrets []api.Secret, backOff *flowcontrol.Backoff) (result kubecontainer.PodSyncResult) {
 	start := time.Now()
 	defer func() {
 		metrics.ContainerManagerLatency.WithLabelValues("SyncPod").Observe(metrics.SinceInMicroseconds(start))
@@ -1982,7 +1982,7 @@ func getUidFromUser(id string) string {
 // If all instances of a container are garbage collected, doBackOff will also return false, which means the container may be restarted before the
 // backoff deadline. However, because that won't cause error and the chance is really slim, we can just ignore it for now.
 // If a container is still in backoff, the function will return a brief backoff error and a detailed error message.
-func (dm *DockerManager) doBackOff(pod *api.Pod, container *api.Container, podStatus *kubecontainer.PodStatus, backOff *util.Backoff) (bool, error, string) {
+func (dm *DockerManager) doBackOff(pod *api.Pod, container *api.Container, podStatus *kubecontainer.PodStatus, backOff *flowcontrol.Backoff) (bool, error, string) {
 	var cStatus *kubecontainer.ContainerStatus
 	// Use the finished time of the latest exited container as the start point to calculate whether to do back-off.
 	// TODO(random-liu): Better define backoff start point; add unit and e2e test after we finalize this. (See github issue #22240)
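
doBackOff keys the shared Backoff per container and measures from the latest exit time, so a crash-looping container waits progressively longer before its next start. A sketch of that check, assuming the Next and IsInBackOffSinceUpdate methods of the moved Backoff type (IsInBackOffSinceUpdate is exercised by the tests at the end of this diff; the key format here is hypothetical):

    package main

    import (
    	"fmt"
    	"time"

    	"k8s.io/kubernetes/pkg/util/flowcontrol"
    )

    func main() {
    	backOff := flowcontrol.NewBackOff(10*time.Second, 5*time.Minute)
    	key := "mypod_mycontainer" // hypothetical per-container key
    	finishedAt := time.Now()   // stand-in for the last exited container's finish time

    	backOff.Next(key, finishedAt) // record the failure, start the window
    	if backOff.IsInBackOffSinceUpdate(key, time.Now()) {
    		fmt.Println("container still in backoff; restart delayed")
    	}
    }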

View File

@@ -45,6 +45,7 @@ import (
 	"k8s.io/kubernetes/pkg/types"
 	"k8s.io/kubernetes/pkg/util"
 	uexec "k8s.io/kubernetes/pkg/util/exec"
+	"k8s.io/kubernetes/pkg/util/flowcontrol"
 	"k8s.io/kubernetes/pkg/util/intstr"
 	"k8s.io/kubernetes/pkg/util/sets"
 )
@@ -106,7 +107,7 @@ func newTestDockerManagerWithHTTPClientWithVersion(fakeHTTPClient *fakeHTTP, ver
 		networkPlugin,
 		&fakeRuntimeHelper{},
 		fakeHTTPClient,
-		util.NewBackOff(time.Second, 300*time.Second))
+		flowcontrol.NewBackOff(time.Second, 300*time.Second))
 	return dockerManager, fakeDocker
 }
@@ -586,14 +587,14 @@ func generatePodInfraContainerHash(pod *api.Pod) uint64 {
 // runSyncPod is a helper function to retrieve the running pods from the fake
 // docker client and runs SyncPod for the given pod.
-func runSyncPod(t *testing.T, dm *DockerManager, fakeDocker *FakeDockerClient, pod *api.Pod, backOff *util.Backoff, expectErr bool) kubecontainer.PodSyncResult {
+func runSyncPod(t *testing.T, dm *DockerManager, fakeDocker *FakeDockerClient, pod *api.Pod, backOff *flowcontrol.Backoff, expectErr bool) kubecontainer.PodSyncResult {
 	podStatus, err := dm.GetPodStatus(pod.UID, pod.Name, pod.Namespace)
 	if err != nil {
 		t.Errorf("unexpected error: %v", err)
 	}
 	fakeDocker.ClearCalls()
 	if backOff == nil {
-		backOff = util.NewBackOff(time.Second, time.Minute)
+		backOff = flowcontrol.NewBackOff(time.Second, time.Minute)
 	}
 	// api.PodStatus is not used in SyncPod now, pass in an empty one.
 	result := dm.SyncPod(pod, api.PodStatus{}, podStatus, []api.Secret{}, backOff)
@@ -1089,7 +1090,7 @@ func TestSyncPodBackoff(t *testing.T) {
 		{130, 1, 0, startCalls, false},
 	}

-	backOff := util.NewBackOff(time.Second, time.Minute)
+	backOff := flowcontrol.NewBackOff(time.Second, time.Minute)
 	backOff.Clock = fakeClock
 	for _, c := range tests {
 		fakeDocker.SetFakeContainers(dockerContainers)

View File

@@ -72,6 +72,7 @@ import (
 	"k8s.io/kubernetes/pkg/util/atomic"
 	"k8s.io/kubernetes/pkg/util/bandwidth"
 	utilerrors "k8s.io/kubernetes/pkg/util/errors"
+	"k8s.io/kubernetes/pkg/util/flowcontrol"
 	kubeio "k8s.io/kubernetes/pkg/util/io"
 	"k8s.io/kubernetes/pkg/util/mount"
 	utilnet "k8s.io/kubernetes/pkg/util/net"
@@ -363,7 +364,7 @@ func NewMainKubelet(
 	}

 	procFs := procfs.NewProcFS()
-	imageBackOff := util.NewBackOff(backOffPeriod, MaxContainerBackOff)
+	imageBackOff := flowcontrol.NewBackOff(backOffPeriod, MaxContainerBackOff)

 	klet.livenessManager = proberesults.NewManager()
@@ -475,7 +476,7 @@ func NewMainKubelet(
 	klet.workQueue = queue.NewBasicWorkQueue()
 	klet.podWorkers = newPodWorkers(klet.syncPod, recorder, klet.workQueue, klet.resyncInterval, backOffPeriod, klet.podCache)

-	klet.backOff = util.NewBackOff(backOffPeriod, MaxContainerBackOff)
+	klet.backOff = flowcontrol.NewBackOff(backOffPeriod, MaxContainerBackOff)
 	klet.podKillingCh = make(chan *kubecontainer.PodPair, podKillingChannelCapacity)
 	klet.sourcesSeen = sets.NewString()
 	klet.setNodeStatusFuncs = klet.defaultNodeStatusFuncs()
@@ -691,7 +692,7 @@ type Kubelet struct {
 	syncLoopMonitor atomic.Value

 	// Container restart Backoff
-	backOff *util.Backoff
+	backOff *flowcontrol.Backoff

 	// Channel for sending pods to kill.
 	podKillingCh chan *kubecontainer.PodPair

View File

@@ -62,6 +62,7 @@ import (
 	"k8s.io/kubernetes/pkg/util"
 	"k8s.io/kubernetes/pkg/util/bandwidth"
 	"k8s.io/kubernetes/pkg/util/diff"
+	"k8s.io/kubernetes/pkg/util/flowcontrol"
 	"k8s.io/kubernetes/pkg/util/mount"
 	utilruntime "k8s.io/kubernetes/pkg/util/runtime"
 	"k8s.io/kubernetes/pkg/util/sets"
@@ -184,7 +185,7 @@ func newTestKubelet(t *testing.T) *TestKubelet {
 	}
 	kubelet.imageManager, err = newImageManager(fakeRuntime, mockCadvisor, fakeRecorder, fakeNodeRef, fakeImageGCPolicy)
 	fakeClock := util.NewFakeClock(time.Now())
-	kubelet.backOff = util.NewBackOff(time.Second, time.Minute)
+	kubelet.backOff = flowcontrol.NewBackOff(time.Second, time.Minute)
 	kubelet.backOff.Clock = fakeClock
 	kubelet.podKillingCh = make(chan *kubecontainer.PodPair, 20)
 	kubelet.resyncInterval = 10 * time.Second

View File

@@ -47,6 +47,7 @@ import (
 	"k8s.io/kubernetes/pkg/types"
 	"k8s.io/kubernetes/pkg/util"
 	utilexec "k8s.io/kubernetes/pkg/util/exec"
+	"k8s.io/kubernetes/pkg/util/flowcontrol"
 	"k8s.io/kubernetes/pkg/util/sets"
 	utilstrings "k8s.io/kubernetes/pkg/util/strings"
 )
@@ -141,7 +142,7 @@ func New(
 	containerRefManager *kubecontainer.RefManager,
 	livenessManager proberesults.Manager,
 	volumeGetter volumeGetter,
-	imageBackOff *util.Backoff,
+	imageBackOff *flowcontrol.Backoff,
 	serializeImagePulls bool,
 ) (*Runtime, error) {
 	// Create dbus connection.
@@ -1124,7 +1125,7 @@ func (r *Runtime) Status() error {
 }

 // SyncPod syncs the running pod to match the specified desired pod.
-func (r *Runtime) SyncPod(pod *api.Pod, podStatus api.PodStatus, internalPodStatus *kubecontainer.PodStatus, pullSecrets []api.Secret, backOff *util.Backoff) (result kubecontainer.PodSyncResult) {
+func (r *Runtime) SyncPod(pod *api.Pod, podStatus api.PodStatus, internalPodStatus *kubecontainer.PodStatus, pullSecrets []api.Secret, backOff *flowcontrol.Backoff) (result kubecontainer.PodSyncResult) {
 	var err error
 	defer func() {
 		if err != nil {

View File

@@ -14,12 +14,13 @@ See the License for the specific language governing permissions and
 limitations under the License.
 */

-package util
+package flowcontrol

 import (
 	"sync"
 	"time"

+	"k8s.io/kubernetes/pkg/util"
 	"k8s.io/kubernetes/pkg/util/integer"
 )
@@ -30,13 +31,13 @@ type backoffEntry struct {
 type Backoff struct {
 	sync.Mutex
-	Clock           Clock
+	Clock           util.Clock
 	defaultDuration time.Duration
 	maxDuration     time.Duration
 	perItemBackoff  map[string]*backoffEntry
 }

-func NewFakeBackOff(initial, max time.Duration, tc *FakeClock) *Backoff {
+func NewFakeBackOff(initial, max time.Duration, tc *util.FakeClock) *Backoff {
 	return &Backoff{
 		perItemBackoff: map[string]*backoffEntry{},
 		Clock:          tc,
@@ -48,7 +49,7 @@ func NewFakeBackOff(initial, max time.Duration, tc *FakeClock) *Backoff {
 func NewBackOff(initial, max time.Duration) *Backoff {
 	return &Backoff{
 		perItemBackoff: map[string]*backoffEntry{},
-		Clock:          RealClock{},
+		Clock:          util.RealClock{},
 		defaultDuration: initial,
 		maxDuration:     max,
 	}
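
With the clock now taken from pkg/util, tests can drive the backoff deterministically. A sketch of the doubling behavior under a fake clock, assuming the Step, Next, and Get methods used by the moved tests (delays run 1s, 2s, 4s, then cap at the 5s max):

    package main

    import (
    	"fmt"
    	"time"

    	"k8s.io/kubernetes/pkg/util"
    	"k8s.io/kubernetes/pkg/util/flowcontrol"
    )

    func main() {
    	tc := util.NewFakeClock(time.Now())
    	step := time.Second
    	b := flowcontrol.NewFakeBackOff(step, 5*step, tc)

    	for i := 0; i < 4; i++ {
    		tc.Step(step)         // advance the fake clock
    		b.Next("_id", tc.Now()) // record a failure for this id
    		fmt.Println(b.Get("_id")) // 1s, 2s, 4s, 5s
    	}
    }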

View File

@@ -14,16 +14,17 @@ See the License for the specific language governing permissions and
 limitations under the License.
 */

-package util
+package flowcontrol

 import (
+	"k8s.io/kubernetes/pkg/util"
 	"testing"
 	"time"
 )

 func TestSlowBackoff(t *testing.T) {
 	id := "_idSlow"
-	tc := NewFakeClock(time.Now())
+	tc := util.NewFakeClock(time.Now())
 	step := time.Second
 	maxDuration := 50 * step
@@ -49,7 +50,7 @@ func TestSlowBackoff(t *testing.T) {

 func TestBackoffReset(t *testing.T) {
 	id := "_idReset"
-	tc := NewFakeClock(time.Now())
+	tc := util.NewFakeClock(time.Now())
 	step := time.Second
 	maxDuration := step * 5
 	b := NewFakeBackOff(step, maxDuration, tc)
@@ -75,7 +76,7 @@ func TestBackoffReset(t *testing.T) {

 func TestBackoffHightWaterMark(t *testing.T) {
 	id := "_idHiWaterMark"
-	tc := NewFakeClock(time.Now())
+	tc := util.NewFakeClock(time.Now())
 	step := time.Second
 	maxDuration := 5 * step
 	b := NewFakeBackOff(step, maxDuration, tc)
@@ -97,7 +98,7 @@ func TestBackoffHightWaterMark(t *testing.T) {

 func TestBackoffGC(t *testing.T) {
 	id := "_idGC"
-	tc := NewFakeClock(time.Now())
+	tc := util.NewFakeClock(time.Now())
 	step := time.Second
 	maxDuration := 5 * step
@@ -125,7 +126,7 @@ func TestBackoffGC(t *testing.T) {

 func TestIsInBackOffSinceUpdate(t *testing.T) {
 	id := "_idIsInBackOffSinceUpdate"
-	tc := NewFakeClock(time.Now())
+	tc := util.NewFakeClock(time.Now())
 	step := time.Second
 	maxDuration := 10 * step
 	b := NewFakeBackOff(step, maxDuration, tc)