diff --git a/pkg/api/validation.go b/pkg/api/validation.go index 8b7c76609c..ecef6e8898 100644 --- a/pkg/api/validation.go +++ b/pkg/api/validation.go @@ -272,9 +272,6 @@ func ValidateManifest(manifest *ContainerManifest) []error { } else if !supportedManifestVersions.Has(strings.ToLower(manifest.Version)) { allErrs.Append(makeNotSupportedError("ContainerManifest.Version", manifest.Version)) } - if !util.IsDNSSubdomain(manifest.ID) { - allErrs.Append(makeInvalidError("ContainerManifest.ID", manifest.ID)) - } allVolumes, errs := validateVolumes(manifest.Volumes) if len(errs) != 0 { allErrs.Append(errs...) diff --git a/pkg/api/validation_test.go b/pkg/api/validation_test.go index 5a82be6508..6916db9da5 100644 --- a/pkg/api/validation_test.go +++ b/pkg/api/validation_test.go @@ -242,11 +242,8 @@ func TestValidateManifest(t *testing.T) { } errorCases := map[string]ContainerManifest{ - "empty version": {Version: "", ID: "abc"}, - "invalid version": {Version: "bogus", ID: "abc"}, - "zero-length id": {Version: "v1beta1", ID: ""}, - "id > 255 characters": {Version: "v1beta1", ID: strings.Repeat("a", 256)}, - "id not a DNS subdomain": {Version: "v1beta1", ID: "a.b.c."}, + "empty version": {Version: "", ID: "abc"}, + "invalid version": {Version: "bogus", ID: "abc"}, "invalid volume name": { Version: "v1beta1", ID: "abc", diff --git a/pkg/kubelet/config/config.go b/pkg/kubelet/config/config.go new file mode 100644 index 0000000000..33206edce1 --- /dev/null +++ b/pkg/kubelet/config/config.go @@ -0,0 +1,304 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package config + +import ( + "fmt" + "reflect" + "sync" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util/config" + "github.com/golang/glog" +) + +// PodConfigListener receives notifications for changes to a configuration. +type PodConfigListener interface { + // OnUpdate is invoked when the kubelet.Pod configuration has been changed by one of + // the sources. The update is properly normalized to remove duplicates. + OnUpdate(pod kubelet.PodUpdate) +} + +// ListenerFunc implements the PodConfigListener interface +type ListenerFunc func(update kubelet.PodUpdate) + +func (h ListenerFunc) OnUpdate(update kubelet.PodUpdate) { + h(update) +} + +// PodConfigNotificationMode describes how changes are sent to the update channel +type PodConfigNotificationMode int + +const ( + // PodConfigNotificationSnapshot delivers the full configuration as a SET whenever + // any change occurs + PodConfigNotificationSnapshot = iota + // PodConfigNotificationSetsAndUpdates delivers an UPDATE message whenever pods are + // changed, and a SET message if there are any additions or removals. + PodConfigNotificationSnapshotAndUpdates + // PodConfigNotificationIncremental delivers ADD, UPDATE, and REMOVE to the update channel + PodConfigNotificationIncremental +) + +// PodConfig is a configuration mux that merges many sources of pod configuration into a single +// consistent structure, and then delivers incremental change notifications to listeners +// in order. +type PodConfig struct { + pods *podStorage + mux *config.Mux + + // the channel of denormalized changes passed to listeners + updates chan kubelet.PodUpdate +} + +// NewPodConfig creates an object that can merge many configuration sources into a stream +// of normalized updates to a pod configuration. +func NewPodConfig(mode PodConfigNotificationMode) *PodConfig { + updates := make(chan kubelet.PodUpdate, 1) + storage := newPodStorage(updates, mode) + podConfig := &PodConfig{ + pods: storage, + mux: config.NewMux(storage), + updates: updates, + } + return podConfig +} + +// Channel creates or returns a config source channel. The channel +// only accepts PodUpdates +func (c *PodConfig) Channel(source string) chan<- interface{} { + return c.mux.Channel(source) +} + +// Updates returns a channel of updates to the configuration, properly denormalized. +func (c *PodConfig) Updates() <-chan kubelet.PodUpdate { + return c.updates +} + +// Sync requests the full configuration be delivered to the update channel. +func (c *PodConfig) Sync() { + c.pods.Sync() +} + +// podStorage manages the current pod state at any point in time and ensures updates +// to the channel are delivered in order. Note that this object is an in-memory source of +// "truth" and on creation contains zero entries. Once all previously read sources are +// available, then this object should be considered authoritative. +type podStorage struct { + podLock sync.RWMutex + // map of source name to pod name to pod reference + pods map[string]map[string]*kubelet.Pod + mode PodConfigNotificationMode + + // ensures that updates are delivered in strict order + // on the updates channel + updateLock sync.Mutex + updates chan<- kubelet.PodUpdate +} + +// TODO: PodConfigNotificationMode could be handled by a listener to the updates channel +// in the future, especially with multiple listeners. +// TODO: allow initialization of the current state of the store with snapshotted version. +func newPodStorage(updates chan<- kubelet.PodUpdate, mode PodConfigNotificationMode) *podStorage { + return &podStorage{ + pods: make(map[string]map[string]*kubelet.Pod), + mode: mode, + updates: updates, + } +} + +// Merge normalizes a set of incoming changes from different sources into a map of all Pods +// and ensures that redundant changes are filtered out, and then pushes zero or more minimal +// updates onto the update channel. Ensures that updates are delivered in order. +func (s *podStorage) Merge(source string, change interface{}) error { + s.updateLock.Lock() + defer s.updateLock.Unlock() + + adds, updates, deletes := s.merge(source, change) + + // deliver update notifications + switch s.mode { + case PodConfigNotificationIncremental: + if len(deletes.Pods) > 0 { + s.updates <- *deletes + } + if len(adds.Pods) > 0 { + s.updates <- *adds + } + if len(updates.Pods) > 0 { + s.updates <- *updates + } + + case PodConfigNotificationSnapshotAndUpdates: + if len(updates.Pods) > 0 { + s.updates <- *updates + } + if len(deletes.Pods) > 0 || len(adds.Pods) > 0 { + s.updates <- kubelet.PodUpdate{s.MergedState().([]kubelet.Pod), kubelet.SET} + } + + case PodConfigNotificationSnapshot: + if len(updates.Pods) > 0 || len(deletes.Pods) > 0 || len(adds.Pods) > 0 { + s.updates <- kubelet.PodUpdate{s.MergedState().([]kubelet.Pod), kubelet.SET} + } + + default: + panic(fmt.Sprintf("unsupported PodConfigNotificationMode: %#v", s.mode)) + } + + return nil +} + +func (s *podStorage) merge(source string, change interface{}) (adds, updates, deletes *kubelet.PodUpdate) { + s.podLock.Lock() + defer s.podLock.Unlock() + + adds = &kubelet.PodUpdate{Op: kubelet.ADD} + updates = &kubelet.PodUpdate{Op: kubelet.UPDATE} + deletes = &kubelet.PodUpdate{Op: kubelet.REMOVE} + + pods := s.pods[source] + if pods == nil { + pods = make(map[string]*kubelet.Pod) + } + + update := change.(kubelet.PodUpdate) + switch update.Op { + case kubelet.ADD, kubelet.UPDATE: + if update.Op == kubelet.ADD { + glog.Infof("Adding new pods from source %s : %v", source, update.Pods) + } else { + glog.Infof("Updating pods from source %s : %v", source, update.Pods) + } + + filtered := filterInvalidPods(update.Pods, source) + for _, ref := range filtered { + name := ref.Name + if existing, found := pods[name]; found { + if !reflect.DeepEqual(existing.Manifest, ref.Manifest) { + // this is an update + existing.Manifest = ref.Manifest + updates.Pods = append(updates.Pods, *existing) + continue + } + // this is a no-op + continue + } + // this is an add + ref.Namespace = source + pods[name] = ref + adds.Pods = append(adds.Pods, *ref) + } + + case kubelet.REMOVE: + glog.Infof("Removing a pod %v", update) + for _, value := range update.Pods { + name := value.Name + if existing, found := pods[name]; found { + // this is a delete + delete(pods, name) + deletes.Pods = append(deletes.Pods, *existing) + continue + } + // this is a no-op + } + + case kubelet.SET: + glog.Infof("Setting pods for source %s : %v", source, update) + // Clear the old map entries by just creating a new map + oldPods := pods + pods = make(map[string]*kubelet.Pod) + + filtered := filterInvalidPods(update.Pods, source) + for _, ref := range filtered { + name := ref.Name + if existing, found := oldPods[name]; found { + pods[name] = existing + if !reflect.DeepEqual(existing.Manifest, ref.Manifest) { + // this is an update + existing.Manifest = ref.Manifest + updates.Pods = append(updates.Pods, *existing) + continue + } + // this is a no-op + continue + } + ref.Namespace = source + pods[name] = ref + adds.Pods = append(adds.Pods, *ref) + } + + for name, existing := range oldPods { + if _, found := pods[name]; !found { + // this is a delete + deletes.Pods = append(deletes.Pods, *existing) + } + } + + default: + glog.Infof("Received invalid update type: %v", update) + + } + + s.pods[source] = pods + return adds, updates, deletes +} + +func filterInvalidPods(pods []kubelet.Pod, source string) (filtered []*kubelet.Pod) { + names := util.StringSet{} + for i := range pods { + var errors []error + if names.Has(pods[i].Name) { + errors = append(errors, api.ValidationError{api.ErrTypeDuplicate, "Pod.Name", pods[i].Name}) + } else { + names.Insert(pods[i].Name) + } + if errs := kubelet.ValidatePod(&pods[i]); len(errs) != 0 { + errors = append(errors, errs...) + } + if len(errors) > 0 { + glog.Warningf("Pod %d from %s failed validation, ignoring: %v", i+1, source, errors) + continue + } + filtered = append(filtered, &pods[i]) + } + return +} + +// Sync sends a copy of the current state through the update channel +func (s *podStorage) Sync() { + s.updateLock.Lock() + defer s.updateLock.Unlock() + s.updates <- kubelet.PodUpdate{s.MergedState().([]kubelet.Pod), kubelet.SET} +} + +// Object implements config.Accessor +func (s *podStorage) MergedState() interface{} { + s.podLock.RLock() + defer s.podLock.RUnlock() + pods := make([]kubelet.Pod, 0) + for source, sourcePods := range s.pods { + for _, podRef := range sourcePods { + pod := *podRef + pod.Namespace = source + pods = append(pods, pod) + } + } + return pods +} diff --git a/pkg/kubelet/config/config_test.go b/pkg/kubelet/config/config_test.go new file mode 100644 index 0000000000..338b77bcfc --- /dev/null +++ b/pkg/kubelet/config/config_test.go @@ -0,0 +1,212 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package config + +import ( + "reflect" + "testing" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet" +) + +// TODO: remove this +func expectError(t *testing.T, err error) { + if err == nil { + t.Errorf("Expected error, Got %v", err) + } +} + +// TODO: remove this +func expectNoError(t *testing.T, err error) { + if err != nil { + t.Errorf("Expected no error, Got %v", err) + } +} + +func expectEmptyChannel(t *testing.T, ch <-chan interface{}) { + select { + case update := <-ch: + t.Errorf("Expected no update in channel, Got %v", update) + default: + } +} + +type sortedPods []kubelet.Pod + +func (s sortedPods) Len() int { + return len(s) +} +func (s sortedPods) Swap(i, j int) { + s[i], s[j] = s[j], s[i] +} +func (s sortedPods) Less(i, j int) bool { + if s[i].Namespace < s[j].Namespace { + return true + } + return s[i].Name < s[j].Name +} + +func CreateValidPod(name, namespace string) kubelet.Pod { + return kubelet.Pod{ + Name: name, + Namespace: namespace, + Manifest: api.ContainerManifest{ + Version: "v1beta1", + }, + } +} + +func CreatePodUpdate(op kubelet.PodOperation, pods ...kubelet.Pod) kubelet.PodUpdate { + newPods := make([]kubelet.Pod, len(pods)) + for i := range pods { + newPods[i] = pods[i] + } + return kubelet.PodUpdate{newPods, op} +} + +func createPodConfigTester(mode PodConfigNotificationMode) (chan<- interface{}, <-chan kubelet.PodUpdate, *PodConfig) { + config := NewPodConfig(mode) + channel := config.Channel("test") + ch := config.Updates() + return channel, ch, config +} + +func expectPodUpdate(t *testing.T, ch <-chan kubelet.PodUpdate, expected ...kubelet.PodUpdate) { + for i := range expected { + update := <-ch + if !reflect.DeepEqual(expected[i], update) { + t.Fatalf("Expected %#v, Got %#v", expected[i], update) + } + } + expectNoPodUpdate(t, ch) +} + +func expectNoPodUpdate(t *testing.T, ch <-chan kubelet.PodUpdate) { + select { + case update := <-ch: + t.Errorf("Expected no update in channel, Got %#v", update) + default: + } +} + +func TestNewPodAdded(t *testing.T) { + channel, ch, config := createPodConfigTester(PodConfigNotificationIncremental) + + // see an update + podUpdate := CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "")) + channel <- podUpdate + expectPodUpdate(t, ch, CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "test"))) + + config.Sync() + expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, CreateValidPod("foo", "test"))) +} + +func TestInvalidPodFiltered(t *testing.T) { + channel, ch, _ := createPodConfigTester(PodConfigNotificationIncremental) + + // see an update + podUpdate := CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "")) + channel <- podUpdate + expectPodUpdate(t, ch, CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "test"))) + + // add an invalid update + podUpdate = CreatePodUpdate(kubelet.UPDATE, kubelet.Pod{Name: "foo"}) + channel <- podUpdate + expectNoPodUpdate(t, ch) +} + +func TestNewPodAddedSnapshotAndUpdates(t *testing.T) { + channel, ch, config := createPodConfigTester(PodConfigNotificationSnapshotAndUpdates) + + // see an set + podUpdate := CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "")) + channel <- podUpdate + expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, CreateValidPod("foo", "test"))) + + config.Sync() + expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, CreateValidPod("foo", "test"))) + + // container updates are separated as UPDATE + pod := podUpdate.Pods[0] + pod.Manifest.Containers = []api.Container{{Name: "bar", Image: "test"}} + channel <- CreatePodUpdate(kubelet.ADD, pod) + expectPodUpdate(t, ch, CreatePodUpdate(kubelet.UPDATE, pod)) +} + +func TestNewPodAddedSnapshot(t *testing.T) { + channel, ch, config := createPodConfigTester(PodConfigNotificationSnapshot) + + // see an set + podUpdate := CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "")) + channel <- podUpdate + expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, CreateValidPod("foo", "test"))) + + config.Sync() + expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, CreateValidPod("foo", "test"))) + + // container updates are separated as UPDATE + pod := podUpdate.Pods[0] + pod.Manifest.Containers = []api.Container{{Name: "bar", Image: "test"}} + channel <- CreatePodUpdate(kubelet.ADD, pod) + expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, pod)) +} + +func TestNewPodAddedUpdatedRemoved(t *testing.T) { + channel, ch, _ := createPodConfigTester(PodConfigNotificationIncremental) + + // should register an add + podUpdate := CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "")) + channel <- podUpdate + expectPodUpdate(t, ch, CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "test"))) + + // should ignore ADDs that are identical + expectNoPodUpdate(t, ch) + + // an kubelet.ADD should be converted to kubelet.UPDATE + pod := CreateValidPod("foo", "test") + pod.Manifest.Containers = []api.Container{{Name: "bar", Image: "test"}} + podUpdate = CreatePodUpdate(kubelet.ADD, pod) + channel <- podUpdate + expectPodUpdate(t, ch, CreatePodUpdate(kubelet.UPDATE, pod)) + + podUpdate = CreatePodUpdate(kubelet.REMOVE, kubelet.Pod{Name: "foo"}) + channel <- podUpdate + expectPodUpdate(t, ch, CreatePodUpdate(kubelet.REMOVE, pod)) +} + +func TestNewPodAddedUpdatedSet(t *testing.T) { + channel, ch, _ := createPodConfigTester(PodConfigNotificationIncremental) + + // should register an add + podUpdate := CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", ""), CreateValidPod("foo2", ""), CreateValidPod("foo3", "")) + channel <- podUpdate + expectPodUpdate(t, ch, CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "test"), CreateValidPod("foo2", "test"), CreateValidPod("foo3", "test"))) + + // should ignore ADDs that are identical + expectNoPodUpdate(t, ch) + + // should be converted to an kubelet.ADD, kubelet.REMOVE, and kubelet.UPDATE + pod := CreateValidPod("foo2", "test") + pod.Manifest.Containers = []api.Container{{Name: "bar", Image: "test"}} + podUpdate = CreatePodUpdate(kubelet.SET, pod, CreateValidPod("foo3", ""), CreateValidPod("foo4", "test")) + channel <- podUpdate + expectPodUpdate(t, ch, + CreatePodUpdate(kubelet.REMOVE, CreateValidPod("foo", "test")), + CreatePodUpdate(kubelet.ADD, CreateValidPod("foo4", "test")), + CreatePodUpdate(kubelet.UPDATE, pod)) +} diff --git a/pkg/kubelet/config/etcd.go b/pkg/kubelet/config/etcd.go new file mode 100644 index 0000000000..48dc7b411b --- /dev/null +++ b/pkg/kubelet/config/etcd.go @@ -0,0 +1,140 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or sied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Reads the pod configuration from etcd using the Kubernetes etcd schema +package config + +import ( + "fmt" + "path" + "time" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet" + "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" + "github.com/coreos/go-etcd/etcd" + "github.com/golang/glog" + "gopkg.in/v1/yaml" +) + +func EtcdKeyForHost(hostname string) string { + return path.Join("/", "registry", "hosts", hostname, "kubelet") +} + +// TODO(lavalamp): Use a watcher interface instead of the etcd client directly +type SourceEtcd struct { + key string + client tools.EtcdClient + updates chan<- interface{} + + waitDuration time.Duration +} + +// NewSourceEtcd creates a config source that watches and pulls from a key in etcd +func NewSourceEtcd(key string, client tools.EtcdClient, period time.Duration, updates chan<- interface{}) *SourceEtcd { + config := &SourceEtcd{ + key: key, + client: client, + updates: updates, + + waitDuration: period, + } + glog.Infof("Watching etcd for %s", key) + go util.Forever(config.run, period) + return config +} + +// run loops forever looking for changes to a key in etcd +func (s *SourceEtcd) run() { + index := uint64(0) + for { + lastIndex, err := s.fetchNextState(index) + if err != nil { + if !tools.IsEtcdNotFound(err) { + glog.Errorf("Unable to extract from the response (%s): %%v", s.key, err) + } + return + } + index = lastIndex + 1 + } +} + +// fetchNextState fetches the key (or waits for a change to a key) and then returns +// the index read. It will watch no longer than s.waitDuration and then return +func (s *SourceEtcd) fetchNextState(fromIndex uint64) (lastIndex uint64, err error) { + var response *etcd.Response + + if fromIndex == 0 { + response, err = s.client.Get(s.key, true, false) + } else { + response, err = s.client.Watch(s.key, fromIndex, false, nil, stopChannel(s.waitDuration)) + if tools.IsEtcdWatchStoppedByUser(err) { + return fromIndex, nil + } + } + if err != nil { + return 0, err + } + + pods, err := responseToPods(response) + if err != nil { + glog.Infof("Response was in error: %#v", response) + return 0, fmt.Errorf("error parsing response: %#v", err) + } + + glog.Infof("Got state from etcd: %+v", pods) + s.updates <- kubelet.PodUpdate{pods, kubelet.SET} + + return response.Node.ModifiedIndex, nil +} + +// responseToPods takes an etcd Response object, and turns it into a structured list of containers. +// It returns a list of containers, or an error if one occurs. +func responseToPods(response *etcd.Response) ([]kubelet.Pod, error) { + pods := []kubelet.Pod{} + if response.Node == nil || len(response.Node.Value) == 0 { + return pods, fmt.Errorf("no nodes field: %v", response) + } + + manifests := []api.ContainerManifest{} + if err := yaml.Unmarshal([]byte(response.Node.Value), &manifests); err != nil { + return pods, fmt.Errorf("could not unmarshal manifests: %v", err) + } + + for i, manifest := range manifests { + name := manifest.ID + if name == "" { + name = fmt.Sprintf("_%d", i+1) + } + pods = append(pods, kubelet.Pod{Name: name, Manifest: manifest}) + } + return pods, nil +} + +// stopChannel creates a channel that is closed after a duration for use with etcd client API +func stopChannel(until time.Duration) chan bool { + stop := make(chan bool) + go func() { + select { + case <-time.After(until): + } + stop <- true + close(stop) + }() + return stop + +} diff --git a/pkg/kubelet/config/etcd_test.go b/pkg/kubelet/config/etcd_test.go new file mode 100644 index 0000000000..462ac77ea2 --- /dev/null +++ b/pkg/kubelet/config/etcd_test.go @@ -0,0 +1,147 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package config + +import ( + "reflect" + "testing" + "time" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet" + "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" + "github.com/coreos/go-etcd/etcd" +) + +// TODO(lavalamp): Use the etcd watcher from the tools package, and make sure all test cases here are tested there. + +func TestGetEtcdData(t *testing.T) { + fakeClient := tools.MakeFakeEtcdClient(t) + ch := make(chan interface{}) + fakeClient.Data["/registry/hosts/machine/kubelet"] = tools.EtcdResponseWithError{ + R: &etcd.Response{ + Node: &etcd.Node{ + Value: util.MakeJSONString([]api.ContainerManifest{api.ContainerManifest{ID: "foo"}}), + ModifiedIndex: 1, + }, + }, + E: nil, + } + NewSourceEtcd("/registry/hosts/machine/kubelet", fakeClient, time.Millisecond, ch) + + //TODO: update FakeEtcdClient.Watch to handle receiver=nil with a given index + //returns an infinite stream of updates + for i := 0; i < 2; i++ { + update := (<-ch).(kubelet.PodUpdate) + expected := CreatePodUpdate(kubelet.SET, kubelet.Pod{Name: "foo", Manifest: api.ContainerManifest{ID: "foo"}}) + if !reflect.DeepEqual(expected, update) { + t.Errorf("Expected %#v, Got %#v", expected, update) + } + } +} + +func TestGetEtcdNoData(t *testing.T) { + fakeClient := tools.MakeFakeEtcdClient(t) + ch := make(chan interface{}, 1) + fakeClient.Data["/registry/hosts/machine/kubelet"] = tools.EtcdResponseWithError{ + R: &etcd.Response{}, + E: nil, + } + c := SourceEtcd{"/registry/hosts/machine/kubelet", fakeClient, ch, time.Millisecond} + _, err := c.fetchNextState(0) + expectError(t, err) + expectEmptyChannel(t, ch) +} + +func TestGetEtcd(t *testing.T) { + fakeClient := tools.MakeFakeEtcdClient(t) + ch := make(chan interface{}, 1) + fakeClient.Data["/registry/hosts/machine/kubelet"] = tools.EtcdResponseWithError{ + R: &etcd.Response{ + Node: &etcd.Node{ + Value: util.MakeJSONString([]api.ContainerManifest{api.ContainerManifest{ID: "foo"}}), + ModifiedIndex: 1, + }, + }, + E: nil, + } + c := SourceEtcd{"/registry/hosts/machine/kubelet", fakeClient, ch, time.Millisecond} + lastIndex, err := c.fetchNextState(0) + expectNoError(t, err) + if lastIndex != 1 { + t.Errorf("Expected %#v, Got %#v", 1, lastIndex) + } + update := (<-ch).(kubelet.PodUpdate) + expected := CreatePodUpdate(kubelet.SET, kubelet.Pod{Name: "foo", Manifest: api.ContainerManifest{ID: "foo"}}) + if !reflect.DeepEqual(expected, update) { + t.Errorf("Expected %#v, Got %#v", expected, update) + } +} + +func TestWatchEtcd(t *testing.T) { + fakeClient := tools.MakeFakeEtcdClient(t) + ch := make(chan interface{}, 1) + fakeClient.Data["/registry/hosts/machine/kubelet"] = tools.EtcdResponseWithError{ + R: &etcd.Response{ + Node: &etcd.Node{ + Value: util.MakeJSONString([]api.Container{}), + ModifiedIndex: 2, + }, + }, + E: nil, + } + c := SourceEtcd{"/registry/hosts/machine/kubelet", fakeClient, ch, time.Millisecond} + lastIndex, err := c.fetchNextState(1) + expectNoError(t, err) + if lastIndex != 2 { + t.Errorf("Expected %d, Got %d", 1, lastIndex) + } + update := (<-ch).(kubelet.PodUpdate) + expected := CreatePodUpdate(kubelet.SET) + if !reflect.DeepEqual(expected, update) { + t.Errorf("Expected %#v, Got %#v", expected, update) + } +} + +func TestGetEtcdNotFound(t *testing.T) { + fakeClient := tools.MakeFakeEtcdClient(t) + ch := make(chan interface{}, 1) + fakeClient.Data["/registry/hosts/machine/kubelet"] = tools.EtcdResponseWithError{ + R: &etcd.Response{}, + E: tools.EtcdErrorNotFound, + } + c := SourceEtcd{"/registry/hosts/machine/kubelet", fakeClient, ch, time.Millisecond} + _, err := c.fetchNextState(0) + expectError(t, err) + expectEmptyChannel(t, ch) +} + +func TestGetEtcdError(t *testing.T) { + fakeClient := tools.MakeFakeEtcdClient(t) + ch := make(chan interface{}, 1) + fakeClient.Data["/registry/hosts/machine/kubelet"] = tools.EtcdResponseWithError{ + R: &etcd.Response{}, + E: &etcd.EtcdError{ + ErrorCode: 200, // non not found error + }, + } + c := SourceEtcd{"/registry/hosts/machine/kubelet", fakeClient, ch, time.Millisecond} + _, err := c.fetchNextState(0) + expectError(t, err) + expectEmptyChannel(t, ch) +} diff --git a/pkg/kubelet/config/file.go b/pkg/kubelet/config/file.go new file mode 100644 index 0000000000..6128b8d8bf --- /dev/null +++ b/pkg/kubelet/config/file.go @@ -0,0 +1,149 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or sied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Reads the pod configuration from file or a directory of files +package config + +import ( + "crypto/sha1" + "encoding/base64" + "fmt" + "io/ioutil" + "os" + "path/filepath" + "sort" + "time" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" + "github.com/golang/glog" + "gopkg.in/v1/yaml" +) + +type SourceFile struct { + path string + updates chan<- interface{} +} + +func NewSourceFile(path string, period time.Duration, updates chan<- interface{}) *SourceFile { + config := &SourceFile{ + path: path, + updates: updates, + } + glog.Infof("Watching file %s", path) + go util.Forever(config.run, period) + return config +} + +func (s *SourceFile) run() { + if err := s.extractFromPath(); err != nil { + glog.Errorf("Unable to read config file: %s", err) + } +} + +func (s *SourceFile) extractFromPath() error { + path := s.path + statInfo, err := os.Stat(path) + if err != nil { + if !os.IsNotExist(err) { + return fmt.Errorf("unable to access path: %s", err) + } + return fmt.Errorf("path does not exist: %s", path) + } + + switch { + case statInfo.Mode().IsDir(): + pods, err := extractFromDir(path) + if err != nil { + return err + } + s.updates <- kubelet.PodUpdate{pods, kubelet.SET} + + case statInfo.Mode().IsRegular(): + pod, err := extractFromFile(path) + if err != nil { + return err + } + s.updates <- kubelet.PodUpdate{[]kubelet.Pod{pod}, kubelet.SET} + + default: + return fmt.Errorf("path is not a directory or file") + } + + return nil +} + +func extractFromDir(name string) ([]kubelet.Pod, error) { + pods := []kubelet.Pod{} + + files, err := filepath.Glob(filepath.Join(name, "[^.]*")) + if err != nil { + return pods, err + } + + sort.Strings(files) + + for _, file := range files { + pod, err := extractFromFile(file) + if err != nil { + return []kubelet.Pod{}, err + } + pods = append(pods, pod) + } + return pods, nil +} + +func extractFromFile(name string) (kubelet.Pod, error) { + var pod kubelet.Pod + + file, err := os.Open(name) + if err != nil { + return pod, err + } + defer file.Close() + + data, err := ioutil.ReadAll(file) + if err != nil { + glog.Errorf("Couldn't read from file: %v", err) + return pod, err + } + + if err := yaml.Unmarshal(data, &pod.Manifest); err != nil { + return pod, fmt.Errorf("could not unmarshal manifest: %v", err) + } + + podName := pod.Manifest.ID + if podName == "" { + podName = simpleSubdomainSafeHash(name) + } + pod.Name = podName + + return pod, nil +} + +var simpleSubdomainSafeEncoding = base64.NewEncoding("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ012345678900") + +// simpleSubdomainSafeHash generates a compact hash of the input that uses characters +// only in the range a-zA-Z0-9, making it suitable for DNS subdomain labels +func simpleSubdomainSafeHash(s string) string { + hasher := sha1.New() + hasher.Write([]byte(s)) + sha := simpleSubdomainSafeEncoding.EncodeToString(hasher.Sum(nil)) + if len(sha) > 20 { + sha = sha[:20] + } + return sha +} diff --git a/pkg/kubelet/config/file_test.go b/pkg/kubelet/config/file_test.go new file mode 100644 index 0000000000..6909eca5ef --- /dev/null +++ b/pkg/kubelet/config/file_test.go @@ -0,0 +1,211 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package config + +import ( + "encoding/json" + "io/ioutil" + "os" + "reflect" + "sort" + "testing" + "time" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet" + "gopkg.in/v1/yaml" +) + +func TestExtractFromNonExistentFile(t *testing.T) { + ch := make(chan interface{}, 1) + c := SourceFile{"/some/fake/file", ch} + err := c.extractFromPath() + expectError(t, err) +} + +func TestUpdateOnNonExistentFile(t *testing.T) { + ch := make(chan interface{}) + NewSourceFile("random_non_existent_path", time.Millisecond, ch) + select { + case got := <-ch: + t.Errorf("Expected no update, Got %#v", got) + case <-time.After(2 * time.Millisecond): + } +} + +func writeTestFile(t *testing.T, dir, name string, contents string) *os.File { + file, err := ioutil.TempFile(os.TempDir(), "test_pod_config") + if err != nil { + t.Fatalf("Unable to create test file %#v", err) + } + file.Close() + if err := ioutil.WriteFile(file.Name(), []byte(contents), 0555); err != nil { + t.Fatalf("Unable to write test file %#v", err) + } + return file +} + +func TestReadFromFile(t *testing.T) { + file := writeTestFile(t, os.TempDir(), "test_pod_config", "version: v1beta1\nid: test\ncontainers:\n- image: test/image") + defer os.Remove(file.Name()) + + ch := make(chan interface{}) + NewSourceFile(file.Name(), time.Millisecond, ch) + select { + case got := <-ch: + update := got.(kubelet.PodUpdate) + expected := CreatePodUpdate(kubelet.SET, kubelet.Pod{ + Name: "test", + Manifest: api.ContainerManifest{ + ID: "test", + Version: "v1beta1", + Containers: []api.Container{api.Container{ + Image: "test/image"}, + }, + }, + }) + if !reflect.DeepEqual(expected, update) { + t.Errorf("Expected %#v, Got %#v", expected, update) + } + + case <-time.After(2 * time.Millisecond): + t.Errorf("Expected update, timeout instead") + } +} + +func TestExtractFromBadDataFile(t *testing.T) { + file := writeTestFile(t, os.TempDir(), "test_pod_config", string([]byte{1, 2, 3})) + defer os.Remove(file.Name()) + + ch := make(chan interface{}, 1) + c := SourceFile{file.Name(), ch} + err := c.extractFromPath() + expectError(t, err) + expectEmptyChannel(t, ch) +} + +func TestExtractFromValidDataFile(t *testing.T) { + manifest := api.ContainerManifest{ID: ""} + + text, err := json.Marshal(manifest) + expectNoError(t, err) + file := writeTestFile(t, os.TempDir(), "test_pod_config", string(text)) + defer os.Remove(file.Name()) + + ch := make(chan interface{}, 1) + c := SourceFile{file.Name(), ch} + err = c.extractFromPath() + expectNoError(t, err) + + update := (<-ch).(kubelet.PodUpdate) + expected := CreatePodUpdate(kubelet.SET, kubelet.Pod{Name: simpleSubdomainSafeHash(file.Name()), Manifest: manifest}) + if !reflect.DeepEqual(expected, update) { + t.Errorf("Expected %#v, Got %#v", expected, update) + } +} + +func TestExtractFromEmptyDir(t *testing.T) { + dirName, err := ioutil.TempDir("", "foo") + expectNoError(t, err) + defer os.RemoveAll(dirName) + + ch := make(chan interface{}, 1) + c := SourceFile{dirName, ch} + err = c.extractFromPath() + expectNoError(t, err) + + update := (<-ch).(kubelet.PodUpdate) + expected := CreatePodUpdate(kubelet.SET) + if !reflect.DeepEqual(expected, update) { + t.Errorf("Expected %#v, Got %#v", expected, update) + } +} + +func TestExtractFromDir(t *testing.T) { + manifests := []api.ContainerManifest{ + {ID: "", Containers: []api.Container{{Image: "foo"}}}, + {ID: "", Containers: []api.Container{{Image: "bar"}}}, + } + files := make([]*os.File, len(manifests)) + + dirName, err := ioutil.TempDir("", "foo") + expectNoError(t, err) + + for i, manifest := range manifests { + data, err := json.Marshal(manifest) + expectNoError(t, err) + file, err := ioutil.TempFile(dirName, manifest.ID) + expectNoError(t, err) + name := file.Name() + expectNoError(t, file.Close()) + ioutil.WriteFile(name, data, 0755) + files[i] = file + } + + ch := make(chan interface{}, 1) + c := SourceFile{dirName, ch} + err = c.extractFromPath() + expectNoError(t, err) + + update := (<-ch).(kubelet.PodUpdate) + expected := CreatePodUpdate( + kubelet.SET, + kubelet.Pod{Name: simpleSubdomainSafeHash(files[0].Name()), Manifest: manifests[0]}, + kubelet.Pod{Name: simpleSubdomainSafeHash(files[1].Name()), Manifest: manifests[1]}, + ) + sort.Sort(sortedPods(update.Pods)) + sort.Sort(sortedPods(expected.Pods)) + if !reflect.DeepEqual(expected, update) { + t.Errorf("Expected %#v, Got %#v", expected, update) + } +} + +// These are used for testing extract json (below) +type TestData struct { + Value string + Number int +} + +type TestObject struct { + Name string + Data TestData +} + +func verifyStringEquals(t *testing.T, actual, expected string) { + if actual != expected { + t.Errorf("Verification failed. Expected: %s, Found %s", expected, actual) + } +} + +func verifyIntEquals(t *testing.T, actual, expected int) { + if actual != expected { + t.Errorf("Verification failed. Expected: %d, Found %d", expected, actual) + } +} + +func TestExtractJSON(t *testing.T) { + obj := TestObject{} + data := `{ "name": "foo", "data": { "value": "bar", "number": 10 } }` + + if err := yaml.Unmarshal([]byte(data), &obj); err != nil { + t.Fatalf("Could not unmarshal JSON: %v", err) + } + + verifyStringEquals(t, obj.Name, "foo") + verifyStringEquals(t, obj.Data.Value, "bar") + verifyIntEquals(t, obj.Data.Number, 10) +} diff --git a/pkg/kubelet/config/http.go b/pkg/kubelet/config/http.go new file mode 100644 index 0000000000..b28f68056e --- /dev/null +++ b/pkg/kubelet/config/http.go @@ -0,0 +1,117 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or sied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Reads the pod configuration from an HTTP GET response +package config + +import ( + "fmt" + "io/ioutil" + "net/http" + "time" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" + "github.com/golang/glog" + "gopkg.in/v1/yaml" +) + +type SourceURL struct { + url string + updates chan<- interface{} +} + +func NewSourceURL(url string, period time.Duration, updates chan<- interface{}) *SourceURL { + config := &SourceURL{ + url: url, + updates: updates, + } + glog.Infof("Watching URL %s", url) + go util.Forever(config.run, period) + return config +} + +func (s *SourceURL) run() { + if err := s.extractFromURL(); err != nil { + glog.Errorf("Failed to read URL: %s", err) + } +} + +func (s *SourceURL) extractFromURL() error { + resp, err := http.Get(s.url) + if err != nil { + return err + } + defer resp.Body.Close() + data, err := ioutil.ReadAll(resp.Body) + if err != nil { + return err + } + if len(data) == 0 { + return fmt.Errorf("zero-length data received from %v", s.url) + } + + // First try as if it's a single manifest + var pod kubelet.Pod + singleErr := yaml.Unmarshal(data, &pod.Manifest) + // TODO: replace with validation + if singleErr == nil && pod.Manifest.Version == "" { + // If data is a []ContainerManifest, trying to put it into a ContainerManifest + // will not give an error but also won't set any of the fields. + // Our docs say that the version field is mandatory, so using that to judge wether + // this was actually successful. + singleErr = fmt.Errorf("got blank version field") + } + + if singleErr == nil { + name := pod.Manifest.ID + if name == "" { + name = "1" + } + pod.Name = name + s.updates <- kubelet.PodUpdate{[]kubelet.Pod{pod}, kubelet.SET} + return nil + } + + // That didn't work, so try an array of manifests. + var manifests []api.ContainerManifest + multiErr := yaml.Unmarshal(data, &manifests) + // We're not sure if the person reading the logs is going to care about the single or + // multiple manifest unmarshalling attempt, so we need to put both in the logs, as is + // done at the end. Hence not returning early here. + if multiErr == nil && len(manifests) > 0 && manifests[0].Version == "" { + multiErr = fmt.Errorf("got blank version field") + } + if multiErr == nil { + pods := []kubelet.Pod{} + for i := range manifests { + pod := kubelet.Pod{Manifest: manifests[i]} + name := pod.Manifest.ID + if name == "" { + name = fmt.Sprintf("%d", i+1) + } + pod.Name = name + pods = append(pods, pod) + } + s.updates <- kubelet.PodUpdate{pods, kubelet.SET} + return nil + } + + return fmt.Errorf("%v: received '%v', but couldn't parse as a "+ + "single manifest (%v: %+v) or as multiple manifests (%v: %+v).\n", + s.url, string(data), singleErr, pod.Manifest, multiErr, manifests) +} diff --git a/pkg/kubelet/config/http_test.go b/pkg/kubelet/config/http_test.go new file mode 100644 index 0000000000..b1306f0611 --- /dev/null +++ b/pkg/kubelet/config/http_test.go @@ -0,0 +1,124 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package config + +import ( + "encoding/json" + "net/http/httptest" + "reflect" + "testing" + "time" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" +) + +func TestURLErrorNotExistNoUpdate(t *testing.T) { + ch := make(chan interface{}) + NewSourceURL("http://localhost:49575/_not_found_", time.Millisecond, ch) + select { + case got := <-ch: + t.Errorf("Expected no update, Got %#v", got) + case <-time.After(2 * time.Millisecond): + } +} + +func TestExtractFromHttpBadness(t *testing.T) { + ch := make(chan interface{}, 1) + c := SourceURL{"http://localhost:49575/_not_found_", ch} + err := c.extractFromURL() + expectError(t, err) + expectEmptyChannel(t, ch) +} + +func TestExtractFromHttpSingle(t *testing.T) { + manifests := []api.ContainerManifest{ + {Version: "v1beta1", ID: "foo"}, + } + // Taking a single-manifest from a URL allows kubelet to be used + // in the implementation of google's container VM image. + data, err := json.Marshal(manifests[0]) + + fakeHandler := util.FakeHandler{ + StatusCode: 200, + ResponseBody: string(data), + } + testServer := httptest.NewServer(&fakeHandler) + ch := make(chan interface{}, 1) + c := SourceURL{testServer.URL, ch} + + err = c.extractFromURL() + expectNoError(t, err) + update := (<-ch).(kubelet.PodUpdate) + expected := CreatePodUpdate(kubelet.SET, kubelet.Pod{Name: "foo", Manifest: manifests[0]}) + if !reflect.DeepEqual(expected, update) { + t.Errorf("Expected: %#v, Got: %#v", expected, update) + } +} + +func TestExtractFromHttpMultiple(t *testing.T) { + manifests := []api.ContainerManifest{ + {Version: "v1beta1", ID: ""}, + {Version: "v1beta1", ID: "bar"}, + } + data, err := json.Marshal(manifests) + if err != nil { + t.Fatalf("Some weird json problem: %v", err) + } + + fakeHandler := util.FakeHandler{ + StatusCode: 200, + ResponseBody: string(data), + } + testServer := httptest.NewServer(&fakeHandler) + ch := make(chan interface{}, 1) + c := SourceURL{testServer.URL, ch} + + err = c.extractFromURL() + expectNoError(t, err) + + update := (<-ch).(kubelet.PodUpdate) + expected := CreatePodUpdate(kubelet.SET, kubelet.Pod{Name: "1", Manifest: manifests[0]}, kubelet.Pod{Name: "bar", Manifest: manifests[1]}) + if !reflect.DeepEqual(expected, update) { + t.Errorf("Expected: %#v, Got: %#v", expected, update) + } +} + +func TestExtractFromHttpEmptyArray(t *testing.T) { + manifests := []api.ContainerManifest{} + data, err := json.Marshal(manifests) + if err != nil { + t.Fatalf("Some weird json problem: %v", err) + } + + fakeHandler := util.FakeHandler{ + StatusCode: 200, + ResponseBody: string(data), + } + testServer := httptest.NewServer(&fakeHandler) + ch := make(chan interface{}, 1) + c := SourceURL{testServer.URL, ch} + + err = c.extractFromURL() + expectNoError(t, err) + update := (<-ch).(kubelet.PodUpdate) + expected := CreatePodUpdate(kubelet.SET) + if !reflect.DeepEqual(expected, update) { + t.Errorf("Expected: %#v, Got: %#v", expected, update) + } +} diff --git a/pkg/kubelet/types.go b/pkg/kubelet/types.go new file mode 100644 index 0000000000..ca5ba15ccd --- /dev/null +++ b/pkg/kubelet/types.go @@ -0,0 +1,60 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package kubelet + +import ( + "fmt" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" +) + +// Pod represents the structure of a pod on the Kubelet, distinct from the apiserver +// representation of a Pod. +type Pod struct { + Namespace string + Name string + Manifest api.ContainerManifest +} + +// PodOperation defines what changes will be made on a pod configuration. +type PodOperation int + +const ( + // This is the current pod configuration + SET PodOperation = iota + // Pods with the given ids are new to this source + ADD + // Pods with the given ids have been removed from this source + REMOVE + // Pods with the given ids have been updated in this source + UPDATE +) + +// PodUpdate defines an operation sent on the channel. You can add or remove single services by +// sending an array of size one and Op == ADD|REMOVE (with REMOVE, only the ID is required). +// For setting the state of the system to a given state for this source configuration, set +// Pods as desired and Op to SET, which will reset the system state to that specified in this +// operation for this source channel. To remove all pods, set Pods to empty array and Op to SET. +type PodUpdate struct { + Pods []Pod + Op PodOperation +} + +//GetPodFullName returns a name that full identifies a pod across all config sources. +func GetPodFullName(pod *Pod) string { + return fmt.Sprintf("%s.%s", pod.Name, pod.Namespace) +} diff --git a/pkg/kubelet/validation.go b/pkg/kubelet/validation.go new file mode 100644 index 0000000000..710c14bb9b --- /dev/null +++ b/pkg/kubelet/validation.go @@ -0,0 +1,36 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package kubelet + +import ( + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" +) + +func makeInvalidError(field string, value interface{}) api.ValidationError { + return api.ValidationError{api.ErrTypeInvalid, field, value} +} + +func ValidatePod(pod *Pod) (errors []error) { + if !util.IsDNSSubdomain(pod.Name) { + errors = append(errors, makeInvalidError("Pod.Name", pod.Name)) + } + if errs := api.ValidateManifest(&pod.Manifest); len(errs) != 0 { + errors = append(errors, errs...) + } + return errors +} diff --git a/pkg/kubelet/validation_test.go b/pkg/kubelet/validation_test.go new file mode 100644 index 0000000000..dfb02fa2f2 --- /dev/null +++ b/pkg/kubelet/validation_test.go @@ -0,0 +1,42 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package kubelet_test + +import ( + "strings" + "testing" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + . "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet" +) + +func TestValidatePodNoName(t *testing.T) { + errorCases := map[string]Pod{ + // manifest is tested in api/validation_test.go, ensure it is invoked + "empty version": {Name: "test", Manifest: api.ContainerManifest{Version: "", ID: "abc"}}, + + // Name + "zero-length name": {Name: "", Manifest: api.ContainerManifest{Version: "v1beta1"}}, + "name > 255 characters": {Name: strings.Repeat("a", 256), Manifest: api.ContainerManifest{Version: "v1beta1"}}, + "name not a DNS subdomain": {Name: "a.b.c.", Manifest: api.ContainerManifest{Version: "v1beta1"}}, + } + for k, v := range errorCases { + if errs := ValidatePod(&v); len(errs) == 0 { + t.Errorf("expected failure for %s", k) + } + } +} diff --git a/pkg/tools/etcd_tools.go b/pkg/tools/etcd_tools.go index 1d3e39d03e..1c6a3fc8ee 100644 --- a/pkg/tools/etcd_tools.go +++ b/pkg/tools/etcd_tools.go @@ -70,6 +70,11 @@ func IsEtcdConflict(err error) bool { return isEtcdErrorNum(err, 101) } +// IsEtcdWatchStoppedByUser returns true iff err is a client triggered stop. +func IsEtcdWatchStoppedByUser(err error) bool { + return etcd.ErrWatchStoppedByUser == err +} + // Returns true iff err is an etcd error, whose errorCode matches errorCode func isEtcdErrorNum(err error, errorCode int) bool { etcdError, ok := err.(*etcd.EtcdError) diff --git a/pkg/tools/fake_etcd_client.go b/pkg/tools/fake_etcd_client.go index 6dd3c38ec9..5979e71dae 100644 --- a/pkg/tools/fake_etcd_client.go +++ b/pkg/tools/fake_etcd_client.go @@ -124,6 +124,10 @@ func (f *FakeEtcdClient) Watch(prefix string, waitIndex uint64, recursive bool, defer close(injectedError) f.WatchInjectError = injectedError + if receiver == nil { + return f.Get(prefix, false, recursive) + } + f.watchCompletedChan <- true select { case <-stop: