Merge pull request #356 from smarterclayton/refactor_config_sources

Use config sources for Kubelet, switch to podFullName
pull/6/head
Daniel Smith 2014-07-21 18:43:07 -07:00
commit aaf0180ef4
22 changed files with 1882 additions and 917 deletions

View File

@ -32,6 +32,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/controller"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/config"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/master"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
@ -85,6 +86,9 @@ func startComponents(manifestURL string) (apiServerURL string) {
handler := delegateHandler{}
apiserver := httptest.NewServer(&handler)
etcdClient := etcd.NewClient(servers)
cl := client.New(apiserver.URL, nil)
cl.PollPeriod = time.Second * 1
cl.Sync = true
@ -93,32 +97,39 @@ func startComponents(manifestURL string) (apiServerURL string) {
m := master.New(servers, machineList, fakePodInfoGetter{}, nil, "", cl)
handler.delegate = m.ConstructHandler("/api/v1beta1")
controllerManager := controller.MakeReplicationManager(etcd.NewClient(servers), cl)
controllerManager := controller.MakeReplicationManager(etcdClient, cl)
controllerManager.Run(1 * time.Second)
// Kubelet
myKubelet := kubelet.Kubelet{
Hostname: machineList[0],
DockerClient: &fakeDocker1,
DockerPuller: &kubelet.FakeDockerPuller{},
FileCheckFrequency: 5 * time.Second,
SyncFrequency: 5 * time.Second,
HTTPCheckFrequency: 5 * time.Second,
// Kubelet (localhost)
cfg1 := config.NewPodConfig(config.PodConfigNotificationSnapshotAndUpdates)
config.NewSourceEtcd(config.EtcdKeyForHost(machineList[0]), etcdClient, 30*time.Second, cfg1.Channel("etcd"))
config.NewSourceURL(manifestURL, 5*time.Second, cfg1.Channel("url"))
myKubelet := &kubelet.Kubelet{
Hostname: machineList[0],
DockerClient: &fakeDocker1,
DockerPuller: &kubelet.FakeDockerPuller{},
}
go myKubelet.RunKubelet("", "", manifestURL, servers, "localhost", 10250)
go util.Forever(func() { myKubelet.Run(cfg1.Updates()) }, 0)
go util.Forever(cfg1.Sync, 3*time.Second)
go util.Forever(func() {
kubelet.ListenAndServeKubeletServer(myKubelet, cfg1.Channel("http"), http.DefaultServeMux, "localhost", 10250)
}, 0)
// Kubelet (machine)
// Create a second kubelet so that the guestbook example's two redis slaves both
// have a place they can schedule.
otherKubelet := kubelet.Kubelet{
Hostname: machineList[1],
DockerClient: &fakeDocker2,
DockerPuller: &kubelet.FakeDockerPuller{},
FileCheckFrequency: 5 * time.Second,
SyncFrequency: 5 * time.Second,
HTTPCheckFrequency: 5 * time.Second,
cfg2 := config.NewPodConfig(config.PodConfigNotificationSnapshotAndUpdates)
config.NewSourceEtcd(config.EtcdKeyForHost(machineList[1]), etcdClient, 30*time.Second, cfg2.Channel("etcd"))
otherKubelet := &kubelet.Kubelet{
Hostname: machineList[1],
DockerClient: &fakeDocker2,
DockerPuller: &kubelet.FakeDockerPuller{},
}
go otherKubelet.RunKubelet("", "", "", servers, "localhost", 10251)
go util.Forever(func() { otherKubelet.Run(cfg2.Updates()) }, 0)
go util.Forever(cfg2.Sync, 3*time.Second)
go util.Forever(func() {
kubelet.ListenAndServeKubeletServer(otherKubelet, cfg2.Channel("http"), http.DefaultServeMux, "localhost", 10251)
}, 0)
return apiserver.URL
}

View File

@ -23,16 +23,20 @@ package main
import (
"flag"
"math/rand"
"net/http"
"os"
"os/exec"
"strings"
"time"
_ "github.com/GoogleCloudPlatform/kubernetes/pkg/healthz"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet"
kconfig "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/config"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/coreos/go-etcd/etcd"
"github.com/fsouza/go-dockerclient"
"github.com/golang/glog"
"github.com/google/cadvisor/client"
)
var (
@ -77,7 +81,7 @@ func getHostname() string {
}
hostname = fqdn
}
return string(hostname)
return strings.TrimSpace(string(hostname))
}
func main() {
@ -93,12 +97,57 @@ func main() {
glog.Fatal("Couldn't connect to docker.")
}
k := kubelet.Kubelet{
Hostname: getHostname(),
DockerClient: dockerClient,
FileCheckFrequency: *fileCheckFrequency,
SyncFrequency: *syncFrequency,
HTTPCheckFrequency: *httpCheckFrequency,
cadvisorClient, err := cadvisor.NewClient("http://127.0.0.1:5000")
if err != nil {
glog.Errorf("Error on creating cadvisor client: %v", err)
}
k.RunKubelet(*dockerEndpoint, *config, *manifestURL, etcdServerList, *address, *port)
hostname := getHostname()
k := &kubelet.Kubelet{
Hostname: hostname,
DockerClient: dockerClient,
CadvisorClient: cadvisorClient,
}
// source of all configuration
cfg := kconfig.NewPodConfig(kconfig.PodConfigNotificationSnapshotAndUpdates)
// define file config source
if *config != "" {
kconfig.NewSourceFile(*config, *fileCheckFrequency, cfg.Channel("file"))
}
// define url config source
if *manifestURL != "" {
kconfig.NewSourceURL(*manifestURL, *httpCheckFrequency, cfg.Channel("http"))
}
// define etcd config source and initialize etcd client
if len(etcdServerList) > 0 {
glog.Infof("Watching for etcd configs at %v", etcdServerList)
k.EtcdClient = etcd.NewClient(etcdServerList)
kconfig.NewSourceEtcd(kconfig.EtcdKeyForHost(hostname), k.EtcdClient, 30*time.Second, cfg.Channel("etcd"))
}
// TODO: block until all sources have delivered at least one update to the channel, or break the sync loop
// up into "per source" synchronizations
// start the kubelet
go util.Forever(func() { k.Run(cfg.Updates()) }, 0)
// resynchronize periodically
// TODO: make this part of PodConfig so that it is only delivered after syncFrequency has elapsed without
// an update
go util.Forever(cfg.Sync, *syncFrequency)
// start the kubelet server
if *address != "" {
go util.Forever(func() {
kubelet.ListenAndServeKubeletServer(k, cfg.Channel("http"), http.DefaultServeMux, *address, *port)
}, 0)
}
// runs forever
select {}
}

View File

@ -272,9 +272,6 @@ func ValidateManifest(manifest *ContainerManifest) []error {
} else if !supportedManifestVersions.Has(strings.ToLower(manifest.Version)) {
allErrs.Append(makeNotSupportedError("ContainerManifest.Version", manifest.Version))
}
if !util.IsDNSSubdomain(manifest.ID) {
allErrs.Append(makeInvalidError("ContainerManifest.ID", manifest.ID))
}
allVolumes, errs := validateVolumes(manifest.Volumes)
if len(errs) != 0 {
allErrs.Append(errs...)

View File

@ -242,11 +242,8 @@ func TestValidateManifest(t *testing.T) {
}
errorCases := map[string]ContainerManifest{
"empty version": {Version: "", ID: "abc"},
"invalid version": {Version: "bogus", ID: "abc"},
"zero-length id": {Version: "v1beta1", ID: ""},
"id > 255 characters": {Version: "v1beta1", ID: strings.Repeat("a", 256)},
"id not a DNS subdomain": {Version: "v1beta1", ID: "a.b.c."},
"empty version": {Version: "", ID: "abc"},
"invalid version": {Version: "bogus", ID: "abc"},
"invalid volume name": {
Version: "v1beta1",
ID: "abc",

View File

@ -0,0 +1,304 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package config
import (
"fmt"
"reflect"
"sync"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/config"
"github.com/golang/glog"
)
// PodConfigListener receives notifications for changes to a configuration.
type PodConfigListener interface {
// OnUpdate is invoked when the kubelet.Pod configuration has been changed by one of
// the sources. The update is properly normalized to remove duplicates.
OnUpdate(pod kubelet.PodUpdate)
}
// ListenerFunc implements the PodConfigListener interface
type ListenerFunc func(update kubelet.PodUpdate)
func (h ListenerFunc) OnUpdate(update kubelet.PodUpdate) {
h(update)
}
// PodConfigNotificationMode describes how changes are sent to the update channel
type PodConfigNotificationMode int
const (
// PodConfigNotificationSnapshot delivers the full configuration as a SET whenever
// any change occurs
PodConfigNotificationSnapshot = iota
// PodConfigNotificationSetsAndUpdates delivers an UPDATE message whenever pods are
// changed, and a SET message if there are any additions or removals.
PodConfigNotificationSnapshotAndUpdates
// PodConfigNotificationIncremental delivers ADD, UPDATE, and REMOVE to the update channel
PodConfigNotificationIncremental
)
// PodConfig is a configuration mux that merges many sources of pod configuration into a single
// consistent structure, and then delivers incremental change notifications to listeners
// in order.
type PodConfig struct {
pods *podStorage
mux *config.Mux
// the channel of denormalized changes passed to listeners
updates chan kubelet.PodUpdate
}
// NewPodConfig creates an object that can merge many configuration sources into a stream
// of normalized updates to a pod configuration.
func NewPodConfig(mode PodConfigNotificationMode) *PodConfig {
updates := make(chan kubelet.PodUpdate, 1)
storage := newPodStorage(updates, mode)
podConfig := &PodConfig{
pods: storage,
mux: config.NewMux(storage),
updates: updates,
}
return podConfig
}
// Channel creates or returns a config source channel. The channel
// only accepts PodUpdates
func (c *PodConfig) Channel(source string) chan<- interface{} {
return c.mux.Channel(source)
}
// Updates returns a channel of updates to the configuration, properly denormalized.
func (c *PodConfig) Updates() <-chan kubelet.PodUpdate {
return c.updates
}
// Sync requests the full configuration be delivered to the update channel.
func (c *PodConfig) Sync() {
c.pods.Sync()
}
// podStorage manages the current pod state at any point in time and ensures updates
// to the channel are delivered in order. Note that this object is an in-memory source of
// "truth" and on creation contains zero entries. Once all previously read sources are
// available, then this object should be considered authoritative.
type podStorage struct {
podLock sync.RWMutex
// map of source name to pod name to pod reference
pods map[string]map[string]*kubelet.Pod
mode PodConfigNotificationMode
// ensures that updates are delivered in strict order
// on the updates channel
updateLock sync.Mutex
updates chan<- kubelet.PodUpdate
}
// TODO: PodConfigNotificationMode could be handled by a listener to the updates channel
// in the future, especially with multiple listeners.
// TODO: allow initialization of the current state of the store with snapshotted version.
func newPodStorage(updates chan<- kubelet.PodUpdate, mode PodConfigNotificationMode) *podStorage {
return &podStorage{
pods: make(map[string]map[string]*kubelet.Pod),
mode: mode,
updates: updates,
}
}
// Merge normalizes a set of incoming changes from different sources into a map of all Pods
// and ensures that redundant changes are filtered out, and then pushes zero or more minimal
// updates onto the update channel. Ensures that updates are delivered in order.
func (s *podStorage) Merge(source string, change interface{}) error {
s.updateLock.Lock()
defer s.updateLock.Unlock()
adds, updates, deletes := s.merge(source, change)
// deliver update notifications
switch s.mode {
case PodConfigNotificationIncremental:
if len(deletes.Pods) > 0 {
s.updates <- *deletes
}
if len(adds.Pods) > 0 {
s.updates <- *adds
}
if len(updates.Pods) > 0 {
s.updates <- *updates
}
case PodConfigNotificationSnapshotAndUpdates:
if len(updates.Pods) > 0 {
s.updates <- *updates
}
if len(deletes.Pods) > 0 || len(adds.Pods) > 0 {
s.updates <- kubelet.PodUpdate{s.MergedState().([]kubelet.Pod), kubelet.SET}
}
case PodConfigNotificationSnapshot:
if len(updates.Pods) > 0 || len(deletes.Pods) > 0 || len(adds.Pods) > 0 {
s.updates <- kubelet.PodUpdate{s.MergedState().([]kubelet.Pod), kubelet.SET}
}
default:
panic(fmt.Sprintf("unsupported PodConfigNotificationMode: %#v", s.mode))
}
return nil
}
func (s *podStorage) merge(source string, change interface{}) (adds, updates, deletes *kubelet.PodUpdate) {
s.podLock.Lock()
defer s.podLock.Unlock()
adds = &kubelet.PodUpdate{Op: kubelet.ADD}
updates = &kubelet.PodUpdate{Op: kubelet.UPDATE}
deletes = &kubelet.PodUpdate{Op: kubelet.REMOVE}
pods := s.pods[source]
if pods == nil {
pods = make(map[string]*kubelet.Pod)
}
update := change.(kubelet.PodUpdate)
switch update.Op {
case kubelet.ADD, kubelet.UPDATE:
if update.Op == kubelet.ADD {
glog.Infof("Adding new pods from source %s : %v", source, update.Pods)
} else {
glog.Infof("Updating pods from source %s : %v", source, update.Pods)
}
filtered := filterInvalidPods(update.Pods, source)
for _, ref := range filtered {
name := ref.Name
if existing, found := pods[name]; found {
if !reflect.DeepEqual(existing.Manifest, ref.Manifest) {
// this is an update
existing.Manifest = ref.Manifest
updates.Pods = append(updates.Pods, *existing)
continue
}
// this is a no-op
continue
}
// this is an add
ref.Namespace = source
pods[name] = ref
adds.Pods = append(adds.Pods, *ref)
}
case kubelet.REMOVE:
glog.Infof("Removing a pod %v", update)
for _, value := range update.Pods {
name := value.Name
if existing, found := pods[name]; found {
// this is a delete
delete(pods, name)
deletes.Pods = append(deletes.Pods, *existing)
continue
}
// this is a no-op
}
case kubelet.SET:
glog.Infof("Setting pods for source %s : %v", source, update)
// Clear the old map entries by just creating a new map
oldPods := pods
pods = make(map[string]*kubelet.Pod)
filtered := filterInvalidPods(update.Pods, source)
for _, ref := range filtered {
name := ref.Name
if existing, found := oldPods[name]; found {
pods[name] = existing
if !reflect.DeepEqual(existing.Manifest, ref.Manifest) {
// this is an update
existing.Manifest = ref.Manifest
updates.Pods = append(updates.Pods, *existing)
continue
}
// this is a no-op
continue
}
ref.Namespace = source
pods[name] = ref
adds.Pods = append(adds.Pods, *ref)
}
for name, existing := range oldPods {
if _, found := pods[name]; !found {
// this is a delete
deletes.Pods = append(deletes.Pods, *existing)
}
}
default:
glog.Infof("Received invalid update type: %v", update)
}
s.pods[source] = pods
return adds, updates, deletes
}
func filterInvalidPods(pods []kubelet.Pod, source string) (filtered []*kubelet.Pod) {
names := util.StringSet{}
for i := range pods {
var errors []error
if names.Has(pods[i].Name) {
errors = append(errors, api.ValidationError{api.ErrTypeDuplicate, "Pod.Name", pods[i].Name})
} else {
names.Insert(pods[i].Name)
}
if errs := kubelet.ValidatePod(&pods[i]); len(errs) != 0 {
errors = append(errors, errs...)
}
if len(errors) > 0 {
glog.Warningf("Pod %d from %s failed validation, ignoring: %v", i+1, source, errors)
continue
}
filtered = append(filtered, &pods[i])
}
return
}
// Sync sends a copy of the current state through the update channel
func (s *podStorage) Sync() {
s.updateLock.Lock()
defer s.updateLock.Unlock()
s.updates <- kubelet.PodUpdate{s.MergedState().([]kubelet.Pod), kubelet.SET}
}
// Object implements config.Accessor
func (s *podStorage) MergedState() interface{} {
s.podLock.RLock()
defer s.podLock.RUnlock()
pods := make([]kubelet.Pod, 0)
for source, sourcePods := range s.pods {
for _, podRef := range sourcePods {
pod := *podRef
pod.Namespace = source
pods = append(pods, pod)
}
}
return pods
}

View File

@ -0,0 +1,212 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package config
import (
"reflect"
"testing"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet"
)
// TODO: remove this
func expectError(t *testing.T, err error) {
if err == nil {
t.Errorf("Expected error, Got %v", err)
}
}
// TODO: remove this
func expectNoError(t *testing.T, err error) {
if err != nil {
t.Errorf("Expected no error, Got %v", err)
}
}
func expectEmptyChannel(t *testing.T, ch <-chan interface{}) {
select {
case update := <-ch:
t.Errorf("Expected no update in channel, Got %v", update)
default:
}
}
type sortedPods []kubelet.Pod
func (s sortedPods) Len() int {
return len(s)
}
func (s sortedPods) Swap(i, j int) {
s[i], s[j] = s[j], s[i]
}
func (s sortedPods) Less(i, j int) bool {
if s[i].Namespace < s[j].Namespace {
return true
}
return s[i].Name < s[j].Name
}
func CreateValidPod(name, namespace string) kubelet.Pod {
return kubelet.Pod{
Name: name,
Namespace: namespace,
Manifest: api.ContainerManifest{
Version: "v1beta1",
},
}
}
func CreatePodUpdate(op kubelet.PodOperation, pods ...kubelet.Pod) kubelet.PodUpdate {
newPods := make([]kubelet.Pod, len(pods))
for i := range pods {
newPods[i] = pods[i]
}
return kubelet.PodUpdate{newPods, op}
}
func createPodConfigTester(mode PodConfigNotificationMode) (chan<- interface{}, <-chan kubelet.PodUpdate, *PodConfig) {
config := NewPodConfig(mode)
channel := config.Channel("test")
ch := config.Updates()
return channel, ch, config
}
func expectPodUpdate(t *testing.T, ch <-chan kubelet.PodUpdate, expected ...kubelet.PodUpdate) {
for i := range expected {
update := <-ch
if !reflect.DeepEqual(expected[i], update) {
t.Fatalf("Expected %#v, Got %#v", expected[i], update)
}
}
expectNoPodUpdate(t, ch)
}
func expectNoPodUpdate(t *testing.T, ch <-chan kubelet.PodUpdate) {
select {
case update := <-ch:
t.Errorf("Expected no update in channel, Got %#v", update)
default:
}
}
func TestNewPodAdded(t *testing.T) {
channel, ch, config := createPodConfigTester(PodConfigNotificationIncremental)
// see an update
podUpdate := CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", ""))
channel <- podUpdate
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "test")))
config.Sync()
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, CreateValidPod("foo", "test")))
}
func TestInvalidPodFiltered(t *testing.T) {
channel, ch, _ := createPodConfigTester(PodConfigNotificationIncremental)
// see an update
podUpdate := CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", ""))
channel <- podUpdate
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "test")))
// add an invalid update
podUpdate = CreatePodUpdate(kubelet.UPDATE, kubelet.Pod{Name: "foo"})
channel <- podUpdate
expectNoPodUpdate(t, ch)
}
func TestNewPodAddedSnapshotAndUpdates(t *testing.T) {
channel, ch, config := createPodConfigTester(PodConfigNotificationSnapshotAndUpdates)
// see an set
podUpdate := CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", ""))
channel <- podUpdate
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, CreateValidPod("foo", "test")))
config.Sync()
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, CreateValidPod("foo", "test")))
// container updates are separated as UPDATE
pod := podUpdate.Pods[0]
pod.Manifest.Containers = []api.Container{{Name: "bar", Image: "test"}}
channel <- CreatePodUpdate(kubelet.ADD, pod)
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.UPDATE, pod))
}
func TestNewPodAddedSnapshot(t *testing.T) {
channel, ch, config := createPodConfigTester(PodConfigNotificationSnapshot)
// see an set
podUpdate := CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", ""))
channel <- podUpdate
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, CreateValidPod("foo", "test")))
config.Sync()
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, CreateValidPod("foo", "test")))
// container updates are separated as UPDATE
pod := podUpdate.Pods[0]
pod.Manifest.Containers = []api.Container{{Name: "bar", Image: "test"}}
channel <- CreatePodUpdate(kubelet.ADD, pod)
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, pod))
}
func TestNewPodAddedUpdatedRemoved(t *testing.T) {
channel, ch, _ := createPodConfigTester(PodConfigNotificationIncremental)
// should register an add
podUpdate := CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", ""))
channel <- podUpdate
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "test")))
// should ignore ADDs that are identical
expectNoPodUpdate(t, ch)
// an kubelet.ADD should be converted to kubelet.UPDATE
pod := CreateValidPod("foo", "test")
pod.Manifest.Containers = []api.Container{{Name: "bar", Image: "test"}}
podUpdate = CreatePodUpdate(kubelet.ADD, pod)
channel <- podUpdate
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.UPDATE, pod))
podUpdate = CreatePodUpdate(kubelet.REMOVE, kubelet.Pod{Name: "foo"})
channel <- podUpdate
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.REMOVE, pod))
}
func TestNewPodAddedUpdatedSet(t *testing.T) {
channel, ch, _ := createPodConfigTester(PodConfigNotificationIncremental)
// should register an add
podUpdate := CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", ""), CreateValidPod("foo2", ""), CreateValidPod("foo3", ""))
channel <- podUpdate
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "test"), CreateValidPod("foo2", "test"), CreateValidPod("foo3", "test")))
// should ignore ADDs that are identical
expectNoPodUpdate(t, ch)
// should be converted to an kubelet.ADD, kubelet.REMOVE, and kubelet.UPDATE
pod := CreateValidPod("foo2", "test")
pod.Manifest.Containers = []api.Container{{Name: "bar", Image: "test"}}
podUpdate = CreatePodUpdate(kubelet.SET, pod, CreateValidPod("foo3", ""), CreateValidPod("foo4", "test"))
channel <- podUpdate
expectPodUpdate(t, ch,
CreatePodUpdate(kubelet.REMOVE, CreateValidPod("foo", "test")),
CreatePodUpdate(kubelet.ADD, CreateValidPod("foo4", "test")),
CreatePodUpdate(kubelet.UPDATE, pod))
}

140
pkg/kubelet/config/etcd.go Normal file
View File

@ -0,0 +1,140 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or sied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Reads the pod configuration from etcd using the Kubernetes etcd schema
package config
import (
"fmt"
"path"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet"
"github.com/GoogleCloudPlatform/kubernetes/pkg/tools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/coreos/go-etcd/etcd"
"github.com/golang/glog"
"gopkg.in/v1/yaml"
)
func EtcdKeyForHost(hostname string) string {
return path.Join("/", "registry", "hosts", hostname, "kubelet")
}
// TODO(lavalamp): Use a watcher interface instead of the etcd client directly
type SourceEtcd struct {
key string
client tools.EtcdClient
updates chan<- interface{}
waitDuration time.Duration
}
// NewSourceEtcd creates a config source that watches and pulls from a key in etcd
func NewSourceEtcd(key string, client tools.EtcdClient, period time.Duration, updates chan<- interface{}) *SourceEtcd {
config := &SourceEtcd{
key: key,
client: client,
updates: updates,
waitDuration: period,
}
glog.Infof("Watching etcd for %s", key)
go util.Forever(config.run, period)
return config
}
// run loops forever looking for changes to a key in etcd
func (s *SourceEtcd) run() {
index := uint64(0)
for {
lastIndex, err := s.fetchNextState(index)
if err != nil {
if !tools.IsEtcdNotFound(err) {
glog.Errorf("Unable to extract from the response (%s): %%v", s.key, err)
}
return
}
index = lastIndex + 1
}
}
// fetchNextState fetches the key (or waits for a change to a key) and then returns
// the index read. It will watch no longer than s.waitDuration and then return
func (s *SourceEtcd) fetchNextState(fromIndex uint64) (lastIndex uint64, err error) {
var response *etcd.Response
if fromIndex == 0 {
response, err = s.client.Get(s.key, true, false)
} else {
response, err = s.client.Watch(s.key, fromIndex, false, nil, stopChannel(s.waitDuration))
if tools.IsEtcdWatchStoppedByUser(err) {
return fromIndex, nil
}
}
if err != nil {
return 0, err
}
pods, err := responseToPods(response)
if err != nil {
glog.Infof("Response was in error: %#v", response)
return 0, fmt.Errorf("error parsing response: %#v", err)
}
glog.Infof("Got state from etcd: %+v", pods)
s.updates <- kubelet.PodUpdate{pods, kubelet.SET}
return response.Node.ModifiedIndex, nil
}
// responseToPods takes an etcd Response object, and turns it into a structured list of containers.
// It returns a list of containers, or an error if one occurs.
func responseToPods(response *etcd.Response) ([]kubelet.Pod, error) {
pods := []kubelet.Pod{}
if response.Node == nil || len(response.Node.Value) == 0 {
return pods, fmt.Errorf("no nodes field: %v", response)
}
manifests := []api.ContainerManifest{}
if err := yaml.Unmarshal([]byte(response.Node.Value), &manifests); err != nil {
return pods, fmt.Errorf("could not unmarshal manifests: %v", err)
}
for i, manifest := range manifests {
name := manifest.ID
if name == "" {
name = fmt.Sprintf("_%d", i+1)
}
pods = append(pods, kubelet.Pod{Name: name, Manifest: manifest})
}
return pods, nil
}
// stopChannel creates a channel that is closed after a duration for use with etcd client API
func stopChannel(until time.Duration) chan bool {
stop := make(chan bool)
go func() {
select {
case <-time.After(until):
}
stop <- true
close(stop)
}()
return stop
}

View File

