add http health checks.

pull/6/head
Brendan Burns 2014-07-02 22:35:50 -07:00
parent 4b6ff69925
commit 41c6680943
8 changed files with 339 additions and 10 deletions

View File

@ -5,11 +5,21 @@
"version": "v1beta1",
"id": "php",
"containers": [{
"name": "nginx",
"image": "dockerfile/nginx",
"ports": [{
"containerPort": 80,
"hostPort": 8080
}]
}],
"livenessProbe": {
"enabled": true,
"type": "http",
"initialDelaySeconds": 30,
"httpGet": {
"path": "/index.html",
"port": "8080"
}
}
}]
}
},

View File

@ -55,6 +55,8 @@ server {
proxy_buffers 16 32k;
proxy_busy_buffers_size 64k;
proxy_temp_file_write_size 64k;
# Disable retry
proxy_next_upstream off;
}
location /etcd/ {
auth_basic "Restricted";

View File

@ -102,6 +102,25 @@ type EnvVar struct {
Value string `yaml:"value,omitempty" json:"value,omitempty"`
}
type HTTPGetProbe struct {
// Path to access on the http server
Path string `yaml:"path,omitempty" json:"path,omitempty"`
// Name or number of the port to access on the container
Port string `yaml:"port,omitempty" json:"port,omitempty"`
// Host name to connect to. Optional, default: "localhost"
Host string `yaml:"host,omitempty" json:"host,omitempty"`
}
type LivenessProbe struct {
Enabled bool `yaml:"enabled,omitempty" json:"enabled,omitempty"`
// Type of liveness probe. Current legal values "http"
Type string `yaml:"type,omitempty" json:"type,omitempty"`
// HTTPGetProbe parameters, required if Type == 'http'
HTTPGet HTTPGetProbe `yaml:"httpGet,omitempty" json:"httpGet,omitempty"`
// Length of time before health checking is activated. In seconds.
InitialDelaySeconds int64 `yaml:"initialDelaySeconds,omitempty" json:"initialDelaySeconds,omitempty"`
}
// Container represents a single container that is expected to be run on the host.
type Container struct {
// Required: This must be a DNS_LABEL. Each container in a pod must
@ -118,8 +137,9 @@ type Container struct {
// Optional: Defaults to unlimited.
Memory int `yaml:"memory,omitempty" json:"memory,omitempty"`
// Optional: Defaults to unlimited.
CPU int `yaml:"cpu,omitempty" json:"cpu,omitempty"`
VolumeMounts []VolumeMount `yaml:"volumeMounts,omitempty" json:"volumeMounts,omitempty"`
CPU int `yaml:"cpu,omitempty" json:"cpu,omitempty"`
VolumeMounts []VolumeMount `yaml:"volumeMounts,omitempty" json:"volumeMounts,omitempty"`
LivenessProbe LivenessProbe `yaml:"livenessProbe,omitempty" json:"livenessProbe,omitempty"`
}
// Percentile represents a pair which contains a percentage from 0 to 100 and

View File

@ -56,8 +56,9 @@ func (f *FakeDockerClient) CreateContainer(c docker.CreateContainerOptions) (*do
f.Created = append(f.Created, c.Name)
// This is not a very good fake. We'll just add this container's name to the list.
// Docker likes to add a '/', so copy that behavior.
f.containerList = append(f.containerList, docker.APIContainers{ID: c.Name, Names: []string{"/" + c.Name}})
return &docker.Container{ID: "/" + c.Name}, nil
name := "/" + c.Name
f.containerList = append(f.containerList, docker.APIContainers{ID: name, Names: []string{name}})
return &docker.Container{ID: name}, nil
}
func (f *FakeDockerClient) StartContainer(id string, hostConfig *docker.HostConfig) error {
@ -68,6 +69,13 @@ func (f *FakeDockerClient) StartContainer(id string, hostConfig *docker.HostConf
func (f *FakeDockerClient) StopContainer(id string, timeout uint) error {
f.appendCall("stop")
f.stopped = append(f.stopped, id)
var newList []docker.APIContainers
for _, container := range f.containerList {
if container.ID != id {
newList = append(newList, container)
}
}
f.containerList = newList
return f.err
}

View File

@ -0,0 +1,99 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kubelet
import (
"fmt"
"net/http"
"strconv"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/golang/glog"
)
type HealthChecker interface {
IsHealthy(container api.Container) (bool, error)
}
type httpDoInterface interface {
Get(string) (*http.Response, error)
}
func MakeHealthChecker() HealthChecker {
return &MuxHealthChecker{
checkers: map[string]HealthChecker{
"http": &HTTPHealthChecker{
client: &http.Client{},
},
},
}
}
type MuxHealthChecker struct {
checkers map[string]HealthChecker
}
func (m *MuxHealthChecker) IsHealthy(container api.Container) (bool, error) {
checker, ok := m.checkers[container.LivenessProbe.Type]
if !ok || checker == nil {
glog.Warningf("Failed to find health checker for %s %s", container.Name, container.LivenessProbe.Type)
return true, nil
}
return checker.IsHealthy(container)
}
type HTTPHealthChecker struct {
client httpDoInterface
}
func (h *HTTPHealthChecker) findPort(container api.Container, portName string) int64 {
for _, port := range container.Ports {
if port.Name == portName {
// TODO This means you can only health check exposed ports
return int64(port.HostPort)
}
}
return -1
}
func (h *HTTPHealthChecker) IsHealthy(container api.Container) (bool, error) {
params := container.LivenessProbe.HTTPGet
port := h.findPort(container, params.Port)
if port == -1 {
var err error
port, err = strconv.ParseInt(params.Port, 10, 0)
if err != nil {
return true, err
}
}
var host string
if len(params.Host) > 0 {
host = params.Host
} else {
host = "localhost"
}
url := fmt.Sprintf("http://%s:%d%s", host, port, params.Path)
res, err := h.client.Get(url)
if res != nil && res.Body != nil {
defer res.Body.Close()
}
if err != nil {
// At this point, if it fails, its either a policy (unlikely) or HTTP protocol (likely) error.
return false, nil
}
return res.StatusCode == http.StatusOK, nil
}

View File

@ -0,0 +1,91 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kubelet
import (
"net/http"
"testing"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
)
type fakeHttpClient struct {
req string
res http.Response
err error
}
func (f *fakeHttpClient) Get(url string) (*http.Response, error) {
f.req = url
return &f.res, f.err
}
func TestHttpHealth(t *testing.T) {
fakeClient := fakeHttpClient{
res: http.Response{
StatusCode: http.StatusOK,
},
}
check := HTTPHealthChecker{
client: &fakeClient,
}
container := api.Container{
LivenessProbe: api.LivenessProbe{
HTTPGet: api.HTTPGetProbe{
Port: "8080",
Path: "/foo/bar",
},
Type: "http",
},
}
ok, err := check.IsHealthy(container)
if !ok {
t.Error("Unexpected unhealthy")
}
if err != nil {
t.Errorf("Unexpected error: %#v", err)
}
if fakeClient.req != "http://localhost:8080/foo/bar" {
t.Errorf("Unexpected url: %s", fakeClient.req)
}
}
func TestFindPort(t *testing.T) {
container := api.Container{
Ports: []api.Port{
{
Name: "foo",
HostPort: 8080,
},
{
Name: "bar",
HostPort: 9000,
},
},
}
check := HTTPHealthChecker{}
validatePort(t, check.findPort(container, "foo"), 8080)
}
func validatePort(t *testing.T, port int64, expectedPort int64) {
if port != expectedPort {
t.Errorf("Unexpected port: %d, expected: %d", port, expectedPort)
}
}

View File

@ -92,6 +92,7 @@ type Kubelet struct {
SyncFrequency time.Duration
HTTPCheckFrequency time.Duration
pullLock sync.Mutex
HealthChecker HealthChecker
}
type manifestUpdate struct {
@ -155,6 +156,7 @@ func (kl *Kubelet) RunKubelet(dockerEndpoint, config_path, manifest_url, etcd_se
}
go util.Forever(func() { s.ListenAndServe() }, 0)
}
kl.HealthChecker = MakeHealthChecker()
kl.syncLoop(updateChannel, kl)
}
@ -219,6 +221,19 @@ func (kl *Kubelet) getContainerID(manifest *api.ContainerManifest, container *ap
return "", nil
}
func (kl *Kubelet) getContainer(ID DockerID) (*docker.APIContainers, error) {
dockerContainers, err := kl.getDockerContainers()
if err != nil {
return nil, err
}
for dockerID, dockerContainer := range dockerContainers {
if dockerID == ID {
return &dockerContainer, nil
}
}
return nil, nil
}
func (kl *Kubelet) MakeDockerPuller() DockerPuller {
return dockerPuller{
client: kl.DockerClient,
@ -686,7 +701,6 @@ func (kl *Kubelet) syncManifest(manifest *api.ContainerManifest, keepChannel cha
}
keepChannel <- netID
for _, container := range manifest.Containers {
glog.Infof("Syncing container: %v", container)
containerID, err := kl.getContainerID(manifest, &container)
if err != nil {
glog.Errorf("Error finding container: %v skipping manifest %s container %s.", err, manifest.ID, container.Name)
@ -706,7 +720,28 @@ func (kl *Kubelet) syncManifest(manifest *api.ContainerManifest, keepChannel cha
continue
}
} else {
glog.Infof("manifest %s container %s exists as %v", manifest.ID, container.Name, containerID)
glog.V(1).Infof("manifest %s container %s exists as %v", manifest.ID, container.Name, containerID)
dockerContainer, err := kl.getContainer(containerID)
// TODO: This should probably be separated out into a separate goroutine.
healthy, err := kl.healthy(container, dockerContainer)
if err != nil {
glog.V(1).Infof("health check errored: %v", err)
continue
}
if !healthy {
glog.V(1).Infof("manifest %s container %s is unhealthy.", manifest.ID, container.Name)
if err != nil {
glog.V(1).Infof("Failed to get container info %v, for %s", err, containerID)
continue
}
err = kl.killContainer(*dockerContainer)
if err != nil {
glog.V(1).Infof("Failed to kill container %s: %v", containerID, err)
continue
}
containerID, err = kl.runContainer(manifest, &container, "container:"+string(netID))
}
}
keepChannel <- containerID
}
@ -942,3 +977,17 @@ func (kl *Kubelet) GetContainerStats(podID, containerName string) (*api.Containe
func (kl *Kubelet) GetMachineStats() (*api.ContainerStats, error) {
return kl.statsFromContainerPath("/")
}
func (kl *Kubelet) healthy(container api.Container, dockerContainer *docker.APIContainers) (bool, error) {
// Give the container 60 seconds to start up.
if !container.LivenessProbe.Enabled {
return true, nil
}
if time.Now().Unix()-dockerContainer.Created < container.LivenessProbe.InitialDelaySeconds {
return true, nil
}
if kl.HealthChecker == nil {
return true, nil
}
return kl.HealthChecker.IsHealthy(container)
}

View File

@ -106,9 +106,11 @@ func verifyCalls(t *testing.T, fakeDocker *FakeDockerClient, calls []string) {
func verifyStringArrayEquals(t *testing.T, actual, expected []string) {
invalid := len(actual) != len(expected)
for ix, value := range actual {
if expected[ix] != value {
invalid = true
if !invalid {
for ix, value := range actual {
if expected[ix] != value {
invalid = true
}
}
}
if invalid {
@ -382,7 +384,7 @@ func TestSyncManifestsDoesNothing(t *testing.T) {
},
})
expectNoError(t, err)
verifyCalls(t, fakeDocker, []string{"list", "list", "list"})
verifyCalls(t, fakeDocker, []string{"list", "list", "list", "list"})
}
func TestSyncManifestsDeletes(t *testing.T) {
@ -420,6 +422,54 @@ func TestSyncManifestsDeletes(t *testing.T) {
}
}
type FalseHealthChecker struct{}
func (f *FalseHealthChecker) IsHealthy(container api.Container) (bool, error) {
return false, nil
}
func TestSyncManifestsUnhealthy(t *testing.T) {
kubelet, _, fakeDocker := makeTestKubelet(t)
kubelet.HealthChecker = &FalseHealthChecker{}
fakeDocker.containerList = []docker.APIContainers{
{
// the k8s prefix is required for the kubelet to manage the container
Names: []string{"/k8s--bar--foo"},
ID: "1234",
},
{
// network container
Names: []string{"/k8s--net--foo--"},
ID: "9876",
},
}
err := kubelet.SyncManifests([]api.ContainerManifest{
{
ID: "foo",
Containers: []api.Container{
{Name: "bar",
LivenessProbe: api.LivenessProbe{
Enabled: true,
// Always returns healthy == false
Type: "false",
},
},
},
}})
expectNoError(t, err)
verifyCalls(t, fakeDocker, []string{"list", "list", "list", "stop", "create", "start", "list"})
// A map interation is used to delete containers, so must not depend on
// order here.
expectedToStop := map[string]bool{
"1234": true,
}
if len(fakeDocker.stopped) != 1 ||
!expectedToStop[fakeDocker.stopped[0]] {
t.Errorf("Wrong containers were stopped: %v", fakeDocker.stopped)
}
}
func TestEventWriting(t *testing.T) {
kubelet, fakeEtcd, _ := makeTestKubelet(t)
expectedEvent := api.Event{