diff --git a/pkg/storage/cacher.go b/pkg/storage/cacher.go index 365e224e8757d..652a03b7281d2 100644 --- a/pkg/storage/cacher.go +++ b/pkg/storage/cacher.go @@ -30,6 +30,7 @@ import ( "k8s.io/kubernetes/pkg/client/cache" "k8s.io/kubernetes/pkg/conversion" "k8s.io/kubernetes/pkg/runtime" + utilruntime "k8s.io/kubernetes/pkg/util/runtime" "k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/pkg/watch" @@ -540,6 +541,8 @@ func (c *cacheWatcher) sendWatchCacheEvent(event watchCacheEvent) { } func (c *cacheWatcher) process(initEvents []watchCacheEvent) { + defer utilruntime.HandleCrash() + for _, event := range initEvents { c.sendWatchCacheEvent(event) } diff --git a/pkg/storage/cacher_test.go b/pkg/storage/cacher_test.go index d15a17f06e49c..6b6d6e0ad6c0a 100644 --- a/pkg/storage/cacher_test.go +++ b/pkg/storage/cacher_test.go @@ -19,6 +19,7 @@ package storage_test import ( "fmt" "reflect" + goruntime "runtime" "strconv" "testing" "time" @@ -159,15 +160,19 @@ func TestList(t *testing.T) { } func verifyWatchEvent(t *testing.T, w watch.Interface, eventType watch.EventType, eventObject runtime.Object) { + _, _, line, _ := goruntime.Caller(1) select { case event := <-w.ResultChan(): if e, a := eventType, event.Type; e != a { + t.Logf("(called from line %d)", line) t.Errorf("Expected: %s, got: %s", eventType, event.Type) } if e, a := eventObject, event.Object; !api.Semantic.DeepDerivative(e, a) { + t.Logf("(called from line %d)", line) t.Errorf("Expected (%s): %#v, got: %#v", eventType, e, a) } case <-time.After(wait.ForeverTestTimeout): + t.Logf("(called from line %d)", line) t.Errorf("Timed out waiting for an event") } } @@ -236,7 +241,6 @@ func TestWatch(t *testing.T) { } defer initialWatcher.Stop() - verifyWatchEvent(t, initialWatcher, watch.Added, podFoo) verifyWatchEvent(t, initialWatcher, watch.Modified, podFooPrime) // Now test watch from "now". @@ -335,7 +339,6 @@ func TestFiltering(t *testing.T) { } defer watcher.Stop() - verifyWatchEvent(t, watcher, watch.Added, podFoo) verifyWatchEvent(t, watcher, watch.Deleted, podFooFiltered) verifyWatchEvent(t, watcher, watch.Added, podFoo) verifyWatchEvent(t, watcher, watch.Modified, podFooPrime) diff --git a/pkg/storage/watch_cache.go b/pkg/storage/watch_cache.go index 2e842a28e1eb1..87bce0c0e6a3e 100644 --- a/pkg/storage/watch_cache.go +++ b/pkg/storage/watch_cache.go @@ -302,14 +302,13 @@ func (w *watchCache) GetAllEventsSinceThreadUnsafe(resourceVersion uint64) ([]wa } return result, nil } - if resourceVersion < oldest { - return nil, errors.NewGone(fmt.Sprintf("too old resource version: %d (%d)", resourceVersion, oldest)) + if resourceVersion < oldest-1 { + return nil, errors.NewGone(fmt.Sprintf("too old resource version: %d (%d)", resourceVersion, oldest-1)) } - // Binary search the smallest index at which resourceVersion is not smaller than - // the given one. + // Binary search the smallest index at which resourceVersion is greater than the given one. f := func(i int) bool { - return w.cache[(w.startIndex+i)%w.capacity].resourceVersion >= resourceVersion + return w.cache[(w.startIndex+i)%w.capacity].resourceVersion > resourceVersion } first := sort.Search(size, f) result := make([]watchCacheEvent, size-first) diff --git a/pkg/storage/watch_cache_test.go b/pkg/storage/watch_cache_test.go index e2268fe956d2b..8d9327e3cb762 100644 --- a/pkg/storage/watch_cache_test.go +++ b/pkg/storage/watch_cache_test.go @@ -122,7 +122,7 @@ func TestWatchCacheBasic(t *testing.T) { func TestEvents(t *testing.T) { store := newTestWatchCache(5) - store.Add(makeTestPod("pod", 2)) + store.Add(makeTestPod("pod", 3)) // Test for Added event. { @@ -145,7 +145,7 @@ func TestEvents(t *testing.T) { if result[0].Type != watch.Added { t.Errorf("unexpected event type: %v", result[0].Type) } - pod := makeTestPod("pod", uint64(2)) + pod := makeTestPod("pod", uint64(3)) if !api.Semantic.DeepEqual(pod, result[0].Object) { t.Errorf("unexpected item: %v, expected: %v", result[0].Object, pod) } @@ -154,8 +154,8 @@ func TestEvents(t *testing.T) { } } - store.Update(makeTestPod("pod", 3)) store.Update(makeTestPod("pod", 4)) + store.Update(makeTestPod("pod", 5)) // Test with not full cache. { @@ -176,22 +176,22 @@ func TestEvents(t *testing.T) { if result[i].Type != watch.Modified { t.Errorf("unexpected event type: %v", result[i].Type) } - pod := makeTestPod("pod", uint64(i+3)) + pod := makeTestPod("pod", uint64(i+4)) if !api.Semantic.DeepEqual(pod, result[i].Object) { t.Errorf("unexpected item: %v, expected: %v", result[i].Object, pod) } - prevPod := makeTestPod("pod", uint64(i+2)) + prevPod := makeTestPod("pod", uint64(i+3)) if !api.Semantic.DeepEqual(prevPod, result[i].PrevObject) { t.Errorf("unexpected item: %v, expected: %v", result[i].PrevObject, prevPod) } } } - for i := 5; i < 9; i++ { + for i := 6; i < 10; i++ { store.Update(makeTestPod("pod", uint64(i))) } - // Test with full cache - there should be elements from 4 to 8. + // Test with full cache - there should be elements from 5 to 9. { _, err := store.GetAllEventsSince(3) if err == nil { @@ -207,7 +207,7 @@ func TestEvents(t *testing.T) { t.Fatalf("unexpected events: %v", result) } for i := 0; i < 5; i++ { - pod := makeTestPod("pod", uint64(i+4)) + pod := makeTestPod("pod", uint64(i+5)) if !api.Semantic.DeepEqual(pod, result[i].Object) { t.Errorf("unexpected item: %v, expected: %v", result[i].Object, pod) } @@ -215,7 +215,7 @@ func TestEvents(t *testing.T) { } // Test for delete event. - store.Delete(makeTestPod("pod", uint64(9))) + store.Delete(makeTestPod("pod", uint64(10))) { result, err := store.GetAllEventsSince(9) @@ -228,11 +228,11 @@ func TestEvents(t *testing.T) { if result[0].Type != watch.Deleted { t.Errorf("unexpected event type: %v", result[0].Type) } - pod := makeTestPod("pod", uint64(9)) + pod := makeTestPod("pod", uint64(10)) if !api.Semantic.DeepEqual(pod, result[0].Object) { t.Errorf("unexpected item: %v, expected: %v", result[0].Object, pod) } - prevPod := makeTestPod("pod", uint64(8)) + prevPod := makeTestPod("pod", uint64(9)) if !api.Semantic.DeepEqual(prevPod, result[0].PrevObject) { t.Errorf("unexpected item: %v, expected: %v", result[0].PrevObject, prevPod) }