diff --git a/pkg/watch/mux.go b/pkg/watch/mux.go index 0c87111ee16b9..2d40014fb5e75 100644 --- a/pkg/watch/mux.go +++ b/pkg/watch/mux.go @@ -56,9 +56,10 @@ func (m *Mux) Watch() Interface { id := m.nextWatcher m.nextWatcher++ w := &muxWatcher{ - result: make(chan Event), - id: id, - m: m, + result: make(chan Event), + stopped: make(chan struct{}), + id: id, + m: m, } m.watchers[id] = w return w @@ -119,15 +120,20 @@ func (m *Mux) distribute(event Event) { m.lock.Lock() defer m.lock.Unlock() for _, w := range m.watchers { - w.result <- event + select { + case w.result <- event: + case <-w.stopped: + } } } // muxWatcher handles a single watcher of a mux type muxWatcher struct { - result chan Event - id int64 - m *Mux + result chan Event + stopped chan struct{} + stop sync.Once + id int64 + m *Mux } // ResultChan returns a channel to use for waiting on events. @@ -137,5 +143,8 @@ func (mw *muxWatcher) ResultChan() <-chan Event { // Stop stops watching and removes mw from its list. func (mw *muxWatcher) Stop() { - mw.m.stopWatching(mw.id) + mw.stop.Do(func() { + close(mw.stopped) + mw.m.stopWatching(mw.id) + }) }