Skip to content

Commit

Permalink
Introduce buffered audit backend
Browse files Browse the repository at this point in the history
Signed-off-by: Mik Vyatskov <vmik@google.com>
  • Loading branch information
Mik Vyatskov committed Feb 22, 2018
1 parent 852e7f7 commit 3f0e49a
Show file tree
Hide file tree
Showing 5 changed files with 518 additions and 0 deletions.
1 change: 1 addition & 0 deletions staging/src/k8s.io/apiserver/plugin/pkg/audit/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ filegroup(
name = "all-srcs",
srcs = [
":package-srcs",
"//staging/src/k8s.io/apiserver/plugin/pkg/audit/buffered:all-srcs",
"//staging/src/k8s.io/apiserver/plugin/pkg/audit/log:all-srcs",
"//staging/src/k8s.io/apiserver/plugin/pkg/audit/webhook:all-srcs",
],
Expand Down
44 changes: 44 additions & 0 deletions staging/src/k8s.io/apiserver/plugin/pkg/audit/buffered/BUILD
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "go_default_library",
srcs = [
"buffered.go",
"doc.go",
],
importpath = "k8s.io/apiserver/plugin/pkg/audit/buffered",
visibility = ["//visibility:public"],
deps = [
"//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/k8s.io/apiserver/pkg/apis/audit:go_default_library",
"//vendor/k8s.io/apiserver/pkg/audit:go_default_library",
"//vendor/k8s.io/client-go/util/flowcontrol:go_default_library",
],
)

go_test(
name = "go_default_test",
srcs = ["buffered_test.go"],
embed = [":go_default_library"],
deps = [
"//vendor/github.com/stretchr/testify/require:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/k8s.io/apiserver/pkg/apis/audit:go_default_library",
"//vendor/k8s.io/apiserver/pkg/audit:go_default_library",
],
)

filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)

filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)
273 changes: 273 additions & 0 deletions staging/src/k8s.io/apiserver/plugin/pkg/audit/buffered/buffered.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,273 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package buffered

import (
"fmt"
"sync"
"time"

"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
auditinternal "k8s.io/apiserver/pkg/apis/audit"
"k8s.io/apiserver/pkg/audit"
"k8s.io/client-go/util/flowcontrol"
)

// The plugin name reported in error metrics.
const pluginName = "buffered"

const (
// Default configuration values for ModeBatch.
defaultBatchBufferSize = 10000 // Buffer up to 10000 events before starting discarding.
defaultBatchMaxSize = 400 // Only send up to 400 events at a time.
defaultBatchMaxWait = 30 * time.Second // Send events at least twice a minute.

defaultBatchThrottleQPS = 10 // Limit the send rate by 10 QPS.
defaultBatchThrottleBurst = 15 // Allow up to 15 QPS burst.
)

// BatchConfig represents batching delegate audit backend configuration.
type BatchConfig struct {
// BufferSize defines a size of the buffering queue.
BufferSize int
// MaxBatchSize defines maximum size of a batch.
MaxBatchSize int
// MaxBatchWait indicates the maximum interval between two batches.
MaxBatchWait time.Duration

// ThrottleEnable defines whether throttling will be applied to the batching process.
ThrottleEnable bool
// ThrottleQPS defines the allowed rate of batches per second sent to the delegate backend.
ThrottleQPS float32
// ThrottleBurst defines the maximum rate of batches per second sent to the delegate backend in case
// the capacity defined by ThrottleQPS was not utilized.
ThrottleBurst int
}

// NewDefaultBatchConfig returns new Config objects populated by default values.
func NewDefaultBatchConfig() BatchConfig {
return BatchConfig{
BufferSize: defaultBatchBufferSize,
MaxBatchSize: defaultBatchMaxSize,
MaxBatchWait: defaultBatchMaxWait,

ThrottleEnable: true,
ThrottleQPS: defaultBatchThrottleQPS,
ThrottleBurst: defaultBatchThrottleBurst,
}
}

type bufferedBackend struct {
// The delegate backend that actually exports events.
delegateBackend audit.Backend

// Channel to buffer events before sending to the delegate backend.
buffer chan *auditinternal.Event
// Maximum number of events in a batch sent to the delegate backend.
maxBatchSize int
// Amount of time to wait after sending a batch to the delegate backend before sending another one.
//
// Receiving maxBatchSize events will always trigger sending a batch, regardless of the amount of time passed.
maxBatchWait time.Duration

// Channel to signal that the batching routine has processed all remaining events and exited.
// Once `shutdownCh` is closed no new events will be sent to the delegate backend.
shutdownCh chan struct{}

// WaitGroup to control the concurrency of sending batches to the delegate backend.
// Worker routine calls Add before sending a batch and
// then spawns a routine that calls Done after batch was processed by the delegate backend.
// This WaitGroup is used to wait for all sending routines to finish before shutting down audit backend.
wg sync.WaitGroup

// Limits the number of batches sent to the delegate backend per second.
throttle flowcontrol.RateLimiter
}

