adds dynamic audit plugins

pull/58/head
Patrick Barker 2018-10-18 21:34:02 -05:00
parent 76234a31b0
commit 8eb2150689
18 changed files with 1960 additions and 4 deletions

View File

@ -0,0 +1,147 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package event
import (
"fmt"
"net/url"
"k8s.io/apiserver/pkg/apis/audit"
authuser "k8s.io/apiserver/pkg/authentication/user"
"k8s.io/apiserver/pkg/authorization/authorizer"
)
var _ authorizer.Attributes = &attributes{}
// attributes implements the authorizer attributes interface
// with event data. This is used for enforced audit backends
type attributes struct {
event *audit.Event
path string
}
// NewAttributes returns a new attributes struct and parsed request uri
// if needed
func NewAttributes(event *audit.Event) (authorizer.Attributes, error) {
a := attributes{
event: event,
}
if event.ObjectRef == nil {
u, err := url.ParseRequestURI(a.event.RequestURI)
if err != nil {
return nil, fmt.Errorf("could not parse url: %v", err)
}
a.path = u.Path
}
return &a, nil
}
// GetUser returns the user. This is only used for checking audit policy,
// and the audit policy user check is based off the original user,
// not the impersonated user.
func (a *attributes) GetUser() authuser.Info {
return user(a.event.User)
}
// GetVerb returns the verb
func (a *attributes) GetVerb() string {
return a.event.Verb
}
// IsReadOnly determines if the verb is a read only action
func (a *attributes) IsReadOnly() bool {
return a.event.Verb == "get" || a.event.Verb == "list" || a.event.Verb == "watch"
}
// GetNamespace returns the object namespace if present
func (a *attributes) GetNamespace() string {
if a.event.ObjectRef == nil {
return ""
}
return a.event.ObjectRef.Namespace
}
// GetResource returns the object resource if present
func (a *attributes) GetResource() string {
if a.event.ObjectRef == nil {
return ""
}
return a.event.ObjectRef.Resource
}
// GetSubresource returns the object subresource if present
func (a *attributes) GetSubresource() string {
if a.event.ObjectRef == nil {
return ""
}
return a.event.ObjectRef.Subresource
}
// GetName returns the object name if present
func (a *attributes) GetName() string {
if a.event.ObjectRef == nil {
return ""
}
return a.event.ObjectRef.Name
}
// GetAPIGroup returns the object api group if present
func (a *attributes) GetAPIGroup() string {
if a.event.ObjectRef == nil {
return ""
}
return a.event.ObjectRef.APIGroup
}
// GetAPIVersion returns the object api version if present
func (a *attributes) GetAPIVersion() string {
if a.event.ObjectRef == nil {
return ""
}
return a.event.ObjectRef.APIVersion
}
// IsResourceRequest determines if the request was acted on a resource
func (a *attributes) IsResourceRequest() bool {
return a.event.ObjectRef != nil
}
// GetPath returns the path uri accessed
func (a *attributes) GetPath() string {
return a.path
}
// user represents the event user
type user audit.UserInfo
// GetName returns the user name
func (u user) GetName() string { return u.Username }
// GetUID returns the user uid
func (u user) GetUID() string { return u.UID }
// GetGroups returns the user groups
func (u user) GetGroups() []string { return u.Groups }
// GetExtra returns the user extra data
func (u user) GetExtra() map[string][]string {
m := map[string][]string{}
for k, v := range u.Extra {
m[k] = []string(v)
}
return m
}

View File

@ -0,0 +1,96 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package event
import (
"testing"
"github.com/stretchr/testify/require"
"k8s.io/apiserver/pkg/apis/audit"
)
func TestAttributes(t *testing.T) {
for _, tc := range []struct {
desc string
ev *audit.Event
path string
isReadOnly bool
resourceName string
shouldErr bool
}{
{
desc: "has resources",
ev: &audit.Event{
Verb: "get",
ObjectRef: &audit.ObjectReference{
Resource: "pod",
Name: "mypod",
Namespace: "test",
},
RequestURI: "/api/v1/namespaces/test/pods",
},
path: "",
isReadOnly: true,
resourceName: "mypod",
shouldErr: false,
},
{
desc: "no resources",
ev: &audit.Event{
Verb: "create",
RequestURI: "/api/v1/namespaces/test/pods",
},
path: "/api/v1/namespaces/test/pods",
isReadOnly: false,
resourceName: "",
shouldErr: false,
},
{
desc: "no path or resource",
ev: &audit.Event{
Verb: "create",
},
path: "",
isReadOnly: false,
resourceName: "",
shouldErr: true,
},
{
desc: "invalid path",
ev: &audit.Event{
Verb: "create",
},
path: "a/bad/path",
isReadOnly: false,
resourceName: "",
shouldErr: true,
},
} {
t.Run(tc.desc, func(t *testing.T) {
attr, err := NewAttributes(tc.ev)
if tc.shouldErr {
require.Error(t, err)
return
}
require.NoError(t, err)
require.Equal(t, tc.path, attr.GetPath())
require.Equal(t, tc.isReadOnly, attr.IsReadOnly())
require.Equal(t, tc.resourceName, attr.GetName())
})
}
}

View File

@ -0,0 +1,39 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package policy
import (
"k8s.io/api/auditregistration/v1alpha1"
"k8s.io/apiserver/pkg/apis/audit"
)
// ConvertDynamicPolicyToInternal constructs an internal policy type from a
// v1alpha1 dynamic type
func ConvertDynamicPolicyToInternal(p *v1alpha1.Policy) *audit.Policy {
stages := make([]audit.Stage, len(p.Stages))
for i, stage := range p.Stages {
stages[i] = audit.Stage(stage)
}
return &audit.Policy{
Rules: []audit.PolicyRule{
{
Level: audit.Level(p.Level),
},
},
OmitStages: InvertStages(stages),
}
}

View File

