* Update pod status only when it changes.

* Refactor syncing logic into a separate struct
pull/6/head
Filip Grzadkowski 2015-03-20 17:37:08 +01:00
parent 69a648406d
commit 632ca506ce
10 changed files with 288 additions and 176 deletions

View File

@ -224,7 +224,6 @@ func startComponents(firstManifestURL, secondManifestURL, apiVersion string) (st
configFilePath := makeTempDirOrDie("config", testRootDir)
glog.Infof("Using %s as root dir for kubelet #1", testRootDir)
kcfg := kubeletapp.SimpleKubelet(cl, &fakeDocker1, machineList[0], testRootDir, firstManifestURL, "127.0.0.1", 10250, api.NamespaceDefault, empty_dir.ProbeVolumePlugins(), nil, cadvisorInterface, configFilePath)
kcfg.PodStatusUpdateFrequency = 1 * time.Second
kubeletapp.RunKubelet(kcfg)
// Kubelet (machine)
// Create a second kubelet so that the guestbook example's two redis slaves both
@ -232,7 +231,6 @@ func startComponents(firstManifestURL, secondManifestURL, apiVersion string) (st
testRootDir = makeTempDirOrDie("kubelet_integ_2.", "")
glog.Infof("Using %s as root dir for kubelet #2", testRootDir)
kcfg = kubeletapp.SimpleKubelet(cl, &fakeDocker2, machineList[1], testRootDir, secondManifestURL, "127.0.0.1", 10251, api.NamespaceDefault, empty_dir.ProbeVolumePlugins(), nil, cadvisorInterface, "")
kcfg.PodStatusUpdateFrequency = 1 * time.Second
kubeletapp.RunKubelet(kcfg)
return apiServer.URL, configFilePath
}

View File

@ -52,7 +52,6 @@ type KubeletServer struct {
FileCheckFrequency time.Duration
HTTPCheckFrequency time.Duration
StatusUpdateFrequency time.Duration
PodStatusUpdateFrequency time.Duration
ManifestURL string
EnableServer bool
Address util.IP
@ -86,14 +85,13 @@ type KubeletServer struct {
// NewKubeletServer will create a new KubeletServer with default values.
func NewKubeletServer() *KubeletServer {
return &KubeletServer{
SyncFrequency: 10 * time.Second,
FileCheckFrequency: 20 * time.Second,
HTTPCheckFrequency: 20 * time.Second,
StatusUpdateFrequency: 20 * time.Second,
PodStatusUpdateFrequency: 2 * time.Minute,
EnableServer: true,
Address: util.IP(net.ParseIP("127.0.0.1")),
Port: ports.KubeletPort,
SyncFrequency: 10 * time.Second,
FileCheckFrequency: 20 * time.Second,
HTTPCheckFrequency: 20 * time.Second,
StatusUpdateFrequency: 20 * time.Second,
EnableServer: true,
Address: util.IP(net.ParseIP("127.0.0.1")),
Port: ports.KubeletPort,
PodInfraContainerImage: kubelet.PodInfraContainerImage,
RootDirectory: defaultRootDir,
RegistryBurst: 10,
@ -115,7 +113,6 @@ func (s *KubeletServer) AddFlags(fs *pflag.FlagSet) {
fs.StringVar(&s.Config, "config", s.Config, "Path to the config file or directory of files")
fs.DurationVar(&s.SyncFrequency, "sync_frequency", s.SyncFrequency, "Max period between synchronizing running containers and config")
fs.DurationVar(&s.StatusUpdateFrequency, "status_update_frequency", s.StatusUpdateFrequency, "Duration between posting node status to master")
fs.DurationVar(&s.PodStatusUpdateFrequency, "pod_status_update_frequency", s.PodStatusUpdateFrequency, "Duration between posting pod status updates to the master")
fs.DurationVar(&s.FileCheckFrequency, "file_check_frequency", s.FileCheckFrequency, "Duration between checking config files for new data")
fs.DurationVar(&s.HTTPCheckFrequency, "http_check_frequency", s.HTTPCheckFrequency, "Duration between checking http for new data")
fs.StringVar(&s.ManifestURL, "manifest_url", s.ManifestURL, "URL for accessing the container manifest")
@ -183,7 +180,6 @@ func (s *KubeletServer) Run(_ []string) error {
ConfigFile: s.Config,
ManifestURL: s.ManifestURL,
StatusUpdateFrequency: s.StatusUpdateFrequency,
PodStatusUpdateFrequency: s.PodStatusUpdateFrequency,
FileCheckFrequency: s.FileCheckFrequency,
HTTPCheckFrequency: s.HTTPCheckFrequency,
PodInfraContainerImage: s.PodInfraContainerImage,
@ -283,24 +279,23 @@ func SimpleKubelet(client *client.Client,
RootDirectory: rootDir,
ManifestURL: manifestURL,
PodInfraContainerImage: kubelet.PodInfraContainerImage,
Port: port,
Address: util.IP(net.ParseIP(address)),
EnableServer: true,
EnableDebuggingHandlers: true,
HTTPCheckFrequency: 1 * time.Second,
FileCheckFrequency: 1 * time.Second,
StatusUpdateFrequency: 3 * time.Second,
PodStatusUpdateFrequency: 2 * time.Minute,
SyncFrequency: 3 * time.Second,
MinimumGCAge: 10 * time.Second,
MaxPerPodContainerCount: 5,
MaxContainerCount: 100,
MasterServiceNamespace: masterServiceNamespace,
VolumePlugins: volumePlugins,
TLSOptions: tlsOptions,
CadvisorInterface: cadvisorInterface,
ConfigFile: configFilePath,
ImageGCPolicy: imageGCPolicy,
Port: port,
Address: util.IP(net.ParseIP(address)),
EnableServer: true,
EnableDebuggingHandlers: true,
HTTPCheckFrequency: 1 * time.Second,
FileCheckFrequency: 1 * time.Second,
StatusUpdateFrequency: 3 * time.Second,
SyncFrequency: 3 * time.Second,
MinimumGCAge: 10 * time.Second,
MaxPerPodContainerCount: 5,
MaxContainerCount: 100,
MasterServiceNamespace: masterServiceNamespace,
VolumePlugins: volumePlugins,
TLSOptions: tlsOptions,
CadvisorInterface: cadvisorInterface,
ConfigFile: configFilePath,
ImageGCPolicy: imageGCPolicy,
}
return &kcfg
}
@ -386,7 +381,6 @@ type KubeletConfig struct {
ConfigFile string
ManifestURL string
StatusUpdateFrequency time.Duration
PodStatusUpdateFrequency time.Duration
FileCheckFrequency time.Duration
HTTPCheckFrequency time.Duration
Hostname string
@ -453,7 +447,6 @@ func createAndInitKubelet(kc *KubeletConfig, pc *config.PodConfig) (*kubelet.Kub
kc.Recorder,
kc.CadvisorInterface,
kc.StatusUpdateFrequency,
kc.PodStatusUpdateFrequency,
kc.ImageGCPolicy)
if err != nil {

View File

@ -71,7 +71,7 @@ func ResolvePort(portReference util.IntOrString, container *api.Container) (int,
func (h *httpActionHandler) Run(podFullName string, uid types.UID, container *api.Container, handler *api.Handler) error {
host := handler.HTTPGet.Host
if len(host) == 0 {
status, err := h.kubelet.GetPodStatus(podFullName, uid)
status, err := h.kubelet.GetPodStatus(podFullName)
if err != nil {
glog.Errorf("Unable to get pod info, event handlers may be invalid.")
return err

View File

@ -125,7 +125,6 @@ func NewMainKubelet(
recorder record.EventRecorder,
cadvisorInterface cadvisor.Interface,
statusUpdateFrequency time.Duration,
podStatusUpdateFrequency time.Duration,
imageGCPolicy ImageGCPolicy) (*Kubelet, error) {
if rootDirectory == "" {
return nil, fmt.Errorf("invalid root directory %q", rootDirectory)
@ -133,9 +132,6 @@ func NewMainKubelet(
if resyncInterval <= 0 {
return nil, fmt.Errorf("invalid sync frequency %d", resyncInterval)
}
if podStatusUpdateFrequency <= 0 {
return nil, fmt.Errorf("invalid status update frequency %d", podStatusUpdateFrequency)
}
dockerClient = metrics.NewInstrumentedDockerInterface(dockerClient)
// Wait for the Docker daemon to be up (with a timeout).
@ -199,6 +195,7 @@ func NewMainKubelet(
if err != nil {
return nil, fmt.Errorf("failed to initialize image manager: %v", err)
}
statusManager := newStatusManager(kubeClient)
klet := &Kubelet{
hostname: hostname,
@ -207,7 +204,6 @@ func NewMainKubelet(
rootDirectory: rootDirectory,
statusUpdateFrequency: statusUpdateFrequency,
resyncInterval: resyncInterval,
podStatusUpdateFrequency: podStatusUpdateFrequency,
podInfraContainerImage: podInfraContainerImage,
containerIDToRef: map[string]*api.ObjectReference{},
runner: dockertools.NewDockerContainerCommandRunner(dockerClient),
@ -227,6 +223,7 @@ func NewMainKubelet(
cadvisor: cadvisorInterface,
containerGC: containerGC,
imageManager: imageManager,
statusManager: statusManager,
}
klet.podManager = newBasicPodManager(klet.kubeClient)
@ -253,8 +250,6 @@ func NewMainKubelet(
klet.networkPlugin = plug
}
klet.podStatuses = make(map[string]api.PodStatus)
return klet, nil
}
@ -273,26 +268,18 @@ type nodeLister interface {
// Kubelet is the main kubelet implementation.
type Kubelet struct {
hostname string
dockerClient dockertools.DockerInterface
dockerCache dockertools.DockerCache
kubeClient client.Interface
rootDirectory string
podInfraContainerImage string
podWorkers *podWorkers
statusUpdateFrequency time.Duration
resyncInterval time.Duration
podStatusUpdateFrequency time.Duration
sourcesReady SourcesReadyFn
hostname string
dockerClient dockertools.DockerInterface
dockerCache dockertools.DockerCache
kubeClient client.Interface
rootDirectory string
podInfraContainerImage string
podWorkers *podWorkers
statusUpdateFrequency time.Duration
resyncInterval time.Duration
sourcesReady SourcesReadyFn
podManager podManager
// A pod status cache stores statuses for pods (both rejected and synced).
// Note that currently no thread attempts to acquire podStatusesLock while
// accessing podManager, and vice versa. If you intend to change this usage
// pattern, please explicitly impose an acquiring order to avoid deadlocks
// and document such an order in the comment.
podStatusesLock sync.RWMutex
podStatuses map[string]api.PodStatus
// Needed to report events for containers belonging to deleted/modified pods.
// Tracks references for reporting events
@ -331,16 +318,16 @@ type Kubelet struct {
// Network plugin
networkPlugin network.NetworkPlugin
// probe runner holder
// Probe runner holder
prober probeHolder
// container readiness state holder
// Container readiness state holder
readiness *readinessStates
// how long to keep idle streaming command execution/port forwarding
// How long to keep idle streaming command execution/port forwarding
// connections open before terminating them
streamingConnectionIdleTimeout time.Duration
// the EventRecorder to use
// The EventRecorder to use
recorder record.EventRecorder
// Policy for handling garbage collection of dead containers.
@ -351,6 +338,9 @@ type Kubelet struct {
// Cached MachineInfo returned by cadvisor.
machineInfo *cadvisorApi.MachineInfo
// Syncs pods statuses with apiserver; also used as a cache of statuses.
statusManager *statusManager
}
// getRootDir returns the full path to the directory under which kubelet can
@ -521,30 +511,6 @@ func (kl *Kubelet) StartGarbageCollection() {
}, 5*time.Minute)
}
func (kl *Kubelet) getPodStatusFromCache(podFullName string) (api.PodStatus, bool) {
kl.podStatusesLock.RLock()
defer kl.podStatusesLock.RUnlock()
status, ok := kl.podStatuses[podFullName]
return status, ok
}
func (kl *Kubelet) setPodStatusInCache(podFullName string, status api.PodStatus) {
kl.podStatusesLock.Lock()
defer kl.podStatusesLock.Unlock()
kl.podStatuses[podFullName] = status
}
func (kl *Kubelet) removeOrphanedStatuses(podFullNames map[string]bool) {
kl.podStatusesLock.Lock()
defer kl.podStatusesLock.Unlock()
for key := range kl.podStatuses {
if _, ok := podFullNames[key]; !ok {
glog.V(5).Infof("Removing %q from status map.", key)
delete(kl.podStatuses, key)
}
}
}
// Run starts the kubelet reacting to config updates
func (kl *Kubelet) Run(updates <-chan PodUpdate) {
if kl.logServer == nil {
@ -557,11 +523,7 @@ func (kl *Kubelet) Run(updates <-chan PodUpdate) {
glog.Warning("No api server defined - no node status update will be sent.")
}
go kl.syncNodeStatus()
// syncStatus handles its own frequency and throttling, run it always.
go util.Forever(func() {
kl.syncStatus(kl.podStatusUpdateFrequency)
}, 0)
kl.statusManager.Start()
kl.syncLoop(updates, kl)
}
@ -1263,7 +1225,7 @@ func (kl *Kubelet) computePodContainerChanges(pod *api.Pod, hasMirrorPod bool, c
if found {
glog.V(4).Infof("Found infra pod for %q", podFullName)
containersToKeep[podInfraContainerID] = -1
podStatus, err = kl.GetPodStatus(podFullName, uid)
podStatus, err = kl.GetPodStatus(podFullName)
if err != nil {
glog.Errorf("Unable to get pod with name %q and uid %q info with error(%v)", podFullName, uid, err)
}
@ -1358,7 +1320,7 @@ func (kl *Kubelet) syncPod(pod *api.Pod, hasMirrorPod bool, containersInPod dock
if err != nil {
glog.Errorf("Unable to generate status for pod with name %q and uid %q info with error(%v)", podFullName, uid, err)
} else {
kl.setPodStatusInCache(podFullName, status)
kl.statusManager.SetPodStatus(podFullName, status)
}
}()
@ -1528,13 +1490,13 @@ func (kl *Kubelet) SyncPods(allPods []api.Pod, podSyncTypes map[types.UID]metric
for _, pod := range allPods {
podFullNames[GetPodFullName(&pod)] = true
}
kl.removeOrphanedStatuses(podFullNames)
kl.statusManager.RemoveOrphanedStatuses(podFullNames)
// Filter out the rejected pod. They don't have running containers.
kl.handleNotFittingPods(allPods)
var pods []api.Pod
for _, pod := range allPods {
status, ok := kl.getPodStatusFromCache(GetPodFullName(&pod))
status, ok := kl.statusManager.GetPodStatus(GetPodFullName(&pod))
if ok && status.Phase == api.PodFailed {
continue
}
@ -1712,21 +1674,21 @@ func (kl *Kubelet) handleNotFittingPods(pods []api.Pod) {
fitting, notFitting := checkHostPortConflicts(pods)
for _, pod := range notFitting {
kl.recorder.Eventf(&pod, "hostPortConflict", "Cannot start the pod due to host port conflict.")
kl.setPodStatusInCache(GetPodFullName(&pod), api.PodStatus{
kl.statusManager.SetPodStatus(GetPodFullName(&pod), api.PodStatus{
Phase: api.PodFailed,
Message: "Pod cannot be started due to host port conflict"})
}
fitting, notFitting = kl.checkNodeSelectorMatching(fitting)
for _, pod := range notFitting {
kl.recorder.Eventf(&pod, "nodeSelectorMismatching", "Cannot start the pod due to node selector mismatch.")
kl.setPodStatusInCache(GetPodFullName(&pod), api.PodStatus{
kl.statusManager.SetPodStatus(GetPodFullName(&pod), api.PodStatus{
Phase: api.PodFailed,
Message: "Pod cannot be started due to node selector mismatch"})
}
fitting, notFitting = kl.checkCapacityExceeded(fitting)
for _, pod := range notFitting {
kl.recorder.Eventf(&pod, "capacityExceeded", "Cannot start the pod due to exceeded capacity.")
kl.setPodStatusInCache(GetPodFullName(&pod), api.PodStatus{
kl.statusManager.SetPodStatus(GetPodFullName(&pod), api.PodStatus{
Phase: api.PodFailed,
Message: "Pod cannot be started due to exceeded capacity"})
}
@ -1768,40 +1730,6 @@ func (kl *Kubelet) syncLoop(updates <-chan PodUpdate, handler SyncHandler) {
}
}
// syncStatus syncs pods statuses with the apiserver. Spread the updates over the specified deadline.
func (kl *Kubelet) syncStatus(deadline time.Duration) {
start := time.Now()
glog.V(3).Infof("Syncing pods status")
pods, _ := kl.GetPods()
if len(pods) == 0 {
// No pods, sleep the rest of our deadline.
time.Sleep(deadline - time.Since(start))
return
}
// TODO(vmarmol): Enhance util.RateLimiter for our use here.
singleDeadline := time.Duration(deadline.Nanoseconds() / int64(len(pods)))
t := time.NewTicker(singleDeadline)
for _, pod := range pods {
// Don't hit the api server too hard, wait for the next time slot.
<-t.C
status, err := kl.GetPodStatus(GetPodFullName(&pod), pod.UID)
if err != nil {
glog.Warningf("Error getting pod %q status: %v, retry later", pod.Name, err)
continue
}
_, err = kl.kubeClient.Pods(pod.Namespace).UpdateStatus(pod.Name, &status)
if err != nil {
glog.Warningf("Error updating status for pod %q: %v (full pod: %q)", pod.Name, err, pod)
} else {
glog.V(3).Infof("Status for pod %q updated successfully: %q", pod.Name, pod)
}
}
t.Stop()
}
// Returns Docker version for this Kubelet.
func (kl *Kubelet) GetDockerVersion() ([]uint, error) {
if kl.dockerClient == nil {
@ -1832,11 +1760,10 @@ func (kl *Kubelet) validateContainerStatus(podStatus *api.PodStatus, containerNa
}
// GetKubeletContainerLogs returns logs from the container
// The second parameter of GetPodStatus and FindPodContainer methods represents pod UUID, which is allowed to be blank
// TODO: this method is returning logs of random container attempts, when it should be returning the most recent attempt
// or all of them.
func (kl *Kubelet) GetKubeletContainerLogs(podFullName, containerName, tail string, follow bool, stdout, stderr io.Writer) error {
podStatus, err := kl.GetPodStatus(podFullName, "")
podStatus, err := kl.GetPodStatus(podFullName)
if err != nil {
if err == dockertools.ErrNoContainersInPod {
return fmt.Errorf("pod %q not found\n", podFullName)
@ -2019,19 +1946,17 @@ func getPodReadyCondition(spec *api.PodSpec, info api.PodInfo) []api.PodConditio
}
// GetPodStatus returns information from Docker about the containers in a pod
func (kl *Kubelet) GetPodStatus(podFullName string, uid types.UID) (api.PodStatus, error) {
uid = kl.podManager.TranslatePodUID(uid)
func (kl *Kubelet) GetPodStatus(podFullName string) (api.PodStatus, error) {
// Check to see if we have a cached version of the status.
cachedPodStatus, found := kl.getPodStatusFromCache(podFullName)
cachedPodStatus, found := kl.statusManager.GetPodStatus(podFullName)
if found {
glog.V(3).Infof("Returning cached status for %q", podFullName)
return cachedPodStatus, nil
}
return kl.generatePodStatus(podFullName, uid)
return kl.generatePodStatus(podFullName)
}
func (kl *Kubelet) generatePodStatus(podFullName string, uid types.UID) (api.PodStatus, error) {
func (kl *Kubelet) generatePodStatus(podFullName string) (api.PodStatus, error) {
pod, found := kl.GetPodByFullName(podFullName)
if !found {
return api.PodStatus{}, fmt.Errorf("couldn't find pod %q", podFullName)

View File

@ -99,7 +99,7 @@ func newTestKubelet(t *testing.T) *TestKubelet {
kubelet.nodeLister = testNodeLister{}
kubelet.readiness = newReadinessStates()
kubelet.recorder = fakeRecorder
kubelet.podStatuses = map[string]api.PodStatus{}
kubelet.statusManager = newStatusManager(fakeKubeClient)
if err := kubelet.setupDataDirs(); err != nil {
t.Fatalf("can't initialize kubelet data dirs: %v", err)
}
@ -2866,13 +2866,10 @@ func TestHandlePortConflicts(t *testing.T) {
conflictedPodName := GetPodFullName(&pods[0])
kl.handleNotFittingPods(pods)
if len(kl.podStatuses) != 1 {
t.Fatalf("expected length of status map to be 1. Got map %#v.", kl.podStatuses)
}
// Check pod status stored in the status map.
status, ok := kl.podStatuses[conflictedPodName]
if !ok {
t.Fatalf("status of pod %q is not found in the status map.", conflictedPodName)
status, err := kl.GetPodStatus(conflictedPodName)
if err != nil {
t.Fatalf("status of pod %q is not found in the status map: ", conflictedPodName, err)
}
if status.Phase != api.PodFailed {
t.Fatalf("expected pod status %q. Got %q.", api.PodFailed, status.Phase)
@ -2880,9 +2877,9 @@ func TestHandlePortConflicts(t *testing.T) {
// Check if we can retrieve the pod status from GetPodStatus().
kl.podManager.SetPods(pods)
status, err := kl.GetPodStatus(conflictedPodName, "")
status, err = kl.GetPodStatus(conflictedPodName)
if err != nil {
t.Fatalf("unable to retrieve pod status for pod %q: #v.", conflictedPodName, err)
t.Fatalf("unable to retrieve pod status for pod %q: %#v.", conflictedPodName, err)
}
if status.Phase != api.PodFailed {
t.Fatalf("expected pod status %q. Got %q.", api.PodFailed, status.Phase)
@ -2919,13 +2916,10 @@ func TestHandleNodeSelector(t *testing.T) {
notfittingPodName := GetPodFullName(&pods[0])
kl.handleNotFittingPods(pods)
if len(kl.podStatuses) != 1 {
t.Fatalf("expected length of status map to be 1. Got map %#v.", kl.podStatuses)
}
// Check pod status stored in the status map.
status, ok := kl.podStatuses[notfittingPodName]
if !ok {
t.Fatalf("status of pod %q is not found in the status map.", notfittingPodName)
status, err := kl.GetPodStatus(notfittingPodName)
if err != nil {
t.Fatalf("status of pod %q is not found in the status map: %#v", notfittingPodName, err)
}
if status.Phase != api.PodFailed {
t.Fatalf("expected pod status %q. Got %q.", api.PodFailed, status.Phase)
@ -2933,9 +2927,9 @@ func TestHandleNodeSelector(t *testing.T) {
// Check if we can retrieve the pod status from GetPodStatus().
kl.podManager.SetPods(pods)
status, err := kl.GetPodStatus(notfittingPodName, "")
status, err = kl.GetPodStatus(notfittingPodName)
if err != nil {
t.Fatalf("unable to retrieve pod status for pod %q: #v.", notfittingPodName, err)
t.Fatalf("unable to retrieve pod status for pod %q: %#v.", notfittingPodName, err)
}
if status.Phase != api.PodFailed {
t.Fatalf("expected pod status %q. Got %q.", api.PodFailed, status.Phase)
@ -2978,13 +2972,10 @@ func TestHandleMemExceeded(t *testing.T) {
notfittingPodName := GetPodFullName(&pods[0])
kl.handleNotFittingPods(pods)
if len(kl.podStatuses) != 1 {
t.Fatalf("expected length of status map to be 1. Got map %#v.", kl.podStatuses)
}
// Check pod status stored in the status map.
status, ok := kl.podStatuses[notfittingPodName]
if !ok {
t.Fatalf("status of pod %q is not found in the status map.", notfittingPodName)
status, err := kl.GetPodStatus(notfittingPodName)
if err != nil {
t.Fatalf("status of pod %q is not found in the status map: ", notfittingPodName, err)
}
if status.Phase != api.PodFailed {
t.Fatalf("expected pod status %q. Got %q.", api.PodFailed, status.Phase)
@ -2992,7 +2983,7 @@ func TestHandleMemExceeded(t *testing.T) {
// Check if we can retrieve the pod status from GetPodStatus().
kl.podManager.SetPods(pods)
status, err := kl.GetPodStatus(notfittingPodName, "")
status, err = kl.GetPodStatus(notfittingPodName)
if err != nil {
t.Fatalf("unable to retrieve pod status for pod %q: #v.", notfittingPodName, err)
}
@ -3001,24 +2992,25 @@ func TestHandleMemExceeded(t *testing.T) {
}
}
// TODO(filipg): This test should be removed once StatusSyncer can do garbage collection without external signal.
func TestPurgingObsoleteStatusMapEntries(t *testing.T) {
testKubelet := newTestKubelet(t)
testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorApi.MachineInfo{}, nil)
kl := testKubelet.kubelet
pods := []api.Pod{
{Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 80}}}}}},
{Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 80}}}}}},
{ObjectMeta: api.ObjectMeta{Name: "pod1"}, Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 80}}}}}},
{ObjectMeta: api.ObjectMeta{Name: "pod2"}, Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 80}}}}}},
}
// Run once to populate the status map.
kl.handleNotFittingPods(pods)
if len(kl.podStatuses) != 1 {
t.Fatalf("expected length of status map to be 1. Got map %#v.", kl.podStatuses)
if _, err := kl.GetPodStatus(BuildPodFullName("pod2", "")); err != nil {
t.Fatalf("expected to have status cached for %q: %v", "pod2", err)
}
// Sync with empty pods so that the entry in status map will be removed.
kl.SyncPods([]api.Pod{}, emptyPodUIDs, *newMirrorPods(), time.Now())
if len(kl.podStatuses) != 0 {
t.Fatalf("expected length of status map to be 0. Got map %#v.", kl.podStatuses)
if _, err := kl.GetPodStatus(BuildPodFullName("pod2", "")); err == nil {
t.Fatalf("expected to not have status cached for %q: %v", "pod2", err)
}
}

View File

@ -77,8 +77,8 @@ func TestRunOnce(t *testing.T) {
rootDirectory: "/tmp/kubelet",
recorder: &record.FakeRecorder{},
cadvisor: cadvisor,
podStatuses: make(map[string]api.PodStatus),
nodeLister: testNodeLister{},
statusManager: newStatusManager(nil),
}
kb.networkPlugin, _ = network.InitNetworkPlugin([]network.NetworkPlugin{}, "", network.NewFakeHost(nil))

View File

@ -84,7 +84,7 @@ type HostInterface interface {
GetCachedMachineInfo() (*cadvisorApi.MachineInfo, error)
GetPods() ([]api.Pod, mirrorPods)
GetPodByName(namespace, name string) (*api.Pod, bool)
GetPodStatus(name string, uid types.UID) (api.PodStatus, error)
GetPodStatus(name string) (api.PodStatus, error)
RunInContainer(name string, uid types.UID, container string, cmd []string) ([]byte, error)
ExecInContainer(name string, uid types.UID, container string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool) error
GetKubeletContainerLogs(podFullName, containerName, tail string, follow bool, stdout, stderr io.Writer) error
@ -289,7 +289,6 @@ func (s *Server) handlePodStatus(w http.ResponseWriter, req *http.Request, versi
return
}
podID := u.Query().Get("podID")
podUID := types.UID(u.Query().Get("UUID"))
podNamespace := u.Query().Get("podNamespace")
if len(podID) == 0 {
http.Error(w, "Missing 'podID=' query entry.", http.StatusBadRequest)
@ -304,7 +303,7 @@ func (s *Server) handlePodStatus(w http.ResponseWriter, req *http.Request, versi
http.Error(w, "Pod does not exist", http.StatusNotFound)
return
}
status, err := s.host.GetPodStatus(GetPodFullName(pod), podUID)
status, err := s.host.GetPodStatus(GetPodFullName(pod))
if err != nil {
s.error(w, err)
return

View File

@ -59,7 +59,7 @@ func (fk *fakeKubelet) GetPodByName(namespace, name string) (*api.Pod, bool) {
return fk.podByNameFunc(namespace, name)
}
func (fk *fakeKubelet) GetPodStatus(name string, uid types.UID) (api.PodStatus, error) {
func (fk *fakeKubelet) GetPodStatus(name string) (api.PodStatus, error) {
return fk.statusFunc(name)
}

View File

@ -0,0 +1,122 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kubelet
import (
"reflect"
"sync"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/golang/glog"
)
type podStatusSyncRequest struct {
podFullName string
status api.PodStatus
}
// Updates pod statuses in apiserver. Writes only when new status has changed.
// All methods are thread-safe.
type statusManager struct {
kubeClient client.Interface
// Map from pod full name to sync status of the corresponding pod.
podStatusesLock sync.RWMutex
podStatuses map[string]api.PodStatus
podStatusChannel chan podStatusSyncRequest
}
func newStatusManager(kubeClient client.Interface) *statusManager {
return &statusManager{
kubeClient: kubeClient,
podStatuses: make(map[string]api.PodStatus),
podStatusChannel: make(chan podStatusSyncRequest, 1000), // Buffer up to 1000 statuses
}
}
func (s *statusManager) Start() {
// We can run SyncBatch() often because it will block until we have some updates to send.
go util.Forever(s.SyncBatch, 0)
}
func (s *statusManager) GetPodStatus(podFullName string) (api.PodStatus, bool) {
s.podStatusesLock.RLock()
defer s.podStatusesLock.RUnlock()
status, ok := s.podStatuses[podFullName]
return status, ok
}
func (s *statusManager) SetPodStatus(podFullName string, status api.PodStatus) {
s.podStatusesLock.Lock()
defer s.podStatusesLock.Unlock()
oldStatus, found := s.podStatuses[podFullName]
if !found || !reflect.DeepEqual(oldStatus, status) {
s.podStatuses[podFullName] = status
s.podStatusChannel <- podStatusSyncRequest{podFullName, status}
} else {
glog.V(3).Infof("Ignoring same pod status for %s - old: %s new: %s", podFullName, oldStatus, status)
}
}
func (s *statusManager) DeletePodStatus(podFullName string) {
s.podStatusesLock.Lock()
defer s.podStatusesLock.Unlock()
delete(s.podStatuses, podFullName)
}
// TODO(filipg): It'd be cleaner if we can do this without signal from user.
func (s *statusManager) RemoveOrphanedStatuses(podFullNames map[string]bool) {
s.podStatusesLock.Lock()
defer s.podStatusesLock.Unlock()
for key := range s.podStatuses {
if _, ok := podFullNames[key]; !ok {
glog.V(5).Infof("Removing %q from status map.", key)
delete(s.podStatuses, key)
}
}
}
// SyncBatch syncs pods statuses with the apiserver. It will loop until channel
// s.podStatusChannel is empty for at least 1s.
func (s *statusManager) SyncBatch() {
for {
select {
case syncRequest := <-s.podStatusChannel:
podFullName := syncRequest.podFullName
status := syncRequest.status
glog.V(3).Infof("Syncing status for %s", podFullName)
name, namespace, err := ParsePodFullName(podFullName)
if err != nil {
glog.Warningf("Cannot parse pod full name %q: %s", podFullName, err)
}
_, err = s.kubeClient.Pods(namespace).UpdateStatus(name, &status)
if err != nil {
// We failed to update status. In order to make sure we retry next time
// we delete cached value. This may result in an additional update, but
// this is ok.
s.DeletePodStatus(podFullName)
glog.Warningf("Error updating status for pod %q: %v", name, err)
} else {
glog.V(3).Infof("Status for pod %q updated successfully", name)
}
case <-time.After(1 * time.Second):
return
}
}
}

View File

@ -0,0 +1,83 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kubelet
import (
"math/rand"
"strconv"
"testing"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
)
const (
podFullName string = "podName_namespace"
)
func newTestStatusManager() *statusManager {
return newStatusManager(&client.Fake{})
}
func generateRandomMessage() string {
return strconv.Itoa(rand.Int())
}
func getRandomPodStatus() api.PodStatus {
return api.PodStatus{
Message: generateRandomMessage(),
}
}
func verifyActions(t *testing.T, kubeClient client.Interface, expectedActions []string) {
actions := kubeClient.(*client.Fake).Actions
if len(actions) != len(expectedActions) {
t.Errorf("unexpected actions, got: %s expected: %s", actions, expectedActions)
return
}
for i := 0; i < len(actions); i++ {
if actions[i].Action != expectedActions[i] {
t.Errorf("unexpected actions, got: %s expected: %s", actions, expectedActions)
}
}
}
func TestNewStatus(t *testing.T) {
syncer := newTestStatusManager()
syncer.SetPodStatus(podFullName, getRandomPodStatus())
syncer.SyncBatch()
verifyActions(t, syncer.kubeClient, []string{"update-status-pod"})
}
func TestChangedStatus(t *testing.T) {
syncer := newTestStatusManager()
syncer.SetPodStatus(podFullName, getRandomPodStatus())
syncer.SyncBatch()
syncer.SetPodStatus(podFullName, getRandomPodStatus())
syncer.SyncBatch()
verifyActions(t, syncer.kubeClient, []string{"update-status-pod", "update-status-pod"})
}
func TestUnchangedStatus(t *testing.T) {
syncer := newTestStatusManager()
podStatus := getRandomPodStatus()
syncer.SetPodStatus(podFullName, podStatus)
syncer.SyncBatch()
syncer.SetPodStatus(podFullName, podStatus)
syncer.SyncBatch()
verifyActions(t, syncer.kubeClient, []string{"update-status-pod"})
}