Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

send bookmark right now after sending all items in watchCache store #127012

Merged
merged 1 commit into from
Sep 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -503,6 +503,10 @@ func (c *cacheWatcher) processInterval(ctx context.Context, cacheInterval *watch
klog.V(2).Infof("processing %d initEvents of %s (%s) took %v", initEventCount, c.groupResource, c.identifier, processingTime)
}

// send bookmark after sending all events in cacheInterval for watchlist request
if cacheInterval.initialEventsEndBookmark != nil {
c.sendWatchCacheEvent(cacheInterval.initialEventsEndBookmark)
}
c.process(ctx, resourceVersion)
}

Expand Down
22 changes: 22 additions & 0 deletions staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go
Original file line number Diff line number Diff line change
Expand Up @@ -653,6 +653,8 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
return newErrWatcher(err), nil
}

c.setInitialEventsEndBookmarkIfRequested(cacheInterval, opts, c.watchCache.resourceVersion)

addedWatcher := false
func() {
c.Lock()
Expand Down Expand Up @@ -1439,6 +1441,26 @@ func (c *Cacher) Wait(ctx context.Context) error {
return c.ready.wait(ctx)
}

// setInitialEventsEndBookmarkIfRequested sets initialEventsEndBookmark field in watchCacheInterval for watchlist request
func (c *Cacher) setInitialEventsEndBookmarkIfRequested(cacheInterval *watchCacheInterval, opts storage.ListOptions, currentResourceVersion uint64) {
if opts.SendInitialEvents != nil && *opts.SendInitialEvents && opts.Predicate.AllowWatchBookmarks {
// We don't need to set the InitialEventsAnnotation for this bookmark event,
// because this will be automatically set during event conversion in cacheWatcher.convertToWatchEvent method
initialEventsEndBookmark := &watchCacheEvent{
Type: watch.Bookmark,
Object: c.newFunc(),
ResourceVersion: currentResourceVersion,
}

wojtek-t marked this conversation as resolved.
Show resolved Hide resolved
if err := c.versioner.UpdateObject(initialEventsEndBookmark.Object, initialEventsEndBookmark.ResourceVersion); err != nil {
klog.Errorf("failure to set resourceVersion to %d on initialEventsEndBookmark event %+v for watchlist request and wait for bookmark trigger to send", initialEventsEndBookmark.ResourceVersion, initialEventsEndBookmark.Object)
initialEventsEndBookmark = nil
}

cacheInterval.initialEventsEndBookmark = initialEventsEndBookmark
}
}

wojtek-t marked this conversation as resolved.
Show resolved Hide resolved
// errWatcher implements watch.Interface to return a single error
type errWatcher struct {
result chan watch.Event
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ import (
"k8s.io/apiserver/pkg/storage/cacher/metrics"
etcd3testing "k8s.io/apiserver/pkg/storage/etcd3/testing"
etcdfeature "k8s.io/apiserver/pkg/storage/feature"
storagetesting "k8s.io/apiserver/pkg/storage/testing"
utilfeature "k8s.io/apiserver/pkg/util/feature"
featuregatetesting "k8s.io/component-base/featuregate/testing"
k8smetrics "k8s.io/component-base/metrics"
Expand Down Expand Up @@ -1171,6 +1172,106 @@ func TestCacherSendBookmarkEvents(t *testing.T) {
}
}

func TestInitialEventsEndBookmark(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchList, true)
forceRequestWatchProgressSupport(t)

backingStorage := &dummyStorage{}
cacher, _, err := newTestCacher(backingStorage)
if err != nil {
t.Fatalf("Couldn't create cacher: %v", err)
}
defer cacher.Stop()

if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
if err := cacher.ready.wait(context.Background()); err != nil {
t.Fatalf("unexpected error waiting for the cache to be ready")
}
}

makePod := func(index uint64) *example.Pod {
return &example.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("pod-%d", index),
Namespace: "ns",
ResourceVersion: fmt.Sprintf("%v", 100+index),
},
}
}

numberOfPods := 3
var expectedPodEvents []watch.Event
for i := 1; i <= numberOfPods; i++ {
pod := makePod(uint64(i))
if err := cacher.watchCache.Add(pod); err != nil {
t.Fatalf("failed to add a pod: %v", err)
}
expectedPodEvents = append(expectedPodEvents, watch.Event{Type: watch.Added, Object: pod})
}
var currentResourceVersion uint64 = 100 + 3

trueVal, falseVal := true, false

scenarios := []struct {
name string
allowWatchBookmarks bool
sendInitialEvents *bool
}{
{
name: "allowWatchBookmarks=false, sendInitialEvents=false",
allowWatchBookmarks: false,
sendInitialEvents: &falseVal,
},
{
name: "allowWatchBookmarks=false, sendInitialEvents=true",
allowWatchBookmarks: false,
sendInitialEvents: &trueVal,
},
{
name: "allowWatchBookmarks=true, sendInitialEvents=true",
allowWatchBookmarks: true,
sendInitialEvents: &trueVal,
},
{
name: "allowWatchBookmarks=true, sendInitialEvents=false",
allowWatchBookmarks: true,
sendInitialEvents: &falseVal,
},
{
name: "allowWatchBookmarks=false, sendInitialEvents=nil",
allowWatchBookmarks: true,
},
}

for _, scenario := range scenarios {
t.Run(scenario.name, func(t *testing.T) {
expectedWatchEvents := expectedPodEvents
if scenario.allowWatchBookmarks && scenario.sendInitialEvents != nil && *scenario.sendInitialEvents {
expectedWatchEvents = append(expectedWatchEvents, watch.Event{
Type: watch.Bookmark,
Object: &example.Pod{
ObjectMeta: metav1.ObjectMeta{
ResourceVersion: strconv.FormatUint(currentResourceVersion, 10),
Annotations: map[string]string{metav1.InitialEventsAnnotationKey: "true"},
},
},
})
}

pred := storage.Everything
pred.AllowWatchBookmarks = scenario.allowWatchBookmarks
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
defer cancel()
w, err := cacher.Watch(ctx, "pods/ns", storage.ListOptions{ResourceVersion: "100", SendInitialEvents: scenario.sendInitialEvents, Predicate: pred})
if err != nil {
t.Fatalf("Failed to create watch: %v", err)
}
storagetesting.TestCheckResultsInStrictOrder(t, w, expectedWatchEvents)
storagetesting.TestCheckNoMoreResultsWithIgnoreFunc(t, w, nil)
})
}
}

func TestCacherSendsMultipleWatchBookmarks(t *testing.T) {
backingStorage := &dummyStorage{}
cacher, _, err := newTestCacher(backingStorage)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,9 @@ type watchCacheInterval struct {
// Given that indexer and indexValidator only read state, if
// possible, Locker obtained through RLocker() is provided.
lock sync.Locker

// initialEventsEndBookmark will be sent after sending all events in cacheInterval
initialEventsEndBookmark *watchCacheEvent
}

type attrFunc func(runtime.Object) (labels.Set, fields.Set, error)
Expand Down
38 changes: 35 additions & 3 deletions staging/src/k8s.io/apiserver/pkg/storage/testing/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,36 @@ func testCheckResultFunc(t *testing.T, w watch.Interface, check func(actualEvent
}
}

func testCheckResultWithIgnoreFunc(t *testing.T, w watch.Interface, expectedEvents []watch.Event, ignore func(watch.Event) bool) {
checkIndex := 0
for {
select {
case event := <-w.ResultChan():
obj := event.Object
if co, ok := obj.(runtime.CacheableObject); ok {
event.Object = co.GetObject()
}
if ignore != nil && ignore(event) {
continue
}
if checkIndex < len(expectedEvents) {
expectNoDiff(t, "incorrect event", expectedEvents[checkIndex], event)
checkIndex++
} else {
t.Fatalf("cannot receive correct event, expect no event, but get a event: %+v", event)
}
case <-time.After(100 * time.Millisecond):
// wait 100ms forcibly in order to receive watchEvents including bookmark event.
// we cannot guarantee that we will receive all bookmark events within 100ms,
// but too large timeout value will lead to exceed the timeout of package test.
if checkIndex < len(expectedEvents) {
t.Fatalf("cannot receive enough events within specific time, rest expected events: %+v", expectedEvents[checkIndex:])
}
return
}
}
}

func testCheckStop(t *testing.T, w watch.Interface) {
select {
case e, ok := <-w.ResultChan():
Expand All @@ -187,16 +217,18 @@ func testCheckStop(t *testing.T, w watch.Interface) {
}
}

func testCheckResultsInStrictOrder(t *testing.T, w watch.Interface, expectedEvents []watch.Event) {
func TestCheckResultsInStrictOrder(t *testing.T, w watch.Interface, expectedEvents []watch.Event) {
for _, expectedEvent := range expectedEvents {
testCheckResult(t, w, expectedEvent)
}
}

func testCheckNoMoreResults(t *testing.T, w watch.Interface) {
func TestCheckNoMoreResultsWithIgnoreFunc(t *testing.T, w watch.Interface, ignore func(watch.Event) bool) {
select {
case e := <-w.ResultChan():
t.Errorf("Unexpected: %#v event received, expected no events", e)
if ignore == nil || !ignore(e) {
t.Errorf("Unexpected: %#v event received, expected no events", e)
}
// We consciously make the timeout short here to speed up tests.
case <-time.After(100 * time.Millisecond):
return
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1478,7 +1478,7 @@ func RunWatchSemantics(ctx context.Context, t *testing.T, store storage.Interfac
defer w.Stop()

// make sure we only get initial events
testCheckResultsInStrictOrder(t, w, scenario.expectedInitialEvents(createdPods))
TestCheckResultsInStrictOrder(t, w, scenario.expectedInitialEvents(createdPods))

// make sure that the actual bookmark has at least RV >= to the expected one
if scenario.expectedInitialEventsBookmarkWithMinimalRV != nil {
Expand Down Expand Up @@ -1512,8 +1512,9 @@ func RunWatchSemantics(ctx context.Context, t *testing.T, store storage.Interfac
require.NoError(t, err, "failed to add a pod: %v")
createdPods = append(createdPods, out)
}
testCheckResultsInStrictOrder(t, w, scenario.expectedEventsAfterEstablishingWatch(createdPods))
testCheckNoMoreResults(t, w)
ignoreEventsFn := func(event watch.Event) bool { return event.Type == watch.Bookmark }
testCheckResultWithIgnoreFunc(t, w, scenario.expectedEventsAfterEstablishingWatch(createdPods), ignoreEventsFn)
TestCheckNoMoreResultsWithIgnoreFunc(t, w, ignoreEventsFn)
})
}
}
Expand Down Expand Up @@ -1567,8 +1568,8 @@ func RunWatchSemanticInitialEventsExtended(ctx context.Context, t *testing.T, st

// make sure we only get initial events from the first ns
// followed by the bookmark with the global RV
testCheckResultsInStrictOrder(t, w, expectedInitialEventsInStrictOrder(initialPods, otherNsPod.ResourceVersion))
testCheckNoMoreResults(t, w)
TestCheckResultsInStrictOrder(t, w, expectedInitialEventsInStrictOrder(initialPods, otherNsPod.ResourceVersion))
TestCheckNoMoreResultsWithIgnoreFunc(t, w, nil)
}

func RunWatchListMatchSingle(ctx context.Context, t *testing.T, store storage.Interface) {
Expand Down Expand Up @@ -1622,8 +1623,8 @@ func RunWatchListMatchSingle(ctx context.Context, t *testing.T, store storage.In

// make sure we only get a single pod matching the field selector
// followed by the bookmark with the global RV
testCheckResultsInStrictOrder(t, w, expectedInitialEventsInStrictOrder(expectedPod, lastAddedPod.ResourceVersion))
testCheckNoMoreResults(t, w)
TestCheckResultsInStrictOrder(t, w, expectedInitialEventsInStrictOrder(expectedPod, lastAddedPod.ResourceVersion))
TestCheckNoMoreResultsWithIgnoreFunc(t, w, nil)
}

func makePod(namePrefix string) *example.Pod {
Expand Down