Refactor liveness probing

This commit builds on previous work and creates an independent
worker for every liveness probe. Liveness probes behave largely the same
as readiness probes, so much of the code is shared by introducing a
probeType parameter to distinguish the type when it matters. The
circular dependency between the runtime and the prober is broken by
exposing a shared liveness ResultsManager, owned by the
kubelet. Finally, an Updates channel is introduced to the ResultsManager
so the kubelet can react to unhealthy containers immediately.
Tim St. Clair 2015-10-19 15:15:59 -07:00
parent 0d7b53a201
commit a263c77b65
16 changed files with 510 additions and 396 deletions
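
For orientation, a minimal, self-contained Go sketch of the pattern this commit introduces, using simplified stand-in types (Result, Update, resultsManager) rather than the real k8s.io/kubernetes packages: Set caches a probe result and pushes an Update onto a buffered channel only when the cached value actually changes, and a consumer loop standing in for the kubelet syncLoop reacts to failures immediately.

package main

import (
	"fmt"
	"sync"
)

// Result mirrors the boolean probe result used at this point in the code.
type Result bool

const (
	Success Result = true
	Failure Result = false
)

// Update carries a changed result, analogous to results.Update in the diff below.
type Update struct {
	ContainerID string
	Result      Result
}

// resultsManager is a simplified stand-in for a results.Manager with updates.
type resultsManager struct {
	mu      sync.Mutex
	cache   map[string]Result
	updates chan Update
}

func newResultsManagerWithUpdates() *resultsManager {
	return &resultsManager{
		cache:   make(map[string]Result),
		updates: make(chan Update, 20), // buffered, mirroring the commit's channel size
	}
}

// Set caches the result and pushes an Update only if the value is new or changed.
func (m *resultsManager) Set(id string, result Result) {
	m.mu.Lock()
	prev, exists := m.cache[id]
	changed := !exists || prev != result
	if changed {
		m.cache[id] = result
	}
	m.mu.Unlock()
	if changed {
		m.updates <- Update{ContainerID: id, Result: result}
	}
}

func (m *resultsManager) Updates() <-chan Update { return m.updates }

func main() {
	liveness := newResultsManagerWithUpdates()

	// Probe workers would call Set; simulate one healthy and one failing container.
	liveness.Set("container-a", Success)
	liveness.Set("container-a", Success) // unchanged result: no update is pushed
	liveness.Set("container-b", Failure)

	// Consumer loop: only failures trigger an immediate pod sync.
	for i := 0; i < 2; i++ {
		update := <-liveness.Updates()
		if update.Result == Failure {
			fmt.Printf("container %s is unhealthy, syncing its pod immediately\n", update.ContainerID)
		}
	}
}

In the diff itself, probe workers call the shared livenessManager.Set(...), and the kubelet's syncLoopIteration selects on livenessManager.Updates(), calling HandlePodSyncs only when update.Result == proberesults.Failure.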


@ -21,7 +21,7 @@ import (
"k8s.io/kubernetes/pkg/client/record"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/network"
"k8s.io/kubernetes/pkg/kubelet/prober"
proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/oom"
@ -31,7 +31,7 @@ import (
func NewFakeDockerManager(
client DockerInterface,
recorder record.EventRecorder,
prober prober.Prober,
livenessManager proberesults.Manager,
containerRefManager *kubecontainer.RefManager,
machineInfo *cadvisorapi.MachineInfo,
podInfraContainerImage string,
@ -45,7 +45,7 @@ func NewFakeDockerManager(
fakeOOMAdjuster := oom.NewFakeOOMAdjuster()
fakeProcFs := procfs.NewFakeProcFs()
dm := NewDockerManager(client, recorder, prober, containerRefManager, machineInfo, podInfraContainerImage, qps,
dm := NewDockerManager(client, recorder, livenessManager, containerRefManager, machineInfo, podInfraContainerImage, qps,
burst, containerLogsDir, osInterface, networkPlugin, generator, httpClient, &NativeExecHandler{},
fakeOOMAdjuster, fakeProcFs, false, imageBackOff)
dm.dockerPuller = &FakeDockerPuller{}


@ -44,10 +44,9 @@ import (
"k8s.io/kubernetes/pkg/kubelet/metrics"
"k8s.io/kubernetes/pkg/kubelet/network"
"k8s.io/kubernetes/pkg/kubelet/network/hairpin"
"k8s.io/kubernetes/pkg/kubelet/prober"
proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results"
"k8s.io/kubernetes/pkg/kubelet/qos"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/kubernetes/pkg/probe"
"k8s.io/kubernetes/pkg/securitycontext"
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util"
@ -119,8 +118,8 @@ type DockerManager struct {
// Network plugin.
networkPlugin network.NetworkPlugin
// Health check prober.
prober prober.Prober
// Health check results.
livenessManager proberesults.Manager
// Generator of runtime container options.
generator kubecontainer.RunContainerOptionsGenerator
@ -147,7 +146,7 @@ type DockerManager struct {
func NewDockerManager(
client DockerInterface,
recorder record.EventRecorder,
prober prober.Prober,
livenessManager proberesults.Manager,
containerRefManager *kubecontainer.RefManager,
machineInfo *cadvisorapi.MachineInfo,
podInfraContainerImage string,
@ -208,7 +207,7 @@ func NewDockerManager(
dockerRoot: dockerRoot,
containerLogsDir: containerLogsDir,
networkPlugin: networkPlugin,
prober: prober,
livenessManager: livenessManager,
generator: generator,
execHandler: execHandler,
oomAdjuster: oomAdjuster,
@ -1762,20 +1761,13 @@ func (dm *DockerManager) computePodContainerChanges(pod *api.Pod, runningPod kub
continue
}
result, err := dm.prober.ProbeLiveness(pod, podStatus, container, c.ID, c.Created)
if err != nil {
// TODO(vmarmol): examine this logic.
glog.V(2).Infof("probe no-error: %q", container.Name)
containersToKeep[containerID] = index
continue
}
if result == probe.Success {
glog.V(4).Infof("probe success: %q", container.Name)
liveness, found := dm.livenessManager.Get(c.ID)
if !found || liveness == proberesults.Success {
containersToKeep[containerID] = index
continue
}
if pod.Spec.RestartPolicy != api.RestartPolicyNever {
glog.Infof("pod %q container %q is unhealthy (probe result: %v), it will be killed and re-created.", podFullName, container.Name, result)
glog.Infof("pod %q container %q is unhealthy, it will be killed and re-created.", podFullName, container.Name)
containersToStart[index] = empty{}
}
}


@ -38,7 +38,8 @@ import (
"k8s.io/kubernetes/pkg/client/record"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/network"
"k8s.io/kubernetes/pkg/kubelet/prober"
proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util"
uexec "k8s.io/kubernetes/pkg/util/exec"
@ -83,7 +84,7 @@ func newTestDockerManagerWithHTTPClient(fakeHTTPClient *fakeHTTP) (*DockerManage
dockerManager := NewFakeDockerManager(
fakeDocker,
fakeRecorder,
prober.FakeProber{},
proberesults.NewManager(),
containerRefManager,
&cadvisorapi.MachineInfo{},
PodInfraContainerImage,
@ -854,6 +855,10 @@ func TestSyncPodBadHash(t *testing.T) {
}
func TestSyncPodsUnhealthy(t *testing.T) {
const (
unhealthyContainerID = "1234"
infraContainerID = "9876"
)
dm, fakeDocker := newTestDockerManager()
pod := &api.Pod{
ObjectMeta: api.ObjectMeta{
@ -862,40 +867,35 @@ func TestSyncPodsUnhealthy(t *testing.T) {
Namespace: "new",
},
Spec: api.PodSpec{
Containers: []api.Container{
{Name: "bar",
LivenessProbe: &api.Probe{
// Always returns healthy == false
},
},
},
Containers: []api.Container{{Name: "unhealthy"}},
},
}
fakeDocker.ContainerList = []docker.APIContainers{
{
// the k8s prefix is required for the kubelet to manage the container
Names: []string{"/k8s_bar_foo_new_12345678_42"},
ID: "1234",
Names: []string{"/k8s_unhealthy_foo_new_12345678_42"},
ID: unhealthyContainerID,
},
{
// pod infra container
Names: []string{"/k8s_POD." + strconv.FormatUint(generatePodInfraContainerHash(pod), 16) + "_foo_new_12345678_42"},
ID: "9876",
ID: infraContainerID,
},
}
fakeDocker.ContainerMap = map[string]*docker.Container{
"1234": {
ID: "1234",
unhealthyContainerID: {
ID: unhealthyContainerID,
Config: &docker.Config{},
HostConfig: &docker.HostConfig{},
},
"9876": {
ID: "9876",
infraContainerID: {
ID: infraContainerID,
Config: &docker.Config{},
HostConfig: &docker.HostConfig{},
},
}
dm.livenessManager.Set(kubetypes.DockerID(unhealthyContainerID).ContainerID(), proberesults.Failure, nil)
runSyncPod(t, dm, fakeDocker, pod, nil)
@ -908,7 +908,7 @@ func TestSyncPodsUnhealthy(t *testing.T) {
"create", "start", "inspect_container",
})
if err := fakeDocker.AssertStopped([]string{"1234"}); err != nil {
if err := fakeDocker.AssertStopped([]string{unhealthyContainerID}); err != nil {
t.Errorf("%v", err)
}
}


@ -54,12 +54,12 @@ import (
"k8s.io/kubernetes/pkg/kubelet/network"
kubepod "k8s.io/kubernetes/pkg/kubelet/pod"
"k8s.io/kubernetes/pkg/kubelet/prober"
proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results"
"k8s.io/kubernetes/pkg/kubelet/rkt"
"k8s.io/kubernetes/pkg/kubelet/status"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
kubeletutil "k8s.io/kubernetes/pkg/kubelet/util"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/probe"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util"
@ -309,6 +309,10 @@ func NewMainKubelet(
procFs := procfs.NewProcFs()
imageBackOff := util.NewBackOff(resyncInterval, MaxContainerBackOff)
readinessManager := proberesults.NewManager()
klet.livenessManager = proberesults.NewManagerWithUpdates()
// Initialize the runtime.
switch containerRuntime {
case "docker":
@ -316,7 +320,7 @@ func NewMainKubelet(
klet.containerRuntime = dockertools.NewDockerManager(
dockerClient,
recorder,
klet, // prober
klet.livenessManager,
containerRefManager,
machineInfo,
podInfraContainerImage,
@ -344,7 +348,7 @@ func NewMainKubelet(
klet,
recorder,
containerRefManager,
klet, // prober
klet.livenessManager,
klet.volumeManager,
imageBackOff)
if err != nil {
@ -396,11 +400,14 @@ func NewMainKubelet(
klet.runner = klet.containerRuntime
klet.podManager = kubepod.NewBasicPodManager(kubepod.NewBasicMirrorClient(klet.kubeClient))
klet.prober = prober.New(klet.runner, containerRefManager, recorder)
klet.probeManager = prober.NewManager(
klet.resyncInterval,
klet.statusManager,
klet.prober)
readinessManager,
klet.livenessManager,
klet.runner,
containerRefManager,
recorder)
runtimeCache, err := kubecontainer.NewRuntimeCache(klet.containerRuntime)
if err != nil {
@ -508,10 +515,10 @@ type Kubelet struct {
// Network plugin.
networkPlugin network.NetworkPlugin
// Handles container readiness probing
// Handles container probing.
probeManager prober.Manager
// TODO: Move prober ownership to the probeManager once the runtime no longer depends on it.
prober prober.Prober
// Manages container health check results.
livenessManager proberesults.Manager
// How long to keep idle streaming command execution/port forwarding
// connections open before terminating them
@ -1982,6 +1989,12 @@ func (kl *Kubelet) syncLoopIteration(updates <-chan kubetypes.PodUpdate, handler
// Periodically syncs all the pods and performs cleanup tasks.
glog.V(4).Infof("SyncLoop (periodic sync)")
handler.HandlePodSyncs(kl.podManager.GetPods())
case update := <-kl.livenessManager.Updates():
// We only care about failures (signalling container death) here.
if update.Result == proberesults.Failure {
glog.V(1).Infof("SyncLoop (container unhealthy).")
handler.HandlePodSyncs([]*api.Pod{update.Pod})
}
}
kl.syncLoopMonitor.Store(time.Now())
return true
@ -2831,16 +2844,6 @@ func (kl *Kubelet) GetRuntime() kubecontainer.Runtime {
return kl.containerRuntime
}
// Proxy prober calls through the Kubelet to break the circular dependency between the runtime &
// prober.
// TODO: Remove this hack once the runtime no longer depends on the prober.
func (kl *Kubelet) ProbeLiveness(pod *api.Pod, status api.PodStatus, container api.Container, containerID kubecontainer.ContainerID, createdAt int64) (probe.Result, error) {
return kl.prober.ProbeLiveness(pod, status, container, containerID, createdAt)
}
func (kl *Kubelet) ProbeReadiness(pod *api.Pod, status api.PodStatus, container api.Container, containerID kubecontainer.ContainerID) (probe.Result, error) {
return kl.prober.ProbeReadiness(pod, status, container, containerID)
}
var minRsrc = resource.MustParse("1k")
var maxRsrc = resource.MustParse("1P")


@ -47,6 +47,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/network"
kubepod "k8s.io/kubernetes/pkg/kubelet/pod"
"k8s.io/kubernetes/pkg/kubelet/prober"
proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results"
"k8s.io/kubernetes/pkg/kubelet/status"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/kubernetes/pkg/runtime"
@ -134,8 +135,8 @@ func newTestKubelet(t *testing.T) *TestKubelet {
t: t,
}
kubelet.prober = prober.FakeProber{}
kubelet.probeManager = prober.FakeManager{}
kubelet.livenessManager = proberesults.NewManager()
kubelet.volumeManager = newVolumeManager()
kubelet.containerManager, _ = newContainerManager(fakeContainerMgrMountInt(), mockCadvisor, "", "", "")


@ -37,7 +37,7 @@ import (
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/dockertools"
"k8s.io/kubernetes/pkg/kubelet/network"
"k8s.io/kubernetes/pkg/kubelet/prober"
proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results"
"k8s.io/kubernetes/pkg/util/sets"
)
@ -152,7 +152,7 @@ func newTestDockerManager() (*dockertools.DockerManager, *dockertools.FakeDocker
dockerManager := dockertools.NewFakeDockerManager(
fakeDocker,
fakeRecorder,
prober.FakeProber{},
proberesults.NewManager(),
containerRefManager,
&cadvisorapi.MachineInfo{},
dockertools.PodInfraContainerImage,


@ -1,45 +0,0 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package prober
import (
"k8s.io/kubernetes/pkg/api"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/probe"
)
var _ Prober = FakeProber{}
type FakeProber struct {
Readiness probe.Result
Liveness probe.Result
Error error
}
func (f FakeProber) ProbeLiveness(_ *api.Pod, _ api.PodStatus, c api.Container, _ kubecontainer.ContainerID, _ int64) (probe.Result, error) {
if c.LivenessProbe == nil {
return probe.Success, nil
}
return f.Liveness, f.Error
}
func (f FakeProber) ProbeReadiness(_ *api.Pod, _ api.PodStatus, c api.Container, _ kubecontainer.ContainerID) (probe.Result, error) {
if c.ReadinessProbe == nil {
return probe.Success, nil
}
return f.Readiness, f.Error
}


@ -22,9 +22,11 @@ import (
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/record"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/prober/results"
"k8s.io/kubernetes/pkg/kubelet/status"
kubeutil "k8s.io/kubernetes/pkg/kubelet/util"
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util/sets"
)
@ -53,19 +55,22 @@ type Manager interface {
}
type manager struct {
// Caches the results of readiness probes.
readinessCache results.Manager
// Map of active workers for readiness
readinessProbes map[containerPath]*worker
// Lock for accessing & mutating readinessProbes
// Map of active workers for probes
workers map[probeKey]*worker
// Lock for accessing & mutating workers
workerLock sync.RWMutex
// The statusManager cache provides pod IP and container IDs for probing.
statusManager status.Manager
// readinessManager manages the results of readiness probes
readinessManager results.Manager
// livenessManager manages the results of liveness probes
livenessManager results.Manager
// prober executes the probe actions.
prober Prober
prober *prober
// Default period for workers to execute a probe.
defaultProbePeriod time.Duration
@ -74,36 +79,79 @@ type manager struct {
func NewManager(
defaultProbePeriod time.Duration,
statusManager status.Manager,
prober Prober) Manager {
readinessManager results.Manager,
livenessManager results.Manager,
runner kubecontainer.ContainerCommandRunner,
refManager *kubecontainer.RefManager,
recorder record.EventRecorder) Manager {
prober := newProber(runner, refManager, recorder)
return &manager{
defaultProbePeriod: defaultProbePeriod,
statusManager: statusManager,
prober: prober,
readinessCache: results.NewManager(),
readinessProbes: make(map[containerPath]*worker),
readinessManager: readinessManager,
livenessManager: livenessManager,
workers: make(map[probeKey]*worker),
}
}
// Key uniquely identifying containers
type containerPath struct {
// Key uniquely identifying container probes
type probeKey struct {
podUID types.UID
containerName string
probeType probeType
}
// Type of probe (readiness or liveness)
type probeType int
const (
liveness probeType = iota
readiness
)
// For debugging.
func (t probeType) String() string {
switch t {
case readiness:
return "Readiness"
case liveness:
return "Liveness"
default:
return "UNKNOWN"
}
}
func (m *manager) AddPod(pod *api.Pod) {
m.workerLock.Lock()
defer m.workerLock.Unlock()
key := containerPath{podUID: pod.UID}
key := probeKey{podUID: pod.UID}
for _, c := range pod.Spec.Containers {
key.containerName = c.Name
if _, ok := m.readinessProbes[key]; ok {
glog.Errorf("Readiness probe already exists! %v - %v",
kubecontainer.GetPodFullName(pod), c.Name)
return
}
if c.ReadinessProbe != nil {
m.readinessProbes[key] = m.newWorker(pod, c)
key.probeType = readiness
if _, ok := m.workers[key]; ok {
glog.Errorf("Readiness probe already exists! %v - %v",
kubeutil.FormatPodName(pod), c.Name)
return
}
w := newWorker(m, readiness, pod, c)
m.workers[key] = w
go w.run()
}
if c.LivenessProbe != nil {
key.probeType = liveness
if _, ok := m.workers[key]; ok {
glog.Errorf("Liveness probe already exists! %v - %v",
kubeutil.FormatPodName(pod), c.Name)
return
}
w := newWorker(m, liveness, pod, c)
m.workers[key] = w
go w.run()
}
}
}
@ -112,11 +160,14 @@ func (m *manager) RemovePod(pod *api.Pod) {
m.workerLock.RLock()
defer m.workerLock.RUnlock()
key := containerPath{podUID: pod.UID}
key := probeKey{podUID: pod.UID}
for _, c := range pod.Spec.Containers {
key.containerName = c.Name
if worker, ok := m.readinessProbes[key]; ok {
close(worker.stop)
for _, probeType := range [...]probeType{readiness, liveness} {
key.probeType = probeType
if worker, ok := m.workers[key]; ok {
close(worker.stop)
}
}
}
}
@ -130,8 +181,8 @@ func (m *manager) CleanupPods(activePods []*api.Pod) {
m.workerLock.RLock()
defer m.workerLock.RUnlock()
for path, worker := range m.readinessProbes {
if _, ok := desiredPods[path.podUID]; !ok {
for key, worker := range m.workers {
if _, ok := desiredPods[key.podUID]; !ok {
close(worker.stop)
}
}
@ -142,28 +193,27 @@ func (m *manager) UpdatePodStatus(podUID types.UID, podStatus *api.PodStatus) {
var ready bool
if c.State.Running == nil {
ready = false
} else if result, ok := m.readinessCache.Get(
kubecontainer.ParseContainerID(c.ContainerID)); ok {
} else if result, ok := m.readinessManager.Get(kubecontainer.ParseContainerID(c.ContainerID)); ok {
ready = result == results.Success
} else {
// Check whether there is a probe which hasn't run yet.
_, exists := m.getReadinessProbe(podUID, c.Name)
_, exists := m.getWorker(podUID, c.Name, readiness)
ready = !exists
}
podStatus.ContainerStatuses[i].Ready = ready
}
}
func (m *manager) getReadinessProbe(podUID types.UID, containerName string) (*worker, bool) {
func (m *manager) getWorker(podUID types.UID, containerName string, probeType probeType) (*worker, bool) {
m.workerLock.RLock()
defer m.workerLock.RUnlock()
probe, ok := m.readinessProbes[containerPath{podUID, containerName}]
return probe, ok
worker, ok := m.workers[probeKey{podUID, containerName, probeType}]
return worker, ok
}
// Called by the worker after exiting.
func (m *manager) removeReadinessProbe(podUID types.UID, containerName string) {
func (m *manager) removeWorker(podUID types.UID, containerName string, probeType probeType) {
m.workerLock.Lock()
defer m.workerLock.Unlock()
delete(m.readinessProbes, containerPath{podUID, containerName})
delete(m.workers, probeKey{podUID, containerName, probeType})
}


@ -23,11 +23,13 @@ import (
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/record"
"k8s.io/kubernetes/pkg/client/unversioned/testclient"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/prober/results"
"k8s.io/kubernetes/pkg/kubelet/status"
"k8s.io/kubernetes/pkg/probe"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/wait"
)
@ -53,13 +55,13 @@ func TestAddRemovePods(t *testing.T) {
Containers: []api.Container{{
Name: "no_probe1",
}, {
Name: "prober1",
Name: "readiness",
ReadinessProbe: &api.Probe{},
}, {
Name: "no_probe2",
}, {
Name: "prober2",
ReadinessProbe: &api.Probe{},
Name: "liveness",
LivenessProbe: &api.Probe{},
}},
},
}
@ -77,7 +79,10 @@ func TestAddRemovePods(t *testing.T) {
// Adding a pod with probes.
m.AddPod(&probePod)
probePaths := []containerPath{{"probe_pod", "prober1"}, {"probe_pod", "prober2"}}
probePaths := []probeKey{
{"probe_pod", "readiness", readiness},
{"probe_pod", "liveness", liveness},
}
if err := expectProbes(m, probePaths); err != nil {
t.Error(err)
}
@ -115,8 +120,8 @@ func TestCleanupPods(t *testing.T) {
Name: "prober1",
ReadinessProbe: &api.Probe{},
}, {
Name: "prober2",
ReadinessProbe: &api.Probe{},
Name: "prober2",
LivenessProbe: &api.Probe{},
}},
},
}
@ -129,8 +134,8 @@ func TestCleanupPods(t *testing.T) {
Name: "prober1",
ReadinessProbe: &api.Probe{},
}, {
Name: "prober2",
ReadinessProbe: &api.Probe{},
Name: "prober2",
LivenessProbe: &api.Probe{},
}},
},
}
@ -139,8 +144,14 @@ func TestCleanupPods(t *testing.T) {
m.CleanupPods([]*api.Pod{&podToKeep})
removedProbes := []containerPath{{"pod_cleanup", "prober1"}, {"pod_cleanup", "prober2"}}
expectedProbes := []containerPath{{"pod_keep", "prober1"}, {"pod_keep", "prober2"}}
removedProbes := []probeKey{
{"pod_cleanup", "prober1", readiness},
{"pod_cleanup", "prober2", liveness},
}
expectedProbes := []probeKey{
{"pod_keep", "prober1", readiness},
{"pod_keep", "prober2", liveness},
}
if err := waitForWorkerExit(m, removedProbes); err != nil {
t.Fatal(err)
}
@ -195,28 +206,28 @@ func TestUpdatePodStatus(t *testing.T) {
m := newTestManager()
// Setup probe "workers" and cached results.
m.readinessProbes = map[containerPath]*worker{
containerPath{podUID, probedReady.Name}: {},
containerPath{podUID, probedPending.Name}: {},
containerPath{podUID, probedUnready.Name}: {},
containerPath{podUID, terminated.Name}: {},
m.workers = map[probeKey]*worker{
probeKey{podUID, unprobed.Name, liveness}: {},
probeKey{podUID, probedReady.Name, readiness}: {},
probeKey{podUID, probedPending.Name, readiness}: {},
probeKey{podUID, probedUnready.Name, readiness}: {},
probeKey{podUID, terminated.Name, readiness}: {},
}
m.readinessCache.Set(kubecontainer.ParseContainerID(probedReady.ContainerID), results.Success)
m.readinessCache.Set(kubecontainer.ParseContainerID(probedUnready.ContainerID), results.Failure)
m.readinessCache.Set(kubecontainer.ParseContainerID(terminated.ContainerID), results.Success)
m.readinessManager.Set(kubecontainer.ParseContainerID(probedReady.ContainerID), results.Success, nil)
m.readinessManager.Set(kubecontainer.ParseContainerID(probedUnready.ContainerID), results.Failure, nil)
m.readinessManager.Set(kubecontainer.ParseContainerID(terminated.ContainerID), results.Success, nil)
m.UpdatePodStatus(podUID, &podStatus)
expectedReadiness := map[containerPath]bool{
containerPath{podUID, unprobed.Name}: true,
containerPath{podUID, probedReady.Name}: true,
containerPath{podUID, probedPending.Name}: false,
containerPath{podUID, probedUnready.Name}: false,
containerPath{podUID, terminated.Name}: false,
expectedReadiness := map[probeKey]bool{
probeKey{podUID, unprobed.Name, readiness}: true,
probeKey{podUID, probedReady.Name, readiness}: true,
probeKey{podUID, probedPending.Name, readiness}: false,
probeKey{podUID, probedUnready.Name, readiness}: false,
probeKey{podUID, terminated.Name, readiness}: false,
}
for _, c := range podStatus.ContainerStatuses {
expected, ok := expectedReadiness[containerPath{podUID, c.Name}]
expected, ok := expectedReadiness[probeKey{podUID, c.Name, readiness}]
if !ok {
t.Fatalf("Missing expectation for test case: %v", c.Name)
}
@ -227,16 +238,16 @@ func TestUpdatePodStatus(t *testing.T) {
}
}
func expectProbes(m *manager, expectedReadinessProbes []containerPath) error {
func expectProbes(m *manager, expectedProbes []probeKey) error {
m.workerLock.RLock()
defer m.workerLock.RUnlock()
var unexpected []containerPath
missing := make([]containerPath, len(expectedReadinessProbes))
copy(missing, expectedReadinessProbes)
var unexpected []probeKey
missing := make([]probeKey, len(expectedProbes))
copy(missing, expectedProbes)
outer:
for probePath := range m.readinessProbes {
for probePath := range m.workers {
for i, expectedPath := range missing {
if probePath == expectedPath {
missing = append(missing[:i], missing[i+1:]...)
@ -255,26 +266,34 @@ outer:
func newTestManager() *manager {
const probePeriod = 1
statusManager := status.NewManager(&testclient.Fake{})
prober := FakeProber{Readiness: probe.Success}
return NewManager(probePeriod, statusManager, prober).(*manager)
m := NewManager(
probePeriod,
status.NewManager(&testclient.Fake{}),
results.NewManager(),
results.NewManager(),
nil, // runner
kubecontainer.NewRefManager(),
&record.FakeRecorder{},
).(*manager)
// Don't actually execute probes.
m.prober.exec = fakeExecProber{probe.Success, nil}
return m
}
// Wait for the given workers to exit & clean up.
func waitForWorkerExit(m *manager, workerPaths []containerPath) error {
func waitForWorkerExit(m *manager, workerPaths []probeKey) error {
const interval = 100 * time.Millisecond
const timeout = 30 * time.Second
for _, w := range workerPaths {
condition := func() (bool, error) {
_, exists := m.getReadinessProbe(w.podUID, w.containerName)
_, exists := m.getWorker(w.podUID, w.containerName, w.probeType)
return !exists, nil
}
if exited, _ := condition(); exited {
continue // Already exited, no need to poll.
}
glog.Infof("Polling %v", w)
if err := wait.Poll(interval, timeout, condition); err != nil {
if err := wait.Poll(interval, util.ForeverTestTimeout, condition); err != nil {
return err
}
}


@ -39,12 +39,6 @@ import (
const maxProbeRetries = 3
// Prober checks the healthiness of a container.
type Prober interface {
ProbeLiveness(pod *api.Pod, status api.PodStatus, container api.Container, containerID kubecontainer.ContainerID, createdAt int64) (probe.Result, error)
ProbeReadiness(pod *api.Pod, status api.PodStatus, container api.Container, containerID kubecontainer.ContainerID) (probe.Result, error)
}
// Prober helps to check the liveness/readiness of a container.
type prober struct {
exec execprobe.ExecProber
@ -58,10 +52,10 @@ type prober struct {
// newProber creates a prober helper. It takes a command runner and
// several container info managers.
func New(
func newProber(
runner kubecontainer.ContainerCommandRunner,
refManager *kubecontainer.RefManager,
recorder record.EventRecorder) Prober {
recorder record.EventRecorder) *prober {
return &prober{
exec: execprobe.New(),
@ -73,9 +67,19 @@ func New(
}
}
// ProbeLiveness probes the liveness of a container.
// If the initalDelay since container creation on liveness probe has not passed the probe will return probe.Success.
func (pb *prober) ProbeLiveness(pod *api.Pod, status api.PodStatus, container api.Container, containerID kubecontainer.ContainerID, createdAt int64) (probe.Result, error) {
// probe probes the container.
func (pb *prober) probe(probeType probeType, pod *api.Pod, status api.PodStatus, container api.Container, containerID kubecontainer.ContainerID) (probe.Result, error) {
switch probeType {
case readiness:
return pb.probeReadiness(pod, status, container, containerID)
case liveness:
return pb.probeLiveness(pod, status, container, containerID)
}
return probe.Unknown, fmt.Errorf("Unknown probe type: %q", probeType)
}
// probeLiveness probes the liveness of a container.
func (pb *prober) probeLiveness(pod *api.Pod, status api.PodStatus, container api.Container, containerID kubecontainer.ContainerID) (probe.Result, error) {
var live probe.Result
var output string
var err error
@ -83,11 +87,7 @@ func (pb *prober) ProbeLiveness(pod *api.Pod, status api.PodStatus, container ap
if p == nil {
return probe.Success, nil
}
if time.Now().Unix()-createdAt < p.InitialDelaySeconds {
return probe.Success, nil
} else {
live, output, err = pb.runProbeWithRetries(p, pod, status, container, containerID, maxProbeRetries)
}
live, output, err = pb.runProbeWithRetries(p, pod, status, container, containerID, maxProbeRetries)
ctrName := fmt.Sprintf("%s:%s", kubecontainer.GetPodFullName(pod), container.Name)
if err != nil || live != probe.Success {
// Liveness failed in one way or another.
@ -113,17 +113,16 @@ func (pb *prober) ProbeLiveness(pod *api.Pod, status api.PodStatus, container ap
return probe.Success, nil
}
// ProbeReadiness probes and sets the readiness of a container.
func (pb *prober) ProbeReadiness(pod *api.Pod, status api.PodStatus, container api.Container, containerID kubecontainer.ContainerID) (probe.Result, error) {
// probeReadiness probes and sets the readiness of a container.
func (pb *prober) probeReadiness(pod *api.Pod, status api.PodStatus, container api.Container, containerID kubecontainer.ContainerID) (probe.Result, error) {
var ready probe.Result
var output string
var err error
p := container.ReadinessProbe
if p == nil {
ready = probe.Success
} else {
ready, output, err = pb.runProbeWithRetries(p, pod, status, container, containerID, maxProbeRetries)
return probe.Success, nil
}
ready, output, err = pb.runProbeWithRetries(p, pod, status, container, containerID, maxProbeRetries)
ctrName := fmt.Sprintf("%s:%s", kubecontainer.GetPodFullName(pod), container.Name)
if err != nil || ready == probe.Failure {
// Readiness failed in one way or another.


@ -19,7 +19,6 @@ package prober
import (
"errors"
"testing"
"time"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/record"
@ -184,7 +183,6 @@ func TestProbeContainer(t *testing.T) {
recorder: &record.FakeRecorder{},
}
containerID := kubecontainer.ContainerID{"test", "foobar"}
createdAt := time.Now().Unix()
tests := []struct {
testContainer api.Container
@ -201,14 +199,7 @@ func TestProbeContainer(t *testing.T) {
// Only LivenessProbe. expectedReadiness should always be true here.
{
testContainer: api.Container{
LivenessProbe: &api.Probe{InitialDelaySeconds: 100},
},
expectedLiveness: probe.Success,
expectedReadiness: probe.Success,
},
{
testContainer: api.Container{
LivenessProbe: &api.Probe{InitialDelaySeconds: -100},
LivenessProbe: &api.Probe{},
},
expectedLiveness: probe.Unknown,
expectedReadiness: probe.Success,
@ -216,7 +207,6 @@ func TestProbeContainer(t *testing.T) {
{
testContainer: api.Container{
LivenessProbe: &api.Probe{
InitialDelaySeconds: -100,
Handler: api.Handler{
Exec: &api.ExecAction{},
},
@ -228,7 +218,6 @@ func TestProbeContainer(t *testing.T) {
{
testContainer: api.Container{
LivenessProbe: &api.Probe{
InitialDelaySeconds: -100,
Handler: api.Handler{
Exec: &api.ExecAction{},
},
@ -240,7 +229,6 @@ func TestProbeContainer(t *testing.T) {
{
testContainer: api.Container{
LivenessProbe: &api.Probe{
InitialDelaySeconds: -100,
Handler: api.Handler{
Exec: &api.ExecAction{},
},
@ -252,7 +240,6 @@ func TestProbeContainer(t *testing.T) {
{
testContainer: api.Container{
LivenessProbe: &api.Probe{
InitialDelaySeconds: -100,
Handler: api.Handler{
Exec: &api.ExecAction{},
},
@ -265,7 +252,7 @@ func TestProbeContainer(t *testing.T) {
// Only ReadinessProbe. expectedLiveness should always be probe.Success here.
{
testContainer: api.Container{
ReadinessProbe: &api.Probe{InitialDelaySeconds: 100},
ReadinessProbe: &api.Probe{},
},
expectedLiveness: probe.Success,
expectedReadiness: probe.Unknown,
@ -273,7 +260,6 @@ func TestProbeContainer(t *testing.T) {
{
testContainer: api.Container{
ReadinessProbe: &api.Probe{
InitialDelaySeconds: -100,
Handler: api.Handler{
Exec: &api.ExecAction{},
},
@ -285,7 +271,6 @@ func TestProbeContainer(t *testing.T) {
{
testContainer: api.Container{
ReadinessProbe: &api.Probe{
InitialDelaySeconds: -100,
Handler: api.Handler{
Exec: &api.ExecAction{},
},
@ -297,7 +282,6 @@ func TestProbeContainer(t *testing.T) {
{
testContainer: api.Container{
ReadinessProbe: &api.Probe{
InitialDelaySeconds: -100,
Handler: api.Handler{
Exec: &api.ExecAction{},
},
@ -309,7 +293,6 @@ func TestProbeContainer(t *testing.T) {
{
testContainer: api.Container{
ReadinessProbe: &api.Probe{
InitialDelaySeconds: -100,
Handler: api.Handler{
Exec: &api.ExecAction{},
},
@ -322,32 +305,8 @@ func TestProbeContainer(t *testing.T) {
// Both LivenessProbe and ReadinessProbe.
{
testContainer: api.Container{
LivenessProbe: &api.Probe{InitialDelaySeconds: 100},
ReadinessProbe: &api.Probe{InitialDelaySeconds: 100},
},
expectedLiveness: probe.Success,
expectedReadiness: probe.Unknown,
},
{
testContainer: api.Container{
LivenessProbe: &api.Probe{InitialDelaySeconds: 100},
ReadinessProbe: &api.Probe{InitialDelaySeconds: -100},
},
expectedLiveness: probe.Success,
expectedReadiness: probe.Unknown,
},
{
testContainer: api.Container{
LivenessProbe: &api.Probe{InitialDelaySeconds: -100},
ReadinessProbe: &api.Probe{InitialDelaySeconds: 100},
},
expectedLiveness: probe.Unknown,
expectedReadiness: probe.Unknown,
},
{
testContainer: api.Container{
LivenessProbe: &api.Probe{InitialDelaySeconds: -100},
ReadinessProbe: &api.Probe{InitialDelaySeconds: -100},
LivenessProbe: &api.Probe{},
ReadinessProbe: &api.Probe{},
},
expectedLiveness: probe.Unknown,
expectedReadiness: probe.Unknown,
@ -355,25 +314,11 @@ func TestProbeContainer(t *testing.T) {
{
testContainer: api.Container{
LivenessProbe: &api.Probe{
InitialDelaySeconds: -100,
Handler: api.Handler{
Exec: &api.ExecAction{},
},
},
ReadinessProbe: &api.Probe{InitialDelaySeconds: -100},
},
expectedLiveness: probe.Unknown,
expectedReadiness: probe.Unknown,
},
{
testContainer: api.Container{
LivenessProbe: &api.Probe{
InitialDelaySeconds: -100,
Handler: api.Handler{
Exec: &api.ExecAction{},
},
},
ReadinessProbe: &api.Probe{InitialDelaySeconds: -100},
ReadinessProbe: &api.Probe{},
},
expectedLiveness: probe.Failure,
expectedReadiness: probe.Unknown,
@ -381,13 +326,11 @@ func TestProbeContainer(t *testing.T) {
{
testContainer: api.Container{
LivenessProbe: &api.Probe{
InitialDelaySeconds: -100,
Handler: api.Handler{
Exec: &api.ExecAction{},
},
},
ReadinessProbe: &api.Probe{
InitialDelaySeconds: -100,
Handler: api.Handler{
Exec: &api.ExecAction{},
},
@ -405,7 +348,7 @@ func TestProbeContainer(t *testing.T) {
prober.exec = fakeExecProber{test.expectedLiveness, nil}
}
liveness, err := prober.ProbeLiveness(&api.Pod{}, api.PodStatus{}, test.testContainer, containerID, createdAt)
liveness, err := prober.probeLiveness(&api.Pod{}, api.PodStatus{}, test.testContainer, containerID)
if test.expectError && err == nil {
t.Errorf("[%d] Expected liveness probe error but no error was returned.", i)
}
@ -418,7 +361,7 @@ func TestProbeContainer(t *testing.T) {
// TODO: Test readiness errors
prober.exec = fakeExecProber{test.expectedReadiness, nil}
readiness, err := prober.ProbeReadiness(&api.Pod{}, api.PodStatus{}, test.testContainer, containerID)
readiness, err := prober.probeReadiness(&api.Pod{}, api.PodStatus{}, test.testContainer, containerID)
if err != nil {
t.Errorf("[%d] Unexpected readiness probe error: %v", i, err)
}


@ -19,17 +19,23 @@ package results
import (
"sync"
"k8s.io/kubernetes/pkg/api"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
)
// Manager provides a probe results cache.
// Manager provides a probe results cache and channel of updates.
type Manager interface {
// Get returns the cached result for the container with the given ID.
Get(id kubecontainer.ContainerID) (Result, bool)
Get(kubecontainer.ContainerID) (Result, bool)
// Set sets the cached result for the container with the given ID.
Set(id kubecontainer.ContainerID, result Result)
// The pod is only included to be sent with the update.
Set(kubecontainer.ContainerID, Result, *api.Pod)
// Remove clears the cached result for the container with the given ID.
Remove(id kubecontainer.ContainerID)
Remove(kubecontainer.ContainerID)
// Updates returns a channel that receives an Update whenever a cached result changes (but not
// when it is removed).
// NOTE: The current implementation only supports a single updates channel.
Updates() <-chan Update
}
// Result is the type for probe results.
@ -51,19 +57,36 @@ func (r Result) String() string {
}
}
// Update describes a probe result change, sent over the Updates channel.
type Update struct {
ContainerID kubecontainer.ContainerID
Result Result
Pod *api.Pod
}
// Manager implementation.
type manager struct {
// guards the cache
sync.RWMutex
// map of container ID -> probe Result
cache map[kubecontainer.ContainerID]Result
// channel of updates (may be nil)
updates chan Update
}
var _ Manager = &manager{}
// NewManager creates and returns an empty results manager.
func NewManager() Manager {
return &manager{cache: make(map[kubecontainer.ContainerID]Result)}
m := &manager{cache: make(map[kubecontainer.ContainerID]Result)}
return m
}
// NewManagerWithUpdates creates and returns an empty results manager with a buffered updates channel.
func NewManagerWithUpdates() Manager {
m := NewManager().(*manager)
m.updates = make(chan Update, 20)
return m
}
func (m *manager) Get(id kubecontainer.ContainerID) (Result, bool) {
@ -73,13 +96,22 @@ func (m *manager) Get(id kubecontainer.ContainerID) (Result, bool) {
return result, found
}
func (m *manager) Set(id kubecontainer.ContainerID, result Result) {
func (m *manager) Set(id kubecontainer.ContainerID, result Result, pod *api.Pod) {
if m.setInternal(id, result) {
m.pushUpdate(Update{id, result, pod})
}
}
// Internal helper for locked portion of set. Returns whether an update should be sent.
func (m *manager) setInternal(id kubecontainer.ContainerID, result Result) bool {
m.Lock()
defer m.Unlock()
prev, exists := m.cache[id]
if !exists || prev != result {
m.cache[id] = result
return true
}
return false
}
func (m *manager) Remove(id kubecontainer.ContainerID) {
@ -87,3 +119,14 @@ func (m *manager) Remove(id kubecontainer.ContainerID) {
defer m.Unlock()
delete(m.cache, id)
}
func (m *manager) Updates() <-chan Update {
return m.updates
}
// pushUpdate sends an update on the updates channel if it is initialized.
func (m *manager) pushUpdate(update Update) {
if m.updates != nil {
m.updates <- update
}
}


@ -18,9 +18,12 @@ package results
import (
"testing"
"time"
"github.com/stretchr/testify/assert"
"k8s.io/kubernetes/pkg/api"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/util"
)
func TestCacheOperations(t *testing.T) {
@ -32,7 +35,7 @@ func TestCacheOperations(t *testing.T) {
_, found := m.Get(unsetID)
assert.False(t, found, "unset result found")
m.Set(setID, Success)
m.Set(setID, Success, nil)
result, found := m.Get(setID)
assert.True(t, result == Success, "set result")
assert.True(t, found, "set result found")
@ -41,3 +44,55 @@ func TestCacheOperations(t *testing.T) {
_, found = m.Get(setID)
assert.False(t, found, "removed result found")
}
func TestUpdates(t *testing.T) {
m := NewManagerWithUpdates()
pod := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "test-pod"}}
fooID := kubecontainer.ContainerID{"test", "foo"}
barID := kubecontainer.ContainerID{"test", "bar"}
expectUpdate := func(expected Update, msg string) {
select {
case u := <-m.Updates():
if expected != u {
t.Errorf("Expected update %v, recieved %v: %s %s", expected, u, msg)
}
case <-time.After(util.ForeverTestTimeout):
t.Errorf("Timed out waiting for update %v: %s", expected, msg)
}
}
expectNoUpdate := func(msg string) {
// NOTE: Since updates are accumulated asynchronously, this method is not guaranteed to fail
// when it should. In the event it misses a failure, the following calls to expectUpdate should
// still fail.
select {
case u := <-m.Updates():
t.Errorf("Unexpected update %v: %s", u, msg)
default:
// Pass
}
}
// New result should always push an update.
m.Set(fooID, Success, pod)
expectUpdate(Update{fooID, Success, pod}, "new success")
m.Set(barID, Failure, pod)
expectUpdate(Update{barID, Failure, pod}, "new failure")
// Unchanged results should not send an update.
m.Set(fooID, Success, pod)
expectNoUpdate("unchanged foo")
m.Set(barID, Failure, pod)
expectNoUpdate("unchanged bar")
// Changed results should send an update.
m.Set(fooID, Failure, pod)
expectUpdate(Update{fooID, Failure, pod}, "changed foo")
m.Set(barID, Success, pod)
expectUpdate(Update{barID, Success, pod}, "changed bar")
}


@ -32,7 +32,6 @@ import (
// associated with it which runs the probe loop until the container permanently terminates, or the
// stop channel is closed. The worker uses the probe Manager's statusManager to get up-to-date
// container IDs.
// TODO: Handle liveness probing
type worker struct {
// Channel for stopping the probe, it should be closed to trigger a stop.
stop chan struct{}
@ -46,44 +45,65 @@ type worker struct {
// Describes the probe configuration (read-only)
spec *api.Probe
// The type of the worker.
probeType probeType
// The probe value during the initial delay.
initialValue results.Result
// Where to store this workers results.
resultsManager results.Manager
probeManager *manager
// The last known container ID for this worker.
containerID kubecontainer.ContainerID
}
// Creates and starts a new probe worker.
func (m *manager) newWorker(
func newWorker(
m *manager,
probeType probeType,
pod *api.Pod,
container api.Container) *worker {
w := &worker{
stop: make(chan struct{}),
pod: pod,
container: container,
spec: container.ReadinessProbe,
stop: make(chan struct{}),
pod: pod,
container: container,
probeType: probeType,
probeManager: m,
}
// Start the worker thread.
go run(m, w)
switch probeType {
case readiness:
w.spec = container.ReadinessProbe
w.resultsManager = m.readinessManager
w.initialValue = results.Failure
case liveness:
w.spec = container.LivenessProbe
w.resultsManager = m.livenessManager
w.initialValue = results.Success
}
return w
}
// run periodically probes the container.
func run(m *manager, w *worker) {
probeTicker := time.NewTicker(m.defaultProbePeriod)
func (w *worker) run() {
probeTicker := time.NewTicker(w.probeManager.defaultProbePeriod)
defer func() {
// Clean up.
probeTicker.Stop()
if !w.containerID.IsEmpty() {
m.readinessCache.Remove(w.containerID)
w.resultsManager.Remove(w.containerID)
}
m.removeReadinessProbe(w.pod.UID, w.container.Name)
w.probeManager.removeWorker(w.pod.UID, w.container.Name, w.probeType)
}()
probeLoop:
for doProbe(m, w) {
for w.doProbe() {
// Wait for next probe tick.
select {
case <-w.stop:
@ -96,10 +116,10 @@ probeLoop:
// doProbe probes the container once and records the result.
// Returns whether the worker should continue.
func doProbe(m *manager, w *worker) (keepGoing bool) {
func (w *worker) doProbe() (keepGoing bool) {
defer util.HandleCrash(func(_ interface{}) { keepGoing = true })
status, ok := m.statusManager.GetPodStatus(w.pod.UID)
status, ok := w.probeManager.statusManager.GetPodStatus(w.pod.UID)
if !ok {
// Either the pod has not been created yet, or it was already deleted.
glog.V(3).Infof("No status for pod: %v", kubeletutil.FormatPodName(w.pod))
@ -123,7 +143,7 @@ func doProbe(m *manager, w *worker) (keepGoing bool) {
if w.containerID.String() != c.ContainerID {
if !w.containerID.IsEmpty() {
m.readinessCache.Remove(w.containerID)
w.resultsManager.Remove(w.containerID)
}
w.containerID = kubecontainer.ParseContainerID(c.ContainerID)
}
@ -131,22 +151,23 @@ func doProbe(m *manager, w *worker) (keepGoing bool) {
if c.State.Running == nil {
glog.V(3).Infof("Non-running container probed: %v - %v",
kubeletutil.FormatPodName(w.pod), w.container.Name)
m.readinessCache.Set(w.containerID, results.Failure)
if !w.containerID.IsEmpty() {
w.resultsManager.Set(w.containerID, results.Failure, w.pod)
}
// Abort if the container will not be restarted.
return c.State.Terminated == nil ||
w.pod.Spec.RestartPolicy != api.RestartPolicyNever
}
if int64(time.Since(c.State.Running.StartedAt.Time).Seconds()) < w.spec.InitialDelaySeconds {
// Readiness defaults to false during the initial delay.
m.readinessCache.Set(w.containerID, results.Failure)
w.resultsManager.Set(w.containerID, w.initialValue, w.pod)
return true
}
// TODO: Move error handling out of prober.
result, _ := m.prober.ProbeReadiness(w.pod, status, w.container, w.containerID)
result, _ := w.probeManager.prober.probe(w.probeType, w.pod, status, w.container, w.containerID)
if result != probe.Unknown {
m.readinessCache.Set(w.containerID, result != probe.Failure)
w.resultsManager.Set(w.containerID, result != probe.Failure, w.pod)
}
return true


@ -17,14 +17,19 @@ limitations under the License.
package prober
import (
"fmt"
"testing"
"time"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/client/record"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/prober/results"
"k8s.io/kubernetes/pkg/probe"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/exec"
"k8s.io/kubernetes/pkg/util/wait"
)
const (
@ -52,12 +57,11 @@ func TestDoProbe(t *testing.T) {
failedStatus.Phase = api.PodFailed
tests := []struct {
probe api.Probe
podStatus *api.PodStatus
expectContinue bool
expectReadySet bool
expectedReadiness results.Result
probe api.Probe
podStatus *api.PodStatus
expectContinue bool
expectSet bool
expectedResult results.Result
}{
{ // No status.
expectContinue: true,
@ -72,136 +76,158 @@ func TestDoProbe(t *testing.T) {
{ // Container waiting
podStatus: &pendingStatus,
expectContinue: true,
expectReadySet: true,
expectSet: true,
},
{ // Container terminated
podStatus: &terminatedStatus,
expectReadySet: true,
podStatus: &terminatedStatus,
expectSet: true,
},
{ // Probe successful.
podStatus: &runningStatus,
expectContinue: true,
expectReadySet: true,
expectedReadiness: results.Success,
podStatus: &runningStatus,
expectContinue: true,
expectSet: true,
expectedResult: results.Success,
},
{ // Initial delay passed
podStatus: &runningStatus,
probe: api.Probe{
InitialDelaySeconds: -100,
},
expectContinue: true,
expectReadySet: true,
expectedReadiness: results.Success,
expectContinue: true,
expectSet: true,
expectedResult: results.Success,
},
}
for i, test := range tests {
w := newTestWorker(test.probe)
if test.podStatus != nil {
m.statusManager.SetPodStatus(w.pod, *test.podStatus)
}
if c := doProbe(m, w); c != test.expectContinue {
t.Errorf("[%d] Expected continue to be %v but got %v", i, test.expectContinue, c)
}
ready, ok := m.readinessCache.Get(containerID)
if ok != test.expectReadySet {
t.Errorf("[%d] Expected to have readiness: %v but got %v", i, test.expectReadySet, ok)
}
if ready != test.expectedReadiness {
t.Errorf("[%d] Expected readiness: %v but got %v", i, test.expectedReadiness, ready)
}
for _, probeType := range [...]probeType{liveness, readiness} {
for i, test := range tests {
w := newTestWorker(m, probeType, test.probe)
if test.podStatus != nil {
m.statusManager.SetPodStatus(w.pod, *test.podStatus)
}
if c := w.doProbe(); c != test.expectContinue {
t.Errorf("[%s-%d] Expected continue to be %v but got %v", probeType, i, test.expectContinue, c)
}
result, ok := resultsManager(m, probeType).Get(containerID)
if ok != test.expectSet {
t.Errorf("[%s-%d] Expected to have result: %v but got %v", probeType, i, test.expectSet, ok)
}
if result != test.expectedResult {
t.Errorf("[%s-%d] Expected result: %v but got %v", probeType, i, test.expectedResult, result)
}
// Clean up.
m.statusManager.DeletePodStatus(podUID)
m.readinessCache.Remove(containerID)
// Clean up.
m.statusManager.DeletePodStatus(podUID)
resultsManager(m, probeType).Remove(containerID)
}
}
}
func TestInitialDelay(t *testing.T) {
m := newTestManager()
w := newTestWorker(api.Probe{
InitialDelaySeconds: 10,
})
m.statusManager.SetPodStatus(w.pod, getRunningStatus())
if !doProbe(m, w) {
t.Errorf("Expected to continue, but did not")
}
for _, probeType := range [...]probeType{liveness, readiness} {
w := newTestWorker(m, probeType, api.Probe{
InitialDelaySeconds: 10,
})
m.statusManager.SetPodStatus(w.pod, getRunningStatus())
ready, ok := m.readinessCache.Get(containerID)
if !ok {
t.Errorf("Expected readiness to be false, but was not set")
} else if ready {
t.Errorf("Expected readiness to be false, but was true")
}
if !w.doProbe() {
t.Errorf("[%s] Expected to continue, but did not", probeType)
}
// 100 seconds later...
laterStatus := getRunningStatus()
laterStatus.ContainerStatuses[0].State.Running.StartedAt.Time =
time.Now().Add(-100 * time.Second)
m.statusManager.SetPodStatus(w.pod, laterStatus)
expectedResult := results.Result(probeType == liveness)
result, ok := resultsManager(m, probeType).Get(containerID)
if !ok {
t.Errorf("[%s] Expected result to be set during initial delay, but was not set", probeType)
} else if result != expectedResult {
t.Errorf("[%s] Expected result to be %v during initial delay, but was %v",
probeType, expectedResult, result)
}
// Second call should succeed (already waited).
if !doProbe(m, w) {
t.Errorf("Expected to continue, but did not")
}
// 100 seconds later...
laterStatus := getRunningStatus()
laterStatus.ContainerStatuses[0].State.Running.StartedAt.Time =
time.Now().Add(-100 * time.Second)
m.statusManager.SetPodStatus(w.pod, laterStatus)
ready, ok = m.readinessCache.Get(containerID)
if !ok {
t.Errorf("Expected readiness to be true, but was not set")
} else if !ready {
t.Errorf("Expected readiness to be true, but was false")
// Second call should succeed (already waited).
if !w.doProbe() {
t.Errorf("[%s] Expected to continue, but did not", probeType)
}
result, ok = resultsManager(m, probeType).Get(containerID)
if !ok {
t.Errorf("[%s] Expected result to be true, but was not set", probeType)
} else if !result {
t.Errorf("[%s] Expected result to be true, but was false", probeType)
}
}
}
func TestCleanUp(t *testing.T) {
m := newTestManager()
pod := getTestPod(api.Probe{})
m.statusManager.SetPodStatus(&pod, getRunningStatus())
m.readinessCache.Set(containerID, results.Success)
w := m.newWorker(&pod, pod.Spec.Containers[0])
m.readinessProbes[containerPath{podUID, containerName}] = w
if ready, _ := m.readinessCache.Get(containerID); !ready {
t.Fatal("Expected readiness to be true.")
}
for _, probeType := range [...]probeType{liveness, readiness} {
key := probeKey{podUID, containerName, probeType}
w := newTestWorker(m, probeType, api.Probe{})
m.statusManager.SetPodStatus(w.pod, getRunningStatus())
go w.run()
m.workers[key] = w
close(w.stop)
if err := waitForWorkerExit(m, []containerPath{{podUID, containerName}}); err != nil {
t.Fatal(err)
}
// Wait for worker to run.
condition := func() (bool, error) {
ready, _ := resultsManager(m, probeType).Get(containerID)
return ready == results.Success, nil
}
if ready, _ := condition(); !ready {
if err := wait.Poll(100*time.Millisecond, util.ForeverTestTimeout, condition); err != nil {
t.Fatalf("[%s] Error waiting for worker ready: %v", probeType, err)
}
}
if _, ok := m.readinessCache.Get(containerID); ok {
t.Error("Expected readiness to be cleared.")
}
if _, ok := m.readinessProbes[containerPath{podUID, containerName}]; ok {
t.Error("Expected worker to be cleared.")
close(w.stop)
if err := waitForWorkerExit(m, []probeKey{key}); err != nil {
t.Fatalf("[%s] error waiting for worker exit: %v", probeType, err)
}
if _, ok := resultsManager(m, probeType).Get(containerID); ok {
t.Errorf("[%s] Expected result to be cleared.", probeType)
}
if _, ok := m.workers[key]; ok {
t.Errorf("[%s] Expected worker to be cleared.", probeType)
}
}
}
func TestHandleCrash(t *testing.T) {
m := newTestManager()
m.prober = CrashingProber{}
w := newTestWorker(api.Probe{})
m.prober = &prober{
refManager: kubecontainer.NewRefManager(),
recorder: &record.FakeRecorder{},
exec: crashingExecProber{},
}
w := newTestWorker(m, readiness, api.Probe{})
m.statusManager.SetPodStatus(w.pod, getRunningStatus())
// doProbe should recover from the crash, and keep going.
if !doProbe(m, w) {
if !w.doProbe() {
t.Error("Expected to keep going, but terminated.")
}
if _, ok := m.readinessCache.Get(containerID); ok {
if _, ok := m.readinessManager.Get(containerID); ok {
t.Error("Expected readiness to be unchanged from crash.")
}
}
func newTestWorker(probeSpec api.Probe) *worker {
pod := getTestPod(probeSpec)
return &worker{
stop: make(chan struct{}),
pod: &pod,
container: pod.Spec.Containers[0],
spec: &probeSpec,
func newTestWorker(m *manager, probeType probeType, probeSpec api.Probe) *worker {
// All tests rely on the fake exec prober.
probeSpec.Handler = api.Handler{
Exec: &api.ExecAction{},
}
pod := getTestPod(probeType, probeSpec)
return newWorker(m, probeType, &pod, pod.Spec.Containers[0])
}
func getRunningStatus() api.PodStatus {
@ -217,10 +243,15 @@ func getRunningStatus() api.PodStatus {
return podStatus
}
func getTestPod(probeSpec api.Probe) api.Pod {
func getTestPod(probeType probeType, probeSpec api.Probe) api.Pod {
container := api.Container{
Name: containerName,
ReadinessProbe: &probeSpec,
Name: containerName,
}
switch probeType {
case readiness:
container.ReadinessProbe = &probeSpec
case liveness:
container.LivenessProbe = &probeSpec
}
pod := api.Pod{
Spec: api.PodSpec{
@ -232,12 +263,18 @@ func getTestPod(probeSpec api.Probe) api.Pod {
return pod
}
type CrashingProber struct{}
func (f CrashingProber) ProbeLiveness(_ *api.Pod, _ api.PodStatus, c api.Container, _ kubecontainer.ContainerID, _ int64) (probe.Result, error) {
panic("Intentional ProbeLiveness crash.")
func resultsManager(m *manager, probeType probeType) results.Manager {
switch probeType {
case readiness:
return m.readinessManager
case liveness:
return m.livenessManager
}
panic(fmt.Errorf("Unhandled case: %v", probeType))
}
func (f CrashingProber) ProbeReadiness(_ *api.Pod, _ api.PodStatus, c api.Container, _ kubecontainer.ContainerID) (probe.Result, error) {
panic("Intentional ProbeReadiness crash.")
type crashingExecProber struct{}
func (p crashingExecProber) Probe(_ exec.Cmd) (probe.Result, string, error) {
panic("Intentional Probe crash.")
}


@ -41,9 +41,8 @@ import (
"k8s.io/kubernetes/pkg/client/record"
"k8s.io/kubernetes/pkg/credentialprovider"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/prober"
proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results"
kubeletutil "k8s.io/kubernetes/pkg/kubelet/util"
"k8s.io/kubernetes/pkg/probe"
"k8s.io/kubernetes/pkg/securitycontext"
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util"
@ -89,7 +88,7 @@ type Runtime struct {
containerRefManager *kubecontainer.RefManager
generator kubecontainer.RunContainerOptionsGenerator
recorder record.EventRecorder
prober prober.Prober
livenessManager proberesults.Manager
volumeGetter volumeGetter
imagePuller kubecontainer.ImagePuller
}
@ -108,8 +107,9 @@ func New(config *Config,
generator kubecontainer.RunContainerOptionsGenerator,
recorder record.EventRecorder,
containerRefManager *kubecontainer.RefManager,
prober prober.Prober,
volumeGetter volumeGetter, imageBackOff *util.Backoff) (*Runtime, error) {
livenessManager proberesults.Manager,
volumeGetter volumeGetter,
imageBackOff *util.Backoff) (*Runtime, error) {
systemdVersion, err := getSystemdVersion()
if err != nil {
@ -146,7 +146,7 @@ func New(config *Config,
containerRefManager: containerRefManager,
generator: generator,
recorder: recorder,
prober: prober,
livenessManager: livenessManager,
volumeGetter: volumeGetter,
}
rkt.imagePuller = kubecontainer.NewImagePuller(recorder, rkt, imageBackOff)
@ -1032,17 +1032,13 @@ func (r *Runtime) SyncPod(pod *api.Pod, runningPod kubecontainer.Pod, podStatus
break
}
result, err := r.prober.ProbeLiveness(pod, podStatus, container, c.ID, c.Created)
// TODO(vmarmol): examine this logic.
if err == nil && result != probe.Success && pod.Spec.RestartPolicy != api.RestartPolicyNever {
glog.Infof("Pod %q container %q is unhealthy (probe result: %v), it will be killed and re-created.", podFullName, container.Name, result)
liveness, found := r.livenessManager.Get(c.ID)
if found && liveness != proberesults.Success && pod.Spec.RestartPolicy != api.RestartPolicyNever {
glog.Infof("Pod %q container %q is unhealthy, it will be killed and re-created.", podFullName, container.Name)
restartPod = true
break
}
if err != nil {
glog.V(2).Infof("Probe container %q failed: %v", container.Name, err)
}
delete(unidentifiedContainers, c.ID)
}