Return immediately when controllers/pods are committed

Add client waiting conditions.
pull/6/head
Clayton Coleman 2014-08-18 17:42:08 -04:00
parent 34031dbc6a
commit 493863eb93
10 changed files with 311 additions and 58 deletions

View File

@ -37,6 +37,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/master"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/wait"
"github.com/coreos/go-etcd/etcd"
"github.com/golang/glog"
)
@ -135,7 +136,24 @@ func startComponents(manifestURL string) (apiServerURL string) {
return apiServer.URL
}
func runReplicationControllerTest(kubeClient *client.Client) {
// podsOnMinions returns true when all of the selected pods exist on a minion.
func podsOnMinions(c *client.Client, pods api.PodList) wait.ConditionFunc {
podInfo := fakePodInfoGetter{}
return func() (bool, error) {
for i := range pods.Items {
host, id := pods.Items[i].CurrentState.Host, pods.Items[i].ID
if len(host) == 0 {
return false, nil
}
if _, err := podInfo.GetPodInfo(host, id); err != nil {
return false, nil
}
}
return true, nil
}
}
func runReplicationControllerTest(c *client.Client) {
data, err := ioutil.ReadFile("api/examples/controller.json")
if err != nil {
glog.Fatalf("Unexpected error: %#v", err)
@ -146,20 +164,26 @@ func runReplicationControllerTest(kubeClient *client.Client) {
}
glog.Infof("Creating replication controllers")
if _, err := kubeClient.CreateReplicationController(controllerRequest); err != nil {
if _, err := c.CreateReplicationController(controllerRequest); err != nil {
glog.Fatalf("Unexpected error: %#v", err)
}
glog.Infof("Done creating replication controllers")
// Give the controllers some time to actually create the pods
time.Sleep(time.Second * 10)
// Validate that they're truly up.
pods, err := kubeClient.ListPods(labels.Set(controllerRequest.DesiredState.ReplicaSelector).AsSelector())
if err != nil || len(pods.Items) != controllerRequest.DesiredState.Replicas {
glog.Fatalf("FAILED: %#v", pods.Items)
if err := wait.Poll(time.Second, 10, c.ControllerHasDesiredReplicas(controllerRequest)); err != nil {
glog.Fatalf("FAILED: pods never created %v", err)
}
glog.Infof("Replication controller produced:\n\n%#v\n\n", pods)
// wait for minions to indicate they have info about the desired pods
pods, err := c.ListPods(labels.Set(controllerRequest.DesiredState.ReplicaSelector).AsSelector())
if err != nil {
glog.Fatalf("FAILED: unable to get pods to list: %v", err)
}
if err := wait.Poll(time.Second, 10, podsOnMinions(c, pods)); err != nil {
glog.Fatalf("FAILED: pods never started running %v", err)
}
glog.Infof("Pods created")
}
func runAtomicPutTest(c *client.Client) {

35
pkg/client/conditions.go Normal file
View File

@ -0,0 +1,35 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package client
import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/wait"
)
// ControllerHasDesiredReplicas returns a condition that will be true iff the desired replica count
// for a controller's ReplicaSelector equals the Replicas count.
func (c *Client) ControllerHasDesiredReplicas(controller api.ReplicationController) wait.ConditionFunc {
return func() (bool, error) {
pods, err := c.ListPods(labels.Set(controller.DesiredState.ReplicaSelector).AsSelector())
if err != nil {
return false, err
}
return len(pods.Items) == controller.DesiredState.Replicas, nil
}
}

View File

@ -63,7 +63,7 @@ func TestGetEtcdNoData(t *testing.T) {
R: &etcd.Response{},
E: nil,
}
c := SourceEtcd{"/registry/hosts/machine/kubelet", fakeClient, ch, time.Millisecond}
c := SourceEtcd{"/registry/hosts/machine/kubelet", fakeClient, ch, time.Millisecond, time.Minute}
_, err := c.fetchNextState(0)
if err == nil {
t.Errorf("Expected error")
@ -86,7 +86,7 @@ func TestGetEtcd(t *testing.T) {
},
E: nil,
}
c := SourceEtcd{"/registry/hosts/machine/kubelet", fakeClient, ch, time.Millisecond}
c := SourceEtcd{"/registry/hosts/machine/kubelet", fakeClient, ch, time.Millisecond, time.Minute}
lastIndex, err := c.fetchNextState(0)
if err != nil {
t.Errorf("Unexpected error: %v", err)
@ -118,7 +118,7 @@ func TestWatchEtcd(t *testing.T) {
},
E: nil,
}
c := SourceEtcd{"/registry/hosts/machine/kubelet", fakeClient, ch, time.Millisecond}
c := SourceEtcd{"/registry/hosts/machine/kubelet", fakeClient, ch, time.Millisecond, time.Minute}
lastIndex, err := c.fetchNextState(1)
if err != nil {
t.Errorf("Unexpected error: %v", err)
@ -140,7 +140,7 @@ func TestGetEtcdNotFound(t *testing.T) {
R: &etcd.Response{},
E: tools.EtcdErrorNotFound,
}
c := SourceEtcd{"/registry/hosts/machine/kubelet", fakeClient, ch, time.Millisecond}
c := SourceEtcd{"/registry/hosts/machine/kubelet", fakeClient, ch, time.Millisecond, time.Minute}
_, err := c.fetchNextState(0)
if err == nil {
t.Errorf("Expected error")
@ -157,7 +157,7 @@ func TestGetEtcdError(t *testing.T) {
ErrorCode: 200, // non not found error
},
}
c := SourceEtcd{"/registry/hosts/machine/kubelet", fakeClient, ch, time.Millisecond}
c := SourceEtcd{"/registry/hosts/machine/kubelet", fakeClient, ch, time.Millisecond, time.Minute}
_, err := c.fetchNextState(0)
if err == nil {
t.Errorf("Expected error")

View File

@ -48,7 +48,7 @@ func NewRegistryStorage(registry Registry, podRegistry pod.Registry) apiserver.R
}
}
// Create registers then given ReplicationController.
// Create registers the given ReplicationController.
func (rs *RegistryStorage) Create(obj interface{}) (<-chan interface{}, error) {
controller, ok := obj.(*api.ReplicationController)
if !ok {
@ -70,7 +70,7 @@ func (rs *RegistryStorage) Create(obj interface{}) (<-chan interface{}, error) {
if err != nil {
return nil, err
}
return rs.waitForController(*controller)
return rs.registry.GetController(controller.ID)
}), nil
}
@ -124,7 +124,7 @@ func (rs *RegistryStorage) Update(obj interface{}) (<-chan interface{}, error) {
if err != nil {
return nil, err
}
return rs.waitForController(*controller)
return rs.registry.GetController(controller.ID)
}), nil
}

View File

@ -239,30 +239,10 @@ func TestCreateController(t *testing.T) {
}
select {
case <-channel:
// expected case
case <-time.After(time.Millisecond * 100):
// Do nothing, this is expected.
case <-channel:
t.Error("Unexpected read from async channel")
}
mockPodRegistry.Lock()
mockPodRegistry.Pods = []api.Pod{
{
JSONBase: api.JSONBase{ID: "foo"},
Labels: map[string]string{"a": "b"},
},
{
JSONBase: api.JSONBase{ID: "bar"},
Labels: map[string]string{"a": "b"},
},
}
mockPodRegistry.Unlock()
select {
case <-time.After(time.Second * 1):
t.Error("Unexpected timeout")
case <-channel:
// Do nothing, this is expected
t.Error("Unexpected timeout from async channel")
}
}

View File

@ -85,7 +85,7 @@ func (rs *RegistryStorage) Create(obj interface{}) (<-chan interface{}, error) {
if err := rs.scheduleAndCreatePod(*pod); err != nil {
return nil, err
}
return rs.waitForPodRunning(*pod)
return rs.registry.GetPod(pod.ID)
}), nil
}
@ -153,7 +153,7 @@ func (rs *RegistryStorage) Update(obj interface{}) (<-chan interface{}, error) {
if err := rs.registry.UpdatePod(*pod); err != nil {
return nil, err
}
return rs.waitForPodRunning(*pod)
return rs.registry.GetPod(pod.ID)
}), nil
}

