From 0297329795efc0f2bd24751e18f66b0aa79c222d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Tyczy=C5=84ski?= Date: Mon, 17 Apr 2023 14:22:52 +0200 Subject: [PATCH] Refactor watch bookmark tests to allow sharing between etcd3 and watchcache --- .../pkg/storage/etcd3/watcher_test.go | 4 +- .../pkg/storage/testing/watcher_tests.go | 176 +++++++++++++++++- .../pkg/storage/tests/cacher_test.go | 161 +--------------- 3 files changed, 184 insertions(+), 157 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go index 628c9124d799a..f538add6f6386 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go @@ -36,12 +36,12 @@ func TestWatch(t *testing.T) { func TestClusterScopedWatch(t *testing.T) { ctx, store, _ := testSetup(t) - storagetesting.TestClusterScopedWatch(ctx, t, store) + storagetesting.RunTestClusterScopedWatch(ctx, t, store) } func TestNamespaceScopedWatch(t *testing.T) { ctx, store, _ := testSetup(t) - storagetesting.TestNamespaceScopedWatch(ctx, t, store) + storagetesting.RunTestNamespaceScopedWatch(ctx, t, store) } func TestDeleteTriggerWatch(t *testing.T) { diff --git a/staging/src/k8s.io/apiserver/pkg/storage/testing/watcher_tests.go b/staging/src/k8s.io/apiserver/pkg/storage/testing/watcher_tests.go index 2c62584e52b83..454b01b7270dd 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/testing/watcher_tests.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/testing/watcher_tests.go @@ -19,6 +19,7 @@ package testing import ( "context" "fmt" + "sync" "testing" "time" @@ -376,7 +377,7 @@ func RunOptionalTestProgressNotify(ctx context.Context, t *testing.T, store stor } // It tests watches of cluster-scoped resources. -func TestClusterScopedWatch(ctx context.Context, t *testing.T, store storage.Interface) { +func RunTestClusterScopedWatch(ctx context.Context, t *testing.T, store storage.Interface) { tests := []struct { name string // For watch request, the name of object is specified with field selector @@ -530,7 +531,7 @@ func TestClusterScopedWatch(ctx context.Context, t *testing.T, store storage.Int } // It tests watch of namespace-scoped resources. -func TestNamespaceScopedWatch(ctx context.Context, t *testing.T, store storage.Interface) { +func RunTestNamespaceScopedWatch(ctx context.Context, t *testing.T, store storage.Interface) { tests := []struct { name string // For watch request, the name of object is specified with field selector @@ -844,6 +845,177 @@ func TestNamespaceScopedWatch(ctx context.Context, t *testing.T, store storage.I } } +// RunOptionalTestWatchDispatchBookmarkEvents tests whether bookmark events are sent. +// This feature is currently implemented in watch cache layer, so this is optional. +// +// TODO(#109831): ProgressNotify feature is effectively implementing the same +// +// functionality, so we should refactor this functionality to share the same input. +func RunTestWatchDispatchBookmarkEvents(ctx context.Context, t *testing.T, store storage.Interface) { + key, storedObj := testPropagateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "test-ns"}}) + startRV := storedObj.ResourceVersion + + tests := []struct { + name string + timeout time.Duration + expected bool + allowWatchBookmarks bool + }{ + { // test old client won't get Bookmark event + name: "allowWatchBookmarks=false", + timeout: 3 * time.Second, + expected: false, + allowWatchBookmarks: false, + }, + { + name: "allowWatchBookmarks=true", + timeout: 3 * time.Second, + expected: true, + allowWatchBookmarks: true, + }, + } + + for i, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + pred := storage.Everything + pred.AllowWatchBookmarks = tt.allowWatchBookmarks + ctx, cancel := context.WithTimeout(context.Background(), tt.timeout) + defer cancel() + + watcher, err := store.Watch(ctx, key, storage.ListOptions{ResourceVersion: startRV, Predicate: pred}) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer watcher.Stop() + + // Create events of pods in a different namespace + out := &example.Pod{} + obj := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: fmt.Sprintf("other-ns-%d", i)}} + objKey := computePodKey(obj) + + if err := store.Create(ctx, objKey, obj, out, 0); err != nil { + t.Fatalf("Create failed: %v", err) + } + + // Now wait for Bookmark event + select { + case event, ok := <-watcher.ResultChan(): + if !ok && tt.expected { + t.Errorf("Unexpected object watched (no objects)") + } + if tt.expected && event.Type != watch.Bookmark { + t.Errorf("Unexpected object watched %#v", event) + } + case <-time.After(time.Second * 3): + if tt.expected { + t.Errorf("Unexpected object watched (timeout)") + } + } + }) + } +} + +// RunOptionalTestWatchBookmarksWithCorrectResourceVersion tests whether bookmark events are +// sent with correct resource versions. +// This feature is currently implemented in watch cache layer, so this is optional. +// +// TODO(#109831): ProgressNotify feature is effectively implementing the same +// +// functionality, so we should refactor this functionality to share the same input. +func RunTestOptionalWatchBookmarksWithCorrectResourceVersion(ctx context.Context, t *testing.T, store storage.Interface) { + // Compute the initial resource version. + list := &example.PodList{} + storageOpts := storage.ListOptions{ + Predicate: storage.Everything, + Recursive: true, + } + if err := store.GetList(ctx, "/pods", storageOpts, list); err != nil { + t.Errorf("Unexpected error: %v", err) + } + startRV := list.ResourceVersion + + key := "/pods/test-ns" + pred := storage.Everything + pred.AllowWatchBookmarks = true + + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + + watcher, err := store.Watch(ctx, key, storage.ListOptions{ResourceVersion: startRV, Predicate: pred, Recursive: true}) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer watcher.Stop() + + done := make(chan struct{}) + errc := make(chan error, 1) + var wg sync.WaitGroup + wg.Add(1) + // We must wait for the waitgroup to exit before we terminate the cache or the server in prior defers. + defer wg.Wait() + // Call close first, so the goroutine knows to exit. + defer close(done) + + go func() { + defer wg.Done() + for i := 0; i < 100; i++ { + select { + case <-done: + return + default: + out := &example.Pod{} + pod := &example.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("foo-%d", i), + Namespace: "test-ns", + }, + } + podKey := computePodKey(pod) + if err := store.Create(ctx, podKey, pod, out, 0); err != nil { + errc <- fmt.Errorf("failed to create pod %v: %v", pod, err) + return + } + time.Sleep(10 * time.Millisecond) + } + } + }() + + bookmarkReceived := false + lastObservedResourceVersion := uint64(0) + + for { + select { + case err := <-errc: + t.Fatal(err) + case event, ok := <-watcher.ResultChan(): + if !ok { + // Make sure we have received a bookmark event + if !bookmarkReceived { + t.Fatalf("Unpexected error, we did not received a bookmark event") + } + return + } + rv, err := storage.APIObjectVersioner{}.ObjectResourceVersion(event.Object) + if err != nil { + t.Fatalf("failed to parse resourceVersion from %#v", event) + } + if event.Type == watch.Bookmark { + bookmarkReceived = true + // bookmark event has a RV greater than or equal to the before one + if rv < lastObservedResourceVersion { + t.Fatalf("Unexpected bookmark resourceVersion %v less than observed %v)", rv, lastObservedResourceVersion) + } + } else { + // non-bookmark event has a RV greater than anything before + if rv <= lastObservedResourceVersion { + t.Fatalf("Unexpected event resourceVersion %v less than or equal to bookmark %v)", rv, lastObservedResourceVersion) + } + } + lastObservedResourceVersion = rv + } + } +} + type testWatchStruct struct { obj *example.Pod expectEvent bool diff --git a/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go b/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go index 7c5014a36f567..977eae9fa9a7d 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go @@ -21,7 +21,6 @@ import ( "fmt" goruntime "runtime" "strconv" - "sync" "testing" "time" @@ -40,15 +39,12 @@ import ( "k8s.io/apimachinery/pkg/watch" "k8s.io/apiserver/pkg/apis/example" examplev1 "k8s.io/apiserver/pkg/apis/example/v1" - "k8s.io/apiserver/pkg/features" "k8s.io/apiserver/pkg/storage" cacherstorage "k8s.io/apiserver/pkg/storage/cacher" "k8s.io/apiserver/pkg/storage/etcd3" etcd3testing "k8s.io/apiserver/pkg/storage/etcd3/testing" storagetesting "k8s.io/apiserver/pkg/storage/testing" "k8s.io/apiserver/pkg/storage/value/encrypt/identity" - utilfeature "k8s.io/apiserver/pkg/util/feature" - featuregatetesting "k8s.io/component-base/featuregate/testing" "k8s.io/utils/clock" testingclock "k8s.io/utils/clock/testing" ) @@ -139,12 +135,6 @@ func makeTestPod(name string) *example.Pod { } } -func createPod(s storage.Interface, obj *example.Pod) error { - key := "pods/" + obj.Namespace + "/" + obj.Name - out := &example.Pod{} - return s.Create(context.TODO(), key, obj, out, 0) -} - func updatePod(t *testing.T, s storage.Interface, obj, old *example.Pod) *example.Pod { updateFn := func(input runtime.Object, res storage.ResponseMeta) (runtime.Object, *uint64, error) { return obj.DeepCopyObject(), nil, nil @@ -182,13 +172,13 @@ func TestList(t *testing.T) { func TestClusterScopedWatch(t *testing.T) { ctx, cacher, terminate := testSetup(t, withClusterScopedKeyFunc, withSpecNodeNameIndexerFuncs) t.Cleanup(terminate) - storagetesting.TestClusterScopedWatch(ctx, t, cacher) + storagetesting.RunTestClusterScopedWatch(ctx, t, cacher) } func TestNamespaceScopedWatch(t *testing.T) { ctx, cacher, terminate := testSetup(t, withSpecNodeNameIndexerFuncs) t.Cleanup(terminate) - storagetesting.TestNamespaceScopedWatch(ctx, t, cacher) + storagetesting.RunTestNamespaceScopedWatch(ctx, t, cacher) } func verifyWatchEvent(t *testing.T, w watch.Interface, eventType watch.EventType, eventObject runtime.Object) { @@ -618,150 +608,15 @@ func TestCacherListerWatcherPagination(t *testing.T) { } func TestWatchDispatchBookmarkEvents(t *testing.T) { - defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchBookmark, true)() - - server, etcdStorage := newEtcdTestStorage(t, etcd3testing.PathPrefix()) - defer server.Terminate(t) - cacher, v, err := newTestCacher(etcdStorage) - if err != nil { - t.Fatalf("Couldn't create cacher: %v", err) - } - defer cacher.Stop() - - fooCreated := updatePod(t, etcdStorage, makeTestPod("foo"), nil) - rv, err := v.ParseResourceVersion(fooCreated.ResourceVersion) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - startVersion := strconv.Itoa(int(rv)) - - tests := []struct { - timeout time.Duration - expected bool - allowWatchBookmark bool - }{ - { // test old client won't get Bookmark event - timeout: 3 * time.Second, - expected: false, - allowWatchBookmark: false, - }, - { - timeout: 3 * time.Second, - expected: true, - allowWatchBookmark: true, - }, - } - - for i, c := range tests { - pred := storage.Everything - pred.AllowWatchBookmarks = c.allowWatchBookmark - ctx, cancel := context.WithTimeout(context.Background(), c.timeout) - t.Cleanup(cancel) - watcher, err := cacher.Watch(ctx, "pods/ns/foo", storage.ListOptions{ResourceVersion: startVersion, Predicate: pred}) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - - // Create events of other pods - updatePod(t, etcdStorage, makeTestPod(fmt.Sprintf("foo-whatever-%d", i)), nil) - - // Now wait for Bookmark event - select { - case event, ok := <-watcher.ResultChan(): - if !ok && c.expected { - t.Errorf("Unexpected object watched (no objects)") - } - if c.expected && event.Type != watch.Bookmark { - t.Errorf("Unexpected object watched %#v", event) - } - case <-time.After(time.Second * 3): - if c.expected { - t.Errorf("Unexpected object watched (timeout)") - } - } - watcher.Stop() - } + ctx, cacher, terminate := testSetup(t) + t.Cleanup(terminate) + storagetesting.RunTestWatchDispatchBookmarkEvents(ctx, t, cacher) } func TestWatchBookmarksWithCorrectResourceVersion(t *testing.T) { - defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchBookmark, true)() - - server, etcdStorage := newEtcdTestStorage(t, etcd3testing.PathPrefix()) - defer server.Terminate(t) - cacher, v, err := newTestCacher(etcdStorage) - if err != nil { - t.Fatalf("Couldn't create cacher: %v", err) - } - defer cacher.Stop() - - pred := storage.Everything - pred.AllowWatchBookmarks = true - ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) - t.Cleanup(cancel) - watcher, err := cacher.Watch(ctx, "pods/ns", storage.ListOptions{ResourceVersion: "0", Predicate: pred, Recursive: true}) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - defer watcher.Stop() - - done := make(chan struct{}) - errc := make(chan error, 1) - var wg sync.WaitGroup - wg.Add(1) - defer wg.Wait() // We must wait for the waitgroup to exit before we terminate the cache or the server in prior defers - defer close(done) // call close first, so the goroutine knows to exit - go func() { - defer wg.Done() - for i := 0; i < 100; i++ { - select { - case <-done: - return - default: - pod := fmt.Sprintf("foo-%d", i) - err := createPod(etcdStorage, makeTestPod(pod)) - if err != nil { - errc <- fmt.Errorf("failed to create pod %v: %v", pod, err) - return - } - time.Sleep(time.Second / 100) - } - } - }() - - bookmarkReceived := false - lastObservedResourceVersion := uint64(0) - - for { - select { - case err := <-errc: - t.Fatal(err) - case event, ok := <-watcher.ResultChan(): - if !ok { - // Make sure we have received a bookmark event - if !bookmarkReceived { - t.Fatalf("Unpexected error, we did not received a bookmark event") - } - return - } - rv, err := v.ObjectResourceVersion(event.Object) - if err != nil { - t.Fatalf("failed to parse resourceVersion from %#v", event) - } - if event.Type == watch.Bookmark { - bookmarkReceived = true - // bookmark event has a RV greater than or equal to the before one - if rv < lastObservedResourceVersion { - t.Fatalf("Unexpected bookmark resourceVersion %v less than observed %v)", rv, lastObservedResourceVersion) - } - } else { - // non-bookmark event has a RV greater than anything before - if rv <= lastObservedResourceVersion { - t.Fatalf("Unexpected event resourceVersion %v less than or equal to bookmark %v)", rv, lastObservedResourceVersion) - } - } - lastObservedResourceVersion = rv - } - } + ctx, cacher, terminate := testSetup(t) + t.Cleanup(terminate) + storagetesting.RunTestOptionalWatchBookmarksWithCorrectResourceVersion(ctx, t, cacher) } // ===================================================