Skip to content

Commit

Permalink
send bookmark right now after sending all items in watchCache store
Browse files Browse the repository at this point in the history
  • Loading branch information
Chaunceyctx authored and chentianxiang committed Sep 26, 2024
1 parent 08e6500 commit 59b0e4f
Show file tree
Hide file tree
Showing 6 changed files with 180 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -503,6 +503,10 @@ func (c *cacheWatcher) processInterval(ctx context.Context, cacheInterval *watch
klog.V(2).Infof("processing %d initEvents of %s (%s) took %v", initEventCount, c.groupResource, c.identifier, processingTime)
}

// send bookmark after sending all events in cacheInterval for watchlist request
if cacheInterval.initialEventsEndBookmark != nil {
c.sendWatchCacheEvent(cacheInterval.initialEventsEndBookmark)
}
c.process(ctx, resourceVersion)
}

Expand Down
22 changes: 22 additions & 0 deletions staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go
Original file line number Diff line number Diff line change
Expand Up @@ -653,6 +653,8 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
return newErrWatcher(err), nil
}

c.setInitialEventsEndBookmarkIfRequested(cacheInterval, opts, c.watchCache.resourceVersion)

addedWatcher := false
func() {
c.Lock()
Expand Down Expand Up @@ -1439,6 +1441,26 @@ func (c *Cacher) Wait(ctx context.Context) error {
return c.ready.wait(ctx)
}

// setInitialEventsEndBookmarkIfRequested sets initialEventsEndBookmark field in watchCacheInterval for watchlist request
func (c *Cacher) setInitialEventsEndBookmarkIfRequested(cacheInterval *watchCacheInterval, opts storage.ListOptions, currentResourceVersion uint64) {
if opts.SendInitialEvents != nil && *opts.SendInitialEvents && opts.Predicate.AllowWatchBookmarks {
// We don't need to set the InitialEventsAnnotation for this bookmark event,
// because this will be automatically set during event conversion in cacheWatcher.convertToWatchEvent method
initialEventsEndBookmark := &watchCacheEvent{
Type: watch.Bookmark,
Object: c.newFunc(),
ResourceVersion: currentResourceVersion,
}

if err := c.versioner.UpdateObject(initialEventsEndBookmark.Object, initialEventsEndBookmark.ResourceVersion); err != nil {
klog.Errorf("failure to set resourceVersion to %d on initialEventsEndBookmark event %+v for watchlist request and wait for bookmark trigger to send", initialEventsEndBookmark.ResourceVersion, initialEventsEndBookmark.Object)
initialEventsEndBookmark = nil
}

cacheInterval.initialEventsEndBookmark = initialEventsEndBookmark
}
}

// errWatcher implements watch.Interface to return a single error
type errWatcher struct {
result chan watch.Event
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ import (
"k8s.io/apiserver/pkg/storage/cacher/metrics"
etcd3testing "k8s.io/apiserver/pkg/storage/etcd3/testing"
etcdfeature "k8s.io/apiserver/pkg/storage/feature"
storagetesting "k8s.io/apiserver/pkg/storage/testing"
utilfeature "k8s.io/apiserver/pkg/util/feature"
featuregatetesting "k8s.io/component-base/featuregate/testing"
k8smetrics "k8s.io/component-base/metrics"
Expand Down Expand Up @@ -1171,6 +1172,106 @@ func TestCacherSendBookmarkEvents(t *testing.T) {
}
}

func TestInitialEventsEndBookmark(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchList, true)
forceRequestWatchProgressSupport(t)

backingStorage := &dummyStorage{}
cacher, _, err := newTestCacher(backingStorage)
if err != nil {
t.Fatalf("Couldn't create cacher: %v", err)
}
defer cacher.Stop()

if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
if err := cacher.ready.wait(context.Background()); err != nil {
t.Fatalf("unexpected error waiting for the cache to be ready")
}
}

makePod := func(index uint64) *example.Pod {
return &example.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("pod-%d", index),
Namespace: "ns",
ResourceVersion: fmt.Sprintf("%v", 100+index),
},
}
}