View File

@ -406,23 +406,10 @@ func TestCreatePod(t *testing.T) {
}
select {
case <-channel:
// Do nothing, this is expected.
case <-time.After(time.Millisecond * 100):
// Do nothing, this is expected.
case <-channel:
t.Error("Unexpected read from async channel")
}
// TODO: Is the below actually testing something?
podRegistry.UpdatePod(api.Pod{
JSONBase: api.JSONBase{ID: "foo"},
CurrentState: api.PodState{
Status: api.PodRunning,
},
})
select {
case <-time.After(time.Second * 1):
t.Error("Unexpected timeout")
case <-channel:
// Do nothing, this is expected.
t.Error("Unexpected timeout on async channel")
}
}

19
pkg/util/wait/doc.go Normal file
View File

@ -0,0 +1,19 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package wait provides tools for polling or listening for changes
// to a condition.
package wait

83
pkg/util/wait/wait.go Normal file
View File

@ -0,0 +1,83 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package wait
import (
"errors"
"time"
)
// ErrWaitTimeout is returned when the condition exited without success
var ErrWaitTimeout = errors.New("timed out waiting for the condition")
// ConditionFunc returns true if the condition is satisfied, or an error
// if the loop should be aborted.
type ConditionFunc func() (done bool, err error)
// Poll tries a condition func until it returns true, an error, or the
// wait channel is closed. Will always poll at least once.
func Poll(interval time.Duration, cycles int, condition ConditionFunc) error {
return WaitFor(poller(interval, cycles), condition)
}
// WaitFunc creates a channel that receives an item every time a test
// should be executed and is closed when the last test should be invoked.
type WaitFunc func() <-chan struct{}
// WaitFor implements the looping for a wait.
func WaitFor(wait WaitFunc, c ConditionFunc) error {
w := wait()
for {
_, open := <-w
ok, err := c()
if err != nil {
return err
}
if ok {
return nil
}
if !open {
break
}
}
return ErrWaitTimeout
}
// poller returns a WaitFunc that will send to the channel every
// interval until at most cycles * interval has elapsed and then
// close the channel. Over very short intervals you may receive
// no ticks before being closed.
func poller(interval time.Duration, cycles int) WaitFunc {
return WaitFunc(func() <-chan struct{} {
ch := make(chan struct{})
go func() {
tick := time.NewTicker(interval)
defer tick.Stop()
after := time.After(interval * time.Duration(cycles))
for {
select {
case <-tick.C:
ch <- struct{}{}
case <-after:
close(ch)
return
}
}
}()
return ch
})
}