var _ audit.Backend = &bufferedBackend{}

// NewBackend returns a buffered audit backend that wraps delegate backend.
func NewBackend(delegate audit.Backend, config BatchConfig) audit.Backend {
var throttle flowcontrol.RateLimiter
if config.ThrottleEnable {
throttle = flowcontrol.NewTokenBucketRateLimiter(config.ThrottleQPS, config.ThrottleBurst)
}
return &bufferedBackend{
delegateBackend: delegate,
buffer: make(chan *auditinternal.Event, config.BufferSize),
maxBatchSize: config.MaxBatchSize,
maxBatchWait: config.MaxBatchWait,
shutdownCh: make(chan struct{}),
wg: sync.WaitGroup{},
throttle: throttle,
}
}

func (b *bufferedBackend) Run(stopCh <-chan struct{}) error {
go func() {
// Signal that the working routine has exited.
defer close(b.shutdownCh)

b.processIncomingEvents(stopCh)

// Handle the events that were received after the last buffer
// scraping and before this line. Since the buffer is closed, no new
// events will come through.
allEventsProcessed := false
timer := make(chan time.Time)
for !allEventsProcessed {
allEventsProcessed = func() bool {
// Recover from any panic in order to try to process all remaining events.
// Note, that in case of a panic, the return value will be false and
// the loop execution will continue.
defer runtime.HandleCrash()

events := b.collectEvents(timer, wait.NeverStop)
b.processEvents(events)
return len(events) == 0
}()
}
}()
return b.delegateBackend.Run(stopCh)
}

// Shutdown blocks until stopCh passed to the Run method is closed and all
// events added prior to that moment are batched and sent to the delegate backend.
func (b *bufferedBackend) Shutdown() {
// Wait until the routine spawned in Run method exits.
<-b.shutdownCh

// Wait until all sending routines exit.
b.wg.Wait()

b.delegateBackend.Shutdown()
}

// processIncomingEvents runs a loop that collects events from the buffer. When
// b.stopCh is closed, processIncomingEvents stops and closes the buffer.
func (b *bufferedBackend) processIncomingEvents(stopCh <-chan struct{}) {
defer close(b.buffer)
t := time.NewTimer(b.maxBatchWait)
defer t.Stop()

for {
func() {
// Recover from any panics caused by this function so a panic in the
// goroutine can't bring down the main routine.
defer runtime.HandleCrash()

t.Reset(b.maxBatchWait)
b.processEvents(b.collectEvents(t.C, stopCh))
}()

select {
case <-stopCh:
return
default:
}
}
}

// collectEvents attempts to collect some number of events in a batch.
//
// The following things can cause collectEvents to stop and return the list
// of events:
//
// * Maximum number of events for a batch.
// * Timer has passed.
// * Buffer channel is closed and empty.
// * stopCh is closed.
func (b *bufferedBackend) collectEvents(timer <-chan time.Time, stopCh <-chan struct{}) []*auditinternal.Event {
var events []*auditinternal.Event

L:
for i := 0; i < b.maxBatchSize; i++ {
select {
case ev, ok := <-b.buffer:
// Buffer channel was closed and no new events will follow.
if !ok {
break L
}
events = append(events, ev)
case <-timer:
// Timer has expired. Send currently accumulated batch.
break L
case <-stopCh:
// Backend has been stopped. Send currently accumulated batch.
break L
}
}

return events
}

// processEvents process the batch events in a goroutine using delegateBackend's ProcessEvents.
func (b *bufferedBackend) processEvents(events []*auditinternal.Event) {
if len(events) == 0 {
return
}

// TODO(audit): Should control the number of active goroutines
// if one goroutine takes 5 seconds to finish, the number of goroutines can be 5 * defaultBatchThrottleQPS
if b.throttle != nil {
b.throttle.Accept()
}

b.wg.Add(1)
go func() {
defer b.wg.Done()
defer runtime.HandleCrash()

// Execute the real processing in a goroutine to keep it from blocking.
// This lets the batching routine continue draining the queue immediately.
b.delegateBackend.ProcessEvents(events...)
}()
}

func (b *bufferedBackend) ProcessEvents(ev ...*auditinternal.Event) {
// The following mechanism is in place to support the situation when audit
// events are still coming after the backend was stopped.
var sendErr error
var evIndex int

// If the delegateBackend was shutdown and the buffer channel was closed, an
// attempt to add an event to it will result in panic that we should
// recover from.
defer func() {
if err := recover(); err != nil {
sendErr = fmt.Errorf("panic when processing events: %v", err)
}
if sendErr != nil {
audit.HandlePluginError(pluginName, sendErr, ev[evIndex:]...)
}
}()

for i, e := range ev {
evIndex = i
// Per the audit.Backend interface these events are reused after being
// sent to the Sink. Deep copy and send the copy to the queue.
event := e.DeepCopy()

select {
case b.buffer <- event:
default:
sendErr = fmt.Errorf("audit buffer queue blocked")
return
}
}
}
Loading

0 comments on commit 3f0e49a

Please sign in to comment.