@ -0,0 +1,81 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package policy
import (
"testing"
"github.com/stretchr/testify/require"
"k8s.io/api/auditregistration/v1alpha1"
"k8s.io/apiserver/pkg/apis/audit"
)
func TestConvertDynamicPolicyToInternal(t *testing.T) {
for _, test := range []struct {
desc string
dynamic *v1alpha1.Policy
internal *audit.Policy
}{
{
desc: "should convert full",
dynamic: &v1alpha1.Policy{
Level: v1alpha1.LevelMetadata,
Stages: []v1alpha1.Stage{
v1alpha1.StageResponseComplete,
},
},
internal: &audit.Policy{
Rules: []audit.PolicyRule{
{
Level: audit.LevelMetadata,
},
},
OmitStages: []audit.Stage{
audit.StageRequestReceived,
audit.StageResponseStarted,
audit.StagePanic,
},
},
},
{
desc: "should convert missing stages",
dynamic: &v1alpha1.Policy{
Level: v1alpha1.LevelMetadata,
},
internal: &audit.Policy{
Rules: []audit.PolicyRule{
{
Level: audit.LevelMetadata,
},
},
OmitStages: []audit.Stage{
audit.StageRequestReceived,
audit.StageResponseStarted,
audit.StageResponseComplete,
audit.StagePanic,
},
},
},
} {
t.Run(test.desc, func(t *testing.T) {
d := ConvertDynamicPolicyToInternal(test.dynamic)
require.ElementsMatch(t, test.internal.OmitStages, d.OmitStages)
require.Equal(t, test.internal.Rules, d.Rules)
})
}
}

View File

@ -0,0 +1,53 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package policy
import (
"fmt"
"k8s.io/apiserver/pkg/apis/audit"
)
// EnforcePolicy drops any part of the event that doesn't conform to a policy level
// or omitStages and sets the event level accordingly
func EnforcePolicy(event *audit.Event, level audit.Level, omitStages []audit.Stage) (*audit.Event, error) {
for _, stage := range omitStages {
if event.Stage == stage {
return nil, nil
}
}
return enforceLevel(event, level)
}
func enforceLevel(event *audit.Event, level audit.Level) (*audit.Event, error) {
switch level {
case audit.LevelMetadata:
event.Level = audit.LevelMetadata
event.ResponseObject = nil
event.RequestObject = nil
case audit.LevelRequest:
event.Level = audit.LevelRequest
event.ResponseObject = nil
case audit.LevelRequestResponse:
event.Level = audit.LevelRequestResponse
case audit.LevelNone:
return nil, nil
default:
return nil, fmt.Errorf("level unknown: %s", level)
}
return event, nil
}

View File

@ -0,0 +1,146 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package policy
import (
"math/rand"
"testing"
"time"
"github.com/stretchr/testify/require"
"k8s.io/apimachinery/pkg/api/apitesting/fuzzer"
"k8s.io/apimachinery/pkg/runtime"
runtimeserializer "k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apiserver/pkg/apis/audit"
auditfuzz "k8s.io/apiserver/pkg/apis/audit/fuzzer"
)
func TestEnforcePolicy(t *testing.T) {
scheme := runtime.NewScheme()
audit.SchemeBuilder.AddToScheme(scheme)
codecs := runtimeserializer.NewCodecFactory(scheme)
rs := rand.NewSource(time.Now().UnixNano())
objectFuzzer := fuzzer.FuzzerFor(auditfuzz.Funcs, rs, codecs)
for _, tc := range []struct {
name string
level audit.Level
omitStages []audit.Stage
}{
{
name: "level metadata",
level: audit.LevelMetadata,
},
{
name: "level request",
level: audit.LevelRequest,
},
{
name: "level requestresponse",
level: audit.LevelRequestResponse,
},
{
name: "level none",
level: audit.LevelNone,
},
{
name: "level unknown",
level: audit.Level("unknown"),
},
{
name: "stage valid",
level: audit.LevelRequest,
omitStages: []audit.Stage{audit.StageRequestReceived},
},
{
name: "stage unknown",
level: audit.LevelRequest,
omitStages: []audit.Stage{"unknown"},
},
} {
t.Run(tc.name, func(t *testing.T) {
events := make([]audit.Event, 20)
omitSet := sets.NewString(ConvertStagesToStrings(tc.omitStages)...)
for i := range events {
e := &events[i]
objectFuzzer.Fuzz(e)
ev, err := EnforcePolicy(e, tc.level, tc.omitStages)
if omitSet.Has(string(e.Stage)) {
require.Nil(t, err)
require.Nil(t, ev)
return
}
switch tc.level {
case audit.LevelNone:
require.Nil(t, ev)
case audit.LevelMetadata:
expected := &audit.Event{
TypeMeta: e.TypeMeta,
Level: tc.level,
AuditID: e.AuditID,
Stage: e.Stage,
RequestURI: e.RequestURI,
Verb: e.Verb,
User: e.User,
ImpersonatedUser: e.ImpersonatedUser,
SourceIPs: e.SourceIPs,
UserAgent: e.UserAgent,
ObjectRef: e.ObjectRef,
ResponseStatus: e.ResponseStatus,
RequestReceivedTimestamp: e.RequestReceivedTimestamp,
StageTimestamp: e.StageTimestamp,
Annotations: e.Annotations,
RequestObject: nil,
ResponseObject: nil,
}
require.Equal(t, expected, ev)
case audit.LevelRequest:
expected := &audit.Event{
TypeMeta: e.TypeMeta,
Level: tc.level,
AuditID: e.AuditID,
Stage: e.Stage,
RequestURI: e.RequestURI,
Verb: e.Verb,
User: e.User,
ImpersonatedUser: e.ImpersonatedUser,
SourceIPs: e.SourceIPs,
UserAgent: e.UserAgent,
ObjectRef: e.ObjectRef,
ResponseStatus: e.ResponseStatus,
RequestReceivedTimestamp: e.RequestReceivedTimestamp,
StageTimestamp: e.StageTimestamp,
Annotations: e.Annotations,
RequestObject: e.RequestObject,
ResponseObject: nil,
}
require.Equal(t, expected, ev)
case audit.LevelRequestResponse:
expected := e.DeepCopy()
expected.Level = tc.level
require.Equal(t, expected, ev)
default:
require.NotNil(t, err)
return
}
require.Nil(t, err)
}
})
}
}

View File

