Fix watch cache filtering

pull/6/head
Jordan Liggitt 2016-07-13 22:21:25 -04:00
parent bea382c124
commit 4fcd999c25
No known key found for this signature in database
GPG Key ID: 24E7ADF9A3B42012
4 changed files with 99 additions and 3 deletions

View File

@ -21,7 +21,6 @@ import (
"net/http" "net/http"
"reflect" "reflect"
"strconv" "strconv"
"strings"
"sync" "sync"
"time" "time"
@ -479,7 +478,7 @@ func filterFunction(key string, keyFunc func(runtime.Object) (string, error), fi
glog.Errorf("invalid object for filter: %v", obj) glog.Errorf("invalid object for filter: %v", obj)
return false return false
} }
if !strings.HasPrefix(objKey, key) { if !hasPathPrefix(objKey, key) {
return false return false
} }
return filter.Filter(obj) return filter.Filter(obj)

View File

@ -79,7 +79,7 @@ func updatePod(t *testing.T, s storage.Interface, obj, old *api.Pod) *api.Pod {
} }
return newObj.(*api.Pod), nil, nil return newObj.(*api.Pod), nil, nil
} }
key := etcdtest.AddPrefix("pods/ns/" + obj.Name) key := etcdtest.AddPrefix("pods/" + obj.Namespace + "/" + obj.Name)
if err := s.GuaranteedUpdate(context.TODO(), key, &api.Pod{}, old == nil, nil, updateFn); err != nil { if err := s.GuaranteedUpdate(context.TODO(), key, &api.Pod{}, old == nil, nil, updateFn); err != nil {
t.Errorf("unexpected error: %v", err) t.Errorf("unexpected error: %v", err)
} }
@ -110,6 +110,12 @@ func TestList(t *testing.T) {
_ = updatePod(t, etcdStorage, podFooPrime, fooCreated) _ = updatePod(t, etcdStorage, podFooPrime, fooCreated)
// Create a pod in a namespace that contains "ns" as a prefix
// Make sure it is not returned in a watch of "ns"
podFooNS2 := makeTestPod("foo")
podFooNS2.Namespace += "2"
updatePod(t, etcdStorage, podFooNS2, nil)
deleted := api.Pod{} deleted := api.Pod{}
if err := etcdStorage.Delete(context.TODO(), etcdtest.AddPrefix("pods/ns/bar"), &deleted, nil); err != nil { if err := etcdStorage.Delete(context.TODO(), etcdtest.AddPrefix("pods/ns/bar"), &deleted, nil); err != nil {
t.Errorf("Unexpected error: %v", err) t.Errorf("Unexpected error: %v", err)
@ -147,6 +153,10 @@ func TestList(t *testing.T) {
item.ResourceVersion = "" item.ResourceVersion = ""
item.CreationTimestamp = unversioned.Time{} item.CreationTimestamp = unversioned.Time{}
if item.Namespace != "ns" {
t.Errorf("Unexpected namespace: %s", item.Namespace)
}
var expected *api.Pod var expected *api.Pod
switch item.Name { switch item.Name {
case "foo": case "foo":
@ -210,6 +220,9 @@ func TestWatch(t *testing.T) {
podFooBis := makeTestPod("foo") podFooBis := makeTestPod("foo")
podFooBis.Spec.NodeName = "anotherFakeNode" podFooBis.Spec.NodeName = "anotherFakeNode"
podFooNS2 := makeTestPod("foo")
podFooNS2.Namespace += "2"
// initialVersion is used to initate the watcher at the beginning of the world, // initialVersion is used to initate the watcher at the beginning of the world,
// which is not defined precisely in etcd. // which is not defined precisely in etcd.
initialVersion, err := cacher.LastSyncResourceVersion() initialVersion, err := cacher.LastSyncResourceVersion()
@ -225,6 +238,9 @@ func TestWatch(t *testing.T) {
} }
defer watcher.Stop() defer watcher.Stop()
// Create in another namespace first to make sure events from other namespaces don't get delivered
updatePod(t, etcdStorage, podFooNS2, nil)
fooCreated := updatePod(t, etcdStorage, podFoo, nil) fooCreated := updatePod(t, etcdStorage, podFoo, nil)
_ = updatePod(t, etcdStorage, podBar, nil) _ = updatePod(t, etcdStorage, podBar, nil)
fooUpdated := updatePod(t, etcdStorage, podFooPrime, fooCreated) fooUpdated := updatePod(t, etcdStorage, podFooPrime, fooCreated)
@ -320,6 +336,13 @@ func TestFiltering(t *testing.T) {
podFooPrime.Labels = map[string]string{"filter": "foo"} podFooPrime.Labels = map[string]string{"filter": "foo"}
podFooPrime.Spec.NodeName = "fakeNode" podFooPrime.Spec.NodeName = "fakeNode"
podFooNS2 := makeTestPod("foo")
podFooNS2.Namespace += "2"
podFooNS2.Labels = map[string]string{"filter": "foo"}
// Create in another namespace first to make sure events from other namespaces don't get delivered
updatePod(t, etcdStorage, podFooNS2, nil)
fooCreated := updatePod(t, etcdStorage, podFoo, nil) fooCreated := updatePod(t, etcdStorage, podFoo, nil)
fooFiltered := updatePod(t, etcdStorage, podFooFiltered, fooCreated) fooFiltered := updatePod(t, etcdStorage, podFooFiltered, fooCreated)
fooUnfiltered := updatePod(t, etcdStorage, podFoo, fooFiltered) fooUnfiltered := updatePod(t, etcdStorage, podFoo, fooFiltered)

View File

@ -19,6 +19,7 @@ package storage
import ( import (
"fmt" "fmt"
"strconv" "strconv"
"strings"
"k8s.io/kubernetes/pkg/api/meta" "k8s.io/kubernetes/pkg/api/meta"
"k8s.io/kubernetes/pkg/api/validation" "k8s.io/kubernetes/pkg/api/validation"
@ -123,3 +124,28 @@ func NoNamespaceKeyFunc(prefix string, obj runtime.Object) (string, error) {
} }
return prefix + "/" + name, nil return prefix + "/" + name, nil
} }
// hasPathPrefix returns true if the string matches pathPrefix exactly, or if is prefixed with pathPrefix at a path segment boundary
func hasPathPrefix(s, pathPrefix string) bool {
// Short circuit if s doesn't contain the prefix at all
if !strings.HasPrefix(s, pathPrefix) {
return false
}
pathPrefixLength := len(pathPrefix)
if len(s) == pathPrefixLength {
// Exact match
return true
}
if strings.HasSuffix(pathPrefix, "/") {
// pathPrefix already ensured a path segment boundary
return true
}
if s[pathPrefixLength:pathPrefixLength+1] == "/" {
// The next character in s is a path segment boundary
// Check this instead of normalizing pathPrefix to avoid allocating on every call
return true
}
return false
}

View File

@ -51,3 +51,51 @@ func TestEtcdParseWatchResourceVersion(t *testing.T) {
} }
} }
} }
func TestHasPathPrefix(t *testing.T) {
validTestcases := []struct {
s string
prefix string
}{
// Exact matches
{"", ""},
{"a", "a"},
{"a/", "a/"},
{"a/../", "a/../"},
// Path prefix matches
{"a/b", "a"},
{"a/b", "a/"},
{"中文/", "中文"},
}
for i, tc := range validTestcases {
if !hasPathPrefix(tc.s, tc.prefix) {
t.Errorf(`%d: Expected hasPathPrefix("%s","%s") to be true`, i, tc.s, tc.prefix)
}
}
invalidTestcases := []struct {
s string
prefix string
}{
// Mismatch
{"a", "b"},
// Dir requirement
{"a", "a/"},
// Prefix mismatch
{"ns2", "ns"},
{"ns2", "ns/"},
{"中文文", "中文"},
// Ensure no normalization is applied
{"a/c/../b/", "a/b/"},
{"a/", "a/b/.."},
}
for i, tc := range invalidTestcases {
if hasPathPrefix(tc.s, tc.prefix) {
t.Errorf(`%d: Expected hasPathPrefix("%s","%s") to be false`, i, tc.s, tc.prefix)
}
}
}