mirror of https://github.com/k3s-io/k3s
Introduce buffered audit backend
Signed-off-by: Mik Vyatskov <vmik@google.com>pull/6/head
parent
852e7f7bfa
commit
3f0e49aea4
|
@ -22,6 +22,7 @@ filegroup(
|
|||
name = "all-srcs",
|
||||
srcs = [
|
||||
":package-srcs",
|
||||
"//staging/src/k8s.io/apiserver/plugin/pkg/audit/buffered:all-srcs",
|
||||
"//staging/src/k8s.io/apiserver/plugin/pkg/audit/log:all-srcs",
|
||||
"//staging/src/k8s.io/apiserver/plugin/pkg/audit/webhook:all-srcs",
|
||||
],
|
||||
|
|
|
@ -0,0 +1,44 @@
|
|||
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
|
||||
|
||||
go_library(
|
||||
name = "go_default_library",
|
||||
srcs = [
|
||||
"buffered.go",
|
||||
"doc.go",
|
||||
],
|
||||
importpath = "k8s.io/apiserver/plugin/pkg/audit/buffered",
|
||||
visibility = ["//visibility:public"],
|
||||
deps = [
|
||||
"//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
|
||||
"//vendor/k8s.io/apiserver/pkg/apis/audit:go_default_library",
|
||||
"//vendor/k8s.io/apiserver/pkg/audit:go_default_library",
|
||||
"//vendor/k8s.io/client-go/util/flowcontrol:go_default_library",
|
||||
],
|
||||
)
|
||||
|
||||
go_test(
|
||||
name = "go_default_test",
|
||||
srcs = ["buffered_test.go"],
|
||||
embed = [":go_default_library"],
|
||||
deps = [
|
||||
"//vendor/github.com/stretchr/testify/require:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
|
||||
"//vendor/k8s.io/apiserver/pkg/apis/audit:go_default_library",
|
||||
"//vendor/k8s.io/apiserver/pkg/audit:go_default_library",
|
||||
],
|
||||
)
|
||||
|
||||
filegroup(
|
||||
name = "package-srcs",
|
||||
srcs = glob(["**"]),
|
||||
tags = ["automanaged"],
|
||||
visibility = ["//visibility:private"],
|
||||
)
|
||||
|
||||
filegroup(
|
||||
name = "all-srcs",
|
||||
srcs = [":package-srcs"],
|
||||
tags = ["automanaged"],
|
||||
visibility = ["//visibility:public"],
|
||||
)
|
|
@ -0,0 +1,273 @@
|
|||
/*
|
||||
Copyright 2018 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package buffered
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"k8s.io/apimachinery/pkg/util/runtime"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
auditinternal "k8s.io/apiserver/pkg/apis/audit"
|
||||
"k8s.io/apiserver/pkg/audit"
|
||||
"k8s.io/client-go/util/flowcontrol"
|
||||
)
|
||||
|
||||
// The plugin name reported in error metrics.
|
||||
const pluginName = "buffered"
|
||||
|
||||
const (
|
||||
// Default configuration values for ModeBatch.
|
||||
defaultBatchBufferSize = 10000 // Buffer up to 10000 events before starting discarding.
|
||||
defaultBatchMaxSize = 400 // Only send up to 400 events at a time.
|
||||
defaultBatchMaxWait = 30 * time.Second // Send events at least twice a minute.
|
||||
|
||||
defaultBatchThrottleQPS = 10 // Limit the send rate by 10 QPS.
|
||||
defaultBatchThrottleBurst = 15 // Allow up to 15 QPS burst.
|
||||
)
|
||||
|
||||
// BatchConfig represents batching delegate audit backend configuration.
|
||||
type BatchConfig struct {
|
||||
// BufferSize defines a size of the buffering queue.
|
||||
BufferSize int
|
||||
// MaxBatchSize defines maximum size of a batch.
|
||||
MaxBatchSize int
|
||||
// MaxBatchWait indicates the maximum interval between two batches.
|
||||
MaxBatchWait time.Duration
|
||||
|
||||
// ThrottleEnable defines whether throttling will be applied to the batching process.
|
||||
ThrottleEnable bool
|
||||
// ThrottleQPS defines the allowed rate of batches per second sent to the delegate backend.
|
||||
ThrottleQPS float32
|
||||
// ThrottleBurst defines the maximum rate of batches per second sent to the delegate backend in case
|
||||
// the capacity defined by ThrottleQPS was not utilized.
|
||||
ThrottleBurst int
|
||||
}
|
||||
|
||||
// NewDefaultBatchConfig returns new Config objects populated by default values.
|
||||
func NewDefaultBatchConfig() BatchConfig {
|
||||
return BatchConfig{
|
||||
BufferSize: defaultBatchBufferSize,
|
||||
MaxBatchSize: defaultBatchMaxSize,
|
||||
MaxBatchWait: defaultBatchMaxWait,
|
||||
|
||||
ThrottleEnable: true,
|
||||
ThrottleQPS: defaultBatchThrottleQPS,
|
||||
ThrottleBurst: defaultBatchThrottleBurst,
|
||||
}
|
||||
}
|
||||
|
||||
type bufferedBackend struct {
|
||||
// The delegate backend that actually exports events.
|
||||
delegateBackend audit.Backend
|
||||
|
||||
// Channel to buffer events before sending to the delegate backend.
|
||||
buffer chan *auditinternal.Event
|
||||
// Maximum number of events in a batch sent to the delegate backend.
|
||||
maxBatchSize int
|
||||
// Amount of time to wait after sending a batch to the delegate backend before sending another one.
|
||||
//
|
||||
// Receiving maxBatchSize events will always trigger sending a batch, regardless of the amount of time passed.
|
||||
maxBatchWait time.Duration
|
||||
|
||||
// Channel to signal that the batching routine has processed all remaining events and exited.
|
||||
// Once `shutdownCh` is closed no new events will be sent to the delegate backend.
|
||||
shutdownCh chan struct{}
|
||||
|
||||
// WaitGroup to control the concurrency of sending batches to the delegate backend.
|
||||
// Worker routine calls Add before sending a batch and
|
||||
// then spawns a routine that calls Done after batch was processed by the delegate backend.
|
||||
// This WaitGroup is used to wait for all sending routines to finish before shutting down audit backend.
|
||||
wg sync.WaitGroup
|
||||
|
||||
// Limits the number of batches sent to the delegate backend per second.
|
||||
throttle flowcontrol.RateLimiter
|
||||
}
|
||||
|
||||
var _ audit.Backend = &bufferedBackend{}
|
||||
|
||||
// NewBackend returns a buffered audit backend that wraps delegate backend.
|
||||
func NewBackend(delegate audit.Backend, config BatchConfig) audit.Backend {
|
||||
var throttle flowcontrol.RateLimiter
|
||||
if config.ThrottleEnable {
|
||||
throttle = flowcontrol.NewTokenBucketRateLimiter(config.ThrottleQPS, config.ThrottleBurst)
|
||||
}
|
||||
return &bufferedBackend{
|
||||
delegateBackend: delegate,
|
||||
buffer: make(chan *auditinternal.Event, config.BufferSize),
|
||||
maxBatchSize: config.MaxBatchSize,
|
||||
maxBatchWait: config.MaxBatchWait,
|
||||
shutdownCh: make(chan struct{}),
|
||||
wg: sync.WaitGroup{},
|
||||
throttle: throttle,
|
||||
}
|
||||
}
|
||||
|
||||
func (b *bufferedBackend) Run(stopCh <-chan struct{}) error {
|
||||
go func() {
|
||||
// Signal that the working routine has exited.
|
||||
defer close(b.shutdownCh)
|
||||
|
||||
b.processIncomingEvents(stopCh)
|
||||
|
||||
// Handle the events that were received after the last buffer
|
||||
// scraping and before this line. Since the buffer is closed, no new
|
||||
// events will come through.
|
||||
allEventsProcessed := false
|
||||
timer := make(chan time.Time)
|
||||
for !allEventsProcessed {
|
||||
allEventsProcessed = func() bool {
|
||||
// Recover from any panic in order to try to process all remaining events.
|
||||
// Note, that in case of a panic, the return value will be false and
|
||||
// the loop execution will continue.
|
||||
defer runtime.HandleCrash()
|
||||
|
||||
events := b.collectEvents(timer, wait.NeverStop)
|
||||
b.processEvents(events)
|
||||
return len(events) == 0
|
||||
}()
|
||||
}
|
||||
}()
|
||||
return b.delegateBackend.Run(stopCh)
|
||||
}
|
||||
|
||||
// Shutdown blocks until stopCh passed to the Run method is closed and all
|
||||
// events added prior to that moment are batched and sent to the delegate backend.
|
||||
func (b *bufferedBackend) Shutdown() {
|
||||
// Wait until the routine spawned in Run method exits.
|
||||
<-b.shutdownCh
|
||||
|
||||
// Wait until all sending routines exit.
|
||||
b.wg.Wait()
|
||||
|
||||
b.delegateBackend.Shutdown()
|
||||
}
|
||||
|
||||
// processIncomingEvents runs a loop that collects events from the buffer. When
|
||||
// b.stopCh is closed, processIncomingEvents stops and closes the buffer.
|
||||
func (b *bufferedBackend) processIncomingEvents(stopCh <-chan struct{}) {
|
||||
defer close(b.buffer)
|
||||
t := time.NewTimer(b.maxBatchWait)
|
||||
defer t.Stop()
|
||||
|
||||
for {
|
||||
func() {
|
||||
// Recover from any panics caused by this function so a panic in the
|
||||
// goroutine can't bring down the main routine.
|
||||
defer runtime.HandleCrash()
|
||||
|
||||
t.Reset(b.maxBatchWait)
|
||||
b.processEvents(b.collectEvents(t.C, stopCh))
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-stopCh:
|
||||
return
|
||||
default:
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// collectEvents attempts to collect some number of events in a batch.
|
||||
//
|
||||
// The following things can cause collectEvents to stop and return the list
|
||||
// of events:
|
||||
//
|
||||
// * Maximum number of events for a batch.
|
||||
// * Timer has passed.
|
||||
// * Buffer channel is closed and empty.
|
||||
// * stopCh is closed.
|
||||
func (b *bufferedBackend) collectEvents(timer <-chan time.Time, stopCh <-chan struct{}) []*auditinternal.Event {
|
||||
var events []*auditinternal.Event
|
||||
|
||||
L:
|
||||
for i := 0; i < b.maxBatchSize; i++ {
|
||||
select {
|
||||
case ev, ok := <-b.buffer:
|
||||
// Buffer channel was closed and no new events will follow.
|
||||
if !ok {
|
||||
break L
|
||||
}
|
||||
events = append(events, ev)
|
||||
case <-timer:
|
||||
// Timer has expired. Send currently accumulated batch.
|
||||
break L
|
||||
case <-stopCh:
|
||||
// Backend has been stopped. Send currently accumulated batch.
|
||||
break L
|
||||
}
|
||||
}
|
||||
|
||||
return events
|
||||
}
|
||||
|
||||
// processEvents process the batch events in a goroutine using delegateBackend's ProcessEvents.
|
||||
func (b *bufferedBackend) processEvents(events []*auditinternal.Event) {
|
||||
if len(events) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
// TODO(audit): Should control the number of active goroutines
|
||||
// if one goroutine takes 5 seconds to finish, the number of goroutines can be 5 * defaultBatchThrottleQPS
|
||||
if b.throttle != nil {
|
||||
b.throttle.Accept()
|
||||
}
|
||||
|
||||
b.wg.Add(1)
|
||||
go func() {
|
||||
defer b.wg.Done()
|
||||
defer runtime.HandleCrash()
|
||||
|
||||
// Execute the real processing in a goroutine to keep it from blocking.
|
||||
// This lets the batching routine continue draining the queue immediately.
|
||||
b.delegateBackend.ProcessEvents(events...)
|
||||
}()
|
||||
}
|
||||
|
||||
func (b *bufferedBackend) ProcessEvents(ev ...*auditinternal.Event) {
|
||||
// The following mechanism is in place to support the situation when audit
|
||||
// events are still coming after the backend was stopped.
|
||||
var sendErr error
|
||||
var evIndex int
|
||||
|
||||
// If the delegateBackend was shutdown and the buffer channel was closed, an
|
||||
// attempt to add an event to it will result in panic that we should
|
||||
// recover from.
|
||||
defer func() {
|
||||
if err := recover(); err != nil {
|
||||
sendErr = fmt.Errorf("panic when processing events: %v", err)
|
||||
}
|
||||
if sendErr != nil {
|
||||
audit.HandlePluginError(pluginName, sendErr, ev[evIndex:]...)
|
||||
}
|
||||
}()
|
||||
|
||||
for i, e := range ev {
|
||||
evIndex = i
|
||||
// Per the audit.Backend interface these events are reused after being
|
||||
// sent to the Sink. Deep copy and send the copy to the queue.
|
||||
event := e.DeepCopy()
|
||||
|
||||
select {
|
||||
case b.buffer <- event:
|
||||
default:
|
||||
sendErr = fmt.Errorf("audit buffer queue blocked")
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,181 @@
|
|||
/*
|
||||
Copyright 2018 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package buffered
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
auditinternal "k8s.io/apiserver/pkg/apis/audit"
|
||||
"k8s.io/apiserver/pkg/audit"
|
||||
)
|
||||
|
||||
var (
|
||||
closedStopCh = func() <-chan struct{} {
|
||||
ch := make(chan struct{})
|
||||
close(ch)
|
||||
return ch
|
||||
}()
|
||||
infiniteTimeCh <-chan time.Time = make(chan time.Time)
|
||||
closedTimeCh = func() <-chan time.Time {
|
||||
ch := make(chan time.Time)
|
||||
close(ch)
|
||||
return ch
|
||||
}()
|
||||
)
|
||||
|
||||
func newEvents(number int) []*auditinternal.Event {
|
||||
events := make([]*auditinternal.Event, number)
|
||||
for i := range events {
|
||||
events[i] = &auditinternal.Event{}
|
||||
}
|
||||
|
||||
return events
|
||||
}
|
||||
|
||||
func TestBufferedBackendCollectEvents(t *testing.T) {
|
||||
config := NewDefaultBatchConfig()
|
||||
|
||||
testCases := []struct {
|
||||
desc string
|
||||
timer <-chan time.Time
|
||||
stopCh <-chan struct{}
|
||||
numEvents int
|
||||
wantBatchSize int
|
||||
}{
|
||||
{
|
||||
desc: "max batch size encountered",
|
||||
timer: infiniteTimeCh,
|
||||
stopCh: wait.NeverStop,
|
||||
numEvents: config.MaxBatchSize + 1,
|
||||
wantBatchSize: config.MaxBatchSize,
|
||||
},
|
||||
{
|
||||
desc: "timer expired",
|
||||
timer: closedTimeCh,
|
||||
stopCh: wait.NeverStop,
|
||||
},
|
||||
{
|
||||
desc: "chanel closed",
|
||||
timer: infiniteTimeCh,
|
||||
stopCh: closedStopCh,
|
||||
},
|
||||
}
|
||||
for _, tc := range testCases {
|
||||
tc := tc
|
||||
t.Run(tc.desc, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
backend := NewBackend(&fakeBackend{}, config).(*bufferedBackend)
|
||||
|
||||
backend.ProcessEvents(newEvents(tc.numEvents)...)
|
||||
batch := backend.collectEvents(tc.timer, tc.stopCh)
|
||||
|
||||
require.Equal(t, tc.wantBatchSize, len(batch), "unexpected batch size")
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestBufferedBackendProcessEventsAfterStop(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
backend := NewBackend(&fakeBackend{}, NewDefaultBatchConfig()).(*bufferedBackend)
|
||||
|
||||
backend.Run(closedStopCh)
|
||||
backend.Shutdown()
|
||||
backend.ProcessEvents(newEvents(1)...)
|
||||
batch := backend.collectEvents(infiniteTimeCh, wait.NeverStop)
|
||||
|
||||
require.Equal(t, 0, len(batch), "processed events after the backed has been stopped")
|
||||
}
|
||||
|
||||
func TestBufferedBackendProcessEventsBufferFull(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
config := NewDefaultBatchConfig()
|
||||
config.BufferSize = 1
|
||||
backend := NewBackend(&fakeBackend{}, config).(*bufferedBackend)
|
||||
|
||||
backend.ProcessEvents(newEvents(2)...)
|
||||
|
||||
require.Equal(t, 1, len(backend.buffer), "buffed contains more elements than it should")
|
||||
}
|
||||
|
||||
func TestBufferedBackendShutdownWaitsForDelegatedCalls(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
delegatedCallStartCh := make(chan struct{})
|
||||
delegatedCallEndCh := make(chan struct{})
|
||||
delegateBackend := &fakeBackend{
|
||||
onRequest: func(_ []*auditinternal.Event) {
|
||||
close(delegatedCallStartCh)
|
||||
<-delegatedCallEndCh
|
||||
},
|
||||
}
|
||||
config := NewDefaultBatchConfig()
|
||||
backend := NewBackend(delegateBackend, config)
|
||||
|
||||
// Run backend, process events, wait for them to be batched and for delegated call to start.
|
||||
stopCh := make(chan struct{})
|
||||
backend.Run(stopCh)
|
||||
backend.ProcessEvents(newEvents(config.MaxBatchSize)...)
|
||||
<-delegatedCallStartCh
|
||||
|
||||
// Start shutdown procedure.
|
||||
shutdownEndCh := make(chan struct{})
|
||||
go func() {
|
||||
close(stopCh)
|
||||
backend.Shutdown()
|
||||
close(shutdownEndCh)
|
||||
}()
|
||||
|
||||
// Wait for some time and then check whether Shutdown has exited. Can give false positive,
|
||||
// but never false negative.
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
select {
|
||||
case <-shutdownEndCh:
|
||||
t.Fatalf("Shutdown exited before delegated call ended")
|
||||
default:
|
||||
}
|
||||
|
||||
// Wait for Shutdown to exit after delegated call has exited.
|
||||
close(delegatedCallEndCh)
|
||||
<-shutdownEndCh
|
||||
}
|
||||
|
||||
type fakeBackend struct {
|
||||
onRequest func(events []*auditinternal.Event)
|
||||
}
|
||||
|
||||
var _ audit.Backend = &fakeBackend{}
|
||||
|
||||
func (b *fakeBackend) Run(stopCh <-chan struct{}) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (b *fakeBackend) Shutdown() {
|
||||
return
|
||||
}
|
||||
|
||||
func (b *fakeBackend) ProcessEvents(ev ...*auditinternal.Event) {
|
||||
if b.onRequest != nil {
|
||||
b.onRequest(ev)
|
||||
}
|
||||
}
|
|
@ -0,0 +1,19 @@
|
|||
/*
|
||||
Copyright 2018 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
// Package buffered provides an implementation for the audit.Backend interface
|
||||
// that batches incoming audit events and sends batches to the delegate audit.Backend.
|
||||
package buffered // import "k8s.io/apiserver/plugin/pkg/audit/buffered"
|
Loading…
Reference in New Issue