@ -0,0 +1,68 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package policy
import (
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apiserver/pkg/apis/audit"
)
// AllStages returns all possible stages
func AllStages() sets.String {
return sets.NewString(
audit.StageRequestReceived,
audit.StageResponseStarted,
audit.StageResponseComplete,
audit.StagePanic,
)
}
// AllLevels returns all possible levels
func AllLevels() sets.String {
return sets.NewString(
string(audit.LevelNone),
string(audit.LevelMetadata),
string(audit.LevelRequest),
string(audit.LevelRequestResponse),
)
}
// InvertStages subtracts the given array of stages from all stages
func InvertStages(stages []audit.Stage) []audit.Stage {
s := ConvertStagesToStrings(stages)
a := AllStages()
a.Delete(s...)
return ConvertStringSetToStages(a)
}
// ConvertStagesToStrings converts an array of stages to a string array
func ConvertStagesToStrings(stages []audit.Stage) []string {
s := make([]string, len(stages))
for i, stage := range stages {
s[i] = string(stage)
}
return s
}
// ConvertStringSetToStages converts a string set to an array of stages
func ConvertStringSetToStages(set sets.String) []audit.Stage {
stages := make([]audit.Stage, len(set))
for i, stage := range set.List() {
stages[i] = audit.Stage(stage)
}
return stages
}

View File

@ -0,0 +1,81 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package policy
import (
"testing"
"github.com/stretchr/testify/require"
"k8s.io/apiserver/pkg/apis/audit"
)
func TestInvertStages(t *testing.T) {
for _, test := range []struct {
desc string
stages []audit.Stage
expectedStages []audit.Stage
}{
{
desc: "should remove one",
stages: []audit.Stage{
audit.StageResponseStarted,
},
expectedStages: []audit.Stage{
audit.StageRequestReceived,
audit.StageResponseComplete,
audit.StagePanic,
},
},
{
desc: "should remove both",
stages: []audit.Stage{
audit.StageResponseStarted,
audit.StageRequestReceived,
},
expectedStages: []audit.Stage{
audit.StageResponseComplete,
audit.StagePanic,
},
},
{
desc: "should remove none",
stages: []audit.Stage{},
expectedStages: []audit.Stage{
audit.StageResponseComplete,
audit.StageResponseStarted,
audit.StageRequestReceived,
audit.StagePanic,
},
},
{
desc: "should remove all",
stages: []audit.Stage{
audit.StageResponseComplete,
audit.StageResponseStarted,
audit.StageRequestReceived,
audit.StagePanic,
},
expectedStages: []audit.Stage{},
},
} {
t.Run(test.desc, func(t *testing.T) {
e := InvertStages(test.stages)
require.ElementsMatch(t, e, test.expectedStages)
})
}
}

View File

@ -0,0 +1,43 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"k8s.io/api/auditregistration/v1alpha1"
"k8s.io/apiserver/pkg/util/webhook"
)
// HookClientConfigForSink constructs a webhook.ClientConfig using a v1alpha1.AuditSink API object.
// webhook.ClientConfig is used to create a HookClient and the purpose of the config struct is to
// share that with other packages that need to create a HookClient.
func HookClientConfigForSink(a *v1alpha1.AuditSink) webhook.ClientConfig {
c := a.Spec.Webhook.ClientConfig
ret := webhook.ClientConfig{Name: a.Name, CABundle: c.CABundle}
if c.URL != nil {
ret.URL = *c.URL
}
if c.Service != nil {
ret.Service = &webhook.ClientConfigService{
Name: c.Service.Name,
Namespace: c.Service.Namespace,
}
if c.Service.Path != nil {
ret.Service.Path = *c.Service.Path
}
}
return ret
}

View File

@ -0,0 +1,88 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"testing"
"github.com/stretchr/testify/require"
auditregv1alpha1 "k8s.io/api/auditregistration/v1alpha1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apiserver/pkg/util/webhook"
)
func TestHookClientConfigForSink(t *testing.T) {
testURL := "http://localhost"
path := "/path"
for _, tc := range []struct {
desc string
sink *auditregv1alpha1.AuditSink
clientConfig webhook.ClientConfig
}{
{
desc: "build full",
sink: &auditregv1alpha1.AuditSink{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
},
Spec: auditregv1alpha1.AuditSinkSpec{
Webhook: auditregv1alpha1.Webhook{
ClientConfig: auditregv1alpha1.WebhookClientConfig{
URL: &testURL,
Service: &auditregv1alpha1.ServiceReference{
Name: "test",
Path: &path,
Namespace: "test",
},
},
},
},
},
clientConfig: webhook.ClientConfig{
Name: "test",
URL: testURL,
Service: &webhook.ClientConfigService{
Name: "test",
Namespace: "test",
Path: path,
},
},
},
{
desc: "build empty client config",
sink: &auditregv1alpha1.AuditSink{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
},
Spec: auditregv1alpha1.AuditSinkSpec{
Webhook: auditregv1alpha1.Webhook{
ClientConfig: auditregv1alpha1.WebhookClientConfig{},
},
},
},
clientConfig: webhook.ClientConfig{
Name: "test",
},
},
} {
t.Run(tc.desc, func(t *testing.T) {
ret := HookClientConfigForSink(tc.sink)
require.Equal(t, tc.clientConfig, ret)
})
}
}

View File

@ -0,0 +1,46 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package dynamic
import (
"time"
bufferedplugin "k8s.io/apiserver/plugin/pkg/audit/buffered"
)
const (
// Default configuration values for ModeBatch when applied to a dynamic plugin
defaultBatchBufferSize = 5000 // Buffer up to 5000 events before starting discarding.
defaultBatchMaxSize = 400 // Only send up to 400 events at a time.
defaultBatchMaxWait = 30 * time.Second // Send events at least twice a minute.
defaultBatchThrottleQPS = 10 // Limit the send rate by 10 QPS.
defaultBatchThrottleBurst = 15 // Allow up to 15 QPS burst.
)
// NewDefaultWebhookBatchConfig returns new Batch Config objects populated by default values
// for dynamic webhooks
func NewDefaultWebhookBatchConfig() *bufferedplugin.BatchConfig {
return &bufferedplugin.BatchConfig{
BufferSize: defaultBatchBufferSize,
MaxBatchSize: defaultBatchMaxSize,
MaxBatchWait: defaultBatchMaxWait,
ThrottleEnable: true,
ThrottleQPS: defaultBatchThrottleQPS,
ThrottleBurst: defaultBatchThrottleBurst,
AsyncDelegate: true,
}
}

View File

