diff --git a/pkg/storage/cacher.go b/pkg/storage/cacher.go index d9c08e6719b07..311622289a19a 100644 --- a/pkg/storage/cacher.go +++ b/pkg/storage/cacher.go @@ -21,7 +21,6 @@ import ( "net/http" "reflect" "strconv" - "strings" "sync" "time" @@ -479,7 +478,7 @@ func filterFunction(key string, keyFunc func(runtime.Object) (string, error), fi glog.Errorf("invalid object for filter: %v", obj) return false } - if !strings.HasPrefix(objKey, key) { + if !hasPathPrefix(objKey, key) { return false } return filter.Filter(obj) diff --git a/pkg/storage/cacher_test.go b/pkg/storage/cacher_test.go index 51751491d52d8..9d0bf718bef50 100644 --- a/pkg/storage/cacher_test.go +++ b/pkg/storage/cacher_test.go @@ -79,7 +79,7 @@ func updatePod(t *testing.T, s storage.Interface, obj, old *api.Pod) *api.Pod { } return newObj.(*api.Pod), nil, nil } - key := etcdtest.AddPrefix("pods/ns/" + obj.Name) + key := etcdtest.AddPrefix("pods/" + obj.Namespace + "/" + obj.Name) if err := s.GuaranteedUpdate(context.TODO(), key, &api.Pod{}, old == nil, nil, updateFn); err != nil { t.Errorf("unexpected error: %v", err) } @@ -110,6 +110,12 @@ func TestList(t *testing.T) { _ = updatePod(t, etcdStorage, podFooPrime, fooCreated) + // Create a pod in a namespace that contains "ns" as a prefix + // Make sure it is not returned in a watch of "ns" + podFooNS2 := makeTestPod("foo") + podFooNS2.Namespace += "2" + updatePod(t, etcdStorage, podFooNS2, nil) + deleted := api.Pod{} if err := etcdStorage.Delete(context.TODO(), etcdtest.AddPrefix("pods/ns/bar"), &deleted, nil); err != nil { t.Errorf("Unexpected error: %v", err) @@ -147,6 +153,10 @@ func TestList(t *testing.T) { item.ResourceVersion = "" item.CreationTimestamp = unversioned.Time{} + if item.Namespace != "ns" { + t.Errorf("Unexpected namespace: %s", item.Namespace) + } + var expected *api.Pod switch item.Name { case "foo": @@ -210,6 +220,9 @@ func TestWatch(t *testing.T) { podFooBis := makeTestPod("foo") podFooBis.Spec.NodeName = "anotherFakeNode" + podFooNS2 := makeTestPod("foo") + podFooNS2.Namespace += "2" + // initialVersion is used to initate the watcher at the beginning of the world, // which is not defined precisely in etcd. initialVersion, err := cacher.LastSyncResourceVersion() @@ -225,6 +238,9 @@ func TestWatch(t *testing.T) { } defer watcher.Stop() + // Create in another namespace first to make sure events from other namespaces don't get delivered + updatePod(t, etcdStorage, podFooNS2, nil) + fooCreated := updatePod(t, etcdStorage, podFoo, nil) _ = updatePod(t, etcdStorage, podBar, nil) fooUpdated := updatePod(t, etcdStorage, podFooPrime, fooCreated) @@ -320,6 +336,13 @@ func TestFiltering(t *testing.T) { podFooPrime.Labels = map[string]string{"filter": "foo"} podFooPrime.Spec.NodeName = "fakeNode" + podFooNS2 := makeTestPod("foo") + podFooNS2.Namespace += "2" + podFooNS2.Labels = map[string]string{"filter": "foo"} + + // Create in another namespace first to make sure events from other namespaces don't get delivered + updatePod(t, etcdStorage, podFooNS2, nil) + fooCreated := updatePod(t, etcdStorage, podFoo, nil) fooFiltered := updatePod(t, etcdStorage, podFooFiltered, fooCreated) fooUnfiltered := updatePod(t, etcdStorage, podFoo, fooFiltered) diff --git a/pkg/storage/util.go b/pkg/storage/util.go index 437028ae5a7b7..571e104d9363d 100644 --- a/pkg/storage/util.go +++ b/pkg/storage/util.go @@ -19,6 +19,7 @@ package storage import ( "fmt" "strconv" + "strings" "k8s.io/kubernetes/pkg/api/meta" "k8s.io/kubernetes/pkg/api/validation" @@ -123,3 +124,28 @@ func NoNamespaceKeyFunc(prefix string, obj runtime.Object) (string, error) { } return prefix + "/" + name, nil } + +// hasPathPrefix returns true if the string matches pathPrefix exactly, or if is prefixed with pathPrefix at a path segment boundary +func hasPathPrefix(s, pathPrefix string) bool { + // Short circuit if s doesn't contain the prefix at all + if !strings.HasPrefix(s, pathPrefix) { + return false + } + + pathPrefixLength := len(pathPrefix) + + if len(s) == pathPrefixLength { + // Exact match + return true + } + if strings.HasSuffix(pathPrefix, "/") { + // pathPrefix already ensured a path segment boundary + return true + } + if s[pathPrefixLength:pathPrefixLength+1] == "/" { + // The next character in s is a path segment boundary + // Check this instead of normalizing pathPrefix to avoid allocating on every call + return true + } + return false +} diff --git a/pkg/storage/util_test.go b/pkg/storage/util_test.go index bcc31b08c2fc5..17dcac014377f 100644 --- a/pkg/storage/util_test.go +++ b/pkg/storage/util_test.go @@ -51,3 +51,51 @@ func TestEtcdParseWatchResourceVersion(t *testing.T) { } } } + +func TestHasPathPrefix(t *testing.T) { + validTestcases := []struct { + s string + prefix string + }{ + // Exact matches + {"", ""}, + {"a", "a"}, + {"a/", "a/"}, + {"a/../", "a/../"}, + + // Path prefix matches + {"a/b", "a"}, + {"a/b", "a/"}, + {"中文/", "中文"}, + } + for i, tc := range validTestcases { + if !hasPathPrefix(tc.s, tc.prefix) { + t.Errorf(`%d: Expected hasPathPrefix("%s","%s") to be true`, i, tc.s, tc.prefix) + } + } + + invalidTestcases := []struct { + s string + prefix string + }{ + // Mismatch + {"a", "b"}, + + // Dir requirement + {"a", "a/"}, + + // Prefix mismatch + {"ns2", "ns"}, + {"ns2", "ns/"}, + {"中文文", "中文"}, + + // Ensure no normalization is applied + {"a/c/../b/", "a/b/"}, + {"a/", "a/b/.."}, + } + for i, tc := range invalidTestcases { + if hasPathPrefix(tc.s, tc.prefix) { + t.Errorf(`%d: Expected hasPathPrefix("%s","%s") to be false`, i, tc.s, tc.prefix) + } + } +}