-
Notifications
You must be signed in to change notification settings - Fork 40k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Lock-free broadcaster #91602
Lock-free broadcaster #91602
Conversation
Thanks for your pull request. Before we can look at your pull request, you'll need to sign a Contributor License Agreement (CLA). 📝 Please follow instructions at https://git.k8s.io/community/CLA.md#the-contributor-license-agreement to sign the CLA. It may take a couple minutes for the CLA signature to be fully registered; after that, please reply here with a new comment and we'll verify. Thanks.
Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes/test-infra repository. I understand the commands that are listed here. |
Welcome @sxllwx! |
Hi @sxllwx. Thanks for your PR. I'm waiting for a kubernetes member to verify that this patch is reasonable to test. If it is, they should reply with Once the patch is verified, the new status will be reflected by the I understand the commands that are listed here. Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes/test-infra repository. |
/check-cla |
/assign @deads2k |
/retest |
/test pull-kubernetes-e2e-kind |
@@ -196,7 +193,10 @@ func (m *Broadcaster) Action(action EventType, obj runtime.Object) { | |||
// have received the data yet as it can remain sitting in the buffered | |||
// channel. | |||
func (m *Broadcaster) Shutdown() { | |||
close(m.incoming) | |||
m.blockQueue(func() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this change takes the serial close and then wait and makes the the close async, not guaranteed before the wait.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this change takes the serial close and then wait and makes the the close async, not guaranteed before the wait.
Reading through, I think the change still works.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
😄
func (b *Broadcaster) blockQueue(f func()) { | ||
func (m *Broadcaster) blockQueue(f func()) { | ||
select { | ||
case <-m.stopped: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
previously, sending after shutdown would result in a write to a closed channel, right? Can you make a unit test to prove that it used to panic?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
blockQueue is called by Watch, WatchWithPrefix, stopWatching, Shutdown. The Action method sends data directly to incoming chan instead of calling blockQueue. There is a subtle bug here. If the Broadcaster has been closed, calling Watch will get a watch.Interface with a value of nil instead of Panic as before. If I call panic here, calling Watcher's Stop method after closing the Broadcaster will cause Panic. In order to maintain the consistency of the interface, in the Watch && WatchWithPrefix method, determine whether broadcasterWatcher is nil to decide whether to call panic.
@@ -96,10 +94,15 @@ func (obj functionFakeRuntimeObject) DeepCopyObject() runtime.Object { | |||
// The purpose of this terrible hack is so that watchers added after an event | |||
// won't ever see that event, and will always see any event after they are | |||
// added. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
update the comment on the Watch
and WatchforPrefix
, to indicate that it blocks until the watch is accepted.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done.
minor comments, there is a clear benchmark benefit here. |
/approve |
@@ -127,18 +129,20 @@ func (m *Broadcaster) Watch() Interface { | |||
} | |||
m.watchers[id] = w | |||
}) | |||
if w == nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add a comment saying this matches previous behavior, but that we're open to reassessing in the future.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
@@ -156,26 +160,27 @@ func (m *Broadcaster) WatchWithPrefix(queuedEvents []Event) Interface { | |||
w.result <- e | |||
} | |||
}) | |||
if w == nil { | |||
panic("broadcaster already stopped") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add a comment saying this matches previous behavior, but that we're open to reassessing in the future.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
/lgtm /hold Holding for a squash and a comment. Ping me after and I'll retag. |
78990f0
to
e6e1769
Compare
/assign @deads2k |
@deads2k PTAL |
/lgtm |
[APPROVALNOTIFIER] This PR is APPROVED This pull-request has been approved by: deads2k, sxllwx The full list of commands accepted by this bot can be found here. The pull request process is described here
Needs approval from an approver in each of these files:
Approvers can indicate their approval by writing |
/retest Review the full test history for this PR. Silence the bot with an |
/test pull-kubernetes-e2e-kind-ipv6 |
What type of PR is this?
/kind cleanup
What this PR does / why we need it:
lock-free broadcaster, use chan to ensure thread safety.
Which issue(s) this PR fixes:
Fixes #
Special notes for your reviewer:
Hello the maintainer of kubernetes. When I read the implementation of the broadcaster, I found this comment
// TODO: see if this lock is needed now that new watchers go through the incoming channel.
. I think that the current implementation of the Broadcaster requires this mutex.One of the reasons is that although the chan named "incoming" was passed when the watcher was added, the channel was not passed when the watcher was closed. This may cause a critical condition.
Pull Request implementation is to close the watcher and the broadcaster through the "incoming" chan, so the use of the mutex can be safely deleted. When I benchmarked, I found that the performance of sending messages to the broadcaster has improved to some extent.
The following are my test results:
Thank you for reading ~
Does this PR introduce a user-facing change?:
Additional documentation e.g., KEPs (Kubernetes Enhancement Proposals), usage docs, etc.: