Merge pull request #5685 from yujuhong/pod_status

Kubelet: per-pod workers should avoid grabbing the pod array lock
pull/6/head
Victor Marmol 2015-03-19 18:54:36 -07:00
commit b6d2cf4b3e
2 changed files with 34 additions and 28 deletions

View File

@ -1245,12 +1245,12 @@ func (kl *Kubelet) computePodContainerChanges(pod *api.Pod, hasMirrorPod bool, c
result, err := kl.probeContainer(pod, podStatus, container, dockerContainer.ID, dockerContainer.Created) result, err := kl.probeContainer(pod, podStatus, container, dockerContainer.ID, dockerContainer.Created)
if err != nil { if err != nil {
// TODO(vmarmol): examine this logic. // TODO(vmarmol): examine this logic.
glog.Infof("probe no-error: %s", container.Name) glog.Infof("probe no-error: %q", container.Name)
containersToKeep[containerID] = index containersToKeep[containerID] = index
continue continue
} }
if result == probe.Success { if result == probe.Success {
glog.Infof("probe success: %s", container.Name) glog.Infof("probe success: %q", container.Name)
containersToKeep[containerID] = index containersToKeep[containerID] = index
continue continue
} }
@ -1314,7 +1314,7 @@ func (kl *Kubelet) syncPod(pod *api.Pod, hasMirrorPod bool, containersInPod dock
// Before returning, regenerate status and store it in the cache. // Before returning, regenerate status and store it in the cache.
defer func() { defer func() {
status, err := kl.generatePodStatus(podFullName, uid) status, err := kl.generatePodStatusByPod(pod)
if err != nil { if err != nil {
glog.Errorf("Unable to generate status for pod with name %q and uid %q info with error(%v)", podFullName, uid, err) glog.Errorf("Unable to generate status for pod with name %q and uid %q info with error(%v)", podFullName, uid, err)
} else { } else {
@ -1467,7 +1467,7 @@ func (kl *Kubelet) cleanupOrphanedVolumes(pods []api.Pod, running []*docker.Cont
if _, ok := desiredVolumes[name]; !ok { if _, ok := desiredVolumes[name]; !ok {
parts := strings.Split(name, "/") parts := strings.Split(name, "/")
if runningSet.Has(parts[0]) { if runningSet.Has(parts[0]) {
glog.Infof("volume %s, still has a container running %s, skipping teardown", name, parts[0]) glog.Infof("volume %q, still has a container running %q, skipping teardown", name, parts[0])
continue continue
} }
//TODO (jonesdl) We should somehow differentiate between volumes that are supposed //TODO (jonesdl) We should somehow differentiate between volumes that are supposed
@ -1767,9 +1767,9 @@ func (kl *Kubelet) syncStatus(deadline time.Duration) {
} }
_, err = kl.kubeClient.Pods(pod.Namespace).UpdateStatus(pod.Name, &status) _, err = kl.kubeClient.Pods(pod.Namespace).UpdateStatus(pod.Name, &status)
if err != nil { if err != nil {
glog.Warningf("Error updating status for pod %s: %v (full pod: %s)", pod.Name, err, pod) glog.Warningf("Error updating status for pod %q: %v (full pod: %q)", pod.Name, err, pod)
} else { } else {
glog.V(3).Infof("Status for pod %q updated successfully: %s", pod.Name, pod) glog.V(3).Infof("Status for pod %q updated successfully: %q", pod.Name, pod)
} }
} }
t.Stop() t.Stop()
@ -1887,7 +1887,16 @@ func (kl *Kubelet) GetPods() ([]api.Pod, util.StringSet) {
return append([]api.Pod{}, kl.pods...), kl.mirrorPods return append([]api.Pod{}, kl.pods...), kl.mirrorPods
} }
// GetPodByName provides the first pod that matches namespace and name, as well as whether the node was found. func (kl *Kubelet) GetPodByFullName(podFullName string) (*api.Pod, bool) {
name, namespace, err := ParsePodFullName(podFullName)
if err != nil {
return nil, false
}
return kl.GetPodByName(namespace, name)
}
// GetPodByName provides the first pod that matches namespace and name, as well
// as whether the pod was found.
func (kl *Kubelet) GetPodByName(namespace, name string) (*api.Pod, bool) { func (kl *Kubelet) GetPodByName(namespace, name string) (*api.Pod, bool) {
kl.podLock.RLock() kl.podLock.RLock()
defer kl.podLock.RUnlock() defer kl.podLock.RUnlock()
@ -1917,10 +1926,10 @@ func (kl *Kubelet) updateNodeStatus() error {
func (kl *Kubelet) tryUpdateNodeStatus() error { func (kl *Kubelet) tryUpdateNodeStatus() error {
node, err := kl.kubeClient.Nodes().Get(kl.hostname) node, err := kl.kubeClient.Nodes().Get(kl.hostname)
if err != nil { if err != nil {
return fmt.Errorf("error getting node %s: %v", kl.hostname, err) return fmt.Errorf("error getting node %q: %v", kl.hostname, err)
} }
if node == nil { if node == nil {
return fmt.Errorf("no node instance returned for %v", kl.hostname) return fmt.Errorf("no node instance returned for %q", kl.hostname)
} }
// TODO: Post NotReady if we cannot get MachineInfo from cAdvisor. This needs to start // TODO: Post NotReady if we cannot get MachineInfo from cAdvisor. This needs to start
@ -2042,41 +2051,37 @@ func getPodReadyCondition(spec *api.PodSpec, info api.PodInfo) []api.PodConditio
return ready return ready
} }
func (kl *Kubelet) GetPodByFullName(podFullName string) (*api.PodSpec, bool) {
kl.podLock.RLock()
defer kl.podLock.RUnlock()
for _, pod := range kl.pods {
if GetPodFullName(&pod) == podFullName {
return &pod.Spec, true
}
}
return nil, false
}
// GetPodStatus returns information from Docker about the containers in a pod // GetPodStatus returns information from Docker about the containers in a pod
func (kl *Kubelet) GetPodStatus(podFullName string, uid types.UID) (api.PodStatus, error) { func (kl *Kubelet) GetPodStatus(podFullName string, uid types.UID) (api.PodStatus, error) {
// Check to see if we have a cached version of the status. // Check to see if we have a cached version of the status.
cachedPodStatus, found := kl.getPodStatusFromCache(podFullName) cachedPodStatus, found := kl.getPodStatusFromCache(podFullName)
if found { if found {
glog.V(3).Infof("Returning cached status for %s", podFullName) glog.V(3).Infof("Returning cached status for %q", podFullName)
return cachedPodStatus, nil return cachedPodStatus, nil
} }
return kl.generatePodStatus(podFullName, uid) return kl.generatePodStatus(podFullName, uid)
} }
func (kl *Kubelet) generatePodStatus(podFullName string, uid types.UID) (api.PodStatus, error) { func (kl *Kubelet) generatePodStatus(podFullName string, uid types.UID) (api.PodStatus, error) {
glog.V(3).Infof("Generating status for %s", podFullName) pod, found := kl.GetPodByFullName(podFullName)
spec, found := kl.GetPodByFullName(podFullName)
if !found { if !found {
return api.PodStatus{}, fmt.Errorf("Couldn't find spec for pod %s", podFullName) return api.PodStatus{}, fmt.Errorf("couldn't find pod %q", podFullName)
} }
return kl.generatePodStatusByPod(pod)
}
podStatus, err := dockertools.GetDockerPodStatus(kl.dockerClient, *spec, podFullName, uid) // By passing the pod directly, this method avoids pod lookup, which requires
// grabbing a lock.
func (kl *Kubelet) generatePodStatusByPod(pod *api.Pod) (api.PodStatus, error) {
podFullName := GetPodFullName(pod)
glog.V(3).Infof("Generating status for %q", podFullName)
spec := &pod.Spec
podStatus, err := dockertools.GetDockerPodStatus(kl.dockerClient, *spec, podFullName, pod.UID)
if err != nil { if err != nil {
// Error handling // Error handling
glog.Infof("Query docker container info for pod %s failed with error (%v)", podFullName, err) glog.Infof("Query docker container info for pod %q failed with error (%v)", podFullName, err)
if strings.Contains(err.Error(), "resource temporarily unavailable") { if strings.Contains(err.Error(), "resource temporarily unavailable") {
// Leave upstream layer to decide what to do // Leave upstream layer to decide what to do
return api.PodStatus{}, err return api.PodStatus{}, err
@ -2153,7 +2158,7 @@ func (kl *Kubelet) PortForward(podFullName string, uid types.UID, port uint16, s
} }
podInfraContainer, found, _ := dockerContainers.FindPodContainer(podFullName, uid, dockertools.PodInfraContainerName) podInfraContainer, found, _ := dockerContainers.FindPodContainer(podFullName, uid, dockertools.PodInfraContainerName)
if !found { if !found {
return fmt.Errorf("Unable to find pod infra container for pod %s, uid %v", podFullName, uid) return fmt.Errorf("Unable to find pod infra container for pod %q, uid %v", podFullName, uid)
} }
return kl.runner.PortForward(podInfraContainer.ID, port, stream) return kl.runner.PortForward(podInfraContainer.ID, port, stream)
} }

View File

@ -77,6 +77,7 @@ func TestRunOnce(t *testing.T) {
rootDirectory: "/tmp/kubelet", rootDirectory: "/tmp/kubelet",
recorder: &record.FakeRecorder{}, recorder: &record.FakeRecorder{},
cadvisor: cadvisor, cadvisor: cadvisor,
podStatuses: make(map[string]api.PodStatus),
} }
kb.networkPlugin, _ = network.InitNetworkPlugin([]network.NetworkPlugin{}, "", network.NewFakeHost(nil)) kb.networkPlugin, _ = network.InitNetworkPlugin([]network.NetworkPlugin{}, "", network.NewFakeHost(nil))