@ -0,0 +1,147 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package config
import (
"reflect"
"testing"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet"
"github.com/GoogleCloudPlatform/kubernetes/pkg/tools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/coreos/go-etcd/etcd"
)
// TODO(lavalamp): Use the etcd watcher from the tools package, and make sure all test cases here are tested there.
func TestGetEtcdData(t *testing.T) {
fakeClient := tools.MakeFakeEtcdClient(t)
ch := make(chan interface{})
fakeClient.Data["/registry/hosts/machine/kubelet"] = tools.EtcdResponseWithError{
R: &etcd.Response{
Node: &etcd.Node{
Value: util.MakeJSONString([]api.ContainerManifest{api.ContainerManifest{ID: "foo"}}),
ModifiedIndex: 1,
},
},
E: nil,
}
NewSourceEtcd("/registry/hosts/machine/kubelet", fakeClient, time.Millisecond, ch)
//TODO: update FakeEtcdClient.Watch to handle receiver=nil with a given index
//returns an infinite stream of updates
for i := 0; i < 2; i++ {
update := (<-ch).(kubelet.PodUpdate)
expected := CreatePodUpdate(kubelet.SET, kubelet.Pod{Name: "foo", Manifest: api.ContainerManifest{ID: "foo"}})
if !reflect.DeepEqual(expected, update) {
t.Errorf("Expected %#v, Got %#v", expected, update)
}
}
}
func TestGetEtcdNoData(t *testing.T) {
fakeClient := tools.MakeFakeEtcdClient(t)
ch := make(chan interface{}, 1)
fakeClient.Data["/registry/hosts/machine/kubelet"] = tools.EtcdResponseWithError{
R: &etcd.Response{},
E: nil,
}
c := SourceEtcd{"/registry/hosts/machine/kubelet", fakeClient, ch, time.Millisecond}
_, err := c.fetchNextState(0)
expectError(t, err)
expectEmptyChannel(t, ch)
}
func TestGetEtcd(t *testing.T) {
fakeClient := tools.MakeFakeEtcdClient(t)
ch := make(chan interface{}, 1)
fakeClient.Data["/registry/hosts/machine/kubelet"] = tools.EtcdResponseWithError{
R: &etcd.Response{
Node: &etcd.Node{
Value: util.MakeJSONString([]api.ContainerManifest{api.ContainerManifest{ID: "foo"}}),
ModifiedIndex: 1,
},
},
E: nil,
}
c := SourceEtcd{"/registry/hosts/machine/kubelet", fakeClient, ch, time.Millisecond}
lastIndex, err := c.fetchNextState(0)
expectNoError(t, err)
if lastIndex != 1 {
t.Errorf("Expected %#v, Got %#v", 1, lastIndex)
}
update := (<-ch).(kubelet.PodUpdate)
expected := CreatePodUpdate(kubelet.SET, kubelet.Pod{Name: "foo", Manifest: api.ContainerManifest{ID: "foo"}})
if !reflect.DeepEqual(expected, update) {
t.Errorf("Expected %#v, Got %#v", expected, update)
}
}
func TestWatchEtcd(t *testing.T) {
fakeClient := tools.MakeFakeEtcdClient(t)
ch := make(chan interface{}, 1)
fakeClient.Data["/registry/hosts/machine/kubelet"] = tools.EtcdResponseWithError{
R: &etcd.Response{
Node: &etcd.Node{
Value: util.MakeJSONString([]api.Container{}),
ModifiedIndex: 2,
},
},
E: nil,
}
c := SourceEtcd{"/registry/hosts/machine/kubelet", fakeClient, ch, time.Millisecond}
lastIndex, err := c.fetchNextState(1)
expectNoError(t, err)
if lastIndex != 2 {
t.Errorf("Expected %d, Got %d", 1, lastIndex)
}
update := (<-ch).(kubelet.PodUpdate)
expected := CreatePodUpdate(kubelet.SET)
if !reflect.DeepEqual(expected, update) {
t.Errorf("Expected %#v, Got %#v", expected, update)
}
}
func TestGetEtcdNotFound(t *testing.T) {
fakeClient := tools.MakeFakeEtcdClient(t)
ch := make(chan interface{}, 1)
fakeClient.Data["/registry/hosts/machine/kubelet"] = tools.EtcdResponseWithError{
R: &etcd.Response{},
E: tools.EtcdErrorNotFound,
}
c := SourceEtcd{"/registry/hosts/machine/kubelet", fakeClient, ch, time.Millisecond}
_, err := c.fetchNextState(0)
expectError(t, err)
expectEmptyChannel(t, ch)
}
func TestGetEtcdError(t *testing.T) {
fakeClient := tools.MakeFakeEtcdClient(t)
ch := make(chan interface{}, 1)
fakeClient.Data["/registry/hosts/machine/kubelet"] = tools.EtcdResponseWithError{
R: &etcd.Response{},
E: &etcd.EtcdError{
ErrorCode: 200, // non not found error
},
}
c := SourceEtcd{"/registry/hosts/machine/kubelet", fakeClient, ch, time.Millisecond}
_, err := c.fetchNextState(0)
expectError(t, err)
expectEmptyChannel(t, ch)
}

149
pkg/kubelet/config/file.go Normal file
View File

@ -0,0 +1,149 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or sied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Reads the pod configuration from file or a directory of files
package config
import (
"crypto/sha1"
"encoding/base64"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"sort"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/golang/glog"
"gopkg.in/v1/yaml"
)
type SourceFile struct {
path string
updates chan<- interface{}
}
func NewSourceFile(path string, period time.Duration, updates chan<- interface{}) *SourceFile {
config := &SourceFile{
path: path,
updates: updates,
}
glog.Infof("Watching file %s", path)
go util.Forever(config.run, period)
return config
}
func (s *SourceFile) run() {
if err := s.extractFromPath(); err != nil {
glog.Errorf("Unable to read config file: %s", err)
}
}
func (s *SourceFile) extractFromPath() error {
path := s.path
statInfo, err := os.Stat(path)
if err != nil {
if !os.IsNotExist(err) {
return fmt.Errorf("unable to access path: %s", err)
}
return fmt.Errorf("path does not exist: %s", path)
}
switch {
case statInfo.Mode().IsDir():
pods, err := extractFromDir(path)
if err != nil {
return err
}
s.updates <- kubelet.PodUpdate{pods, kubelet.SET}
case statInfo.Mode().IsRegular():
pod, err := extractFromFile(path)
if err != nil {
return err
}
s.updates <- kubelet.PodUpdate{[]kubelet.Pod{pod}, kubelet.SET}
default:
return fmt.Errorf("path is not a directory or file")
}
return nil
}
func extractFromDir(name string) ([]kubelet.Pod, error) {
pods := []kubelet.Pod{}
files, err := filepath.Glob(filepath.Join(name, "[^.]*"))
if err != nil {
return pods, err
}
sort.Strings(files)
for _, file := range files {
pod, err := extractFromFile(file)
if err != nil {
return []kubelet.Pod{}, err
}
pods = append(pods, pod)
}
return pods, nil
}
func extractFromFile(name string) (kubelet.Pod, error) {
var pod kubelet.Pod
file, err := os.Open(name)
if err != nil {
return pod, err
}
defer file.Close()
data, err := ioutil.ReadAll(file)
if err != nil {
glog.Errorf("Couldn't read from file: %v", err)
return pod, err
}
if err := yaml.Unmarshal(data, &pod.Manifest); err != nil {
return pod, fmt.Errorf("could not unmarshal manifest: %v", err)
}
podName := pod.Manifest.ID
if podName == "" {
podName = simpleSubdomainSafeHash(name)
}
pod.Name = podName
return pod, nil
}
var simpleSubdomainSafeEncoding = base64.NewEncoding("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ012345678900")
// simpleSubdomainSafeHash generates a compact hash of the input that uses characters
// only in the range a-zA-Z0-9, making it suitable for DNS subdomain labels
func simpleSubdomainSafeHash(s string) string {
hasher := sha1.New()
hasher.Write([]byte(s))
sha := simpleSubdomainSafeEncoding.EncodeToString(hasher.Sum(nil))
if len(sha) > 20 {
sha = sha[:20]
}
return sha
}

View File

@ -0,0 +1,211 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package config
import (
"encoding/json"
"io/ioutil"
"os"
"reflect"
"sort"
"testing"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet"
"gopkg.in/v1/yaml"
)
func TestExtractFromNonExistentFile(t *testing.T) {
ch := make(chan interface{}, 1)
c := SourceFile{"/some/fake/file", ch}
err := c.extractFromPath()
expectError(t, err)
}
func TestUpdateOnNonExistentFile(t *testing.T) {
ch := make(chan interface{})
NewSourceFile("random_non_existent_path", time.Millisecond, ch)
select {
case got := <-ch:
t.Errorf("Expected no update, Got %#v", got)
case <-time.After(2 * time.Millisecond):
}
}
func writeTestFile(t *testing.T, dir, name string, contents string) *os.File {
file, err := ioutil.TempFile(os.TempDir(), "test_pod_config")
if err != nil {
t.Fatalf("Unable to create test file %#v", err)
}
file.Close()
if err := ioutil.WriteFile(file.Name(), []byte(contents), 0555); err != nil {
t.Fatalf("Unable to write test file %#v", err)
}
return file
}
func TestReadFromFile(t *testing.T) {
file := writeTestFile(t, os.TempDir(), "test_pod_config", "version: v1beta1\nid: test\ncontainers:\n- image: test/image")
defer os.Remove(file.Name())
ch := make(chan interface{})
NewSourceFile(file.Name(), time.Millisecond, ch)
select {
case got := <-ch:
update := got.(kubelet.PodUpdate)
expected := CreatePodUpdate(kubelet.SET, kubelet.Pod{
Name: "test",
Manifest: api.ContainerManifest{
ID: "test",
Version: "v1beta1",
Containers: []api.Container{api.Container{
Image: "test/image"},
},
},
})
if !reflect.DeepEqual(expected, update) {
t.Errorf("Expected %#v, Got %#v", expected, update)
}
case <-time.After(2 * time.Millisecond):
t.Errorf("Expected update, timeout instead")
}
}
func TestExtractFromBadDataFile(t *testing.T) {
file := writeTestFile(t, os.TempDir(), "test_pod_config", string([]byte{1, 2, 3}))
defer os.Remove(file.Name())
ch := make(chan interface{}, 1)
c := SourceFile{file.Name(), ch}
err := c.extractFromPath()
expectError(t, err)
expectEmptyChannel(t, ch)
}
func TestExtractFromValidDataFile(t *testing.T) {
manifest := api.ContainerManifest{ID: ""}
text, err := json.Marshal(manifest)
expectNoError(t, err)
file := writeTestFile(t, os.TempDir(), "test_pod_config", string(text))
defer os.Remove(file.Name())
ch := make(chan interface{}, 1)
c := SourceFile{file.Name(), ch}
err = c.extractFromPath()
expectNoError(t, err)
update := (<-ch).(kubelet.PodUpdate)
expected := CreatePodUpdate(kubelet.SET, kubelet.Pod{Name: simpleSubdomainSafeHash(file.Name()), Manifest: manifest})
if !reflect.DeepEqual(expected, update) {
t.Errorf("Expected %#v, Got %#v", expected, update)
}
}
func TestExtractFromEmptyDir(t *testing.T) {
dirName, err := ioutil.TempDir("", "foo")
expectNoError(t, err)
defer os.RemoveAll(dirName)
ch := make(chan interface{}, 1)
c := SourceFile{dirName, ch}
err = c.extractFromPath()
expectNoError(t, err)
update := (<-ch).(kubelet.PodUpdate)
expected := CreatePodUpdate(kubelet.SET)
if !reflect.DeepEqual(expected, update) {
t.Errorf("Expected %#v, Got %#v", expected, update)
}
}
func TestExtractFromDir(t *testing.T) {
manifests := []api.ContainerManifest{
{ID: "", Containers: []api.Container{{Image: "foo"}}},
{ID: "", Containers: []api.Container{{Image: "bar"}}},
}
files := make([]*os.File, len(manifests))
dirName, err := ioutil.TempDir("", "foo")
expectNoError(t, err)
for i, manifest := range manifests {
data, err := json.Marshal(manifest)
expectNoError(t, err)
file, err := ioutil.TempFile(dirName, manifest.ID)
expectNoError(t, err)
name := file.Name()
expectNoError(t, file.Close())
ioutil.WriteFile(name, data, 0755)
files[i] = file
}
ch := make(chan interface{}, 1)
c := SourceFile{dirName, ch}
err = c.extractFromPath()
expectNoError(t, err)
update := (<-ch).(kubelet.PodUpdate)
expected := CreatePodUpdate(
kubelet.SET,
kubelet.Pod{Name: simpleSubdomainSafeHash(files[0].Name()), Manifest: manifests[0]},
kubelet.Pod{Name: simpleSubdomainSafeHash(files[1].Name()), Manifest: manifests[1]},
)
sort.Sort(sortedPods(update.Pods))
sort.Sort(sortedPods(expected.Pods))
if !reflect.DeepEqual(expected, update) {
t.Errorf("Expected %#v, Got %#v", expected, update)
}
}
// These are used for testing extract json (below)
type TestData struct {
Value string
Number int
}
type TestObject struct {
Name string
Data TestData
}
func verifyStringEquals(t *testing.T, actual, expected string) {
if actual != expected {
t.Errorf("Verification failed. Expected: %s, Found %s", expected, actual)
}
}
func verifyIntEquals(t *testing.T, actual, expected int) {
if actual != expected {
t.Errorf("Verification failed. Expected: %d, Found %d", expected, actual)
}
}
func TestExtractJSON(t *testing.T) {
obj := TestObject{}
data := `{ "name": "foo", "data": { "value": "bar", "number": 10 } }`
if err := yaml.Unmarshal([]byte(data), &obj); err != nil {
t.Fatalf("Could not unmarshal JSON: %v", err)
}
verifyStringEquals(t, obj.Name, "foo")
verifyStringEquals(t, obj.Data.Value, "bar")
verifyIntEquals(t, obj.Data.Number, 10)
}

117
pkg/kubelet/config/http.go Normal file
View File

