Skip to content

Commit

Permalink
Merge pull request #120729 from pohly/events-context
Browse files Browse the repository at this point in the history
k8s.io/client-go/tools/[events|record]: support context
  • Loading branch information
k8s-ci-robot authored Oct 4, 2023
2 parents 56f3304 + 5dc540f commit f936f69
Show file tree
Hide file tree
Showing 12 changed files with 353 additions and 134 deletions.
2 changes: 2 additions & 0 deletions hack/logcheck.conf
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ structured k8s.io/apiserver/pkg/server/options/encryptionconfig/.*
# TODO next: contextual k8s.io/kubernetes/pkg/scheduler/.*
# A few files involved in startup migrated already to contextual
# We can't enable contextual logcheck until all are migrated
contextual k8s.io/client-go/tools/events/.*
contextual k8s.io/client-go/tools/record/.*
contextual k8s.io/dynamic-resource-allocation/.*
contextual k8s.io/kubernetes/cmd/kube-scheduler/.*
contextual k8s.io/kubernetes/pkg/controller/.*
Expand Down
117 changes: 68 additions & 49 deletions staging/src/k8s.io/client-go/tools/events/event_broadcaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,27 +81,27 @@ type EventSinkImpl struct {
}

// Create takes the representation of a event and creates it. Returns the server's representation of the event, and an error, if there is any.
func (e *EventSinkImpl) Create(event *eventsv1.Event) (*eventsv1.Event, error) {
func (e *EventSinkImpl) Create(ctx context.Context, event *eventsv1.Event) (*eventsv1.Event, error) {
if event.Namespace == "" {
return nil, fmt.Errorf("can't create an event with empty namespace")
}
return e.Interface.Events(event.Namespace).Create(context.TODO(), event, metav1.CreateOptions{})
return e.Interface.Events(event.Namespace).Create(ctx, event, metav1.CreateOptions{})
}

// Update takes the representation of a event and updates it. Returns the server's representation of the event, and an error, if there is any.
func (e *EventSinkImpl) Update(event *eventsv1.Event) (*eventsv1.Event, error) {
func (e *EventSinkImpl) Update(ctx context.Context, event *eventsv1.Event) (*eventsv1.Event, error) {
if event.Namespace == "" {
return nil, fmt.Errorf("can't update an event with empty namespace")
}
return e.Interface.Events(event.Namespace).Update(context.TODO(), event, metav1.UpdateOptions{})
return e.Interface.Events(event.Namespace).Update(ctx, event, metav1.UpdateOptions{})
}

// Patch applies the patch and returns the patched event, and an error, if there is any.
func (e *EventSinkImpl) Patch(event *eventsv1.Event, data []byte) (*eventsv1.Event, error) {
func (e *EventSinkImpl) Patch(ctx context.Context, event *eventsv1.Event, data []byte) (*eventsv1.Event, error) {
if event.Namespace == "" {
return nil, fmt.Errorf("can't patch an event with empty namespace")
}
return e.Interface.Events(event.Namespace).Patch(context.TODO(), event.Name, types.StrategicMergePatchType, data, metav1.PatchOptions{})
return e.Interface.Events(event.Namespace).Patch(ctx, event.Name, types.StrategicMergePatchType, data, metav1.PatchOptions{})
}

// NewBroadcaster Creates a new event broadcaster.
Expand All @@ -124,13 +124,13 @@ func (e *eventBroadcasterImpl) Shutdown() {
}

// refreshExistingEventSeries refresh events TTL
func (e *eventBroadcasterImpl) refreshExistingEventSeries() {
func (e *eventBroadcasterImpl) refreshExistingEventSeries(ctx context.Context) {
// TODO: Investigate whether lock contention won't be a problem
e.mu.Lock()
defer e.mu.Unlock()
for isomorphicKey, event := range e.eventCache {
if event.Series != nil {
if recordedEvent, retry := recordEvent(e.sink, event); !retry {
if recordedEvent, retry := recordEvent(ctx, e.sink, event); !retry {
if recordedEvent != nil {
e.eventCache[isomorphicKey] = recordedEvent
}
Expand All @@ -142,15 +142,15 @@ func (e *eventBroadcasterImpl) refreshExistingEventSeries() {
// finishSeries checks if a series has ended and either:
// - write final count to the apiserver
// - delete a singleton event (i.e. series field is nil) from the cache
func (e *eventBroadcasterImpl) finishSeries() {
func (e *eventBroadcasterImpl) finishSeries(ctx context.Context) {
// TODO: Investigate whether lock contention won't be a problem
e.mu.Lock()
defer e.mu.Unlock()
for isomorphicKey, event := range e.eventCache {
eventSerie := event.Series
if eventSerie != nil {
if eventSerie.LastObservedTime.Time.Before(time.Now().Add(-finishTime)) {
if _, retry := recordEvent(e.sink, event); !retry {
if _, retry := recordEvent(ctx, e.sink, event); !retry {
delete(e.eventCache, isomorphicKey)
}
}
Expand All @@ -161,13 +161,13 @@ func (e *eventBroadcasterImpl) finishSeries() {
}

// NewRecorder returns an EventRecorder that records events with the given event source.
func (e *eventBroadcasterImpl) NewRecorder(scheme *runtime.Scheme, reportingController string) EventRecorder {
func (e *eventBroadcasterImpl) NewRecorder(scheme *runtime.Scheme, reportingController string) EventRecorderLogger {
hostname, _ := os.Hostname()
reportingInstance := reportingController + "-" + hostname
return &recorderImpl{scheme, reportingController, reportingInstance, e.Broadcaster, clock.RealClock{}}
return &recorderImplLogger{recorderImpl: &recorderImpl{scheme, reportingController, reportingInstance, e.Broadcaster, clock.RealClock{}}, logger: klog.Background()}
}

func (e *eventBroadcasterImpl) recordToSink(event *eventsv1.Event, clock clock.Clock) {
func (e *eventBroadcasterImpl) recordToSink(ctx context.Context, event *eventsv1.Event, clock clock.Clock) {
// Make a copy before modification, because there could be multiple listeners.
eventCopy := event.DeepCopy()
go func() {
Expand Down Expand Up @@ -197,48 +197,53 @@ func (e *eventBroadcasterImpl) recordToSink(event *eventsv1.Event, clock clock.C
}()
if evToRecord != nil {
// TODO: Add a metric counting the number of recording attempts
e.attemptRecording(evToRecord)
e.attemptRecording(ctx, evToRecord)
// We don't want the new recorded Event to be reflected in the
// client's cache because server-side mutations could mess with the
// aggregation mechanism used by the client.
}
}()
}

func (e *eventBroadcasterImpl) attemptRecording(event *eventsv1.Event) *eventsv1.Event {
func (e *eventBroadcasterImpl) attemptRecording(ctx context.Context, event *eventsv1.Event) {
tries := 0
for {
if recordedEvent, retry := recordEvent(e.sink, event); !retry {
return recordedEvent
if _, retry := recordEvent(ctx, e.sink, event); !retry {
return
}
tries++
if tries >= maxTriesPerEvent {
klog.Errorf("Unable to write event '%#v' (retry limit exceeded!)", event)
return nil
klog.FromContext(ctx).Error(nil, "Unable to write event (retry limit exceeded!)", "event", event)
return
}
// Randomize sleep so that various clients won't all be
// synced up if the master goes down.
time.Sleep(wait.Jitter(e.sleepDuration, 0.25))
// synced up if the master goes down. Give up when
// the context is canceled.
select {
case <-ctx.Done():
return
case <-time.After(wait.Jitter(e.sleepDuration, 0.25)):
}
}
}

func recordEvent(sink EventSink, event *eventsv1.Event) (*eventsv1.Event, bool) {
func recordEvent(ctx context.Context, sink EventSink, event *eventsv1.Event) (*eventsv1.Event, bool) {
var newEvent *eventsv1.Event
var err error
isEventSeries := event.Series != nil
if isEventSeries {
patch, patchBytesErr := createPatchBytesForSeries(event)
if patchBytesErr != nil {
klog.Errorf("Unable to calculate diff, no merge is possible: %v", patchBytesErr)
klog.FromContext(ctx).Error(patchBytesErr, "Unable to calculate diff, no merge is possible")
return nil, false
}
newEvent, err = sink.Patch(event, patch)
newEvent, err = sink.Patch(ctx, event, patch)
}
// Update can fail because the event may have been removed and it no longer exists.
if !isEventSeries || (isEventSeries && util.IsKeyNotFoundError(err)) {
// Making sure that ResourceVersion is empty on creation
event.ResourceVersion = ""
newEvent, err = sink.Create(event)
newEvent, err = sink.Create(ctx, event)
}
if err == nil {
return newEvent, false
Expand All @@ -248,7 +253,7 @@ func recordEvent(sink EventSink, event *eventsv1.Event) (*eventsv1.Event, bool)
switch err.(type) {
case *restclient.RequestConstructionError:
// We will construct the request the same next time, so don't keep trying.
klog.Errorf("Unable to construct event '%#v': '%v' (will not retry!)", event, err)
klog.FromContext(ctx).Error(err, "Unable to construct event (will not retry!)", "event", event)
return nil, false
case *errors.StatusError:
if errors.IsAlreadyExists(err) {
Expand All @@ -260,9 +265,9 @@ func recordEvent(sink EventSink, event *eventsv1.Event) (*eventsv1.Event, bool)
if isEventSeries {
return nil, true
}
klog.V(5).Infof("Server rejected event '%#v': '%v' (will not retry!)", event, err)
klog.FromContext(ctx).V(5).Info("Server rejected event (will not retry!)", "event", event, "err", err)
} else {
klog.Errorf("Server rejected event '%#v': '%v' (will not retry!)", event, err)
klog.FromContext(ctx).Error(err, "Server rejected event (will not retry!)", "event", event)
}
return nil, false
case *errors.UnexpectedObjectError:
Expand All @@ -271,7 +276,7 @@ func recordEvent(sink EventSink, event *eventsv1.Event) (*eventsv1.Event, bool)
default:
// This case includes actual http transport errors. Go ahead and retry.
}
klog.Errorf("Unable to write event: '%v' (may retry after sleeping)", err)
klog.FromContext(ctx).Error(err, "Unable to write event (may retry after sleeping)")
return nil, true
}

Expand Down Expand Up @@ -307,29 +312,38 @@ func getKey(event *eventsv1.Event) eventKey {
// StartStructuredLogging starts sending events received from this EventBroadcaster to the structured logging function.
// The return value can be ignored or used to stop recording, if desired.
// TODO: this function should also return an error.
//
// Deprecated: use StartLogging instead.
func (e *eventBroadcasterImpl) StartStructuredLogging(verbosity klog.Level) func() {
stopWatcher, err := e.StartEventWatcher(
logger := klog.Background().V(int(verbosity))
stopWatcher, err := e.StartLogging(logger)
if err != nil {
logger.Error(err, "Failed to start event watcher")
return func() {}
}
return stopWatcher
}

// StartLogging starts sending events received from this EventBroadcaster to the structured logger.
// To adjust verbosity, use the logger's V method (i.e. pass `logger.V(3)` instead of `logger`).
// The returned function can be ignored or used to stop recording, if desired.
func (e *eventBroadcasterImpl) StartLogging(logger klog.Logger) (func(), error) {
return e.StartEventWatcher(
func(obj runtime.Object) {
event, ok := obj.(*eventsv1.Event)
if !ok {
klog.Errorf("unexpected type, expected eventsv1.Event")
logger.Error(nil, "unexpected type, expected eventsv1.Event")
return
}
klog.V(verbosity).InfoS("Event occurred", "object", klog.KRef(event.Regarding.Namespace, event.Regarding.Name), "kind", event.Regarding.Kind, "apiVersion", event.Regarding.APIVersion, "type", event.Type, "reason", event.Reason, "action", event.Action, "note", event.Note)
logger.Info("Event occurred", "object", klog.KRef(event.Regarding.Namespace, event.Regarding.Name), "kind", event.Regarding.Kind, "apiVersion", event.Regarding.APIVersion, "type", event.Type, "reason", event.Reason, "action", event.Action, "note", event.Note)
})
if err != nil {
klog.Errorf("failed to start event watcher: '%v'", err)
return func() {}
}
return stopWatcher
}

// StartEventWatcher starts sending events received from this EventBroadcaster to the given event handler function.
// The return value is used to stop recording
func (e *eventBroadcasterImpl) StartEventWatcher(eventHandler func(event runtime.Object)) (func(), error) {
watcher, err := e.Watch()
if err != nil {
klog.Errorf("Unable start event watcher: '%v' (will not retry!)", err)
return nil, err
}
go func() {
Expand All @@ -345,37 +359,42 @@ func (e *eventBroadcasterImpl) StartEventWatcher(eventHandler func(event runtime
return watcher.Stop, nil
}

func (e *eventBroadcasterImpl) startRecordingEvents(stopCh <-chan struct{}) error {
func (e *eventBroadcasterImpl) startRecordingEvents(ctx context.Context) error {
eventHandler := func(obj runtime.Object) {
event, ok := obj.(*eventsv1.Event)
if !ok {
klog.Errorf("unexpected type, expected eventsv1.Event")
klog.FromContext(ctx).Error(nil, "unexpected type, expected eventsv1.Event")
return
}
e.recordToSink(event, clock.RealClock{})
e.recordToSink(ctx, event, clock.RealClock{})
}
stopWatcher, err := e.StartEventWatcher(eventHandler)
if err != nil {
return err
}
go func() {
<-stopCh
<-ctx.Done()
stopWatcher()
}()
return nil
}

// StartRecordingToSink starts sending events received from the specified eventBroadcaster to the given sink.
// Deprecated: use StartRecordingToSinkWithContext instead.
func (e *eventBroadcasterImpl) StartRecordingToSink(stopCh <-chan struct{}) {
go wait.Until(e.refreshExistingEventSeries, refreshTime, stopCh)
go wait.Until(e.finishSeries, finishTime, stopCh)
err := e.startRecordingEvents(stopCh)
err := e.StartRecordingToSinkWithContext(wait.ContextForChannel(stopCh))
if err != nil {
klog.Errorf("unexpected type, expected eventsv1.Event")
return
klog.Background().Error(err, "Failed to start recording to sink")
}
}

// StartRecordingToSinkWithContext starts sending events received from the specified eventBroadcaster to the given sink.
func (e *eventBroadcasterImpl) StartRecordingToSinkWithContext(ctx context.Context) error {
go wait.UntilWithContext(ctx, e.refreshExistingEventSeries, refreshTime)
go wait.UntilWithContext(ctx, e.finishSeries, finishTime)
return e.startRecordingEvents(ctx)
}

type eventBroadcasterAdapterImpl struct {
coreClient typedv1core.EventsGetter
coreBroadcaster record.EventBroadcaster
Expand Down Expand Up @@ -409,14 +428,14 @@ func (e *eventBroadcasterAdapterImpl) StartRecordingToSink(stopCh <-chan struct{
}
}

func (e *eventBroadcasterAdapterImpl) NewRecorder(name string) EventRecorder {
func (e *eventBroadcasterAdapterImpl) NewRecorder(name string) EventRecorderLogger {
if e.eventsv1Broadcaster != nil && e.eventsv1Client != nil {
return e.eventsv1Broadcaster.NewRecorder(scheme.Scheme, name)
}
return record.NewEventRecorderAdapter(e.DeprecatedNewLegacyRecorder(name))
}

func (e *eventBroadcasterAdapterImpl) DeprecatedNewLegacyRecorder(name string) record.EventRecorder {
func (e *eventBroadcasterAdapterImpl) DeprecatedNewLegacyRecorder(name string) record.EventRecorderLogger {
return e.coreBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: name})
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
eventsv1 "k8s.io/api/events/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/klog/v2/ktesting"
)

func TestRecordEventToSink(t *testing.T) {
Expand Down Expand Up @@ -78,11 +79,12 @@ func TestRecordEventToSink(t *testing.T) {

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
kubeClient := fake.NewSimpleClientset()
eventSink := &EventSinkImpl{Interface: kubeClient.EventsV1()}

for _, ev := range tc.eventsToRecord {
recordEvent(eventSink, &ev)
recordEvent(ctx, eventSink, &ev)
}

recordedEvents, err := kubeClient.EventsV1().Events(metav1.NamespaceDefault).List(context.TODO(), metav1.ListOptions{})
Expand Down
27 changes: 24 additions & 3 deletions staging/src/k8s.io/client-go/tools/events/event_recorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,24 +40,45 @@ type recorderImpl struct {
clock clock.Clock
}

var _ EventRecorder = &recorderImpl{}

func (recorder *recorderImpl) Eventf(regarding runtime.Object, related runtime.Object, eventtype, reason, action, note string, args ...interface{}) {
recorder.eventf(klog.Background(), regarding, related, eventtype, reason, action, note, args...)
}

type recorderImplLogger struct {
*recorderImpl
logger klog.Logger
}

var _ EventRecorderLogger = &recorderImplLogger{}

func (recorder *recorderImplLogger) Eventf(regarding runtime.Object, related runtime.Object, eventtype, reason, action, note string, args ...interface{}) {
recorder.eventf(recorder.logger, regarding, related, eventtype, reason, action, note, args...)
}

func (recorder *recorderImplLogger) WithLogger(logger klog.Logger) EventRecorderLogger {
return &recorderImplLogger{recorderImpl: recorder.recorderImpl, logger: logger}
}

func (recorder *recorderImpl) eventf(logger klog.Logger, regarding runtime.Object, related runtime.Object, eventtype, reason, action, note string, args ...interface{}) {
timestamp := metav1.MicroTime{Time: time.Now()}
message := fmt.Sprintf(note, args...)
refRegarding, err := reference.GetReference(recorder.scheme, regarding)
if err != nil {
klog.Errorf("Could not construct reference to: '%#v' due to: '%v'. Will not report event: '%v' '%v' '%v'", regarding, err, eventtype, reason, message)
logger.Error(err, "Could not construct reference, will not report event", "object", regarding, "eventType", eventtype, "reason", reason, "message", message)
return
}

var refRelated *v1.ObjectReference
if related != nil {
refRelated, err = reference.GetReference(recorder.scheme, related)
if err != nil {
klog.V(9).Infof("Could not construct reference to: '%#v' due to: '%v'.", related, err)
logger.V(9).Info("Could not construct reference", "object", related, "err", err)
}
}
if !util.ValidateEventType(eventtype) {
klog.Errorf("Unsupported event type: '%v'", eventtype)
logger.Error(nil, "Unsupported event type", "eventType", eventtype)
return
}
event := recorder.makeEvent(refRegarding, refRelated, timestamp, eventtype, reason, message, recorder.reportingController, recorder.reportingInstance, action)
Expand Down
Loading

0 comments on commit f936f69

Please sign in to comment.