125
pkg/util/wait/wait_test.go Normal file
View File

@ -0,0 +1,125 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package wait
import (
"errors"
"testing"
"time"
)
func TestPoller(t *testing.T) {
w := poller(time.Millisecond, 2)
ch := w()
count := 0
DRAIN:
for {
select {
case _, open := <-ch:
if !open {
break DRAIN
}
count++
case <-time.After(time.Millisecond * 5):
t.Errorf("unexpected timeout after poll")
}
}
if count > 3 {
t.Errorf("expected up to three values, got %d", count)
}
}
func fakeTicker(count int) WaitFunc {
return func() <-chan struct{} {
ch := make(chan struct{})
go func() {
for i := 0; i < count; i++ {
ch <- struct{}{}
}
close(ch)
}()
return ch
}
}
func TestPoll(t *testing.T) {
invocations := 0
f := ConditionFunc(func() (bool, error) {
invocations++
return true, nil
})
if err := Poll(time.Microsecond, 1, f); err != nil {
t.Fatalf("unexpected error %v", err)
}
if invocations == 0 {
t.Errorf("Expected at least one invocation, got zero")
}
}
func TestWaitFor(t *testing.T) {
var invocations int
testCases := map[string]struct {
F ConditionFunc
Ticks int
Invoked int
Err bool
}{
"invoked once": {
ConditionFunc(func() (bool, error) {
invocations++
return true, nil
}),
2,
1,
false,
},
"invoked and returns a timeout": {
ConditionFunc(func() (bool, error) {
invocations++
return false, nil
}),
2,
3,
true,
},
"returns immediately on error": {
ConditionFunc(func() (bool, error) {
invocations++
return false, errors.New("test")
}),
2,
1,
true,
},
}
for k, c := range testCases {
invocations = 0
ticker := fakeTicker(c.Ticks)
err := WaitFor(ticker, c.F)
switch {
case c.Err && err == nil:
t.Errorf("%s: Expected error, got nil", k)
continue
case !c.Err && err != nil:
t.Errorf("%s: Expected no error, got: %#v", k, err)
continue
}
if invocations != c.Invoked {
t.Errorf("%s: Expected %d invocations, called %d", k, c.Invoked, invocations)
}
}
}