advanced audit: shutdown batching audit webhook gracefully #50577
4cce45d
to
2bf415f
Compare
// in a goroutine and logging any error encountered during the POST.
func (b *batchBackend) Shutdown() {
	// Calling Shutdown without closing stopCh that is passed to the Run method
	// will hang the goroutine until stopCh is closed.
Can you add this as an invariant to the backend interface?
Sure, makes sense. I'll add it when your PR is merged, together with the rebase
Decided to add this now to verify everything
shutdownCh chan struct{}

// Mutex to lock on until all requests to the webhook were successfully sent.
reqMutex sync.RWMutex
would add here: read lock for go routines sending, write lock for shutdown
s/go routines sending/main send loop/
// Channel to signal that the main routine stopped and no new requests will
// be sent to the webhook.
shutdownCh chan struct{}
maybe better: "Channel to signal that the shutdown is completed and no go routine is sending anymore."
But that's not true: after shutdownCh is closed, some goroutines may still be sending events, but there won't be any new ones.
Maybe "...and no new requests will be initiated."?
These sending goroutines will hold the read lock on reqMutex? So we wait for them by taking the write lock? Can you add a longer comment on both fields that explains this?
Yup. Will do
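For readers following this thread, the locking scheme under discussion can be sketched roughly as below. This is a minimal illustration and not the PR's code: the sender type, its run and shutdown methods, the work channel and the sendAll helper are all hypothetical names, and only reqMutex and shutdownCh are taken from the diff. The main loop takes the read lock itself before spawning each request goroutine, so once shutdownCh is closed no further read locks can appear, and the write lock in shutdown then waits for exactly the requests still in flight.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// sender is a hypothetical stand-in for the batching backend.
type sender struct {
	reqMutex   sync.RWMutex  // read-locked per in-flight request, write-locked by shutdown
	shutdownCh chan struct{} // closed once the main loop will start no more requests
	sent       int64         // number of completed requests
}

// run starts the main send loop. The loop acquires the read lock itself,
// before spawning each request goroutine, so no read lock can be taken
// after the loop has exited and closed shutdownCh.
func (s *sender) run(work <-chan int) {
	go func() {
		defer close(s.shutdownCh)
		for range work {
			s.reqMutex.RLock()
			go func() {
				defer s.reqMutex.RUnlock()
				time.Sleep(10 * time.Millisecond) // pretend to POST to the webhook
				atomic.AddInt64(&s.sent, 1)
			}()
		}
	}()
}

// shutdown blocks until the loop has exited and every in-flight request is done.
func (s *sender) shutdown() {
	<-s.shutdownCh
	s.reqMutex.Lock()
	s.reqMutex.Unlock() // released so that a repeated call does not deadlock
}

// sendAll queues n requests, stops the loop, shuts down twice and returns
// the number of requests that completed.
func sendAll(n int) int64 {
	s := &sender{shutdownCh: make(chan struct{})}
	work := make(chan int, n)
	for i := 0; i < n; i++ {
		work <- i
	}
	close(work) // plays the role of closing stopCh
	s.run(work)
	s.shutdown()
	s.shutdown() // a second call returns immediately
	return atomic.LoadInt64(&s.sent)
}

func main() {
	fmt.Println(sendAll(3)) // prints 3
}
```

Closing the work channel here plays the role of closing stopCh; the real backend selects on stopCh and a buffer instead.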
}

go func() {
	for {
		f()
		send(collect())
can this loop panic? If it does we will lock up on shutdown. We should have a panic handler.
collect shouldn't panic as far as I can see, and send handles panics internally.
But I understand the caution, especially if the code grows. Ack.
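One possible shape for the panic handler requested here is sketched below. It is an illustration only: runLoop, step and countSteps are hypothetical names, and the PR itself may rely on a different recovery helper. Each iteration runs inside a small closure with its own deferred recover, and the done channel is closed by a defer, so a goroutine waiting for the loop to finish cannot hang even if one iteration panics.

```go
package main

import "fmt"

// runLoop calls step repeatedly in a goroutine until step returns false.
// A panic inside step is recovered per iteration, and done is closed by a
// deferred call, so a waiter on done cannot hang if step misbehaves.
func runLoop(step func() bool) <-chan struct{} {
	done := make(chan struct{})
	go func() {
		defer close(done)
		for {
			more := func() (more bool) {
				defer func() {
					if r := recover(); r != nil {
						fmt.Println("recovered:", r)
						more = true // keep looping after a panic
					}
				}()
				return step()
			}()
			if !more {
				return
			}
		}
	}()
	return done
}

// countSteps runs a step function that panics on its second call and
// stops after its fourth, then returns how many times it was called.
func countSteps() int {
	calls := 0
	done := runLoop(func() bool {
		calls++
		if calls == 2 {
			panic("boom")
		}
		return calls < 4
	})
	<-done
	return calls
}

func main() {
	fmt.Println(countSteps()) // prints "recovered: boom" and then 4
}
```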
2bf415f
to
7cf388b
Compare
@sttts Thanks for your comments, PTAL
7cf388b
to
d589120
Compare
@@ -34,4 +34,8 @@ type Backend interface {
	// Run will initialize the backend. It must not block, but may run go routines in the background. If
	// stopCh is closed, it is supposed to stop them. Run will be called before the first call to ProcessEvents.
	Run(stopCh <-chan struct{}) error

	// Shutdown will synchronously shut down the backend while making sure that all pending
	// events are delivered. This method blocks until the stopCh passed to the Run method is closed.
I thought we would say: "It can be assumed that the stopCh passed to Run() is already closed when Shutdown() is called."
That puts fewer requirements on the implementation, i.e. my variant is satisfied by an empty Shutdown() {} implementation. Yours isn't.
Oh, whoops. Ack
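To make the difference between the two wordings concrete, here is a simplified sketch of the contract. The Event type, the syncBackend type and the processed helper are hypothetical, and the interface is a cut-down copy of the one in the diff. Under the "it can be assumed that stopCh is already closed" wording, a backend that handles events inline satisfies the contract with an empty Shutdown, whereas the "blocks until stopCh is closed" wording would force even such a backend to remember stopCh and wait on it.

```go
package main

import "fmt"

// Event is a hypothetical placeholder for the audit event type.
type Event struct{ ID string }

// Backend is a simplified copy of the interface discussed above.
type Backend interface {
	ProcessEvents(events ...*Event)
	Run(stopCh <-chan struct{}) error
	// Shutdown may assume that the stopCh passed to Run is already closed.
	Shutdown()
}

// syncBackend handles events inline, so it has nothing to flush on shutdown.
type syncBackend struct{ seen int }

func (b *syncBackend) ProcessEvents(events ...*Event)   { b.seen += len(events) }
func (b *syncBackend) Run(stopCh <-chan struct{}) error { return nil }
func (b *syncBackend) Shutdown()                        {} // valid under the "assumed closed" wording

// Compile-time check that syncBackend satisfies Backend.
var _ Backend = &syncBackend{}

// processed drives a backend through Run, n events, stop and Shutdown,
// and returns how many events it saw.
func processed(n int) int {
	b := &syncBackend{}
	var backend Backend = b
	stopCh := make(chan struct{})
	if err := backend.Run(stopCh); err != nil {
		return -1
	}
	for i := 0; i < n; i++ {
		backend.ProcessEvents(&Event{ID: fmt.Sprint(i)})
	}
	close(stopCh) // the caller closes stopCh first...
	backend.Shutdown() // ...and only then calls Shutdown
	return b.seen
}

func main() {
	fmt.Println(processed(3)) // prints 3
}
```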
// The sending routine locks reqMutex for reading before initiating a new
// goroutine to send a request. This goroutine then unlocks reqMutex for
// reading when completed. Shutdown method locks reqMutex for writing after
The Shutdown method
Done
1706738
to
e249e02
Compare
/retest
One thing that stuck out was trying to recover from any panics. Naively, I'm asking whether the logic could be structured differently so that this is not needed.
		}
	}
}()
return nil
}

// sendBatchEvents attempts to batch some number of events to the backend. It POSTs events
// in a goroutine and logging any error encountered during the POST.
func (b *batchBackend) Shutdown() {
Who can call this, and how many times can it be called? Can it be private?
> Who can call this
The apiserver does, during its shutdown process.
> how many times can this be called
Arbitrarily many, but only one call is expected.
> Can it be private
No, it's part of the interface.
	// executed after shutdownCh was closed, no new requests will follow this
	// lock, because read lock is called in the same goroutine that closes
	// shutdownCh before exiting.
	b.reqMutex.Lock()
If we are shutting down do we need to give up the lock?
In case it's called the second time
Right, it shouldn't matter: shutdownCh will be closed, and taking the lock just works when no goroutines are sending.
	if len(events) == 0 {
		return
	}

	list := auditinternal.EventList{Items: events}

	// Locking reqMutex for read will guarantee that the shutdown process will
	// lock until the goroutine started below is finished. At the same time, it
"will block" or "will lock"? Just a nit regarding wording.
block, thanks!
	require.Equal(t, expected, <-got, "get queued events after timer expires")
}

func TestBatchWebhookProcessEventsAfterStop(t *testing.T) {
	events := make([]*auditinternal.Event, 1) // less than max size.
Why not just events := []*auditinternal.Event{&auditinternal.Event{}}?
Because it's consistent with the rest of the code; that way it's clear how many events there are, and the number is easy to change.
	backend.ProcessEvents(events...)

	go func() {
		// Assume stopCh was closed.
Can you read from stopCh? If so, and it is closed, it will return the zero value, so it would validate the assumption.
There's no stopCh in this test, because backend.Run is not called.
My question/comment wasn't terribly clear: the comment carried the word "assume". As an engineer I try to stop doing this. Why? Because assumptions often fail. I was trying to ask if the test could be enhanced so that we don't have to "assume".
Oh, I didn't understand your comment, sorry.
By "assume" I mean a reference to the Shutdown method description in the Backend interface, which assumes that the method is called after Run has been stopped by closing the stopCh passed to it. Otherwise the collectLastEvents method will hang forever. If we use the collectEvents method instead, there's a race, because the order of cases in a select clause is not guaranteed and we can exit before collecting all events.
The whole idea of the collectLastEvents method is to overcome this race and collect all events, even if the stopCh was closed.
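The race described in this comment can be reproduced in isolation. The sketch below is an assumption-laden illustration, not the PR's code: collectWithSelect, collectLast and closedBuffer are hypothetical stand-ins, events are plain ints, and the buffer is assumed to be closed before collectLast runs. When stopCh is closed and the buffer still holds items, both select cases are ready and Go picks one at random, so the select-based collector may return early; the second collector ignores stopCh and simply drains the closed buffer.

```go
package main

import "fmt"

// collectWithSelect mimics a select-based collector. Once stopCh is closed,
// each iteration may pick either ready case, so it can return before the
// buffer has been emptied.
func collectWithSelect(buffer <-chan int, stopCh <-chan struct{}, max int) []int {
	var batch []int
	for len(batch) < max {
		select {
		case v, ok := <-buffer:
			if !ok {
				return batch
			}
			batch = append(batch, v)
		case <-stopCh:
			return batch
		}
	}
	return batch
}

// collectLast ignores stopCh and relies on the buffer having been closed:
// it reads up to max items and stops when the closed buffer runs dry.
// On an open, empty buffer it would block forever.
func collectLast(buffer <-chan int, max int) []int {
	var batch []int
	for i := 0; i < max; i++ {
		v, ok := <-buffer
		if !ok {
			break
		}
		batch = append(batch, v)
	}
	return batch
}

// closedBuffer returns a closed channel pre-filled with n items.
func closedBuffer(n int) <-chan int {
	buffer := make(chan int, n)
	for i := 0; i < n; i++ {
		buffer <- i
	}
	close(buffer)
	return buffer
}

func main() {
	stopCh := make(chan struct{})
	close(stopCh)
	// Varies from run to run: anywhere between 0 and 3 items.
	fmt.Println(len(collectWithSelect(closedBuffer(3), stopCh, 10)))
	// Deterministic: all 3 queued items are collected.
	fmt.Println(len(collectLast(closedBuffer(3), 10)))
}
```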
		close(shutdownCh)
	}()

	// Wait for some time in case a bug that will allow for Shutdown method to exit.
Perhaps a comment explaining what bug you're protecting against.
Ack
	time.Sleep(1 * time.Second)
	select {
	case <-shutdownCh:
		t.Fatal("Backed shut down before all requests finished")
Typo: "BackeNd".
Ack, thanks!
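The shape of this test (sleep, then a non-blocking select on shutdownCh) can be sketched outside the testing package as follows. Everything here is hypothetical scaffolding: stillBlocked and check are illustrative helpers, and a sync.WaitGroup stands in for the unfinished webhook request that Shutdown is supposed to wait for. The sleep gives a buggy Shutdown time to return too early; the select with a default case then observes whether it did, without blocking.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// stillBlocked reports whether done remains open after waiting for grace.
// The default case makes the check non-blocking.
func stillBlocked(done <-chan struct{}, grace time.Duration) bool {
	time.Sleep(grace)
	select {
	case <-done:
		return false
	default:
		return true
	}
}

// check starts a stand-in "Shutdown" that waits for one in-flight request,
// verifies it is still blocked after a grace period, releases the request,
// and then verifies that the shutdown completes.
func check() (blockedBefore, finishedAfter bool) {
	var inFlight sync.WaitGroup // stands in for an unfinished webhook request
	inFlight.Add(1)

	done := make(chan struct{})
	go func() {
		inFlight.Wait() // hypothetical Shutdown body
		close(done)
	}()

	blockedBefore = stillBlocked(done, 50*time.Millisecond)

	inFlight.Done() // let the request finish
	select {
	case <-done:
		finishedAfter = true
	case <-time.After(time.Second):
	}
	return blockedBefore, finishedAfter
}

func main() {
	fmt.Println(check())
}
```

A sleep-based check like this can only catch an early return, not prove its absence, which is why the comment in the test is worth spelling out.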
	return
default:
	if last := func() bool {
		// Recover to try to send all remaining events. Note, that in case of
Wording: "Attempt to send all remaining events..." That's how I interpreted the intent.
The idea was "Goroutine recovers in order to try and send all remaining events"
t := time.NewTimer(b.maxBatchWait)
defer t.Stop() // Release ticker resources

events := b.collectEvents(stopCh, t.C)
Minor nit: why not just pass b.collectEvents() to b.sendBatchEvents()? That leaves less variable state to consider. It is done this way in the test code.
> why not
No reason, ack :)
// collectLastEvents assumes that the buffer was closed. It collects the first
// maxBatchSize events from the closed buffer into a batch and returns them.
func (b *batchBackend) collectLastEvents() []auditinternal.Event {
This could be stateless on the receiver
func readBatch(buffer chan, int batchSize) []Event
Yes, but why? It has a pretty specific purpose, so I don't want to generalize it that way.
It simplifies what can and does happen to the object (i.e., the call is functional-style). As a reviewer I can scan the function to see that it doesn't mutate the object in any way. All the things it needs can be satisfied from the function's parameters. Equally, if I come to modify this file I have a reduced set of functions to consider where state is held and mutated.
I'm sorry! I certainly understand your argument and totally agree with it. What I wanted to say is that it doesn't make much sense to extract this variable, since this method is going to be used in one place, and the method was extracted mostly to emphasize the purpose of this piece of code, as compared to the regular collectEvents. Do you agree, or do you think that making this method a separate function will help code readability in this particular place?
I think any opportunity we have to remove state the better!
> I think any opportunity we have to remove state the better!
I don't agree with that. Any private method can be made stateless. Statelessness itself is not a goal; it should serve some purpose. In this particular case I think a private method with a meaningful name is better for readability than a stateless function.
Let's let @sttts decide on that, as a tiebreaker :)
I am usually a fan of making it explicit if state is not used, and of functional style in general. Here, though, the complexity is very local, i.e. this is a private method. We don't win much by detaching it from b. I would leave it the way it is, while the alternative with a detached func would be fine as well.
Thanks! Addressed your comments
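For reference, the two shapes debated in this thread are sketched side by side below. This is not the merged code: the Event type, the buffer field's element type and the lastBatchSize helper are assumptions made for illustration, and readBatch follows the reviewer's suggested name with a valid Go signature. The detached function receives everything it needs as parameters, while the method keeps the purpose-specific name; here the method simply delegates, which is one way to get both properties.

```go
package main

import "fmt"

// Event is a hypothetical placeholder for auditinternal.Event.
type Event struct{ ID int }

// readBatch is the detached variant: everything it needs arrives as
// parameters, so a reader can see at a glance that no backend state is touched.
// It assumes buffer has been closed by the caller.
func readBatch(buffer <-chan *Event, batchSize int) []Event {
	var events []Event
	for i := 0; i < batchSize; i++ {
		ev, ok := <-buffer
		if !ok {
			break
		}
		events = append(events, *ev)
	}
	return events
}

// batchBackend carries only the fields this sketch needs.
type batchBackend struct {
	buffer       chan *Event
	maxBatchSize int
}

// collectLastEvents keeps the purpose-specific method name and delegates
// to the detached function.
func (b *batchBackend) collectLastEvents() []Event {
	return readBatch(b.buffer, b.maxBatchSize)
}

// lastBatchSize queues the given number of events, closes the buffer and
// returns the size of the first batch collected from it.
func lastBatchSize(queued, maxBatchSize int) int {
	b := &batchBackend{buffer: make(chan *Event, queued), maxBatchSize: maxBatchSize}
	for i := 0; i < queued; i++ {
		b.buffer <- &Event{ID: i}
	}
	close(b.buffer)
	return len(b.collectLastEvents())
}

func main() {
	fmt.Println(lastBatchSize(5, 2)) // prints 2
}
```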
As far as I can see, the only place that can currently panic is the goroutine that sends audit logs. But you can never be too paranoid, right? What if somebody changes the code and a panic becomes possible somewhere else?
697ba1e
to
f18d488
Compare
// goroutine to send a request. This goroutine then unlocks reqMutex for
// reading when completed. The Shutdown method locks reqMutex for writing
// after the sending routine has exited. When reqMutex is locked for writing,
// all requests has been completed and no new will be spawned, since the
have been completed
case <-stopCh:
	return
default:
	if last := func() bool {
A closure in an if-clause is fun, never seen that.
/lgtm
[APPROVALNOTIFIER] This PR is APPROVED
This pull-request has been approved by: crassirostris, sttts
Associated issue requirement bypassed by: sttts
The full list of commands accepted by this bot can be found here.
Needs approval from an approver in each of these OWNERS Files:
You can indicate your approval by writing
/test all [submit-queue is verifying that this PR is safe to merge]
f18d488
to
7798d32
Compare
Fixed typo, re-applying the lgtm label
/test all [submit-queue is verifying that this PR is safe to merge]
Automatic merge from submit-queue
Follow-up of #50439
When the stopCh passed to the batching audit webhook is closed, it stops accepting new events, and when the Shutdown method is called afterwards, it blocks until the last request to the webhook has finished.
/cc @tallclair @soltysh