
advanced audit: shutdown batching audit webhook gracefully #50577

Merged

Conversation

crassirostris

Follow-up of #50439

When the stopCh passed to the batching audit webhook is closed, it stops accepting new events; when the Shutdown method is called afterwards, it blocks until the last request to the webhook has finished.

/cc @tallclair @soltysh
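
For illustration, a minimal sketch of the lifecycle described above, using the names from this PR; newWebhookBackend is a hypothetical constructor, not the real apiserver wiring:

stopCh := make(chan struct{})
backend := newWebhookBackend() // hypothetical constructor
if err := backend.Run(stopCh); err != nil {
	// handle startup error
}

// ... the server runs and calls backend.ProcessEvents(...) for audit events ...

close(stopCh)      // the batching loop stops accepting new events
backend.Shutdown() // blocks until the last webhook request has finished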

@crassirostris crassirostris added release-note-none Denotes a PR that doesn't merit a release note. sig/api-machinery Categorizes an issue or PR as relevant to SIG API Machinery. sig/instrumentation Categorizes an issue or PR as relevant to SIG Instrumentation. labels Aug 13, 2017
@crassirostris crassirostris added this to the v1.8 milestone Aug 13, 2017
@crassirostris crassirostris requested a review from sttts August 13, 2017 16:15
@k8s-ci-robot k8s-ci-robot added the cncf-cla: yes Indicates the PR's author has signed the CNCF CLA. label Aug 13, 2017
@k8s-github-robot k8s-github-robot added the size/L Denotes a PR that changes 100-499 lines, ignoring generated files. label Aug 13, 2017
@crassirostris crassirostris force-pushed the audit-graceful-shotdown branch 2 times, most recently from 4cce45d to 2bf415f Compare August 13, 2017 18:26
// in a goroutine and logging any error encountered during the POST.
func (b *batchBackend) Shutdown() {
// Calling Shutdown without closing stopCh that is passed to the Run method
// will hang the goroutine until stopCh is closed.
Contributor

Can you add this as an invariant to the backend interface?

Author

Sure, makes sense. I'll add it when your PR is merged, together with the rebase

Author

Decided to add this now to verify everything

shutdownCh chan struct{}

// Mutex to lock on until all requests to the webhook were successfully sent.
reqMutex sync.RWMutex
Contributor

would add here: read lock for go routines sending, write lock for shutdown

Contributor

s/go routines sending/main send loop/


// Channel to signal that the main routine stopped and no new requests will
// be sent to the webhook.
shutdownCh chan struct{}
Contributor

maybe better: "Channel to signal that the shutdown is completed and no go routine is sending anymore."

Author

But it's not true: after shutdownCh is closed, some goroutines may still be sending events, but there won't be more of them.

Maybe "...and no new requests will be initiated"?

Contributor

These sending go routines will have the read lock on the reqMutex? So we wait for them by taking the write lock? Can you add a longer comment to both fields explaining this?

Author

Yup. Will do
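
For reference, a possible longer comment along the lines requested here; the field names are from the diff, the wording is only a suggestion:

// reqMutex synchronizes the main send loop and Shutdown: the send loop takes
// the read lock before spawning a goroutine that POSTs a batch, and that
// goroutine releases the lock when the request completes; Shutdown takes the
// write lock, so it blocks until every in-flight request has finished.
reqMutex sync.RWMutex

// shutdownCh is closed by the main send loop when it exits, i.e. after the
// stopCh passed to Run was closed. From then on no new requests are initiated,
// although already-started ones may still be in flight.
shutdownCh chan struct{}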

}

go func() {
for {
f()
send(collect())
Contributor

can this loop panic? If it does we will lock up on shutdown. We should have a panic handler.

Author

collect shouldn't panic as far as I see and send handles panics inside

But I understand the caution, especially if the code grows, ack
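
For illustration, a generic defer/recover guard around one batching iteration; this is a sketch of the belt-and-braces variant discussed here, not necessarily what the PR ends up doing (the PR keeps panic handling inside send):

go func() {
	for {
		// Wrap one iteration so a panic cannot kill the loop and leave
		// Shutdown blocked forever.
		func() {
			defer func() {
				if r := recover(); r != nil {
					// In real code the panic would be logged; the loop keeps running.
					_ = r
				}
			}()
			t := time.NewTimer(b.maxBatchWait)
			defer t.Stop()
			b.sendBatchEvents(b.collectEvents(stopCh, t.C))
		}()

		select {
		case <-stopCh:
			return
		default:
		}
	}
}()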

@crassirostris crassirostris force-pushed the audit-graceful-shotdown branch from 2bf415f to 7cf388b Compare August 14, 2017 14:17
@crassirostris
Author

@sttts Thanks for your comments, PTAL

@crassirostris crassirostris force-pushed the audit-graceful-shotdown branch from 7cf388b to d589120 Compare August 14, 2017 14:43
@@ -34,4 +34,8 @@ type Backend interface {
// Run will initialize the backend. It must not block, but may run go routines in the background. If
// stopCh is closed, it is supposed to stop them. Run will be called before the first call to ProcessEvents.
Run(stopCh <-chan struct{}) error

// Shutdown will synchronously shut down the backend while making sure that all pending
// events are delivered. This method blocks until the stopCh passed to the Run method is closed.
Contributor

I thought we say: It can be assumed that the stopCh passed to Run() is already closed when Shutdown() is called.

Contributor

This puts fewer requirements on the implementation. I.e. my variant matches an empty Shutdown() {} implementation; yours doesn't.

Author

Oh, whoops. Ack
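
The interface comment, reworded along sttts's suggestion; a sketch of the weaker invariant, so the exact final wording in the PR may differ:

type Backend interface {
	// Run will initialize the backend. It must not block, but may run go routines
	// in the background. If stopCh is closed, it is supposed to stop them. Run
	// will be called before the first call to ProcessEvents.
	Run(stopCh <-chan struct{}) error

	// Shutdown will synchronously shut down the backend while making sure that
	// all pending events are delivered. It can be assumed that the stopCh passed
	// to Run is already closed when Shutdown is called.
	Shutdown()

	// ProcessEvents and the rest of the interface are unchanged and elided here.
}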


// The sending routine locks reqMutex for reading before initiating a new
// goroutine to send a request. This goroutine then unlocks reqMutex for
// reading when completed. Shutdown method locks reqMutex for writing after
Contributor

The Shutdown method

Author

Done

@crassirostris crassirostris force-pushed the audit-graceful-shotdown branch 2 times, most recently from 1706738 to e249e02 Compare August 14, 2017 14:51
@crassirostris
Author

/retest

Contributor

@frobware frobware left a comment

One thing that stuck out was trying to Recover from any panics - naively I'm asking whether the logic could be different so this need not be possible.

}
}
}()
return nil
}

// sendBatchEvents attempts to batch some number of events to the backend. It POSTs events
// in a goroutine and logging any error encountered during the POST.
func (b *batchBackend) Shutdown() {
Contributor

Who can call this, and how many times can it be called? Can it be private?

Author

Who can call this

Apiserver does that during the shutdown process

how many times can this be called

Arbitrary, but only one call is expected

Can it be private

No, it's a part of the interface

// executed after shutdownCh was closed, no new requests will follow this
// lock, because read lock is called in the same goroutine that closes
// shutdownCh before exiting.
b.reqMutex.Lock()
Contributor

If we are shutting down do we need to give up the lock?

Author

@crassirostris crassirostris Aug 14, 2017

In case it's called a second time

Contributor

Right, shouldn't matter. shutdownCh will be closed and taking the lock just works when no go routines are sending.
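
Roughly the Shutdown shape this thread converges on; a sketch reconstructed from the diff context, not a verbatim copy:

func (b *batchBackend) Shutdown() {
	// Wait until the main send loop has exited and closed shutdownCh; from that
	// point on no new webhook requests are started.
	<-b.shutdownCh

	// Taking the write lock waits for every in-flight request goroutine (each
	// holds the read lock) to finish. Releasing it afterwards keeps a second
	// Shutdown call harmless.
	b.reqMutex.Lock()
	b.reqMutex.Unlock()
}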

if len(events) == 0 {
return
}

list := auditinternal.EventList{Items: events}

// Locking reqMutex for read will guarantee that the shutdown process will
// lock until the goroutine started below is finished. At the same time, it
Contributor

"will block" or "will lock"? Just a nit regarding wording.

Author

block, thanks!
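
The send path being discussed, as a sketch; the actual POST and error logging are elided:

func (b *batchBackend) sendBatchEvents(events []auditinternal.Event) {
	if len(events) == 0 {
		return
	}
	list := auditinternal.EventList{Items: events}

	// Taking the read lock before spawning the goroutine guarantees that
	// Shutdown, which takes the write lock, blocks until this request is done,
	// while concurrent sends do not block each other.
	b.reqMutex.RLock()
	go func() {
		defer b.reqMutex.RUnlock()
		// POST list to the webhook here and log any error.
		_ = list
	}()
}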

require.Equal(t, expected, <-got, "get queued events after timer expires")
}

func TestBatchWebhookProcessEventsAfterStop(t *testing.T) {
events := make([]*auditinternal.Event, 1) // less than max size.
Contributor

Why not just events := []*auditinternal.Event{&auditinternal.Event{}}?

Author

Because it's consistent with the rest of the code; that way it's clear how many events there are and you can easily change the number.

backend.ProcessEvents(events...)

go func() {
// Assume stopCh was closed.
Contributor

Can you read from stopCh? If so, and it is closed, it will return the zero value, so it would validate the assumption.

Author

There's no stopCh in this test, b/c backend.Run is not called

Contributor

My question/comment wasn't terribly clear: the comment carried the word "assume". As an engineer I try to stop doing this. Why? Because assumptions often fail. I was trying to ask if the test could be enhanced so that we don't have to "assume".

Author

Oh, I didn't understand your comment, sorry

By "assume" I mean the reference to the Shutdown method description in the Backend interface, where it's assumed that the method is called after Run has been stopped by closing the stopCh passed to it. Otherwise the collectLastEvents method will hang forever. If we used the collectEvents method instead, there would be a race, b/c the order is not guaranteed in a select clause and we could exit before collecting all events.

The whole idea of the collectLastEvents method is to overcome this race and collect all events, even after stopCh was closed.
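
To make the race concrete, here is the rough shape of collectEvents (a sketch assuming b.buffer is a chan *auditinternal.Event and b.maxBatchSize is the batch limit): because select picks among ready cases pseudo-randomly, the stopCh branch can fire while events are still queued, which is exactly what collectLastEvents avoids by draining the already-closed buffer.

func (b *batchBackend) collectEvents(stopCh <-chan struct{}, timer <-chan time.Time) []auditinternal.Event {
	var events []auditinternal.Event
loop:
	for len(events) < b.maxBatchSize {
		select {
		case ev, ok := <-b.buffer:
			if !ok {
				break loop
			}
			events = append(events, *ev)
		case <-stopCh:
			// May fire while b.buffer still holds queued events -- the race
			// described above.
			break loop
		case <-timer:
			break loop
		}
	}
	return events
}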

close(shutdownCh)
}()

// Wait for some time in case a bug that will allow for Shutdown method to exit.
Contributor

Perhaps a comment explaining what bug you're protecting against.

Author

Ack

time.Sleep(1 * time.Second)
select {
case <-shutdownCh:
t.Fatal("Backed shut down before all requests finished")
Contributor

Typo: "BackeNd".

Author

Ack, thanks!

return
default:
if last := func() bool {
// Recover to try to send all remaining events. Note, that in case of
Contributor

Wording: "Attempt to send all remaining events..." That's how I interpreted the intent.

Author

The idea was "Goroutine recovers in order to try and send all remaining events"

t := time.NewTimer(b.maxBatchWait)
defer t.Stop() // Release ticker resources

events := b.collectEvents(stopCh, t.C)
Contributor

Minor nit; why not just pass b.collectEvents() to b.sendBatchEvents()? Just less variable state to consider. It is done this way in the test code.

Author

why not

No reason, ack :)


// collectLastEvents assumes that the buffer was closed. It collects the first
// maxBatchSize events from the closed buffer into a batch and returns them.
func (b *batchBackend) collectLastEvents() []auditinternal.Event {
Contributor

This could be stateless on the receiver

func readBatch(buffer chan, int batchSize) []Event

Author

Yes, but why? It has a pretty specific purpose, so I don't want to generalize it that way.

Contributor

It simplifies what can and does happen to the object (i.e., the call is functional-style). As a reviewer I can scan the function to see that it doesn't mutate the object in any way. All the things it needs can be satisfied from the function's parameters. Equally, if I come to modify this file I have a reduced set of functions to consider where state is held and mutated.

Author

I'm sorry! I surely understand your argument and totally agree with it. What I wanted to say is that it doesn't make much sense to extract this here, since this method is going to be used in one place, and extracting the method was done more to emphasize the purpose of this piece of code, compared to the regular collectEvents. Do you agree, or do you think it will help the code readability in this particular place to make this method a separate function?

Contributor

I think any opportunity we have to remove state the better!

Author

I think any opportunity we have to remove state the better!

I don't agree with that. Any private method can be made stateless. Stateless itself is not a goal, it should serve some purpose. In this particular case I think the private method with a meaningful name is better for readability than a stateless function

Let's let @sttts decide on that, as a tiebreaker :)

Contributor

I am usually a fan of making it explicit if state is not used, and of functional style in general. Here, though, the complexity is very local, i.e. this is a private method. We don't win much by detaching it from b. I would leave it the way it is, while the alternative with a detached func would be fine as well.
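
For comparison, the two shapes under discussion, spelled out as a sketch (again assuming b.buffer is a chan *auditinternal.Event; the PR keeps only the method form, without a separate readBatch helper):

// Kept form: a private method with a name that states its purpose.
func (b *batchBackend) collectLastEvents() []auditinternal.Event {
	return readBatch(b.buffer, b.maxBatchSize)
}

// Detached, stateless form along the lines frobware sketched.
func readBatch(buffer <-chan *auditinternal.Event, batchSize int) []auditinternal.Event {
	var events []auditinternal.Event
	for i := 0; i < batchSize; i++ {
		ev, ok := <-buffer
		if !ok {
			break
		}
		events = append(events, *ev)
	}
	return events
}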

Author

@crassirostris crassirostris left a comment

Thanks! Addressed your comments

@crassirostris
Author

@frobware

One thing that stuck out was trying to Recover from any panics - naively I'm asking whether the logic could be different so this need not be possible.

As far as I can see, currently the only place that can possibly panic is the goroutine that sends audit logs. But you can never be too paranoid, right? What if somebody changes the code and a panic becomes possible somewhere else?

@crassirostris crassirostris force-pushed the audit-graceful-shotdown branch from 697ba1e to f18d488 Compare August 15, 2017 10:30
// goroutine to send a request. This goroutine then unlocks reqMutex for
// reading when completed. The Shutdown method locks reqMutex for writing
// after the sending routine has exited. When reqMutex is locked for writing,
// all requests has been completed and no new will be spawned, since the
Contributor

have been completed

case <-stopCh:
return
default:
if last := func() bool {
Contributor

A closure in an if-clause is fun, never seen that.

@sttts
Contributor

sttts commented Aug 15, 2017

/lgtm
/approve no-issue

@k8s-ci-robot k8s-ci-robot added the lgtm "Looks good to me", indicates that a PR is ready to be merged. label Aug 15, 2017
@k8s-github-robot

[APPROVALNOTIFIER] This PR is APPROVED

This pull-request has been approved by: crassirostris, sttts

Associated issue requirement bypassed by: sttts

The full list of commands accepted by this bot can be found here.

Needs approval from an approver in each of these OWNERS Files:

You can indicate your approval by writing /approve in a comment
You can cancel your approval by writing /approve cancel in a comment

@k8s-github-robot k8s-github-robot added the approved Indicates a PR has been approved by an approver from all required OWNERS files. label Aug 15, 2017
@k8s-github-robot

/test all [submit-queue is verifying that this PR is safe to merge]

@crassirostris crassirostris force-pushed the audit-graceful-shotdown branch from f18d488 to 7798d32 Compare August 15, 2017 12:21
@k8s-github-robot k8s-github-robot removed the lgtm "Looks good to me", indicates that a PR is ready to be merged. label Aug 15, 2017
@crassirostris
Author

Fixed typo, re-applying the lgtm label

@crassirostris crassirostris added the lgtm "Looks good to me", indicates that a PR is ready to be merged. label Aug 15, 2017
@k8s-github-robot

/test all [submit-queue is verifying that this PR is safe to merge]

@crassirostris crassirostris added release-note Denotes a PR that will be considered when it comes time to generate release notes. and removed release-note-none Denotes a PR that doesn't merit a release note. labels Aug 15, 2017
@crassirostris crassirostris changed the title audit: shutdown batching audit webhook gracefully advanced audit: shutdown batching audit webhook gracefully Aug 15, 2017
@k8s-github-robot

Automatic merge from submit-queue
