From 632ca506cefcc03bc029d8b5814b8d00b89deeda Mon Sep 17 00:00:00 2001
From: Filip Grzadkowski
Date: Fri, 20 Mar 2015 17:37:08 +0100
Subject: [PATCH] * Update pod status only when it changes.
 * Refactor syncing logic into a separate struct

---
 cmd/integration/integration.go     |   2 -
 cmd/kubelet/app/server.go          |  55 +++++------
 pkg/kubelet/handlers.go            |   2 +-
 pkg/kubelet/kubelet.go             | 139 +++++++----------------
 pkg/kubelet/kubelet_test.go        |  52 +++++------
 pkg/kubelet/runonce_test.go        |   2 +-
 pkg/kubelet/server.go              |   5 +-
 pkg/kubelet/server_test.go         |   2 +-
 pkg/kubelet/status_manager.go      | 123 +++++++++++++++++++++++++
 pkg/kubelet/status_manager_test.go |  83 +++++++++++++++++
 10 files changed, 289 insertions(+), 176 deletions(-)
 create mode 100644 pkg/kubelet/status_manager.go
 create mode 100644 pkg/kubelet/status_manager_test.go

diff --git a/cmd/integration/integration.go b/cmd/integration/integration.go
index 41e2fe2079..97fd806d13 100644
--- a/cmd/integration/integration.go
+++ b/cmd/integration/integration.go
@@ -224,7 +224,6 @@ func startComponents(firstManifestURL, secondManifestURL, apiVersion string) (st
 	configFilePath := makeTempDirOrDie("config", testRootDir)
 	glog.Infof("Using %s as root dir for kubelet #1", testRootDir)
 	kcfg := kubeletapp.SimpleKubelet(cl, &fakeDocker1, machineList[0], testRootDir, firstManifestURL, "127.0.0.1", 10250, api.NamespaceDefault, empty_dir.ProbeVolumePlugins(), nil, cadvisorInterface, configFilePath)
-	kcfg.PodStatusUpdateFrequency = 1 * time.Second
 	kubeletapp.RunKubelet(kcfg)
 	// Kubelet (machine)
 	// Create a second kubelet so that the guestbook example's two redis slaves both
@@ -232,7 +231,6 @@ func startComponents(firstManifestURL, secondManifestURL, apiVersion string) (st
 	testRootDir = makeTempDirOrDie("kubelet_integ_2.", "")
 	glog.Infof("Using %s as root dir for kubelet #2", testRootDir)
 	kcfg = kubeletapp.SimpleKubelet(cl, &fakeDocker2, machineList[1], testRootDir, secondManifestURL, "127.0.0.1", 10251, api.NamespaceDefault, empty_dir.ProbeVolumePlugins(), nil, cadvisorInterface, "")
-	kcfg.PodStatusUpdateFrequency = 1 * time.Second
 	kubeletapp.RunKubelet(kcfg)
 	return apiServer.URL, configFilePath
 }
diff --git a/cmd/kubelet/app/server.go b/cmd/kubelet/app/server.go
index 92a1974afe..9bc62e25d5 100644
--- a/cmd/kubelet/app/server.go
+++ b/cmd/kubelet/app/server.go
@@ -52,7 +52,6 @@ type KubeletServer struct {
 	FileCheckFrequency time.Duration
 	HTTPCheckFrequency time.Duration
 	StatusUpdateFrequency time.Duration
-	PodStatusUpdateFrequency time.Duration
 	ManifestURL string
 	EnableServer bool
 	Address util.IP
@@ -86,14 +85,13 @@ type KubeletServer struct {
 // NewKubeletServer will create a new KubeletServer with default values.
 func NewKubeletServer() *KubeletServer {
 	return &KubeletServer{
-		SyncFrequency:            10 * time.Second,
-		FileCheckFrequency:       20 * time.Second,
-		HTTPCheckFrequency:       20 * time.Second,
-		StatusUpdateFrequency:    20 * time.Second,
-		PodStatusUpdateFrequency: 2 * time.Minute,
-		EnableServer:             true,
-		Address:                  util.IP(net.ParseIP("127.0.0.1")),
-		Port:                     ports.KubeletPort,
+		SyncFrequency:         10 * time.Second,
+		FileCheckFrequency:    20 * time.Second,
+		HTTPCheckFrequency:    20 * time.Second,
+		StatusUpdateFrequency: 20 * time.Second,
+		EnableServer:          true,
+		Address:               util.IP(net.ParseIP("127.0.0.1")),
+		Port:                  ports.KubeletPort,
 		PodInfraContainerImage: kubelet.PodInfraContainerImage,
 		RootDirectory:          defaultRootDir,
 		RegistryBurst:          10,
@@ -115,7 +113,6 @@ func (s *KubeletServer) AddFlags(fs *pflag.FlagSet) {
 	fs.StringVar(&s.Config, "config", s.Config, "Path to the config file or directory of files")
 	fs.DurationVar(&s.SyncFrequency, "sync_frequency", s.SyncFrequency, "Max period between synchronizing running containers and config")
 	fs.DurationVar(&s.StatusUpdateFrequency, "status_update_frequency", s.StatusUpdateFrequency, "Duration between posting node status to master")
-	fs.DurationVar(&s.PodStatusUpdateFrequency, "pod_status_update_frequency", s.PodStatusUpdateFrequency, "Duration between posting pod status updates to the master")
 	fs.DurationVar(&s.FileCheckFrequency, "file_check_frequency", s.FileCheckFrequency, "Duration between checking config files for new data")
 	fs.DurationVar(&s.HTTPCheckFrequency, "http_check_frequency", s.HTTPCheckFrequency, "Duration between checking http for new data")
 	fs.StringVar(&s.ManifestURL, "manifest_url", s.ManifestURL, "URL for accessing the container manifest")
@@ -183,7 +180,6 @@ func (s *KubeletServer) Run(_ []string) error {
 		ConfigFile:              s.Config,
 		ManifestURL:             s.ManifestURL,
 		StatusUpdateFrequency:   s.StatusUpdateFrequency,
-		PodStatusUpdateFrequency: s.PodStatusUpdateFrequency,
 		FileCheckFrequency:      s.FileCheckFrequency,
 		HTTPCheckFrequency:      s.HTTPCheckFrequency,
 		PodInfraContainerImage:  s.PodInfraContainerImage,
@@ -283,24 +279,23 @@ func SimpleKubelet(client *client.Client,
 		RootDirectory:          rootDir,
 		ManifestURL:            manifestURL,
 		PodInfraContainerImage: kubelet.PodInfraContainerImage,
-		Port:                     port,
-		Address:                  util.IP(net.ParseIP(address)),
-		EnableServer:             true,
-		EnableDebuggingHandlers:  true,
-		HTTPCheckFrequency:       1 * time.Second,
-		FileCheckFrequency:       1 * time.Second,
-		StatusUpdateFrequency:    3 * time.Second,
-		PodStatusUpdateFrequency: 2 * time.Minute,
-		SyncFrequency:            3 * time.Second,
-		MinimumGCAge:             10 * time.Second,
-		MaxPerPodContainerCount:  5,
-		MaxContainerCount:        100,
-		MasterServiceNamespace:   masterServiceNamespace,
-		VolumePlugins:            volumePlugins,
-		TLSOptions:               tlsOptions,
-		CadvisorInterface:        cadvisorInterface,
-		ConfigFile:               configFilePath,
-		ImageGCPolicy:            imageGCPolicy,
+		Port:                    port,
+		Address:                 util.IP(net.ParseIP(address)),
+		EnableServer:            true,
+		EnableDebuggingHandlers: true,
+		HTTPCheckFrequency:      1 * time.Second,
+		FileCheckFrequency:      1 * time.Second,
+		StatusUpdateFrequency:   3 * time.Second,
+		SyncFrequency:           3 * time.Second,
+		MinimumGCAge:            10 * time.Second,
+		MaxPerPodContainerCount: 5,
+		MaxContainerCount:       100,
+		MasterServiceNamespace:  masterServiceNamespace,
+		VolumePlugins:           volumePlugins,
+		TLSOptions:              tlsOptions,
+		CadvisorInterface:       cadvisorInterface,
+		ConfigFile:              configFilePath,
+		ImageGCPolicy:           imageGCPolicy,
 	}
 	return &kcfg
 }
@@ -386,7 +381,6 @@ type KubeletConfig struct {
 	ConfigFile              string
 	ManifestURL             string
 	StatusUpdateFrequency   time.Duration
-	PodStatusUpdateFrequency time.Duration
 	FileCheckFrequency      time.Duration
 	HTTPCheckFrequency      time.Duration
 	Hostname                string
@@ -453,7 +447,6 @@ func createAndInitKubelet(kc *KubeletConfig, pc *config.PodConfig) (*kubelet.Kub
 		kc.Recorder,
 		kc.CadvisorInterface,
 		kc.StatusUpdateFrequency,
-		kc.PodStatusUpdateFrequency,
 		kc.ImageGCPolicy)

 	if err != nil {
diff --git a/pkg/kubelet/handlers.go b/pkg/kubelet/handlers.go
index bb37cba019..2e8181b3c0 100644
--- a/pkg/kubelet/handlers.go
+++ b/pkg/kubelet/handlers.go
@@ -71,7 +71,7 @@ func ResolvePort(portReference util.IntOrString, container *api.Container) (int,
 func (h *httpActionHandler) Run(podFullName string, uid types.UID, container *api.Container, handler *api.Handler) error {
 	host := handler.HTTPGet.Host
 	if len(host) == 0 {
-		status, err := h.kubelet.GetPodStatus(podFullName, uid)
+		status, err := h.kubelet.GetPodStatus(podFullName)
 		if err != nil {
 			glog.Errorf("Unable to get pod info, event handlers may be invalid.")
 			return err
diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go
index 9cf4b46ce6..328c85be6b 100644
--- a/pkg/kubelet/kubelet.go
+++ b/pkg/kubelet/kubelet.go
@@ -125,7 +125,6 @@ func NewMainKubelet(
 	recorder record.EventRecorder,
 	cadvisorInterface cadvisor.Interface,
 	statusUpdateFrequency time.Duration,
-	podStatusUpdateFrequency time.Duration,
 	imageGCPolicy ImageGCPolicy) (*Kubelet, error) {
 	if rootDirectory == "" {
 		return nil, fmt.Errorf("invalid root directory %q", rootDirectory)
@@ -133,9 +132,6 @@ func NewMainKubelet(
 	if resyncInterval <= 0 {
 		return nil, fmt.Errorf("invalid sync frequency %d", resyncInterval)
 	}
-	if podStatusUpdateFrequency <= 0 {
-		return nil, fmt.Errorf("invalid status update frequency %d", podStatusUpdateFrequency)
-	}
 	dockerClient = metrics.NewInstrumentedDockerInterface(dockerClient)

 	// Wait for the Docker daemon to be up (with a timeout).
@@ -199,6 +195,7 @@
 	if err != nil {
 		return nil, fmt.Errorf("failed to initialize image manager: %v", err)
 	}
+	statusManager := newStatusManager(kubeClient)

 	klet := &Kubelet{
 		hostname: hostname,
@@ -207,7 +204,6 @@
 		rootDirectory:          rootDirectory,
 		statusUpdateFrequency:  statusUpdateFrequency,
 		resyncInterval:         resyncInterval,
-		podStatusUpdateFrequency: podStatusUpdateFrequency,
 		podInfraContainerImage: podInfraContainerImage,
 		containerIDToRef:       map[string]*api.ObjectReference{},
 		runner:                 dockertools.NewDockerContainerCommandRunner(dockerClient),
@@ -227,6 +223,7 @@
 		cadvisor:      cadvisorInterface,
 		containerGC:   containerGC,
 		imageManager:  imageManager,
+		statusManager: statusManager,
 	}

 	klet.podManager = newBasicPodManager(klet.kubeClient)
@@ -253,8 +250,6 @@
 		klet.networkPlugin = plug
 	}

-	klet.podStatuses = make(map[string]api.PodStatus)
-
 	return klet, nil
 }

@@ -273,26 +268,18 @@ type nodeLister interface {
 // Kubelet is the main kubelet implementation.
 type Kubelet struct {
-	hostname                 string
-	dockerClient             dockertools.DockerInterface
-	dockerCache              dockertools.DockerCache
-	kubeClient               client.Interface
-	rootDirectory            string
-	podInfraContainerImage   string
-	podWorkers               *podWorkers
-	statusUpdateFrequency    time.Duration
-	resyncInterval           time.Duration
-	podStatusUpdateFrequency time.Duration
-	sourcesReady             SourcesReadyFn
+	hostname               string
+	dockerClient           dockertools.DockerInterface
+	dockerCache            dockertools.DockerCache
+	kubeClient             client.Interface
+	rootDirectory          string
+	podInfraContainerImage string
+	podWorkers             *podWorkers
+	statusUpdateFrequency  time.Duration
+	resyncInterval         time.Duration
+	sourcesReady           SourcesReadyFn

 	podManager podManager

-	// A pod status cache stores statuses for pods (both rejected and synced).
-	// Note that currently no thread attempts to acquire podStatusesLock while
-	// accessing podManager, and vice versa. If you intend to change this usage
-	// pattern, please explicitly impose an acquiring order to avoid deadlocks
-	// and document such an order in the comment.
-	podStatusesLock sync.RWMutex
-	podStatuses     map[string]api.PodStatus

 	// Needed to report events for containers belonging to deleted/modified pods.
 	// Tracks references for reporting events
@@ -331,16 +318,16 @@ type Kubelet struct {
 	// Network plugin
 	networkPlugin network.NetworkPlugin

-	// probe runner holder
+	// Probe runner holder
 	prober probeHolder
-	// container readiness state holder
+	// Container readiness state holder
 	readiness *readinessStates

-	// how long to keep idle streaming command execution/port forwarding
+	// How long to keep idle streaming command execution/port forwarding
 	// connections open before terminating them
 	streamingConnectionIdleTimeout time.Duration

-	// the EventRecorder to use
+	// The EventRecorder to use
 	recorder record.EventRecorder

 	// Policy for handling garbage collection of dead containers.
@@ -351,6 +338,9 @@ type Kubelet struct {

 	// Cached MachineInfo returned by cadvisor.
 	machineInfo *cadvisorApi.MachineInfo
+
+	// Syncs pods statuses with apiserver; also used as a cache of statuses.
+	statusManager *statusManager
 }

 // getRootDir returns the full path to the directory under which kubelet can
@@ -521,30 +511,6 @@ func (kl *Kubelet) StartGarbageCollection() {
 	}, 5*time.Minute)
 }

-func (kl *Kubelet) getPodStatusFromCache(podFullName string) (api.PodStatus, bool) {
-	kl.podStatusesLock.RLock()
-	defer kl.podStatusesLock.RUnlock()
-	status, ok := kl.podStatuses[podFullName]
-	return status, ok
-}
-
-func (kl *Kubelet) setPodStatusInCache(podFullName string, status api.PodStatus) {
-	kl.podStatusesLock.Lock()
-	defer kl.podStatusesLock.Unlock()
-	kl.podStatuses[podFullName] = status
-}
-
-func (kl *Kubelet) removeOrphanedStatuses(podFullNames map[string]bool) {
-	kl.podStatusesLock.Lock()
-	defer kl.podStatusesLock.Unlock()
-	for key := range kl.podStatuses {
-		if _, ok := podFullNames[key]; !ok {
-			glog.V(5).Infof("Removing %q from status map.", key)
-			delete(kl.podStatuses, key)
-		}
-	}
-}
-
 // Run starts the kubelet reacting to config updates
 func (kl *Kubelet) Run(updates <-chan PodUpdate) {
 	if kl.logServer == nil {
@@ -557,11 +523,7 @@ func (kl *Kubelet) Run(updates <-chan PodUpdate) {
 		glog.Warning("No api server defined - no node status update will be sent.")
 	}
 	go kl.syncNodeStatus()
-
-	// syncStatus handles its own frequency and throttling, run it always.
-	go util.Forever(func() {
-		kl.syncStatus(kl.podStatusUpdateFrequency)
-	}, 0)
+	kl.statusManager.Start()
 	kl.syncLoop(updates, kl)
 }

@@ -1263,7 +1225,7 @@ func (kl *Kubelet) computePodContainerChanges(pod *api.Pod, hasMirrorPod bool, c
 	if found {
 		glog.V(4).Infof("Found infra pod for %q", podFullName)
 		containersToKeep[podInfraContainerID] = -1
-		podStatus, err = kl.GetPodStatus(podFullName, uid)
+		podStatus, err = kl.GetPodStatus(podFullName)
 		if err != nil {
 			glog.Errorf("Unable to get pod with name %q and uid %q info with error(%v)", podFullName, uid, err)
 		}
@@ -1358,7 +1320,7 @@ func (kl *Kubelet) syncPod(pod *api.Pod, hasMirrorPod bool, containersInPod dock
 		if err != nil {
 			glog.Errorf("Unable to generate status for pod with name %q and uid %q info with error(%v)", podFullName, uid, err)
 		} else {
-			kl.setPodStatusInCache(podFullName, status)
+			kl.statusManager.SetPodStatus(podFullName, status)
 		}
 	}()

@@ -1528,13 +1490,13 @@ func (kl *Kubelet) SyncPods(allPods []api.Pod, podSyncTypes map[types.UID]metric
 	for _, pod := range allPods {
 		podFullNames[GetPodFullName(&pod)] = true
 	}
-	kl.removeOrphanedStatuses(podFullNames)
+	kl.statusManager.RemoveOrphanedStatuses(podFullNames)

 	// Filter out the rejected pod. They don't have running containers.
 	kl.handleNotFittingPods(allPods)
 	var pods []api.Pod
 	for _, pod := range allPods {
-		status, ok := kl.getPodStatusFromCache(GetPodFullName(&pod))
+		status, ok := kl.statusManager.GetPodStatus(GetPodFullName(&pod))
 		if ok && status.Phase == api.PodFailed {
 			continue
 		}
@@ -1712,21 +1674,21 @@ func (kl *Kubelet) handleNotFittingPods(pods []api.Pod) {
 	fitting, notFitting := checkHostPortConflicts(pods)
 	for _, pod := range notFitting {
 		kl.recorder.Eventf(&pod, "hostPortConflict", "Cannot start the pod due to host port conflict.")
-		kl.setPodStatusInCache(GetPodFullName(&pod), api.PodStatus{
+		kl.statusManager.SetPodStatus(GetPodFullName(&pod), api.PodStatus{
 			Phase:   api.PodFailed,
 			Message: "Pod cannot be started due to host port conflict"})
 	}
 	fitting, notFitting = kl.checkNodeSelectorMatching(fitting)
 	for _, pod := range notFitting {
 		kl.recorder.Eventf(&pod, "nodeSelectorMismatching", "Cannot start the pod due to node selector mismatch.")
-		kl.setPodStatusInCache(GetPodFullName(&pod), api.PodStatus{
+		kl.statusManager.SetPodStatus(GetPodFullName(&pod), api.PodStatus{
 			Phase:   api.PodFailed,
 			Message: "Pod cannot be started due to node selector mismatch"})
 	}
 	fitting, notFitting = kl.checkCapacityExceeded(fitting)
 	for _, pod := range notFitting {
 		kl.recorder.Eventf(&pod, "capacityExceeded", "Cannot start the pod due to exceeded capacity.")
-		kl.setPodStatusInCache(GetPodFullName(&pod), api.PodStatus{
+		kl.statusManager.SetPodStatus(GetPodFullName(&pod), api.PodStatus{
 			Phase:   api.PodFailed,
 			Message: "Pod cannot be started due to exceeded capacity"})
 	}
@@ -1768,40 +1730,6 @@ func (kl *Kubelet) syncLoop(updates <-chan PodUpdate, handler SyncHandler) {
 	}
 }

-// syncStatus syncs pods statuses with the apiserver. Spread the updates over the specified deadline.
-func (kl *Kubelet) syncStatus(deadline time.Duration) {
-	start := time.Now()
-	glog.V(3).Infof("Syncing pods status")
-
-	pods, _ := kl.GetPods()
-	if len(pods) == 0 {
-		// No pods, sleep the rest of our deadline.
-		time.Sleep(deadline - time.Since(start))
-		return
-	}
-
-	// TODO(vmarmol): Enhance util.RateLimiter for our use here.
-	singleDeadline := time.Duration(deadline.Nanoseconds() / int64(len(pods)))
-	t := time.NewTicker(singleDeadline)
-	for _, pod := range pods {
-		// Don't hit the api server too hard, wait for the next time slot.
-		<-t.C
-
-		status, err := kl.GetPodStatus(GetPodFullName(&pod), pod.UID)
-		if err != nil {
-			glog.Warningf("Error getting pod %q status: %v, retry later", pod.Name, err)
-			continue
-		}
-		_, err = kl.kubeClient.Pods(pod.Namespace).UpdateStatus(pod.Name, &status)
-		if err != nil {
-			glog.Warningf("Error updating status for pod %q: %v (full pod: %q)", pod.Name, err, pod)
-		} else {
-			glog.V(3).Infof("Status for pod %q updated successfully: %q", pod.Name, pod)
-		}
-	}
-	t.Stop()
-}
-
 // Returns Docker version for this Kubelet.
 func (kl *Kubelet) GetDockerVersion() ([]uint, error) {
 	if kl.dockerClient == nil {
@@ -1832,11 +1760,10 @@ func (kl *Kubelet) validateContainerStatus(podStatus *api.PodStatus, containerNa
 }

 // GetKubeletContainerLogs returns logs from the container
-// The second parameter of GetPodStatus and FindPodContainer methods represents pod UUID, which is allowed to be blank
 // TODO: this method is returning logs of random container attempts, when it should be returning the most recent attempt
 // or all of them.
 func (kl *Kubelet) GetKubeletContainerLogs(podFullName, containerName, tail string, follow bool, stdout, stderr io.Writer) error {
-	podStatus, err := kl.GetPodStatus(podFullName, "")
+	podStatus, err := kl.GetPodStatus(podFullName)
 	if err != nil {
 		if err == dockertools.ErrNoContainersInPod {
 			return fmt.Errorf("pod %q not found\n", podFullName)
@@ -2019,19 +1946,17 @@ func getPodReadyCondition(spec *api.PodSpec, info api.PodInfo) []api.PodConditio
 }

 // GetPodStatus returns information from Docker about the containers in a pod
-func (kl *Kubelet) GetPodStatus(podFullName string, uid types.UID) (api.PodStatus, error) {
-	uid = kl.podManager.TranslatePodUID(uid)
-
+func (kl *Kubelet) GetPodStatus(podFullName string) (api.PodStatus, error) {
 	// Check to see if we have a cached version of the status.
-	cachedPodStatus, found := kl.getPodStatusFromCache(podFullName)
+	cachedPodStatus, found := kl.statusManager.GetPodStatus(podFullName)
 	if found {
 		glog.V(3).Infof("Returning cached status for %q", podFullName)
 		return cachedPodStatus, nil
 	}
-	return kl.generatePodStatus(podFullName, uid)
+	return kl.generatePodStatus(podFullName)
 }

-func (kl *Kubelet) generatePodStatus(podFullName string, uid types.UID) (api.PodStatus, error) {
+func (kl *Kubelet) generatePodStatus(podFullName string) (api.PodStatus, error) {
 	pod, found := kl.GetPodByFullName(podFullName)
 	if !found {
 		return api.PodStatus{}, fmt.Errorf("couldn't find pod %q", podFullName)
diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go
index 6df4547d82..f72140953c 100644
--- a/pkg/kubelet/kubelet_test.go
+++ b/pkg/kubelet/kubelet_test.go
@@ -99,7 +99,7 @@ func newTestKubelet(t *testing.T) *TestKubelet {
 	kubelet.nodeLister = testNodeLister{}
 	kubelet.readiness = newReadinessStates()
 	kubelet.recorder = fakeRecorder
-	kubelet.podStatuses = map[string]api.PodStatus{}
+	kubelet.statusManager = newStatusManager(fakeKubeClient)
 	if err := kubelet.setupDataDirs(); err != nil {
 		t.Fatalf("can't initialize kubelet data dirs: %v", err)
 	}
@@ -2866,13 +2866,10 @@ func TestHandlePortConflicts(t *testing.T) {
 	conflictedPodName := GetPodFullName(&pods[0])

 	kl.handleNotFittingPods(pods)
-	if len(kl.podStatuses) != 1 {
-		t.Fatalf("expected length of status map to be 1. Got map %#v.", kl.podStatuses)
-	}
Got map %#v.", kl.podStatuses) - } // Check pod status stored in the status map. - status, ok := kl.podStatuses[conflictedPodName] - if !ok { - t.Fatalf("status of pod %q is not found in the status map.", conflictedPodName) + status, err := kl.GetPodStatus(conflictedPodName) + if err != nil { + t.Fatalf("status of pod %q is not found in the status map: ", conflictedPodName, err) } if status.Phase != api.PodFailed { t.Fatalf("expected pod status %q. Got %q.", api.PodFailed, status.Phase) @@ -2880,9 +2877,9 @@ func TestHandlePortConflicts(t *testing.T) { // Check if we can retrieve the pod status from GetPodStatus(). kl.podManager.SetPods(pods) - status, err := kl.GetPodStatus(conflictedPodName, "") + status, err = kl.GetPodStatus(conflictedPodName) if err != nil { - t.Fatalf("unable to retrieve pod status for pod %q: #v.", conflictedPodName, err) + t.Fatalf("unable to retrieve pod status for pod %q: %#v.", conflictedPodName, err) } if status.Phase != api.PodFailed { t.Fatalf("expected pod status %q. Got %q.", api.PodFailed, status.Phase) @@ -2919,13 +2916,10 @@ func TestHandleNodeSelector(t *testing.T) { notfittingPodName := GetPodFullName(&pods[0]) kl.handleNotFittingPods(pods) - if len(kl.podStatuses) != 1 { - t.Fatalf("expected length of status map to be 1. Got map %#v.", kl.podStatuses) - } // Check pod status stored in the status map. - status, ok := kl.podStatuses[notfittingPodName] - if !ok { - t.Fatalf("status of pod %q is not found in the status map.", notfittingPodName) + status, err := kl.GetPodStatus(notfittingPodName) + if err != nil { + t.Fatalf("status of pod %q is not found in the status map: %#v", notfittingPodName, err) } if status.Phase != api.PodFailed { t.Fatalf("expected pod status %q. Got %q.", api.PodFailed, status.Phase) @@ -2933,9 +2927,9 @@ func TestHandleNodeSelector(t *testing.T) { // Check if we can retrieve the pod status from GetPodStatus(). kl.podManager.SetPods(pods) - status, err := kl.GetPodStatus(notfittingPodName, "") + status, err = kl.GetPodStatus(notfittingPodName) if err != nil { - t.Fatalf("unable to retrieve pod status for pod %q: #v.", notfittingPodName, err) + t.Fatalf("unable to retrieve pod status for pod %q: %#v.", notfittingPodName, err) } if status.Phase != api.PodFailed { t.Fatalf("expected pod status %q. Got %q.", api.PodFailed, status.Phase) @@ -2978,13 +2972,10 @@ func TestHandleMemExceeded(t *testing.T) { notfittingPodName := GetPodFullName(&pods[0]) kl.handleNotFittingPods(pods) - if len(kl.podStatuses) != 1 { - t.Fatalf("expected length of status map to be 1. Got map %#v.", kl.podStatuses) - } // Check pod status stored in the status map. - status, ok := kl.podStatuses[notfittingPodName] - if !ok { - t.Fatalf("status of pod %q is not found in the status map.", notfittingPodName) + status, err := kl.GetPodStatus(notfittingPodName) + if err != nil { + t.Fatalf("status of pod %q is not found in the status map: ", notfittingPodName, err) } if status.Phase != api.PodFailed { t.Fatalf("expected pod status %q. Got %q.", api.PodFailed, status.Phase) @@ -2992,7 +2983,7 @@ func TestHandleMemExceeded(t *testing.T) { // Check if we can retrieve the pod status from GetPodStatus(). 
 	kl.podManager.SetPods(pods)
-	status, err := kl.GetPodStatus(notfittingPodName, "")
+	status, err = kl.GetPodStatus(notfittingPodName)
 	if err != nil {
 		t.Fatalf("unable to retrieve pod status for pod %q: #v.", notfittingPodName, err)
 	}
@@ -3001,24 +2992,25 @@ func TestHandleMemExceeded(t *testing.T) {
 	}
 }

+// TODO(filipg): This test should be removed once StatusSyncer can do garbage collection without external signal.
 func TestPurgingObsoleteStatusMapEntries(t *testing.T) {
 	testKubelet := newTestKubelet(t)
 	testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorApi.MachineInfo{}, nil)
 	kl := testKubelet.kubelet
 	pods := []api.Pod{
-		{Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 80}}}}}},
-		{Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 80}}}}}},
+		{ObjectMeta: api.ObjectMeta{Name: "pod1"}, Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 80}}}}}},
+		{ObjectMeta: api.ObjectMeta{Name: "pod2"}, Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 80}}}}}},
 	}
 	// Run once to populate the status map.
 	kl.handleNotFittingPods(pods)
-	if len(kl.podStatuses) != 1 {
-		t.Fatalf("expected length of status map to be 1. Got map %#v.", kl.podStatuses)
+	if _, err := kl.GetPodStatus(BuildPodFullName("pod2", "")); err != nil {
+		t.Fatalf("expected to have status cached for %q: %v", "pod2", err)
 	}
 	// Sync with empty pods so that the entry in status map will be removed.
 	kl.SyncPods([]api.Pod{}, emptyPodUIDs, *newMirrorPods(), time.Now())
-	if len(kl.podStatuses) != 0 {
-		t.Fatalf("expected length of status map to be 0. Got map %#v.", kl.podStatuses)
+	if _, err := kl.GetPodStatus(BuildPodFullName("pod2", "")); err == nil {
+		t.Fatalf("expected to not have status cached for %q: %v", "pod2", err)
 	}
 }
diff --git a/pkg/kubelet/runonce_test.go b/pkg/kubelet/runonce_test.go
index 0d50a9a80c..b952a96a3e 100644
--- a/pkg/kubelet/runonce_test.go
+++ b/pkg/kubelet/runonce_test.go
@@ -77,8 +77,8 @@ func TestRunOnce(t *testing.T) {
 		rootDirectory: "/tmp/kubelet",
 		recorder:      &record.FakeRecorder{},
 		cadvisor:      cadvisor,
-		podStatuses:   make(map[string]api.PodStatus),
 		nodeLister:    testNodeLister{},
+		statusManager: newStatusManager(nil),
 	}
 	kb.networkPlugin, _ = network.InitNetworkPlugin([]network.NetworkPlugin{}, "", network.NewFakeHost(nil))
diff --git a/pkg/kubelet/server.go b/pkg/kubelet/server.go
index aa39082d21..272e6ef379 100644
--- a/pkg/kubelet/server.go
+++ b/pkg/kubelet/server.go
@@ -84,7 +84,7 @@ type HostInterface interface {
 	GetCachedMachineInfo() (*cadvisorApi.MachineInfo, error)
 	GetPods() ([]api.Pod, mirrorPods)
 	GetPodByName(namespace, name string) (*api.Pod, bool)
-	GetPodStatus(name string, uid types.UID) (api.PodStatus, error)
+	GetPodStatus(name string) (api.PodStatus, error)
 	RunInContainer(name string, uid types.UID, container string, cmd []string) ([]byte, error)
 	ExecInContainer(name string, uid types.UID, container string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool) error
 	GetKubeletContainerLogs(podFullName, containerName, tail string, follow bool, stdout, stderr io.Writer) error
@@ -289,7 +289,6 @@ func (s *Server) handlePodStatus(w http.ResponseWriter, req *http.Request, versi
 		return
 	}
 	podID := u.Query().Get("podID")
-	podUID := types.UID(u.Query().Get("UUID"))
 	podNamespace := u.Query().Get("podNamespace")
 	if len(podID) == 0 {
 		http.Error(w, "Missing 'podID=' query entry.", http.StatusBadRequest)
@@ -304,7 +303,7 @@ func (s *Server) handlePodStatus(w http.ResponseWriter, req *http.Request, versi
 		http.Error(w, "Pod does not exist", http.StatusNotFound)
 		return
 	}
-	status, err := s.host.GetPodStatus(GetPodFullName(pod), podUID)
+	status, err := s.host.GetPodStatus(GetPodFullName(pod))
 	if err != nil {
 		s.error(w, err)
 		return
diff --git a/pkg/kubelet/server_test.go b/pkg/kubelet/server_test.go
index 1daa66c377..a3409997c9 100644
--- a/pkg/kubelet/server_test.go
+++ b/pkg/kubelet/server_test.go
@@ -59,7 +59,7 @@ func (fk *fakeKubelet) GetPodByName(namespace, name string) (*api.Pod, bool) {
 	return fk.podByNameFunc(namespace, name)
 }

-func (fk *fakeKubelet) GetPodStatus(name string, uid types.UID) (api.PodStatus, error) {
+func (fk *fakeKubelet) GetPodStatus(name string) (api.PodStatus, error) {
 	return fk.statusFunc(name)
 }

diff --git a/pkg/kubelet/status_manager.go b/pkg/kubelet/status_manager.go
new file mode 100644
index 0000000000..b3d42f2317
--- /dev/null
+++ b/pkg/kubelet/status_manager.go
@@ -0,0 +1,123 @@
+/*
+Copyright 2014 Google Inc. All rights reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package kubelet
+
+import (
+	"reflect"
+	"sync"
+	"time"
+
+	"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
+	"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
+	"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
+	"github.com/golang/glog"
+)
+
+type podStatusSyncRequest struct {
+	podFullName string
+	status      api.PodStatus
+}
+
+// statusManager updates pod statuses in the apiserver. It writes only when a
+// status has changed. All methods are thread-safe.
+type statusManager struct {
+	kubeClient client.Interface
+	// Map from pod full name to sync status of the corresponding pod.
+	podStatusesLock  sync.RWMutex
+	podStatuses      map[string]api.PodStatus
+	podStatusChannel chan podStatusSyncRequest
+}
+
+func newStatusManager(kubeClient client.Interface) *statusManager {
+	return &statusManager{
+		kubeClient:       kubeClient,
+		podStatuses:      make(map[string]api.PodStatus),
+		podStatusChannel: make(chan podStatusSyncRequest, 1000), // Buffer up to 1000 statuses
+	}
+}
+
+func (s *statusManager) Start() {
+	// We can run SyncBatch() often because it will block until we have some updates to send.
+	go util.Forever(s.SyncBatch, 0)
+}
+
+func (s *statusManager) GetPodStatus(podFullName string) (api.PodStatus, bool) {
+	s.podStatusesLock.RLock()
+	defer s.podStatusesLock.RUnlock()
+	status, ok := s.podStatuses[podFullName]
+	return status, ok
+}
+
+func (s *statusManager) SetPodStatus(podFullName string, status api.PodStatus) {
+	s.podStatusesLock.Lock()
+	defer s.podStatusesLock.Unlock()
+	oldStatus, found := s.podStatuses[podFullName]
+	if !found || !reflect.DeepEqual(oldStatus, status) {
+		s.podStatuses[podFullName] = status
+		s.podStatusChannel <- podStatusSyncRequest{podFullName, status}
+	} else {
+		glog.V(3).Infof("Ignoring same pod status for %s - old: %+v, new: %+v", podFullName, oldStatus, status)
+	}
+}
+
+func (s *statusManager) DeletePodStatus(podFullName string) {
+	s.podStatusesLock.Lock()
+	defer s.podStatusesLock.Unlock()
+	delete(s.podStatuses, podFullName)
+}
+
+// TODO(filipg): It'd be cleaner if we could do this without a signal from the user.
+func (s *statusManager) RemoveOrphanedStatuses(podFullNames map[string]bool) {
+	s.podStatusesLock.Lock()
+	defer s.podStatusesLock.Unlock()
+	for key := range s.podStatuses {
+		if _, ok := podFullNames[key]; !ok {
+			glog.V(5).Infof("Removing %q from status map.", key)
+			delete(s.podStatuses, key)
+		}
+	}
+}
+
+// SyncBatch syncs pods statuses with the apiserver. It will loop until channel
+// s.podStatusChannel is empty for at least 1s.
+func (s *statusManager) SyncBatch() {
+	for {
+		select {
+		case syncRequest := <-s.podStatusChannel:
+			podFullName := syncRequest.podFullName
+			status := syncRequest.status
+			glog.V(3).Infof("Syncing status for %s", podFullName)
+			name, namespace, err := ParsePodFullName(podFullName)
+			if err != nil {
+				glog.Warningf("Cannot parse pod full name %q: %s", podFullName, err)
+				continue
+			}
+			_, err = s.kubeClient.Pods(namespace).UpdateStatus(name, &status)
+			if err != nil {
+				// We failed to update status. In order to make sure we retry next time
+				// we delete cached value. This may result in an additional update, but
+				// this is ok.
+				s.DeletePodStatus(podFullName)
+				glog.Warningf("Error updating status for pod %q: %v", name, err)
+			} else {
+				glog.V(3).Infof("Status for pod %q updated successfully", name)
+			}
+		case <-time.After(1 * time.Second):
+			return
+		}
+	}
+}
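
A note on the design of status_manager.go: SetPodStatus compares the incoming status against the cached copy while holding the lock and enqueues a sync request only when something actually changed, so unchanged statuses never reach the apiserver. The channel buffers up to 1000 pending updates, SyncBatch drains them until the channel has been quiet for one second, and a failed update deletes the cached entry so that the next SetPodStatus for that pod is treated as a change and retried.
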
diff --git a/pkg/kubelet/status_manager_test.go b/pkg/kubelet/status_manager_test.go
new file mode 100644
index 0000000000..8b1df74978
--- /dev/null
+++ b/pkg/kubelet/status_manager_test.go
@@ -0,0 +1,83 @@
+/*
+Copyright 2014 Google Inc. All rights reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package kubelet
+
+import (
+	"math/rand"
+	"strconv"
+	"testing"
+
+	"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
+	"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
+)
+
+const (
+	podFullName string = "podName_namespace"
+)
+
+func newTestStatusManager() *statusManager {
+	return newStatusManager(&client.Fake{})
+}
+
+func generateRandomMessage() string {
+	return strconv.Itoa(rand.Int())
+}
+
+func getRandomPodStatus() api.PodStatus {
+	return api.PodStatus{
+		Message: generateRandomMessage(),
+	}
+}
+
+func verifyActions(t *testing.T, kubeClient client.Interface, expectedActions []string) {
+	actions := kubeClient.(*client.Fake).Actions
+	if len(actions) != len(expectedActions) {
+		t.Errorf("unexpected actions: got %s, expected %s", actions, expectedActions)
+		return
+	}
+	for i := 0; i < len(actions); i++ {
+		if actions[i].Action != expectedActions[i] {
+			t.Errorf("unexpected actions: got %s, expected %s", actions, expectedActions)
+		}
+	}
+}
+
+func TestNewStatus(t *testing.T) {
+	syncer := newTestStatusManager()
+	syncer.SetPodStatus(podFullName, getRandomPodStatus())
+	syncer.SyncBatch()
+	verifyActions(t, syncer.kubeClient, []string{"update-status-pod"})
+}
+
+func TestChangedStatus(t *testing.T) {
+	syncer := newTestStatusManager()
+	syncer.SetPodStatus(podFullName, getRandomPodStatus())
+	syncer.SyncBatch()
+	syncer.SetPodStatus(podFullName, getRandomPodStatus())
+	syncer.SyncBatch()
+	verifyActions(t, syncer.kubeClient, []string{"update-status-pod", "update-status-pod"})
+}
+
+func TestUnchangedStatus(t *testing.T) {
+	syncer := newTestStatusManager()
+	podStatus := getRandomPodStatus()
+	syncer.SetPodStatus(podFullName, podStatus)
+	syncer.SyncBatch()
+	syncer.SetPodStatus(podFullName, podStatus)
+	syncer.SyncBatch()
+	verifyActions(t, syncer.kubeClient, []string{"update-status-pod"})
+}
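
To make the pattern above easy to try in isolation, here is a minimal, self-contained sketch of the same dedupe-and-batch idea. It is illustrative only and not part of the patch: toySyncer and its members are invented names, and fmt.Println stands in for the UpdateStatus call to the apiserver.

package main

import (
	"fmt"
	"reflect"
	"sync"
	"time"
)

type status struct{ Message string }

// toySyncer mirrors the statusManager pattern: a lock-guarded cache used
// for deduplication, plus a buffered channel feeding a batching drain loop.
type toySyncer struct {
	mu    sync.Mutex
	cache map[string]status
	queue chan string
}

func newToySyncer() *toySyncer {
	return &toySyncer{cache: map[string]status{}, queue: make(chan string, 1000)}
}

// set mirrors SetPodStatus: enqueue only when the status actually changed.
func (t *toySyncer) set(name string, s status) {
	t.mu.Lock()
	defer t.mu.Unlock()
	if old, ok := t.cache[name]; ok && reflect.DeepEqual(old, s) {
		return // unchanged; skip the apiserver round trip
	}
	t.cache[name] = s
	t.queue <- name
}

// syncBatch mirrors SyncBatch: drain until the queue is quiet for 1s.
func (t *toySyncer) syncBatch() {
	for {
		select {
		case name := <-t.queue:
			fmt.Println("would call UpdateStatus for", name)
		case <-time.After(1 * time.Second):
			return
		}
	}
}

func main() {
	s := newToySyncer()
	s.set("pod1", status{Message: "running"})
	s.set("pod1", status{Message: "running"}) // deduplicated; nothing enqueued
	s.set("pod1", status{Message: "failed"})
	s.syncBatch() // prints two updates, then returns after 1s of quiet
}

As in the real SetPodStatus, the compare-and-enqueue step runs under the lock, which is what makes the "write only on change" guarantee atomic with the cache update.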