Skip to content

Commit

Permalink
Merge pull request #24208 from ncdc/watch-cache-check-starting-rv
Browse files Browse the repository at this point in the history
Automatic merge from submit-queue

Honor starting resourceVersion in watch cache

Compare the requested resourceVersion to each event's resourceVersion to ensure events that occurred
in the past are not sent to the client.

Fixes #24194 

cc @liggitt @wojtek-t
  • Loading branch information
k8s-merge-robot committed Apr 14, 2016
2 parents 1186f4b + 049e63d commit d800dca
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 9 deletions.
13 changes: 8 additions & 5 deletions pkg/storage/cacher.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion string,

c.Lock()
defer c.Unlock()
watcher := newCacheWatcher(initEvents, filterFunction(key, c.keyFunc, filter), forgetWatcher(c, c.watcherIdx))
watcher := newCacheWatcher(watchRV, initEvents, filterFunction(key, c.keyFunc, filter), forgetWatcher(c, c.watcherIdx))
c.watchers[c.watcherIdx] = watcher
c.watcherIdx++
return watcher, nil
Expand Down Expand Up @@ -465,15 +465,15 @@ type cacheWatcher struct {
forget func(bool)
}

func newCacheWatcher(initEvents []watchCacheEvent, filter FilterFunc, forget func(bool)) *cacheWatcher {
func newCacheWatcher(resourceVersion uint64, initEvents []watchCacheEvent, filter FilterFunc, forget func(bool)) *cacheWatcher {
watcher := &cacheWatcher{
input: make(chan watchCacheEvent, 10),
result: make(chan watch.Event, 10),
filter: filter,
stopped: false,
forget: forget,
}
go watcher.process(initEvents)
go watcher.process(initEvents, resourceVersion)
return watcher
}

Expand Down Expand Up @@ -537,7 +537,7 @@ func (c *cacheWatcher) sendWatchCacheEvent(event watchCacheEvent) {
}
}

func (c *cacheWatcher) process(initEvents []watchCacheEvent) {
func (c *cacheWatcher) process(initEvents []watchCacheEvent, resourceVersion uint64) {
defer utilruntime.HandleCrash()

for _, event := range initEvents {
Expand All @@ -550,6 +550,9 @@ func (c *cacheWatcher) process(initEvents []watchCacheEvent) {
if !ok {
return
}
c.sendWatchCacheEvent(event)
// only send events newer than resourceVersion
if event.ResourceVersion > resourceVersion {
c.sendWatchCacheEvent(event)
}
}
}
48 changes: 48 additions & 0 deletions pkg/storage/cacher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,3 +346,51 @@ func TestFiltering(t *testing.T) {
verifyWatchEvent(t, watcher, watch.Modified, podFooPrime)
verifyWatchEvent(t, watcher, watch.Deleted, podFooPrime)
}

func TestStartingResourceVersion(t *testing.T) {
server, etcdStorage := newEtcdTestStorage(t, testapi.Default.Codec(), etcdtest.PathPrefix())
defer server.Terminate(t)
cacher := newTestCacher(etcdStorage)
defer cacher.Stop()

// add 1 object
podFoo := makeTestPod("foo")
fooCreated := updatePod(t, etcdStorage, podFoo, nil)

// Set up Watch starting at fooCreated.ResourceVersion + 10
rv, err := storage.ParseWatchResourceVersion(fooCreated.ResourceVersion)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
rv += 10
startVersion := strconv.Itoa(int(rv))

watcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", startVersion, storage.Everything)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer watcher.Stop()

lastFoo := fooCreated
for i := 0; i < 11; i++ {
podFooForUpdate := makeTestPod("foo")
podFooForUpdate.Labels = map[string]string{"foo": strconv.Itoa(i)}
lastFoo = updatePod(t, etcdStorage, podFooForUpdate, lastFoo)
}

select {
case e := <-watcher.ResultChan():
pod := e.Object.(*api.Pod)
podRV, err := storage.ParseWatchResourceVersion(pod.ResourceVersion)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}

// event should have at least rv + 1, since we're starting the watch at rv
if podRV <= rv {
t.Errorf("expected event with resourceVersion of at least %d, got %d", rv+1, podRV)
}
case <-time.After(wait.ForeverTestTimeout):
t.Errorf("timed out waiting for event")
}
}
9 changes: 5 additions & 4 deletions pkg/storage/watch_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,10 @@ const (
// the previous value of the object to enable proper filtering in the
// upper layers.
type watchCacheEvent struct {
Type watch.EventType
Object runtime.Object
PrevObject runtime.Object
Type watch.EventType
Object runtime.Object
PrevObject runtime.Object
ResourceVersion uint64
}

// watchCacheElement is a single "watch event" stored in a cache.
Expand Down Expand Up @@ -179,7 +180,7 @@ func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, upd
if exists {
prevObject = previous.(runtime.Object)
}
watchCacheEvent := watchCacheEvent{event.Type, event.Object, prevObject}
watchCacheEvent := watchCacheEvent{event.Type, event.Object, prevObject, resourceVersion}
if w.onEvent != nil {
w.onEvent(watchCacheEvent)
}
Expand Down

1 comment on commit d800dca

@k8s-teamcity-mesosphere

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TeamCity OSS :: Kubernetes Mesos :: 4 - Smoke Tests Build 21377 outcome was SUCCESS
Summary: Tests passed: 1, ignored: 276 Build time: 00:12:48

Please sign in to comment.