Merge pull request #48605 from CaoShuFeng/json_log

Automatic merge from submit-queue (batch tested with PRs 48583, 48605, 48601)

support json output for log backend of advanced audit

**Which issue this PR fixes** *(optional, in `fixes #<issue number>(, fixes #<issue_number>, ...)` format, will close that issue when PR gets merged)*: fixes #

**Special notes for your reviewer**:

**Release note**:

```
Add json format support for advanced audit in apiserver. Use --audit-log-format=json to emit json to log backend.
```
pull/6/head
Kubernetes Submit Queue 2017-07-08 08:54:43 -07:00 committed by GitHub
commit eab5e060a4
5 changed files with 476 additions and 8 deletions

View File

@ -23,11 +23,14 @@ go_test(
deps = [
"//vendor/k8s.io/api/authentication/v1:go_default_library",
"//vendor/k8s.io/api/batch/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime/serializer:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/k8s.io/apiserver/pkg/apis/audit:go_default_library",
"//vendor/k8s.io/apiserver/pkg/apis/audit/v1alpha1:go_default_library",
"//vendor/k8s.io/apiserver/pkg/audit:go_default_library",
"//vendor/k8s.io/apiserver/pkg/audit/policy:go_default_library",
"//vendor/k8s.io/apiserver/pkg/authentication/authenticator:go_default_library",
"//vendor/k8s.io/apiserver/pkg/authentication/user:go_default_library",
@ -35,6 +38,7 @@ go_test(
"//vendor/k8s.io/apiserver/pkg/endpoints/handlers/responsewriters:go_default_library",
"//vendor/k8s.io/apiserver/pkg/endpoints/request:go_default_library",
"//vendor/k8s.io/apiserver/plugin/pkg/audit/log:go_default_library",
"//vendor/k8s.io/apiserver/plugin/pkg/audit/webhook:go_default_library",
],
)

View File

@ -30,12 +30,18 @@ import (
"testing"
"time"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait"
auditinternal "k8s.io/apiserver/pkg/apis/audit"
auditv1alpha1 "k8s.io/apiserver/pkg/apis/audit/v1alpha1"
"k8s.io/apiserver/pkg/audit"
"k8s.io/apiserver/pkg/audit/policy"
"k8s.io/apiserver/pkg/authentication/user"
"k8s.io/apiserver/pkg/endpoints/request"
pluginlog "k8s.io/apiserver/plugin/pkg/audit/log"
// import to call webhook's init() function to register audit.Event to schema
_ "k8s.io/apiserver/plugin/pkg/audit/webhook"
)
type fakeAuditSink struct {
@ -177,7 +183,7 @@ func (*fakeHTTPHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
w.WriteHeader(200)
}
func TestAudit(t *testing.T) {
func TestAuditLegacy(t *testing.T) {
writingShortRunningPrefix := func(stage string) string {
return fmt.Sprintf(`[\d\:\-\.\+TZ]+ AUDIT: id="[\w-]+" stage="%s" ip="127.0.0.1" method="update" user="admin" groups="<none>" as="<self>" asgroups="<lookup>" namespace="default" uri="/api/v1/namespaces/default/pods/foo"`, stage)
}
@ -380,7 +386,7 @@ func TestAudit(t *testing.T) {
},
} {
var buf bytes.Buffer
backend := pluginlog.NewBackend(&buf)
backend := pluginlog.NewBackend(&buf, pluginlog.FormatLegacy)
policyChecker := policy.FakeChecker(auditinternal.LevelRequestResponse)
handler := WithAudit(http.HandlerFunc(test.handler), &fakeRequestContextMapper{
user: &user.DefaultInfo{Name: "admin"},
@ -420,6 +426,411 @@ func TestAudit(t *testing.T) {
}
}
func TestAuditJson(t *testing.T) {
shortRunningPath := "/api/v1/namespaces/default/pods/foo"
longRunningPath := "/api/v1/namespaces/default/pods?watch=true"
delay := 500 * time.Millisecond
for _, test := range []struct {
desc string
path string
verb string
handler func(http.ResponseWriter, *http.Request)
expected []auditv1alpha1.Event
}{
// short running requests with read-only verb
{
"read-only empty",
shortRunningPath,
"GET",
func(http.ResponseWriter, *http.Request) {},
[]auditv1alpha1.Event{
{
Stage: auditinternal.StageRequestReceived,
Verb: "get",
RequestURI: shortRunningPath,
},
{
Stage: auditinternal.StageResponseComplete,
Verb: "get",
RequestURI: shortRunningPath,
ResponseStatus: &metav1.Status{Code: 200},
},
},
},
{
"read-only panic",
shortRunningPath,
"GET",
func(w http.ResponseWriter, req *http.Request) {
panic("kaboom")
},
[]auditv1alpha1.Event{
{
Stage: auditinternal.StageRequestReceived,
Verb: "get",
RequestURI: shortRunningPath,
},
{
Stage: auditinternal.StagePanic,
Verb: "get",
RequestURI: shortRunningPath,
ResponseStatus: &metav1.Status{Code: 500},
},
},
},
// short running request with non-read-only verb
{
"writing empty",
shortRunningPath,
"PUT",
func(http.ResponseWriter, *http.Request) {},
[]auditv1alpha1.Event{
{
Stage: auditinternal.StageRequestReceived,
Verb: "update",
RequestURI: shortRunningPath,
},
{
Stage: auditinternal.StageResponseComplete,
Verb: "update",
RequestURI: shortRunningPath,
ResponseStatus: &metav1.Status{Code: 200},
},
},
},
{
"writing sleep",
shortRunningPath,
"PUT",
func(http.ResponseWriter, *http.Request) {
time.Sleep(delay)
},
[]auditv1alpha1.Event{
{
Stage: auditinternal.StageRequestReceived,
Verb: "update",
RequestURI: shortRunningPath,
},
{
Stage: auditinternal.StageResponseComplete,
Verb: "update",
RequestURI: shortRunningPath,
ResponseStatus: &metav1.Status{Code: 200},
},
},
},
{
"writing 403+write",
shortRunningPath,
"PUT",
func(w http.ResponseWriter, req *http.Request) {
w.WriteHeader(403)
w.Write([]byte("foo"))
},
[]auditv1alpha1.Event{
{
Stage: auditinternal.StageRequestReceived,
Verb: "update",
RequestURI: shortRunningPath,
},
{
Stage: auditinternal.StageResponseComplete,
Verb: "update",
RequestURI: shortRunningPath,
ResponseStatus: &metav1.Status{Code: 403},
},
},
},
{
"writing panic",
shortRunningPath,
"PUT",
func(w http.ResponseWriter, req *http.Request) {
panic("kaboom")
},
[]auditv1alpha1.Event{
{
Stage: auditinternal.StageRequestReceived,
Verb: "update",
RequestURI: shortRunningPath,
},
{
Stage: auditinternal.StagePanic,
Verb: "update",
RequestURI: shortRunningPath,
ResponseStatus: &metav1.Status{Code: 500},
},
},
},
{
"writing write+panic",
shortRunningPath,
"PUT",
func(w http.ResponseWriter, req *http.Request) {
w.Write([]byte("foo"))
panic("kaboom")
},
[]auditv1alpha1.Event{
{
Stage: auditinternal.StageRequestReceived,
Verb: "update",
RequestURI: shortRunningPath,
},
{
Stage: auditinternal.StagePanic,
Verb: "update",
RequestURI: shortRunningPath,
ResponseStatus: &metav1.Status{Code: 500},
},
},
},
// long running requests
{
"empty longrunning",
longRunningPath,
"GET",
func(http.ResponseWriter, *http.Request) {},
[]auditv1alpha1.Event{
{
Stage: auditinternal.StageRequestReceived,
Verb: "watch",
RequestURI: longRunningPath,
},
{
Stage: auditinternal.StageResponseStarted,
Verb: "watch",
RequestURI: longRunningPath,
ResponseStatus: &metav1.Status{Code: 200},
},
{
Stage: auditinternal.StageResponseComplete,
Verb: "watch",
RequestURI: longRunningPath,
ResponseStatus: &metav1.Status{Code: 200},
},
},
},
{
"sleep longrunning",
longRunningPath,
"GET",
func(http.ResponseWriter, *http.Request) {
time.Sleep(delay)
},
[]auditv1alpha1.Event{
{
Stage: auditinternal.StageRequestReceived,
Verb: "watch",
RequestURI: longRunningPath,
},
{
Stage: auditinternal.StageResponseStarted,
Verb: "watch",
RequestURI: longRunningPath,
ResponseStatus: &metav1.Status{Code: 200},
},
{
Stage: auditinternal.StageResponseComplete,
Verb: "watch",
RequestURI: longRunningPath,
ResponseStatus: &metav1.Status{Code: 200},
},
},
},
{
"sleep+403 longrunning",
longRunningPath,
"GET",
func(w http.ResponseWriter, req *http.Request) {
time.Sleep(delay)
w.WriteHeader(403)
},
[]auditv1alpha1.Event{
{
Stage: auditinternal.StageRequestReceived,
Verb: "watch",
RequestURI: longRunningPath,
},
{
Stage: auditinternal.StageResponseStarted,
Verb: "watch",
RequestURI: longRunningPath,
ResponseStatus: &metav1.Status{Code: 403},
},
{
Stage: auditinternal.StageResponseComplete,
Verb: "watch",
RequestURI: longRunningPath,
ResponseStatus: &metav1.Status{Code: 403},
},
},
},
{
"write longrunning",
longRunningPath,
"GET",
func(w http.ResponseWriter, req *http.Request) {
w.Write([]byte("foo"))
},
[]auditv1alpha1.Event{
{
Stage: auditinternal.StageRequestReceived,
Verb: "watch",
RequestURI: longRunningPath,
},
{
Stage: auditinternal.StageResponseStarted,
Verb: "watch",
RequestURI: longRunningPath,
ResponseStatus: &metav1.Status{Code: 200},
},
{
Stage: auditinternal.StageResponseComplete,
Verb: "watch",
RequestURI: longRunningPath,
ResponseStatus: &metav1.Status{Code: 200},
},
},
},
{
"403+write longrunning",
longRunningPath,
"GET",
func(w http.ResponseWriter, req *http.Request) {
w.WriteHeader(403)
w.Write([]byte("foo"))
},
[]auditv1alpha1.Event{
{
Stage: auditinternal.StageRequestReceived,
Verb: "watch",
RequestURI: longRunningPath,
},
{
Stage: auditinternal.StageResponseStarted,
Verb: "watch",
RequestURI: longRunningPath,
ResponseStatus: &metav1.Status{Code: 403},
},
{
Stage: auditinternal.StageResponseComplete,
Verb: "watch",
RequestURI: longRunningPath,
ResponseStatus: &metav1.Status{Code: 403},
},
},
},
{
"panic longrunning",
longRunningPath,
"GET",
func(w http.ResponseWriter, req *http.Request) {
panic("kaboom")
},
[]auditv1alpha1.Event{
{
Stage: auditinternal.StageRequestReceived,
Verb: "watch",
RequestURI: longRunningPath,
},
{
Stage: auditinternal.StagePanic,
Verb: "watch",
RequestURI: longRunningPath,
ResponseStatus: &metav1.Status{Code: 500},
},
},
},
{
"write+panic longrunning",
longRunningPath,
"GET",
func(w http.ResponseWriter, req *http.Request) {
w.Write([]byte("foo"))
panic("kaboom")
},
[]auditv1alpha1.Event{
{
Stage: auditinternal.StageRequestReceived,
Verb: "watch",
RequestURI: longRunningPath,
},
{
Stage: auditinternal.StageResponseStarted,
Verb: "watch",
RequestURI: longRunningPath,
ResponseStatus: &metav1.Status{Code: 200},
},
{
Stage: auditinternal.StagePanic,
Verb: "watch",
RequestURI: longRunningPath,
ResponseStatus: &metav1.Status{Code: 500},
},
},
},
} {
var buf bytes.Buffer
backend := pluginlog.NewBackend(&buf, pluginlog.FormatJson)
policyChecker := policy.FakeChecker(auditinternal.LevelRequestResponse)
handler := WithAudit(http.HandlerFunc(test.handler), &fakeRequestContextMapper{
user: &user.DefaultInfo{Name: "admin"},
}, backend, policyChecker, func(r *http.Request, ri *request.RequestInfo) bool {
// simplified long-running check
return ri.Verb == "watch"
})
req, _ := http.NewRequest(test.verb, test.path, nil)
req.RemoteAddr = "127.0.0.1"
func() {
defer func() {
recover()
}()
handler.ServeHTTP(httptest.NewRecorder(), req)
}()
t.Logf("[%s] audit log: %v", test.desc, buf.String())
line := strings.Split(strings.TrimSpace(buf.String()), "\n")
if len(line) != len(test.expected) {
t.Errorf("[%s] Unexpected amount of lines in audit log: %d", test.desc, len(line))
continue
}
for i, expect := range test.expected {
// decode events back to check json elements.
event := &auditv1alpha1.Event{}
decoder := audit.Codecs.UniversalDecoder(auditv1alpha1.SchemeGroupVersion)
if err := runtime.DecodeInto(decoder, []byte(line[i]), event); err != nil {
t.Errorf("failed decoding line %s: %v", line[i], err)
continue
}
if "admin" != event.User.Username {
t.Errorf("[%s] Unexpected username: %s", test.desc, event.User.Username)
}
if event.Stage != expect.Stage {
t.Errorf("[%s] Unexpected Stage: %s", test.desc, event.Stage)
}
if event.Verb != expect.Verb {
t.Errorf("[%s] Unexpected Verb: %s", test.desc, event.Verb)
}
if event.RequestURI != expect.RequestURI {
t.Errorf("[%s] Unexpected RequestURI: %s", test.desc, event.RequestURI)
}
if (event.ResponseStatus == nil) != (expect.ResponseStatus == nil) {
t.Errorf("[%s] Unexpected ResponseStatus: %v", test.desc, event.ResponseStatus)
continue
}
if (event.ResponseStatus != nil) && (event.ResponseStatus.Code != expect.ResponseStatus.Code) {
t.Errorf("[%s] Unexpected status code : %d", test.desc, event.ResponseStatus.Code)
}
}
}
}
type fakeRequestContextMapper struct {
user *user.DefaultInfo
}

View File

@ -63,6 +63,7 @@ type AuditLogOptions struct {
MaxAge int
MaxBackups int
MaxSize int
Format string
}
// AuditWebhookOptions control the webhook configuration for audit events.
@ -78,6 +79,7 @@ type AuditWebhookOptions struct {
func NewAuditOptions() *AuditOptions {
return &AuditOptions{
WebhookOptions: AuditWebhookOptions{Mode: pluginwebhook.ModeBatch},
LogOptions: AuditLogOptions{Format: pluginlog.FormatLegacy},
}
}
@ -104,6 +106,18 @@ func (o *AuditOptions) Validate() []error {
if !validMode {
allErrors = append(allErrors, fmt.Errorf("invalid audit webhook mode %s, allowed modes are %q", o.WebhookOptions.Mode, strings.Join(pluginwebhook.AllowedModes, ",")))
}
// check log format
validFormat := false
for _, f := range pluginlog.AllowedFormats {
if f == o.LogOptions.Format {
validFormat = true
break
}
}
if !validFormat {
allErrors = append(allErrors, fmt.Errorf("invalid audit log format %s, allowed formats are %q", o.LogOptions.Format, strings.Join(pluginlog.AllowedFormats, ",")))
}
}
return allErrors
}
@ -161,6 +175,10 @@ func (o *AuditLogOptions) AddFlags(fs *pflag.FlagSet) {
"The maximum number of old audit log files to retain.")
fs.IntVar(&o.MaxSize, "audit-log-maxsize", o.MaxSize,
"The maximum size in megabytes of the audit log file before it gets rotated.")
fs.StringVar(&o.Format, "audit-log-format", o.Format,
"Format of saved audits. \"legacy\" indicates 1-line text format for each event."+
" \"json\" indicates structured json format. Requires the 'AdvancedAuditing' feature"+
" gate. Known formats are "+strings.Join(pluginlog.AllowedFormats, ",")+".")
}
func (o *AuditLogOptions) getWriter() io.Writer {
@ -182,7 +200,7 @@ func (o *AuditLogOptions) getWriter() io.Writer {
func (o *AuditLogOptions) advancedApplyTo(c *server.Config) error {
if w := o.getWriter(); w != nil {
c.AuditBackend = appendBackend(c.AuditBackend, pluginlog.NewBackend(w))
c.AuditBackend = appendBackend(c.AuditBackend, pluginlog.NewBackend(w, o.Format))
}
return nil
}

View File

@ -12,7 +12,9 @@ go_library(
srcs = ["backend.go"],
tags = ["automanaged"],
deps = [
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//vendor/k8s.io/apiserver/pkg/apis/audit:go_default_library",
"//vendor/k8s.io/apiserver/pkg/apis/audit/v1alpha1:go_default_library",
"//vendor/k8s.io/apiserver/pkg/audit:go_default_library",
],
)

View File

@ -19,20 +19,38 @@ package log
import (
"fmt"
"io"
"strings"
"k8s.io/apimachinery/pkg/runtime"
auditinternal "k8s.io/apiserver/pkg/apis/audit"
auditv1alpha1 "k8s.io/apiserver/pkg/apis/audit/v1alpha1"
"k8s.io/apiserver/pkg/audit"
)
const (
// FormatLegacy saves event in 1-line text format.
FormatLegacy = "legacy"
// FormatJson saves event in structured json format.
FormatJson = "json"
)
// AllowedFormats are the formats known by log backend.
var AllowedFormats = []string{
FormatLegacy,
FormatJson,
}
type backend struct {
out io.Writer
out io.Writer
format string
}
var _ audit.Backend = &backend{}
func NewBackend(out io.Writer) *backend {
func NewBackend(out io.Writer, format string) *backend {
return &backend{
out: out,
out: out,
format: format,
}
}
@ -43,8 +61,23 @@ func (b *backend) ProcessEvents(events ...*auditinternal.Event) {
}
func (b *backend) logEvent(ev *auditinternal.Event) {
line := audit.EventString(ev)
if _, err := fmt.Fprintln(b.out, line); err != nil {
line := ""
switch b.format {
case FormatLegacy:
line = audit.EventString(ev) + "\n"
case FormatJson:
bs, err := runtime.Encode(audit.Codecs.LegacyCodec(auditv1alpha1.SchemeGroupVersion), ev)
if err != nil {
audit.HandlePluginError("log", err, ev)
return
}
line = string(bs[:])
default:
audit.HandlePluginError("log", fmt.Errorf("log format %q is not in list of known formats (%s)",
b.format, strings.Join(AllowedFormats, ",")), ev)
return
}
if _, err := fmt.Fprint(b.out, line); err != nil {
audit.HandlePluginError("log", err, ev)
}
}