Refactor readiness probing

Each container with a readiness probe has an individual goroutine which
handles periodic probing for that container. The results are cached and
written to the status.Manager in the pod sync path.
Tim St. Clair 2015-08-25 10:39:41 -07:00
parent f01e124ca5
commit 52ece0c34e
19 changed files with 1040 additions and 203 deletions
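
For illustration only, a minimal self-contained Go sketch of the pattern described above: one worker goroutine per probed container periodically records its result into a shared, lock-protected cache, and the status path only reads that cache. The names here (resultCache, worker, probe) are hypothetical stand-ins, not the types introduced by this commit.

package main

import (
	"fmt"
	"sync"
	"time"
)

// resultCache caches the most recent readiness result per container ID.
// It plays the role of the readiness cache the workers write into.
type resultCache struct {
	mu      sync.RWMutex
	results map[string]bool
}

func newResultCache() *resultCache {
	return &resultCache{results: make(map[string]bool)}
}

func (c *resultCache) set(id string, ready bool) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.results[id] = ready
}

func (c *resultCache) get(id string) (ready, found bool) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	ready, found = c.results[id]
	return
}

// worker probes a single container on a fixed period until stop is closed.
type worker struct {
	containerID string
	probe       func() bool // stand-in for the exec/http/tcp probe action
	stop        chan struct{}
}

func (w *worker) run(period time.Duration, cache *resultCache) {
	ticker := time.NewTicker(period)
	defer ticker.Stop()
	for {
		// Probe and record the result; the status path only ever reads the cache.
		cache.set(w.containerID, w.probe())
		select {
		case <-w.stop:
			return
		case <-ticker.C:
		}
	}
}

func main() {
	cache := newResultCache()
	w := &worker{
		containerID: "abc123",
		probe:       func() bool { return true },
		stop:        make(chan struct{}),
	}
	go w.run(100*time.Millisecond, cache) // one goroutine per probed container

	time.Sleep(250 * time.Millisecond)
	if ready, ok := cache.get(w.containerID); ok {
		fmt.Println("container ready:", ready)
	}
	close(w.stop)
}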

View File

@ -51,7 +51,7 @@ func TrimRuntimePrefix(fullString string) string {
// ShouldContainerBeRestarted checks whether a container needs to be restarted.
// TODO(yifan): Think about how to refactor this.
func ShouldContainerBeRestarted(container *api.Container, pod *api.Pod, podStatus *api.PodStatus, readinessManager *ReadinessManager) bool {
func ShouldContainerBeRestarted(container *api.Container, pod *api.Pod, podStatus *api.PodStatus) bool {
podFullName := GetPodFullName(pod)
// Get all dead container status.
@ -62,11 +62,6 @@ func ShouldContainerBeRestarted(container *api.Container, pod *api.Pod, podStatu
}
}
// Set dead containers to notReady state.
for _, c := range resultStatus {
readinessManager.RemoveReadiness(TrimRuntimePrefix(c.ContainerID))
}
// Check RestartPolicy for dead container.
if len(resultStatus) > 0 {
if pod.Spec.RestartPolicy == api.RestartPolicyNever {

View File

@ -30,7 +30,7 @@ import (
func NewFakeDockerManager(
client DockerInterface,
recorder record.EventRecorder,
readinessManager *kubecontainer.ReadinessManager,
prober prober.Prober,
containerRefManager *kubecontainer.RefManager,
machineInfo *cadvisorApi.MachineInfo,
podInfraContainerImage string,
@ -44,10 +44,9 @@ func NewFakeDockerManager(
fakeOOMAdjuster := oom.NewFakeOOMAdjuster()
fakeProcFs := procfs.NewFakeProcFs()
dm := NewDockerManager(client, recorder, readinessManager, containerRefManager, machineInfo, podInfraContainerImage, qps,
dm := NewDockerManager(client, recorder, prober, containerRefManager, machineInfo, podInfraContainerImage, qps,
burst, containerLogsDir, osInterface, networkPlugin, generator, httpClient, &NativeExecHandler{},
fakeOOMAdjuster, fakeProcFs, false)
dm.dockerPuller = &FakeDockerPuller{}
dm.prober = prober.New(nil, readinessManager, containerRefManager, recorder)
return dm
}

View File

@ -87,7 +87,6 @@ var podInfraContainerImagePullPolicy = api.PullIfNotPresent
type DockerManager struct {
client DockerInterface
recorder record.EventRecorder
readinessManager *kubecontainer.ReadinessManager
containerRefManager *kubecontainer.RefManager
os kubecontainer.OSInterface
machineInfo *cadvisorApi.MachineInfo
@ -145,7 +144,7 @@ type DockerManager struct {
func NewDockerManager(
client DockerInterface,
recorder record.EventRecorder,
readinessManager *kubecontainer.ReadinessManager,
prober prober.Prober,
containerRefManager *kubecontainer.RefManager,
machineInfo *cadvisorApi.MachineInfo,
podInfraContainerImage string,
@ -195,7 +194,6 @@ func NewDockerManager(
dm := &DockerManager{
client: client,
recorder: recorder,
readinessManager: readinessManager,
containerRefManager: containerRefManager,
os: osInterface,
machineInfo: machineInfo,
@ -205,7 +203,7 @@ func NewDockerManager(
dockerRoot: dockerRoot,
containerLogsDir: containerLogsDir,
networkPlugin: networkPlugin,
prober: nil,
prober: prober,
generator: generator,
execHandler: execHandler,
oomAdjuster: oomAdjuster,
@ -213,7 +211,6 @@ func NewDockerManager(
cpuCFSQuota: cpuCFSQuota,
}
dm.runner = lifecycle.NewHandlerRunner(httpClient, dm, dm)
dm.prober = prober.New(dm, readinessManager, containerRefManager, recorder)
dm.imagePuller = kubecontainer.NewImagePuller(recorder, dm)
return dm
@ -1363,8 +1360,6 @@ func (dm *DockerManager) killContainer(containerID types.UID, container *api.Con
gracePeriod -= int64(unversioned.Now().Sub(start.Time).Seconds())
}
dm.readinessManager.RemoveReadiness(ID)
// always give containers a minimal shutdown window to avoid unnecessary SIGKILLs
if gracePeriod < minimumGracePeriodInSeconds {
gracePeriod = minimumGracePeriodInSeconds
@ -1659,7 +1654,7 @@ func (dm *DockerManager) computePodContainerChanges(pod *api.Pod, runningPod kub
c := runningPod.FindContainerByName(container.Name)
if c == nil {
if kubecontainer.ShouldContainerBeRestarted(&container, pod, &podStatus, dm.readinessManager) {
if kubecontainer.ShouldContainerBeRestarted(&container, pod, &podStatus) {
// If we are here it means that the container is dead and should be restarted, or never existed and should
// be created. We may be inserting this ID again if the container has changed and it has
// RestartPolicy::Always, but it's not a big deal.
@ -1694,7 +1689,7 @@ func (dm *DockerManager) computePodContainerChanges(pod *api.Pod, runningPod kub
continue
}
result, err := dm.prober.Probe(pod, podStatus, container, string(c.ID), c.Created)
result, err := dm.prober.ProbeLiveness(pod, podStatus, container, string(c.ID), c.Created)
if err != nil {
// TODO(vmarmol): examine this logic.
glog.V(2).Infof("probe no-error: %q", container.Name)

View File

@ -36,6 +36,7 @@ import (
"k8s.io/kubernetes/pkg/client/record"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/network"
"k8s.io/kubernetes/pkg/kubelet/prober"
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util"
uexec "k8s.io/kubernetes/pkg/util/exec"
@ -74,14 +75,13 @@ func (*fakeOptionGenerator) GenerateRunContainerOptions(pod *api.Pod, container
func newTestDockerManagerWithHTTPClient(fakeHTTPClient *fakeHTTP) (*DockerManager, *FakeDockerClient) {
fakeDocker := &FakeDockerClient{VersionInfo: docker.Env{"Version=1.1.3", "ApiVersion=1.15"}, Errors: make(map[string]error), RemovedImages: sets.String{}}
fakeRecorder := &record.FakeRecorder{}
readinessManager := kubecontainer.NewReadinessManager()
containerRefManager := kubecontainer.NewRefManager()
networkPlugin, _ := network.InitNetworkPlugin([]network.NetworkPlugin{}, "", network.NewFakeHost(nil))
optionGenerator := &fakeOptionGenerator{}
dockerManager := NewFakeDockerManager(
fakeDocker,
fakeRecorder,
readinessManager,
prober.FakeProber{},
containerRefManager,
&cadvisorApi.MachineInfo{},
PodInfraContainerImage,
@ -398,10 +398,6 @@ func TestKillContainerInPod(t *testing.T) {
containerToKill := &containers[0]
containerToSpare := &containers[1]
fakeDocker.ContainerList = containers
// Set all containers to ready.
for _, c := range fakeDocker.ContainerList {
manager.readinessManager.SetReadiness(c.ID, true)
}
if err := manager.KillContainerInPod("", &pod.Spec.Containers[0], pod); err != nil {
t.Errorf("unexpected error: %v", err)
@ -410,13 +406,9 @@ func TestKillContainerInPod(t *testing.T) {
if err := fakeDocker.AssertStopped([]string{containerToKill.ID}); err != nil {
t.Errorf("container was not stopped correctly: %v", err)
}
// Verify that the readiness has been removed for the stopped container.
if ready := manager.readinessManager.GetReadiness(containerToKill.ID); ready {
t.Errorf("exepcted container entry ID '%v' to not be found. states: %+v", containerToKill.ID, ready)
}
if ready := manager.readinessManager.GetReadiness(containerToSpare.ID); !ready {
t.Errorf("exepcted container entry ID '%v' to be found. states: %+v", containerToSpare.ID, ready)
// Assert the container has been spared.
if err := fakeDocker.AssertStopped([]string{containerToSpare.ID}); err == nil {
t.Errorf("container unexpectedly stopped: %v", containerToSpare.ID)
}
}
@ -471,10 +463,6 @@ func TestKillContainerInPodWithPreStop(t *testing.T) {
},
},
}
// Set all containers to ready.
for _, c := range fakeDocker.ContainerList {
manager.readinessManager.SetReadiness(c.ID, true)
}
if err := manager.KillContainerInPod("", &pod.Spec.Containers[0], pod); err != nil {
t.Errorf("unexpected error: %v", err)
@ -510,27 +498,12 @@ func TestKillContainerInPodWithError(t *testing.T) {
Names: []string{"/k8s_bar_qux_new_1234_42"},
},
}
containerToKill := &containers[0]
containerToSpare := &containers[1]
fakeDocker.ContainerList = containers
fakeDocker.Errors["stop"] = fmt.Errorf("sample error")
// Set all containers to ready.
for _, c := range fakeDocker.ContainerList {
manager.readinessManager.SetReadiness(c.ID, true)
}
if err := manager.KillContainerInPod("", &pod.Spec.Containers[0], pod); err == nil {
t.Errorf("expected error, found nil")
}
// Verify that the readiness has been removed even though the stop failed.
if ready := manager.readinessManager.GetReadiness(containerToKill.ID); ready {
t.Errorf("exepcted container entry ID '%v' to not be found. states: %+v", containerToKill.ID, ready)
}
if ready := manager.readinessManager.GetReadiness(containerToSpare.ID); !ready {
t.Errorf("exepcted container entry ID '%v' to be found. states: %+v", containerToSpare.ID, ready)
}
}
func TestIsAExitError(t *testing.T) {

View File

@ -52,11 +52,13 @@ import (
"k8s.io/kubernetes/pkg/kubelet/envvars"
"k8s.io/kubernetes/pkg/kubelet/metrics"
"k8s.io/kubernetes/pkg/kubelet/network"
"k8s.io/kubernetes/pkg/kubelet/prober"
"k8s.io/kubernetes/pkg/kubelet/rkt"
"k8s.io/kubernetes/pkg/kubelet/status"
kubeletTypes "k8s.io/kubernetes/pkg/kubelet/types"
kubeletUtil "k8s.io/kubernetes/pkg/kubelet/util"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/probe"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util"
@ -243,7 +245,6 @@ func NewMainKubelet(
return nil, fmt.Errorf("failed to initialize disk manager: %v", err)
}
statusManager := status.NewManager(kubeClient)
readinessManager := kubecontainer.NewReadinessManager()
containerRefManager := kubecontainer.NewRefManager()
volumeManager := newVolumeManager()
@ -258,7 +259,6 @@ func NewMainKubelet(
rootDirectory: rootDirectory,
resyncInterval: resyncInterval,
containerRefManager: containerRefManager,
readinessManager: readinessManager,
httpClient: &http.Client{},
sourcesReady: sourcesReady,
registerNode: registerNode,
@ -317,7 +317,7 @@ func NewMainKubelet(
klet.containerRuntime = dockertools.NewDockerManager(
dockerClient,
recorder,
readinessManager,
klet, // prober
containerRefManager,
machineInfo,
podInfraContainerImage,
@ -343,7 +343,7 @@ func NewMainKubelet(
klet,
recorder,
containerRefManager,
readinessManager,
klet, // prober
klet.volumeManager)
if err != nil {
return nil, err
@ -386,6 +386,12 @@ func NewMainKubelet(
klet.runner = klet.containerRuntime
klet.podManager = newBasicPodManager(klet.kubeClient)
klet.prober = prober.New(klet.runner, containerRefManager, recorder)
klet.probeManager = prober.NewManager(
klet.resyncInterval,
klet.statusManager,
klet.prober)
runtimeCache, err := kubecontainer.NewRuntimeCache(klet.containerRuntime)
if err != nil {
return nil, err
@ -486,8 +492,10 @@ type Kubelet struct {
// Network plugin.
networkPlugin network.NetworkPlugin
// Container readiness state manager.
readinessManager *kubecontainer.ReadinessManager
// Handles container readiness probing
probeManager prober.Manager
// TODO: Move prober ownership to the probeManager once the runtime no longer depends on it.
prober prober.Prober
// How long to keep idle streaming command execution/port forwarding
// connections open before terminating them
@ -1665,6 +1673,7 @@ func (kl *Kubelet) HandlePodCleanups() error {
// Stop the workers for no-longer existing pods.
// TODO: is here the best place to forget pod workers?
kl.podWorkers.ForgetNonExistingPodWorkers(desiredPods)
kl.probeManager.CleanupPods(activePods)
runningPods, err := kl.runtimeCache.GetPods()
if err != nil {
@ -1993,6 +2002,7 @@ func (kl *Kubelet) HandlePodAdditions(pods []*api.Pod) {
}
mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
kl.dispatchWork(pod, SyncPodCreate, mirrorPod, start)
kl.probeManager.AddPod(pod)
}
}
@ -2024,6 +2034,7 @@ func (kl *Kubelet) HandlePodDeletions(pods []*api.Pod) {
if err := kl.deletePod(pod.UID); err != nil {
glog.V(2).Infof("Failed to delete pod %q, err: %v", kubeletUtil.FormatPodName(pod), err)
}
kl.probeManager.RemovePod(pod)
}
}
@ -2613,15 +2624,8 @@ func (kl *Kubelet) generatePodStatus(pod *api.Pod) (api.PodStatus, error) {
// Assume info is ready to process
podStatus.Phase = GetPhase(spec, podStatus.ContainerStatuses)
for _, c := range spec.Containers {
for i, st := range podStatus.ContainerStatuses {
if st.Name == c.Name {
ready := st.State.Running != nil && kl.readinessManager.GetReadiness(kubecontainer.TrimRuntimePrefix(st.ContainerID))
podStatus.ContainerStatuses[i].Ready = ready
break
}
}
}
kl.probeManager.UpdatePodStatus(pod.UID, podStatus)
podStatus.Conditions = append(podStatus.Conditions, getPodReadyCondition(spec, podStatus.ContainerStatuses)...)
if !kl.standaloneMode {
@ -2791,6 +2795,16 @@ func (kl *Kubelet) GetRuntime() kubecontainer.Runtime {
return kl.containerRuntime
}
// Proxy prober calls through the Kubelet to break the circular dependency between the runtime &
// prober.
// TODO: Remove this hack once the runtime no longer depends on the prober.
func (kl *Kubelet) ProbeLiveness(pod *api.Pod, status api.PodStatus, container api.Container, containerID string, createdAt int64) (probe.Result, error) {
return kl.prober.ProbeLiveness(pod, status, container, containerID, createdAt)
}
func (kl *Kubelet) ProbeReadiness(pod *api.Pod, status api.PodStatus, container api.Container, containerID string) (probe.Result, error) {
return kl.prober.ProbeReadiness(pod, status, container, containerID)
}
var minRsrc = resource.MustParse("1k")
var maxRsrc = resource.MustParse("1P")

View File

@ -45,6 +45,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/container"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/network"
"k8s.io/kubernetes/pkg/kubelet/prober"
"k8s.io/kubernetes/pkg/kubelet/status"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/types"
@ -105,7 +106,6 @@ func newTestKubelet(t *testing.T) *TestKubelet {
kubelet.masterServiceNamespace = api.NamespaceDefault
kubelet.serviceLister = testServiceLister{}
kubelet.nodeLister = testNodeLister{}
kubelet.readinessManager = kubecontainer.NewReadinessManager()
kubelet.recorder = fakeRecorder
kubelet.statusManager = status.NewManager(fakeKubeClient)
if err := kubelet.setupDataDirs(); err != nil {
@ -130,6 +130,10 @@ func newTestKubelet(t *testing.T) *TestKubelet {
runtimeCache: kubelet.runtimeCache,
t: t,
}
kubelet.prober = prober.FakeProber{}
kubelet.probeManager = prober.FakeManager{}
kubelet.volumeManager = newVolumeManager()
kubelet.containerManager, _ = newContainerManager(fakeContainerMgrMountInt(), mockCadvisor, "", "", "")
kubelet.networkConfigured = true

View File

@ -37,6 +37,7 @@ import (
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/dockertools"
"k8s.io/kubernetes/pkg/kubelet/network"
"k8s.io/kubernetes/pkg/kubelet/prober"
"k8s.io/kubernetes/pkg/util/sets"
)
@ -143,13 +144,12 @@ func (nh *fakeNetworkHost) GetRuntime() kubecontainer.Runtime {
func newTestDockerManager() (*dockertools.DockerManager, *dockertools.FakeDockerClient) {
fakeDocker := &dockertools.FakeDockerClient{VersionInfo: docker.Env{"Version=1.1.3", "ApiVersion=1.15"}, Errors: make(map[string]error), RemovedImages: sets.String{}}
fakeRecorder := &record.FakeRecorder{}
readinessManager := kubecontainer.NewReadinessManager()
containerRefManager := kubecontainer.NewRefManager()
networkPlugin, _ := network.InitNetworkPlugin([]network.NetworkPlugin{}, "", network.NewFakeHost(nil))
dockerManager := dockertools.NewFakeDockerManager(
fakeDocker,
fakeRecorder,
readinessManager,
prober.FakeProber{},
containerRefManager,
&cadvisorApi.MachineInfo{},
dockertools.PodInfraContainerImage,

View File

@ -18,14 +18,20 @@ package prober
import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/probe"
"k8s.io/kubernetes/pkg/types"
)
var _ Prober = &FakeProber{}
type FakeManager struct{}
type FakeProber struct {
}
var _ Manager = FakeManager{}
func (fp *FakeProber) Probe(pod *api.Pod, status api.PodStatus, container api.Container, containerID string, createdAt int64) (probe.Result, error) {
return probe.Success, nil
// Unused methods.
func (_ FakeManager) AddPod(_ *api.Pod) {}
func (_ FakeManager) RemovePod(_ *api.Pod) {}
func (_ FakeManager) CleanupPods(_ []*api.Pod) {}
func (_ FakeManager) UpdatePodStatus(_ types.UID, podStatus *api.PodStatus) {
for i := range podStatus.ContainerStatuses {
podStatus.ContainerStatuses[i].Ready = true
}
}

View File

@ -0,0 +1,44 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package prober
import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/probe"
)
var _ Prober = FakeProber{}
type FakeProber struct {
Readiness probe.Result
Liveness probe.Result
Error error
}
func (f FakeProber) ProbeLiveness(_ *api.Pod, _ api.PodStatus, c api.Container, _ string, _ int64) (probe.Result, error) {
if c.LivenessProbe == nil {
return probe.Success, nil
}
return f.Liveness, f.Error
}
func (f FakeProber) ProbeReadiness(_ *api.Pod, _ api.PodStatus, c api.Container, _ string) (probe.Result, error) {
if c.ReadinessProbe == nil {
return probe.Success, nil
}
return f.Readiness, f.Error
}

View File

@ -0,0 +1,167 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package prober
import (
"sync"
"time"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/status"
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util/sets"
)
// Manager manages pod probing. It creates a probe "worker" for every container that specifies a
// probe (AddPod). The worker periodically probes its assigned container and caches the results. The
// manager uses the cached probe results to set the appropriate Ready state in the PodStatus when
// requested (UpdatePodStatus). Updating probe parameters is not currently supported.
// TODO: Move liveness probing out of the runtime, to here.
type Manager interface {
// AddPod creates new probe workers for every container probe. This should be called for every
// pod created.
AddPod(pod *api.Pod)
// RemovePod handles cleaning up the removed pod state, including terminating probe workers and
// deleting cached results.
RemovePod(pod *api.Pod)
// CleanupPods handles cleaning up pods which should no longer be running.
// It takes a list of "active pods" which should not be cleaned up.
CleanupPods(activePods []*api.Pod)
// UpdatePodStatus modifies the given PodStatus with the appropriate Ready state for each
// container based on container running status, cached probe results and worker states.
UpdatePodStatus(types.UID, *api.PodStatus)
}
type manager struct {
// Caches the results of readiness probes.
readinessCache *readinessManager
// Map of active workers for readiness
readinessProbes map[containerPath]*worker
// Lock for accessing & mutating readinessProbes
workerLock sync.RWMutex
// The statusManager cache provides pod IP and container IDs for probing.
statusManager status.Manager
// prober executes the probe actions.
prober Prober
// Default period for workers to execute a probe.
defaultProbePeriod time.Duration
}
func NewManager(
defaultProbePeriod time.Duration,
statusManager status.Manager,
prober Prober) Manager {
return &manager{
defaultProbePeriod: defaultProbePeriod,
statusManager: statusManager,
prober: prober,
readinessCache: newReadinessManager(),
readinessProbes: make(map[containerPath]*worker),
}
}
// Key uniquely identifying containers
type containerPath struct {
podUID types.UID
containerName string
}
func (m *manager) AddPod(pod *api.Pod) {
m.workerLock.Lock()
defer m.workerLock.Unlock()
key := containerPath{podUID: pod.UID}
for _, c := range pod.Spec.Containers {
key.containerName = c.Name
if _, ok := m.readinessProbes[key]; ok {
glog.Errorf("Readiness probe already exists! %v - %v",
kubecontainer.GetPodFullName(pod), c.Name)
return
}
if c.ReadinessProbe != nil {
m.readinessProbes[key] = m.newWorker(pod, c)
}
}
}
func (m *manager) RemovePod(pod *api.Pod) {
m.workerLock.RLock()
defer m.workerLock.RUnlock()
key := containerPath{podUID: pod.UID}
for _, c := range pod.Spec.Containers {
key.containerName = c.Name
if worker, ok := m.readinessProbes[key]; ok {
close(worker.stop)
}
}
}
func (m *manager) CleanupPods(activePods []*api.Pod) {
desiredPods := make(map[types.UID]sets.Empty)
for _, pod := range activePods {
desiredPods[pod.UID] = sets.Empty{}
}
m.workerLock.RLock()
defer m.workerLock.RUnlock()
for path, worker := range m.readinessProbes {
if _, ok := desiredPods[path.podUID]; !ok {
close(worker.stop)
}
}
}
func (m *manager) UpdatePodStatus(podUID types.UID, podStatus *api.PodStatus) {
for i, c := range podStatus.ContainerStatuses {
var ready bool
if c.State.Running == nil {
ready = false
} else if result, ok := m.readinessCache.getReadiness(kubecontainer.TrimRuntimePrefix(c.ContainerID)); ok {
ready = result
} else {
// Check whether there is a probe which hasn't run yet.
_, exists := m.getReadinessProbe(podUID, c.Name)
ready = !exists
}
podStatus.ContainerStatuses[i].Ready = ready
}
}
func (m *manager) getReadinessProbe(podUID types.UID, containerName string) (*worker, bool) {
m.workerLock.RLock()
defer m.workerLock.RUnlock()
probe, ok := m.readinessProbes[containerPath{podUID, containerName}]
return probe, ok
}
// Called by the worker after exiting.
func (m *manager) removeReadinessProbe(podUID types.UID, containerName string) {
m.workerLock.Lock()
defer m.workerLock.Unlock()
delete(m.readinessProbes, containerPath{podUID, containerName})
}

View File

@ -0,0 +1,280 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package prober
import (
"fmt"
"testing"
"time"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/unversioned/testclient"
"k8s.io/kubernetes/pkg/kubelet/status"
"k8s.io/kubernetes/pkg/probe"
"k8s.io/kubernetes/pkg/util/wait"
)
func TestAddRemovePods(t *testing.T) {
noProbePod := api.Pod{
ObjectMeta: api.ObjectMeta{
UID: "no_probe_pod",
},
Spec: api.PodSpec{
Containers: []api.Container{{
Name: "no_probe1",
}, {
Name: "no_probe2",
}},
},
}
probePod := api.Pod{
ObjectMeta: api.ObjectMeta{
UID: "probe_pod",
},
Spec: api.PodSpec{
Containers: []api.Container{{
Name: "no_probe1",
}, {
Name: "prober1",
ReadinessProbe: &api.Probe{},
}, {
Name: "no_probe2",
}, {
Name: "prober2",
ReadinessProbe: &api.Probe{},
}},
},
}
m := newTestManager()
if err := expectProbes(m, nil); err != nil {
t.Error(err)
}
// Adding a pod with no probes should be a no-op.
m.AddPod(&noProbePod)
if err := expectProbes(m, nil); err != nil {
t.Error(err)
}
// Adding a pod with probes.
m.AddPod(&probePod)
probePaths := []containerPath{{"probe_pod", "prober1"}, {"probe_pod", "prober2"}}
if err := expectProbes(m, probePaths); err != nil {
t.Error(err)
}
// Removing un-probed pod.
m.RemovePod(&noProbePod)
if err := expectProbes(m, probePaths); err != nil {
t.Error(err)
}
// Removing probed pod.
m.RemovePod(&probePod)
if err := waitForWorkerExit(m, probePaths); err != nil {
t.Fatal(err)
}
if err := expectProbes(m, nil); err != nil {
t.Error(err)
}
// Removing already removed pods should be a no-op.
m.RemovePod(&probePod)
if err := expectProbes(m, nil); err != nil {
t.Error(err)
}
}
func TestCleanupPods(t *testing.T) {
m := newTestManager()
podToCleanup := api.Pod{
ObjectMeta: api.ObjectMeta{
UID: "pod_cleanup",
},
Spec: api.PodSpec{
Containers: []api.Container{{
Name: "prober1",
ReadinessProbe: &api.Probe{},
}, {
Name: "prober2",
ReadinessProbe: &api.Probe{},
}},
},
}
podToKeep := api.Pod{
ObjectMeta: api.ObjectMeta{
UID: "pod_keep",
},
Spec: api.PodSpec{
Containers: []api.Container{{
Name: "prober1",
ReadinessProbe: &api.Probe{},
}, {
Name: "prober2",
ReadinessProbe: &api.Probe{},
}},
},
}
m.AddPod(&podToCleanup)
m.AddPod(&podToKeep)
m.CleanupPods([]*api.Pod{&podToKeep})
removedProbes := []containerPath{{"pod_cleanup", "prober1"}, {"pod_cleanup", "prober2"}}
expectedProbes := []containerPath{{"pod_keep", "prober1"}, {"pod_keep", "prober2"}}
if err := waitForWorkerExit(m, removedProbes); err != nil {
t.Fatal(err)
}
if err := expectProbes(m, expectedProbes); err != nil {
t.Error(err)
}
}
func TestUpdatePodStatus(t *testing.T) {
const podUID = "pod_uid"
unprobed := api.ContainerStatus{
Name: "unprobed_container",
ContainerID: "unprobed_container_id",
State: api.ContainerState{
Running: &api.ContainerStateRunning{},
},
}
probedReady := api.ContainerStatus{
Name: "probed_container_ready",
ContainerID: "probed_container_ready_id",
State: api.ContainerState{
Running: &api.ContainerStateRunning{},
},
}
probedPending := api.ContainerStatus{
Name: "probed_container_pending",
ContainerID: "probed_container_pending_id",
State: api.ContainerState{
Running: &api.ContainerStateRunning{},
},
}
probedUnready := api.ContainerStatus{
Name: "probed_container_unready",
ContainerID: "probed_container_unready_id",
State: api.ContainerState{
Running: &api.ContainerStateRunning{},
},
}
terminated := api.ContainerStatus{
Name: "terminated_container",
ContainerID: "terminated_container_id",
State: api.ContainerState{
Terminated: &api.ContainerStateTerminated{},
},
}
podStatus := api.PodStatus{
Phase: api.PodRunning,
ContainerStatuses: []api.ContainerStatus{
unprobed, probedReady, probedPending, probedUnready, terminated,
},
}
m := newTestManager()
// Setup probe "workers" and cached results.
m.readinessProbes = map[containerPath]*worker{
containerPath{podUID, probedReady.Name}: {},
containerPath{podUID, probedPending.Name}: {},
containerPath{podUID, probedUnready.Name}: {},
containerPath{podUID, terminated.Name}: {},
}
m.readinessCache.setReadiness(probedReady.ContainerID, true)
m.readinessCache.setReadiness(probedUnready.ContainerID, false)
m.readinessCache.setReadiness(terminated.ContainerID, true)
m.UpdatePodStatus(podUID, &podStatus)
expectedReadiness := map[containerPath]bool{
containerPath{podUID, unprobed.Name}: true,
containerPath{podUID, probedReady.Name}: true,
containerPath{podUID, probedPending.Name}: false,
containerPath{podUID, probedUnready.Name}: false,
containerPath{podUID, terminated.Name}: false,
}
for _, c := range podStatus.ContainerStatuses {
expected, ok := expectedReadiness[containerPath{podUID, c.Name}]
if !ok {
t.Fatalf("Missing expectation for test case: %v", c.Name)
}
if expected != c.Ready {
t.Errorf("Unexpected readiness for container %v: Expected %v but got %v",
c.Name, expected, c.Ready)
}
}
}
func expectProbes(m *manager, expectedReadinessProbes []containerPath) error {
m.workerLock.RLock()
defer m.workerLock.RUnlock()
var unexpected []containerPath
missing := make([]containerPath, len(expectedReadinessProbes))
copy(missing, expectedReadinessProbes)
outer:
for probePath := range m.readinessProbes {
for i, expectedPath := range missing {
if probePath == expectedPath {
missing = append(missing[:i], missing[i+1:]...)
continue outer
}
}
unexpected = append(unexpected, probePath)
}
if len(missing) == 0 && len(unexpected) == 0 {
return nil // Yay!
}
return fmt.Errorf("Unexpected probes: %v; Missing probes: %v;", unexpected, missing)
}
func newTestManager() *manager {
const probePeriod = 1
statusManager := status.NewManager(&testclient.Fake{})
prober := FakeProber{Readiness: probe.Success}
return NewManager(probePeriod, statusManager, prober).(*manager)
}
// Wait for the given workers to exit & clean up.
func waitForWorkerExit(m *manager, workerPaths []containerPath) error {
const interval = 100 * time.Millisecond
const timeout = 30 * time.Second
for _, w := range workerPaths {
condition := func() (bool, error) {
_, exists := m.getReadinessProbe(w.podUID, w.containerName)
return !exists, nil
}
if exited, _ := condition(); exited {
continue // Already exited, no need to poll.
}
glog.Infof("Polling %v", w)
if err := wait.Poll(interval, timeout, condition); err != nil {
return err
}
}
return nil
}

View File

@ -41,7 +41,8 @@ const maxProbeRetries = 3
// Prober checks the healthiness of a container.
type Prober interface {
Probe(pod *api.Pod, status api.PodStatus, container api.Container, containerID string, createdAt int64) (probe.Result, error)
ProbeLiveness(pod *api.Pod, status api.PodStatus, container api.Container, containerID string, createdAt int64) (probe.Result, error)
ProbeReadiness(pod *api.Pod, status api.PodStatus, container api.Container, containerID string) (probe.Result, error)
}
// Prober helps to check the liveness/readiness of a container.
@ -51,55 +52,30 @@ type prober struct {
tcp tcprobe.TCPProber
runner kubecontainer.ContainerCommandRunner
readinessManager *kubecontainer.ReadinessManager
refManager *kubecontainer.RefManager
recorder record.EventRecorder
refManager *kubecontainer.RefManager
recorder record.EventRecorder
}
// NewProber creates a Prober, it takes a command runner and
// several container info managers.
func New(
runner kubecontainer.ContainerCommandRunner,
readinessManager *kubecontainer.ReadinessManager,
refManager *kubecontainer.RefManager,
recorder record.EventRecorder) Prober {
return &prober{
exec: execprobe.New(),
http: httprobe.New(),
tcp: tcprobe.New(),
runner: runner,
readinessManager: readinessManager,
refManager: refManager,
recorder: recorder,
exec: execprobe.New(),
http: httprobe.New(),
tcp: tcprobe.New(),
runner: runner,
refManager: refManager,
recorder: recorder,
}
}
// New prober for use in tests.
func NewTestProber(
exec execprobe.ExecProber,
readinessManager *kubecontainer.ReadinessManager,
refManager *kubecontainer.RefManager,
recorder record.EventRecorder) Prober {
return &prober{
exec: exec,
readinessManager: readinessManager,
refManager: refManager,
recorder: recorder,
}
}
// Probe checks the liveness/readiness of the given container.
func (pb *prober) Probe(pod *api.Pod, status api.PodStatus, container api.Container, containerID string, createdAt int64) (probe.Result, error) {
pb.probeReadiness(pod, status, container, containerID, createdAt)
return pb.probeLiveness(pod, status, container, containerID, createdAt)
}
// probeLiveness probes the liveness of a container.
// ProbeLiveness probes the liveness of a container.
// If the initial delay since container creation on the liveness probe has not passed, the probe will return probe.Success.
func (pb *prober) probeLiveness(pod *api.Pod, status api.PodStatus, container api.Container, containerID string, createdAt int64) (probe.Result, error) {
func (pb *prober) ProbeLiveness(pod *api.Pod, status api.PodStatus, container api.Container, containerID string, createdAt int64) (probe.Result, error) {
var live probe.Result
var output string
var err error
@ -137,24 +113,20 @@ func (pb *prober) probeLiveness(pod *api.Pod, status api.PodStatus, container ap
return probe.Success, nil
}
// probeReadiness probes and sets the readiness of a container.
// If the initial delay on the readiness probe has not passed, we set readiness to false.
func (pb *prober) probeReadiness(pod *api.Pod, status api.PodStatus, container api.Container, containerID string, createdAt int64) {
// ProbeReadiness probes and sets the readiness of a container.
func (pb *prober) ProbeReadiness(pod *api.Pod, status api.PodStatus, container api.Container, containerID string) (probe.Result, error) {
var ready probe.Result
var output string
var err error
p := container.ReadinessProbe
if p == nil {
ready = probe.Success
} else if time.Now().Unix()-createdAt < p.InitialDelaySeconds {
ready = probe.Failure
} else {
ready, output, err = pb.runProbeWithRetries(p, pod, status, container, containerID, maxProbeRetries)
}
ctrName := fmt.Sprintf("%s:%s", kubecontainer.GetPodFullName(pod), container.Name)
if err != nil || ready == probe.Failure {
// Readiness failed in one way or another.
pb.readinessManager.SetReadiness(containerID, false)
ref, ok := pb.refManager.GetRef(containerID)
if !ok {
glog.Warningf("No ref for pod '%v' - '%v'", containerID, container.Name)
@ -164,21 +136,17 @@ func (pb *prober) probeReadiness(pod *api.Pod, status api.PodStatus, container a
if ok {
pb.recorder.Eventf(ref, "Unhealthy", "Readiness probe errored: %v", err)
}
return
} else { // ready != probe.Success
glog.V(1).Infof("Readiness probe for %q failed (%v): %s", ctrName, ready, output)
if ok {
pb.recorder.Eventf(ref, "Unhealthy", "Readiness probe failed: %s", output)
}
return
}
}
if ready == probe.Success {
pb.readinessManager.SetReadiness(containerID, true)
return probe.Failure, err
}
glog.V(3).Infof("Readiness probe for %q succeeded", ctrName)
return ready, nil
}
// runProbeWithRetries tries to probe the container in a finite loop, it returns the last result

View File

@ -179,11 +179,9 @@ func TestGetTCPAddrParts(t *testing.T) {
// PLEASE READ THE PROBE DOCS BEFORE CHANGING THIS TEST IF YOU ARE UNSURE HOW PROBES ARE SUPPOSED TO WORK:
// (See https://github.com/GoogleCloudPlatform/kubernetes/blob/master/docs/user-guide/pod-states.md#pod-conditions)
func TestProbeContainer(t *testing.T) {
readinessManager := kubecontainer.NewReadinessManager()
prober := &prober{
readinessManager: readinessManager,
refManager: kubecontainer.NewRefManager(),
recorder: &record.FakeRecorder{},
refManager: kubecontainer.NewRefManager(),
recorder: &record.FakeRecorder{},
}
containerID := "foobar"
createdAt := time.Now().Unix()
@ -191,29 +189,29 @@ func TestProbeContainer(t *testing.T) {
tests := []struct {
testContainer api.Container
expectError bool
expectedResult probe.Result
expectedReadiness bool
expectedLiveness probe.Result
expectedReadiness probe.Result
}{
// No probes.
{
testContainer: api.Container{},
expectedResult: probe.Success,
expectedReadiness: true,
expectedLiveness: probe.Success,
expectedReadiness: probe.Success,
},
// Only LivenessProbe. expectedReadiness should always be true here.
{
testContainer: api.Container{
LivenessProbe: &api.Probe{InitialDelaySeconds: 100},
},
expectedResult: probe.Success,
expectedReadiness: true,
expectedLiveness: probe.Success,
expectedReadiness: probe.Success,
},
{
testContainer: api.Container{
LivenessProbe: &api.Probe{InitialDelaySeconds: -100},
},
expectedResult: probe.Unknown,
expectedReadiness: true,
expectedLiveness: probe.Unknown,
expectedReadiness: probe.Success,
},
{
testContainer: api.Container{
@ -224,8 +222,8 @@ func TestProbeContainer(t *testing.T) {
},
},
},
expectedResult: probe.Failure,
expectedReadiness: true,
expectedLiveness: probe.Failure,
expectedReadiness: probe.Success,
},
{
testContainer: api.Container{
@ -236,8 +234,8 @@ func TestProbeContainer(t *testing.T) {
},
},
},
expectedResult: probe.Success,
expectedReadiness: true,
expectedLiveness: probe.Success,
expectedReadiness: probe.Success,
},
{
testContainer: api.Container{
@ -248,8 +246,8 @@ func TestProbeContainer(t *testing.T) {
},
},
},
expectedResult: probe.Unknown,
expectedReadiness: true,
expectedLiveness: probe.Unknown,
expectedReadiness: probe.Success,
},
{
testContainer: api.Container{
@ -261,23 +259,16 @@ func TestProbeContainer(t *testing.T) {
},
},
expectError: true,
expectedResult: probe.Unknown,
expectedReadiness: true,
expectedLiveness: probe.Unknown,
expectedReadiness: probe.Success,
},
// // Only ReadinessProbe. expectedResult should always be probe.Success here.
// // Only ReadinessProbe. expectedLiveness should always be probe.Success here.
{
testContainer: api.Container{
ReadinessProbe: &api.Probe{InitialDelaySeconds: 100},
},
expectedResult: probe.Success,
expectedReadiness: false,
},
{
testContainer: api.Container{
ReadinessProbe: &api.Probe{InitialDelaySeconds: -100},
},
expectedResult: probe.Success,
expectedReadiness: false,
expectedLiveness: probe.Success,
expectedReadiness: probe.Unknown,
},
{
testContainer: api.Container{
@ -288,8 +279,8 @@ func TestProbeContainer(t *testing.T) {
},
},
},
expectedResult: probe.Success,
expectedReadiness: true,
expectedLiveness: probe.Success,
expectedReadiness: probe.Success,
},
{
testContainer: api.Container{
@ -300,8 +291,8 @@ func TestProbeContainer(t *testing.T) {
},
},
},
expectedResult: probe.Success,
expectedReadiness: true,
expectedLiveness: probe.Success,
expectedReadiness: probe.Success,
},
{
testContainer: api.Container{
@ -312,8 +303,8 @@ func TestProbeContainer(t *testing.T) {
},
},
},
expectedResult: probe.Success,
expectedReadiness: true,
expectedLiveness: probe.Success,
expectedReadiness: probe.Success,
},
{
testContainer: api.Container{
@ -325,8 +316,8 @@ func TestProbeContainer(t *testing.T) {
},
},
expectError: false,
expectedResult: probe.Success,
expectedReadiness: true,
expectedLiveness: probe.Success,
expectedReadiness: probe.Success,
},
// Both LivenessProbe and ReadinessProbe.
{
@ -334,32 +325,32 @@ func TestProbeContainer(t *testing.T) {
LivenessProbe: &api.Probe{InitialDelaySeconds: 100},
ReadinessProbe: &api.Probe{InitialDelaySeconds: 100},
},
expectedResult: probe.Success,
expectedReadiness: false,
expectedLiveness: probe.Success,
expectedReadiness: probe.Unknown,
},
{
testContainer: api.Container{
LivenessProbe: &api.Probe{InitialDelaySeconds: 100},
ReadinessProbe: &api.Probe{InitialDelaySeconds: -100},
},
expectedResult: probe.Success,
expectedReadiness: false,
expectedLiveness: probe.Success,
expectedReadiness: probe.Unknown,
},
{
testContainer: api.Container{
LivenessProbe: &api.Probe{InitialDelaySeconds: -100},
ReadinessProbe: &api.Probe{InitialDelaySeconds: 100},
},
expectedResult: probe.Unknown,
expectedReadiness: false,
expectedLiveness: probe.Unknown,
expectedReadiness: probe.Unknown,
},
{
testContainer: api.Container{
LivenessProbe: &api.Probe{InitialDelaySeconds: -100},
ReadinessProbe: &api.Probe{InitialDelaySeconds: -100},
},
expectedResult: probe.Unknown,
expectedReadiness: false,
expectedLiveness: probe.Unknown,
expectedReadiness: probe.Unknown,
},
{
testContainer: api.Container{
@ -371,8 +362,8 @@ func TestProbeContainer(t *testing.T) {
},
ReadinessProbe: &api.Probe{InitialDelaySeconds: -100},
},
expectedResult: probe.Unknown,
expectedReadiness: false,
expectedLiveness: probe.Unknown,
expectedReadiness: probe.Unknown,
},
{
testContainer: api.Container{
@ -384,8 +375,8 @@ func TestProbeContainer(t *testing.T) {
},
ReadinessProbe: &api.Probe{InitialDelaySeconds: -100},
},
expectedResult: probe.Failure,
expectedReadiness: false,
expectedLiveness: probe.Failure,
expectedReadiness: probe.Unknown,
},
{
testContainer: api.Container{
@ -402,29 +393,37 @@ func TestProbeContainer(t *testing.T) {
},
},
},
expectedResult: probe.Success,
expectedReadiness: true,
expectedLiveness: probe.Success,
expectedReadiness: probe.Success,
},
}
for i, test := range tests {
if test.expectError {
prober.exec = fakeExecProber{test.expectedResult, errors.New("exec error")}
prober.exec = fakeExecProber{test.expectedLiveness, errors.New("exec error")}
} else {
prober.exec = fakeExecProber{test.expectedResult, nil}
prober.exec = fakeExecProber{test.expectedLiveness, nil}
}
result, err := prober.Probe(&api.Pod{}, api.PodStatus{}, test.testContainer, containerID, createdAt)
liveness, err := prober.ProbeLiveness(&api.Pod{}, api.PodStatus{}, test.testContainer, containerID, createdAt)
if test.expectError && err == nil {
t.Errorf("[%d] Expected error but no error was returned.", i)
t.Errorf("[%d] Expected liveness probe error but no error was returned.", i)
}
if !test.expectError && err != nil {
t.Errorf("[%d] Didn't expect error but got: %v", i, err)
t.Errorf("[%d] Didn't expect liveness probe error but got: %v", i, err)
}
if test.expectedResult != result {
t.Errorf("[%d] Expected result to be %v but was %v", i, test.expectedResult, result)
if test.expectedLiveness != liveness {
t.Errorf("[%d] Expected liveness result to be %v but was %v", i, test.expectedLiveness, liveness)
}
if test.expectedReadiness != readinessManager.GetReadiness(containerID) {
t.Errorf("[%d] Expected readiness to be %v but was %v", i, test.expectedReadiness, readinessManager.GetReadiness(containerID))
// TODO: Test readiness errors
prober.exec = fakeExecProber{test.expectedReadiness, nil}
readiness, err := prober.ProbeReadiness(&api.Pod{}, api.PodStatus{}, test.testContainer, containerID)
if err != nil {
t.Errorf("[%d] Unexpected readiness probe error: %v", i, err)
}
if test.expectedReadiness != readiness {
t.Errorf("[%d] Expected readiness result to be %v but was %v", i, test.expectedReadiness, readiness)
}
}
}

View File

@ -14,45 +14,45 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package container
package prober
import "sync"
// ReadinessManager maintains the readiness information(probe results) of
// readinessManager maintains the readiness information(probe results) of
// containers over time to allow for implementation of health thresholds.
// This manager is thread-safe, no locks are necessary for the caller.
type ReadinessManager struct {
type readinessManager struct {
// guards states
sync.RWMutex
// TODO(yifan): To use strong type.
states map[string]bool
}
// NewReadinessManager creates ane returns a readiness manager with empty
// newReadinessManager creates and returns a readiness manager with empty
// contents.
func NewReadinessManager() *ReadinessManager {
return &ReadinessManager{states: make(map[string]bool)}
func newReadinessManager() *readinessManager {
return &readinessManager{states: make(map[string]bool)}
}
// GetReadiness returns the readiness value for the container with the given ID.
// getReadiness returns the readiness value for the container with the given ID.
// If the readiness value is found, returns it.
// If the readiness is not found, returns false.
func (r *ReadinessManager) GetReadiness(id string) bool {
func (r *readinessManager) getReadiness(id string) (ready bool, found bool) {
r.RLock()
defer r.RUnlock()
state, found := r.states[id]
return state && found
return state, found
}
// SetReadiness sets the readiness value for the container with the given ID.
func (r *ReadinessManager) SetReadiness(id string, value bool) {
// setReadiness sets the readiness value for the container with the given ID.
func (r *readinessManager) setReadiness(id string, value bool) {
r.Lock()
defer r.Unlock()
r.states[id] = value
}
// RemoveReadiness clears the readiness value for the container with the given ID.
func (r *ReadinessManager) RemoveReadiness(id string) {
// removeReadiness clears the readiness value for the container with the given ID.
func (r *readinessManager) removeReadiness(id string) {
r.Lock()
defer r.Unlock()
delete(r.states, id)

View File

@ -0,0 +1,153 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package prober
import (
"time"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
kubeutil "k8s.io/kubernetes/pkg/kubelet/util"
"k8s.io/kubernetes/pkg/probe"
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util"
)
// worker handles the periodic probing of its assigned container. Each worker has a go-routine
// associated with it which runs the probe loop until the container permanently terminates, or the
// stop channel is closed. The worker uses the probe Manager's statusManager to get up-to-date
// container IDs.
// TODO: Handle liveness probing
type worker struct {
// Channel for stopping the probe, it should be closed to trigger a stop.
stop chan struct{}
// The pod containing this probe (read-only)
pod *api.Pod
// The container to probe (read-only)
container api.Container
// Describes the probe configuration (read-only)
spec *api.Probe
// The last known container ID for this worker.
containerID types.UID
}
// Creates and starts a new probe worker.
func (m *manager) newWorker(
pod *api.Pod,
container api.Container) *worker {
w := &worker{
stop: make(chan struct{}),
pod: pod,
container: container,
spec: container.ReadinessProbe,
}
// Start the worker thread.
go run(m, w)
return w
}
// run periodically probes the container.
func run(m *manager, w *worker) {
probeTicker := time.NewTicker(m.defaultProbePeriod)
defer func() {
// Clean up.
probeTicker.Stop()
if w.containerID != "" {
m.readinessCache.removeReadiness(string(w.containerID))
}
m.removeReadinessProbe(w.pod.UID, w.container.Name)
}()
probeLoop:
for doProbe(m, w) {
// Wait for next probe tick.
select {
case <-w.stop:
break probeLoop
case <-probeTicker.C:
// continue
}
}
}
// doProbe probes the container once and records the result.
// Returns whether the worker should continue.
func doProbe(m *manager, w *worker) (keepGoing bool) {
defer util.HandleCrash(func(_ interface{}) { keepGoing = true })
status, ok := m.statusManager.GetPodStatus(w.pod.UID)
if !ok {
// Either the pod has not been created yet, or it was already deleted.
glog.V(3).Infof("No status for pod: %v", kubeutil.FormatPodName(w.pod))
return true
}
// Worker should terminate if pod is terminated.
if status.Phase == api.PodFailed || status.Phase == api.PodSucceeded {
glog.V(3).Infof("Pod %v %v, exiting probe worker",
kubeutil.FormatPodName(w.pod), status.Phase)
return false
}
c, ok := api.GetContainerStatus(status.ContainerStatuses, w.container.Name)
if !ok {
// Either the container has not been created yet, or it was deleted.
glog.V(3).Infof("Non-existant container probed: %v - %v",
kubeutil.FormatPodName(w.pod), w.container.Name)
return true // Wait for more information.
}
if w.containerID != types.UID(c.ContainerID) {
if w.containerID != "" {
m.readinessCache.removeReadiness(string(w.containerID))
}
w.containerID = types.UID(kubecontainer.TrimRuntimePrefix(c.ContainerID))
}
if c.State.Running == nil {
glog.V(3).Infof("Non-running container probed: %v - %v",
kubeutil.FormatPodName(w.pod), w.container.Name)
m.readinessCache.setReadiness(string(w.containerID), false)
// Abort if the container will not be restarted.
return c.State.Terminated == nil ||
w.pod.Spec.RestartPolicy != api.RestartPolicyNever
}
if int64(time.Since(c.State.Running.StartedAt.Time).Seconds()) < w.spec.InitialDelaySeconds {
// Readiness defaults to false during the initial delay.
m.readinessCache.setReadiness(string(w.containerID), false)
return true
}
// TODO: Move error handling out of prober.
result, _ := m.prober.ProbeReadiness(w.pod, status, w.container, string(w.containerID))
if result != probe.Unknown {
m.readinessCache.setReadiness(string(w.containerID), result != probe.Failure)
}
return true
}

View File

@ -0,0 +1,240 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package prober
import (
"testing"
"time"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/probe"
)
const (
containerID = "cOnTaInEr_Id"
containerName = "cOnTaInEr_NaMe"
podUID = "pOd_UiD"
)
func TestDoProbe(t *testing.T) {
m := newTestManager()
// Test statuses.
runningStatus := getRunningStatus()
pendingStatus := getRunningStatus()
pendingStatus.ContainerStatuses[0].State.Running = nil
terminatedStatus := getRunningStatus()
terminatedStatus.ContainerStatuses[0].State.Running = nil
terminatedStatus.ContainerStatuses[0].State.Terminated = &api.ContainerStateTerminated{
StartedAt: unversioned.Now(),
}
otherStatus := getRunningStatus()
otherStatus.ContainerStatuses[0].Name = "otherContainer"
failedStatus := getRunningStatus()
failedStatus.Phase = api.PodFailed
tests := []struct {
probe api.Probe
podStatus *api.PodStatus
expectContinue bool
expectReadySet bool
expectedReadiness bool
}{
{ // No status.
expectContinue: true,
},
{ // Pod failed
podStatus: &failedStatus,
},
{ // No container status
podStatus: &otherStatus,
expectContinue: true,
},
{ // Container waiting
podStatus: &pendingStatus,
expectContinue: true,
expectReadySet: true,
},
{ // Container terminated
podStatus: &terminatedStatus,
expectReadySet: true,
},
{ // Probe successful.
podStatus: &runningStatus,
expectContinue: true,
expectReadySet: true,
expectedReadiness: true,
},
{ // Initial delay passed
podStatus: &runningStatus,
probe: api.Probe{
InitialDelaySeconds: -100,
},
expectContinue: true,
expectReadySet: true,
expectedReadiness: true,
},
}
for i, test := range tests {
w := newTestWorker(test.probe)
if test.podStatus != nil {
m.statusManager.SetPodStatus(w.pod, *test.podStatus)
}
if c := doProbe(m, w); c != test.expectContinue {
t.Errorf("[%d] Expected continue to be %v but got %v", i, test.expectContinue, c)
}
ready, ok := m.readinessCache.getReadiness(containerID)
if ok != test.expectReadySet {
t.Errorf("[%d] Expected to have readiness: %v but got %v", i, test.expectReadySet, ok)
}
if ready != test.expectedReadiness {
t.Errorf("[%d] Expected readiness: %v but got %v", i, test.expectedReadiness, ready)
}
// Clean up.
m.statusManager.DeletePodStatus(podUID)
m.readinessCache.removeReadiness(containerID)
}
}
func TestInitialDelay(t *testing.T) {
m := newTestManager()
w := newTestWorker(api.Probe{
InitialDelaySeconds: 10,
})
m.statusManager.SetPodStatus(w.pod, getRunningStatus())
if !doProbe(m, w) {
t.Errorf("Expected to continue, but did not")
}
ready, ok := m.readinessCache.getReadiness(containerID)
if !ok {
t.Errorf("Expected readiness to be false, but was not set")
} else if ready {
t.Errorf("Expected readiness to be false, but was true")
}
// 100 seconds later...
laterStatus := getRunningStatus()
laterStatus.ContainerStatuses[0].State.Running.StartedAt.Time =
time.Now().Add(-100 * time.Second)
m.statusManager.SetPodStatus(w.pod, laterStatus)
// Second call should succeed (already waited).
if !doProbe(m, w) {
t.Errorf("Expected to continue, but did not")
}
ready, ok = m.readinessCache.getReadiness(containerID)
if !ok {
t.Errorf("Expected readiness to be true, but was not set")
} else if !ready {
t.Errorf("Expected readiness to be true, but was false")
}
}
func TestCleanUp(t *testing.T) {
m := newTestManager()
pod := getTestPod(api.Probe{})
m.statusManager.SetPodStatus(&pod, getRunningStatus())
m.readinessCache.setReadiness(containerID, true)
w := m.newWorker(&pod, pod.Spec.Containers[0])
m.readinessProbes[containerPath{podUID, containerName}] = w
if ready, _ := m.readinessCache.getReadiness(containerID); !ready {
t.Fatal("Expected readiness to be true.")
}
close(w.stop)
if err := waitForWorkerExit(m, []containerPath{{podUID, containerName}}); err != nil {
t.Fatal(err)
}
if _, ok := m.readinessCache.getReadiness(containerID); ok {
t.Error("Expected readiness to be cleared.")
}
if _, ok := m.readinessProbes[containerPath{podUID, containerName}]; ok {
t.Error("Expected worker to be cleared.")
}
}
func TestHandleCrash(t *testing.T) {
m := newTestManager()
m.prober = CrashingProber{}
w := newTestWorker(api.Probe{})
m.statusManager.SetPodStatus(w.pod, getRunningStatus())
// doProbe should recover from the crash, and keep going.
if !doProbe(m, w) {
t.Error("Expected to keep going, but terminated.")
}
if _, ok := m.readinessCache.getReadiness(containerID); ok {
t.Error("Expected readiness to be unchanged from crash.")
}
}
func newTestWorker(probeSpec api.Probe) *worker {
pod := getTestPod(probeSpec)
return &worker{
stop: make(chan struct{}),
pod: &pod,
container: pod.Spec.Containers[0],
spec: &probeSpec,
}
}
func getRunningStatus() api.PodStatus {
containerStatus := api.ContainerStatus{
Name: containerName,
ContainerID: containerID,
}
containerStatus.State.Running = &api.ContainerStateRunning{unversioned.Now()}
podStatus := api.PodStatus{
Phase: api.PodRunning,
ContainerStatuses: []api.ContainerStatus{containerStatus},
}
return podStatus
}
func getTestPod(probeSpec api.Probe) api.Pod {
container := api.Container{
Name: containerName,
ReadinessProbe: &probeSpec,
}
pod := api.Pod{
Spec: api.PodSpec{
Containers: []api.Container{container},
RestartPolicy: api.RestartPolicyNever,
},
}
pod.UID = podUID
return pod
}
type CrashingProber struct{}
func (f CrashingProber) ProbeLiveness(_ *api.Pod, _ api.PodStatus, c api.Container, _ string, _ int64) (probe.Result, error) {
panic("Intentional ProbeLiveness crash.")
}
func (f CrashingProber) ProbeReadiness(_ *api.Pod, _ api.PodStatus, c api.Container, _ string) (probe.Result, error) {
panic("Intentional ProbeReadiness crash.")
}

View File

@ -94,7 +94,6 @@ type runtime struct {
generator kubecontainer.RunContainerOptionsGenerator
recorder record.EventRecorder
prober prober.Prober
readinessManager *kubecontainer.ReadinessManager
volumeGetter volumeGetter
imagePuller kubecontainer.ImagePuller
}
@ -113,7 +112,7 @@ func New(config *Config,
generator kubecontainer.RunContainerOptionsGenerator,
recorder record.EventRecorder,
containerRefManager *kubecontainer.RefManager,
readinessManager *kubecontainer.ReadinessManager,
prober prober.Prober,
volumeGetter volumeGetter) (kubecontainer.Runtime, error) {
systemdVersion, err := getSystemdVersion()
@ -151,10 +150,9 @@ func New(config *Config,
containerRefManager: containerRefManager,
generator: generator,
recorder: recorder,
readinessManager: readinessManager,
prober: prober,
volumeGetter: volumeGetter,
}
rkt.prober = prober.New(rkt, readinessManager, containerRefManager, recorder)
rkt.imagePuller = kubecontainer.NewImagePuller(recorder, rkt)
// Test the rkt version.
@ -996,7 +994,7 @@ func (r *runtime) SyncPod(pod *api.Pod, runningPod kubecontainer.Pod, podStatus
c := runningPod.FindContainerByName(container.Name)
if c == nil {
if kubecontainer.ShouldContainerBeRestarted(&container, pod, &podStatus, r.readinessManager) {
if kubecontainer.ShouldContainerBeRestarted(&container, pod, &podStatus) {
glog.V(3).Infof("Container %+v is dead, but RestartPolicy says that we should restart it.", container)
// TODO(yifan): Containers in one pod are fate-sharing at this moment, see:
// https://github.com/appc/spec/issues/276.
@ -1016,7 +1014,7 @@ func (r *runtime) SyncPod(pod *api.Pod, runningPod kubecontainer.Pod, podStatus
break
}
result, err := r.prober.Probe(pod, podStatus, container, string(c.ID), c.Created)
result, err := r.prober.ProbeLiveness(pod, podStatus, container, string(c.ID), c.Created)
// TODO(vmarmol): examine this logic.
if err == nil && result != probe.Success {
glog.Infof("Pod %q container %q is unhealthy (probe result: %v), it will be killed and re-created.", podFullName, container.Name, result)

View File

@ -44,7 +44,6 @@ func TestRunOnce(t *testing.T) {
nodeLister: testNodeLister{},
statusManager: status.NewManager(nil),
containerRefManager: kubecontainer.NewRefManager(),
readinessManager: kubecontainer.NewReadinessManager(),
podManager: podManager,
os: kubecontainer.FakeOS{},
volumeManager: newVolumeManager(),

View File

@ -118,6 +118,9 @@ func makePodSpec(readinessProbe, livenessProbe *api.Probe) *api.Pod {
Image: "gcr.io/google_containers/test-webserver",
LivenessProbe: livenessProbe,
ReadinessProbe: readinessProbe,
}, {
Name: "test-noprobe",
Image: "gcr.io/google_containers/pause",
},
},
},