Skip to content

Commit

Permalink
Merge pull request kubernetes#19006 from wojtek-t/wait_on_stop_watch
Browse files Browse the repository at this point in the history
Fix race in watch tests - attempt 3
  • Loading branch information
fgrzadkowski committed Dec 22, 2015
2 parents e164098 + 41c7835 commit d20ab89
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 9 deletions.
11 changes: 11 additions & 0 deletions pkg/storage/etcd/etcd_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ type etcdWatcher struct {
userStop chan struct{}
stopped bool
stopLock sync.Mutex
// wg is used to avoid calls to etcd after Stop()
wg sync.WaitGroup

// Injectable for testing. Send the event down the outgoing channel.
emit func(watch.Event)
Expand Down Expand Up @@ -129,6 +131,7 @@ func newEtcdWatcher(list bool, include includeFunc, filter storage.FilterFunc, e
outgoing: make(chan watch.Event),
userStop: make(chan struct{}),
stopped: false,
wg: sync.WaitGroup{},
cache: cache,
ctx: nil,
cancel: nil,
Expand All @@ -145,6 +148,11 @@ func (w *etcdWatcher) etcdWatch(ctx context.Context, client etcd.KeysAPI, key st
defer close(w.etcdError)
defer close(w.etcdIncoming)

// All calls to etcd are coming from this function - once it is finished
// no other call to etcd should be generated by this watcher.
w.wg.Add(1)
defer w.wg.Done()

// We need to be prepared, that Stop() can be called at any time.
// It can potentially also be called, even before this function is called.
// If that is the case, we simply skip all the code here.
Expand Down Expand Up @@ -456,4 +464,7 @@ func (w *etcdWatcher) Stop() {
w.stopped = true
close(w.userStop)
}
// Wait until all calls to etcd are finished and no other
// will be issued.
w.wg.Wait()
}
10 changes: 1 addition & 9 deletions pkg/storage/etcd/etcd_watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ func TestWatch(t *testing.T) {
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer watching.Stop()
// watching is explicitly closed below.

// Test normal case
pod := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}
Expand Down Expand Up @@ -327,8 +327,6 @@ func TestWatchEtcdState(t *testing.T) {
if e, a := endpoint, event.Object; !api.Semantic.DeepDerivative(e, a) {
t.Errorf("%s: expected %v, got %v", e, a)
}

watching.Stop()
}

func TestWatchFromZeroIndex(t *testing.T) {
Expand Down Expand Up @@ -379,8 +377,6 @@ func TestWatchFromZeroIndex(t *testing.T) {
if e, a := pod, event.Object; !api.Semantic.DeepDerivative(e, a) {
t.Errorf("%s: expected %v, got %v", e, a)
}

watching.Stop()
}

func TestWatchListFromZeroIndex(t *testing.T) {
Expand Down Expand Up @@ -411,8 +407,6 @@ func TestWatchListFromZeroIndex(t *testing.T) {
if e, a := pod, event.Object; !api.Semantic.DeepDerivative(e, a) {
t.Errorf("%s: expected %v, got %v", e, a)
}

watching.Stop()
}

func TestWatchListIgnoresRootKey(t *testing.T) {
Expand Down Expand Up @@ -444,8 +438,6 @@ func TestWatchListIgnoresRootKey(t *testing.T) {
default:
// fall through, expected behavior
}

watching.Stop()
}

func TestWatchPurposefulShutdown(t *testing.T) {
Expand Down

0 comments on commit d20ab89

Please sign in to comment.