numberOfPods := 3
var expectedPodEvents []watch.Event
for i := 1; i <= numberOfPods; i++ {
pod := makePod(uint64(i))
if err := cacher.watchCache.Add(pod); err != nil {
t.Fatalf("failed to add a pod: %v", err)
}
expectedPodEvents = append(expectedPodEvents, watch.Event{Type: watch.Added, Object: pod})
}
var currentResourceVersion uint64 = 100 + 3

trueVal, falseVal := true, false

scenarios := []struct {
name string
allowWatchBookmarks bool
sendInitialEvents *bool
}{
{
name: "allowWatchBookmarks=false, sendInitialEvents=false",
allowWatchBookmarks: false,
sendInitialEvents: &falseVal,
},
{
name: "allowWatchBookmarks=false, sendInitialEvents=true",
allowWatchBookmarks: false,
sendInitialEvents: &trueVal,
},
{
name: "allowWatchBookmarks=true, sendInitialEvents=true",
allowWatchBookmarks: true,
sendInitialEvents: &trueVal,
},
{
name: "allowWatchBookmarks=true, sendInitialEvents=false",
allowWatchBookmarks: true,
sendInitialEvents: &falseVal,
},
{
name: "allowWatchBookmarks=false, sendInitialEvents=nil",
allowWatchBookmarks: true,
},
}

for _, scenario := range scenarios {
t.Run(scenario.name, func(t *testing.T) {
expectedWatchEvents := expectedPodEvents
if scenario.allowWatchBookmarks && scenario.sendInitialEvents != nil && *scenario.sendInitialEvents {
expectedWatchEvents = append(expectedWatchEvents, watch.Event{
Type: watch.Bookmark,
Object: &example.Pod{
ObjectMeta: metav1.ObjectMeta{
ResourceVersion: strconv.FormatUint(currentResourceVersion, 10),
Annotations: map[string]string{metav1.InitialEventsAnnotationKey: "true"},
},
},
})
}

pred := storage.Everything
pred.AllowWatchBookmarks = scenario.allowWatchBookmarks
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
defer cancel()
w, err := cacher.Watch(ctx, "pods/ns", storage.ListOptions{ResourceVersion: "100", SendInitialEvents: scenario.sendInitialEvents, Predicate: pred})
if err != nil {
t.Fatalf("Failed to create watch: %v", err)
}
storagetesting.TestCheckResultsInStrictOrder(t, w, expectedWatchEvents)
storagetesting.TestCheckNoMoreResults(t, w)
})
}
}