@ -0,0 +1,335 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package dynamic
import (
"fmt"
"reflect"
"strings"
"sync"
"sync/atomic"
"github.com/golang/glog"
auditregv1alpha1 "k8s.io/api/auditregistration/v1alpha1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
auditinternal "k8s.io/apiserver/pkg/apis/audit"
auditinstall "k8s.io/apiserver/pkg/apis/audit/install"
auditv1 "k8s.io/apiserver/pkg/apis/audit/v1"
"k8s.io/apiserver/pkg/audit"
webhook "k8s.io/apiserver/pkg/util/webhook"
bufferedplugin "k8s.io/apiserver/plugin/pkg/audit/buffered"
auditinformer "k8s.io/client-go/informers/auditregistration/v1alpha1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
)
// PluginName is the name reported in error metrics.
const PluginName = "dynamic"
// Config holds the configuration for the dynamic backend
type Config struct {
// Informer for the audit sinks
Informer auditinformer.AuditSinkInformer
// EventConfig holds the configuration for event notifications about the AuditSink API objects
EventConfig EventConfig
// BufferedConfig is the runtime buffered configuration
BufferedConfig *bufferedplugin.BatchConfig
// WebhookConfig holds the configuration for outgoing webhooks
WebhookConfig WebhookConfig
}
// WebhookConfig holds the configurations for outgoing webhooks
type WebhookConfig struct {
// AuthInfoResolverWrapper provides the webhook authentication for in-cluster endpoints
AuthInfoResolverWrapper webhook.AuthenticationInfoResolverWrapper
// ServiceResolver knows how to convert a webhook service reference into an actual location.
ServiceResolver webhook.ServiceResolver
}
// EventConfig holds the configurations for sending event notifiations about AuditSink API objects
type EventConfig struct {
// Sink for emitting events
Sink record.EventSink
// Source holds the source information about the event emitter
Source corev1.EventSource
}
// delegate represents a delegate backend that was created from an audit sink configuration
type delegate struct {
audit.Backend
configuration *auditregv1alpha1.AuditSink
stopChan chan struct{}
}
// gracefulShutdown will gracefully shutdown the delegate
func (d *delegate) gracefulShutdown() {
close(d.stopChan)
d.Shutdown()
}
// NewBackend returns a backend that dynamically updates its configuration
// based on a shared informer.
func NewBackend(c *Config) (audit.Backend, error) {
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(glog.Infof)
eventBroadcaster.StartRecordingToSink(c.EventConfig.Sink)
scheme := runtime.NewScheme()
err := auditregv1alpha1.AddToScheme(scheme)
if err != nil {
return nil, err
}
recorder := eventBroadcaster.NewRecorder(scheme, c.EventConfig.Source)
if c.BufferedConfig == nil {
c.BufferedConfig = NewDefaultWebhookBatchConfig()
}
cm, err := webhook.NewClientManager(auditv1.SchemeGroupVersion, func(s *runtime.Scheme) error {
auditinstall.Install(s)
return nil
})
if err != nil {
return nil, err
}
// TODO: need a way of injecting authentication before beta
authInfoResolver, err := webhook.NewDefaultAuthenticationInfoResolver("")
if err != nil {
return nil, err
}
cm.SetAuthenticationInfoResolver(authInfoResolver)
cm.SetServiceResolver(c.WebhookConfig.ServiceResolver)
cm.SetAuthenticationInfoResolverWrapper(c.WebhookConfig.AuthInfoResolverWrapper)
manager := &backend{
config: c,
delegates: atomic.Value{},
delegateUpdateMutex: sync.Mutex{},
webhookClientManager: cm,
recorder: recorder,
}
manager.delegates.Store(syncedDelegates{})
c.Informer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
manager.addSink(obj.(*auditregv1alpha1.AuditSink))
},
UpdateFunc: func(oldObj, newObj interface{}) {
manager.updateSink(oldObj.(*auditregv1alpha1.AuditSink), newObj.(*auditregv1alpha1.AuditSink))
},
DeleteFunc: func(obj interface{}) {
sink, ok := obj.(*auditregv1alpha1.AuditSink)
if !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
glog.V(2).Infof("Couldn't get object from tombstone %#v", obj)
return
}
sink, ok = tombstone.Obj.(*auditregv1alpha1.AuditSink)
if !ok {
glog.V(2).Infof("Tombstone contained object that is not an AuditSink: %#v", obj)
return
}
}
manager.deleteSink(sink)
},
})
return manager, nil
}
type backend struct {
// delegateUpdateMutex holds an update lock on the delegates
delegateUpdateMutex sync.Mutex
config *Config
delegates atomic.Value
webhookClientManager webhook.ClientManager
recorder record.EventRecorder
}
type syncedDelegates map[types.UID]*delegate
// Names returns the names of the delegate configurations
func (s syncedDelegates) Names() []string {
names := []string{}
for _, delegate := range s {
names = append(names, delegate.configuration.Name)
}
return names
}
// ProcessEvents proccesses the given events per current delegate map
func (b *backend) ProcessEvents(events ...*auditinternal.Event) {
for _, d := range b.GetDelegates() {
d.ProcessEvents(events...)
}
}
// Run starts a goroutine that propagates the shutdown signal,
// individual delegates are ran as they are created.
func (b *backend) Run(stopCh <-chan struct{}) error {
go func() {
<-stopCh
b.stopAllDelegates()
}()
return nil
}
// stopAllDelegates closes the stopChan for every delegate to enable
// goroutines to terminate gracefully. This is a helper method to propagate
// the primary stopChan to the current delegate map.
func (b *backend) stopAllDelegates() {
b.delegateUpdateMutex.Lock()
for _, d := range b.GetDelegates() {
close(d.stopChan)
}
}
// Shutdown calls the shutdown method on all delegates. The stopChan should
// be closed before this is called.
func (b *backend) Shutdown() {
for _, d := range b.GetDelegates() {
d.Shutdown()
}
}
// GetDelegates retrieves current delegates in a safe manner
func (b *backend) GetDelegates() syncedDelegates {
return b.delegates.Load().(syncedDelegates)
}
// copyDelegates returns a copied delegate map
func (b *backend) copyDelegates() syncedDelegates {
c := make(syncedDelegates)
for u, s := range b.GetDelegates() {
c[u] = s
}
return c
}
// setDelegates sets the current delegates in a safe manner
func (b *backend) setDelegates(delegates syncedDelegates) {
b.delegates.Store(delegates)
}
// addSink is called by the shared informer when a sink is added
func (b *backend) addSink(sink *auditregv1alpha1.AuditSink) {
b.delegateUpdateMutex.Lock()
defer b.delegateUpdateMutex.Unlock()
delegates := b.copyDelegates()
if _, ok := delegates[sink.UID]; ok {
glog.Errorf("Audit sink %q uid: %s already exists, could not readd", sink.Name, sink.UID)
return
}
d, err := b.createAndStartDelegate(sink)
if err != nil {
msg := fmt.Sprintf("Could not add audit sink %q: %v", sink.Name, err)
glog.Error(msg)
b.recorder.Event(sink, corev1.EventTypeWarning, "CreateFailed", msg)
return
}
delegates[sink.UID] = d
b.setDelegates(delegates)
glog.V(2).Infof("Added audit sink: %s", sink.Name)
glog.V(2).Infof("Current audit sinks: %v", delegates.Names())
}
// updateSink is called by the shared informer when a sink is updated.
// The new sink is only rebuilt on spec changes. The new sink must not have
// the same uid as the previous. The new sink will be started before the old
// one is shutdown so no events will be lost
func (b *backend) updateSink(oldSink, newSink *auditregv1alpha1.AuditSink) {
b.delegateUpdateMutex.Lock()
defer b.delegateUpdateMutex.Unlock()
delegates := b.copyDelegates()
oldDelegate, ok := delegates[oldSink.UID]
if !ok {
glog.Errorf("Could not update audit sink %q uid: %s, old sink does not exist",
oldSink.Name, oldSink.UID)
return
}
// check if spec has changed
eq := reflect.DeepEqual(oldSink.Spec, newSink.Spec)
if eq {
delete(delegates, oldSink.UID)
delegates[newSink.UID] = oldDelegate
b.setDelegates(delegates)
} else {
d, err := b.createAndStartDelegate(newSink)
if err != nil {
msg := fmt.Sprintf("Could not update audit sink %q: %v", oldSink.Name, err)
glog.Error(msg)
b.recorder.Event(newSink, corev1.EventTypeWarning, "UpdateFailed", msg)
return
}
delete(delegates, oldSink.UID)
delegates[newSink.UID] = d
b.setDelegates(delegates)
oldDelegate.gracefulShutdown()
}
glog.V(2).Infof("Updated audit sink: %s", newSink.Name)
glog.V(2).Infof("Current audit sinks: %v", delegates.Names())
}
// deleteSink is called by the shared informer when a sink is deleted
func (b *backend) deleteSink(sink *auditregv1alpha1.AuditSink) {
b.delegateUpdateMutex.Lock()
defer b.delegateUpdateMutex.Unlock()
delegates := b.copyDelegates()
delegate, ok := delegates[sink.UID]
if !ok {
glog.Errorf("Could not delete audit sink %q uid: %s, does not exist", sink.Name, sink.UID)
return
}
delete(delegates, sink.UID)
b.setDelegates(delegates)
delegate.gracefulShutdown()
glog.V(2).Infof("Deleted audit sink: %s", sink.Name)
glog.V(2).Infof("Current audit sinks: %v", delegates.Names())
}
// createAndStartDelegate will build a delegate from an audit sink configuration and run it
func (b *backend) createAndStartDelegate(sink *auditregv1alpha1.AuditSink) (*delegate, error) {
f := factory{
config: b.config,
webhookClientManager: b.webhookClientManager,
sink: sink,
}
delegate, err := f.BuildDelegate()
if err != nil {
return nil, err
}
err = delegate.Run(delegate.stopChan)
if err != nil {
return nil, err
}
return delegate, nil
}
// String returns a string representation of the backend
func (b *backend) String() string {
var delegateStrings []string
for _, delegate := range b.GetDelegates() {
delegateStrings = append(delegateStrings, fmt.Sprintf("%s", delegate))
}
return fmt.Sprintf("%s[%s]", PluginName, strings.Join(delegateStrings, ","))
}

