From 3f0e49aea430c30f4539d34c0f93486fd451d073 Mon Sep 17 00:00:00 2001 From: Mik Vyatskov Date: Tue, 20 Feb 2018 15:25:46 +0100 Subject: [PATCH] Introduce buffered audit backend Signed-off-by: Mik Vyatskov --- .../k8s.io/apiserver/plugin/pkg/audit/BUILD | 1 + .../apiserver/plugin/pkg/audit/buffered/BUILD | 44 +++ .../plugin/pkg/audit/buffered/buffered.go | 273 ++++++++++++++++++ .../pkg/audit/buffered/buffered_test.go | 181 ++++++++++++ .../plugin/pkg/audit/buffered/doc.go | 19 ++ 5 files changed, 518 insertions(+) create mode 100644 staging/src/k8s.io/apiserver/plugin/pkg/audit/buffered/BUILD create mode 100644 staging/src/k8s.io/apiserver/plugin/pkg/audit/buffered/buffered.go create mode 100644 staging/src/k8s.io/apiserver/plugin/pkg/audit/buffered/buffered_test.go create mode 100644 staging/src/k8s.io/apiserver/plugin/pkg/audit/buffered/doc.go diff --git a/staging/src/k8s.io/apiserver/plugin/pkg/audit/BUILD b/staging/src/k8s.io/apiserver/plugin/pkg/audit/BUILD index 34d23d7a13..fe8f7ce4fd 100644 --- a/staging/src/k8s.io/apiserver/plugin/pkg/audit/BUILD +++ b/staging/src/k8s.io/apiserver/plugin/pkg/audit/BUILD @@ -22,6 +22,7 @@ filegroup( name = "all-srcs", srcs = [ ":package-srcs", + "//staging/src/k8s.io/apiserver/plugin/pkg/audit/buffered:all-srcs", "//staging/src/k8s.io/apiserver/plugin/pkg/audit/log:all-srcs", "//staging/src/k8s.io/apiserver/plugin/pkg/audit/webhook:all-srcs", ], diff --git a/staging/src/k8s.io/apiserver/plugin/pkg/audit/buffered/BUILD b/staging/src/k8s.io/apiserver/plugin/pkg/audit/buffered/BUILD new file mode 100644 index 0000000000..7c461c4b4e --- /dev/null +++ b/staging/src/k8s.io/apiserver/plugin/pkg/audit/buffered/BUILD @@ -0,0 +1,44 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "go_default_library", + srcs = [ + "buffered.go", + "doc.go", + ], + importpath = "k8s.io/apiserver/plugin/pkg/audit/buffered", + visibility = ["//visibility:public"], + deps = [ + "//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", + "//vendor/k8s.io/apiserver/pkg/apis/audit:go_default_library", + "//vendor/k8s.io/apiserver/pkg/audit:go_default_library", + "//vendor/k8s.io/client-go/util/flowcontrol:go_default_library", + ], +) + +go_test( + name = "go_default_test", + srcs = ["buffered_test.go"], + embed = [":go_default_library"], + deps = [ + "//vendor/github.com/stretchr/testify/require:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", + "//vendor/k8s.io/apiserver/pkg/apis/audit:go_default_library", + "//vendor/k8s.io/apiserver/pkg/audit:go_default_library", + ], +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [":package-srcs"], + tags = ["automanaged"], + visibility = ["//visibility:public"], +) diff --git a/staging/src/k8s.io/apiserver/plugin/pkg/audit/buffered/buffered.go b/staging/src/k8s.io/apiserver/plugin/pkg/audit/buffered/buffered.go new file mode 100644 index 0000000000..c3991bfef8 --- /dev/null +++ b/staging/src/k8s.io/apiserver/plugin/pkg/audit/buffered/buffered.go @@ -0,0 +1,273 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package buffered + +import ( + "fmt" + "sync" + "time" + + "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/wait" + auditinternal "k8s.io/apiserver/pkg/apis/audit" + "k8s.io/apiserver/pkg/audit" + "k8s.io/client-go/util/flowcontrol" +) + +// The plugin name reported in error metrics. +const pluginName = "buffered" + +const ( + // Default configuration values for ModeBatch. + defaultBatchBufferSize = 10000 // Buffer up to 10000 events before starting discarding. + defaultBatchMaxSize = 400 // Only send up to 400 events at a time. + defaultBatchMaxWait = 30 * time.Second // Send events at least twice a minute. + + defaultBatchThrottleQPS = 10 // Limit the send rate by 10 QPS. + defaultBatchThrottleBurst = 15 // Allow up to 15 QPS burst. +) + +// BatchConfig represents batching delegate audit backend configuration. +type BatchConfig struct { + // BufferSize defines a size of the buffering queue. + BufferSize int + // MaxBatchSize defines maximum size of a batch. + MaxBatchSize int + // MaxBatchWait indicates the maximum interval between two batches. + MaxBatchWait time.Duration + + // ThrottleEnable defines whether throttling will be applied to the batching process. + ThrottleEnable bool + // ThrottleQPS defines the allowed rate of batches per second sent to the delegate backend. + ThrottleQPS float32 + // ThrottleBurst defines the maximum rate of batches per second sent to the delegate backend in case + // the capacity defined by ThrottleQPS was not utilized. + ThrottleBurst int +} + +// NewDefaultBatchConfig returns new Config objects populated by default values. +func NewDefaultBatchConfig() BatchConfig { + return BatchConfig{ + BufferSize: defaultBatchBufferSize, + MaxBatchSize: defaultBatchMaxSize, + MaxBatchWait: defaultBatchMaxWait, + + ThrottleEnable: true, + ThrottleQPS: defaultBatchThrottleQPS, + ThrottleBurst: defaultBatchThrottleBurst, + } +} + +type bufferedBackend struct { + // The delegate backend that actually exports events. + delegateBackend audit.Backend + + // Channel to buffer events before sending to the delegate backend. + buffer chan *auditinternal.Event + // Maximum number of events in a batch sent to the delegate backend. + maxBatchSize int + // Amount of time to wait after sending a batch to the delegate backend before sending another one. + // + // Receiving maxBatchSize events will always trigger sending a batch, regardless of the amount of time passed. + maxBatchWait time.Duration + + // Channel to signal that the batching routine has processed all remaining events and exited. + // Once `shutdownCh` is closed no new events will be sent to the delegate backend. + shutdownCh chan struct{} + + // WaitGroup to control the concurrency of sending batches to the delegate backend. + // Worker routine calls Add before sending a batch and + // then spawns a routine that calls Done after batch was processed by the delegate backend. + // This WaitGroup is used to wait for all sending routines to finish before shutting down audit backend. + wg sync.WaitGroup + + // Limits the number of batches sent to the delegate backend per second. + throttle flowcontrol.RateLimiter +} + +var _ audit.Backend = &bufferedBackend{} + +// NewBackend returns a buffered audit backend that wraps delegate backend. +func NewBackend(delegate audit.Backend, config BatchConfig) audit.Backend { + var throttle flowcontrol.RateLimiter + if config.ThrottleEnable { + throttle = flowcontrol.NewTokenBucketRateLimiter(config.ThrottleQPS, config.ThrottleBurst) + } + return &bufferedBackend{ + delegateBackend: delegate, + buffer: make(chan *auditinternal.Event, config.BufferSize), + maxBatchSize: config.MaxBatchSize, + maxBatchWait: config.MaxBatchWait, + shutdownCh: make(chan struct{}), + wg: sync.WaitGroup{}, + throttle: throttle, + } +} + +func (b *bufferedBackend) Run(stopCh <-chan struct{}) error { + go func() { + // Signal that the working routine has exited. + defer close(b.shutdownCh) + + b.processIncomingEvents(stopCh) + + // Handle the events that were received after the last buffer + // scraping and before this line. Since the buffer is closed, no new + // events will come through. + allEventsProcessed := false + timer := make(chan time.Time) + for !allEventsProcessed { + allEventsProcessed = func() bool { + // Recover from any panic in order to try to process all remaining events. + // Note, that in case of a panic, the return value will be false and + // the loop execution will continue. + defer runtime.HandleCrash() + + events := b.collectEvents(timer, wait.NeverStop) + b.processEvents(events) + return len(events) == 0 + }() + } + }() + return b.delegateBackend.Run(stopCh) +} + +// Shutdown blocks until stopCh passed to the Run method is closed and all +// events added prior to that moment are batched and sent to the delegate backend. +func (b *bufferedBackend) Shutdown() { + // Wait until the routine spawned in Run method exits. + <-b.shutdownCh + + // Wait until all sending routines exit. + b.wg.Wait() + + b.delegateBackend.Shutdown() +} + +// processIncomingEvents runs a loop that collects events from the buffer. When +// b.stopCh is closed, processIncomingEvents stops and closes the buffer. +func (b *bufferedBackend) processIncomingEvents(stopCh <-chan struct{}) { + defer close(b.buffer) + t := time.NewTimer(b.maxBatchWait) + defer t.Stop() + + for { + func() { + // Recover from any panics caused by this function so a panic in the + // goroutine can't bring down the main routine. + defer runtime.HandleCrash() + + t.Reset(b.maxBatchWait) + b.processEvents(b.collectEvents(t.C, stopCh)) + }() + + select { + case <-stopCh: + return + default: + } + } +} + +// collectEvents attempts to collect some number of events in a batch. +// +// The following things can cause collectEvents to stop and return the list +// of events: +// +// * Maximum number of events for a batch. +// * Timer has passed. +// * Buffer channel is closed and empty. +// * stopCh is closed. +func (b *bufferedBackend) collectEvents(timer <-chan time.Time, stopCh <-chan struct{}) []*auditinternal.Event { + var events []*auditinternal.Event + +L: + for i := 0; i < b.maxBatchSize; i++ { + select { + case ev, ok := <-b.buffer: + // Buffer channel was closed and no new events will follow. + if !ok { + break L + } + events = append(events, ev) + case <-timer: + // Timer has expired. Send currently accumulated batch. + break L + case <-stopCh: + // Backend has been stopped. Send currently accumulated batch. + break L + } + } + + return events +} + +// processEvents process the batch events in a goroutine using delegateBackend's ProcessEvents. +func (b *bufferedBackend) processEvents(events []*auditinternal.Event) { + if len(events) == 0 { + return + } + + // TODO(audit): Should control the number of active goroutines + // if one goroutine takes 5 seconds to finish, the number of goroutines can be 5 * defaultBatchThrottleQPS + if b.throttle != nil { + b.throttle.Accept() + } + + b.wg.Add(1) + go func() { + defer b.wg.Done() + defer runtime.HandleCrash() + + // Execute the real processing in a goroutine to keep it from blocking. + // This lets the batching routine continue draining the queue immediately. + b.delegateBackend.ProcessEvents(events...) + }() +} + +func (b *bufferedBackend) ProcessEvents(ev ...*auditinternal.Event) { + // The following mechanism is in place to support the situation when audit + // events are still coming after the backend was stopped. + var sendErr error + var evIndex int + + // If the delegateBackend was shutdown and the buffer channel was closed, an + // attempt to add an event to it will result in panic that we should + // recover from. + defer func() { + if err := recover(); err != nil { + sendErr = fmt.Errorf("panic when processing events: %v", err) + } + if sendErr != nil { + audit.HandlePluginError(pluginName, sendErr, ev[evIndex:]...) + } + }() + + for i, e := range ev { + evIndex = i + // Per the audit.Backend interface these events are reused after being + // sent to the Sink. Deep copy and send the copy to the queue. + event := e.DeepCopy() + + select { + case b.buffer <- event: + default: + sendErr = fmt.Errorf("audit buffer queue blocked") + return + } + } +} diff --git a/staging/src/k8s.io/apiserver/plugin/pkg/audit/buffered/buffered_test.go b/staging/src/k8s.io/apiserver/plugin/pkg/audit/buffered/buffered_test.go new file mode 100644 index 0000000000..b709964688 --- /dev/null +++ b/staging/src/k8s.io/apiserver/plugin/pkg/audit/buffered/buffered_test.go @@ -0,0 +1,181 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package buffered + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" + + "k8s.io/apimachinery/pkg/util/wait" + auditinternal "k8s.io/apiserver/pkg/apis/audit" + "k8s.io/apiserver/pkg/audit" +) + +var ( + closedStopCh = func() <-chan struct{} { + ch := make(chan struct{}) + close(ch) + return ch + }() + infiniteTimeCh <-chan time.Time = make(chan time.Time) + closedTimeCh = func() <-chan time.Time { + ch := make(chan time.Time) + close(ch) + return ch + }() +) + +func newEvents(number int) []*auditinternal.Event { + events := make([]*auditinternal.Event, number) + for i := range events { + events[i] = &auditinternal.Event{} + } + + return events +} + +func TestBufferedBackendCollectEvents(t *testing.T) { + config := NewDefaultBatchConfig() + + testCases := []struct { + desc string + timer <-chan time.Time + stopCh <-chan struct{} + numEvents int + wantBatchSize int + }{ + { + desc: "max batch size encountered", + timer: infiniteTimeCh, + stopCh: wait.NeverStop, + numEvents: config.MaxBatchSize + 1, + wantBatchSize: config.MaxBatchSize, + }, + { + desc: "timer expired", + timer: closedTimeCh, + stopCh: wait.NeverStop, + }, + { + desc: "chanel closed", + timer: infiniteTimeCh, + stopCh: closedStopCh, + }, + } + for _, tc := range testCases { + tc := tc + t.Run(tc.desc, func(t *testing.T) { + t.Parallel() + + backend := NewBackend(&fakeBackend{}, config).(*bufferedBackend) + + backend.ProcessEvents(newEvents(tc.numEvents)...) + batch := backend.collectEvents(tc.timer, tc.stopCh) + + require.Equal(t, tc.wantBatchSize, len(batch), "unexpected batch size") + }) + } +} + +func TestBufferedBackendProcessEventsAfterStop(t *testing.T) { + t.Parallel() + + backend := NewBackend(&fakeBackend{}, NewDefaultBatchConfig()).(*bufferedBackend) + + backend.Run(closedStopCh) + backend.Shutdown() + backend.ProcessEvents(newEvents(1)...) + batch := backend.collectEvents(infiniteTimeCh, wait.NeverStop) + + require.Equal(t, 0, len(batch), "processed events after the backed has been stopped") +} + +func TestBufferedBackendProcessEventsBufferFull(t *testing.T) { + t.Parallel() + + config := NewDefaultBatchConfig() + config.BufferSize = 1 + backend := NewBackend(&fakeBackend{}, config).(*bufferedBackend) + + backend.ProcessEvents(newEvents(2)...) + + require.Equal(t, 1, len(backend.buffer), "buffed contains more elements than it should") +} + +func TestBufferedBackendShutdownWaitsForDelegatedCalls(t *testing.T) { + t.Parallel() + + delegatedCallStartCh := make(chan struct{}) + delegatedCallEndCh := make(chan struct{}) + delegateBackend := &fakeBackend{ + onRequest: func(_ []*auditinternal.Event) { + close(delegatedCallStartCh) + <-delegatedCallEndCh + }, + } + config := NewDefaultBatchConfig() + backend := NewBackend(delegateBackend, config) + + // Run backend, process events, wait for them to be batched and for delegated call to start. + stopCh := make(chan struct{}) + backend.Run(stopCh) + backend.ProcessEvents(newEvents(config.MaxBatchSize)...) + <-delegatedCallStartCh + + // Start shutdown procedure. + shutdownEndCh := make(chan struct{}) + go func() { + close(stopCh) + backend.Shutdown() + close(shutdownEndCh) + }() + + // Wait for some time and then check whether Shutdown has exited. Can give false positive, + // but never false negative. + time.Sleep(100 * time.Millisecond) + select { + case <-shutdownEndCh: + t.Fatalf("Shutdown exited before delegated call ended") + default: + } + + // Wait for Shutdown to exit after delegated call has exited. + close(delegatedCallEndCh) + <-shutdownEndCh +} + +type fakeBackend struct { + onRequest func(events []*auditinternal.Event) +} + +var _ audit.Backend = &fakeBackend{} + +func (b *fakeBackend) Run(stopCh <-chan struct{}) error { + return nil +} + +func (b *fakeBackend) Shutdown() { + return +} + +func (b *fakeBackend) ProcessEvents(ev ...*auditinternal.Event) { + if b.onRequest != nil { + b.onRequest(ev) + } +} diff --git a/staging/src/k8s.io/apiserver/plugin/pkg/audit/buffered/doc.go b/staging/src/k8s.io/apiserver/plugin/pkg/audit/buffered/doc.go new file mode 100644 index 0000000000..a82599e426 --- /dev/null +++ b/staging/src/k8s.io/apiserver/plugin/pkg/audit/buffered/doc.go @@ -0,0 +1,19 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package buffered provides an implementation for the audit.Backend interface +// that batches incoming audit events and sends batches to the delegate audit.Backend. +package buffered // import "k8s.io/apiserver/plugin/pkg/audit/buffered"