Merge pull request #46732 from timstclair/audit-metrics

Automatic merge from submit-queue (batch tested with PRs 46620, 46732, 46773, 46772, 46725)

Instrument advanced auditing

Add prometheus metrics for audit logging, including:

- A total count of audit events generated and sent to the output backend
- A count of audit events that failed to be audited due to an error (per backend)
- A count of request audit levels (1 per request)

For https://github.com/kubernetes/features/issues/22

- [x] TODO: Call `HandlePluginError` from the webhook backend, once https://github.com/kubernetes/kubernetes/pull/45919 merges (in this or a separate PR, depending on timing of the merge)

/cc @ihmccreery @sttts @soltysh @ericchiang
pull/6/head
Kubernetes Submit Queue 2017-06-03 11:39:40 -07:00 committed by GitHub
commit 6b76c40a62
7 changed files with 186 additions and 48 deletions

View File

@ -11,6 +11,8 @@ load(
go_library(
name = "go_default_library",
srcs = [
"format.go",
"metrics.go",
"request.go",
"scheme.go",
"types.go",
@ -20,6 +22,7 @@ go_library(
deps = [
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/github.com/pborman/uuid:go_default_library",
"//vendor/github.com/prometheus/client_golang/prometheus:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",

View File

@ -0,0 +1,73 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package audit
import (
"fmt"
"strconv"
"strings"
"time"
auditinternal "k8s.io/apiserver/pkg/apis/audit"
)
// EventString creates a 1-line text representation of an audit event, using a subset of the
// information in the event struct.
func EventString(ev *auditinternal.Event) string {
username := "<none>"
groups := "<none>"
if len(ev.User.Username) > 0 {
username = ev.User.Username
if len(ev.User.Groups) > 0 {
groups = auditStringSlice(ev.User.Groups)
}
}
asuser := "<self>"
asgroups := "<lookup>"
if ev.ImpersonatedUser != nil {
asuser = ev.ImpersonatedUser.Username
if ev.ImpersonatedUser.Groups != nil {
asgroups = auditStringSlice(ev.ImpersonatedUser.Groups)
}
}
namespace := "<none>"
if ev.ObjectRef != nil && len(ev.ObjectRef.Namespace) != 0 {
namespace = ev.ObjectRef.Namespace
}
response := "<deferred>"
if ev.ResponseStatus != nil {
response = strconv.Itoa(int(ev.ResponseStatus.Code))
}
ip := "<unknown>"
if len(ev.SourceIPs) > 0 {
ip = ev.SourceIPs[0]
}
return fmt.Sprintf("%s AUDIT: id=%q stage=%q ip=%q method=%q user=%q groups=%q as=%q asgroups=%q namespace=%q uri=%q response=\"%s\"\n",
ev.Timestamp.Format(time.RFC3339Nano), ev.AuditID, ev.Stage, ip, ev.Verb, username, groups, asuser, asgroups, namespace, ev.RequestURI, response)
}
func auditStringSlice(inList []string) string {
quotedElements := make([]string, len(inList))
for i, in := range inList {
quotedElements[i] = fmt.Sprintf("%q", in)
}
return strings.Join(quotedElements, ",")
}

View File

@ -0,0 +1,87 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package audit
import (
"fmt"
"github.com/golang/glog"
"github.com/prometheus/client_golang/prometheus"
auditinternal "k8s.io/apiserver/pkg/apis/audit"
)
const (
subsystem = "apiserver_audit"
)
var (
eventCounter = prometheus.NewCounter(
prometheus.CounterOpts{
Subsystem: subsystem,
Name: "event_count",
Help: "Counter of audit events generated and sent to the audit backend.",
})
errorCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Subsystem: subsystem,
Name: "error_count",
Help: "Counter of audit events that failed to be audited properly. " +
"Plugin identifies the plugin affected by the error.",
},
[]string{"plugin"},
)
levelCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Subsystem: subsystem,
Name: "level_count",
Help: "Counter of policy levels for audit events (1 per request).",
},
[]string{"level"},
)
)
func init() {
prometheus.MustRegister(eventCounter)
prometheus.MustRegister(errorCounter)
prometheus.MustRegister(levelCounter)
}
// ObserveEvent updates the relevant prometheus metrics for the generated audit event.
func ObserveEvent() {
eventCounter.Inc()
}
// ObservePolicyLevel updates the relevant prometheus metrics with the audit level for a request.
func ObservePolicyLevel(level auditinternal.Level) {
levelCounter.WithLabelValues(string(level)).Inc()
}
// HandlePluginError handles an error that occurred in an audit plugin. This method should only be
// used if the error may have prevented the audit event from being properly recorded. The events are
// modified.
func HandlePluginError(plugin string, err error, impacted ...*auditinternal.Event) {
// Count the error.
errorCounter.WithLabelValues(plugin).Add(float64(len(impacted)))
// Log the audit events to the debug log.
msg := fmt.Sprintf("Error in audit plugin '%s' affecting %d audit events: %v\nImpacted events:\n",
plugin, len(impacted), err)
for _, ev := range impacted {
msg = msg + EventString(ev) + "\n"
}
glog.Error(msg)
}

View File

@ -68,6 +68,7 @@ func WithAudit(handler http.Handler, requestContextMapper request.RequestContext
}
level := policy.Level(attribs)
audit.ObservePolicyLevel(level)
if level == auditinternal.LevelNone {
// Don't audit.
handler.ServeHTTP(w, req)
@ -89,7 +90,7 @@ func WithAudit(handler http.Handler, requestContextMapper request.RequestContext
}
ev.Stage = auditinternal.StageRequestReceived
sink.ProcessEvents(ev)
processEvent(sink, ev)
// intercept the status code
var longRunningSink audit.Sink
@ -113,7 +114,7 @@ func WithAudit(handler http.Handler, requestContextMapper request.RequestContext
Reason: metav1.StatusReasonInternalError,
Message: fmt.Sprintf("APIServer panic'd: %v", r),
}
sink.ProcessEvents(ev)
processEvent(sink, ev)
return
}
@ -126,19 +127,24 @@ func WithAudit(handler http.Handler, requestContextMapper request.RequestContext
if ev.ResponseStatus == nil && longRunningSink != nil {
ev.ResponseStatus = fakedSuccessStatus
ev.Stage = auditinternal.StageResponseStarted
longRunningSink.ProcessEvents(ev)
processEvent(longRunningSink, ev)
}
ev.Stage = auditinternal.StageResponseComplete
if ev.ResponseStatus == nil {
ev.ResponseStatus = fakedSuccessStatus
}
sink.ProcessEvents(ev)
processEvent(sink, ev)
}()
handler.ServeHTTP(respWriter, req)
})
}
func processEvent(sink audit.Sink, ev *auditinternal.Event) {
audit.ObserveEvent()
sink.ProcessEvents(ev)
}
func decorateResponseWriter(responseWriter http.ResponseWriter, ev *auditinternal.Event, sink audit.Sink) http.ResponseWriter {
delegate := &auditResponseWriter{
ResponseWriter: responseWriter,
@ -177,7 +183,7 @@ func (a *auditResponseWriter) processCode(code int) {
a.event.Stage = auditinternal.StageResponseStarted
if a.sink != nil {
a.sink.ProcessEvents(a.event)
processEvent(a.sink, a.event)
}
})
}

