Skip to content

Commit

Permalink
Optimize etcd storage by compressing recurring events in to a single …
Browse files Browse the repository at this point in the history
…event
  • Loading branch information
saad-ali committed Feb 12, 2015
1 parent 52bf48c commit 033577e
Show file tree
Hide file tree
Showing 8 changed files with 534 additions and 37 deletions.
21 changes: 21 additions & 0 deletions pkg/client/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type EventNamespacer interface {
// EventInterface has methods to work with Event resources
type EventInterface interface {
Create(event *api.Event) (*api.Event, error)
Update(event *api.Event) (*api.Event, error)
List(label, field labels.Selector) (*api.EventList, error)
Get(name string) (*api.Event, error)
Watch(label, field labels.Selector, resourceVersion string) (watch.Interface, error)
Expand Down Expand Up @@ -73,6 +74,26 @@ func (e *events) Create(event *api.Event) (*api.Event, error) {
return result, err
}

// Update modifies an existing event. It returns the copy of the event that the server returns,
// or an error. The namespace and key to update the event within is deduced from the event. The
// namespace must either match this event client's namespace, or this event client must have been
// created with the "" namespace. Update also requires the ResourceVersion to be set in the event
// object.
func (e *events) Update(event *api.Event) (*api.Event, error) {
if len(event.ResourceVersion) == 0 {
return nil, fmt.Errorf("invalid event update object, missing resource version: %#v", event)
}
result := &api.Event{}
err := e.client.Put().
Namespace(event.Namespace).
Resource("events").
Name(event.Name).
Body(event).
Do().
Into(result)
return result, err
}

// List returns a list of events matching the selectors.
func (e *events) List(label, field labels.Selector) (*api.EventList, error) {
result := &api.EventList{}
Expand Down
6 changes: 6 additions & 0 deletions pkg/client/fake_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@ func (c *FakeEvents) Create(event *api.Event) (*api.Event, error) {
return &api.Event{}, nil
}

// Update replaces an existing event. Returns the copy of the event the server returns, or an error.
func (c *FakeEvents) Update(event *api.Event) (*api.Event, error) {
c.Fake.Actions = append(c.Fake.Actions, FakeAction{Action: "update-event", Value: event.Name})
return &api.Event{}, nil
}

// List returns a list of events matching the selectors.
func (c *FakeEvents) List(label, field labels.Selector) (*api.EventList, error) {
c.Fake.Actions = append(c.Fake.Actions, FakeAction{Action: "list-events"})
Expand Down
26 changes: 23 additions & 3 deletions pkg/client/record/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ var sleepDuration = 10 * time.Second
// pkg/client's REST client.
type EventRecorder interface {
Create(event *api.Event) (*api.Event, error)
Update(event *api.Event) (*api.Event, error)
}

// StartRecording starts sending events to recorder. Call once while initializing
Expand All @@ -58,9 +59,18 @@ func StartRecording(recorder EventRecorder, source api.EventSource) watch.Interf
event = &eventCopy
event.Source = source

previousEvent := GetEvent(event)
updateExistingEvent := previousEvent.Count > 0
if updateExistingEvent {
event.Count = previousEvent.Count + 1
event.FirstTimestamp = previousEvent.FirstTimestamp
event.Name = previousEvent.Name
event.ResourceVersion = previousEvent.ResourceVersion
}

tries := 0
for {
if recordEvent(recorder, event) {
if recordEvent(recorder, event, updateExistingEvent) {
break
}
tries++
Expand All @@ -81,11 +91,21 @@ func StartRecording(recorder EventRecorder, source api.EventSource) watch.Interf

// recordEvent attempts to write event to recorder. It returns true if the event
// was successfully recorded or discarded, false if it should be retried.
func recordEvent(recorder EventRecorder, event *api.Event) bool {
_, err := recorder.Create(event)
// If updateExistingEvent is false, it creates a new event, otherwise it updates
// existing event.
func recordEvent(recorder EventRecorder, event *api.Event, updateExistingEvent bool) bool {
var newEvent *api.Event
var err error
if updateExistingEvent {
newEvent, err = recorder.Update(event)
} else {
newEvent, err = recorder.Create(event)
}
if err == nil {
AddOrUpdateEvent(newEvent)
return true
}

// If we can't contact the server, then hold everything while we keep trying.
// Otherwise, something about the event is malformed and we should abandon it.
giveUp := false
Expand Down
Loading

0 comments on commit 033577e

Please sign in to comment.