diff --git a/pkg/client/record/event.go b/pkg/client/record/event.go index 191a991cc7d4e..77291b036f00e 100644 --- a/pkg/client/record/event.go +++ b/pkg/client/record/event.go @@ -32,10 +32,7 @@ import ( "github.com/golang/glog" ) -const ( - maxQueuedEvents = 1000 - maxTriesPerEvent = 10 -) +const maxTriesPerEvent = 10 var ( minSleep = float64(1 * time.Second) @@ -56,47 +53,26 @@ type EventRecorder interface { // or used to stop recording, if desired. // TODO: make me an object with parameterizable queue length and retry interval func StartRecording(recorder EventRecorder, source api.EventSource) watch.Interface { - // Set up our own personal buffer of events so that we can clear out GetEvents' - // broadcast channel as quickly as possible to avoid causing the relatively more - // important event-producing goroutines from blocking while trying to insert events. - eventQueue := make(chan *api.Event, maxQueuedEvents) - - // Run a function in the background that grabs events off the queue and tries - // to record them, retrying as appropriate to try to avoid dropping any. - go func() { - defer util.HandleCrash() - for event := range eventQueue { - tries := 0 - for { - if recordEvent(recorder, event) { - break - } - tries++ - if tries >= maxTriesPerEvent { - glog.Errorf("Unable to write event '%#v' (retry limit exceeded!)", event) - break - } - sleepDuration := time.Duration( - math.Min(maxSleep, minSleep*math.Pow(backoffExp, float64(tries-1)))) - time.Sleep(wait.Jitter(sleepDuration, 0.5)) - } - } - }() - - // Finally, kick off the watcher that takes events from the channel and puts them - // onto the queue. return GetEvents(func(event *api.Event) { // Make a copy before modification, because there could be multiple listeners. // Events are safe to copy like this. eventCopy := *event event = &eventCopy event.Source = source - // Drop new events rather than old ones because the old ones may contain - // some information explaining why everything is so backed up. 
- if len(eventQueue) == maxQueuedEvents { - glog.Errorf("Unable to write event '%#v' (event buffer full!)", event) - } else { - eventQueue <- event + + tries := 0 + for { + if recordEvent(recorder, event) { + break + } + tries++ + if tries >= maxTriesPerEvent { + glog.Errorf("Unable to write event '%#v' (retry limit exceeded!)", event) + break + } + sleepDuration := time.Duration( + math.Min(maxSleep, minSleep*math.Pow(backoffExp, float64(tries-1)))) + time.Sleep(wait.Jitter(sleepDuration, 0.5)) } }) } @@ -163,9 +139,9 @@ func GetEvents(f func(*api.Event)) watch.Interface { return w } -const queueLen = 1000 +const maxQueuedEvents = 1000 -var events = watch.NewBroadcaster(queueLen) +var events = watch.NewBroadcaster(maxQueuedEvents, watch.DropIfChannelFull) // Event constructs an event from the given information and puts it in the queue for sending. // 'object' is the object this event is about. Event will make a reference-- or you may also diff --git a/pkg/registry/registrytest/generic.go b/pkg/registry/registrytest/generic.go index 28a0d66b5ebf0..46776a786ecc6 100644 --- a/pkg/registry/registrytest/generic.go +++ b/pkg/registry/registrytest/generic.go @@ -39,7 +39,7 @@ type GenericRegistry struct { func NewGeneric(list runtime.Object) *GenericRegistry { return &GenericRegistry{ ObjectList: list, - Broadcaster: watch.NewBroadcaster(0), + Broadcaster: watch.NewBroadcaster(0, watch.WaitIfChannelFull), } } diff --git a/pkg/registry/registrytest/pod.go b/pkg/registry/registrytest/pod.go index 6d60d30f62bd0..571b61074b270 100644 --- a/pkg/registry/registrytest/pod.go +++ b/pkg/registry/registrytest/pod.go @@ -36,7 +36,7 @@ type PodRegistry struct { func NewPodRegistry(pods *api.PodList) *PodRegistry { return &PodRegistry{ Pods: pods, - broadcaster: watch.NewBroadcaster(0), + broadcaster: watch.NewBroadcaster(0, watch.WaitIfChannelFull), } } diff --git a/pkg/watch/mux.go b/pkg/watch/mux.go index e4d8bef16051e..3a10f08ca635a 100644 --- a/pkg/watch/mux.go +++ 
b/pkg/watch/mux.go @@ -22,6 +22,15 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" ) +// FullChannelBehavior controls how the Broadcaster reacts if a watcher's watch +// channel is full. +type FullChannelBehavior int + +const ( + WaitIfChannelFull = iota + DropIfChannelFull = iota +) + // Broadcaster distributes event notifications among any number of watchers. Every event // is delivered to every watcher. type Broadcaster struct { @@ -31,17 +40,27 @@ type Broadcaster struct { nextWatcher int64 incoming chan Event + + // How large to make watcher's channel. + watchQueueLength int + // If one of the watch channels is full, don't wait for it to become empty. + // Instead just deliver it to the watchers that do have space in their + // channels and move on to the next event. + // It's more fair to do this on a per-watcher basis than to do it on the + // "incoming" channel, which would allow one slow watcher to prevent all + // other watchers from getting new events. + fullChannelBehavior FullChannelBehavior } -// NewBroadcaster creates a new Broadcaster. queueLength is the maximum number of events to queue. -// When queueLength is 0, Action will block until any prior event has been -// completely distributed. It is guaranteed that events will be distibuted in the -// order in which they ocurr, but the order in which a single event is distributed -// among all of the watchers is unspecified. -func NewBroadcaster(queueLength int) *Broadcaster { +// NewBroadcaster creates a new Broadcaster. queueLength is the maximum number of events to queue per watcher. +// It is guaranteed that events will be distributed in the order in which they occur, +// but the order in which a single event is distributed among all of the watchers is unspecified. 
+func NewBroadcaster(queueLength int, fullChannelBehavior FullChannelBehavior) *Broadcaster { m := &Broadcaster{ - watchers: map[int64]*broadcasterWatcher{}, - incoming: make(chan Event, queueLength), + watchers: map[int64]*broadcasterWatcher{}, + incoming: make(chan Event), + watchQueueLength: queueLength, + fullChannelBehavior: fullChannelBehavior, } go m.loop() return m @@ -56,7 +75,7 @@ func (m *Broadcaster) Watch() Interface { id := m.nextWatcher m.nextWatcher++ w := &broadcasterWatcher{ - result: make(chan Event), + result: make(chan Event, m.watchQueueLength), stopped: make(chan struct{}), id: id, m: m, @@ -119,10 +138,20 @@ func (m *Broadcaster) loop() { func (m *Broadcaster) distribute(event Event) { m.lock.Lock() defer m.lock.Unlock() - for _, w := range m.watchers { - select { - case w.result <- event: - case <-w.stopped: + if m.fullChannelBehavior == DropIfChannelFull { + for _, w := range m.watchers { + select { + case w.result <- event: + case <-w.stopped: + default: // Don't block if the event can't be queued. 
+ } + } + } else { + for _, w := range m.watchers { + select { + case w.result <- event: + case <-w.stopped: + } } } } diff --git a/pkg/watch/mux_test.go b/pkg/watch/mux_test.go index 895846187178c..2cc548c62fcc1 100644 --- a/pkg/watch/mux_test.go +++ b/pkg/watch/mux_test.go @@ -39,7 +39,7 @@ func TestBroadcaster(t *testing.T) { } // The broadcaster we're testing - m := NewBroadcaster(0) + m := NewBroadcaster(0, WaitIfChannelFull) // Add a bunch of watchers const testWatchers = 2 @@ -77,7 +77,7 @@ func TestBroadcaster(t *testing.T) { } func TestBroadcasterWatcherClose(t *testing.T) { - m := NewBroadcaster(0) + m := NewBroadcaster(0, WaitIfChannelFull) w := m.Watch() w2 := m.Watch() w.Stop() @@ -95,7 +95,7 @@ func TestBroadcasterWatcherClose(t *testing.T) { func TestBroadcasterWatcherStopDeadlock(t *testing.T) { done := make(chan bool) - m := NewBroadcaster(0) + m := NewBroadcaster(0, WaitIfChannelFull) go func(w0, w1 Interface) { // We know Broadcaster is in the distribute loop once one watcher receives // an event. Stop the other watcher while distribute is trying to