Skip to content

Commit

Permalink
Add a unit test for watch.Broadcaster DropIfChannelFull and a couple …
Browse files Browse the repository at this point in the history
…small fixes
  • Loading branch information
a-robinson committed Jan 14, 2015
1 parent 3eaf362 commit 90e1d58
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 5 deletions.
7 changes: 5 additions & 2 deletions pkg/client/record/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import (

const maxTriesPerEvent = 12

var sleepDuration = time.Duration(10 * time.Second)
var sleepDuration = 10 * time.Second

// EventRecorder knows how to store events (client.Client implements it.)
// EventRecorder must respect the namespace that will be embedded in 'event'.
Expand All @@ -48,6 +48,9 @@ type EventRecorder interface {
// or used to stop recording, if desired.
// TODO: make me an object with parameterizable queue length and retry interval
func StartRecording(recorder EventRecorder, source api.EventSource) watch.Interface {
// The default math/rand package functions aren't thread safe, so create a
// new Rand object for each StartRecording call.
randGen := rand.New(rand.NewSource(time.Now().UnixNano()))
return GetEvents(func(event *api.Event) {
// Make a copy before modification, because there could be multiple listeners.
// Events are safe to copy like this.
Expand All @@ -68,7 +71,7 @@ func StartRecording(recorder EventRecorder, source api.EventSource) watch.Interf
// Randomize the first sleep so that various clients won't all be
// synced up if the master goes down.
if tries == 1 {
time.Sleep(time.Duration(float64(sleepDuration) * rand.Float64()))
time.Sleep(time.Duration(float64(sleepDuration) * randGen.Float64()))
} else {
time.Sleep(sleepDuration)
}
Expand Down
11 changes: 8 additions & 3 deletions pkg/watch/mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,15 @@ import (
type FullChannelBehavior int

const (
WaitIfChannelFull = iota
DropIfChannelFull = iota
WaitIfChannelFull FullChannelBehavior = iota
DropIfChannelFull
)

// Buffer the incoming queue a little bit even though it should rarely ever accumulate
// anything, just in case a few events are received in such a short window that
// Broadcaster can't move them onto the watchers' queues fast enough.
const incomingQueueLength = 25

// Broadcaster distributes event notifications among any number of watchers. Every event
// is delivered to every watcher.
type Broadcaster struct {
Expand Down Expand Up @@ -58,7 +63,7 @@ type Broadcaster struct {
func NewBroadcaster(queueLength int, fullChannelBehavior FullChannelBehavior) *Broadcaster {
m := &Broadcaster{
watchers: map[int64]*broadcasterWatcher{},
incoming: make(chan Event),
incoming: make(chan Event, incomingQueueLength),
watchQueueLength: queueLength,
fullChannelBehavior: fullChannelBehavior,
}
Expand Down
49 changes: 49 additions & 0 deletions pkg/watch/mux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,3 +116,52 @@ func TestBroadcasterWatcherStopDeadlock(t *testing.T) {
}
m.Shutdown()
}

func TestBroadcasterDropIfChannelFull(t *testing.T) {
m := NewBroadcaster(1, DropIfChannelFull)

event1 := Event{Added, &myType{"foo", "hello world 1"}}
event2 := Event{Added, &myType{"bar", "hello world 2"}}

// Add a couple watchers
const testWatchers = 2
watches := make([]Interface, testWatchers)
for i := 0; i < testWatchers; i++ {
watches[i] = m.Watch()
}

// Send a couple events before closing the broadcast channel.
t.Log("Sending event 1")
m.Action(event1.Type, event1.Object)
t.Log("Sending event 2")
m.Action(event2.Type, event2.Object)
m.Shutdown()

// Pull events from the queue.
wg := sync.WaitGroup{}
wg.Add(testWatchers)
for i := 0; i < testWatchers; i++ {
// Verify that each watcher only gets the first event because its watch
// queue of length one was full from the first one.
go func(watcher int, w Interface) {
defer wg.Done()
e1, ok := <-w.ResultChan()
if !ok {
t.Error("Watcher %v failed to retrieve first event.")
return
}
if e, a := event1, e1; !reflect.DeepEqual(e, a) {
t.Errorf("Watcher %v: Expected (%v, %#v), got (%v, %#v)",
watcher, e.Type, e.Object, a.Type, a.Object)
} else {
t.Logf("Got (%v, %#v)", e1.Type, e1.Object)
}
e2, ok := <-w.ResultChan()
if ok {
t.Error("Watcher %v received second event (%v, %#v) even though it shouldn't have.",
watcher, e2.Type, e2.Object)
}
}(i, watches[i])
}
wg.Wait()
}

0 comments on commit 90e1d58

Please sign in to comment.