Merge pull request #20726 from ingvagabund/jitter-sync-loops-in-kubelet

Auto commit by PR queue bot
pull/6/head
k8s-merge-robot 2016-02-10 09:06:59 -08:00
commit c382943353
2 changed files with 18 additions and 3 deletions

View File

@ -28,6 +28,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/util/queue" "k8s.io/kubernetes/pkg/kubelet/util/queue"
"k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util/runtime" "k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/util/wait"
) )
// PodWorkers is an abstract interface for testability. // PodWorkers is an abstract interface for testability.
@ -39,6 +40,14 @@ type PodWorkers interface {
type syncPodFnType func(*api.Pod, *api.Pod, *kubecontainer.PodStatus, kubetypes.SyncPodType) error type syncPodFnType func(*api.Pod, *api.Pod, *kubecontainer.PodStatus, kubetypes.SyncPodType) error
const (
// jitter factor for resyncInterval
workerResyncIntervalJitterFactor = 0.5
// jitter factor for backOffPeriod
workerBackOffPeriodJitterFactor = 0.5
)
type podWorkers struct { type podWorkers struct {
// Protects all per worker fields. // Protects all per worker fields.
podLock sync.Mutex podLock sync.Mutex
@ -209,10 +218,10 @@ func (p *podWorkers) wrapUp(uid types.UID, syncErr error) {
switch { switch {
case syncErr == nil: case syncErr == nil:
// No error; requeue at the regular resync interval. // No error; requeue at the regular resync interval.
p.workQueue.Enqueue(uid, p.resyncInterval) p.workQueue.Enqueue(uid, wait.Jitter(p.resyncInterval, workerResyncIntervalJitterFactor))
default: default:
// Error occurred during the sync; back off and then retry. // Error occurred during the sync; back off and then retry.
p.workQueue.Enqueue(uid, p.backOffPeriod) p.workQueue.Enqueue(uid, wait.Jitter(p.backOffPeriod, workerBackOffPeriodJitterFactor))
} }
p.checkForUpdates(uid) p.checkForUpdates(uid)
} }

View File

@ -17,6 +17,7 @@ limitations under the License.
package prober package prober
import ( import (
"math/rand"
"time" "time"
"github.com/golang/glog" "github.com/golang/glog"
@ -93,7 +94,8 @@ func newWorker(
// run periodically probes the container. // run periodically probes the container.
func (w *worker) run() { func (w *worker) run() {
probeTicker := time.NewTicker(time.Duration(w.spec.PeriodSeconds) * time.Second) probeTickerPeriod := time.Duration(w.spec.PeriodSeconds) * time.Second
probeTicker := time.NewTicker(probeTickerPeriod)
defer func() { defer func() {
// Clean up. // Clean up.
@ -105,6 +107,10 @@ func (w *worker) run() {
w.probeManager.removeWorker(w.pod.UID, w.container.Name, w.probeType) w.probeManager.removeWorker(w.pod.UID, w.container.Name, w.probeType)
}() }()
// If kubelet restarted the probes could be started in rapid succession.
// Let the worker wait for a random portion of tickerPeriod before probing.
time.Sleep(time.Duration(rand.Float64() * float64(probeTickerPeriod)))
probeLoop: probeLoop:
for w.doProbe() { for w.doProbe() {
// Wait for next probe tick. // Wait for next probe tick.