mirror of https://github.com/k3s-io/k3s
commit
8c9c68c5ca
|
@ -322,8 +322,7 @@ func NewMainKubelet(
|
|||
procFs := procfs.NewProcFs()
|
||||
imageBackOff := util.NewBackOff(resyncInterval, MaxContainerBackOff)
|
||||
|
||||
readinessManager := proberesults.NewManager()
|
||||
klet.livenessManager = proberesults.NewManagerWithUpdates()
|
||||
klet.livenessManager = proberesults.NewManager()
|
||||
|
||||
// Initialize the runtime.
|
||||
switch containerRuntime {
|
||||
|
@ -419,7 +418,6 @@ func NewMainKubelet(
|
|||
|
||||
klet.probeManager = prober.NewManager(
|
||||
klet.statusManager,
|
||||
readinessManager,
|
||||
klet.livenessManager,
|
||||
klet.runner,
|
||||
containerRefManager,
|
||||
|
|
|
@ -27,6 +27,7 @@ import (
|
|||
"k8s.io/kubernetes/pkg/kubelet/status"
|
||||
kubeutil "k8s.io/kubernetes/pkg/kubelet/util"
|
||||
"k8s.io/kubernetes/pkg/types"
|
||||
"k8s.io/kubernetes/pkg/util"
|
||||
"k8s.io/kubernetes/pkg/util/sets"
|
||||
)
|
||||
|
||||
|
@ -74,19 +75,24 @@ type manager struct {
|
|||
|
||||
func NewManager(
|
||||
statusManager status.Manager,
|
||||
readinessManager results.Manager,
|
||||
livenessManager results.Manager,
|
||||
runner kubecontainer.ContainerCommandRunner,
|
||||
refManager *kubecontainer.RefManager,
|
||||
recorder record.EventRecorder) Manager {
|
||||
prober := newProber(runner, refManager, recorder)
|
||||
return &manager{
|
||||
readinessManager := results.NewManager()
|
||||
m := &manager{
|
||||
statusManager: statusManager,
|
||||
prober: prober,
|
||||
readinessManager: readinessManager,
|
||||
livenessManager: livenessManager,
|
||||
workers: make(map[probeKey]*worker),
|
||||
}
|
||||
|
||||
// Start syncing readiness.
|
||||
go util.Forever(m.updateReadiness, 0)
|
||||
|
||||
return m
|
||||
}
|
||||
|
||||
// Key uniquely identifying container probes
|
||||
|
@ -211,3 +217,10 @@ func (m *manager) removeWorker(podUID types.UID, containerName string, probeType
|
|||
defer m.workerLock.Unlock()
|
||||
delete(m.workers, probeKey{podUID, containerName, probeType})
|
||||
}
|
||||
|
||||
func (m *manager) updateReadiness() {
|
||||
update := <-m.readinessManager.Updates()
|
||||
|
||||
ready := update.Result == results.Success
|
||||
m.statusManager.SetContainerReadiness(update.Pod, update.ContainerID, ready)
|
||||
}
|
||||
|
|
|
@ -23,17 +23,17 @@ import (
|
|||
|
||||
"github.com/golang/glog"
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
"k8s.io/kubernetes/pkg/client/record"
|
||||
"k8s.io/kubernetes/pkg/client/unversioned/testclient"
|
||||
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
|
||||
kubepod "k8s.io/kubernetes/pkg/kubelet/pod"
|
||||
"k8s.io/kubernetes/pkg/kubelet/prober/results"
|
||||
"k8s.io/kubernetes/pkg/kubelet/status"
|
||||
"k8s.io/kubernetes/pkg/probe"
|
||||
"k8s.io/kubernetes/pkg/util"
|
||||
"k8s.io/kubernetes/pkg/util/wait"
|
||||
)
|
||||
|
||||
func init() {
|
||||
util.ReallyCrash = true
|
||||
}
|
||||
|
||||
var defaultProbe *api.Probe = &api.Probe{
|
||||
Handler: api.Handler{
|
||||
Exec: &api.ExecAction{},
|
||||
|
@ -172,7 +172,6 @@ func TestCleanupPods(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestUpdatePodStatus(t *testing.T) {
|
||||
const podUID = "pod_uid"
|
||||
unprobed := api.ContainerStatus{
|
||||
Name: "unprobed_container",
|
||||
ContainerID: "test://unprobed_container_id",
|
||||
|
@ -218,27 +217,27 @@ func TestUpdatePodStatus(t *testing.T) {
|
|||
m := newTestManager()
|
||||
// Setup probe "workers" and cached results.
|
||||
m.workers = map[probeKey]*worker{
|
||||
probeKey{podUID, unprobed.Name, liveness}: {},
|
||||
probeKey{podUID, probedReady.Name, readiness}: {},
|
||||
probeKey{podUID, probedPending.Name, readiness}: {},
|
||||
probeKey{podUID, probedUnready.Name, readiness}: {},
|
||||
probeKey{podUID, terminated.Name, readiness}: {},
|
||||
probeKey{testPodUID, unprobed.Name, liveness}: {},
|
||||
probeKey{testPodUID, probedReady.Name, readiness}: {},
|
||||
probeKey{testPodUID, probedPending.Name, readiness}: {},
|
||||
probeKey{testPodUID, probedUnready.Name, readiness}: {},
|
||||
probeKey{testPodUID, terminated.Name, readiness}: {},
|
||||
}
|
||||
m.readinessManager.Set(kubecontainer.ParseContainerID(probedReady.ContainerID), results.Success, nil)
|
||||
m.readinessManager.Set(kubecontainer.ParseContainerID(probedUnready.ContainerID), results.Failure, nil)
|
||||
m.readinessManager.Set(kubecontainer.ParseContainerID(terminated.ContainerID), results.Success, nil)
|
||||
m.readinessManager.Set(kubecontainer.ParseContainerID(probedReady.ContainerID), results.Success, &api.Pod{})
|
||||
m.readinessManager.Set(kubecontainer.ParseContainerID(probedUnready.ContainerID), results.Failure, &api.Pod{})
|
||||
m.readinessManager.Set(kubecontainer.ParseContainerID(terminated.ContainerID), results.Success, &api.Pod{})
|
||||
|
||||
m.UpdatePodStatus(podUID, &podStatus)
|
||||
m.UpdatePodStatus(testPodUID, &podStatus)
|
||||
|
||||
expectedReadiness := map[probeKey]bool{
|
||||
probeKey{podUID, unprobed.Name, readiness}: true,
|
||||
probeKey{podUID, probedReady.Name, readiness}: true,
|
||||
probeKey{podUID, probedPending.Name, readiness}: false,
|
||||
probeKey{podUID, probedUnready.Name, readiness}: false,
|
||||
probeKey{podUID, terminated.Name, readiness}: false,
|
||||
probeKey{testPodUID, unprobed.Name, readiness}: true,
|
||||
probeKey{testPodUID, probedReady.Name, readiness}: true,
|
||||
probeKey{testPodUID, probedPending.Name, readiness}: false,
|
||||
probeKey{testPodUID, probedUnready.Name, readiness}: false,
|
||||
probeKey{testPodUID, terminated.Name, readiness}: false,
|
||||
}
|
||||
for _, c := range podStatus.ContainerStatuses {
|
||||
expected, ok := expectedReadiness[probeKey{podUID, c.Name, readiness}]
|
||||
expected, ok := expectedReadiness[probeKey{testPodUID, c.Name, readiness}]
|
||||
if !ok {
|
||||
t.Fatalf("Missing expectation for test case: %v", c.Name)
|
||||
}
|
||||
|
@ -249,6 +248,31 @@ func TestUpdatePodStatus(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestUpdateReadiness(t *testing.T) {
|
||||
testPod := getTestPod(readiness, api.Probe{})
|
||||
m := newTestManager()
|
||||
m.statusManager.SetPodStatus(&testPod, getTestRunningStatus())
|
||||
|
||||
m.AddPod(&testPod)
|
||||
probePaths := []probeKey{{testPodUID, testContainerName, readiness}}
|
||||
if err := expectProbes(m, probePaths); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
// Wait for ready status.
|
||||
if err := waitForReadyStatus(m, true); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
// Prober fails.
|
||||
m.prober.exec = fakeExecProber{probe.Failure, nil}
|
||||
|
||||
// Wait for failed status.
|
||||
if err := waitForReadyStatus(m, false); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
}
|
||||
|
||||
func expectProbes(m *manager, expectedProbes []probeKey) error {
|
||||
m.workerLock.RLock()
|
||||
defer m.workerLock.RUnlock()
|
||||
|
@ -275,24 +299,10 @@ outer:
|
|||
return fmt.Errorf("Unexpected probes: %v; Missing probes: %v;", unexpected, missing)
|
||||
}
|
||||
|
||||
func newTestManager() *manager {
|
||||
m := NewManager(
|
||||
status.NewManager(&testclient.Fake{}, kubepod.NewBasicPodManager(nil)),
|
||||
results.NewManager(),
|
||||
results.NewManager(),
|
||||
nil, // runner
|
||||
kubecontainer.NewRefManager(),
|
||||
&record.FakeRecorder{},
|
||||
).(*manager)
|
||||
// Don't actually execute probes.
|
||||
m.prober.exec = fakeExecProber{probe.Success, nil}
|
||||
return m
|
||||
}
|
||||
const interval = 100 * time.Millisecond
|
||||
|
||||
// Wait for the given workers to exit & clean up.
|
||||
func waitForWorkerExit(m *manager, workerPaths []probeKey) error {
|
||||
const interval = 100 * time.Millisecond
|
||||
|
||||
for _, w := range workerPaths {
|
||||
condition := func() (bool, error) {
|
||||
_, exists := m.getWorker(w.podUID, w.containerName, w.probeType)
|
||||
|
@ -309,3 +319,27 @@ func waitForWorkerExit(m *manager, workerPaths []probeKey) error {
|
|||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Wait for the given workers to exit & clean up.
|
||||
func waitForReadyStatus(m *manager, ready bool) error {
|
||||
condition := func() (bool, error) {
|
||||
status, ok := m.statusManager.GetPodStatus(testPodUID)
|
||||
if !ok {
|
||||
return false, fmt.Errorf("status not found: %q", testPodUID)
|
||||
}
|
||||
if len(status.ContainerStatuses) != 1 {
|
||||
return false, fmt.Errorf("expected single container, found %d", len(status.ContainerStatuses))
|
||||
}
|
||||
if status.ContainerStatuses[0].ContainerID != testContainerID.String() {
|
||||
return false, fmt.Errorf("expected container %q, found %q",
|
||||
testContainerID, status.ContainerStatuses[0].ContainerID)
|
||||
}
|
||||
return status.ContainerStatuses[0].Ready == ready, nil
|
||||
}
|
||||
glog.Infof("Polling for ready state %v", ready)
|
||||
if err := wait.Poll(interval, util.ForeverTestTimeout, condition); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -27,7 +27,6 @@ import (
|
|||
"k8s.io/kubernetes/pkg/kubelet/prober/results"
|
||||
"k8s.io/kubernetes/pkg/probe"
|
||||
"k8s.io/kubernetes/pkg/util"
|
||||
"k8s.io/kubernetes/pkg/util/exec"
|
||||
)
|
||||
|
||||
func TestFormatURL(t *testing.T) {
|
||||
|
@ -246,12 +245,3 @@ func TestProbe(t *testing.T) {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
type fakeExecProber struct {
|
||||
result probe.Result
|
||||
err error
|
||||
}
|
||||
|
||||
func (p fakeExecProber) Probe(_ exec.Cmd) (probe.Result, string, error) {
|
||||
return p.result, "", p.err
|
||||
}
|
||||
|
|
|
@ -70,7 +70,7 @@ type manager struct {
|
|||
sync.RWMutex
|
||||
// map of container ID -> probe Result
|
||||
cache map[kubecontainer.ContainerID]Result
|
||||
// channel of updates (may be nil)
|
||||
// channel of updates
|
||||
updates chan Update
|
||||
}
|
||||
|
||||
|
@ -78,15 +78,10 @@ var _ Manager = &manager{}
|
|||
|
||||
// NewManager creates ane returns an empty results manager.
|
||||
func NewManager() Manager {
|
||||
m := &manager{cache: make(map[kubecontainer.ContainerID]Result)}
|
||||
return m
|
||||
}
|
||||
|
||||
// NewManager creates ane returns an empty results manager.
|
||||
func NewManagerWithUpdates() Manager {
|
||||
m := NewManager().(*manager)
|
||||
m.updates = make(chan Update, 20)
|
||||
return m
|
||||
return &manager{
|
||||
cache: make(map[kubecontainer.ContainerID]Result),
|
||||
updates: make(chan Update, 20),
|
||||
}
|
||||
}
|
||||
|
||||
func (m *manager) Get(id kubecontainer.ContainerID) (Result, bool) {
|
||||
|
@ -98,7 +93,7 @@ func (m *manager) Get(id kubecontainer.ContainerID) (Result, bool) {
|
|||
|
||||
func (m *manager) Set(id kubecontainer.ContainerID, result Result, pod *api.Pod) {
|
||||
if m.setInternal(id, result) {
|
||||
m.pushUpdate(Update{id, result, pod})
|
||||
m.updates <- Update{id, result, pod}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -123,10 +118,3 @@ func (m *manager) Remove(id kubecontainer.ContainerID) {
|
|||
func (m *manager) Updates() <-chan Update {
|
||||
return m.updates
|
||||
}
|
||||
|
||||
// pushUpdates sends an update on the updates channel if it is initialized.
|
||||
func (m *manager) pushUpdate(update Update) {
|
||||
if m.updates != nil {
|
||||
m.updates <- update
|
||||
}
|
||||
}
|
||||
|
|
|
@ -46,7 +46,7 @@ func TestCacheOperations(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestUpdates(t *testing.T) {
|
||||
m := NewManagerWithUpdates()
|
||||
m := NewManager()
|
||||
|
||||
pod := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "test-pod"}}
|
||||
fooID := kubecontainer.ContainerID{"test", "foo"}
|
||||
|
|
|
@ -0,0 +1,122 @@
|
|||
/*
|
||||
Copyright 2015 The Kubernetes Authors All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package prober
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
"k8s.io/kubernetes/pkg/api/unversioned"
|
||||
"k8s.io/kubernetes/pkg/client/record"
|
||||
"k8s.io/kubernetes/pkg/client/unversioned/testclient"
|
||||
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
|
||||
kubepod "k8s.io/kubernetes/pkg/kubelet/pod"
|
||||
"k8s.io/kubernetes/pkg/kubelet/prober/results"
|
||||
"k8s.io/kubernetes/pkg/kubelet/status"
|
||||
"k8s.io/kubernetes/pkg/probe"
|
||||
"k8s.io/kubernetes/pkg/util/exec"
|
||||
)
|
||||
|
||||
const (
|
||||
testContainerName = "cOnTaInEr_NaMe"
|
||||
testPodUID = "pOd_UiD"
|
||||
)
|
||||
|
||||
var testContainerID = kubecontainer.ContainerID{"test", "cOnTaInEr_Id"}
|
||||
|
||||
func getTestRunningStatus() api.PodStatus {
|
||||
containerStatus := api.ContainerStatus{
|
||||
Name: testContainerName,
|
||||
ContainerID: testContainerID.String(),
|
||||
}
|
||||
containerStatus.State.Running = &api.ContainerStateRunning{unversioned.Now()}
|
||||
podStatus := api.PodStatus{
|
||||
Phase: api.PodRunning,
|
||||
ContainerStatuses: []api.ContainerStatus{containerStatus},
|
||||
}
|
||||
return podStatus
|
||||
}
|
||||
|
||||
func getTestPod(probeType probeType, probeSpec api.Probe) api.Pod {
|
||||
container := api.Container{
|
||||
Name: testContainerName,
|
||||
}
|
||||
|
||||
// All tests rely on the fake exec prober.
|
||||
probeSpec.Handler = api.Handler{
|
||||
Exec: &api.ExecAction{},
|
||||
}
|
||||
|
||||
// Apply test defaults, overwridden for test speed.
|
||||
defaults := map[string]int64{
|
||||
"TimeoutSeconds": 1,
|
||||
"PeriodSeconds": 1,
|
||||
"SuccessThreshold": 1,
|
||||
"FailureThreshold": 1,
|
||||
}
|
||||
for field, value := range defaults {
|
||||
f := reflect.ValueOf(&probeSpec).Elem().FieldByName(field)
|
||||
if f.Int() == 0 {
|
||||
f.SetInt(value)
|
||||
}
|
||||
}
|
||||
|
||||
switch probeType {
|
||||
case readiness:
|
||||
container.ReadinessProbe = &probeSpec
|
||||
case liveness:
|
||||
container.LivenessProbe = &probeSpec
|
||||
}
|
||||
pod := api.Pod{
|
||||
Spec: api.PodSpec{
|
||||
Containers: []api.Container{container},
|
||||
RestartPolicy: api.RestartPolicyNever,
|
||||
},
|
||||
}
|
||||
pod.Name = "testPod"
|
||||
pod.UID = testPodUID
|
||||
return pod
|
||||
}
|
||||
|
||||
func newTestManager() *manager {
|
||||
refManager := kubecontainer.NewRefManager()
|
||||
refManager.SetRef(testContainerID, &api.ObjectReference{}) // Suppress prober warnings.
|
||||
m := NewManager(
|
||||
status.NewManager(&testclient.Fake{}, kubepod.NewBasicPodManager(nil)),
|
||||
results.NewManager(),
|
||||
nil, // runner
|
||||
refManager,
|
||||
&record.FakeRecorder{},
|
||||
).(*manager)
|
||||
// Don't actually execute probes.
|
||||
m.prober.exec = fakeExecProber{probe.Success, nil}
|
||||
return m
|
||||
}
|
||||
|
||||
func newTestWorker(m *manager, probeType probeType, probeSpec api.Probe) *worker {
|
||||
pod := getTestPod(probeType, probeSpec)
|
||||
return newWorker(m, probeType, &pod, pod.Spec.Containers[0])
|
||||
}
|
||||
|
||||
type fakeExecProber struct {
|
||||
result probe.Result
|
||||
err error
|
||||
}
|
||||
|
||||
func (p fakeExecProber) Probe(_ exec.Cmd) (probe.Result, string, error) {
|
||||
return p.result, "", p.err
|
||||
}
|
|
@ -18,7 +18,6 @@ package prober
|
|||
|
||||
import (
|
||||
"fmt"
|
||||
"reflect"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
|
@ -36,28 +35,25 @@ import (
|
|||
"k8s.io/kubernetes/pkg/util/wait"
|
||||
)
|
||||
|
||||
const (
|
||||
containerName = "cOnTaInEr_NaMe"
|
||||
podUID = "pOd_UiD"
|
||||
)
|
||||
|
||||
var containerID = kubecontainer.ContainerID{"test", "cOnTaInEr_Id"}
|
||||
func init() {
|
||||
util.ReallyCrash = true
|
||||
}
|
||||
|
||||
func TestDoProbe(t *testing.T) {
|
||||
m := newTestManager()
|
||||
|
||||
// Test statuses.
|
||||
runningStatus := getRunningStatus()
|
||||
pendingStatus := getRunningStatus()
|
||||
runningStatus := getTestRunningStatus()
|
||||
pendingStatus := getTestRunningStatus()
|
||||
pendingStatus.ContainerStatuses[0].State.Running = nil
|
||||
terminatedStatus := getRunningStatus()
|
||||
terminatedStatus := getTestRunningStatus()
|
||||
terminatedStatus.ContainerStatuses[0].State.Running = nil
|
||||
terminatedStatus.ContainerStatuses[0].State.Terminated = &api.ContainerStateTerminated{
|
||||
StartedAt: unversioned.Now(),
|
||||
}
|
||||
otherStatus := getRunningStatus()
|
||||
otherStatus := getTestRunningStatus()
|
||||
otherStatus.ContainerStatuses[0].Name = "otherContainer"
|
||||
failedStatus := getRunningStatus()
|
||||
failedStatus := getTestRunningStatus()
|
||||
failedStatus.Phase = api.PodFailed
|
||||
|
||||
tests := []struct {
|
||||
|
@ -112,7 +108,7 @@ func TestDoProbe(t *testing.T) {
|
|||
if c := w.doProbe(); c != test.expectContinue {
|
||||
t.Errorf("[%s-%d] Expected continue to be %v but got %v", probeType, i, test.expectContinue, c)
|
||||
}
|
||||
result, ok := resultsManager(m, probeType).Get(containerID)
|
||||
result, ok := resultsManager(m, probeType).Get(testContainerID)
|
||||
if ok != test.expectSet {
|
||||
t.Errorf("[%s-%d] Expected to have result: %v but got %v", probeType, i, test.expectSet, ok)
|
||||
}
|
||||
|
@ -122,7 +118,7 @@ func TestDoProbe(t *testing.T) {
|
|||
|
||||
// Clean up.
|
||||
m.statusManager = status.NewManager(&testclient.Fake{}, kubepod.NewBasicPodManager(nil))
|
||||
resultsManager(m, probeType).Remove(containerID)
|
||||
resultsManager(m, probeType).Remove(testContainerID)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -134,13 +130,13 @@ func TestInitialDelay(t *testing.T) {
|
|||
w := newTestWorker(m, probeType, api.Probe{
|
||||
InitialDelaySeconds: 10,
|
||||
})
|
||||
m.statusManager.SetPodStatus(w.pod, getRunningStatus())
|
||||
m.statusManager.SetPodStatus(w.pod, getTestRunningStatus())
|
||||
|
||||
expectContinue(t, w, w.doProbe(), "during initial delay")
|
||||
expectResult(t, w, results.Result(probeType == liveness), "during initial delay")
|
||||
|
||||
// 100 seconds later...
|
||||
laterStatus := getRunningStatus()
|
||||
laterStatus := getTestRunningStatus()
|
||||
laterStatus.ContainerStatuses[0].State.Running.StartedAt.Time =
|
||||
time.Now().Add(-100 * time.Second)
|
||||
m.statusManager.SetPodStatus(w.pod, laterStatus)
|
||||
|
@ -153,8 +149,8 @@ func TestInitialDelay(t *testing.T) {
|
|||
|
||||
func TestFailureThreshold(t *testing.T) {
|
||||
m := newTestManager()
|
||||
w := newTestWorker(m, readiness, api.Probe{})
|
||||
m.statusManager.SetPodStatus(w.pod, getRunningStatus())
|
||||
w := newTestWorker(m, readiness, api.Probe{SuccessThreshold: 1, FailureThreshold: 3})
|
||||
m.statusManager.SetPodStatus(w.pod, getTestRunningStatus())
|
||||
|
||||
for i := 0; i < 2; i++ {
|
||||
// First probe should succeed.
|
||||
|
@ -171,7 +167,7 @@ func TestFailureThreshold(t *testing.T) {
|
|||
|
||||
// Next 2 probes should still be "success".
|
||||
for j := 0; j < 2; j++ {
|
||||
msg := fmt.Sprintf("%d failure (%d)", j+1, i)
|
||||
msg := fmt.Sprintf("%d failing (%d)", j+1, i)
|
||||
expectContinue(t, w, w.doProbe(), msg)
|
||||
expectResult(t, w, results.Success, msg)
|
||||
}
|
||||
|
@ -188,10 +184,10 @@ func TestFailureThreshold(t *testing.T) {
|
|||
func TestSuccessThreshold(t *testing.T) {
|
||||
m := newTestManager()
|
||||
w := newTestWorker(m, readiness, api.Probe{SuccessThreshold: 3, FailureThreshold: 1})
|
||||
m.statusManager.SetPodStatus(w.pod, getRunningStatus())
|
||||
m.statusManager.SetPodStatus(w.pod, getTestRunningStatus())
|
||||
|
||||
// Start out failure.
|
||||
w.resultsManager.Set(containerID, results.Failure, nil)
|
||||
w.resultsManager.Set(testContainerID, results.Failure, &api.Pod{})
|
||||
|
||||
for i := 0; i < 2; i++ {
|
||||
// Probe defaults to Failure.
|
||||
|
@ -223,15 +219,15 @@ func TestCleanUp(t *testing.T) {
|
|||
m := newTestManager()
|
||||
|
||||
for _, probeType := range [...]probeType{liveness, readiness} {
|
||||
key := probeKey{podUID, containerName, probeType}
|
||||
key := probeKey{testPodUID, testContainerName, probeType}
|
||||
w := newTestWorker(m, probeType, api.Probe{})
|
||||
m.statusManager.SetPodStatus(w.pod, getRunningStatus())
|
||||
m.statusManager.SetPodStatus(w.pod, getTestRunningStatus())
|
||||
go w.run()
|
||||
m.workers[key] = w
|
||||
|
||||
// Wait for worker to run.
|
||||
condition := func() (bool, error) {
|
||||
ready, _ := resultsManager(m, probeType).Get(containerID)
|
||||
ready, _ := resultsManager(m, probeType).Get(testContainerID)
|
||||
return ready == results.Success, nil
|
||||
}
|
||||
if ready, _ := condition(); !ready {
|
||||
|
@ -245,7 +241,7 @@ func TestCleanUp(t *testing.T) {
|
|||
t.Fatalf("[%s] error waiting for worker exit: %v", probeType, err)
|
||||
}
|
||||
|
||||
if _, ok := resultsManager(m, probeType).Get(containerID); ok {
|
||||
if _, ok := resultsManager(m, probeType).Get(testContainerID); ok {
|
||||
t.Errorf("[%s] Expected result to be cleared.", probeType)
|
||||
}
|
||||
if _, ok := m.workers[key]; ok {
|
||||
|
@ -255,9 +251,11 @@ func TestCleanUp(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestHandleCrash(t *testing.T) {
|
||||
util.ReallyCrash = false // Test that we *don't* really crash.
|
||||
|
||||
m := newTestManager()
|
||||
w := newTestWorker(m, readiness, api.Probe{})
|
||||
m.statusManager.SetPodStatus(w.pod, getRunningStatus())
|
||||
m.statusManager.SetPodStatus(w.pod, getTestRunningStatus())
|
||||
|
||||
expectContinue(t, w, w.doProbe(), "Initial successful probe.")
|
||||
expectResult(t, w, results.Success, "Initial successful probe.")
|
||||
|
@ -274,64 +272,8 @@ func TestHandleCrash(t *testing.T) {
|
|||
expectResult(t, w, results.Success, "Crashing probe unchanged.")
|
||||
}
|
||||
|
||||
func newTestWorker(m *manager, probeType probeType, probeSpec api.Probe) *worker {
|
||||
// All tests rely on the fake exec prober.
|
||||
probeSpec.Handler = api.Handler{
|
||||
Exec: &api.ExecAction{},
|
||||
}
|
||||
// Apply default values.
|
||||
defaults := map[string]int64{
|
||||
"TimeoutSeconds": 1,
|
||||
"PeriodSeconds": 10,
|
||||
"SuccessThreshold": 1,
|
||||
"FailureThreshold": 3,
|
||||
}
|
||||
for field, value := range defaults {
|
||||
f := reflect.ValueOf(&probeSpec).Elem().FieldByName(field)
|
||||
if f.Int() == 0 {
|
||||
f.SetInt(value)
|
||||
}
|
||||
}
|
||||
|
||||
pod := getTestPod(probeType, probeSpec)
|
||||
return newWorker(m, probeType, &pod, pod.Spec.Containers[0])
|
||||
}
|
||||
|
||||
func getRunningStatus() api.PodStatus {
|
||||
containerStatus := api.ContainerStatus{
|
||||
Name: containerName,
|
||||
ContainerID: containerID.String(),
|
||||
}
|
||||
containerStatus.State.Running = &api.ContainerStateRunning{unversioned.Now()}
|
||||
podStatus := api.PodStatus{
|
||||
Phase: api.PodRunning,
|
||||
ContainerStatuses: []api.ContainerStatus{containerStatus},
|
||||
}
|
||||
return podStatus
|
||||
}
|
||||
|
||||
func getTestPod(probeType probeType, probeSpec api.Probe) api.Pod {
|
||||
container := api.Container{
|
||||
Name: containerName,
|
||||
}
|
||||
switch probeType {
|
||||
case readiness:
|
||||
container.ReadinessProbe = &probeSpec
|
||||
case liveness:
|
||||
container.LivenessProbe = &probeSpec
|
||||
}
|
||||
pod := api.Pod{
|
||||
Spec: api.PodSpec{
|
||||
Containers: []api.Container{container},
|
||||
RestartPolicy: api.RestartPolicyNever,
|
||||
},
|
||||
}
|
||||
pod.UID = podUID
|
||||
return pod
|
||||
}
|
||||
|
||||
func expectResult(t *testing.T, w *worker, expectedResult results.Result, msg string) {
|
||||
result, ok := resultsManager(w.probeManager, w.probeType).Get(containerID)
|
||||
result, ok := resultsManager(w.probeManager, w.probeType).Get(testContainerID)
|
||||
if !ok {
|
||||
t.Errorf("[%s - %s] Expected result to be set, but was not set", w.probeType, msg)
|
||||
} else if result != expectedResult {
|
||||
|
|
|
@ -27,6 +27,7 @@ import (
|
|||
"k8s.io/kubernetes/pkg/api/errors"
|
||||
"k8s.io/kubernetes/pkg/api/unversioned"
|
||||
client "k8s.io/kubernetes/pkg/client/unversioned"
|
||||
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
|
||||
kubepod "k8s.io/kubernetes/pkg/kubelet/pod"
|
||||
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
|
||||
kubeletutil "k8s.io/kubernetes/pkg/kubelet/util"
|
||||
|
@ -77,6 +78,10 @@ type Manager interface {
|
|||
// SetPodStatus caches updates the cached status for the given pod, and triggers a status update.
|
||||
SetPodStatus(pod *api.Pod, status api.PodStatus)
|
||||
|
||||
// SetContainerReadiness updates the cached container status with the given readiness, and
|
||||
// triggers a status update.
|
||||
SetContainerReadiness(pod *api.Pod, containerID kubecontainer.ContainerID, ready bool)
|
||||
|
||||
// TerminatePods resets the container status for the provided pods to terminated and triggers
|
||||
// a status update. This function may not enqueue all the provided pods, in which case it will
|
||||
// return false
|
||||
|
@ -171,19 +176,50 @@ func (m *manager) SetPodStatus(pod *api.Pod, status api.PodStatus) {
|
|||
status.StartTime = &now
|
||||
}
|
||||
|
||||
newStatus := m.updateStatusInternal(pod, status)
|
||||
if newStatus != nil {
|
||||
select {
|
||||
case m.podStatusChannel <- podStatusSyncRequest{pod.UID, *newStatus}:
|
||||
default:
|
||||
// Let the periodic syncBatch handle the update if the channel is full.
|
||||
// We can't block, since we hold the mutex lock.
|
||||
m.updateStatusInternal(pod, status)
|
||||
}
|
||||
|
||||
func (m *manager) SetContainerReadiness(pod *api.Pod, containerID kubecontainer.ContainerID, ready bool) {
|
||||
m.podStatusesLock.Lock()
|
||||
defer m.podStatusesLock.Unlock()
|
||||
|
||||
oldStatus, found := m.podStatuses[pod.UID]
|
||||
if !found {
|
||||
glog.Warningf("Container readiness changed before pod has synced: %q - %q",
|
||||
kubeletutil.FormatPodName(pod), containerID.String())
|
||||
return
|
||||
}
|
||||
status := oldStatus.status
|
||||
|
||||
// Find the container to update.
|
||||
containerIndex := -1
|
||||
for i, c := range status.ContainerStatuses {
|
||||
if c.ContainerID == containerID.String() {
|
||||
containerIndex = i
|
||||
break
|
||||
}
|
||||
}
|
||||
if containerIndex == -1 {
|
||||
glog.Warningf("Container readiness changed for unknown container: %q - %q",
|
||||
kubeletutil.FormatPodName(pod), containerID.String())
|
||||
return
|
||||
}
|
||||
|
||||
if status.ContainerStatuses[containerIndex].Ready == ready {
|
||||
glog.V(4).Infof("Container readiness unchanged (%v): %q - %q", ready,
|
||||
kubeletutil.FormatPodName(pod), containerID.String())
|
||||
return
|
||||
}
|
||||
|
||||
// Make sure we're not updating the cached version.
|
||||
status.ContainerStatuses = make([]api.ContainerStatus, len(status.ContainerStatuses))
|
||||
copy(status.ContainerStatuses, oldStatus.status.ContainerStatuses)
|
||||
status.ContainerStatuses[containerIndex].Ready = ready
|
||||
m.updateStatusInternal(pod, status)
|
||||
}
|
||||
|
||||
func (m *manager) TerminatePods(pods []*api.Pod) bool {
|
||||
sent := true
|
||||
allSent := true
|
||||
m.podStatusesLock.Lock()
|
||||
defer m.podStatusesLock.Unlock()
|
||||
for _, pod := range pods {
|
||||
|
@ -192,39 +228,41 @@ func (m *manager) TerminatePods(pods []*api.Pod) bool {
|
|||
Terminated: &api.ContainerStateTerminated{},
|
||||
}
|
||||
}
|
||||
newStatus := m.updateStatusInternal(pod, pod.Status)
|
||||
if newStatus != nil {
|
||||
select {
|
||||
case m.podStatusChannel <- podStatusSyncRequest{pod.UID, *newStatus}:
|
||||
default:
|
||||
sent = false
|
||||
glog.V(4).Infof("Termination notice for %q was dropped because the status channel is full", kubeletutil.FormatPodName(pod))
|
||||
}
|
||||
} else {
|
||||
sent = false
|
||||
if sent := m.updateStatusInternal(pod, pod.Status); !sent {
|
||||
glog.V(4).Infof("Termination notice for %q was dropped because the status channel is full", kubeletutil.FormatPodName(pod))
|
||||
allSent = false
|
||||
}
|
||||
}
|
||||
return sent
|
||||
return allSent
|
||||
}
|
||||
|
||||
// updateStatusInternal updates the internal status cache, and returns a versioned status if an
|
||||
// update is necessary. This method IS NOT THREAD SAFE and must be called from a locked function.
|
||||
func (m *manager) updateStatusInternal(pod *api.Pod, status api.PodStatus) *versionedPodStatus {
|
||||
// updateStatusInternal updates the internal status cache, and queues an update to the api server if
|
||||
// necessary. Returns whether an update was triggered.
|
||||
// This method IS NOT THREAD SAFE and must be called from a locked function.
|
||||
func (m *manager) updateStatusInternal(pod *api.Pod, status api.PodStatus) bool {
|
||||
// The intent here is to prevent concurrent updates to a pod's status from
|
||||
// clobbering each other so the phase of a pod progresses monotonically.
|
||||
oldStatus, found := m.podStatuses[pod.UID]
|
||||
if !found || !isStatusEqual(&oldStatus.status, &status) || pod.DeletionTimestamp != nil {
|
||||
newStatus := versionedPodStatus{
|
||||
status: status,
|
||||
version: oldStatus.version + 1,
|
||||
podName: pod.Name,
|
||||
podNamespace: pod.Namespace,
|
||||
}
|
||||
m.podStatuses[pod.UID] = newStatus
|
||||
return &newStatus
|
||||
} else {
|
||||
if found && isStatusEqual(&oldStatus.status, &status) && pod.DeletionTimestamp == nil {
|
||||
glog.V(3).Infof("Ignoring same status for pod %q, status: %+v", kubeletutil.FormatPodName(pod), status)
|
||||
return nil // No new status.
|
||||
return false // No new status.
|
||||
}
|
||||
|
||||
newStatus := versionedPodStatus{
|
||||
status: status,
|
||||
version: oldStatus.version + 1,
|
||||
podName: pod.Name,
|
||||
podNamespace: pod.Namespace,
|
||||
}
|
||||
m.podStatuses[pod.UID] = newStatus
|
||||
|
||||
select {
|
||||
case m.podStatusChannel <- podStatusSyncRequest{pod.UID, newStatus}:
|
||||
return true
|
||||
default:
|
||||
// Let the periodic syncBatch handle the update if the channel is full.
|
||||
// We can't block, since we hold the mutex lock.
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -30,6 +30,7 @@ import (
|
|||
"k8s.io/kubernetes/pkg/api/unversioned"
|
||||
client "k8s.io/kubernetes/pkg/client/unversioned"
|
||||
"k8s.io/kubernetes/pkg/client/unversioned/testclient"
|
||||
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
|
||||
kubepod "k8s.io/kubernetes/pkg/kubelet/pod"
|
||||
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
|
||||
"k8s.io/kubernetes/pkg/runtime"
|
||||
|
@ -499,3 +500,37 @@ func TestStaticPodStatus(t *testing.T) {
|
|||
_, found := m.GetPodStatus(otherPod.UID)
|
||||
assert.False(t, found, "otherPod status should have been deleted")
|
||||
}
|
||||
|
||||
func TestSetContainerReadiness(t *testing.T) {
|
||||
containerID := kubecontainer.ContainerID{"test", "cOnTaInEr_Id"}
|
||||
containerStatus := api.ContainerStatus{
|
||||
Name: "cOnTaInEr_NaMe",
|
||||
ContainerID: containerID.String(),
|
||||
Ready: false,
|
||||
}
|
||||
status := api.PodStatus{
|
||||
ContainerStatuses: []api.ContainerStatus{containerStatus},
|
||||
}
|
||||
|
||||
m := newTestManager(&testclient.Fake{})
|
||||
|
||||
t.Log("Setting readiness before status should fail.")
|
||||
m.SetContainerReadiness(testPod, containerID, true)
|
||||
verifyUpdates(t, m, 0)
|
||||
|
||||
t.Log("Setting initial status.")
|
||||
m.SetPodStatus(testPod, status)
|
||||
verifyUpdates(t, m, 1)
|
||||
|
||||
t.Log("Setting unchanged readiness should do nothing.")
|
||||
m.SetContainerReadiness(testPod, containerID, false)
|
||||
verifyUpdates(t, m, 0)
|
||||
|
||||
t.Log("Setting different readiness should generate update.")
|
||||
m.SetContainerReadiness(testPod, containerID, true)
|
||||
verifyUpdates(t, m, 1)
|
||||
|
||||
t.Log("Setting non-existant container readiness should fail.")
|
||||
m.SetContainerReadiness(testPod, kubecontainer.ContainerID{"test", "foo"}, true)
|
||||
verifyUpdates(t, m, 0)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue