Merge pull request #45919 from ericchiang/audit-webhook-backend

Automatic merge from submit-queue

apiserver: add a webhook implementation of the audit backend

This builds off of #45315 and is intended to implement an interfaced defined in #45766.

TODO:

- [x] Rebase on top of API types PR.
- [x] Rebase on top of API types updates (#46065)
- [x] Rebase on top of feature flag (#46009)
- [x] Rebase on top of audit instrumentation.
- [x] Hook up API server flag or register plugin (depending on #45766)

Features issue https://github.com/kubernetes/features/issues/22

Design proposal https://github.com/kubernetes/community/blob/master/contributors/design-proposals/auditing.md

```release-notes
Webhook added to the API server which omits structured audit log events.
```

/cc @soltysh @timstclair @soltysh @deads2k
pull/6/head
Kubernetes Submit Queue 2017-06-01 13:41:59 -07:00 committed by GitHub
commit f7a1f10275
14 changed files with 927 additions and 33 deletions

View File

@ -45,7 +45,7 @@ type ServerRunOptions struct {
Etcd *genericoptions.EtcdOptions
SecureServing *genericoptions.SecureServingOptions
InsecureServing *kubeoptions.InsecureServingOptions
Audit *genericoptions.AuditLogOptions
Audit *genericoptions.AuditOptions
Features *genericoptions.FeatureOptions
Admission *genericoptions.AdmissionOptions
Authentication *kubeoptions.BuiltInAuthenticationOptions
@ -79,7 +79,7 @@ func NewServerRunOptions() *ServerRunOptions {
Etcd: genericoptions.NewEtcdOptions(storagebackend.NewDefaultConfig(kubeoptions.DefaultEtcdPathPrefix, api.Scheme, nil)),
SecureServing: kubeoptions.NewSecureServingOptions(),
InsecureServing: kubeoptions.NewInsecureServingOptions(),
Audit: genericoptions.NewAuditLogOptions(),
Audit: genericoptions.NewAuditOptions(),
Features: genericoptions.NewFeatureOptions(),
Admission: genericoptions.NewAdmissionOptions(),
Authentication: kubeoptions.NewBuiltInAuthenticationOptions().WithAll(),

View File

@ -37,7 +37,7 @@ type ServerRunOptions struct {
Etcd *genericoptions.EtcdOptions
SecureServing *genericoptions.SecureServingOptions
InsecureServing *kubeoptions.InsecureServingOptions
Audit *genericoptions.AuditLogOptions
Audit *genericoptions.AuditOptions
Features *genericoptions.FeatureOptions
Admission *genericoptions.AdmissionOptions
Authentication *kubeoptions.BuiltInAuthenticationOptions
@ -56,7 +56,7 @@ func NewServerRunOptions() *ServerRunOptions {
Etcd: genericoptions.NewEtcdOptions(storagebackend.NewDefaultConfig(kubeoptions.DefaultEtcdPathPrefix, api.Scheme, nil)),
SecureServing: kubeoptions.NewSecureServingOptions(),
InsecureServing: kubeoptions.NewInsecureServingOptions(),
Audit: genericoptions.NewAuditLogOptions(),
Audit: genericoptions.NewAuditOptions(),
Features: genericoptions.NewFeatureOptions(),
Admission: genericoptions.NewAdmissionOptions(),
Authentication: kubeoptions.NewBuiltInAuthenticationOptions().WithAll(),

View File

@ -359,6 +359,7 @@ staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory
staging/src/k8s.io/apiserver/pkg/storage/value/encrypt/aes
staging/src/k8s.io/apiserver/pkg/util/flushwriter
staging/src/k8s.io/apiserver/pkg/util/logs
staging/src/k8s.io/apiserver/plugin/pkg/audit/webhook
staging/src/k8s.io/apiserver/plugin/pkg/authenticator
staging/src/k8s.io/apiserver/plugin/pkg/authenticator/password
staging/src/k8s.io/apiserver/plugin/pkg/authenticator/password/allow

View File

@ -46,6 +46,8 @@ audit-log-maxage
audit-log-maxbackup
audit-log-maxsize
audit-log-path
audit-webhook-config-file
audit-webhook-mode
authentication-kubeconfig
authentication-token-webhook
authentication-token-webhook-cache-ttl

View File

@ -5,6 +5,7 @@ licenses(["notice"])
load(
"@io_bazel_rules_go//go:def.bzl",
"go_library",
"go_test",
)
go_library(
@ -13,6 +14,7 @@ go_library(
"request.go",
"scheme.go",
"types.go",
"union.go",
],
tags = ["automanaged"],
deps = [
@ -23,6 +25,7 @@ go_library(
"//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime/serializer:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/net:go_default_library",
"//vendor/k8s.io/apiserver/pkg/apis/audit:go_default_library",
"//vendor/k8s.io/apiserver/pkg/apis/audit/v1alpha1:go_default_library",
@ -30,3 +33,14 @@ go_library(
"//vendor/k8s.io/client-go/pkg/apis/authentication/v1:go_default_library",
],
)
go_test(
name = "go_default_test",
srcs = ["union_test.go"],
library = ":go_default_library",
tags = ["automanaged"],
deps = [
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/k8s.io/apiserver/pkg/apis/audit:go_default_library",
],
)

View File

@ -0,0 +1,51 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package audit
import (
"k8s.io/apimachinery/pkg/util/errors"
auditinternal "k8s.io/apiserver/pkg/apis/audit"
)
// Union returns an audit Backend which logs events to a set of backends. The returned
// Sink implementation blocks in turn for each call to ProcessEvents.
func Union(backends ...Backend) Backend {
if len(backends) == 1 {
return backends[0]
}
return union{backends}
}
type union struct {
backends []Backend
}
func (u union) ProcessEvents(events ...*auditinternal.Event) {
for _, backend := range u.backends {
backend.ProcessEvents(events...)
}
}
func (u union) Run(stopCh <-chan struct{}) error {
var funcs []func() error
for _, backend := range u.backends {
funcs = append(funcs, func() error {
return backend.Run(stopCh)
})
}
return errors.AggregateGoroutines(funcs...)
}

View File

@ -0,0 +1,73 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package audit
import (
"strconv"
"testing"
"k8s.io/apimachinery/pkg/types"
auditinternal "k8s.io/apiserver/pkg/apis/audit"
)
type fakeBackend struct {
events []*auditinternal.Event
}
func (f *fakeBackend) ProcessEvents(events ...*auditinternal.Event) {
f.events = append(f.events, events...)
}
func (f *fakeBackend) Run(stopCh <-chan struct{}) error {
return nil
}
func TestUnion(t *testing.T) {
backends := []Backend{
new(fakeBackend),
new(fakeBackend),
new(fakeBackend),
}
b := Union(backends...)
n := 5
var events []*auditinternal.Event
for i := 0; i < n; i++ {
events = append(events, &auditinternal.Event{
AuditID: types.UID(strconv.Itoa(i)),
})
}
b.ProcessEvents(events...)
for i, b := range backends {
// so we can inspect the underlying events.
backend := b.(*fakeBackend)
if got := len(backend.events); got != n {
t.Errorf("backend %d wanted %d events, got %d", i, n, got)
continue
}
for j, event := range backend.events {
wantID := types.UID(strconv.Itoa(j))
if event.AuditID != wantID {
t.Errorf("backend %d event %d wanted id %s, got %s", i, j, wantID, event.AuditID)
}
}
}
}

View File

@ -54,6 +54,7 @@ go_library(
"//vendor/k8s.io/apimachinery/pkg/util/net:go_default_library",
"//vendor/k8s.io/apiserver/pkg/admission:go_default_library",
"//vendor/k8s.io/apiserver/pkg/admission/initializer:go_default_library",
"//vendor/k8s.io/apiserver/pkg/audit:go_default_library",
"//vendor/k8s.io/apiserver/pkg/audit/policy:go_default_library",
"//vendor/k8s.io/apiserver/pkg/authentication/authenticatorfactory:go_default_library",
"//vendor/k8s.io/apiserver/pkg/authorization/authorizerfactory:go_default_library",
@ -66,6 +67,7 @@ go_library(
"//vendor/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//vendor/k8s.io/apiserver/pkg/util/flag:go_default_library",
"//vendor/k8s.io/apiserver/plugin/pkg/audit/log:go_default_library",
"//vendor/k8s.io/apiserver/plugin/pkg/audit/webhook:go_default_library",
"//vendor/k8s.io/client-go/informers:go_default_library",
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
"//vendor/k8s.io/client-go/kubernetes/typed/authentication/v1beta1:go_default_library",

View File

@ -20,28 +20,106 @@ import (
"fmt"
"io"
"os"
"strings"
"github.com/spf13/pflag"
"gopkg.in/natefinch/lumberjack.v2"
"k8s.io/apiserver/pkg/audit"
"k8s.io/apiserver/pkg/audit/policy"
"k8s.io/apiserver/pkg/features"
"k8s.io/apiserver/pkg/server"
utilfeature "k8s.io/apiserver/pkg/util/feature"
pluginlog "k8s.io/apiserver/plugin/pkg/audit/log"
pluginwebhook "k8s.io/apiserver/plugin/pkg/audit/webhook"
)
func appendBackend(existing, newBackend audit.Backend) audit.Backend {
if existing == nil {
return newBackend
}
return audit.Union(existing, newBackend)
}
func advancedAuditingEnabled() bool {
return utilfeature.DefaultFeatureGate.Enabled(features.AdvancedAuditing)
}
type AuditOptions struct {
// Policy configuration file for filtering audit events that are captured.
// If unspecified, a default is provided.
PolicyFile string
// Plugin options
LogOptions AuditLogOptions
WebhookOptions AuditWebhookOptions
}
// AuditLogOptions holds the legacy audit log writer. If the AdvancedAuditing feature
// is enabled, these options determine the output of the structured audit log.
type AuditLogOptions struct {
Path string
MaxAge int
MaxBackups int
MaxSize int
PolicyFile string
}
func NewAuditLogOptions() *AuditLogOptions {
return &AuditLogOptions{}
// AuditWebhookOptions control the webhook configuration for audit events.
type AuditWebhookOptions struct {
ConfigFile string
// Should the webhook asynchronous batch events to the webhook backend or
// should the webhook block responses?
//
// Defaults to asynchronous batch events.
Mode string
}
func NewAuditOptions() *AuditOptions {
return &AuditOptions{
WebhookOptions: AuditWebhookOptions{Mode: pluginwebhook.ModeBatch},
}
}
func (o *AuditOptions) AddFlags(fs *pflag.FlagSet) {
fs.StringVar(&o.PolicyFile, "audit-policy-file", o.PolicyFile,
"Path to the file that defines the audit policy configuration. Requires the 'AdvancedAuditing' feature gate."+
" With AdvancedAuditing, a profile is required to enable auditing.")
o.LogOptions.AddFlags(fs)
o.WebhookOptions.AddFlags(fs)
}
func (o *AuditOptions) ApplyTo(c *server.Config) error {
// Apply generic options.
if err := o.applyTo(c); err != nil {
return err
}
// Apply plugin options.
if err := o.LogOptions.applyTo(c); err != nil {
return err
}
if err := o.WebhookOptions.applyTo(c); err != nil {
return err
}
return nil
}
func (o *AuditOptions) applyTo(c *server.Config) error {
if o.PolicyFile == "" {
return nil
}
if !advancedAuditingEnabled() {
return fmt.Errorf("feature '%s' must be enabled to set an audit policy", features.AdvancedAuditing)
}
p, err := policy.LoadPolicyFromFile(o.PolicyFile)
if err != nil {
return fmt.Errorf("loading audit policy file: %v", err)
}
c.AuditPolicyChecker = policy.NewChecker(p)
return nil
}
func (o *AuditLogOptions) AddFlags(fs *pflag.FlagSet) {
@ -53,29 +131,10 @@ func (o *AuditLogOptions) AddFlags(fs *pflag.FlagSet) {
"The maximum number of old audit log files to retain.")
fs.IntVar(&o.MaxSize, "audit-log-maxsize", o.MaxSize,
"The maximum size in megabytes of the audit log file before it gets rotated.")
fs.StringVar(&o.PolicyFile, "audit-policy-file", o.PolicyFile,
"Path to the file that defines the audit policy configuration. Requires the 'AdvancedAuditing' feature gate."+
" With AdvancedAuditing, a profile is required to enable auditing.")
}
func (o *AuditLogOptions) ApplyTo(c *server.Config) error {
if utilfeature.DefaultFeatureGate.Enabled(features.AdvancedAuditing) {
if o.PolicyFile != "" {
p, err := policy.LoadPolicyFromFile(o.PolicyFile)
if err != nil {
return err
}
c.AuditPolicyChecker = policy.NewChecker(p)
}
} else {
if o.PolicyFile != "" {
return fmt.Errorf("feature '%s' must be enabled to set an audit policy", features.AdvancedAuditing)
}
}
// TODO: Generalize for alternative audit backends.
if len(o.Path) == 0 {
func (o *AuditLogOptions) applyTo(c *server.Config) error {
if o.Path == "" {
return nil
}
@ -89,6 +148,35 @@ func (o *AuditLogOptions) ApplyTo(c *server.Config) error {
}
}
c.LegacyAuditWriter = w
c.AuditBackend = pluginlog.NewBackend(w)
if advancedAuditingEnabled() {
c.AuditBackend = appendBackend(c.AuditBackend, pluginlog.NewBackend(w))
}
return nil
}
func (o *AuditWebhookOptions) AddFlags(fs *pflag.FlagSet) {
fs.StringVar(&o.ConfigFile, "audit-webhook-config-file", o.ConfigFile,
"Path to a kubeconfig formatted file that defines the audit webhook configuration."+
" Requires the 'AdvancedAuditing' feature gate.")
fs.StringVar(&o.Mode, "audit-webhook-mode", o.Mode,
"Strategy for sending audit events. Blocking indicates sending events should block"+
" server responses. Batch causes the webhook to buffer and send events"+
" asynchronously. Known modes are "+strings.Join(pluginwebhook.AllowedModes, ",")+".")
}
func (o *AuditWebhookOptions) applyTo(c *server.Config) error {
if o.ConfigFile == "" {
return nil
}
if !advancedAuditingEnabled() {
return fmt.Errorf("feature '%s' must be enabled to set an audit webhook", features.AdvancedAuditing)
}
webhook, err := pluginwebhook.NewBackend(o.ConfigFile, o.Mode)
if err != nil {
return fmt.Errorf("initializing audit webhook: %v", err)
}
c.AuditBackend = appendBackend(c.AuditBackend, webhook)
return nil
}

View File

@ -31,7 +31,7 @@ type RecommendedOptions struct {
SecureServing *SecureServingOptions
Authentication *DelegatingAuthenticationOptions
Authorization *DelegatingAuthorizationOptions
Audit *AuditLogOptions
Audit *AuditOptions
Features *FeatureOptions
}
@ -41,7 +41,7 @@ func NewRecommendedOptions(prefix string, copier runtime.ObjectCopier, codec run
SecureServing: NewSecureServingOptions(),
Authentication: NewDelegatingAuthenticationOptions(),
Authorization: NewDelegatingAuthorizationOptions(),
Audit: NewAuditLogOptions(),
Audit: NewAuditOptions(),
Features: NewFeatureOptions(),
}
}

View File

@ -0,0 +1,46 @@
package(default_visibility = ["//visibility:public"])
licenses(["notice"])
load(
"@io_bazel_rules_go//go:def.bzl",
"go_library",
"go_test",
)
go_test(
name = "go_default_test",
srcs = ["webhook_test.go"],
library = ":go_default_library",
tags = ["automanaged"],
deps = [
"//vendor/github.com/stretchr/testify/assert:go_default_library",
"//vendor/github.com/stretchr/testify/require:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime/serializer/json:go_default_library",
"//vendor/k8s.io/apiserver/pkg/apis/audit:go_default_library",
"//vendor/k8s.io/apiserver/pkg/apis/audit/v1alpha1:go_default_library",
"//vendor/k8s.io/apiserver/pkg/audit:go_default_library",
"//vendor/k8s.io/client-go/tools/clientcmd/api/v1:go_default_library",
],
)
go_library(
name = "go_default_library",
srcs = ["webhook.go"],
tags = ["automanaged"],
deps = [
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apimachinery/announced:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apimachinery/registered:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/conversion:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//vendor/k8s.io/apiserver/pkg/apis/audit:go_default_library",
"//vendor/k8s.io/apiserver/pkg/apis/audit/install:go_default_library",
"//vendor/k8s.io/apiserver/pkg/apis/audit/v1alpha1:go_default_library",
"//vendor/k8s.io/apiserver/pkg/audit:go_default_library",
"//vendor/k8s.io/apiserver/pkg/util/webhook:go_default_library",
],
)

View File

@ -0,0 +1,285 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package webhook implements the audit.Backend interface using HTTP webhooks.
package webhook
import (
"fmt"
"reflect"
"strings"
"time"
"github.com/golang/glog"
"k8s.io/apimachinery/pkg/apimachinery/announced"
"k8s.io/apimachinery/pkg/apimachinery/registered"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/conversion"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/runtime"
auditinternal "k8s.io/apiserver/pkg/apis/audit"
"k8s.io/apiserver/pkg/apis/audit/install"
auditv1alpha1 "k8s.io/apiserver/pkg/apis/audit/v1alpha1"
"k8s.io/apiserver/pkg/audit"
"k8s.io/apiserver/pkg/util/webhook"
)
const (
// ModeBatch indicates that the webhook should buffer audit events
// internally, sending batch updates either once a certain number of
// events have been received or a certain amount of time has passed.
ModeBatch = "batch"
// ModeBlocking causes the webhook to block on every attempt to process
// a set of events. This causes requests to the API server to wait for a
// round trip to the external audit service before sending a response.
ModeBlocking = "blocking"
)
// AllowedModes is the modes known by this webhook.
var AllowedModes = []string{
ModeBatch,
ModeBlocking,
}
const (
// Default configuration values for ModeBatch.
//
// TODO(ericchiang): Make these value configurable. Maybe through a
// kubeconfig extension?
defaultBatchBufferSize = 1000 // Buffer up to 1000 events before blocking.
defaultBatchMaxSize = 100 // Only send 100 events at a time.
defaultBatchMaxWait = time.Minute // Send events at least once a minute.
)
// NewBackend returns an audit backend that sends events over HTTP to an external service.
// The mode indicates the caching behavior of the webhook. Either blocking (ModeBlocking)
// or buffered with batch POSTs (ModeBatch).
func NewBackend(kubeConfigFile string, mode string) (audit.Backend, error) {
switch mode {
case ModeBatch:
return newBatchWebhook(kubeConfigFile)
case ModeBlocking:
return newBlockingWebhook(kubeConfigFile)
default:
return nil, fmt.Errorf("webhook mode %q is not in list of known modes (%s)",
mode, strings.Join(AllowedModes, ","))
}
}
var (
// NOTE: Copied from other webhook implementations
//
// Can we make these passable to NewGenericWebhook?
groupFactoryRegistry = make(announced.APIGroupFactoryRegistry)
groupVersions = []schema.GroupVersion{auditv1alpha1.SchemeGroupVersion}
registry = registered.NewOrDie("")
)
func init() {
registry.RegisterVersions(groupVersions)
if err := registry.EnableVersions(groupVersions...); err != nil {
panic(fmt.Sprintf("failed to enable version %v", groupVersions))
}
install.Install(groupFactoryRegistry, registry, audit.Scheme)
}
func loadWebhook(configFile string) (*webhook.GenericWebhook, error) {
return webhook.NewGenericWebhook(registry, audit.Codecs, configFile, groupVersions, 0)
}
func newBlockingWebhook(configFile string) (*blockingBackend, error) {
w, err := loadWebhook(configFile)
if err != nil {
return nil, err
}
return &blockingBackend{w}, nil
}
type blockingBackend struct {
w *webhook.GenericWebhook
}
func (b *blockingBackend) Run(stopCh <-chan struct{}) error {
return nil
}
func (b *blockingBackend) ProcessEvents(ev ...*auditinternal.Event) {
if err := b.processEvents(ev...); err != nil {
glog.Errorf("failed to POST webhook events: %v", err)
}
}
func (b *blockingBackend) processEvents(ev ...*auditinternal.Event) error {
var list auditinternal.EventList
for _, e := range ev {
list.Items = append(list.Items, *e)
}
// NOTE: No exponential backoff because this is the blocking webhook
// mode. Any attempts to retry will block API server requests.
return b.w.RestClient.Post().Body(&list).Do().Error()
}
// Copied from generated code in k8s.io/apiserver/pkg/apis/audit.
//
// TODO(ericchiang): Have the generated code expose these methods like metav1.GetGeneratedDeepCopyFuncs().
var auditDeepCopyFuncs = []conversion.GeneratedDeepCopyFunc{
{Fn: auditinternal.DeepCopy_audit_Event, InType: reflect.TypeOf(&auditinternal.Event{})},
{Fn: auditinternal.DeepCopy_audit_EventList, InType: reflect.TypeOf(&auditinternal.EventList{})},
{Fn: auditinternal.DeepCopy_audit_GroupResources, InType: reflect.TypeOf(&auditinternal.GroupResources{})},
{Fn: auditinternal.DeepCopy_audit_ObjectReference, InType: reflect.TypeOf(&auditinternal.ObjectReference{})},
{Fn: auditinternal.DeepCopy_audit_Policy, InType: reflect.TypeOf(&auditinternal.Policy{})},
{Fn: auditinternal.DeepCopy_audit_PolicyList, InType: reflect.TypeOf(&auditinternal.PolicyList{})},
{Fn: auditinternal.DeepCopy_audit_PolicyRule, InType: reflect.TypeOf(&auditinternal.PolicyRule{})},
{Fn: auditinternal.DeepCopy_audit_UserInfo, InType: reflect.TypeOf(&auditinternal.UserInfo{})},
}
func newBatchWebhook(configFile string) (*batchBackend, error) {
w, err := loadWebhook(configFile)
if err != nil {
return nil, err
}
c := conversion.NewCloner()
for _, f := range metav1.GetGeneratedDeepCopyFuncs() {
if err := c.RegisterGeneratedDeepCopyFunc(f); err != nil {
return nil, fmt.Errorf("registering meta deep copy method: %v", err)
}
}
for _, f := range auditDeepCopyFuncs {
if err := c.RegisterGeneratedDeepCopyFunc(f); err != nil {
return nil, fmt.Errorf("registering audit deep copy method: %v", err)
}
}
return &batchBackend{
w: w,
buffer: make(chan *auditinternal.Event, defaultBatchBufferSize),
maxBatchSize: defaultBatchMaxSize,
maxBatchWait: defaultBatchMaxWait,
cloner: c,
}, nil
}
type batchBackend struct {
w *webhook.GenericWebhook
// Cloner is used to deep copy events as they are buffered.
cloner *conversion.Cloner
// Channel to buffer events in memory before sending them on the webhook.
buffer chan *auditinternal.Event
// Maximum number of events that can be sent at once.
maxBatchSize int
// Amount of time to wait after sending events before force sending another set.
//
// Receiving maxBatchSize events will always trigger a send, regardless of
// if this amount of time has been reached.
maxBatchWait time.Duration
}
func (b *batchBackend) Run(stopCh <-chan struct{}) error {
f := func() {
// Recover from any panics caused by this method so a panic in the
// goroutine can't bring down the main routine.
defer runtime.HandleCrash()
t := time.NewTimer(b.maxBatchWait)
defer t.Stop() // Release ticker resources
b.sendBatchEvents(stopCh, t.C)
}
go func() {
for {
f()
select {
case <-stopCh:
return
default:
}
}
}()
return nil
}
// sendBatchEvents attempts to batch some number of events to the backend. It POSTs events
// in a goroutine and logging any error encountered during the POST.
//
// The following things can cause sendBatchEvents to exit:
//
// * Some maximum number of events are received.
// * Timer has passed, all queued events are sent.
// * StopCh is closed, all queued events are sent.
//
func (b *batchBackend) sendBatchEvents(stopCh <-chan struct{}, timer <-chan time.Time) {
var events []auditinternal.Event
L:
for i := 0; i < b.maxBatchSize; i++ {
select {
case ev := <-b.buffer:
events = append(events, *ev)
case <-timer:
// Timer has expired. Send whatever events are in the queue.
break L
case <-stopCh:
// Webhook has shut down. Send the last events.
break L
}
}
if len(events) == 0 {
return
}
list := auditinternal.EventList{Items: events}
go func() {
// Execute the webhook POST in a goroutine to keep it from blocking.
// This lets the webhook continue to drain the queue immediatly.
defer runtime.HandleCrash()
err := webhook.WithExponentialBackoff(0, func() error {
return b.w.RestClient.Post().Body(&list).Do().Error()
})
if err != nil {
glog.Errorf("failed to POST webhook events: %v", err)
}
}()
return
}
func (b *batchBackend) ProcessEvents(ev ...*auditinternal.Event) {
for i, e := range ev {
// Per the audit.Backend interface these events are reused after being
// sent to the Sink. Deep copy and send the copy to the queue.
event := new(auditinternal.Event)
if err := auditinternal.DeepCopy_audit_Event(e, event, b.cloner); err != nil {
glog.Errorf("failed to clone audit event: %v: %#v", err, e)
return
}
select {
case b.buffer <- event:
default:
glog.Errorf("audit webhook queue blocked, failed to send %d event(s)", len(ev)-i)
return
}
}
}

View File

@ -0,0 +1,332 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package webhook
import (
stdjson "encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/http/httptest"
"os"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/serializer/json"
auditinternal "k8s.io/apiserver/pkg/apis/audit"
auditv1alpha1 "k8s.io/apiserver/pkg/apis/audit/v1alpha1"
"k8s.io/apiserver/pkg/audit"
"k8s.io/client-go/tools/clientcmd/api/v1"
)
// newWebhookHandler returns a handler which recieves webhook events and decodes the
// request body. The caller passes a callback which is called on each webhook POST.
func newWebhookHandler(t *testing.T, cb func(events *auditv1alpha1.EventList)) http.Handler {
s := json.NewSerializer(json.DefaultMetaFactory, audit.Scheme, audit.Scheme, false)
return &testWebhookHandler{
t: t,
onEvents: cb,
serializer: s,
}
}
type testWebhookHandler struct {
t *testing.T
onEvents func(events *auditv1alpha1.EventList)
serializer runtime.Serializer
}
func (t *testWebhookHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
err := func() error {
body, err := ioutil.ReadAll(r.Body)
if err != nil {
return fmt.Errorf("read webhook request body: %v", err)
}
obj, _, err := t.serializer.Decode(body, nil, &auditv1alpha1.EventList{})
if err != nil {
return fmt.Errorf("decode request body: %v", err)
}
list, ok := obj.(*auditv1alpha1.EventList)
if !ok {
return fmt.Errorf("expected *v1alpha1.EventList got %T", obj)
}
t.onEvents(list)
return nil
}()
if err == nil {
io.WriteString(w, "{}")
return
}
// In a goroutine, can't call Fatal.
assert.NoError(t.t, err, "failed to read request body")
http.Error(w, err.Error(), http.StatusInternalServerError)
}
func newTestBlockingWebhook(t *testing.T, endpoint string) *blockingBackend {
return newWebhook(t, endpoint, ModeBlocking).(*blockingBackend)
}
func newTestBatchWebhook(t *testing.T, endpoint string) *batchBackend {
return newWebhook(t, endpoint, ModeBatch).(*batchBackend)
}
func newWebhook(t *testing.T, endpoint string, mode string) audit.Backend {
config := v1.Config{
Clusters: []v1.NamedCluster{
{Cluster: v1.Cluster{Server: endpoint, InsecureSkipTLSVerify: true}},
},
}
f, err := ioutil.TempFile("", "k8s_audit_webhook_test_")
require.NoError(t, err, "creating temp file")
defer func() {
f.Close()
os.Remove(f.Name())
}()
// NOTE(ericchiang): Do we need to use a proper serializer?
require.NoError(t, stdjson.NewEncoder(f).Encode(config), "writing kubeconfig")
backend, err := NewBackend(f.Name(), mode)
require.NoError(t, err, "initializing backend")
return backend
}
func TestWebhook(t *testing.T) {
gotEvents := false
defer func() { require.True(t, gotEvents, "no events received") }()
s := httptest.NewServer(newWebhookHandler(t, func(events *auditv1alpha1.EventList) {
gotEvents = true
}))
defer s.Close()
backend := newTestBlockingWebhook(t, s.URL)
// Ensure this doesn't return a serialization error.
event := &auditinternal.Event{}
require.NoError(t, backend.processEvents(event), "failed to send events")
}
// waitForEmptyBuffer indicates when the sendBatchEvents method has read from the
// existing buffer. This lets test coordinate closing a timer and stop channel
// until the for loop has read from the buffer.
func waitForEmptyBuffer(b *batchBackend) {
for len(b.buffer) != 0 {
time.Sleep(time.Millisecond)
}
}
func TestBatchWebhookMaxEvents(t *testing.T) {
nRest := 10
events := make([]*auditinternal.Event, defaultBatchMaxSize+nRest) // greater than max size.
for i := range events {
events[i] = &auditinternal.Event{}
}
got := make(chan int, 2)
s := httptest.NewServer(newWebhookHandler(t, func(events *auditv1alpha1.EventList) {
got <- len(events.Items)
}))
defer s.Close()
backend := newTestBatchWebhook(t, s.URL)
backend.ProcessEvents(events...)
stopCh := make(chan struct{})
timer := make(chan time.Time, 1)
backend.sendBatchEvents(stopCh, timer)
require.Equal(t, defaultBatchMaxSize, <-got, "did not get batch max size")
go func() {
waitForEmptyBuffer(backend) // wait for the buffer to empty
timer <- time.Now() // Trigger the wait timeout
}()
backend.sendBatchEvents(stopCh, timer)
require.Equal(t, nRest, <-got, "failed to get the rest of the events")
}
func TestBatchWebhookStopCh(t *testing.T) {
events := make([]*auditinternal.Event, 1) // less than max size.
for i := range events {
events[i] = &auditinternal.Event{}
}
expected := len(events)
got := make(chan int, 2)
s := httptest.NewServer(newWebhookHandler(t, func(events *auditv1alpha1.EventList) {
got <- len(events.Items)
}))
defer s.Close()
backend := newTestBatchWebhook(t, s.URL)
backend.ProcessEvents(events...)
stopCh := make(chan struct{})
timer := make(chan time.Time)
go func() {
waitForEmptyBuffer(backend)
close(stopCh) // stop channel has stopped
}()
backend.sendBatchEvents(stopCh, timer)
require.Equal(t, expected, <-got, "get queued events after timer expires")
}
func TestBatchWebhookEmptyBuffer(t *testing.T) {
events := make([]*auditinternal.Event, 1) // less than max size.
for i := range events {
events[i] = &auditinternal.Event{}
}
expected := len(events)
got := make(chan int, 2)
s := httptest.NewServer(newWebhookHandler(t, func(events *auditv1alpha1.EventList) {
got <- len(events.Items)
}))
defer s.Close()
backend := newTestBatchWebhook(t, s.URL)
stopCh := make(chan struct{})
timer := make(chan time.Time, 1)
timer <- time.Now() // Timer is done.
// Buffer is empty, no events have been queued. This should exit but send no events.
backend.sendBatchEvents(stopCh, timer)
// Send additional events after the sendBatchEvents has been called.
backend.ProcessEvents(events...)
go func() {
waitForEmptyBuffer(backend)
timer <- time.Now()
}()
backend.sendBatchEvents(stopCh, timer)
// Make sure we didn't get a POST with zero events.
require.Equal(t, expected, <-got, "expected one event")
}
func TestBatchBufferFull(t *testing.T) {
events := make([]*auditinternal.Event, defaultBatchBufferSize+1) // More than buffered size
for i := range events {
events[i] = &auditinternal.Event{}
}
s := httptest.NewServer(newWebhookHandler(t, func(events *auditv1alpha1.EventList) {
// Do nothing.
}))
defer s.Close()
backend := newTestBatchWebhook(t, s.URL)
// Make sure this doesn't block.
backend.ProcessEvents(events...)
}
func TestBatchRun(t *testing.T) {
// Divisable by max batch size so we don't have to wait for a minute for
// the test to finish.
events := make([]*auditinternal.Event, defaultBatchMaxSize*3)
for i := range events {
events[i] = &auditinternal.Event{}
}
got := new(int64)
want := len(events)
wg := new(sync.WaitGroup)
wg.Add(want)
done := make(chan struct{})
go func() {
wg.Wait()
// When the expected number of events have been received, close the channel.
close(done)
}()
s := httptest.NewServer(newWebhookHandler(t, func(events *auditv1alpha1.EventList) {
atomic.AddInt64(got, int64(len(events.Items)))
wg.Add(-len(events.Items))
}))
defer s.Close()
stopCh := make(chan struct{})
defer close(stopCh)
backend := newTestBatchWebhook(t, s.URL)
// Test the Run codepath. E.g. that the spawned goroutines behave correctly.
backend.Run(stopCh)
backend.ProcessEvents(events...)
select {
case <-done:
// Received all the events.
case <-time.After(2 * time.Minute):
t.Errorf("expected %d events got %d", want, atomic.LoadInt64(got))
}
}
func TestBatchConcurrentRequests(t *testing.T) {
events := make([]*auditinternal.Event, defaultBatchBufferSize) // Don't drop events
for i := range events {
events[i] = &auditinternal.Event{}
}
wg := new(sync.WaitGroup)
wg.Add(len(events))
s := httptest.NewServer(newWebhookHandler(t, func(events *auditv1alpha1.EventList) {
wg.Add(-len(events.Items))
// Since the webhook makes concurrent requests, blocking on the webhook response
// shouldn't block the webhook from sending more events.
//
// Wait for all responses to be received before sending the response.
wg.Wait()
}))
defer s.Close()
stopCh := make(chan struct{})
defer close(stopCh)
backend := newTestBatchWebhook(t, s.URL)
backend.Run(stopCh)
backend.ProcessEvents(events...)
// Wait for the webhook to receive all events.
wg.Wait()
}

View File

@ -42,7 +42,7 @@ func DefaultServerConfig() (*extensionsapiserver.Config, error) {
}
options := server.NewCustomResourceDefinitionsServerOptions(os.Stdout, os.Stderr)
options.RecommendedOptions.Audit.Path = "-"
options.RecommendedOptions.Audit.LogOptions.Path = "-"
options.RecommendedOptions.SecureServing.BindPort = port
options.RecommendedOptions.Authentication.SkipInClusterLookup = true
options.RecommendedOptions.SecureServing.BindAddress = net.ParseIP("127.0.0.1")