View File

@ -12,7 +12,6 @@ go_library(
srcs = ["backend.go"],
tags = ["automanaged"],
deps = [
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/apiserver/pkg/apis/audit:go_default_library",
"//vendor/k8s.io/apiserver/pkg/audit:go_default_library",
],

View File

@ -19,11 +19,7 @@ package log
import (
"fmt"
"io"
"strconv"
"strings"
"time"
"github.com/golang/glog"
auditinternal "k8s.io/apiserver/pkg/apis/audit"
"k8s.io/apiserver/pkg/audit"
@ -50,42 +46,9 @@ func (b *backend) ProcessEvents(events ...*auditinternal.Event) {
}
func (b *backend) logEvent(ev *auditinternal.Event) {
username := "<none>"
groups := "<none>"
if len(ev.User.Username) > 0 {
username = ev.User.Username
if len(ev.User.Groups) > 0 {
groups = auditStringSlice(ev.User.Groups)
}
}
asuser := "<self>"
asgroups := "<lookup>"
if ev.ImpersonatedUser != nil {
asuser = ev.ImpersonatedUser.Username
if ev.ImpersonatedUser.Groups != nil {
asgroups = auditStringSlice(ev.ImpersonatedUser.Groups)
}
}
namespace := "<none>"
if ev.ObjectRef != nil && len(ev.ObjectRef.Namespace) != 0 {
namespace = ev.ObjectRef.Namespace
}
response := "<deferred>"
if ev.ResponseStatus != nil {
response = strconv.Itoa(int(ev.ResponseStatus.Code))
}
ip := "<unknown>"
if len(ev.SourceIPs) > 0 {
ip = ev.SourceIPs[0]
}
line := fmt.Sprintf("%s AUDIT: id=%q stage=%q ip=%q method=%q user=%q groups=%q as=%q asgroups=%q namespace=%q uri=%q response=\"%s\"\n",
ev.Timestamp.Format(time.RFC3339Nano), ev.AuditID, ev.Stage, ip, ev.Verb, username, groups, asuser, asgroups, namespace, ev.RequestURI, response)
line := audit.EventString(ev)
if _, err := fmt.Fprint(b.out, line); err != nil {
glog.Errorf("Unable to write audit log: %s, the error is: %v", line, err)
audit.HandlePluginError("log", err, ev)
}
}

View File

@ -65,6 +65,9 @@ const (
defaultBatchMaxWait = time.Minute // Send events at least once a minute.
)
// The plugin name reported in error metrics.
const pluginName = "webhook"
// NewBackend returns an audit backend that sends events over HTTP to an external service.
// The mode indicates the caching behavior of the webhook. Either blocking (ModeBlocking)
// or buffered with batch POSTs (ModeBatch).
@ -119,7 +122,7 @@ func (b *blockingBackend) Run(stopCh <-chan struct{}) error {
func (b *blockingBackend) ProcessEvents(ev ...*auditinternal.Event) {
if err := b.processEvents(ev...); err != nil {
glog.Errorf("failed to POST webhook events: %v", err)
audit.HandlePluginError(pluginName, err, ev...)
}
}
@ -259,7 +262,11 @@ L:
return b.w.RestClient.Post().Body(&list).Do().Error()
})
if err != nil {
glog.Errorf("failed to POST webhook events: %v", err)
impacted := make([]*auditinternal.Event, len(events))
for i := range events {
impacted[i] = &events[i]
}
audit.HandlePluginError(pluginName, err, impacted...)
}
}()
return
@ -278,7 +285,7 @@ func (b *batchBackend) ProcessEvents(ev ...*auditinternal.Event) {
select {
case b.buffer <- event:
default:
glog.Errorf("audit webhook queue blocked, failed to send %d event(s)", len(ev)-i)
audit.HandlePluginError(pluginName, fmt.Errorf("audit webhook queue blocked"), ev[i:]...)
return
}
}