View File

@ -0,0 +1,275 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package dynamic
import (
"fmt"
"io/ioutil"
"net/http"
"net/http/httptest"
"reflect"
"sync/atomic"
"testing"
"time"
"github.com/stretchr/testify/require"
auditregv1alpha1 "k8s.io/api/auditregistration/v1alpha1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
auditinternal "k8s.io/apiserver/pkg/apis/audit"
auditv1 "k8s.io/apiserver/pkg/apis/audit/v1"
"k8s.io/apiserver/pkg/audit"
webhook "k8s.io/apiserver/pkg/util/webhook"
informers "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
)
func TestDynamic(t *testing.T) {
eventList1 := &atomic.Value{}
eventList1.Store(auditinternal.EventList{})
eventList2 := &atomic.Value{}
eventList2.Store(auditinternal.EventList{})
// start test servers
server1 := httptest.NewServer(buildTestHandler(t, eventList1))
defer server1.Close()
server2 := httptest.NewServer(buildTestHandler(t, eventList2))
defer server2.Close()
testPolicy := auditregv1alpha1.Policy{
Level: auditregv1alpha1.LevelMetadata,
Stages: []auditregv1alpha1.Stage{
auditregv1alpha1.StageResponseStarted,
},
}
testEvent := auditinternal.Event{
Level: auditinternal.LevelMetadata,
Stage: auditinternal.StageResponseStarted,
Verb: "get",
RequestURI: "/test/path",
}
testConfig1 := &auditregv1alpha1.AuditSink{
ObjectMeta: metav1.ObjectMeta{
Name: "test1",
UID: types.UID("test1"),
},
Spec: auditregv1alpha1.AuditSinkSpec{
Policy: testPolicy,
Webhook: auditregv1alpha1.Webhook{
ClientConfig: auditregv1alpha1.WebhookClientConfig{
URL: &server1.URL,
},
},
},
}
testConfig2 := &auditregv1alpha1.AuditSink{
ObjectMeta: metav1.ObjectMeta{
Name: "test2",
UID: types.UID("test2"),
},
Spec: auditregv1alpha1.AuditSinkSpec{
Policy: testPolicy,
Webhook: auditregv1alpha1.Webhook{
ClientConfig: auditregv1alpha1.WebhookClientConfig{
URL: &server2.URL,
},
},
},
}
config, stopChan := defaultTestConfig()
config.BufferedConfig.MaxBatchWait = 10 * time.Millisecond
b, err := NewBackend(config)
require.NoError(t, err)
d := b.(*backend)
err = b.Run(stopChan)
require.NoError(t, err)
t.Run("find none", func(t *testing.T) {
require.Len(t, d.GetDelegates(), 0)
})
t.Run("find one", func(t *testing.T) {
d.addSink(testConfig1)
delegates := d.GetDelegates()
require.Len(t, delegates, 1)
require.Contains(t, delegates, types.UID("test1"))
require.Equal(t, testConfig1, delegates["test1"].configuration)
// send event and check that it arrives
b.ProcessEvents(&testEvent)
err := checkForEvent(eventList1, testEvent)
require.NoError(t, err, "unable to find events sent to sink")
})
t.Run("find two", func(t *testing.T) {
eventList1.Store(auditinternal.EventList{})
d.addSink(testConfig2)
delegates := d.GetDelegates()
require.Len(t, delegates, 2)
require.Contains(t, delegates, types.UID("test1"))
require.Contains(t, delegates, types.UID("test2"))
require.Equal(t, testConfig1, delegates["test1"].configuration)
require.Equal(t, testConfig2, delegates["test2"].configuration)
// send event to both delegates and check that it arrives in both places
b.ProcessEvents(&testEvent)
err := checkForEvent(eventList1, testEvent)
require.NoError(t, err, "unable to find events sent to sink 1")
err = checkForEvent(eventList2, testEvent)
require.NoError(t, err, "unable to find events sent to sink 2")
})
t.Run("delete one", func(t *testing.T) {
eventList2.Store(auditinternal.EventList{})
d.deleteSink(testConfig1)
delegates := d.GetDelegates()
require.Len(t, delegates, 1)
require.Contains(t, delegates, types.UID("test2"))
require.Equal(t, testConfig2, delegates["test2"].configuration)
// send event and check that it arrives to remaining sink
b.ProcessEvents(&testEvent)
err := checkForEvent(eventList2, testEvent)
require.NoError(t, err, "unable to find events sent to sink")
})
t.Run("update one", func(t *testing.T) {
eventList1.Store(auditinternal.EventList{})
oldConfig := *testConfig2
testConfig2.Spec.Webhook.ClientConfig.URL = &server1.URL
testConfig2.UID = types.UID("test2.1")
d.updateSink(&oldConfig, testConfig2)
delegates := d.GetDelegates()
require.Len(t, delegates, 1)
require.Contains(t, delegates, types.UID("test2.1"))
require.Equal(t, testConfig2, delegates["test2.1"].configuration)
// send event and check that it arrives to updated sink
b.ProcessEvents(&testEvent)
err := checkForEvent(eventList1, testEvent)
require.NoError(t, err, "unable to find events sent to sink")
})
t.Run("update meta only", func(t *testing.T) {
eventList1.Store(auditinternal.EventList{})
oldConfig := *testConfig2
testConfig2.UID = types.UID("test2.2")
testConfig2.Labels = map[string]string{"my": "label"}
d.updateSink(&oldConfig, testConfig2)
delegates := d.GetDelegates()
require.Len(t, delegates, 1)
require.Contains(t, delegates, types.UID("test2.2"))
// send event and check that it arrives to same sink
b.ProcessEvents(&testEvent)
err := checkForEvent(eventList1, testEvent)
require.NoError(t, err, "unable to find events sent to sink")
})
t.Run("shutdown", func(t *testing.T) {
// if the stop signal is not propagated correctly the buffers will not
// close down gracefully, and the shutdown method will hang causing
// the test will timeout.
timeoutChan := make(chan struct{})
successChan := make(chan struct{})
go func() {
time.Sleep(1 * time.Second)
timeoutChan <- struct{}{}
}()
go func() {
close(stopChan)
d.Shutdown()
successChan <- struct{}{}
}()
for {
select {
case <-timeoutChan:
t.Error("shutdown timed out")
return
case <-successChan:
return
}
}
})
}
// checkForEvent will poll to check for an audit event in an atomic event list
func checkForEvent(a *atomic.Value, evSent auditinternal.Event) error {
return wait.Poll(100*time.Millisecond, 1*time.Second, func() (bool, error) {
el := a.Load().(auditinternal.EventList)
if len(el.Items) != 1 {
return false, nil
}
evFound := el.Items[0]
eq := reflect.DeepEqual(evSent, evFound)
if !eq {
return false, fmt.Errorf("event mismatch -- sent: %+v found: %+v", evSent, evFound)
}
return true, nil
})
}
// buildTestHandler returns a handler that will update the atomic value passed in
// with the event list it receives
func buildTestHandler(t *testing.T, a *atomic.Value) http.HandlerFunc {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
body, err := ioutil.ReadAll(r.Body)
if err != nil {
t.Fatalf("could not read request body: %v", err)
}
el := auditinternal.EventList{}
decoder := audit.Codecs.UniversalDecoder(auditv1.SchemeGroupVersion)
if err := runtime.DecodeInto(decoder, body, &el); err != nil {
t.Fatalf("failed decoding buf: %b, apiVersion: %s", body, auditv1.SchemeGroupVersion)
}
defer r.Body.Close()
a.Store(el)
w.WriteHeader(200)
})
}
// defaultTestConfig returns a Config object suitable for testing along with its
// associated stopChan
func defaultTestConfig() (*Config, chan struct{}) {
authWrapper := webhook.AuthenticationInfoResolverWrapper(
func(a webhook.AuthenticationInfoResolver) webhook.AuthenticationInfoResolver { return a },
)
client := fake.NewSimpleClientset()
informerFactory := informers.NewSharedInformerFactory(client, 0)
stop := make(chan struct{})
eventSink := &v1core.EventSinkImpl{Interface: client.CoreV1().Events("")}
informerFactory.Start(stop)
informerFactory.WaitForCacheSync(stop)
informer := informerFactory.Auditregistration().V1alpha1().AuditSinks()
return &Config{
Informer: informer,
EventConfig: EventConfig{Sink: eventSink},
BufferedConfig: NewDefaultWebhookBatchConfig(),
WebhookConfig: WebhookConfig{
AuthInfoResolverWrapper: authWrapper,
ServiceResolver: webhook.NewDefaultServiceResolver(),
},
}, stop
}

