audit: add audit event to the context and fill in handlers

pull/6/head
Dr. Stefan Schimanski 2017-05-13 13:57:45 +02:00
parent c1bf6e832e
commit 0b5bcb0219
26 changed files with 1579 additions and 218 deletions

View File

@ -35,7 +35,7 @@ import (
// InsecureServingInfo *ServingInfo
func BuildInsecureHandlerChain(apiHandler http.Handler, c *server.Config) http.Handler {
handler := genericapifilters.WithAudit(apiHandler, c.RequestContextMapper, c.AuditWriter)
handler := genericapifilters.WithAudit(apiHandler, c.RequestContextMapper, c.AuditBackend, c.AuditPolicy, c.LongRunningFunc)
handler = genericapifilters.WithAuthentication(handler, c.RequestContextMapper, insecureSuperuser{}, nil)
handler = genericfilters.WithCORS(handler, c.CorsAllowedOriginList, nil, nil, nil, "true")
handler = genericfilters.WithPanicRecovery(handler)

View File

@ -150,13 +150,13 @@ func GetHTTPClient(req *http.Request) string {
return "unknown"
}
// Extracts and returns the clients IP from the given request.
// Looks at X-Forwarded-For header, X-Real-Ip header and request.RemoteAddr in that order.
// Returns nil if none of them are set or is set to an invalid value.
func GetClientIP(req *http.Request) net.IP {
// SourceIPs splits the comma separated X-Forwarded-For header or returns the X-Real-Ip header or req.RemoteAddr,
// in that order, ignoring invalid IPs. It returns nil if all of these are empty or invalid.
func SourceIPs(req *http.Request) []net.IP {
hdr := req.Header
// First check the X-Forwarded-For header for requests via proxy.
hdrForwardedFor := hdr.Get("X-Forwarded-For")
forwardedForIPs := []net.IP{}
if hdrForwardedFor != "" {
// X-Forwarded-For can be a csv of IPs in case of multiple proxies.
// Use the first valid one.
@ -164,17 +164,20 @@ func GetClientIP(req *http.Request) net.IP {
for _, part := range parts {
ip := net.ParseIP(strings.TrimSpace(part))
if ip != nil {
return ip
forwardedForIPs = append(forwardedForIPs, ip)
}
}
}
if len(forwardedForIPs) > 0 {
return forwardedForIPs
}
// Try the X-Real-Ip header.
hdrRealIp := hdr.Get("X-Real-Ip")
if hdrRealIp != "" {
ip := net.ParseIP(hdrRealIp)
if ip != nil {
return ip
return []net.IP{ip}
}
}
@ -182,11 +185,28 @@ func GetClientIP(req *http.Request) net.IP {
// Remote Address in Go's HTTP server is in the form host:port so we need to split that first.
host, _, err := net.SplitHostPort(req.RemoteAddr)
if err == nil {
return net.ParseIP(host)
if remoteIP := net.ParseIP(host); remoteIP != nil {
return []net.IP{remoteIP}
}
}
// Fallback if Remote Address was just IP.
return net.ParseIP(req.RemoteAddr)
if remoteIP := net.ParseIP(req.RemoteAddr); remoteIP != nil {
return []net.IP{remoteIP}
}
return nil
}
// Extracts and returns the clients IP from the given request.
// Looks at X-Forwarded-For header, X-Real-Ip header and request.RemoteAddr in that order.
// Returns nil if none of them are set or is set to an invalid value.
func GetClientIP(req *http.Request) net.IP {
ips := SourceIPs(req)
if len(ips) == 0 {
return nil
}
return ips[0]
}
var defaultProxyFuncPointer = fmt.Sprintf("%p", http.ProxyFromEnvironment)

View File

@ -0,0 +1,46 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package audit
func ordLevel(l Level) int {
switch l {
case LevelMetadata:
return 1
case LevelRequest:
return 2
case LevelRequestResponse:
return 3
default:
return 0
}
}
func (a Level) Less(b Level) bool {
return ordLevel(a) < ordLevel(b)
}
func (a Level) GreaterOrEqual(b Level) bool {
return ordLevel(a) >= ordLevel(b)
}
func NewConstantPolicy(level Level) *Policy {
return &Policy{
Rules: []PolicyRule{
{Level: level},
},
}
}

View File

@ -0,0 +1,202 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package audit
import (
"bytes"
"fmt"
"net/http"
"strings"
"time"
"github.com/golang/glog"
"github.com/pborman/uuid"
"reflect"
"k8s.io/apimachinery/pkg/apis/meta/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
utilnet "k8s.io/apimachinery/pkg/util/net"
"k8s.io/apiserver/pkg/apis/audit"
auditinternal "k8s.io/apiserver/pkg/apis/audit"
"k8s.io/apiserver/pkg/authorization/authorizer"
authenticationv1 "k8s.io/client-go/pkg/apis/authentication/v1"
)
const (
AuditIDHeader = "X-Request-ID"
)
func NewEventFromRequest(req *http.Request, policy *auditinternal.Policy, attribs authorizer.Attributes) (*auditinternal.Event, error) {
ev := &auditinternal.Event{
Timestamp: metav1.NewTime(time.Now()),
Verb: attribs.GetVerb(),
RequestURI: req.URL.RequestURI(),
}
// set the level
ev.Level = auditinternal.LevelNone
if policy != nil && len(policy.Rules) > 0 {
// This is just a hack to get through the test without setting a high level by default.
// TODO(audit): add the policy evalutation here
ev.Level = policy.Rules[0].Level
}
// prefer the id from the headers. If not available, create a new one.
// TODO(audit): do we want to forbid the header for non-front-proxy users?
ids := req.Header[AuditIDHeader]
if len(ids) > 0 {
ev.AuditID = types.UID(ids[0])
} else {
ev.AuditID = types.UID(uuid.NewRandom().String())
}
ips := utilnet.SourceIPs(req)
ev.SourceIPs = make([]string, len(ips))
for i := range ips {
ev.SourceIPs[i] = ips[i].String()
}
if user := attribs.GetUser(); user != nil {
ev.User.Username = user.GetName()
ev.User.Extra = map[string]auditinternal.ExtraValue{}
for k, v := range user.GetExtra() {
ev.User.Extra[k] = auditinternal.ExtraValue(v)
}
ev.User.Groups = user.GetGroups()
ev.User.UID = user.GetUID()
}
if asuser := req.Header.Get(authenticationv1.ImpersonateUserHeader); len(asuser) > 0 {
ev.ImpersonatedUser = &auditinternal.UserInfo{
Username: asuser,
}
if requestedGroups := req.Header[authenticationv1.ImpersonateGroupHeader]; len(requestedGroups) > 0 {
ev.ImpersonatedUser.Groups = requestedGroups
}
ev.ImpersonatedUser.Extra = map[string]auditinternal.ExtraValue{}
for k, v := range req.Header {
if !strings.HasPrefix(k, authenticationv1.ImpersonateUserExtraHeaderPrefix) {
continue
}
k = k[len(authenticationv1.ImpersonateUserExtraHeaderPrefix):]
ev.ImpersonatedUser.Extra[k] = auditinternal.ExtraValue(v)
}
}
if attribs.IsResourceRequest() {
ev.ObjectRef = &auditinternal.ObjectReference{
Namespace: attribs.GetNamespace(),
Name: attribs.GetName(),
Resource: attribs.GetResource(),
APIVersion: attribs.GetAPIGroup() + "/" + attribs.GetAPIVersion(),
}
}
return ev, nil
}
// LogRequestObject fills in the request object into an audit event. The passed runtime.Object
// will be converted to the given gv.
func LogRequestObject(ae *audit.Event, obj runtime.Object, gv schema.GroupVersion, s runtime.NegotiatedSerializer) {
if ae == nil || ae.Level.Less(audit.LevelRequest) {
return
}
// TODO(audit): hook into the serializer to avoid double conversion
var err error
ae.RequestObject, err = encodeObject(obj, gv, s)
if err != nil {
// TODO(audit): add error slice to audit event struct
glog.Warningf("Auditing failed of %v request: %v", reflect.TypeOf(obj).Name(), err)
return
}
// complete ObjectRef
if ae.ObjectRef == nil {
ae.ObjectRef = &audit.ObjectReference{}
}
if acc, ok := obj.(v1.ObjectMetaAccessor); ok {
meta := acc.GetObjectMeta()
if len(ae.ObjectRef.Namespace) == 0 {
ae.ObjectRef.Namespace = meta.GetNamespace()
}
if len(ae.ObjectRef.Name) == 0 {
ae.ObjectRef.Name = meta.GetName()
}
if len(ae.ObjectRef.UID) == 0 {
ae.ObjectRef.UID = meta.GetUID()
}
if len(ae.ObjectRef.ResourceVersion) == 0 {
ae.ObjectRef.ResourceVersion = meta.GetResourceVersion()
}
}
}
// LogRquestPatch fills in the given patch as the request object into an audit event.
func LogRequestPatch(ae *audit.Event, patch []byte) {
if ae == nil || ae.Level.Less(audit.LevelRequest) {
return
}
ae.RequestObject = runtime.Unknown{
Raw: patch,
ContentType: runtime.ContentTypeJSON,
}
}
// LogResponseObject fills in the response object into an audit event. The passed runtime.Object
// will be converted to the given gv.
func LogResponseObject(ae *audit.Event, obj runtime.Object, gv schema.GroupVersion, s runtime.NegotiatedSerializer) {
if ae == nil || ae.Level.Less(audit.LevelRequestResponse) {
return
}
if status, ok := obj.(*metav1.Status); ok {
ae.ResponseStatus = status
}
// TODO(audit): hook into the serializer to avoid double conversion
var err error
ae.ResponseObject, err = encodeObject(obj, gv, s)
if err != nil {
glog.Warningf("Audit failed for %q response: %v", reflect.TypeOf(obj).Name(), err)
}
}
func encodeObject(obj runtime.Object, gv schema.GroupVersion, serializer runtime.NegotiatedSerializer) (runtime.Unknown, error) {
supported := serializer.SupportedMediaTypes()
for i := range supported {
if supported[i].MediaType == "application/json" {
enc := serializer.EncoderForVersion(supported[i].Serializer, gv)
var buf bytes.Buffer
if err := enc.Encode(obj, &buf); err != nil {
return runtime.Unknown{}, fmt.Errorf("encoding failed: %v", err)
}
return runtime.Unknown{
Raw: buf.Bytes(),
ContentType: runtime.ContentTypeJSON,
}, nil
}
}
return runtime.Unknown{}, fmt.Errorf("no json encoder found")
}

View File

@ -0,0 +1,37 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package audit
import (
auditinternal "k8s.io/apiserver/pkg/apis/audit"
)
type Sink interface {
// ProcessEvents handles events. Per audit ID it might be that ProcessEvents is called up to three times.
// Errors might be logged by the sink itself. If an error should be fatal, leading to an internal
// error, ProcessEvents is supposed to panic. The event must not be mutated and is reused by the caller
// after the call returns, i.e. the sink has to make a deepcopy to keep a copy around if necessary.
ProcessEvents(events ...*auditinternal.Event)
}
type Backend interface {
Sink
// Run will initialize the backend. It must not block, but may run go routines in the background. If
// stopCh is closed, it is supposed to stop them. Run will be called before the first call to ProcessEvents.
Run(stopCh <-chan struct{}) error
}

View File

@ -52,9 +52,11 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/admission"
auditinternal "k8s.io/apiserver/pkg/apis/audit"
"k8s.io/apiserver/pkg/apis/example"
examplefuzzer "k8s.io/apiserver/pkg/apis/example/fuzzer"
examplev1 "k8s.io/apiserver/pkg/apis/example/v1"
"k8s.io/apiserver/pkg/audit"
genericapifilters "k8s.io/apiserver/pkg/endpoints/filters"
"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
"k8s.io/apiserver/pkg/endpoints/request"
@ -256,25 +258,25 @@ type defaultAPIServer struct {
// uses the default settings
func handle(storage map[string]rest.Storage) http.Handler {
return handleInternal(storage, admissionControl, selfLinker)
return handleInternal(storage, admissionControl, selfLinker, nil)
}
// tests with a deny admission controller
func handleDeny(storage map[string]rest.Storage) http.Handler {
return handleInternal(storage, alwaysDeny{}, selfLinker)
return handleInternal(storage, alwaysDeny{}, selfLinker, nil)
}
// tests using the new namespace scope mechanism
func handleNamespaced(storage map[string]rest.Storage) http.Handler {
return handleInternal(storage, admissionControl, selfLinker)
return handleInternal(storage, admissionControl, selfLinker, nil)
}
// tests using a custom self linker
func handleLinker(storage map[string]rest.Storage, selfLinker runtime.SelfLinker) http.Handler {
return handleInternal(storage, admissionControl, selfLinker)
return handleInternal(storage, admissionControl, selfLinker, nil)
}
func handleInternal(storage map[string]rest.Storage, admissionControl admission.Interface, selfLinker runtime.SelfLinker) http.Handler {
func handleInternal(storage map[string]rest.Storage, admissionControl admission.Interface, selfLinker runtime.SelfLinker, auditSink audit.Sink) http.Handler {
container := restful.NewContainer()
container.Router(restful.CurlyRouter{})
mux := container.ServeMux
@ -332,7 +334,11 @@ func handleInternal(storage map[string]rest.Storage, admissionControl admission.
}
}
handler := genericapifilters.WithRequestInfo(mux, testRequestInfoResolver(), requestContextMapper)
handler := genericapifilters.WithAudit(mux, requestContextMapper, auditSink, auditinternal.NewConstantPolicy(auditinternal.LevelRequestResponse), func(r *http.Request, requestInfo *request.RequestInfo) bool {
// simplified long-running check
return requestInfo.Verb == "watch" || requestInfo.Verb == "proxy"
})
handler = genericapifilters.WithRequestInfo(handler, testRequestInfoResolver(), requestContextMapper)
handler = request.WithRequestContext(handler, requestContextMapper)
return &defaultAPIServer{handler, container}
@ -1088,7 +1094,7 @@ func TestList(t *testing.T) {
namespace: testCase.namespace,
expectedSet: testCase.selfLink,
}
var handler = handleInternal(storage, admissionControl, selfLinker)
var handler = handleInternal(storage, admissionControl, selfLinker, nil)
server := httptest.NewServer(handler)
defer server.Close()
@ -1768,7 +1774,7 @@ func TestGetNamespaceSelfLink(t *testing.T) {
namespace: "foo",
}
storage["simple"] = &simpleStorage
handler := handleInternal(storage, admissionControl, selfLinker)
handler := handleInternal(storage, admissionControl, selfLinker, nil)
server := httptest.NewServer(handler)
defer server.Close()
@ -3166,7 +3172,7 @@ func TestCreateInvokesAdmissionControl(t *testing.T) {
namespace: "other",
expectedSet: "/" + prefix + "/" + testGroupVersion.Group + "/" + testGroupVersion.Version + "/namespaces/other/foo/bar",
}
handler := handleInternal(map[string]rest.Storage{"foo": &storage}, alwaysDeny{}, selfLinker)
handler := handleInternal(map[string]rest.Storage{"foo": &storage}, alwaysDeny{}, selfLinker, nil)
server := httptest.NewServer(handler)
defer server.Close()
client := http.Client{}
@ -3248,7 +3254,7 @@ func (obj *UnregisteredAPIObject) GetObjectKind() schema.ObjectKind {
func TestWriteJSONDecodeError(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
responsewriters.WriteObjectNegotiated(codecs, newGroupVersion, w, req, http.StatusOK, &UnregisteredAPIObject{"Undecodable"})
responsewriters.WriteObjectNegotiated(nil, codecs, newGroupVersion, w, req, http.StatusOK, &UnregisteredAPIObject{"Undecodable"})
}))
defer server.Close()
// We send a 200 status code before we encode the object, so we expect OK, but there will

View File

@ -0,0 +1,326 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package endpoints
import (
"bytes"
"fmt"
"net/http"
"net/http/httptest"
"regexp"
"sync"
"testing"
"time"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait"
auditinternal "k8s.io/apiserver/pkg/apis/audit"
genericapitesting "k8s.io/apiserver/pkg/endpoints/testing"
"k8s.io/apiserver/pkg/registry/rest"
)
type fakeAuditSink struct {
lock sync.Mutex
events []*auditinternal.Event
}
func (s *fakeAuditSink) ProcessEvents(evs ...*auditinternal.Event) {
s.lock.Lock()
defer s.lock.Unlock()
s.events = append(s.events, evs...)
}
func (s *fakeAuditSink) Events() []*auditinternal.Event {
s.lock.Lock()
defer s.lock.Unlock()
return append([]*auditinternal.Event{}, s.events...)
}
func TestAudit(t *testing.T) {
type eventCheck func(events []*auditinternal.Event) error
// fixtures
simpleFoo := &genericapitesting.Simple{Other: "foo"}
simpleFooJSON, _ := runtime.Encode(testCodec, simpleFoo)
simpleCPrime := &genericapitesting.Simple{
ObjectMeta: metav1.ObjectMeta{Name: "c", Namespace: "other"},
Other: "bla",
}
simpleCPrimeJSON, _ := runtime.Encode(testCodec, simpleCPrime)
// event checks
requestBodyIs := func(i int, text string) eventCheck {
return func(events []*auditinternal.Event) error {
if string(events[i].RequestObject.Raw) != text {
return fmt.Errorf("expected RequestBody %q, got %q", text, string(events[i].RequestObject.Raw))
}
return nil
}
}
requestBodyMatches := func(i int, pattern string) eventCheck {
return func(events []*auditinternal.Event) error {
if matched, _ := regexp.Match(pattern, events[i].RequestObject.Raw); !matched {
return fmt.Errorf("expected RequestBody to match %q, but didn't: %q", pattern, string(events[i].RequestObject.Raw))
}
return nil
}
}
responseBodyIs := func(i int, text string) eventCheck {
return func(events []*auditinternal.Event) error {
if string(events[i].ResponseObject.Raw) != text {
return fmt.Errorf("expected ResponseBody %q, got %q", text, string(events[i].ResponseObject.Raw))
}
return nil
}
}
responseBodyMatches := func(i int, pattern string) eventCheck {
return func(events []*auditinternal.Event) error {
if matched, _ := regexp.Match(pattern, events[i].ResponseObject.Raw); !matched {
return fmt.Errorf("expected ResponseBody to match %q, but didn't: %q", pattern, string(events[i].ResponseObject.Raw))
}
return nil
}
}
for _, test := range []struct {
desc string
req func(server string) (*http.Request, error)
linker runtime.SelfLinker
code int
events int
checks []eventCheck
}{
{
"get",
func(server string) (*http.Request, error) {
return http.NewRequest("GET", server+"/"+prefix+"/"+testGroupVersion.Group+"/"+testGroupVersion.Version+"/namespaces/other/simple/c", bytes.NewBuffer(simpleFooJSON))
},
selfLinker,
200,
1,
[]eventCheck{
requestBodyIs(0, ""),
responseBodyMatches(0, `{.*"name":"c".*}`),
},
},
{
"list",
func(server string) (*http.Request, error) {
return http.NewRequest("GET", server+"/"+prefix+"/"+testGroupVersion.Group+"/"+testGroupVersion.Version+"/namespaces/other/simple?labelSelector=a%3Dfoobar", nil)
},
&setTestSelfLinker{
t: t,
expectedSet: "/" + prefix + "/" + testGroupVersion.Group + "/" + testGroupVersion.Version + "/namespaces/other/simple",
namespace: "other",
},
200,
1,
[]eventCheck{
requestBodyMatches(0, ""),
responseBodyMatches(0, `{.*"name":"a".*"name":"b".*}`),
},
},
{
"create",
func(server string) (*http.Request, error) {
return http.NewRequest("POST", server+"/"+prefix+"/"+testGroupVersion.Group+"/"+testGroupVersion.Version+"/namespaces/default/simple", bytes.NewBuffer(simpleFooJSON))
},
selfLinker,
201,
1,
[]eventCheck{
requestBodyIs(0, string(simpleFooJSON)),
responseBodyMatches(0, `{.*"foo".*}`),
},
},
{
"not-allowed-named-create",
func(server string) (*http.Request, error) {
return http.NewRequest("POST", server+"/"+prefix+"/"+testGroupVersion.Group+"/"+testGroupVersion.Version+"/namespaces/default/simple/named", bytes.NewBuffer(simpleFooJSON))
},
selfLinker,
405,
1,
[]eventCheck{
requestBodyIs(0, ""), // the 405 is thrown long before the create handler would be executed
responseBodyIs(0, ""), // the 405 is thrown long before the create handler would be executed
},
},
{
"delete",
func(server string) (*http.Request, error) {
return http.NewRequest("DELETE", server+"/"+prefix+"/"+testGroupVersion.Group+"/"+testGroupVersion.Version+"/namespaces/default/simple/a", nil)
},
selfLinker,
200,
1,
[]eventCheck{
requestBodyMatches(0, ""),
responseBodyMatches(0, ""),
},
},
{
"delete-with-options-in-body",
func(server string) (*http.Request, error) {
return http.NewRequest("DELETE", server+"/"+prefix+"/"+testGroupVersion.Group+"/"+testGroupVersion.Version+"/namespaces/default/simple/a", bytes.NewBuffer([]byte(`{"kind":"DeleteOptions"}`)))
},
selfLinker,
200,
1,
[]eventCheck{
requestBodyMatches(0, "DeleteOptions"),
responseBodyMatches(0, ""),
},
},
{
"update",
func(server string) (*http.Request, error) {
return http.NewRequest("PUT", server+"/"+prefix+"/"+testGroupVersion.Group+"/"+testGroupVersion.Version+"/namespaces/other/simple/c", bytes.NewBuffer(simpleCPrimeJSON))
},
selfLinker,
200,
1,
[]eventCheck{
requestBodyIs(0, string(simpleCPrimeJSON)),
responseBodyMatches(0, `{.*"bla".*}`),
},
},
{
"update-wrong-namespace",
func(server string) (*http.Request, error) {
return http.NewRequest("PUT", server+"/"+prefix+"/"+testGroupVersion.Group+"/"+testGroupVersion.Version+"/namespaces/default/simple/c", bytes.NewBuffer(simpleCPrimeJSON))
},
selfLinker,
400,
1,
[]eventCheck{
requestBodyIs(0, string(simpleCPrimeJSON)),
responseBodyMatches(0, `"Status".*"status":"Failure".*"code":400}`),
},
},
{
"patch",
func(server string) (*http.Request, error) {
req, _ := http.NewRequest("PATCH", server+"/"+prefix+"/"+testGroupVersion.Group+"/"+testGroupVersion.Version+"/namespaces/other/simple/c", bytes.NewReader([]byte(`{"labels":{"foo":"bar"}}`)))
req.Header.Set("Content-Type", "application/merge-patch+json; charset=UTF-8")
return req, nil
},
&setTestSelfLinker{
t: t,
expectedSet: "/" + prefix + "/" + testGroupVersion.Group + "/" + testGroupVersion.Version + "/namespaces/other/simple/c",
name: "c",
namespace: "other",
},
200,
1,
[]eventCheck{
requestBodyIs(0, `{"labels":{"foo":"bar"}}`),
responseBodyMatches(0, `"name":"c".*"labels":{"foo":"bar"}`),
},
},
{
"watch",
func(server string) (*http.Request, error) {
return http.NewRequest("GET", server+"/"+prefix+"/"+testGroupVersion.Group+"/"+testGroupVersion.Version+"/namespaces/other/simple?watch=true", nil)
},
&setTestSelfLinker{
t: t,
expectedSet: "/" + prefix + "/" + testGroupVersion.Group + "/" + testGroupVersion.Version + "/namespaces/other/simple",
namespace: "other",
},
200,
2,
[]eventCheck{
requestBodyMatches(0, ""),
responseBodyMatches(0, ""),
},
},
} {
sink := &fakeAuditSink{}
handler := handleInternal(map[string]rest.Storage{
"simple": &SimpleRESTStorage{
list: []genericapitesting.Simple{
{
ObjectMeta: metav1.ObjectMeta{Name: "a", Namespace: "other"},
Other: "foo",
},
{
ObjectMeta: metav1.ObjectMeta{Name: "b", Namespace: "other"},
Other: "foo",
},
},
item: genericapitesting.Simple{
ObjectMeta: metav1.ObjectMeta{Name: "c", Namespace: "other", UID: "uid"},
Other: "foo",
},
},
}, admissionControl, selfLinker, sink)
server := httptest.NewServer(handler)
defer server.Close()
client := http.Client{Timeout: 2 * time.Second}
req, err := test.req(server.URL)
if err != nil {
t.Errorf("[%s] error creating the request: %v", test.desc, err)
}
response, err := client.Do(req)
if err != nil {
t.Errorf("[%s] error: %v", test.desc, err)
}
if response.StatusCode != test.code {
t.Errorf("[%s] expected http code %d, got %#v", test.desc, test.code, response)
}
// close body because the handler might block in Flush, unable to send the remaining event.
response.Body.Close()
// wait for events to arrive, at least the given number in the test
events := []*auditinternal.Event{}
err = wait.Poll(50*time.Millisecond, wait.ForeverTestTimeout, wait.ConditionFunc(func() (done bool, err error) {
events = sink.Events()
return len(events) >= test.events, nil
}))
if err != nil {
t.Errorf("[%s] timeout waiting for events", test.desc)
}
if got := len(events); got != test.events {
t.Errorf("[%s] expected %d audit events, got %d", test.desc, test.events, got)
} else {
for i, check := range test.checks {
err := check(events)
if err != nil {
t.Errorf("[%s,%d] %v", test.desc, i, err)
}
}
}
if len(events) > 0 {
status := events[len(events)-1].ResponseStatus
if status == nil {
t.Errorf("[%s] expected non-nil ResponseStatus in last event", test.desc)
} else if int(status.Code) != test.code {
t.Errorf("[%s] expected ResponseStatus.Code=%d, got %d", test.desc, test.code, status.Code)
}
}
}
}

View File

@ -70,5 +70,5 @@ func (s *APIGroupHandler) handle(req *restful.Request, resp *restful.Response) {
}
func (s *APIGroupHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
responsewriters.WriteObjectNegotiated(s.serializer, schema.GroupVersion{}, w, req, http.StatusOK, &s.group)
responsewriters.WriteObjectNegotiated(nil, s.serializer, schema.GroupVersion{}, w, req, http.StatusOK, &s.group)
}

View File

@ -74,5 +74,5 @@ func (s *legacyRootAPIHandler) handle(req *restful.Request, resp *restful.Respon
Versions: s.apiVersions,
}
responsewriters.WriteObjectNegotiated(s.serializer, schema.GroupVersion{}, resp.ResponseWriter, req.Request, http.StatusOK, apiVersions)
responsewriters.WriteObjectNegotiated(nil, s.serializer, schema.GroupVersion{}, resp.ResponseWriter, req.Request, http.StatusOK, apiVersions)
}

View File

@ -111,7 +111,7 @@ func (s *rootAPIsHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request)
groups[i].ServerAddressByClientCIDRs = serverCIDR
}
responsewriters.WriteObjectNegotiated(s.serializer, schema.GroupVersion{}, resp, req, http.StatusOK, &metav1.APIGroupList{Groups: groups})
responsewriters.WriteObjectNegotiated(nil, s.serializer, schema.GroupVersion{}, resp, req, http.StatusOK, &metav1.APIGroupList{Groups: groups})
}
func (s *rootAPIsHandler) restfulHandle(req *restful.Request, resp *restful.Response) {

View File

@ -78,6 +78,6 @@ func (s *APIVersionHandler) handle(req *restful.Request, resp *restful.Response)
}
func (s *APIVersionHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
responsewriters.WriteObjectNegotiated(s.serializer, schema.GroupVersion{}, w, req, http.StatusOK,
responsewriters.WriteObjectNegotiated(nil, s.serializer, schema.GroupVersion{}, w, req, http.StatusOK,
&metav1.APIResourceList{GroupVersion: s.groupVersion.String(), APIResources: s.apiResourceLister.ListAPIResources()})
}

View File

@ -19,36 +19,154 @@ package filters
import (
"bufio"
"errors"
"fmt"
"io"
"net"
"net/http"
"strings"
"time"
"sync"
"github.com/golang/glog"
"github.com/pborman/uuid"
"fmt"
utilnet "k8s.io/apimachinery/pkg/util/net"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
auditinternal "k8s.io/apiserver/pkg/apis/audit"
"k8s.io/apiserver/pkg/audit"
"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
"k8s.io/apiserver/pkg/endpoints/request"
authenticationapi "k8s.io/client-go/pkg/apis/authentication/v1"
)
// WithAudit decorates a http.Handler with audit logging information for all the
// requests coming to the server. If out is nil, no decoration takes place.
// Each audit log contains two entries:
// 1. the request line containing:
// - unique id allowing to match the response line (see 2)
// - source ip of the request
// - HTTP method being invoked
// - original user invoking the operation
// - original user's groups info
// - impersonated user for the operation
// - impersonated groups info
// - namespace of the request or <none>
// - uri is the full URI as requested
// 2. the response line containing:
// - the unique id from 1
// - response code
func WithAudit(handler http.Handler, requestContextMapper request.RequestContextMapper, sink audit.Sink, policy *auditinternal.Policy, longRunningCheck request.LongRunningRequestCheck) http.Handler {
if sink == nil {
return handler
}
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
ctx, ok := requestContextMapper.Get(req)
if !ok {
responsewriters.InternalError(w, req, errors.New("no context found for request"))
return
}
attribs, err := GetAuthorizerAttributes(ctx)
if err != nil {
utilruntime.HandleError(fmt.Errorf("failed to GetAuthorizerAttributes: %v", err))
responsewriters.InternalError(w, req, errors.New("failed to parse request"))
return
}
ev, err := audit.NewEventFromRequest(req, policy, attribs)
if err != nil {
utilruntime.HandleError(fmt.Errorf("failed to complete audit event from request: %v", err))
responsewriters.InternalError(w, req, errors.New("failed to update context"))
return
}
ctx = request.WithAuditEvent(ctx, ev)
if err := requestContextMapper.Update(req, ctx); err != nil {
utilruntime.HandleError(fmt.Errorf("failed to attach audit event to the context: %v", err))
responsewriters.InternalError(w, req, errors.New("failed to update context"))
return
}
// intercept the status code
longRunning := false
var longRunningSink audit.Sink
if longRunningCheck != nil {
ri, _ := request.RequestInfoFrom(ctx)
if longRunning = longRunningCheck(req, ri); longRunning {
longRunningSink = sink
}
}
respWriter := decorateResponseWriter(w, ev, longRunningSink)
// send audit event when we leave this func, either via a panic or cleanly. In the case of long
// running requests, this will be the second audit event.
defer func() {
if r := recover(); r != nil {
ev.ResponseStatus = &metav1.Status{
Code: http.StatusInternalServerError,
}
sink.ProcessEvents(ev)
panic(r)
}
if ev.ResponseStatus == nil {
ev.ResponseStatus = &metav1.Status{
Code: 200,
}
}
sink.ProcessEvents(ev)
}()
handler.ServeHTTP(respWriter, req)
})
}
func decorateResponseWriter(responseWriter http.ResponseWriter, ev *auditinternal.Event, sink audit.Sink) http.ResponseWriter {
delegate := &auditResponseWriter{
ResponseWriter: responseWriter,
event: ev,
sink: sink,
}
// check if the ResponseWriter we're wrapping is the fancy one we need
// or if the basic is sufficient
_, cn := responseWriter.(http.CloseNotifier)
_, fl := responseWriter.(http.Flusher)
_, hj := responseWriter.(http.Hijacker)
if cn && fl && hj {
return &fancyResponseWriterDelegator{delegate}
}
return delegate
}
var _ http.ResponseWriter = &auditResponseWriter{}
// auditResponseWriter intercepts WriteHeader, sets it in the event. If the sink is set, it will
// create immediately an event (for long running requests).
type auditResponseWriter struct {
http.ResponseWriter
out io.Writer
id string
event *auditinternal.Event
once sync.Once
sink audit.Sink
}
func (a *auditResponseWriter) processCode(code int) {
a.once.Do(func() {
if a.sink != nil {
a.sink.ProcessEvents(a.event)
}
// for now we use the ResponseStatus as marker that it's the first or second event
// of a long running request. As soon as we have such a field in the event, we can
// change this.
if a.event.ResponseStatus == nil {
a.event.ResponseStatus = &metav1.Status{}
}
a.event.ResponseStatus.Code = int32(code)
})
}
func (a *auditResponseWriter) Write(bs []byte) (int, error) {
a.processCode(200) // the Go library calls WriteHeader internally if no code was written yet. But this will go unnoticed for us
return a.ResponseWriter.Write(bs)
}
func (a *auditResponseWriter) WriteHeader(code int) {
line := fmt.Sprintf("%s AUDIT: id=%q response=\"%d\"\n", time.Now().Format(time.RFC3339Nano), a.id, code)
if _, err := fmt.Fprint(a.out, line); err != nil {
glog.Errorf("Unable to write audit log: %s, the error is: %v", line, err)
}
a.processCode(code)
a.ResponseWriter.WriteHeader(code)
}
@ -74,89 +192,3 @@ func (f *fancyResponseWriterDelegator) Hijack() (net.Conn, *bufio.ReadWriter, er
var _ http.CloseNotifier = &fancyResponseWriterDelegator{}
var _ http.Flusher = &fancyResponseWriterDelegator{}
var _ http.Hijacker = &fancyResponseWriterDelegator{}
// WithAudit decorates a http.Handler with audit logging information for all the
// requests coming to the server. If out is nil, no decoration takes place.
// Each audit log contains two entries:
// 1. the request line containing:
// - unique id allowing to match the response line (see 2)
// - source ip of the request
// - HTTP method being invoked
// - original user invoking the operation
// - original user's groups info
// - impersonated user for the operation
// - impersonated groups info
// - namespace of the request or <none>
// - uri is the full URI as requested
// 2. the response line containing:
// - the unique id from 1
// - response code
func WithAudit(handler http.Handler, requestContextMapper request.RequestContextMapper, out io.Writer) http.Handler {
if out == nil {
return handler
}
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
ctx, ok := requestContextMapper.Get(req)
if !ok {
responsewriters.InternalError(w, req, errors.New("no context found for request"))
return
}
attribs, err := GetAuthorizerAttributes(ctx)
if err != nil {
responsewriters.InternalError(w, req, err)
return
}
username := "<none>"
groups := "<none>"
if attribs.GetUser() != nil {
username = attribs.GetUser().GetName()
if userGroups := attribs.GetUser().GetGroups(); len(userGroups) > 0 {
groups = auditStringSlice(userGroups)
}
}
asuser := req.Header.Get(authenticationapi.ImpersonateUserHeader)
if len(asuser) == 0 {
asuser = "<self>"
}
asgroups := "<lookup>"
requestedGroups := req.Header[authenticationapi.ImpersonateGroupHeader]
if len(requestedGroups) > 0 {
asgroups = auditStringSlice(requestedGroups)
}
namespace := attribs.GetNamespace()
if len(namespace) == 0 {
namespace = "<none>"
}
id := uuid.NewRandom().String()
line := fmt.Sprintf("%s AUDIT: id=%q ip=%q method=%q user=%q groups=%q as=%q asgroups=%q namespace=%q uri=%q\n",
time.Now().Format(time.RFC3339Nano), id, utilnet.GetClientIP(req), req.Method, username, groups, asuser, asgroups, namespace, req.URL)
if _, err := fmt.Fprint(out, line); err != nil {
glog.Errorf("Unable to write audit log: %s, the error is: %v", line, err)
}
respWriter := decorateResponseWriter(w, out, id)
handler.ServeHTTP(respWriter, req)
})
}
func auditStringSlice(inList []string) string {
quotedElements := make([]string, len(inList))
for i, in := range inList {
quotedElements[i] = fmt.Sprintf("%q", in)
}
return strings.Join(quotedElements, ",")
}
func decorateResponseWriter(responseWriter http.ResponseWriter, out io.Writer, id string) http.ResponseWriter {
delegate := &auditResponseWriter{ResponseWriter: responseWriter, out: out, id: id}
// check if the ResponseWriter we're wrapping is the fancy one we need
// or if the basic is sufficient
_, cn := responseWriter.(http.CloseNotifier)
_, fl := responseWriter.(http.Flusher)
_, hj := responseWriter.(http.Hijacker)
if cn && fl && hj {
return &fancyResponseWriterDelegator{delegate}
}
return delegate
}

View File

@ -19,24 +19,62 @@ package filters
import (
"bufio"
"bytes"
"io/ioutil"
"net"
"net/http"
"net/http/httptest"
"reflect"
"regexp"
"strings"
"sync"
"testing"
"time"
"k8s.io/apimachinery/pkg/util/wait"
auditinternal "k8s.io/apiserver/pkg/apis/audit"
"k8s.io/apiserver/pkg/authentication/user"
"k8s.io/apiserver/pkg/endpoints/request"
pluginlog "k8s.io/apiserver/plugin/pkg/audit/log"
)
type simpleResponseWriter struct {
http.ResponseWriter
type fakeAuditSink struct {
lock sync.Mutex
events []*auditinternal.Event
}
func (*simpleResponseWriter) WriteHeader(code int) {}
func (s *fakeAuditSink) ProcessEvents(evs ...*auditinternal.Event) {
s.lock.Lock()
defer s.lock.Unlock()
s.events = append(s.events, evs...)
}
func (s *fakeAuditSink) Events() []*auditinternal.Event {
s.lock.Lock()
defer s.lock.Unlock()
return append([]*auditinternal.Event{}, s.events...)
}
func (s *fakeAuditSink) Pop(timeout time.Duration) (*auditinternal.Event, error) {
var result *auditinternal.Event
err := wait.Poll(50*time.Millisecond, wait.ForeverTestTimeout, wait.ConditionFunc(func() (done bool, err error) {
s.lock.Lock()
defer s.lock.Unlock()
if len(s.events) == 0 {
return false, nil
}
result = s.events[0]
s.events = s.events[1:]
return true, nil
}))
return result, err
}
type simpleResponseWriter struct{}
var _ http.ResponseWriter = &simpleResponseWriter{}
func (*simpleResponseWriter) WriteHeader(code int) {}
func (*simpleResponseWriter) Write(bs []byte) (int, error) { return len(bs), nil }
func (*simpleResponseWriter) Header() http.Header { return http.Header{} }
type fancyResponseWriter struct {
simpleResponseWriter
@ -49,14 +87,14 @@ func (*fancyResponseWriter) Flush() {}
func (*fancyResponseWriter) Hijack() (net.Conn, *bufio.ReadWriter, error) { return nil, nil, nil }
func TestConstructResponseWriter(t *testing.T) {
actual := decorateResponseWriter(&simpleResponseWriter{}, ioutil.Discard, "")
actual := decorateResponseWriter(&simpleResponseWriter{}, nil, nil)
switch v := actual.(type) {
case *auditResponseWriter:
default:
t.Errorf("Expected auditResponseWriter, got %v", reflect.TypeOf(v))
}
actual = decorateResponseWriter(&fancyResponseWriter{}, ioutil.Discard, "")
actual = decorateResponseWriter(&fancyResponseWriter{}, nil, nil)
switch v := actual.(type) {
case *fancyResponseWriterDelegator:
default:
@ -64,6 +102,73 @@ func TestConstructResponseWriter(t *testing.T) {
}
}
func TestDecorateResponseWriterWithoutChannel(t *testing.T) {
ev := &auditinternal.Event{}
actual := decorateResponseWriter(&simpleResponseWriter{}, ev, nil)
// write status. This will not block because firstEventSentCh is nil
actual.WriteHeader(42)
if ev.ResponseStatus == nil {
t.Fatalf("Expected ResponseStatus to be non-nil")
}
if ev.ResponseStatus.Code != 42 {
t.Errorf("expected status code 42, got %d", ev.ResponseStatus.Code)
}
}
func TestDecorateResponseWriterWithImplicitWrite(t *testing.T) {
ev := &auditinternal.Event{}
actual := decorateResponseWriter(&simpleResponseWriter{}, ev, nil)
// write status. This will not block because firstEventSentCh is nil
actual.Write([]byte("foo"))
if ev.ResponseStatus == nil {
t.Fatalf("Expected ResponseStatus to be non-nil")
}
if ev.ResponseStatus.Code != 200 {
t.Errorf("expected status code 200, got %d", ev.ResponseStatus.Code)
}
}
func TestDecorateResponseWriterChannel(t *testing.T) {
sink := &fakeAuditSink{}
ev := &auditinternal.Event{}
actual := decorateResponseWriter(&simpleResponseWriter{}, ev, sink)
done := make(chan struct{})
go func() {
t.Log("Writing status code 42")
actual.WriteHeader(42)
t.Log("Finished writing status code 42")
close(done)
actual.Write([]byte("foo"))
}()
// sleep some time to give write the possibility to do wrong stuff
time.Sleep(100 * time.Millisecond)
t.Log("Waiting for event in the channel")
ev1, err := sink.Pop(time.Second)
if err != nil {
t.Fatal("Timeout waiting for events")
}
t.Logf("Seen event with status %v", ev1.ResponseStatus)
if ev != ev1 {
t.Fatalf("ev1 and ev must be equal")
}
<-done
t.Log("Seen the go routine finished")
// write again
_, err = actual.Write([]byte("foo"))
if err != nil {
t.Errorf("unexpected error: %v", err)
}
}
type fakeHTTPHandler struct{}
func (*fakeHTTPHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
@ -71,32 +176,189 @@ func (*fakeHTTPHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
}
func TestAudit(t *testing.T) {
var buf bytes.Buffer
shortRunningPrefix := `[\d\:\-\.\+TZ]+ AUDIT: id="[\w-]+" ip="127.0.0.1" method="list" user="admin" groups="<none>" as="<self>" asgroups="<lookup>" namespace="default" uri="/api/v1/namespaces/default/pods"`
longRunningPrefix := `[\d\:\-\.\+TZ]+ AUDIT: id="[\w-]+" ip="127.0.0.1" method="watch" user="admin" groups="<none>" as="<self>" asgroups="<lookup>" namespace="default" uri="/api/v1/namespaces/default/pods\?watch=true"`
handler := WithAudit(&fakeHTTPHandler{}, &fakeRequestContextMapper{
user: &user.DefaultInfo{Name: "admin"},
}, &buf)
shortRunningPath := "/api/v1/namespaces/default/pods"
longRunningPath := "/api/v1/namespaces/default/pods?watch=true"
req, _ := http.NewRequest("GET", "/api/v1/namespaces/default/pods", nil)
req.RemoteAddr = "127.0.0.1"
handler.ServeHTTP(httptest.NewRecorder(), req)
line := strings.Split(strings.TrimSpace(buf.String()), "\n")
if len(line) != 2 {
t.Fatalf("Unexpected amount of lines in audit log: %d", len(line))
}
match, err := regexp.MatchString(`[\d\:\-\.\+TZ]+ AUDIT: id="[\w-]+" ip="127.0.0.1" method="GET" user="admin" groups="<none>" as="<self>" asgroups="<lookup>" namespace="default" uri="/api/v1/namespaces/default/pods"`, line[0])
if err != nil {
t.Errorf("Unexpected error matching first line: %v", err)
}
if !match {
t.Errorf("Unexpected first line of audit: %s", line[0])
}
match, err = regexp.MatchString(`[\d\:\-\.\+TZ]+ AUDIT: id="[\w-]+" response="200"`, line[1])
if err != nil {
t.Errorf("Unexpected error matching second line: %v", err)
}
if !match {
t.Errorf("Unexpected second line of audit: %s", line[1])
delay := 500 * time.Millisecond
for _, test := range []struct {
desc string
path string
handler func(http.ResponseWriter, *http.Request)
expected []string
}{
// short running requests
{
"empty",
shortRunningPath,
func(http.ResponseWriter, *http.Request) {},
[]string{
shortRunningPrefix + ` response="200"`,
},
},
{
"sleep",
shortRunningPath,
func(http.ResponseWriter, *http.Request) {
time.Sleep(delay)
},
[]string{
shortRunningPrefix + ` response="200"`,
},
},
{
"403+write",
shortRunningPath,
func(w http.ResponseWriter, req *http.Request) {
w.WriteHeader(403)
w.Write([]byte("foo"))
},
[]string{
shortRunningPrefix + ` response="403"`,
},
},
{
"panic",
shortRunningPath,
func(w http.ResponseWriter, req *http.Request) {
panic("kaboom")
},
[]string{
shortRunningPrefix + ` response="500"`,
},
},
{
"write+panic",
shortRunningPath,
func(w http.ResponseWriter, req *http.Request) {
w.Write([]byte("foo"))
panic("kaboom")
},
[]string{
shortRunningPrefix + ` response="500"`,
},
},
// long running requests
{
"empty longrunning",
longRunningPath,
func(http.ResponseWriter, *http.Request) {},
[]string{
longRunningPrefix + ` response="200"`,
},
},
{
"sleep longrunning",
longRunningPath,
func(http.ResponseWriter, *http.Request) {
time.Sleep(delay)
},
[]string{
longRunningPrefix + ` response="200"`,
},
},
{
"sleep+403 longrunning",
longRunningPath,
func(w http.ResponseWriter, req *http.Request) {
time.Sleep(delay)
w.WriteHeader(403)
},
[]string{
longRunningPrefix + ` response="<deferred>"`,
longRunningPrefix + ` response="403"`,
},
},
{
"write longrunning",
longRunningPath,
func(w http.ResponseWriter, req *http.Request) {
w.Write([]byte("foo"))
},
[]string{
longRunningPrefix + ` response="<deferred>"`,
longRunningPrefix + ` response="200"`,
},
},
{
"403+write longrunning",
longRunningPath,
func(w http.ResponseWriter, req *http.Request) {
w.WriteHeader(403)
w.Write([]byte("foo"))
},
[]string{
longRunningPrefix + ` response="<deferred>"`,
longRunningPrefix + ` response="403"`,
},
},
{
"panic longrunning",
longRunningPath,
func(w http.ResponseWriter, req *http.Request) {
panic("kaboom")
},
[]string{
longRunningPrefix + ` response="500"`,
},
},
{
"write+panic longrunning",
longRunningPath,
func(w http.ResponseWriter, req *http.Request) {
w.Write([]byte("foo"))
panic("kaboom")
},
[]string{
longRunningPrefix + ` response="<deferred>"`,
longRunningPrefix + ` response="500"`,
},
},
} {
var buf bytes.Buffer
backend := pluginlog.NewBackend(&buf)
handler := WithAudit(http.HandlerFunc(test.handler), &fakeRequestContextMapper{
user: &user.DefaultInfo{Name: "admin"},
}, backend, auditinternal.NewConstantPolicy(auditinternal.LevelRequestResponse), func(r *http.Request, ri *request.RequestInfo) bool {
// simplified long-running check
return ri.Verb == "watch"
})
req, _ := http.NewRequest("GET", test.path, nil)
req.RemoteAddr = "127.0.0.1"
done := make(chan struct{})
go func() {
defer func() {
recover()
close(done)
}()
handler.ServeHTTP(httptest.NewRecorder(), req)
}()
<-done
t.Logf("[%s] audit log: %v", test.desc, buf.String())
line := strings.Split(strings.TrimSpace(buf.String()), "\n")
if len(line) != len(test.expected) {
t.Errorf("[%s] Unexpected amount of lines in audit log: %d", test.desc, len(line))
continue
}
for i, re := range test.expected {
match, err := regexp.MatchString(re, line[i])
if err != nil {
t.Errorf("[%s] Unexpected error matching line %d: %v", test.desc, i, err)
continue
}
if !match {
t.Errorf("[%s] Unexpected line %d of audit: %s", test.desc, i, line[i])
}
}
}
}
@ -124,29 +386,8 @@ func (*fakeRequestContextMapper) Update(req *http.Request, context request.Conte
}
func TestAuditNoPanicOnNilUser(t *testing.T) {
var buf bytes.Buffer
handler := WithAudit(&fakeHTTPHandler{}, &fakeRequestContextMapper{}, &buf)
handler := WithAudit(&fakeHTTPHandler{}, &fakeRequestContextMapper{}, &fakeAuditSink{}, auditinternal.NewConstantPolicy(auditinternal.LevelRequestResponse), nil)
req, _ := http.NewRequest("GET", "/api/v1/namespaces/default/pods", nil)
req.RemoteAddr = "127.0.0.1"
handler.ServeHTTP(httptest.NewRecorder(), req)
line := strings.Split(strings.TrimSpace(buf.String()), "\n")
if len(line) != 2 {
t.Fatalf("Unexpected amount of lines in audit log: %d", len(line))
}
match, err := regexp.MatchString(`[\d\:\-\.\+TZ]+ AUDIT: id="[\w-]+" ip="127.0.0.1" method="GET" user="<none>" groups="<none>" as="<self>" asgroups="<lookup>" namespace="default" uri="/api/v1/namespaces/default/pods"`, line[0])
if err != nil {
t.Errorf("Unexpected error matching first line: %v", err)
}
if !match {
t.Errorf("Unexpected first line of audit: %s", line[0])
}
match, err = regexp.MatchString(`[\d\:\-\.\+TZ]+ AUDIT: id="[\w-]+" response="200"`, line[1])
if err != nil {
t.Errorf("Unexpected error matching second line: %v", err)
}
if !match {
t.Errorf("Unexpected second line of audit: %s", line[1])
}
}

View File

@ -0,0 +1,162 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package filters
import (
"bufio"
"errors"
"fmt"
"io"
"net"
"net/http"
"strings"
"time"
"github.com/golang/glog"
"github.com/pborman/uuid"
utilnet "k8s.io/apimachinery/pkg/util/net"
"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
"k8s.io/apiserver/pkg/endpoints/request"
authenticationapi "k8s.io/client-go/pkg/apis/authentication/v1"
)
var _ http.ResponseWriter = &legacyAuditResponseWriter{}
type legacyAuditResponseWriter struct {
http.ResponseWriter
out io.Writer
id string
}
func (a *legacyAuditResponseWriter) WriteHeader(code int) {
line := fmt.Sprintf("%s AUDIT: id=%q response=\"%d\"\n", time.Now().Format(time.RFC3339Nano), a.id, code)
if _, err := fmt.Fprint(a.out, line); err != nil {
glog.Errorf("Unable to write audit log: %s, the error is: %v", line, err)
}
a.ResponseWriter.WriteHeader(code)
}
// fancyLegacyResponseWriterDelegator implements http.CloseNotifier, http.Flusher and
// http.Hijacker which are needed to make certain http operation (e.g. watch, rsh, etc)
// working.
type fancyLegacyResponseWriterDelegator struct {
*legacyAuditResponseWriter
}
func (f *fancyLegacyResponseWriterDelegator) CloseNotify() <-chan bool {
return f.ResponseWriter.(http.CloseNotifier).CloseNotify()
}
func (f *fancyLegacyResponseWriterDelegator) Flush() {
f.ResponseWriter.(http.Flusher).Flush()
}
func (f *fancyLegacyResponseWriterDelegator) Hijack() (net.Conn, *bufio.ReadWriter, error) {
return f.ResponseWriter.(http.Hijacker).Hijack()
}
var _ http.CloseNotifier = &fancyLegacyResponseWriterDelegator{}
var _ http.Flusher = &fancyLegacyResponseWriterDelegator{}
var _ http.Hijacker = &fancyLegacyResponseWriterDelegator{}
// WithAudit decorates a http.Handler with audit logging information for all the
// requests coming to the server. If out is nil, no decoration takes place.
// Each audit log contains two entries:
// 1. the request line containing:
// - unique id allowing to match the response line (see 2)
// - source ip of the request
// - HTTP method being invoked
// - original user invoking the operation
// - original user's groups info
// - impersonated user for the operation
// - impersonated groups info
// - namespace of the request or <none>
// - uri is the full URI as requested
// 2. the response line containing:
// - the unique id from 1
// - response code
func WithLegacyAudit(handler http.Handler, requestContextMapper request.RequestContextMapper, out io.Writer) http.Handler {
if out == nil {
return handler
}
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
ctx, ok := requestContextMapper.Get(req)
if !ok {
responsewriters.InternalError(w, req, errors.New("no context found for request"))
return
}
attribs, err := GetAuthorizerAttributes(ctx)
if err != nil {
responsewriters.InternalError(w, req, err)
return
}
username := "<none>"
groups := "<none>"
if attribs.GetUser() != nil {
username = attribs.GetUser().GetName()
if userGroups := attribs.GetUser().GetGroups(); len(userGroups) > 0 {
groups = auditStringSlice(userGroups)
}
}
asuser := req.Header.Get(authenticationapi.ImpersonateUserHeader)
if len(asuser) == 0 {
asuser = "<self>"
}
asgroups := "<lookup>"
requestedGroups := req.Header[authenticationapi.ImpersonateGroupHeader]
if len(requestedGroups) > 0 {
asgroups = auditStringSlice(requestedGroups)
}
namespace := attribs.GetNamespace()
if len(namespace) == 0 {
namespace = "<none>"
}
id := uuid.NewRandom().String()
line := fmt.Sprintf("%s AUDIT: id=%q ip=%q method=%q user=%q groups=%q as=%q asgroups=%q namespace=%q uri=%q\n",
time.Now().Format(time.RFC3339Nano), id, utilnet.GetClientIP(req), req.Method, username, groups, asuser, asgroups, namespace, req.URL)
if _, err := fmt.Fprint(out, line); err != nil {
glog.Errorf("Unable to write audit log: %s, the error is: %v", line, err)
}
respWriter := legacyDecorateResponseWriter(w, out, id)
handler.ServeHTTP(respWriter, req)
})
}
func auditStringSlice(inList []string) string {
quotedElements := make([]string, len(inList))
for i, in := range inList {
quotedElements[i] = fmt.Sprintf("%q", in)
}
return strings.Join(quotedElements, ",")
}
func legacyDecorateResponseWriter(responseWriter http.ResponseWriter, out io.Writer, id string) http.ResponseWriter {
delegate := &legacyAuditResponseWriter{ResponseWriter: responseWriter, out: out, id: id}
// check if the ResponseWriter we're wrapping is the fancy one we need
// or if the basic is sufficient
_, cn := responseWriter.(http.CloseNotifier)
_, fl := responseWriter.(http.Flusher)
_, hj := responseWriter.(http.Hijacker)
if cn && fl && hj {
return &fancyLegacyResponseWriterDelegator{delegate}
}
return delegate
}

View File

@ -0,0 +1,104 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package filters
import (
"bytes"
"io/ioutil"
"net/http"
"net/http/httptest"
"reflect"
"regexp"
"strings"
"testing"
"k8s.io/apiserver/pkg/authentication/user"
)
func TestLegacyConstructResponseWriter(t *testing.T) {
actual := legacyDecorateResponseWriter(&simpleResponseWriter{}, ioutil.Discard, "")
switch v := actual.(type) {
case *legacyAuditResponseWriter:
default:
t.Errorf("Expected auditResponseWriter, got %v", reflect.TypeOf(v))
}
actual = legacyDecorateResponseWriter(&fancyResponseWriter{}, ioutil.Discard, "")
switch v := actual.(type) {
case *fancyLegacyResponseWriterDelegator:
default:
t.Errorf("Expected fancyResponseWriterDelegator, got %v", reflect.TypeOf(v))
}
}
func TestLegacyAudit(t *testing.T) {
var buf bytes.Buffer
handler := WithLegacyAudit(&fakeHTTPHandler{}, &fakeRequestContextMapper{
user: &user.DefaultInfo{Name: "admin"},
}, &buf)
req, _ := http.NewRequest("GET", "/api/v1/namespaces/default/pods", nil)
req.RemoteAddr = "127.0.0.1"
handler.ServeHTTP(httptest.NewRecorder(), req)
line := strings.Split(strings.TrimSpace(buf.String()), "\n")
if len(line) != 2 {
t.Fatalf("Unexpected amount of lines in audit log: %d", len(line))
}
match, err := regexp.MatchString(`[\d\:\-\.\+TZ]+ AUDIT: id="[\w-]+" ip="127.0.0.1" method="GET" user="admin" groups="<none>" as="<self>" asgroups="<lookup>" namespace="default" uri="/api/v1/namespaces/default/pods"`, line[0])
if err != nil {
t.Errorf("Unexpected error matching first line: %v", err)
}
if !match {
t.Errorf("Unexpected first line of audit: %s", line[0])
}
match, err = regexp.MatchString(`[\d\:\-\.\+TZ]+ AUDIT: id="[\w-]+" response="200"`, line[1])
if err != nil {
t.Errorf("Unexpected error matching second line: %v", err)
}
if !match {
t.Errorf("Unexpected second line of audit: %s", line[1])
}
}
func TestLegacyAuditNoPanicOnNilUser(t *testing.T) {
var buf bytes.Buffer
handler := WithLegacyAudit(&fakeHTTPHandler{}, &fakeRequestContextMapper{}, &buf)
req, _ := http.NewRequest("GET", "/api/v1/namespaces/default/pods", nil)
req.RemoteAddr = "127.0.0.1"
handler.ServeHTTP(httptest.NewRecorder(), req)
line := strings.Split(strings.TrimSpace(buf.String()), "\n")
if len(line) != 2 {
t.Fatalf("Unexpected amount of lines in audit log: %d", len(line))
}
match, err := regexp.MatchString(`[\d\:\-\.\+TZ]+ AUDIT: id="[\w-]+" ip="127.0.0.1" method="GET" user="<none>" groups="<none>" as="<self>" asgroups="<lookup>" namespace="default" uri="/api/v1/namespaces/default/pods"`, line[0])
if err != nil {
t.Errorf("Unexpected error matching first line: %v", err)
}
if !match {
t.Errorf("Unexpected first line of audit: %s", line[0])
}
match, err = regexp.MatchString(`[\d\:\-\.\+TZ]+ AUDIT: id="[\w-]+" response="200"`, line[1])
if err != nil {
t.Errorf("Unexpected error matching second line: %v", err)
}
if !match {
t.Errorf("Unexpected second line of audit: %s", line[1])
}
}

View File

@ -118,14 +118,14 @@ func (r *ProxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
redirector, ok := storage.(rest.Redirector)
if !ok {
httplog.LogOf(req, w).Addf("'%v' is not a redirector", resource)
httpCode = responsewriters.ErrorNegotiated(apierrors.NewMethodNotSupported(schema.GroupResource{Resource: resource}, "proxy"), r.Serializer, gv, w, req)
httpCode = responsewriters.ErrorNegotiated(ctx, apierrors.NewMethodNotSupported(schema.GroupResource{Resource: resource}, "proxy"), r.Serializer, gv, w, req)
return
}
location, roundTripper, err := redirector.ResourceLocation(ctx, id)
if err != nil {
httplog.LogOf(req, w).Addf("Error getting ResourceLocation: %v", err)
httpCode = responsewriters.ErrorNegotiated(err, r.Serializer, gv, w, req)
httpCode = responsewriters.ErrorNegotiated(ctx, err, r.Serializer, gv, w, req)
return
}
if location == nil {
@ -158,7 +158,7 @@ func (r *ProxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
newReq, err := http.NewRequest(req.Method, location.String(), req.Body)
if err != nil {
httpCode = responsewriters.ErrorNegotiated(err, r.Serializer, gv, w, req)
httpCode = responsewriters.ErrorNegotiated(ctx, err, r.Serializer, gv, w, req)
return
}
httpCode = http.StatusOK
@ -171,7 +171,7 @@ func (r *ProxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
// TODO convert this entire proxy to an UpgradeAwareProxy similar to
// https://github.com/openshift/origin/blob/master/pkg/util/httpproxy/upgradeawareproxy.go.
// That proxy needs to be modified to support multiple backends, not just 1.
if r.tryUpgrade(w, req, newReq, location, roundTripper, gv) {
if r.tryUpgrade(w, req, newReq, location, roundTripper, gv, ctx) {
return
}
@ -220,13 +220,13 @@ func (r *ProxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
}
// tryUpgrade returns true if the request was handled.
func (r *ProxyHandler) tryUpgrade(w http.ResponseWriter, req, newReq *http.Request, location *url.URL, transport http.RoundTripper, gv schema.GroupVersion) bool {
func (r *ProxyHandler) tryUpgrade(w http.ResponseWriter, req, newReq *http.Request, location *url.URL, transport http.RoundTripper, gv schema.GroupVersion, ctx request.Context) bool {
if !httpstream.IsUpgradeRequest(req) {
return false
}
backendConn, err := proxyutil.DialURL(location, transport)
if err != nil {
responsewriters.ErrorNegotiated(err, r.Serializer, gv, w, req)
responsewriters.ErrorNegotiated(ctx, err, r.Serializer, gv, w, req)
return true
}
defer backendConn.Close()
@ -236,13 +236,13 @@ func (r *ProxyHandler) tryUpgrade(w http.ResponseWriter, req, newReq *http.Reque
// hijack, just for reference...
requestHijackedConn, _, err := w.(http.Hijacker).Hijack()
if err != nil {
responsewriters.ErrorNegotiated(err, r.Serializer, gv, w, req)
responsewriters.ErrorNegotiated(ctx, err, r.Serializer, gv, w, req)
return true
}
defer requestHijackedConn.Close()
if err = newReq.Write(backendConn); err != nil {
responsewriters.ErrorNegotiated(err, r.Serializer, gv, w, req)
responsewriters.ErrorNegotiated(ctx, err, r.Serializer, gv, w, req)
return true
}

View File

@ -26,7 +26,9 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apiserver/pkg/audit"
"k8s.io/apiserver/pkg/endpoints/handlers/negotiation"
"k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/registry/rest"
"k8s.io/apiserver/pkg/util/flushwriter"
"k8s.io/apiserver/pkg/util/wsstream"
@ -37,16 +39,16 @@ import (
// response. The Accept header and current API version will be passed in, and the output will be copied
// directly to the response body. If content type is returned it is used, otherwise the content type will
// be "application/octet-stream". All other objects are sent to standard JSON serialization.
func WriteObject(statusCode int, gv schema.GroupVersion, s runtime.NegotiatedSerializer, object runtime.Object, w http.ResponseWriter, req *http.Request) {
func WriteObject(ctx request.Context, statusCode int, gv schema.GroupVersion, s runtime.NegotiatedSerializer, object runtime.Object, w http.ResponseWriter, req *http.Request) {
stream, ok := object.(rest.ResourceStreamer)
if !ok {
WriteObjectNegotiated(s, gv, w, req, statusCode, object)
WriteObjectNegotiated(ctx, s, gv, w, req, statusCode, object)
return
}
out, flush, contentType, err := stream.InputStream(gv.String(), req.Header.Get("Accept"))
if err != nil {
ErrorNegotiated(err, s, gv, w, req)
ErrorNegotiated(ctx, err, s, gv, w, req)
return
}
if out == nil {
@ -76,8 +78,9 @@ func WriteObject(statusCode int, gv schema.GroupVersion, s runtime.NegotiatedSer
io.Copy(writer, out)
}
// WriteObjectNegotiated renders an object in the content type negotiated by the client
func WriteObjectNegotiated(s runtime.NegotiatedSerializer, gv schema.GroupVersion, w http.ResponseWriter, req *http.Request, statusCode int, object runtime.Object) {
// WriteObjectNegotiated renders an object in the content type negotiated by the client.
// The context is optional and can be nil.
func WriteObjectNegotiated(ctx request.Context, s runtime.NegotiatedSerializer, gv schema.GroupVersion, w http.ResponseWriter, req *http.Request, statusCode int, object runtime.Object) {
serializer, err := negotiation.NegotiateOutputSerializer(req, s)
if err != nil {
status := apiStatus(err)
@ -85,6 +88,10 @@ func WriteObjectNegotiated(s runtime.NegotiatedSerializer, gv schema.GroupVersio
return
}
if ae := request.AuditEventFrom(ctx); ae != nil {
audit.LogResponseObject(ae, object, gv, s)
}
w.Header().Set("Content-Type", serializer.MediaType)
w.WriteHeader(statusCode)
@ -95,7 +102,8 @@ func WriteObjectNegotiated(s runtime.NegotiatedSerializer, gv schema.GroupVersio
}
// ErrorNegotiated renders an error to the response. Returns the HTTP status code of the error.
func ErrorNegotiated(err error, s runtime.NegotiatedSerializer, gv schema.GroupVersion, w http.ResponseWriter, req *http.Request) int {
// The context is options and may be nil.
func ErrorNegotiated(ctx request.Context, err error, s runtime.NegotiatedSerializer, gv schema.GroupVersion, w http.ResponseWriter, req *http.Request) int {
status := apiStatus(err)
code := int(status.Code)
// when writing an error, check to see if the status indicates a retry after period
@ -109,7 +117,7 @@ func ErrorNegotiated(err error, s runtime.NegotiatedSerializer, gv schema.GroupV
return code
}
WriteObjectNegotiated(s, gv, w, req, code, status)
WriteObjectNegotiated(ctx, s, gv, w, req, code, status)
return code
}

View File

@ -42,6 +42,7 @@ import (
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/strategicpatch"
"k8s.io/apiserver/pkg/admission"
"k8s.io/apiserver/pkg/audit"
"k8s.io/apiserver/pkg/endpoints/handlers/negotiation"
"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
"k8s.io/apiserver/pkg/endpoints/request"
@ -72,7 +73,8 @@ type RequestScope struct {
}
func (scope *RequestScope) err(err error, w http.ResponseWriter, req *http.Request) {
responsewriters.ErrorNegotiated(err, scope.Serializer, scope.Kind.GroupVersion(), w, req)
ctx := scope.ContextFunc(req)
responsewriters.ErrorNegotiated(ctx, err, scope.Serializer, scope.Kind.GroupVersion(), w, req)
}
// getterFunc performs a get request with the given context and object name. The request
@ -106,8 +108,9 @@ func getResourceHandler(scope RequestScope, getter getterFunc) http.HandlerFunc
scope.err(err, w, req)
return
}
trace.Step("About to write a response")
responsewriters.WriteObject(http.StatusOK, scope.Kind.GroupVersion(), scope.Serializer, result, w, req)
responsewriters.WriteObject(ctx, http.StatusOK, scope.Kind.GroupVersion(), scope.Serializer, result, w, req)
}
}
@ -226,7 +229,8 @@ type responder struct {
}
func (r *responder) Object(statusCode int, obj runtime.Object) {
responsewriters.WriteObject(statusCode, r.scope.Kind.GroupVersion(), r.scope.Serializer, obj, r.w, r.req)
ctx := r.scope.ContextFunc(r.req)
responsewriters.WriteObject(ctx, statusCode, r.scope.Kind.GroupVersion(), r.scope.Serializer, obj, r.w, r.req)
}
func (r *responder) Error(err error) {
@ -334,7 +338,8 @@ func ListResource(r rest.Lister, rw rest.Watcher, scope RequestScope, forceWatch
return
}
}
responsewriters.WriteObject(http.StatusOK, scope.Kind.GroupVersion(), scope.Serializer, result, w, req)
responsewriters.WriteObject(ctx, http.StatusOK, scope.Kind.GroupVersion(), scope.Serializer, result, w, req)
trace.Step(fmt.Sprintf("Writing http response done (%d items)", numberOfItems))
}
}
@ -395,6 +400,9 @@ func createHandler(r rest.NamedCreater, scope RequestScope, typer runtime.Object
}
trace.Step("Conversion done")
ae := request.AuditEventFrom(ctx)
audit.LogRequestObject(ae, obj, scope.Resource.GroupVersion(), scope.Serializer)
if admit != nil && admit.Handles(admission.Create) {
userInfo, _ := request.UserFrom(ctx)
@ -425,7 +433,7 @@ func createHandler(r rest.NamedCreater, scope RequestScope, typer runtime.Object
}
trace.Step("Self-link added")
responsewriters.WriteObject(http.StatusCreated, scope.Kind.GroupVersion(), scope.Serializer, result, w, req)
responsewriters.WriteObject(ctx, http.StatusCreated, scope.Kind.GroupVersion(), scope.Serializer, result, w, req)
}
}
@ -485,6 +493,9 @@ func PatchResource(r rest.Patcher, scope RequestScope, admit admission.Interface
return
}
ae := request.AuditEventFrom(ctx)
audit.LogRequestPatch(ae, patchJS)
s, ok := runtime.SerializerInfoForMediaType(scope.Serializer.SupportedMediaTypes(), runtime.ContentTypeJSON)
if !ok {
scope.err(fmt.Errorf("no serializer defined for JSON"), w, req)
@ -517,7 +528,7 @@ func PatchResource(r rest.Patcher, scope RequestScope, admit admission.Interface
return
}
responsewriters.WriteObject(http.StatusOK, scope.Kind.GroupVersion(), scope.Serializer, result, w, req)
responsewriters.WriteObject(ctx, http.StatusOK, scope.Kind.GroupVersion(), scope.Serializer, result, w, req)
}
}
@ -803,6 +814,9 @@ func UpdateResource(r rest.Updater, scope RequestScope, typer runtime.ObjectType
}
trace.Step("Conversion done")
ae := request.AuditEventFrom(ctx)
audit.LogRequestObject(ae, obj, scope.Resource.GroupVersion(), scope.Serializer)
if err := checkName(obj, name, namespace, scope.Namer); err != nil {
scope.err(err, w, req)
return
@ -839,7 +853,7 @@ func UpdateResource(r rest.Updater, scope RequestScope, typer runtime.ObjectType
if wasCreated {
status = http.StatusCreated
}
responsewriters.WriteObject(status, scope.Kind.GroupVersion(), scope.Serializer, result, w, req)
responsewriters.WriteObject(ctx, status, scope.Kind.GroupVersion(), scope.Serializer, result, w, req)
}
}
@ -886,6 +900,9 @@ func DeleteResource(r rest.GracefulDeleter, allowsOptions bool, scope RequestSco
scope.err(fmt.Errorf("decoded object cannot be converted to DeleteOptions"), w, req)
return
}
ae := request.AuditEventFrom(ctx)
audit.LogRequestObject(ae, obj, scope.Resource.GroupVersion(), scope.Serializer)
} else {
if values := req.URL.Query(); len(values) > 0 {
if err := metainternalversion.ParameterCodec.DecodeParameters(values, scope.MetaGroupVersion, options); err != nil {
@ -949,7 +966,8 @@ func DeleteResource(r rest.GracefulDeleter, allowsOptions bool, scope RequestSco
}
}
}
responsewriters.WriteObject(status, scope.Kind.GroupVersion(), scope.Serializer, result, w, req)
responsewriters.WriteObject(ctx, status, scope.Kind.GroupVersion(), scope.Serializer, result, w, req)
}
}
@ -1021,6 +1039,9 @@ func DeleteCollection(r rest.CollectionDeleter, checkBody bool, scope RequestSco
scope.err(fmt.Errorf("decoded object cannot be converted to DeleteOptions"), w, req)
return
}
ae := request.AuditEventFrom(ctx)
audit.LogRequestObject(ae, obj, scope.Resource.GroupVersion(), scope.Serializer)
}
}
@ -1051,7 +1072,8 @@ func DeleteCollection(r rest.CollectionDeleter, checkBody bool, scope RequestSco
}
}
}
responsewriters.WriteObjectNegotiated(scope.Serializer, scope.Kind.GroupVersion(), w, req, http.StatusOK, result)
responsewriters.WriteObjectNegotiated(ctx, scope.Serializer, scope.Kind.GroupVersion(), w, req, http.StatusOK, result)
}
}

View File

@ -22,6 +22,7 @@ import (
"golang.org/x/net/context"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apiserver/pkg/apis/audit"
"k8s.io/apiserver/pkg/authentication/user"
)
@ -63,6 +64,9 @@ const (
// userAgentKey is the context key for the request user agent.
userAgentKey
// auditKey is the context key for the audit event.
auditKey
namespaceDefault = "default" // TODO(sttts): solve import cycle when using metav1.NamespaceDefault
)
@ -143,3 +147,14 @@ func UserAgentFrom(ctx Context) (string, bool) {
userAgent, ok := ctx.Value(userAgentKey).(string)
return userAgent, ok
}
// WithAuditEvent returns set audit event struct.
func WithAuditEvent(parent Context, ev *audit.Event) Context {
return WithValue(parent, auditKey, ev)
}
// AuditEventFrom returns the audit event struct on the ctx
func AuditEventFrom(ctx Context) *audit.Event {
ev, _ := ctx.Value(auditKey).(*audit.Event)
return ev
}

View File

@ -20,7 +20,6 @@ import (
"crypto/tls"
"crypto/x509"
"fmt"
"io"
"net"
"net/http"
goruntime "runtime"
@ -39,6 +38,8 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/version"
"k8s.io/apiserver/pkg/admission"
auditinternal "k8s.io/apiserver/pkg/apis/audit"
"k8s.io/apiserver/pkg/audit"
"k8s.io/apiserver/pkg/authentication/authenticator"
"k8s.io/apiserver/pkg/authentication/authenticatorfactory"
authenticatorunion "k8s.io/apiserver/pkg/authentication/request/union"
@ -100,8 +101,11 @@ type Config struct {
// Version will enable the /version endpoint if non-nil
Version *version.Info
// AuditWriter is the destination for audit logs. If nil, they will not be written.
AuditWriter io.Writer
// AuditBackend is where audit events are sent to.
AuditBackend audit.Backend
// AuditPolicy defines rules which determine the audit level for different requests.
AuditPolicy *auditinternal.Policy
// SupportsBasicAuth indicates that's at least one Authenticator supports basic auth
// If this is true, a basic auth challenge is returned on authentication failure
// TODO(roberthbailey): Remove once the server no longer supports http basic auth.
@ -374,7 +378,7 @@ func (c completedConfig) New(delegationTarget DelegationTarget) (*GenericAPIServ
handlerChainBuilder := func(handler http.Handler) http.Handler {
return c.BuildHandlerChainFunc(handler, c.Config)
}
apiServerHandler := NewAPIServerHandler(c.Serializer, handlerChainBuilder, delegationTarget.UnprotectedHandler())
apiServerHandler := NewAPIServerHandler(c.RequestContextMapper, c.Serializer, handlerChainBuilder, delegationTarget.UnprotectedHandler())
s := &GenericAPIServer{
discoveryAddresses: c.DiscoveryAddresses,
@ -383,6 +387,7 @@ func (c completedConfig) New(delegationTarget DelegationTarget) (*GenericAPIServ
admissionControl: c.AdmissionControl,
requestContextMapper: c.RequestContextMapper,
Serializer: c.Serializer,
AuditBackend: c.AuditBackend,
minRequestTimeout: time.Duration(c.MinRequestTimeout) * time.Second,
@ -452,7 +457,8 @@ func (c completedConfig) New(delegationTarget DelegationTarget) (*GenericAPIServ
func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler {
handler := genericapifilters.WithAuthorization(apiHandler, c.RequestContextMapper, c.Authorizer)
handler = genericapifilters.WithImpersonation(handler, c.RequestContextMapper, c.Authorizer)
handler = genericapifilters.WithAudit(handler, c.RequestContextMapper, c.AuditWriter)
// TODO(audit): use WithLegacyAudit if feature flag is false
handler = genericapifilters.WithAudit(handler, c.RequestContextMapper, c.AuditBackend, c.AuditPolicy, c.LongRunningFunc)
handler = genericapifilters.WithAuthentication(handler, c.RequestContextMapper, c.Authenticator, genericapifilters.Unauthorized(c.SupportsBasicAuth))
handler = genericfilters.WithCORS(handler, c.CorsAllowedOriginList, nil, nil, nil, "true")
handler = genericfilters.WithPanicRecovery(handler)

View File

@ -37,6 +37,7 @@ import (
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apiserver/pkg/admission"
"k8s.io/apiserver/pkg/audit"
genericapi "k8s.io/apiserver/pkg/endpoints"
"k8s.io/apiserver/pkg/endpoints/discovery"
apirequest "k8s.io/apiserver/pkg/endpoints/request"
@ -141,6 +142,9 @@ type GenericAPIServer struct {
healthzLock sync.Mutex
healthzChecks []healthz.HealthzChecker
healthzCreated bool
// auditing. The backend is started after the server starts listening.
AuditBackend audit.Backend
}
// DelegationTarget is an interface which allows for composition of API servers with top level handling that works
@ -272,6 +276,14 @@ func (s preparedGenericAPIServer) NonBlockingRun(stopCh <-chan struct{}) error {
close(internalStopCh)
}()
// Start the audit backend before any request comes in. This means we cannot turn it into a
// post start hook because without calling Backend.Run the Backend.ProcessEvents call might block.
if s.AuditBackend != nil {
if err := s.AuditBackend.Run(stopCh); err != nil {
return fmt.Errorf("failed to run the audit backend: %v", err)
}
}
s.RunPostStartHooks(stopCh)
if _, err := systemd.SdNotify(true, "READY=1\n"); err != nil {

View File

@ -109,11 +109,12 @@ func logStackOnRecover(s runtime.NegotiatedSerializer, panicReason interface{},
if ct := w.Header().Get("Content-Type"); len(ct) > 0 {
headers.Set("Accept", ct)
}
responsewriters.ErrorNegotiated(apierrors.NewGenericServerResponse(http.StatusInternalServerError, "", schema.GroupResource{}, "", "", 0, false), s, schema.GroupVersion{}, w, &http.Request{Header: headers})
responsewriters.ErrorNegotiated(nil, apierrors.NewGenericServerResponse(http.StatusInternalServerError, "", schema.GroupResource{}, "", "", 0, false), s, schema.GroupVersion{}, w, &http.Request{Header: headers})
}
func serviceErrorHandler(s runtime.NegotiatedSerializer, serviceErr restful.ServiceError, request *restful.Request, resp *restful.Response) {
responsewriters.ErrorNegotiated(
nil,
apierrors.NewGenericServerResponse(serviceErr.Code, "", schema.GroupResource{}, "", serviceErr.Message, 0, false),
s,
schema.GroupVersion{},

View File

@ -17,12 +17,14 @@ limitations under the License.
package options
import (
"io"
"os"
"github.com/spf13/pflag"
"gopkg.in/natefinch/lumberjack.v2"
"k8s.io/apiserver/pkg/server"
pluginlog "k8s.io/apiserver/plugin/pkg/audit/log"
)
type AuditLogOptions struct {
@ -52,16 +54,15 @@ func (o *AuditLogOptions) ApplyTo(c *server.Config) error {
return nil
}
if o.Path == "-" {
c.AuditWriter = os.Stdout
return nil
}
c.AuditWriter = &lumberjack.Logger{
Filename: o.Path,
MaxAge: o.MaxAge,
MaxBackups: o.MaxBackups,
MaxSize: o.MaxSize,
var w io.Writer = os.Stdout
if o.Path != "-" {
w = &lumberjack.Logger{
Filename: o.Path,
MaxAge: o.MaxAge,
MaxBackups: o.MaxBackups,
MaxSize: o.MaxSize,
}
}
c.AuditBackend = pluginlog.NewBackend(w)
return nil
}

View File

@ -0,0 +1,18 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package auduút contains implementations for pkg/audit/AuditBackend interface
package audit // import "k8s.io/apiserver/plugin/pkg/audit"

View File

@ -0,0 +1,102 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package log
import (
"fmt"
"io"
"strconv"
"strings"
"time"
"github.com/golang/glog"
auditinternal "k8s.io/apiserver/pkg/apis/audit"
"k8s.io/apiserver/pkg/audit"
)
type backend struct {
out io.Writer
sink chan *auditinternal.Event
}
var _ audit.Backend = &backend{}
func NewBackend(out io.Writer) *backend {
return &backend{
out: out,
sink: make(chan *auditinternal.Event, 100),
}
}
func (b *backend) ProcessEvents(events ...*auditinternal.Event) {
for _, ev := range events {
b.logEvent(ev)
}
}
func (b *backend) logEvent(ev *auditinternal.Event) {
username := "<none>"
groups := "<none>"
if len(ev.User.Username) > 0 {
username = ev.User.Username
if len(ev.User.Groups) > 0 {
groups = auditStringSlice(ev.User.Groups)
}
}
asuser := "<self>"
asgroups := "<lookup>"
if ev.ImpersonatedUser != nil {
asuser = ev.ImpersonatedUser.Username
if ev.ImpersonatedUser.Groups != nil {
asgroups = auditStringSlice(ev.ImpersonatedUser.Groups)
}
}
namespace := "<none>"
if ev.ObjectRef != nil && len(ev.ObjectRef.Namespace) != 0 {
namespace = ev.ObjectRef.Namespace
}
response := "<deferred>"
if ev.ResponseStatus != nil {
response = strconv.Itoa(int(ev.ResponseStatus.Code))
}
ip := "<unknown>"
if len(ev.SourceIPs) > 0 {
ip = ev.SourceIPs[0]
}
line := fmt.Sprintf("%s AUDIT: id=%q ip=%q method=%q user=%q groups=%q as=%q asgroups=%q namespace=%q uri=%q response=\"%s\"\n",
ev.Timestamp.Format(time.RFC3339Nano), ev.AuditID, ip, ev.Verb, username, groups, asuser, asgroups, namespace, ev.RequestURI, response)
if _, err := fmt.Fprint(b.out, line); err != nil {
glog.Errorf("Unable to write audit log: %s, the error is: %v", line, err)
}
}
func (b *backend) Run(stopCh <-chan struct{}) error {
return nil
}
func auditStringSlice(inList []string) string {
quotedElements := make([]string, len(inList))
for i, in := range inList {
quotedElements[i] = fmt.Sprintf("%q", in)
}
return strings.Join(quotedElements, ",")
}

View File

@ -76,7 +76,7 @@ func (r *apisHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
}
}
responsewriters.WriteObjectNegotiated(r.codecs, schema.GroupVersion{}, w, req, http.StatusOK, discoveryGroupList)
responsewriters.WriteObjectNegotiated(nil, r.codecs, schema.GroupVersion{}, w, req, http.StatusOK, discoveryGroupList)
}
// convertToDiscoveryAPIGroup takes apiservices in a single group and returns a discovery compatible object.
@ -151,5 +151,5 @@ func (r *apiGroupHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
http.Error(w, "", http.StatusNotFound)
return
}
responsewriters.WriteObjectNegotiated(r.codecs, schema.GroupVersion{}, w, req, http.StatusOK, discoveryGroup)
responsewriters.WriteObjectNegotiated(nil, r.codecs, schema.GroupVersion{}, w, req, http.StatusOK, discoveryGroup)
}