Skip to content

Commit

Permalink
Merge pull request kubernetes#28968 from liggitt/cache-filter-release…
Browse files Browse the repository at this point in the history
…-1.3

Automatic merge from submit-queue

release-1.3: Fix watch cache filtering

Cherry-pick of kubernetes#28966
  • Loading branch information
k8s-merge-robot authored Jul 14, 2016
2 parents e45a6a3 + 2c8e4eb commit 5dd680f
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 3 deletions.
3 changes: 1 addition & 2 deletions pkg/storage/cacher.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"net/http"
"reflect"
"strconv"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -408,7 +407,7 @@ func filterFunction(key string, keyFunc func(runtime.Object) (string, error), fi
glog.Errorf("invalid object for filter: %v", obj)
return false
}
if !strings.HasPrefix(objKey, key) {
if !hasPathPrefix(objKey, key) {
return false
}
return filter(obj)
Expand Down
25 changes: 24 additions & 1 deletion pkg/storage/cacher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func updatePod(t *testing.T, s storage.Interface, obj, old *api.Pod) *api.Pod {
}
return newObj.(*api.Pod), nil, nil
}
key := etcdtest.AddPrefix("pods/ns/" + obj.Name)
key := etcdtest.AddPrefix("pods/" + obj.Namespace + "/" + obj.Name)
if err := s.GuaranteedUpdate(context.TODO(), key, &api.Pod{}, old == nil, nil, updateFn); err != nil {
t.Errorf("unexpected error: %v", err)
}
Expand Down Expand Up @@ -110,6 +110,12 @@ func TestList(t *testing.T) {

_ = updatePod(t, etcdStorage, podFooPrime, fooCreated)

// Create a pod in a namespace that contains "ns" as a prefix
// Make sure it is not returned in a watch of "ns"
podFooNS2 := makeTestPod("foo")
podFooNS2.Namespace += "2"
updatePod(t, etcdStorage, podFooNS2, nil)

deleted := api.Pod{}
if err := etcdStorage.Delete(context.TODO(), etcdtest.AddPrefix("pods/ns/bar"), &deleted, nil); err != nil {
t.Errorf("Unexpected error: %v", err)
Expand Down Expand Up @@ -147,6 +153,10 @@ func TestList(t *testing.T) {
item.ResourceVersion = ""
item.CreationTimestamp = unversioned.Time{}

if item.Namespace != "ns" {
t.Errorf("Unexpected namespace: %s", item.Namespace)
}

var expected *api.Pod
switch item.Name {
case "foo":
Expand Down Expand Up @@ -210,6 +220,9 @@ func TestWatch(t *testing.T) {
podFooBis := makeTestPod("foo")
podFooBis.Spec.NodeName = "anotherFakeNode"

podFooNS2 := makeTestPod("foo")
podFooNS2.Namespace += "2"

// initialVersion is used to initate the watcher at the beginning of the world,
// which is not defined precisely in etcd.
initialVersion, err := cacher.LastSyncResourceVersion()
Expand All @@ -225,6 +238,9 @@ func TestWatch(t *testing.T) {
}
defer watcher.Stop()

// Create in another namespace first to make sure events from other namespaces don't get delivered
updatePod(t, etcdStorage, podFooNS2, nil)

fooCreated := updatePod(t, etcdStorage, podFoo, nil)
_ = updatePod(t, etcdStorage, podBar, nil)
fooUpdated := updatePod(t, etcdStorage, podFooPrime, fooCreated)
Expand Down Expand Up @@ -320,6 +336,13 @@ func TestFiltering(t *testing.T) {
podFooPrime.Labels = map[string]string{"filter": "foo"}
podFooPrime.Spec.NodeName = "fakeNode"

podFooNS2 := makeTestPod("foo")
podFooNS2.Namespace += "2"
podFooNS2.Labels = map[string]string{"filter": "foo"}

// Create in another namespace first to make sure events from other namespaces don't get delivered
updatePod(t, etcdStorage, podFooNS2, nil)

fooCreated := updatePod(t, etcdStorage, podFoo, nil)
fooFiltered := updatePod(t, etcdStorage, podFooFiltered, fooCreated)
fooUnfiltered := updatePod(t, etcdStorage, podFoo, fooFiltered)
Expand Down
26 changes: 26 additions & 0 deletions pkg/storage/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package storage
import (
"fmt"
"strconv"
"strings"

"k8s.io/kubernetes/pkg/api/meta"
"k8s.io/kubernetes/pkg/api/validation"
Expand Down Expand Up @@ -88,3 +89,28 @@ func NoNamespaceKeyFunc(prefix string, obj runtime.Object) (string, error) {
}
return prefix + "/" + name, nil
}

// hasPathPrefix returns true if the string matches pathPrefix exactly, or if is prefixed with pathPrefix at a path segment boundary
func hasPathPrefix(s, pathPrefix string) bool {
// Short circuit if s doesn't contain the prefix at all
if !strings.HasPrefix(s, pathPrefix) {
return false
}

pathPrefixLength := len(pathPrefix)

if len(s) == pathPrefixLength {
// Exact match
return true
}
if strings.HasSuffix(pathPrefix, "/") {
// pathPrefix already ensured a path segment boundary
return true
}
if s[pathPrefixLength:pathPrefixLength+1] == "/" {
// The next character in s is a path segment boundary
// Check this instead of normalizing pathPrefix to avoid allocating on every call
return true
}
return false
}
48 changes: 48 additions & 0 deletions pkg/storage/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,51 @@ func TestEtcdParseWatchResourceVersion(t *testing.T) {
}
}
}

func TestHasPathPrefix(t *testing.T) {
validTestcases := []struct {
s string
prefix string
}{
// Exact matches
{"", ""},
{"a", "a"},
{"a/", "a/"},
{"a/../", "a/../"},

// Path prefix matches
{"a/b", "a"},
{"a/b", "a/"},
{"中文/", "中文"},
}
for i, tc := range validTestcases {
if !hasPathPrefix(tc.s, tc.prefix) {
t.Errorf(`%d: Expected hasPathPrefix("%s","%s") to be true`, i, tc.s, tc.prefix)
}
}

invalidTestcases := []struct {
s string
prefix string
}{
// Mismatch
{"a", "b"},

// Dir requirement
{"a", "a/"},

// Prefix mismatch
{"ns2", "ns"},
{"ns2", "ns/"},
{"中文文", "中文"},

// Ensure no normalization is applied
{"a/c/../b/", "a/b/"},
{"a/", "a/b/.."},
}
for i, tc := range invalidTestcases {
if hasPathPrefix(tc.s, tc.prefix) {
t.Errorf(`%d: Expected hasPathPrefix("%s","%s") to be false`, i, tc.s, tc.prefix)
}
}
}

0 comments on commit 5dd680f

Please sign in to comment.