@ -0,0 +1,117 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or sied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Reads the pod configuration from an HTTP GET response
package config
import (
"fmt"
"io/ioutil"
"net/http"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/golang/glog"
"gopkg.in/v1/yaml"
)
type SourceURL struct {
url string
updates chan<- interface{}
}
func NewSourceURL(url string, period time.Duration, updates chan<- interface{}) *SourceURL {
config := &SourceURL{
url: url,
updates: updates,
}
glog.Infof("Watching URL %s", url)
go util.Forever(config.run, period)
return config
}
func (s *SourceURL) run() {
if err := s.extractFromURL(); err != nil {
glog.Errorf("Failed to read URL: %s", err)
}
}
func (s *SourceURL) extractFromURL() error {
resp, err := http.Get(s.url)
if err != nil {
return err
}
defer resp.Body.Close()
data, err := ioutil.ReadAll(resp.Body)
if err != nil {
return err
}
if len(data) == 0 {
return fmt.Errorf("zero-length data received from %v", s.url)
}
// First try as if it's a single manifest
var pod kubelet.Pod
singleErr := yaml.Unmarshal(data, &pod.Manifest)
// TODO: replace with validation
if singleErr == nil && pod.Manifest.Version == "" {
// If data is a []ContainerManifest, trying to put it into a ContainerManifest
// will not give an error but also won't set any of the fields.
// Our docs say that the version field is mandatory, so using that to judge wether
// this was actually successful.
singleErr = fmt.Errorf("got blank version field")
}
if singleErr == nil {
name := pod.Manifest.ID
if name == "" {
name = "1"
}
pod.Name = name
s.updates <- kubelet.PodUpdate{[]kubelet.Pod{pod}, kubelet.SET}
return nil
}
// That didn't work, so try an array of manifests.
var manifests []api.ContainerManifest
multiErr := yaml.Unmarshal(data, &manifests)
// We're not sure if the person reading the logs is going to care about the single or
// multiple manifest unmarshalling attempt, so we need to put both in the logs, as is
// done at the end. Hence not returning early here.
if multiErr == nil && len(manifests) > 0 && manifests[0].Version == "" {
multiErr = fmt.Errorf("got blank version field")
}
if multiErr == nil {
pods := []kubelet.Pod{}
for i := range manifests {
pod := kubelet.Pod{Manifest: manifests[i]}
name := pod.Manifest.ID
if name == "" {
name = fmt.Sprintf("%d", i+1)
}
pod.Name = name
pods = append(pods, pod)
}
s.updates <- kubelet.PodUpdate{pods, kubelet.SET}
return nil
}
return fmt.Errorf("%v: received '%v', but couldn't parse as a "+
"single manifest (%v: %+v) or as multiple manifests (%v: %+v).\n",
s.url, string(data), singleErr, pod.Manifest, multiErr, manifests)
}

View File

@ -0,0 +1,124 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package config
import (
"encoding/json"
"net/http/httptest"
"reflect"
"testing"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
)
func TestURLErrorNotExistNoUpdate(t *testing.T) {
ch := make(chan interface{})
NewSourceURL("http://localhost:49575/_not_found_", time.Millisecond, ch)
select {
case got := <-ch:
t.Errorf("Expected no update, Got %#v", got)
case <-time.After(2 * time.Millisecond):
}
}
func TestExtractFromHttpBadness(t *testing.T) {
ch := make(chan interface{}, 1)
c := SourceURL{"http://localhost:49575/_not_found_", ch}
err := c.extractFromURL()
expectError(t, err)
expectEmptyChannel(t, ch)
}
func TestExtractFromHttpSingle(t *testing.T) {
manifests := []api.ContainerManifest{
{Version: "v1beta1", ID: "foo"},
}
// Taking a single-manifest from a URL allows kubelet to be used
// in the implementation of google's container VM image.
data, err := json.Marshal(manifests[0])
fakeHandler := util.FakeHandler{
StatusCode: 200,
ResponseBody: string(data),
}
testServer := httptest.NewServer(&fakeHandler)
ch := make(chan interface{}, 1)
c := SourceURL{testServer.URL, ch}
err = c.extractFromURL()
expectNoError(t, err)
update := (<-ch).(kubelet.PodUpdate)
expected := CreatePodUpdate(kubelet.SET, kubelet.Pod{Name: "foo", Manifest: manifests[0]})
if !reflect.DeepEqual(expected, update) {
t.Errorf("Expected: %#v, Got: %#v", expected, update)
}
}
func TestExtractFromHttpMultiple(t *testing.T) {
manifests := []api.ContainerManifest{
{Version: "v1beta1", ID: ""},
{Version: "v1beta1", ID: "bar"},
}
data, err := json.Marshal(manifests)
if err != nil {
t.Fatalf("Some weird json problem: %v", err)
}
fakeHandler := util.FakeHandler{
StatusCode: 200,
ResponseBody: string(data),
}
testServer := httptest.NewServer(&fakeHandler)
ch := make(chan interface{}, 1)
c := SourceURL{testServer.URL, ch}
err = c.extractFromURL()
expectNoError(t, err)
update := (<-ch).(kubelet.PodUpdate)
expected := CreatePodUpdate(kubelet.SET, kubelet.Pod{Name: "1", Manifest: manifests[0]}, kubelet.Pod{Name: "bar", Manifest: manifests[1]})
if !reflect.DeepEqual(expected, update) {
t.Errorf("Expected: %#v, Got: %#v", expected, update)
}
}
func TestExtractFromHttpEmptyArray(t *testing.T) {
manifests := []api.ContainerManifest{}
data, err := json.Marshal(manifests)
if err != nil {
t.Fatalf("Some weird json problem: %v", err)
}
fakeHandler := util.FakeHandler{
StatusCode: 200,
ResponseBody: string(data),
}
testServer := httptest.NewServer(&fakeHandler)
ch := make(chan interface{}, 1)
c := SourceURL{testServer.URL, ch}
err = c.extractFromURL()
expectNoError(t, err)
update := (<-ch).(kubelet.PodUpdate)
expected := CreatePodUpdate(kubelet.SET)
if !reflect.DeepEqual(expected, update) {
t.Errorf("Expected: %#v, Got: %#v", expected, update)
}
}

View File

