mirror of https://github.com/k3s-io/k3s
fix watch of single object
parent
67b5b080b8
commit
2fa3ae9f15
|
@ -27,6 +27,7 @@ import (
|
|||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/rest"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/strategicpatch"
|
||||
|
||||
|
@ -180,6 +181,15 @@ func ListResource(r rest.Lister, rw rest.Watcher, scope RequestScope, forceWatch
|
|||
errorJSON(err, scope.Codec, w)
|
||||
return
|
||||
}
|
||||
|
||||
// Watches for single objects are routed to this function.
|
||||
// Treat a /name parameter the same as a field selector entry.
|
||||
hasName := true
|
||||
_, name, err := scope.Namer.Name(req)
|
||||
if err != nil {
|
||||
hasName = false
|
||||
}
|
||||
|
||||
ctx := scope.ContextFunc(req)
|
||||
ctx = api.WithNamespace(ctx, namespace)
|
||||
|
||||
|
@ -191,6 +201,7 @@ func ListResource(r rest.Lister, rw rest.Watcher, scope RequestScope, forceWatch
|
|||
opts := *out.(*api.ListOptions)
|
||||
|
||||
// transform fields
|
||||
// TODO: queryToObject should do this.
|
||||
fn := func(label, value string) (newLabel, newValue string, err error) {
|
||||
return scope.Convertor.ConvertFieldLabel(scope.APIVersion, scope.Kind, label, value)
|
||||
}
|
||||
|
@ -201,6 +212,27 @@ func ListResource(r rest.Lister, rw rest.Watcher, scope RequestScope, forceWatch
|
|||
return
|
||||
}
|
||||
|
||||
if hasName {
|
||||
// metadata.name is the canonical internal name.
|
||||
// generic.SelectionPredicate will notice that this is
|
||||
// a request for a single object and optimize the
|
||||
// storage query accordingly.
|
||||
nameSelector := fields.OneTermEqualSelector("metadata.name", name)
|
||||
if opts.FieldSelector != nil && !opts.FieldSelector.Empty() {
|
||||
// It doesn't make sense to ask for both a name
|
||||
// and a field selector, since just the name is
|
||||
// sufficient to narrow down the request to a
|
||||
// single object.
|
||||
errorJSON(
|
||||
errors.NewBadRequest("both a name and a field selector provided; please provide one or the other."),
|
||||
scope.Codec,
|
||||
w,
|
||||
)
|
||||
return
|
||||
}
|
||||
opts.FieldSelector = nameSelector
|
||||
}
|
||||
|
||||
if (opts.Watch || forceWatch) && rw != nil {
|
||||
watcher, err := rw.Watch(ctx, opts.LabelSelector, opts.FieldSelector, opts.ResourceVersion)
|
||||
if err != nil {
|
||||
|
|
|
@ -75,20 +75,17 @@ func (endpointsStrategy) ValidateUpdate(ctx api.Context, obj, old runtime.Object
|
|||
|
||||
// MatchEndpoints returns a generic matcher for a given label and field selector.
|
||||
func MatchEndpoints(label labels.Selector, field fields.Selector) generic.Matcher {
|
||||
return generic.MatcherFunc(func(obj runtime.Object) (bool, error) {
|
||||
endpoints, ok := obj.(*api.Endpoints)
|
||||
if !ok {
|
||||
return false, fmt.Errorf("not a endpoints")
|
||||
}
|
||||
fields := EndpointsToSelectableFields(endpoints)
|
||||
return label.Matches(labels.Set(endpoints.Labels)) && field.Matches(fields), nil
|
||||
})
|
||||
return &generic.SelectionPredicate{label, field, EndpointsAttributes}
|
||||
}
|
||||
|
||||
// EndpointsToSelectableFields returns a label set that represents the object
|
||||
// TODO: fields are not labels, and the validation rules for them do not apply.
|
||||
func EndpointsToSelectableFields(endpoints *api.Endpoints) labels.Set {
|
||||
return labels.Set{
|
||||
"name": endpoints.Name,
|
||||
// EndpointsAttributes returns the attributes of an endpoint such that a
|
||||
// generic.SelectionPredicate can match appropriately.
|
||||
func EndpointsAttributes(obj runtime.Object) (objLabels labels.Set, objFields fields.Set, err error) {
|
||||
endpoints, ok := obj.(*api.Endpoints)
|
||||
if !ok {
|
||||
return nil, nil, fmt.Errorf("invalid object type %#v", obj)
|
||||
}
|
||||
return endpoints.Labels, fields.Set{
|
||||
"metadata.name": endpoints.Name,
|
||||
}, nil
|
||||
}
|
||||
|
|
|
@ -117,7 +117,12 @@ func (rs *REST) getAttrs(obj runtime.Object) (objLabels labels.Set, objFields fi
|
|||
if !ok {
|
||||
return nil, nil, fmt.Errorf("invalid object type")
|
||||
}
|
||||
return labels.Set{}, fields.Set{
|
||||
l := event.Labels
|
||||
if l == nil {
|
||||
l = labels.Set{}
|
||||
}
|
||||
return l, fields.Set{
|
||||
"metadata.name": event.Name,
|
||||
"involvedObject.kind": event.InvolvedObject.Kind,
|
||||
"involvedObject.namespace": event.InvolvedObject.Namespace,
|
||||
"involvedObject.name": event.InvolvedObject.Name,
|
||||
|
|
|
@ -162,6 +162,7 @@ func TestRESTGet(t *testing.T) {
|
|||
func TestRESTgetAttrs(t *testing.T) {
|
||||
_, rest := NewTestREST()
|
||||
eventA := &api.Event{
|
||||
ObjectMeta: api.ObjectMeta{Name: "f0118"},
|
||||
InvolvedObject: api.ObjectReference{
|
||||
Kind: "Pod",
|
||||
Name: "foo",
|
||||
|
@ -182,6 +183,7 @@ func TestRESTgetAttrs(t *testing.T) {
|
|||
t.Errorf("diff: %s", util.ObjectDiff(e, a))
|
||||
}
|
||||
expect := fields.Set{
|
||||
"metadata.name": "f0118",
|
||||
"involvedObject.kind": "Pod",
|
||||
"involvedObject.name": "foo",
|
||||
"involvedObject.namespace": "baz",
|
||||
|
|
|
@ -417,7 +417,8 @@ func (e *Etcd) finalizeDelete(obj runtime.Object, runHooks bool) (runtime.Object
|
|||
|
||||
// Watch makes a matcher for the given label and field, and calls
|
||||
// WatchPredicate. If possible, you should customize PredicateFunc to produre a
|
||||
// matcher that matches by key.
|
||||
// matcher that matches by key. generic.SelectionPredicate does this for you
|
||||
// automatically.
|
||||
func (e *Etcd) Watch(ctx api.Context, label labels.Selector, field fields.Selector, resourceVersion string) (watch.Interface, error) {
|
||||
return e.WatchPredicate(ctx, e.PredicateFunc(label, field), resourceVersion)
|
||||
}
|
||||
|
|
|
@ -110,6 +110,83 @@ func TestClient(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestSingleWatch(t *testing.T) {
|
||||
_, s := runAMaster(t)
|
||||
defer s.Close()
|
||||
|
||||
ns := "blargh"
|
||||
deleteAllEtcdKeys()
|
||||
client := client.NewOrDie(&client.Config{Host: s.URL, Version: testapi.Version()})
|
||||
|
||||
mkEvent := func(i int) *api.Event {
|
||||
name := fmt.Sprintf("event-%v", i)
|
||||
return &api.Event{
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
Namespace: ns,
|
||||
Name: name,
|
||||
},
|
||||
InvolvedObject: api.ObjectReference{
|
||||
Namespace: ns,
|
||||
Name: name,
|
||||
},
|
||||
Reason: fmt.Sprintf("event %v", i),
|
||||
}
|
||||
}
|
||||
|
||||
rv1 := ""
|
||||
for i := 0; i < 10; i++ {
|
||||
event := mkEvent(i)
|
||||
got, err := client.Events(ns).Create(event)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed creating event %#q: %v", event, err)
|
||||
}
|
||||
if rv1 == "" {
|
||||
rv1 = got.ResourceVersion
|
||||
if rv1 == "" {
|
||||
t.Fatal("did not get a resource version.")
|
||||
}
|
||||
}
|
||||
t.Logf("Created event %#v", got.ObjectMeta)
|
||||
}
|
||||
|
||||
w, err := client.Get().
|
||||
Prefix("watch").
|
||||
NamespaceIfScoped(ns, len(ns) > 0).
|
||||
Resource("events").
|
||||
Name("event-9").
|
||||
Param("resourceVersion", rv1).
|
||||
Watch()
|
||||
|
||||
if err != nil {
|
||||
t.Fatalf("Failed watch: %v", err)
|
||||
}
|
||||
defer w.Stop()
|
||||
|
||||
select {
|
||||
case <-time.After(5 * time.Second):
|
||||
t.Fatal("watch took longer than 15 seconds")
|
||||
case got, ok := <-w.ResultChan():
|
||||
if !ok {
|
||||
t.Fatal("Watch channel closed unexpectedly.")
|
||||
}
|
||||
|
||||
// We expect to see an ADD of event-9 and only event-9. (This
|
||||
// catches a bug where all the events would have been sent down
|
||||
// the channel.)
|
||||
if e, a := watch.Added, got.Type; e != a {
|
||||
t.Errorf("Wanted %v, got %v", e, a)
|
||||
}
|
||||
switch o := got.Object.(type) {
|
||||
case *api.Event:
|
||||
if e, a := "event-9", o.Name; e != a {
|
||||
t.Errorf("Wanted %v, got %v", e, a)
|
||||
}
|
||||
default:
|
||||
t.Fatalf("Unexpected watch event containing object %#q", got)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestMultiWatch(t *testing.T) {
|
||||
// Disable this test as long as it demonstrates a problem.
|
||||
// TODO: Reenable this test when we get #6059 resolved.
|
||||
|
|
Loading…
Reference in New Issue