diff --git a/pkg/client/record/event.go b/pkg/client/record/event.go index df73972b9b575..dc9bda1b14566 100644 --- a/pkg/client/record/event.go +++ b/pkg/client/record/event.go @@ -33,7 +33,7 @@ import ( const maxTriesPerEvent = 12 -var sleepDuration = time.Duration(10 * time.Second) +var sleepDuration = 10 * time.Second // EventRecorder knows how to store events (client.Client implements it.) // EventRecorder must respect the namespace that will be embedded in 'event'. @@ -48,6 +48,9 @@ type EventRecorder interface { // or used to stop recording, if desired. // TODO: make me an object with parameterizable queue length and retry interval func StartRecording(recorder EventRecorder, source api.EventSource) watch.Interface { + // The default math/rand package functions aren't thread safe, so create a + // new Rand object for each StartRecording call. + randGen := rand.New(rand.NewSource(time.Now().UnixNano())) return GetEvents(func(event *api.Event) { // Make a copy before modification, because there could be multiple listeners. // Events are safe to copy like this. @@ -68,7 +71,7 @@ func StartRecording(recorder EventRecorder, source api.EventSource) watch.Interf // Randomize the first sleep so that various clients won't all be // synced up if the master goes down. if tries == 1 { - time.Sleep(time.Duration(float64(sleepDuration) * rand.Float64())) + time.Sleep(time.Duration(float64(sleepDuration) * randGen.Float64())) } else { time.Sleep(sleepDuration) } diff --git a/pkg/watch/mux.go b/pkg/watch/mux.go index 3a10f08ca635a..ed5c09e512902 100644 --- a/pkg/watch/mux.go +++ b/pkg/watch/mux.go @@ -27,10 +27,15 @@ import ( type FullChannelBehavior int const ( - WaitIfChannelFull = iota - DropIfChannelFull = iota + WaitIfChannelFull FullChannelBehavior = iota + DropIfChannelFull ) +// Buffer the incoming queue a little bit even though it should rarely ever accumulate +// anything, just in case a few events are received in such a short window that +// Broadcaster can't move them onto the watchers' queues fast enough. +const incomingQueueLength = 25 + // Broadcaster distributes event notifications among any number of watchers. Every event // is delivered to every watcher. type Broadcaster struct { @@ -58,7 +63,7 @@ type Broadcaster struct { func NewBroadcaster(queueLength int, fullChannelBehavior FullChannelBehavior) *Broadcaster { m := &Broadcaster{ watchers: map[int64]*broadcasterWatcher{}, - incoming: make(chan Event), + incoming: make(chan Event, incomingQueueLength), watchQueueLength: queueLength, fullChannelBehavior: fullChannelBehavior, } diff --git a/pkg/watch/mux_test.go b/pkg/watch/mux_test.go index 2cc548c62fcc1..a662605efbe42 100644 --- a/pkg/watch/mux_test.go +++ b/pkg/watch/mux_test.go @@ -116,3 +116,52 @@ func TestBroadcasterWatcherStopDeadlock(t *testing.T) { } m.Shutdown() } + +func TestBroadcasterDropIfChannelFull(t *testing.T) { + m := NewBroadcaster(1, DropIfChannelFull) + + event1 := Event{Added, &myType{"foo", "hello world 1"}} + event2 := Event{Added, &myType{"bar", "hello world 2"}} + + // Add a couple watchers + const testWatchers = 2 + watches := make([]Interface, testWatchers) + for i := 0; i < testWatchers; i++ { + watches[i] = m.Watch() + } + + // Send a couple events before closing the broadcast channel. + t.Log("Sending event 1") + m.Action(event1.Type, event1.Object) + t.Log("Sending event 2") + m.Action(event2.Type, event2.Object) + m.Shutdown() + + // Pull events from the queue. + wg := sync.WaitGroup{} + wg.Add(testWatchers) + for i := 0; i < testWatchers; i++ { + // Verify that each watcher only gets the first event because its watch + // queue of length one was full from the first one. + go func(watcher int, w Interface) { + defer wg.Done() + e1, ok := <-w.ResultChan() + if !ok { + t.Error("Watcher %v failed to retrieve first event.") + return + } + if e, a := event1, e1; !reflect.DeepEqual(e, a) { + t.Errorf("Watcher %v: Expected (%v, %#v), got (%v, %#v)", + watcher, e.Type, e.Object, a.Type, a.Object) + } else { + t.Logf("Got (%v, %#v)", e1.Type, e1.Object) + } + e2, ok := <-w.ResultChan() + if ok { + t.Error("Watcher %v received second event (%v, %#v) even though it shouldn't have.", + watcher, e2.Type, e2.Object) + } + }(i, watches[i]) + } + wg.Wait() +}