View File

@ -0,0 +1,90 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package enforced
import (
"fmt"
auditinternal "k8s.io/apiserver/pkg/apis/audit"
"k8s.io/apiserver/pkg/audit"
ev "k8s.io/apiserver/pkg/audit/event"
"k8s.io/apiserver/pkg/audit/policy"
)
// PluginName is the name reported in error metrics.
const PluginName = "enforced"
// Backend filters audit events according to the policy
// trimming them as necessary to match the level
type Backend struct {
policyChecker policy.Checker
delegateBackend audit.Backend
}
// NewBackend returns an enforced audit backend that wraps delegate backend.
// Enforced backend automatically runs and shuts down the delegate backend.
func NewBackend(delegate audit.Backend, p policy.Checker) audit.Backend {
return &Backend{
policyChecker: p,
delegateBackend: delegate,
}
}
// Run the delegate backend
func (b Backend) Run(stopCh <-chan struct{}) error {
return b.delegateBackend.Run(stopCh)
}
// Shutdown the delegate backend
func (b Backend) Shutdown() {
b.delegateBackend.Shutdown()
}
// ProcessEvents enforces policy on a shallow copy of the given event
// dropping any sections that don't conform
func (b Backend) ProcessEvents(events ...*auditinternal.Event) {
for _, event := range events {
if event == nil {
continue
}
attr, err := ev.NewAttributes(event)
if err != nil {
audit.HandlePluginError(PluginName, err, event)
continue
}
level, stages := b.policyChecker.LevelAndStages(attr)
if level == auditinternal.LevelNone {
continue
}
// make shallow copy before modifying to satisfy interface definition
ev := *event
e, err := policy.EnforcePolicy(&ev, level, stages)
if err != nil {
audit.HandlePluginError(PluginName, err, event)
continue
}
if e == nil {
continue
}
b.delegateBackend.ProcessEvents(e)
}
}
// String returns a string representation of the backend
func (b Backend) String() string {
return fmt.Sprintf("%s<%s>", PluginName, b.delegateBackend)
}

