Merge pull request #61711 from crassirostris/audit-size-limiting

Automatic merge from submit-queue (batch tested with PRs 62655, 61711, 59122, 62853, 62390). If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>.

Implemented truncating audit backend

Fixes https://github.com/kubernetes/kubernetes/issues/60432

Introduces an optional truncating backend, disabled by default, that estimates the size of audit events and truncates events/split batches based on the configuration.

/cc @sttts @tallclair @CaoShuFeng @ericchiang 

```release-note
Introduce truncating audit backend that can be enabled for existing backend to limit the size of individual audit events and batches of events.
```
pull/8/head
Kubernetes Submit Queue 2018-04-24 13:28:17 -07:00 committed by GitHub
commit bf1974c83f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 601 additions and 32 deletions

View File

@ -47,6 +47,7 @@ go_test(
"//vendor/k8s.io/apiserver/pkg/storage/storagebackend:go_default_library",
"//vendor/k8s.io/apiserver/pkg/util/flag:go_default_library",
"//vendor/k8s.io/apiserver/plugin/pkg/audit/buffered:go_default_library",
"//vendor/k8s.io/apiserver/plugin/pkg/audit/truncate:go_default_library",
"//vendor/k8s.io/client-go/rest:go_default_library",
],
)

View File

@ -30,6 +30,7 @@ import (
"k8s.io/apiserver/pkg/storage/storagebackend"
utilflag "k8s.io/apiserver/pkg/util/flag"
auditbuffered "k8s.io/apiserver/plugin/pkg/audit/buffered"
audittruncate "k8s.io/apiserver/plugin/pkg/audit/truncate"
restclient "k8s.io/client-go/rest"
"k8s.io/kubernetes/pkg/api/legacyscheme"
kapi "k8s.io/kubernetes/pkg/apis/core"
@ -61,6 +62,9 @@ func TestAddFlags(t *testing.T) {
"--audit-log-batch-throttle-enable=true",
"--audit-log-batch-throttle-qps=49.5",
"--audit-log-batch-throttle-burst=50",
"--audit-log-truncate-enabled=true",
"--audit-log-truncate-max-batch-size=45",
"--audit-log-truncate-max-event-size=44",
"--audit-log-version=audit.k8s.io/v1alpha1",
"--audit-policy-file=/policy",
"--audit-webhook-config-file=/webhook-config",
@ -71,6 +75,9 @@ func TestAddFlags(t *testing.T) {
"--audit-webhook-batch-throttle-enable=false",
"--audit-webhook-batch-throttle-qps=43.5",
"--audit-webhook-batch-throttle-burst=44",
"--audit-webhook-truncate-enabled=true",
"--audit-webhook-truncate-max-batch-size=43",
"--audit-webhook-truncate-max-event-size=42",
"--audit-webhook-initial-backoff=2s",
"--audit-webhook-version=audit.k8s.io/v1alpha1",
"--authentication-token-webhook-cache-ttl=3m",
@ -201,6 +208,13 @@ func TestAddFlags(t *testing.T) {
ThrottleBurst: 50,
},
},
TruncateOptions: apiserveroptions.AuditTruncateOptions{
Enabled: true,
TruncateConfig: audittruncate.Config{
MaxBatchSize: 45,
MaxEventSize: 44,
},
},
GroupVersionString: "audit.k8s.io/v1alpha1",
},
WebhookOptions: apiserveroptions.AuditWebhookOptions{
@ -216,6 +230,13 @@ func TestAddFlags(t *testing.T) {
ThrottleBurst: 44,
},
},
TruncateOptions: apiserveroptions.AuditTruncateOptions{
Enabled: true,
TruncateConfig: audittruncate.Config{
MaxBatchSize: 43,
MaxEventSize: 42,
},
},
InitialBackoff: 2 * time.Second,
GroupVersionString: "audit.k8s.io/v1alpha1",
},

View File

