Merge pull request #18290 from wojtek-t/fast_namespace_deletion

Support collection deletion in apiserver.
pull/6/head
Jeff Lowdermilk 2015-12-11 13:34:56 -08:00
commit f6686ba3a4
14 changed files with 12299 additions and 8479 deletions

File diff suppressed because it is too large Load Diff

View File

@ -129,6 +129,83 @@
"consumes": [
"*/*"
]
},
{
"type": "unversioned.Status",
"method": "DELETE",
"summary": "delete collection of HorizontalPodAutoscaler",
"nickname": "deletecollectionNamespacedHorizontalPodAutoscaler",
"parameters": [
{
"type": "string",
"paramType": "query",
"name": "pretty",
"description": "If 'true', then the output is pretty printed.",
"required": false,
"allowMultiple": false
},
{
"type": "string",
"paramType": "query",
"name": "labelSelector",
"description": "A selector to restrict the list of returned objects by their labels. Defaults to everything.",
"required": false,
"allowMultiple": false
},
{
"type": "string",
"paramType": "query",
"name": "fieldSelector",
"description": "A selector to restrict the list of returned objects by their fields. Defaults to everything.",
"required": false,
"allowMultiple": false
},
{
"type": "boolean",
"paramType": "query",
"name": "watch",
"description": "Watch for changes to the described resources and return them as a stream of add, update, and remove notifications. Specify resourceVersion.",
"required": false,
"allowMultiple": false
},
{
"type": "string",
"paramType": "query",
"name": "resourceVersion",
"description": "When specified with a watch call, shows changes that occur after that particular version of a resource. Defaults to changes from the beginning of history.",
"required": false,
"allowMultiple": false
},
{
"type": "integer",
"paramType": "query",
"name": "timeoutSeconds",
"description": "Timeout for the list/watch call.",
"required": false,
"allowMultiple": false
},
{
"type": "string",
"paramType": "path",
"name": "namespace",
"description": "object name and auth scope, such as for teams and projects",
"required": true,
"allowMultiple": false
}
],
"responseMessages": [
{
"code": 200,
"message": "OK",
"responseModel": "unversioned.Status"
}
],
"produces": [
"application/json"
],
"consumes": [
"*/*"
]
}
]
},
@ -852,6 +929,83 @@
"consumes": [
"*/*"
]
},
{
"type": "unversioned.Status",
"method": "DELETE",
"summary": "delete collection of Ingress",
"nickname": "deletecollectionNamespacedIngress",
"parameters": [
{
"type": "string",
"paramType": "query",
"name": "pretty",
"description": "If 'true', then the output is pretty printed.",
"required": false,
"allowMultiple": false
},
{
"type": "string",
"paramType": "query",
"name": "labelSelector",
"description": "A selector to restrict the list of returned objects by their labels. Defaults to everything.",
"required": false,
"allowMultiple": false
},
{
"type": "string",
"paramType": "query",
"name": "fieldSelector",
"description": "A selector to restrict the list of returned objects by their fields. Defaults to everything.",
"required": false,
"allowMultiple": false
},
{
"type": "boolean",
"paramType": "query",
"name": "watch",
"description": "Watch for changes to the described resources and return them as a stream of add, update, and remove notifications. Specify resourceVersion.",
"required": false,
"allowMultiple": false
},
{
"type": "string",
"paramType": "query",
"name": "resourceVersion",
"description": "When specified with a watch call, shows changes that occur after that particular version of a resource. Defaults to changes from the beginning of history.",
"required": false,
"allowMultiple": false
},
{
"type": "integer",
"paramType": "query",
"name": "timeoutSeconds",
"description": "Timeout for the list/watch call.",
"required": false,
"allowMultiple": false
},
{
"type": "string",
"paramType": "path",
"name": "namespace",
"description": "object name and auth scope, such as for teams and projects",
"required": true,
"allowMultiple": false
}
],
"responseMessages": [
{
"code": 200,
"message": "OK",
"responseModel": "unversioned.Status"
}
],
"produces": [
"application/json"
],
"consumes": [
"*/*"
]
}
]
},
@ -1575,6 +1729,83 @@
"consumes": [
"*/*"
]
},
{
"type": "unversioned.Status",
"method": "DELETE",
"summary": "delete collection of Job",
"nickname": "deletecollectionNamespacedJob",
"parameters": [
{
"type": "string",
"paramType": "query",
"name": "pretty",
"description": "If 'true', then the output is pretty printed.",
"required": false,
"allowMultiple": false
},
{
"type": "string",
"paramType": "query",
"name": "labelSelector",
"description": "A selector to restrict the list of returned objects by their labels. Defaults to everything.",
"required": false,
"allowMultiple": false
},
{
"type": "string",
"paramType": "query",
"name": "fieldSelector",
"description": "A selector to restrict the list of returned objects by their fields. Defaults to everything.",
"required": false,
"allowMultiple": false
},
{
"type": "boolean",
"paramType": "query",
"name": "watch",
"description": "Watch for changes to the described resources and return them as a stream of add, update, and remove notifications. Specify resourceVersion.",
"required": false,
"allowMultiple": false
},
{
"type": "string",
"paramType": "query",
"name": "resourceVersion",
"description": "When specified with a watch call, shows changes that occur after that particular version of a resource. Defaults to changes from the beginning of history.",
"required": false,
"allowMultiple": false
},
{
"type": "integer",
"paramType": "query",
"name": "timeoutSeconds",
"description": "Timeout for the list/watch call.",
"required": false,
"allowMultiple": false
},
{
"type": "string",
"paramType": "path",
"name": "namespace",
"description": "object name and auth scope, such as for teams and projects",
"required": true,
"allowMultiple": false
}
],
"responseMessages": [
{
"code": 200,
"message": "OK",
"responseModel": "unversioned.Status"
}
],
"produces": [
"application/json"
],
"consumes": [
"*/*"
]
}
]
},
@ -2574,24 +2805,6 @@
}
}
},
"json.WatchEvent": {
"id": "json.WatchEvent",
"properties": {
"type": {
"type": "string",
"description": "the type of watch event; may be ADDED, MODIFIED, DELETED, or ERROR"
},
"object": {
"type": "string",
"description": "the object being watched; will match the type of the resource endpoint or be a Status object if the type is ERROR"
}
}
},
"unversioned.Patch": {
"id": "unversioned.Patch",
"description": "Patch is provided to give a concrete name and type to the Kubernetes PATCH request body.",
"properties": {}
},
"unversioned.Status": {
"id": "unversioned.Status",
"description": "Status is a return value for calls that don't return other objects.",
@ -2675,6 +2888,24 @@
}
}
},
"json.WatchEvent": {
"id": "json.WatchEvent",
"properties": {
"type": {
"type": "string",
"description": "the type of watch event; may be ADDED, MODIFIED, DELETED, or ERROR"
},
"object": {
"type": "string",
"description": "the object being watched; will match the type of the resource endpoint or be a Status object if the type is ERROR"
}
}
},
"unversioned.Patch": {
"id": "unversioned.Patch",
"description": "Patch is provided to give a concrete name and type to the Kubernetes PATCH request body.",
"properties": {}
},
"v1.DeleteOptions": {
"id": "v1.DeleteOptions",
"description": "DeleteOptions may be provided when deleting an API object",

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -125,6 +125,17 @@ func (w GracefulDeleteAdapter) Delete(ctx api.Context, name string, options *api
return w.Deleter.Delete(ctx, name)
}
// CollectionDeleter is an object that can delete a collection
// of RESTful resources.
type CollectionDeleter interface {
// DeleteCollection selects all resources in the storage matching given 'listOptions'
// and deletes them. If 'options' are provided, the resource will attempt to honor
// them or return an invalid request error.
// DeleteCollection may not be atomic - i.e. it may delete some objects and still
// return an error after it. On success, returns a list of deleted objects.
DeleteCollection(ctx api.Context, options *api.DeleteOptions, listOptions *unversioned.ListOptions) (runtime.Object, error)
}
// Creater is an object that can create an instance of a RESTful object.
type Creater interface {
// New returns an empty object that can be used with Create after request data has been put into it.
@ -192,6 +203,7 @@ type StandardStorage interface {
Lister
CreaterUpdater
GracefulDeleter
CollectionDeleter
Watcher
}

View File

@ -199,6 +199,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
getterWithOptions, isGetterWithOptions := storage.(rest.GetterWithOptions)
deleter, isDeleter := storage.(rest.Deleter)
gracefulDeleter, isGracefulDeleter := storage.(rest.GracefulDeleter)
collectionDeleter, isCollectionDeleter := storage.(rest.CollectionDeleter)
updater, isUpdater := storage.(rest.Updater)
patcher, isPatcher := storage.(rest.Patcher)
watcher, isWatcher := storage.(rest.Watcher)
@ -334,6 +335,8 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
// Add actions at the resource path: /api/apiVersion/resource
actions = appendIf(actions, action{"LIST", resourcePath, resourceParams, namer}, isLister)
actions = appendIf(actions, action{"POST", resourcePath, resourceParams, namer}, isCreater)
actions = appendIf(actions, action{"DELETECOLLECTION", resourcePath, resourceParams, namer}, isCollectionDeleter)
// DEPRECATED
actions = appendIf(actions, action{"WATCHLIST", "watch/" + resourcePath, resourceParams, namer}, allowWatchList)
// Add actions at the item path: /api/apiVersion/resource/{name}
@ -372,6 +375,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
actions = appendIf(actions, action{"LIST", resourcePath, resourceParams, namer}, isLister)
actions = appendIf(actions, action{"POST", resourcePath, resourceParams, namer}, isCreater)
actions = appendIf(actions, action{"DELETECOLLECTION", resourcePath, resourceParams, namer}, isCollectionDeleter)
// DEPRECATED
actions = appendIf(actions, action{"WATCHLIST", "watch/" + resourcePath, resourceParams, namer}, allowWatchList)
@ -567,6 +571,24 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
}
addParams(route, action.Params)
ws.Route(route)
case "DELETECOLLECTION":
doc := "delete collection of " + kind
if hasSubresource {
doc = "delete collection of " + subresource + " of a " + kind
}
route := ws.DELETE(action.Path).To(DeleteCollection(collectionDeleter, isCollectionDeleter, reqScope, admit)).
Filter(m).
Doc(doc).
Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")).
Operation("deletecollection"+namespaced+kind+strings.Title(subresource)).
Produces(append(storageMeta.ProducesMIMETypes(action.Verb), "application/json")...).
Writes(versionedStatus).
Returns(http.StatusOK, "OK", versionedStatus)
if err := addObjectParams(ws, route, versionedListOptions); err != nil {
return nil, err
}
addParams(route, action.Params)
ws.Route(route)
// TODO: deprecated
case "WATCH": // Watch a resource.
doc := "watch changes to an object of kind " + kind

View File

@ -730,6 +730,99 @@ func DeleteResource(r rest.GracefulDeleter, checkBody bool, scope RequestScope,
}
}
// DeleteCollection returns a function that will handle a collection deletion
func DeleteCollection(r rest.CollectionDeleter, checkBody bool, scope RequestScope, admit admission.Interface) restful.RouteFunction {
return func(req *restful.Request, res *restful.Response) {
w := res.ResponseWriter
// TODO: we either want to remove timeout or document it (if we document, move timeout out of this function and declare it in api_installer)
timeout := parseTimeout(req.Request.URL.Query().Get("timeout"))
namespace, err := scope.Namer.Namespace(req)
if err != nil {
errorJSON(err, scope.Codec, w)
return
}
ctx := scope.ContextFunc(req)
ctx = api.WithNamespace(ctx, namespace)
if admit != nil && admit.Handles(admission.Delete) {
userInfo, _ := api.UserFrom(ctx)
err = admit.Admit(admission.NewAttributesRecord(nil, scope.Kind.GroupKind(), namespace, "", scope.Resource.GroupResource(), scope.Subresource, admission.Delete, userInfo))
if err != nil {
errorJSON(err, scope.Codec, w)
return
}
}
listOptions := unversioned.ListOptions{}
if err := scope.Codec.DecodeParametersInto(req.Request.URL.Query(), &listOptions); err != nil {
errorJSON(err, scope.Codec, w)
return
}
// transform fields
// TODO: DecodeParametersInto should do this.
if listOptions.FieldSelector.Selector != nil {
fn := func(label, value string) (newLabel, newValue string, err error) {
return scope.Convertor.ConvertFieldLabel(scope.Kind.GroupVersion().String(), scope.Kind.Kind, label, value)
}
if listOptions.FieldSelector.Selector, err = listOptions.FieldSelector.Selector.Transform(fn); err != nil {
// TODO: allow bad request to set field causes based on query parameters
err = errors.NewBadRequest(err.Error())
errorJSON(err, scope.Codec, w)
return
}
}
options := &api.DeleteOptions{}
if checkBody {
body, err := readBody(req.Request)
if err != nil {
errorJSON(err, scope.Codec, w)
return
}
if len(body) > 0 {
if err := scope.Codec.DecodeInto(body, options); err != nil {
errorJSON(err, scope.Codec, w)
return
}
}
}
result, err := finishRequest(timeout, func() (runtime.Object, error) {
return r.DeleteCollection(ctx, options, &listOptions)
})
if err != nil {
errorJSON(err, scope.Codec, w)
return
}
// if the rest.Deleter returns a nil object, fill out a status. Callers may return a valid
// object with the response.
if result == nil {
result = &unversioned.Status{
Status: unversioned.StatusSuccess,
Code: http.StatusOK,
Details: &unversioned.StatusDetails{
Kind: scope.Kind.Kind,
},
}
} else {
// when a non-status response is returned, set the self link
if _, ok := result.(*unversioned.Status); !ok {
if _, err := setListSelfLink(result, req, scope.Namer); err != nil {
errorJSON(err, scope.Codec, w)
return
}
}
}
write(http.StatusOK, scope.Kind.GroupVersion(), scope.Codec, result, w, req.Request)
}
}
// resultFunc is a function that returns a rest result and can be run in a goroutine
type resultFunc func() (runtime.Object, error)

View File

@ -42,6 +42,8 @@ type EventInterface interface {
// Search finds events about the specified object
Search(objOrRef runtime.Object) (*api.EventList, error)
Delete(name string) error
// DeleteCollection deletes a collection of events.
DeleteCollection(options *api.DeleteOptions, listOptions unversioned.ListOptions) error
// Returns the appropriate field selector based on the API version being used to communicate with the server.
// The returned field selector can be used with List and Watch to filter desired events.
GetFieldSelector(involvedObjectName, involvedObjectNamespace, involvedObjectKind, involvedObjectUID *string) fields.Selector
@ -184,6 +186,30 @@ func (e *events) Delete(name string) error {
Error()
}
// DeleteCollection deletes a collection of objects.
func (e *events) DeleteCollection(options *api.DeleteOptions, listOptions unversioned.ListOptions) error {
// TODO: to make this reusable in other client libraries
if options == nil {
return e.client.Delete().
NamespaceIfScoped(e.namespace, len(e.namespace) > 0).
Resource("events").
VersionedParams(&listOptions, api.Scheme).
Do().
Error()
}
body, err := api.Scheme.EncodeToVersion(options, e.client.APIVersion().String())
if err != nil {
return err
}
return e.client.Delete().
NamespaceIfScoped(e.namespace, len(e.namespace) > 0).
Resource("events").
VersionedParams(&listOptions, api.Scheme).
Body(body).
Do().
Error()
}
// Returns the appropriate field selector based on the API version being used to communicate with the server.
// The returned field selector can be used with List and Watch to filter desired events.
func (e *events) GetFieldSelector(involvedObjectName, involvedObjectNamespace, involvedObjectKind, involvedObjectUID *string) fields.Selector {

View File

@ -166,6 +166,41 @@ func NewDeleteAction(resource, namespace, name string) DeleteActionImpl {
return action
}
func NewRootDeleteCollectionAction(resource string, opts unversioned.ListOptions) DeleteCollectionActionImpl {
action := DeleteCollectionActionImpl{}
action.Verb = "delete-collection"
action.Resource = resource
labelSelector := opts.LabelSelector.Selector
if labelSelector == nil {
labelSelector = labels.Everything()
}
fieldSelector := opts.FieldSelector.Selector
if fieldSelector == nil {
fieldSelector = fields.Everything()
}
action.ListRestrictions = ListRestrictions{labelSelector, fieldSelector}
return action
}
func NewDeleteCollectionAction(resource, namespace string, opts unversioned.ListOptions) DeleteCollectionActionImpl {
action := DeleteCollectionActionImpl{}
action.Verb = "delete-collection"
action.Resource = resource
action.Namespace = namespace
labelSelector := opts.LabelSelector.Selector
if labelSelector == nil {
labelSelector = labels.Everything()
}
fieldSelector := opts.FieldSelector.Selector
if fieldSelector == nil {
fieldSelector = fields.Everything()
}
action.ListRestrictions = ListRestrictions{labelSelector, fieldSelector}
return action
}
func NewRootWatchAction(resource string, opts unversioned.ListOptions) WatchActionImpl {
action := WatchActionImpl{}
action.Verb = "watch"
@ -363,6 +398,15 @@ func (a DeleteActionImpl) GetName() string {
return a.Name
}
type DeleteCollectionActionImpl struct {
ActionImpl
ListRestrictions ListRestrictions
}
func (a DeleteCollectionActionImpl) GetListRestrictions() ListRestrictions {
return a.ListRestrictions
}
type WatchActionImpl struct {
ActionImpl
WatchRestrictions WatchRestrictions

View File

@ -110,6 +110,15 @@ func (c *FakeEvents) Delete(name string) error {
return err
}
func (c *FakeEvents) DeleteCollection(options *api.DeleteOptions, listOptions unversioned.ListOptions) error {
action := NewRootDeleteCollectionAction("events", listOptions)
if c.Namespace != "" {
action = NewDeleteCollectionAction("events", c.Namespace, listOptions)
}
_, err := c.Fake.Invokes(action, &api.EventList{})
return err
}
// Watch starts watching for events matching the given selectors.
func (c *FakeEvents) Watch(opts unversioned.ListOptions) (watch.Interface, error) {
action := NewRootWatchAction("events", opts)

View File

@ -436,17 +436,7 @@ func deletePods(kubeClient client.Interface, ns string, before unversioned.Time)
}
func deleteEvents(kubeClient client.Interface, ns string) error {
items, err := kubeClient.Events(ns).List(unversioned.ListOptions{})
if err != nil {
return err
}
for i := range items.Items {
err := kubeClient.Events(ns).Delete(items.Items[i].Name)
if err != nil && !errors.IsNotFound(err) {
return err
}
}
return nil
return kubeClient.Events(ns).DeleteCollection(nil, unversioned.ListOptions{})
}
func deleteSecrets(kubeClient client.Interface, ns string) error {

View File

@ -109,7 +109,7 @@ func testSyncNamespaceThatIsTerminating(t *testing.T, versions *unversioned.APIV
strings.Join([]string{"list", "resourcequotas", ""}, "-"),
strings.Join([]string{"list", "secrets", ""}, "-"),
strings.Join([]string{"list", "limitranges", ""}, "-"),
strings.Join([]string{"list", "events", ""}, "-"),
strings.Join([]string{"delete-collection", "events", ""}, "-"),
strings.Join([]string{"list", "serviceaccounts", ""}, "-"),
strings.Join([]string{"list", "persistentvolumeclaims", ""}, "-"),
strings.Join([]string{"create", "namespaces", "finalize"}, "-"),

View File

@ -22,6 +22,7 @@ import (
"k8s.io/kubernetes/pkg/api"
kubeerr "k8s.io/kubernetes/pkg/api/errors"
etcderr "k8s.io/kubernetes/pkg/api/errors/etcd"
"k8s.io/kubernetes/pkg/api/meta"
"k8s.io/kubernetes/pkg/api/rest"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/api/validation"
@ -418,6 +419,37 @@ func (e *Etcd) Delete(ctx api.Context, name string, options *api.DeleteOptions)
return e.finalizeDelete(out, true)
}
// DeleteCollection remove all items returned by List with a given ListOptions from etcd.
//
// DeleteCollection is currently NOT atomic. It can happen that only subset of objects
// will be deleted from etcd, and then an error will be returned.
// In case of success, the list of deleted objects will be returned.
//
// TODO: Currently, there is no easy way to remove 'directory' entry from etcd (if we
// are removing all objects of a given type) with the current API (it's technically
// possibly with etcd API, but watch is not delivered correctly then).
// It will be possible to fix it with v3 etcd API.
func (e *Etcd) DeleteCollection(ctx api.Context, options *api.DeleteOptions, listOptions *unversioned.ListOptions) (runtime.Object, error) {
listObj, err := e.List(ctx, listOptions)
if err != nil {
return nil, err
}
items, err := meta.ExtractList(listObj)
if err != nil {
return nil, err
}
for _, item := range items {
accessor, err := meta.Accessor(item)
if err != nil {
return nil, err
}
if _, err := e.Delete(ctx, accessor.Name(), options); err != nil {
return nil, err
}
}
return listObj, nil
}
func (e *Etcd) finalizeDelete(obj runtime.Object, runHooks bool) (runtime.Object, error) {
if runHooks && e.AfterDelete != nil {
if err := e.AfterDelete(obj); err != nil {

View File

@ -25,6 +25,9 @@ import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/testapi"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/registry/generic"
"k8s.io/kubernetes/pkg/runtime"
etcdstorage "k8s.io/kubernetes/pkg/storage/etcd"
@ -90,7 +93,20 @@ func NewTestGenericEtcdRegistry(t *testing.T) (*etcdtesting.EtcdTestServer, *Etc
return path.Join(podPrefix, id), nil
},
ObjectNameFunc: func(obj runtime.Object) (string, error) { return obj.(*api.Pod).Name, nil },
Storage: s,
PredicateFunc: func(label labels.Selector, field fields.Selector) generic.Matcher {
return &generic.SelectionPredicate{
Label: label,
Field: field,
GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) {
pod, ok := obj.(*api.Pod)
if !ok {
return nil, nil, fmt.Errorf("not a pod")
}
return labels.Set(pod.ObjectMeta.Labels), generic.ObjectMetaFieldsSet(pod.ObjectMeta, true), nil
},
}
},
Storage: s,
}
}
@ -366,6 +382,78 @@ func TestEtcdDelete(t *testing.T) {
}
}
func TestEtcdDeleteCollection(t *testing.T) {
podA := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}
podB := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "bar"}}
testContext := api.WithNamespace(api.NewContext(), "test")
server, registry := NewTestGenericEtcdRegistry(t)
defer server.Terminate(t)
if _, err := registry.Create(testContext, podA); err != nil {
t.Errorf("Unexpected error: %v", err)
}
if _, err := registry.Create(testContext, podB); err != nil {
t.Errorf("Unexpected error: %v", err)
}
// Delete all pods.
deleted, err := registry.DeleteCollection(testContext, nil, &unversioned.ListOptions{})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
deletedPods := deleted.(*api.PodList)
if len(deletedPods.Items) != 2 {
t.Errorf("Unexpected number of pods deleted: %d, expected: 2", len(deletedPods.Items))
}
if _, err := registry.Get(testContext, podA.Name); !errors.IsNotFound(err) {
t.Errorf("Unexpected error: %v", err)
}
if _, err := registry.Get(testContext, podB.Name); !errors.IsNotFound(err) {
t.Errorf("Unexpected error: %v", err)
}
}
// Test whether objects deleted with DeleteCollection are correctly delivered
// to watchers.
func TestEtcdDeleteCollectionWithWatch(t *testing.T) {
podA := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}
testContext := api.WithNamespace(api.NewContext(), "test")
server, registry := NewTestGenericEtcdRegistry(t)
defer server.Terminate(t)
objCreated, err := registry.Create(testContext, podA)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
podCreated := objCreated.(*api.Pod)
watcher, err := registry.WatchPredicate(testContext, setMatcher{sets.NewString("foo")}, podCreated.ResourceVersion)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if _, err := registry.DeleteCollection(testContext, nil, &unversioned.ListOptions{}); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
got, open := <-watcher.ResultChan()
if !open {
t.Errorf("Unexpected channel close")
} else {
if got.Type != "DELETED" {
t.Errorf("Unexpected event type: %s", got.Type)
}
gotObject := got.Object.(*api.Pod)
gotObject.ResourceVersion = podCreated.ResourceVersion
if e, a := podCreated, gotObject; !reflect.DeepEqual(e, a) {
t.Errorf("Expected: %#v, got: %#v", e, a)
}
}
}
func TestEtcdWatch(t *testing.T) {
testContext := api.WithNamespace(api.NewContext(), "test")
noNamespaceContext := api.NewContext()