Don't wait for sync to update readiness

Push status updates as soon as readiness state changes for containers,
rather than waiting for the sync loop to update the status. In
particular, this should help new containers to come online faster.

Additionally, consolidates prober test helpers into a single file.
pull/6/head
Tim St. Clair 2015-11-10 14:00:12 -08:00
parent 447fe209ab
commit 67cfed5bf3
10 changed files with 345 additions and 185 deletions

View File

@ -322,8 +322,7 @@ func NewMainKubelet(
procFs := procfs.NewProcFs() procFs := procfs.NewProcFs()
imageBackOff := util.NewBackOff(resyncInterval, MaxContainerBackOff) imageBackOff := util.NewBackOff(resyncInterval, MaxContainerBackOff)
readinessManager := proberesults.NewManager() klet.livenessManager = proberesults.NewManager()
klet.livenessManager = proberesults.NewManagerWithUpdates()
// Initialize the runtime. // Initialize the runtime.
switch containerRuntime { switch containerRuntime {
@ -419,7 +418,6 @@ func NewMainKubelet(
klet.probeManager = prober.NewManager( klet.probeManager = prober.NewManager(
klet.statusManager, klet.statusManager,
readinessManager,
klet.livenessManager, klet.livenessManager,
klet.runner, klet.runner,
containerRefManager, containerRefManager,

View File

@ -27,6 +27,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/status" "k8s.io/kubernetes/pkg/kubelet/status"
kubeutil "k8s.io/kubernetes/pkg/kubelet/util" kubeutil "k8s.io/kubernetes/pkg/kubelet/util"
"k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/sets" "k8s.io/kubernetes/pkg/util/sets"
) )
@ -74,19 +75,24 @@ type manager struct {
func NewManager( func NewManager(
statusManager status.Manager, statusManager status.Manager,
readinessManager results.Manager,
livenessManager results.Manager, livenessManager results.Manager,
runner kubecontainer.ContainerCommandRunner, runner kubecontainer.ContainerCommandRunner,
refManager *kubecontainer.RefManager, refManager *kubecontainer.RefManager,
recorder record.EventRecorder) Manager { recorder record.EventRecorder) Manager {
prober := newProber(runner, refManager, recorder) prober := newProber(runner, refManager, recorder)
return &manager{ readinessManager := results.NewManager()
m := &manager{
statusManager: statusManager, statusManager: statusManager,
prober: prober, prober: prober,
readinessManager: readinessManager, readinessManager: readinessManager,
livenessManager: livenessManager, livenessManager: livenessManager,
workers: make(map[probeKey]*worker), workers: make(map[probeKey]*worker),
} }
// Start syncing readiness.
go util.Forever(m.updateReadiness, 0)
return m
} }
// Key uniquely identifying container probes // Key uniquely identifying container probes
@ -211,3 +217,10 @@ func (m *manager) removeWorker(podUID types.UID, containerName string, probeType
defer m.workerLock.Unlock() defer m.workerLock.Unlock()
delete(m.workers, probeKey{podUID, containerName, probeType}) delete(m.workers, probeKey{podUID, containerName, probeType})
} }
func (m *manager) updateReadiness() {
update := <-m.readinessManager.Updates()
ready := update.Result == results.Success
m.statusManager.SetContainerReadiness(update.Pod, update.ContainerID, ready)
}

View File

@ -23,17 +23,17 @@ import (
"github.com/golang/glog" "github.com/golang/glog"
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/record"
"k8s.io/kubernetes/pkg/client/unversioned/testclient"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
kubepod "k8s.io/kubernetes/pkg/kubelet/pod"
"k8s.io/kubernetes/pkg/kubelet/prober/results" "k8s.io/kubernetes/pkg/kubelet/prober/results"
"k8s.io/kubernetes/pkg/kubelet/status"
"k8s.io/kubernetes/pkg/probe" "k8s.io/kubernetes/pkg/probe"
"k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/pkg/util/wait"
) )
func init() {
util.ReallyCrash = true
}
var defaultProbe *api.Probe = &api.Probe{ var defaultProbe *api.Probe = &api.Probe{
Handler: api.Handler{ Handler: api.Handler{
Exec: &api.ExecAction{}, Exec: &api.ExecAction{},
@ -172,7 +172,6 @@ func TestCleanupPods(t *testing.T) {
} }
func TestUpdatePodStatus(t *testing.T) { func TestUpdatePodStatus(t *testing.T) {
const podUID = "pod_uid"
unprobed := api.ContainerStatus{ unprobed := api.ContainerStatus{
Name: "unprobed_container", Name: "unprobed_container",
ContainerID: "test://unprobed_container_id", ContainerID: "test://unprobed_container_id",
@ -218,27 +217,27 @@ func TestUpdatePodStatus(t *testing.T) {
m := newTestManager() m := newTestManager()
// Setup probe "workers" and cached results. // Setup probe "workers" and cached results.
m.workers = map[probeKey]*worker{ m.workers = map[probeKey]*worker{
probeKey{podUID, unprobed.Name, liveness}: {}, probeKey{testPodUID, unprobed.Name, liveness}: {},
probeKey{podUID, probedReady.Name, readiness}: {}, probeKey{testPodUID, probedReady.Name, readiness}: {},
probeKey{podUID, probedPending.Name, readiness}: {}, probeKey{testPodUID, probedPending.Name, readiness}: {},
probeKey{podUID, probedUnready.Name, readiness}: {}, probeKey{testPodUID, probedUnready.Name, readiness}: {},
probeKey{podUID, terminated.Name, readiness}: {}, probeKey{testPodUID, terminated.Name, readiness}: {},
} }
m.readinessManager.Set(kubecontainer.ParseContainerID(probedReady.ContainerID), results.Success, nil) m.readinessManager.Set(kubecontainer.ParseContainerID(probedReady.ContainerID), results.Success, &api.Pod{})
m.readinessManager.Set(kubecontainer.ParseContainerID(probedUnready.ContainerID), results.Failure, nil) m.readinessManager.Set(kubecontainer.ParseContainerID(probedUnready.ContainerID), results.Failure, &api.Pod{})
m.readinessManager.Set(kubecontainer.ParseContainerID(terminated.ContainerID), results.Success, nil) m.readinessManager.Set(kubecontainer.ParseContainerID(terminated.ContainerID), results.Success, &api.Pod{})
m.UpdatePodStatus(podUID, &podStatus) m.UpdatePodStatus(testPodUID, &podStatus)
expectedReadiness := map[probeKey]bool{ expectedReadiness := map[probeKey]bool{
probeKey{podUID, unprobed.Name, readiness}: true, probeKey{testPodUID, unprobed.Name, readiness}: true,
probeKey{podUID, probedReady.Name, readiness}: true, probeKey{testPodUID, probedReady.Name, readiness}: true,
probeKey{podUID, probedPending.Name, readiness}: false, probeKey{testPodUID, probedPending.Name, readiness}: false,
probeKey{podUID, probedUnready.Name, readiness}: false, probeKey{testPodUID, probedUnready.Name, readiness}: false,
probeKey{podUID, terminated.Name, readiness}: false, probeKey{testPodUID, terminated.Name, readiness}: false,
} }
for _, c := range podStatus.ContainerStatuses { for _, c := range podStatus.ContainerStatuses {
expected, ok := expectedReadiness[probeKey{podUID, c.Name, readiness}] expected, ok := expectedReadiness[probeKey{testPodUID, c.Name, readiness}]
if !ok { if !ok {
t.Fatalf("Missing expectation for test case: %v", c.Name) t.Fatalf("Missing expectation for test case: %v", c.Name)
} }
@ -249,6 +248,31 @@ func TestUpdatePodStatus(t *testing.T) {
} }
} }
func TestUpdateReadiness(t *testing.T) {
testPod := getTestPod(readiness, api.Probe{})
m := newTestManager()
m.statusManager.SetPodStatus(&testPod, getTestRunningStatus())
m.AddPod(&testPod)
probePaths := []probeKey{{testPodUID, testContainerName, readiness}}
if err := expectProbes(m, probePaths); err != nil {
t.Error(err)
}
// Wait for ready status.
if err := waitForReadyStatus(m, true); err != nil {
t.Error(err)
}
// Prober fails.
m.prober.exec = fakeExecProber{probe.Failure, nil}
// Wait for failed status.
if err := waitForReadyStatus(m, false); err != nil {
t.Error(err)
}
}
func expectProbes(m *manager, expectedProbes []probeKey) error { func expectProbes(m *manager, expectedProbes []probeKey) error {
m.workerLock.RLock() m.workerLock.RLock()
defer m.workerLock.RUnlock() defer m.workerLock.RUnlock()
@ -275,24 +299,10 @@ outer:
return fmt.Errorf("Unexpected probes: %v; Missing probes: %v;", unexpected, missing) return fmt.Errorf("Unexpected probes: %v; Missing probes: %v;", unexpected, missing)
} }
func newTestManager() *manager { const interval = 100 * time.Millisecond
m := NewManager(
status.NewManager(&testclient.Fake{}, kubepod.NewBasicPodManager(nil)),
results.NewManager(),
results.NewManager(),
nil, // runner
kubecontainer.NewRefManager(),
&record.FakeRecorder{},
).(*manager)
// Don't actually execute probes.
m.prober.exec = fakeExecProber{probe.Success, nil}
return m
}
// Wait for the given workers to exit & clean up. // Wait for the given workers to exit & clean up.
func waitForWorkerExit(m *manager, workerPaths []probeKey) error { func waitForWorkerExit(m *manager, workerPaths []probeKey) error {
const interval = 100 * time.Millisecond
for _, w := range workerPaths { for _, w := range workerPaths {
condition := func() (bool, error) { condition := func() (bool, error) {
_, exists := m.getWorker(w.podUID, w.containerName, w.probeType) _, exists := m.getWorker(w.podUID, w.containerName, w.probeType)
@ -309,3 +319,27 @@ func waitForWorkerExit(m *manager, workerPaths []probeKey) error {
return nil return nil
} }
// Wait for the given workers to exit & clean up.
func waitForReadyStatus(m *manager, ready bool) error {
condition := func() (bool, error) {
status, ok := m.statusManager.GetPodStatus(testPodUID)
if !ok {
return false, fmt.Errorf("status not found: %q", testPodUID)
}
if len(status.ContainerStatuses) != 1 {
return false, fmt.Errorf("expected single container, found %d", len(status.ContainerStatuses))
}
if status.ContainerStatuses[0].ContainerID != testContainerID.String() {
return false, fmt.Errorf("expected container %q, found %q",
testContainerID, status.ContainerStatuses[0].ContainerID)
}
return status.ContainerStatuses[0].Ready == ready, nil
}
glog.Infof("Polling for ready state %v", ready)
if err := wait.Poll(interval, util.ForeverTestTimeout, condition); err != nil {
return err
}
return nil
}

View File

@ -27,7 +27,6 @@ import (
"k8s.io/kubernetes/pkg/kubelet/prober/results" "k8s.io/kubernetes/pkg/kubelet/prober/results"
"k8s.io/kubernetes/pkg/probe" "k8s.io/kubernetes/pkg/probe"
"k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/exec"
) )
func TestFormatURL(t *testing.T) { func TestFormatURL(t *testing.T) {
@ -246,12 +245,3 @@ func TestProbe(t *testing.T) {
} }
} }
} }
type fakeExecProber struct {
result probe.Result
err error
}
func (p fakeExecProber) Probe(_ exec.Cmd) (probe.Result, string, error) {
return p.result, "", p.err
}

View File

@ -70,7 +70,7 @@ type manager struct {
sync.RWMutex sync.RWMutex
// map of container ID -> probe Result // map of container ID -> probe Result
cache map[kubecontainer.ContainerID]Result cache map[kubecontainer.ContainerID]Result
// channel of updates (may be nil) // channel of updates
updates chan Update updates chan Update
} }
@ -78,15 +78,10 @@ var _ Manager = &manager{}
// NewManager creates ane returns an empty results manager. // NewManager creates ane returns an empty results manager.
func NewManager() Manager { func NewManager() Manager {
m := &manager{cache: make(map[kubecontainer.ContainerID]Result)} return &manager{
return m cache: make(map[kubecontainer.ContainerID]Result),
} updates: make(chan Update, 20),
}
// NewManager creates ane returns an empty results manager.
func NewManagerWithUpdates() Manager {
m := NewManager().(*manager)
m.updates = make(chan Update, 20)
return m
} }
func (m *manager) Get(id kubecontainer.ContainerID) (Result, bool) { func (m *manager) Get(id kubecontainer.ContainerID) (Result, bool) {
@ -98,7 +93,7 @@ func (m *manager) Get(id kubecontainer.ContainerID) (Result, bool) {
func (m *manager) Set(id kubecontainer.ContainerID, result Result, pod *api.Pod) { func (m *manager) Set(id kubecontainer.ContainerID, result Result, pod *api.Pod) {
if m.setInternal(id, result) { if m.setInternal(id, result) {
m.pushUpdate(Update{id, result, pod}) m.updates <- Update{id, result, pod}
} }
} }
@ -123,10 +118,3 @@ func (m *manager) Remove(id kubecontainer.ContainerID) {
func (m *manager) Updates() <-chan Update { func (m *manager) Updates() <-chan Update {
return m.updates return m.updates
} }
// pushUpdates sends an update on the updates channel if it is initialized.
func (m *manager) pushUpdate(update Update) {
if m.updates != nil {
m.updates <- update
}
}

View File

@ -46,7 +46,7 @@ func TestCacheOperations(t *testing.T) {
} }
func TestUpdates(t *testing.T) { func TestUpdates(t *testing.T) {
m := NewManagerWithUpdates() m := NewManager()
pod := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "test-pod"}} pod := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "test-pod"}}
fooID := kubecontainer.ContainerID{"test", "foo"} fooID := kubecontainer.ContainerID{"test", "foo"}

View File

@ -0,0 +1,122 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package prober
import (
"reflect"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/client/record"
"k8s.io/kubernetes/pkg/client/unversioned/testclient"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
kubepod "k8s.io/kubernetes/pkg/kubelet/pod"
"k8s.io/kubernetes/pkg/kubelet/prober/results"
"k8s.io/kubernetes/pkg/kubelet/status"
"k8s.io/kubernetes/pkg/probe"
"k8s.io/kubernetes/pkg/util/exec"
)
const (
testContainerName = "cOnTaInEr_NaMe"
testPodUID = "pOd_UiD"
)
var testContainerID = kubecontainer.ContainerID{"test", "cOnTaInEr_Id"}
func getTestRunningStatus() api.PodStatus {
containerStatus := api.ContainerStatus{
Name: testContainerName,
ContainerID: testContainerID.String(),
}
containerStatus.State.Running = &api.ContainerStateRunning{unversioned.Now()}
podStatus := api.PodStatus{
Phase: api.PodRunning,
ContainerStatuses: []api.ContainerStatus{containerStatus},
}
return podStatus
}
func getTestPod(probeType probeType, probeSpec api.Probe) api.Pod {
container := api.Container{
Name: testContainerName,
}
// All tests rely on the fake exec prober.
probeSpec.Handler = api.Handler{
Exec: &api.ExecAction{},
}
// Apply test defaults, overwridden for test speed.
defaults := map[string]int64{
"TimeoutSeconds": 1,
"PeriodSeconds": 1,
"SuccessThreshold": 1,
"FailureThreshold": 1,
}
for field, value := range defaults {
f := reflect.ValueOf(&probeSpec).Elem().FieldByName(field)
if f.Int() == 0 {
f.SetInt(value)
}
}
switch probeType {
case readiness:
container.ReadinessProbe = &probeSpec
case liveness:
container.LivenessProbe = &probeSpec
}
pod := api.Pod{
Spec: api.PodSpec{
Containers: []api.Container{container},
RestartPolicy: api.RestartPolicyNever,
},
}
pod.Name = "testPod"
pod.UID = testPodUID
return pod
}
func newTestManager() *manager {
refManager := kubecontainer.NewRefManager()
refManager.SetRef(testContainerID, &api.ObjectReference{}) // Suppress prober warnings.
m := NewManager(
status.NewManager(&testclient.Fake{}, kubepod.NewBasicPodManager(nil)),
results.NewManager(),
nil, // runner
refManager,
&record.FakeRecorder{},
).(*manager)
// Don't actually execute probes.
m.prober.exec = fakeExecProber{probe.Success, nil}
return m
}
func newTestWorker(m *manager, probeType probeType, probeSpec api.Probe) *worker {
pod := getTestPod(probeType, probeSpec)
return newWorker(m, probeType, &pod, pod.Spec.Containers[0])
}
type fakeExecProber struct {
result probe.Result
err error
}
func (p fakeExecProber) Probe(_ exec.Cmd) (probe.Result, string, error) {
return p.result, "", p.err
}

View File

@ -18,7 +18,6 @@ package prober
import ( import (
"fmt" "fmt"
"reflect"
"testing" "testing"
"time" "time"
@ -36,28 +35,25 @@ import (
"k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/pkg/util/wait"
) )
const ( func init() {
containerName = "cOnTaInEr_NaMe" util.ReallyCrash = true
podUID = "pOd_UiD" }
)
var containerID = kubecontainer.ContainerID{"test", "cOnTaInEr_Id"}
func TestDoProbe(t *testing.T) { func TestDoProbe(t *testing.T) {
m := newTestManager() m := newTestManager()
// Test statuses. // Test statuses.
runningStatus := getRunningStatus() runningStatus := getTestRunningStatus()
pendingStatus := getRunningStatus() pendingStatus := getTestRunningStatus()
pendingStatus.ContainerStatuses[0].State.Running = nil pendingStatus.ContainerStatuses[0].State.Running = nil
terminatedStatus := getRunningStatus() terminatedStatus := getTestRunningStatus()
terminatedStatus.ContainerStatuses[0].State.Running = nil terminatedStatus.ContainerStatuses[0].State.Running = nil
terminatedStatus.ContainerStatuses[0].State.Terminated = &api.ContainerStateTerminated{ terminatedStatus.ContainerStatuses[0].State.Terminated = &api.ContainerStateTerminated{
StartedAt: unversioned.Now(), StartedAt: unversioned.Now(),
} }
otherStatus := getRunningStatus() otherStatus := getTestRunningStatus()
otherStatus.ContainerStatuses[0].Name = "otherContainer" otherStatus.ContainerStatuses[0].Name = "otherContainer"
failedStatus := getRunningStatus() failedStatus := getTestRunningStatus()
failedStatus.Phase = api.PodFailed failedStatus.Phase = api.PodFailed
tests := []struct { tests := []struct {
@ -112,7 +108,7 @@ func TestDoProbe(t *testing.T) {
if c := w.doProbe(); c != test.expectContinue { if c := w.doProbe(); c != test.expectContinue {
t.Errorf("[%s-%d] Expected continue to be %v but got %v", probeType, i, test.expectContinue, c) t.Errorf("[%s-%d] Expected continue to be %v but got %v", probeType, i, test.expectContinue, c)
} }
result, ok := resultsManager(m, probeType).Get(containerID) result, ok := resultsManager(m, probeType).Get(testContainerID)
if ok != test.expectSet { if ok != test.expectSet {
t.Errorf("[%s-%d] Expected to have result: %v but got %v", probeType, i, test.expectSet, ok) t.Errorf("[%s-%d] Expected to have result: %v but got %v", probeType, i, test.expectSet, ok)
} }
@ -122,7 +118,7 @@ func TestDoProbe(t *testing.T) {
// Clean up. // Clean up.
m.statusManager = status.NewManager(&testclient.Fake{}, kubepod.NewBasicPodManager(nil)) m.statusManager = status.NewManager(&testclient.Fake{}, kubepod.NewBasicPodManager(nil))
resultsManager(m, probeType).Remove(containerID) resultsManager(m, probeType).Remove(testContainerID)
} }
} }
} }
@ -134,13 +130,13 @@ func TestInitialDelay(t *testing.T) {
w := newTestWorker(m, probeType, api.Probe{ w := newTestWorker(m, probeType, api.Probe{
InitialDelaySeconds: 10, InitialDelaySeconds: 10,
}) })
m.statusManager.SetPodStatus(w.pod, getRunningStatus()) m.statusManager.SetPodStatus(w.pod, getTestRunningStatus())
expectContinue(t, w, w.doProbe(), "during initial delay") expectContinue(t, w, w.doProbe(), "during initial delay")
expectResult(t, w, results.Result(probeType == liveness), "during initial delay") expectResult(t, w, results.Result(probeType == liveness), "during initial delay")
// 100 seconds later... // 100 seconds later...
laterStatus := getRunningStatus() laterStatus := getTestRunningStatus()
laterStatus.ContainerStatuses[0].State.Running.StartedAt.Time = laterStatus.ContainerStatuses[0].State.Running.StartedAt.Time =
time.Now().Add(-100 * time.Second) time.Now().Add(-100 * time.Second)
m.statusManager.SetPodStatus(w.pod, laterStatus) m.statusManager.SetPodStatus(w.pod, laterStatus)
@ -153,8 +149,8 @@ func TestInitialDelay(t *testing.T) {
func TestFailureThreshold(t *testing.T) { func TestFailureThreshold(t *testing.T) {
m := newTestManager() m := newTestManager()
w := newTestWorker(m, readiness, api.Probe{}) w := newTestWorker(m, readiness, api.Probe{SuccessThreshold: 1, FailureThreshold: 3})
m.statusManager.SetPodStatus(w.pod, getRunningStatus()) m.statusManager.SetPodStatus(w.pod, getTestRunningStatus())
for i := 0; i < 2; i++ { for i := 0; i < 2; i++ {
// First probe should succeed. // First probe should succeed.
@ -171,7 +167,7 @@ func TestFailureThreshold(t *testing.T) {
// Next 2 probes should still be "success". // Next 2 probes should still be "success".
for j := 0; j < 2; j++ { for j := 0; j < 2; j++ {
msg := fmt.Sprintf("%d failure (%d)", j+1, i) msg := fmt.Sprintf("%d failing (%d)", j+1, i)
expectContinue(t, w, w.doProbe(), msg) expectContinue(t, w, w.doProbe(), msg)
expectResult(t, w, results.Success, msg) expectResult(t, w, results.Success, msg)
} }
@ -188,10 +184,10 @@ func TestFailureThreshold(t *testing.T) {
func TestSuccessThreshold(t *testing.T) { func TestSuccessThreshold(t *testing.T) {
m := newTestManager() m := newTestManager()
w := newTestWorker(m, readiness, api.Probe{SuccessThreshold: 3, FailureThreshold: 1}) w := newTestWorker(m, readiness, api.Probe{SuccessThreshold: 3, FailureThreshold: 1})
m.statusManager.SetPodStatus(w.pod, getRunningStatus()) m.statusManager.SetPodStatus(w.pod, getTestRunningStatus())
// Start out failure. // Start out failure.
w.resultsManager.Set(containerID, results.Failure, nil) w.resultsManager.Set(testContainerID, results.Failure, &api.Pod{})
for i := 0; i < 2; i++ { for i := 0; i < 2; i++ {
// Probe defaults to Failure. // Probe defaults to Failure.
@ -223,15 +219,15 @@ func TestCleanUp(t *testing.T) {
m := newTestManager() m := newTestManager()
for _, probeType := range [...]probeType{liveness, readiness} { for _, probeType := range [...]probeType{liveness, readiness} {
key := probeKey{podUID, containerName, probeType} key := probeKey{testPodUID, testContainerName, probeType}
w := newTestWorker(m, probeType, api.Probe{}) w := newTestWorker(m, probeType, api.Probe{})
m.statusManager.SetPodStatus(w.pod, getRunningStatus()) m.statusManager.SetPodStatus(w.pod, getTestRunningStatus())
go w.run() go w.run()
m.workers[key] = w m.workers[key] = w
// Wait for worker to run. // Wait for worker to run.
condition := func() (bool, error) { condition := func() (bool, error) {
ready, _ := resultsManager(m, probeType).Get(containerID) ready, _ := resultsManager(m, probeType).Get(testContainerID)
return ready == results.Success, nil return ready == results.Success, nil
} }
if ready, _ := condition(); !ready { if ready, _ := condition(); !ready {
@ -245,7 +241,7 @@ func TestCleanUp(t *testing.T) {
t.Fatalf("[%s] error waiting for worker exit: %v", probeType, err) t.Fatalf("[%s] error waiting for worker exit: %v", probeType, err)
} }
if _, ok := resultsManager(m, probeType).Get(containerID); ok { if _, ok := resultsManager(m, probeType).Get(testContainerID); ok {
t.Errorf("[%s] Expected result to be cleared.", probeType) t.Errorf("[%s] Expected result to be cleared.", probeType)
} }
if _, ok := m.workers[key]; ok { if _, ok := m.workers[key]; ok {
@ -255,9 +251,11 @@ func TestCleanUp(t *testing.T) {
} }
func TestHandleCrash(t *testing.T) { func TestHandleCrash(t *testing.T) {
util.ReallyCrash = false // Test that we *don't* really crash.
m := newTestManager() m := newTestManager()
w := newTestWorker(m, readiness, api.Probe{}) w := newTestWorker(m, readiness, api.Probe{})
m.statusManager.SetPodStatus(w.pod, getRunningStatus()) m.statusManager.SetPodStatus(w.pod, getTestRunningStatus())
expectContinue(t, w, w.doProbe(), "Initial successful probe.") expectContinue(t, w, w.doProbe(), "Initial successful probe.")
expectResult(t, w, results.Success, "Initial successful probe.") expectResult(t, w, results.Success, "Initial successful probe.")
@ -274,64 +272,8 @@ func TestHandleCrash(t *testing.T) {
expectResult(t, w, results.Success, "Crashing probe unchanged.") expectResult(t, w, results.Success, "Crashing probe unchanged.")
} }
func newTestWorker(m *manager, probeType probeType, probeSpec api.Probe) *worker {
// All tests rely on the fake exec prober.
probeSpec.Handler = api.Handler{
Exec: &api.ExecAction{},
}
// Apply default values.
defaults := map[string]int64{
"TimeoutSeconds": 1,
"PeriodSeconds": 10,
"SuccessThreshold": 1,
"FailureThreshold": 3,
}
for field, value := range defaults {
f := reflect.ValueOf(&probeSpec).Elem().FieldByName(field)
if f.Int() == 0 {
f.SetInt(value)
}
}
pod := getTestPod(probeType, probeSpec)
return newWorker(m, probeType, &pod, pod.Spec.Containers[0])
}
func getRunningStatus() api.PodStatus {
containerStatus := api.ContainerStatus{
Name: containerName,
ContainerID: containerID.String(),
}
containerStatus.State.Running = &api.ContainerStateRunning{unversioned.Now()}
podStatus := api.PodStatus{
Phase: api.PodRunning,
ContainerStatuses: []api.ContainerStatus{containerStatus},
}
return podStatus
}
func getTestPod(probeType probeType, probeSpec api.Probe) api.Pod {
container := api.Container{
Name: containerName,
}
switch probeType {
case readiness:
container.ReadinessProbe = &probeSpec
case liveness:
container.LivenessProbe = &probeSpec
}
pod := api.Pod{
Spec: api.PodSpec{
Containers: []api.Container{container},
RestartPolicy: api.RestartPolicyNever,
},
}
pod.UID = podUID
return pod
}
func expectResult(t *testing.T, w *worker, expectedResult results.Result, msg string) { func expectResult(t *testing.T, w *worker, expectedResult results.Result, msg string) {
result, ok := resultsManager(w.probeManager, w.probeType).Get(containerID) result, ok := resultsManager(w.probeManager, w.probeType).Get(testContainerID)
if !ok { if !ok {
t.Errorf("[%s - %s] Expected result to be set, but was not set", w.probeType, msg) t.Errorf("[%s - %s] Expected result to be set, but was not set", w.probeType, msg)
} else if result != expectedResult { } else if result != expectedResult {

View File

@ -27,6 +27,7 @@ import (
"k8s.io/kubernetes/pkg/api/errors" "k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/api/unversioned"
client "k8s.io/kubernetes/pkg/client/unversioned" client "k8s.io/kubernetes/pkg/client/unversioned"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
kubepod "k8s.io/kubernetes/pkg/kubelet/pod" kubepod "k8s.io/kubernetes/pkg/kubelet/pod"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types" kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
kubeletutil "k8s.io/kubernetes/pkg/kubelet/util" kubeletutil "k8s.io/kubernetes/pkg/kubelet/util"
@ -77,6 +78,10 @@ type Manager interface {
// SetPodStatus caches updates the cached status for the given pod, and triggers a status update. // SetPodStatus caches updates the cached status for the given pod, and triggers a status update.
SetPodStatus(pod *api.Pod, status api.PodStatus) SetPodStatus(pod *api.Pod, status api.PodStatus)
// SetContainerReadiness updates the cached container status with the given readiness, and
// triggers a status update.
SetContainerReadiness(pod *api.Pod, containerID kubecontainer.ContainerID, ready bool)
// TerminatePods resets the container status for the provided pods to terminated and triggers // TerminatePods resets the container status for the provided pods to terminated and triggers
// a status update. This function may not enqueue all the provided pods, in which case it will // a status update. This function may not enqueue all the provided pods, in which case it will
// return false // return false
@ -171,19 +176,50 @@ func (m *manager) SetPodStatus(pod *api.Pod, status api.PodStatus) {
status.StartTime = &now status.StartTime = &now
} }
newStatus := m.updateStatusInternal(pod, status) m.updateStatusInternal(pod, status)
if newStatus != nil { }
select {
case m.podStatusChannel <- podStatusSyncRequest{pod.UID, *newStatus}: func (m *manager) SetContainerReadiness(pod *api.Pod, containerID kubecontainer.ContainerID, ready bool) {
default: m.podStatusesLock.Lock()
// Let the periodic syncBatch handle the update if the channel is full. defer m.podStatusesLock.Unlock()
// We can't block, since we hold the mutex lock.
oldStatus, found := m.podStatuses[pod.UID]
if !found {
glog.Warningf("Container readiness changed before pod has synced: %q - %q",
kubeletutil.FormatPodName(pod), containerID.String())
return
}
status := oldStatus.status
// Find the container to update.
containerIndex := -1
for i, c := range status.ContainerStatuses {
if c.ContainerID == containerID.String() {
containerIndex = i
break
} }
} }
if containerIndex == -1 {
glog.Warningf("Container readiness changed for unknown container: %q - %q",
kubeletutil.FormatPodName(pod), containerID.String())
return
}
if status.ContainerStatuses[containerIndex].Ready == ready {
glog.V(4).Infof("Container readiness unchanged (%v): %q - %q", ready,
kubeletutil.FormatPodName(pod), containerID.String())
return
}
// Make sure we're not updating the cached version.
status.ContainerStatuses = make([]api.ContainerStatus, len(status.ContainerStatuses))
copy(status.ContainerStatuses, oldStatus.status.ContainerStatuses)
status.ContainerStatuses[containerIndex].Ready = ready
m.updateStatusInternal(pod, status)
} }
func (m *manager) TerminatePods(pods []*api.Pod) bool { func (m *manager) TerminatePods(pods []*api.Pod) bool {
sent := true allSent := true
m.podStatusesLock.Lock() m.podStatusesLock.Lock()
defer m.podStatusesLock.Unlock() defer m.podStatusesLock.Unlock()
for _, pod := range pods { for _, pod := range pods {
@ -192,28 +228,26 @@ func (m *manager) TerminatePods(pods []*api.Pod) bool {
Terminated: &api.ContainerStateTerminated{}, Terminated: &api.ContainerStateTerminated{},
} }
} }
newStatus := m.updateStatusInternal(pod, pod.Status) if sent := m.updateStatusInternal(pod, pod.Status); !sent {
if newStatus != nil {
select {
case m.podStatusChannel <- podStatusSyncRequest{pod.UID, *newStatus}:
default:
sent = false
glog.V(4).Infof("Termination notice for %q was dropped because the status channel is full", kubeletutil.FormatPodName(pod)) glog.V(4).Infof("Termination notice for %q was dropped because the status channel is full", kubeletutil.FormatPodName(pod))
} allSent = false
} else {
sent = false
} }
} }
return sent return allSent
} }
// updateStatusInternal updates the internal status cache, and returns a versioned status if an // updateStatusInternal updates the internal status cache, and queues an update to the api server if
// update is necessary. This method IS NOT THREAD SAFE and must be called from a locked function. // necessary. Returns whether an update was triggered.
func (m *manager) updateStatusInternal(pod *api.Pod, status api.PodStatus) *versionedPodStatus { // This method IS NOT THREAD SAFE and must be called from a locked function.
func (m *manager) updateStatusInternal(pod *api.Pod, status api.PodStatus) bool {
// The intent here is to prevent concurrent updates to a pod's status from // The intent here is to prevent concurrent updates to a pod's status from
// clobbering each other so the phase of a pod progresses monotonically. // clobbering each other so the phase of a pod progresses monotonically.
oldStatus, found := m.podStatuses[pod.UID] oldStatus, found := m.podStatuses[pod.UID]
if !found || !isStatusEqual(&oldStatus.status, &status) || pod.DeletionTimestamp != nil { if found && isStatusEqual(&oldStatus.status, &status) && pod.DeletionTimestamp == nil {
glog.V(3).Infof("Ignoring same status for pod %q, status: %+v", kubeletutil.FormatPodName(pod), status)
return false // No new status.
}
newStatus := versionedPodStatus{ newStatus := versionedPodStatus{
status: status, status: status,
version: oldStatus.version + 1, version: oldStatus.version + 1,
@ -221,10 +255,14 @@ func (m *manager) updateStatusInternal(pod *api.Pod, status api.PodStatus) *vers
podNamespace: pod.Namespace, podNamespace: pod.Namespace,
} }
m.podStatuses[pod.UID] = newStatus m.podStatuses[pod.UID] = newStatus
return &newStatus
} else { select {
glog.V(3).Infof("Ignoring same status for pod %q, status: %+v", kubeletutil.FormatPodName(pod), status) case m.podStatusChannel <- podStatusSyncRequest{pod.UID, newStatus}:
return nil // No new status. return true
default:
// Let the periodic syncBatch handle the update if the channel is full.
// We can't block, since we hold the mutex lock.
return false
} }
} }

View File

@ -30,6 +30,7 @@ import (
"k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/api/unversioned"
client "k8s.io/kubernetes/pkg/client/unversioned" client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/client/unversioned/testclient" "k8s.io/kubernetes/pkg/client/unversioned/testclient"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
kubepod "k8s.io/kubernetes/pkg/kubelet/pod" kubepod "k8s.io/kubernetes/pkg/kubelet/pod"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types" kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/runtime"
@ -499,3 +500,37 @@ func TestStaticPodStatus(t *testing.T) {
_, found := m.GetPodStatus(otherPod.UID) _, found := m.GetPodStatus(otherPod.UID)
assert.False(t, found, "otherPod status should have been deleted") assert.False(t, found, "otherPod status should have been deleted")
} }
func TestSetContainerReadiness(t *testing.T) {
containerID := kubecontainer.ContainerID{"test", "cOnTaInEr_Id"}
containerStatus := api.ContainerStatus{
Name: "cOnTaInEr_NaMe",
ContainerID: containerID.String(),
Ready: false,
}
status := api.PodStatus{
ContainerStatuses: []api.ContainerStatus{containerStatus},
}
m := newTestManager(&testclient.Fake{})
t.Log("Setting readiness before status should fail.")
m.SetContainerReadiness(testPod, containerID, true)
verifyUpdates(t, m, 0)
t.Log("Setting initial status.")
m.SetPodStatus(testPod, status)
verifyUpdates(t, m, 1)
t.Log("Setting unchanged readiness should do nothing.")
m.SetContainerReadiness(testPod, containerID, false)
verifyUpdates(t, m, 0)
t.Log("Setting different readiness should generate update.")
m.SetContainerReadiness(testPod, containerID, true)
verifyUpdates(t, m, 1)
t.Log("Setting non-existant container readiness should fail.")
m.SetContainerReadiness(testPod, kubecontainer.ContainerID{"test", "foo"}, true)
verifyUpdates(t, m, 0)
}