From a263c77b65e7e8847633e4643cb93bd854162dad Mon Sep 17 00:00:00 2001 From: "Tim St. Clair" Date: Mon, 19 Oct 2015 15:15:59 -0700 Subject: [PATCH] Refactor liveness probing This commit builds on previous work and creates an independent worker for every liveness probe. Liveness probes behave largely the same as readiness probes, so much of the code is shared by introducing a probeType paramater to distinguish the type when it matters. The circular dependency between the runtime and the prober is broken by exposing a shared liveness ResultsManager, owned by the kubelet. Finally, an Updates channel is introduced to the ResultsManager so the kubelet can react to unhealthy containers immediately. --- pkg/kubelet/dockertools/fake_manager.go | 6 +- pkg/kubelet/dockertools/manager.go | 24 +- pkg/kubelet/dockertools/manager_test.go | 34 +-- pkg/kubelet/kubelet.go | 39 +-- pkg/kubelet/kubelet_test.go | 3 +- pkg/kubelet/network/cni/cni_test.go | 4 +- pkg/kubelet/prober/fake_prober.go | 45 ---- pkg/kubelet/prober/manager.go | 114 ++++++--- pkg/kubelet/prober/manager_test.go | 95 ++++--- pkg/kubelet/prober/prober.go | 41 ++- pkg/kubelet/prober/prober_test.go | 71 +----- pkg/kubelet/prober/results/results_manager.go | 55 ++++- .../prober/results/results_manager_test.go | 57 ++++- pkg/kubelet/prober/worker.go | 63 +++-- pkg/kubelet/prober/worker_test.go | 233 ++++++++++-------- pkg/kubelet/rkt/rkt.go | 22 +- 16 files changed, 510 insertions(+), 396 deletions(-) delete mode 100644 pkg/kubelet/prober/fake_prober.go diff --git a/pkg/kubelet/dockertools/fake_manager.go b/pkg/kubelet/dockertools/fake_manager.go index 7c76805198..16a1d5fe73 100644 --- a/pkg/kubelet/dockertools/fake_manager.go +++ b/pkg/kubelet/dockertools/fake_manager.go @@ -21,7 +21,7 @@ import ( "k8s.io/kubernetes/pkg/client/record" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/network" - "k8s.io/kubernetes/pkg/kubelet/prober" + proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results" kubetypes "k8s.io/kubernetes/pkg/kubelet/types" "k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/util/oom" @@ -31,7 +31,7 @@ import ( func NewFakeDockerManager( client DockerInterface, recorder record.EventRecorder, - prober prober.Prober, + livenessManager proberesults.Manager, containerRefManager *kubecontainer.RefManager, machineInfo *cadvisorapi.MachineInfo, podInfraContainerImage string, @@ -45,7 +45,7 @@ func NewFakeDockerManager( fakeOOMAdjuster := oom.NewFakeOOMAdjuster() fakeProcFs := procfs.NewFakeProcFs() - dm := NewDockerManager(client, recorder, prober, containerRefManager, machineInfo, podInfraContainerImage, qps, + dm := NewDockerManager(client, recorder, livenessManager, containerRefManager, machineInfo, podInfraContainerImage, qps, burst, containerLogsDir, osInterface, networkPlugin, generator, httpClient, &NativeExecHandler{}, fakeOOMAdjuster, fakeProcFs, false, imageBackOff) dm.dockerPuller = &FakeDockerPuller{} diff --git a/pkg/kubelet/dockertools/manager.go b/pkg/kubelet/dockertools/manager.go index 8cfac56c5c..e8f1bfc835 100644 --- a/pkg/kubelet/dockertools/manager.go +++ b/pkg/kubelet/dockertools/manager.go @@ -44,10 +44,9 @@ import ( "k8s.io/kubernetes/pkg/kubelet/metrics" "k8s.io/kubernetes/pkg/kubelet/network" "k8s.io/kubernetes/pkg/kubelet/network/hairpin" - "k8s.io/kubernetes/pkg/kubelet/prober" + proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results" "k8s.io/kubernetes/pkg/kubelet/qos" kubetypes "k8s.io/kubernetes/pkg/kubelet/types" - "k8s.io/kubernetes/pkg/probe" 
"k8s.io/kubernetes/pkg/securitycontext" "k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/util" @@ -119,8 +118,8 @@ type DockerManager struct { // Network plugin. networkPlugin network.NetworkPlugin - // Health check prober. - prober prober.Prober + // Health check results. + livenessManager proberesults.Manager // Generator of runtime container options. generator kubecontainer.RunContainerOptionsGenerator @@ -147,7 +146,7 @@ type DockerManager struct { func NewDockerManager( client DockerInterface, recorder record.EventRecorder, - prober prober.Prober, + livenessManager proberesults.Manager, containerRefManager *kubecontainer.RefManager, machineInfo *cadvisorapi.MachineInfo, podInfraContainerImage string, @@ -208,7 +207,7 @@ func NewDockerManager( dockerRoot: dockerRoot, containerLogsDir: containerLogsDir, networkPlugin: networkPlugin, - prober: prober, + livenessManager: livenessManager, generator: generator, execHandler: execHandler, oomAdjuster: oomAdjuster, @@ -1762,20 +1761,13 @@ func (dm *DockerManager) computePodContainerChanges(pod *api.Pod, runningPod kub continue } - result, err := dm.prober.ProbeLiveness(pod, podStatus, container, c.ID, c.Created) - if err != nil { - // TODO(vmarmol): examine this logic. - glog.V(2).Infof("probe no-error: %q", container.Name) - containersToKeep[containerID] = index - continue - } - if result == probe.Success { - glog.V(4).Infof("probe success: %q", container.Name) + liveness, found := dm.livenessManager.Get(c.ID) + if !found || liveness == proberesults.Success { containersToKeep[containerID] = index continue } if pod.Spec.RestartPolicy != api.RestartPolicyNever { - glog.Infof("pod %q container %q is unhealthy (probe result: %v), it will be killed and re-created.", podFullName, container.Name, result) + glog.Infof("pod %q container %q is unhealthy, it will be killed and re-created.", podFullName, container.Name) containersToStart[index] = empty{} } } diff --git a/pkg/kubelet/dockertools/manager_test.go b/pkg/kubelet/dockertools/manager_test.go index f62c7dac5e..fada0015c4 100644 --- a/pkg/kubelet/dockertools/manager_test.go +++ b/pkg/kubelet/dockertools/manager_test.go @@ -38,7 +38,8 @@ import ( "k8s.io/kubernetes/pkg/client/record" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/network" - "k8s.io/kubernetes/pkg/kubelet/prober" + proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results" + kubetypes "k8s.io/kubernetes/pkg/kubelet/types" "k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/util" uexec "k8s.io/kubernetes/pkg/util/exec" @@ -83,7 +84,7 @@ func newTestDockerManagerWithHTTPClient(fakeHTTPClient *fakeHTTP) (*DockerManage dockerManager := NewFakeDockerManager( fakeDocker, fakeRecorder, - prober.FakeProber{}, + proberesults.NewManager(), containerRefManager, &cadvisorapi.MachineInfo{}, PodInfraContainerImage, @@ -854,6 +855,10 @@ func TestSyncPodBadHash(t *testing.T) { } func TestSyncPodsUnhealthy(t *testing.T) { + const ( + unhealthyContainerID = "1234" + infraContainerID = "9876" + ) dm, fakeDocker := newTestDockerManager() pod := &api.Pod{ ObjectMeta: api.ObjectMeta{ @@ -862,40 +867,35 @@ func TestSyncPodsUnhealthy(t *testing.T) { Namespace: "new", }, Spec: api.PodSpec{ - Containers: []api.Container{ - {Name: "bar", - LivenessProbe: &api.Probe{ - // Always returns healthy == false - }, - }, - }, + Containers: []api.Container{{Name: "unhealthy"}}, }, } fakeDocker.ContainerList = []docker.APIContainers{ { // the k8s prefix is required for the kubelet to manage the container - Names: 
[]string{"/k8s_bar_foo_new_12345678_42"}, - ID: "1234", + Names: []string{"/k8s_unhealthy_foo_new_12345678_42"}, + ID: unhealthyContainerID, }, { // pod infra container Names: []string{"/k8s_POD." + strconv.FormatUint(generatePodInfraContainerHash(pod), 16) + "_foo_new_12345678_42"}, - ID: "9876", + ID: infraContainerID, }, } fakeDocker.ContainerMap = map[string]*docker.Container{ - "1234": { - ID: "1234", + unhealthyContainerID: { + ID: unhealthyContainerID, Config: &docker.Config{}, HostConfig: &docker.HostConfig{}, }, - "9876": { - ID: "9876", + infraContainerID: { + ID: infraContainerID, Config: &docker.Config{}, HostConfig: &docker.HostConfig{}, }, } + dm.livenessManager.Set(kubetypes.DockerID(unhealthyContainerID).ContainerID(), proberesults.Failure, nil) runSyncPod(t, dm, fakeDocker, pod, nil) @@ -908,7 +908,7 @@ func TestSyncPodsUnhealthy(t *testing.T) { "create", "start", "inspect_container", }) - if err := fakeDocker.AssertStopped([]string{"1234"}); err != nil { + if err := fakeDocker.AssertStopped([]string{unhealthyContainerID}); err != nil { t.Errorf("%v", err) } } diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 1c2d63a6d9..3c2f05a858 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -54,12 +54,12 @@ import ( "k8s.io/kubernetes/pkg/kubelet/network" kubepod "k8s.io/kubernetes/pkg/kubelet/pod" "k8s.io/kubernetes/pkg/kubelet/prober" + proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results" "k8s.io/kubernetes/pkg/kubelet/rkt" "k8s.io/kubernetes/pkg/kubelet/status" kubetypes "k8s.io/kubernetes/pkg/kubelet/types" kubeletutil "k8s.io/kubernetes/pkg/kubelet/util" "k8s.io/kubernetes/pkg/labels" - "k8s.io/kubernetes/pkg/probe" "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/util" @@ -309,6 +309,10 @@ func NewMainKubelet( procFs := procfs.NewProcFs() imageBackOff := util.NewBackOff(resyncInterval, MaxContainerBackOff) + + readinessManager := proberesults.NewManager() + klet.livenessManager = proberesults.NewManagerWithUpdates() + // Initialize the runtime. switch containerRuntime { case "docker": @@ -316,7 +320,7 @@ func NewMainKubelet( klet.containerRuntime = dockertools.NewDockerManager( dockerClient, recorder, - klet, // prober + klet.livenessManager, containerRefManager, machineInfo, podInfraContainerImage, @@ -344,7 +348,7 @@ func NewMainKubelet( klet, recorder, containerRefManager, - klet, // prober + klet.livenessManager, klet.volumeManager, imageBackOff) if err != nil { @@ -396,11 +400,14 @@ func NewMainKubelet( klet.runner = klet.containerRuntime klet.podManager = kubepod.NewBasicPodManager(kubepod.NewBasicMirrorClient(klet.kubeClient)) - klet.prober = prober.New(klet.runner, containerRefManager, recorder) klet.probeManager = prober.NewManager( klet.resyncInterval, klet.statusManager, - klet.prober) + readinessManager, + klet.livenessManager, + klet.runner, + containerRefManager, + recorder) runtimeCache, err := kubecontainer.NewRuntimeCache(klet.containerRuntime) if err != nil { @@ -508,10 +515,10 @@ type Kubelet struct { // Network plugin. networkPlugin network.NetworkPlugin - // Handles container readiness probing + // Handles container probing. probeManager prober.Manager - // TODO: Move prober ownership to the probeManager once the runtime no longer depends on it. - prober prober.Prober + // Manages container health check results. 
+ livenessManager proberesults.Manager // How long to keep idle streaming command execution/port forwarding // connections open before terminating them @@ -1982,6 +1989,12 @@ func (kl *Kubelet) syncLoopIteration(updates <-chan kubetypes.PodUpdate, handler // Periodically syncs all the pods and performs cleanup tasks. glog.V(4).Infof("SyncLoop (periodic sync)") handler.HandlePodSyncs(kl.podManager.GetPods()) + case update := <-kl.livenessManager.Updates(): + // We only care about failures (signalling container death) here. + if update.Result == proberesults.Failure { + glog.V(1).Infof("SyncLoop (container unhealthy).") + handler.HandlePodSyncs([]*api.Pod{update.Pod}) + } } kl.syncLoopMonitor.Store(time.Now()) return true @@ -2831,16 +2844,6 @@ func (kl *Kubelet) GetRuntime() kubecontainer.Runtime { return kl.containerRuntime } -// Proxy prober calls through the Kubelet to break the circular dependency between the runtime & -// prober. -// TODO: Remove this hack once the runtime no longer depends on the prober. -func (kl *Kubelet) ProbeLiveness(pod *api.Pod, status api.PodStatus, container api.Container, containerID kubecontainer.ContainerID, createdAt int64) (probe.Result, error) { - return kl.prober.ProbeLiveness(pod, status, container, containerID, createdAt) -} -func (kl *Kubelet) ProbeReadiness(pod *api.Pod, status api.PodStatus, container api.Container, containerID kubecontainer.ContainerID) (probe.Result, error) { - return kl.prober.ProbeReadiness(pod, status, container, containerID) -} - var minRsrc = resource.MustParse("1k") var maxRsrc = resource.MustParse("1P") diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index 92e606fb0e..6c82c6091c 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -47,6 +47,7 @@ import ( "k8s.io/kubernetes/pkg/kubelet/network" kubepod "k8s.io/kubernetes/pkg/kubelet/pod" "k8s.io/kubernetes/pkg/kubelet/prober" + proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results" "k8s.io/kubernetes/pkg/kubelet/status" kubetypes "k8s.io/kubernetes/pkg/kubelet/types" "k8s.io/kubernetes/pkg/runtime" @@ -134,8 +135,8 @@ func newTestKubelet(t *testing.T) *TestKubelet { t: t, } - kubelet.prober = prober.FakeProber{} kubelet.probeManager = prober.FakeManager{} + kubelet.livenessManager = proberesults.NewManager() kubelet.volumeManager = newVolumeManager() kubelet.containerManager, _ = newContainerManager(fakeContainerMgrMountInt(), mockCadvisor, "", "", "") diff --git a/pkg/kubelet/network/cni/cni_test.go b/pkg/kubelet/network/cni/cni_test.go index 7ff5d363b6..38e6c4fcb4 100644 --- a/pkg/kubelet/network/cni/cni_test.go +++ b/pkg/kubelet/network/cni/cni_test.go @@ -37,7 +37,7 @@ import ( kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/dockertools" "k8s.io/kubernetes/pkg/kubelet/network" - "k8s.io/kubernetes/pkg/kubelet/prober" + proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results" "k8s.io/kubernetes/pkg/util/sets" ) @@ -152,7 +152,7 @@ func newTestDockerManager() (*dockertools.DockerManager, *dockertools.FakeDocker dockerManager := dockertools.NewFakeDockerManager( fakeDocker, fakeRecorder, - prober.FakeProber{}, + proberesults.NewManager(), containerRefManager, &cadvisorapi.MachineInfo{}, dockertools.PodInfraContainerImage, diff --git a/pkg/kubelet/prober/fake_prober.go b/pkg/kubelet/prober/fake_prober.go deleted file mode 100644 index fd18dbd05d..0000000000 --- a/pkg/kubelet/prober/fake_prober.go +++ /dev/null @@ -1,45 +0,0 @@ -/* -Copyright 2015 The Kubernetes Authors All rights 
reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package prober - -import ( - "k8s.io/kubernetes/pkg/api" - kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" - "k8s.io/kubernetes/pkg/probe" -) - -var _ Prober = FakeProber{} - -type FakeProber struct { - Readiness probe.Result - Liveness probe.Result - Error error -} - -func (f FakeProber) ProbeLiveness(_ *api.Pod, _ api.PodStatus, c api.Container, _ kubecontainer.ContainerID, _ int64) (probe.Result, error) { - if c.LivenessProbe == nil { - return probe.Success, nil - } - return f.Liveness, f.Error -} - -func (f FakeProber) ProbeReadiness(_ *api.Pod, _ api.PodStatus, c api.Container, _ kubecontainer.ContainerID) (probe.Result, error) { - if c.ReadinessProbe == nil { - return probe.Success, nil - } - return f.Readiness, f.Error -} diff --git a/pkg/kubelet/prober/manager.go b/pkg/kubelet/prober/manager.go index a39089a3f5..13045107c6 100644 --- a/pkg/kubelet/prober/manager.go +++ b/pkg/kubelet/prober/manager.go @@ -22,9 +22,11 @@ import ( "github.com/golang/glog" "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/client/record" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/prober/results" "k8s.io/kubernetes/pkg/kubelet/status" + kubeutil "k8s.io/kubernetes/pkg/kubelet/util" "k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/util/sets" ) @@ -53,19 +55,22 @@ type Manager interface { } type manager struct { - // Caches the results of readiness probes. - readinessCache results.Manager - - // Map of active workers for readiness - readinessProbes map[containerPath]*worker - // Lock for accessing & mutating readinessProbes + // Map of active workers for probes + workers map[probeKey]*worker + // Lock for accessing & mutating workers workerLock sync.RWMutex // The statusManager cache provides pod IP and container IDs for probing. statusManager status.Manager + // readinessManager manages the results of readiness probes + readinessManager results.Manager + + // livenessManager manages the results of liveness probes + livenessManager results.Manager + // prober executes the probe actions. - prober Prober + prober *prober // Default period for workers to execute a probe. 
defaultProbePeriod time.Duration @@ -74,36 +79,79 @@ type manager struct { func NewManager( defaultProbePeriod time.Duration, statusManager status.Manager, - prober Prober) Manager { + readinessManager results.Manager, + livenessManager results.Manager, + runner kubecontainer.ContainerCommandRunner, + refManager *kubecontainer.RefManager, + recorder record.EventRecorder) Manager { + prober := newProber(runner, refManager, recorder) return &manager{ defaultProbePeriod: defaultProbePeriod, statusManager: statusManager, prober: prober, - readinessCache: results.NewManager(), - readinessProbes: make(map[containerPath]*worker), + readinessManager: readinessManager, + livenessManager: livenessManager, + workers: make(map[probeKey]*worker), } } -// Key uniquely identifying containers -type containerPath struct { +// Key uniquely identifying container probes +type probeKey struct { podUID types.UID containerName string + probeType probeType +} + +// Type of probe (readiness or liveness) +type probeType int + +const ( + liveness probeType = iota + readiness +) + +// For debugging. +func (t probeType) String() string { + switch t { + case readiness: + return "Readiness" + case liveness: + return "Liveness" + default: + return "UNKNOWN" + } } func (m *manager) AddPod(pod *api.Pod) { m.workerLock.Lock() defer m.workerLock.Unlock() - key := containerPath{podUID: pod.UID} + key := probeKey{podUID: pod.UID} for _, c := range pod.Spec.Containers { key.containerName = c.Name - if _, ok := m.readinessProbes[key]; ok { - glog.Errorf("Readiness probe already exists! %v - %v", - kubecontainer.GetPodFullName(pod), c.Name) - return - } + if c.ReadinessProbe != nil { - m.readinessProbes[key] = m.newWorker(pod, c) + key.probeType = readiness + if _, ok := m.workers[key]; ok { + glog.Errorf("Readiness probe already exists! %v - %v", + kubeutil.FormatPodName(pod), c.Name) + return + } + w := newWorker(m, readiness, pod, c) + m.workers[key] = w + go w.run() + } + + if c.LivenessProbe != nil { + key.probeType = liveness + if _, ok := m.workers[key]; ok { + glog.Errorf("Liveness probe already exists! %v - %v", + kubeutil.FormatPodName(pod), c.Name) + return + } + w := newWorker(m, liveness, pod, c) + m.workers[key] = w + go w.run() } } } @@ -112,11 +160,14 @@ func (m *manager) RemovePod(pod *api.Pod) { m.workerLock.RLock() defer m.workerLock.RUnlock() - key := containerPath{podUID: pod.UID} + key := probeKey{podUID: pod.UID} for _, c := range pod.Spec.Containers { key.containerName = c.Name - if worker, ok := m.readinessProbes[key]; ok { - close(worker.stop) + for _, probeType := range [...]probeType{readiness, liveness} { + key.probeType = probeType + if worker, ok := m.workers[key]; ok { + close(worker.stop) + } } } } @@ -130,8 +181,8 @@ func (m *manager) CleanupPods(activePods []*api.Pod) { m.workerLock.RLock() defer m.workerLock.RUnlock() - for path, worker := range m.readinessProbes { - if _, ok := desiredPods[path.podUID]; !ok { + for key, worker := range m.workers { + if _, ok := desiredPods[key.podUID]; !ok { close(worker.stop) } } @@ -142,28 +193,27 @@ func (m *manager) UpdatePodStatus(podUID types.UID, podStatus *api.PodStatus) { var ready bool if c.State.Running == nil { ready = false - } else if result, ok := m.readinessCache.Get( - kubecontainer.ParseContainerID(c.ContainerID)); ok { + } else if result, ok := m.readinessManager.Get(kubecontainer.ParseContainerID(c.ContainerID)); ok { ready = result == results.Success } else { // The check whether there is a probe which hasn't run yet. 
- _, exists := m.getReadinessProbe(podUID, c.Name) + _, exists := m.getWorker(podUID, c.Name, readiness) ready = !exists } podStatus.ContainerStatuses[i].Ready = ready } } -func (m *manager) getReadinessProbe(podUID types.UID, containerName string) (*worker, bool) { +func (m *manager) getWorker(podUID types.UID, containerName string, probeType probeType) (*worker, bool) { m.workerLock.RLock() defer m.workerLock.RUnlock() - probe, ok := m.readinessProbes[containerPath{podUID, containerName}] - return probe, ok + worker, ok := m.workers[probeKey{podUID, containerName, probeType}] + return worker, ok } // Called by the worker after exiting. -func (m *manager) removeReadinessProbe(podUID types.UID, containerName string) { +func (m *manager) removeWorker(podUID types.UID, containerName string, probeType probeType) { m.workerLock.Lock() defer m.workerLock.Unlock() - delete(m.readinessProbes, containerPath{podUID, containerName}) + delete(m.workers, probeKey{podUID, containerName, probeType}) } diff --git a/pkg/kubelet/prober/manager_test.go b/pkg/kubelet/prober/manager_test.go index 9caf1cc37d..d03151598b 100644 --- a/pkg/kubelet/prober/manager_test.go +++ b/pkg/kubelet/prober/manager_test.go @@ -23,11 +23,13 @@ import ( "github.com/golang/glog" "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/client/record" "k8s.io/kubernetes/pkg/client/unversioned/testclient" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/prober/results" "k8s.io/kubernetes/pkg/kubelet/status" "k8s.io/kubernetes/pkg/probe" + "k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/util/wait" ) @@ -53,13 +55,13 @@ func TestAddRemovePods(t *testing.T) { Containers: []api.Container{{ Name: "no_probe1", }, { - Name: "prober1", + Name: "readiness", ReadinessProbe: &api.Probe{}, }, { Name: "no_probe2", }, { - Name: "prober2", - ReadinessProbe: &api.Probe{}, + Name: "liveness", + LivenessProbe: &api.Probe{}, }}, }, } @@ -77,7 +79,10 @@ func TestAddRemovePods(t *testing.T) { // Adding a pod with probes. m.AddPod(&probePod) - probePaths := []containerPath{{"probe_pod", "prober1"}, {"probe_pod", "prober2"}} + probePaths := []probeKey{ + {"probe_pod", "readiness", readiness}, + {"probe_pod", "liveness", liveness}, + } if err := expectProbes(m, probePaths); err != nil { t.Error(err) } @@ -115,8 +120,8 @@ func TestCleanupPods(t *testing.T) { Name: "prober1", ReadinessProbe: &api.Probe{}, }, { - Name: "prober2", - ReadinessProbe: &api.Probe{}, + Name: "prober2", + LivenessProbe: &api.Probe{}, }}, }, } @@ -129,8 +134,8 @@ func TestCleanupPods(t *testing.T) { Name: "prober1", ReadinessProbe: &api.Probe{}, }, { - Name: "prober2", - ReadinessProbe: &api.Probe{}, + Name: "prober2", + LivenessProbe: &api.Probe{}, }}, }, } @@ -139,8 +144,14 @@ func TestCleanupPods(t *testing.T) { m.CleanupPods([]*api.Pod{&podToKeep}) - removedProbes := []containerPath{{"pod_cleanup", "prober1"}, {"pod_cleanup", "prober2"}} - expectedProbes := []containerPath{{"pod_keep", "prober1"}, {"pod_keep", "prober2"}} + removedProbes := []probeKey{ + {"pod_cleanup", "prober1", readiness}, + {"pod_cleanup", "prober2", liveness}, + } + expectedProbes := []probeKey{ + {"pod_keep", "prober1", readiness}, + {"pod_keep", "prober2", liveness}, + } if err := waitForWorkerExit(m, removedProbes); err != nil { t.Fatal(err) } @@ -195,28 +206,28 @@ func TestUpdatePodStatus(t *testing.T) { m := newTestManager() // Setup probe "workers" and cached results. 
- m.readinessProbes = map[containerPath]*worker{ - containerPath{podUID, probedReady.Name}: {}, - containerPath{podUID, probedPending.Name}: {}, - containerPath{podUID, probedUnready.Name}: {}, - containerPath{podUID, terminated.Name}: {}, + m.workers = map[probeKey]*worker{ + probeKey{podUID, unprobed.Name, liveness}: {}, + probeKey{podUID, probedReady.Name, readiness}: {}, + probeKey{podUID, probedPending.Name, readiness}: {}, + probeKey{podUID, probedUnready.Name, readiness}: {}, + probeKey{podUID, terminated.Name, readiness}: {}, } - - m.readinessCache.Set(kubecontainer.ParseContainerID(probedReady.ContainerID), results.Success) - m.readinessCache.Set(kubecontainer.ParseContainerID(probedUnready.ContainerID), results.Failure) - m.readinessCache.Set(kubecontainer.ParseContainerID(terminated.ContainerID), results.Success) + m.readinessManager.Set(kubecontainer.ParseContainerID(probedReady.ContainerID), results.Success, nil) + m.readinessManager.Set(kubecontainer.ParseContainerID(probedUnready.ContainerID), results.Failure, nil) + m.readinessManager.Set(kubecontainer.ParseContainerID(terminated.ContainerID), results.Success, nil) m.UpdatePodStatus(podUID, &podStatus) - expectedReadiness := map[containerPath]bool{ - containerPath{podUID, unprobed.Name}: true, - containerPath{podUID, probedReady.Name}: true, - containerPath{podUID, probedPending.Name}: false, - containerPath{podUID, probedUnready.Name}: false, - containerPath{podUID, terminated.Name}: false, + expectedReadiness := map[probeKey]bool{ + probeKey{podUID, unprobed.Name, readiness}: true, + probeKey{podUID, probedReady.Name, readiness}: true, + probeKey{podUID, probedPending.Name, readiness}: false, + probeKey{podUID, probedUnready.Name, readiness}: false, + probeKey{podUID, terminated.Name, readiness}: false, } for _, c := range podStatus.ContainerStatuses { - expected, ok := expectedReadiness[containerPath{podUID, c.Name}] + expected, ok := expectedReadiness[probeKey{podUID, c.Name, readiness}] if !ok { t.Fatalf("Missing expectation for test case: %v", c.Name) } @@ -227,16 +238,16 @@ func TestUpdatePodStatus(t *testing.T) { } } -func expectProbes(m *manager, expectedReadinessProbes []containerPath) error { +func expectProbes(m *manager, expectedProbes []probeKey) error { m.workerLock.RLock() defer m.workerLock.RUnlock() - var unexpected []containerPath - missing := make([]containerPath, len(expectedReadinessProbes)) - copy(missing, expectedReadinessProbes) + var unexpected []probeKey + missing := make([]probeKey, len(expectedProbes)) + copy(missing, expectedProbes) outer: - for probePath := range m.readinessProbes { + for probePath := range m.workers { for i, expectedPath := range missing { if probePath == expectedPath { missing = append(missing[:i], missing[i+1:]...) @@ -255,26 +266,34 @@ outer: func newTestManager() *manager { const probePeriod = 1 - statusManager := status.NewManager(&testclient.Fake{}) - prober := FakeProber{Readiness: probe.Success} - return NewManager(probePeriod, statusManager, prober).(*manager) + m := NewManager( + probePeriod, + status.NewManager(&testclient.Fake{}), + results.NewManager(), + results.NewManager(), + nil, // runner + kubecontainer.NewRefManager(), + &record.FakeRecorder{}, + ).(*manager) + // Don't actually execute probes. + m.prober.exec = fakeExecProber{probe.Success, nil} + return m } // Wait for the given workers to exit & clean up. 
-func waitForWorkerExit(m *manager, workerPaths []containerPath) error { +func waitForWorkerExit(m *manager, workerPaths []probeKey) error { const interval = 100 * time.Millisecond - const timeout = 30 * time.Second for _, w := range workerPaths { condition := func() (bool, error) { - _, exists := m.getReadinessProbe(w.podUID, w.containerName) + _, exists := m.getWorker(w.podUID, w.containerName, w.probeType) return !exists, nil } if exited, _ := condition(); exited { continue // Already exited, no need to poll. } glog.Infof("Polling %v", w) - if err := wait.Poll(interval, timeout, condition); err != nil { + if err := wait.Poll(interval, util.ForeverTestTimeout, condition); err != nil { return err } } diff --git a/pkg/kubelet/prober/prober.go b/pkg/kubelet/prober/prober.go index befdcc18b0..b57e120387 100644 --- a/pkg/kubelet/prober/prober.go +++ b/pkg/kubelet/prober/prober.go @@ -39,12 +39,6 @@ import ( const maxProbeRetries = 3 -// Prober checks the healthiness of a container. -type Prober interface { - ProbeLiveness(pod *api.Pod, status api.PodStatus, container api.Container, containerID kubecontainer.ContainerID, createdAt int64) (probe.Result, error) - ProbeReadiness(pod *api.Pod, status api.PodStatus, container api.Container, containerID kubecontainer.ContainerID) (probe.Result, error) -} - // Prober helps to check the liveness/readiness of a container. type prober struct { exec execprobe.ExecProber @@ -58,10 +52,10 @@ type prober struct { // NewProber creates a Prober, it takes a command runner and // several container info managers. -func New( +func newProber( runner kubecontainer.ContainerCommandRunner, refManager *kubecontainer.RefManager, - recorder record.EventRecorder) Prober { + recorder record.EventRecorder) *prober { return &prober{ exec: execprobe.New(), @@ -73,9 +67,19 @@ func New( } } -// ProbeLiveness probes the liveness of a container. -// If the initalDelay since container creation on liveness probe has not passed the probe will return probe.Success. -func (pb *prober) ProbeLiveness(pod *api.Pod, status api.PodStatus, container api.Container, containerID kubecontainer.ContainerID, createdAt int64) (probe.Result, error) { +// probe probes the container. +func (pb *prober) probe(probeType probeType, pod *api.Pod, status api.PodStatus, container api.Container, containerID kubecontainer.ContainerID) (probe.Result, error) { + switch probeType { + case readiness: + return pb.probeReadiness(pod, status, container, containerID) + case liveness: + return pb.probeLiveness(pod, status, container, containerID) + } + return probe.Unknown, fmt.Errorf("Unknown probe type: %q", probeType) +} + +// probeLiveness probes the liveness of a container. +func (pb *prober) probeLiveness(pod *api.Pod, status api.PodStatus, container api.Container, containerID kubecontainer.ContainerID) (probe.Result, error) { var live probe.Result var output string var err error @@ -83,11 +87,7 @@ func (pb *prober) ProbeLiveness(pod *api.Pod, status api.PodStatus, container ap if p == nil { return probe.Success, nil } - if time.Now().Unix()-createdAt < p.InitialDelaySeconds { - return probe.Success, nil - } else { - live, output, err = pb.runProbeWithRetries(p, pod, status, container, containerID, maxProbeRetries) - } + live, output, err = pb.runProbeWithRetries(p, pod, status, container, containerID, maxProbeRetries) ctrName := fmt.Sprintf("%s:%s", kubecontainer.GetPodFullName(pod), container.Name) if err != nil || live != probe.Success { // Liveness failed in one way or another. 
@@ -113,17 +113,16 @@ func (pb *prober) ProbeLiveness(pod *api.Pod, status api.PodStatus, container ap return probe.Success, nil } -// ProbeReadiness probes and sets the readiness of a container. -func (pb *prober) ProbeReadiness(pod *api.Pod, status api.PodStatus, container api.Container, containerID kubecontainer.ContainerID) (probe.Result, error) { +// probeReadiness probes and sets the readiness of a container. +func (pb *prober) probeReadiness(pod *api.Pod, status api.PodStatus, container api.Container, containerID kubecontainer.ContainerID) (probe.Result, error) { var ready probe.Result var output string var err error p := container.ReadinessProbe if p == nil { - ready = probe.Success - } else { - ready, output, err = pb.runProbeWithRetries(p, pod, status, container, containerID, maxProbeRetries) + return probe.Success, nil } + ready, output, err = pb.runProbeWithRetries(p, pod, status, container, containerID, maxProbeRetries) ctrName := fmt.Sprintf("%s:%s", kubecontainer.GetPodFullName(pod), container.Name) if err != nil || ready == probe.Failure { // Readiness failed in one way or another. diff --git a/pkg/kubelet/prober/prober_test.go b/pkg/kubelet/prober/prober_test.go index ebf62c59eb..954b9ddf97 100644 --- a/pkg/kubelet/prober/prober_test.go +++ b/pkg/kubelet/prober/prober_test.go @@ -19,7 +19,6 @@ package prober import ( "errors" "testing" - "time" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/client/record" @@ -184,7 +183,6 @@ func TestProbeContainer(t *testing.T) { recorder: &record.FakeRecorder{}, } containerID := kubecontainer.ContainerID{"test", "foobar"} - createdAt := time.Now().Unix() tests := []struct { testContainer api.Container @@ -201,14 +199,7 @@ func TestProbeContainer(t *testing.T) { // Only LivenessProbe. expectedReadiness should always be true here. { testContainer: api.Container{ - LivenessProbe: &api.Probe{InitialDelaySeconds: 100}, - }, - expectedLiveness: probe.Success, - expectedReadiness: probe.Success, - }, - { - testContainer: api.Container{ - LivenessProbe: &api.Probe{InitialDelaySeconds: -100}, + LivenessProbe: &api.Probe{}, }, expectedLiveness: probe.Unknown, expectedReadiness: probe.Success, @@ -216,7 +207,6 @@ func TestProbeContainer(t *testing.T) { { testContainer: api.Container{ LivenessProbe: &api.Probe{ - InitialDelaySeconds: -100, Handler: api.Handler{ Exec: &api.ExecAction{}, }, @@ -228,7 +218,6 @@ func TestProbeContainer(t *testing.T) { { testContainer: api.Container{ LivenessProbe: &api.Probe{ - InitialDelaySeconds: -100, Handler: api.Handler{ Exec: &api.ExecAction{}, }, @@ -240,7 +229,6 @@ func TestProbeContainer(t *testing.T) { { testContainer: api.Container{ LivenessProbe: &api.Probe{ - InitialDelaySeconds: -100, Handler: api.Handler{ Exec: &api.ExecAction{}, }, @@ -252,7 +240,6 @@ func TestProbeContainer(t *testing.T) { { testContainer: api.Container{ LivenessProbe: &api.Probe{ - InitialDelaySeconds: -100, Handler: api.Handler{ Exec: &api.ExecAction{}, }, @@ -265,7 +252,7 @@ func TestProbeContainer(t *testing.T) { // // Only ReadinessProbe. expectedLiveness should always be probe.Success here. 
{ testContainer: api.Container{ - ReadinessProbe: &api.Probe{InitialDelaySeconds: 100}, + ReadinessProbe: &api.Probe{}, }, expectedLiveness: probe.Success, expectedReadiness: probe.Unknown, @@ -273,7 +260,6 @@ func TestProbeContainer(t *testing.T) { { testContainer: api.Container{ ReadinessProbe: &api.Probe{ - InitialDelaySeconds: -100, Handler: api.Handler{ Exec: &api.ExecAction{}, }, @@ -285,7 +271,6 @@ func TestProbeContainer(t *testing.T) { { testContainer: api.Container{ ReadinessProbe: &api.Probe{ - InitialDelaySeconds: -100, Handler: api.Handler{ Exec: &api.ExecAction{}, }, @@ -297,7 +282,6 @@ func TestProbeContainer(t *testing.T) { { testContainer: api.Container{ ReadinessProbe: &api.Probe{ - InitialDelaySeconds: -100, Handler: api.Handler{ Exec: &api.ExecAction{}, }, @@ -309,7 +293,6 @@ func TestProbeContainer(t *testing.T) { { testContainer: api.Container{ ReadinessProbe: &api.Probe{ - InitialDelaySeconds: -100, Handler: api.Handler{ Exec: &api.ExecAction{}, }, @@ -322,32 +305,8 @@ func TestProbeContainer(t *testing.T) { // Both LivenessProbe and ReadinessProbe. { testContainer: api.Container{ - LivenessProbe: &api.Probe{InitialDelaySeconds: 100}, - ReadinessProbe: &api.Probe{InitialDelaySeconds: 100}, - }, - expectedLiveness: probe.Success, - expectedReadiness: probe.Unknown, - }, - { - testContainer: api.Container{ - LivenessProbe: &api.Probe{InitialDelaySeconds: 100}, - ReadinessProbe: &api.Probe{InitialDelaySeconds: -100}, - }, - expectedLiveness: probe.Success, - expectedReadiness: probe.Unknown, - }, - { - testContainer: api.Container{ - LivenessProbe: &api.Probe{InitialDelaySeconds: -100}, - ReadinessProbe: &api.Probe{InitialDelaySeconds: 100}, - }, - expectedLiveness: probe.Unknown, - expectedReadiness: probe.Unknown, - }, - { - testContainer: api.Container{ - LivenessProbe: &api.Probe{InitialDelaySeconds: -100}, - ReadinessProbe: &api.Probe{InitialDelaySeconds: -100}, + LivenessProbe: &api.Probe{}, + ReadinessProbe: &api.Probe{}, }, expectedLiveness: probe.Unknown, expectedReadiness: probe.Unknown, @@ -355,25 +314,11 @@ func TestProbeContainer(t *testing.T) { { testContainer: api.Container{ LivenessProbe: &api.Probe{ - InitialDelaySeconds: -100, Handler: api.Handler{ Exec: &api.ExecAction{}, }, }, - ReadinessProbe: &api.Probe{InitialDelaySeconds: -100}, - }, - expectedLiveness: probe.Unknown, - expectedReadiness: probe.Unknown, - }, - { - testContainer: api.Container{ - LivenessProbe: &api.Probe{ - InitialDelaySeconds: -100, - Handler: api.Handler{ - Exec: &api.ExecAction{}, - }, - }, - ReadinessProbe: &api.Probe{InitialDelaySeconds: -100}, + ReadinessProbe: &api.Probe{}, }, expectedLiveness: probe.Failure, expectedReadiness: probe.Unknown, @@ -381,13 +326,11 @@ func TestProbeContainer(t *testing.T) { { testContainer: api.Container{ LivenessProbe: &api.Probe{ - InitialDelaySeconds: -100, Handler: api.Handler{ Exec: &api.ExecAction{}, }, }, ReadinessProbe: &api.Probe{ - InitialDelaySeconds: -100, Handler: api.Handler{ Exec: &api.ExecAction{}, }, @@ -405,7 +348,7 @@ func TestProbeContainer(t *testing.T) { prober.exec = fakeExecProber{test.expectedLiveness, nil} } - liveness, err := prober.ProbeLiveness(&api.Pod{}, api.PodStatus{}, test.testContainer, containerID, createdAt) + liveness, err := prober.probeLiveness(&api.Pod{}, api.PodStatus{}, test.testContainer, containerID) if test.expectError && err == nil { t.Errorf("[%d] Expected liveness probe error but no error was returned.", i) } @@ -418,7 +361,7 @@ func TestProbeContainer(t *testing.T) { // TODO: Test readiness 
errors prober.exec = fakeExecProber{test.expectedReadiness, nil} - readiness, err := prober.ProbeReadiness(&api.Pod{}, api.PodStatus{}, test.testContainer, containerID) + readiness, err := prober.probeReadiness(&api.Pod{}, api.PodStatus{}, test.testContainer, containerID) if err != nil { t.Errorf("[%d] Unexpected readiness probe error: %v", i, err) } diff --git a/pkg/kubelet/prober/results/results_manager.go b/pkg/kubelet/prober/results/results_manager.go index 208d3d5ffd..eb55f71e85 100644 --- a/pkg/kubelet/prober/results/results_manager.go +++ b/pkg/kubelet/prober/results/results_manager.go @@ -19,17 +19,23 @@ package results import ( "sync" + "k8s.io/kubernetes/pkg/api" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" ) -// Manager provides a probe results cache. +// Manager provides a probe results cache and channel of updates. type Manager interface { // Get returns the cached result for the container with the given ID. - Get(id kubecontainer.ContainerID) (Result, bool) + Get(kubecontainer.ContainerID) (Result, bool) // Set sets the cached result for the container with the given ID. - Set(id kubecontainer.ContainerID, result Result) + // The pod is only included to be sent with the update. + Set(kubecontainer.ContainerID, Result, *api.Pod) // Remove clears the cached result for the container with the given ID. - Remove(id kubecontainer.ContainerID) + Remove(kubecontainer.ContainerID) + // Updates creates a channel that receives an Update whenever its result changes (but not + // removed). + // NOTE: The current implementation only supports a single updates channel. + Updates() <-chan Update } // Result is the type for probe results. @@ -51,19 +57,36 @@ func (r Result) String() string { } } +// Update is an enum of the types of updates sent over the Updates channel. +type Update struct { + ContainerID kubecontainer.ContainerID + Result Result + Pod *api.Pod +} + // Manager implementation. type manager struct { // guards the cache sync.RWMutex // map of container ID -> probe Result cache map[kubecontainer.ContainerID]Result + // channel of updates (may be nil) + updates chan Update } var _ Manager = &manager{} // NewManager creates ane returns an empty results manager. func NewManager() Manager { - return &manager{cache: make(map[kubecontainer.ContainerID]Result)} + m := &manager{cache: make(map[kubecontainer.ContainerID]Result)} + return m +} + +// NewManager creates ane returns an empty results manager. +func NewManagerWithUpdates() Manager { + m := NewManager().(*manager) + m.updates = make(chan Update, 20) + return m } func (m *manager) Get(id kubecontainer.ContainerID) (Result, bool) { @@ -73,13 +96,22 @@ func (m *manager) Get(id kubecontainer.ContainerID) (Result, bool) { return result, found } -func (m *manager) Set(id kubecontainer.ContainerID, result Result) { +func (m *manager) Set(id kubecontainer.ContainerID, result Result, pod *api.Pod) { + if m.setInternal(id, result) { + m.pushUpdate(Update{id, result, pod}) + } +} + +// Internal helper for locked portion of set. Returns whether an update should be sent. 
+func (m *manager) setInternal(id kubecontainer.ContainerID, result Result) bool { m.Lock() defer m.Unlock() prev, exists := m.cache[id] if !exists || prev != result { m.cache[id] = result + return true } + return false } func (m *manager) Remove(id kubecontainer.ContainerID) { @@ -87,3 +119,14 @@ func (m *manager) Remove(id kubecontainer.ContainerID) { defer m.Unlock() delete(m.cache, id) } + +func (m *manager) Updates() <-chan Update { + return m.updates +} + +// pushUpdates sends an update on the updates channel if it is initialized. +func (m *manager) pushUpdate(update Update) { + if m.updates != nil { + m.updates <- update + } +} diff --git a/pkg/kubelet/prober/results/results_manager_test.go b/pkg/kubelet/prober/results/results_manager_test.go index d815c1a84b..24b131958b 100644 --- a/pkg/kubelet/prober/results/results_manager_test.go +++ b/pkg/kubelet/prober/results/results_manager_test.go @@ -18,9 +18,12 @@ package results import ( "testing" + "time" "github.com/stretchr/testify/assert" + "k8s.io/kubernetes/pkg/api" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" + "k8s.io/kubernetes/pkg/util" ) func TestCacheOperations(t *testing.T) { @@ -32,7 +35,7 @@ func TestCacheOperations(t *testing.T) { _, found := m.Get(unsetID) assert.False(t, found, "unset result found") - m.Set(setID, Success) + m.Set(setID, Success, nil) result, found := m.Get(setID) assert.True(t, result == Success, "set result") assert.True(t, found, "set result found") @@ -41,3 +44,55 @@ func TestCacheOperations(t *testing.T) { _, found = m.Get(setID) assert.False(t, found, "removed result found") } + +func TestUpdates(t *testing.T) { + m := NewManagerWithUpdates() + + pod := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "test-pod"}} + fooID := kubecontainer.ContainerID{"test", "foo"} + barID := kubecontainer.ContainerID{"test", "bar"} + + expectUpdate := func(expected Update, msg string) { + select { + case u := <-m.Updates(): + if expected != u { + t.Errorf("Expected update %v, recieved %v: %s %s", expected, u, msg) + } + case <-time.After(util.ForeverTestTimeout): + t.Errorf("Timed out waiting for update %v: %s", expected, msg) + } + } + + expectNoUpdate := func(msg string) { + // NOTE: Since updates are accumulated asynchronously, this method is not guaranteed to fail + // when it should. In the event it misses a failure, the following calls to expectUpdate should + // still fail. + select { + case u := <-m.Updates(): + t.Errorf("Unexpected update %v: %s", u, msg) + default: + // Pass + } + } + + // New result should always push an update. + m.Set(fooID, Success, pod) + expectUpdate(Update{fooID, Success, pod}, "new success") + + m.Set(barID, Failure, pod) + expectUpdate(Update{barID, Failure, pod}, "new failure") + + // Unchanged results should not send an update. + m.Set(fooID, Success, pod) + expectNoUpdate("unchanged foo") + + m.Set(barID, Failure, pod) + expectNoUpdate("unchanged bar") + + // Changed results should send an update. + m.Set(fooID, Failure, pod) + expectUpdate(Update{fooID, Failure, pod}, "changed foo") + + m.Set(barID, Success, pod) + expectUpdate(Update{barID, Success, pod}, "changed bar") +} diff --git a/pkg/kubelet/prober/worker.go b/pkg/kubelet/prober/worker.go index 480218efe1..996b85d004 100644 --- a/pkg/kubelet/prober/worker.go +++ b/pkg/kubelet/prober/worker.go @@ -32,7 +32,6 @@ import ( // associated with it which runs the probe loop until the container permanently terminates, or the // stop channel is closed. 
The worker uses the probe Manager's statusManager to get up-to-date // container IDs. -// TODO: Handle liveness probing type worker struct { // Channel for stopping the probe, it should be closed to trigger a stop. stop chan struct{} @@ -46,44 +45,65 @@ type worker struct { // Describes the probe configuration (read-only) spec *api.Probe + // The type of the worker. + probeType probeType + + // The probe value during the initial delay. + initialValue results.Result + + // Where to store this workers results. + resultsManager results.Manager + probeManager *manager + // The last known container ID for this worker. containerID kubecontainer.ContainerID } // Creates and starts a new probe worker. -func (m *manager) newWorker( +func newWorker( + m *manager, + probeType probeType, pod *api.Pod, container api.Container) *worker { w := &worker{ - stop: make(chan struct{}), - pod: pod, - container: container, - spec: container.ReadinessProbe, + stop: make(chan struct{}), + pod: pod, + container: container, + probeType: probeType, + probeManager: m, } - // Start the worker thread. - go run(m, w) + switch probeType { + case readiness: + w.spec = container.ReadinessProbe + w.resultsManager = m.readinessManager + w.initialValue = results.Failure + case liveness: + w.spec = container.LivenessProbe + w.resultsManager = m.livenessManager + w.initialValue = results.Success + } return w } // run periodically probes the container. -func run(m *manager, w *worker) { - probeTicker := time.NewTicker(m.defaultProbePeriod) +func (w *worker) run() { + probeTicker := time.NewTicker(w.probeManager.defaultProbePeriod) defer func() { // Clean up. probeTicker.Stop() if !w.containerID.IsEmpty() { - m.readinessCache.Remove(w.containerID) + w.resultsManager.Remove(w.containerID) } - m.removeReadinessProbe(w.pod.UID, w.container.Name) + w.probeManager.removeWorker(w.pod.UID, w.container.Name, w.probeType) }() probeLoop: - for doProbe(m, w) { + for w.doProbe() { // Wait for next probe tick. select { case <-w.stop: @@ -96,10 +116,10 @@ probeLoop: // doProbe probes the container once and records the result. // Returns whether the worker should continue. -func doProbe(m *manager, w *worker) (keepGoing bool) { +func (w *worker) doProbe() (keepGoing bool) { defer util.HandleCrash(func(_ interface{}) { keepGoing = true }) - status, ok := m.statusManager.GetPodStatus(w.pod.UID) + status, ok := w.probeManager.statusManager.GetPodStatus(w.pod.UID) if !ok { // Either the pod has not been created yet, or it was already deleted. glog.V(3).Infof("No status for pod: %v", kubeletutil.FormatPodName(w.pod)) @@ -123,7 +143,7 @@ func doProbe(m *manager, w *worker) (keepGoing bool) { if w.containerID.String() != c.ContainerID { if !w.containerID.IsEmpty() { - m.readinessCache.Remove(w.containerID) + w.resultsManager.Remove(w.containerID) } w.containerID = kubecontainer.ParseContainerID(c.ContainerID) } @@ -131,22 +151,23 @@ func doProbe(m *manager, w *worker) (keepGoing bool) { if c.State.Running == nil { glog.V(3).Infof("Non-running container probed: %v - %v", kubeletutil.FormatPodName(w.pod), w.container.Name) - m.readinessCache.Set(w.containerID, results.Failure) + if !w.containerID.IsEmpty() { + w.resultsManager.Set(w.containerID, results.Failure, w.pod) + } // Abort if the container will not be restarted. 
return c.State.Terminated == nil || w.pod.Spec.RestartPolicy != api.RestartPolicyNever } if int64(time.Since(c.State.Running.StartedAt.Time).Seconds()) < w.spec.InitialDelaySeconds { - // Readiness defaults to false during the initial delay. - m.readinessCache.Set(w.containerID, results.Failure) + w.resultsManager.Set(w.containerID, w.initialValue, w.pod) return true } // TODO: Move error handling out of prober. - result, _ := m.prober.ProbeReadiness(w.pod, status, w.container, w.containerID) + result, _ := w.probeManager.prober.probe(w.probeType, w.pod, status, w.container, w.containerID) if result != probe.Unknown { - m.readinessCache.Set(w.containerID, result != probe.Failure) + w.resultsManager.Set(w.containerID, result != probe.Failure, w.pod) } return true diff --git a/pkg/kubelet/prober/worker_test.go b/pkg/kubelet/prober/worker_test.go index 3a7a6262f7..7006dc4c42 100644 --- a/pkg/kubelet/prober/worker_test.go +++ b/pkg/kubelet/prober/worker_test.go @@ -17,14 +17,19 @@ limitations under the License. package prober import ( + "fmt" "testing" "time" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/unversioned" + "k8s.io/kubernetes/pkg/client/record" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/prober/results" "k8s.io/kubernetes/pkg/probe" + "k8s.io/kubernetes/pkg/util" + "k8s.io/kubernetes/pkg/util/exec" + "k8s.io/kubernetes/pkg/util/wait" ) const ( @@ -52,12 +57,11 @@ func TestDoProbe(t *testing.T) { failedStatus.Phase = api.PodFailed tests := []struct { - probe api.Probe - podStatus *api.PodStatus - - expectContinue bool - expectReadySet bool - expectedReadiness results.Result + probe api.Probe + podStatus *api.PodStatus + expectContinue bool + expectSet bool + expectedResult results.Result }{ { // No status. expectContinue: true, @@ -72,136 +76,158 @@ func TestDoProbe(t *testing.T) { { // Container waiting podStatus: &pendingStatus, expectContinue: true, - expectReadySet: true, + expectSet: true, }, { // Container terminated - podStatus: &terminatedStatus, - expectReadySet: true, + podStatus: &terminatedStatus, + expectSet: true, }, { // Probe successful. 
- podStatus: &runningStatus, - expectContinue: true, - expectReadySet: true, - expectedReadiness: results.Success, + podStatus: &runningStatus, + expectContinue: true, + expectSet: true, + expectedResult: results.Success, }, { // Initial delay passed podStatus: &runningStatus, probe: api.Probe{ InitialDelaySeconds: -100, }, - expectContinue: true, - expectReadySet: true, - expectedReadiness: results.Success, + expectContinue: true, + expectSet: true, + expectedResult: results.Success, }, } - for i, test := range tests { - w := newTestWorker(test.probe) - if test.podStatus != nil { - m.statusManager.SetPodStatus(w.pod, *test.podStatus) - } - if c := doProbe(m, w); c != test.expectContinue { - t.Errorf("[%d] Expected continue to be %v but got %v", i, test.expectContinue, c) - } - ready, ok := m.readinessCache.Get(containerID) - if ok != test.expectReadySet { - t.Errorf("[%d] Expected to have readiness: %v but got %v", i, test.expectReadySet, ok) - } - if ready != test.expectedReadiness { - t.Errorf("[%d] Expected readiness: %v but got %v", i, test.expectedReadiness, ready) - } + for _, probeType := range [...]probeType{liveness, readiness} { + for i, test := range tests { + w := newTestWorker(m, probeType, test.probe) + if test.podStatus != nil { + m.statusManager.SetPodStatus(w.pod, *test.podStatus) + } + if c := w.doProbe(); c != test.expectContinue { + t.Errorf("[%s-%d] Expected continue to be %v but got %v", probeType, i, test.expectContinue, c) + } + result, ok := resultsManager(m, probeType).Get(containerID) + if ok != test.expectSet { + t.Errorf("[%s-%d] Expected to have result: %v but got %v", probeType, i, test.expectSet, ok) + } + if result != test.expectedResult { + t.Errorf("[%s-%d] Expected result: %v but got %v", probeType, i, test.expectedResult, result) + } - // Clean up. - m.statusManager.DeletePodStatus(podUID) - m.readinessCache.Remove(containerID) + // Clean up. + m.statusManager.DeletePodStatus(podUID) + resultsManager(m, probeType).Remove(containerID) + } } } func TestInitialDelay(t *testing.T) { m := newTestManager() - w := newTestWorker(api.Probe{ - InitialDelaySeconds: 10, - }) - m.statusManager.SetPodStatus(w.pod, getRunningStatus()) - if !doProbe(m, w) { - t.Errorf("Expected to continue, but did not") - } + for _, probeType := range [...]probeType{liveness, readiness} { + w := newTestWorker(m, probeType, api.Probe{ + InitialDelaySeconds: 10, + }) + m.statusManager.SetPodStatus(w.pod, getRunningStatus()) - ready, ok := m.readinessCache.Get(containerID) - if !ok { - t.Errorf("Expected readiness to be false, but was not set") - } else if ready { - t.Errorf("Expected readiness to be false, but was true") - } + if !w.doProbe() { + t.Errorf("[%s] Expected to continue, but did not", probeType) + } - // 100 seconds later... - laterStatus := getRunningStatus() - laterStatus.ContainerStatuses[0].State.Running.StartedAt.Time = - time.Now().Add(-100 * time.Second) - m.statusManager.SetPodStatus(w.pod, laterStatus) + expectedResult := results.Result(probeType == liveness) + result, ok := resultsManager(m, probeType).Get(containerID) + if !ok { + t.Errorf("[%s] Expected result to be set during initial delay, but was not set", probeType) + } else if result != expectedResult { + t.Errorf("[%s] Expected result to be %v during initial delay, but was %v", + probeType, expectedResult, result) + } - // Second call should succeed (already waited). - if !doProbe(m, w) { - t.Errorf("Expected to continue, but did not") - } + // 100 seconds later... 
+ laterStatus := getRunningStatus() + laterStatus.ContainerStatuses[0].State.Running.StartedAt.Time = + time.Now().Add(-100 * time.Second) + m.statusManager.SetPodStatus(w.pod, laterStatus) - ready, ok = m.readinessCache.Get(containerID) - if !ok { - t.Errorf("Expected readiness to be true, but was not set") - } else if !ready { - t.Errorf("Expected readiness to be true, but was false") + // Second call should succeed (already waited). + if !w.doProbe() { + t.Errorf("[%s] Expected to continue, but did not", probeType) + } + + result, ok = resultsManager(m, probeType).Get(containerID) + if !ok { + t.Errorf("[%s] Expected result to be true, but was not set", probeType) + } else if !result { + t.Errorf("[%s] Expected result to be true, but was false", probeType) + } } } func TestCleanUp(t *testing.T) { m := newTestManager() - pod := getTestPod(api.Probe{}) - m.statusManager.SetPodStatus(&pod, getRunningStatus()) - m.readinessCache.Set(containerID, results.Success) - w := m.newWorker(&pod, pod.Spec.Containers[0]) - m.readinessProbes[containerPath{podUID, containerName}] = w - if ready, _ := m.readinessCache.Get(containerID); !ready { - t.Fatal("Expected readiness to be true.") - } + for _, probeType := range [...]probeType{liveness, readiness} { + key := probeKey{podUID, containerName, probeType} + w := newTestWorker(m, probeType, api.Probe{}) + m.statusManager.SetPodStatus(w.pod, getRunningStatus()) + go w.run() + m.workers[key] = w - close(w.stop) - if err := waitForWorkerExit(m, []containerPath{{podUID, containerName}}); err != nil { - t.Fatal(err) - } + // Wait for worker to run. + condition := func() (bool, error) { + ready, _ := resultsManager(m, probeType).Get(containerID) + return ready == results.Success, nil + } + if ready, _ := condition(); !ready { + if err := wait.Poll(100*time.Millisecond, util.ForeverTestTimeout, condition); err != nil { + t.Fatalf("[%s] Error waiting for worker ready: %v", probeType, err) + } + } - if _, ok := m.readinessCache.Get(containerID); ok { - t.Error("Expected readiness to be cleared.") - } - if _, ok := m.readinessProbes[containerPath{podUID, containerName}]; ok { - t.Error("Expected worker to be cleared.") + close(w.stop) + if err := waitForWorkerExit(m, []probeKey{key}); err != nil { + t.Fatalf("[%s] error waiting for worker exit: %v", probeType, err) + } + + if _, ok := resultsManager(m, probeType).Get(containerID); ok { + t.Errorf("[%s] Expected result to be cleared.", probeType) + } + if _, ok := m.workers[key]; ok { + t.Errorf("[%s] Expected worker to be cleared.", probeType) + } } } func TestHandleCrash(t *testing.T) { m := newTestManager() - m.prober = CrashingProber{} - w := newTestWorker(api.Probe{}) + m.prober = &prober{ + refManager: kubecontainer.NewRefManager(), + recorder: &record.FakeRecorder{}, + exec: crashingExecProber{}, + } + + w := newTestWorker(m, readiness, api.Probe{}) m.statusManager.SetPodStatus(w.pod, getRunningStatus()) // doProbe should recover from the crash, and keep going. 
- if !doProbe(m, w) { + if !w.doProbe() { t.Error("Expected to keep going, but terminated.") } - if _, ok := m.readinessCache.Get(containerID); ok { + if _, ok := m.readinessManager.Get(containerID); ok { t.Error("Expected readiness to be unchanged from crash.") } } -func newTestWorker(probeSpec api.Probe) *worker { - pod := getTestPod(probeSpec) - return &worker{ - stop: make(chan struct{}), - pod: &pod, - container: pod.Spec.Containers[0], - spec: &probeSpec, +func newTestWorker(m *manager, probeType probeType, probeSpec api.Probe) *worker { + // All tests rely on the fake exec prober. + probeSpec.Handler = api.Handler{ + Exec: &api.ExecAction{}, } + + pod := getTestPod(probeType, probeSpec) + return newWorker(m, probeType, &pod, pod.Spec.Containers[0]) } func getRunningStatus() api.PodStatus { @@ -217,10 +243,15 @@ func getRunningStatus() api.PodStatus { return podStatus } -func getTestPod(probeSpec api.Probe) api.Pod { +func getTestPod(probeType probeType, probeSpec api.Probe) api.Pod { container := api.Container{ - Name: containerName, - ReadinessProbe: &probeSpec, + Name: containerName, + } + switch probeType { + case readiness: + container.ReadinessProbe = &probeSpec + case liveness: + container.LivenessProbe = &probeSpec } pod := api.Pod{ Spec: api.PodSpec{ @@ -232,12 +263,18 @@ func getTestPod(probeSpec api.Probe) api.Pod { return pod } -type CrashingProber struct{} - -func (f CrashingProber) ProbeLiveness(_ *api.Pod, _ api.PodStatus, c api.Container, _ kubecontainer.ContainerID, _ int64) (probe.Result, error) { - panic("Intentional ProbeLiveness crash.") +func resultsManager(m *manager, probeType probeType) results.Manager { + switch probeType { + case readiness: + return m.readinessManager + case liveness: + return m.livenessManager + } + panic(fmt.Errorf("Unhandled case: %v", probeType)) } -func (f CrashingProber) ProbeReadiness(_ *api.Pod, _ api.PodStatus, c api.Container, _ kubecontainer.ContainerID) (probe.Result, error) { - panic("Intentional ProbeReadiness crash.") +type crashingExecProber struct{} + +func (p crashingExecProber) Probe(_ exec.Cmd) (probe.Result, string, error) { + panic("Intentional Probe crash.") } diff --git a/pkg/kubelet/rkt/rkt.go b/pkg/kubelet/rkt/rkt.go index 1865a7b456..23a59ffedc 100644 --- a/pkg/kubelet/rkt/rkt.go +++ b/pkg/kubelet/rkt/rkt.go @@ -41,9 +41,8 @@ import ( "k8s.io/kubernetes/pkg/client/record" "k8s.io/kubernetes/pkg/credentialprovider" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" - "k8s.io/kubernetes/pkg/kubelet/prober" + proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results" kubeletutil "k8s.io/kubernetes/pkg/kubelet/util" - "k8s.io/kubernetes/pkg/probe" "k8s.io/kubernetes/pkg/securitycontext" "k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/util" @@ -89,7 +88,7 @@ type Runtime struct { containerRefManager *kubecontainer.RefManager generator kubecontainer.RunContainerOptionsGenerator recorder record.EventRecorder - prober prober.Prober + livenessManager proberesults.Manager volumeGetter volumeGetter imagePuller kubecontainer.ImagePuller } @@ -108,8 +107,9 @@ func New(config *Config, generator kubecontainer.RunContainerOptionsGenerator, recorder record.EventRecorder, containerRefManager *kubecontainer.RefManager, - prober prober.Prober, - volumeGetter volumeGetter, imageBackOff *util.Backoff) (*Runtime, error) { + livenessManager proberesults.Manager, + volumeGetter volumeGetter, + imageBackOff *util.Backoff) (*Runtime, error) { systemdVersion, err := getSystemdVersion() if err != nil { @@ -146,7 +146,7 @@ func 
New(config *Config, containerRefManager: containerRefManager, generator: generator, recorder: recorder, - prober: prober, + livenessManager: livenessManager, volumeGetter: volumeGetter, } rkt.imagePuller = kubecontainer.NewImagePuller(recorder, rkt, imageBackOff) @@ -1032,17 +1032,13 @@ func (r *Runtime) SyncPod(pod *api.Pod, runningPod kubecontainer.Pod, podStatus break } - result, err := r.prober.ProbeLiveness(pod, podStatus, container, c.ID, c.Created) - // TODO(vmarmol): examine this logic. - if err == nil && result != probe.Success && pod.Spec.RestartPolicy != api.RestartPolicyNever { - glog.Infof("Pod %q container %q is unhealthy (probe result: %v), it will be killed and re-created.", podFullName, container.Name, result) + liveness, found := r.livenessManager.Get(c.ID) + if found && liveness != proberesults.Success && pod.Spec.RestartPolicy != api.RestartPolicyNever { + glog.Infof("Pod %q container %q is unhealthy, it will be killed and re-created.", podFullName, container.Name) restartPod = true break } - if err != nil { - glog.V(2).Infof("Probe container %q failed: %v", container.Name, err) - } delete(unidentifiedContainers, c.ID) }
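
For readers skimming the diff, the core new mechanism is the shared results cache that pushes an update whenever a cached probe result changes, which is what lets the kubelet react to an unhealthy container immediately instead of waiting for the next periodic sync. Below is a condensed, standalone Go sketch of that pattern; the type and function names are simplified stand-ins for illustration, not the exact pkg/kubelet/prober/results API.

// Sketch of a thread-safe probe-results cache with an optional updates channel.
// A probe worker calls Set; a consumer (a stand-in for the kubelet sync loop)
// selects on Updates() and reacts to failures.
package main

import (
	"fmt"
	"sync"
)

type Result bool

const (
	Success Result = true
	Failure Result = false
)

// Update is sent on the updates channel when a container's result changes.
type Update struct {
	ContainerID string
	Result      Result
}

type manager struct {
	mu      sync.RWMutex
	cache   map[string]Result
	updates chan Update // nil when updates are not requested
}

func NewManagerWithUpdates() *manager {
	return &manager{
		cache:   make(map[string]Result),
		updates: make(chan Update, 20), // buffered so probe workers don't block
	}
}

func (m *manager) Get(id string) (Result, bool) {
	m.mu.RLock()
	defer m.mu.RUnlock()
	r, ok := m.cache[id]
	return r, ok
}

// Set records the result and pushes an update only when the value changed.
func (m *manager) Set(id string, result Result) {
	m.mu.Lock()
	prev, exists := m.cache[id]
	changed := !exists || prev != result
	if changed {
		m.cache[id] = result
	}
	m.mu.Unlock()
	if changed && m.updates != nil {
		m.updates <- Update{id, result}
	}
}

func (m *manager) Updates() <-chan Update {
	return m.updates
}

func main() {
	liveness := NewManagerWithUpdates()

	// A liveness worker would call Set on every probe tick; simulate one failure.
	liveness.Set("container-1", Success)
	liveness.Set("container-1", Failure)

	// Consumer loop, analogous to the new case added to syncLoopIteration:
	// only failures (container unhealthy) trigger an immediate pod sync.
	for i := 0; i < 2; i++ {
		update := <-liveness.Updates()
		if update.Result == Failure {
			fmt.Printf("container %s unhealthy, triggering sync\n", update.ContainerID)
		}
	}
}

As in the patch, pushing an update only on a change keeps the channel quiet during steady-state probing, and the buffered channel keeps probe workers from blocking on a slow consumer.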