k3s/pkg/kubelet/pod_workers.go

177 lines
5.7 KiB
Go
Raw Normal View History

2015-02-27 09:19:41 +00:00
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kubelet
import (
"sync"
"time"
2015-02-27 09:19:41 +00:00
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/record"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container"
2015-02-27 09:19:41 +00:00
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/types"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
2015-02-27 09:19:41 +00:00
"github.com/golang/glog"
)
// syncPodFnType is the signature of the callback a pod worker invokes to
// bring a single pod to its desired state. The worker passes the pod to sync,
// whether a mirror pod exists for it, and the entry for that pod looked up by
// UID in the Docker cache listing.
type syncPodFnType func(pod *api.Pod, hasMirrorPod bool, runningPod container.Pod) error
2015-02-27 09:19:41 +00:00
type podWorkers struct {
// Protects all per worker fields.
2015-02-27 09:19:41 +00:00
podLock sync.Mutex
// Tracks all running per-pod goroutines - per-pod goroutine will be
// processing updates received through its corresponding channel.
podUpdates map[types.UID]chan workUpdate
2015-02-27 09:19:41 +00:00
// Track the current state of per-pod goroutines.
// Currently all update request for a given pod coming when another
// update of this pod is being processed are ignored.
isWorking map[types.UID]bool
// Tracks the last undelivered work item for this pod - a work item is
// undelivered if it comes in while the worker is working.
lastUndeliveredWorkUpdate map[types.UID]workUpdate
2015-02-27 09:19:41 +00:00
// DockerCache is used for listing running containers.
dockerCache dockertools.DockerCache
// This function is run to sync the desired stated of pod.
// NOTE: This function has to be thread-safe - it can be called for
// different pods at the same time.
syncPodFn syncPodFnType
// The EventRecorder to use
recorder record.EventRecorder
2015-02-27 09:19:41 +00:00
}
type workUpdate struct {
// The pod state to reflect.
2015-03-13 13:19:07 +00:00
pod *api.Pod
// Whether there exists a mirror pod for pod.
hasMirrorPod bool
// Function to call when the update is complete.
updateCompleteFn func()
}
func newPodWorkers(dockerCache dockertools.DockerCache, syncPodFn syncPodFnType,
recorder record.EventRecorder) *podWorkers {
2015-02-27 09:19:41 +00:00
return &podWorkers{
podUpdates: map[types.UID]chan workUpdate{},
isWorking: map[types.UID]bool{},
lastUndeliveredWorkUpdate: map[types.UID]workUpdate{},
dockerCache: dockerCache,
syncPodFn: syncPodFn,
recorder: recorder,
2015-02-27 09:19:41 +00:00
}
}
func (p *podWorkers) managePodLoop(podUpdates <-chan workUpdate) {
var minDockerCacheTime time.Time
for newWork := range podUpdates {
2015-02-27 09:19:41 +00:00
func() {
defer p.checkForUpdates(newWork.pod.UID, newWork.updateCompleteFn)
// We would like to have the state of Docker from at least the moment
// when we finished the previous processing of that pod.
if err := p.dockerCache.ForceUpdateIfOlder(minDockerCacheTime); err != nil {
glog.Errorf("Error updating docker cache: %v", err)
return
}
pods, err := p.dockerCache.GetPods()
2015-02-27 09:19:41 +00:00
if err != nil {
glog.Errorf("Error getting pods while syncing pod: %v", err)
2015-02-27 09:19:41 +00:00
return
}
err = p.syncPodFn(newWork.pod, newWork.hasMirrorPod,
container.Pods(pods).FindPodByID(newWork.pod.UID))
2015-02-27 09:19:41 +00:00
if err != nil {
glog.Errorf("Error syncing pod %s, skipping: %v", newWork.pod.UID, err)
p.recorder.Eventf(newWork.pod, "failedSync", "Error syncing pod, skipping: %v", err)
2015-02-27 09:19:41 +00:00
return
}
minDockerCacheTime = time.Now()
newWork.updateCompleteFn()
2015-02-27 09:19:41 +00:00
}()
}
}
// Apply the new setting to the specified pod. updateComplete is called when the update is completed.
func (p *podWorkers) UpdatePod(pod *api.Pod, hasMirrorPod bool, updateComplete func()) {
2015-02-27 09:19:41 +00:00
uid := pod.UID
var podUpdates chan workUpdate
2015-02-27 09:19:41 +00:00
var exists bool
p.podLock.Lock()
defer p.podLock.Unlock()
if podUpdates, exists = p.podUpdates[uid]; !exists {
// We need to have a buffer here, because checkForUpdates() method that
// puts an update into channel is called from the same goroutine where
// the channel is consumed. However, it is guaranteed that in such case
// the channel is empty, so buffer of size 1 is enough.
podUpdates = make(chan workUpdate, 1)
2015-02-27 09:19:41 +00:00
p.podUpdates[uid] = podUpdates
go func() {
defer util.HandleCrash()
p.managePodLoop(podUpdates)
}()
2015-02-27 09:19:41 +00:00
}
if !p.isWorking[pod.UID] {
p.isWorking[pod.UID] = true
podUpdates <- workUpdate{
pod: pod,
hasMirrorPod: hasMirrorPod,
updateCompleteFn: updateComplete,
}
} else {
p.lastUndeliveredWorkUpdate[pod.UID] = workUpdate{
pod: pod,
hasMirrorPod: hasMirrorPod,
updateCompleteFn: updateComplete,
}
2015-02-27 09:19:41 +00:00
}
}
func (p *podWorkers) ForgetNonExistingPodWorkers(desiredPods map[types.UID]empty) {
p.podLock.Lock()
defer p.podLock.Unlock()
for key, channel := range p.podUpdates {
if _, exists := desiredPods[key]; !exists {
close(channel)
delete(p.podUpdates, key)
// If there is an undelivered work update for this pod we need to remove it
// since per-pod goroutine won't be able to put it to the already closed
// channel when it finish processing the current work update.
if _, cached := p.lastUndeliveredWorkUpdate[key]; cached {
delete(p.lastUndeliveredWorkUpdate, key)
}
2015-02-27 09:19:41 +00:00
}
}
}
func (p *podWorkers) checkForUpdates(uid types.UID, updateComplete func()) {
2015-02-27 09:19:41 +00:00
p.podLock.Lock()
defer p.podLock.Unlock()
if workUpdate, exists := p.lastUndeliveredWorkUpdate[uid]; exists {
p.podUpdates[uid] <- workUpdate
delete(p.lastUndeliveredWorkUpdate, uid)
} else {
p.isWorking[uid] = false
}
2015-02-27 09:19:41 +00:00
}