Skip to content

Commit

Permalink
Merge pull request kubernetes#24307 from ncdc/automated-cherry-pick-o…
Browse files Browse the repository at this point in the history
…f-#24208-upstream-release-1.2

Automated cherry pick of kubernetes#24208
  • Loading branch information
zmerlynn committed Apr 20, 2016
2 parents 48df94d + 62ef8f2 commit 8949db3
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 9 deletions.
13 changes: 8 additions & 5 deletions pkg/storage/cacher.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion string,

c.Lock()
defer c.Unlock()
watcher := newCacheWatcher(initEvents, filterFunction(key, c.keyFunc, filter), forgetWatcher(c, c.watcherIdx))
watcher := newCacheWatcher(watchRV, initEvents, filterFunction(key, c.keyFunc, filter), forgetWatcher(c, c.watcherIdx))
c.watchers[c.watcherIdx] = watcher
c.watcherIdx++
return watcher, nil
Expand Down Expand Up @@ -470,15 +470,15 @@ type cacheWatcher struct {
forget func(bool)
}

func newCacheWatcher(initEvents []watchCacheEvent, filter FilterFunc, forget func(bool)) *cacheWatcher {
func newCacheWatcher(resourceVersion uint64, initEvents []watchCacheEvent, filter FilterFunc, forget func(bool)) *cacheWatcher {
watcher := &cacheWatcher{
input: make(chan watchCacheEvent, 10),
result: make(chan watch.Event, 10),
filter: filter,
stopped: false,
forget: forget,
}
go watcher.process(initEvents)
go watcher.process(initEvents, resourceVersion)
return watcher
}

Expand Down Expand Up @@ -540,7 +540,7 @@ func (c *cacheWatcher) sendWatchCacheEvent(event watchCacheEvent) {
}
}

func (c *cacheWatcher) process(initEvents []watchCacheEvent) {
func (c *cacheWatcher) process(initEvents []watchCacheEvent, resourceVersion uint64) {
defer utilruntime.HandleCrash()

for _, event := range initEvents {
Expand All @@ -553,6 +553,9 @@ func (c *cacheWatcher) process(initEvents []watchCacheEvent) {
if !ok {
return
}
c.sendWatchCacheEvent(event)
// only send events newer than resourceVersion
if event.ResourceVersion > resourceVersion {
c.sendWatchCacheEvent(event)
}
}
}
48 changes: 48 additions & 0 deletions pkg/storage/cacher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,3 +344,51 @@ func TestFiltering(t *testing.T) {
verifyWatchEvent(t, watcher, watch.Modified, podFooPrime)
verifyWatchEvent(t, watcher, watch.Deleted, podFooPrime)
}

func TestStartingResourceVersion(t *testing.T) {
server, etcdStorage := newEtcdTestStorage(t, testapi.Default.Codec(), etcdtest.PathPrefix())
defer server.Terminate(t)
cacher := newTestCacher(etcdStorage)
defer cacher.Stop()

// add 1 object
podFoo := makeTestPod("foo")
fooCreated := updatePod(t, etcdStorage, podFoo, nil)

// Set up Watch starting at fooCreated.ResourceVersion + 10
rv, err := storage.ParseWatchResourceVersion(fooCreated.ResourceVersion)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
rv += 10
startVersion := strconv.Itoa(int(rv))

watcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", startVersion, storage.Everything)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer watcher.Stop()

lastFoo := fooCreated
for i := 0; i < 11; i++ {
podFooForUpdate := makeTestPod("foo")
podFooForUpdate.Labels = map[string]string{"foo": strconv.Itoa(i)}
lastFoo = updatePod(t, etcdStorage, podFooForUpdate, lastFoo)
}

select {
case e := <-watcher.ResultChan():
pod := e.Object.(*api.Pod)
podRV, err := storage.ParseWatchResourceVersion(pod.ResourceVersion)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}

// event should have at least rv + 1, since we're starting the watch at rv
if podRV <= rv {
t.Errorf("expected event with resourceVersion of at least %d, got %d", rv+1, podRV)
}
case <-time.After(wait.ForeverTestTimeout):
t.Errorf("timed out waiting for event")
}
}
9 changes: 5 additions & 4 deletions pkg/storage/watch_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,10 @@ const (
// the previous value of the object to enable proper filtering in the
// upper layers.
type watchCacheEvent struct {
Type watch.EventType
Object runtime.Object
PrevObject runtime.Object
Type watch.EventType
Object runtime.Object
PrevObject runtime.Object
ResourceVersion uint64
}

// watchCacheElement is a single "watch event" stored in a cache.
Expand Down Expand Up @@ -179,7 +180,7 @@ func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, upd
if exists {
prevObject = previous.(runtime.Object)
}
watchCacheEvent := watchCacheEvent{event.Type, event.Object, prevObject}
watchCacheEvent := watchCacheEvent{event.Type, event.Object, prevObject, resourceVersion}
if w.onEvent != nil {
w.onEvent(watchCacheEvent)
}
Expand Down

0 comments on commit 8949db3

Please sign in to comment.