Skip to content

Commit

Permalink
Honor starting resourceVersion in watch cache
Browse files Browse the repository at this point in the history
Compare the requested resourceVersion to each event's resourceVersion to ensure events that occurred
in the past are not sent to the client.
  • Loading branch information
Andy Goldstein committed Apr 14, 2016
1 parent 1186f4b commit 049e63d
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 9 deletions.
13 changes: 8 additions & 5 deletions pkg/storage/cacher.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion string,

c.Lock()
defer c.Unlock()
watcher := newCacheWatcher(initEvents, filterFunction(key, c.keyFunc, filter), forgetWatcher(c, c.watcherIdx))
watcher := newCacheWatcher(watchRV, initEvents, filterFunction(key, c.keyFunc, filter), forgetWatcher(c, c.watcherIdx))
c.watchers[c.watcherIdx] = watcher
c.watcherIdx++
return watcher, nil
Expand Down Expand Up @@ -465,15 +465,15 @@ type cacheWatcher struct {
forget func(bool)
}

func newCacheWatcher(initEvents []watchCacheEvent, filter FilterFunc, forget func(bool)) *cacheWatcher {
func newCacheWatcher(resourceVersion uint64, initEvents []watchCacheEvent, filter FilterFunc, forget func(bool)) *cacheWatcher {
watcher := &cacheWatcher{
input: make(chan watchCacheEvent, 10),
result: make(chan watch.Event, 10),
filter: filter,
stopped: false,
forget: forget,
}
go watcher.process(initEvents)
go watcher.process(initEvents, resourceVersion)
return watcher
}

Expand Down Expand Up @@ -537,7 +537,7 @@ func (c *cacheWatcher) sendWatchCacheEvent(event watchCacheEvent) {
}
}

func (c *cacheWatcher) process(initEvents []watchCacheEvent) {
func (c *cacheWatcher) process(initEvents []watchCacheEvent, resourceVersion uint64) {
defer utilruntime.HandleCrash()

for _, event := range initEvents {
Expand All @@ -550,6 +550,9 @@ func (c *cacheWatcher) process(initEvents []watchCacheEvent) {
if !ok {
return
}
c.sendWatchCacheEvent(event)
// only send events newer than resourceVersion
if event.ResourceVersion > resourceVersion {
c.sendWatchCacheEvent(event)
}
}
}
48 changes: 48 additions & 0 deletions pkg/storage/cacher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,3 +346,51 @@ func TestFiltering(t *testing.T) {
verifyWatchEvent(t, watcher, watch.Modified, podFooPrime)
verifyWatchEvent(t, watcher, watch.Deleted, podFooPrime)
}

func TestStartingResourceVersion(t *testing.T) {
server, etcdStorage := newEtcdTestStorage(t, testapi.Default.Codec(), etcdtest.PathPrefix())
defer server.Terminate(t)
cacher := newTestCacher(etcdStorage)
defer cacher.Stop()

// add 1 object
podFoo := makeTestPod("foo")
fooCreated := updatePod(t, etcdStorage, podFoo, nil)

// Set up Watch starting at fooCreated.ResourceVersion + 10
rv, err := storage.ParseWatchResourceVersion(fooCreated.ResourceVersion)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
rv += 10
startVersion := strconv.Itoa(int(rv))

watcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", startVersion, storage.Everything)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer watcher.Stop()

lastFoo := fooCreated
for i := 0; i < 11; i++ {
podFooForUpdate := makeTestPod("foo")
podFooForUpdate.Labels = map[string]string{"foo": strconv.Itoa(i)}
lastFoo = updatePod(t, etcdStorage, podFooForUpdate, lastFoo)
}

select {
case e := <-watcher.ResultChan():
pod := e.Object.(*api.Pod)
podRV, err := storage.ParseWatchResourceVersion(pod.ResourceVersion)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}

// event should have at least rv + 1, since we're starting the watch at rv
if podRV <= rv {
t.Errorf("expected event with resourceVersion of at least %d, got %d", rv+1, podRV)
}
case <-time.After(wait.ForeverTestTimeout):
t.Errorf("timed out waiting for event")
}
}
9 changes: 5 additions & 4 deletions pkg/storage/watch_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,10 @@ const (
// the previous value of the object to enable proper filtering in the
// upper layers.
type watchCacheEvent struct {
Type watch.EventType
Object runtime.Object
PrevObject runtime.Object
Type watch.EventType
Object runtime.Object
PrevObject runtime.Object
ResourceVersion uint64
}

// watchCacheElement is a single "watch event" stored in a cache.
Expand Down Expand Up @@ -179,7 +180,7 @@ func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, upd
if exists {
prevObject = previous.(runtime.Object)
}
watchCacheEvent := watchCacheEvent{event.Type, event.Object, prevObject}
watchCacheEvent := watchCacheEvent{event.Type, event.Object, prevObject, resourceVersion}
if w.onEvent != nil {
w.onEvent(watchCacheEvent)
}
Expand Down

1 comment on commit 049e63d

@k8s-teamcity-mesosphere

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TeamCity OSS :: Kubernetes Mesos :: 4 - Smoke Tests Build 21357 outcome was SUCCESS
Summary: Tests passed: 1, ignored: 276 Build time: 00:12:04

Please sign in to comment.