mirror of https://github.com/k3s-io/k3s
Merge pull request #5928 from yujuhong/pod_status
Kubelet: pass the acutal pod for status updatepull/6/head
commit
5eb373692b
|
@ -109,7 +109,7 @@ type SyncHandler interface {
|
|||
// Syncs current state to match the specified pods. SyncPodType specified what
|
||||
// type of sync is occuring per pod. StartTime specifies the time at which
|
||||
// syncing began (for use in monitoring).
|
||||
SyncPods(pods []api.Pod, podSyncTypes map[types.UID]metrics.SyncPodType, mirrorPods map[string]*api.Pod,
|
||||
SyncPods(pods []api.Pod, podSyncTypes map[types.UID]metrics.SyncPodType, mirrorPods map[string]api.Pod,
|
||||
startTime time.Time) error
|
||||
}
|
||||
|
||||
|
@ -1225,7 +1225,7 @@ type podContainerChangesSpec struct {
|
|||
containersToKeep map[dockertools.DockerID]int
|
||||
}
|
||||
|
||||
func (kl *Kubelet) computePodContainerChanges(pod *api.Pod, hasMirrorPod bool, runningPod kubecontainer.Pod) (podContainerChangesSpec, error) {
|
||||
func (kl *Kubelet) computePodContainerChanges(pod *api.Pod, runningPod kubecontainer.Pod) (podContainerChangesSpec, error) {
|
||||
podFullName := kubecontainer.GetPodFullName(pod)
|
||||
uid := pod.UID
|
||||
glog.V(4).Infof("Syncing Pod %+v, podFullName: %q, uid: %q", pod, podFullName, uid)
|
||||
|
@ -1335,21 +1335,30 @@ func (kl *Kubelet) computePodContainerChanges(pod *api.Pod, hasMirrorPod bool, r
|
|||
}, nil
|
||||
}
|
||||
|
||||
func (kl *Kubelet) syncPod(pod *api.Pod, hasMirrorPod bool, runningPod kubecontainer.Pod) error {
|
||||
func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecontainer.Pod) error {
|
||||
podFullName := kubecontainer.GetPodFullName(pod)
|
||||
uid := pod.UID
|
||||
|
||||
// Before returning, regenerate status and store it in the cache.
|
||||
defer func() {
|
||||
if isStaticPod(pod) && mirrorPod == nil {
|
||||
// No need to cache the status because the mirror pod does not
|
||||
// exist yet.
|
||||
return
|
||||
}
|
||||
status, err := kl.generatePodStatusByPod(pod)
|
||||
if err != nil {
|
||||
glog.Errorf("Unable to generate status for pod with name %q and uid %q info with error(%v)", podFullName, uid, err)
|
||||
} else {
|
||||
kl.statusManager.SetPodStatus(podFullName, status)
|
||||
podToUpdate := pod
|
||||
if mirrorPod != nil {
|
||||
podToUpdate = mirrorPod
|
||||
}
|
||||
kl.statusManager.SetPodStatus(podToUpdate, status)
|
||||
}
|
||||
}()
|
||||
|
||||
containerChanges, err := kl.computePodContainerChanges(pod, hasMirrorPod, runningPod)
|
||||
containerChanges, err := kl.computePodContainerChanges(pod, runningPod)
|
||||
glog.V(3).Infof("Got container changes for pod %q: %+v", podFullName, containerChanges)
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -1427,7 +1436,7 @@ func (kl *Kubelet) syncPod(pod *api.Pod, hasMirrorPod bool, runningPod kubeconta
|
|||
kl.pullImageAndRunContainer(pod, &pod.Spec.Containers[container], &podVolumes, podInfraContainerID)
|
||||
}
|
||||
|
||||
if !hasMirrorPod && isStaticPod(pod) {
|
||||
if mirrorPod == nil && isStaticPod(pod) {
|
||||
glog.V(4).Infof("Creating a mirror pod %q", podFullName)
|
||||
// To make sure we will properly update static pod status we need to delete
|
||||
// it from status manager. Otherwise it is possible that we will miss manual
|
||||
|
@ -1514,7 +1523,7 @@ func (kl *Kubelet) cleanupOrphanedVolumes(pods []api.Pod, running []*docker.Cont
|
|||
|
||||
// SyncPods synchronizes the configured list of pods (desired state) with the host current state.
|
||||
func (kl *Kubelet) SyncPods(allPods []api.Pod, podSyncTypes map[types.UID]metrics.SyncPodType,
|
||||
mirrorPods map[string]*api.Pod, start time.Time) error {
|
||||
mirrorPods map[string]api.Pod, start time.Time) error {
|
||||
defer func() {
|
||||
metrics.SyncPodsLatency.Observe(metrics.SinceInMicroseconds(start))
|
||||
}()
|
||||
|
@ -1555,8 +1564,11 @@ func (kl *Kubelet) SyncPods(allPods []api.Pod, podSyncTypes map[types.UID]metric
|
|||
desiredPods[uid] = empty{}
|
||||
|
||||
// Run the sync in an async manifest worker.
|
||||
_, hasMirrorPod := mirrorPods[podFullName]
|
||||
kl.podWorkers.UpdatePod(pod, hasMirrorPod, func() {
|
||||
var mirrorPod *api.Pod = nil
|
||||
if m, ok := mirrorPods[podFullName]; ok {
|
||||
mirrorPod = &m
|
||||
}
|
||||
kl.podWorkers.UpdatePod(pod, mirrorPod, func() {
|
||||
metrics.SyncPodLatency.WithLabelValues(podSyncTypes[pod.UID].String()).Observe(metrics.SinceInMicroseconds(start))
|
||||
})
|
||||
|
||||
|
@ -1698,21 +1710,21 @@ func (kl *Kubelet) handleNotFittingPods(pods []api.Pod) {
|
|||
fitting, notFitting := checkHostPortConflicts(pods)
|
||||
for _, pod := range notFitting {
|
||||
kl.recorder.Eventf(&pod, "hostPortConflict", "Cannot start the pod due to host port conflict.")
|
||||
kl.statusManager.SetPodStatus(kubecontainer.GetPodFullName(&pod), api.PodStatus{
|
||||
kl.statusManager.SetPodStatus(&pod, api.PodStatus{
|
||||
Phase: api.PodFailed,
|
||||
Message: "Pod cannot be started due to host port conflict"})
|
||||
}
|
||||
fitting, notFitting = kl.checkNodeSelectorMatching(fitting)
|
||||
for _, pod := range notFitting {
|
||||
kl.recorder.Eventf(&pod, "nodeSelectorMismatching", "Cannot start the pod due to node selector mismatch.")
|
||||
kl.statusManager.SetPodStatus(kubecontainer.GetPodFullName(&pod), api.PodStatus{
|
||||
kl.statusManager.SetPodStatus(&pod, api.PodStatus{
|
||||
Phase: api.PodFailed,
|
||||
Message: "Pod cannot be started due to node selector mismatch"})
|
||||
}
|
||||
fitting, notFitting = kl.checkCapacityExceeded(fitting)
|
||||
for _, pod := range notFitting {
|
||||
kl.recorder.Eventf(&pod, "capacityExceeded", "Cannot start the pod due to exceeded capacity.")
|
||||
kl.statusManager.SetPodStatus(kubecontainer.GetPodFullName(&pod), api.PodStatus{
|
||||
kl.statusManager.SetPodStatus(&pod, api.PodStatus{
|
||||
Phase: api.PodFailed,
|
||||
Message: "Pod cannot be started due to exceeded capacity"})
|
||||
}
|
||||
|
|
|
@ -89,8 +89,8 @@ func newTestKubelet(t *testing.T) *TestKubelet {
|
|||
waitGroup := new(sync.WaitGroup)
|
||||
kubelet.podWorkers = newPodWorkers(
|
||||
fakeDockerCache,
|
||||
func(pod *api.Pod, hasMirrorPod bool, runningPod container.Pod) error {
|
||||
err := kubelet.syncPod(pod, hasMirrorPod, runningPod)
|
||||
func(pod *api.Pod, mirrorPod *api.Pod, runningPod container.Pod) error {
|
||||
err := kubelet.syncPod(pod, mirrorPod, runningPod)
|
||||
waitGroup.Done()
|
||||
return err
|
||||
},
|
||||
|
@ -488,7 +488,7 @@ func TestSyncPodsDoesNothing(t *testing.T) {
|
|||
}
|
||||
kubelet.podManager.SetPods(pods)
|
||||
waitGroup.Add(1)
|
||||
err := kubelet.SyncPods(pods, emptyPodUIDs, map[string]*api.Pod{}, time.Now())
|
||||
err := kubelet.SyncPods(pods, emptyPodUIDs, map[string]api.Pod{}, time.Now())
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
|
@ -523,7 +523,7 @@ func TestSyncPodsWithTerminationLog(t *testing.T) {
|
|||
}
|
||||
kubelet.podManager.SetPods(pods)
|
||||
waitGroup.Add(1)
|
||||
err := kubelet.SyncPods(pods, emptyPodUIDs, map[string]*api.Pod{}, time.Now())
|
||||
err := kubelet.SyncPods(pods, emptyPodUIDs, map[string]api.Pod{}, time.Now())
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
|
@ -574,7 +574,7 @@ func TestSyncPodsCreatesNetAndContainer(t *testing.T) {
|
|||
}
|
||||
kubelet.podManager.SetPods(pods)
|
||||
waitGroup.Add(1)
|
||||
err := kubelet.SyncPods(pods, emptyPodUIDs, map[string]*api.Pod{}, time.Now())
|
||||
err := kubelet.SyncPods(pods, emptyPodUIDs, map[string]api.Pod{}, time.Now())
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
|
@ -629,7 +629,7 @@ func TestSyncPodsCreatesNetAndContainerPullsImage(t *testing.T) {
|
|||
}
|
||||
waitGroup.Add(1)
|
||||
kubelet.podManager.SetPods(pods)
|
||||
err := kubelet.SyncPods(pods, emptyPodUIDs, map[string]*api.Pod{}, time.Now())
|
||||
err := kubelet.SyncPods(pods, emptyPodUIDs, map[string]api.Pod{}, time.Now())
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
|
@ -681,7 +681,7 @@ func TestSyncPodsWithPodInfraCreatesContainer(t *testing.T) {
|
|||
}
|
||||
waitGroup.Add(1)
|
||||
kubelet.podManager.SetPods(pods)
|
||||
err := kubelet.SyncPods(pods, emptyPodUIDs, map[string]*api.Pod{}, time.Now())
|
||||
err := kubelet.SyncPods(pods, emptyPodUIDs, map[string]api.Pod{}, time.Now())
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
|
@ -740,7 +740,7 @@ func TestSyncPodsWithPodInfraCreatesContainerCallsHandler(t *testing.T) {
|
|||
}
|
||||
waitGroup.Add(1)
|
||||
kubelet.podManager.SetPods(pods)
|
||||
err := kubelet.SyncPods(pods, emptyPodUIDs, map[string]*api.Pod{}, time.Now())
|
||||
err := kubelet.SyncPods(pods, emptyPodUIDs, map[string]api.Pod{}, time.Now())
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
|
@ -811,7 +811,7 @@ func TestSyncPodsDeletesWithNoPodInfraContainer(t *testing.T) {
|
|||
}
|
||||
waitGroup.Add(2)
|
||||
kubelet.podManager.SetPods(pods)
|
||||
err := kubelet.SyncPods(pods, emptyPodUIDs, map[string]*api.Pod{}, time.Now())
|
||||
err := kubelet.SyncPods(pods, emptyPodUIDs, map[string]api.Pod{}, time.Now())
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
|
@ -852,7 +852,7 @@ func TestSyncPodsDeletesWhenSourcesAreReady(t *testing.T) {
|
|||
ID: "9876",
|
||||
},
|
||||
}
|
||||
if err := kubelet.SyncPods([]api.Pod{}, emptyPodUIDs, map[string]*api.Pod{}, time.Now()); err != nil {
|
||||
if err := kubelet.SyncPods([]api.Pod{}, emptyPodUIDs, map[string]api.Pod{}, time.Now()); err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
// Validate nothing happened.
|
||||
|
@ -860,7 +860,7 @@ func TestSyncPodsDeletesWhenSourcesAreReady(t *testing.T) {
|
|||
fakeDocker.ClearCalls()
|
||||
|
||||
ready = true
|
||||
if err := kubelet.SyncPods([]api.Pod{}, emptyPodUIDs, map[string]*api.Pod{}, time.Now()); err != nil {
|
||||
if err := kubelet.SyncPods([]api.Pod{}, emptyPodUIDs, map[string]api.Pod{}, time.Now()); err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
verifyCalls(t, fakeDocker, []string{"list", "stop", "stop", "inspect_container", "inspect_container"})
|
||||
|
@ -899,7 +899,7 @@ func TestSyncPodsDeletes(t *testing.T) {
|
|||
ID: "4567",
|
||||
},
|
||||
}
|
||||
err := kubelet.SyncPods([]api.Pod{}, emptyPodUIDs, map[string]*api.Pod{}, time.Now())
|
||||
err := kubelet.SyncPods([]api.Pod{}, emptyPodUIDs, map[string]api.Pod{}, time.Now())
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
|
@ -954,7 +954,7 @@ func TestSyncPodDeletesDuplicate(t *testing.T) {
|
|||
}
|
||||
pods := []api.Pod{bound}
|
||||
kubelet.podManager.SetPods(pods)
|
||||
err := kubelet.syncPod(&bound, false, dockerContainersToPod(dockerContainers))
|
||||
err := kubelet.syncPod(&bound, nil, dockerContainersToPod(dockerContainers))
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
|
@ -996,7 +996,7 @@ func TestSyncPodBadHash(t *testing.T) {
|
|||
}
|
||||
pods := []api.Pod{bound}
|
||||
kubelet.podManager.SetPods(pods)
|
||||
err := kubelet.syncPod(&bound, false, dockerContainersToPod(dockerContainers))
|
||||
err := kubelet.syncPod(&bound, nil, dockerContainersToPod(dockerContainers))
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
|
@ -1051,7 +1051,7 @@ func TestSyncPodUnhealthy(t *testing.T) {
|
|||
}
|
||||
pods := []api.Pod{bound}
|
||||
kubelet.podManager.SetPods(pods)
|
||||
err := kubelet.syncPod(&bound, false, dockerContainersToPod(dockerContainers))
|
||||
err := kubelet.syncPod(&bound, nil, dockerContainersToPod(dockerContainers))
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
|
@ -1742,7 +1742,7 @@ func TestSyncPodEventHandlerFails(t *testing.T) {
|
|||
}
|
||||
pods := []api.Pod{bound}
|
||||
kubelet.podManager.SetPods(pods)
|
||||
err := kubelet.syncPod(&bound, false, dockerContainersToPod(dockerContainers))
|
||||
err := kubelet.syncPod(&bound, nil, dockerContainersToPod(dockerContainers))
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
|
@ -1782,7 +1782,7 @@ func TestSyncPodsWithPullPolicy(t *testing.T) {
|
|||
},
|
||||
},
|
||||
},
|
||||
}, emptyPodUIDs, map[string]*api.Pod{}, time.Now())
|
||||
}, emptyPodUIDs, map[string]api.Pod{}, time.Now())
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
|
@ -3046,7 +3046,7 @@ func TestPurgingObsoleteStatusMapEntries(t *testing.T) {
|
|||
t.Fatalf("expected to have status cached for %q: %v", "pod2", err)
|
||||
}
|
||||
// Sync with empty pods so that the entry in status map will be removed.
|
||||
kl.SyncPods([]api.Pod{}, emptyPodUIDs, map[string]*api.Pod{}, time.Now())
|
||||
kl.SyncPods([]api.Pod{}, emptyPodUIDs, map[string]api.Pod{}, time.Now())
|
||||
if _, err := kl.GetPodStatus(kubecontainer.BuildPodFullName("pod2", "")); err == nil {
|
||||
t.Fatalf("expected to not have status cached for %q: %v", "pod2", err)
|
||||
}
|
||||
|
@ -3287,8 +3287,7 @@ func TestCreateMirrorPod(t *testing.T) {
|
|||
}
|
||||
pods := []api.Pod{pod}
|
||||
kl.podManager.SetPods(pods)
|
||||
hasMirrorPod := false
|
||||
err := kl.syncPod(&pod, hasMirrorPod, container.Pod{})
|
||||
err := kl.syncPod(&pod, nil, container.Pod{})
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
|
@ -3420,3 +3419,40 @@ func TestGetContainerInfoForMirrorPods(t *testing.T) {
|
|||
}
|
||||
mockCadvisor.AssertExpectations(t)
|
||||
}
|
||||
|
||||
func TestDoNotCacheStatusForStaticPods(t *testing.T) {
|
||||
testKubelet := newTestKubelet(t)
|
||||
testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorApi.MachineInfo{}, nil)
|
||||
kubelet := testKubelet.kubelet
|
||||
waitGroup := testKubelet.waitGroup
|
||||
|
||||
pods := []api.Pod{
|
||||
{
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
UID: "12345678",
|
||||
Name: "foo",
|
||||
Namespace: "new",
|
||||
Annotations: map[string]string{
|
||||
ConfigSourceAnnotationKey: "file",
|
||||
},
|
||||
},
|
||||
Spec: api.PodSpec{
|
||||
Containers: []api.Container{
|
||||
{Name: "bar"},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
kubelet.podManager.SetPods(pods)
|
||||
waitGroup.Add(1)
|
||||
err := kubelet.SyncPods(pods, emptyPodUIDs, map[string]api.Pod{}, time.Now())
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
waitGroup.Wait()
|
||||
podFullName := kubecontainer.GetPodFullName(&pods[0])
|
||||
status, ok := kubelet.statusManager.GetPodStatus(podFullName)
|
||||
if ok {
|
||||
t.Errorf("unexpected status %#v found for static pod %q", status, podFullName)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -46,7 +46,7 @@ type podManager interface {
|
|||
GetPods() []api.Pod
|
||||
GetPodByFullName(podFullName string) (*api.Pod, bool)
|
||||
GetPodByName(namespace, name string) (*api.Pod, bool)
|
||||
GetPodsAndMirrorMap() ([]api.Pod, map[string]*api.Pod)
|
||||
GetPodsAndMirrorMap() ([]api.Pod, map[string]api.Pod)
|
||||
SetPods(pods []api.Pod)
|
||||
UpdatePods(u PodUpdate, podSyncTypes map[types.UID]metrics.SyncPodType)
|
||||
DeleteOrphanedMirrorPods()
|
||||
|
@ -194,15 +194,15 @@ func (self *basicPodManager) GetPods() []api.Pod {
|
|||
}
|
||||
|
||||
// GetPodsAndMirrorMap returns the a copy of the regular pods and the mirror
|
||||
// pod map indexed by full name for existence check.
|
||||
func (self *basicPodManager) GetPodsAndMirrorMap() ([]api.Pod, map[string]*api.Pod) {
|
||||
// pods indexed by full name.
|
||||
func (self *basicPodManager) GetPodsAndMirrorMap() ([]api.Pod, map[string]api.Pod) {
|
||||
self.lock.RLock()
|
||||
defer self.lock.RUnlock()
|
||||
mirrorPodByFullName := make(map[string]*api.Pod)
|
||||
for key, value := range self.mirrorPodByFullName {
|
||||
mirrorPodByFullName[key] = value
|
||||
mirrorPods := make(map[string]api.Pod)
|
||||
for key, pod := range self.mirrorPodByFullName {
|
||||
mirrorPods[key] = *pod
|
||||
}
|
||||
return self.getPods(), mirrorPodByFullName
|
||||
return self.getPods(), mirrorPods
|
||||
}
|
||||
|
||||
// GetPodByName provides the (non-mirror) pod that matches namespace and name,
|
||||
|
|
|
@ -29,7 +29,7 @@ import (
|
|||
"github.com/golang/glog"
|
||||
)
|
||||
|
||||
type syncPodFnType func(*api.Pod, bool, container.Pod) error
|
||||
type syncPodFnType func(*api.Pod, *api.Pod, container.Pod) error
|
||||
|
||||
type podWorkers struct {
|
||||
// Protects all per worker fields.
|
||||
|
@ -61,8 +61,8 @@ type workUpdate struct {
|
|||
// The pod state to reflect.
|
||||
pod *api.Pod
|
||||
|
||||
// Whether there exists a mirror pod for pod.
|
||||
hasMirrorPod bool
|
||||
// The mirror pod of pod; nil if it does not exist.
|
||||
mirrorPod *api.Pod
|
||||
|
||||
// Function to call when the update is complete.
|
||||
updateCompleteFn func()
|
||||
|
@ -97,7 +97,7 @@ func (p *podWorkers) managePodLoop(podUpdates <-chan workUpdate) {
|
|||
return
|
||||
}
|
||||
|
||||
err = p.syncPodFn(newWork.pod, newWork.hasMirrorPod,
|
||||
err = p.syncPodFn(newWork.pod, newWork.mirrorPod,
|
||||
container.Pods(pods).FindPodByID(newWork.pod.UID))
|
||||
if err != nil {
|
||||
glog.Errorf("Error syncing pod %s, skipping: %v", newWork.pod.UID, err)
|
||||
|
@ -112,7 +112,7 @@ func (p *podWorkers) managePodLoop(podUpdates <-chan workUpdate) {
|
|||
}
|
||||
|
||||
// Apply the new setting to the specified pod. updateComplete is called when the update is completed.
|
||||
func (p *podWorkers) UpdatePod(pod *api.Pod, hasMirrorPod bool, updateComplete func()) {
|
||||
func (p *podWorkers) UpdatePod(pod *api.Pod, mirrorPod *api.Pod, updateComplete func()) {
|
||||
uid := pod.UID
|
||||
var podUpdates chan workUpdate
|
||||
var exists bool
|
||||
|
@ -135,13 +135,13 @@ func (p *podWorkers) UpdatePod(pod *api.Pod, hasMirrorPod bool, updateComplete f
|
|||
p.isWorking[pod.UID] = true
|
||||
podUpdates <- workUpdate{
|
||||
pod: pod,
|
||||
hasMirrorPod: hasMirrorPod,
|
||||
mirrorPod: mirrorPod,
|
||||
updateCompleteFn: updateComplete,
|
||||
}
|
||||
} else {
|
||||
p.lastUndeliveredWorkUpdate[pod.UID] = workUpdate{
|
||||
pod: pod,
|
||||
hasMirrorPod: hasMirrorPod,
|
||||
mirrorPod: mirrorPod,
|
||||
updateCompleteFn: updateComplete,
|
||||
}
|
||||
}
|
||||
|
|
|
@ -47,7 +47,7 @@ func createPodWorkers() (*podWorkers, map[types.UID][]string) {
|
|||
|
||||
podWorkers := newPodWorkers(
|
||||
fakeDockerCache,
|
||||
func(pod *api.Pod, hasMirrorPod bool, runningPod container.Pod) error {
|
||||
func(pod *api.Pod, mirrorPod *api.Pod, runningPod container.Pod) error {
|
||||
func() {
|
||||
lock.Lock()
|
||||
defer lock.Unlock()
|
||||
|
@ -84,7 +84,7 @@ func TestUpdatePod(t *testing.T) {
|
|||
numPods := 20
|
||||
for i := 0; i < numPods; i++ {
|
||||
for j := i; j < numPods; j++ {
|
||||
podWorkers.UpdatePod(newPod(string(j), string(i)), false, func() {})
|
||||
podWorkers.UpdatePod(newPod(string(j), string(i)), nil, func() {})
|
||||
}
|
||||
}
|
||||
drainWorkers(podWorkers, numPods)
|
||||
|
@ -117,7 +117,7 @@ func TestForgetNonExistingPodWorkers(t *testing.T) {
|
|||
|
||||
numPods := 20
|
||||
for i := 0; i < numPods; i++ {
|
||||
podWorkers.UpdatePod(newPod(string(i), "name"), false, func() {})
|
||||
podWorkers.UpdatePod(newPod(string(i), "name"), nil, func() {})
|
||||
}
|
||||
drainWorkers(podWorkers, numPods)
|
||||
|
||||
|
|
|
@ -108,7 +108,7 @@ func (kl *Kubelet) runPod(pod api.Pod, retryDelay time.Duration) error {
|
|||
glog.Infof("pod %q containers not running: syncing", pod.Name)
|
||||
// We don't create mirror pods in this mode; pass a dummy boolean value
|
||||
// to sycnPod.
|
||||
if err = kl.syncPod(&pod, false, p); err != nil {
|
||||
if err = kl.syncPod(&pod, nil, p); err != nil {
|
||||
return fmt.Errorf("error syncing pod: %v", err)
|
||||
}
|
||||
if retry >= RunOnceMaxRetries {
|
||||
|
|
|
@ -29,8 +29,8 @@ import (
|
|||
)
|
||||
|
||||
type podStatusSyncRequest struct {
|
||||
podFullName string
|
||||
status api.PodStatus
|
||||
pod *api.Pod
|
||||
status api.PodStatus
|
||||
}
|
||||
|
||||
// Updates pod statuses in apiserver. Writes only when new status has changed.
|
||||
|
@ -63,13 +63,14 @@ func (s *statusManager) GetPodStatus(podFullName string) (api.PodStatus, bool) {
|
|||
return status, ok
|
||||
}
|
||||
|
||||
func (s *statusManager) SetPodStatus(podFullName string, status api.PodStatus) {
|
||||
func (s *statusManager) SetPodStatus(pod *api.Pod, status api.PodStatus) {
|
||||
podFullName := kubecontainer.GetPodFullName(pod)
|
||||
s.podStatusesLock.Lock()
|
||||
defer s.podStatusesLock.Unlock()
|
||||
oldStatus, found := s.podStatuses[podFullName]
|
||||
if !found || !reflect.DeepEqual(oldStatus, status) {
|
||||
s.podStatuses[podFullName] = status
|
||||
s.podStatusChannel <- podStatusSyncRequest{podFullName, status}
|
||||
s.podStatusChannel <- podStatusSyncRequest{pod, status}
|
||||
} else {
|
||||
glog.V(3).Infof("Ignoring same pod status for %s - old: %s new: %s", podFullName, oldStatus, status)
|
||||
}
|
||||
|
@ -99,22 +100,19 @@ func (s *statusManager) SyncBatch() {
|
|||
for {
|
||||
select {
|
||||
case syncRequest := <-s.podStatusChannel:
|
||||
podFullName := syncRequest.podFullName
|
||||
pod := syncRequest.pod
|
||||
podFullName := kubecontainer.GetPodFullName(pod)
|
||||
status := syncRequest.status
|
||||
glog.V(3).Infof("Syncing status for %s", podFullName)
|
||||
name, namespace, err := kubecontainer.ParsePodFullName(podFullName)
|
||||
if err != nil {
|
||||
glog.Warningf("Cannot parse pod full name %q: %s", podFullName, err)
|
||||
}
|
||||
_, err = s.kubeClient.Pods(namespace).UpdateStatus(name, &status)
|
||||
_, err := s.kubeClient.Pods(pod.Namespace).UpdateStatus(pod.Name, &status)
|
||||
if err != nil {
|
||||
// We failed to update status. In order to make sure we retry next time
|
||||
// we delete cached value. This may result in an additional update, but
|
||||
// this is ok.
|
||||
s.DeletePodStatus(podFullName)
|
||||
glog.Warningf("Error updating status for pod %q: %v", name, err)
|
||||
glog.Warningf("Error updating status for pod %q: %v", podFullName, err)
|
||||
} else {
|
||||
glog.V(3).Infof("Status for pod %q updated successfully", name)
|
||||
glog.V(3).Infof("Status for pod %q updated successfully", podFullName)
|
||||
}
|
||||
case <-time.After(1 * time.Second):
|
||||
return
|
||||
|
|
|
@ -25,9 +25,13 @@ import (
|
|||
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
|
||||
)
|
||||
|
||||
const (
|
||||
podFullName string = "podName_namespace"
|
||||
)
|
||||
var testPod *api.Pod = &api.Pod{
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
UID: "12345678",
|
||||
Name: "foo",
|
||||
Namespace: "new",
|
||||
},
|
||||
}
|
||||
|
||||
func newTestStatusManager() *statusManager {
|
||||
return newStatusManager(&client.Fake{})
|
||||
|
@ -58,16 +62,16 @@ func verifyActions(t *testing.T, kubeClient client.Interface, expectedActions []
|
|||
|
||||
func TestNewStatus(t *testing.T) {
|
||||
syncer := newTestStatusManager()
|
||||
syncer.SetPodStatus(podFullName, getRandomPodStatus())
|
||||
syncer.SetPodStatus(testPod, getRandomPodStatus())
|
||||
syncer.SyncBatch()
|
||||
verifyActions(t, syncer.kubeClient, []string{"update-status-pod"})
|
||||
}
|
||||
|
||||
func TestChangedStatus(t *testing.T) {
|
||||
syncer := newTestStatusManager()
|
||||
syncer.SetPodStatus(podFullName, getRandomPodStatus())
|
||||
syncer.SetPodStatus(testPod, getRandomPodStatus())
|
||||
syncer.SyncBatch()
|
||||
syncer.SetPodStatus(podFullName, getRandomPodStatus())
|
||||
syncer.SetPodStatus(testPod, getRandomPodStatus())
|
||||
syncer.SyncBatch()
|
||||
verifyActions(t, syncer.kubeClient, []string{"update-status-pod", "update-status-pod"})
|
||||
}
|
||||
|
@ -75,9 +79,9 @@ func TestChangedStatus(t *testing.T) {
|
|||
func TestUnchangedStatus(t *testing.T) {
|
||||
syncer := newTestStatusManager()
|
||||
podStatus := getRandomPodStatus()
|
||||
syncer.SetPodStatus(podFullName, podStatus)
|
||||
syncer.SetPodStatus(testPod, podStatus)
|
||||
syncer.SyncBatch()
|
||||
syncer.SetPodStatus(podFullName, podStatus)
|
||||
syncer.SetPodStatus(testPod, podStatus)
|
||||
syncer.SyncBatch()
|
||||
verifyActions(t, syncer.kubeClient, []string{"update-status-pod"})
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue