Skip to content

Commit

Permalink
Refactor watch bookmark tests to allow sharing between etcd3 and watc…
Browse files Browse the repository at this point in the history
…hcache
  • Loading branch information
wojtek-t committed Apr 18, 2023
1 parent 94a1592 commit 0297329
Show file tree
Hide file tree
Showing 3 changed files with 184 additions and 157 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,12 @@ func TestWatch(t *testing.T) {

func TestClusterScopedWatch(t *testing.T) {
ctx, store, _ := testSetup(t)
storagetesting.TestClusterScopedWatch(ctx, t, store)
storagetesting.RunTestClusterScopedWatch(ctx, t, store)
}

func TestNamespaceScopedWatch(t *testing.T) {
ctx, store, _ := testSetup(t)
storagetesting.TestNamespaceScopedWatch(ctx, t, store)
storagetesting.RunTestNamespaceScopedWatch(ctx, t, store)
}

func TestDeleteTriggerWatch(t *testing.T) {
Expand Down
176 changes: 174 additions & 2 deletions staging/src/k8s.io/apiserver/pkg/storage/testing/watcher_tests.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package testing
import (
"context"
"fmt"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -376,7 +377,7 @@ func RunOptionalTestProgressNotify(ctx context.Context, t *testing.T, store stor
}

// It tests watches of cluster-scoped resources.
func TestClusterScopedWatch(ctx context.Context, t *testing.T, store storage.Interface) {
func RunTestClusterScopedWatch(ctx context.Context, t *testing.T, store storage.Interface) {
tests := []struct {
name string
// For watch request, the name of object is specified with field selector
Expand Down Expand Up @@ -530,7 +531,7 @@ func TestClusterScopedWatch(ctx context.Context, t *testing.T, store storage.Int
}

// It tests watch of namespace-scoped resources.
func TestNamespaceScopedWatch(ctx context.Context, t *testing.T, store storage.Interface) {
func RunTestNamespaceScopedWatch(ctx context.Context, t *testing.T, store storage.Interface) {
tests := []struct {
name string
// For watch request, the name of object is specified with field selector
Expand Down Expand Up @@ -844,6 +845,177 @@ func TestNamespaceScopedWatch(ctx context.Context, t *testing.T, store storage.I
}
}

// RunOptionalTestWatchDispatchBookmarkEvents tests whether bookmark events are sent.
// This feature is currently implemented in watch cache layer, so this is optional.
//
// TODO(#109831): ProgressNotify feature is effectively implementing the same
//
// functionality, so we should refactor this functionality to share the same input.
func RunTestWatchDispatchBookmarkEvents(ctx context.Context, t *testing.T, store storage.Interface) {
key, storedObj := testPropagateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "test-ns"}})
startRV := storedObj.ResourceVersion

tests := []struct {
name string
timeout time.Duration
expected bool
allowWatchBookmarks bool
}{
{ // test old client won't get Bookmark event
name: "allowWatchBookmarks=false",
timeout: 3 * time.Second,
expected: false,
allowWatchBookmarks: false,
},
{
name: "allowWatchBookmarks=true",
timeout: 3 * time.Second,
expected: true,
allowWatchBookmarks: true,
},
}

for i, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
pred := storage.Everything
pred.AllowWatchBookmarks = tt.allowWatchBookmarks
ctx, cancel := context.WithTimeout(context.Background(), tt.timeout)
defer cancel()

watcher, err := store.Watch(ctx, key, storage.ListOptions{ResourceVersion: startRV, Predicate: pred})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer watcher.Stop()

// Create events of pods in a different namespace
out := &example.Pod{}
obj := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: fmt.Sprintf("other-ns-%d", i)}}
objKey := computePodKey(obj)

if err := store.Create(ctx, objKey, obj, out, 0); err != nil {
t.Fatalf("Create failed: %v", err)
}

// Now wait for Bookmark event
select {
case event, ok := <-watcher.ResultChan():
if !ok && tt.expected {
t.Errorf("Unexpected object watched (no objects)")
}
if tt.expected && event.Type != watch.Bookmark {
t.Errorf("Unexpected object watched %#v", event)
}
case <-time.After(time.Second * 3):
if tt.expected {
t.Errorf("Unexpected object watched (timeout)")
}
}
})
}
}

// RunOptionalTestWatchBookmarksWithCorrectResourceVersion tests whether bookmark events are
// sent with correct resource versions.
// This feature is currently implemented in watch cache layer, so this is optional.
//
// TODO(#109831): ProgressNotify feature is effectively implementing the same
//
// functionality, so we should refactor this functionality to share the same input.
func RunTestOptionalWatchBookmarksWithCorrectResourceVersion(ctx context.Context, t *testing.T, store storage.Interface) {
// Compute the initial resource version.
list := &example.PodList{}
storageOpts := storage.ListOptions{
Predicate: storage.Everything,
Recursive: true,
}
if err := store.GetList(ctx, "/pods", storageOpts, list); err != nil {
t.Errorf("Unexpected error: %v", err)
}
startRV := list.ResourceVersion

key := "/pods/test-ns"
pred := storage.Everything
pred.AllowWatchBookmarks = true

ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()

watcher, err := store.Watch(ctx, key, storage.ListOptions{ResourceVersion: startRV, Predicate: pred, Recursive: true})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer watcher.Stop()

done := make(chan struct{})
errc := make(chan error, 1)
var wg sync.WaitGroup
wg.Add(1)
// We must wait for the waitgroup to exit before we terminate the cache or the server in prior defers.
defer wg.Wait()
// Call close first, so the goroutine knows to exit.
defer close(done)

go func() {
defer wg.Done()
for i := 0; i < 100; i++ {
select {
case <-done:
return
default:
out := &example.Pod{}
pod := &example.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("foo-%d", i),
Namespace: "test-ns",
},
}
podKey := computePodKey(pod)
if err := store.Create(ctx, podKey, pod, out, 0); err != nil {
errc <- fmt.Errorf("failed to create pod %v: %v", pod, err)
return
}
time.Sleep(10 * time.Millisecond)
}
}
}()

bookmarkReceived := false
lastObservedResourceVersion := uint64(0)

for {
select {
case err := <-errc:
t.Fatal(err)
case event, ok := <-watcher.ResultChan():
if !ok {
// Make sure we have received a bookmark event
if !bookmarkReceived {
t.Fatalf("Unpexected error, we did not received a bookmark event")
}
return
}
rv, err := storage.APIObjectVersioner{}.ObjectResourceVersion(event.Object)
if err != nil {
t.Fatalf("failed to parse resourceVersion from %#v", event)
}
if event.Type == watch.Bookmark {
bookmarkReceived = true
// bookmark event has a RV greater than or equal to the before one
if rv < lastObservedResourceVersion {
t.Fatalf("Unexpected bookmark resourceVersion %v less than observed %v)", rv, lastObservedResourceVersion)
}
} else {
// non-bookmark event has a RV greater than anything before
if rv <= lastObservedResourceVersion {
t.Fatalf("Unexpected event resourceVersion %v less than or equal to bookmark %v)", rv, lastObservedResourceVersion)
}
}
lastObservedResourceVersion = rv
}
}
}

type testWatchStruct struct {
obj *example.Pod
expectEvent bool
Expand Down
Loading

0 comments on commit 0297329

Please sign in to comment.