Skip to content

Commit

Permalink
WatchList should not convey events for the root key
Browse files Browse the repository at this point in the history
It's possible to watch /foo/bar, get events for /foo/bar/baz,
and then if someone deletes /foo/bar you'll get an event for /foo/bar
without a value.  This results in an error being printed in the logs
because /foo/bar has value "" and we can't decode that.

This commit excludes the parent directory from watch events for now.
  • Loading branch information
smarterclayton committed Jan 15, 2015
1 parent 2dce916 commit ce36431
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 9 deletions.
31 changes: 26 additions & 5 deletions pkg/tools/etcd_tools_watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func ParseWatchResourceVersion(resourceVersion, kind string) (uint64, error) {
// watch.Interface. resourceVersion may be used to specify what version to begin
// watching (e.g., for reconnecting without missing any updates).
func (h *EtcdHelper) WatchList(key string, resourceVersion uint64, filter FilterFunc) (watch.Interface, error) {
w := newEtcdWatcher(true, filter, h.Codec, h.ResourceVersioner, nil)
w := newEtcdWatcher(true, exceptKey(key), filter, h.Codec, h.ResourceVersioner, nil)
go w.etcdWatch(h.Client, key, resourceVersion)
return w, nil
}
Expand Down Expand Up @@ -90,22 +90,33 @@ func (h *EtcdHelper) Watch(key string, resourceVersion uint64) watch.Interface {
//
// Errors will be sent down the channel.
func (h *EtcdHelper) WatchAndTransform(key string, resourceVersion uint64, transform TransformFunc) watch.Interface {
w := newEtcdWatcher(false, Everything, h.Codec, h.ResourceVersioner, transform)
w := newEtcdWatcher(false, nil, Everything, h.Codec, h.ResourceVersioner, transform)
go w.etcdWatch(h.Client, key, resourceVersion)
return w
}

// TransformFunc attempts to convert an object to another object for use with a watcher.
type TransformFunc func(runtime.Object) (runtime.Object, error)

// includeFunc returns true if the given key should be considered part of a watch
type includeFunc func(key string) bool

// exceptKey is an includeFunc that returns false when the provided key matches the watched key
func exceptKey(except string) includeFunc {
return func(key string) bool {
return key != except
}
}

// etcdWatcher converts a native etcd watch to a watch.Interface.
type etcdWatcher struct {
encoding runtime.Codec
versioner EtcdResourceVersioner
transform TransformFunc

list bool // If we're doing a recursive watch, should be true.
filter FilterFunc
list bool // If we're doing a recursive watch, should be true.
include includeFunc
filter FilterFunc

etcdIncoming chan *etcd.Response
etcdError chan error
Expand All @@ -126,12 +137,13 @@ const watchWaitDuration = 100 * time.Millisecond

// newEtcdWatcher returns a new etcdWatcher; if list is true, watch sub-nodes. If you provide a transform
// and a versioner, the versioner must be able to handle the objects that transform creates.
func newEtcdWatcher(list bool, filter FilterFunc, encoding runtime.Codec, versioner EtcdResourceVersioner, transform TransformFunc) *etcdWatcher {
func newEtcdWatcher(list bool, include includeFunc, filter FilterFunc, encoding runtime.Codec, versioner EtcdResourceVersioner, transform TransformFunc) *etcdWatcher {
w := &etcdWatcher{
encoding: encoding,
versioner: versioner,
transform: transform,
list: list,
include: include,
filter: filter,
etcdIncoming: make(chan *etcd.Response),
etcdError: make(chan error, 1),
Expand Down Expand Up @@ -258,6 +270,9 @@ func (w *etcdWatcher) sendAdd(res *etcd.Response) {
glog.Errorf("unexpected nil node: %#v", res)
return
}
if w.include != nil && !w.include(res.Node.Key) {
return
}
data := []byte(res.Node.Value)
obj, err := w.decodeObject(data, res.Node.ModifiedIndex)
if err != nil {
Expand Down Expand Up @@ -285,6 +300,9 @@ func (w *etcdWatcher) sendModify(res *etcd.Response) {
glog.Errorf("unexpected nil node: %#v", res)
return
}
if w.include != nil && !w.include(res.Node.Key) {
return
}
curData := []byte(res.Node.Value)
curObj, err := w.decodeObject(curData, res.Node.ModifiedIndex)
if err != nil {
Expand Down Expand Up @@ -331,6 +349,9 @@ func (w *etcdWatcher) sendDelete(res *etcd.Response) {
glog.Errorf("unexpected nil prev node: %#v", res)
return
}
if w.include != nil && !w.include(res.PrevNode.Key) {
return
}
data := []byte(res.PrevNode.Value)
index := res.PrevNode.ModifiedIndex
if res.Node != nil {
Expand Down
53 changes: 49 additions & 4 deletions pkg/tools/etcd_tools_watch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func TestWatchInterpretations(t *testing.T) {

for name, item := range table {
for _, action := range item.actions {
w := newEtcdWatcher(true, firstLetterIsB, codec, versioner, nil)
w := newEtcdWatcher(true, nil, firstLetterIsB, codec, versioner, nil)
emitCalled := false
w.emit = func(event watch.Event) {
emitCalled = true
Expand Down Expand Up @@ -151,7 +151,7 @@ func TestWatchInterpretations(t *testing.T) {
}

func TestWatchInterpretation_ResponseNotSet(t *testing.T) {
w := newEtcdWatcher(false, Everything, codec, versioner, nil)
w := newEtcdWatcher(false, nil, Everything, codec, versioner, nil)
w.emit = func(e watch.Event) {
t.Errorf("Unexpected emit: %v", e)
}
Expand All @@ -165,7 +165,7 @@ func TestWatchInterpretation_ResponseNotSet(t *testing.T) {
func TestWatchInterpretation_ResponseNoNode(t *testing.T) {
actions := []string{"create", "set", "compareAndSwap", "delete"}
for _, action := range actions {
w := newEtcdWatcher(false, Everything, codec, versioner, nil)
w := newEtcdWatcher(false, nil, Everything, codec, versioner, nil)
w.emit = func(e watch.Event) {
t.Errorf("Unexpected emit: %v", e)
}
Expand All @@ -179,7 +179,7 @@ func TestWatchInterpretation_ResponseNoNode(t *testing.T) {
func TestWatchInterpretation_ResponseBadData(t *testing.T) {
actions := []string{"create", "set", "compareAndSwap", "delete"}
for _, action := range actions {
w := newEtcdWatcher(false, Everything, codec, versioner, nil)
w := newEtcdWatcher(false, nil, Everything, codec, versioner, nil)
w.emit = func(e watch.Event) {
t.Errorf("Unexpected emit: %v", e)
}
Expand Down Expand Up @@ -524,6 +524,51 @@ func TestWatchListFromZeroIndex(t *testing.T) {
watching.Stop()
}

func TestWatchListIgnoresRootKey(t *testing.T) {
codec := latest.Codec
pod := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}

fakeClient := NewFakeEtcdClient(t)
h := EtcdHelper{fakeClient, codec, versioner}

watching, err := h.WatchList("/some/key", 1, Everything)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
fakeClient.WaitForWatchCompletion()

// This is the root directory of the watch, which happens to have a value encoded
fakeClient.WatchResponse <- &etcd.Response{
Action: "delete",
PrevNode: &etcd.Node{
Key: "/some/key",
Value: runtime.EncodeOrDie(codec, pod),
CreatedIndex: 1,
ModifiedIndex: 1,
},
}
// Delete of the parent directory of a key is an event that a list watch would receive,
// but will have no value so the decode will fail.
fakeClient.WatchResponse <- &etcd.Response{
Action: "delete",
PrevNode: &etcd.Node{
Key: "/some/key",
Value: "",
CreatedIndex: 1,
ModifiedIndex: 1,
},
}
close(fakeClient.WatchStop)

// the existing node is detected and the index set
_, open := <-watching.ResultChan()
if open {
t.Fatalf("unexpected channel open")
}

watching.Stop()
}

func TestWatchFromNotFound(t *testing.T) {
fakeClient := NewFakeEtcdClient(t)
fakeClient.Data["/some/key"] = EtcdResponseWithError{
Expand Down

0 comments on commit ce36431

Please sign in to comment.