From 52fae991305e3252ccc5c9c86a9b7abc04c149af Mon Sep 17 00:00:00 2001 From: Mik Vyatskov Date: Fri, 23 Mar 2018 16:13:34 +0100 Subject: [PATCH] Implemented truncating audit backend Signed-off-by: Mik Vyatskov --- cmd/kube-apiserver/app/options/BUILD | 1 + .../app/options/options_test.go | 21 +++ .../Godeps/Godeps.json | 4 + .../k8s.io/apiserver/pkg/server/options/BUILD | 1 + .../apiserver/pkg/server/options/audit.go | 77 ++++++++- .../pkg/server/options/audit_test.go | 28 ++++ .../k8s.io/apiserver/plugin/pkg/audit/BUILD | 2 + .../apiserver/plugin/pkg/audit/buffered/BUILD | 2 +- .../pkg/audit/buffered/buffered_test.go | 32 +--- .../apiserver/plugin/pkg/audit/fake/BUILD | 29 ++++ .../apiserver/plugin/pkg/audit/fake/doc.go | 18 ++ .../apiserver/plugin/pkg/audit/fake/fake.go | 46 +++++ .../apiserver/plugin/pkg/audit/truncate/BUILD | 46 +++++ .../plugin/pkg/audit/truncate/doc.go | 19 +++ .../plugin/pkg/audit/truncate/truncate.go | 158 ++++++++++++++++++ .../pkg/audit/truncate/truncate_test.go | 141 ++++++++++++++++ .../k8s.io/kube-aggregator/Godeps/Godeps.json | 4 + .../sample-apiserver/Godeps/Godeps.json | 4 + 18 files changed, 601 insertions(+), 32 deletions(-) create mode 100644 staging/src/k8s.io/apiserver/plugin/pkg/audit/fake/BUILD create mode 100644 staging/src/k8s.io/apiserver/plugin/pkg/audit/fake/doc.go create mode 100644 staging/src/k8s.io/apiserver/plugin/pkg/audit/fake/fake.go create mode 100644 staging/src/k8s.io/apiserver/plugin/pkg/audit/truncate/BUILD create mode 100644 staging/src/k8s.io/apiserver/plugin/pkg/audit/truncate/doc.go create mode 100644 staging/src/k8s.io/apiserver/plugin/pkg/audit/truncate/truncate.go create mode 100644 staging/src/k8s.io/apiserver/plugin/pkg/audit/truncate/truncate_test.go diff --git a/cmd/kube-apiserver/app/options/BUILD b/cmd/kube-apiserver/app/options/BUILD index bb6f67d6aa..5459e1ce7d 100644 --- a/cmd/kube-apiserver/app/options/BUILD +++ b/cmd/kube-apiserver/app/options/BUILD @@ -47,6 +47,7 @@ go_test( "//vendor/k8s.io/apiserver/pkg/storage/storagebackend:go_default_library", "//vendor/k8s.io/apiserver/pkg/util/flag:go_default_library", "//vendor/k8s.io/apiserver/plugin/pkg/audit/buffered:go_default_library", + "//vendor/k8s.io/apiserver/plugin/pkg/audit/truncate:go_default_library", "//vendor/k8s.io/client-go/rest:go_default_library", ], ) diff --git a/cmd/kube-apiserver/app/options/options_test.go b/cmd/kube-apiserver/app/options/options_test.go index 576ec51c81..f3e837c5b6 100644 --- a/cmd/kube-apiserver/app/options/options_test.go +++ b/cmd/kube-apiserver/app/options/options_test.go @@ -30,6 +30,7 @@ import ( "k8s.io/apiserver/pkg/storage/storagebackend" utilflag "k8s.io/apiserver/pkg/util/flag" auditbuffered "k8s.io/apiserver/plugin/pkg/audit/buffered" + audittruncate "k8s.io/apiserver/plugin/pkg/audit/truncate" restclient "k8s.io/client-go/rest" "k8s.io/kubernetes/pkg/api/legacyscheme" kapi "k8s.io/kubernetes/pkg/apis/core" @@ -61,6 +62,9 @@ func TestAddFlags(t *testing.T) { "--audit-log-batch-throttle-enable=true", "--audit-log-batch-throttle-qps=49.5", "--audit-log-batch-throttle-burst=50", + "--audit-log-truncate-enabled=true", + "--audit-log-truncate-max-batch-size=45", + "--audit-log-truncate-max-event-size=44", "--audit-log-version=audit.k8s.io/v1alpha1", "--audit-policy-file=/policy", "--audit-webhook-config-file=/webhook-config", @@ -71,6 +75,9 @@ func TestAddFlags(t *testing.T) { "--audit-webhook-batch-throttle-enable=false", "--audit-webhook-batch-throttle-qps=43.5", "--audit-webhook-batch-throttle-burst=44", + "--audit-webhook-truncate-enabled=true", + "--audit-webhook-truncate-max-batch-size=43", + "--audit-webhook-truncate-max-event-size=42", "--audit-webhook-initial-backoff=2s", "--audit-webhook-version=audit.k8s.io/v1alpha1", "--authentication-token-webhook-cache-ttl=3m", @@ -201,6 +208,13 @@ func TestAddFlags(t *testing.T) { ThrottleBurst: 50, }, }, + TruncateOptions: apiserveroptions.AuditTruncateOptions{ + Enabled: true, + TruncateConfig: audittruncate.Config{ + MaxBatchSize: 45, + MaxEventSize: 44, + }, + }, GroupVersionString: "audit.k8s.io/v1alpha1", }, WebhookOptions: apiserveroptions.AuditWebhookOptions{ @@ -216,6 +230,13 @@ func TestAddFlags(t *testing.T) { ThrottleBurst: 44, }, }, + TruncateOptions: apiserveroptions.AuditTruncateOptions{ + Enabled: true, + TruncateConfig: audittruncate.Config{ + MaxBatchSize: 43, + MaxEventSize: 42, + }, + }, InitialBackoff: 2 * time.Second, GroupVersionString: "audit.k8s.io/v1alpha1", }, diff --git a/staging/src/k8s.io/apiextensions-apiserver/Godeps/Godeps.json b/staging/src/k8s.io/apiextensions-apiserver/Godeps/Godeps.json index 4f80a44254..4bf4a7b3c2 100644 --- a/staging/src/k8s.io/apiextensions-apiserver/Godeps/Godeps.json +++ b/staging/src/k8s.io/apiextensions-apiserver/Godeps/Godeps.json @@ -1478,6 +1478,10 @@ "ImportPath": "k8s.io/apiserver/plugin/pkg/audit/log", "Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" }, + { + "ImportPath": "k8s.io/apiserver/plugin/pkg/audit/truncate", + "Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" + }, { "ImportPath": "k8s.io/apiserver/plugin/pkg/audit/webhook", "Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" diff --git a/staging/src/k8s.io/apiserver/pkg/server/options/BUILD b/staging/src/k8s.io/apiserver/pkg/server/options/BUILD index 812c847771..4a682be0e1 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/options/BUILD +++ b/staging/src/k8s.io/apiserver/pkg/server/options/BUILD @@ -58,6 +58,7 @@ go_library( "//vendor/k8s.io/apiserver/pkg/util/flag:go_default_library", "//vendor/k8s.io/apiserver/plugin/pkg/audit/buffered:go_default_library", "//vendor/k8s.io/apiserver/plugin/pkg/audit/log:go_default_library", + "//vendor/k8s.io/apiserver/plugin/pkg/audit/truncate:go_default_library", "//vendor/k8s.io/apiserver/plugin/pkg/audit/webhook:go_default_library", "//vendor/k8s.io/client-go/informers:go_default_library", "//vendor/k8s.io/client-go/kubernetes:go_default_library", diff --git a/staging/src/k8s.io/apiserver/pkg/server/options/audit.go b/staging/src/k8s.io/apiserver/pkg/server/options/audit.go index 164b7ccce0..ab77812168 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/options/audit.go +++ b/staging/src/k8s.io/apiserver/pkg/server/options/audit.go @@ -37,6 +37,7 @@ import ( utilfeature "k8s.io/apiserver/pkg/util/feature" pluginbuffered "k8s.io/apiserver/plugin/pkg/audit/buffered" pluginlog "k8s.io/apiserver/plugin/pkg/audit/log" + plugintruncate "k8s.io/apiserver/plugin/pkg/audit/truncate" pluginwebhook "k8s.io/apiserver/plugin/pkg/audit/webhook" ) @@ -89,6 +90,14 @@ type AuditBatchOptions struct { BatchConfig pluginbuffered.BatchConfig } +type AuditTruncateOptions struct { + // Whether truncating is enabled or not. + Enabled bool + + // Truncating configuration. + TruncateConfig plugintruncate.Config +} + // AuditLogOptions determines the output of the structured audit log by default. // If the AdvancedAuditing feature is set to false, AuditLogOptions holds the legacy // audit log writer. @@ -99,7 +108,8 @@ type AuditLogOptions struct { MaxSize int Format string - BatchOptions AuditBatchOptions + BatchOptions AuditBatchOptions + TruncateOptions AuditTruncateOptions // API group version used for serializing audit events. GroupVersionString string @@ -110,7 +120,8 @@ type AuditWebhookOptions struct { ConfigFile string InitialBackoff time.Duration - BatchOptions AuditBatchOptions + BatchOptions AuditBatchOptions + TruncateOptions AuditTruncateOptions // API group version used for serializing audit events. GroupVersionString string @@ -122,11 +133,12 @@ func NewAuditOptions() *AuditOptions { return &AuditOptions{ WebhookOptions: AuditWebhookOptions{ + InitialBackoff: pluginwebhook.DefaultInitialBackoff, BatchOptions: AuditBatchOptions{ Mode: ModeBatch, BatchConfig: pluginbuffered.NewDefaultBatchConfig(), }, - InitialBackoff: pluginwebhook.DefaultInitialBackoff, + TruncateOptions: NewAuditTruncateOptions(), GroupVersionString: "audit.k8s.io/v1beta1", }, LogOptions: AuditLogOptions{ @@ -135,11 +147,22 @@ func NewAuditOptions() *AuditOptions { Mode: ModeBlocking, BatchConfig: defaultLogBatchConfig, }, + TruncateOptions: NewAuditTruncateOptions(), GroupVersionString: "audit.k8s.io/v1beta1", }, } } +func NewAuditTruncateOptions() AuditTruncateOptions { + return AuditTruncateOptions{ + Enabled: false, + TruncateConfig: plugintruncate.Config{ + MaxBatchSize: 10 * 1024 * 1024, // 10MB + MaxEventSize: 100 * 1024, // 100KB + }, + } +} + // Validate checks invalid config combination func (o *AuditOptions) Validate() []error { if o == nil { @@ -232,8 +255,10 @@ func (o *AuditOptions) AddFlags(fs *pflag.FlagSet) { o.LogOptions.AddFlags(fs) o.LogOptions.BatchOptions.AddFlags(pluginlog.PluginName, fs) + o.LogOptions.TruncateOptions.AddFlags(pluginlog.PluginName, fs) o.WebhookOptions.AddFlags(fs) o.WebhookOptions.BatchOptions.AddFlags(pluginwebhook.PluginName, fs) + o.WebhookOptions.TruncateOptions.AddFlags(pluginwebhook.PluginName, fs) } func (o *AuditOptions) ApplyTo(c *server.Config) error { @@ -309,6 +334,38 @@ func (o *AuditBatchOptions) wrapBackend(delegate audit.Backend) audit.Backend { return pluginbuffered.NewBackend(delegate, o.BatchConfig) } +func (o *AuditTruncateOptions) Validate(pluginName string) error { + config := o.TruncateConfig + if config.MaxEventSize <= 0 { + return fmt.Errorf("invalid audit truncate %s max event size %v, must be a positive number", pluginName, config.MaxEventSize) + } + if config.MaxBatchSize < config.MaxEventSize { + return fmt.Errorf("invalid audit truncate %s max batch size %v, must be greater than "+ + "max event size (%v)", pluginName, config.MaxBatchSize, config.MaxEventSize) + } + return nil +} + +func (o *AuditTruncateOptions) AddFlags(pluginName string, fs *pflag.FlagSet) { + fs.BoolVar(&o.Enabled, fmt.Sprintf("audit-%s-truncate-enabled", pluginName), + o.Enabled, "Whether event and batch truncating is enabled.") + fs.Int64Var(&o.TruncateConfig.MaxBatchSize, fmt.Sprintf("audit-%s-truncate-max-batch-size", pluginName), + o.TruncateConfig.MaxBatchSize, "Maximum size of the batch sent to the underlying backend. "+ + "Actual serialized size can be several hundreds of bytes greater. If a batch exceeds this limit, "+ + "it is split into several batches of smaller size.") + fs.Int64Var(&o.TruncateConfig.MaxEventSize, fmt.Sprintf("audit-%s-truncate-max-event-size", pluginName), + o.TruncateConfig.MaxEventSize, "Maximum size of the audit event sent to the underlying backend. "+ + "If the size of an event is greater than this number, first request and response are removed, and"+ + "if this doesn't reduce the size enough, event is discarded.") +} + +func (o *AuditTruncateOptions) wrapBackend(delegate audit.Backend, gv schema.GroupVersion) audit.Backend { + if !o.Enabled { + return delegate + } + return plugintruncate.NewBackend(delegate, o.TruncateConfig, gv) +} + func (o *AuditLogOptions) AddFlags(fs *pflag.FlagSet) { fs.StringVar(&o.Path, "audit-log-path", o.Path, "If set, all requests coming to the apiserver will be logged to this file. '-' means standard out.") @@ -337,6 +394,9 @@ func (o *AuditLogOptions) Validate() []error { if err := validateBackendBatchOptions(pluginlog.PluginName, o.BatchOptions); err != nil { allErrors = append(allErrors, err) } + if err := o.TruncateOptions.Validate(pluginlog.PluginName); err != nil { + allErrors = append(allErrors, err) + } if err := validateGroupVersionString(o.GroupVersionString); err != nil { allErrors = append(allErrors, err) @@ -395,7 +455,9 @@ func (o *AuditLogOptions) advancedApplyTo(c *server.Config) error { if w := o.getWriter(); w != nil { groupVersion, _ := schema.ParseGroupVersion(o.GroupVersionString) log := pluginlog.NewBackend(w, o.Format, groupVersion) - c.AuditBackend = appendBackend(c.AuditBackend, o.BatchOptions.wrapBackend(log)) + log = o.BatchOptions.wrapBackend(log) + log = o.TruncateOptions.wrapBackend(log, groupVersion) + c.AuditBackend = appendBackend(c.AuditBackend, log) } return nil } @@ -429,6 +491,9 @@ func (o *AuditWebhookOptions) Validate() []error { if err := validateBackendBatchOptions(pluginwebhook.PluginName, o.BatchOptions); err != nil { allErrors = append(allErrors, err) } + if err := o.TruncateOptions.Validate(pluginwebhook.PluginName); err != nil { + allErrors = append(allErrors, err) + } if err := validateGroupVersionString(o.GroupVersionString); err != nil { allErrors = append(allErrors, err) @@ -451,6 +516,8 @@ func (o *AuditWebhookOptions) applyTo(c *server.Config) error { if err != nil { return fmt.Errorf("initializing audit webhook: %v", err) } - c.AuditBackend = appendBackend(c.AuditBackend, o.BatchOptions.wrapBackend(webhook)) + webhook = o.BatchOptions.wrapBackend(webhook) + webhook = o.TruncateOptions.wrapBackend(webhook, groupVersion) + c.AuditBackend = appendBackend(c.AuditBackend, webhook) return nil } diff --git a/staging/src/k8s.io/apiserver/pkg/server/options/audit_test.go b/staging/src/k8s.io/apiserver/pkg/server/options/audit_test.go index e125f6b080..78ed4f6a20 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/options/audit_test.go +++ b/staging/src/k8s.io/apiserver/pkg/server/options/audit_test.go @@ -78,6 +78,15 @@ func TestAuditValidOptions(t *testing.T) { return o }, expected: "union[buffered,webhook]", + }, { + name: "default webhook with truncating", + options: func() *AuditOptions { + o := NewAuditOptions() + o.WebhookOptions.ConfigFile = webhookConfig + o.WebhookOptions.TruncateOptions.Enabled = true + return o + }, + expected: "truncate>", }} for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { @@ -148,6 +157,25 @@ func TestAuditInvalidOptions(t *testing.T) { o.WebhookOptions.BatchOptions.BatchConfig.ThrottleQPS = -1 return o }, + }, { + name: "invalid webhook truncate max event size", + options: func() *AuditOptions { + o := NewAuditOptions() + o.WebhookOptions.ConfigFile = "/audit" + o.WebhookOptions.TruncateOptions.Enabled = true + o.WebhookOptions.TruncateOptions.TruncateConfig.MaxEventSize = -1 + return o + }, + }, { + name: "invalid webhook truncate max batch size", + options: func() *AuditOptions { + o := NewAuditOptions() + o.WebhookOptions.ConfigFile = "/audit" + o.WebhookOptions.TruncateOptions.Enabled = true + o.WebhookOptions.TruncateOptions.TruncateConfig.MaxEventSize = 2 + o.WebhookOptions.TruncateOptions.TruncateConfig.MaxBatchSize = 1 + return o + }, }} for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { diff --git a/staging/src/k8s.io/apiserver/plugin/pkg/audit/BUILD b/staging/src/k8s.io/apiserver/plugin/pkg/audit/BUILD index fe8f7ce4fd..035d2296dd 100644 --- a/staging/src/k8s.io/apiserver/plugin/pkg/audit/BUILD +++ b/staging/src/k8s.io/apiserver/plugin/pkg/audit/BUILD @@ -23,7 +23,9 @@ filegroup( srcs = [ ":package-srcs", "//staging/src/k8s.io/apiserver/plugin/pkg/audit/buffered:all-srcs", + "//staging/src/k8s.io/apiserver/plugin/pkg/audit/fake:all-srcs", "//staging/src/k8s.io/apiserver/plugin/pkg/audit/log:all-srcs", + "//staging/src/k8s.io/apiserver/plugin/pkg/audit/truncate:all-srcs", "//staging/src/k8s.io/apiserver/plugin/pkg/audit/webhook:all-srcs", ], tags = ["automanaged"], diff --git a/staging/src/k8s.io/apiserver/plugin/pkg/audit/buffered/BUILD b/staging/src/k8s.io/apiserver/plugin/pkg/audit/buffered/BUILD index 7c461c4b4e..1b333689f1 100644 --- a/staging/src/k8s.io/apiserver/plugin/pkg/audit/buffered/BUILD +++ b/staging/src/k8s.io/apiserver/plugin/pkg/audit/buffered/BUILD @@ -25,7 +25,7 @@ go_test( "//vendor/github.com/stretchr/testify/require:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//vendor/k8s.io/apiserver/pkg/apis/audit:go_default_library", - "//vendor/k8s.io/apiserver/pkg/audit:go_default_library", + "//vendor/k8s.io/apiserver/plugin/pkg/audit/fake:go_default_library", ], ) diff --git a/staging/src/k8s.io/apiserver/plugin/pkg/audit/buffered/buffered_test.go b/staging/src/k8s.io/apiserver/plugin/pkg/audit/buffered/buffered_test.go index b709964688..c01258e04e 100644 --- a/staging/src/k8s.io/apiserver/plugin/pkg/audit/buffered/buffered_test.go +++ b/staging/src/k8s.io/apiserver/plugin/pkg/audit/buffered/buffered_test.go @@ -24,7 +24,7 @@ import ( "k8s.io/apimachinery/pkg/util/wait" auditinternal "k8s.io/apiserver/pkg/apis/audit" - "k8s.io/apiserver/pkg/audit" + "k8s.io/apiserver/plugin/pkg/audit/fake" ) var ( @@ -83,7 +83,7 @@ func TestBufferedBackendCollectEvents(t *testing.T) { t.Run(tc.desc, func(t *testing.T) { t.Parallel() - backend := NewBackend(&fakeBackend{}, config).(*bufferedBackend) + backend := NewBackend(&fake.Backend{}, config).(*bufferedBackend) backend.ProcessEvents(newEvents(tc.numEvents)...) batch := backend.collectEvents(tc.timer, tc.stopCh) @@ -96,7 +96,7 @@ func TestBufferedBackendCollectEvents(t *testing.T) { func TestBufferedBackendProcessEventsAfterStop(t *testing.T) { t.Parallel() - backend := NewBackend(&fakeBackend{}, NewDefaultBatchConfig()).(*bufferedBackend) + backend := NewBackend(&fake.Backend{}, NewDefaultBatchConfig()).(*bufferedBackend) backend.Run(closedStopCh) backend.Shutdown() @@ -111,7 +111,7 @@ func TestBufferedBackendProcessEventsBufferFull(t *testing.T) { config := NewDefaultBatchConfig() config.BufferSize = 1 - backend := NewBackend(&fakeBackend{}, config).(*bufferedBackend) + backend := NewBackend(&fake.Backend{}, config).(*bufferedBackend) backend.ProcessEvents(newEvents(2)...) @@ -123,8 +123,8 @@ func TestBufferedBackendShutdownWaitsForDelegatedCalls(t *testing.T) { delegatedCallStartCh := make(chan struct{}) delegatedCallEndCh := make(chan struct{}) - delegateBackend := &fakeBackend{ - onRequest: func(_ []*auditinternal.Event) { + delegateBackend := &fake.Backend{ + OnRequest: func(_ []*auditinternal.Event) { close(delegatedCallStartCh) <-delegatedCallEndCh }, @@ -159,23 +159,3 @@ func TestBufferedBackendShutdownWaitsForDelegatedCalls(t *testing.T) { close(delegatedCallEndCh) <-shutdownEndCh } - -type fakeBackend struct { - onRequest func(events []*auditinternal.Event) -} - -var _ audit.Backend = &fakeBackend{} - -func (b *fakeBackend) Run(stopCh <-chan struct{}) error { - return nil -} - -func (b *fakeBackend) Shutdown() { - return -} - -func (b *fakeBackend) ProcessEvents(ev ...*auditinternal.Event) { - if b.onRequest != nil { - b.onRequest(ev) - } -} diff --git a/staging/src/k8s.io/apiserver/plugin/pkg/audit/fake/BUILD b/staging/src/k8s.io/apiserver/plugin/pkg/audit/fake/BUILD new file mode 100644 index 0000000000..ab4f45b873 --- /dev/null +++ b/staging/src/k8s.io/apiserver/plugin/pkg/audit/fake/BUILD @@ -0,0 +1,29 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "go_default_library", + srcs = [ + "doc.go", + "fake.go", + ], + importpath = "k8s.io/apiserver/plugin/pkg/audit/fake", + visibility = ["//visibility:public"], + deps = [ + "//vendor/k8s.io/apiserver/pkg/apis/audit:go_default_library", + "//vendor/k8s.io/apiserver/pkg/audit:go_default_library", + ], +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [":package-srcs"], + tags = ["automanaged"], + visibility = ["//visibility:public"], +) diff --git a/staging/src/k8s.io/apiserver/plugin/pkg/audit/fake/doc.go b/staging/src/k8s.io/apiserver/plugin/pkg/audit/fake/doc.go new file mode 100644 index 0000000000..2739476123 --- /dev/null +++ b/staging/src/k8s.io/apiserver/plugin/pkg/audit/fake/doc.go @@ -0,0 +1,18 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package fake provides a fake audit.Backend interface implementation for testing. +package fake // import "k8s.io/apiserver/plugin/pkg/audit/fake" diff --git a/staging/src/k8s.io/apiserver/plugin/pkg/audit/fake/fake.go b/staging/src/k8s.io/apiserver/plugin/pkg/audit/fake/fake.go new file mode 100644 index 0000000000..4b8fa3c7c2 --- /dev/null +++ b/staging/src/k8s.io/apiserver/plugin/pkg/audit/fake/fake.go @@ -0,0 +1,46 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package fake + +import ( + auditinternal "k8s.io/apiserver/pkg/apis/audit" + "k8s.io/apiserver/pkg/audit" +) + +var _ audit.Backend = &Backend{} + +// Backend is a fake audit backend for testing purposes. +type Backend struct { + OnRequest func(events []*auditinternal.Event) +} + +// Run does nothing. +func (b *Backend) Run(stopCh <-chan struct{}) error { + return nil +} + +// Shutdown does nothing. +func (b *Backend) Shutdown() { + return +} + +// ProcessEvents calls a callback on a batch, if present. +func (b *Backend) ProcessEvents(ev ...*auditinternal.Event) { + if b.OnRequest != nil { + b.OnRequest(ev) + } +} diff --git a/staging/src/k8s.io/apiserver/plugin/pkg/audit/truncate/BUILD b/staging/src/k8s.io/apiserver/plugin/pkg/audit/truncate/BUILD new file mode 100644 index 0000000000..66db385e8c --- /dev/null +++ b/staging/src/k8s.io/apiserver/plugin/pkg/audit/truncate/BUILD @@ -0,0 +1,46 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "go_default_library", + srcs = [ + "doc.go", + "truncate.go", + ], + importpath = "k8s.io/apiserver/plugin/pkg/audit/truncate", + visibility = ["//visibility:public"], + deps = [ + "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/errors:go_default_library", + "//vendor/k8s.io/apiserver/pkg/apis/audit:go_default_library", + "//vendor/k8s.io/apiserver/pkg/audit:go_default_library", + ], +) + +go_test( + name = "go_default_test", + srcs = ["truncate_test.go"], + embed = [":go_default_library"], + deps = [ + "//vendor/github.com/stretchr/testify/require:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", + "//vendor/k8s.io/apiserver/pkg/apis/audit:go_default_library", + "//vendor/k8s.io/apiserver/pkg/apis/audit/v1beta1:go_default_library", + "//vendor/k8s.io/apiserver/plugin/pkg/audit/fake:go_default_library", + "//vendor/k8s.io/apiserver/plugin/pkg/audit/webhook:go_default_library", + ], +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [":package-srcs"], + tags = ["automanaged"], + visibility = ["//visibility:public"], +) diff --git a/staging/src/k8s.io/apiserver/plugin/pkg/audit/truncate/doc.go b/staging/src/k8s.io/apiserver/plugin/pkg/audit/truncate/doc.go new file mode 100644 index 0000000000..9392ac3147 --- /dev/null +++ b/staging/src/k8s.io/apiserver/plugin/pkg/audit/truncate/doc.go @@ -0,0 +1,19 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package truncate provides an implementation for the audit.Backend interface +// that truncates audit events and sends them to the delegate audit.Backend. +package truncate // import "k8s.io/apiserver/plugin/pkg/audit/truncate" diff --git a/staging/src/k8s.io/apiserver/plugin/pkg/audit/truncate/truncate.go b/staging/src/k8s.io/apiserver/plugin/pkg/audit/truncate/truncate.go new file mode 100644 index 0000000000..79e6876cef --- /dev/null +++ b/staging/src/k8s.io/apiserver/plugin/pkg/audit/truncate/truncate.go @@ -0,0 +1,158 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package truncate + +import ( + "fmt" + + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + utilerrors "k8s.io/apimachinery/pkg/util/errors" + auditinternal "k8s.io/apiserver/pkg/apis/audit" + "k8s.io/apiserver/pkg/audit" +) + +const ( + // PluginName is the name reported in error metrics. + PluginName = "truncate" + + // annotationKey defines the name of the annotation used to indicate truncation. + annotationKey = "audit.k8s.io/truncated" + // annotationValue defines the value of the annotation used to indicate truncation. + annotationValue = "true" +) + +// Config represents truncating backend configuration. +type Config struct { + // MaxEventSize defines max allowed size of the event. If the event is larger, + // truncating will be performed. + MaxEventSize int64 + + // MaxBatchSize defined max allowed size of the batch of events, passed to the backend. + // If the total size of the batch is larger than this number, batch will be split. Actual + // size of the serialized request might be slightly higher, on the order of hundreds of bytes. + MaxBatchSize int64 +} + +type backend struct { + // The delegate backend that actually exports events. + delegateBackend audit.Backend + + // Configuration used for truncation. + c Config + + // Encoder used to calculate audit event sizes. + e runtime.Encoder +} + +var _ audit.Backend = &backend{} + +// NewBackend returns a new truncating backend, using configuration passed in the parameters. +func NewBackend(delegateBackend audit.Backend, config Config, groupVersion schema.GroupVersion) audit.Backend { + return &backend{ + delegateBackend: delegateBackend, + c: config, + e: audit.Codecs.LegacyCodec(groupVersion), + } +} + +func (b *backend) ProcessEvents(events ...*auditinternal.Event) { + var errors []error + var impacted []*auditinternal.Event + var batch []*auditinternal.Event + var batchSize int64 + for _, event := range events { + size, err := b.calcSize(event) + // If event was correctly serialized, but the size is more than allowed + // and it makes sense to do trimming, i.e. there's a request and/or + // response present, try to strip away request and response. + if err == nil && size > b.c.MaxEventSize && event.Level.GreaterOrEqual(auditinternal.LevelRequest) { + event = truncate(event) + size, err = b.calcSize(event) + } + if err != nil { + errors = append(errors, err) + impacted = append(impacted, event) + continue + } + if size > b.c.MaxEventSize { + errors = append(errors, fmt.Errorf("event is too large even after truncating")) + impacted = append(impacted, event) + continue + } + + if len(batch) > 0 && batchSize+size > b.c.MaxBatchSize { + b.delegateBackend.ProcessEvents(batch...) + batch = []*auditinternal.Event{} + batchSize = 0 + } + + batchSize += size + batch = append(batch, event) + } + + if len(batch) > 0 { + b.delegateBackend.ProcessEvents(batch...) + } + + if len(impacted) > 0 { + audit.HandlePluginError(PluginName, utilerrors.NewAggregate(errors), impacted...) + } +} + +// truncate removed request and response objects from the audit events, +// to try and keep at least metadata. +func truncate(e *auditinternal.Event) *auditinternal.Event { + // Make a shallow copy to avoid copying response/request objects. + newEvent := &auditinternal.Event{} + *newEvent = *e + + newEvent.RequestObject = nil + newEvent.ResponseObject = nil + audit.LogAnnotation(newEvent, annotationKey, annotationValue) + return newEvent +} + +func (b *backend) Run(stopCh <-chan struct{}) error { + // Nothing to do here + return nil +} + +func (b *backend) Shutdown() { + // Nothing to do here +} + +func (b *backend) calcSize(e *auditinternal.Event) (int64, error) { + s := &sizer{} + if err := b.e.Encode(e, s); err != nil { + return 0, err + } + return s.Size, nil +} + +func (b *backend) String() string { + return fmt.Sprintf("%s<%s>", PluginName, b.delegateBackend) +} + +type sizer struct { + Size int64 +} + +func (s *sizer) Write(p []byte) (n int, err error) { + s.Size += int64(len(p)) + return len(p), nil +} diff --git a/staging/src/k8s.io/apiserver/plugin/pkg/audit/truncate/truncate_test.go b/staging/src/k8s.io/apiserver/plugin/pkg/audit/truncate/truncate_test.go new file mode 100644 index 0000000000..9b0d5c1426 --- /dev/null +++ b/staging/src/k8s.io/apiserver/plugin/pkg/audit/truncate/truncate_test.go @@ -0,0 +1,141 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package truncate + +import ( + "strings" + "testing" + + "github.com/stretchr/testify/require" + + "k8s.io/apimachinery/pkg/runtime" + auditinternal "k8s.io/apiserver/pkg/apis/audit" + auditv1beta1 "k8s.io/apiserver/pkg/apis/audit/v1beta1" + "k8s.io/apiserver/plugin/pkg/audit/fake" + // Importing just for the schema definitions. + _ "k8s.io/apiserver/plugin/pkg/audit/webhook" +) + +var ( + defaultConfig = Config{ + MaxBatchSize: 4 * 1024 * 1024, + MaxEventSize: 100 * 1024, + } +) + +func TestTruncatingEvents(t *testing.T) { + testCases := []struct { + desc string + event *auditinternal.Event + wantDropped bool + wantTruncated bool + }{ + { + desc: "Empty event should not be truncated", + event: &auditinternal.Event{}, + }, + { + desc: "Event with too large body should be truncated", + event: &auditinternal.Event{ + Level: auditinternal.LevelRequest, + RequestObject: &runtime.Unknown{ + Raw: []byte("\"" + strings.Repeat("A", int(defaultConfig.MaxEventSize)) + "\""), + }, + }, + wantTruncated: true, + }, + { + desc: "Event with too large metadata should be dropped", + event: &auditinternal.Event{ + Annotations: map[string]string{ + "key": strings.Repeat("A", int(defaultConfig.MaxEventSize)), + }, + }, + wantDropped: true, + }, + } + + for _, tc := range testCases { + tc := tc + t.Run(tc.desc, func(t *testing.T) { + t.Parallel() + + var event *auditinternal.Event + + fb := &fake.Backend{ + OnRequest: func(events []*auditinternal.Event) { + require.Equal(t, 1, len(events), "Expected single event in batch") + event = events[0] + }, + } + b := NewBackend(fb, defaultConfig, auditv1beta1.SchemeGroupVersion) + b.ProcessEvents(tc.event) + + require.Equal(t, !tc.wantDropped, event != nil, "Incorrect event presence") + if tc.wantTruncated { + require.Equal(t, annotationValue, event.Annotations[annotationKey], "Annotation should be present") + require.Nil(t, event.RequestObject, "After truncation request should be nil") + require.Nil(t, event.ResponseObject, "After truncation response should be nil") + } + }) + } +} + +func TestSplittingBatches(t *testing.T) { + testCases := []struct { + desc string + config Config + events []*auditinternal.Event + wantBatchCount int + }{ + { + desc: "Events fitting in one batch should not be split", + config: defaultConfig, + events: []*auditinternal.Event{{}}, + wantBatchCount: 1, + }, + { + desc: "Events not fitting in one batch should be split", + config: Config{ + MaxEventSize: defaultConfig.MaxEventSize, + MaxBatchSize: 1, + }, + events: []*auditinternal.Event{ + {Annotations: map[string]string{"key": strings.Repeat("A", int(50))}}, + {Annotations: map[string]string{"key": strings.Repeat("A", int(50))}}, + }, + wantBatchCount: 2, + }, + } + for _, tc := range testCases { + tc := tc + t.Run(tc.desc, func(t *testing.T) { + t.Parallel() + + var gotBatchCount int + fb := &fake.Backend{ + OnRequest: func(events []*auditinternal.Event) { + gotBatchCount++ + }, + } + b := NewBackend(fb, tc.config, auditv1beta1.SchemeGroupVersion) + b.ProcessEvents(tc.events...) + + require.Equal(t, tc.wantBatchCount, gotBatchCount) + }) + } +} diff --git a/staging/src/k8s.io/kube-aggregator/Godeps/Godeps.json b/staging/src/k8s.io/kube-aggregator/Godeps/Godeps.json index 35c85bb078..2fb56ad1fd 100644 --- a/staging/src/k8s.io/kube-aggregator/Godeps/Godeps.json +++ b/staging/src/k8s.io/kube-aggregator/Godeps/Godeps.json @@ -1150,6 +1150,10 @@ "ImportPath": "k8s.io/apiserver/plugin/pkg/audit/log", "Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" }, + { + "ImportPath": "k8s.io/apiserver/plugin/pkg/audit/truncate", + "Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" + }, { "ImportPath": "k8s.io/apiserver/plugin/pkg/audit/webhook", "Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" diff --git a/staging/src/k8s.io/sample-apiserver/Godeps/Godeps.json b/staging/src/k8s.io/sample-apiserver/Godeps/Godeps.json index 9123cc5c86..5fc8e1f28f 100644 --- a/staging/src/k8s.io/sample-apiserver/Godeps/Godeps.json +++ b/staging/src/k8s.io/sample-apiserver/Godeps/Godeps.json @@ -1118,6 +1118,10 @@ "ImportPath": "k8s.io/apiserver/plugin/pkg/audit/log", "Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" }, + { + "ImportPath": "k8s.io/apiserver/plugin/pkg/audit/truncate", + "Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" + }, { "ImportPath": "k8s.io/apiserver/plugin/pkg/audit/webhook", "Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"