diff --git a/cmd/integration/integration.go b/cmd/integration/integration.go index 0fe9e0ab79..6fcce92d48 100644 --- a/cmd/integration/integration.go +++ b/cmd/integration/integration.go @@ -37,6 +37,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/master" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util/wait" "github.com/coreos/go-etcd/etcd" "github.com/golang/glog" ) @@ -135,7 +136,24 @@ func startComponents(manifestURL string) (apiServerURL string) { return apiServer.URL } -func runReplicationControllerTest(kubeClient *client.Client) { +// podsOnMinions returns true when all of the selected pods exist on a minion. +func podsOnMinions(c *client.Client, pods api.PodList) wait.ConditionFunc { + podInfo := fakePodInfoGetter{} + return func() (bool, error) { + for i := range pods.Items { + host, id := pods.Items[i].CurrentState.Host, pods.Items[i].ID + if len(host) == 0 { + return false, nil + } + if _, err := podInfo.GetPodInfo(host, id); err != nil { + return false, nil + } + } + return true, nil + } +} + +func runReplicationControllerTest(c *client.Client) { data, err := ioutil.ReadFile("api/examples/controller.json") if err != nil { glog.Fatalf("Unexpected error: %#v", err) @@ -146,20 +164,26 @@ func runReplicationControllerTest(kubeClient *client.Client) { } glog.Infof("Creating replication controllers") - if _, err := kubeClient.CreateReplicationController(controllerRequest); err != nil { + if _, err := c.CreateReplicationController(controllerRequest); err != nil { glog.Fatalf("Unexpected error: %#v", err) } glog.Infof("Done creating replication controllers") // Give the controllers some time to actually create the pods - time.Sleep(time.Second * 10) - - // Validate that they're truly up. - pods, err := kubeClient.ListPods(labels.Set(controllerRequest.DesiredState.ReplicaSelector).AsSelector()) - if err != nil || len(pods.Items) != controllerRequest.DesiredState.Replicas { - glog.Fatalf("FAILED: %#v", pods.Items) + if err := wait.Poll(time.Second, 10, c.ControllerHasDesiredReplicas(controllerRequest)); err != nil { + glog.Fatalf("FAILED: pods never created %v", err) } - glog.Infof("Replication controller produced:\n\n%#v\n\n", pods) + + // wait for minions to indicate they have info about the desired pods + pods, err := c.ListPods(labels.Set(controllerRequest.DesiredState.ReplicaSelector).AsSelector()) + if err != nil { + glog.Fatalf("FAILED: unable to get pods to list: %v", err) + } + if err := wait.Poll(time.Second, 10, podsOnMinions(c, pods)); err != nil { + glog.Fatalf("FAILED: pods never started running %v", err) + } + + glog.Infof("Pods created") } func runAtomicPutTest(c *client.Client) { diff --git a/pkg/client/conditions.go b/pkg/client/conditions.go new file mode 100644 index 0000000000..4af01fe535 --- /dev/null +++ b/pkg/client/conditions.go @@ -0,0 +1,35 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package client + +import ( + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util/wait" +) + +// ControllerHasDesiredReplicas returns a condition that will be true iff the desired replica count +// for a controller's ReplicaSelector equals the Replicas count. +func (c *Client) ControllerHasDesiredReplicas(controller api.ReplicationController) wait.ConditionFunc { + return func() (bool, error) { + pods, err := c.ListPods(labels.Set(controller.DesiredState.ReplicaSelector).AsSelector()) + if err != nil { + return false, err + } + return len(pods.Items) == controller.DesiredState.Replicas, nil + } +} diff --git a/pkg/kubelet/config/etcd_test.go b/pkg/kubelet/config/etcd_test.go index f0b025deaf..c044c99ed0 100644 --- a/pkg/kubelet/config/etcd_test.go +++ b/pkg/kubelet/config/etcd_test.go @@ -63,7 +63,7 @@ func TestGetEtcdNoData(t *testing.T) { R: &etcd.Response{}, E: nil, } - c := SourceEtcd{"/registry/hosts/machine/kubelet", fakeClient, ch, time.Millisecond} + c := SourceEtcd{"/registry/hosts/machine/kubelet", fakeClient, ch, time.Millisecond, time.Minute} _, err := c.fetchNextState(0) if err == nil { t.Errorf("Expected error") @@ -86,7 +86,7 @@ func TestGetEtcd(t *testing.T) { }, E: nil, } - c := SourceEtcd{"/registry/hosts/machine/kubelet", fakeClient, ch, time.Millisecond} + c := SourceEtcd{"/registry/hosts/machine/kubelet", fakeClient, ch, time.Millisecond, time.Minute} lastIndex, err := c.fetchNextState(0) if err != nil { t.Errorf("Unexpected error: %v", err) @@ -118,7 +118,7 @@ func TestWatchEtcd(t *testing.T) { }, E: nil, } - c := SourceEtcd{"/registry/hosts/machine/kubelet", fakeClient, ch, time.Millisecond} + c := SourceEtcd{"/registry/hosts/machine/kubelet", fakeClient, ch, time.Millisecond, time.Minute} lastIndex, err := c.fetchNextState(1) if err != nil { t.Errorf("Unexpected error: %v", err) @@ -140,7 +140,7 @@ func TestGetEtcdNotFound(t *testing.T) { R: &etcd.Response{}, E: tools.EtcdErrorNotFound, } - c := SourceEtcd{"/registry/hosts/machine/kubelet", fakeClient, ch, time.Millisecond} + c := SourceEtcd{"/registry/hosts/machine/kubelet", fakeClient, ch, time.Millisecond, time.Minute} _, err := c.fetchNextState(0) if err == nil { t.Errorf("Expected error") @@ -157,7 +157,7 @@ func TestGetEtcdError(t *testing.T) { ErrorCode: 200, // non not found error }, } - c := SourceEtcd{"/registry/hosts/machine/kubelet", fakeClient, ch, time.Millisecond} + c := SourceEtcd{"/registry/hosts/machine/kubelet", fakeClient, ch, time.Millisecond, time.Minute} _, err := c.fetchNextState(0) if err == nil { t.Errorf("Expected error") diff --git a/pkg/registry/controller/storage.go b/pkg/registry/controller/storage.go index 6eb90f14b5..c6523a1c61 100644 --- a/pkg/registry/controller/storage.go +++ b/pkg/registry/controller/storage.go @@ -48,7 +48,7 @@ func NewRegistryStorage(registry Registry, podRegistry pod.Registry) apiserver.R } } -// Create registers then given ReplicationController. +// Create registers the given ReplicationController. func (rs *RegistryStorage) Create(obj interface{}) (<-chan interface{}, error) { controller, ok := obj.(*api.ReplicationController) if !ok { @@ -70,7 +70,7 @@ func (rs *RegistryStorage) Create(obj interface{}) (<-chan interface{}, error) { if err != nil { return nil, err } - return rs.waitForController(*controller) + return rs.registry.GetController(controller.ID) }), nil } @@ -124,7 +124,7 @@ func (rs *RegistryStorage) Update(obj interface{}) (<-chan interface{}, error) { if err != nil { return nil, err } - return rs.waitForController(*controller) + return rs.registry.GetController(controller.ID) }), nil } diff --git a/pkg/registry/controller/storage_test.go b/pkg/registry/controller/storage_test.go index 9447126b46..cc06017774 100644 --- a/pkg/registry/controller/storage_test.go +++ b/pkg/registry/controller/storage_test.go @@ -239,30 +239,10 @@ func TestCreateController(t *testing.T) { } select { + case <-channel: + // expected case case <-time.After(time.Millisecond * 100): - // Do nothing, this is expected. - case <-channel: - t.Error("Unexpected read from async channel") - } - - mockPodRegistry.Lock() - mockPodRegistry.Pods = []api.Pod{ - { - JSONBase: api.JSONBase{ID: "foo"}, - Labels: map[string]string{"a": "b"}, - }, - { - JSONBase: api.JSONBase{ID: "bar"}, - Labels: map[string]string{"a": "b"}, - }, - } - mockPodRegistry.Unlock() - - select { - case <-time.After(time.Second * 1): - t.Error("Unexpected timeout") - case <-channel: - // Do nothing, this is expected + t.Error("Unexpected timeout from async channel") } } diff --git a/pkg/registry/pod/storage.go b/pkg/registry/pod/storage.go index 85f64eea2f..34a3bcb80e 100644 --- a/pkg/registry/pod/storage.go +++ b/pkg/registry/pod/storage.go @@ -85,7 +85,7 @@ func (rs *RegistryStorage) Create(obj interface{}) (<-chan interface{}, error) { if err := rs.scheduleAndCreatePod(*pod); err != nil { return nil, err } - return rs.waitForPodRunning(*pod) + return rs.registry.GetPod(pod.ID) }), nil } @@ -153,7 +153,7 @@ func (rs *RegistryStorage) Update(obj interface{}) (<-chan interface{}, error) { if err := rs.registry.UpdatePod(*pod); err != nil { return nil, err } - return rs.waitForPodRunning(*pod) + return rs.registry.GetPod(pod.ID) }), nil } diff --git a/pkg/registry/pod/storage_test.go b/pkg/registry/pod/storage_test.go index dfa7627494..37a3a8df16 100644 --- a/pkg/registry/pod/storage_test.go +++ b/pkg/registry/pod/storage_test.go @@ -406,23 +406,10 @@ func TestCreatePod(t *testing.T) { } select { + case <-channel: + // Do nothing, this is expected. case <-time.After(time.Millisecond * 100): - // Do nothing, this is expected. - case <-channel: - t.Error("Unexpected read from async channel") - } - // TODO: Is the below actually testing something? - podRegistry.UpdatePod(api.Pod{ - JSONBase: api.JSONBase{ID: "foo"}, - CurrentState: api.PodState{ - Status: api.PodRunning, - }, - }) - select { - case <-time.After(time.Second * 1): - t.Error("Unexpected timeout") - case <-channel: - // Do nothing, this is expected. + t.Error("Unexpected timeout on async channel") } } diff --git a/pkg/util/wait/doc.go b/pkg/util/wait/doc.go new file mode 100644 index 0000000000..24724fe41b --- /dev/null +++ b/pkg/util/wait/doc.go @@ -0,0 +1,19 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package wait provides tools for polling or listening for changes +// to a condition. +package wait diff --git a/pkg/util/wait/wait.go b/pkg/util/wait/wait.go new file mode 100644 index 0000000000..a89f048ff3 --- /dev/null +++ b/pkg/util/wait/wait.go @@ -0,0 +1,83 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package wait + +import ( + "errors" + "time" +) + +// ErrWaitTimeout is returned when the condition exited without success +var ErrWaitTimeout = errors.New("timed out waiting for the condition") + +// ConditionFunc returns true if the condition is satisfied, or an error +// if the loop should be aborted. +type ConditionFunc func() (done bool, err error) + +// Poll tries a condition func until it returns true, an error, or the +// wait channel is closed. Will always poll at least once. +func Poll(interval time.Duration, cycles int, condition ConditionFunc) error { + return WaitFor(poller(interval, cycles), condition) +} + +// WaitFunc creates a channel that receives an item every time a test +// should be executed and is closed when the last test should be invoked. +type WaitFunc func() <-chan struct{} + +// WaitFor implements the looping for a wait. +func WaitFor(wait WaitFunc, c ConditionFunc) error { + w := wait() + for { + _, open := <-w + ok, err := c() + if err != nil { + return err + } + if ok { + return nil + } + if !open { + break + } + } + return ErrWaitTimeout +} + +// poller returns a WaitFunc that will send to the channel every +// interval until at most cycles * interval has elapsed and then +// close the channel. Over very short intervals you may receive +// no ticks before being closed. +func poller(interval time.Duration, cycles int) WaitFunc { + return WaitFunc(func() <-chan struct{} { + ch := make(chan struct{}) + go func() { + tick := time.NewTicker(interval) + defer tick.Stop() + after := time.After(interval * time.Duration(cycles)) + for { + select { + case <-tick.C: + ch <- struct{}{} + case <-after: + close(ch) + return + } + } + }() + return ch + }) +} diff --git a/pkg/util/wait/wait_test.go b/pkg/util/wait/wait_test.go new file mode 100644 index 0000000000..eb30b4da73 --- /dev/null +++ b/pkg/util/wait/wait_test.go @@ -0,0 +1,125 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package wait + +import ( + "errors" + "testing" + "time" +) + +func TestPoller(t *testing.T) { + w := poller(time.Millisecond, 2) + ch := w() + count := 0 +DRAIN: + for { + select { + case _, open := <-ch: + if !open { + break DRAIN + } + count++ + case <-time.After(time.Millisecond * 5): + t.Errorf("unexpected timeout after poll") + } + } + if count > 3 { + t.Errorf("expected up to three values, got %d", count) + } +} + +func fakeTicker(count int) WaitFunc { + return func() <-chan struct{} { + ch := make(chan struct{}) + go func() { + for i := 0; i < count; i++ { + ch <- struct{}{} + } + close(ch) + }() + return ch + } +} + +func TestPoll(t *testing.T) { + invocations := 0 + f := ConditionFunc(func() (bool, error) { + invocations++ + return true, nil + }) + if err := Poll(time.Microsecond, 1, f); err != nil { + t.Fatalf("unexpected error %v", err) + } + if invocations == 0 { + t.Errorf("Expected at least one invocation, got zero") + } +} + +func TestWaitFor(t *testing.T) { + var invocations int + testCases := map[string]struct { + F ConditionFunc + Ticks int + Invoked int + Err bool + }{ + "invoked once": { + ConditionFunc(func() (bool, error) { + invocations++ + return true, nil + }), + 2, + 1, + false, + }, + "invoked and returns a timeout": { + ConditionFunc(func() (bool, error) { + invocations++ + return false, nil + }), + 2, + 3, + true, + }, + "returns immediately on error": { + ConditionFunc(func() (bool, error) { + invocations++ + return false, errors.New("test") + }), + 2, + 1, + true, + }, + } + for k, c := range testCases { + invocations = 0 + ticker := fakeTicker(c.Ticks) + err := WaitFor(ticker, c.F) + switch { + case c.Err && err == nil: + t.Errorf("%s: Expected error, got nil", k) + continue + case !c.Err && err != nil: + t.Errorf("%s: Expected no error, got: %#v", k, err) + continue + } + if invocations != c.Invoked { + t.Errorf("%s: Expected %d invocations, called %d", k, c.Invoked, invocations) + } + } +}