@ -174,14 +174,14 @@ func unescapeDash(in string) (out string) {
const containerNamePrefix = "k8s"
// Creates a name which can be reversed to identify both manifest id and container name.
func buildDockerName(manifest *api.ContainerManifest, container *api.Container) string {
func buildDockerName(pod *Pod, container *api.Container) string {
// Note, manifest.ID could be blank.
return fmt.Sprintf("%s--%s--%s--%08x", containerNamePrefix, escapeDash(container.Name), escapeDash(manifest.ID), rand.Uint32())
return fmt.Sprintf("%s--%s--%s--%08x", containerNamePrefix, escapeDash(container.Name), escapeDash(GetPodFullName(pod)), rand.Uint32())
}
// Upacks a container name, returning the manifest id and container name we would have used to
// construct the docker name. If the docker name isn't one we created, we may return empty strings.
func parseDockerName(name string) (manifestID, containerName string) {
func parseDockerName(name string) (podFullName, containerName string) {
// For some reason docker appears to be appending '/' to names.
// If its there, strip it.
if name[0] == '/' {
@ -195,7 +195,7 @@ func parseDockerName(name string) (manifestID, containerName string) {
containerName = unescapeDash(parts[1])
}
if len(parts) > 2 {
manifestID = unescapeDash(parts[2])
podFullName = unescapeDash(parts[2])
}
return
}

View File

@ -20,13 +20,7 @@ import (
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"net"
"net/http"
"os"
"path"
"path/filepath"
"sort"
"strconv"
"strings"
"sync"
@ -40,9 +34,7 @@ import (
"github.com/coreos/go-etcd/etcd"
"github.com/fsouza/go-dockerclient"
"github.com/golang/glog"
"github.com/google/cadvisor/client"
"github.com/google/cadvisor/info"
"gopkg.in/v1/yaml"
)
const defaultChanSize = 1024
@ -58,6 +50,13 @@ type CadvisorInterface interface {
MachineInfo() (*info.MachineInfo, error)
}
// SyncHandler is an interface implemented by Kubelet, for testability
type SyncHandler interface {
SyncPods([]Pod) error
}
type volumeMap map[string]volume.Interface
// New creates a new Kubelet.
// TODO: currently it is only called by test code.
// Need cleanup.
@ -65,94 +64,35 @@ func New() *Kubelet {
return &Kubelet{}
}
type volumeMap map[string]volume.Interface
// Kubelet is the main kubelet implementation.
type Kubelet struct {
Hostname string
EtcdClient tools.EtcdClient
DockerClient DockerInterface
DockerPuller DockerPuller
CadvisorClient CadvisorInterface
FileCheckFrequency time.Duration
SyncFrequency time.Duration
HTTPCheckFrequency time.Duration
pullLock sync.Mutex
HealthChecker health.HealthChecker
LogServer http.Handler
Hostname string
DockerClient DockerInterface
// Optional, no events will be sent without it
EtcdClient tools.EtcdClient
// Optional, no statistics will be available if omitted
CadvisorClient CadvisorInterface
// Optional, defaults to simple implementaiton
HealthChecker health.HealthChecker
// Optional, defaults to simple Docker implementation
DockerPuller DockerPuller
// Optional, defaults to /logs/ from /var/log
LogServer http.Handler
}
type manifestUpdate struct {
source string
manifests []api.ContainerManifest
}
const (
fileSource = "file"
etcdSource = "etcd"
httpClientSource = "http_client"
httpServerSource = "http_server"
)
// RunKubelet starts background goroutines. If config_path, manifest_url, or address are empty,
// they are not watched. Never returns.
func (kl *Kubelet) RunKubelet(dockerEndpoint, configPath, manifestURL string, etcdServers []string, address string, port uint) {
// Run starts the kubelet reacting to config updates
func (kl *Kubelet) Run(updates <-chan PodUpdate) {
if kl.LogServer == nil {
kl.LogServer = http.StripPrefix("/logs/", http.FileServer(http.Dir("/var/log/")))
}
if kl.CadvisorClient == nil {
var err error
kl.CadvisorClient, err = cadvisor.NewClient("http://127.0.0.1:5000")
if err != nil {
glog.Errorf("Error on creating cadvisor client: %v", err)
}
}
if kl.DockerPuller == nil {
kl.DockerPuller = NewDockerPuller(kl.DockerClient)
}
updateChannel := make(chan manifestUpdate)
if configPath != "" {
glog.Infof("Watching for file configs at %s", configPath)
go util.Forever(func() {
kl.WatchFiles(configPath, updateChannel)
}, kl.FileCheckFrequency)
if kl.HealthChecker == nil {
kl.HealthChecker = health.NewHealthChecker()
}
if manifestURL != "" {
glog.Infof("Watching for HTTP configs at %s", manifestURL)
go util.Forever(func() {
if err := kl.extractFromHTTP(manifestURL, updateChannel); err != nil {
glog.Errorf("Error syncing http: %v", err)
}
}, kl.HTTPCheckFrequency)
}
if len(etcdServers) > 0 {
glog.Infof("Watching for etcd configs at %v", etcdServers)
kl.EtcdClient = etcd.NewClient(etcdServers)
go util.Forever(func() { kl.SyncAndSetupEtcdWatch(updateChannel) }, 20*time.Second)
}
if address != "" {
glog.Infof("Starting to listen on %s:%d", address, port)
handler := Server{
Kubelet: kl,
UpdateChannel: updateChannel,
DelegateHandler: http.DefaultServeMux,
}
s := &http.Server{
Addr: net.JoinHostPort(address, strconv.FormatUint(uint64(port), 10)),
Handler: &handler,
ReadTimeout: 10 * time.Second,
WriteTimeout: 10 * time.Second,
MaxHeaderBytes: 1 << 20,
}
go util.Forever(func() { s.ListenAndServe() }, 0)
}
kl.HealthChecker = health.NewHealthChecker()
kl.syncLoop(updateChannel, kl)
}
// SyncHandler is an interface implemented by Kubelet, for testability
type SyncHandler interface {
SyncManifests([]api.ContainerManifest) error
kl.syncLoop(updates, kl)
}
// LogEvent logs an event to the etcd backend.
@ -186,7 +126,7 @@ func makeEnvironmentVariables(container *api.Container) []string {
return result
}
func makeVolumesAndBinds(manifestID string, container *api.Container, podVolumes volumeMap) (map[string]struct{}, []string) {
func makeVolumesAndBinds(pod *Pod, container *api.Container, podVolumes volumeMap) (map[string]struct{}, []string) {
volumes := map[string]struct{}{}
binds := []string{}
for _, volume := range container.VolumeMounts {
@ -201,7 +141,7 @@ func makeVolumesAndBinds(manifestID string, container *api.Container, podVolumes
// TODO(jonesdl) This clause should be deleted and an error should be thrown. The default
// behavior is now supported by the EmptyDirectory type.
volumes[volume.MountPath] = struct{}{}
basePath = fmt.Sprintf("/exports/%s/%s:%s", manifestID, volume.Name, volume.MountPath)
basePath = fmt.Sprintf("/exports/%s/%s:%s", GetPodFullName(pod), volume.Name, volume.MountPath)
}
if volume.ReadOnly {
basePath += ":ro"
@ -268,14 +208,14 @@ func (kl *Kubelet) mountExternalVolumes(manifest *api.ContainerManifest) (volume
return podVolumes, nil
}
// Run a single container from a manifest. Returns the docker container ID
func (kl *Kubelet) runContainer(manifest *api.ContainerManifest, container *api.Container, podVolumes volumeMap, netMode string) (id DockerID, err error) {
// Run a single container from a pod. Returns the docker container ID
func (kl *Kubelet) runContainer(pod *Pod, container *api.Container, podVolumes volumeMap, netMode string) (id DockerID, err error) {
envVariables := makeEnvironmentVariables(container)
volumes, binds := makeVolumesAndBinds(manifest.ID, container, podVolumes)
volumes, binds := makeVolumesAndBinds(pod, container, podVolumes)
exposedPorts, portBindings := makePortsAndBindings(container)
opts := docker.CreateContainerOptions{
Name: buildDockerName(manifest, container),
Name: buildDockerName(pod, container),
Config: &docker.Config{
Cmd: container.Command,
Env: envVariables,
@ -301,13 +241,14 @@ func (kl *Kubelet) runContainer(manifest *api.ContainerManifest, container *api.
}
// Kill a docker container
func (kl *Kubelet) killContainer(container docker.APIContainers) error {
err := kl.DockerClient.StopContainer(container.ID, 10)
manifestID, containerName := parseDockerName(container.Names[0])
func (kl *Kubelet) killContainer(dockerContainer docker.APIContainers) error {
err := kl.DockerClient.StopContainer(dockerContainer.ID, 10)
podFullName, containerName := parseDockerName(dockerContainer.Names[0])
kl.LogEvent(&api.Event{
Event: "STOP",
Manifest: &api.ContainerManifest{
ID: manifestID,
//TODO: This should be reported using either the apiserver schema or the kubelet schema
ID: podFullName,
},
Container: &api.Container{
Name: containerName,
@ -317,247 +258,17 @@ func (kl *Kubelet) killContainer(container docker.APIContainers) error {
return err
}
func (kl *Kubelet) extractFromFile(name string) (api.ContainerManifest, error) {
var file *os.File
var err error
var manifest api.ContainerManifest
if file, err = os.Open(name); err != nil {
return manifest, err
}
defer file.Close()
data, err := ioutil.ReadAll(file)
if err != nil {
glog.Errorf("Couldn't read from file: %v", err)
return manifest, err
}
if err = kl.ExtractYAMLData(data, &manifest); err != nil {
return manifest, err
}
return manifest, nil
}
func (kl *Kubelet) extractFromDir(name string) ([]api.ContainerManifest, error) {
var manifests []api.ContainerManifest
files, err := filepath.Glob(filepath.Join(name, "[^.]*"))
if err != nil {
return manifests, err
}
sort.Strings(files)
for _, file := range files {
manifest, err := kl.extractFromFile(file)
if err != nil {
return manifests, err
}
manifests = append(manifests, manifest)
}
return manifests, nil
}
// WatchFiles watches a file or direcory of files for changes to the set of pods that
// should run on this Kubelet.
func (kl *Kubelet) WatchFiles(configPath string, updateChannel chan<- manifestUpdate) {
statInfo, err := os.Stat(configPath)
if err != nil {
if !os.IsNotExist(err) {
glog.Errorf("Error accessing path: %v", err)
}
return
}
switch {
case statInfo.Mode().IsDir():
manifests, err := kl.extractFromDir(configPath)
if err != nil {
glog.Errorf("Error polling dir: %v", err)
return
}
updateChannel <- manifestUpdate{fileSource, manifests}
case statInfo.Mode().IsRegular():
manifest, err := kl.extractFromFile(configPath)
if err != nil {
glog.Errorf("Error polling file: %v", err)
return
}
updateChannel <- manifestUpdate{fileSource, []api.ContainerManifest{manifest}}
default:
glog.Errorf("Error accessing config - not a directory or file")
}
}
func (kl *Kubelet) extractFromHTTP(url string, updateChannel chan<- manifestUpdate) error {
resp, err := http.Get(url)
if err != nil {
return err
}
defer resp.Body.Close()
data, err := ioutil.ReadAll(resp.Body)
if err != nil {
return err
}
if len(data) == 0 {
return fmt.Errorf("zero-length data received from %v", url)
}
// First try as if it's a single manifest
var manifest api.ContainerManifest
singleErr := yaml.Unmarshal(data, &manifest)
if singleErr == nil && manifest.Version == "" {
// If data is a []ContainerManifest, trying to put it into a ContainerManifest
// will not give an error but also won't set any of the fields.
// Our docs say that the version field is mandatory, so using that to judge wether
// this was actually successful.
singleErr = fmt.Errorf("got blank version field")
}
if singleErr == nil {
updateChannel <- manifestUpdate{httpClientSource, []api.ContainerManifest{manifest}}
return nil
}
// That didn't work, so try an array of manifests.
var manifests []api.ContainerManifest
multiErr := yaml.Unmarshal(data, &manifests)
// We're not sure if the person reading the logs is going to care about the single or
// multiple manifest unmarshalling attempt, so we need to put both in the logs, as is
// done at the end. Hence not returning early here.
if multiErr == nil && len(manifests) > 0 && manifests[0].Version == "" {
multiErr = fmt.Errorf("got blank version field")
}
if multiErr == nil {
updateChannel <- manifestUpdate{httpClientSource, manifests}
return nil
}
return fmt.Errorf("%v: received '%v', but couldn't parse as a "+
"single manifest (%v: %+v) or as multiple manifests (%v: %+v).\n",
url, string(data), singleErr, manifest, multiErr, manifests)
}
// ResponseToManifests takes an etcd Response object, and turns it into a structured list of containers.
// It returns a list of containers, or an error if one occurs.
func (kl *Kubelet) ResponseToManifests(response *etcd.Response) ([]api.ContainerManifest, error) {
if response.Node == nil || len(response.Node.Value) == 0 {
return nil, fmt.Errorf("no nodes field: %v", response)
}
var manifests []api.ContainerManifest
err := kl.ExtractYAMLData([]byte(response.Node.Value), &manifests)
return manifests, err
}
func (kl *Kubelet) getKubeletStateFromEtcd(key string, updateChannel chan<- manifestUpdate) error {
response, err := kl.EtcdClient.Get(key, true, false)
if err != nil {
if tools.IsEtcdNotFound(err) {
return nil
}
glog.Errorf("Error on etcd get of %s: %v", key, err)
return err
}
manifests, err := kl.ResponseToManifests(response)
if err != nil {
glog.Errorf("Error parsing response (%v): %s", response, err)
return err
}
glog.Infof("Got state from etcd: %+v", manifests)
updateChannel <- manifestUpdate{etcdSource, manifests}
return nil
}
// SyncAndSetupEtcdWatch synchronizes with etcd, and sets up an etcd watch for new configurations.
// The channel to send new configurations across
// This function loops forever and is intended to be run in a go routine.
func (kl *Kubelet) SyncAndSetupEtcdWatch(updateChannel chan<- manifestUpdate) {
key := path.Join("registry", "hosts", strings.TrimSpace(kl.Hostname), "kubelet")
// First fetch the initial configuration (watch only gives changes...)
for {
err := kl.getKubeletStateFromEtcd(key, updateChannel)
if err == nil {
// We got a successful response, etcd is up, set up the watch.
break
}
time.Sleep(30 * time.Second)
}
done := make(chan bool)
go util.Forever(func() { kl.TimeoutWatch(done) }, 0)
for {
// The etcd client will close the watch channel when it exits. So we need
// to create and service a new one every time.
watchChannel := make(chan *etcd.Response)
// We don't push this through Forever because if it dies, we just do it again in 30 secs.
// anyway.
go kl.WatchEtcd(watchChannel, updateChannel)
kl.getKubeletStateFromEtcd(key, updateChannel)
glog.V(1).Infof("Setting up a watch for configuration changes in etcd for %s", key)
kl.EtcdClient.Watch(key, 0, true, watchChannel, done)
}
}
// TimeoutWatch timeout the watch after 30 seconds.
func (kl *Kubelet) TimeoutWatch(done chan bool) {
t := time.Tick(30 * time.Second)
for _ = range t {
done <- true
}
}
// ExtractYAMLData extracts data from YAML file into a list of containers.
func (kl *Kubelet) ExtractYAMLData(buf []byte, output interface{}) error {
if err := yaml.Unmarshal(buf, output); err != nil {
glog.Errorf("Couldn't unmarshal configuration: %v", err)
return err
}
return nil
}
func (kl *Kubelet) extractFromEtcd(response *etcd.Response) ([]api.ContainerManifest, error) {
var manifests []api.ContainerManifest
if response.Node == nil || len(response.Node.Value) == 0 {
return manifests, fmt.Errorf("no nodes field: %v", response)
}
err := kl.ExtractYAMLData([]byte(response.Node.Value), &manifests)
return manifests, err
}
// WatchEtcd watches etcd for changes, receives config objects from the etcd client watch.
// This function loops until the watchChannel is closed, and is intended to be run as a goroutine.
func (kl *Kubelet) WatchEtcd(watchChannel <-chan *etcd.Response, updateChannel chan<- manifestUpdate) {
defer util.HandleCrash()
for {
watchResponse := <-watchChannel
// This means the channel has been closed.
if watchResponse == nil {
return
}
glog.Infof("Got etcd change: %v", watchResponse)
manifests, err := kl.extractFromEtcd(watchResponse)
if err != nil {
glog.Errorf("Error handling response from etcd: %v", err)
continue
}
glog.Infof("manifests: %+v", manifests)
// Ok, we have a valid configuration, send to channel for
// rejiggering.
updateChannel <- manifestUpdate{etcdSource, manifests}
}
}
const (
networkContainerName = "net"
networkContainerImage = "kubernetes/pause:latest"
)
// Create a network container for a manifest. Returns the docker container ID of the newly created container.
func (kl *Kubelet) createNetworkContainer(manifest *api.ContainerManifest) (DockerID, error) {
// createNetworkContainer starts the network container for a pod. Returns the docker container ID of the newly created container.
func (kl *Kubelet) createNetworkContainer(pod *Pod) (DockerID, error) {
var ports []api.Port
// Docker only exports ports from the network container. Let's
// collect all of the relevant ports and export them.
for _, container := range manifest.Containers {
for _, container := range pod.Manifest.Containers {
ports = append(ports, container.Ports...)
}
container := &api.Container{
@ -566,32 +277,36 @@ func (kl *Kubelet) createNetworkContainer(manifest *api.ContainerManifest) (Dock
Ports: ports,
}
kl.DockerPuller.Pull(networkContainerImage)
return kl.runContainer(manifest, container, nil, "")
return kl.runContainer(pod, container, nil, "")
}
func (kl *Kubelet) syncManifest(manifest *api.ContainerManifest, dockerContainers DockerContainers, keepChannel chan<- DockerID) error {
// Make sure we have a network container
func (kl *Kubelet) syncPod(pod *Pod, dockerContainers DockerContainers, keepChannel chan<- DockerID) error {
podFullName := GetPodFullName(pod)
var netID DockerID
if networkDockerContainer, found := dockerContainers.FindPodContainer(manifest.ID, networkContainerName); found {
if networkDockerContainer, found := dockerContainers.FindPodContainer(podFullName, networkContainerName); found {
netID = DockerID(networkDockerContainer.ID)
} else {
dockerNetworkID, err := kl.createNetworkContainer(manifest)
glog.Infof("Network container doesn't exist, creating")
dockerNetworkID, err := kl.createNetworkContainer(pod)
if err != nil {
glog.Errorf("Failed to introspect network container. (%v) Skipping manifest %s", err, manifest.ID)
glog.Errorf("Failed to introspect network container. (%v) Skipping pod %s", err, podFullName)
return err
}
netID = dockerNetworkID
}
keepChannel <- netID
podVolumes, err := kl.mountExternalVolumes(manifest)
podVolumes, err := kl.mountExternalVolumes(&pod.Manifest)
if err != nil {
glog.Errorf("Unable to mount volumes for manifest %s: (%v)", manifest.ID, err)
glog.Errorf("Unable to mount volumes for pod %s: (%v)", podFullName, err)
}
for _, container := range manifest.Containers {
if dockerContainer, found := dockerContainers.FindPodContainer(manifest.ID, container.Name); found {
for _, container := range pod.Manifest.Containers {
if dockerContainer, found := dockerContainers.FindPodContainer(podFullName, container.Name); found {
containerID := DockerID(dockerContainer.ID)
glog.Infof("manifest %s container %s exists as %v", manifest.ID, container.Name, containerID)
glog.V(1).Infof("manifest %s container %s exists as %v", manifest.ID, container.Name, containerID)
glog.Infof("pod %s container %s exists as %v", podFullName, container.Name, containerID)
glog.V(1).Infof("pod %s container %s exists as %v", podFullName, container.Name, containerID)
// TODO: This should probably be separated out into a separate goroutine.
healthy, err := kl.healthy(container, dockerContainer)
@ -604,22 +319,22 @@ func (kl *Kubelet) syncManifest(manifest *api.ContainerManifest, dockerContainer
continue
}
glog.V(1).Infof("manifest %s container %s is unhealthy %d.", manifest.ID, container.Name, healthy)
glog.V(1).Infof("pod %s container %s is unhealthy.", podFullName, container.Name, healthy)
if err := kl.killContainer(*dockerContainer); err != nil {
glog.V(1).Infof("Failed to kill container %s: %v", containerID, err)
glog.V(1).Infof("Failed to kill container %s: %v", dockerContainer.ID, err)
continue
}
}
glog.Infof("%+v doesn't exist, creating", container)
glog.Infof("Container doesn't exist, creating %#v", container)
if err := kl.DockerPuller.Pull(container.Image); err != nil {
glog.Errorf("Failed to create container: %v skipping manifest %s container %s.", err, manifest.ID, container.Name)
glog.Errorf("Failed to pull image: %v skipping pod %s container %s.", err, podFullName, container.Name)
continue
}
containerID, err := kl.runContainer(manifest, &container, podVolumes, "container:"+string(netID))
containerID, err := kl.runContainer(pod, &container, podVolumes, "container:"+string(netID))
if err != nil {
// TODO(bburns) : Perhaps blacklist a container after N failures?
glog.Errorf("Error running manifest %s container %s: %v", manifest.ID, container.Name, err)
glog.Errorf("Error running pod %s container %s: %v", podFullName, container.Name, err)
continue
}
keepChannel <- containerID
@ -629,9 +344,10 @@ func (kl *Kubelet) syncManifest(manifest *api.ContainerManifest, dockerContainer
type empty struct{}
// SyncManifests synchronizes the configured list of containers (desired state) with the host current state.
func (kl *Kubelet) SyncManifests(config []api.ContainerManifest) error {
glog.Infof("Desired: %+v", config)
// SyncPods synchronizes the configured list of pods (desired state) with the host current state.
func (kl *Kubelet) SyncPods(pods []Pod) error {
glog.Infof("Desired [%s]: %+v", kl.Hostname, pods)
var err error
dockerIdsToKeep := map[DockerID]empty{}
keepChannel := make(chan DockerID, defaultChanSize)
waitGroup := sync.WaitGroup{}
@ -643,18 +359,18 @@ func (kl *Kubelet) SyncManifests(config []api.ContainerManifest) error {
}
// Check for any containers that need starting
for ix := range config {
for i := range pods {
waitGroup.Add(1)
go func(index int) {
defer util.HandleCrash()
defer waitGroup.Done()
// necessary to dereference by index here b/c otherwise the shared value
// in the for each is re-used.
err := kl.syncManifest(&config[index], dockerContainers, keepChannel)
err := kl.syncPod(&pods[index], dockerContainers, keepChannel)
if err != nil {
glog.Errorf("Error syncing manifest: %v skipping.", err)
glog.Errorf("Error syncing pod: %v skipping.", err)
}
}(ix)
}(i)
}
ch := make(chan bool)
go func() {
@ -663,7 +379,7 @@ func (kl *Kubelet) SyncManifests(config []api.ContainerManifest) error {
}
ch <- true
}()
if len(config) > 0 {
if len(pods) > 0 {
waitGroup.Wait()
}
close(keepChannel)
@ -687,69 +403,51 @@ func (kl *Kubelet) SyncManifests(config []api.ContainerManifest) error {
return err
}
// Check that all Port.HostPort values are unique across all manifests.
func checkHostPortConflicts(allManifests []api.ContainerManifest, newManifest *api.ContainerManifest) []error {
allErrs := []error{}
allPorts := map[int]bool{}
// filterHostPortConflicts removes pods that conflict on Port.HostPort values
func filterHostPortConflicts(pods []Pod) []Pod {
filtered := []Pod{}
ports := map[int]bool{}
extract := func(p *api.Port) int { return p.HostPort }
for i := range allManifests {
manifest := &allManifests[i]
errs := api.AccumulateUniquePorts(manifest.Containers, allPorts, extract)
if len(errs) != 0 {
allErrs = append(allErrs, errs...)
for i := range pods {
pod := &pods[i]
if errs := api.AccumulateUniquePorts(pod.Manifest.Containers, ports, extract); len(errs) != 0 {
glog.Warningf("Pod %s has conflicting ports, ignoring: %v", GetPodFullName(pod), errs)
continue
}
filtered = append(filtered, *pod)
}
if errs := api.AccumulateUniquePorts(newManifest.Containers, allPorts, extract); len(errs) != 0 {
allErrs = append(allErrs, errs...)
}
return allErrs
return filtered
}
// syncLoop is the main loop for processing changes. It watches for changes from
// four channels (file, etcd, server, and http) and creates a union of them. For
// any new change seen, will run a sync against desired state and running state. If
// no changes are seen to the configuration, will synchronize the last known desired
// state every sync_frequency seconds.
// Never returns.
func (kl *Kubelet) syncLoop(updateChannel <-chan manifestUpdate, handler SyncHandler) {
last := make(map[string][]api.ContainerManifest)
// state every sync_frequency seconds. Never returns.
func (kl *Kubelet) syncLoop(updates <-chan PodUpdate, handler SyncHandler) {
for {
var pods []Pod
select {
case u := <-updateChannel:
glog.Infof("Got configuration from %s: %+v", u.source, u.manifests)
last[u.source] = u.manifests
case <-time.After(kl.SyncFrequency):
}
case u := <-updates:
switch u.Op {
case SET:
glog.Infof("Containers changed [%s]", kl.Hostname)
pods = u.Pods
allManifests := []api.ContainerManifest{}
allIds := util.StringSet{}
for src, srcManifests := range last {
for i := range srcManifests {
allErrs := []error{}
case UPDATE:
//TODO: implement updates of containers
glog.Infof("Containers updated, not implemented [%s]", kl.Hostname)
continue
m := &srcManifests[i]
if allIds.Has(m.ID) {
allErrs = append(allErrs, api.ValidationError{api.ErrTypeDuplicate, "ContainerManifest.ID", m.ID})
} else {
allIds.Insert(m.ID)
}
if errs := api.ValidateManifest(m); len(errs) != 0 {
allErrs = append(allErrs, errs...)
}
// Check for host-wide HostPort conflicts.
if errs := checkHostPortConflicts(allManifests, m); len(errs) != 0 {
allErrs = append(allErrs, errs...)
}
if len(allErrs) > 0 {
glog.Warningf("Manifest from %s failed validation, ignoring: %v", src, allErrs)
}
default:
panic("syncLoop does not support incremental changes")
}
// TODO(thockin): There's no reason to collect manifests by value. Don't pessimize.
allManifests = append(allManifests, srcManifests...)
}
err := handler.SyncManifests(allManifests)
pods = filterHostPortConflicts(pods)
err := handler.SyncPods(pods)
if err != nil {
glog.Errorf("Couldn't sync containers : %v", err)
}
@ -778,12 +476,12 @@ func (kl *Kubelet) statsFromContainerPath(containerPath string, req *info.Contai
}
// GetPodInfo returns information from Docker about the containers in a pod
func (kl *Kubelet) GetPodInfo(manifestID string) (api.PodInfo, error) {
return getDockerPodInfo(kl.DockerClient, manifestID)
func (kl *Kubelet) GetPodInfo(podFullName string) (api.PodInfo, error) {
return getDockerPodInfo(kl.DockerClient, podFullName)
}
// GetContainerInfo returns stats (from Cadvisor) for a container.
func (kl *Kubelet) GetContainerInfo(manifestID, containerName string, req *info.ContainerInfoRequest) (*info.ContainerInfo, error) {
func (kl *Kubelet) GetContainerInfo(podFullName, containerName string, req *info.ContainerInfoRequest) (*info.ContainerInfo, error) {
if kl.CadvisorClient == nil {
return nil, nil
}
@ -791,7 +489,7 @@ func (kl *Kubelet) GetContainerInfo(manifestID, containerName string, req *info.
if err != nil {
return nil, err
}
dockerContainer, found := dockerContainers.FindPodContainer(manifestID, containerName)
dockerContainer, found := dockerContainers.FindPodContainer(podFullName, containerName)
if !found {
return nil, errors.New("couldn't find container")
}

View File

@ -19,8 +19,6 @@ package kubelet
import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http/httptest"
"reflect"
"sync"
"testing"
@ -28,9 +26,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/health"
"github.com/GoogleCloudPlatform/kubernetes/pkg/tools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/volume"
"github.com/coreos/go-etcd/etcd"
"github.com/fsouza/go-dockerclient"
"github.com/google/cadvisor/info"
"github.com/stretchr/testify/mock"
@ -43,29 +39,6 @@ func expectNoError(t *testing.T, err error) {
}
}
// These are used for testing extract json (below)
type TestData struct {
Value string
Number int
}
type TestObject struct {
Name string
Data TestData
}
func verifyStringEquals(t *testing.T, actual, expected string) {
if actual != expected {
t.Errorf("Verification failed. Expected: %s, Found %s", expected, actual)
}
}
func verifyIntEquals(t *testing.T, actual, expected int) {
if actual != expected {
t.Errorf("Verification failed. Expected: %d, Found %d", expected, actual)
}
}
func verifyNoError(t *testing.T, e error) {
if e != nil {
t.Errorf("Expected no error, found %#v", e)
@ -91,17 +64,6 @@ func makeTestKubelet(t *testing.T) (*Kubelet, *tools.FakeEtcdClient, *FakeDocker
return kubelet, fakeEtcdClient, fakeDocker
}
func TestExtractJSON(t *testing.T) {
obj := TestObject{}
kubelet, _, _ := makeTestKubelet(t)
data := `{ "name": "foo", "data": { "value": "bar", "number": 10 } }`
kubelet.ExtractYAMLData([]byte(data), &obj)
verifyStringEquals(t, obj.Name, "foo")
verifyStringEquals(t, obj.Data.Value, "bar")
verifyIntEquals(t, obj.Data.Number, 10)
}
func verifyCalls(t *testing.T, fakeDocker *FakeDockerClient, calls []string) {
verifyStringArrayEquals(t, fakeDocker.called, calls)
}
@ -120,14 +82,15 @@ func verifyStringArrayEquals(t *testing.T, actual, expected []string) {
}
}
func verifyPackUnpack(t *testing.T, manifestID, containerName string) {
func verifyPackUnpack(t *testing.T, podNamespace, podName, containerName string) {
name := buildDockerName(
&api.ContainerManifest{ID: manifestID},
&Pod{Name: podName, Namespace: podNamespace},
&api.Container{Name: containerName},
)
returnedManifestID, returnedContainerName := parseDockerName(name)
if manifestID != returnedManifestID || containerName != returnedContainerName {
t.Errorf("For (%s, %s), unpacked (%s, %s)", manifestID, containerName, returnedManifestID, returnedContainerName)
podFullName := fmt.Sprintf("%s.%s", podName, podNamespace)
returnedPodFullName, returnedContainerName := parseDockerName(name)
if podFullName != returnedPodFullName || containerName != returnedContainerName {
t.Errorf("For (%s, %s), unpacked (%s, %s)", podFullName, containerName, returnedPodFullName, returnedContainerName)
}
}
@ -138,11 +101,11 @@ func verifyBoolean(t *testing.T, expected, value bool) {
}
func TestContainerManifestNaming(t *testing.T) {
verifyPackUnpack(t, "manifest1234", "container5678")
verifyPackUnpack(t, "manifest--", "container__")
verifyPackUnpack(t, "--manifest", "__container")
verifyPackUnpack(t, "m___anifest_", "container-_-")
verifyPackUnpack(t, "_m___anifest", "-_-container")
verifyPackUnpack(t, "file", "manifest1234", "container5678")
verifyPackUnpack(t, "file", "manifest--", "container__")
verifyPackUnpack(t, "file", "--manifest", "__container")
verifyPackUnpack(t, "", "m___anifest_", "container-_-")
verifyPackUnpack(t, "other", "_m___anifest", "-_-container")
}
func TestGetContainerID(t *testing.T) {
@ -224,39 +187,12 @@ func TestKillContainer(t *testing.T) {
verifyCalls(t, fakeDocker, []string{"stop"})
}
func TestResponseToContainersNil(t *testing.T) {
kubelet, _, _ := makeTestKubelet(t)
list, err := kubelet.ResponseToManifests(&etcd.Response{Node: nil})
if len(list) != 0 {
t.Errorf("Unexpected non-zero list: %#v", list)
}
if err == nil {
t.Error("Unexpected non-error")
}
}
func TestResponseToManifests(t *testing.T) {
kubelet, _, _ := makeTestKubelet(t)
list, err := kubelet.ResponseToManifests(&etcd.Response{
Node: &etcd.Node{
Value: util.MakeJSONString([]api.ContainerManifest{
{ID: "foo"},
{ID: "bar"},
}),
},
})
if len(list) != 2 || list[0].ID != "foo" || list[1].ID != "bar" {
t.Errorf("Unexpected list: %#v", list)
}
expectNoError(t, err)
}
type channelReader struct {
list [][]api.ContainerManifest
list [][]Pod
wg sync.WaitGroup
}
func startReading(channel <-chan manifestUpdate) *channelReader {
func startReading(channel <-chan interface{}) *channelReader {
cr := &channelReader{}
cr.wg.Add(1)
go func() {
@ -265,118 +201,44 @@ func startReading(channel <-chan manifestUpdate) *channelReader {
if !ok {
break
}
cr.list = append(cr.list, update.manifests)
cr.list = append(cr.list, update.(PodUpdate).Pods)
}
cr.wg.Done()
}()
return cr
}
func (cr *channelReader) GetList() [][]api.ContainerManifest {
func (cr *channelReader) GetList() [][]Pod {
cr.wg.Wait()
return cr.list
}
func TestGetKubeletStateFromEtcdNoData(t *testing.T) {
kubelet, fakeClient, _ := makeTestKubelet(t)
channel := make(chan manifestUpdate)
reader := startReading(channel)
fakeClient.Data["/registry/hosts/machine/kubelet"] = tools.EtcdResponseWithError{
R: &etcd.Response{},
E: nil,
}
err := kubelet.getKubeletStateFromEtcd("/registry/hosts/machine/kubelet", channel)
if err == nil {
t.Error("Unexpected no err.")
}
close(channel)
list := reader.GetList()
if len(list) != 0 {
t.Errorf("Unexpected list: %#v", list)
}
}
func TestGetKubeletStateFromEtcd(t *testing.T) {
kubelet, fakeClient, _ := makeTestKubelet(t)
channel := make(chan manifestUpdate)
reader := startReading(channel)
fakeClient.Data["/registry/hosts/machine/kubelet"] = tools.EtcdResponseWithError{
R: &etcd.Response{
Node: &etcd.Node{
Value: util.MakeJSONString([]api.Container{}),
},
},
E: nil,
}
err := kubelet.getKubeletStateFromEtcd("/registry/hosts/machine/kubelet", channel)
expectNoError(t, err)
close(channel)
list := reader.GetList()
if len(list) != 1 {
t.Errorf("Unexpected list: %#v", list)
}
}
func TestGetKubeletStateFromEtcdNotFound(t *testing.T) {
kubelet, fakeClient, _ := makeTestKubelet(t)
channel := make(chan manifestUpdate)
reader := startReading(channel)
fakeClient.Data["/registry/hosts/machine/kubelet"] = tools.EtcdResponseWithError{
R: &etcd.Response{},
E: tools.EtcdErrorNotFound,
}
err := kubelet.getKubeletStateFromEtcd("/registry/hosts/machine/kubelet", channel)
expectNoError(t, err)
close(channel)
list := reader.GetList()
if len(list) != 0 {
t.Errorf("Unexpected list: %#v", list)
}
}
func TestGetKubeletStateFromEtcdError(t *testing.T) {
kubelet, fakeClient, _ := makeTestKubelet(t)
channel := make(chan manifestUpdate)
reader := startReading(channel)
fakeClient.Data["/registry/hosts/machine/kubelet"] = tools.EtcdResponseWithError{
R: &etcd.Response{},
E: &etcd.EtcdError{
ErrorCode: 200, // non not found error
},
}
err := kubelet.getKubeletStateFromEtcd("/registry/hosts/machine/kubelet", channel)
if err == nil {
t.Error("Unexpected non-error")
}
close(channel)
list := reader.GetList()
if len(list) != 0 {
t.Errorf("Unexpected list: %#v", list)
}
}
func TestSyncManifestsDoesNothing(t *testing.T) {
func TestSyncPodsDoesNothing(t *testing.T) {
kubelet, _, fakeDocker := makeTestKubelet(t)
fakeDocker.containerList = []docker.APIContainers{
{
// format is k8s--<container-id>--<manifest-id>
Names: []string{"/k8s--bar--foo"},
// format is k8s--<container-id>--<pod-fullname>
Names: []string{"/k8s--bar--foo.test"},
ID: "1234",
},
{
// network container
Names: []string{"/k8s--net--foo--"},
Names: []string{"/k8s--net--foo.test--"},
ID: "9876",
},
}
fakeDocker.container = &docker.Container{
ID: "1234",
}
err := kubelet.SyncManifests([]api.ContainerManifest{
err := kubelet.SyncPods([]Pod{
{
ID: "foo",
Containers: []api.Container{
{Name: "bar"},
Name: "foo",
Namespace: "test",
Manifest: api.ContainerManifest{
ID: "foo",
Containers: []api.Container{
{Name: "bar"},
},
},
},
})
@ -384,17 +246,17 @@ func TestSyncManifestsDoesNothing(t *testing.T) {
verifyCalls(t, fakeDocker, []string{"list", "list"})
}
func TestSyncManifestsDeletes(t *testing.T) {
func TestSyncPodsDeletes(t *testing.T) {
kubelet, _, fakeDocker := makeTestKubelet(t)
fakeDocker.containerList = []docker.APIContainers{
{
// the k8s prefix is required for the kubelet to manage the container
Names: []string{"/k8s--foo--bar"},
Names: []string{"/k8s--foo--bar.test"},
ID: "1234",
},
{
// network container
Names: []string{"/k8s--net--foo--"},
Names: []string{"/k8s--net--foo.test--"},
ID: "9876",
},
{
@ -402,7 +264,7 @@ func TestSyncManifestsDeletes(t *testing.T) {
ID: "4567",
},
}
err := kubelet.SyncManifests([]api.ContainerManifest{})
err := kubelet.SyncPods([]Pod{})
expectNoError(t, err)
verifyCalls(t, fakeDocker, []string{"list", "list", "stop", "stop"})
@ -425,29 +287,33 @@ func (f *FalseHealthChecker) HealthCheck(container api.Container) (health.Status
return health.Unhealthy, nil
}
func TestSyncManifestsUnhealthy(t *testing.T) {
func TestSyncPodsUnhealthy(t *testing.T) {
kubelet, _, fakeDocker := makeTestKubelet(t)
kubelet.HealthChecker = &FalseHealthChecker{}
fakeDocker.containerList = []docker.APIContainers{
{
// the k8s prefix is required for the kubelet to manage the container
Names: []string{"/k8s--bar--foo"},
Names: []string{"/k8s--bar--foo.test"},
ID: "1234",
},
{
// network container
Names: []string{"/k8s--net--foo--"},
Names: []string{"/k8s--net--foo.test--"},
ID: "9876",
},
}
err := kubelet.SyncManifests([]api.ContainerManifest{
err := kubelet.SyncPods([]Pod{
{
ID: "foo",
Containers: []api.Container{
{Name: "bar",
LivenessProbe: &api.LivenessProbe{
// Always returns healthy == false
Type: "false",
Name: "foo",
Namespace: "test",
Manifest: api.ContainerManifest{
ID: "foo",
Containers: []api.Container{
{Name: "bar",
LivenessProbe: &api.LivenessProbe{
// Always returns healthy == false
Type: "false",
},
},
},
},
@ -582,14 +448,20 @@ func TestMakeVolumesAndBinds(t *testing.T) {
},
}
pod := Pod{
Name: "pod",
Namespace: "test",
}
podVolumes := make(volumeMap)
podVolumes["disk4"] = &volume.HostDirectory{"/mnt/host"}
volumes, binds := makeVolumesAndBinds("pod", &container, podVolumes)
volumes, binds := makeVolumesAndBinds(&pod, &container, podVolumes)
expectedVolumes := []string{"/mnt/path", "/mnt/path2"}
expectedBinds := []string{"/exports/pod/disk:/mnt/path", "/exports/pod/disk2:/mnt/path2:ro", "/mnt/path3:/mnt/path3",
expectedBinds := []string{"/exports/pod.test/disk:/mnt/path", "/exports/pod.test/disk2:/mnt/path2:ro", "/mnt/path3:/mnt/path3",
"/mnt/host:/mnt/path4"}
if len(volumes) != len(expectedVolumes) {
t.Errorf("Unexpected volumes. Expected %#v got %#v. Container was: %#v", expectedVolumes, volumes, container)
}
@ -669,274 +541,29 @@ func TestMakePortsAndBindings(t *testing.T) {
}
func TestCheckHostPortConflicts(t *testing.T) {
successCaseAll := []api.ContainerManifest{
{Containers: []api.Container{{Ports: []api.Port{{HostPort: 80}}}}},
{Containers: []api.Container{{Ports: []api.Port{{HostPort: 81}}}}},
{Containers: []api.Container{{Ports: []api.Port{{HostPort: 82}}}}},
successCaseAll := []Pod{
{Manifest: api.ContainerManifest{Containers: []api.Container{{Ports: []api.Port{{HostPort: 80}}}}}},
{Manifest: api.ContainerManifest{Containers: []api.Container{{Ports: []api.Port{{HostPort: 81}}}}}},
{Manifest: api.ContainerManifest{Containers: []api.Container{{Ports: []api.Port{{HostPort: 82}}}}}},
}
successCaseNew := api.ContainerManifest{
Containers: []api.Container{{Ports: []api.Port{{HostPort: 83}}}},
successCaseNew := Pod{
Manifest: api.ContainerManifest{Containers: []api.Container{{Ports: []api.Port{{HostPort: 83}}}}},
}
if errs := checkHostPortConflicts(successCaseAll, &successCaseNew); len(errs) != 0 {
t.Errorf("Expected success: %v", errs)
expected := append(successCaseAll, successCaseNew)
if actual := filterHostPortConflicts(expected); !reflect.DeepEqual(actual, expected) {
t.Errorf("Expected %#v, Got %#v", expected, actual)
}
failureCaseAll := []api.ContainerManifest{
{Containers: []api.Container{{Ports: []api.Port{{HostPort: 80}}}}},
{Containers: []api.Container{{Ports: []api.Port{{HostPort: 81}}}}},
{Containers: []api.Container{{Ports: []api.Port{{HostPort: 82}}}}},
failureCaseAll := []Pod{
{Manifest: api.ContainerManifest{Containers: []api.Container{{Ports: []api.Port{{HostPort: 80}}}}}},
{Manifest: api.ContainerManifest{Containers: []api.Container{{Ports: []api.Port{{HostPort: 81}}}}}},
{Manifest: api.ContainerManifest{Containers: []api.Container{{Ports: []api.Port{{HostPort: 82}}}}}},
}
failureCaseNew := api.ContainerManifest{
Containers: []api.Container{{Ports: []api.Port{{HostPort: 81}}}},
failureCaseNew := Pod{
Manifest: api.ContainerManifest{Containers: []api.Container{{Ports: []api.Port{{HostPort: 81}}}}},
}
if errs := checkHostPortConflicts(failureCaseAll, &failureCaseNew); len(errs) == 0 {
t.Errorf("Expected failure")
}
}
func TestExtractFromNonExistentFile(t *testing.T) {
kubelet := New()
_, err := kubelet.extractFromFile("/some/fake/file")
if err == nil {
t.Error("Unexpected non-error.")
}
}
func TestExtractFromBadDataFile(t *testing.T) {
kubelet := New()
badData := []byte{1, 2, 3}
file, err := ioutil.TempFile("", "foo")
expectNoError(t, err)
name := file.Name()
file.Close()
ioutil.WriteFile(name, badData, 0755)
_, err = kubelet.extractFromFile(name)
if err == nil {
t.Error("Unexpected non-error.")
}
}
func TestExtractFromValidDataFile(t *testing.T) {
kubelet := New()
manifest := api.ContainerManifest{ID: "bar"}
data, err := json.Marshal(manifest)
expectNoError(t, err)
file, err := ioutil.TempFile("", "foo")
expectNoError(t, err)
name := file.Name()
expectNoError(t, file.Close())
ioutil.WriteFile(name, data, 0755)
read, err := kubelet.extractFromFile(name)
expectNoError(t, err)
if !reflect.DeepEqual(read, manifest) {
t.Errorf("Unexpected difference. Expected %#v, got %#v", manifest, read)
}
}
func TestExtractFromEmptyDir(t *testing.T) {
kubelet := New()
dirName, err := ioutil.TempDir("", "foo")
expectNoError(t, err)
_, err = kubelet.extractFromDir(dirName)
expectNoError(t, err)
}
func TestExtractFromDir(t *testing.T) {
kubelet := New()
manifests := []api.ContainerManifest{
{ID: "aaaa"},
{ID: "bbbb"},
}
dirName, err := ioutil.TempDir("", "foo")
expectNoError(t, err)
for _, manifest := range manifests {
data, err := json.Marshal(manifest)
expectNoError(t, err)
file, err := ioutil.TempFile(dirName, manifest.ID)
expectNoError(t, err)
name := file.Name()
expectNoError(t, file.Close())
ioutil.WriteFile(name, data, 0755)
}
read, err := kubelet.extractFromDir(dirName)
expectNoError(t, err)
if !reflect.DeepEqual(read, manifests) {
t.Errorf("Unexpected difference. Expected %#v, got %#v", manifests, read)
}
}
func TestExtractFromHttpBadness(t *testing.T) {
kubelet := New()
updateChannel := make(chan manifestUpdate)
reader := startReading(updateChannel)
err := kubelet.extractFromHTTP("http://localhost:12345", updateChannel)
if err == nil {
t.Error("Unexpected non-error.")
}
close(updateChannel)
list := reader.GetList()
if len(list) != 0 {
t.Errorf("Unexpected list: %#v", list)
}
}
func TestExtractFromHttpSingle(t *testing.T) {
kubelet := New()
updateChannel := make(chan manifestUpdate)
reader := startReading(updateChannel)
manifests := []api.ContainerManifest{
{Version: "v1beta1", ID: "foo"},
}
// Taking a single-manifest from a URL allows kubelet to be used
// in the implementation of google's container VM image.
data, err := json.Marshal(manifests[0])
fakeHandler := util.FakeHandler{
StatusCode: 200,
ResponseBody: string(data),
}
testServer := httptest.NewServer(&fakeHandler)
err = kubelet.extractFromHTTP(testServer.URL, updateChannel)
if err != nil {
t.Errorf("Unexpected error: %#v", err)
}
close(updateChannel)
read := reader.GetList()
if len(read) != 1 {
t.Errorf("Unexpected list: %#v", read)
return
}
if !reflect.DeepEqual(manifests, read[0]) {
t.Errorf("Unexpected difference. Expected: %#v, Saw: %#v", manifests, read[0])
}
}
func TestExtractFromHttpMultiple(t *testing.T) {
kubelet := New()
updateChannel := make(chan manifestUpdate)
reader := startReading(updateChannel)
manifests := []api.ContainerManifest{
{Version: "v1beta1", ID: "foo"},
{Version: "v1beta1", ID: "bar"},
}
data, err := json.Marshal(manifests)
if err != nil {
t.Fatalf("Some weird json problem: %v", err)
}
t.Logf("Serving: %v", string(data))
fakeHandler := util.FakeHandler{
StatusCode: 200,
ResponseBody: string(data),
}
testServer := httptest.NewServer(&fakeHandler)
err = kubelet.extractFromHTTP(testServer.URL, updateChannel)
if err != nil {
t.Errorf("Unexpected error: %#v", err)
}
close(updateChannel)
read := reader.GetList()
if len(read) != 1 {
t.Errorf("Unexpected list: %#v", read)
return
}
if !reflect.DeepEqual(manifests, read[0]) {
t.Errorf("Unexpected difference. Expected: %#v, Saw: %#v", manifests, read[0])
}
}
func TestExtractFromHttpEmptyArray(t *testing.T) {
kubelet := New()
updateChannel := make(chan manifestUpdate)
reader := startReading(updateChannel)
manifests := []api.ContainerManifest{}
data, err := json.Marshal(manifests)
if err != nil {
t.Fatalf("Some weird json problem: %v", err)
}
t.Logf("Serving: %v", string(data))
fakeHandler := util.FakeHandler{
StatusCode: 200,
ResponseBody: string(data),
}
testServer := httptest.NewServer(&fakeHandler)
err = kubelet.extractFromHTTP(testServer.URL, updateChannel)
if err != nil {
t.Errorf("Unexpected error: %#v", err)
}
close(updateChannel)
read := reader.GetList()
if len(read) != 1 {
t.Errorf("Unexpected list: %#v", read)
return
}
if len(read[0]) != 0 {
t.Errorf("Unexpected manifests: %#v", read[0])
}
}
func TestWatchEtcd(t *testing.T) {
watchChannel := make(chan *etcd.Response)
updateChannel := make(chan manifestUpdate)
kubelet := New()
reader := startReading(updateChannel)
manifest := []api.ContainerManifest{
{
ID: "foo",
},
}
data, err := json.Marshal(manifest)
expectNoError(t, err)
var wg sync.WaitGroup
wg.Add(1)
go func() {
kubelet.WatchEtcd(watchChannel, updateChannel)
wg.Done()
}()
watchChannel <- &etcd.Response{
Node: &etcd.Node{
Value: string(data),
},
}
close(watchChannel)
wg.Wait()
close(updateChannel)
read := reader.GetList()
if len(read) != 1 {
t.Errorf("Expected number of results: %v", len(read))
} else if !reflect.DeepEqual(read[0], manifest) {
t.Errorf("Unexpected manifest(s) %#v %#v", read[0], manifest)
if actual := filterHostPortConflicts(append(failureCaseAll, failureCaseNew)); !reflect.DeepEqual(failureCaseAll, actual) {
t.Errorf("Expected %#v, Got %#v", expected, actual)
}
}

View File

@ -22,28 +22,49 @@ import (
"fmt"
"io"
"io/ioutil"
"net"
"net/http"
"net/url"
"path"
"strconv"
"strings"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/httplog"
"github.com/golang/glog"
"github.com/google/cadvisor/info"
"gopkg.in/v1/yaml"
)
// Server is a http.Handler which exposes kubelet functionality over HTTP.
type Server struct {
Kubelet kubeletInterface
UpdateChannel chan<- manifestUpdate
DelegateHandler http.Handler
host HostInterface
updates chan<- interface{}
handler http.Handler
}
// kubeletInterface contains all the kubelet methods required by the server.
func ListenAndServeKubeletServer(host HostInterface, updates chan<- interface{}, delegate http.Handler, address string, port uint) {
glog.Infof("Starting to listen on %s:%d", address, port)
handler := Server{
host: host,
updates: updates,
handler: delegate,
}
s := &http.Server{
Addr: net.JoinHostPort(address, strconv.FormatUint(uint64(port), 10)),
Handler: &handler,
ReadTimeout: 10 * time.Second,
WriteTimeout: 10 * time.Second,
MaxHeaderBytes: 1 << 20,
}
s.ListenAndServe()
}
// HostInterface contains all the kubelet methods required by the server.
// For testablitiy.
type kubeletInterface interface {
GetContainerInfo(podID, containerName string, req *info.ContainerInfoRequest) (*info.ContainerInfo, error)
type HostInterface interface {
GetContainerInfo(podFullName, containerName string, req *info.ContainerInfoRequest) (*info.ContainerInfo, error)
GetRootInfo(req *info.ContainerInfoRequest) (*info.ContainerInfo, error)
GetMachineInfo() (*info.MachineInfo, error)
GetPodInfo(name string) (api.PodInfo, error)
@ -78,13 +99,15 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, req *http.Request) {
}
if u.Path == "/container" {
// This is to provide backward compatibility. It only supports a single manifest
var manifest api.ContainerManifest
err = yaml.Unmarshal(data, &manifest)
var pod Pod
err = yaml.Unmarshal(data, &pod.Manifest)
if err != nil {
s.error(w, err)
return
}
s.UpdateChannel <- manifestUpdate{httpServerSource, []api.ContainerManifest{manifest}}
//TODO: sha1 of manifest?
pod.Name = "1"
s.updates <- PodUpdate{[]Pod{pod}, SET}
} else if u.Path == "/containers" {
var manifests []api.ContainerManifest
err = yaml.Unmarshal(data, &manifests)
@ -92,15 +115,23 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, req *http.Request) {
s.error(w, err)
return
}
s.UpdateChannel <- manifestUpdate{httpServerSource, manifests}
pods := make([]Pod, len(manifests))
for i := range manifests {
pods[i].Name = fmt.Sprintf("%d", i+1)
pods[i].Manifest = manifests[i]
}
s.updates <- PodUpdate{pods, SET}
}
case u.Path == "/podInfo":
podID := u.Query().Get("podID")
if len(podID) == 0 {
w.WriteHeader(http.StatusBadRequest)
http.Error(w, "Missing 'podID=' query entry.", http.StatusBadRequest)
return
}
info, err := s.Kubelet.GetPodInfo(podID)
// TODO: backwards compatibility with existing API, needs API change
podFullName := GetPodFullName(&Pod{Name: podID, Namespace: "etcd"})
info, err := s.host.GetPodInfo(podFullName)
if err == ErrNoContainersInPod {
http.Error(w, "Pod does not exist", http.StatusNotFound)
return
@ -120,7 +151,7 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, req *http.Request) {
case strings.HasPrefix(u.Path, "/stats"):
s.serveStats(w, req)
case strings.HasPrefix(u.Path, "/spec"):
info, err := s.Kubelet.GetMachineInfo()
info, err := s.host.GetMachineInfo()
if err != nil {
s.error(w, err)
return
@ -133,14 +164,16 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, req *http.Request) {
w.Header().Add("Content-type", "application/json")
w.Write(data)
case strings.HasPrefix(u.Path, "/logs/"):
s.Kubelet.ServeLogs(w, req)
s.host.ServeLogs(w, req)
default:
s.DelegateHandler.ServeHTTP(w, req)
if s.handler != nil {
s.handler.ServeHTTP(w, req)
}
}
}
func (s *Server) serveStats(w http.ResponseWriter, req *http.Request) {
// /stats/<podid>/<containerName>
// /stats/<podfullname>/<containerName>
components := strings.Split(strings.TrimPrefix(path.Clean(req.URL.Path), "/"), "/")
var stats *info.ContainerInfo
var err error
@ -153,13 +186,13 @@ func (s *Server) serveStats(w http.ResponseWriter, req *http.Request) {
switch len(components) {
case 1:
// Machine stats
stats, err = s.Kubelet.GetRootInfo(&query)
stats, err = s.host.GetRootInfo(&query)
case 2:
// pod stats
// TODO(monnand) Implement this
errors.New("pod level status currently unimplemented")
case 3:
stats, err = s.Kubelet.GetContainerInfo(components[1], components[2], &query)
stats, err = s.host.GetContainerInfo(components[1], components[2], &query)
default:
http.Error(w, "unknown resource.", http.StatusNotFound)
return

View File

@ -36,7 +36,7 @@ import (
type fakeKubelet struct {
infoFunc func(name string) (api.PodInfo, error)
containerInfoFunc func(podID, containerName string, req *info.ContainerInfoRequest) (*info.ContainerInfo, error)
containerInfoFunc func(podFullName, containerName string, req *info.ContainerInfoRequest) (*info.ContainerInfo, error)
rootInfoFunc func(query *info.ContainerInfoRequest) (*info.ContainerInfo, error)
machineInfoFunc func() (*info.MachineInfo, error)
logFunc func(w http.ResponseWriter, req *http.Request)
@ -46,8 +46,8 @@ func (fk *fakeKubelet) GetPodInfo(name string) (api.PodInfo, error) {
return fk.infoFunc(name)
}
func (fk *fakeKubelet) GetContainerInfo(podID, containerName string, req *info.ContainerInfoRequest) (*info.ContainerInfo, error) {
return fk.containerInfoFunc(podID, containerName, req)
func (fk *fakeKubelet) GetContainerInfo(podFullName, containerName string, req *info.ContainerInfoRequest) (*info.ContainerInfo, error) {
return fk.containerInfoFunc(podFullName, containerName, req)
}
func (fk *fakeKubelet) GetRootInfo(req *info.ContainerInfoRequest) (*info.ContainerInfo, error) {
@ -63,7 +63,7 @@ func (fk *fakeKubelet) ServeLogs(w http.ResponseWriter, req *http.Request) {
}
type serverTestFramework struct {
updateChan chan manifestUpdate
updateChan chan interface{}
updateReader *channelReader
serverUnderTest *Server
fakeKubelet *fakeKubelet
@ -72,13 +72,13 @@ type serverTestFramework struct {
func makeServerTest() *serverTestFramework {
fw := &serverTestFramework{
updateChan: make(chan manifestUpdate),
updateChan: make(chan interface{}),
}
fw.updateReader = startReading(fw.updateChan)
fw.fakeKubelet = &fakeKubelet{}
fw.serverUnderTest = &Server{
Kubelet: fw.fakeKubelet,
UpdateChannel: fw.updateChan,
host: fw.fakeKubelet,
updates: fw.updateChan,
}
fw.testHTTPServer = httptest.NewServer(fw.serverUnderTest)
return fw
@ -106,8 +106,9 @@ func TestContainer(t *testing.T) {
if len(received) != 1 {
t.Errorf("Expected 1 manifest, but got %v", len(received))
}
if !reflect.DeepEqual(expected, received[0]) {
t.Errorf("Expected %#v, but got %#v", expected, received[0])
expectedPods := []Pod{Pod{Name: "1", Manifest: expected[0]}}
if !reflect.DeepEqual(expectedPods, received[0]) {
t.Errorf("Expected %#v, but got %#v", expectedPods, received[0])
}
}
@ -128,8 +129,9 @@ func TestContainers(t *testing.T) {
if len(received) != 1 {
t.Errorf("Expected 1 update, but got %v", len(received))
}
if !reflect.DeepEqual(expected, received[0]) {
t.Errorf("Expected %#v, but got %#v", expected, received[0])
expectedPods := []Pod{Pod{Name: "1", Manifest: expected[0]}, Pod{Name: "2", Manifest: expected[1]}}
if !reflect.DeepEqual(expectedPods, received[0]) {
t.Errorf("Expected %#v, but got %#v", expectedPods, received[0])
}
}
@ -137,10 +139,10 @@ func TestPodInfo(t *testing.T) {
fw := makeServerTest()
expected := api.PodInfo{"goodpod": docker.Container{ID: "myContainerID"}}
fw.fakeKubelet.infoFunc = func(name string) (api.PodInfo, error) {
if name == "goodpod" {
if name == "goodpod.etcd" {
return expected, nil
}
return nil, fmt.Errorf("bad pod")
return nil, fmt.Errorf("bad pod %s", name)
}
resp, err := http.Get(fw.testHTTPServer.URL + "/podInfo?podID=goodpod")
if err != nil {

60
pkg/kubelet/types.go Normal file
View File

@ -0,0 +1,60 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kubelet
import (
"fmt"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
)
// Pod represents the structure of a pod on the Kubelet, distinct from the apiserver
// representation of a Pod.
type Pod struct {
Namespace string
Name string
Manifest api.ContainerManifest
}
// PodOperation defines what changes will be made on a pod configuration.
type PodOperation int
const (
// This is the current pod configuration
SET PodOperation = iota
// Pods with the given ids are new to this source
ADD
// Pods with the given ids have been removed from this source
REMOVE
// Pods with the given ids have been updated in this source
UPDATE
)
// PodUpdate defines an operation sent on the channel. You can add or remove single services by
// sending an array of size one and Op == ADD|REMOVE (with REMOVE, only the ID is required).
// For setting the state of the system to a given state for this source configuration, set
// Pods as desired and Op to SET, which will reset the system state to that specified in this
// operation for this source channel. To remove all pods, set Pods to empty array and Op to SET.
type PodUpdate struct {
Pods []Pod
Op PodOperation
}
//GetPodFullName returns a name that full identifies a pod across all config sources.
func GetPodFullName(pod *Pod) string {
return fmt.Sprintf("%s.%s", pod.Name, pod.Namespace)
}

36
pkg/kubelet/validation.go Normal file
View File

@ -0,0 +1,36 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kubelet
import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
)
func makeInvalidError(field string, value interface{}) api.ValidationError {
return api.ValidationError{api.ErrTypeInvalid, field, value}
}
func ValidatePod(pod *Pod) (errors []error) {
if !util.IsDNSSubdomain(pod.Name) {
errors = append(errors, makeInvalidError("Pod.Name", pod.Name))
}
if errs := api.ValidateManifest(&pod.Manifest); len(errs) != 0 {
errors = append(errors, errs...)
}
return errors
}

View File

@ -0,0 +1,42 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kubelet_test
import (
"strings"
"testing"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
. "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet"
)
func TestValidatePodNoName(t *testing.T) {
errorCases := map[string]Pod{
// manifest is tested in api/validation_test.go, ensure it is invoked
"empty version": {Name: "test", Manifest: api.ContainerManifest{Version: "", ID: "abc"}},
// Name
"zero-length name": {Name: "", Manifest: api.ContainerManifest{Version: "v1beta1"}},
"name > 255 characters": {Name: strings.Repeat("a", 256), Manifest: api.ContainerManifest{Version: "v1beta1"}},
"name not a DNS subdomain": {Name: "a.b.c.", Manifest: api.ContainerManifest{Version: "v1beta1"}},
}
for k, v := range errorCases {
if errs := ValidatePod(&v); len(errs) == 0 {
t.Errorf("expected failure for %s", k)
}
}
}

View File

@ -70,6 +70,11 @@ func IsEtcdConflict(err error) bool {
return isEtcdErrorNum(err, 101)
}
// IsEtcdWatchStoppedByUser returns true iff err is a client triggered stop.
func IsEtcdWatchStoppedByUser(err error) bool {
return etcd.ErrWatchStoppedByUser == err
}
// Returns true iff err is an etcd error, whose errorCode matches errorCode
func isEtcdErrorNum(err error, errorCode int) bool {
etcdError, ok := err.(*etcd.EtcdError)

View File

@ -124,6 +124,10 @@ func (f *FakeEtcdClient) Watch(prefix string, waitIndex uint64, recursive bool,
defer close(injectedError)
f.WatchInjectError = injectedError
if receiver == nil {
return f.Get(prefix, false, recursive)
}
f.watchCompletedChan <- true
select {
case <-stop: