Merge pull request #735 from brendandburns/tcp-health

Add TCP socket based health checking.
pull/6/head
erictune 2014-08-01 16:35:12 -07:00
commit 5b589cf115
7 changed files with 127 additions and 16 deletions

View File

@ -144,12 +144,20 @@ type HTTPGetProbe struct {
Host string `yaml:"host,omitempty" json:"host,omitempty"`
}
// TCPSocketProbe describes a liveness probe based on opening a socket
type TCPSocketProbe struct {
// Port is the port to connect to. Required.
Port int `yaml:"port,omitempty" json:"port,omitempty"`
}
// LivenessProbe describes a liveness probe to be examined to the container.
type LivenessProbe struct {
// Type of liveness probe. Current legal values "http"
// Type of liveness probe. Current legal values "http", "tcp"
Type string `yaml:"type,omitempty" json:"type,omitempty"`
// HTTPGetProbe parameters, required if Type == 'http'
HTTPGet *HTTPGetProbe `yaml:"httpGet,omitempty" json:"httpGet,omitempty"`
// TCPSocketProbe parameter, required if Type == 'tcp'
TCPSocket *TCPSocketProbe `yaml:"tcpSocket,omitempty" json:"tcpSocket,omitempty"`
// Length of time before health checking is activated. In seconds.
InitialDelaySeconds int64 `yaml:"initialDelaySeconds,omitempty" json:"initialDelaySeconds,omitempty"`
}

View File

@ -147,12 +147,20 @@ type HTTPGetProbe struct {
Host string `yaml:"host,omitempty" json:"host,omitempty"`
}
// TCPSocketProbe describes a liveness probe based on opening a socket
type TCPSocketProbe struct {
// Port is the port to connect to. Required.
Port int `yaml:"port,omitempty" json:"port,omitempty"`
}
// LivenessProbe describes a liveness probe to be examined to the container.
type LivenessProbe struct {
// Type of liveness probe. Current legal values "http"
// Type of liveness probe. Current legal values "http", "tcp"
Type string `yaml:"type,omitempty" json:"type,omitempty"`
// HTTPGetProbe parameters, required if Type == 'http'
HTTPGet *HTTPGetProbe `yaml:"httpGet,omitempty" json:"httpGet,omitempty"`
// TCPSocketProbe parameter, required if Type == 'tcp'
TCPSocket *TCPSocketProbe `yaml:"tcpSocket,omitempty" json:"tcpSocket,omitempty"`
// Length of time before health checking is activated. In seconds.
InitialDelaySeconds int64 `yaml:"initialDelaySeconds,omitempty" json:"initialDelaySeconds,omitempty"`
}

View File

