Merge pull request #549 from lavalamp/regWatch

Make replication controllers use new watch interface
pull/6/head
Clayton Coleman 2014-07-31 18:07:23 -04:00
commit 241ab692f6
13 changed files with 187 additions and 331 deletions

View File

@ -104,7 +104,10 @@ func startComponents(manifestURL string) (apiServerURL string) {
handler.delegate = m.ConstructHandler("/api/v1beta1")
controllerManager := controller.MakeReplicationManager(etcdClient, cl)
controllerManager.Run(1 * time.Second)
// Prove that controllerManager's watch works by making it not sync until after this
// test is over. (Hopefully we don't take 10 minutes!)
controllerManager.Run(10 * time.Minute)
// Kubelet (localhost)
cfg1 := config.NewPodConfig(config.PodConfigNotificationSnapshotAndUpdates)
@ -192,7 +195,12 @@ func runAtomicPutTest(c *client.Client) {
for {
glog.Infof("Starting to update (%s, %s)", l, v)
var tmpSvc api.Service
err := c.Get().Path("services").Path(svc.ID).Do().Into(&tmpSvc)
err := c.Get().
Path("services").
Path(svc.ID).
PollPeriod(100 * time.Millisecond).
Do().
Into(&tmpSvc)
if err != nil {
glog.Errorf("Error getting atomicService: %v", err)
continue

View File

@ -43,6 +43,7 @@ type RESTStorage interface {
// ResourceWatcher should be implemented by all RESTStorage objects that
// want to offer the ability to watch for changes through the watch api.
type ResourceWatcher interface {
// TODO: take a query, like List, to filter out unwanted events.
WatchAll() (watch.Interface, error)
WatchSingle(id string) (watch.Interface, error)
}

View File

@ -17,8 +17,6 @@ limitations under the License.
package controller
import (
"encoding/json"
"fmt"
"sync"
"time"
@ -27,13 +25,14 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/tools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/coreos/go-etcd/etcd"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
"github.com/golang/glog"
)
// ReplicationManager is responsible for synchronizing ReplicationController objects stored in etcd
// with actual running pods.
// TODO: Remove the etcd dependency and re-factor in terms of a generic watch interface
// TODO: Allow choice of switching between etcd/apiserver watching, or remove etcd references
// from this file completely.
type ReplicationManager struct {
etcdClient tools.EtcdClient
kubeClient client.Interface
@ -42,6 +41,9 @@ type ReplicationManager struct {
// To allow injection of syncReplicationController for testing.
syncHandler func(controllerSpec api.ReplicationController) error
// To allow injection of watch creation.
watchMaker func() (watch.Interface, error)
}
// PodControlInterface is an interface that knows how to add or delete pods
@ -60,6 +62,7 @@ type RealPodControl struct {
func (r RealPodControl) createReplica(controllerSpec api.ReplicationController) {
labels := controllerSpec.DesiredState.PodTemplate.Labels
// TODO: don't fail to set this label just because the map isn't created.
if labels != nil {
labels["replicationController"] = controllerSpec.ID
}
@ -86,9 +89,8 @@ func MakeReplicationManager(etcdClient tools.EtcdClient, kubeClient client.Inter
kubeClient: kubeClient,
},
}
rm.syncHandler = func(controllerSpec api.ReplicationController) error {
return rm.syncReplicationController(controllerSpec)
}
rm.syncHandler = rm.syncReplicationController
rm.watchMaker = rm.makeAPIWatch
return rm
}
@ -98,71 +100,51 @@ func (rm *ReplicationManager) Run(period time.Duration) {
go util.Forever(func() { rm.watchControllers() }, period)
}
func (rm *ReplicationManager) watchControllers() {
watchChannel := make(chan *etcd.Response)
stop := make(chan bool)
// Ensure that the call to watch ends.
defer close(stop)
// makeEtcdWatch starts watching via etcd.
func (rm *ReplicationManager) makeEtcdWatch() (watch.Interface, error) {
helper := tools.EtcdHelper{rm.etcdClient}
return helper.WatchList("/registry/controllers", tools.Everything)
}
go func() {
defer util.HandleCrash()
_, err := rm.etcdClient.Watch("/registry/controllers", 0, true, watchChannel, stop)
if err == etcd.ErrWatchStoppedByUser {
close(watchChannel)
} else {
glog.Errorf("etcd.Watch stopped unexpectedly: %v (%#v)", err, err)
}
}()
// makeAPIWatch starts watching via the apiserver.
func (rm *ReplicationManager) makeAPIWatch() (watch.Interface, error) {
// TODO: Fix this ugly type assertion.
return rm.kubeClient.(*client.Client).
Get().
Path("watch").
Path("replicationControllers").
Watch()
}
func (rm *ReplicationManager) watchControllers() {
watching, err := rm.watchMaker()
if err != nil {
glog.Errorf("Unexpected failure to watch: %v", err)
time.Sleep(5 * time.Second)
return
}
for {
select {
case <-rm.syncTime:
rm.synchronize()
case watchResponse, open := <-watchChannel:
if !open || watchResponse == nil {
case event, open := <-watching.ResultChan():
if !open {
// watchChannel has been closed, or something else went
// wrong with our etcd watch call. Let the util.Forever()
// that called us call us again.
return
}
glog.Infof("Got watch: %#v", watchResponse)
controller, err := rm.handleWatchResponse(watchResponse)
if err != nil {
glog.Errorf("Error handling data: %#v, %#v", err, watchResponse)
continue
glog.Infof("Got watch: %#v", event)
if rc, ok := event.Object.(*api.ReplicationController); !ok {
glog.Errorf("unexpected object: %#v", event.Object)
} else {
rm.syncHandler(*rc)
}
rm.syncHandler(*controller)
}
}
}
func (rm *ReplicationManager) handleWatchResponse(response *etcd.Response) (*api.ReplicationController, error) {
switch response.Action {
case "set":
if response.Node == nil {
return nil, fmt.Errorf("response node is null %#v", response)
}
var controllerSpec api.ReplicationController
if err := json.Unmarshal([]byte(response.Node.Value), &controllerSpec); err != nil {
return nil, err
}
return &controllerSpec, nil
case "delete":
// Ensure that the final state of a replication controller is applied before it is deleted.
// Otherwise, a replication controller could be modified and then deleted (for example, from 3 to 0
// replicas), and it would be non-deterministic which of its pods continued to exist.
if response.PrevNode == nil {
return nil, fmt.Errorf("previous node is null %#v", response)
}
var controllerSpec api.ReplicationController
if err := json.Unmarshal([]byte(response.PrevNode.Value), &controllerSpec); err != nil {
return nil, err
}
return &controllerSpec, nil
}
return nil, nil
}
func (rm *ReplicationManager) filterActivePods(pods []api.Pod) []api.Pod {
var result []api.Pod
for _, value := range pods {

View File

@ -29,6 +29,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/tools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
"github.com/coreos/go-etcd/etcd"
)
@ -221,142 +222,6 @@ func TestCreateReplica(t *testing.T) {
}
}
func TestHandleWatchResponseNotSet(t *testing.T) {
body, _ := api.Encode(makePodList(2))
fakeHandler := util.FakeHandler{
StatusCode: 200,
ResponseBody: string(body),
}
testServer := httptest.NewTLSServer(&fakeHandler)
client := client.New(testServer.URL, nil)
fakePodControl := FakePodControl{}
manager := MakeReplicationManager(nil, client)
manager.podControl = &fakePodControl
_, err := manager.handleWatchResponse(&etcd.Response{
Action: "update",
})
if err != nil {
t.Errorf("Unexpected error: %v", err)
}
}
func TestHandleWatchResponseNoNode(t *testing.T) {
body, _ := api.Encode(makePodList(2))
fakeHandler := util.FakeHandler{
StatusCode: 200,
ResponseBody: string(body),
}
testServer := httptest.NewTLSServer(&fakeHandler)
client := client.New(testServer.URL, nil)
fakePodControl := FakePodControl{}
manager := MakeReplicationManager(nil, client)
manager.podControl = &fakePodControl
_, err := manager.handleWatchResponse(&etcd.Response{
Action: "set",
})
if err == nil {
t.Error("Unexpected non-error")
}
}
func TestHandleWatchResponseBadData(t *testing.T) {
body, _ := api.Encode(makePodList(2))
fakeHandler := util.FakeHandler{
StatusCode: 200,
ResponseBody: string(body),
}
testServer := httptest.NewTLSServer(&fakeHandler)
client := client.New(testServer.URL, nil)
fakePodControl := FakePodControl{}
manager := MakeReplicationManager(nil, client)
manager.podControl = &fakePodControl
_, err := manager.handleWatchResponse(&etcd.Response{
Action: "set",
Node: &etcd.Node{
Value: "foobar",
},
})
if err == nil {
t.Error("Unexpected non-error")
}
}
func TestHandleWatchResponse(t *testing.T) {
body, _ := api.Encode(makePodList(2))
fakeHandler := util.FakeHandler{
StatusCode: 200,
ResponseBody: string(body),
}
testServer := httptest.NewTLSServer(&fakeHandler)
client := client.New(testServer.URL, nil)
fakePodControl := FakePodControl{}
manager := MakeReplicationManager(nil, client)
manager.podControl = &fakePodControl
controller := makeReplicationController(2)
// TODO: fixme when etcd uses Encode/Decode
data, err := json.Marshal(controller)
if err != nil {
t.Errorf("Unexpected error: %v", err)
}
controllerOut, err := manager.handleWatchResponse(&etcd.Response{
Action: "set",
Node: &etcd.Node{
Value: string(data),
},
})
if err != nil {
t.Errorf("Unexpected error: %#v", err)
}
if !reflect.DeepEqual(controller, *controllerOut) {
t.Errorf("Unexpected mismatch. Expected %#v, Saw: %#v", controller, controllerOut)
}
}
func TestHandleWatchResponseDelete(t *testing.T) {
body, _ := api.Encode(makePodList(2))
fakeHandler := util.FakeHandler{
StatusCode: 200,
ResponseBody: string(body),
}
testServer := httptest.NewTLSServer(&fakeHandler)
client := client.New(testServer.URL, nil)
fakePodControl := FakePodControl{}
manager := MakeReplicationManager(nil, client)
manager.podControl = &fakePodControl
controller := makeReplicationController(2)
// TODO: fixme when etcd writing uses api.Encode
data, err := json.Marshal(controller)
if err != nil {
t.Errorf("Unexpected error: %v", err)
}
controllerOut, err := manager.handleWatchResponse(&etcd.Response{
Action: "delete",
PrevNode: &etcd.Node{
Value: string(data),
},
})
if err != nil {
t.Errorf("Unexpected error: %#v", err)
}
if !reflect.DeepEqual(controller, *controllerOut) {
t.Errorf("Unexpected mismatch. Expected %#v, Saw: %#v", controller, controllerOut)
}
}
func TestSyncronize(t *testing.T) {
controllerSpec1 := api.ReplicationController{
JSONBase: api.JSONBase{APIVersion: "v1beta1"},
@ -435,9 +300,14 @@ func TestSyncronize(t *testing.T) {
func TestWatchControllers(t *testing.T) {
fakeEtcd := tools.MakeFakeEtcdClient(t)
fakeWatcher := watch.NewFake()
manager := MakeReplicationManager(fakeEtcd, nil)
manager.watchMaker = func() (watch.Interface, error) {
return fakeWatcher, nil
}
var testControllerSpec api.ReplicationController
received := make(chan bool)
received := make(chan struct{})
manager.syncHandler = func(controllerSpec api.ReplicationController) error {
if !reflect.DeepEqual(controllerSpec, testControllerSpec) {
t.Errorf("Expected %#v, but got %#v", testControllerSpec, controllerSpec)
@ -448,38 +318,13 @@ func TestWatchControllers(t *testing.T) {
go manager.watchControllers()
fakeEtcd.WaitForWatchCompletion()
// Test normal case
testControllerSpec.ID = "foo"
fakeEtcd.WatchResponse <- &etcd.Response{
Action: "set",
Node: &etcd.Node{
Value: util.MakeJSONString(testControllerSpec),
},
}
fakeWatcher.Add(&testControllerSpec)
select {
case <-received:
case <-time.After(10 * time.Millisecond):
t.Errorf("Expected 1 call but got 0")
}
// Test error case
fakeEtcd.WatchInjectError <- fmt.Errorf("Injected error")
// Did everything shut down?
if _, open := <-fakeEtcd.WatchResponse; open {
t.Errorf("An injected error did not cause a graceful shutdown")
}
// Test purposeful shutdown
go manager.watchControllers()
fakeEtcd.WaitForWatchCompletion()
fakeEtcd.WatchStop <- true
// Did everything shut down?
if _, open := <-fakeEtcd.WatchResponse; open {
t.Errorf("A stop did not cause a graceful shutdown")
}
}

View File

@ -17,6 +17,7 @@ limitations under the License.
package registry
import (
"errors"
"fmt"
"time"
@ -24,6 +25,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
)
// ControllerRegistryStorage is an implementation of RESTStorage for the api server.
@ -135,3 +137,15 @@ func (storage *ControllerRegistryStorage) waitForController(ctrl api.Replication
}
return ctrl, nil
}
// WatchAll returns ReplicationController events via a watch.Interface, implementing
// apiserver.ResourceWatcher.
func (storage *ControllerRegistryStorage) WatchAll() (watch.Interface, error) {
return storage.registry.WatchControllers()
}
// WatchSingle returns events for a single ReplicationController via a watch.Interface,
// implementing apiserver.ResourceWatcher.
func (storage *ControllerRegistryStorage) WatchSingle(id string) (watch.Interface, error) {
return nil, errors.New("unimplemented")
}

View File

@ -26,6 +26,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
)
type MockControllerRegistry struct {
@ -51,6 +52,9 @@ func (registry *MockControllerRegistry) UpdateController(controller api.Replicat
func (registry *MockControllerRegistry) DeleteController(ID string) error {
return registry.err
}
func (registry *MockControllerRegistry) WatchControllers() (watch.Interface, error) {
return nil, registry.err
}
func TestListControllersError(t *testing.T) {
mockRegistry := MockControllerRegistry{
@ -267,13 +271,13 @@ func TestControllerStorageValidatesCreate(t *testing.T) {
}
failureCases := map[string]api.ReplicationController{
"empty ID": api.ReplicationController{
"empty ID": {
JSONBase: api.JSONBase{ID: ""},
DesiredState: api.ReplicationControllerState{
ReplicaSelector: map[string]string{"bar": "baz"},
},
},
"empty selector": api.ReplicationController{
"empty selector": {
JSONBase: api.JSONBase{ID: "abc"},
DesiredState: api.ReplicationControllerState{},
},
@ -298,13 +302,13 @@ func TestControllerStorageValidatesUpdate(t *testing.T) {
}
failureCases := map[string]api.ReplicationController{
"empty ID": api.ReplicationController{
"empty ID": {
JSONBase: api.JSONBase{ID: ""},
DesiredState: api.ReplicationControllerState{
ReplicaSelector: map[string]string{"bar": "baz"},
},
},
"empty selector": api.ReplicationController{
"empty selector": {
JSONBase: api.JSONBase{ID: "abc"},
DesiredState: api.ReplicationControllerState{},
},

View File

@ -23,6 +23,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/tools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
"github.com/golang/glog"
)
@ -207,6 +208,12 @@ func (registry *EtcdRegistry) ListControllers() ([]api.ReplicationController, er
return controllers, err
}
// WatchControllers begins watching for new, changed, or deleted controllers.
// TODO: Add id/selector parameters?
func (registry *EtcdRegistry) WatchControllers() (watch.Interface, error) {
return registry.helper().WatchList("/registry/controllers", tools.Everything)
}
func makeControllerKey(id string) string {
return "/registry/controllers/" + id
}

View File

@ -19,6 +19,7 @@ package registry
import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
)
// PodRegistry is an interface implemented by things that know how to store Pod objects.
@ -38,6 +39,7 @@ type PodRegistry interface {
// ControllerRegistry is an interface for things that know how to store ReplicationControllers.
type ControllerRegistry interface {
ListControllers() ([]api.ReplicationController, error)
WatchControllers() (watch.Interface, error)
GetController(controllerID string) (*api.ReplicationController, error)
CreateController(controller api.ReplicationController) error
UpdateController(controller api.ReplicationController) error

View File

@ -17,9 +17,12 @@ limitations under the License.
package registry
import (
"errors"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
)
// An implementation of PodRegistry and ControllerRegistry that is backed by memory
@ -86,6 +89,10 @@ func (registry *MemoryRegistry) ListControllers() ([]api.ReplicationController,
return result, nil
}
func (registry *MemoryRegistry) WatchControllers() (watch.Interface, error) {
return nil, errors.New("unimplemented")
}
func (registry *MemoryRegistry) GetController(controllerID string) (*api.ReplicationController, error) {
controller, found := registry.controllerData[controllerID]
if found {

View File

@ -18,6 +18,7 @@ package tools
import (
"encoding/json"
"fmt"
"io"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
@ -44,7 +45,14 @@ func NewAPIEventDecoder(stream io.ReadCloser) *APIEventDecoder {
func (d *APIEventDecoder) Decode() (action watch.EventType, object interface{}, err error) {
var got api.WatchEvent
err = d.decoder.Decode(&got)
return got.Type, got.Object.Object, err
if err != nil {
return action, nil, err
}
switch got.Type {
case watch.Added, watch.Modified, watch.Deleted:
return got.Type, got.Object.Object, err
}
return action, nil, fmt.Errorf("got invalid watch event type: %v", got.Type)
}
// Close closes the underlying stream.

View File

@ -19,6 +19,7 @@ package tools
import (
"fmt"
"reflect"
"sync"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
@ -267,23 +268,31 @@ type etcdWatcher struct {
list bool // If we're doing a recursive watch, should be true.
filter FilterFunc
etcdIncoming chan *etcd.Response
etcdStop chan bool
etcdIncoming chan *etcd.Response
etcdStop chan bool
etcdCallEnded chan struct{}
outgoing chan watch.Event
userStop chan struct{}
stopped bool
stopLock sync.Mutex
// Injectable for testing. Send the event down the outgoing channel.
emit func(watch.Event)
}
// Returns a new etcdWatcher; if list is true, watch sub-nodes.
func newEtcdWatcher(list bool, filter FilterFunc) *etcdWatcher {
w := &etcdWatcher{
list: list,
filter: filter,
etcdIncoming: make(chan *etcd.Response),
etcdStop: make(chan bool),
outgoing: make(chan watch.Event),
userStop: make(chan struct{}),
list: list,
filter: filter,
etcdIncoming: make(chan *etcd.Response),
etcdStop: make(chan bool),
etcdCallEnded: make(chan struct{}),
outgoing: make(chan watch.Event),
userStop: make(chan struct{}),
}
w.emit = func(e watch.Event) { w.outgoing <- e }
go w.translate()
return w
}
@ -292,11 +301,9 @@ func newEtcdWatcher(list bool, filter FilterFunc) *etcdWatcher {
// as a goroutine.
func (w *etcdWatcher) etcdWatch(client EtcdGetSet, key string) {
defer util.HandleCrash()
defer close(w.etcdCallEnded)
_, err := client.Watch(key, 0, w.list, w.etcdIncoming, w.etcdStop)
if err == etcd.ErrWatchStoppedByUser {
// etcd doesn't close the channel in this case.
close(w.etcdIncoming)
} else {
if err != etcd.ErrWatchStoppedByUser {
glog.Errorf("etcd.Watch stopped unexpectedly: %v (%#v)", err, err)
}
}
@ -309,6 +316,8 @@ func (w *etcdWatcher) translate() {
for {
select {
case <-w.etcdCallEnded:
return
case <-w.userStop:
w.etcdStop <- true
return
@ -324,7 +333,6 @@ func (w *etcdWatcher) translate() {
func (w *etcdWatcher) sendResult(res *etcd.Response) {
var action watch.EventType
var data []byte
var nodes etcd.Nodes
switch res.Action {
case "set":
if res.Node == nil {
@ -332,7 +340,6 @@ func (w *etcdWatcher) sendResult(res *etcd.Response) {
return
}
data = []byte(res.Node.Value)
nodes = res.Node.Nodes
// TODO: Is this conditional correct?
if res.EtcdIndex > 0 {
action = watch.Modified
@ -345,38 +352,23 @@ func (w *etcdWatcher) sendResult(res *etcd.Response) {
return
}
data = []byte(res.PrevNode.Value)
nodes = res.PrevNode.Nodes
action = watch.Deleted
}
// If listing, we're interested in sub-nodes.
if w.list {
for _, n := range nodes {
obj, err := api.Decode([]byte(n.Value))
if err != nil {
glog.Errorf("failure to decode api object: %#v", res)
continue
}
if w.filter != nil && !w.filter(obj) {
continue
}
w.outgoing <- watch.Event{
Type: action,
Object: obj,
}
}
default:
glog.Errorf("unknown action: %v", res.Action)
return
}
obj, err := api.Decode(data)
if err != nil {
glog.Errorf("failure to decode api object: %#v", res)
glog.Errorf("failure to decode api object: '%v' from %#v", string(data), res)
// TODO: expose an error through watch.Interface?
w.Stop()
return
}
w.outgoing <- watch.Event{
w.emit(watch.Event{
Type: action,
Object: obj,
}
})
}
// ResultChannel implements watch.Interface.
@ -386,5 +378,11 @@ func (w *etcdWatcher) ResultChan() <-chan watch.Event {
// Stop implements watch.Interface.
func (w *etcdWatcher) Stop() {
close(w.userStop)
w.stopLock.Lock()
defer w.stopLock.Unlock()
// Prevent double channel closes.
if !w.stopped {
w.stopped = true
close(w.userStop)
}
}

View File

@ -220,71 +220,7 @@ func TestAtomicUpdate(t *testing.T) {
}
func TestWatchInterpretation_ListAdd(t *testing.T) {
called := false
w := newEtcdWatcher(true, func(interface{}) bool {
called = true
return true
})
pod := &api.Pod{JSONBase: api.JSONBase{ID: "foo"}}
podBytes, _ := api.Encode(pod)
go w.sendResult(&etcd.Response{
Action: "set",
Node: &etcd.Node{
Nodes: etcd.Nodes{
{
Value: string(podBytes),
},
},
},
})
got := <-w.outgoing
if e, a := watch.Added, got.Type; e != a {
t.Errorf("Expected %v, got %v", e, a)
}
if e, a := pod, got.Object; !reflect.DeepEqual(e, a) {
t.Errorf("Expected %v, got %v", e, a)
}
if !called {
t.Errorf("filter never called")
}
}
func TestWatchInterpretation_ListDelete(t *testing.T) {
called := false
w := newEtcdWatcher(true, func(interface{}) bool {
called = true
return true
})
pod := &api.Pod{JSONBase: api.JSONBase{ID: "foo"}}
podBytes, _ := api.Encode(pod)
go w.sendResult(&etcd.Response{
Action: "delete",
PrevNode: &etcd.Node{
Nodes: etcd.Nodes{
{
Value: string(podBytes),
},
},
},
})
got := <-w.outgoing
if e, a := watch.Deleted, got.Type; e != a {
t.Errorf("Expected %v, got %v", e, a)
}
if e, a := pod, got.Object; !reflect.DeepEqual(e, a) {
t.Errorf("Expected %v, got %v", e, a)
}
if !called {
t.Errorf("filter never called")
}
}
func TestWatchInterpretation_SingleAdd(t *testing.T) {
w := newEtcdWatcher(false, func(interface{}) bool {
t.Errorf("unexpected filter call")
return true
})
@ -307,8 +243,8 @@ func TestWatchInterpretation_SingleAdd(t *testing.T) {
}
}
func TestWatchInterpretation_SingleDelete(t *testing.T) {
w := newEtcdWatcher(false, func(interface{}) bool {
func TestWatchInterpretation_Delete(t *testing.T) {
w := newEtcdWatcher(true, func(interface{}) bool {
t.Errorf("unexpected filter call")
return true
})
@ -331,6 +267,49 @@ func TestWatchInterpretation_SingleDelete(t *testing.T) {
}
}
func TestWatchInterpretation_ResponseNotSet(t *testing.T) {
w := newEtcdWatcher(false, func(interface{}) bool {
t.Errorf("unexpected filter call")
return true
})
w.emit = func(e watch.Event) {
t.Errorf("Unexpected emit: %v", e)
}
w.sendResult(&etcd.Response{
Action: "update",
})
}
func TestWatchInterpretation_ResponseNoNode(t *testing.T) {
w := newEtcdWatcher(false, func(interface{}) bool {
t.Errorf("unexpected filter call")
return true
})
w.emit = func(e watch.Event) {
t.Errorf("Unexpected emit: %v", e)
}
w.sendResult(&etcd.Response{
Action: "set",
})
}
func TestWatchInterpretation_ResponseBadData(t *testing.T) {
w := newEtcdWatcher(false, func(interface{}) bool {
t.Errorf("unexpected filter call")
return true
})
w.emit = func(e watch.Event) {
t.Errorf("Unexpected emit: %v", e)
}
w.sendResult(&etcd.Response{
Action: "set",
Node: &etcd.Node{
Value: "foobar",
},
})
}
func TestWatch(t *testing.T) {
fakeEtcd := MakeFakeEtcdClient(t)
h := EtcdHelper{fakeEtcd}

View File

@ -215,6 +215,9 @@ func (f *FakeEtcdClient) Watch(prefix string, waitIndex uint64, recursive bool,
if receiver == nil {
return f.Get(prefix, false, recursive)
} else {
// Emulate etcd's behavior. (I think.)
defer close(receiver)
}
f.watchCompletedChan <- true
@ -222,8 +225,6 @@ func (f *FakeEtcdClient) Watch(prefix string, waitIndex uint64, recursive bool,
case <-stop:
return nil, etcd.ErrWatchStoppedByUser
case err := <-injectedError:
// Emulate etcd's behavior.
close(receiver)
return nil, err
}
// Never get here.