/*
Copyright 2014 The Kubernetes Authors All rights reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
|
|
|
|
|
|
|
|
package kubelet
|
|
|
|
|
|
|
|
import (
|
|
|
|
"sync"
|
2015-03-09 14:30:08 +00:00
|
|
|
"time"
|
2015-02-27 09:19:41 +00:00
|
|
|
|
2015-08-05 22:05:17 +00:00
|
|
|
"github.com/golang/glog"
|
2015-08-05 22:03:47 +00:00
|
|
|
"k8s.io/kubernetes/pkg/api"
|
2015-09-03 21:40:58 +00:00
|
|
|
"k8s.io/kubernetes/pkg/client/record"
|
2015-08-05 22:03:47 +00:00
|
|
|
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
|
2015-10-09 17:24:31 +00:00
|
|
|
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
|
2015-09-02 17:18:11 +00:00
|
|
|
"k8s.io/kubernetes/pkg/kubelet/util/queue"
|
2015-08-05 22:03:47 +00:00
|
|
|
"k8s.io/kubernetes/pkg/types"
|
2016-01-15 07:32:10 +00:00
|
|
|
"k8s.io/kubernetes/pkg/util/runtime"
|
2016-02-05 15:27:06 +00:00
|
|
|
"k8s.io/kubernetes/pkg/util/wait"
|
2015-02-27 09:19:41 +00:00
|
|
|
)
|
|
|
|
|
2015-05-08 18:48:26 +00:00
|
|
|
// PodWorkers is an abstract interface for testability.
|
|
|
|
type PodWorkers interface {
|
2015-10-09 17:24:31 +00:00
|
|
|
UpdatePod(pod *api.Pod, mirrorPod *api.Pod, updateType kubetypes.SyncPodType, updateComplete func())
|
2015-05-08 18:48:26 +00:00
|
|
|
ForgetNonExistingPodWorkers(desiredPods map[types.UID]empty)
|
2015-08-19 00:52:26 +00:00
|
|
|
ForgetWorker(uid types.UID)
|
2015-05-08 18:48:26 +00:00
|
|
|
}
|
|
|
|
|
2016-01-20 21:26:02 +00:00
|
|
|
// syncPodFnType is the signature of the function a pod worker calls to sync a
// pod. It receives the pod, its mirror pod (nil if none), the pod status read
// from the pod cache, and the type of update being applied.
type syncPodFnType func(pod *api.Pod, mirrorPod *api.Pod, podStatus *kubecontainer.PodStatus, updateType kubetypes.SyncPodType) error
|
2015-02-27 09:19:41 +00:00
|
|
|
|
2016-02-05 15:27:06 +00:00
|
|
|
// Jitter factors passed to wait.Jitter when a pod is requeued after a sync.
const (
	// workerResyncIntervalJitterFactor is applied to resyncInterval after a
	// successful sync.
	workerResyncIntervalJitterFactor = 0.5
	// workerBackOffPeriodJitterFactor is applied to backOffPeriod after a
	// failed sync.
	workerBackOffPeriodJitterFactor = 0.5
)
|
|
|
|
|
2015-02-27 09:19:41 +00:00
|
|
|
type podWorkers struct {
|
2015-03-09 14:23:52 +00:00
|
|
|
// Protects all per worker fields.
|
2015-02-27 09:19:41 +00:00
|
|
|
podLock sync.Mutex
|
|
|
|
|
|
|
|
// Tracks all running per-pod goroutines - per-pod goroutine will be
|
|
|
|
// processing updates received through its corresponding channel.
|
2015-02-24 23:29:18 +00:00
|
|
|
podUpdates map[types.UID]chan workUpdate
|
2015-02-27 09:19:41 +00:00
|
|
|
// Track the current state of per-pod goroutines.
|
|
|
|
// Currently all update request for a given pod coming when another
|
|
|
|
// update of this pod is being processed are ignored.
|
|
|
|
isWorking map[types.UID]bool
|
2015-03-09 14:30:08 +00:00
|
|
|
// Tracks the last undelivered work item for this pod - a work item is
|
|
|
|
// undelivered if it comes in while the worker is working.
|
|
|
|
lastUndeliveredWorkUpdate map[types.UID]workUpdate
|
2015-02-27 09:19:41 +00:00
|
|
|
|
2015-09-02 17:18:11 +00:00
|
|
|
workQueue queue.WorkQueue
|
|
|
|
|
2015-02-27 09:19:41 +00:00
|
|
|
// This function is run to sync the desired stated of pod.
|
|
|
|
// NOTE: This function has to be thread-safe - it can be called for
|
|
|
|
// different pods at the same time.
|
2015-02-24 23:29:18 +00:00
|
|
|
syncPodFn syncPodFnType
|
2015-03-03 06:06:20 +00:00
|
|
|
|
|
|
|
// The EventRecorder to use
|
|
|
|
recorder record.EventRecorder
|
2015-09-02 17:18:11 +00:00
|
|
|
|
|
|
|
// backOffPeriod is the duration to back off when there is a sync error.
|
|
|
|
backOffPeriod time.Duration
|
|
|
|
|
|
|
|
// resyncInterval is the duration to wait until the next sync.
|
|
|
|
resyncInterval time.Duration
|
2016-01-20 02:15:10 +00:00
|
|
|
|
|
|
|
// podCache stores kubecontainer.PodStatus for all pods.
|
|
|
|
podCache kubecontainer.Cache
|
2015-02-27 09:19:41 +00:00
|
|
|
}
|
|
|
|
|
2015-02-24 23:29:18 +00:00
|
|
|
type workUpdate struct {
|
|
|
|
// The pod state to reflect.
|
2015-03-13 13:19:07 +00:00
|
|
|
pod *api.Pod
|
2015-02-24 23:29:18 +00:00
|
|
|
|
2015-03-24 23:52:38 +00:00
|
|
|
// The mirror pod of pod; nil if it does not exist.
|
|
|
|
mirrorPod *api.Pod
|
2015-03-09 22:46:47 +00:00
|
|
|
|
2015-02-24 23:29:18 +00:00
|
|
|
// Function to call when the update is complete.
|
|
|
|
updateCompleteFn func()
|
2015-06-10 00:50:15 +00:00
|
|
|
|
|
|
|
// A string describing the type of this update, eg: create
|
2015-10-09 17:24:31 +00:00
|
|
|
updateType kubetypes.SyncPodType
|
2015-02-24 23:29:18 +00:00
|
|
|
}
|
|
|
|
|
2016-01-20 21:26:02 +00:00
|
|
|
func newPodWorkers(syncPodFn syncPodFnType, recorder record.EventRecorder, workQueue queue.WorkQueue,
|
|
|
|
resyncInterval, backOffPeriod time.Duration, podCache kubecontainer.Cache) *podWorkers {
|
2015-02-27 09:19:41 +00:00
|
|
|
return &podWorkers{
|
2015-03-09 14:30:08 +00:00
|
|
|
podUpdates: map[types.UID]chan workUpdate{},
|
|
|
|
isWorking: map[types.UID]bool{},
|
|
|
|
lastUndeliveredWorkUpdate: map[types.UID]workUpdate{},
|
|
|
|
syncPodFn: syncPodFn,
|
|
|
|
recorder: recorder,
|
2015-09-02 17:18:11 +00:00
|
|
|
workQueue: workQueue,
|
|
|
|
resyncInterval: resyncInterval,
|
|
|
|
backOffPeriod: backOffPeriod,
|
2016-01-20 02:15:10 +00:00
|
|
|
podCache: podCache,
|
2015-02-27 09:19:41 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2015-02-24 23:29:18 +00:00
|
|
|
func (p *podWorkers) managePodLoop(podUpdates <-chan workUpdate) {
|
2016-01-20 21:26:02 +00:00
|
|
|
var lastSyncTime time.Time
|
2015-02-24 23:29:18 +00:00
|
|
|
for newWork := range podUpdates {
|
2016-01-20 21:26:02 +00:00
|
|
|
err := func() error {
|
2016-01-20 02:15:10 +00:00
|
|
|
podID := newWork.pod.UID
|
2016-01-20 21:26:02 +00:00
|
|
|
// This is a blocking call that would return only if the cache
|
|
|
|
// has an entry for the pod that is newer than minRuntimeCache
|
|
|
|
// Time. This ensures the worker doesn't start syncing until
|
|
|
|
// after the cache is at least newer than the finished time of
|
|
|
|
// the previous sync.
|
|
|
|
status, err := p.podCache.GetNewerThan(podID, lastSyncTime)
|
2015-02-27 09:19:41 +00:00
|
|
|
if err != nil {
|
2015-09-02 17:18:11 +00:00
|
|
|
return err
|
2015-02-27 09:19:41 +00:00
|
|
|
}
|
2016-01-20 21:26:02 +00:00
|
|
|
err = p.syncPodFn(newWork.pod, newWork.mirrorPod, status, newWork.updateType)
|
|
|
|
lastSyncTime = time.Now()
|
2015-02-27 09:19:41 +00:00
|
|
|
if err != nil {
|
2015-09-02 17:18:11 +00:00
|
|
|
return err
|
2015-02-27 09:19:41 +00:00
|
|
|
}
|
2015-02-24 23:29:18 +00:00
|
|
|
newWork.updateCompleteFn()
|
2015-09-02 17:18:11 +00:00
|
|
|
return nil
|
2015-02-27 09:19:41 +00:00
|
|
|
}()
|
2016-01-20 21:26:02 +00:00
|
|
|
if err != nil {
|
|
|
|
glog.Errorf("Error syncing pod %s, skipping: %v", newWork.pod.UID, err)
|
|
|
|
p.recorder.Eventf(newWork.pod, api.EventTypeWarning, kubecontainer.FailedSync, "Error syncing pod, skipping: %v", err)
|
|
|
|
}
|
2015-09-02 17:18:11 +00:00
|
|
|
p.wrapUp(newWork.pod.UID, err)
|
2015-02-27 09:19:41 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2015-02-24 23:29:18 +00:00
|
|
|
// Apply the new setting to the specified pod. updateComplete is called when the update is completed.
|
2015-10-09 17:24:31 +00:00
|
|
|
func (p *podWorkers) UpdatePod(pod *api.Pod, mirrorPod *api.Pod, updateType kubetypes.SyncPodType, updateComplete func()) {
|
2015-02-27 09:19:41 +00:00
|
|
|
uid := pod.UID
|
2015-02-24 23:29:18 +00:00
|
|
|
var podUpdates chan workUpdate
|
2015-02-27 09:19:41 +00:00
|
|
|
var exists bool
|
|
|
|
|
|
|
|
p.podLock.Lock()
|
|
|
|
defer p.podLock.Unlock()
|
|
|
|
if podUpdates, exists = p.podUpdates[uid]; !exists {
|
2015-03-09 14:30:08 +00:00
|
|
|
// We need to have a buffer here, because checkForUpdates() method that
|
|
|
|
// puts an update into channel is called from the same goroutine where
|
|
|
|
// the channel is consumed. However, it is guaranteed that in such case
|
|
|
|
// the channel is empty, so buffer of size 1 is enough.
|
2015-02-24 23:29:18 +00:00
|
|
|
podUpdates = make(chan workUpdate, 1)
|
2015-02-27 09:19:41 +00:00
|
|
|
p.podUpdates[uid] = podUpdates
|
2015-06-10 00:50:15 +00:00
|
|
|
|
|
|
|
// Creating a new pod worker either means this is a new pod, or that the
|
|
|
|
// kubelet just restarted. In either case the kubelet is willing to believe
|
|
|
|
// the status of the pod for the first pod worker sync. See corresponding
|
|
|
|
// comment in syncPod.
|
2015-02-28 00:32:40 +00:00
|
|
|
go func() {
|
2016-01-15 07:32:10 +00:00
|
|
|
defer runtime.HandleCrash()
|
2015-02-28 00:32:40 +00:00
|
|
|
p.managePodLoop(podUpdates)
|
|
|
|
}()
|
2015-02-27 09:19:41 +00:00
|
|
|
}
|
|
|
|
if !p.isWorking[pod.UID] {
|
|
|
|
p.isWorking[pod.UID] = true
|
2015-02-24 23:29:18 +00:00
|
|
|
podUpdates <- workUpdate{
|
|
|
|
pod: pod,
|
2015-03-24 23:52:38 +00:00
|
|
|
mirrorPod: mirrorPod,
|
2015-02-24 23:29:18 +00:00
|
|
|
updateCompleteFn: updateComplete,
|
2015-06-10 00:50:15 +00:00
|
|
|
updateType: updateType,
|
2015-02-24 23:29:18 +00:00
|
|
|
}
|
2015-03-09 14:30:08 +00:00
|
|
|
} else {
|
|
|
|
p.lastUndeliveredWorkUpdate[pod.UID] = workUpdate{
|
|
|
|
pod: pod,
|
2015-03-24 23:52:38 +00:00
|
|
|
mirrorPod: mirrorPod,
|
2015-03-09 14:30:08 +00:00
|
|
|
updateCompleteFn: updateComplete,
|
2015-06-10 00:50:15 +00:00
|
|
|
updateType: updateType,
|
2015-03-09 14:30:08 +00:00
|
|
|
}
|
2015-02-27 09:19:41 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2015-08-19 00:52:26 +00:00
|
|
|
func (p *podWorkers) removeWorker(uid types.UID) {
|
|
|
|
if ch, ok := p.podUpdates[uid]; ok {
|
|
|
|
close(ch)
|
|
|
|
delete(p.podUpdates, uid)
|
|
|
|
// If there is an undelivered work update for this pod we need to remove it
|
|
|
|
// since per-pod goroutine won't be able to put it to the already closed
|
|
|
|
// channel when it finish processing the current work update.
|
|
|
|
if _, cached := p.lastUndeliveredWorkUpdate[uid]; cached {
|
|
|
|
delete(p.lastUndeliveredWorkUpdate, uid)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
func (p *podWorkers) ForgetWorker(uid types.UID) {
|
|
|
|
p.podLock.Lock()
|
|
|
|
defer p.podLock.Unlock()
|
|
|
|
p.removeWorker(uid)
|
|
|
|
}
|
|
|
|
|
2015-02-27 09:19:41 +00:00
|
|
|
func (p *podWorkers) ForgetNonExistingPodWorkers(desiredPods map[types.UID]empty) {
|
|
|
|
p.podLock.Lock()
|
|
|
|
defer p.podLock.Unlock()
|
2015-08-19 00:52:26 +00:00
|
|
|
for key := range p.podUpdates {
|
2015-02-27 09:19:41 +00:00
|
|
|
if _, exists := desiredPods[key]; !exists {
|
2015-08-19 00:52:26 +00:00
|
|
|
p.removeWorker(key)
|
2015-02-27 09:19:41 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2015-09-02 17:18:11 +00:00
|
|
|
func (p *podWorkers) wrapUp(uid types.UID, syncErr error) {
|
|
|
|
// Requeue the last update if the last sync returned error.
|
2016-01-20 02:15:10 +00:00
|
|
|
switch {
|
|
|
|
case syncErr == nil:
|
|
|
|
// No error; requeue at the regular resync interval.
|
2016-02-05 15:27:06 +00:00
|
|
|
p.workQueue.Enqueue(uid, wait.Jitter(p.resyncInterval, workerResyncIntervalJitterFactor))
|
2016-01-20 02:15:10 +00:00
|
|
|
default:
|
|
|
|
// Error occurred during the sync; back off and then retry.
|
2016-02-05 15:27:06 +00:00
|
|
|
p.workQueue.Enqueue(uid, wait.Jitter(p.backOffPeriod, workerBackOffPeriodJitterFactor))
|
2015-09-02 17:18:11 +00:00
|
|
|
}
|
|
|
|
p.checkForUpdates(uid)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (p *podWorkers) checkForUpdates(uid types.UID) {
|
2015-02-27 09:19:41 +00:00
|
|
|
p.podLock.Lock()
|
2015-03-09 14:30:08 +00:00
|
|
|
defer p.podLock.Unlock()
|
|
|
|
if workUpdate, exists := p.lastUndeliveredWorkUpdate[uid]; exists {
|
|
|
|
p.podUpdates[uid] <- workUpdate
|
|
|
|
delete(p.lastUndeliveredWorkUpdate, uid)
|
|
|
|
} else {
|
|
|
|
p.isWorking[uid] = false
|
|
|
|
}
|
2015-02-27 09:19:41 +00:00
|
|
|
}
|