Merge pull request #65763 from x13n/audit-logging

Add option to k8s apiserver to reject incoming requests upon audit failure
pull/58/head
k8s-ci-robot 2018-11-17 04:39:56 -08:00 committed by GitHub
commit 46ebebcc4f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 101 additions and 31 deletions

View File

@ -52,12 +52,22 @@ var (
},
[]string{"level"},
)
ApiserverAuditDroppedCounter = prometheus.NewCounter(
prometheus.CounterOpts{
Subsystem: subsystem,
Name: "requests_rejected_total",
Help: "Counter of apiserver requests rejected due to an error " +
"in audit logging backend.",
},
)
)
func init() {
prometheus.MustRegister(eventCounter)
prometheus.MustRegister(errorCounter)
prometheus.MustRegister(levelCounter)
prometheus.MustRegister(ApiserverAuditDroppedCounter)
}
// ObserveEvent updates the relevant prometheus metrics for the generated audit event.

View File

@ -25,7 +25,8 @@ type Sink interface {
// Errors might be logged by the sink itself. If an error should be fatal, leading to an internal
// error, ProcessEvents is supposed to panic. The event must not be mutated and is reused by the caller
// after the call returns, i.e. the sink has to make a deepcopy to keep a copy around if necessary.
ProcessEvents(events ...*auditinternal.Event)
// Returns true on success, may return false on error.
ProcessEvents(events ...*auditinternal.Event) bool
}
type Backend interface {

View File

@ -37,10 +37,12 @@ type union struct {
backends []Backend
}
func (u union) ProcessEvents(events ...*auditinternal.Event) {
func (u union) ProcessEvents(events ...*auditinternal.Event) bool {
success := true
for _, backend := range u.backends {
backend.ProcessEvents(events...)
success = backend.ProcessEvents(events...) && success
}
return success
}
func (u union) Run(stopCh <-chan struct{}) error {

View File

@ -28,8 +28,9 @@ type fakeBackend struct {
events []*auditinternal.Event
}
func (f *fakeBackend) ProcessEvents(events ...*auditinternal.Event) {
func (f *fakeBackend) ProcessEvents(events ...*auditinternal.Event) bool {
f.events = append(f.events, events...)
return true
}
func (f *fakeBackend) Run(stopCh <-chan struct{}) error {

View File

@ -39,13 +39,14 @@ type fakeAuditSink struct {
events []*auditinternal.Event
}
func (s *fakeAuditSink) ProcessEvents(evs ...*auditinternal.Event) {
func (s *fakeAuditSink) ProcessEvents(evs ...*auditinternal.Event) bool {
s.lock.Lock()
defer s.lock.Unlock()
for _, ev := range evs {
e := ev.DeepCopy()
s.events = append(s.events, e)
}
return true
}
func (s *fakeAuditSink) Events() []*auditinternal.Event {

View File

@ -56,7 +56,11 @@ func WithAudit(handler http.Handler, sink audit.Sink, policy policy.Checker, lon
}
ev.Stage = auditinternal.StageRequestReceived
processAuditEvent(sink, ev, omitStages)
if processed := processAuditEvent(sink, ev, omitStages); !processed {
audit.ApiserverAuditDroppedCounter.Inc()
responsewriters.InternalError(w, req, errors.New("failed to store audit event"))
return
}
// intercept the status code
var longRunningSink audit.Sink
@ -137,10 +141,10 @@ func createAuditEventAndAttachToContext(req *http.Request, policy policy.Checker
return req, ev, omitStages, nil
}
func processAuditEvent(sink audit.Sink, ev *auditinternal.Event, omitStages []auditinternal.Stage) {
func processAuditEvent(sink audit.Sink, ev *auditinternal.Event, omitStages []auditinternal.Stage) bool {
for _, stage := range omitStages {
if ev.Stage == stage {
return
return true
}
}
@ -150,7 +154,7 @@ func processAuditEvent(sink audit.Sink, ev *auditinternal.Event, omitStages []au
ev.StageTimestamp = metav1.NewMicroTime(time.Now())
}
audit.ObserveEvent()
sink.ProcessEvents(ev)
return sink.ProcessEvents(ev)
}
func decorateResponseWriter(responseWriter http.ResponseWriter, ev *auditinternal.Event, sink audit.Sink, omitStages []auditinternal.Stage) http.ResponseWriter {

View File

@ -42,13 +42,14 @@ type fakeAuditSink struct {
events []*auditinternal.Event
}
func (s *fakeAuditSink) ProcessEvents(evs ...*auditinternal.Event) {
func (s *fakeAuditSink) ProcessEvents(evs ...*auditinternal.Event) bool {
s.lock.Lock()
defer s.lock.Unlock()
for _, e := range evs {
event := e.DeepCopy()
s.events = append(s.events, event)
}
return true
}
func (s *fakeAuditSink) Events() []*auditinternal.Event {

View File

@ -42,6 +42,7 @@ go_library(
"//staging/src/k8s.io/apiserver/pkg/admission/plugin/webhook/validating:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/apis/apiserver:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/apis/apiserver/v1alpha1:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/apis/audit:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/apis/audit/v1:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/apis/audit/v1alpha1:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/apis/audit/v1beta1:go_default_library",

View File

@ -29,6 +29,7 @@ import (
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
auditinternal "k8s.io/apiserver/pkg/apis/audit"
auditv1 "k8s.io/apiserver/pkg/apis/audit/v1"
auditv1alpha1 "k8s.io/apiserver/pkg/apis/audit/v1alpha1"
auditv1beta1 "k8s.io/apiserver/pkg/apis/audit/v1beta1"
@ -89,12 +90,17 @@ const (
// a set of events. This causes requests to the API server to wait for the
// flush before sending a response.
ModeBlocking = "blocking"
// ModeBlockingStrict is the same as ModeBlocking, except when there is
// a failure during audit logging at RequestReceived stage, the whole
// request to apiserver will fail.
ModeBlockingStrict = "blocking-strict"
)
// AllowedModes is the modes known for audit backends.
var AllowedModes = []string{
ModeBatch,
ModeBlocking,
ModeBlockingStrict,
}
type AuditBatchOptions struct {
@ -393,10 +399,26 @@ func (o *AuditBatchOptions) AddFlags(pluginName string, fs *pflag.FlagSet) {
"moment if ThrottleQPS was not utilized before. Only used in batch mode.")
}
type ignoreErrorsBackend struct {
audit.Backend
}
func (i *ignoreErrorsBackend) ProcessEvents(ev ...*auditinternal.Event) bool {
i.Backend.ProcessEvents(ev...)
return true
}
func (i *ignoreErrorsBackend) String() string {
return fmt.Sprintf("ignoreErrors<%s>", i.Backend)
}
func (o *AuditBatchOptions) wrapBackend(delegate audit.Backend) audit.Backend {
if o.Mode == ModeBlocking {
if o.Mode == ModeBlockingStrict {
return delegate
}
if o.Mode == ModeBlocking {
return &ignoreErrorsBackend{Backend: delegate}
}
return pluginbuffered.NewBackend(delegate, o.BatchConfig)
}

View File

@ -67,7 +67,7 @@ func TestAuditValidOptions(t *testing.T) {
o.PolicyFile = policy
return o
},
expected: "log",
expected: "ignoreErrors<log>",
}, {
name: "default log no policy",
options: func() *AuditOptions {
@ -93,6 +93,16 @@ func TestAuditValidOptions(t *testing.T) {
return o
},
expected: "",
}, {
name: "strict webhook",
options: func() *AuditOptions {
o := NewAuditOptions()
o.WebhookOptions.ConfigFile = webhookConfig
o.WebhookOptions.BatchOptions.Mode = ModeBlockingStrict
o.PolicyFile = policy
return o
},
expected: "webhook",
}, {
name: "default union",
options: func() *AuditOptions {
@ -102,7 +112,7 @@ func TestAuditValidOptions(t *testing.T) {
o.PolicyFile = policy
return o
},
expected: "union[log,buffered<webhook>]",
expected: "union[ignoreErrors<log>,buffered<webhook>]",
}, {
name: "custom",
options: func() *AuditOptions {
@ -114,7 +124,7 @@ func TestAuditValidOptions(t *testing.T) {
o.PolicyFile = policy
return o
},
expected: "union[buffered<log>,webhook]",
expected: "union[buffered<log>,ignoreErrors<webhook>]",
}, {
name: "default webhook with truncating",
options: func() *AuditOptions {
@ -151,7 +161,7 @@ func TestAuditValidOptions(t *testing.T) {
o.PolicyFile = policy
return o
},
expected: "union[enforced<log>,dynamic[]]",
expected: "union[enforced<ignoreErrors<log>>,dynamic[]]",
}, {
name: "dynamic with truncating and webhook",
options: func() *AuditOptions {
@ -174,7 +184,7 @@ func TestAuditValidOptions(t *testing.T) {
o.LogOptions.Path = "/audit"
return o
},
expected: "union[enforced<log>,truncate<union[enforced<buffered<webhook>>,dynamic[]]>]",
expected: "union[enforced<ignoreErrors<log>>,truncate<union[enforced<buffered<webhook>>,dynamic[]]>]",
},
}
for _, tc := range testCases {

View File

@ -251,7 +251,7 @@ func (b *bufferedBackend) processEvents(events []*auditinternal.Event) {
}
}
func (b *bufferedBackend) ProcessEvents(ev ...*auditinternal.Event) {
func (b *bufferedBackend) ProcessEvents(ev ...*auditinternal.Event) bool {
// The following mechanism is in place to support the situation when audit
// events are still coming after the backend was stopped.
var sendErr error
@ -279,9 +279,10 @@ func (b *bufferedBackend) ProcessEvents(ev ...*auditinternal.Event) {
case b.buffer <- event:
default:
sendErr = fmt.Errorf("audit buffer queue blocked")
return
return true
}
}
return true
}
func (b *bufferedBackend) String() string {

View File

@ -176,10 +176,13 @@ func (s syncedDelegates) Names() []string {
}
// ProcessEvents proccesses the given events per current delegate map
func (b *backend) ProcessEvents(events ...*auditinternal.Event) {
func (b *backend) ProcessEvents(events ...*auditinternal.Event) bool {
for _, d := range b.GetDelegates() {
d.ProcessEvents(events...)
}
// Returning true regardless of results, since dynamic audit backends
// can never cause apiserver request to fail.
return true
}
// Run starts a goroutine that propagates the shutdown signal,

View File

@ -56,7 +56,7 @@ func (b Backend) Shutdown() {
// ProcessEvents enforces policy on a shallow copy of the given event
// dropping any sections that don't conform
func (b Backend) ProcessEvents(events ...*auditinternal.Event) {
func (b Backend) ProcessEvents(events ...*auditinternal.Event) bool {
for _, event := range events {
if event == nil {
continue
@ -82,6 +82,9 @@ func (b Backend) ProcessEvents(events ...*auditinternal.Event) {
}
b.delegateBackend.ProcessEvents(e)
}
// Returning true regardless of results, since dynamic audit backends
// can never cause apiserver request to fail.
return true
}
// String returns a string representation of the backend

View File

@ -39,10 +39,11 @@ func (b *Backend) Shutdown() {
}
// ProcessEvents calls a callback on a batch, if present.
func (b *Backend) ProcessEvents(ev ...*auditinternal.Event) {
func (b *Backend) ProcessEvents(ev ...*auditinternal.Event) bool {
if b.OnRequest != nil {
b.OnRequest(ev)
}
return true
}
func (b *Backend) String() string {

View File

@ -59,13 +59,15 @@ func NewBackend(out io.Writer, format string, groupVersion schema.GroupVersion)
}
}
func (b *backend) ProcessEvents(events ...*auditinternal.Event) {
func (b *backend) ProcessEvents(events ...*auditinternal.Event) bool {
success := true
for _, ev := range events {
b.logEvent(ev)
success = b.logEvent(ev) && success
}
return success
}
func (b *backend) logEvent(ev *auditinternal.Event) {
func (b *backend) logEvent(ev *auditinternal.Event) bool {
line := ""
switch b.format {
case FormatLegacy:
@ -74,17 +76,19 @@ func (b *backend) logEvent(ev *auditinternal.Event) {
bs, err := runtime.Encode(audit.Codecs.LegacyCodec(b.groupVersion), ev)
if err != nil {
audit.HandlePluginError(PluginName, err, ev)
return
return false
}
line = string(bs[:])
default:
audit.HandlePluginError(PluginName, fmt.Errorf("log format %q is not in list of known formats (%s)",
b.format, strings.Join(AllowedFormats, ",")), ev)
return
return false
}
if _, err := fmt.Fprint(b.out, line); err != nil {
audit.HandlePluginError(PluginName, err, ev)
return false
}
return true
}
func (b *backend) Run(stopCh <-chan struct{}) error {

View File

@ -71,11 +71,12 @@ func NewBackend(delegateBackend audit.Backend, config Config, groupVersion schem
}
}
func (b *backend) ProcessEvents(events ...*auditinternal.Event) {
func (b *backend) ProcessEvents(events ...*auditinternal.Event) bool {
var errors []error
var impacted []*auditinternal.Event
var batch []*auditinternal.Event
var batchSize int64
success := true
for _, event := range events {
size, err := b.calcSize(event)
// If event was correctly serialized, but the size is more than allowed
@ -97,7 +98,7 @@ func (b *backend) ProcessEvents(events ...*auditinternal.Event) {
}
if len(batch) > 0 && batchSize+size > b.c.MaxBatchSize {
b.delegateBackend.ProcessEvents(batch...)
success = b.delegateBackend.ProcessEvents(batch...) && success
batch = []*auditinternal.Event{}
batchSize = 0
}
@ -107,12 +108,13 @@ func (b *backend) ProcessEvents(events ...*auditinternal.Event) {
}
if len(batch) > 0 {
b.delegateBackend.ProcessEvents(batch...)
success = b.delegateBackend.ProcessEvents(batch...) && success
}
if len(impacted) > 0 {
audit.HandlePluginError(PluginName, utilerrors.NewAggregate(errors), impacted...)
}
return success
}
// truncate removed request and response objects from the audit events,

View File

@ -81,10 +81,12 @@ func (b *backend) Shutdown() {
// nothing to do here
}
func (b *backend) ProcessEvents(ev ...*auditinternal.Event) {
func (b *backend) ProcessEvents(ev ...*auditinternal.Event) bool {
if err := b.processEvents(ev...); err != nil {
audit.HandlePluginError(b.String(), err, ev...)
return false
}
return true
}
func (b *backend) processEvents(ev ...*auditinternal.Event) error {

View File

@ -108,8 +108,9 @@ func (f auditChecker) LevelAndStages(attrs authorizer.Attributes) (auditinternal
type auditSinkFunc func(events ...*auditinternal.Event)
func (f auditSinkFunc) ProcessEvents(events ...*auditinternal.Event) {
func (f auditSinkFunc) ProcessEvents(events ...*auditinternal.Event) bool {
f(events...)
return true
}
func (auditSinkFunc) Run(stopCh <-chan struct{}) error {