View File

@ -0,0 +1,117 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package enforced
import (
"testing"
"github.com/stretchr/testify/require"
"k8s.io/apimachinery/pkg/runtime"
auditinternal "k8s.io/apiserver/pkg/apis/audit"
"k8s.io/apiserver/pkg/audit/policy"
"k8s.io/apiserver/pkg/authentication/user"
"k8s.io/apiserver/pkg/authorization/authorizer"
fakeplugin "k8s.io/apiserver/plugin/pkg/audit/fake"
)
func TestEnforced(t *testing.T) {
testCases := []struct {
name string
event *auditinternal.Event
policy auditinternal.Policy
attribs authorizer.Attributes
expected []*auditinternal.Event
}{
{
name: "enforce level",
event: &auditinternal.Event{
Level: auditinternal.LevelRequestResponse,
Stage: auditinternal.StageResponseComplete,
RequestURI: "/apis/extensions/v1beta1",
RequestObject: &runtime.Unknown{Raw: []byte(`test`)},
ResponseObject: &runtime.Unknown{Raw: []byte(`test`)},
},
policy: auditinternal.Policy{
Rules: []auditinternal.PolicyRule{
{
Level: auditinternal.LevelMetadata,
},
},
},
expected: []*auditinternal.Event{
{
Level: auditinternal.LevelMetadata,
Stage: auditinternal.StageResponseComplete,
RequestURI: "/apis/extensions/v1beta1",
},
},
},
{
name: "enforce policy rule",
event: &auditinternal.Event{
Level: auditinternal.LevelRequestResponse,
Stage: auditinternal.StageResponseComplete,
RequestURI: "/apis/extensions/v1beta1",
User: auditinternal.UserInfo{
Username: user.Anonymous,
},
RequestObject: &runtime.Unknown{Raw: []byte(`test`)},
ResponseObject: &runtime.Unknown{Raw: []byte(`test`)},
},
policy: auditinternal.Policy{
Rules: []auditinternal.PolicyRule{
{
Level: auditinternal.LevelNone,
Users: []string{user.Anonymous},
},
{
Level: auditinternal.LevelMetadata,
},
},
},
expected: []*auditinternal.Event{},
},
{
name: "nil event",
event: nil,
policy: auditinternal.Policy{
Rules: []auditinternal.PolicyRule{
{
Level: auditinternal.LevelMetadata,
},
},
},
expected: []*auditinternal.Event{},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
ev := []*auditinternal.Event{}
fakeBackend := fakeplugin.Backend{
OnRequest: func(events []*auditinternal.Event) {
ev = events
},
}
b := NewBackend(&fakeBackend, policy.NewChecker(&tc.policy))
defer b.Shutdown()
b.ProcessEvents(tc.event)
require.Equal(t, tc.expected, ev)
})
}
}

View File

@ -0,0 +1,91 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package dynamic
import (
"fmt"
"time"
auditregv1alpha1 "k8s.io/api/auditregistration/v1alpha1"
"k8s.io/apiserver/pkg/audit"
"k8s.io/apiserver/pkg/audit/policy"
auditutil "k8s.io/apiserver/pkg/audit/util"
"k8s.io/apiserver/pkg/util/webhook"
bufferedplugin "k8s.io/apiserver/plugin/pkg/audit/buffered"
enforcedplugin "k8s.io/apiserver/plugin/pkg/audit/dynamic/enforced"
webhookplugin "k8s.io/apiserver/plugin/pkg/audit/webhook"
)
// TODO: find a common place for all the default retry backoffs
const retryBackoff = 500 * time.Millisecond
// factory builds a delegate from an AuditSink
type factory struct {
config *Config
webhookClientManager webhook.ClientManager
sink *auditregv1alpha1.AuditSink
}
// BuildDelegate creates a delegate from the AuditSink object
func (f *factory) BuildDelegate() (*delegate, error) {
backend, err := f.buildWebhookBackend()
if err != nil {
return nil, err
}
backend = f.applyEnforcedOpts(backend)
backend = f.applyBufferedOpts(backend)
ch := make(chan struct{})
return &delegate{
Backend: backend,
configuration: f.sink,
stopChan: ch,
}, nil
}
func (f *factory) buildWebhookBackend() (audit.Backend, error) {
hookClient := auditutil.HookClientConfigForSink(f.sink)
client, err := f.webhookClientManager.HookClient(hookClient)
if err != nil {
return nil, fmt.Errorf("could not create webhook client: %v", err)
}
backend := webhookplugin.NewDynamicBackend(client, retryBackoff)
return backend, nil
}
func (f *factory) applyEnforcedOpts(delegate audit.Backend) audit.Backend {
pol := policy.ConvertDynamicPolicyToInternal(&f.sink.Spec.Policy)
checker := policy.NewChecker(pol)
eb := enforcedplugin.NewBackend(delegate, checker)
return eb
}
func (f *factory) applyBufferedOpts(delegate audit.Backend) audit.Backend {
bc := f.config.BufferedConfig
tc := f.sink.Spec.Webhook.Throttle
if tc != nil {
bc.ThrottleEnable = true
if tc.Burst != nil {
bc.ThrottleBurst = int(*tc.Burst)
}
if tc.QPS != nil {
bc.ThrottleQPS = float32(*tc.QPS)
}
} else {
bc.ThrottleEnable = false
}
return bufferedplugin.NewBackend(delegate, *bc)
}

