Skip to content

Commit

Permalink
Add event correlation to client
Browse files Browse the repository at this point in the history
  • Loading branch information
derekwaynecarr committed Nov 6, 2015
1 parent 4566e03 commit 6ad7f1a
Show file tree
Hide file tree
Showing 6 changed files with 571 additions and 258 deletions.
35 changes: 31 additions & 4 deletions docs/design/event_compression.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,23 @@ Documentation for other releases can be found at

This document captures the design of event compression.


## Background

Kubernetes components can get into a state where they generate tons of events which are identical except for the timestamp. For example, when pulling a non-existing image, Kubelet will repeatedly generate `image_not_existing` and `container_is_waiting` events until upstream components correct the image. When this happens, the spam from the repeated events makes the entire event mechanism useless. It also appears to cause memory pressure in etcd (see [#3853](http://issue.k8s.io/3853)).
Kubernetes components can get into a state where they generate tons of events.

The events can be categorized in one of two ways:

1. same - the event is identical to previous events except it varies only on timestamp
2. similar - the event is identical to previous events except it varies on timestamp and message

For example, when pulling a non-existing image, Kubelet will repeatedly generate `image_not_existing` and `container_is_waiting` events until upstream components correct the image. When this happens, the spam from the repeated events makes the entire event mechanism useless. It also appears to cause memory pressure in etcd (see [#3853](http://issue.k8s.io/3853)).

The goal is introduce event counting to increment same events, and event aggregation to collapse similar events.

## Proposal

Each binary that generates events (for example, `kubelet`) should keep track of previously generated events so that it can collapse recurring events into a single event instead of creating a new instance for each new event.
Each binary that generates events (for example, `kubelet`) should keep track of previously generated events so that it can collapse recurring events into a single event instead of creating a new instance for each new event. In addition, if many similar events are
created, events should be aggregated into a single event to reduce spam.

Event compression should be best effort (not guaranteed). Meaning, in the worst case, `n` identical (minus timestamp) events may still result in `n` event entries.

Expand All @@ -61,6 +70,24 @@ Instead of a single Timestamp, each event object [contains](http://releases.k8s.
Each binary that generates events:
* Maintains a historical record of previously generated events:
* Implemented with ["Least Recently Used Cache"](https://github.com/golang/groupcache/blob/master/lru/lru.go) in [`pkg/client/record/events_cache.go`](../../pkg/client/record/events_cache.go).
* Implemented behind an `EventCorrelator` that manages two subcomponents: `EventAggregator` and `EventLogger`
* The `EventCorrelator` observes all incoming events and lets each subcomponent visit and modify the event in turn.
* The `EventAggregator` runs an aggregation function over each event. This function buckets each event based on an `aggregateKey`,
and identifies the event uniquely with a `localKey` in that bucket.
* The default aggregation function groups similar events that differ only by `event.Message`. It's `localKey` is `event.Message` and its aggregate key is produced by joining:
* `event.Source.Component`
* `event.Source.Host`
* `event.InvolvedObject.Kind`
* `event.InvolvedObject.Namespace`
* `event.InvolvedObject.Name`
* `event.InvolvedObject.UID`
* `event.InvolvedObject.APIVersion`
* `event.Reason`
* If the `EventAggregator` observes a similar event produced 10 times in a 10 minute window, it drops the event that was provided as
input and creates a new event that differs only on the message. The message denotes that this event is used to group similar events
that matched on reason. This aggregated `Event` is then used in the event processing sequence.
* The `EventLogger` observes the event out of `EventAggregation` and tracks the number of times it has observed that event previously
by incrementing a key in a cache associated with that matching event.
* The key in the cache is generated from the event object minus timestamps/count/transient fields, specifically the following events fields are used to construct a unique key for an event:
* `event.Source.Component`
* `event.Source.Host`
Expand All @@ -71,7 +98,7 @@ Each binary that generates events:
* `event.InvolvedObject.APIVersion`
* `event.Reason`
* `event.Message`
* The LRU cache is capped at 4096 events. That means if a component (e.g. kubelet) runs for a long period of time and generates tons of unique events, the previously generated events cache will not grow unchecked in memory. Instead, after 4096 unique events are generated, the oldest events are evicted from the cache.
* The LRU cache is capped at 4096 events for both `EventAggregator` and `EventLogger`. That means if a component (e.g. kubelet) runs for a long period of time and generates tons of unique events, the previously generated events cache will not grow unchecked in memory. Instead, after 4096 unique events are generated, the oldest events are evicted from the cache.
* When an event is generated, the previously generated events cache is checked (see [`pkg/client/unversioned/record/event.go`](http://releases.k8s.io/HEAD/pkg/client/unversioned/record/event.go)).
* If the key for the new event matches the key for a previously generated event (meaning all of the above fields match between the new event and some previously generated event), then the event is considered to be a duplicate and the existing event entry is updated in etcd:
* The new PUT (update) event API is called to update the existing event entry in etcd with the new last seen timestamp and count.
Expand Down
33 changes: 11 additions & 22 deletions pkg/client/record/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ limitations under the License.
package record

import (
"encoding/json"
"fmt"
"math/rand"
"time"
Expand All @@ -28,7 +27,6 @@ import (
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/strategicpatch"
"k8s.io/kubernetes/pkg/watch"

"github.com/golang/glog"
Expand Down Expand Up @@ -107,33 +105,23 @@ func (eventBroadcaster *eventBroadcasterImpl) StartRecordingToSink(sink EventSin
// The default math/rand package functions aren't thread safe, so create a
// new Rand object for each StartRecording call.
randGen := rand.New(rand.NewSource(time.Now().UnixNano()))
var eventCache *historyCache = NewEventCache()
eventCorrelator := NewEventCorrelator(util.RealClock{})
return eventBroadcaster.StartEventWatcher(
func(event *api.Event) {
// Make a copy before modification, because there could be multiple listeners.
// Events are safe to copy like this.
eventCopy := *event
event = &eventCopy
var patch []byte
previousEvent := eventCache.getEvent(event)
updateExistingEvent := previousEvent.Count > 0
if updateExistingEvent {
// we still need to copy Name because the Patch relies on the Name to find the target event
event.Name = previousEvent.Name
event.Count = previousEvent.Count + 1

// we need to make sure the Count and LastTimestamp are the only differences between event and the eventCopy2
eventCopy2 := *event
eventCopy2.Count = 0
eventCopy2.LastTimestamp = unversioned.NewTime(time.Unix(0, 0))
newData, _ := json.Marshal(event)
oldData, _ := json.Marshal(eventCopy2)
patch, _ = strategicpatch.CreateStrategicMergePatch(oldData, newData, event)
result, err := eventCorrelator.EventCorrelate(event)
if err != nil {
util.HandleError(err)
}
if result.Skip {
return
}

tries := 0
for {
if recordEvent(sink, event, patch, updateExistingEvent, eventCache) {
if recordEvent(sink, result.Event, result.Patch, result.Event.Count > 1, eventCorrelator) {
break
}
tries++
Expand Down Expand Up @@ -167,7 +155,7 @@ func isKeyNotFoundError(err error) bool {
// was successfully recorded or discarded, false if it should be retried.
// If updateExistingEvent is false, it creates a new event, otherwise it updates
// existing event.
func recordEvent(sink EventSink, event *api.Event, patch []byte, updateExistingEvent bool, eventCache *historyCache) bool {
func recordEvent(sink EventSink, event *api.Event, patch []byte, updateExistingEvent bool, eventCorrelator *EventCorrelator) bool {
var newEvent *api.Event
var err error
if updateExistingEvent {
Expand All @@ -180,7 +168,8 @@ func recordEvent(sink EventSink, event *api.Event, patch []byte, updateExistingE
newEvent, err = sink.Create(event)
}
if err == nil {
eventCache.addOrUpdateEvent(newEvent)
// we need to update our event correlator with the server returned state to handle name/resourceversion
eventCorrelator.UpdateState(newEvent)
return true
}

Expand Down
66 changes: 15 additions & 51 deletions pkg/client/record/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,8 @@ package record
import (
"encoding/json"
"fmt"
"reflect"
"runtime"
"strconv"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -85,7 +83,7 @@ func OnPatchFactory(testCache map[string]*api.Event, patchEvent chan<- *api.Even
return func(event *api.Event, patch []byte) (*api.Event, error) {
cachedEvent, found := testCache[getEventKey(event)]
if !found {
return nil, fmt.Errorf("unexpected error: couldn't find Event in testCache. Try to find Event: %v", event)
return nil, fmt.Errorf("unexpected error: couldn't find Event in testCache.")
}
originalData, err := json.Marshal(cachedEvent)
if err != nil {
Expand Down Expand Up @@ -337,7 +335,7 @@ func TestEventf(t *testing.T) {

clock := &util.FakeClock{time.Now()}
recorder := recorderWithFakeClock(api.EventSource{Component: "eventTest"}, eventBroadcaster, clock)
for _, item := range table {
for index, item := range table {
clock.Step(1 * time.Second)
logWatcher1 := eventBroadcaster.StartLogging(t.Logf) // Prove that it is useful
logWatcher2 := eventBroadcaster.StartLogging(func(formatter string, args ...interface{}) {
Expand All @@ -353,52 +351,17 @@ func TestEventf(t *testing.T) {
// validate event
if item.expectUpdate {
actualEvent := <-patchEvent
validateEvent(actualEvent, item.expect, t)
validateEvent(string(index), actualEvent, item.expect, t)
} else {
actualEvent := <-createEvent
validateEvent(actualEvent, item.expect, t)
validateEvent(string(index), actualEvent, item.expect, t)
}
logWatcher1.Stop()
logWatcher2.Stop()
}
sinkWatcher.Stop()
}

func validateEvent(actualEvent *api.Event, expectedEvent *api.Event, t *testing.T) (*api.Event, error) {
recvEvent := *actualEvent
expectCompression := expectedEvent.Count > 1
t.Logf("expectedEvent.Count is %d\n", expectedEvent.Count)
// Just check that the timestamp was set.
if recvEvent.FirstTimestamp.IsZero() || recvEvent.LastTimestamp.IsZero() {
t.Errorf("timestamp wasn't set: %#v", recvEvent)
}
actualFirstTimestamp := recvEvent.FirstTimestamp
actualLastTimestamp := recvEvent.LastTimestamp
if actualFirstTimestamp.Equal(actualLastTimestamp) {
if expectCompression {
t.Errorf("FirstTimestamp (%q) and LastTimestamp (%q) must be different to indicate event compression happened, but were the same. Actual Event: %#v", actualFirstTimestamp, actualLastTimestamp, recvEvent)
}
} else {
if expectedEvent.Count == 1 {
t.Errorf("FirstTimestamp (%q) and LastTimestamp (%q) must be equal to indicate only one occurrence of the event, but were different. Actual Event: %#v", actualFirstTimestamp, actualLastTimestamp, recvEvent)
}
}
// Temp clear time stamps for comparison because actual values don't matter for comparison
recvEvent.FirstTimestamp = expectedEvent.FirstTimestamp
recvEvent.LastTimestamp = expectedEvent.LastTimestamp
// Check that name has the right prefix.
if n, en := recvEvent.Name, expectedEvent.Name; !strings.HasPrefix(n, en) {
t.Errorf("Name '%v' does not contain prefix '%v'", n, en)
}
recvEvent.Name = expectedEvent.Name
if e, a := expectedEvent, &recvEvent; !reflect.DeepEqual(e, a) {
t.Errorf("diff: %s", util.ObjectGoPrintDiff(e, a))
}
recvEvent.FirstTimestamp = actualFirstTimestamp
recvEvent.LastTimestamp = actualLastTimestamp
return actualEvent, nil
}

func recorderWithFakeClock(eventSource api.EventSource, eventBroadcaster EventBroadcaster, clock util.Clock) EventRecorder {
return &recorderImpl{eventSource, eventBroadcaster.(*eventBroadcasterImpl).Broadcaster, clock}
}
Expand Down Expand Up @@ -520,7 +483,8 @@ func TestLotsOfEvents(t *testing.T) {
APIVersion: "version",
}
for i := 0; i < maxQueuedEvents; i++ {
go recorder.Eventf(ref, "Reason", strconv.Itoa(i))
// we need to vary the reason to prevent aggregation
go recorder.Eventf(ref, "Reason-"+string(i), strconv.Itoa(i))
}
// Make sure no events were dropped by either of the listeners.
for i := 0; i < maxQueuedEvents; i++ {
Expand Down Expand Up @@ -605,7 +569,7 @@ func TestEventfNoNamespace(t *testing.T) {
clock := &util.FakeClock{time.Now()}
recorder := recorderWithFakeClock(api.EventSource{Component: "eventTest"}, eventBroadcaster, clock)

for _, item := range table {
for index, item := range table {
clock.Step(1 * time.Second)
logWatcher1 := eventBroadcaster.StartLogging(t.Logf) // Prove that it is useful
logWatcher2 := eventBroadcaster.StartLogging(func(formatter string, args ...interface{}) {
Expand All @@ -621,10 +585,10 @@ func TestEventfNoNamespace(t *testing.T) {
// validate event
if item.expectUpdate {
actualEvent := <-patchEvent
validateEvent(actualEvent, item.expect, t)
validateEvent(string(index), actualEvent, item.expect, t)
} else {
actualEvent := <-createEvent
validateEvent(actualEvent, item.expect, t)
validateEvent(string(index), actualEvent, item.expect, t)
}

logWatcher1.Stop()
Expand Down Expand Up @@ -878,33 +842,33 @@ func TestMultiSinkCache(t *testing.T) {
recorder := recorderWithFakeClock(api.EventSource{Component: "eventTest"}, eventBroadcaster, clock)

sinkWatcher := eventBroadcaster.StartRecordingToSink(&testEvents)
for _, item := range table {
for index, item := range table {
clock.Step(1 * time.Second)
recorder.Eventf(item.obj, item.reason, item.messageFmt, item.elements...)

// validate event
if item.expectUpdate {
actualEvent := <-patchEvent
validateEvent(actualEvent, item.expect, t)
validateEvent(string(index), actualEvent, item.expect, t)
} else {
actualEvent := <-createEvent
validateEvent(actualEvent, item.expect, t)
validateEvent(string(index), actualEvent, item.expect, t)
}
}

// Another StartRecordingToSink call should start to record events with new clean cache.
sinkWatcher2 := eventBroadcaster.StartRecordingToSink(&testEvents2)
for _, item := range table {
for index, item := range table {
clock.Step(1 * time.Second)
recorder.Eventf(item.obj, item.reason, item.messageFmt, item.elements...)

// validate event
if item.expectUpdate {
actualEvent := <-patchEvent2
validateEvent(actualEvent, item.expect, t)
validateEvent(string(index), actualEvent, item.expect, t)
} else {
actualEvent := <-createEvent2
validateEvent(actualEvent, item.expect, t)
validateEvent(string(index), actualEvent, item.expect, t)
}
}

Expand Down
Loading

0 comments on commit 6ad7f1a

Please sign in to comment.