@ -192,6 +192,12 @@ func (c *Client) CreatePod(pod api.Pod) (result api.Pod, err error) {
// UpdatePod takes the representation of a pod to update. Returns the server's representation of the pod, and an error, if it occurs
func (c *Client) UpdatePod(pod api.Pod) (result api.Pod, err error) {
var prev api.Pod
err = c.Get().Path("pod").Path(pod.ID).Do().Into(&prev)
if err != nil {
return
}
pod.ResourceVersion = prev.ResourceVersion
err = c.Put().Path("pods").Path(pod.ID).Body(pod).Do().Into(&result)
return
}
@ -216,6 +222,12 @@ func (c *Client) CreateReplicationController(controller api.ReplicationControlle
// UpdateReplicationController updates an existing replication controller
func (c *Client) UpdateReplicationController(controller api.ReplicationController) (result api.ReplicationController, err error) {
var prev api.ReplicationController
err = c.Get().Path("replicationControllers").Path(controller.ID).Do().Into(&prev)
if err != nil {
return
}
controller.ResourceVersion = prev.ResourceVersion
err = c.Put().Path("replicationControllers").Path(controller.ID).Body(controller).Do().Into(&result)
return
}
@ -239,6 +251,12 @@ func (c *Client) CreateService(svc api.Service) (result api.Service, err error)
// UpdateService updates an existing service.
func (c *Client) UpdateService(svc api.Service) (result api.Service, err error) {
var prev api.Service
err = c.Get().Path("services").Path(svc.ID).Do().Into(&prev)
if err != nil {
return
}
svc.ResourceVersion = prev.ResourceVersion
err = c.Put().Path("services").Path(svc.ID).Body(svc).Do().Into(&result)
return
}

View File

@ -18,6 +18,7 @@ package health
import (
"fmt"
"net"
"net/http"
"strconv"
@ -27,7 +28,7 @@ import (
// HealthChecker defines an abstract interface for checking container health.
type HealthChecker interface {
HealthCheck(container api.Container) (Status, error)
HealthCheck(currentState api.PodState, container api.Container) (Status, error)
}
// NewHealthChecker creates a new HealthChecker which supports multiple types of liveness probes.
@ -37,6 +38,7 @@ func NewHealthChecker() HealthChecker {
"http": &HTTPHealthChecker{
client: &http.Client{},
},
"tcp": &TCPHealthChecker{},
},
}
}
@ -49,13 +51,13 @@ type MuxHealthChecker struct {
// HealthCheck delegates the health-checking of the container to one of the bundled implementations.
// It chooses an implementation according to container.LivenessProbe.Type.
// If there is no matching health checker it returns Unknown, nil.
func (m *MuxHealthChecker) HealthCheck(container api.Container) (Status, error) {
func (m *MuxHealthChecker) HealthCheck(currentState api.PodState, container api.Container) (Status, error) {
checker, ok := m.checkers[container.LivenessProbe.Type]
if !ok || checker == nil {
glog.Warningf("Failed to find health checker for %s %s", container.Name, container.LivenessProbe.Type)
return Unknown, nil
}
return checker.HealthCheck(container)
return checker.HealthCheck(currentState, container)
}
// HTTPHealthChecker is an implementation of HealthChecker which checks container health by sending HTTP Get requests.
@ -74,7 +76,7 @@ func (h *HTTPHealthChecker) findPort(container api.Container, portName string) i
}
// HealthCheck checks if the container is healthy by trying sending HTTP Get requests to the container.
func (h *HTTPHealthChecker) HealthCheck(container api.Container) (Status, error) {
func (h *HTTPHealthChecker) HealthCheck(currentState api.PodState, container api.Container) (Status, error) {
params := container.LivenessProbe.HTTPGet
if params == nil {
return Unknown, fmt.Errorf("Error, no HTTP parameters specified: %v", container)
@ -91,8 +93,29 @@ func (h *HTTPHealthChecker) HealthCheck(container api.Container) (Status, error)
if len(params.Host) > 0 {
host = params.Host
} else {
host = "localhost"
host = currentState.PodIP
}
url := fmt.Sprintf("http://%s:%d%s", host, port, params.Path)
return Check(url, h.client)
}
type TCPHealthChecker struct{}
func (t *TCPHealthChecker) HealthCheck(currentState api.PodState, container api.Container) (Status, error) {
params := container.LivenessProbe.TCPSocket
if params == nil {
return Unknown, fmt.Errorf("error, no TCP parameters specified: %v", container)
}
if len(currentState.PodIP) == 0 {
return Unknown, fmt.Errorf("no host specified.")
}
conn, err := net.Dial("tcp", net.JoinHostPort(currentState.PodIP, strconv.Itoa(params.Port)))
if err != nil {
return Unhealthy, nil
}
err = conn.Close()
if err != nil {
glog.Errorf("unexpected error closing health check socket: %v (%#v)", err, err)
}
return Healthy, nil
}

View File

@ -21,6 +21,7 @@ import (
"net/http"
"net/http/httptest"
"net/url"
"strconv"
"testing"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
@ -66,7 +67,7 @@ func TestHealthChecker(t *testing.T) {
},
}
hc := NewHealthChecker()
health, err := hc.HealthCheck(container)
health, err := hc.HealthCheck(api.PodState{}, container)
if err != nil && tt.health != Unknown {
t.Errorf("Unexpected error: %v", err)
}
@ -139,7 +140,7 @@ func TestHTTPHealthChecker(t *testing.T) {
params.Host = host
}
}
health, err := hc.HealthCheck(container)
health, err := hc.HealthCheck(api.PodState{PodIP: host}, container)
if tt.health == Unknown && err == nil {
t.Errorf("Expected error")
}
@ -152,6 +153,47 @@ func TestHTTPHealthChecker(t *testing.T) {
}
}
func TestTcpHealthChecker(t *testing.T) {
type tcpHealthTest struct {
probe *api.LivenessProbe
expectedStatus Status
expectError bool
}
checker := &TCPHealthChecker{}
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
}))
u, err := url.Parse(server.URL)
if err != nil {
t.Errorf("Unexpected error: %v", err)
}
host, port, err := net.SplitHostPort(u.Host)
portNum, _ := strconv.Atoi(port)
tests := []tcpHealthTest{
{&api.LivenessProbe{TCPSocket: &api.TCPSocketProbe{Port: portNum}}, Healthy, false},
{&api.LivenessProbe{TCPSocket: &api.TCPSocketProbe{Port: 100000}}, Unhealthy, false},
{&api.LivenessProbe{}, Unknown, true},
}
for _, test := range tests {
probe := test.probe
container := api.Container{
LivenessProbe: probe,
}
status, err := checker.HealthCheck(api.PodState{PodIP: host}, container)
if status != test.expectedStatus {
t.Errorf("expected: %v, got: %v", test.expectedStatus, status)
}
if err != nil && !test.expectError {
t.Errorf("unexpected error: %#v", err)
}
if err == nil && test.expectError {
t.Errorf("unexpected non-error.")
}
}
}
func TestMuxHealthChecker(t *testing.T) {
muxHealthCheckerTests := []struct {
health Status
@ -188,7 +230,7 @@ func TestMuxHealthChecker(t *testing.T) {
container.LivenessProbe.Type = tt.probeType
container.LivenessProbe.HTTPGet.Port = port
container.LivenessProbe.HTTPGet.Host = host
health, err := mc.HealthCheck(container)
health, err := mc.HealthCheck(api.PodState{}, container)
if err != nil {
t.Errorf("Unexpected error: %v", err)
}

View File

@ -376,6 +376,16 @@ func (kl *Kubelet) syncPod(pod *Pod, dockerContainers DockerContainers) error {
return err
}
podState := api.PodState{}
info, err := kl.GetPodInfo(podFullName)
if err != nil {
glog.Errorf("Unable to get pod info, health checks may be invalid.")
}
netInfo, found := info[networkContainerName]
if found && netInfo.NetworkSettings != nil {
podState.PodIP = netInfo.NetworkSettings.IPAddress
}
for _, container := range pod.Manifest.Containers {
if dockerContainer, found := dockerContainers.FindPodContainer(podFullName, container.Name); found {
containerID := DockerID(dockerContainer.ID)
@ -383,7 +393,7 @@ func (kl *Kubelet) syncPod(pod *Pod, dockerContainers DockerContainers) error {
glog.V(1).Infof("pod %s container %s exists as %v", podFullName, container.Name, containerID)
// TODO: This should probably be separated out into a separate goroutine.
healthy, err := kl.healthy(container, dockerContainer)
healthy, err := kl.healthy(podState, container, dockerContainer)
if err != nil {
glog.V(1).Infof("health check errored: %v", err)
continue
@ -592,7 +602,7 @@ func (kl *Kubelet) GetMachineInfo() (*info.MachineInfo, error) {
return kl.cadvisorClient.MachineInfo()
}
func (kl *Kubelet) healthy(container api.Container, dockerContainer *docker.APIContainers) (health.Status, error) {
func (kl *Kubelet) healthy(currentState api.PodState, container api.Container, dockerContainer *docker.APIContainers) (health.Status, error) {
// Give the container 60 seconds to start up.
if container.LivenessProbe == nil {
return health.Healthy, nil
@ -603,7 +613,7 @@ func (kl *Kubelet) healthy(container api.Container, dockerContainer *docker.APIC
if kl.healthChecker == nil {
return health.Healthy, nil
}
return kl.healthChecker.HealthCheck(container)
return kl.healthChecker.HealthCheck(currentState, container)
}
// Returns logs of current machine.

View File

@ -67,6 +67,8 @@ func makeTestKubelet(t *testing.T) (*Kubelet, *tools.FakeEtcdClient, *FakeDocker
}
func verifyCalls(t *testing.T, fakeDocker *FakeDockerClient, calls []string) {
fakeDocker.lock.Lock()
defer fakeDocker.lock.Unlock()
verifyStringArrayEquals(t, fakeDocker.called, calls)
}
@ -318,7 +320,7 @@ func TestSyncPodDeletesDuplicate(t *testing.T) {
},
}, dockerContainers)
expectNoError(t, err)
verifyCalls(t, fakeDocker, []string{"stop"})
verifyCalls(t, fakeDocker, []string{"list", "stop"})
// Expect one of the duplicates to be killed.
if len(fakeDocker.stopped) != 1 || (len(fakeDocker.stopped) != 0 && fakeDocker.stopped[0] != "1234" && fakeDocker.stopped[0] != "4567") {
@ -328,7 +330,7 @@ func TestSyncPodDeletesDuplicate(t *testing.T) {
type FalseHealthChecker struct{}
func (f *FalseHealthChecker) HealthCheck(container api.Container) (health.Status, error) {
func (f *FalseHealthChecker) HealthCheck(state api.PodState, container api.Container) (health.Status, error) {
return health.Unhealthy, nil
}
@ -363,7 +365,7 @@ func TestSyncPodUnhealthy(t *testing.T) {
},
}, dockerContainers)
expectNoError(t, err)
verifyCalls(t, fakeDocker, []string{"stop", "create", "start"})
verifyCalls(t, fakeDocker, []string{"list", "stop", "create", "start"})
// A map interation is used to delete containers, so must not depend on
// order here.