mirror of https://github.com/k3s-io/k3s
Merge pull request #6039 from yifan-gu/readiness
kubelet: Add container readiness manager.
commit 4ed3346587
@@ -36,9 +36,7 @@ type RefManager struct {
 // NewRefManager creates and returns a container reference manager
 // with empty contents.
 func NewRefManager() *RefManager {
-	c := RefManager{}
-	c.containerIDToRef = make(map[string]*api.ObjectReference)
-	return &c
+	return &RefManager{containerIDToRef: make(map[string]*api.ObjectReference)}
 }
 
 // SetRef stores a reference to a pod's container, associating it with the given container ID.

@@ -0,0 +1,59 @@
+/*
+Copyright 2015 Google Inc. All rights reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package container
+
+import "sync"
+
+// ReadinessManager maintains the readiness information(probe results) of
+// containers over time to allow for implementation of health thresholds.
+// This manager is thread-safe, no locks are necessary for the caller.
+type ReadinessManager struct {
+	// guards states
+	sync.RWMutex
+	// TODO(yifan): To use strong type.
+	states map[string]bool
+}
+
+// NewReadinessManager creates ane returns a readiness manager with empty
+// contents.
+func NewReadinessManager() *ReadinessManager {
+	return &ReadinessManager{states: make(map[string]bool)}
+}
+
+// GetReadiness returns the readiness value for the container with the given ID.
+// If the readiness value is found, returns it.
+// If the readiness is not found, returns false.
+func (r *ReadinessManager) GetReadiness(id string) bool {
+	r.RLock()
+	defer r.RUnlock()
+	state, found := r.states[id]
+	return state && found
+}
+
+// SetReadiness sets the readiness value for the container with the given ID.
+func (r *ReadinessManager) SetReadiness(id string, value bool) {
+	r.Lock()
+	defer r.Unlock()
+	r.states[id] = value
+}
+
+// RemoveReadiness clears the readiness value for the container with the given ID.
+func (r *ReadinessManager) RemoveReadiness(id string) {
+	r.Lock()
+	defer r.Unlock()
+	delete(r.states, id)
+}

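For context, the ReadinessManager added above is a lock-guarded map from container ID to a readiness bit; lookups for unknown IDs return false. Below is a minimal usage sketch, not part of the diff: the kubecontainer import path is assumed from the alias used elsewhere in this change, and the container ID is made up for illustration.

package main

import (
	"fmt"

	kubecontainer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container"
)

func main() {
	// Hypothetical container ID, for illustration only.
	const id = "abc123"

	rm := kubecontainer.NewReadinessManager()

	// Unknown IDs report not ready.
	fmt.Println(rm.GetReadiness(id)) // false

	// A successful readiness probe records the container as ready.
	rm.SetReadiness(id, true)
	fmt.Println(rm.GetReadiness(id)) // true

	// Killing the container drops its entry again.
	rm.RemoveReadiness(id)
	fmt.Println(rm.GetReadiness(id)) // false
}
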
@@ -206,25 +206,25 @@ func NewMainKubelet(
 	statusManager := newStatusManager(kubeClient)
 
 	klet := &Kubelet{
-		hostname: hostname,
-		dockerClient: dockerClient,
-		kubeClient: kubeClient,
-		rootDirectory: rootDirectory,
-		resyncInterval: resyncInterval,
-		podInfraContainerImage: podInfraContainerImage,
-		containerRefManager: kubecontainer.NewRefManager(),
-		runner: dockertools.NewDockerContainerCommandRunner(dockerClient),
-		httpClient: &http.Client{},
-		pullQPS: pullQPS,
-		pullBurst: pullBurst,
-		sourcesReady: sourcesReady,
-		clusterDomain: clusterDomain,
-		clusterDNS: clusterDNS,
-		serviceLister: serviceLister,
-		nodeLister: nodeLister,
-		masterServiceNamespace: masterServiceNamespace,
-		prober: newProbeHolder(),
-		readiness: newReadinessStates(),
+		hostname: hostname,
+		dockerClient: dockerClient,
+		kubeClient: kubeClient,
+		rootDirectory: rootDirectory,
+		resyncInterval: resyncInterval,
+		podInfraContainerImage: podInfraContainerImage,
+		containerRefManager: kubecontainer.NewRefManager(),
+		readinessManager: kubecontainer.NewReadinessManager(),
+		runner: dockertools.NewDockerContainerCommandRunner(dockerClient),
+		httpClient: &http.Client{},
+		pullQPS: pullQPS,
+		pullBurst: pullBurst,
+		sourcesReady: sourcesReady,
+		clusterDomain: clusterDomain,
+		clusterDNS: clusterDNS,
+		serviceLister: serviceLister,
+		nodeLister: nodeLister,
+		masterServiceNamespace: masterServiceNamespace,
+		prober: newProbeHolder(),
 		streamingConnectionIdleTimeout: streamingConnectionIdleTimeout,
 		recorder: recorder,
 		cadvisor: cadvisorInterface,

@@ -326,8 +326,8 @@ type Kubelet struct {
 
 	// Probe runner holder
 	prober probeHolder
-	// Container readiness state holder
-	readiness *readinessStates
+	// Container readiness state manager.
+	readinessManager *kubecontainer.ReadinessManager
 
 	// How long to keep idle streaming command execution/port forwarding
 	// connections open before terminating them

@@ -818,7 +818,7 @@ func (kl *Kubelet) killContainer(c *kubecontainer.Container) error {
 
 func (kl *Kubelet) killContainerByID(ID string) error {
 	glog.V(2).Infof("Killing container with id %q", ID)
-	kl.readiness.remove(ID)
+	kl.readinessManager.RemoveReadiness(ID)
 	err := kl.dockerClient.StopContainer(ID, 10)
 
 	ref, ok := kl.containerRefManager.GetRef(ID)

@@ -986,7 +986,7 @@ func (kl *Kubelet) shouldContainerBeRestarted(container *api.Container, pod *api
 	}
 	// set dead containers to unready state
 	for _, c := range recentContainers {
-		kl.readiness.remove(c.ID)
+		kl.readinessManager.RemoveReadiness(c.ID)
 	}
 
 	if len(recentContainers) > 0 {

@@ -1915,7 +1915,8 @@ func (kl *Kubelet) generatePodStatusByPod(pod *api.Pod) (api.PodStatus, error) {
 	for _, c := range spec.Containers {
 		for i, st := range podStatus.ContainerStatuses {
 			if st.Name == c.Name {
-				podStatus.ContainerStatuses[i].Ready = kl.readiness.IsReady(st)
+				ready := st.State.Running != nil && kl.readinessManager.GetReadiness(strings.TrimPrefix(st.ContainerID, "docker://"))
+				podStatus.ContainerStatuses[i].Ready = ready
 				break
 			}
 		}

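The readiness manager is keyed by the bare Docker container ID, while ContainerStatus.ContainerID carries a docker:// prefix, so the status path above trims the prefix and additionally requires the container to be running. A standalone sketch of that check follows; it is not part of the diff, and the helper names and sample IDs are made up for illustration.

package main

import (
	"fmt"
	"strings"
)

// readinessLookup stands in for ReadinessManager.GetReadiness in this sketch.
type readinessLookup func(id string) bool

// containerReady mirrors the check added above: a container is reported
// Ready only when it is running and the readiness manager holds a true
// entry for its ID with the "docker://" prefix stripped.
func containerReady(running bool, containerID string, get readinessLookup) bool {
	return running && get(strings.TrimPrefix(containerID, "docker://"))
}

func main() {
	states := map[string]bool{"abc123": true} // illustrative readiness entries
	get := func(id string) bool { return states[id] }

	fmt.Println(containerReady(true, "docker://abc123", get))  // true
	fmt.Println(containerReady(false, "docker://abc123", get)) // false: not running
	fmt.Println(containerReady(true, "docker://def456", get))  // false: no readiness entry
}
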
@@ -101,7 +101,7 @@ func newTestKubelet(t *testing.T) *TestKubelet {
 	kubelet.masterServiceNamespace = api.NamespaceDefault
 	kubelet.serviceLister = testServiceLister{}
 	kubelet.nodeLister = testNodeLister{}
-	kubelet.readiness = newReadinessStates()
+	kubelet.readinessManager = kubecontainer.NewReadinessManager()
 	kubelet.recorder = fakeRecorder
 	kubelet.statusManager = newStatusManager(fakeKubeClient)
 	if err := kubelet.setupDataDirs(); err != nil {

@@ -369,7 +369,7 @@ func TestKillContainerWithError(t *testing.T) {
 	testKubelet := newTestKubelet(t)
 	kubelet := testKubelet.kubelet
 	for _, c := range fakeDocker.ContainerList {
-		kubelet.readiness.set(c.ID, true)
+		kubelet.readinessManager.SetReadiness(c.ID, true)
 	}
 	kubelet.dockerClient = fakeDocker
 	c := apiContainerToContainer(fakeDocker.ContainerList[0])

@@ -380,11 +380,13 @@ func TestKillContainerWithError(t *testing.T) {
 	verifyCalls(t, fakeDocker, []string{"stop"})
 	killedContainer := containers[0]
 	liveContainer := containers[1]
-	if _, found := kubelet.readiness.states[killedContainer.ID]; found {
-		t.Errorf("exepcted container entry ID '%v' to not be found. states: %+v", killedContainer.ID, kubelet.readiness.states)
+	ready := kubelet.readinessManager.GetReadiness(killedContainer.ID)
+	if ready {
+		t.Errorf("exepcted container entry ID '%v' to not be found. states: %+v", killedContainer.ID, ready)
 	}
-	if _, found := kubelet.readiness.states[liveContainer.ID]; !found {
-		t.Errorf("exepcted container entry ID '%v' to be found. states: %+v", liveContainer.ID, kubelet.readiness.states)
+	ready = kubelet.readinessManager.GetReadiness(liveContainer.ID)
+	if !ready {
+		t.Errorf("exepcted container entry ID '%v' to be found. states: %+v", liveContainer.ID, ready)
 	}
 }
 

@@ -407,7 +409,7 @@ func TestKillContainer(t *testing.T) {
 		Name: "foobar",
 	}
 	for _, c := range fakeDocker.ContainerList {
-		kubelet.readiness.set(c.ID, true)
+		kubelet.readinessManager.SetReadiness(c.ID, true)
 	}
 
 	c := apiContainerToContainer(fakeDocker.ContainerList[0])

@@ -418,11 +420,13 @@ func TestKillContainer(t *testing.T) {
 	verifyCalls(t, fakeDocker, []string{"stop"})
 	killedContainer := containers[0]
 	liveContainer := containers[1]
-	if _, found := kubelet.readiness.states[killedContainer.ID]; found {
-		t.Errorf("exepcted container entry ID '%v' to not be found. states: %+v", killedContainer.ID, kubelet.readiness.states)
+	ready := kubelet.readinessManager.GetReadiness(killedContainer.ID)
+	if ready {
+		t.Errorf("exepcted container entry ID '%v' to not be found. states: %+v", killedContainer.ID, ready)
 	}
-	if _, found := kubelet.readiness.states[liveContainer.ID]; !found {
-		t.Errorf("exepcted container entry ID '%v' to be found. states: %+v", liveContainer.ID, kubelet.readiness.states)
+	ready = kubelet.readinessManager.GetReadiness(liveContainer.ID)
+	if !ready {
+		t.Errorf("exepcted container entry ID '%v' to be found. states: %+v", liveContainer.ID, ready)
 	}
 }
 

@@ -19,8 +19,6 @@ package kubelet
 import (
 	"fmt"
 	"strconv"
-	"strings"
-	"sync"
 	"time"
 
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/api"

@@ -45,12 +43,12 @@ func (kl *Kubelet) probeContainer(pod *api.Pod, status api.PodStatus, container
 	live, err := kl.probeContainerLiveness(pod, status, container, createdAt)
 	if err != nil {
 		glog.V(1).Infof("Liveness probe errored: %v", err)
-		kl.readiness.set(containerID, false)
+		kl.readinessManager.SetReadiness(containerID, false)
 		return probe.Unknown, err
 	}
 	if live != probe.Success {
 		glog.V(1).Infof("Liveness probe unsuccessful: %v", live)
-		kl.readiness.set(containerID, false)
+		kl.readinessManager.SetReadiness(containerID, false)
 		return live, nil
 	}
 

@@ -58,12 +56,12 @@ func (kl *Kubelet) probeContainer(pod *api.Pod, status api.PodStatus, container
 	ready, err := kl.probeContainerReadiness(pod, status, container, createdAt)
 	if err == nil && ready == probe.Success {
 		glog.V(3).Infof("Readiness probe successful: %v", ready)
-		kl.readiness.set(containerID, true)
+		kl.readinessManager.SetReadiness(containerID, true)
 		return probe.Success, nil
 	}
 
 	glog.V(1).Infof("Readiness probe failed/errored: %v, %v", ready, err)
-	kl.readiness.set(containerID, false)
+	kl.readinessManager.SetReadiness(containerID, false)
 
 	ref, ok := kl.containerRefManager.GetRef(containerID)
 	if !ok {

@@ -204,44 +202,6 @@ func (eic execInContainer) SetDir(dir string) {
 	//unimplemented
 }
 
-// This will eventually maintain info about probe results over time
-// to allow for implementation of health thresholds
-func newReadinessStates() *readinessStates {
-	return &readinessStates{states: make(map[string]bool)}
-}
-
-type readinessStates struct {
-	// guards states
-	sync.RWMutex
-	states map[string]bool
-}
-
-func (r *readinessStates) IsReady(c api.ContainerStatus) bool {
-	if c.State.Running == nil {
-		return false
-	}
-	return r.get(strings.TrimPrefix(c.ContainerID, "docker://"))
-}
-
-func (r *readinessStates) get(key string) bool {
-	r.RLock()
-	defer r.RUnlock()
-	state, found := r.states[key]
-	return state && found
-}
-
-func (r *readinessStates) set(key string, value bool) {
-	r.Lock()
-	defer r.Unlock()
-	r.states[key] = value
-}
-
-func (r *readinessStates) remove(key string) {
-	r.Lock()
-	defer r.Unlock()
-	delete(r.states, key)
-}
-
 func newProbeHolder() probeHolder {
 	return probeHolder{
 		exec: execprobe.New(),

@@ -147,7 +147,7 @@ func (p fakeExecProber) Probe(_ exec.Cmd) (probe.Result, error) {
 
 func makeTestKubelet(result probe.Result, err error) *Kubelet {
 	return &Kubelet{
-		readiness: newReadinessStates(),
+		readinessManager: kubecontainer.NewReadinessManager(),
 		prober: probeHolder{
 			exec: fakeExecProber{
 				result: result,

@@ -412,8 +412,8 @@ func TestProbeContainer(t *testing.T) {
 		if test.expectedResult != result {
 			t.Errorf("Expected result was %v but probeContainer() returned %v", test.expectedResult, result)
 		}
-		if test.expectedReadiness != kl.readiness.get(dc.ID) {
-			t.Errorf("Expected readiness was %v but probeContainer() set %v", test.expectedReadiness, kl.readiness.get(dc.ID))
+		if test.expectedReadiness != kl.readinessManager.GetReadiness(dc.ID) {
+			t.Errorf("Expected readiness was %v but probeContainer() set %v", test.expectedReadiness, kl.readinessManager.GetReadiness(dc.ID))
 		}
 	}
 }