Add buffering to the log audit backend

Signed-off-by: Mik Vyatskov <vmik@google.com>
pull/6/head
Mik Vyatskov 2018-02-22 19:52:33 +01:00
parent b64230b3ce
commit 881e6d4f6f
14 changed files with 216 additions and 979 deletions

View File

@ -1635,7 +1635,7 @@ function start-kube-apiserver {
params+=" --audit-webhook-batch-throttle-burst=${ADVANCED_AUDIT_WEBHOOK_THROTTLE_BURST}"
fi
if [[ -n "${ADVANCED_AUDIT_WEBHOOK_INITIAL_BACKOFF:-}" ]]; then
params+=" --audit-webhook-batch-initial-backoff=${ADVANCED_AUDIT_WEBHOOK_INITIAL_BACKOFF}"
params+=" --audit-webhook-initial-backoff=${ADVANCED_AUDIT_WEBHOOK_INITIAL_BACKOFF}"
fi
create-master-audit-webhook-config "${audit_webhook_config_file}"
audit_webhook_config_mount="{\"name\": \"auditwebhookconfigmount\",\"mountPath\": \"${audit_webhook_config_file}\", \"readOnly\": true},"

View File

@ -46,7 +46,7 @@ go_test(
"//vendor/k8s.io/apiserver/pkg/server/options:go_default_library",
"//vendor/k8s.io/apiserver/pkg/storage/storagebackend:go_default_library",
"//vendor/k8s.io/apiserver/pkg/util/flag:go_default_library",
"//vendor/k8s.io/apiserver/plugin/pkg/audit/webhook:go_default_library",
"//vendor/k8s.io/apiserver/plugin/pkg/audit/buffered:go_default_library",
"//vendor/k8s.io/client-go/rest:go_default_library",
],
)

View File