View File

@ -0,0 +1,146 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package dynamic
import (
"testing"
"github.com/stretchr/testify/require"
auditregv1alpha1 "k8s.io/api/auditregistration/v1alpha1"
utilpointer "k8s.io/utils/pointer"
)
func TestToDelegate(t *testing.T) {
config, _ := defaultTestConfig()
defaultPolicy := auditregv1alpha1.Policy{
Level: auditregv1alpha1.LevelMetadata,
}
u := "http://localhost:4444"
for _, tc := range []struct {
name string
auditConfig *auditregv1alpha1.AuditSink
throttleConfig *auditregv1alpha1.WebhookThrottleConfig
expectedBackend string
}{
{
name: "build full",
auditConfig: &auditregv1alpha1.AuditSink{
Spec: auditregv1alpha1.AuditSinkSpec{
Policy: defaultPolicy,
Webhook: auditregv1alpha1.Webhook{
Throttle: &auditregv1alpha1.WebhookThrottleConfig{
QPS: utilpointer.Int64Ptr(10),
Burst: utilpointer.Int64Ptr(5),
},
ClientConfig: auditregv1alpha1.WebhookClientConfig{
URL: &u,
},
},
},
},
expectedBackend: "buffered<enforced<dynamic_webhook>>",
},
{
name: "build no throttle",
auditConfig: &auditregv1alpha1.AuditSink{
Spec: auditregv1alpha1.AuditSinkSpec{
Policy: defaultPolicy,
Webhook: auditregv1alpha1.Webhook{
ClientConfig: auditregv1alpha1.WebhookClientConfig{
URL: &u,
},
},
},
},
expectedBackend: "buffered<enforced<dynamic_webhook>>",
},
} {
t.Run(tc.name, func(t *testing.T) {
b, err := NewBackend(config)
require.NoError(t, err)
c := factory{
config: b.(*backend).config,
webhookClientManager: b.(*backend).webhookClientManager,
sink: tc.auditConfig,
}
d, err := c.BuildDelegate()
require.NoError(t, err)
require.Equal(t, tc.expectedBackend, d.String())
})
}
}
func TestBuildWebhookBackend(t *testing.T) {
defaultPolicy := auditregv1alpha1.Policy{
Level: auditregv1alpha1.LevelMetadata,
}
config, _ := defaultTestConfig()
b, err := NewBackend(config)
require.NoError(t, err)
d := b.(*backend)
u := "http://localhost:4444"
for _, tc := range []struct {
name string
auditConfig *auditregv1alpha1.AuditSink
shouldErr bool
expectedBackend string
}{
{
name: "build full",
auditConfig: &auditregv1alpha1.AuditSink{
Spec: auditregv1alpha1.AuditSinkSpec{
Policy: defaultPolicy,
Webhook: auditregv1alpha1.Webhook{
ClientConfig: auditregv1alpha1.WebhookClientConfig{
URL: &u,
},
},
},
},
expectedBackend: "dynamic_webhook",
shouldErr: false,
},
{
name: "fail missing url",
auditConfig: &auditregv1alpha1.AuditSink{
Spec: auditregv1alpha1.AuditSinkSpec{
Policy: defaultPolicy,
Webhook: auditregv1alpha1.Webhook{
ClientConfig: auditregv1alpha1.WebhookClientConfig{},
},
},
},
shouldErr: true,
},
} {
t.Run(tc.name, func(t *testing.T) {
c := &factory{
config: config,
webhookClientManager: d.webhookClientManager,
sink: tc.auditConfig,
}
ab, err := c.buildWebhookBackend()
if tc.shouldErr {
require.Error(t, err)
return
}
require.NoError(t, err)
require.Equal(t, tc.expectedBackend, ab.String())
})
}
}

View File

@ -18,6 +18,7 @@ limitations under the License.
package webhook
import (
"fmt"
"time"
"k8s.io/apimachinery/pkg/runtime/schema"
@ -47,7 +48,20 @@ func loadWebhook(configFile string, groupVersion schema.GroupVersion, initialBac
}
type backend struct {
w *webhook.GenericWebhook
w *webhook.GenericWebhook
name string
}
// NewDynamicBackend returns an audit backend configured from a REST client that
// sends events over HTTP to an external service.
func NewDynamicBackend(rc *rest.RESTClient, initialBackoff time.Duration) audit.Backend {
return &backend{
w: &webhook.GenericWebhook{
RestClient: rc,
InitialBackoff: initialBackoff,
},
name: fmt.Sprintf("dynamic_%s", PluginName),
}
}
// NewBackend returns an audit backend that sends events over HTTP to an external service.
@ -56,7 +70,7 @@ func NewBackend(kubeConfigFile string, groupVersion schema.GroupVersion, initial
if err != nil {
return nil, err
}
return &backend{w}, nil
return &backend{w: w, name: PluginName}, nil
}
func (b *backend) Run(stopCh <-chan struct{}) error {
@ -69,7 +83,7 @@ func (b *backend) Shutdown() {
func (b *backend) ProcessEvents(ev ...*auditinternal.Event) {
if err := b.processEvents(ev...); err != nil {
audit.HandlePluginError(PluginName, err, ev...)
audit.HandlePluginError(b.String(), err, ev...)
}
}
@ -84,5 +98,5 @@ func (b *backend) processEvents(ev ...*auditinternal.Event) error {
}
func (b *backend) String() string {
return PluginName
return b.name
}