diff --git a/pkg/watch/mux.go b/pkg/watch/mux.go index 5f3bd5b7b667c..2d40014fb5e75 100644 --- a/pkg/watch/mux.go +++ b/pkg/watch/mux.go @@ -115,13 +115,10 @@ func (m *Mux) loop() { m.closeAll() } -var testHookMuxDistribute = func() {} - // distribute sends event to all watchers. Blocking. func (m *Mux) distribute(event Event) { m.lock.Lock() defer m.lock.Unlock() - testHookMuxDistribute() for _, w := range m.watchers { select { case w.result <- event: diff --git a/pkg/watch/mux_test.go b/pkg/watch/mux_test.go index 679261fc4b2b9..11c6758b2c291 100644 --- a/pkg/watch/mux_test.go +++ b/pkg/watch/mux_test.go @@ -94,17 +94,20 @@ func TestMuxWatcherClose(t *testing.T) { } func TestMuxWatcherStopDeadlock(t *testing.T) { - defer func(fn func()) { testHookMuxDistribute = fn }(testHookMuxDistribute) - sig, done := make(chan bool), make(chan bool) - testHookMuxDistribute = func() { sig <- true } + done := make(chan bool) m := NewMux(0) - go func(w Interface) { - // Imagine this goroutine was receiving from w.ResultChan() - // until it received some signal and stopped watching. - <-sig - w.Stop() + go func(w0, w1 Interface) { + // We know Mux is in the distribute loop once one watcher receives + // an event. Stop the other watcher while distribute is trying to + // send to it. + select { + case <-w0.ResultChan(): + w1.Stop() + case <-w1.ResultChan(): + w0.Stop() + } close(done) - }(m.Watch()) + }(m.Watch(), m.Watch()) m.Action(Added, &myType{}) select { case <-time.After(5 * time.Second):