Skip to content

Commit

Permalink
etcd3/watcher: fix goroutine leak if ctx is canceled
Browse files Browse the repository at this point in the history
In reflector.go, it could probably call Stop() without retrieving all results
from ResultChan().
A potential leak is that when an error has happened, it could block on resultChan,
and then cancelling context in Stop() wouldn't unblock it.
This fixes the problem by making it also select ctx.Done and cancel context
afterwards if error happened.
  • Loading branch information
hongchaodeng committed May 9, 2016
1 parent 5dd0870 commit 97f4647
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 3 deletions.
9 changes: 6 additions & 3 deletions pkg/storage/etcd3/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,11 +110,14 @@ func (wc *watchChan) run() {
select {
case err := <-wc.errChan:
errResult := parseError(err)
wc.cancel()
// error result is guaranteed to be received by user before closing ResultChan.
if errResult != nil {
wc.resultChan <- *errResult
// error result is guaranteed to be received by user before closing ResultChan.
select {
case wc.resultChan <- *errResult:
case <-wc.ctx.Done(): // user has given up all results
}
}
wc.cancel()
case <-wc.ctx.Done():
}
// we need to wait until resultChan wouldn't be sent to anymore
Expand Down
26 changes: 26 additions & 0 deletions pkg/storage/etcd3/watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,13 @@ package etcd3

import (
"errors"
"fmt"
"reflect"
"testing"
"time"

"sync"

"github.com/coreos/etcd/integration"
"golang.org/x/net/context"
"k8s.io/kubernetes/pkg/api"
Expand Down Expand Up @@ -187,6 +190,29 @@ func TestWatchContextCancel(t *testing.T) {
}
}

func TestWatchErrResultNotBlockAfterCancel(t *testing.T) {
origCtx, store, cluster := testSetup(t)
defer cluster.Terminate(t)
ctx, cancel := context.WithCancel(origCtx)
w := store.watcher.createWatchChan(ctx, "/abc", 0, false, storage.Everything)
// make resutlChan and errChan blocking to ensure ordering.
w.resultChan = make(chan watch.Event)
w.errChan = make(chan error)
// The event flow goes like:
// - first we send an error, it should block on resultChan.
// - Then we cancel ctx. The blocking on resultChan should be freed up
// and run() goroutine should return.
var wg sync.WaitGroup
wg.Add(1)
go func() {
w.run()
wg.Done()
}()
w.errChan <- fmt.Errorf("some error")
cancel()
wg.Wait()
}

type testWatchStruct struct {
obj *api.Pod
expectEvent bool
Expand Down

0 comments on commit 97f4647

Please sign in to comment.