@ -1474,6 +1474,10 @@
"ImportPath": "k8s.io/apiserver/plugin/pkg/audit/log",
"Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
},
{
"ImportPath": "k8s.io/apiserver/plugin/pkg/audit/truncate",
"Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
},
{
"ImportPath": "k8s.io/apiserver/plugin/pkg/audit/webhook",
"Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"

View File

@ -58,6 +58,7 @@ go_library(
"//vendor/k8s.io/apiserver/pkg/util/flag:go_default_library",
"//vendor/k8s.io/apiserver/plugin/pkg/audit/buffered:go_default_library",
"//vendor/k8s.io/apiserver/plugin/pkg/audit/log:go_default_library",
"//vendor/k8s.io/apiserver/plugin/pkg/audit/truncate:go_default_library",
"//vendor/k8s.io/apiserver/plugin/pkg/audit/webhook:go_default_library",
"//vendor/k8s.io/client-go/informers:go_default_library",
"//vendor/k8s.io/client-go/kubernetes:go_default_library",

View File

@ -37,6 +37,7 @@ import (
utilfeature "k8s.io/apiserver/pkg/util/feature"
pluginbuffered "k8s.io/apiserver/plugin/pkg/audit/buffered"
pluginlog "k8s.io/apiserver/plugin/pkg/audit/log"
plugintruncate "k8s.io/apiserver/plugin/pkg/audit/truncate"
pluginwebhook "k8s.io/apiserver/plugin/pkg/audit/webhook"
)
@ -89,6 +90,14 @@ type AuditBatchOptions struct {
BatchConfig pluginbuffered.BatchConfig
}
type AuditTruncateOptions struct {
// Whether truncating is enabled or not.
Enabled bool
// Truncating configuration.
TruncateConfig plugintruncate.Config
}
// AuditLogOptions determines the output of the structured audit log by default.
// If the AdvancedAuditing feature is set to false, AuditLogOptions holds the legacy
// audit log writer.
@ -99,7 +108,8 @@ type AuditLogOptions struct {
MaxSize int
Format string
BatchOptions AuditBatchOptions
BatchOptions AuditBatchOptions
TruncateOptions AuditTruncateOptions
// API group version used for serializing audit events.
GroupVersionString string
@ -110,7 +120,8 @@ type AuditWebhookOptions struct {
ConfigFile string
InitialBackoff time.Duration
BatchOptions AuditBatchOptions
BatchOptions AuditBatchOptions
TruncateOptions AuditTruncateOptions
// API group version used for serializing audit events.
GroupVersionString string
@ -122,11 +133,12 @@ func NewAuditOptions() *AuditOptions {
return &AuditOptions{
WebhookOptions: AuditWebhookOptions{
InitialBackoff: pluginwebhook.DefaultInitialBackoff,
BatchOptions: AuditBatchOptions{
Mode: ModeBatch,
BatchConfig: pluginbuffered.NewDefaultBatchConfig(),
},
InitialBackoff: pluginwebhook.DefaultInitialBackoff,
TruncateOptions: NewAuditTruncateOptions(),
GroupVersionString: "audit.k8s.io/v1beta1",
},
LogOptions: AuditLogOptions{
@ -135,11 +147,22 @@ func NewAuditOptions() *AuditOptions {
Mode: ModeBlocking,
BatchConfig: defaultLogBatchConfig,
},
TruncateOptions: NewAuditTruncateOptions(),
GroupVersionString: "audit.k8s.io/v1beta1",
},
}
}
func NewAuditTruncateOptions() AuditTruncateOptions {
return AuditTruncateOptions{
Enabled: false,
TruncateConfig: plugintruncate.Config{
MaxBatchSize: 10 * 1024 * 1024, // 10MB
MaxEventSize: 100 * 1024, // 100KB
},
}
}
// Validate checks invalid config combination
func (o *AuditOptions) Validate() []error {
if o == nil {
@ -232,8 +255,10 @@ func (o *AuditOptions) AddFlags(fs *pflag.FlagSet) {
o.LogOptions.AddFlags(fs)
o.LogOptions.BatchOptions.AddFlags(pluginlog.PluginName, fs)
o.LogOptions.TruncateOptions.AddFlags(pluginlog.PluginName, fs)
o.WebhookOptions.AddFlags(fs)
o.WebhookOptions.BatchOptions.AddFlags(pluginwebhook.PluginName, fs)
o.WebhookOptions.TruncateOptions.AddFlags(pluginwebhook.PluginName, fs)
}
func (o *AuditOptions) ApplyTo(c *server.Config) error {
@ -309,6 +334,38 @@ func (o *AuditBatchOptions) wrapBackend(delegate audit.Backend) audit.Backend {
return pluginbuffered.NewBackend(delegate, o.BatchConfig)
}
func (o *AuditTruncateOptions) Validate(pluginName string) error {
config := o.TruncateConfig
if config.MaxEventSize <= 0 {
return fmt.Errorf("invalid audit truncate %s max event size %v, must be a positive number", pluginName, config.MaxEventSize)
}
if config.MaxBatchSize < config.MaxEventSize {
return fmt.Errorf("invalid audit truncate %s max batch size %v, must be greater than "+
"max event size (%v)", pluginName, config.MaxBatchSize, config.MaxEventSize)
}
return nil
}
func (o *AuditTruncateOptions) AddFlags(pluginName string, fs *pflag.FlagSet) {
fs.BoolVar(&o.Enabled, fmt.Sprintf("audit-%s-truncate-enabled", pluginName),
o.Enabled, "Whether event and batch truncating is enabled.")
fs.Int64Var(&o.TruncateConfig.MaxBatchSize, fmt.Sprintf("audit-%s-truncate-max-batch-size", pluginName),
o.TruncateConfig.MaxBatchSize, "Maximum size of the batch sent to the underlying backend. "+
"Actual serialized size can be several hundreds of bytes greater. If a batch exceeds this limit, "+
"it is split into several batches of smaller size.")
fs.Int64Var(&o.TruncateConfig.MaxEventSize, fmt.Sprintf("audit-%s-truncate-max-event-size", pluginName),
o.TruncateConfig.MaxEventSize, "Maximum size of the audit event sent to the underlying backend. "+
"If the size of an event is greater than this number, first request and response are removed, and"+
"if this doesn't reduce the size enough, event is discarded.")
}
func (o *AuditTruncateOptions) wrapBackend(delegate audit.Backend, gv schema.GroupVersion) audit.Backend {
if !o.Enabled {
return delegate
}
return plugintruncate.NewBackend(delegate, o.TruncateConfig, gv)
}
func (o *AuditLogOptions) AddFlags(fs *pflag.FlagSet) {
fs.StringVar(&o.Path, "audit-log-path", o.Path,
"If set, all requests coming to the apiserver will be logged to this file. '-' means standard out.")
@ -337,6 +394,9 @@ func (o *AuditLogOptions) Validate() []error {
if err := validateBackendBatchOptions(pluginlog.PluginName, o.BatchOptions); err != nil {
allErrors = append(allErrors, err)
}
if err := o.TruncateOptions.Validate(pluginlog.PluginName); err != nil {
allErrors = append(allErrors, err)
}
if err := validateGroupVersionString(o.GroupVersionString); err != nil {
allErrors = append(allErrors, err)
@ -395,7 +455,9 @@ func (o *AuditLogOptions) advancedApplyTo(c *server.Config) error {
if w := o.getWriter(); w != nil {
groupVersion, _ := schema.ParseGroupVersion(o.GroupVersionString)
log := pluginlog.NewBackend(w, o.Format, groupVersion)
c.AuditBackend = appendBackend(c.AuditBackend, o.BatchOptions.wrapBackend(log))
log = o.BatchOptions.wrapBackend(log)
log = o.TruncateOptions.wrapBackend(log, groupVersion)
c.AuditBackend = appendBackend(c.AuditBackend, log)
}
return nil
}
@ -429,6 +491,9 @@ func (o *AuditWebhookOptions) Validate() []error {
if err := validateBackendBatchOptions(pluginwebhook.PluginName, o.BatchOptions); err != nil {
allErrors = append(allErrors, err)
}
if err := o.TruncateOptions.Validate(pluginwebhook.PluginName); err != nil {
allErrors = append(allErrors, err)
}
if err := validateGroupVersionString(o.GroupVersionString); err != nil {
allErrors = append(allErrors, err)
@ -451,6 +516,8 @@ func (o *AuditWebhookOptions) applyTo(c *server.Config) error {
if err != nil {
return fmt.Errorf("initializing audit webhook: %v", err)
}
c.AuditBackend = appendBackend(c.AuditBackend, o.BatchOptions.wrapBackend(webhook))
webhook = o.BatchOptions.wrapBackend(webhook)
webhook = o.TruncateOptions.wrapBackend(webhook, groupVersion)
c.AuditBackend = appendBackend(c.AuditBackend, webhook)
return nil
}

View File

@ -78,6 +78,15 @@ func TestAuditValidOptions(t *testing.T) {
return o
},
expected: "union[buffered<log>,webhook]",
}, {
name: "default webhook with truncating",
options: func() *AuditOptions {
o := NewAuditOptions()
o.WebhookOptions.ConfigFile = webhookConfig
o.WebhookOptions.TruncateOptions.Enabled = true
return o
},
expected: "truncate<buffered<webhook>>",
}}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
@ -148,6 +157,25 @@ func TestAuditInvalidOptions(t *testing.T) {
o.WebhookOptions.BatchOptions.BatchConfig.ThrottleQPS = -1
return o
},
}, {
name: "invalid webhook truncate max event size",
options: func() *AuditOptions {
o := NewAuditOptions()
o.WebhookOptions.ConfigFile = "/audit"
o.WebhookOptions.TruncateOptions.Enabled = true
o.WebhookOptions.TruncateOptions.TruncateConfig.MaxEventSize = -1
return o
},
}, {
name: "invalid webhook truncate max batch size",
options: func() *AuditOptions {
o := NewAuditOptions()
o.WebhookOptions.ConfigFile = "/audit"
o.WebhookOptions.TruncateOptions.Enabled = true
o.WebhookOptions.TruncateOptions.TruncateConfig.MaxEventSize = 2
o.WebhookOptions.TruncateOptions.TruncateConfig.MaxBatchSize = 1
return o
},
}}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {

View File

@ -23,7 +23,9 @@ filegroup(
srcs = [
":package-srcs",
"//staging/src/k8s.io/apiserver/plugin/pkg/audit/buffered:all-srcs",
"//staging/src/k8s.io/apiserver/plugin/pkg/audit/fake:all-srcs",
"//staging/src/k8s.io/apiserver/plugin/pkg/audit/log:all-srcs",
"//staging/src/k8s.io/apiserver/plugin/pkg/audit/truncate:all-srcs",
"//staging/src/k8s.io/apiserver/plugin/pkg/audit/webhook:all-srcs",
],
tags = ["automanaged"],

View File

@ -25,7 +25,7 @@ go_test(
"//vendor/github.com/stretchr/testify/require:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/k8s.io/apiserver/pkg/apis/audit:go_default_library",
"//vendor/k8s.io/apiserver/pkg/audit:go_default_library",
"//vendor/k8s.io/apiserver/plugin/pkg/audit/fake:go_default_library",
],
)

View File

@ -24,7 +24,7 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
auditinternal "k8s.io/apiserver/pkg/apis/audit"
"k8s.io/apiserver/pkg/audit"
"k8s.io/apiserver/plugin/pkg/audit/fake"
)
var (
@ -83,7 +83,7 @@ func TestBufferedBackendCollectEvents(t *testing.T) {
t.Run(tc.desc, func(t *testing.T) {
t.Parallel()
backend := NewBackend(&fakeBackend{}, config).(*bufferedBackend)
backend := NewBackend(&fake.Backend{}, config).(*bufferedBackend)
backend.ProcessEvents(newEvents(tc.numEvents)...)
batch := backend.collectEvents(tc.timer, tc.stopCh)
@ -96,7 +96,7 @@ func TestBufferedBackendCollectEvents(t *testing.T) {
func TestBufferedBackendProcessEventsAfterStop(t *testing.T) {
t.Parallel()
backend := NewBackend(&fakeBackend{}, NewDefaultBatchConfig()).(*bufferedBackend)
backend := NewBackend(&fake.Backend{}, NewDefaultBatchConfig()).(*bufferedBackend)
backend.Run(closedStopCh)
backend.Shutdown()
@ -111,7 +111,7 @@ func TestBufferedBackendProcessEventsBufferFull(t *testing.T) {
config := NewDefaultBatchConfig()
config.BufferSize = 1
backend := NewBackend(&fakeBackend{}, config).(*bufferedBackend)
backend := NewBackend(&fake.Backend{}, config).(*bufferedBackend)
backend.ProcessEvents(newEvents(2)...)
@ -123,8 +123,8 @@ func TestBufferedBackendShutdownWaitsForDelegatedCalls(t *testing.T) {
delegatedCallStartCh := make(chan struct{})
delegatedCallEndCh := make(chan struct{})
delegateBackend := &fakeBackend{
onRequest: func(_ []*auditinternal.Event) {
delegateBackend := &fake.Backend{
OnRequest: func(_ []*auditinternal.Event) {
close(delegatedCallStartCh)
<-delegatedCallEndCh
},
@ -159,23 +159,3 @@ func TestBufferedBackendShutdownWaitsForDelegatedCalls(t *testing.T) {
close(delegatedCallEndCh)
<-shutdownEndCh
}
type fakeBackend struct {
onRequest func(events []*auditinternal.Event)
}
var _ audit.Backend = &fakeBackend{}
func (b *fakeBackend) Run(stopCh <-chan struct{}) error {
return nil
}
func (b *fakeBackend) Shutdown() {
return
}
func (b *fakeBackend) ProcessEvents(ev ...*auditinternal.Event) {
if b.onRequest != nil {
b.onRequest(ev)
}
}

View File

@ -0,0 +1,29 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "go_default_library",
srcs = [
"doc.go",
"fake.go",
],
importpath = "k8s.io/apiserver/plugin/pkg/audit/fake",
visibility = ["//visibility:public"],
deps = [
"//vendor/k8s.io/apiserver/pkg/apis/audit:go_default_library",
"//vendor/k8s.io/apiserver/pkg/audit:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)

View File

@ -0,0 +1,18 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package fake provides a fake audit.Backend interface implementation for testing.
package fake // import "k8s.io/apiserver/plugin/pkg/audit/fake"

View File

@ -0,0 +1,46 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package fake
import (
auditinternal "k8s.io/apiserver/pkg/apis/audit"
"k8s.io/apiserver/pkg/audit"
)
var _ audit.Backend = &Backend{}
// Backend is a fake audit backend for testing purposes.
type Backend struct {
OnRequest func(events []*auditinternal.Event)
}
// Run does nothing.
func (b *Backend) Run(stopCh <-chan struct{}) error {
return nil
}
// Shutdown does nothing.
func (b *Backend) Shutdown() {
return
}
// ProcessEvents calls a callback on a batch, if present.
func (b *Backend) ProcessEvents(ev ...*auditinternal.Event) {
if b.OnRequest != nil {
b.OnRequest(ev)
}
}

View File

@ -0,0 +1,46 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = [
"doc.go",
"truncate.go",
],
importpath = "k8s.io/apiserver/plugin/pkg/audit/truncate",
visibility = ["//visibility:public"],
deps = [
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/errors:go_default_library",
"//vendor/k8s.io/apiserver/pkg/apis/audit:go_default_library",
"//vendor/k8s.io/apiserver/pkg/audit:go_default_library",
],
)
go_test(
name = "go_default_test",
srcs = ["truncate_test.go"],
embed = [":go_default_library"],
deps = [
"//vendor/github.com/stretchr/testify/require:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//vendor/k8s.io/apiserver/pkg/apis/audit:go_default_library",
"//vendor/k8s.io/apiserver/pkg/apis/audit/v1beta1:go_default_library",
"//vendor/k8s.io/apiserver/plugin/pkg/audit/fake:go_default_library",
"//vendor/k8s.io/apiserver/plugin/pkg/audit/webhook:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)

View File

@ -0,0 +1,19 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package truncate provides an implementation for the audit.Backend interface
// that truncates audit events and sends them to the delegate audit.Backend.
package truncate // import "k8s.io/apiserver/plugin/pkg/audit/truncate"

View File

@ -0,0 +1,158 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package truncate
import (
"fmt"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
auditinternal "k8s.io/apiserver/pkg/apis/audit"
"k8s.io/apiserver/pkg/audit"
)
const (
// PluginName is the name reported in error metrics.
PluginName = "truncate"
// annotationKey defines the name of the annotation used to indicate truncation.
annotationKey = "audit.k8s.io/truncated"
// annotationValue defines the value of the annotation used to indicate truncation.
annotationValue = "true"
)
// Config represents truncating backend configuration.
type Config struct {
// MaxEventSize defines max allowed size of the event. If the event is larger,
// truncating will be performed.
MaxEventSize int64
// MaxBatchSize defined max allowed size of the batch of events, passed to the backend.
// If the total size of the batch is larger than this number, batch will be split. Actual
// size of the serialized request might be slightly higher, on the order of hundreds of bytes.
MaxBatchSize int64
}
type backend struct {
// The delegate backend that actually exports events.
delegateBackend audit.Backend
// Configuration used for truncation.
c Config
// Encoder used to calculate audit event sizes.
e runtime.Encoder
}
var _ audit.Backend = &backend{}
// NewBackend returns a new truncating backend, using configuration passed in the parameters.
func NewBackend(delegateBackend audit.Backend, config Config, groupVersion schema.GroupVersion) audit.Backend {
return &backend{
delegateBackend: delegateBackend,
c: config,
e: audit.Codecs.LegacyCodec(groupVersion),
}
}
func (b *backend) ProcessEvents(events ...*auditinternal.Event) {
var errors []error
var impacted []*auditinternal.Event
var batch []*auditinternal.Event
var batchSize int64
for _, event := range events {
size, err := b.calcSize(event)
// If event was correctly serialized, but the size is more than allowed
// and it makes sense to do trimming, i.e. there's a request and/or
// response present, try to strip away request and response.
if err == nil && size > b.c.MaxEventSize && event.Level.GreaterOrEqual(auditinternal.LevelRequest) {
event = truncate(event)
size, err = b.calcSize(event)
}
if err != nil {
errors = append(errors, err)
impacted = append(impacted, event)
continue
}
if size > b.c.MaxEventSize {
errors = append(errors, fmt.Errorf("event is too large even after truncating"))
impacted = append(impacted, event)
continue
}
if len(batch) > 0 && batchSize+size > b.c.MaxBatchSize {
b.delegateBackend.ProcessEvents(batch...)
batch = []*auditinternal.Event{}
batchSize = 0
}
batchSize += size
batch = append(batch, event)
}
if len(batch) > 0 {
b.delegateBackend.ProcessEvents(batch...)
}
if len(impacted) > 0 {
audit.HandlePluginError(PluginName, utilerrors.NewAggregate(errors), impacted...)
}
}
// truncate removed request and response objects from the audit events,
// to try and keep at least metadata.
func truncate(e *auditinternal.Event) *auditinternal.Event {
// Make a shallow copy to avoid copying response/request objects.
newEvent := &auditinternal.Event{}
*newEvent = *e
newEvent.RequestObject = nil
newEvent.ResponseObject = nil
audit.LogAnnotation(newEvent, annotationKey, annotationValue)
return newEvent
}
func (b *backend) Run(stopCh <-chan struct{}) error {
// Nothing to do here
return nil
}
func (b *backend) Shutdown() {
// Nothing to do here
}
func (b *backend) calcSize(e *auditinternal.Event) (int64, error) {
s := &sizer{}
if err := b.e.Encode(e, s); err != nil {
return 0, err
}
return s.Size, nil
}
func (b *backend) String() string {
return fmt.Sprintf("%s<%s>", PluginName, b.delegateBackend)
}
type sizer struct {
Size int64
}
func (s *sizer) Write(p []byte) (n int, err error) {
s.Size += int64(len(p))
return len(p), nil
}

View File

@ -0,0 +1,141 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package truncate
import (
"strings"
"testing"
"github.com/stretchr/testify/require"
"k8s.io/apimachinery/pkg/runtime"
auditinternal "k8s.io/apiserver/pkg/apis/audit"
auditv1beta1 "k8s.io/apiserver/pkg/apis/audit/v1beta1"
"k8s.io/apiserver/plugin/pkg/audit/fake"
// Importing just for the schema definitions.
_ "k8s.io/apiserver/plugin/pkg/audit/webhook"
)
var (
defaultConfig = Config{
MaxBatchSize: 4 * 1024 * 1024,
MaxEventSize: 100 * 1024,
}
)
func TestTruncatingEvents(t *testing.T) {
testCases := []struct {
desc string
event *auditinternal.Event
wantDropped bool
wantTruncated bool
}{
{
desc: "Empty event should not be truncated",
event: &auditinternal.Event{},
},
{
desc: "Event with too large body should be truncated",
event: &auditinternal.Event{
Level: auditinternal.LevelRequest,
RequestObject: &runtime.Unknown{
Raw: []byte("\"" + strings.Repeat("A", int(defaultConfig.MaxEventSize)) + "\""),
},
},
wantTruncated: true,
},
{
desc: "Event with too large metadata should be dropped",
event: &auditinternal.Event{
Annotations: map[string]string{
"key": strings.Repeat("A", int(defaultConfig.MaxEventSize)),
},
},
wantDropped: true,
},
}
for _, tc := range testCases {
tc := tc
t.Run(tc.desc, func(t *testing.T) {
t.Parallel()
var event *auditinternal.Event
fb := &fake.Backend{
OnRequest: func(events []*auditinternal.Event) {
require.Equal(t, 1, len(events), "Expected single event in batch")
event = events[0]
},
}
b := NewBackend(fb, defaultConfig, auditv1beta1.SchemeGroupVersion)
b.ProcessEvents(tc.event)
require.Equal(t, !tc.wantDropped, event != nil, "Incorrect event presence")
if tc.wantTruncated {
require.Equal(t, annotationValue, event.Annotations[annotationKey], "Annotation should be present")
require.Nil(t, event.RequestObject, "After truncation request should be nil")
require.Nil(t, event.ResponseObject, "After truncation response should be nil")
}
})
}
}
func TestSplittingBatches(t *testing.T) {
testCases := []struct {
desc string
config Config
events []*auditinternal.Event
wantBatchCount int
}{
{
desc: "Events fitting in one batch should not be split",
config: defaultConfig,
events: []*auditinternal.Event{{}},
wantBatchCount: 1,
},
{
desc: "Events not fitting in one batch should be split",
config: Config{
MaxEventSize: defaultConfig.MaxEventSize,
MaxBatchSize: 1,
},
events: []*auditinternal.Event{
{Annotations: map[string]string{"key": strings.Repeat("A", int(50))}},
{Annotations: map[string]string{"key": strings.Repeat("A", int(50))}},
},
wantBatchCount: 2,
},
}
for _, tc := range testCases {
tc := tc
t.Run(tc.desc, func(t *testing.T) {
t.Parallel()
var gotBatchCount int
fb := &fake.Backend{
OnRequest: func(events []*auditinternal.Event) {
gotBatchCount++
},
}
b := NewBackend(fb, tc.config, auditv1beta1.SchemeGroupVersion)
b.ProcessEvents(tc.events...)
require.Equal(t, tc.wantBatchCount, gotBatchCount)
})
}
}

View File

@ -1146,6 +1146,10 @@
"ImportPath": "k8s.io/apiserver/plugin/pkg/audit/log",
"Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
},
{
"ImportPath": "k8s.io/apiserver/plugin/pkg/audit/truncate",
"Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
},
{
"ImportPath": "k8s.io/apiserver/plugin/pkg/audit/webhook",
"Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"

View File

@ -1114,6 +1114,10 @@
"ImportPath": "k8s.io/apiserver/plugin/pkg/audit/log",
"Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
},
{
"ImportPath": "k8s.io/apiserver/plugin/pkg/audit/truncate",
"Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
},
{
"ImportPath": "k8s.io/apiserver/plugin/pkg/audit/webhook",
"Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"