func TestCacherSendsMultipleWatchBookmarks(t *testing.T) {
backingStorage := &dummyStorage{}
cacher, _, err := newTestCacher(backingStorage)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,9 @@ type watchCacheInterval struct {
// Given that indexer and indexValidator only read state, if
// possible, Locker obtained through RLocker() is provided.
lock sync.Locker

// initialEventsEndBookmark will be sent after sending all events in cacheInterval
initialEventsEndBookmark *watchCacheEvent
}

type attrFunc func(runtime.Object) (labels.Set, fields.Set, error)
Expand Down
44 changes: 42 additions & 2 deletions staging/src/k8s.io/apiserver/pkg/storage/testing/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,34 @@ func testCheckResultFunc(t *testing.T, w watch.Interface, check func(actualEvent
}
}

func testCheckResultWithIgnoreFunc(t *testing.T, w watch.Interface, expectedEvents []watch.Event, ignore func(watch.Event) bool) {
checkIndex := 0
for {
select {
case event := <-w.ResultChan():
obj := event.Object
if co, ok := obj.(runtime.CacheableObject); ok {
event.Object = co.GetObject()
}
if ignore != nil && ignore(event) {
continue
}
if checkIndex < len(expectedEvents) {
expectNoDiff(t, "incorrect event", expectedEvents[checkIndex], event)
checkIndex++
} else {
t.Fatalf("cannot receive correct event, expect no event, but get a event: %+v", event)
}
case <-time.After(100 * time.Millisecond):
// wait 100 ms forcibly in order to receive all watchEvents including multiple bookmark events
if checkIndex < len(expectedEvents) {
t.Fatalf("cannot receive enough events within specific time, rest expected events: %+v", expectedEvents[checkIndex:])
}
return
}
}
}

func testCheckStop(t *testing.T, w watch.Interface) {
select {
case e, ok := <-w.ResultChan():
Expand All @@ -187,13 +215,25 @@ func testCheckStop(t *testing.T, w watch.Interface) {
}
}

func testCheckResultsInStrictOrder(t *testing.T, w watch.Interface, expectedEvents []watch.Event) {
func TestCheckResultsInStrictOrder(t *testing.T, w watch.Interface, expectedEvents []watch.Event) {
for _, expectedEvent := range expectedEvents {
testCheckResult(t, w, expectedEvent)
}
}

func testCheckNoMoreResults(t *testing.T, w watch.Interface) {
func testCheckNoMoreResultsWithIgnoreFunc(t *testing.T, w watch.Interface, ignore func(watch.Event) bool) {
select {
case e := <-w.ResultChan():
if ignore == nil || !ignore(e) {
t.Errorf("Unexpected: %#v event received, expected no events", e)
}
// We consciously make the timeout short here to speed up tests.
case <-time.After(100 * time.Millisecond):
return
}
}

func TestCheckNoMoreResults(t *testing.T, w watch.Interface) {
select {
case e := <-w.ResultChan():
t.Errorf("Unexpected: %#v event received, expected no events", e)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1478,7 +1478,7 @@ func RunWatchSemantics(ctx context.Context, t *testing.T, store storage.Interfac
defer w.Stop()

// make sure we only get initial events
testCheckResultsInStrictOrder(t, w, scenario.expectedInitialEvents(createdPods))
TestCheckResultsInStrictOrder(t, w, scenario.expectedInitialEvents(createdPods))

// make sure that the actual bookmark has at least RV >= to the expected one
if scenario.expectedInitialEventsBookmarkWithMinimalRV != nil {
Expand Down Expand Up @@ -1512,8 +1512,9 @@ func RunWatchSemantics(ctx context.Context, t *testing.T, store storage.Interfac
require.NoError(t, err, "failed to add a pod: %v")
createdPods = append(createdPods, out)
}
testCheckResultsInStrictOrder(t, w, scenario.expectedEventsAfterEstablishingWatch(createdPods))
testCheckNoMoreResults(t, w)
ignoreEventsFn := func(event watch.Event) bool { return event.Type == watch.Bookmark }
testCheckResultWithIgnoreFunc(t, w, scenario.expectedEventsAfterEstablishingWatch(createdPods), ignoreEventsFn)
testCheckNoMoreResultsWithIgnoreFunc(t, w, ignoreEventsFn)
})
}
}
Expand Down Expand Up @@ -1567,8 +1568,8 @@ func RunWatchSemanticInitialEventsExtended(ctx context.Context, t *testing.T, st

// make sure we only get initial events from the first ns
// followed by the bookmark with the global RV
testCheckResultsInStrictOrder(t, w, expectedInitialEventsInStrictOrder(initialPods, otherNsPod.ResourceVersion))
testCheckNoMoreResults(t, w)
TestCheckResultsInStrictOrder(t, w, expectedInitialEventsInStrictOrder(initialPods, otherNsPod.ResourceVersion))
TestCheckNoMoreResults(t, w)
}

func RunWatchListMatchSingle(ctx context.Context, t *testing.T, store storage.Interface) {
Expand Down Expand Up @@ -1622,8 +1623,8 @@ func RunWatchListMatchSingle(ctx context.Context, t *testing.T, store storage.In

// make sure we only get a single pod matching the field selector
// followed by the bookmark with the global RV
testCheckResultsInStrictOrder(t, w, expectedInitialEventsInStrictOrder(expectedPod, lastAddedPod.ResourceVersion))
testCheckNoMoreResults(t, w)
TestCheckResultsInStrictOrder(t, w, expectedInitialEventsInStrictOrder(expectedPod, lastAddedPod.ResourceVersion))
TestCheckNoMoreResults(t, w)
}

func makePod(namePrefix string) *example.Pod {
Expand Down

0 comments on commit 59b0e4f

Please sign in to comment.