Need to remove pods that change labels.

pull/6/head
Daniel Smith 2014-08-21 18:16:46 -07:00
parent 85f98a79c1
commit 72b35816cd
6 changed files with 276 additions and 157 deletions

View File

@ -79,8 +79,15 @@ func (r *Registry) ListPods(selector labels.Selector) ([]api.Pod, error) {
}
// WatchPods begins watching for new, changed, or deleted pods.
func (r *Registry) WatchPods(resourceVersion uint64) (watch.Interface, error) {
return r.WatchList("/registry/pods", resourceVersion, tools.Everything)
func (r *Registry) WatchPods(resourceVersion uint64, filter func(*api.Pod) bool) (watch.Interface, error) {
return r.WatchList("/registry/pods", resourceVersion, func(obj interface{}) bool {
pod, ok := obj.(*api.Pod)
if !ok {
glog.Errorf("Unexpected object during pod watch: %#v", obj)
return false
}
return filter(pod)
})
}
// GetPod gets a specific pod specified by its ID.

View File

@ -27,7 +27,7 @@ type Registry interface {
// ListPods obtains a list of pods that match selector.
ListPods(selector labels.Selector) ([]api.Pod, error)
// Watch for new/changed/deleted pods
WatchPods(resourceVersion uint64) (watch.Interface, error)
WatchPods(resourceVersion uint64, filter func(*api.Pod) bool) (watch.Interface, error)
// Get a specific pod
GetPod(podID string) (*api.Pod, error)
// Create a pod based on a specification.

View File

@ -118,20 +118,14 @@ func (rs *RegistryStorage) List(selector labels.Selector) (interface{}, error) {
// Watch begins watching for new, changed, or deleted pods.
func (rs *RegistryStorage) Watch(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) {
source, err := rs.registry.WatchPods(resourceVersion)
if err != nil {
return nil, err
}
return watch.Filter(source, func(e watch.Event) (watch.Event, bool) {
pod := e.Object.(*api.Pod)
return rs.registry.WatchPods(resourceVersion, func(pod *api.Pod) bool {
fields := labels.Set{
"ID": pod.ID,
"DesiredState.Status": string(pod.DesiredState.Status),
"DesiredState.Host": pod.DesiredState.Host,
}
passesFilter := label.Matches(labels.Set(pod.Labels)) && field.Matches(fields)
return e, passesFilter
}), nil
return label.Matches(labels.Set(pod.Labels)) && field.Matches(fields)
})
}
func (rs RegistryStorage) New() interface{} {

View File

@ -55,7 +55,8 @@ func (r *PodRegistry) ListPods(selector labels.Selector) ([]api.Pod, error) {
return filtered, nil
}
func (r *PodRegistry) WatchPods(resourceVersion uint64) (watch.Interface, error) {
func (r *PodRegistry) WatchPods(resourceVersion uint64, filter func(*api.Pod) bool) (watch.Interface, error) {
// TODO: wire filter down into the mux; it needs access to current and previous state :(
return r.mux.Watch(), nil
}

View File

@ -343,7 +343,7 @@ func (h *EtcdHelper) Watch(key string, resourceVersion uint64) (watch.Interface,
// })
//
func (h *EtcdHelper) WatchAndTransform(key string, resourceVersion uint64, transform TransformFunc) (watch.Interface, error) {
w := newEtcdWatcher(false, nil, h.Codec, h.ResourceVersioner, transform)
w := newEtcdWatcher(false, Everything, h.Codec, h.ResourceVersioner, transform)
go w.etcdWatch(h.Client, key, resourceVersion)
return w, nil
}
@ -442,11 +442,7 @@ func convertRecursiveResponse(node *etcd.Node, response *etcd.Response, incoming
return
}
copied := *response
if node.ModifiedIndex == node.CreatedIndex {
copied.Action = "create"
} else {
copied.Action = "set"
}
copied.Action = "get"
copied.Node = node
incoming <- &copied
}
@ -473,46 +469,10 @@ func (w *etcdWatcher) translate() {
}
}
func (w *etcdWatcher) sendResult(res *etcd.Response) {
var action watch.EventType
var data []byte
var index uint64
switch res.Action {
case "create":
if res.Node == nil {
glog.Errorf("unexpected nil node: %#v", res)
return
}
data = []byte(res.Node.Value)
index = res.Node.ModifiedIndex
action = watch.Added
case "set", "compareAndSwap", "get":
if res.Node == nil {
glog.Errorf("unexpected nil node: %#v", res)
return
}
data = []byte(res.Node.Value)
index = res.Node.ModifiedIndex
action = watch.Modified
case "delete":
if res.PrevNode == nil {
glog.Errorf("unexpected nil prev node: %#v", res)
return
}
data = []byte(res.PrevNode.Value)
index = res.Node.ModifiedIndex
action = watch.Deleted
default:
glog.Errorf("unknown action: %v", res.Action)
return
}
func (w *etcdWatcher) decodeObject(data []byte, index uint64) (interface{}, error) {
obj, err := w.encoding.Decode(data)
if err != nil {
glog.Errorf("failure to decode api object: '%v' from %#v %#v", string(data), res, res.Node)
// TODO: expose an error through watch.Interface?
w.Stop()
return
return nil, err
}
// ensure resource version is set on the object we load from etcd
@ -527,18 +487,122 @@ func (w *etcdWatcher) sendResult(res *etcd.Response) {
obj, err = w.transform(obj)
if err != nil {
glog.Errorf("failure to transform api object %#v: %v", obj, err)
// TODO: expose an error through watch.Interface?
w.Stop()
return
return nil, err
}
}
return obj, nil
}
func (w *etcdWatcher) sendCreate(res *etcd.Response) {
if res.Node == nil {
glog.Errorf("unexpected nil node: %#v", res)
return
}
data := []byte(res.Node.Value)
obj, err := w.decodeObject(data, res.Node.ModifiedIndex)
if err != nil {
glog.Errorf("failure to decode api object: '%v' from %#v %#v", string(data), res, res.Node)
// TODO: expose an error through watch.Interface?
// Ignore this value. If we stop the watch on a bad value, a client that uses
// the resourceVersion to resume will never be able to get past a bad value.
return
}
if !w.filter(obj) {
return
}
action := watch.Added
if res.Node.ModifiedIndex != res.Node.CreatedIndex {
action = watch.Modified
}
w.emit(watch.Event{
Type: action,
Object: obj,
})
}
func (w *etcdWatcher) sendModify(res *etcd.Response) {
if res.Node == nil {
glog.Errorf("unexpected nil node: %#v", res)
return
}
curData := []byte(res.Node.Value)
curObj, err := w.decodeObject(curData, res.Node.ModifiedIndex)
if err != nil {
glog.Errorf("failure to decode api object: '%v' from %#v %#v", string(curData), res, res.Node)
// TODO: expose an error through watch.Interface?
// Ignore this value. If we stop the watch on a bad value, a client that uses
// the resourceVersion to resume will never be able to get past a bad value.
return
}
curObjPasses := w.filter(curObj)
oldObjPasses := false
var oldObj interface{}
if res.PrevNode != nil && res.PrevNode.Value != "" {
// Ignore problems reading the old object.
if oldObj, err = w.decodeObject([]byte(res.PrevNode.Value), res.PrevNode.ModifiedIndex); err == nil {
oldObjPasses = w.filter(oldObj)
}
}
// Some changes to an object may cause it to start or stop matching a filter.
// We need to report those as adds/deletes. So we have to check both the previous
// and current value of the object.
switch {
case curObjPasses && oldObjPasses:
w.emit(watch.Event{
Type: watch.Modified,
Object: curObj,
})
case curObjPasses && !oldObjPasses:
w.emit(watch.Event{
Type: watch.Added,
Object: curObj,
})
case !curObjPasses && oldObjPasses:
w.emit(watch.Event{
Type: watch.Deleted,
Object: oldObj,
})
}
// Do nothing if neither new nor old object passed the filter.
}
func (w *etcdWatcher) sendDelete(res *etcd.Response) {
if res.PrevNode == nil {
glog.Errorf("unexpected nil prev node: %#v", res)
return
}
data := []byte(res.PrevNode.Value)
obj, err := w.decodeObject(data, res.PrevNode.ModifiedIndex)
if err != nil {
glog.Errorf("failure to decode api object: '%v' from %#v %#v", string(data), res, res.PrevNode)
// TODO: expose an error through watch.Interface?
// Ignore this value. If we stop the watch on a bad value, a client that uses
// the resourceVersion to resume will never be able to get past a bad value.
return
}
if !w.filter(obj) {
return
}
w.emit(watch.Event{
Type: watch.Deleted,
Object: obj,
})
}
func (w *etcdWatcher) sendResult(res *etcd.Response) {
switch res.Action {
case "create", "get":
w.sendCreate(res)
case "set", "compareAndSwap":
w.sendModify(res)
case "delete":
w.sendDelete(res)
default:
glog.Errorf("unknown action: %v", res.Action)
}
}
// ResultChannel implements watch.Interface.
func (w *etcdWatcher) ResultChan() <-chan watch.Event {
return w.outgoing

View File

@ -365,88 +365,127 @@ func TestAtomicUpdate_CreateCollision(t *testing.T) {
}
}
func TestWatchInterpretation_ListCreate(t *testing.T) {
w := newEtcdWatcher(true, func(interface{}) bool {
t.Errorf("unexpected filter call")
return true
}, codec, versioner, nil)
pod := &api.Pod{JSONBase: api.JSONBase{ID: "foo"}}
podBytes, _ := codec.Encode(pod)
func TestWatchInterpretations(t *testing.T) {
// Declare some pods to make the test cases compact.
podFoo := &api.Pod{JSONBase: api.JSONBase{ID: "foo"}}
podBar := &api.Pod{JSONBase: api.JSONBase{ID: "bar"}}
podBaz := &api.Pod{JSONBase: api.JSONBase{ID: "baz"}}
firstLetterIsB := func(obj interface{}) bool {
return obj.(*api.Pod).ID[0] == 'b'
}
go w.sendResult(&etcd.Response{
Action: "create",
Node: &etcd.Node{
Value: string(podBytes),
// All of these test cases will be run with the firstLetterIsB FilterFunc.
table := map[string]struct {
actions []string // Run this test item for every action here.
prevNodeValue string
nodeValue string
expectEmit bool
expectType watch.EventType
expectObject interface{}
}{
"create": {
actions: []string{"create", "get"},
nodeValue: api.EncodeOrDie(podBar),
expectEmit: true,
expectType: watch.Added,
expectObject: podBar,
},
})
got := <-w.outgoing
if e, a := watch.Added, got.Type; e != a {
t.Errorf("Expected %v, got %v", e, a)
}
if e, a := pod, got.Object; !reflect.DeepEqual(e, a) {
t.Errorf("Expected %v, got %v", e, a)
}
}
func TestWatchInterpretation_ListAdd(t *testing.T) {
w := newEtcdWatcher(true, func(interface{}) bool {
t.Errorf("unexpected filter call")
return true
}, codec, versioner, nil)
pod := &api.Pod{JSONBase: api.JSONBase{ID: "foo"}}
podBytes, _ := codec.Encode(pod)
go w.sendResult(&etcd.Response{
Action: "set",
Node: &etcd.Node{
Value: string(podBytes),
"create but filter blocks": {
actions: []string{"create", "get"},
nodeValue: api.EncodeOrDie(podFoo),
expectEmit: false,
},
})
got := <-w.outgoing
if e, a := watch.Modified, got.Type; e != a {
t.Errorf("Expected %v, got %v", e, a)
}
if e, a := pod, got.Object; !reflect.DeepEqual(e, a) {
t.Errorf("Expected %v, got %v", e, a)
}
}
func TestWatchInterpretation_Delete(t *testing.T) {
w := newEtcdWatcher(true, func(interface{}) bool {
t.Errorf("unexpected filter call")
return true
}, codec, versioner, nil)
pod := &api.Pod{JSONBase: api.JSONBase{ID: "foo"}}
podBytes, _ := codec.Encode(pod)
go w.sendResult(&etcd.Response{
Action: "delete",
Node: &etcd.Node{
ModifiedIndex: 2,
"delete": {
actions: []string{"delete"},
prevNodeValue: api.EncodeOrDie(podBar),
expectEmit: true,
expectType: watch.Deleted,
expectObject: podBar,
},
PrevNode: &etcd.Node{
Value: string(podBytes),
ModifiedIndex: 1,
"delete but filter blocks": {
actions: []string{"delete"},
nodeValue: api.EncodeOrDie(podFoo),
expectEmit: false,
},
"modify appears to create 1": {
actions: []string{"set", "compareAndSwap"},
nodeValue: api.EncodeOrDie(podBar),
expectEmit: true,
expectType: watch.Added,
expectObject: podBar,
},
"modify appears to create 2": {
actions: []string{"set", "compareAndSwap"},
prevNodeValue: api.EncodeOrDie(podFoo),
nodeValue: api.EncodeOrDie(podBar),
expectEmit: true,
expectType: watch.Added,
expectObject: podBar,
},
"modify appears to delete": {
actions: []string{"set", "compareAndSwap"},
prevNodeValue: api.EncodeOrDie(podBar),
nodeValue: api.EncodeOrDie(podFoo),
expectEmit: true,
expectType: watch.Deleted,
expectObject: podBar, // Should return last state that passed the filter!
},
"modify modifies": {
actions: []string{"set", "compareAndSwap"},
prevNodeValue: api.EncodeOrDie(podBar),
nodeValue: api.EncodeOrDie(podBaz),
expectEmit: true,
expectType: watch.Modified,
expectObject: podBaz,
},
"modify ignores": {
actions: []string{"set", "compareAndSwap"},
nodeValue: api.EncodeOrDie(podFoo),
expectEmit: false,
},
})
got := <-w.outgoing
if e, a := watch.Deleted, got.Type; e != a {
t.Errorf("Expected %v, got %v", e, a)
}
pod.ResourceVersion = 2
if e, a := pod, got.Object; !reflect.DeepEqual(e, a) {
t.Errorf("Expected %v, got %v", e, a)
for name, item := range table {
for _, action := range item.actions {
w := newEtcdWatcher(true, firstLetterIsB, codec, versioner, nil)
emitCalled := false
w.emit = func(event watch.Event) {
emitCalled = true
if !item.expectEmit {
return
}
if e, a := item.expectType, event.Type; e != a {
t.Errorf("'%v - %v': expected %v, got %v", name, action, e, a)
}
if e, a := item.expectObject, event.Object; !reflect.DeepEqual(e, a) {
t.Errorf("'%v - %v': expected %v, got %v", name, action, e, a)
}
}
var n, pn *etcd.Node
if item.nodeValue != "" {
n = &etcd.Node{Value: item.nodeValue}
}
if item.prevNodeValue != "" {
pn = &etcd.Node{Value: item.prevNodeValue}
}
w.sendResult(&etcd.Response{
Action: action,
Node: n,
PrevNode: pn,
})
if e, a := item.expectEmit, emitCalled; e != a {
t.Errorf("'%v - %v': expected %v, got %v", name, action, e, a)
}
w.Stop()
}
}
}
func TestWatchInterpretation_ResponseNotSet(t *testing.T) {
w := newEtcdWatcher(false, func(interface{}) bool {
t.Errorf("unexpected filter call")
return true
}, codec, versioner, nil)
w := newEtcdWatcher(false, Everything, codec, versioner, nil)
w.emit = func(e watch.Event) {
t.Errorf("Unexpected emit: %v", e)
}
@ -454,35 +493,44 @@ func TestWatchInterpretation_ResponseNotSet(t *testing.T) {
w.sendResult(&etcd.Response{
Action: "update",
})
w.Stop()
}
func TestWatchInterpretation_ResponseNoNode(t *testing.T) {
w := newEtcdWatcher(false, func(interface{}) bool {
t.Errorf("unexpected filter call")
return true
}, codec, versioner, nil)
w.emit = func(e watch.Event) {
t.Errorf("Unexpected emit: %v", e)
actions := []string{"create", "set", "compareAndSwap", "delete"}
for _, action := range actions {
w := newEtcdWatcher(false, Everything, codec, versioner, nil)
w.emit = func(e watch.Event) {
t.Errorf("Unexpected emit: %v", e)
}
w.sendResult(&etcd.Response{
Action: action,
})
w.Stop()
}
w.sendResult(&etcd.Response{
Action: "set",
})
}
func TestWatchInterpretation_ResponseBadData(t *testing.T) {
w := newEtcdWatcher(false, func(interface{}) bool {
t.Errorf("unexpected filter call")
return true
}, codec, versioner, nil)
w.emit = func(e watch.Event) {
t.Errorf("Unexpected emit: %v", e)
actions := []string{"create", "set", "compareAndSwap", "delete"}
for _, action := range actions {
w := newEtcdWatcher(false, Everything, codec, versioner, nil)
w.emit = func(e watch.Event) {
t.Errorf("Unexpected emit: %v", e)
}
w.sendResult(&etcd.Response{
Action: action,
Node: &etcd.Node{
Value: "foobar",
},
})
w.sendResult(&etcd.Response{
Action: action,
PrevNode: &etcd.Node{
Value: "foobar",
},
})
w.Stop()
}
w.sendResult(&etcd.Response{
Action: "set",
Node: &etcd.Node{
Value: "foobar",
},
})
}
func TestWatch(t *testing.T) {
@ -512,7 +560,7 @@ func TestWatch(t *testing.T) {
}
event := <-watching.ResultChan()
if e, a := watch.Modified, event.Type; e != a {
if e, a := watch.Added, event.Type; e != a {
t.Errorf("Expected %v, got %v", e, a)
}
if e, a := pod, event.Object; !reflect.DeepEqual(e, a) {
@ -617,11 +665,13 @@ func TestWatchListFromZeroIndex(t *testing.T) {
Nodes: etcd.Nodes{
&etcd.Node{
Value: api.EncodeOrDie(pod),
CreatedIndex: 1,
ModifiedIndex: 1,
Nodes: etcd.Nodes{},
},
&etcd.Node{
Value: api.EncodeOrDie(pod),
CreatedIndex: 2,
ModifiedIndex: 2,
Nodes: etcd.Nodes{},
},
@ -633,15 +683,18 @@ func TestWatchListFromZeroIndex(t *testing.T) {
}
h := EtcdHelper{fakeClient, codec, versioner}
watching, err := h.WatchList("/some/key", 0, nil)
watching, err := h.WatchList("/some/key", 0, Everything)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
// the existing node is detected and the index set
event := <-watching.ResultChan()
event, open := <-watching.ResultChan()
if !open {
t.Fatalf("unexpected channel close")
}
for i := 0; i < 2; i++ {
if e, a := watch.Modified, event.Type; e != a {
if e, a := watch.Added, event.Type; e != a {
t.Errorf("Expected %v, got %v", e, a)
}
actualPod, ok := event.Object.(*api.Pod)