kubelet: Fix racy kubelet tests.

Add fakePodWorkers to run syncPod() in serial for testing.

parent 5fcf5911fa
commit 52af792852
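The idea behind the fix, as a minimal self-contained sketch (the names below are illustrative, not from the commit): the real podWorkers dispatches each syncPod call on its own goroutine, so tests could only guess at completion with a sync.WaitGroup, while a fake that invokes the sync function inline makes the order of operations deterministic.

package main

import "fmt"

// Worker plays the role of the PodWorkers interface introduced in this commit.
type Worker interface {
	Update(name string)
}

// asyncWorker mimics the real podWorkers: each update runs on its own
// goroutine, so tests observe completions in a nondeterministic order.
type asyncWorker struct{ syncFn func(string) }

func (w *asyncWorker) Update(name string) { go w.syncFn(name) }

// serialWorker mimics fakePodWorkers: the sync function runs inline,
// so a test can assert immediately after Update returns.
type serialWorker struct{ syncFn func(string) }

func (w *serialWorker) Update(name string) { w.syncFn(name) }

func main() {
	var calls []string
	w := &serialWorker{syncFn: func(name string) { calls = append(calls, name) }}
	w.Update("foo")
	w.Update("bar")
	fmt.Println(calls) // prints [foo bar], with no WaitGroup bookkeeping
}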
@@ -0,0 +1,45 @@
+/*
+Copyright 2015 The Kubernetes Authors All rights reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package kubelet
+
+import (
+	"testing"
+
+	"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
+	kubecontainer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container"
+	"github.com/GoogleCloudPlatform/kubernetes/pkg/types"
+)
+
+// fakePodWorkers runs the sync pod function in serial, so we can have
+// deterministic behaviour in testing.
+type fakePodWorkers struct {
+	syncPodFn    syncPodFnType
+	runtimeCache kubecontainer.RuntimeCache
+	t            *testing.T
+}
+
+func (f *fakePodWorkers) UpdatePod(pod *api.Pod, mirrorPod *api.Pod, updateComplete func()) {
+	pods, err := f.runtimeCache.GetPods()
+	if err != nil {
+		f.t.Errorf("Unexpected error: %v", err)
+	}
+	if err := f.syncPodFn(pod, mirrorPod, kubecontainer.Pods(pods).FindPodByID(pod.UID)); err != nil {
+		f.t.Errorf("Unexpected error: %v", err)
+	}
+}
+
+func (f *fakePodWorkers) ForgetNonExistingPodWorkers(desiredPods map[types.UID]empty) {}
@@ -332,7 +332,7 @@ type Kubelet struct {
 	runtimeCache   kubecontainer.RuntimeCache
 	kubeClient     client.Interface
 	rootDirectory  string
-	podWorkers     *podWorkers
+	podWorkers     PodWorkers
 	resyncInterval time.Duration
 	sourcesReady   SourcesReadyFn
 
@@ -30,7 +30,6 @@ import (
 	"sort"
 	"strconv"
 	"strings"
-	"sync"
 	"testing"
 	"time"
 
@@ -64,7 +63,6 @@ type TestKubelet struct {
 	fakeDocker       *dockertools.FakeDockerClient
 	fakeCadvisor     *cadvisor.Mock
 	fakeKubeClient   *testclient.Fake
-	waitGroup        *sync.WaitGroup
 	fakeMirrorClient *fakeMirrorClient
 }
 
@@ -91,7 +89,6 @@ func newTestKubelet(t *testing.T) *TestKubelet {
 	if err := os.MkdirAll(kubelet.rootDirectory, 0750); err != nil {
 		t.Fatalf("can't mkdir(%q): %v", kubelet.rootDirectory, err)
 	}
-	waitGroup := new(sync.WaitGroup)
 	kubelet.sourcesReady = func() bool { return true }
 	kubelet.masterServiceNamespace = api.NamespaceDefault
 	kubelet.serviceLister = testServiceLister{}
@@ -111,16 +108,13 @@ func newTestKubelet(t *testing.T) *TestKubelet {
 
 	kubelet.containerRuntime = dockertools.NewFakeDockerManager(fakeDocker, fakeRecorder, kubelet.readinessManager, kubelet.containerRefManager, dockertools.PodInfraContainerImage, 0, 0, "", kubelet.os, kubelet.networkPlugin, kubelet, &fakeHTTP{}, runtimeHooks)
 	kubelet.runtimeCache = kubecontainer.NewFakeRuntimeCache(kubelet.containerRuntime)
-	kubelet.podWorkers = newPodWorkers(
-		kubelet.runtimeCache,
-		func(pod *api.Pod, mirrorPod *api.Pod, runningPod container.Pod) error {
-			err := kubelet.syncPod(pod, mirrorPod, runningPod)
-			waitGroup.Done()
-			return err
-		},
-		fakeRecorder)
+	kubelet.podWorkers = &fakePodWorkers{
+		syncPodFn:    kubelet.syncPod,
+		runtimeCache: kubelet.runtimeCache,
+		t:            t,
+	}
 	kubelet.volumeManager = newVolumeManager()
-	return &TestKubelet{kubelet, fakeDocker, mockCadvisor, fakeKubeClient, waitGroup, fakeMirrorClient}
+	return &TestKubelet{kubelet, fakeDocker, mockCadvisor, fakeKubeClient, fakeMirrorClient}
 }
 
 func verifyCalls(t *testing.T, fakeDocker *dockertools.FakeDockerClient, calls []string) {
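For contrast, a minimal sketch of the pattern the tests below are dropping (illustrative, not from the commit): each test had to pre-declare exactly how many asynchronous syncPod invocations would occur, and a miscount meant either a hang in Wait() or assertions racing the worker goroutine.

package main

import "sync"

func main() {
	var wg sync.WaitGroup
	wg.Add(1) // had to match the number of async syncPod calls exactly
	go func() {
		defer wg.Done()
		// ... asynchronous syncPod work ...
	}()
	wg.Wait() // only after this was it safe to assert on the Docker call list
}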
@@ -373,7 +367,6 @@ func TestSyncPodsDoesNothing(t *testing.T) {
 	testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorApi.MachineInfo{}, nil)
 	kubelet := testKubelet.kubelet
 	fakeDocker := testKubelet.fakeDocker
-	waitGroup := testKubelet.waitGroup
 
 	container := api.Container{Name: "bar"}
 	pods := []*api.Pod{
@@ -417,20 +410,21 @@
 	}
 
 	kubelet.podManager.SetPods(pods)
-	waitGroup.Add(1)
 	err := kubelet.SyncPods(pods, emptyPodUIDs, map[string]*api.Pod{}, time.Now())
 	if err != nil {
 		t.Errorf("unexpected error: %v", err)
 	}
-	waitGroup.Wait()
 	verifyCalls(t, fakeDocker, []string{
-		"list", "list", "list",
+		"list", "list",
 		// Get pod status.
 		"list", "inspect_container", "inspect_container",
 		// Check the pod infra container.
 		"inspect_container",
 		// Get pod status.
-		"list", "inspect_container", "inspect_container"})
+		"list", "inspect_container", "inspect_container",
+		// Get pods for deleting orphaned volumes.
+		"list",
+	})
 }
 
 func TestSyncPodsWithTerminationLog(t *testing.T) {
@@ -438,7 +432,6 @@ func TestSyncPodsWithTerminationLog(t *testing.T) {
 	testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorApi.MachineInfo{}, nil)
 	kubelet := testKubelet.kubelet
 	fakeDocker := testKubelet.fakeDocker
-	waitGroup := testKubelet.waitGroup
 	container := api.Container{
 		Name: "bar",
 		TerminationMessagePath: "/dev/somepath",
@@ -459,14 +452,12 @@ func TestSyncPodsWithTerminationLog(t *testing.T) {
 		},
 	}
 	kubelet.podManager.SetPods(pods)
-	waitGroup.Add(1)
 	err := kubelet.SyncPods(pods, emptyPodUIDs, map[string]*api.Pod{}, time.Now())
 	if err != nil {
 		t.Errorf("unexpected error: %v", err)
 	}
-	waitGroup.Wait()
 	verifyCalls(t, fakeDocker, []string{
-		"list", "list", "list",
+		"list", "list",
 		// Get pod status.
 		"list", "inspect_image",
 		// Create pod infra container.
@@ -474,7 +465,10 @@ func TestSyncPodsWithTerminationLog(t *testing.T) {
 		// Create container.
 		"create", "start",
 		// Get pod status.
-		"list", "inspect_container", "inspect_container"})
+		"list", "inspect_container", "inspect_container",
+		// Get pods for deleting orphaned volumes.
+		"list",
+	})
 
 	fakeDocker.Lock()
 	parts := strings.Split(fakeDocker.Container.HostConfig.Binds[0], ":")
@@ -500,7 +494,6 @@ func TestSyncPodsCreatesNetAndContainer(t *testing.T) {
 	testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorApi.MachineInfo{}, nil)
 	kubelet := testKubelet.kubelet
 	fakeDocker := testKubelet.fakeDocker
-	waitGroup := testKubelet.waitGroup
 	// TODO: Move this test to dockertools so that we don't have to do the hacky
 	// type assertion here.
 	dm := kubelet.containerRuntime.(*dockertools.DockerManager)
@@ -521,15 +514,13 @@
 		},
 	}
 	kubelet.podManager.SetPods(pods)
-	waitGroup.Add(1)
 	err := kubelet.SyncPods(pods, emptyPodUIDs, map[string]*api.Pod{}, time.Now())
 	if err != nil {
 		t.Errorf("unexpected error: %v", err)
 	}
-	waitGroup.Wait()
 
 	verifyCalls(t, fakeDocker, []string{
-		"list", "list", "list",
+		"list", "list",
 		// Get pod status.
 		"list", "inspect_image",
 		// Create pod infra container.
@@ -537,7 +528,10 @@ func TestSyncPodsCreatesNetAndContainer(t *testing.T) {
 		// Create container.
 		"create", "start",
 		// Get pod status.
-		"list", "inspect_container", "inspect_container"})
+		"list", "inspect_container", "inspect_container",
+		// Get pods for deleting orphaned volumes.
+		"list",
+	})
 
 	fakeDocker.Lock()
 
@@ -564,7 +558,6 @@ func TestSyncPodsCreatesNetAndContainerPullsImage(t *testing.T) {
 	testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorApi.MachineInfo{}, nil)
 	kubelet := testKubelet.kubelet
 	fakeDocker := testKubelet.fakeDocker
-	waitGroup := testKubelet.waitGroup
 	// TODO: Move this test to dockertools so that we don't have to do the hacky
 	// type assertion here.
 	dm := kubelet.containerRuntime.(*dockertools.DockerManager)
@@ -586,16 +579,14 @@
 			},
 		},
 	}
-	waitGroup.Add(1)
 	kubelet.podManager.SetPods(pods)
 	err := kubelet.SyncPods(pods, emptyPodUIDs, map[string]*api.Pod{}, time.Now())
 	if err != nil {
 		t.Errorf("unexpected error: %v", err)
 	}
-	waitGroup.Wait()
 
 	verifyCalls(t, fakeDocker, []string{
-		"list", "list", "list",
+		"list", "list",
 		// Get pod status.
 		"list", "inspect_image",
 		// Create pod infra container.
@@ -603,7 +594,10 @@ func TestSyncPodsCreatesNetAndContainerPullsImage(t *testing.T) {
 		// Create container.
 		"create", "start",
 		// Get pod status.
-		"list", "inspect_container", "inspect_container"})
+		"list", "inspect_container", "inspect_container",
+		// Get pods for deleting orphaned volumes.
+		"list",
+	})
 
 	fakeDocker.Lock()
 
@@ -624,7 +618,6 @@ func TestSyncPodsWithPodInfraCreatesContainer(t *testing.T) {
 	testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorApi.MachineInfo{}, nil)
 	kubelet := testKubelet.kubelet
 	fakeDocker := testKubelet.fakeDocker
-	waitGroup := testKubelet.waitGroup
 	pods := []*api.Pod{
 		{
 			ObjectMeta: api.ObjectMeta{
@@ -653,16 +646,14 @@
 			HostConfig: &docker.HostConfig{},
 		},
 	}
-	waitGroup.Add(1)
 	kubelet.podManager.SetPods(pods)
 	err := kubelet.SyncPods(pods, emptyPodUIDs, map[string]*api.Pod{}, time.Now())
 	if err != nil {
 		t.Errorf("unexpected error: %v", err)
 	}
-	waitGroup.Wait()
 
 	verifyCalls(t, fakeDocker, []string{
-		"list", "list", "list",
+		"list", "list",
 		// Get pod status.
 		"list", "inspect_container", "inspect_image",
 		// Check the pod infra container.
@@ -670,7 +661,10 @@ func TestSyncPodsWithPodInfraCreatesContainer(t *testing.T) {
 		// Create container.
 		"create", "start",
 		// Get pod status.
-		"list", "inspect_container", "inspect_container"})
+		"list", "inspect_container", "inspect_container",
+		// Get pods for deleting orphaned volumes.
+		"list",
+	})
 
 	fakeDocker.Lock()
 	if len(fakeDocker.Created) != 1 ||
@@ -695,7 +689,6 @@ func TestSyncPodsWithPodInfraCreatesContainerCallsHandler(t *testing.T) {
 	testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorApi.MachineInfo{}, nil)
 	kubelet := testKubelet.kubelet
 	fakeDocker := testKubelet.fakeDocker
-	waitGroup := testKubelet.waitGroup
 	fakeHttp := fakeHTTP{}
 
 	// Simulate HTTP failure. Re-create the containerRuntime to inject the failure.
@@ -742,16 +735,14 @@
 			HostConfig: &docker.HostConfig{},
 		},
 	}
-	waitGroup.Add(1)
 	kubelet.podManager.SetPods(pods)
 	err := kubelet.SyncPods(pods, emptyPodUIDs, map[string]*api.Pod{}, time.Now())
 	if err != nil {
 		t.Errorf("unexpected error: %v", err)
 	}
-	waitGroup.Wait()
 
 	verifyCalls(t, fakeDocker, []string{
-		"list", "list", "list",
+		"list", "list",
 		// Get pod status.
 		"list", "inspect_container", "inspect_image",
 		// Check the pod infra container.
@@ -759,7 +750,10 @@ func TestSyncPodsWithPodInfraCreatesContainerCallsHandler(t *testing.T) {
 		// Create container.
 		"create", "start",
 		// Get pod status.
-		"list", "inspect_container", "inspect_container"})
+		"list", "inspect_container", "inspect_container",
+		// Get pods for deleting orphaned volumes.
+		"list",
+	})
 
 	fakeDocker.Lock()
 	if len(fakeDocker.Created) != 1 ||
@@ -777,7 +771,6 @@ func TestSyncPodsDeletesWithNoPodInfraContainer(t *testing.T) {
 	testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorApi.MachineInfo{}, nil)
 	kubelet := testKubelet.kubelet
 	fakeDocker := testKubelet.fakeDocker
-	waitGroup := testKubelet.waitGroup
 
 	pods := []*api.Pod{
 		{
@@ -840,19 +833,17 @@
 		},
 	}
 
-	waitGroup.Add(2)
 	kubelet.podManager.SetPods(pods)
 	err := kubelet.SyncPods(pods, emptyPodUIDs, map[string]*api.Pod{}, time.Now())
 	if err != nil {
 		t.Errorf("unexpected error: %v", err)
 	}
-	waitGroup.Wait()
 
-	verifyUnorderedCalls(t, fakeDocker, []string{
+	verifyCalls(t, fakeDocker, []string{
 		"list",
 
 		// foo1
 		"list",
 		"list",
 		// Get pod status.
 		"list", "inspect_container",
 		// Kill the container since pod infra container is not running.
@@ -866,12 +857,16 @@
 
 		// foo2
 		"list",
 		// Get pod status.
 		"list", "inspect_container", "inspect_container",
 		// Check the pod infra container.
 		"inspect_container",
-		// Get pod status.
-		"list", "inspect_container", "inspect_container"})
+		// Get pod status.
+		"list", "inspect_container", "inspect_container",
+
+		// Get pods for deleting orphaned volumes.
+		"list",
+	})
 
 	// A map iteration is used to delete containers, so must not depend on
 	// order here.
@@ -977,7 +972,6 @@ func TestSyncPodsDeletesDuplicate(t *testing.T) {
 	testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorApi.MachineInfo{}, nil)
 	kubelet := testKubelet.kubelet
 	fakeDocker := testKubelet.fakeDocker
-	waitGroup := testKubelet.waitGroup
 
 	pods := []*api.Pod{
 		{
@@ -1030,15 +1024,13 @@
 	}
 
 	kubelet.podManager.SetPods(pods)
-	waitGroup.Add(1)
 	err := kubelet.SyncPods(pods, emptyPodUIDs, map[string]*api.Pod{}, time.Now())
 	if err != nil {
 		t.Errorf("unexpected error: %v", err)
 	}
-	waitGroup.Wait()
 
 	verifyCalls(t, fakeDocker, []string{
-		"list", "list", "list",
+		"list", "list",
 		// Get pod status.
 		"list", "inspect_container", "inspect_container", "inspect_container",
 		// Check the pod infra container.
@@ -1046,7 +1038,10 @@ func TestSyncPodsDeletesDuplicate(t *testing.T) {
 		// Kill the duplicated container.
 		"stop",
 		// Get pod status.
-		"list", "inspect_container", "inspect_container", "inspect_container"})
+		"list", "inspect_container", "inspect_container", "inspect_container",
+		// Get pods for deleting orphaned volumes.
+		"list",
+	})
 	// Expect one of the duplicates to be killed.
 	if len(fakeDocker.Stopped) != 1 || (fakeDocker.Stopped[0] != "1234" && fakeDocker.Stopped[0] != "4567") {
 		t.Errorf("Wrong containers were stopped: %v", fakeDocker.Stopped)
@@ -1058,7 +1053,6 @@ func TestSyncPodsBadHash(t *testing.T) {
 	testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorApi.MachineInfo{}, nil)
 	kubelet := testKubelet.kubelet
 	fakeDocker := testKubelet.fakeDocker
-	waitGroup := testKubelet.waitGroup
 
 	pods := []*api.Pod{
 		{
@@ -1101,15 +1095,13 @@ func TestSyncPodsBadHash(t *testing.T) {
 	}
 
 	kubelet.podManager.SetPods(pods)
-	waitGroup.Add(1)
 	err := kubelet.SyncPods(pods, emptyPodUIDs, map[string]*api.Pod{}, time.Now())
 	if err != nil {
 		t.Errorf("unexpected error: %v", err)
 	}
-	waitGroup.Wait()
 
 	verifyCalls(t, fakeDocker, []string{
-		"list", "list", "list",
+		"list", "list",
 		// Get pod status.
 		"list", "inspect_container", "inspect_container",
 		// Check the pod infra container.
@@ -1117,7 +1109,10 @@ func TestSyncPodsBadHash(t *testing.T) {
 		// Kill and restart the bad hash container.
 		"stop", "create", "start",
 		// Get pod status.
-		"list", "inspect_container", "inspect_container", "inspect_container"})
+		"list", "inspect_container", "inspect_container", "inspect_container",
+		// Get pods for deleting orphaned volumes.
+		"list",
+	})
 
 	if err := fakeDocker.AssertStopped([]string{"1234"}); err != nil {
 		t.Errorf("%v", err)
@@ -1129,7 +1124,6 @@ func TestSyncPodsUnhealthy(t *testing.T) {
 	testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorApi.MachineInfo{}, nil)
 	kubelet := testKubelet.kubelet
 	fakeDocker := testKubelet.fakeDocker
-	waitGroup := testKubelet.waitGroup
 
 	pods := []*api.Pod{
 		{
@@ -1175,15 +1169,13 @@
 		},
 	}
 	kubelet.podManager.SetPods(pods)
-	waitGroup.Add(1)
 	err := kubelet.SyncPods(pods, emptyPodUIDs, map[string]*api.Pod{}, time.Now())
 	if err != nil {
 		t.Errorf("unexpected error: %v", err)
 	}
-	waitGroup.Wait()
 
 	verifyCalls(t, fakeDocker, []string{
-		"list", "list", "list",
+		"list", "list",
 		// Get pod status.
 		"list", "inspect_container", "inspect_container",
 		// Check the pod infra container.
@@ -1193,7 +1185,10 @@
 		// Restart the unhealthy container.
 		"create", "start",
 		// Get pod status.
-		"list", "inspect_container", "inspect_container", "inspect_container"})
+		"list", "inspect_container", "inspect_container", "inspect_container",
+		// Get pods for deleting orphaned volumes.
+		"list",
+	})
 
 	if err := fakeDocker.AssertStopped([]string{"1234"}); err != nil {
 		t.Errorf("%v", err)
@@ -1646,7 +1641,6 @@ func TestSyncPodEventHandlerFails(t *testing.T) {
 	testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorApi.MachineInfo{}, nil)
 	kubelet := testKubelet.kubelet
 	fakeDocker := testKubelet.fakeDocker
-	waitGroup := testKubelet.waitGroup
 
 	// Simulate HTTP failure. Re-create the containerRuntime to inject the failure.
 	kubelet.httpClient = &fakeHTTP{
@@ -1695,15 +1689,13 @@
 		},
 	}
 	kubelet.podManager.SetPods(pods)
-	waitGroup.Add(1)
 	err := kubelet.SyncPods(pods, emptyPodUIDs, map[string]*api.Pod{}, time.Now())
 	if err != nil {
 		t.Errorf("unexpected error: %v", err)
 	}
-	waitGroup.Wait()
 
 	verifyCalls(t, fakeDocker, []string{
-		"list", "list", "list",
+		"list", "list",
 		// Get pod status.
 		"list", "inspect_container", "inspect_image",
 		// Check the pod infra container.
@@ -1713,7 +1705,10 @@ func TestSyncPodEventHandlerFails(t *testing.T) {
 		// Kill the container since event handler fails.
 		"stop",
 		// Get pod status.
-		"list", "inspect_container", "inspect_container"})
+		"list", "inspect_container", "inspect_container",
+		// Get pods for deleting orphaned volumes.
+		"list",
+	})
 
 	// TODO(yifan): Check the stopped container's name.
 	if len(fakeDocker.Stopped) != 1 {
@@ -1733,7 +1728,6 @@ func TestSyncPodsWithPullPolicy(t *testing.T) {
 	testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorApi.MachineInfo{}, nil)
 	kubelet := testKubelet.kubelet
 	fakeDocker := testKubelet.fakeDocker
-	waitGroup := testKubelet.waitGroup
 	// TODO: Move this test to dockertools so that we don't have to do the hacky
 	// type assertion here.
 	dm := kubelet.containerRuntime.(*dockertools.DockerManager)
@@ -1761,12 +1755,10 @@
 		},
 	}
 	kubelet.podManager.SetPods(pods)
-	waitGroup.Add(1)
 	err := kubelet.SyncPods(pods, emptyPodUIDs, map[string]*api.Pod{}, time.Now())
 	if err != nil {
 		t.Errorf("unexpected error: %v", err)
 	}
-	waitGroup.Wait()
 
 	fakeDocker.Lock()
 
@@ -3662,7 +3654,6 @@ func TestDoNotCacheStatusForStaticPods(t *testing.T) {
 	testKubelet := newTestKubelet(t)
 	testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorApi.MachineInfo{}, nil)
 	kubelet := testKubelet.kubelet
-	waitGroup := testKubelet.waitGroup
 
 	pods := []*api.Pod{
 		{
@@ -3682,12 +3673,10 @@
 		},
 	}
 	kubelet.podManager.SetPods(pods)
-	waitGroup.Add(1)
 	err := kubelet.SyncPods(pods, emptyPodUIDs, map[string]*api.Pod{}, time.Now())
 	if err != nil {
 		t.Errorf("unexpected error: %v", err)
 	}
-	waitGroup.Wait()
 	podFullName := kubecontainer.GetPodFullName(pods[0])
 	status, ok := kubelet.statusManager.GetPodStatus(podFullName)
 	if ok {
@@ -3812,7 +3801,6 @@ func TestSyncPodsWithRestartPolicy(t *testing.T) {
 	testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorApi.MachineInfo{}, nil)
 	kubelet := testKubelet.kubelet
 	fakeDocker := testKubelet.fakeDocker
-	waitGroup := testKubelet.waitGroup
 
 	containers := []api.Container{
 		{Name: "succeeded"},
@@ -3891,7 +3879,7 @@
 	}{
 		{
 			api.RestartPolicyAlways,
-			[]string{"list", "list", "list",
+			[]string{"list", "list",
 				// Get pod status.
 				"list", "inspect_container", "inspect_container", "inspect_container",
 				// Check the pod infra container.
@@ -3899,13 +3887,16 @@
 				// Restart both containers.
 				"create", "start", "create", "start",
 				// Get pod status.
-				"list", "inspect_container", "inspect_container", "inspect_container", "inspect_container", "inspect_container"},
+				"list", "inspect_container", "inspect_container", "inspect_container", "inspect_container", "inspect_container",
+				// Get pods for deleting orphaned volumes.
+				"list",
+			},
 			[]string{"succeeded", "failed"},
 			[]string{},
 		},
 		{
 			api.RestartPolicyOnFailure,
-			[]string{"list", "list", "list",
+			[]string{"list", "list",
 				// Get pod status.
 				"list", "inspect_container", "inspect_container", "inspect_container",
 				// Check the pod infra container.
@@ -3913,13 +3904,16 @@
 				// Restart the failed container.
 				"create", "start",
 				// Get pod status.
-				"list", "inspect_container", "inspect_container", "inspect_container", "inspect_container"},
+				"list", "inspect_container", "inspect_container", "inspect_container", "inspect_container",
+				// Get pods for deleting orphaned volumes.
+				"list",
+			},
 			[]string{"failed"},
 			[]string{},
 		},
 		{
 			api.RestartPolicyNever,
-			[]string{"list", "list", "list",
+			[]string{"list", "list",
 				// Get pod status.
 				"list", "inspect_container", "inspect_container", "inspect_container",
 				// Check the pod infra container.
@@ -3927,7 +3921,10 @@
 				// Stop the last pod infra container.
 				"stop",
 				// Get pod status.
-				"list", "inspect_container", "inspect_container", "inspect_container"},
+				"list", "inspect_container", "inspect_container", "inspect_container",
+				// Get pods for deleting orphaned volumes.
+				"list",
+			},
 			[]string{},
 			[]string{"9876"},
 		},
@@ -3941,12 +3938,10 @@
 		pods[0].Spec.RestartPolicy = tt.policy
 
 		kubelet.podManager.SetPods(pods)
-		waitGroup.Add(1)
 		err := kubelet.SyncPods(pods, emptyPodUIDs, map[string]*api.Pod{}, time.Now())
 		if err != nil {
 			t.Errorf("%d: unexpected error: %v", i, err)
 		}
-		waitGroup.Wait()
 
 		// 'stop' is because the pod infra container is killed when no container is running.
 		verifyCalls(t, fakeDocker, tt.calls)
@@ -3965,7 +3960,6 @@ func TestGetPodStatusWithLastTermination(t *testing.T) {
 	testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorApi.MachineInfo{}, nil)
 	kubelet := testKubelet.kubelet
 	fakeDocker := testKubelet.fakeDocker
-	waitGroup := testKubelet.waitGroup
 
 	containers := []api.Container{
 		{Name: "succeeded"},
@@ -4072,12 +4066,10 @@
 		},
 	}
 	kubelet.podManager.SetPods(pods)
-	waitGroup.Add(1)
 	err := kubelet.SyncPods(pods, emptyPodUIDs, map[string]*api.Pod{}, time.Now())
 	if err != nil {
 		t.Errorf("%d: unexpected error: %v", i, err)
 	}
-	waitGroup.Wait()
 
 	// Check if we can retrieve the pod status from GetPodStatus().
 	podName := kubecontainer.GetPodFullName(pods[0])
@@ -4112,7 +4104,6 @@ func TestGetPodCreationFailureReason(t *testing.T) {
 	testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorApi.MachineInfo{}, nil)
 	kubelet := testKubelet.kubelet
 	fakeDocker := testKubelet.fakeDocker
-	waitGroup := testKubelet.waitGroup
 
 	// Inject the creation failure error to docker.
 	failureReason := "creation failure"
@@ -4150,12 +4141,10 @@
 	pods := []*api.Pod{pod}
 	kubelet.podManager.SetPods(pods)
 	kubelet.volumeManager.SetVolumes(pod.UID, kubecontainer.VolumeMap{})
-	waitGroup.Add(1)
 	err := kubelet.SyncPods(pods, emptyPodUIDs, map[string]*api.Pod{}, time.Now())
 	if err != nil {
 		t.Errorf("unexpected error: %v", err)
 	}
-	waitGroup.Wait()
 
 	status, err := kubelet.GetPodStatus(kubecontainer.GetPodFullName(pod))
 	if err != nil {
@@ -4178,7 +4167,6 @@ func TestGetPodPullImageFailureReason(t *testing.T) {
 	testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorApi.MachineInfo{}, nil)
 	kubelet := testKubelet.kubelet
 	fakeDocker := testKubelet.fakeDocker
-	waitGroup := testKubelet.waitGroup
 
 	// Initialize the FakeDockerPuller so that it'd try to pull non-existent
 	// images.
@@ -4219,12 +4207,10 @@
 	pods := []*api.Pod{pod}
 	kubelet.podManager.SetPods(pods)
 	kubelet.volumeManager.SetVolumes(pod.UID, kubecontainer.VolumeMap{})
-	waitGroup.Add(1)
 	err := kubelet.SyncPods(pods, emptyPodUIDs, map[string]*api.Pod{}, time.Now())
 	if err != nil {
 		t.Errorf("unexpected error: %v", err)
 	}
-	waitGroup.Wait()
 
 	status, err := kubelet.GetPodStatus(kubecontainer.GetPodFullName(pod))
 	if err != nil {
@@ -28,6 +28,12 @@ import (
 	"github.com/golang/glog"
 )
 
+// PodWorkers is an abstract interface for testability.
+type PodWorkers interface {
+	UpdatePod(pod *api.Pod, mirrorPod *api.Pod, updateComplete func())
+	ForgetNonExistingPodWorkers(desiredPods map[types.UID]empty)
+}
+
 type syncPodFnType func(*api.Pod, *api.Pod, kubecontainer.Pod) error
 
 type podWorkers struct {
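A note on the design: extracting the PodWorkers interface is what lets newTestKubelet assign fakePodWorkers to the Kubelet.podWorkers field in place of the real implementation. A common companion idiom, shown here as a sketch and not part of this commit, is a pair of compile-time assertions that both implementations satisfy the interface:

var _ PodWorkers = &podWorkers{}    // real implementation
var _ PodWorkers = &fakePodWorkers{} // test fake; this line would live in a _test.go file if the fake is test-only

Both types live in package kubelet, so these assertions would compile alongside the code above.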