@ -29,7 +29,7 @@ import (
genericoptions "k8s.io/apiserver/pkg/server/options"
"k8s.io/apiserver/pkg/storage/storagebackend"
utilflag "k8s.io/apiserver/pkg/util/flag"
auditwebhook "k8s.io/apiserver/plugin/pkg/audit/webhook"
auditbuffered "k8s.io/apiserver/plugin/pkg/audit/buffered"
restclient "k8s.io/client-go/rest"
"k8s.io/kubernetes/pkg/api/legacyscheme"
kapi "k8s.io/kubernetes/pkg/apis/core"
@ -54,15 +54,23 @@ func TestAddFlags(t *testing.T) {
"--audit-log-maxbackup=12",
"--audit-log-maxsize=13",
"--audit-log-path=/var/log",
"--audit-log-mode=blocking",
"--audit-log-batch-buffer-size=46",
"--audit-log-batch-max-size=47",
"--audit-log-batch-max-wait=48s",
"--audit-log-batch-throttle-enable=true",
"--audit-log-batch-throttle-qps=49.5",
"--audit-log-batch-throttle-burst=50",
"--audit-policy-file=/policy",
"--audit-webhook-config-file=/webhook-config",
"--audit-webhook-mode=blocking",
"--audit-webhook-batch-buffer-size=42",
"--audit-webhook-batch-max-size=43",
"--audit-webhook-batch-max-wait=1s",
"--audit-webhook-batch-throttle-enable=false",
"--audit-webhook-batch-throttle-qps=43.5",
"--audit-webhook-batch-throttle-burst=44",
"--audit-webhook-batch-initial-backoff=2s",
"--audit-webhook-initial-backoff=2s",
"--authentication-token-webhook-cache-ttl=3m",
"--authentication-token-webhook-config-file=/token-webhook-config",
"--authorization-mode=AlwaysDeny",
@ -180,18 +188,32 @@ func TestAddFlags(t *testing.T) {
MaxBackups: 12,
MaxSize: 13,
Format: "json",
BatchOptions: apiserveroptions.AuditBatchOptions{
Mode: "blocking",
BatchConfig: auditbuffered.BatchConfig{
BufferSize: 46,
MaxBatchSize: 47,
MaxBatchWait: 48 * time.Second,
ThrottleEnable: true,
ThrottleQPS: 49.5,
ThrottleBurst: 50,
},
},
},
WebhookOptions: apiserveroptions.AuditWebhookOptions{
Mode: "blocking",
ConfigFile: "/webhook-config",
BatchConfig: auditwebhook.BatchBackendConfig{
BufferSize: 42,
MaxBatchSize: 43,
MaxBatchWait: 1 * time.Second,
ThrottleQPS: 43.5,
ThrottleBurst: 44,
InitialBackoff: 2 * time.Second,
BatchOptions: apiserveroptions.AuditBatchOptions{
Mode: "blocking",
BatchConfig: auditbuffered.BatchConfig{
BufferSize: 42,
MaxBatchSize: 43,
MaxBatchWait: 1 * time.Second,
ThrottleEnable: false,
ThrottleQPS: 43.5,
ThrottleBurst: 44,
},
},
InitialBackoff: 2 * time.Second,
},
PolicyFile: "/policy",
},

View File

@ -1470,6 +1470,10 @@
"ImportPath": "k8s.io/apiserver/pkg/util/wsstream",
"Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
},
{
"ImportPath": "k8s.io/apiserver/plugin/pkg/audit/buffered",
"Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
},
{
"ImportPath": "k8s.io/apiserver/plugin/pkg/audit/log",
"Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"

View File

@ -55,6 +55,7 @@ go_library(
"//vendor/k8s.io/apiserver/pkg/storage/storagebackend:go_default_library",
"//vendor/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//vendor/k8s.io/apiserver/pkg/util/flag:go_default_library",
"//vendor/k8s.io/apiserver/plugin/pkg/audit/buffered:go_default_library",
"//vendor/k8s.io/apiserver/plugin/pkg/audit/log:go_default_library",
"//vendor/k8s.io/apiserver/plugin/pkg/audit/webhook:go_default_library",
"//vendor/k8s.io/client-go/informers:go_default_library",

View File

@ -21,6 +21,7 @@ import (
"io"
"os"
"strings"
"time"
"github.com/golang/glog"
"github.com/spf13/pflag"
@ -32,6 +33,7 @@ import (
"k8s.io/apiserver/pkg/features"
"k8s.io/apiserver/pkg/server"
utilfeature "k8s.io/apiserver/pkg/util/feature"
pluginbuffered "k8s.io/apiserver/plugin/pkg/audit/buffered"
pluginlog "k8s.io/apiserver/plugin/pkg/audit/log"
pluginwebhook "k8s.io/apiserver/plugin/pkg/audit/webhook"
)
@ -58,6 +60,33 @@ type AuditOptions struct {
WebhookOptions AuditWebhookOptions
}
const (
// ModeBatch indicates that the audit backend should buffer audit events
// internally, sending batch updates either once a certain number of
// events have been received or a certain amount of time has passed.
ModeBatch = "batch"
// ModeBlocking causes the audit backend to block on every attempt to process
// a set of events. This causes requests to the API server to wait for the
// flush before sending a response.
ModeBlocking = "blocking"
)
// AllowedModes is the modes known for audit backends.
var AllowedModes = []string{
ModeBatch,
ModeBlocking,
}
type AuditBatchOptions struct {
// Should the backend asynchronous batch events to the webhook backend or
// should the backend block responses?
//
// Defaults to asynchronous batch events.
Mode string
// Configuration for batching backend. Only used in batch mode.
BatchConfig pluginbuffered.BatchConfig
}
// AuditLogOptions determines the output of the structured audit log by default.
// If the AdvancedAuditing feature is set to false, AuditLogOptions holds the legacy
// audit log writer.
@ -67,27 +96,37 @@ type AuditLogOptions struct {
MaxBackups int
MaxSize int
Format string
BatchOptions AuditBatchOptions
}
// AuditWebhookOptions control the webhook configuration for audit events.
type AuditWebhookOptions struct {
ConfigFile string
// Should the webhook asynchronous batch events to the webhook backend or
// should the webhook block responses?
//
// Defaults to asynchronous batch events.
Mode string
// Configuration for batching webhook. Only used in batch mode.
BatchConfig pluginwebhook.BatchBackendConfig
ConfigFile string
InitialBackoff time.Duration
BatchOptions AuditBatchOptions
}
func NewAuditOptions() *AuditOptions {
defaultLogBatchConfig := pluginbuffered.NewDefaultBatchConfig()
defaultLogBatchConfig.ThrottleEnable = false
return &AuditOptions{
WebhookOptions: AuditWebhookOptions{
Mode: pluginwebhook.ModeBatch,
BatchConfig: pluginwebhook.NewDefaultBatchBackendConfig(),
BatchOptions: AuditBatchOptions{
Mode: ModeBatch,
BatchConfig: defaultLogBatchConfig,
},
InitialBackoff: pluginwebhook.DefaultInitialBackoff,
},
LogOptions: AuditLogOptions{
Format: pluginlog.FormatJson,
BatchOptions: AuditBatchOptions{
Mode: ModeBatch,
BatchConfig: pluginbuffered.NewDefaultBatchConfig(),
},
},
LogOptions: AuditLogOptions{Format: pluginlog.FormatJson},
}
}
@ -107,30 +146,20 @@ func (o *AuditOptions) Validate() []error {
allErrors = append(allErrors, fmt.Errorf("feature '%s' must be enabled to set option --audit-webhook-config-file", features.AdvancedAuditing))
}
} else {
// Check webhook mode
validMode := false
for _, m := range pluginwebhook.AllowedModes {
if m == o.WebhookOptions.Mode {
validMode = true
break
}
// check webhook configuration
if err := validateBackendMode(pluginwebhook.PluginName, o.WebhookOptions.BatchOptions.Mode); err != nil {
allErrors = append(allErrors, err)
}
if !validMode {
allErrors = append(allErrors, fmt.Errorf("invalid audit webhook mode %s, allowed modes are %q", o.WebhookOptions.Mode, strings.Join(pluginwebhook.AllowedModes, ",")))
if err := validateBackendBatchConfig(pluginwebhook.PluginName, o.LogOptions.BatchOptions.BatchConfig); err != nil {
allErrors = append(allErrors, err)
}
// Check webhook batch configuration
if o.WebhookOptions.BatchConfig.BufferSize <= 0 {
allErrors = append(allErrors, fmt.Errorf("invalid audit batch webhook buffer size %v, must be a positive number", o.WebhookOptions.BatchConfig.BufferSize))
// check log configuration
if err := validateBackendMode(pluginlog.PluginName, o.LogOptions.BatchOptions.Mode); err != nil {
allErrors = append(allErrors, err)
}
if o.WebhookOptions.BatchConfig.MaxBatchSize <= 0 {
allErrors = append(allErrors, fmt.Errorf("invalid audit batch webhook max batch size %v, must be a positive number", o.WebhookOptions.BatchConfig.MaxBatchSize))
}
if o.WebhookOptions.BatchConfig.ThrottleQPS <= 0 {
allErrors = append(allErrors, fmt.Errorf("invalid audit batch webhook throttle QPS %v, must be a positive number", o.WebhookOptions.BatchConfig.ThrottleQPS))
}
if o.WebhookOptions.BatchConfig.ThrottleBurst <= 0 {
allErrors = append(allErrors, fmt.Errorf("invalid audit batch webhook throttle burst %v, must be a positive number", o.WebhookOptions.BatchConfig.ThrottleBurst))
if err := validateBackendBatchConfig(pluginlog.PluginName, o.LogOptions.BatchOptions.BatchConfig); err != nil {
allErrors = append(allErrors, err)
}
// Check log format
@ -160,6 +189,31 @@ func (o *AuditOptions) Validate() []error {
return allErrors
}
func validateBackendMode(pluginName string, mode string) error {
for _, m := range AllowedModes {
if m == mode {
return nil
}
}
return fmt.Errorf("invalid audit %s mode %s, allowed modes are %q", pluginName, mode, strings.Join(AllowedModes, ","))
}
func validateBackendBatchConfig(pluginName string, config pluginbuffered.BatchConfig) error {
if config.BufferSize <= 0 {
return fmt.Errorf("invalid audit batch %s buffer size %v, must be a positive number", pluginName, config.BufferSize)
}
if config.MaxBatchSize <= 0 {
return fmt.Errorf("invalid audit batch %s max batch size %v, must be a positive number", pluginName, config.MaxBatchSize)
}
if config.ThrottleQPS <= 0 {
return fmt.Errorf("invalid audit batch %s throttle QPS %v, must be a positive number", pluginName, config.ThrottleQPS)
}
if config.ThrottleBurst <= 0 {
return fmt.Errorf("invalid audit batch %s throttle burst %v, must be a positive number", pluginName, config.ThrottleBurst)
}
return nil
}
func (o *AuditOptions) AddFlags(fs *pflag.FlagSet) {
if o == nil {
return
@ -170,7 +224,9 @@ func (o *AuditOptions) AddFlags(fs *pflag.FlagSet) {
" With AdvancedAuditing, a profile is required to enable auditing.")
o.LogOptions.AddFlags(fs)
o.LogOptions.BatchOptions.AddFlags(pluginlog.PluginName, fs)
o.WebhookOptions.AddFlags(fs)
o.WebhookOptions.BatchOptions.AddFlags(pluginwebhook.PluginName, fs)
}
func (o *AuditOptions) ApplyTo(c *server.Config) error {
@ -216,6 +272,36 @@ func (o *AuditOptions) applyTo(c *server.Config) error {
return nil
}
func (o *AuditBatchOptions) AddFlags(pluginName string, fs *pflag.FlagSet) {
fs.StringVar(&o.Mode, fmt.Sprintf("audit-%s-mode", pluginName), o.Mode,
"Strategy for sending audit events. Blocking indicates sending events should block"+
" server responses. Batch causes the backend to buffer and write events"+
" asynchronously. Known modes are "+strings.Join(AllowedModes, ",")+".")
fs.IntVar(&o.BatchConfig.BufferSize, fmt.Sprintf("audit-%s-batch-buffer-size", pluginName),
o.BatchConfig.BufferSize, "The size of the buffer to store events before "+
"batching and writing. Only used in batch mode.")
fs.IntVar(&o.BatchConfig.MaxBatchSize, fmt.Sprintf("audit-%s-batch-max-size", pluginName),
o.BatchConfig.MaxBatchSize, "The maximum size of a batch. Only used in batch mode.")
fs.DurationVar(&o.BatchConfig.MaxBatchWait, fmt.Sprintf("audit-%s-batch-max-wait", pluginName),
o.BatchConfig.MaxBatchWait, "The amount of time to wait before force writing the "+
"batch that hadn't reached the max size. Only used in batch mode.")
fs.BoolVar(&o.BatchConfig.ThrottleEnable, fmt.Sprintf("audit-%s-batch-throttle-enable", pluginName),
o.BatchConfig.ThrottleEnable, "Whether batching throttling is enabled. Only used in batch mode.")
fs.Float32Var(&o.BatchConfig.ThrottleQPS, fmt.Sprintf("audit-%s-batch-throttle-qps", pluginName),
o.BatchConfig.ThrottleQPS, "Maximum average number of batches per second. "+
"Only used in batch mode.")
fs.IntVar(&o.BatchConfig.ThrottleBurst, fmt.Sprintf("audit-%s-batch-throttle-burst", pluginName),
o.BatchConfig.ThrottleBurst, "Maximum number of requests sent at the same "+
"moment if ThrottleQPS was not utilized before. Only used in batch mode.")
}
func (o *AuditBatchOptions) wrapBackend(delegate audit.Backend) audit.Backend {
if o.Mode == ModeBlocking {
return delegate
}
return pluginbuffered.NewBackend(delegate, o.BatchConfig)
}
func (o *AuditLogOptions) AddFlags(fs *pflag.FlagSet) {
fs.StringVar(&o.Path, "audit-log-path", o.Path,
"If set, all requests coming to the apiserver will be logged to this file. '-' means standard out.")
@ -250,7 +336,8 @@ func (o *AuditLogOptions) getWriter() io.Writer {
func (o *AuditLogOptions) advancedApplyTo(c *server.Config) error {
if w := o.getWriter(); w != nil {
c.AuditBackend = appendBackend(c.AuditBackend, pluginlog.NewBackend(w, o.Format, auditv1beta1.SchemeGroupVersion))
log := pluginlog.NewBackend(w, o.Format, auditv1beta1.SchemeGroupVersion)
c.AuditBackend = appendBackend(c.AuditBackend, o.BatchOptions.wrapBackend(log))
}
return nil
}
@ -264,28 +351,12 @@ func (o *AuditWebhookOptions) AddFlags(fs *pflag.FlagSet) {
fs.StringVar(&o.ConfigFile, "audit-webhook-config-file", o.ConfigFile,
"Path to a kubeconfig formatted file that defines the audit webhook configuration."+
" Requires the 'AdvancedAuditing' feature gate.")
fs.StringVar(&o.Mode, "audit-webhook-mode", o.Mode,
"Strategy for sending audit events. Blocking indicates sending events should block"+
" server responses. Batch causes the webhook to buffer and send events"+
" asynchronously. Known modes are "+strings.Join(pluginwebhook.AllowedModes, ",")+".")
fs.IntVar(&o.BatchConfig.BufferSize, "audit-webhook-batch-buffer-size",
o.BatchConfig.BufferSize, "The size of the buffer to store events before "+
"batching and sending to the webhook. Only used in batch mode.")
fs.IntVar(&o.BatchConfig.MaxBatchSize, "audit-webhook-batch-max-size",
o.BatchConfig.MaxBatchSize, "The maximum size of a batch sent to the webhook. "+
"Only used in batch mode.")
fs.DurationVar(&o.BatchConfig.MaxBatchWait, "audit-webhook-batch-max-wait",
o.BatchConfig.MaxBatchWait, "The amount of time to wait before force sending the "+
"batch that hadn't reached the max size. Only used in batch mode.")
fs.Float32Var(&o.BatchConfig.ThrottleQPS, "audit-webhook-batch-throttle-qps",
o.BatchConfig.ThrottleQPS, "Maximum average number of requests per second. "+
"Only used in batch mode.")
fs.IntVar(&o.BatchConfig.ThrottleBurst, "audit-webhook-batch-throttle-burst",
o.BatchConfig.ThrottleBurst, "Maximum number of requests sent at the same "+
"moment if ThrottleQPS was not utilized before. Only used in batch mode.")
fs.DurationVar(&o.BatchConfig.InitialBackoff, "audit-webhook-batch-initial-backoff",
o.BatchConfig.InitialBackoff, "The amount of time to wait before retrying the "+
"first failed requests. Only used in batch mode.")
fs.DurationVar(&o.InitialBackoff, "audit-webhook-initial-backoff",
o.InitialBackoff, "The amount of time to wait before retrying the first failed request.")
fs.DurationVar(&o.InitialBackoff, "audit-webhook-batch-initial-backoff",
o.InitialBackoff, "The amount of time to wait before retrying the first failed request.")
fs.MarkDeprecated("audit-webhook-batch-initial-backoff",
"Deprecated, use --audit-webhook-initial-backoff instead.")
}
func (o *AuditWebhookOptions) applyTo(c *server.Config) error {
@ -293,10 +364,10 @@ func (o *AuditWebhookOptions) applyTo(c *server.Config) error {
return nil
}
webhook, err := pluginwebhook.NewBackend(o.ConfigFile, o.Mode, auditv1beta1.SchemeGroupVersion, o.BatchConfig)
webhook, err := pluginwebhook.NewBackend(o.ConfigFile, auditv1beta1.SchemeGroupVersion)
if err != nil {
return fmt.Errorf("initializing audit webhook: %v", err)
}
c.AuditBackend = appendBackend(c.AuditBackend, webhook)
c.AuditBackend = appendBackend(c.AuditBackend, o.BatchOptions.wrapBackend(webhook))
return nil
}

View File

@ -153,6 +153,12 @@ func (b *bufferedBackend) Shutdown() {
<-b.shutdownCh
// Wait until all sending routines exit.
//
// - When b.shutdownCh is closed, we know that the goroutine in Run has terminated.
// - This means that processIncomingEvents has terminated.
// - Which means that b.buffer is closed and cannot accept any new events anymore.
// - Because processEvents is called synchronously from the Run goroutine, the waitgroup has its final value.
// Hence wg.Wait will not miss any more outgoing batches.
b.wg.Wait()
b.delegateBackend.Shutdown()
@ -250,7 +256,7 @@ func (b *bufferedBackend) ProcessEvents(ev ...*auditinternal.Event) {
// recover from.
defer func() {
if err := recover(); err != nil {
sendErr = fmt.Errorf("panic when processing events: %v", err)
sendErr = fmt.Errorf("audit backend shut down")
}
if sendErr != nil {
audit.HandlePluginError(pluginName, sendErr, ev[evIndex:]...)

View File

@ -32,6 +32,9 @@ const (
FormatLegacy = "legacy"
// FormatJson saves event in structured json format.
FormatJson = "json"
// PluginName is the name of this plugin, to be used in help and logs.
PluginName = "log"
)
// AllowedFormats are the formats known by log backend.
@ -70,17 +73,17 @@ func (b *backend) logEvent(ev *auditinternal.Event) {
case FormatJson:
bs, err := runtime.Encode(audit.Codecs.LegacyCodec(b.groupVersion), ev)
if err != nil {
audit.HandlePluginError("log", err, ev)
audit.HandlePluginError(PluginName, err, ev)
return
}
line = string(bs[:])
default:
audit.HandlePluginError("log", fmt.Errorf("log format %q is not in list of known formats (%s)",
audit.HandlePluginError(PluginName, fmt.Errorf("log format %q is not in list of known formats (%s)",
b.format, strings.Join(AllowedFormats, ",")), ev)
return
}
if _, err := fmt.Fprint(b.out, line); err != nil {
audit.HandlePluginError("log", err, ev)
audit.HandlePluginError(PluginName, err, ev)
}
}

View File

@ -8,10 +8,7 @@ load(
go_test(
name = "go_default_test",
srcs = [
"webhook_test.go",
"webhook_v1alpha1_test.go",
],
srcs = ["webhook_test.go"],
embed = [":go_default_library"],
deps = [
"//vendor/github.com/stretchr/testify/assert:go_default_library",
@ -20,7 +17,6 @@ go_test(
"//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime/serializer/json:go_default_library",
"//vendor/k8s.io/apiserver/pkg/apis/audit:go_default_library",
"//vendor/k8s.io/apiserver/pkg/apis/audit/v1alpha1:go_default_library",
"//vendor/k8s.io/apiserver/pkg/apis/audit/v1beta1:go_default_library",
"//vendor/k8s.io/apiserver/pkg/audit:go_default_library",
"//vendor/k8s.io/client-go/tools/clientcmd/api/v1:go_default_library",
@ -35,7 +31,6 @@ go_library(
"//vendor/k8s.io/apimachinery/pkg/apimachinery/announced:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apimachinery/registered:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//vendor/k8s.io/apiserver/pkg/apis/audit:go_default_library",
"//vendor/k8s.io/apiserver/pkg/apis/audit/install:go_default_library",
"//vendor/k8s.io/apiserver/pkg/apis/audit/v1alpha1:go_default_library",
@ -43,7 +38,6 @@ go_library(
"//vendor/k8s.io/apiserver/pkg/audit:go_default_library",
"//vendor/k8s.io/apiserver/pkg/util/webhook:go_default_library",
"//vendor/k8s.io/client-go/rest:go_default_library",
"//vendor/k8s.io/client-go/util/flowcontrol:go_default_library",
],
)

View File

@ -18,16 +18,12 @@ limitations under the License.
package webhook
import (
"errors"
"fmt"
"strings"
"sync"
"time"
"k8s.io/apimachinery/pkg/apimachinery/announced"
"k8s.io/apimachinery/pkg/apimachinery/registered"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/runtime"
auditinternal "k8s.io/apiserver/pkg/apis/audit"
"k8s.io/apiserver/pkg/apis/audit/install"
auditv1alpha1 "k8s.io/apiserver/pkg/apis/audit/v1alpha1"
@ -35,91 +31,17 @@ import (
"k8s.io/apiserver/pkg/audit"
"k8s.io/apiserver/pkg/util/webhook"
"k8s.io/client-go/rest"
"k8s.io/client-go/util/flowcontrol"
)
const (
// ModeBatch indicates that the webhook should buffer audit events
// internally, sending batch updates either once a certain number of
// events have been received or a certain amount of time has passed.
ModeBatch = "batch"
// ModeBlocking causes the webhook to block on every attempt to process
// a set of events. This causes requests to the API server to wait for a
// round trip to the external audit service before sending a response.
ModeBlocking = "blocking"
// PluginName is the name of this plugin, to be used in help and logs.
PluginName = "webhook"
// DefaultInitialBackoff is the default amount of time to wait before
// retrying sending audit events through a webhook.
DefaultInitialBackoff = 10 * time.Second
)
// AllowedModes is the modes known by this webhook.
var AllowedModes = []string{
ModeBatch,
ModeBlocking,
}
const (
// Default configuration values for ModeBatch.
defaultBatchBufferSize = 10000 // Buffer up to 10000 events before starting discarding.
defaultBatchMaxSize = 400 // Only send up to 400 events at a time.
defaultBatchMaxWait = 30 * time.Second // Send events at least twice a minute.
defaultInitialBackoff = 10 * time.Second // Wait at least 10 seconds before retrying.
defaultBatchThrottleQPS = 10 // Limit the send rate by 10 QPS.
defaultBatchThrottleBurst = 15 // Allow up to 15 QPS burst.
)
// The plugin name reported in error metrics.
const pluginName = "webhook"
// BatchBackendConfig represents batching webhook audit backend configuration.
type BatchBackendConfig struct {
// BufferSize defines a size of the buffering queue.
BufferSize int
// MaxBatchSize defines maximum size of a batch.
MaxBatchSize int
// MaxBatchWait defines maximum amount of time to wait for MaxBatchSize
// events to be accumulated in the buffer before forcibly sending what's
// being accumulated.
MaxBatchWait time.Duration
// ThrottleQPS defines the allowed rate of batches per second sent to the webhook.
ThrottleQPS float32
// ThrottleBurst defines the maximum rate of batches per second sent to the webhook in case
// the capacity defined by ThrottleQPS was not utilized.
ThrottleBurst int
// InitialBackoff defines the amount of time to wait before retrying the requests
// to the webhook for the first time.
InitialBackoff time.Duration
}
// NewDefaultBatchBackendConfig returns new BatchBackendConfig objects populated by default values.
func NewDefaultBatchBackendConfig() BatchBackendConfig {
return BatchBackendConfig{
BufferSize: defaultBatchBufferSize,
MaxBatchSize: defaultBatchMaxSize,
MaxBatchWait: defaultBatchMaxWait,
ThrottleQPS: defaultBatchThrottleQPS,
ThrottleBurst: defaultBatchThrottleBurst,
InitialBackoff: defaultInitialBackoff,
}
}
// NewBackend returns an audit backend that sends events over HTTP to an external service.
// The mode indicates the caching behavior of the webhook. Either blocking (ModeBlocking)
// or buffered with batch POSTs (ModeBatch).
func NewBackend(kubeConfigFile string, mode string, groupVersion schema.GroupVersion, config BatchBackendConfig) (audit.Backend, error) {
switch mode {
case ModeBatch:
return newBatchWebhook(kubeConfigFile, groupVersion, config)
case ModeBlocking:
return newBlockingWebhook(kubeConfigFile, groupVersion)
default:
return nil, fmt.Errorf("webhook mode %q is not in list of known modes (%s)",
mode, strings.Join(AllowedModes, ","))
}
}
var (
// NOTE: Copied from other webhook implementations
//
@ -143,267 +65,39 @@ func loadWebhook(configFile string, groupVersion schema.GroupVersion, initialBac
[]schema.GroupVersion{groupVersion}, initialBackoff)
}
func newBlockingWebhook(configFile string, groupVersion schema.GroupVersion) (*blockingBackend, error) {
w, err := loadWebhook(configFile, groupVersion, defaultInitialBackoff)
if err != nil {
return nil, err
}
return &blockingBackend{w}, nil
}
type blockingBackend struct {
type backend struct {
w *webhook.GenericWebhook
}
func (b *blockingBackend) Run(stopCh <-chan struct{}) error {
// NewBackend returns an audit backend that sends events over HTTP to an external service.
func NewBackend(kubeConfigFile string, groupVersion schema.GroupVersion) (audit.Backend, error) {
w, err := loadWebhook(kubeConfigFile, groupVersion, DefaultInitialBackoff)
if err != nil {
return nil, err
}
return &backend{w}, nil
}
func (b *backend) Run(stopCh <-chan struct{}) error {
return nil
}
func (b *blockingBackend) Shutdown() {
func (b *backend) Shutdown() {
// nothing to do here
}
func (b *blockingBackend) ProcessEvents(ev ...*auditinternal.Event) {
func (b *backend) ProcessEvents(ev ...*auditinternal.Event) {
if err := b.processEvents(ev...); err != nil {
audit.HandlePluginError(pluginName, err, ev...)
audit.HandlePluginError(PluginName, err, ev...)
}
}
func (b *blockingBackend) processEvents(ev ...*auditinternal.Event) error {
func (b *backend) processEvents(ev ...*auditinternal.Event) error {
var list auditinternal.EventList
for _, e := range ev {
list.Items = append(list.Items, *e)
}
// NOTE: No exponential backoff because this is the blocking webhook
// mode. Any attempts to retry will block API server requests.
return b.w.RestClient.Post().Body(&list).Do().Error()
}
func newBatchWebhook(configFile string, groupVersion schema.GroupVersion, config BatchBackendConfig) (*batchBackend, error) {
w, err := loadWebhook(configFile, groupVersion, config.InitialBackoff)
if err != nil {
return nil, err
}
return &batchBackend{
w: w,
buffer: make(chan *auditinternal.Event, config.BufferSize),
maxBatchSize: config.MaxBatchSize,
maxBatchWait: config.MaxBatchWait,
shutdownCh: make(chan struct{}),
throttle: flowcontrol.NewTokenBucketRateLimiter(config.ThrottleQPS, config.ThrottleBurst),
}, nil
}
type batchBackend struct {
w *webhook.GenericWebhook
// Channel to buffer events in memory before sending them on the webhook.
buffer chan *auditinternal.Event
// Maximum number of events that can be sent at once.
maxBatchSize int
// Amount of time to wait after sending events before force sending another set.
//
// Receiving maxBatchSize events will always trigger a send, regardless of
// if this amount of time has been reached.
maxBatchWait time.Duration
// Channel to signal that the sending routine has stopped and therefore
// it's safe to assume that no new requests will be initiated.
shutdownCh chan struct{}
// The sending routine locks reqMutex for reading before initiating a new
// goroutine to send a request. This goroutine then unlocks reqMutex for
// reading when completed. The Shutdown method locks reqMutex for writing
// after the sending routine has exited. When reqMutex is locked for writing,
// all requests have been completed and no new will be spawned, since the
// sending routine is not running anymore.
reqMutex sync.RWMutex
// Limits the number of requests sent to the backend per second.
throttle flowcontrol.RateLimiter
}
func (b *batchBackend) Run(stopCh <-chan struct{}) error {
go func() {
// Signal that the sending routine has exited.
defer close(b.shutdownCh)
b.runSendingRoutine(stopCh)
// Handle the events that were received after the last buffer
// scraping and before this line. Since the buffer is closed, no new
// events will come through.
for {
if last := func() bool {
// Recover from any panic in order to try to send all remaining events.
// Note, that in case of a panic, the return value will be false and
// the loop execution will continue.
defer runtime.HandleCrash()
events := b.collectLastEvents()
b.sendBatchEvents(events)
return len(events) == 0
}(); last {
break
}
}
}()
return nil
}
func (b *batchBackend) Shutdown() {
<-b.shutdownCh
// Write locking reqMutex will guarantee that all requests will be completed
// by the time the goroutine continues the execution. Since this line is
// executed after shutdownCh was closed, no new requests will follow this
// lock, because read lock is called in the same goroutine that closes
// shutdownCh before exiting.
b.reqMutex.Lock()
b.reqMutex.Unlock()
}
// runSendingRoutine runs a loop that collects events from the buffer. When
// stopCh is closed, runSendingRoutine stops and closes the buffer.
func (b *batchBackend) runSendingRoutine(stopCh <-chan struct{}) {
defer close(b.buffer)
for {
func() {
// Recover from any panics caused by this function so a panic in the
// goroutine can't bring down the main routine.
defer runtime.HandleCrash()
t := time.NewTimer(b.maxBatchWait)
defer t.Stop() // Release ticker resources
b.sendBatchEvents(b.collectEvents(stopCh, t.C))
}()
select {
case <-stopCh:
return
default:
}
}
}
// collectEvents attempts to collect some number of events in a batch.
//
// The following things can cause collectEvents to stop and return the list
// of events:
//
// * Some maximum number of events are received.
// * Timer has passed, all queued events are sent.
// * StopCh is closed, all queued events are sent.
//
func (b *batchBackend) collectEvents(stopCh <-chan struct{}, timer <-chan time.Time) []auditinternal.Event {
var events []auditinternal.Event
L:
for i := 0; i < b.maxBatchSize; i++ {
select {
case ev, ok := <-b.buffer:
// Buffer channel was closed and no new events will follow.
if !ok {
break L
}
events = append(events, *ev)
case <-timer:
// Timer has expired. Send whatever events are in the queue.
break L
case <-stopCh:
// Webhook has shut down. Send the last events.
break L
}
}
return events
}
// collectLastEvents assumes that the buffer was closed. It collects the first
// maxBatchSize events from the closed buffer into a batch and returns them.
func (b *batchBackend) collectLastEvents() []auditinternal.Event {
var events []auditinternal.Event
for i := 0; i < b.maxBatchSize; i++ {
ev, ok := <-b.buffer
if !ok {
break
}
events = append(events, *ev)
}
return events
}
// sendBatchEvents sends a POST requests with the event list in a goroutine
// and logs any error encountered.
func (b *batchBackend) sendBatchEvents(events []auditinternal.Event) {
if len(events) == 0 {
return
}
list := auditinternal.EventList{Items: events}
if b.throttle != nil {
b.throttle.Accept()
}
// Locking reqMutex for read will guarantee that the shutdown process will
// block until the goroutine started below is finished. At the same time, it
// will not prevent other batches from being proceed further this point.
b.reqMutex.RLock()
go func() {
// Execute the webhook POST in a goroutine to keep it from blocking.
// This lets the webhook continue to drain the queue immediately.
defer b.reqMutex.RUnlock()
defer runtime.HandleCrash()
err := b.w.WithExponentialBackoff(func() rest.Result {
return b.w.RestClient.Post().Body(&list).Do()
}).Error()
if err != nil {
impacted := make([]*auditinternal.Event, len(events))
for i := range events {
impacted[i] = &events[i]
}
audit.HandlePluginError(pluginName, err, impacted...)
}
}()
return
}
func (b *batchBackend) ProcessEvents(ev ...*auditinternal.Event) {
for i, e := range ev {
// Per the audit.Backend interface these events are reused after being
// sent to the Sink. Deep copy and send the copy to the queue.
event := e.DeepCopy()
// The following mechanism is in place to support the situation when audit
// events are still coming after the backend was shut down.
var sendErr error
func() {
// If the backend was shut down and the buffer channel was closed, an
// attempt to add an event to it will result in panic that we should
// recover from.
defer func() {
if err := recover(); err != nil {
sendErr = errors.New("audit webhook shut down")
}
}()
select {
case b.buffer <- event:
default:
sendErr = errors.New("audit webhook queue blocked")
}
}()
if sendErr != nil {
audit.HandlePluginError(pluginName, sendErr, ev[i:]...)
return
}
}
return b.w.WithExponentialBackoff(func() rest.Result {
return b.w.RestClient.Post().Body(&list).Do()
}).Error()
}

View File

@ -25,10 +25,7 @@ import (
"net/http/httptest"
"os"
"reflect"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@ -91,15 +88,7 @@ func (t *testWebhookHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
http.Error(w, err.Error(), http.StatusInternalServerError)
}
func newTestBlockingWebhook(t *testing.T, endpoint string, groupVersion schema.GroupVersion) *blockingBackend {
return newWebhook(t, endpoint, ModeBlocking, groupVersion).(*blockingBackend)
}
func newTestBatchWebhook(t *testing.T, endpoint string, groupVersion schema.GroupVersion) *batchBackend {
return newWebhook(t, endpoint, ModeBatch, groupVersion).(*batchBackend)
}
func newWebhook(t *testing.T, endpoint string, mode string, groupVersion schema.GroupVersion) audit.Backend {
func newWebhook(t *testing.T, endpoint string, groupVersion schema.GroupVersion) *backend {
config := v1.Config{
Clusters: []v1.NamedCluster{
{Cluster: v1.Cluster{Server: endpoint, InsecureSkipTLSVerify: true}},
@ -116,10 +105,10 @@ func newWebhook(t *testing.T, endpoint string, mode string, groupVersion schema.
// NOTE(ericchiang): Do we need to use a proper serializer?
require.NoError(t, stdjson.NewEncoder(f).Encode(config), "writing kubeconfig")
backend, err := NewBackend(f.Name(), mode, groupVersion, NewDefaultBatchBackendConfig())
b, err := NewBackend(f.Name(), groupVersion)
require.NoError(t, err, "initializing backend")
return backend
return b.(*backend)
}
func TestWebhook(t *testing.T) {
@ -131,275 +120,9 @@ func TestWebhook(t *testing.T) {
}))
defer s.Close()
backend := newTestBlockingWebhook(t, s.URL, auditv1beta1.SchemeGroupVersion)
backend := newWebhook(t, s.URL, auditv1beta1.SchemeGroupVersion)
// Ensure this doesn't return a serialization error.
event := &auditinternal.Event{}
require.NoError(t, backend.processEvents(event), "failed to send events")
}
// waitForEmptyBuffer indicates when the sendBatchEvents method has read from the
// existing buffer. This lets test coordinate closing a timer and stop channel
// until the for loop has read from the buffer.
func waitForEmptyBuffer(b *batchBackend) {
for len(b.buffer) != 0 {
time.Sleep(time.Millisecond)
}
}
func TestBatchWebhookMaxEvents(t *testing.T) {
nRest := 10
events := make([]*auditinternal.Event, defaultBatchMaxSize+nRest) // greater than max size.
for i := range events {
events[i] = &auditinternal.Event{}
}
got := make(chan int, 2)
s := httptest.NewServer(newWebhookHandler(t, &auditv1beta1.EventList{}, func(events runtime.Object) {
got <- len(events.(*auditv1beta1.EventList).Items)
}))
defer s.Close()
backend := newTestBatchWebhook(t, s.URL, auditv1beta1.SchemeGroupVersion)
backend.ProcessEvents(events...)
stopCh := make(chan struct{})
timer := make(chan time.Time, 1)
backend.sendBatchEvents(backend.collectEvents(stopCh, timer))
require.Equal(t, defaultBatchMaxSize, <-got, "did not get batch max size")
go func() {
waitForEmptyBuffer(backend) // wait for the buffer to empty
timer <- time.Now() // Trigger the wait timeout
}()
backend.sendBatchEvents(backend.collectEvents(stopCh, timer))
require.Equal(t, nRest, <-got, "failed to get the rest of the events")
}
func TestBatchWebhookStopCh(t *testing.T) {
events := make([]*auditinternal.Event, 1) // less than max size.
for i := range events {
events[i] = &auditinternal.Event{}
}
expected := len(events)
got := make(chan int, 2)
s := httptest.NewServer(newWebhookHandler(t, &auditv1beta1.EventList{}, func(events runtime.Object) {
got <- len(events.(*auditv1beta1.EventList).Items)
}))
defer s.Close()
backend := newTestBatchWebhook(t, s.URL, auditv1beta1.SchemeGroupVersion)
backend.ProcessEvents(events...)
stopCh := make(chan struct{})
timer := make(chan time.Time)
go func() {
waitForEmptyBuffer(backend)
close(stopCh) // stop channel has stopped
}()
backend.sendBatchEvents(backend.collectEvents(stopCh, timer))
require.Equal(t, expected, <-got, "get queued events after timer expires")
}
func TestBatchWebhookProcessEventsAfterStop(t *testing.T) {
events := make([]*auditinternal.Event, 1) // less than max size.
for i := range events {
events[i] = &auditinternal.Event{}
}
got := make(chan struct{})
s := httptest.NewServer(newWebhookHandler(t, &auditv1beta1.EventList{}, func(events runtime.Object) {
close(got)
}))
defer s.Close()
backend := newTestBatchWebhook(t, s.URL, auditv1beta1.SchemeGroupVersion)
stopCh := make(chan struct{})
backend.Run(stopCh)
close(stopCh)
<-backend.shutdownCh
backend.ProcessEvents(events...)
assert.Equal(t, 0, len(backend.buffer), "processed events after the backed has been stopped")
}
func TestBatchWebhookShutdown(t *testing.T) {
events := make([]*auditinternal.Event, 1)
for i := range events {
events[i] = &auditinternal.Event{}
}
got := make(chan struct{})
contReqCh := make(chan struct{})
shutdownCh := make(chan struct{})
s := httptest.NewServer(newWebhookHandler(t, &auditv1beta1.EventList{}, func(events runtime.Object) {
close(got)
<-contReqCh
}))
defer s.Close()
backend := newTestBatchWebhook(t, s.URL, auditv1beta1.SchemeGroupVersion)
backend.ProcessEvents(events...)
go func() {
// Assume stopCh was closed.
close(backend.buffer)
backend.sendBatchEvents(backend.collectLastEvents())
}()
<-got
go func() {
close(backend.shutdownCh)
backend.Shutdown()
close(shutdownCh)
}()
// Wait for some time in case there's a bug that allows for the Shutdown
// method to exit before all requests has been completed.
time.Sleep(1 * time.Second)
select {
case <-shutdownCh:
t.Fatal("Backend shut down before all requests finished")
default:
// Continue.
}
close(contReqCh)
<-shutdownCh
}
func TestBatchWebhookEmptyBuffer(t *testing.T) {
events := make([]*auditinternal.Event, 1) // less than max size.
for i := range events {
events[i] = &auditinternal.Event{}
}
expected := len(events)
got := make(chan int, 2)
s := httptest.NewServer(newWebhookHandler(t, &auditv1beta1.EventList{}, func(events runtime.Object) {
got <- len(events.(*auditv1beta1.EventList).Items)
}))
defer s.Close()
backend := newTestBatchWebhook(t, s.URL, auditv1beta1.SchemeGroupVersion)
stopCh := make(chan struct{})
timer := make(chan time.Time, 1)
timer <- time.Now() // Timer is done.
// Buffer is empty, no events have been queued. This should exit but send no events.
backend.sendBatchEvents(backend.collectEvents(stopCh, timer))
// Send additional events after the sendBatchEvents has been called.
backend.ProcessEvents(events...)
go func() {
waitForEmptyBuffer(backend)
timer <- time.Now()
}()
backend.sendBatchEvents(backend.collectEvents(stopCh, timer))
// Make sure we didn't get a POST with zero events.
require.Equal(t, expected, <-got, "expected one event")
}
func TestBatchBufferFull(t *testing.T) {
events := make([]*auditinternal.Event, defaultBatchBufferSize+1) // More than buffered size
for i := range events {
events[i] = &auditinternal.Event{}
}
s := httptest.NewServer(newWebhookHandler(t, &auditv1beta1.EventList{}, func(events runtime.Object) {
// Do nothing.
}))
defer s.Close()
backend := newTestBatchWebhook(t, s.URL, auditv1beta1.SchemeGroupVersion)
// Make sure this doesn't block.
backend.ProcessEvents(events...)
}
func TestBatchRun(t *testing.T) {
// Divisable by max batch size so we don't have to wait for a minute for
// the test to finish.
events := make([]*auditinternal.Event, defaultBatchMaxSize*3)
for i := range events {
events[i] = &auditinternal.Event{}
}
got := new(int64)
want := len(events)
wg := new(sync.WaitGroup)
wg.Add(want)
done := make(chan struct{})
go func() {
wg.Wait()
// When the expected number of events have been received, close the channel.
close(done)
}()
s := httptest.NewServer(newWebhookHandler(t, &auditv1beta1.EventList{}, func(obj runtime.Object) {
events := obj.(*auditv1beta1.EventList)
atomic.AddInt64(got, int64(len(events.Items)))
wg.Add(-len(events.Items))
}))
defer s.Close()
stopCh := make(chan struct{})
defer close(stopCh)
backend := newTestBatchWebhook(t, s.URL, auditv1beta1.SchemeGroupVersion)
// Test the Run codepath. E.g. that the spawned goroutines behave correctly.
backend.Run(stopCh)
backend.ProcessEvents(events...)
select {
case <-done:
// Received all the events.
case <-time.After(2 * time.Minute):
t.Errorf("expected %d events got %d", want, atomic.LoadInt64(got))
}
}
func TestBatchConcurrentRequests(t *testing.T) {
events := make([]*auditinternal.Event, defaultBatchBufferSize) // Don't drop events
for i := range events {
events[i] = &auditinternal.Event{}
}
wg := new(sync.WaitGroup)
wg.Add(len(events))
s := httptest.NewServer(newWebhookHandler(t, &auditv1beta1.EventList{}, func(events runtime.Object) {
wg.Add(-len(events.(*auditv1beta1.EventList).Items))
// Since the webhook makes concurrent requests, blocking on the webhook response
// shouldn't block the webhook from sending more events.
//
// Wait for all responses to be received before sending the response.
wg.Wait()
}))
defer s.Close()
stopCh := make(chan struct{})
defer close(stopCh)
backend := newTestBatchWebhook(t, s.URL, auditv1beta1.SchemeGroupVersion)
backend.Run(stopCh)
backend.ProcessEvents(events...)
// Wait for the webhook to receive all events.
wg.Wait()
}

View File

@ -1,289 +0,0 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package webhook
import (
"net/http/httptest"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"k8s.io/apimachinery/pkg/runtime"
auditinternal "k8s.io/apiserver/pkg/apis/audit"
auditv1alpha1 "k8s.io/apiserver/pkg/apis/audit/v1alpha1"
)
func TestBatchWebhookMaxEventsV1Alpha1(t *testing.T) {
nRest := 10
events := make([]*auditinternal.Event, defaultBatchMaxSize+nRest) // greater than max size.
for i := range events {
events[i] = &auditinternal.Event{}
}
got := make(chan int, 2)
s := httptest.NewServer(newWebhookHandler(t, &auditv1alpha1.EventList{}, func(events runtime.Object) {
got <- len(events.(*auditv1alpha1.EventList).Items)
}))
defer s.Close()
backend := newTestBatchWebhook(t, s.URL, auditv1alpha1.SchemeGroupVersion)
backend.ProcessEvents(events...)
stopCh := make(chan struct{})
timer := make(chan time.Time, 1)
backend.sendBatchEvents(backend.collectEvents(stopCh, timer))
require.Equal(t, defaultBatchMaxSize, <-got, "did not get batch max size")
go func() {
waitForEmptyBuffer(backend) // wait for the buffer to empty
timer <- time.Now() // Trigger the wait timeout
}()
backend.sendBatchEvents(backend.collectEvents(stopCh, timer))
require.Equal(t, nRest, <-got, "failed to get the rest of the events")
}
func TestBatchWebhookStopChV1Alpha1(t *testing.T) {
events := make([]*auditinternal.Event, 1) // less than max size.
for i := range events {
events[i] = &auditinternal.Event{}
}
expected := len(events)
got := make(chan int, 2)
s := httptest.NewServer(newWebhookHandler(t, &auditv1alpha1.EventList{}, func(events runtime.Object) {
got <- len(events.(*auditv1alpha1.EventList).Items)
}))
defer s.Close()
backend := newTestBatchWebhook(t, s.URL, auditv1alpha1.SchemeGroupVersion)
backend.ProcessEvents(events...)
stopCh := make(chan struct{})
timer := make(chan time.Time)
go func() {
waitForEmptyBuffer(backend)
close(stopCh) // stop channel has stopped
}()
backend.sendBatchEvents(backend.collectEvents(stopCh, timer))
require.Equal(t, expected, <-got, "get queued events after timer expires")
}
func TestBatchWebhookProcessEventsAfterStopV1Alpha1(t *testing.T) {
events := make([]*auditinternal.Event, 1) // less than max size.
for i := range events {
events[i] = &auditinternal.Event{}
}
got := make(chan struct{})
s := httptest.NewServer(newWebhookHandler(t, &auditv1alpha1.EventList{}, func(events runtime.Object) {
close(got)
}))
defer s.Close()
backend := newTestBatchWebhook(t, s.URL, auditv1alpha1.SchemeGroupVersion)
stopCh := make(chan struct{})
backend.Run(stopCh)
close(stopCh)
<-backend.shutdownCh
backend.ProcessEvents(events...)
assert.Equal(t, 0, len(backend.buffer), "processed events after the backed has been stopped")
}
func TestBatchWebhookShutdownV1Alpha1(t *testing.T) {
events := make([]*auditinternal.Event, 1)
for i := range events {
events[i] = &auditinternal.Event{}
}
got := make(chan struct{})
contReqCh := make(chan struct{})
shutdownCh := make(chan struct{})
s := httptest.NewServer(newWebhookHandler(t, &auditv1alpha1.EventList{}, func(events runtime.Object) {
close(got)
<-contReqCh
}))
defer s.Close()
backend := newTestBatchWebhook(t, s.URL, auditv1alpha1.SchemeGroupVersion)
backend.ProcessEvents(events...)
go func() {
// Assume stopCh was closed.
close(backend.buffer)
backend.sendBatchEvents(backend.collectLastEvents())
}()
<-got
go func() {
close(backend.shutdownCh)
backend.Shutdown()
close(shutdownCh)
}()
// Wait for some time in case there's a bug that allows for the Shutdown
// method to exit before all requests has been completed.
time.Sleep(1 * time.Second)
select {
case <-shutdownCh:
t.Fatal("Backend shut down before all requests finished")
default:
// Continue.
}
close(contReqCh)
<-shutdownCh
}
func TestBatchWebhookEmptyBufferV1Alpha1(t *testing.T) {
events := make([]*auditinternal.Event, 1) // less than max size.
for i := range events {
events[i] = &auditinternal.Event{}
}
expected := len(events)
got := make(chan int, 2)
s := httptest.NewServer(newWebhookHandler(t, &auditv1alpha1.EventList{}, func(events runtime.Object) {
got <- len(events.(*auditv1alpha1.EventList).Items)
}))
defer s.Close()
backend := newTestBatchWebhook(t, s.URL, auditv1alpha1.SchemeGroupVersion)
stopCh := make(chan struct{})
timer := make(chan time.Time, 1)
timer <- time.Now() // Timer is done.
// Buffer is empty, no events have been queued. This should exit but send no events.
backend.sendBatchEvents(backend.collectEvents(stopCh, timer))
// Send additional events after the sendBatchEvents has been called.
backend.ProcessEvents(events...)
go func() {
waitForEmptyBuffer(backend)
timer <- time.Now()
}()
backend.sendBatchEvents(backend.collectEvents(stopCh, timer))
// Make sure we didn't get a POST with zero events.
require.Equal(t, expected, <-got, "expected one event")
}
func TestBatchBufferFullV1Alpha1(t *testing.T) {
events := make([]*auditinternal.Event, defaultBatchBufferSize+1) // More than buffered size
for i := range events {
events[i] = &auditinternal.Event{}
}
s := httptest.NewServer(newWebhookHandler(t, &auditv1alpha1.EventList{}, func(events runtime.Object) {
// Do nothing.
}))
defer s.Close()
backend := newTestBatchWebhook(t, s.URL, auditv1alpha1.SchemeGroupVersion)
// Make sure this doesn't block.
backend.ProcessEvents(events...)
}
func TestBatchRunV1Alpha1(t *testing.T) {
// Divisable by max batch size so we don't have to wait for a minute for
// the test to finish.
events := make([]*auditinternal.Event, defaultBatchMaxSize*3)
for i := range events {
events[i] = &auditinternal.Event{}
}
got := new(int64)
want := len(events)
wg := new(sync.WaitGroup)
wg.Add(want)
done := make(chan struct{})
go func() {
wg.Wait()
// When the expected number of events have been received, close the channel.
close(done)
}()
s := httptest.NewServer(newWebhookHandler(t, &auditv1alpha1.EventList{}, func(obj runtime.Object) {
events := obj.(*auditv1alpha1.EventList)
atomic.AddInt64(got, int64(len(events.Items)))
wg.Add(-len(events.Items))
}))
defer s.Close()
stopCh := make(chan struct{})
defer close(stopCh)
backend := newTestBatchWebhook(t, s.URL, auditv1alpha1.SchemeGroupVersion)
// Test the Run codepath. E.g. that the spawned goroutines behave correctly.
backend.Run(stopCh)
backend.ProcessEvents(events...)
select {
case <-done:
// Received all the events.
case <-time.After(2 * time.Minute):
t.Errorf("expected %d events got %d", want, atomic.LoadInt64(got))
}
}
func TestBatchConcurrentRequestsV1Alpha1(t *testing.T) {
events := make([]*auditinternal.Event, defaultBatchBufferSize) // Don't drop events
for i := range events {
events[i] = &auditinternal.Event{}
}
wg := new(sync.WaitGroup)
wg.Add(len(events))
s := httptest.NewServer(newWebhookHandler(t, &auditv1alpha1.EventList{}, func(events runtime.Object) {
wg.Add(-len(events.(*auditv1alpha1.EventList).Items))
// Since the webhook makes concurrent requests, blocking on the webhook response
// shouldn't block the webhook from sending more events.
//
// Wait for all responses to be received before sending the response.
wg.Wait()
}))
defer s.Close()
stopCh := make(chan struct{})
defer close(stopCh)
backend := newTestBatchWebhook(t, s.URL, auditv1alpha1.SchemeGroupVersion)
backend.Run(stopCh)
backend.ProcessEvents(events...)
// Wait for the webhook to receive all events.
wg.Wait()
}

View File

@ -1142,6 +1142,10 @@
"ImportPath": "k8s.io/apiserver/pkg/util/wsstream",
"Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
},
{
"ImportPath": "k8s.io/apiserver/plugin/pkg/audit/buffered",
"Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
},
{
"ImportPath": "k8s.io/apiserver/plugin/pkg/audit/log",
"Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"

View File

@ -1110,6 +1110,10 @@
"ImportPath": "k8s.io/apiserver/pkg/util/wsstream",
"Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
},
{
"ImportPath": "k8s.io/apiserver/plugin/pkg/audit/buffered",
"Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
},
{
"ImportPath": "k8s.io/apiserver/plugin/pkg/audit/log",
"Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"