Fix goroutine leak of wait.poller #70277
@liggitt @derekwaynecarr please take a look.
/ok-to-test
/assign @caesarxuchao
@caesarxuchao @liggitt Please take a look.
cc @kubernetes/sig-api-machinery-pr-reviews
@@ -312,7 +312,7 @@ func PollImmediateUntil(interval time.Duration, condition ConditionFunc, stopCh

 // WaitFunc creates a channel that receives an item every time a test
 // should be executed and is closed when the last test should be invoked.
-type WaitFunc func(done <-chan struct{}) <-chan struct{}
+type WaitFunc func(done <-chan struct{}) (ch <-chan struct{}, cancel func())
Update documentation.
I'm honestly astonished that the tests continue to pass everywhere considering this breaks the public interface.
That means nobody uses WaitFor() directly. :)
Let's move it to private then.
	ch := make(chan struct{})

	mu := sync.Mutex{}
	canceled := false
sync.Once probably would make for more concise code.
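For reference, a minimal sketch of what the sync.Once suggestion could look like. This is not the code from the PR: `newCancel` and `isClosed` are hypothetical helpers introduced only for illustration. The point is that `once.Do` makes the cancel function safe to call more than once, which is what the mutex plus `canceled` flag was guarding against.

```go
package main

import (
	"fmt"
	"sync"
)

// newCancel returns a channel and an idempotent cancel function that closes it.
// Hypothetical helper for illustration only.
func newCancel() (<-chan struct{}, func()) {
	cancelCh := make(chan struct{})
	var once sync.Once
	cancel := func() {
		// once.Do runs the close at most once, so repeated or concurrent
		// calls to cancel cannot panic on a double close.
		once.Do(func() { close(cancelCh) })
	}
	return cancelCh, cancel
}

// isClosed reports, without blocking, whether ch has been closed.
func isClosed(ch <-chan struct{}) bool {
	select {
	case <-ch:
		return true
	default:
		return false
	}
}

func main() {
	ch, cancel := newCancel()
	fmt.Println(isClosed(ch)) // false
	cancel()
	cancel() // second call is a no-op
	fmt.Println(isClosed(ch)) // true
}
```

Compared with a mutex and a boolean, this keeps the "close exactly once" invariant in a single place.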
@@ -379,6 +393,8 @@ func poller(interval, timeout time.Duration) WaitFunc {
 				case ch <- struct{}{}:
 				default:
 				}
+			case <-cancelCh:
+				return
Which goroutine was being leaked, and by what path? The done channel is already getting closed, and should cause this goroutine to exit.
Can you make a test to demonstrate the leak (and verify it's not leaked any more)?
Sorry, I just now read the PR description. Shouldn't WaitForCacheSync just be fixed to close(done) instead? I don't see how this change helps, given that WaitForCacheSync still isn't calling the cancel function.
I don't think users should have to know how to prevent the wait package from leaking goroutines.
Another way to fix this issue: the WaitFor() function creates a new channel and passes it to the WaitFunc, then closes that channel once WaitFor() finishes.
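A rough sketch of that alternative, under simplifying assumptions: the types are cut-down stand-ins for the ones in the wait package, and `trackedPoller` and `runDemo` are hypothetical helpers that exist only to make the goroutine's exit observable. Setting `done = nil` after forwarding the signal is a choice made for this sketch (so the case fires only once), not something taken from the PR.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

// Simplified stand-ins for the types in the wait package.
type ConditionFunc func() (bool, error)
type WaitFunc func(done <-chan struct{}) <-chan struct{}

var errWaitTimeout = errors.New("timed out waiting for the condition")

// waitFor gives wait a private stop channel and closes it on every return
// path, so the goroutine behind wait is released even if done never closes.
func waitFor(wait WaitFunc, fn ConditionFunc, done <-chan struct{}) error {
	stopCh := make(chan struct{})
	var once sync.Once
	closeCh := func() { once.Do(func() { close(stopCh) }) }
	defer closeCh()

	c := wait(stopCh)
	for {
		select {
		case _, open := <-c:
			ok, err := fn()
			if err != nil {
				return err
			}
			if ok {
				return nil
			}
			if !open {
				return errWaitTimeout
			}
		case <-done:
			// Forward the caller's signal and let wait decide how to wind down.
			closeCh()
			done = nil // a nil channel is never ready, so this case fires once
		}
	}
}

// trackedPoller is a hypothetical poller whose goroutine closes exited on return.
func trackedPoller(interval time.Duration, exited chan<- struct{}) WaitFunc {
	return func(done <-chan struct{}) <-chan struct{} {
		ch := make(chan struct{})
		go func() {
			defer close(exited)
			defer close(ch)
			t := time.NewTicker(interval)
			defer t.Stop()
			for {
				select {
				case <-t.C:
					select {
					case ch <- struct{}{}:
					default: // drop the tick if nobody is receiving
					}
				case <-done:
					return
				}
			}
		}()
		return ch
	}
}

// runDemo calls waitFor with a nil done channel (as in the leaking case) and
// reports whether the poller goroutine stopped after waitFor returned.
func runDemo() (stopped bool, err error) {
	exited := make(chan struct{})
	err = waitFor(trackedPoller(time.Millisecond, exited),
		func() (bool, error) { return true, nil }, nil)
	select {
	case <-exited:
		return true, err
	case <-time.After(time.Second):
		return false, err
	}
}

func main() {
	stopped, err := runDemo()
	fmt.Println("poller stopped:", stopped, "err:", err)
}
```

With this shape, callers do not need to know anything about cleanup: the deferred close covers every return path.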
@lavalamp PTAL.
				return nil
			}
			if !open {
				break FOR
return ErrWaitTimeout here, then we don't need the FOR label.
OK
The fix seems reasonable. I can't understand the test easily. I'll take another look next week.
		t.Errorf("expected ErrWaitTimeout from WaitFunc")
	}
}

func TestWaitForWithDelay(t *testing.T) {
IIUC, you are trying to verify that if WaitFor's stopCh is closed, then the done channel passed to the waitFunc is also closed. If so, can you rename the function and variables to make that clearer?
How about TestInternalChannelOfWaitFor?
@caesarxuchao PTAL
		return false, nil
	}, stopCh)
	duration := time.Now().Sub(start)
	// The WaitFor should returns immediately. So the duration is closed to 0s.
nits:
s/returns/return
s/closed/close
s/. So/, so
	duration := time.Now().Sub(start)
	// The WaitFor should returns immediately. So the duration is closed to 0s.
	// This condition ensures that if the WaitFor returns error caused by poller rather
	// than stopCh, it will trigger an error.
Sorry, I don't follow this comment. Are you trying to explain why you did the check in line 472?
I've split these comments into two paragraphs:
// The WaitFor should return immediately, so the duration is close to 0s.
if duration >= ForeverTestTimeout/2 {
t.Errorf("expected short timeout duration")
}
// The interval of the poller is ForeverTestTimeout, so the WaitFor should always return ErrWaitTimeout.
if err != ErrWaitTimeout {
t.Errorf("expected ErrWaitTimeout from WaitFunc")
}
	}
}

func TestInternalChannelOfWaitFor(t *testing.T) {
How about
// TestWaitForClosesStopCh verifies that after the condition func returns true, WaitFor() closes the stop channel it supplies to the WaitFunc.
TestWaitForClosesStopCh
...
OK
Also please squash :)
@caesarxuchao PTAL
/retest
/lgtm Thanks, @kdada.
/retest
@caesarxuchao need an
/approve
[APPROVALNOTIFIER] This PR is APPROVED
This pull-request has been approved by: caesarxuchao, kdada
				return ErrWaitTimeout
			}
		case <-done:
			closeCh()
I believe this is wrong. I do not think this will break you out of the for loop, which I believe is the desired behavior. If we just return here, that will have us exit the for loop and then the deferred close will be called. Otherwise we are relying on waiting for the timeout which seems wrong. We should make sure we are properly testing this case.
I had the same doubt but then decided the code in this PR was proper due to backward compatibility concerns.
The current behavior of the function is to let the wait function decide how to react to the done channel, see
c := wait(done)
A return here would be a behavior change.
Also note that the comment on the WaitFor function doesn't say whether WaitFor should stop waiting when the done channel is closed. I think we should explicitly point out what role the done channel plays here.
If we decide that WaitFor should stop waiting once the done channel is closed, regardless of the wait func, then I suggest that we apply @cheftako's comment in a different PR, because that's a behavior change while this PR is fixing a goroutine leak.
I'd be ok with creating an issue to make sure this does not get dropped. Also we should properly document the parameters when we make that change.
As @caesarxuchao said, it just goes around the loop once more and runs the final test.
This behavior is described in the doc comment:
// WaitFor gets a channel from 'wait()'', and then invokes 'fn' once for every value
// placed on the channel and once more when the channel is closed.
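To make the quoted contract concrete, here is a small sketch of just the channel-draining part. It is a simplification, not the real WaitFor: there is no done channel, and `drain` and `countCalls` are hypothetical names used only here. A wait channel that delivers two values and is then closed results in three calls to the condition.

```go
package main

import (
	"errors"
	"fmt"
)

var errWaitTimeout = errors.New("timed out waiting for the condition")

// drain calls fn once for every value received on c and once more after c
// is closed, returning early if fn reports success or an error.
func drain(c <-chan struct{}, fn func() (bool, error)) error {
	for {
		_, open := <-c
		ok, err := fn()
		if err != nil {
			return err
		}
		if ok {
			return nil
		}
		if !open {
			// The final check after close also failed: give up.
			return errWaitTimeout
		}
	}
}

// countCalls feeds drain a channel that yields the given number of values and
// is then closed, and reports how often a never-true condition was invoked.
func countCalls(values int) (int, error) {
	c := make(chan struct{})
	go func() {
		defer close(c)
		for i := 0; i < values; i++ {
			c <- struct{}{}
		}
	}()
	calls := 0
	err := drain(c, func() (bool, error) {
		calls++
		return false, nil
	})
	return calls, err
}

func main() {
	calls, err := countCalls(2)
	fmt.Println(calls, err) // 3 timed out waiting for the condition
}
```

Returning errWaitTimeout directly from inside the loop is also why a labeled break is not needed.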
once more when the channel is closed
The "channel" here means the channel returned by "wait()", not the "done" channel. The doc doesn't mention the "done" channel at all, which leaves room for uncertainty.
What cheftako suggested (closing done channel should terminate WaitFor) was what I expected at first. Can you open an issue to track @cheftako's request?
Nvm, created #72357.
/hold
/hold cancel
/retest
Side question: is this fix going to be backported to older versions of apimachinery? We've manually patched our vendored 1.11.5 library, but it would be good to have the official fix in place. Thanks! 👍 🤸‍♂️
What type of PR is this?
What this PR does / why we need it:
This PR fixes a bug in wait.poller().
wait.poller() returns a function of type WaitFunc. That function creates a goroutine, and the goroutine only exits once after fires or done is closed.
In cache.WaitForCacheSync, after is nil and done is never closed, so the goroutine never stops.
So I added a cancel func to WaitFunc. When wait.WaitFor() returns, it calls the cancel function, which stops the goroutine created by wait.poller().
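The leak path described above can be reproduced in isolation with a sketch like the following. `leakyPoller` is a simplified, hypothetical stand-in for the goroutine inside wait.poller() (no timeout, so its only exit is the done channel), and `exitedWithin` and `demoLeak` are helpers invented for this example.

```go
package main

import (
	"fmt"
	"time"
)

// leakyPoller starts a ticker-driven goroutine that only exits when done is
// closed. The exited channel is closed on exit so callers can observe it.
func leakyPoller(interval time.Duration, done <-chan struct{}) (ticks, exited <-chan struct{}) {
	ch := make(chan struct{})
	ex := make(chan struct{})
	go func() {
		defer close(ex)
		defer close(ch)
		t := time.NewTicker(interval)
		defer t.Stop()
		for {
			select {
			case <-t.C:
				select {
				case ch <- struct{}{}:
				default: // drop the tick if nobody is receiving
				}
			case <-done:
				return
			}
		}
	}()
	return ch, ex
}

// exitedWithin reports whether the goroutine exited within d.
func exitedWithin(exited <-chan struct{}, d time.Duration) bool {
	select {
	case <-exited:
		return true
	case <-time.After(d):
		return false
	}
}

// demoLeak shows the goroutine outliving its consumer until done is closed.
func demoLeak() (leaked, stopped bool) {
	done := make(chan struct{})
	ticks, exited := leakyPoller(time.Millisecond, done)
	<-ticks // the condition is met after one tick; the caller stops listening

	leaked = !exitedWithin(exited, 50*time.Millisecond)
	close(done) // what the caller never did in the leaking case
	stopped = exitedWithin(exited, time.Second)
	return leaked, stopped
}

func main() {
	leaked, stopped := demoLeak()
	fmt.Println("still running after caller moved on:", leaked)
	fmt.Println("stopped after close(done):", stopped)
}
```

If done is never closed, as in the cache.WaitForCacheSync case, nothing ever takes the second step and the goroutine keeps ticking for the life of the process.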
Which issue(s) this PR fixes (optional, in fixes #<issue number>(, fixes #<issue_number>, ...) format, will close the issue(s) when PR gets merged):
Fixes #
Special notes for your reviewer:
Does this PR introduce a user-facing change?: