refactor pkg/probe ProbeRunners to interfaces and move global probers into kubelet field for testability

pull/6/head
Mike Danese 2015-02-08 12:19:34 -08:00
parent 043794492e
commit 3d0cd81feb
5 changed files with 52 additions and 28 deletions

View File

@ -120,6 +120,7 @@ func NewMainKubelet(
clusterDNS: clusterDNS, clusterDNS: clusterDNS,
serviceLister: serviceLister, serviceLister: serviceLister,
masterServiceNamespace: masterServiceNamespace, masterServiceNamespace: masterServiceNamespace,
prober: newProbeHolder(),
readiness: newReadinessStates(), readiness: newReadinessStates(),
} }
@ -198,6 +199,9 @@ type Kubelet struct {
// Volume plugins. // Volume plugins.
volumePluginMgr volume.PluginMgr volumePluginMgr volume.PluginMgr
// probe runner holder
prober probeHolder
// container readiness state holder
readiness *readinessStates readiness *readinessStates
} }
@ -1055,8 +1059,8 @@ func (kl *Kubelet) syncPod(pod *api.BoundPod, dockerContainers dockertools.Docke
// readiness accordingly. If the initalDelay since container creation on liveness probe has not passed the probe will return Success. // readiness accordingly. If the initalDelay since container creation on liveness probe has not passed the probe will return Success.
// If the initial delay on the readiness probe has not passed the probe will return Failure. // If the initial delay on the readiness probe has not passed the probe will return Failure.
ready := probe.Unknown ready := probe.Unknown
healthy, err := kl.probeContainer(container.LivenessProbe, podFullName, uid, podStatus, container, dockerContainer, probe.Success) live, err := kl.probeContainer(container.LivenessProbe, podFullName, uid, podStatus, container, dockerContainer, probe.Success)
if healthy == probe.Success { if live == probe.Success {
ready, _ = kl.probeContainer(container.ReadinessProbe, podFullName, uid, podStatus, container, dockerContainer, probe.Failure) ready, _ = kl.probeContainer(container.ReadinessProbe, podFullName, uid, podStatus, container, dockerContainer, probe.Failure)
} }
if ready == probe.Success { if ready == probe.Success {
@ -1069,11 +1073,11 @@ func (kl *Kubelet) syncPod(pod *api.BoundPod, dockerContainers dockertools.Docke
containersToKeep[containerID] = empty{} containersToKeep[containerID] = empty{}
continue continue
} }
if healthy == probe.Success { if live == probe.Success {
containersToKeep[containerID] = empty{} containersToKeep[containerID] = empty{}
continue continue
} }
glog.V(1).Infof("pod %q container %q is unhealthy. Container will be killed and re-created.", podFullName, container.Name, healthy) glog.V(1).Infof("pod %q container %q is unhealthy. Container will be killed and re-created.", podFullName, container.Name, live)
} else { } else {
glog.V(1).Infof("pod %q container %q hash changed (%d vs %d). Container will be killed and re-created.", podFullName, container.Name, hash, expectedHash) glog.V(1).Infof("pod %q container %q hash changed (%d vs %d). Container will be killed and re-created.", podFullName, container.Name, hash, expectedHash)
} }
@ -1098,6 +1102,10 @@ func (kl *Kubelet) syncPod(pod *api.BoundPod, dockerContainers dockertools.Docke
glog.Errorf("Error listing recent containers:%s", dockerContainerName) glog.Errorf("Error listing recent containers:%s", dockerContainerName)
// TODO(dawnchen): error handling here? // TODO(dawnchen): error handling here?
} }
// set dead containers to unready state
for _, c := range recentContainers {
kl.readiness.remove(c.ID)
}
if len(recentContainers) > 0 && pod.Spec.RestartPolicy.Always == nil { if len(recentContainers) > 0 && pod.Spec.RestartPolicy.Always == nil {
if pod.Spec.RestartPolicy.Never != nil { if pod.Spec.RestartPolicy.Never != nil {
@ -1113,6 +1121,7 @@ func (kl *Kubelet) syncPod(pod *api.BoundPod, dockerContainers dockertools.Docke
continue continue
} }
} }
} }
glog.V(3).Infof("Container with name %s doesn't exist, creating %#v", dockerContainerName) glog.V(3).Infof("Container with name %s doesn't exist, creating %#v", dockerContainerName)
@ -1547,12 +1556,7 @@ func (kl *Kubelet) GetPodStatus(podFullName string, uid types.UID) (api.PodStatu
var podStatus api.PodStatus var podStatus api.PodStatus
podStatus.Phase = getPhase(&spec, info) podStatus.Phase = getPhase(&spec, info)
if isPodReady(&spec, info) { podStatus.Conditions = append(podStatus.Conditions, getPodReadyCondition(&spec, info)...)
podStatus.Conditions = append(podStatus.Conditions, api.PodCondition{
Kind: api.PodReady,
Status: api.ConditionFull,
})
}
netContainerInfo, found := info[dockertools.PodInfraContainerName] netContainerInfo, found := info[dockertools.PodInfraContainerName]
if found { if found {
podStatus.PodIP = netContainerInfo.PodIP podStatus.PodIP = netContainerInfo.PodIP

View File

@ -36,12 +36,6 @@ import (
"github.com/golang/glog" "github.com/golang/glog"
) )
var (
execprober = execprobe.New()
httprober = httprobe.New()
tcprober = tcprobe.New()
)
const ( const (
defaultProbeTimeout = 1 * time.Second defaultProbeTimeout = 1 * time.Second
maxProbeRetries = 3 maxProbeRetries = 3
@ -85,7 +79,7 @@ func (kl *Kubelet) runProbe(p *api.Probe, podFullName string, podUID types.UID,
timeout = defaultProbeTimeout timeout = defaultProbeTimeout
} }
if p.Exec != nil { if p.Exec != nil {
return execprober.Probe(kl.newExecInContainer(podFullName, podUID, container)) return kl.prober.exec.Probe(kl.newExecInContainer(podFullName, podUID, container))
} }
if p.HTTPGet != nil { if p.HTTPGet != nil {
port, err := extractPort(p.HTTPGet.Port, container) port, err := extractPort(p.HTTPGet.Port, container)
@ -93,14 +87,14 @@ func (kl *Kubelet) runProbe(p *api.Probe, podFullName string, podUID types.UID,
return probe.Unknown, err return probe.Unknown, err
} }
host, port, path := extractGetParams(p.HTTPGet, status, port) host, port, path := extractGetParams(p.HTTPGet, status, port)
return httprober.Probe(host, port, path, timeout) return kl.prober.http.Probe(host, port, path, timeout)
} }
if p.TCPSocket != nil { if p.TCPSocket != nil {
port, err := extractPort(p.TCPSocket.Port, container) port, err := extractPort(p.TCPSocket.Port, container)
if err != nil { if err != nil {
return probe.Unknown, err return probe.Unknown, err
} }
return tcprober.Probe(status.PodIP, port, timeout) return kl.prober.tcp.Probe(status.PodIP, port, timeout)
} }
glog.Warningf("Failed to find probe builder for %s %+v", container.Name, container.LivenessProbe) glog.Warningf("Failed to find probe builder for %s %+v", container.Name, container.LivenessProbe)
return probe.Unknown, nil return probe.Unknown, nil
@ -207,3 +201,17 @@ func (r *readinessStates) remove(key string) {
defer r.Unlock() defer r.Unlock()
delete(r.states, key) delete(r.states, key)
} }
func newProbeHolder() probeHolder {
return probeHolder{
exec: execprobe.New(),
http: httprobe.New(),
tcp: tcprobe.New(),
}
}
type probeHolder struct {
exec execprobe.ExecProber
http httprobe.HTTPProber
tcp tcprobe.TCPProber
}

View File

@ -28,12 +28,16 @@ import (
const defaultHealthyOutput = "ok" const defaultHealthyOutput = "ok"
func New() ExecProber { func New() ExecProber {
return ExecProber{} return execProber{}
} }
type ExecProber struct{} type ExecProber interface {
Probe(e uexec.Cmd) (probe.Result, error)
}
func (pr ExecProber) Probe(e uexec.Cmd) (probe.Result, error) { type execProber struct{}
func (pr execProber) Probe(e uexec.Cmd) (probe.Result, error) {
data, err := e.CombinedOutput() data, err := e.CombinedOutput()
glog.V(4).Infof("health check response: %s", string(data)) glog.V(4).Infof("health check response: %s", string(data))
if err != nil { if err != nil {

View File

@ -30,15 +30,19 @@ import (
func New() HTTPProber { func New() HTTPProber {
transport := &http.Transport{} transport := &http.Transport{}
return HTTPProber{transport} return httpProber{transport}
} }
type HTTPProber struct { type HTTPProber interface {
Probe(host string, port int, path string, timeout time.Duration) (probe.Result, error)
}
type httpProber struct {
transport *http.Transport transport *http.Transport
} }
// Probe returns a ProbeRunner capable of running an http check. // Probe returns a ProbeRunner capable of running an http check.
func (pr *HTTPProber) Probe(host string, port int, path string, timeout time.Duration) (probe.Result, error) { func (pr httpProber) Probe(host string, port int, path string, timeout time.Duration) (probe.Result, error) {
return DoHTTPProbe(formatURL(host, port, path), &http.Client{Timeout: timeout, Transport: pr.transport}) return DoHTTPProbe(formatURL(host, port, path), &http.Client{Timeout: timeout, Transport: pr.transport})
} }

View File

@ -27,12 +27,16 @@ import (
) )
func New() TCPProber { func New() TCPProber {
return TCPProber{} return tcpProber{}
} }
type TCPProber struct{} type TCPProber interface {
Probe(host string, port int, timeout time.Duration) (probe.Result, error)
}
func (pr TCPProber) Probe(host string, port int, timeout time.Duration) (probe.Result, error) { type tcpProber struct{}
func (pr tcpProber) Probe(host string, port int, timeout time.Duration) (probe.Result, error) {
return DoTCPProbe(net.JoinHostPort(host, strconv.Itoa(port)), timeout) return DoTCPProbe(net.JoinHostPort(host, strconv.Itoa(port)), timeout)
} }