diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 345f49c2e9..06625aee68 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -111,7 +111,6 @@ func NewMainKubelet( rootDirectory: rootDirectory, resyncInterval: resyncInterval, podInfraContainerImage: podInfraContainerImage, - podWorkers: newPodWorkers(), dockerIDToRef: map[dockertools.DockerID]*api.ObjectReference{}, runner: dockertools.NewDockerContainerCommandRunner(dockerClient), httpClient: &http.Client{}, @@ -134,6 +133,7 @@ func NewMainKubelet( return nil, err } klet.dockerCache = dockerCache + klet.podWorkers = newPodWorkers(dockerCache, klet.syncPod) metrics.Register(dockerCache) @@ -453,43 +453,6 @@ func (kl *Kubelet) Run(updates <-chan PodUpdate) { kl.syncLoop(updates, kl) } -// Per-pod workers. -type podWorkers struct { - lock sync.Mutex - - // Set of pods with existing workers. - workers util.StringSet -} - -func newPodWorkers() *podWorkers { - return &podWorkers{ - workers: util.NewStringSet(), - } -} - -// Runs a worker for "podFullName" asynchronously with the specified "action". -// If the worker for the "podFullName" is already running, functions as a no-op. -func (self *podWorkers) Run(podFullName string, action func()) { - self.lock.Lock() - defer self.lock.Unlock() - - // This worker is already running, let it finish. - if self.workers.Has(podFullName) { - return - } - self.workers.Insert(podFullName) - - // Run worker async. - go func() { - defer util.HandleCrash() - action() - - self.lock.Lock() - defer self.lock.Unlock() - self.workers.Delete(podFullName) - }() -} - func makeBinds(pod *api.BoundPod, container *api.Container, podVolumes volumeMap) []string { binds := []string{} for _, mount := range container.VolumeMounts { @@ -1333,13 +1296,12 @@ func (kl *Kubelet) SyncPods(pods []api.BoundPod) error { } // Run the sync in an async manifest worker. - kl.podWorkers.Run(podFullName, func() { - if err := kl.syncPod(pod, dockerContainers); err != nil { - glog.Errorf("Error syncing pod, skipping: %v", err) - record.Eventf(pod, "failedSync", "Error syncing pod, skipping: %v", err) - } - }) + kl.podWorkers.UpdatePod(*pod) } + + // Stop the workers for no-longer existing pods. + kl.podWorkers.ForgetNonExistingPodWorkers(desiredPods) + // Kill any containers we don't need. killed := []string{} for ix := range dockerContainers { diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index 84c7eb2e6c..ad110072e5 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -48,14 +48,15 @@ func init() { util.ReallyCrash = true } -func newTestKubelet(t *testing.T) (*Kubelet, *dockertools.FakeDockerClient) { +func newTestKubelet(t *testing.T) (*Kubelet, *dockertools.FakeDockerClient, *sync.WaitGroup) { fakeDocker := &dockertools.FakeDockerClient{ RemovedImages: util.StringSet{}, } + fakeDockerCache := dockertools.NewFakeDockerCache(fakeDocker) kubelet := &Kubelet{} kubelet.dockerClient = fakeDocker - kubelet.dockerCache = dockertools.NewFakeDockerCache(fakeDocker) + kubelet.dockerCache = fakeDockerCache kubelet.dockerPuller = &dockertools.FakeDockerPuller{} if tempDir, err := ioutil.TempDir("/tmp", "kubelet_test."); err != nil { t.Fatalf("can't make a temp rootdir: %v", err) @@ -65,7 +66,14 @@ func newTestKubelet(t *testing.T) (*Kubelet, *dockertools.FakeDockerClient) { if err := os.MkdirAll(kubelet.rootDirectory, 0750); err != nil { t.Fatalf("can't mkdir(%q): %v", kubelet.rootDirectory, err) } - kubelet.podWorkers = newPodWorkers() + waitGroup := new(sync.WaitGroup) + kubelet.podWorkers = newPodWorkers( + fakeDockerCache, + func(pod *api.BoundPod, containers dockertools.DockerContainers) error { + err := kubelet.syncPod(pod, containers) + waitGroup.Done() + return err + }) kubelet.sourceReady = func(source string) bool { return true } kubelet.masterServiceNamespace = api.NamespaceDefault kubelet.serviceLister = testServiceLister{} @@ -74,7 +82,7 @@ func newTestKubelet(t *testing.T) (*Kubelet, *dockertools.FakeDockerClient) { t.Fatalf("can't initialize kubelet data dirs: %v", err) } - return kubelet, fakeDocker + return kubelet, fakeDocker, waitGroup } func verifyCalls(t *testing.T, fakeDocker *dockertools.FakeDockerClient, calls []string) { @@ -126,7 +134,7 @@ func verifyBoolean(t *testing.T, expected, value bool) { } func TestKubeletDirs(t *testing.T) { - kubelet, _ := newTestKubelet(t) + kubelet, _, _ := newTestKubelet(t) root := kubelet.rootDirectory var exp, got string @@ -187,7 +195,7 @@ func TestKubeletDirs(t *testing.T) { } func TestKubeletDirsCompat(t *testing.T) { - kubelet, _ := newTestKubelet(t) + kubelet, _, _ := newTestKubelet(t) root := kubelet.rootDirectory if err := os.MkdirAll(root, 0750); err != nil { t.Fatalf("can't mkdir(%q): %s", root, err) @@ -293,7 +301,7 @@ func TestKillContainerWithError(t *testing.T) { Err: fmt.Errorf("sample error"), ContainerList: append([]docker.APIContainers{}, containers...), } - kubelet, _ := newTestKubelet(t) + kubelet, _, _ := newTestKubelet(t) for _, c := range fakeDocker.ContainerList { kubelet.readiness.set(c.ID, true) } @@ -324,7 +332,7 @@ func TestKillContainer(t *testing.T) { Names: []string{"/k8s_bar_qux_5678_42"}, }, } - kubelet, fakeDocker := newTestKubelet(t) + kubelet, fakeDocker, _ := newTestKubelet(t) fakeDocker.ContainerList = append([]docker.APIContainers{}, containers...) fakeDocker.Container = &docker.Container{ Name: "foobar", @@ -375,7 +383,7 @@ func (cr *channelReader) GetList() [][]api.BoundPod { } func TestSyncPodsDoesNothing(t *testing.T) { - kubelet, fakeDocker := newTestKubelet(t) + kubelet, fakeDocker, waitGroup := newTestKubelet(t) container := api.Container{Name: "bar"} fakeDocker.ContainerList = []docker.APIContainers{ { @@ -404,16 +412,17 @@ func TestSyncPodsDoesNothing(t *testing.T) { }, }, } + waitGroup.Add(1) err := kubelet.SyncPods(kubelet.pods) if err != nil { t.Errorf("unexpected error: %v", err) } - kubelet.drainWorkers() - verifyCalls(t, fakeDocker, []string{"list", "list", "inspect_container", "inspect_container"}) + waitGroup.Wait() + verifyCalls(t, fakeDocker, []string{"list", "list", "list", "inspect_container", "inspect_container"}) } func TestSyncPodsWithTerminationLog(t *testing.T) { - kubelet, fakeDocker := newTestKubelet(t) + kubelet, fakeDocker, waitGroup := newTestKubelet(t) container := api.Container{ Name: "bar", TerminationMessagePath: "/dev/somepath", @@ -434,13 +443,14 @@ func TestSyncPodsWithTerminationLog(t *testing.T) { }, }, } + waitGroup.Add(1) err := kubelet.SyncPods(kubelet.pods) if err != nil { t.Errorf("unexpected error: %v", err) } - kubelet.drainWorkers() + waitGroup.Wait() verifyCalls(t, fakeDocker, []string{ - "list", "create", "start", "inspect_container", "list", "inspect_container", "inspect_image", "list", "create", "start"}) + "list", "list", "create", "start", "inspect_container", "list", "inspect_container", "inspect_image", "list", "create", "start"}) fakeDocker.Lock() parts := strings.Split(fakeDocker.Container.HostConfig.Binds[0], ":") @@ -453,19 +463,6 @@ func TestSyncPodsWithTerminationLog(t *testing.T) { fakeDocker.Unlock() } -// drainWorkers waits until all workers are done. Should only used for testing. -func (kl *Kubelet) drainWorkers() { - for { - kl.podWorkers.lock.Lock() - length := len(kl.podWorkers.workers) - kl.podWorkers.lock.Unlock() - if length == 0 { - return - } - time.Sleep(time.Millisecond * 100) - } -} - func matchString(t *testing.T, pattern, str string) bool { match, err := regexp.MatchString(pattern, str) if err != nil { @@ -475,7 +472,7 @@ func matchString(t *testing.T, pattern, str string) bool { } func TestSyncPodsCreatesNetAndContainer(t *testing.T) { - kubelet, fakeDocker := newTestKubelet(t) + kubelet, fakeDocker, waitGroup := newTestKubelet(t) kubelet.podInfraContainerImage = "custom_image_name" fakeDocker.ContainerList = []docker.APIContainers{} kubelet.pods = []api.BoundPod{ @@ -493,14 +490,15 @@ func TestSyncPodsCreatesNetAndContainer(t *testing.T) { }, }, } + waitGroup.Add(1) err := kubelet.SyncPods(kubelet.pods) if err != nil { t.Errorf("unexpected error: %v", err) } - kubelet.drainWorkers() + waitGroup.Wait() verifyCalls(t, fakeDocker, []string{ - "list", "create", "start", "inspect_container", "list", "inspect_container", "inspect_image", "list", "create", "start"}) + "list", "list", "create", "start", "inspect_container", "list", "inspect_container", "inspect_image", "list", "create", "start"}) fakeDocker.Lock() @@ -523,7 +521,7 @@ func TestSyncPodsCreatesNetAndContainer(t *testing.T) { } func TestSyncPodsCreatesNetAndContainerPullsImage(t *testing.T) { - kubelet, fakeDocker := newTestKubelet(t) + kubelet, fakeDocker, waitGroup := newTestKubelet(t) puller := kubelet.dockerPuller.(*dockertools.FakeDockerPuller) puller.HasImages = []string{} kubelet.podInfraContainerImage = "custom_image_name" @@ -543,14 +541,15 @@ func TestSyncPodsCreatesNetAndContainerPullsImage(t *testing.T) { }, }, } + waitGroup.Add(1) err := kubelet.SyncPods(kubelet.pods) if err != nil { t.Errorf("unexpected error: %v", err) } - kubelet.drainWorkers() + waitGroup.Wait() verifyCalls(t, fakeDocker, []string{ - "list", "create", "start", "inspect_container", "list", "inspect_container", "inspect_image", "list", "create", "start"}) + "list", "list", "create", "start", "inspect_container", "list", "inspect_container", "inspect_image", "list", "create", "start"}) fakeDocker.Lock() @@ -567,7 +566,7 @@ func TestSyncPodsCreatesNetAndContainerPullsImage(t *testing.T) { } func TestSyncPodsWithPodInfraCreatesContainer(t *testing.T) { - kubelet, fakeDocker := newTestKubelet(t) + kubelet, fakeDocker, waitGroup := newTestKubelet(t) fakeDocker.ContainerList = []docker.APIContainers{ { // pod infra container @@ -590,14 +589,15 @@ func TestSyncPodsWithPodInfraCreatesContainer(t *testing.T) { }, }, } + waitGroup.Add(1) err := kubelet.SyncPods(kubelet.pods) if err != nil { t.Errorf("unexpected error: %v", err) } - kubelet.drainWorkers() + waitGroup.Wait() verifyCalls(t, fakeDocker, []string{ - "list", "list", "inspect_container", "inspect_image", "list", "create", "start"}) + "list", "list", "list", "inspect_container", "inspect_image", "list", "create", "start"}) fakeDocker.Lock() if len(fakeDocker.Created) != 1 || @@ -608,7 +608,7 @@ func TestSyncPodsWithPodInfraCreatesContainer(t *testing.T) { } func TestSyncPodsWithPodInfraCreatesContainerCallsHandler(t *testing.T) { - kubelet, fakeDocker := newTestKubelet(t) + kubelet, fakeDocker, waitGroup := newTestKubelet(t) fakeHttp := fakeHTTP{} kubelet.httpClient = &fakeHttp fakeDocker.ContainerList = []docker.APIContainers{ @@ -644,14 +644,15 @@ func TestSyncPodsWithPodInfraCreatesContainerCallsHandler(t *testing.T) { }, }, } + waitGroup.Add(1) err := kubelet.SyncPods(kubelet.pods) if err != nil { t.Errorf("unexpected error: %v", err) } - kubelet.drainWorkers() + waitGroup.Wait() verifyCalls(t, fakeDocker, []string{ - "list", "list", "inspect_container", "inspect_image", "list", "create", "start"}) + "list", "list", "list", "inspect_container", "inspect_image", "list", "create", "start"}) fakeDocker.Lock() if len(fakeDocker.Created) != 1 || @@ -665,7 +666,7 @@ func TestSyncPodsWithPodInfraCreatesContainerCallsHandler(t *testing.T) { } func TestSyncPodsDeletesWithNoPodInfraContainer(t *testing.T) { - kubelet, fakeDocker := newTestKubelet(t) + kubelet, fakeDocker, waitGroup := newTestKubelet(t) fakeDocker.ContainerList = []docker.APIContainers{ { // format is // k8s___ @@ -688,14 +689,15 @@ func TestSyncPodsDeletesWithNoPodInfraContainer(t *testing.T) { }, }, } + waitGroup.Add(1) err := kubelet.SyncPods(kubelet.pods) if err != nil { t.Errorf("unexpected error: %v", err) } - kubelet.drainWorkers() + waitGroup.Wait() verifyCalls(t, fakeDocker, []string{ - "list", "stop", "create", "start", "inspect_container", "list", "list", "inspect_container", "inspect_image", "list", "create", "start"}) + "list", "list", "stop", "create", "start", "inspect_container", "list", "list", "inspect_container", "inspect_image", "list", "create", "start"}) // A map iteration is used to delete containers, so must not depend on // order here. @@ -711,7 +713,7 @@ func TestSyncPodsDeletesWithNoPodInfraContainer(t *testing.T) { func TestSyncPodsDeletesWhenSourcesAreReady(t *testing.T) { ready := false - kubelet, fakeDocker := newTestKubelet(t) + kubelet, fakeDocker, _ := newTestKubelet(t) kubelet.sourceReady = func(source string) bool { return ready } fakeDocker.ContainerList = []docker.APIContainers{ @@ -754,7 +756,7 @@ func TestSyncPodsDeletesWhenSourcesAreReady(t *testing.T) { func TestSyncPodsDeletesWhenContainerSourceReady(t *testing.T) { ready := false - kubelet, fakeDocker := newTestKubelet(t) + kubelet, fakeDocker, _ := newTestKubelet(t) kubelet.sourceReady = func(source string) bool { if source == "testSource" { return ready @@ -814,7 +816,7 @@ func TestSyncPodsDeletesWhenContainerSourceReady(t *testing.T) { } func TestSyncPodsDeletes(t *testing.T) { - kubelet, fakeDocker := newTestKubelet(t) + kubelet, fakeDocker, _ := newTestKubelet(t) fakeDocker.ContainerList = []docker.APIContainers{ { // the k8s prefix is required for the kubelet to manage the container @@ -852,7 +854,7 @@ func TestSyncPodsDeletes(t *testing.T) { } func TestSyncPodDeletesDuplicate(t *testing.T) { - kubelet, fakeDocker := newTestKubelet(t) + kubelet, fakeDocker, _ := newTestKubelet(t) dockerContainers := dockertools.DockerContainers{ "1234": &docker.APIContainers{ // the k8s prefix is required for the kubelet to manage the container @@ -902,7 +904,7 @@ func TestSyncPodDeletesDuplicate(t *testing.T) { } func TestSyncPodBadHash(t *testing.T) { - kubelet, fakeDocker := newTestKubelet(t) + kubelet, fakeDocker, _ := newTestKubelet(t) dockerContainers := dockertools.DockerContainers{ "1234": &docker.APIContainers{ // the k8s prefix is required for the kubelet to manage the container @@ -951,7 +953,7 @@ func TestSyncPodBadHash(t *testing.T) { } func TestSyncPodUnhealthy(t *testing.T) { - kubelet, fakeDocker := newTestKubelet(t) + kubelet, fakeDocker, _ := newTestKubelet(t) dockerContainers := dockertools.DockerContainers{ "1234": &docker.APIContainers{ // the k8s prefix is required for the kubelet to manage the container @@ -1001,7 +1003,7 @@ func TestSyncPodUnhealthy(t *testing.T) { } func TestMountExternalVolumes(t *testing.T) { - kubelet, _ := newTestKubelet(t) + kubelet, _, _ := newTestKubelet(t) kubelet.volumePluginMgr.InitPlugins([]volume.Plugin{&volume.FakePlugin{"fake", nil}}, &volumeHost{kubelet}) pod := api.BoundPod{ @@ -1035,7 +1037,7 @@ func TestMountExternalVolumes(t *testing.T) { } func TestGetPodVolumesFromDisk(t *testing.T) { - kubelet, _ := newTestKubelet(t) + kubelet, _, _ := newTestKubelet(t) plug := &volume.FakePlugin{"fake", nil} kubelet.volumePluginMgr.InitPlugins([]volume.Plugin{plug}, &volumeHost{kubelet}) @@ -1310,7 +1312,7 @@ func TestGetContainerInfo(t *testing.T) { cadvisorReq := &info.ContainerInfoRequest{} mockCadvisor.On("DockerContainer", containerID, cadvisorReq).Return(containerInfo, nil) - kubelet, fakeDocker := newTestKubelet(t) + kubelet, fakeDocker, _ := newTestKubelet(t) kubelet.cadvisorClient = mockCadvisor fakeDocker.ContainerList = []docker.APIContainers{ { @@ -1348,7 +1350,6 @@ func TestGetRootInfo(t *testing.T) { dockerClient: &fakeDocker, dockerPuller: &dockertools.FakeDockerPuller{}, cadvisorClient: mockCadvisor, - podWorkers: newPodWorkers(), } // If the container name is an empty string, then it means the root container. @@ -1360,7 +1361,7 @@ func TestGetRootInfo(t *testing.T) { } func TestGetContainerInfoWithoutCadvisor(t *testing.T) { - kubelet, fakeDocker := newTestKubelet(t) + kubelet, fakeDocker, _ := newTestKubelet(t) fakeDocker.ContainerList = []docker.APIContainers{ { ID: "foobar", @@ -1385,7 +1386,7 @@ func TestGetContainerInfoWhenCadvisorFailed(t *testing.T) { cadvisorReq := &info.ContainerInfoRequest{} mockCadvisor.On("DockerContainer", containerID, cadvisorReq).Return(containerInfo, ErrCadvisorApiFailure) - kubelet, fakeDocker := newTestKubelet(t) + kubelet, fakeDocker, _ := newTestKubelet(t) kubelet.cadvisorClient = mockCadvisor fakeDocker.ContainerList = []docker.APIContainers{ { @@ -1413,7 +1414,7 @@ func TestGetContainerInfoWhenCadvisorFailed(t *testing.T) { func TestGetContainerInfoOnNonExistContainer(t *testing.T) { mockCadvisor := &mockCadvisorClient{} - kubelet, fakeDocker := newTestKubelet(t) + kubelet, fakeDocker, _ := newTestKubelet(t) kubelet.cadvisorClient = mockCadvisor fakeDocker.ContainerList = []docker.APIContainers{} @@ -1427,7 +1428,7 @@ func TestGetContainerInfoOnNonExistContainer(t *testing.T) { func TestGetContainerInfoWhenDockerToolsFailed(t *testing.T) { mockCadvisor := &mockCadvisorClient{} - kubelet, _ := newTestKubelet(t) + kubelet, _, _ := newTestKubelet(t) kubelet.cadvisorClient = mockCadvisor expectedErr := fmt.Errorf("List containers error") kubelet.dockerClient = &errorTestingDockerClient{listContainersError: expectedErr} @@ -1447,7 +1448,7 @@ func TestGetContainerInfoWhenDockerToolsFailed(t *testing.T) { func TestGetContainerInfoWithNoContainers(t *testing.T) { mockCadvisor := &mockCadvisorClient{} - kubelet, _ := newTestKubelet(t) + kubelet, _, _ := newTestKubelet(t) kubelet.cadvisorClient = mockCadvisor kubelet.dockerClient = &errorTestingDockerClient{listContainersError: nil} @@ -1466,7 +1467,7 @@ func TestGetContainerInfoWithNoContainers(t *testing.T) { func TestGetContainerInfoWithNoMatchingContainers(t *testing.T) { mockCadvisor := &mockCadvisorClient{} - kubelet, _ := newTestKubelet(t) + kubelet, _, _ := newTestKubelet(t) kubelet.cadvisorClient = mockCadvisor containerList := []docker.APIContainers{ @@ -1530,7 +1531,7 @@ func (f *fakeContainerCommandRunner) PortForward(podInfraContainerID string, por func TestRunInContainerNoSuchPod(t *testing.T) { fakeCommandRunner := fakeContainerCommandRunner{} - kubelet, fakeDocker := newTestKubelet(t) + kubelet, fakeDocker, _ := newTestKubelet(t) fakeDocker.ContainerList = []docker.APIContainers{} kubelet.runner = &fakeCommandRunner @@ -1552,7 +1553,7 @@ func TestRunInContainerNoSuchPod(t *testing.T) { func TestRunInContainer(t *testing.T) { fakeCommandRunner := fakeContainerCommandRunner{} - kubelet, fakeDocker := newTestKubelet(t) + kubelet, fakeDocker, _ := newTestKubelet(t) kubelet.runner = &fakeCommandRunner containerID := "abc1234" @@ -1593,7 +1594,7 @@ func TestRunInContainer(t *testing.T) { func TestRunHandlerExec(t *testing.T) { fakeCommandRunner := fakeContainerCommandRunner{} - kubelet, fakeDocker := newTestKubelet(t) + kubelet, fakeDocker, _ := newTestKubelet(t) kubelet.runner = &fakeCommandRunner containerID := "abc1234" @@ -1641,7 +1642,7 @@ func (f *fakeHTTP) Get(url string) (*http.Response, error) { func TestRunHandlerHttp(t *testing.T) { fakeHttp := fakeHTTP{} - kubelet, _ := newTestKubelet(t) + kubelet, _, _ := newTestKubelet(t) kubelet.httpClient = &fakeHttp podName := "podFoo" @@ -1670,7 +1671,7 @@ func TestRunHandlerHttp(t *testing.T) { } func TestNewHandler(t *testing.T) { - kubelet, _ := newTestKubelet(t) + kubelet, _, _ := newTestKubelet(t) handler := &api.Handler{ HTTPGet: &api.HTTPGetAction{ Host: "foo", @@ -1701,7 +1702,7 @@ func TestNewHandler(t *testing.T) { } func TestSyncPodEventHandlerFails(t *testing.T) { - kubelet, fakeDocker := newTestKubelet(t) + kubelet, fakeDocker, _ := newTestKubelet(t) kubelet.httpClient = &fakeHTTP{ err: fmt.Errorf("test error"), } @@ -1890,7 +1891,7 @@ func TestKubeletGarbageCollection(t *testing.T) { }, } for _, test := range tests { - kubelet, fakeDocker := newTestKubelet(t) + kubelet, fakeDocker, _ := newTestKubelet(t) kubelet.maxContainerCount = 2 fakeDocker.ContainerList = test.containers fakeDocker.ContainerMap = test.containerDetails @@ -2055,7 +2056,7 @@ func TestPurgeOldest(t *testing.T) { }, } for _, test := range tests { - kubelet, fakeDocker := newTestKubelet(t) + kubelet, fakeDocker, _ := newTestKubelet(t) kubelet.maxContainerCount = 5 fakeDocker.ContainerMap = test.containerDetails kubelet.purgeOldest(test.ids) @@ -2066,11 +2067,12 @@ func TestPurgeOldest(t *testing.T) { } func TestSyncPodsWithPullPolicy(t *testing.T) { - kubelet, fakeDocker := newTestKubelet(t) + kubelet, fakeDocker, waitGroup := newTestKubelet(t) puller := kubelet.dockerPuller.(*dockertools.FakeDockerPuller) puller.HasImages = []string{"existing_one", "want:latest"} kubelet.podInfraContainerImage = "custom_image_name" fakeDocker.ContainerList = []docker.APIContainers{} + waitGroup.Add(1) err := kubelet.SyncPods([]api.BoundPod{ { ObjectMeta: api.ObjectMeta{ @@ -2093,7 +2095,7 @@ func TestSyncPodsWithPullPolicy(t *testing.T) { if err != nil { t.Errorf("unexpected error: %v", err) } - kubelet.drainWorkers() + waitGroup.Wait() fakeDocker.Lock() @@ -2399,7 +2401,7 @@ func TestMakeEnvironmentVariables(t *testing.T) { } for _, tc := range testCases { - kl, _ := newTestKubelet(t) + kl, _, _ := newTestKubelet(t) kl.masterServiceNamespace = tc.masterServiceNamespace if tc.nilLister { kl.serviceLister = nil @@ -2836,7 +2838,7 @@ func TestGetPodReadyCondition(t *testing.T) { func TestExecInContainerNoSuchPod(t *testing.T) { fakeCommandRunner := fakeContainerCommandRunner{} - kubelet, fakeDocker := newTestKubelet(t) + kubelet, fakeDocker, _ := newTestKubelet(t) fakeDocker.ContainerList = []docker.APIContainers{} kubelet.runner = &fakeCommandRunner @@ -2863,7 +2865,7 @@ func TestExecInContainerNoSuchPod(t *testing.T) { func TestExecInContainerNoSuchContainer(t *testing.T) { fakeCommandRunner := fakeContainerCommandRunner{} - kubelet, fakeDocker := newTestKubelet(t) + kubelet, fakeDocker, _ := newTestKubelet(t) kubelet.runner = &fakeCommandRunner podName := "podFoo" @@ -2916,7 +2918,7 @@ func (f *fakeReadWriteCloser) Close() error { func TestExecInContainer(t *testing.T) { fakeCommandRunner := fakeContainerCommandRunner{} - kubelet, fakeDocker := newTestKubelet(t) + kubelet, fakeDocker, _ := newTestKubelet(t) kubelet.runner = &fakeCommandRunner podName := "podFoo" @@ -2975,7 +2977,7 @@ func TestExecInContainer(t *testing.T) { func TestPortForwardNoSuchPod(t *testing.T) { fakeCommandRunner := fakeContainerCommandRunner{} - kubelet, fakeDocker := newTestKubelet(t) + kubelet, fakeDocker, _ := newTestKubelet(t) fakeDocker.ContainerList = []docker.APIContainers{} kubelet.runner = &fakeCommandRunner @@ -2999,7 +3001,7 @@ func TestPortForwardNoSuchPod(t *testing.T) { func TestPortForwardNoSuchContainer(t *testing.T) { fakeCommandRunner := fakeContainerCommandRunner{} - kubelet, fakeDocker := newTestKubelet(t) + kubelet, fakeDocker, _ := newTestKubelet(t) kubelet.runner = &fakeCommandRunner podName := "podFoo" @@ -3034,7 +3036,7 @@ func TestPortForwardNoSuchContainer(t *testing.T) { func TestPortForward(t *testing.T) { fakeCommandRunner := fakeContainerCommandRunner{} - kubelet, fakeDocker := newTestKubelet(t) + kubelet, fakeDocker, _ := newTestKubelet(t) kubelet.runner = &fakeCommandRunner podName := "podFoo" diff --git a/pkg/kubelet/pod_workers.go b/pkg/kubelet/pod_workers.go new file mode 100644 index 0000000000..b8995ccaa1 --- /dev/null +++ b/pkg/kubelet/pod_workers.go @@ -0,0 +1,101 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package kubelet + +import ( + "sync" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/client/record" + "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools" + "github.com/GoogleCloudPlatform/kubernetes/pkg/types" + "github.com/golang/glog" +) + +type syncPodFunType func(*api.BoundPod, dockertools.DockerContainers) error + +// TODO(wojtek-t) Add unit tests for this type. +type podWorkers struct { + // Protects podUpdates field. + podLock sync.Mutex + + // Tracks all running per-pod goroutines - per-pod goroutine will be + // processing updates received through its corresponding channel. + podUpdates map[types.UID]chan api.BoundPod + // DockerCache is used for listing running containers. + dockerCache dockertools.DockerCache + + // This function is run to sync the desired stated of pod. + // NOTE: This function has to be thread-safe - it can be called for + // different pods at the same time. + syncPodFun syncPodFunType +} + +func newPodWorkers(dockerCache dockertools.DockerCache, syncPodFun syncPodFunType) *podWorkers { + return &podWorkers{ + podUpdates: map[types.UID]chan api.BoundPod{}, + dockerCache: dockerCache, + syncPodFun: syncPodFun, + } +} + +func (p *podWorkers) managePodLoop(podUpdates <-chan api.BoundPod) { + for newPod := range podUpdates { + // Since we use docker cache, getting current state shouldn't cause + // performance overhead on Docker. Moreover, as long as we run syncPod + // no matter if it changes anything, having an old version of "containers" + // can cause starting eunended containers. + containers, err := p.dockerCache.RunningContainers() + if err != nil { + glog.Errorf("Error listing containers while syncing pod: %v", err) + continue + } + err = p.syncPodFun(&newPod, containers) + if err != nil { + glog.Errorf("Error syncing pod %s, skipping: %v", newPod.UID, err) + record.Eventf(&newPod, "failedSync", "Error syncing pod, skipping: %v", err) + continue + } + } +} + +func (p *podWorkers) UpdatePod(pod api.BoundPod) { + uid := pod.UID + var podUpdates chan api.BoundPod + var exists bool + + p.podLock.Lock() + defer p.podLock.Unlock() + if podUpdates, exists = p.podUpdates[uid]; !exists { + // TODO(wojtek-t): Adjust the size of the buffer in this channel + podUpdates = make(chan api.BoundPod, 5) + p.podUpdates[uid] = podUpdates + go p.managePodLoop(podUpdates) + } + podUpdates <- pod +} + +func (p *podWorkers) ForgetNonExistingPodWorkers(desiredPods map[types.UID]empty) { + p.podLock.Lock() + defer p.podLock.Unlock() + for key, channel := range p.podUpdates { + if _, exists := desiredPods[key]; !exists { + close(channel) + delete(p.podUpdates, key) + } + } +}