Merge pull request #45766 from sttts/sttts-audit-event-in-context

Automatic merge from submit-queue (batch tested with PRs 45766, 46223)

Audit: fill audit.Event in handler chain

Related:
- external API types https://github.com/kubernetes/kubernetes/pull/45315
- policy checker https://github.com/kubernetes/kubernetes/pull/46009

Decisions:
- ~~[ ] decide whether we want to send an event before `WriteHeader` https://github.com/kubernetes/kubernetes/pull/45766#pullrequestreview-38664161~~ Follow-up described in https://github.com/kubernetes/kubernetes/pull/46065/files#r117438531
- [ ] decide how to handle `AuditID`s and the IP chain https://github.com/kubernetes/kubernetes/pull/45766#pullrequestreview-38659371. Is the variant in the proposal (https://github.com/kubernetes/community/pull/625) final? Then we need the API type update.
- ~~[ ] decide how to mark intermediate/incomplete events? set a special reason in `ResponseStatus.Reason` vs. having extra fields for that `Event.NonFinal`
 https://github.com/kubernetes/kubernetes/pull/45766#discussion_r116795888~~ Follow-up of #46065
- [ ] decide whether and how to protect the `Audit-Level` header https://github.com/kubernetes/kubernetes/pull/45766#pullrequestreview-38937691

TODOs:
- ~~[ ] move `AuditIDHeader`, `AuditLevelHeader` to types https://github.com/kubernetes/kubernetes/pull/45766#discussion_r117064094, @timstclair for the type PR~~ Follow-up of https://github.com/kubernetes/kubernetes/pull/46065
- [x] add SourceIP/ForwardedFor support https://github.com/kubernetes/kubernetes/pull/45766#discussion_r116778101
- [x] adapt ObjectReference.Resource to API PR https://github.com/kubernetes/kubernetes/pull/45766#pullrequestreview-38656828
pull/6/head
Kubernetes Submit Queue 2017-05-23 07:41:56 -07:00 committed by GitHub
commit 1f45c4846b
51 changed files with 1802 additions and 280 deletions

View File

@ -35,7 +35,7 @@ import (
// InsecureServingInfo *ServingInfo
func BuildInsecureHandlerChain(apiHandler http.Handler, c *server.Config) http.Handler {
handler := genericapifilters.WithAudit(apiHandler, c.RequestContextMapper, c.AuditWriter)
handler := genericapifilters.WithAudit(apiHandler, c.RequestContextMapper, c.AuditBackend, c.AuditPolicy, c.LongRunningFunc)
handler = genericapifilters.WithAuthentication(handler, c.RequestContextMapper, insecureSuperuser{}, nil)
handler = genericfilters.WithCORS(handler, c.CorsAllowedOriginList, nil, nil, nil, "true")
handler = genericfilters.WithPanicRecovery(handler)

View File

@ -27,6 +27,7 @@ import (
"net/http/httptest"
"testing"
apirequest "k8s.io/apiserver/pkg/endpoints/request"
genericapiserver "k8s.io/apiserver/pkg/server"
"k8s.io/kubernetes/pkg/api"
openapigen "k8s.io/kubernetes/pkg/generated/openapi"
@ -59,7 +60,7 @@ func TestValidOpenAPISpec(t *testing.T) {
}
// make sure swagger.json is not registered before calling PrepareRun.
server := httptest.NewServer(master.GenericAPIServer.Handler.GoRestfulContainer.ServeMux)
server := httptest.NewServer(apirequest.WithRequestContext(master.GenericAPIServer.Handler.GoRestfulContainer.ServeMux, master.GenericAPIServer.RequestContextMapper()))
defer server.Close()
resp, err := http.Get(server.URL + "/swagger.json")
if !assert.NoError(err) {

View File

@ -234,7 +234,7 @@ func TestAPIVersionOfDiscoveryEndpoints(t *testing.T) {
master, etcdserver, _, assert := newMaster(t)
defer etcdserver.Terminate(t)
server := httptest.NewServer(master.GenericAPIServer.Handler.GoRestfulContainer.ServeMux)
server := httptest.NewServer(genericapirequest.WithRequestContext(master.GenericAPIServer.Handler.GoRestfulContainer.ServeMux, master.GenericAPIServer.RequestContextMapper()))
// /api exists in release-1.1
resp, err := http.Get(server.URL + "/api")

View File

@ -287,7 +287,7 @@ func (m *ThirdPartyResourceServer) InstallThirdPartyResource(rsrc *extensions.Th
if err := thirdparty.InstallREST(m.genericAPIServer.Handler.GoRestfulContainer); err != nil {
glog.Errorf("Unable to setup thirdparty api: %v", err)
}
m.genericAPIServer.Handler.GoRestfulContainer.Add(discovery.NewAPIGroupHandler(api.Codecs, apiGroup).WebService())
m.genericAPIServer.Handler.GoRestfulContainer.Add(discovery.NewAPIGroupHandler(api.Codecs, apiGroup, m.genericAPIServer.RequestContextMapper()).WebService())
m.addThirdPartyResourceStorage(path, plural.Resource, thirdparty.Storage[plural.Resource].(*thirdpartyresourcedatastore.REST), apiGroup)
api.Registry.AddThirdPartyAPIGroupVersions(schema.GroupVersion{Group: group, Version: rsrc.Versions[0].Name})

View File

@ -150,13 +150,13 @@ func GetHTTPClient(req *http.Request) string {
return "unknown"
}
// Extracts and returns the clients IP from the given request.
// Looks at X-Forwarded-For header, X-Real-Ip header and request.RemoteAddr in that order.
// Returns nil if none of them are set or is set to an invalid value.
func GetClientIP(req *http.Request) net.IP {
// SourceIPs splits the comma separated X-Forwarded-For header or returns the X-Real-Ip header or req.RemoteAddr,
// in that order, ignoring invalid IPs. It returns nil if all of these are empty or invalid.
func SourceIPs(req *http.Request) []net.IP {
hdr := req.Header
// First check the X-Forwarded-For header for requests via proxy.
hdrForwardedFor := hdr.Get("X-Forwarded-For")
forwardedForIPs := []net.IP{}
if hdrForwardedFor != "" {
// X-Forwarded-For can be a csv of IPs in case of multiple proxies.
// Use the first valid one.
@ -164,17 +164,20 @@ func GetClientIP(req *http.Request) net.IP {
for _, part := range parts {
ip := net.ParseIP(strings.TrimSpace(part))
if ip != nil {
return ip
forwardedForIPs = append(forwardedForIPs, ip)
}
}
}
if len(forwardedForIPs) > 0 {
return forwardedForIPs
}
// Try the X-Real-Ip header.
hdrRealIp := hdr.Get("X-Real-Ip")
if hdrRealIp != "" {
ip := net.ParseIP(hdrRealIp)
if ip != nil {
return ip
return []net.IP{ip}
}
}
@ -182,11 +185,28 @@ func GetClientIP(req *http.Request) net.IP {
// Remote Address in Go's HTTP server is in the form host:port so we need to split that first.
host, _, err := net.SplitHostPort(req.RemoteAddr)
if err == nil {
return net.ParseIP(host)
if remoteIP := net.ParseIP(host); remoteIP != nil {
return []net.IP{remoteIP}
}
}
// Fallback if Remote Address was just IP.
return net.ParseIP(req.RemoteAddr)
if remoteIP := net.ParseIP(req.RemoteAddr); remoteIP != nil {
return []net.IP{remoteIP}
}
return nil
}
// Extracts and returns the clients IP from the given request.
// Looks at X-Forwarded-For header, X-Real-Ip header and request.RemoteAddr in that order.
// Returns nil if none of them are set or is set to an invalid value.
func GetClientIP(req *http.Request) net.IP {
ips := SourceIPs(req)
if len(ips) == 0 {
return nil
}
return ips[0]
}
var defaultProxyFuncPointer = fmt.Sprintf("%p", http.ProxyFromEnvironment)

View File

@ -11,6 +11,7 @@ go_library(
name = "go_default_library",
srcs = [
"doc.go",
"helpers.go",
"register.go",
"types.go",
"zz_generated.deepcopy.go",

View File

@ -0,0 +1,46 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package audit
func ordLevel(l Level) int {
switch l {
case LevelMetadata:
return 1
case LevelRequest:
return 2
case LevelRequestResponse:
return 3
default:
return 0
}
}
func (a Level) Less(b Level) bool {
return ordLevel(a) < ordLevel(b)
}
func (a Level) GreaterOrEqual(b Level) bool {
return ordLevel(a) >= ordLevel(b)
}
func NewConstantPolicy(level Level) *Policy {
return &Policy{
Rules: []PolicyRule{
{Level: level},
},
}
}

View File

@ -0,0 +1,29 @@
package(default_visibility = ["//visibility:public"])
licenses(["notice"])
load(
"@io_bazel_rules_go//go:def.bzl",
"go_library",
)
go_library(
name = "go_default_library",
srcs = [
"request.go",
"types.go",
],
tags = ["automanaged"],
deps = [
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/github.com/pborman/uuid:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/net:go_default_library",
"//vendor/k8s.io/apiserver/pkg/apis/audit:go_default_library",
"//vendor/k8s.io/apiserver/pkg/authorization/authorizer:go_default_library",
"//vendor/k8s.io/client-go/pkg/apis/authentication/v1:go_default_library",
],
)

View File

@ -0,0 +1,202 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package audit
import (
"bytes"
"fmt"
"net/http"
"strings"
"time"
"github.com/golang/glog"
"github.com/pborman/uuid"
"reflect"
"k8s.io/apimachinery/pkg/apis/meta/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
utilnet "k8s.io/apimachinery/pkg/util/net"
"k8s.io/apiserver/pkg/apis/audit"
auditinternal "k8s.io/apiserver/pkg/apis/audit"
"k8s.io/apiserver/pkg/authorization/authorizer"
authenticationv1 "k8s.io/client-go/pkg/apis/authentication/v1"
)
const (
AuditIDHeader = "X-Request-ID"
)
func NewEventFromRequest(req *http.Request, policy *auditinternal.Policy, attribs authorizer.Attributes) (*auditinternal.Event, error) {
ev := &auditinternal.Event{
Timestamp: metav1.NewTime(time.Now()),
Verb: attribs.GetVerb(),
RequestURI: req.URL.RequestURI(),
}
// set the level
ev.Level = auditinternal.LevelNone
if policy != nil && len(policy.Rules) > 0 {
// This is just a hack to get through the test without setting a high level by default.
// TODO(audit): add the policy evalutation here
ev.Level = policy.Rules[0].Level
}
// prefer the id from the headers. If not available, create a new one.
// TODO(audit): do we want to forbid the header for non-front-proxy users?
ids := req.Header[AuditIDHeader]
if len(ids) > 0 {
ev.AuditID = types.UID(ids[0])
} else {
ev.AuditID = types.UID(uuid.NewRandom().String())
}
ips := utilnet.SourceIPs(req)
ev.SourceIPs = make([]string, len(ips))
for i := range ips {
ev.SourceIPs[i] = ips[i].String()
}
if user := attribs.GetUser(); user != nil {
ev.User.Username = user.GetName()
ev.User.Extra = map[string]auditinternal.ExtraValue{}
for k, v := range user.GetExtra() {
ev.User.Extra[k] = auditinternal.ExtraValue(v)
}
ev.User.Groups = user.GetGroups()
ev.User.UID = user.GetUID()
}
if asuser := req.Header.Get(authenticationv1.ImpersonateUserHeader); len(asuser) > 0 {
ev.ImpersonatedUser = &auditinternal.UserInfo{
Username: asuser,
}
if requestedGroups := req.Header[authenticationv1.ImpersonateGroupHeader]; len(requestedGroups) > 0 {
ev.ImpersonatedUser.Groups = requestedGroups
}
ev.ImpersonatedUser.Extra = map[string]auditinternal.ExtraValue{}
for k, v := range req.Header {
if !strings.HasPrefix(k, authenticationv1.ImpersonateUserExtraHeaderPrefix) {
continue
}
k = k[len(authenticationv1.ImpersonateUserExtraHeaderPrefix):]
ev.ImpersonatedUser.Extra[k] = auditinternal.ExtraValue(v)
}
}
if attribs.IsResourceRequest() {
ev.ObjectRef = &auditinternal.ObjectReference{
Namespace: attribs.GetNamespace(),
Name: attribs.GetName(),
Resource: attribs.GetResource(),
APIVersion: attribs.GetAPIGroup() + "/" + attribs.GetAPIVersion(),
}
}
return ev, nil
}
// LogRequestObject fills in the request object into an audit event. The passed runtime.Object
// will be converted to the given gv.
func LogRequestObject(ae *audit.Event, obj runtime.Object, gv schema.GroupVersion, s runtime.NegotiatedSerializer) {
if ae == nil || ae.Level.Less(audit.LevelRequest) {
return
}
// TODO(audit): hook into the serializer to avoid double conversion
var err error
ae.RequestObject, err = encodeObject(obj, gv, s)
if err != nil {
// TODO(audit): add error slice to audit event struct
glog.Warningf("Auditing failed of %v request: %v", reflect.TypeOf(obj).Name(), err)
return
}
// complete ObjectRef
if ae.ObjectRef == nil {
ae.ObjectRef = &audit.ObjectReference{}
}
if acc, ok := obj.(v1.ObjectMetaAccessor); ok {
meta := acc.GetObjectMeta()
if len(ae.ObjectRef.Namespace) == 0 {
ae.ObjectRef.Namespace = meta.GetNamespace()
}
if len(ae.ObjectRef.Name) == 0 {
ae.ObjectRef.Name = meta.GetName()
}
if len(ae.ObjectRef.UID) == 0 {
ae.ObjectRef.UID = meta.GetUID()
}
if len(ae.ObjectRef.ResourceVersion) == 0 {
ae.ObjectRef.ResourceVersion = meta.GetResourceVersion()
}
}
}
// LogRquestPatch fills in the given patch as the request object into an audit event.
func LogRequestPatch(ae *audit.Event, patch []byte) {
if ae == nil || ae.Level.Less(audit.LevelRequest) {
return
}
ae.RequestObject = runtime.Unknown{
Raw: patch,
ContentType: runtime.ContentTypeJSON,
}
}
// LogResponseObject fills in the response object into an audit event. The passed runtime.Object
// will be converted to the given gv.
func LogResponseObject(ae *audit.Event, obj runtime.Object, gv schema.GroupVersion, s runtime.NegotiatedSerializer) {
if ae == nil || ae.Level.Less(audit.LevelRequestResponse) {
return
}
if status, ok := obj.(*metav1.Status); ok {
ae.ResponseStatus = status
}
// TODO(audit): hook into the serializer to avoid double conversion
var err error
ae.ResponseObject, err = encodeObject(obj, gv, s)
if err != nil {
glog.Warningf("Audit failed for %q response: %v", reflect.TypeOf(obj).Name(), err)
}
}
func encodeObject(obj runtime.Object, gv schema.GroupVersion, serializer runtime.NegotiatedSerializer) (runtime.Unknown, error) {
supported := serializer.SupportedMediaTypes()
for i := range supported {
if supported[i].MediaType == "application/json" {
enc := serializer.EncoderForVersion(supported[i].Serializer, gv)
var buf bytes.Buffer
if err := enc.Encode(obj, &buf); err != nil {
return runtime.Unknown{}, fmt.Errorf("encoding failed: %v", err)
}
return runtime.Unknown{
Raw: buf.Bytes(),
ContentType: runtime.ContentTypeJSON,
}, nil
}
}
return runtime.Unknown{}, fmt.Errorf("no json encoder found")
}

View File

@ -0,0 +1,37 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package audit
import (
auditinternal "k8s.io/apiserver/pkg/apis/audit"
)
type Sink interface {
// ProcessEvents handles events. Per audit ID it might be that ProcessEvents is called up to three times.
// Errors might be logged by the sink itself. If an error should be fatal, leading to an internal
// error, ProcessEvents is supposed to panic. The event must not be mutated and is reused by the caller
// after the call returns, i.e. the sink has to make a deepcopy to keep a copy around if necessary.
ProcessEvents(events ...*auditinternal.Event)
}
type Backend interface {
Sink
// Run will initialize the backend. It must not block, but may run go routines in the background. If
// stopCh is closed, it is supposed to stop them. Run will be called before the first call to ProcessEvents.
Run(stopCh <-chan struct{}) error
}

View File

@ -12,6 +12,7 @@ go_test(
name = "go_default_test",
srcs = [
"apiserver_test.go",
"audit_test.go",
"installer_test.go",
"proxy_test.go",
"watch_test.go",
@ -39,9 +40,11 @@ go_test(
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/watch:go_default_library",
"//vendor/k8s.io/apiserver/pkg/admission:go_default_library",
"//vendor/k8s.io/apiserver/pkg/apis/audit:go_default_library",
"//vendor/k8s.io/apiserver/pkg/apis/example:go_default_library",
"//vendor/k8s.io/apiserver/pkg/apis/example/fuzzer:go_default_library",
"//vendor/k8s.io/apiserver/pkg/apis/example/v1:go_default_library",
"//vendor/k8s.io/apiserver/pkg/audit:go_default_library",
"//vendor/k8s.io/apiserver/pkg/endpoints/filters:go_default_library",
"//vendor/k8s.io/apiserver/pkg/endpoints/handlers:go_default_library",
"//vendor/k8s.io/apiserver/pkg/endpoints/handlers/responsewriters:go_default_library",

View File

@ -52,9 +52,11 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/admission"
auditinternal "k8s.io/apiserver/pkg/apis/audit"
"k8s.io/apiserver/pkg/apis/example"
examplefuzzer "k8s.io/apiserver/pkg/apis/example/fuzzer"
examplev1 "k8s.io/apiserver/pkg/apis/example/v1"
"k8s.io/apiserver/pkg/audit"
genericapifilters "k8s.io/apiserver/pkg/endpoints/filters"
"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
"k8s.io/apiserver/pkg/endpoints/request"
@ -256,25 +258,25 @@ type defaultAPIServer struct {
// uses the default settings
func handle(storage map[string]rest.Storage) http.Handler {
return handleInternal(storage, admissionControl, selfLinker)
return handleInternal(storage, admissionControl, selfLinker, nil)
}
// tests with a deny admission controller
func handleDeny(storage map[string]rest.Storage) http.Handler {
return handleInternal(storage, alwaysDeny{}, selfLinker)
return handleInternal(storage, alwaysDeny{}, selfLinker, nil)
}
// tests using the new namespace scope mechanism
func handleNamespaced(storage map[string]rest.Storage) http.Handler {
return handleInternal(storage, admissionControl, selfLinker)
return handleInternal(storage, admissionControl, selfLinker, nil)
}
// tests using a custom self linker
func handleLinker(storage map[string]rest.Storage, selfLinker runtime.SelfLinker) http.Handler {
return handleInternal(storage, admissionControl, selfLinker)
return handleInternal(storage, admissionControl, selfLinker, nil)
}
func handleInternal(storage map[string]rest.Storage, admissionControl admission.Interface, selfLinker runtime.SelfLinker) http.Handler {
func handleInternal(storage map[string]rest.Storage, admissionControl admission.Interface, selfLinker runtime.SelfLinker, auditSink audit.Sink) http.Handler {
container := restful.NewContainer()
container.Router(restful.CurlyRouter{})
mux := container.ServeMux
@ -332,7 +334,11 @@ func handleInternal(storage map[string]rest.Storage, admissionControl admission.
}
}
handler := genericapifilters.WithRequestInfo(mux, testRequestInfoResolver(), requestContextMapper)
handler := genericapifilters.WithAudit(mux, requestContextMapper, auditSink, auditinternal.NewConstantPolicy(auditinternal.LevelRequestResponse), func(r *http.Request, requestInfo *request.RequestInfo) bool {
// simplified long-running check
return requestInfo.Verb == "watch" || requestInfo.Verb == "proxy"
})
handler = genericapifilters.WithRequestInfo(handler, testRequestInfoResolver(), requestContextMapper)
handler = request.WithRequestContext(handler, requestContextMapper)
return &defaultAPIServer{handler, container}
@ -1088,7 +1094,7 @@ func TestList(t *testing.T) {
namespace: testCase.namespace,
expectedSet: testCase.selfLink,
}
var handler = handleInternal(storage, admissionControl, selfLinker)
var handler = handleInternal(storage, admissionControl, selfLinker, nil)
server := httptest.NewServer(handler)
defer server.Close()
@ -1768,7 +1774,7 @@ func TestGetNamespaceSelfLink(t *testing.T) {
namespace: "foo",
}
storage["simple"] = &simpleStorage
handler := handleInternal(storage, admissionControl, selfLinker)
handler := handleInternal(storage, admissionControl, selfLinker, nil)
server := httptest.NewServer(handler)
defer server.Close()
@ -3166,7 +3172,7 @@ func TestCreateInvokesAdmissionControl(t *testing.T) {
namespace: "other",
expectedSet: "/" + prefix + "/" + testGroupVersion.Group + "/" + testGroupVersion.Version + "/namespaces/other/foo/bar",
}
handler := handleInternal(map[string]rest.Storage{"foo": &storage}, alwaysDeny{}, selfLinker)
handler := handleInternal(map[string]rest.Storage{"foo": &storage}, alwaysDeny{}, selfLinker, nil)
server := httptest.NewServer(handler)
defer server.Close()
client := http.Client{}
@ -3248,7 +3254,7 @@ func (obj *UnregisteredAPIObject) GetObjectKind() schema.ObjectKind {
func TestWriteJSONDecodeError(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
responsewriters.WriteObjectNegotiated(codecs, newGroupVersion, w, req, http.StatusOK, &UnregisteredAPIObject{"Undecodable"})
responsewriters.WriteObjectNegotiated(request.NewContext(), codecs, newGroupVersion, w, req, http.StatusOK, &UnregisteredAPIObject{"Undecodable"})
}))
defer server.Close()
// We send a 200 status code before we encode the object, so we expect OK, but there will

View File

@ -0,0 +1,326 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package endpoints
import (
"bytes"
"fmt"
"net/http"
"net/http/httptest"
"regexp"
"sync"
"testing"
"time"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait"
auditinternal "k8s.io/apiserver/pkg/apis/audit"
genericapitesting "k8s.io/apiserver/pkg/endpoints/testing"
"k8s.io/apiserver/pkg/registry/rest"
)
type fakeAuditSink struct {
lock sync.Mutex
events []*auditinternal.Event
}
func (s *fakeAuditSink) ProcessEvents(evs ...*auditinternal.Event) {
s.lock.Lock()
defer s.lock.Unlock()
s.events = append(s.events, evs...)
}
func (s *fakeAuditSink) Events() []*auditinternal.Event {
s.lock.Lock()
defer s.lock.Unlock()
return append([]*auditinternal.Event{}, s.events...)
}
func TestAudit(t *testing.T) {
type eventCheck func(events []*auditinternal.Event) error
// fixtures
simpleFoo := &genericapitesting.Simple{Other: "foo"}
simpleFooJSON, _ := runtime.Encode(testCodec, simpleFoo)
simpleCPrime := &genericapitesting.Simple{
ObjectMeta: metav1.ObjectMeta{Name: "c", Namespace: "other"},
Other: "bla",
}
simpleCPrimeJSON, _ := runtime.Encode(testCodec, simpleCPrime)
// event checks
requestBodyIs := func(i int, text string) eventCheck {
return func(events []*auditinternal.Event) error {
if string(events[i].RequestObject.Raw) != text {
return fmt.Errorf("expected RequestBody %q, got %q", text, string(events[i].RequestObject.Raw))
}
return nil
}
}
requestBodyMatches := func(i int, pattern string) eventCheck {
return func(events []*auditinternal.Event) error {
if matched, _ := regexp.Match(pattern, events[i].RequestObject.Raw); !matched {
return fmt.Errorf("expected RequestBody to match %q, but didn't: %q", pattern, string(events[i].RequestObject.Raw))
}
return nil
}
}
responseBodyIs := func(i int, text string) eventCheck {
return func(events []*auditinternal.Event) error {
if string(events[i].ResponseObject.Raw) != text {
return fmt.Errorf("expected ResponseBody %q, got %q", text, string(events[i].ResponseObject.Raw))
}
return nil
}
}
responseBodyMatches := func(i int, pattern string) eventCheck {
return func(events []*auditinternal.Event) error {
if matched, _ := regexp.Match(pattern, events[i].ResponseObject.Raw); !matched {
return fmt.Errorf("expected ResponseBody to match %q, but didn't: %q", pattern, string(events[i].ResponseObject.Raw))
}
return nil
}
}
for _, test := range []struct {
desc string
req func(server string) (*http.Request, error)
linker runtime.SelfLinker
code int
events int
checks []eventCheck
}{
{
"get",
func(server string) (*http.Request, error) {
return http.NewRequest("GET", server+"/"+prefix+"/"+testGroupVersion.Group+"/"+testGroupVersion.Version+"/namespaces/other/simple/c", bytes.NewBuffer(simpleFooJSON))
},
selfLinker,
200,
1,
[]eventCheck{
requestBodyIs(0, ""),
responseBodyMatches(0, `{.*"name":"c".*}`),
},
},
{
"list",
func(server string) (*http.Request, error) {
return http.NewRequest("GET", server+"/"+prefix+"/"+testGroupVersion.Group+"/"+testGroupVersion.Version+"/namespaces/other/simple?labelSelector=a%3Dfoobar", nil)
},
&setTestSelfLinker{
t: t,
expectedSet: "/" + prefix + "/" + testGroupVersion.Group + "/" + testGroupVersion.Version + "/namespaces/other/simple",
namespace: "other",
},
200,
1,
[]eventCheck{
requestBodyMatches(0, ""),
responseBodyMatches(0, `{.*"name":"a".*"name":"b".*}`),
},
},
{
"create",
func(server string) (*http.Request, error) {
return http.NewRequest("POST", server+"/"+prefix+"/"+testGroupVersion.Group+"/"+testGroupVersion.Version+"/namespaces/default/simple", bytes.NewBuffer(simpleFooJSON))
},
selfLinker,
201,
1,
[]eventCheck{
requestBodyIs(0, string(simpleFooJSON)),
responseBodyMatches(0, `{.*"foo".*}`),
},
},
{
"not-allowed-named-create",
func(server string) (*http.Request, error) {
return http.NewRequest("POST", server+"/"+prefix+"/"+testGroupVersion.Group+"/"+testGroupVersion.Version+"/namespaces/default/simple/named", bytes.NewBuffer(simpleFooJSON))
},
selfLinker,
405,
1,
[]eventCheck{
requestBodyIs(0, ""), // the 405 is thrown long before the create handler would be executed
responseBodyIs(0, ""), // the 405 is thrown long before the create handler would be executed
},
},
{
"delete",
func(server string) (*http.Request, error) {
return http.NewRequest("DELETE", server+"/"+prefix+"/"+testGroupVersion.Group+"/"+testGroupVersion.Version+"/namespaces/default/simple/a", nil)
},
selfLinker,
200,
1,
[]eventCheck{
requestBodyMatches(0, ""),
responseBodyMatches(0, ""),
},
},
{
"delete-with-options-in-body",
func(server string) (*http.Request, error) {
return http.NewRequest("DELETE", server+"/"+prefix+"/"+testGroupVersion.Group+"/"+testGroupVersion.Version+"/namespaces/default/simple/a", bytes.NewBuffer([]byte(`{"kind":"DeleteOptions"}`)))
},
selfLinker,
200,
1,
[]eventCheck{
requestBodyMatches(0, "DeleteOptions"),
responseBodyMatches(0, ""),
},
},
{
"update",
func(server string) (*http.Request, error) {
return http.NewRequest("PUT", server+"/"+prefix+"/"+testGroupVersion.Group+"/"+testGroupVersion.Version+"/namespaces/other/simple/c", bytes.NewBuffer(simpleCPrimeJSON))
},
selfLinker,
200,
1,
[]eventCheck{
requestBodyIs(0, string(simpleCPrimeJSON)),
responseBodyMatches(0, `{.*"bla".*}`),
},
},
{
"update-wrong-namespace",
func(server string) (*http.Request, error) {
return http.NewRequest("PUT", server+"/"+prefix+"/"+testGroupVersion.Group+"/"+testGroupVersion.Version+"/namespaces/default/simple/c", bytes.NewBuffer(simpleCPrimeJSON))
},
selfLinker,
400,
1,
[]eventCheck{
requestBodyIs(0, string(simpleCPrimeJSON)),
responseBodyMatches(0, `"Status".*"status":"Failure".*"code":400}`),
},
},
{
"patch",
func(server string) (*http.Request, error) {
req, _ := http.NewRequest("PATCH", server+"/"+prefix+"/"+testGroupVersion.Group+"/"+testGroupVersion.Version+"/namespaces/other/simple/c", bytes.NewReader([]byte(`{"labels":{"foo":"bar"}}`)))
req.Header.Set("Content-Type", "application/merge-patch+json; charset=UTF-8")
return req, nil
},
&setTestSelfLinker{
t: t,
expectedSet: "/" + prefix + "/" + testGroupVersion.Group + "/" + testGroupVersion.Version + "/namespaces/other/simple/c",
name: "c",
namespace: "other",
},
200,
1,
[]eventCheck{
requestBodyIs(0, `{"labels":{"foo":"bar"}}`),
responseBodyMatches(0, `"name":"c".*"labels":{"foo":"bar"}`),
},
},
{
"watch",
func(server string) (*http.Request, error) {
return http.NewRequest("GET", server+"/"+prefix+"/"+testGroupVersion.Group+"/"+testGroupVersion.Version+"/namespaces/other/simple?watch=true", nil)
},
&setTestSelfLinker{
t: t,
expectedSet: "/" + prefix + "/" + testGroupVersion.Group + "/" + testGroupVersion.Version + "/namespaces/other/simple",
namespace: "other",
},
200,
2,
[]eventCheck{
requestBodyMatches(0, ""),
responseBodyMatches(0, ""),
},
},
} {
sink := &fakeAuditSink{}
handler := handleInternal(map[string]rest.Storage{
"simple": &SimpleRESTStorage{
list: []genericapitesting.Simple{
{
ObjectMeta: metav1.ObjectMeta{Name: "a", Namespace: "other"},
Other: "foo",
},
{
ObjectMeta: metav1.ObjectMeta{Name: "b", Namespace: "other"},
Other: "foo",
},
},
item: genericapitesting.Simple{
ObjectMeta: metav1.ObjectMeta{Name: "c", Namespace: "other", UID: "uid"},
Other: "foo",
},
},
}, admissionControl, selfLinker, sink)
server := httptest.NewServer(handler)
defer server.Close()
client := http.Client{Timeout: 2 * time.Second}
req, err := test.req(server.URL)
if err != nil {
t.Errorf("[%s] error creating the request: %v", test.desc, err)
}
response, err := client.Do(req)
if err != nil {
t.Errorf("[%s] error: %v", test.desc, err)
}
if response.StatusCode != test.code {
t.Errorf("[%s] expected http code %d, got %#v", test.desc, test.code, response)
}
// close body because the handler might block in Flush, unable to send the remaining event.
response.Body.Close()
// wait for events to arrive, at least the given number in the test
events := []*auditinternal.Event{}
err = wait.Poll(50*time.Millisecond, wait.ForeverTestTimeout, wait.ConditionFunc(func() (done bool, err error) {
events = sink.Events()
return len(events) >= test.events, nil
}))
if err != nil {
t.Errorf("[%s] timeout waiting for events", test.desc)
}
if got := len(events); got != test.events {
t.Errorf("[%s] expected %d audit events, got %d", test.desc, test.events, got)
} else {
for i, check := range test.checks {
err := check(events)
if err != nil {
t.Errorf("[%s,%d] %v", test.desc, i, err)
}
}
}
if len(events) > 0 {
status := events[len(events)-1].ResponseStatus
if status == nil {
t.Errorf("[%s] expected non-nil ResponseStatus in last event", test.desc)
} else if int(status.Code) != test.code {
t.Errorf("[%s] expected ResponseStatus.Code=%d, got %d", test.desc, test.code, status.Code)
}
}
}
}

View File

@ -25,6 +25,7 @@ go_test(
"//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime/serializer:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/net:go_default_library",
"//vendor/k8s.io/apiserver/pkg/endpoints/request:go_default_library",
],
)
@ -47,5 +48,6 @@ go_library(
"//vendor/k8s.io/apimachinery/pkg/util/net:go_default_library",
"//vendor/k8s.io/apiserver/pkg/endpoints/handlers/negotiation:go_default_library",
"//vendor/k8s.io/apiserver/pkg/endpoints/handlers/responsewriters:go_default_library",
"//vendor/k8s.io/apiserver/pkg/endpoints/request:go_default_library",
],
)

View File

@ -17,6 +17,7 @@ limitations under the License.
package discovery
import (
"errors"
"net/http"
"github.com/emicklei/go-restful"
@ -26,17 +27,18 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apiserver/pkg/endpoints/handlers/negotiation"
"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
"k8s.io/apiserver/pkg/endpoints/request"
)
// APIGroupHandler creates a webservice serving the supported versions, preferred version, and name
// of a group. E.g., such a web service will be registered at /apis/extensions.
type APIGroupHandler struct {
serializer runtime.NegotiatedSerializer
group metav1.APIGroup
serializer runtime.NegotiatedSerializer
contextMapper request.RequestContextMapper
group metav1.APIGroup
}
func NewAPIGroupHandler(serializer runtime.NegotiatedSerializer, group metav1.APIGroup) *APIGroupHandler {
func NewAPIGroupHandler(serializer runtime.NegotiatedSerializer, group metav1.APIGroup, contextMapper request.RequestContextMapper) *APIGroupHandler {
if keepUnversioned(group.Name) {
// Because in release 1.1, /apis/extensions returns response with empty
// APIVersion, we use stripVersionNegotiatedSerializer to keep the
@ -45,8 +47,9 @@ func NewAPIGroupHandler(serializer runtime.NegotiatedSerializer, group metav1.AP
}
return &APIGroupHandler{
serializer: serializer,
group: group,
serializer: serializer,
contextMapper: contextMapper,
group: group,
}
}
@ -70,5 +73,10 @@ func (s *APIGroupHandler) handle(req *restful.Request, resp *restful.Response) {
}
func (s *APIGroupHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
responsewriters.WriteObjectNegotiated(s.serializer, schema.GroupVersion{}, w, req, http.StatusOK, &s.group)
ctx, ok := s.contextMapper.Get(req)
if !ok {
responsewriters.InternalError(w, req, errors.New("no context found for request"))
return
}
responsewriters.WriteObjectNegotiated(ctx, s.serializer, schema.GroupVersion{}, w, req, http.StatusOK, &s.group)
}

View File

@ -17,6 +17,7 @@ limitations under the License.
package discovery
import (
"errors"
"net/http"
"github.com/emicklei/go-restful"
@ -27,28 +28,31 @@ import (
utilnet "k8s.io/apimachinery/pkg/util/net"
"k8s.io/apiserver/pkg/endpoints/handlers/negotiation"
"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
"k8s.io/apiserver/pkg/endpoints/request"
)
// legacyRootAPIHandler creates a webservice serving api group discovery.
type legacyRootAPIHandler struct {
// addresses is used to build cluster IPs for discovery.
addresses Addresses
apiPrefix string
serializer runtime.NegotiatedSerializer
apiVersions []string
addresses Addresses
apiPrefix string
serializer runtime.NegotiatedSerializer
apiVersions []string
contextMapper request.RequestContextMapper
}
func NewLegacyRootAPIHandler(addresses Addresses, serializer runtime.NegotiatedSerializer, apiPrefix string, apiVersions []string) *legacyRootAPIHandler {
func NewLegacyRootAPIHandler(addresses Addresses, serializer runtime.NegotiatedSerializer, apiPrefix string, apiVersions []string, contextMapper request.RequestContextMapper) *legacyRootAPIHandler {
// Because in release 1.1, /apis returns response with empty APIVersion, we
// use stripVersionNegotiatedSerializer to keep the response backwards
// compatible.
serializer = stripVersionNegotiatedSerializer{serializer}
return &legacyRootAPIHandler{
addresses: addresses,
apiPrefix: apiPrefix,
serializer: serializer,
apiVersions: apiVersions,
addresses: addresses,
apiPrefix: apiPrefix,
serializer: serializer,
apiVersions: apiVersions,
contextMapper: contextMapper,
}
}
@ -68,11 +72,17 @@ func (s *legacyRootAPIHandler) WebService() *restful.WebService {
}
func (s *legacyRootAPIHandler) handle(req *restful.Request, resp *restful.Response) {
ctx, ok := s.contextMapper.Get(req.Request)
if !ok {
responsewriters.InternalError(resp.ResponseWriter, req.Request, errors.New("no context found for request"))
return
}
clientIP := utilnet.GetClientIP(req.Request)
apiVersions := &metav1.APIVersions{
ServerAddressByClientCIDRs: s.addresses.ServerAddressByClientCIDRs(clientIP),
Versions: s.apiVersions,
}
responsewriters.WriteObjectNegotiated(s.serializer, schema.GroupVersion{}, resp.ResponseWriter, req.Request, http.StatusOK, apiVersions)
responsewriters.WriteObjectNegotiated(ctx, s.serializer, schema.GroupVersion{}, resp.ResponseWriter, req.Request, http.StatusOK, apiVersions)
}

View File

@ -17,10 +17,11 @@ limitations under the License.
package discovery
import (
"errors"
"net/http"
"sync"
"github.com/emicklei/go-restful"
restful "github.com/emicklei/go-restful"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
@ -28,6 +29,7 @@ import (
utilnet "k8s.io/apimachinery/pkg/util/net"
"k8s.io/apiserver/pkg/endpoints/handlers/negotiation"
"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
"k8s.io/apiserver/pkg/endpoints/request"
)
// GroupManager is an interface that allows dynamic mutation of the existing webservice to handle
@ -46,7 +48,8 @@ type rootAPIsHandler struct {
// addresses is used to build cluster IPs for discovery.
addresses Addresses
serializer runtime.NegotiatedSerializer
serializer runtime.NegotiatedSerializer
contextMapper request.RequestContextMapper
// Map storing information about all groups to be exposed in discovery response.
// The map is from name to the group.
@ -56,16 +59,17 @@ type rootAPIsHandler struct {
apiGroupNames []string
}
func NewRootAPIsHandler(addresses Addresses, serializer runtime.NegotiatedSerializer) *rootAPIsHandler {
func NewRootAPIsHandler(addresses Addresses, serializer runtime.NegotiatedSerializer, contextMapper request.RequestContextMapper) *rootAPIsHandler {
// Because in release 1.1, /apis returns response with empty APIVersion, we
// use stripVersionNegotiatedSerializer to keep the response backwards
// compatible.
serializer = stripVersionNegotiatedSerializer{serializer}
return &rootAPIsHandler{
addresses: addresses,
serializer: serializer,
apiGroups: map[string]metav1.APIGroup{},
addresses: addresses,
serializer: serializer,
apiGroups: map[string]metav1.APIGroup{},
contextMapper: contextMapper,
}
}
@ -95,6 +99,12 @@ func (s *rootAPIsHandler) RemoveGroup(groupName string) {
}
func (s *rootAPIsHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
ctx, ok := s.contextMapper.Get(req)
if !ok {
responsewriters.InternalError(resp, req, errors.New("no context found for request"))
return
}
s.lock.RLock()
defer s.lock.RUnlock()
@ -111,7 +121,7 @@ func (s *rootAPIsHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request)
groups[i].ServerAddressByClientCIDRs = serverCIDR
}
responsewriters.WriteObjectNegotiated(s.serializer, schema.GroupVersion{}, resp, req, http.StatusOK, &metav1.APIGroupList{Groups: groups})
responsewriters.WriteObjectNegotiated(ctx, s.serializer, schema.GroupVersion{}, resp, req, http.StatusOK, &metav1.APIGroupList{Groups: groups})
}
func (s *rootAPIsHandler) restfulHandle(req *restful.Request, resp *restful.Response) {

View File

@ -33,6 +33,7 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"
utilnet "k8s.io/apimachinery/pkg/util/net"
"k8s.io/apiserver/pkg/endpoints/request"
)
var (
@ -83,9 +84,10 @@ func getGroupList(t *testing.T, server *httptest.Server) (*metav1.APIGroupList,
}
func TestDiscoveryAtAPIS(t *testing.T) {
handler := NewRootAPIsHandler(DefaultAddresses{DefaultAddress: "192.168.1.1"}, codecs)
mapper := request.NewRequestContextMapper()
handler := NewRootAPIsHandler(DefaultAddresses{DefaultAddress: "192.168.1.1"}, codecs, mapper)
server := httptest.NewServer(handler)
server := httptest.NewServer(request.WithRequestContext(handler, mapper))
groupList, err := getGroupList(t, server)
if err != nil {
t.Fatalf("unexpected error: %v", err)
@ -133,9 +135,10 @@ func TestDiscoveryAtAPIS(t *testing.T) {
}
func TestDiscoveryOrdering(t *testing.T) {
handler := NewRootAPIsHandler(DefaultAddresses{DefaultAddress: "192.168.1.1"}, codecs)
mapper := request.NewRequestContextMapper()
handler := NewRootAPIsHandler(DefaultAddresses{DefaultAddress: "192.168.1.1"}, codecs, mapper)
server := httptest.NewServer(handler)
server := httptest.NewServer(request.WithRequestContext(handler, mapper))
groupList, err := getGroupList(t, server)
if err != nil {
t.Fatalf("unexpected error: %v", err)

View File

@ -17,15 +17,17 @@ limitations under the License.
package discovery
import (
"errors"
"net/http"
"github.com/emicklei/go-restful"
restful "github.com/emicklei/go-restful"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apiserver/pkg/endpoints/handlers/negotiation"
"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
"k8s.io/apiserver/pkg/endpoints/request"
)
type APIResourceLister interface {
@ -41,13 +43,14 @@ func (f APIResourceListerFunc) ListAPIResources() []metav1.APIResource {
// APIVersionHandler creates a webservice serving the supported resources for the version
// E.g., such a web service will be registered at /apis/extensions/v1beta1.
type APIVersionHandler struct {
serializer runtime.NegotiatedSerializer
serializer runtime.NegotiatedSerializer
contextMapper request.RequestContextMapper
groupVersion schema.GroupVersion
apiResourceLister APIResourceLister
}
func NewAPIVersionHandler(serializer runtime.NegotiatedSerializer, groupVersion schema.GroupVersion, apiResourceLister APIResourceLister) *APIVersionHandler {
func NewAPIVersionHandler(serializer runtime.NegotiatedSerializer, groupVersion schema.GroupVersion, apiResourceLister APIResourceLister, contextMapper request.RequestContextMapper) *APIVersionHandler {
if keepUnversioned(groupVersion.Group) {
// Because in release 1.1, /apis/extensions returns response with empty
// APIVersion, we use stripVersionNegotiatedSerializer to keep the
@ -59,6 +62,7 @@ func NewAPIVersionHandler(serializer runtime.NegotiatedSerializer, groupVersion
serializer: serializer,
groupVersion: groupVersion,
apiResourceLister: apiResourceLister,
contextMapper: contextMapper,
}
}
@ -78,6 +82,12 @@ func (s *APIVersionHandler) handle(req *restful.Request, resp *restful.Response)
}
func (s *APIVersionHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
responsewriters.WriteObjectNegotiated(s.serializer, schema.GroupVersion{}, w, req, http.StatusOK,
ctx, ok := s.contextMapper.Get(req)
if !ok {
responsewriters.InternalError(w, req, errors.New("no context found for request"))
return
}
responsewriters.WriteObjectNegotiated(ctx, s.serializer, schema.GroupVersion{}, w, req, http.StatusOK,
&metav1.APIResourceList{GroupVersion: s.groupVersion.String(), APIResources: s.apiResourceLister.ListAPIResources()})
}

View File

@ -15,17 +15,21 @@ go_test(
"authentication_test.go",
"authorization_test.go",
"impersonation_test.go",
"legacy_audit_test.go",
"requestinfo_test.go",
],
library = ":go_default_library",
tags = ["automanaged"],
deps = [
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/k8s.io/apiserver/pkg/apis/audit:go_default_library",
"//vendor/k8s.io/apiserver/pkg/authentication/authenticator:go_default_library",
"//vendor/k8s.io/apiserver/pkg/authentication/user:go_default_library",
"//vendor/k8s.io/apiserver/pkg/authorization/authorizer:go_default_library",
"//vendor/k8s.io/apiserver/pkg/endpoints/handlers/responsewriters:go_default_library",
"//vendor/k8s.io/apiserver/pkg/endpoints/request:go_default_library",
"//vendor/k8s.io/apiserver/plugin/pkg/audit/log:go_default_library",
"//vendor/k8s.io/client-go/pkg/apis/authentication/v1:go_default_library",
"//vendor/k8s.io/client-go/pkg/apis/batch/v1:go_default_library",
],
@ -39,6 +43,7 @@ go_library(
"authorization.go",
"doc.go",
"impersonation.go",
"legacy_audit.go",
"requestinfo.go",
],
tags = ["automanaged"],
@ -46,7 +51,11 @@ go_library(
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/github.com/pborman/uuid:go_default_library",
"//vendor/github.com/prometheus/client_golang/prometheus:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/net:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//vendor/k8s.io/apiserver/pkg/apis/audit:go_default_library",
"//vendor/k8s.io/apiserver/pkg/audit:go_default_library",
"//vendor/k8s.io/apiserver/pkg/authentication/authenticator:go_default_library",
"//vendor/k8s.io/apiserver/pkg/authentication/serviceaccount:go_default_library",
"//vendor/k8s.io/apiserver/pkg/authentication/user:go_default_library",

View File

@ -19,36 +19,154 @@ package filters
import (
"bufio"
"errors"
"fmt"
"io"
"net"
"net/http"
"strings"
"time"
"sync"
"github.com/golang/glog"
"github.com/pborman/uuid"
"fmt"
utilnet "k8s.io/apimachinery/pkg/util/net"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
auditinternal "k8s.io/apiserver/pkg/apis/audit"
"k8s.io/apiserver/pkg/audit"
"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
"k8s.io/apiserver/pkg/endpoints/request"
authenticationapi "k8s.io/client-go/pkg/apis/authentication/v1"
)
// WithAudit decorates a http.Handler with audit logging information for all the
// requests coming to the server. If out is nil, no decoration takes place.
// Each audit log contains two entries:
// 1. the request line containing:
// - unique id allowing to match the response line (see 2)
// - source ip of the request
// - HTTP method being invoked
// - original user invoking the operation
// - original user's groups info
// - impersonated user for the operation
// - impersonated groups info
// - namespace of the request or <none>
// - uri is the full URI as requested
// 2. the response line containing:
// - the unique id from 1
// - response code
func WithAudit(handler http.Handler, requestContextMapper request.RequestContextMapper, sink audit.Sink, policy *auditinternal.Policy, longRunningCheck request.LongRunningRequestCheck) http.Handler {
if sink == nil {
return handler
}
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
ctx, ok := requestContextMapper.Get(req)
if !ok {
responsewriters.InternalError(w, req, errors.New("no context found for request"))
return
}
attribs, err := GetAuthorizerAttributes(ctx)
if err != nil {
utilruntime.HandleError(fmt.Errorf("failed to GetAuthorizerAttributes: %v", err))
responsewriters.InternalError(w, req, errors.New("failed to parse request"))
return
}
ev, err := audit.NewEventFromRequest(req, policy, attribs)
if err != nil {
utilruntime.HandleError(fmt.Errorf("failed to complete audit event from request: %v", err))
responsewriters.InternalError(w, req, errors.New("failed to update context"))
return
}
ctx = request.WithAuditEvent(ctx, ev)
if err := requestContextMapper.Update(req, ctx); err != nil {
utilruntime.HandleError(fmt.Errorf("failed to attach audit event to the context: %v", err))
responsewriters.InternalError(w, req, errors.New("failed to update context"))
return
}
// intercept the status code
longRunning := false
var longRunningSink audit.Sink
if longRunningCheck != nil {
ri, _ := request.RequestInfoFrom(ctx)
if longRunning = longRunningCheck(req, ri); longRunning {
longRunningSink = sink
}
}
respWriter := decorateResponseWriter(w, ev, longRunningSink)
// send audit event when we leave this func, either via a panic or cleanly. In the case of long
// running requests, this will be the second audit event.
defer func() {
if r := recover(); r != nil {
ev.ResponseStatus = &metav1.Status{
Code: http.StatusInternalServerError,
}
sink.ProcessEvents(ev)
panic(r)
}
if ev.ResponseStatus == nil {
ev.ResponseStatus = &metav1.Status{
Code: 200,
}
}
sink.ProcessEvents(ev)
}()
handler.ServeHTTP(respWriter, req)
})
}
func decorateResponseWriter(responseWriter http.ResponseWriter, ev *auditinternal.Event, sink audit.Sink) http.ResponseWriter {
delegate := &auditResponseWriter{
ResponseWriter: responseWriter,
event: ev,
sink: sink,
}
// check if the ResponseWriter we're wrapping is the fancy one we need
// or if the basic is sufficient
_, cn := responseWriter.(http.CloseNotifier)
_, fl := responseWriter.(http.Flusher)
_, hj := responseWriter.(http.Hijacker)
if cn && fl && hj {
return &fancyResponseWriterDelegator{delegate}
}
return delegate
}
var _ http.ResponseWriter = &auditResponseWriter{}
// auditResponseWriter intercepts WriteHeader, sets it in the event. If the sink is set, it will
// create immediately an event (for long running requests).
type auditResponseWriter struct {
http.ResponseWriter
out io.Writer
id string
event *auditinternal.Event
once sync.Once
sink audit.Sink
}
func (a *auditResponseWriter) processCode(code int) {
a.once.Do(func() {
if a.sink != nil {
a.sink.ProcessEvents(a.event)
}
// for now we use the ResponseStatus as marker that it's the first or second event
// of a long running request. As soon as we have such a field in the event, we can
// change this.
if a.event.ResponseStatus == nil {
a.event.ResponseStatus = &metav1.Status{}
}
a.event.ResponseStatus.Code = int32(code)
})
}
func (a *auditResponseWriter) Write(bs []byte) (int, error) {
a.processCode(200) // the Go library calls WriteHeader internally if no code was written yet. But this will go unnoticed for us
return a.ResponseWriter.Write(bs)
}
func (a *auditResponseWriter) WriteHeader(code int) {
line := fmt.Sprintf("%s AUDIT: id=%q response=\"%d\"\n", time.Now().Format(time.RFC3339Nano), a.id, code)
if _, err := fmt.Fprint(a.out, line); err != nil {
glog.Errorf("Unable to write audit log: %s, the error is: %v", line, err)
}
a.processCode(code)
a.ResponseWriter.WriteHeader(code)
}
@ -74,89 +192,3 @@ func (f *fancyResponseWriterDelegator) Hijack() (net.Conn, *bufio.ReadWriter, er
var _ http.CloseNotifier = &fancyResponseWriterDelegator{}
var _ http.Flusher = &fancyResponseWriterDelegator{}
var _ http.Hijacker = &fancyResponseWriterDelegator{}
// WithAudit decorates a http.Handler with audit logging information for all the
// requests coming to the server. If out is nil, no decoration takes place.
// Each audit log contains two entries:
// 1. the request line containing:
// - unique id allowing to match the response line (see 2)
// - source ip of the request
// - HTTP method being invoked
// - original user invoking the operation
// - original user's groups info
// - impersonated user for the operation
// - impersonated groups info
// - namespace of the request or <none>
// - uri is the full URI as requested
// 2. the response line containing:
// - the unique id from 1
// - response code
func WithAudit(handler http.Handler, requestContextMapper request.RequestContextMapper, out io.Writer) http.Handler {
if out == nil {
return handler
}
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
ctx, ok := requestContextMapper.Get(req)
if !ok {
responsewriters.InternalError(w, req, errors.New("no context found for request"))
return
}
attribs, err := GetAuthorizerAttributes(ctx)
if err != nil {
responsewriters.InternalError(w, req, err)
return
}
username := "<none>"
groups := "<none>"
if attribs.GetUser() != nil {
username = attribs.GetUser().GetName()
if userGroups := attribs.GetUser().GetGroups(); len(userGroups) > 0 {
groups = auditStringSlice(userGroups)
}
}
asuser := req.Header.Get(authenticationapi.ImpersonateUserHeader)
if len(asuser) == 0 {
asuser = "<self>"
}
asgroups := "<lookup>"
requestedGroups := req.Header[authenticationapi.ImpersonateGroupHeader]
if len(requestedGroups) > 0 {
asgroups = auditStringSlice(requestedGroups)
}
namespace := attribs.GetNamespace()
if len(namespace) == 0 {
namespace = "<none>"
}
id := uuid.NewRandom().String()
line := fmt.Sprintf("%s AUDIT: id=%q ip=%q method=%q user=%q groups=%q as=%q asgroups=%q namespace=%q uri=%q\n",
time.Now().Format(time.RFC3339Nano), id, utilnet.GetClientIP(req), req.Method, username, groups, asuser, asgroups, namespace, req.URL)
if _, err := fmt.Fprint(out, line); err != nil {
glog.Errorf("Unable to write audit log: %s, the error is: %v", line, err)
}
respWriter := decorateResponseWriter(w, out, id)
handler.ServeHTTP(respWriter, req)
})
}
func auditStringSlice(inList []string) string {
quotedElements := make([]string, len(inList))
for i, in := range inList {
quotedElements[i] = fmt.Sprintf("%q", in)
}
return strings.Join(quotedElements, ",")
}
func decorateResponseWriter(responseWriter http.ResponseWriter, out io.Writer, id string) http.ResponseWriter {
delegate := &auditResponseWriter{ResponseWriter: responseWriter, out: out, id: id}
// check if the ResponseWriter we're wrapping is the fancy one we need
// or if the basic is sufficient
_, cn := responseWriter.(http.CloseNotifier)
_, fl := responseWriter.(http.Flusher)
_, hj := responseWriter.(http.Hijacker)
if cn && fl && hj {
return &fancyResponseWriterDelegator{delegate}
}
return delegate
}

View File

@ -19,24 +19,62 @@ package filters
import (
"bufio"
"bytes"
"io/ioutil"
"net"
"net/http"
"net/http/httptest"
"reflect"
"regexp"
"strings"
"sync"
"testing"
"time"
"k8s.io/apimachinery/pkg/util/wait"
auditinternal "k8s.io/apiserver/pkg/apis/audit"
"k8s.io/apiserver/pkg/authentication/user"
"k8s.io/apiserver/pkg/endpoints/request"
pluginlog "k8s.io/apiserver/plugin/pkg/audit/log"
)
type simpleResponseWriter struct {
http.ResponseWriter
type fakeAuditSink struct {
lock sync.Mutex
events []*auditinternal.Event
}
func (*simpleResponseWriter) WriteHeader(code int) {}
func (s *fakeAuditSink) ProcessEvents(evs ...*auditinternal.Event) {
s.lock.Lock()
defer s.lock.Unlock()
s.events = append(s.events, evs...)
}
func (s *fakeAuditSink) Events() []*auditinternal.Event {
s.lock.Lock()
defer s.lock.Unlock()
return append([]*auditinternal.Event{}, s.events...)
}
func (s *fakeAuditSink) Pop(timeout time.Duration) (*auditinternal.Event, error) {
var result *auditinternal.Event
err := wait.Poll(50*time.Millisecond, wait.ForeverTestTimeout, wait.ConditionFunc(func() (done bool, err error) {
s.lock.Lock()
defer s.lock.Unlock()
if len(s.events) == 0 {
return false, nil
}
result = s.events[0]
s.events = s.events[1:]
return true, nil
}))
return result, err
}
type simpleResponseWriter struct{}
var _ http.ResponseWriter = &simpleResponseWriter{}
func (*simpleResponseWriter) WriteHeader(code int) {}
func (*simpleResponseWriter) Write(bs []byte) (int, error) { return len(bs), nil }
func (*simpleResponseWriter) Header() http.Header { return http.Header{} }
type fancyResponseWriter struct {
simpleResponseWriter
@ -49,14 +87,14 @@ func (*fancyResponseWriter) Flush() {}
func (*fancyResponseWriter) Hijack() (net.Conn, *bufio.ReadWriter, error) { return nil, nil, nil }
func TestConstructResponseWriter(t *testing.T) {
actual := decorateResponseWriter(&simpleResponseWriter{}, ioutil.Discard, "")
actual := decorateResponseWriter(&simpleResponseWriter{}, nil, nil)
switch v := actual.(type) {
case *auditResponseWriter:
default:
t.Errorf("Expected auditResponseWriter, got %v", reflect.TypeOf(v))
}
actual = decorateResponseWriter(&fancyResponseWriter{}, ioutil.Discard, "")
actual = decorateResponseWriter(&fancyResponseWriter{}, nil, nil)
switch v := actual.(type) {
case *fancyResponseWriterDelegator:
default:
@ -64,6 +102,73 @@ func TestConstructResponseWriter(t *testing.T) {
}
}
func TestDecorateResponseWriterWithoutChannel(t *testing.T) {
ev := &auditinternal.Event{}
actual := decorateResponseWriter(&simpleResponseWriter{}, ev, nil)
// write status. This will not block because firstEventSentCh is nil
actual.WriteHeader(42)
if ev.ResponseStatus == nil {
t.Fatalf("Expected ResponseStatus to be non-nil")
}
if ev.ResponseStatus.Code != 42 {
t.Errorf("expected status code 42, got %d", ev.ResponseStatus.Code)
}
}
func TestDecorateResponseWriterWithImplicitWrite(t *testing.T) {
ev := &auditinternal.Event{}
actual := decorateResponseWriter(&simpleResponseWriter{}, ev, nil)
// write status. This will not block because firstEventSentCh is nil
actual.Write([]byte("foo"))
if ev.ResponseStatus == nil {
t.Fatalf("Expected ResponseStatus to be non-nil")
}
if ev.ResponseStatus.Code != 200 {
t.Errorf("expected status code 200, got %d", ev.ResponseStatus.Code)
}
}
func TestDecorateResponseWriterChannel(t *testing.T) {
sink := &fakeAuditSink{}
ev := &auditinternal.Event{}
actual := decorateResponseWriter(&simpleResponseWriter{}, ev, sink)
done := make(chan struct{})
go func() {
t.Log("Writing status code 42")
actual.WriteHeader(42)
t.Log("Finished writing status code 42")
close(done)
actual.Write([]byte("foo"))
}()
// sleep some time to give write the possibility to do wrong stuff
time.Sleep(100 * time.Millisecond)
t.Log("Waiting for event in the channel")
ev1, err := sink.Pop(time.Second)
if err != nil {
t.Fatal("Timeout waiting for events")
}
t.Logf("Seen event with status %v", ev1.ResponseStatus)
if ev != ev1 {
t.Fatalf("ev1 and ev must be equal")
}
<-done
t.Log("Seen the go routine finished")
// write again
_, err = actual.Write([]byte("foo"))
if err != nil {
t.Errorf("unexpected error: %v", err)
}
}
type fakeHTTPHandler struct{}
func (*fakeHTTPHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
@ -71,32 +176,189 @@ func (*fakeHTTPHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
}
func TestAudit(t *testing.T) {
var buf bytes.Buffer
shortRunningPrefix := `[\d\:\-\.\+TZ]+ AUDIT: id="[\w-]+" ip="127.0.0.1" method="list" user="admin" groups="<none>" as="<self>" asgroups="<lookup>" namespace="default" uri="/api/v1/namespaces/default/pods"`
longRunningPrefix := `[\d\:\-\.\+TZ]+ AUDIT: id="[\w-]+" ip="127.0.0.1" method="watch" user="admin" groups="<none>" as="<self>" asgroups="<lookup>" namespace="default" uri="/api/v1/namespaces/default/pods\?watch=true"`
handler := WithAudit(&fakeHTTPHandler{}, &fakeRequestContextMapper{
user: &user.DefaultInfo{Name: "admin"},
}, &buf)
shortRunningPath := "/api/v1/namespaces/default/pods"
longRunningPath := "/api/v1/namespaces/default/pods?watch=true"
req, _ := http.NewRequest("GET", "/api/v1/namespaces/default/pods", nil)
req.RemoteAddr = "127.0.0.1"
handler.ServeHTTP(httptest.NewRecorder(), req)
line := strings.Split(strings.TrimSpace(buf.String()), "\n")
if len(line) != 2 {
t.Fatalf("Unexpected amount of lines in audit log: %d", len(line))
}
match, err := regexp.MatchString(`[\d\:\-\.\+TZ]+ AUDIT: id="[\w-]+" ip="127.0.0.1" method="GET" user="admin" groups="<none>" as="<self>" asgroups="<lookup>" namespace="default" uri="/api/v1/namespaces/default/pods"`, line[0])
if err != nil {
t.Errorf("Unexpected error matching first line: %v", err)
}
if !match {
t.Errorf("Unexpected first line of audit: %s", line[0])
}
match, err = regexp.MatchString(`[\d\:\-\.\+TZ]+ AUDIT: id="[\w-]+" response="200"`, line[1])
if err != nil {
t.Errorf("Unexpected error matching second line: %v", err)
}
if !match {
t.Errorf("Unexpected second line of audit: %s", line[1])
delay := 500 * time.Millisecond
for _, test := range []struct {
desc string
path string
handler func(http.ResponseWriter, *http.Request)
expected []string
}{
// short running requests
{
"empty",
shortRunningPath,
func(http.ResponseWriter, *http.Request) {},
[]string{
shortRunningPrefix + ` response="200"`,
},
},
{
"sleep",
shortRunningPath,
func(http.ResponseWriter, *http.Request) {
time.Sleep(delay)
},
[]string{
shortRunningPrefix + ` response="200"`,
},
},
{
"403+write",
shortRunningPath,
func(w http.ResponseWriter, req *http.Request) {
w.WriteHeader(403)
w.Write([]byte("foo"))
},
[]string{
shortRunningPrefix + ` response="403"`,
},
},
{
"panic",
shortRunningPath,
func(w http.ResponseWriter, req *http.Request) {
panic("kaboom")
},
[]string{
shortRunningPrefix + ` response="500"`,
},
},
{
"write+panic",
shortRunningPath,
func(w http.ResponseWriter, req *http.Request) {
w.Write([]byte("foo"))
panic("kaboom")
},
[]string{
shortRunningPrefix + ` response="500"`,
},
},
// long running requests
{
"empty longrunning",
longRunningPath,
func(http.ResponseWriter, *http.Request) {},
[]string{
longRunningPrefix + ` response="200"`,
},
},
{
"sleep longrunning",
longRunningPath,
func(http.ResponseWriter, *http.Request) {
time.Sleep(delay)
},
[]string{
longRunningPrefix + ` response="200"`,
},
},
{
"sleep+403 longrunning",
longRunningPath,
func(w http.ResponseWriter, req *http.Request) {
time.Sleep(delay)
w.WriteHeader(403)
},
[]string{
longRunningPrefix + ` response="<deferred>"`,
longRunningPrefix + ` response="403"`,
},
},
{
"write longrunning",
longRunningPath,
func(w http.ResponseWriter, req *http.Request) {
w.Write([]byte("foo"))
},
[]string{
longRunningPrefix + ` response="<deferred>"`,
longRunningPrefix + ` response="200"`,
},
},
{
"403+write longrunning",
longRunningPath,
func(w http.ResponseWriter, req *http.Request) {
w.WriteHeader(403)
w.Write([]byte("foo"))
},
[]string{
longRunningPrefix + ` response="<deferred>"`,
longRunningPrefix + ` response="403"`,
},
},
{
"panic longrunning",
longRunningPath,
func(w http.ResponseWriter, req *http.Request) {
panic("kaboom")
},
[]string{
longRunningPrefix + ` response="500"`,
},
},
{
"write+panic longrunning",
longRunningPath,
func(w http.ResponseWriter, req *http.Request) {
w.Write([]byte("foo"))
panic("kaboom")
},
[]string{
longRunningPrefix + ` response="<deferred>"`,
longRunningPrefix + ` response="500"`,
},
},
} {
var buf bytes.Buffer
backend := pluginlog.NewBackend(&buf)
handler := WithAudit(http.HandlerFunc(test.handler), &fakeRequestContextMapper{
user: &user.DefaultInfo{Name: "admin"},
}, backend, auditinternal.NewConstantPolicy(auditinternal.LevelRequestResponse), func(r *http.Request, ri *request.RequestInfo) bool {
// simplified long-running check
return ri.Verb == "watch"
})
req, _ := http.NewRequest("GET", test.path, nil)
req.RemoteAddr = "127.0.0.1"
done := make(chan struct{})
go func() {
defer func() {
recover()
close(done)
}()
handler.ServeHTTP(httptest.NewRecorder(), req)
}()
<-done
t.Logf("[%s] audit log: %v", test.desc, buf.String())
line := strings.Split(strings.TrimSpace(buf.String()), "\n")
if len(line) != len(test.expected) {
t.Errorf("[%s] Unexpected amount of lines in audit log: %d", test.desc, len(line))
continue
}
for i, re := range test.expected {
match, err := regexp.MatchString(re, line[i])
if err != nil {
t.Errorf("[%s] Unexpected error matching line %d: %v", test.desc, i, err)
continue
}
if !match {
t.Errorf("[%s] Unexpected line %d of audit: %s", test.desc, i, line[i])
}
}
}
}
@ -124,29 +386,8 @@ func (*fakeRequestContextMapper) Update(req *http.Request, context request.Conte
}
func TestAuditNoPanicOnNilUser(t *testing.T) {
var buf bytes.Buffer
handler := WithAudit(&fakeHTTPHandler{}, &fakeRequestContextMapper{}, &buf)
handler := WithAudit(&fakeHTTPHandler{}, &fakeRequestContextMapper{}, &fakeAuditSink{}, auditinternal.NewConstantPolicy(auditinternal.LevelRequestResponse), nil)
req, _ := http.NewRequest("GET", "/api/v1/namespaces/default/pods", nil)
req.RemoteAddr = "127.0.0.1"
handler.ServeHTTP(httptest.NewRecorder(), req)
line := strings.Split(strings.TrimSpace(buf.String()), "\n")
if len(line) != 2 {
t.Fatalf("Unexpected amount of lines in audit log: %d", len(line))
}
match, err := regexp.MatchString(`[\d\:\-\.\+TZ]+ AUDIT: id="[\w-]+" ip="127.0.0.1" method="GET" user="<none>" groups="<none>" as="<self>" asgroups="<lookup>" namespace="default" uri="/api/v1/namespaces/default/pods"`, line[0])
if err != nil {
t.Errorf("Unexpected error matching first line: %v", err)
}
if !match {
t.Errorf("Unexpected first line of audit: %s", line[0])
}
match, err = regexp.MatchString(`[\d\:\-\.\+TZ]+ AUDIT: id="[\w-]+" response="200"`, line[1])
if err != nil {
t.Errorf("Unexpected error matching second line: %v", err)
}
if !match {
t.Errorf("Unexpected second line of audit: %s", line[1])
}
}

View File

@ -0,0 +1,162 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package filters
import (
"bufio"
"errors"
"fmt"
"io"
"net"
"net/http"
"strings"
"time"
"github.com/golang/glog"
"github.com/pborman/uuid"
utilnet "k8s.io/apimachinery/pkg/util/net"
"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
"k8s.io/apiserver/pkg/endpoints/request"
authenticationapi "k8s.io/client-go/pkg/apis/authentication/v1"
)
var _ http.ResponseWriter = &legacyAuditResponseWriter{}
type legacyAuditResponseWriter struct {
http.ResponseWriter
out io.Writer
id string
}
func (a *legacyAuditResponseWriter) WriteHeader(code int) {
line := fmt.Sprintf("%s AUDIT: id=%q response=\"%d\"\n", time.Now().Format(time.RFC3339Nano), a.id, code)
if _, err := fmt.Fprint(a.out, line); err != nil {
glog.Errorf("Unable to write audit log: %s, the error is: %v", line, err)
}
a.ResponseWriter.WriteHeader(code)
}
// fancyLegacyResponseWriterDelegator implements http.CloseNotifier, http.Flusher and
// http.Hijacker which are needed to make certain http operation (e.g. watch, rsh, etc)
// working.
type fancyLegacyResponseWriterDelegator struct {
*legacyAuditResponseWriter
}
func (f *fancyLegacyResponseWriterDelegator) CloseNotify() <-chan bool {
return f.ResponseWriter.(http.CloseNotifier).CloseNotify()
}
func (f *fancyLegacyResponseWriterDelegator) Flush() {
f.ResponseWriter.(http.Flusher).Flush()
}
func (f *fancyLegacyResponseWriterDelegator) Hijack() (net.Conn, *bufio.ReadWriter, error) {
return f.ResponseWriter.(http.Hijacker).Hijack()
}
var _ http.CloseNotifier = &fancyLegacyResponseWriterDelegator{}
var _ http.Flusher = &fancyLegacyResponseWriterDelegator{}
var _ http.Hijacker = &fancyLegacyResponseWriterDelegator{}
// WithAudit decorates a http.Handler with audit logging information for all the
// requests coming to the server. If out is nil, no decoration takes place.
// Each audit log contains two entries:
// 1. the request line containing:
// - unique id allowing to match the response line (see 2)
// - source ip of the request
// - HTTP method being invoked
// - original user invoking the operation
// - original user's groups info
// - impersonated user for the operation
// - impersonated groups info
// - namespace of the request or <none>
// - uri is the full URI as requested
// 2. the response line containing:
// - the unique id from 1
// - response code
func WithLegacyAudit(handler http.Handler, requestContextMapper request.RequestContextMapper, out io.Writer) http.Handler {
if out == nil {
return handler
}
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
ctx, ok := requestContextMapper.Get(req)
if !ok {
responsewriters.InternalError(w, req, errors.New("no context found for request"))
return
}
attribs, err := GetAuthorizerAttributes(ctx)
if err != nil {
responsewriters.InternalError(w, req, err)
return
}
username := "<none>"
groups := "<none>"
if attribs.GetUser() != nil {
username = attribs.GetUser().GetName()
if userGroups := attribs.GetUser().GetGroups(); len(userGroups) > 0 {
groups = auditStringSlice(userGroups)
}
}
asuser := req.Header.Get(authenticationapi.ImpersonateUserHeader)
if len(asuser) == 0 {
asuser = "<self>"
}
asgroups := "<lookup>"
requestedGroups := req.Header[authenticationapi.ImpersonateGroupHeader]
if len(requestedGroups) > 0 {
asgroups = auditStringSlice(requestedGroups)
}
namespace := attribs.GetNamespace()
if len(namespace) == 0 {
namespace = "<none>"
}
id := uuid.NewRandom().String()
line := fmt.Sprintf("%s AUDIT: id=%q ip=%q method=%q user=%q groups=%q as=%q asgroups=%q namespace=%q uri=%q\n",
time.Now().Format(time.RFC3339Nano), id, utilnet.GetClientIP(req), req.Method, username, groups, asuser, asgroups, namespace, req.URL)
if _, err := fmt.Fprint(out, line); err != nil {
glog.Errorf("Unable to write audit log: %s, the error is: %v", line, err)
}
respWriter := legacyDecorateResponseWriter(w, out, id)
handler.ServeHTTP(respWriter, req)
})
}
func auditStringSlice(inList []string) string {
quotedElements := make([]string, len(inList))
for i, in := range inList {
quotedElements[i] = fmt.Sprintf("%q", in)
}
return strings.Join(quotedElements, ",")
}
func legacyDecorateResponseWriter(responseWriter http.ResponseWriter, out io.Writer, id string) http.ResponseWriter {
delegate := &legacyAuditResponseWriter{ResponseWriter: responseWriter, out: out, id: id}
// check if the ResponseWriter we're wrapping is the fancy one we need
// or if the basic is sufficient
_, cn := responseWriter.(http.CloseNotifier)
_, fl := responseWriter.(http.Flusher)
_, hj := responseWriter.(http.Hijacker)
if cn && fl && hj {
return &fancyLegacyResponseWriterDelegator{delegate}
}
return delegate
}

View File

@ -0,0 +1,104 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package filters
import (
"bytes"
"io/ioutil"
"net/http"
"net/http/httptest"
"reflect"
"regexp"
"strings"
"testing"
"k8s.io/apiserver/pkg/authentication/user"
)
func TestLegacyConstructResponseWriter(t *testing.T) {
actual := legacyDecorateResponseWriter(&simpleResponseWriter{}, ioutil.Discard, "")
switch v := actual.(type) {
case *legacyAuditResponseWriter:
default:
t.Errorf("Expected auditResponseWriter, got %v", reflect.TypeOf(v))
}
actual = legacyDecorateResponseWriter(&fancyResponseWriter{}, ioutil.Discard, "")
switch v := actual.(type) {
case *fancyLegacyResponseWriterDelegator:
default:
t.Errorf("Expected fancyResponseWriterDelegator, got %v", reflect.TypeOf(v))
}
}
func TestLegacyAudit(t *testing.T) {
var buf bytes.Buffer
handler := WithLegacyAudit(&fakeHTTPHandler{}, &fakeRequestContextMapper{
user: &user.DefaultInfo{Name: "admin"},
}, &buf)
req, _ := http.NewRequest("GET", "/api/v1/namespaces/default/pods", nil)
req.RemoteAddr = "127.0.0.1"
handler.ServeHTTP(httptest.NewRecorder(), req)
line := strings.Split(strings.TrimSpace(buf.String()), "\n")
if len(line) != 2 {
t.Fatalf("Unexpected amount of lines in audit log: %d", len(line))
}
match, err := regexp.MatchString(`[\d\:\-\.\+TZ]+ AUDIT: id="[\w-]+" ip="127.0.0.1" method="GET" user="admin" groups="<none>" as="<self>" asgroups="<lookup>" namespace="default" uri="/api/v1/namespaces/default/pods"`, line[0])
if err != nil {
t.Errorf("Unexpected error matching first line: %v", err)
}
if !match {
t.Errorf("Unexpected first line of audit: %s", line[0])
}
match, err = regexp.MatchString(`[\d\:\-\.\+TZ]+ AUDIT: id="[\w-]+" response="200"`, line[1])
if err != nil {
t.Errorf("Unexpected error matching second line: %v", err)
}
if !match {
t.Errorf("Unexpected second line of audit: %s", line[1])
}
}
func TestLegacyAuditNoPanicOnNilUser(t *testing.T) {
var buf bytes.Buffer
handler := WithLegacyAudit(&fakeHTTPHandler{}, &fakeRequestContextMapper{}, &buf)
req, _ := http.NewRequest("GET", "/api/v1/namespaces/default/pods", nil)
req.RemoteAddr = "127.0.0.1"
handler.ServeHTTP(httptest.NewRecorder(), req)
line := strings.Split(strings.TrimSpace(buf.String()), "\n")
if len(line) != 2 {
t.Fatalf("Unexpected amount of lines in audit log: %d", len(line))
}
match, err := regexp.MatchString(`[\d\:\-\.\+TZ]+ AUDIT: id="[\w-]+" ip="127.0.0.1" method="GET" user="<none>" groups="<none>" as="<self>" asgroups="<lookup>" namespace="default" uri="/api/v1/namespaces/default/pods"`, line[0])
if err != nil {
t.Errorf("Unexpected error matching first line: %v", err)
}
if !match {
t.Errorf("Unexpected first line of audit: %s", line[0])
}
match, err = regexp.MatchString(`[\d\:\-\.\+TZ]+ AUDIT: id="[\w-]+" response="200"`, line[1])
if err != nil {
t.Errorf("Unexpected error matching second line: %v", err)
}
if !match {
t.Errorf("Unexpected second line of audit: %s", line[1])
}
}

View File

@ -100,7 +100,7 @@ func (g *APIGroupVersion) InstallREST(container *restful.Container) error {
if lister == nil {
lister = staticLister{apiResources}
}
versionDiscoveryHandler := discovery.NewAPIVersionHandler(g.Serializer, g.GroupVersion, lister)
versionDiscoveryHandler := discovery.NewAPIVersionHandler(g.Serializer, g.GroupVersion, lister, g.Context)
versionDiscoveryHandler.AddToWebService(ws)
container.Add(ws)
return utilerrors.NewAggregate(registrationErrors)
@ -129,7 +129,7 @@ func (g *APIGroupVersion) UpdateREST(container *restful.Container) error {
if lister == nil {
lister = staticLister{apiResources}
}
versionDiscoveryHandler := discovery.NewAPIVersionHandler(g.Serializer, g.GroupVersion, lister)
versionDiscoveryHandler := discovery.NewAPIVersionHandler(g.Serializer, g.GroupVersion, lister, g.Context)
versionDiscoveryHandler.AddToWebService(ws)
return utilerrors.NewAggregate(registrationErrors)
}

View File

@ -64,6 +64,7 @@ go_library(
"//vendor/k8s.io/apimachinery/pkg/util/strategicpatch:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/watch:go_default_library",
"//vendor/k8s.io/apiserver/pkg/admission:go_default_library",
"//vendor/k8s.io/apiserver/pkg/audit:go_default_library",
"//vendor/k8s.io/apiserver/pkg/endpoints/handlers/negotiation:go_default_library",
"//vendor/k8s.io/apiserver/pkg/endpoints/handlers/responsewriters:go_default_library",
"//vendor/k8s.io/apiserver/pkg/endpoints/metrics:go_default_library",

View File

@ -118,14 +118,14 @@ func (r *ProxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
redirector, ok := storage.(rest.Redirector)
if !ok {
httplog.LogOf(req, w).Addf("'%v' is not a redirector", resource)
httpCode = responsewriters.ErrorNegotiated(apierrors.NewMethodNotSupported(schema.GroupResource{Resource: resource}, "proxy"), r.Serializer, gv, w, req)
httpCode = responsewriters.ErrorNegotiated(ctx, apierrors.NewMethodNotSupported(schema.GroupResource{Resource: resource}, "proxy"), r.Serializer, gv, w, req)
return
}
location, roundTripper, err := redirector.ResourceLocation(ctx, id)
if err != nil {
httplog.LogOf(req, w).Addf("Error getting ResourceLocation: %v", err)
httpCode = responsewriters.ErrorNegotiated(err, r.Serializer, gv, w, req)
httpCode = responsewriters.ErrorNegotiated(ctx, err, r.Serializer, gv, w, req)
return
}
if location == nil {
@ -158,7 +158,7 @@ func (r *ProxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
newReq, err := http.NewRequest(req.Method, location.String(), req.Body)
if err != nil {
httpCode = responsewriters.ErrorNegotiated(err, r.Serializer, gv, w, req)
httpCode = responsewriters.ErrorNegotiated(ctx, err, r.Serializer, gv, w, req)
return
}
httpCode = http.StatusOK
@ -171,7 +171,7 @@ func (r *ProxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
// TODO convert this entire proxy to an UpgradeAwareProxy similar to
// https://github.com/openshift/origin/blob/master/pkg/util/httpproxy/upgradeawareproxy.go.
// That proxy needs to be modified to support multiple backends, not just 1.
if r.tryUpgrade(w, req, newReq, location, roundTripper, gv) {
if r.tryUpgrade(ctx, w, req, newReq, location, roundTripper, gv) {
return
}
@ -220,13 +220,13 @@ func (r *ProxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
}
// tryUpgrade returns true if the request was handled.
func (r *ProxyHandler) tryUpgrade(w http.ResponseWriter, req, newReq *http.Request, location *url.URL, transport http.RoundTripper, gv schema.GroupVersion) bool {
func (r *ProxyHandler) tryUpgrade(ctx request.Context, w http.ResponseWriter, req, newReq *http.Request, location *url.URL, transport http.RoundTripper, gv schema.GroupVersion) bool {
if !httpstream.IsUpgradeRequest(req) {
return false
}
backendConn, err := proxyutil.DialURL(location, transport)
if err != nil {
responsewriters.ErrorNegotiated(err, r.Serializer, gv, w, req)
responsewriters.ErrorNegotiated(ctx, err, r.Serializer, gv, w, req)
return true
}
defer backendConn.Close()
@ -236,13 +236,13 @@ func (r *ProxyHandler) tryUpgrade(w http.ResponseWriter, req, newReq *http.Reque
// hijack, just for reference...
requestHijackedConn, _, err := w.(http.Hijacker).Hijack()
if err != nil {
responsewriters.ErrorNegotiated(err, r.Serializer, gv, w, req)
responsewriters.ErrorNegotiated(ctx, err, r.Serializer, gv, w, req)
return true
}
defer requestHijackedConn.Close()
if err = newReq.Write(backendConn); err != nil {
responsewriters.ErrorNegotiated(err, r.Serializer, gv, w, req)
responsewriters.ErrorNegotiated(ctx, err, r.Serializer, gv, w, req)
return true
}

View File

@ -39,8 +39,10 @@ go_library(
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//vendor/k8s.io/apiserver/pkg/audit:go_default_library",
"//vendor/k8s.io/apiserver/pkg/authorization/authorizer:go_default_library",
"//vendor/k8s.io/apiserver/pkg/endpoints/handlers/negotiation:go_default_library",
"//vendor/k8s.io/apiserver/pkg/endpoints/request:go_default_library",
"//vendor/k8s.io/apiserver/pkg/registry/rest:go_default_library",
"//vendor/k8s.io/apiserver/pkg/storage:go_default_library",
"//vendor/k8s.io/apiserver/pkg/util/flushwriter:go_default_library",

View File

@ -26,7 +26,9 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apiserver/pkg/audit"
"k8s.io/apiserver/pkg/endpoints/handlers/negotiation"
"k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/registry/rest"
"k8s.io/apiserver/pkg/util/flushwriter"
"k8s.io/apiserver/pkg/util/wsstream"
@ -37,16 +39,16 @@ import (
// response. The Accept header and current API version will be passed in, and the output will be copied
// directly to the response body. If content type is returned it is used, otherwise the content type will
// be "application/octet-stream". All other objects are sent to standard JSON serialization.
func WriteObject(statusCode int, gv schema.GroupVersion, s runtime.NegotiatedSerializer, object runtime.Object, w http.ResponseWriter, req *http.Request) {
func WriteObject(ctx request.Context, statusCode int, gv schema.GroupVersion, s runtime.NegotiatedSerializer, object runtime.Object, w http.ResponseWriter, req *http.Request) {
stream, ok := object.(rest.ResourceStreamer)
if !ok {
WriteObjectNegotiated(s, gv, w, req, statusCode, object)
WriteObjectNegotiated(ctx, s, gv, w, req, statusCode, object)
return
}
out, flush, contentType, err := stream.InputStream(gv.String(), req.Header.Get("Accept"))
if err != nil {
ErrorNegotiated(err, s, gv, w, req)
ErrorNegotiated(ctx, err, s, gv, w, req)
return
}
if out == nil {
@ -76,8 +78,9 @@ func WriteObject(statusCode int, gv schema.GroupVersion, s runtime.NegotiatedSer
io.Copy(writer, out)
}
// WriteObjectNegotiated renders an object in the content type negotiated by the client
func WriteObjectNegotiated(s runtime.NegotiatedSerializer, gv schema.GroupVersion, w http.ResponseWriter, req *http.Request, statusCode int, object runtime.Object) {
// WriteObjectNegotiated renders an object in the content type negotiated by the client.
// The context is optional and can be nil.
func WriteObjectNegotiated(ctx request.Context, s runtime.NegotiatedSerializer, gv schema.GroupVersion, w http.ResponseWriter, req *http.Request, statusCode int, object runtime.Object) {
serializer, err := negotiation.NegotiateOutputSerializer(req, s)
if err != nil {
status := apiStatus(err)
@ -85,6 +88,10 @@ func WriteObjectNegotiated(s runtime.NegotiatedSerializer, gv schema.GroupVersio
return
}
if ae := request.AuditEventFrom(ctx); ae != nil {
audit.LogResponseObject(ae, object, gv, s)
}
w.Header().Set("Content-Type", serializer.MediaType)
w.WriteHeader(statusCode)
@ -95,7 +102,8 @@ func WriteObjectNegotiated(s runtime.NegotiatedSerializer, gv schema.GroupVersio
}
// ErrorNegotiated renders an error to the response. Returns the HTTP status code of the error.
func ErrorNegotiated(err error, s runtime.NegotiatedSerializer, gv schema.GroupVersion, w http.ResponseWriter, req *http.Request) int {
// The context is options and may be nil.
func ErrorNegotiated(ctx request.Context, err error, s runtime.NegotiatedSerializer, gv schema.GroupVersion, w http.ResponseWriter, req *http.Request) int {
status := apiStatus(err)
code := int(status.Code)
// when writing an error, check to see if the status indicates a retry after period
@ -109,7 +117,7 @@ func ErrorNegotiated(err error, s runtime.NegotiatedSerializer, gv schema.GroupV
return code
}
WriteObjectNegotiated(s, gv, w, req, code, status)
WriteObjectNegotiated(ctx, s, gv, w, req, code, status)
return code
}

View File

@ -42,6 +42,7 @@ import (
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/strategicpatch"
"k8s.io/apiserver/pkg/admission"
"k8s.io/apiserver/pkg/audit"
"k8s.io/apiserver/pkg/endpoints/handlers/negotiation"
"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
"k8s.io/apiserver/pkg/endpoints/request"
@ -72,7 +73,8 @@ type RequestScope struct {
}
func (scope *RequestScope) err(err error, w http.ResponseWriter, req *http.Request) {
responsewriters.ErrorNegotiated(err, scope.Serializer, scope.Kind.GroupVersion(), w, req)
ctx := scope.ContextFunc(req)
responsewriters.ErrorNegotiated(ctx, err, scope.Serializer, scope.Kind.GroupVersion(), w, req)
}
// getterFunc performs a get request with the given context and object name. The request
@ -111,8 +113,9 @@ func getResourceHandler(scope RequestScope, getter getterFunc) http.HandlerFunc
scope.err(err, w, req)
return
}
trace.Step("About to write a response")
responsewriters.WriteObject(http.StatusOK, scope.Kind.GroupVersion(), scope.Serializer, result, w, req)
responsewriters.WriteObject(ctx, http.StatusOK, scope.Kind.GroupVersion(), scope.Serializer, result, w, req)
}
}
@ -231,7 +234,8 @@ type responder struct {
}
func (r *responder) Object(statusCode int, obj runtime.Object) {
responsewriters.WriteObject(statusCode, r.scope.Kind.GroupVersion(), r.scope.Serializer, obj, r.w, r.req)
ctx := r.scope.ContextFunc(r.req)
responsewriters.WriteObject(ctx, statusCode, r.scope.Kind.GroupVersion(), r.scope.Serializer, obj, r.w, r.req)
}
func (r *responder) Error(err error) {
@ -339,7 +343,8 @@ func ListResource(r rest.Lister, rw rest.Watcher, scope RequestScope, forceWatch
return
}
}
responsewriters.WriteObject(http.StatusOK, scope.Kind.GroupVersion(), scope.Serializer, result, w, req)
responsewriters.WriteObject(ctx, http.StatusOK, scope.Kind.GroupVersion(), scope.Serializer, result, w, req)
trace.Step(fmt.Sprintf("Writing http response done (%d items)", numberOfItems))
}
}
@ -400,6 +405,9 @@ func createHandler(r rest.NamedCreater, scope RequestScope, typer runtime.Object
}
trace.Step("Conversion done")
ae := request.AuditEventFrom(ctx)
audit.LogRequestObject(ae, obj, scope.Resource.GroupVersion(), scope.Serializer)
if admit != nil && admit.Handles(admission.Create) {
userInfo, _ := request.UserFrom(ctx)
@ -435,7 +443,7 @@ func createHandler(r rest.NamedCreater, scope RequestScope, typer runtime.Object
}
trace.Step("Self-link added")
responsewriters.WriteObject(http.StatusCreated, scope.Kind.GroupVersion(), scope.Serializer, result, w, req)
responsewriters.WriteObject(ctx, http.StatusCreated, scope.Kind.GroupVersion(), scope.Serializer, result, w, req)
}
}
@ -495,6 +503,9 @@ func PatchResource(r rest.Patcher, scope RequestScope, admit admission.Interface
return
}
ae := request.AuditEventFrom(ctx)
audit.LogRequestPatch(ae, patchJS)
s, ok := runtime.SerializerInfoForMediaType(scope.Serializer.SupportedMediaTypes(), runtime.ContentTypeJSON)
if !ok {
scope.err(fmt.Errorf("no serializer defined for JSON"), w, req)
@ -532,7 +543,7 @@ func PatchResource(r rest.Patcher, scope RequestScope, admit admission.Interface
return
}
responsewriters.WriteObject(http.StatusOK, scope.Kind.GroupVersion(), scope.Serializer, result, w, req)
responsewriters.WriteObject(ctx, http.StatusOK, scope.Kind.GroupVersion(), scope.Serializer, result, w, req)
}
}
@ -818,6 +829,9 @@ func UpdateResource(r rest.Updater, scope RequestScope, typer runtime.ObjectType
}
trace.Step("Conversion done")
ae := request.AuditEventFrom(ctx)
audit.LogRequestObject(ae, obj, scope.Resource.GroupVersion(), scope.Serializer)
if err := checkName(obj, name, namespace, scope.Namer); err != nil {
scope.err(err, w, req)
return
@ -859,7 +873,7 @@ func UpdateResource(r rest.Updater, scope RequestScope, typer runtime.ObjectType
if wasCreated {
status = http.StatusCreated
}
responsewriters.WriteObject(status, scope.Kind.GroupVersion(), scope.Serializer, result, w, req)
responsewriters.WriteObject(ctx, status, scope.Kind.GroupVersion(), scope.Serializer, result, w, req)
}
}
@ -906,6 +920,9 @@ func DeleteResource(r rest.GracefulDeleter, allowsOptions bool, scope RequestSco
scope.err(fmt.Errorf("decoded object cannot be converted to DeleteOptions"), w, req)
return
}
ae := request.AuditEventFrom(ctx)
audit.LogRequestObject(ae, obj, scope.Resource.GroupVersion(), scope.Serializer)
} else {
if values := req.URL.Query(); len(values) > 0 {
if err := metainternalversion.ParameterCodec.DecodeParameters(values, scope.MetaGroupVersion, options); err != nil {
@ -974,7 +991,8 @@ func DeleteResource(r rest.GracefulDeleter, allowsOptions bool, scope RequestSco
}
}
}
responsewriters.WriteObject(status, scope.Kind.GroupVersion(), scope.Serializer, result, w, req)
responsewriters.WriteObject(ctx, status, scope.Kind.GroupVersion(), scope.Serializer, result, w, req)
}
}
@ -1046,6 +1064,9 @@ func DeleteCollection(r rest.CollectionDeleter, checkBody bool, scope RequestSco
scope.err(fmt.Errorf("decoded object cannot be converted to DeleteOptions"), w, req)
return
}
ae := request.AuditEventFrom(ctx)
audit.LogRequestObject(ae, obj, scope.Resource.GroupVersion(), scope.Serializer)
}
}
@ -1076,7 +1097,8 @@ func DeleteCollection(r rest.CollectionDeleter, checkBody bool, scope RequestSco
}
}
}
responsewriters.WriteObjectNegotiated(scope.Serializer, scope.Kind.GroupVersion(), w, req, http.StatusOK, result)
responsewriters.WriteObjectNegotiated(ctx, scope.Serializer, scope.Kind.GroupVersion(), w, req, http.StatusOK, result)
}
}

View File

@ -30,6 +30,7 @@ go_library(
"//vendor/golang.org/x/net/context:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//vendor/k8s.io/apiserver/pkg/apis/audit:go_default_library",
"//vendor/k8s.io/apiserver/pkg/authentication/user:go_default_library",
],
)

View File

@ -22,6 +22,7 @@ import (
"golang.org/x/net/context"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apiserver/pkg/apis/audit"
"k8s.io/apiserver/pkg/authentication/user"
)
@ -63,6 +64,9 @@ const (
// userAgentKey is the context key for the request user agent.
userAgentKey
// auditKey is the context key for the audit event.
auditKey
namespaceDefault = "default" // TODO(sttts): solve import cycle when using metav1.NamespaceDefault
)
@ -143,3 +147,14 @@ func UserAgentFrom(ctx Context) (string, bool) {
userAgent, ok := ctx.Value(userAgentKey).(string)
return userAgent, ok
}
// WithAuditEvent returns set audit event struct.
func WithAuditEvent(parent Context, ev *audit.Event) Context {
return WithValue(parent, auditKey, ev)
}
// AuditEventFrom returns the audit event struct on the ctx
func AuditEventFrom(ctx Context) *audit.Event {
ev, _ := ctx.Value(auditKey).(*audit.Event)
return ev
}

View File

@ -24,6 +24,9 @@ import (
"github.com/golang/glog"
)
// LongRunningRequestCheck is a predicate which is true for long-running http requests.
type LongRunningRequestCheck func(r *http.Request, requestInfo *RequestInfo) bool
// RequestContextMapper keeps track of the context associated with a particular request
type RequestContextMapper interface {
// Get returns the context associated with the given request (if any), and true if the request has an associated context, and false if it does not.

View File

@ -77,6 +77,8 @@ go_library(
"//vendor/k8s.io/apimachinery/pkg/version:go_default_library",
"//vendor/k8s.io/apiserver/pkg/admission:go_default_library",
"//vendor/k8s.io/apiserver/pkg/apis/apiserver/install:go_default_library",
"//vendor/k8s.io/apiserver/pkg/apis/audit:go_default_library",
"//vendor/k8s.io/apiserver/pkg/audit:go_default_library",
"//vendor/k8s.io/apiserver/pkg/authentication/authenticator:go_default_library",
"//vendor/k8s.io/apiserver/pkg/authentication/authenticatorfactory:go_default_library",
"//vendor/k8s.io/apiserver/pkg/authentication/request/union:go_default_library",

View File

@ -20,7 +20,6 @@ import (
"crypto/tls"
"crypto/x509"
"fmt"
"io"
"net"
"net/http"
goruntime "runtime"
@ -39,6 +38,8 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/version"
"k8s.io/apiserver/pkg/admission"
auditinternal "k8s.io/apiserver/pkg/apis/audit"
"k8s.io/apiserver/pkg/audit"
"k8s.io/apiserver/pkg/authentication/authenticator"
"k8s.io/apiserver/pkg/authentication/authenticatorfactory"
authenticatorunion "k8s.io/apiserver/pkg/authentication/request/union"
@ -100,8 +101,11 @@ type Config struct {
// Version will enable the /version endpoint if non-nil
Version *version.Info
// AuditWriter is the destination for audit logs. If nil, they will not be written.
AuditWriter io.Writer
// AuditBackend is where audit events are sent to.
AuditBackend audit.Backend
// AuditPolicy defines rules which determine the audit level for different requests.
AuditPolicy *auditinternal.Policy
// SupportsBasicAuth indicates that's at least one Authenticator supports basic auth
// If this is true, a basic auth challenge is returned on authentication failure
// TODO(roberthbailey): Remove once the server no longer supports http basic auth.
@ -150,7 +154,7 @@ type Config struct {
// request has to wait.
MaxMutatingRequestsInFlight int
// Predicate which is true for paths of long-running http requests
LongRunningFunc genericfilters.LongRunningRequestCheck
LongRunningFunc apirequest.LongRunningRequestCheck
//===========================================================================
// values below here are targets for removal
@ -374,7 +378,7 @@ func (c completedConfig) New(delegationTarget DelegationTarget) (*GenericAPIServ
handlerChainBuilder := func(handler http.Handler) http.Handler {
return c.BuildHandlerChainFunc(handler, c.Config)
}
apiServerHandler := NewAPIServerHandler(c.Serializer, handlerChainBuilder, delegationTarget.UnprotectedHandler())
apiServerHandler := NewAPIServerHandler(c.RequestContextMapper, c.Serializer, handlerChainBuilder, delegationTarget.UnprotectedHandler())
s := &GenericAPIServer{
discoveryAddresses: c.DiscoveryAddresses,
@ -383,6 +387,7 @@ func (c completedConfig) New(delegationTarget DelegationTarget) (*GenericAPIServ
admissionControl: c.AdmissionControl,
requestContextMapper: c.RequestContextMapper,
Serializer: c.Serializer,
AuditBackend: c.AuditBackend,
minRequestTimeout: time.Duration(c.MinRequestTimeout) * time.Second,
@ -401,7 +406,7 @@ func (c completedConfig) New(delegationTarget DelegationTarget) (*GenericAPIServ
healthzChecks: c.HealthzChecks,
DiscoveryGroupManager: discovery.NewRootAPIsHandler(c.DiscoveryAddresses, c.Serializer),
DiscoveryGroupManager: discovery.NewRootAPIsHandler(c.DiscoveryAddresses, c.Serializer, c.RequestContextMapper),
}
for k, v := range delegationTarget.PostStartHooks() {
@ -452,7 +457,8 @@ func (c completedConfig) New(delegationTarget DelegationTarget) (*GenericAPIServ
func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler {
handler := genericapifilters.WithAuthorization(apiHandler, c.RequestContextMapper, c.Authorizer)
handler = genericapifilters.WithImpersonation(handler, c.RequestContextMapper, c.Authorizer)
handler = genericapifilters.WithAudit(handler, c.RequestContextMapper, c.AuditWriter)
// TODO(audit): use WithLegacyAudit if feature flag is false
handler = genericapifilters.WithAudit(handler, c.RequestContextMapper, c.AuditBackend, c.AuditPolicy, c.LongRunningFunc)
handler = genericapifilters.WithAuthentication(handler, c.RequestContextMapper, c.Authenticator, genericapifilters.Unauthorized(c.SupportsBasicAuth))
handler = genericfilters.WithCORS(handler, c.CorsAllowedOriginList, nil, nil, nil, "true")
handler = genericfilters.WithPanicRecovery(handler)

View File

@ -23,11 +23,8 @@ import (
apirequest "k8s.io/apiserver/pkg/endpoints/request"
)
// LongRunningRequestCheck is a predicate which is true for long-running http requests.
type LongRunningRequestCheck func(r *http.Request, requestInfo *apirequest.RequestInfo) bool
// BasicLongRunningRequestCheck returns true if the given request has one of the specified verbs or one of the specified subresources
func BasicLongRunningRequestCheck(longRunningVerbs, longRunningSubresources sets.String) LongRunningRequestCheck {
func BasicLongRunningRequestCheck(longRunningVerbs, longRunningSubresources sets.String) apirequest.LongRunningRequestCheck {
return func(r *http.Request, requestInfo *apirequest.RequestInfo) bool {
if longRunningVerbs.Has(requestInfo.Verb) {
return true

View File

@ -47,7 +47,7 @@ func WithMaxInFlightLimit(
nonMutatingLimit int,
mutatingLimit int,
requestContextMapper genericapirequest.RequestContextMapper,
longRunningRequestCheck LongRunningRequestCheck,
longRunningRequestCheck apirequest.LongRunningRequestCheck,
) http.Handler {
if nonMutatingLimit == 0 && mutatingLimit == 0 {
return handler

View File

@ -35,7 +35,7 @@ const globalTimeout = time.Minute
var errConnKilled = fmt.Errorf("kill connection/stream")
// WithTimeoutForNonLongRunningRequests times out non-long-running requests after the time given by globalTimeout.
func WithTimeoutForNonLongRunningRequests(handler http.Handler, requestContextMapper apirequest.RequestContextMapper, longRunning LongRunningRequestCheck) http.Handler {
func WithTimeoutForNonLongRunningRequests(handler http.Handler, requestContextMapper apirequest.RequestContextMapper, longRunning apirequest.LongRunningRequestCheck) http.Handler {
if longRunning == nil {
return handler
}

View File

@ -37,6 +37,7 @@ import (
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apiserver/pkg/admission"
"k8s.io/apiserver/pkg/audit"
genericapi "k8s.io/apiserver/pkg/endpoints"
"k8s.io/apiserver/pkg/endpoints/discovery"
apirequest "k8s.io/apiserver/pkg/endpoints/request"
@ -141,6 +142,9 @@ type GenericAPIServer struct {
healthzLock sync.Mutex
healthzChecks []healthz.HealthzChecker
healthzCreated bool
// auditing. The backend is started after the server starts listening.
AuditBackend audit.Backend
}
// DelegationTarget is an interface which allows for composition of API servers with top level handling that works
@ -272,6 +276,14 @@ func (s preparedGenericAPIServer) NonBlockingRun(stopCh <-chan struct{}) error {
close(internalStopCh)
}()
// Start the audit backend before any request comes in. This means we cannot turn it into a
// post start hook because without calling Backend.Run the Backend.ProcessEvents call might block.
if s.AuditBackend != nil {
if err := s.AuditBackend.Run(stopCh); err != nil {
return fmt.Errorf("failed to run the audit backend: %v", err)
}
}
s.RunPostStartHooks(stopCh)
if _, err := systemd.SdNotify(true, "READY=1\n"); err != nil {
@ -322,7 +334,7 @@ func (s *GenericAPIServer) InstallLegacyAPIGroup(apiPrefix string, apiGroupInfo
}
// Install the version handler.
// Add a handler at /<apiPrefix> to enumerate the supported api versions.
s.Handler.GoRestfulContainer.Add(discovery.NewLegacyRootAPIHandler(s.discoveryAddresses, s.Serializer, apiPrefix, apiVersions).WebService())
s.Handler.GoRestfulContainer.Add(discovery.NewLegacyRootAPIHandler(s.discoveryAddresses, s.Serializer, apiPrefix, apiVersions, s.requestContextMapper).WebService())
return nil
}
@ -366,7 +378,7 @@ func (s *GenericAPIServer) InstallAPIGroup(apiGroupInfo *APIGroupInfo) error {
}
s.DiscoveryGroupManager.AddGroup(apiGroup)
s.Handler.GoRestfulContainer.Add(discovery.NewAPIGroupHandler(s.Serializer, apiGroup).WebService())
s.Handler.GoRestfulContainer.Add(discovery.NewAPIGroupHandler(s.Serializer, apiGroup, s.requestContextMapper).WebService())
return nil
}

View File

@ -18,6 +18,7 @@ package server
import (
"bytes"
"errors"
"fmt"
"net/http"
rt "runtime"
@ -30,6 +31,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
"k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/server/mux"
genericmux "k8s.io/apiserver/pkg/server/mux"
)
@ -52,7 +54,7 @@ type APIServerHandler struct {
// It is normally used to apply filtering like authentication and authorization
type HandlerChainBuilderFn func(apiHandler http.Handler) http.Handler
func NewAPIServerHandler(s runtime.NegotiatedSerializer, handlerChainBuilder HandlerChainBuilderFn, notFoundHandler http.Handler) *APIServerHandler {
func NewAPIServerHandler(contextMapper request.RequestContextMapper, s runtime.NegotiatedSerializer, handlerChainBuilder HandlerChainBuilderFn, notFoundHandler http.Handler) *APIServerHandler {
postGoRestfulMux := genericmux.NewPathRecorderMux()
if notFoundHandler != nil {
postGoRestfulMux.NotFoundHandler(notFoundHandler)
@ -65,7 +67,11 @@ func NewAPIServerHandler(s runtime.NegotiatedSerializer, handlerChainBuilder Han
logStackOnRecover(s, panicReason, httpWriter)
})
gorestfulContainer.ServiceErrorHandler(func(serviceErr restful.ServiceError, request *restful.Request, response *restful.Response) {
serviceErrorHandler(s, serviceErr, request, response)
ctx, ok := contextMapper.Get(request.Request)
if !ok {
responsewriters.InternalError(response.ResponseWriter, request.Request, errors.New("no context found for request"))
}
serviceErrorHandler(ctx, s, serviceErr, request, response)
})
// register the defaultHandler for everything. This will allow an unhandled request to fall through to another handler instead of
@ -109,11 +115,13 @@ func logStackOnRecover(s runtime.NegotiatedSerializer, panicReason interface{},
if ct := w.Header().Get("Content-Type"); len(ct) > 0 {
headers.Set("Accept", ct)
}
responsewriters.ErrorNegotiated(apierrors.NewGenericServerResponse(http.StatusInternalServerError, "", schema.GroupResource{}, "", "", 0, false), s, schema.GroupVersion{}, w, &http.Request{Header: headers})
emptyContext := request.NewContext() // best we can do here: we don't know the request
responsewriters.ErrorNegotiated(emptyContext, apierrors.NewGenericServerResponse(http.StatusInternalServerError, "", schema.GroupResource{}, "", "", 0, false), s, schema.GroupVersion{}, w, &http.Request{Header: headers})
}
func serviceErrorHandler(s runtime.NegotiatedSerializer, serviceErr restful.ServiceError, request *restful.Request, resp *restful.Response) {
func serviceErrorHandler(ctx request.Context, s runtime.NegotiatedSerializer, serviceErr restful.ServiceError, request *restful.Request, resp *restful.Response) {
responsewriters.ErrorNegotiated(
ctx,
apierrors.NewGenericServerResponse(serviceErr.Code, "", schema.GroupResource{}, "", serviceErr.Message, 0, false),
s,
schema.GroupVersion{},

View File

@ -64,6 +64,7 @@ go_library(
"//vendor/k8s.io/apiserver/pkg/storage/storagebackend:go_default_library",
"//vendor/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//vendor/k8s.io/apiserver/pkg/util/flag:go_default_library",
"//vendor/k8s.io/apiserver/plugin/pkg/audit/log:go_default_library",
"//vendor/k8s.io/client-go/informers:go_default_library",
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
"//vendor/k8s.io/client-go/kubernetes/typed/authentication/v1beta1:go_default_library",

View File

@ -17,12 +17,14 @@ limitations under the License.
package options
import (
"io"
"os"
"github.com/spf13/pflag"
"gopkg.in/natefinch/lumberjack.v2"
"k8s.io/apiserver/pkg/server"
pluginlog "k8s.io/apiserver/plugin/pkg/audit/log"
)
type AuditLogOptions struct {
@ -52,16 +54,15 @@ func (o *AuditLogOptions) ApplyTo(c *server.Config) error {
return nil
}
if o.Path == "-" {
c.AuditWriter = os.Stdout
return nil
}
c.AuditWriter = &lumberjack.Logger{
Filename: o.Path,
MaxAge: o.MaxAge,
MaxBackups: o.MaxBackups,
MaxSize: o.MaxSize,
var w io.Writer = os.Stdout
if o.Path != "-" {
w = &lumberjack.Logger{
Filename: o.Path,
MaxAge: o.MaxAge,
MaxBackups: o.MaxBackups,
MaxSize: o.MaxSize,
}
}
c.AuditBackend = pluginlog.NewBackend(w)
return nil
}

View File

@ -0,0 +1,14 @@
package(default_visibility = ["//visibility:public"])
licenses(["notice"])
load(
"@io_bazel_rules_go//go:def.bzl",
"go_library",
)
go_library(
name = "go_default_library",
srcs = ["doc.go"],
tags = ["automanaged"],
)

View File

@ -0,0 +1,18 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package auduút contains implementations for pkg/audit/AuditBackend interface
package audit // import "k8s.io/apiserver/plugin/pkg/audit"

View File

@ -0,0 +1,19 @@
package(default_visibility = ["//visibility:public"])
licenses(["notice"])
load(
"@io_bazel_rules_go//go:def.bzl",
"go_library",
)
go_library(
name = "go_default_library",
srcs = ["backend.go"],
tags = ["automanaged"],
deps = [
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/apiserver/pkg/apis/audit:go_default_library",
"//vendor/k8s.io/apiserver/pkg/audit:go_default_library",
],
)

View File

@ -0,0 +1,102 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package log
import (
"fmt"
"io"
"strconv"
"strings"
"time"
"github.com/golang/glog"
auditinternal "k8s.io/apiserver/pkg/apis/audit"
"k8s.io/apiserver/pkg/audit"
)
type backend struct {
out io.Writer
sink chan *auditinternal.Event
}
var _ audit.Backend = &backend{}
func NewBackend(out io.Writer) *backend {
return &backend{
out: out,
sink: make(chan *auditinternal.Event, 100),
}
}
func (b *backend) ProcessEvents(events ...*auditinternal.Event) {
for _, ev := range events {
b.logEvent(ev)
}
}
func (b *backend) logEvent(ev *auditinternal.Event) {
username := "<none>"
groups := "<none>"
if len(ev.User.Username) > 0 {
username = ev.User.Username
if len(ev.User.Groups) > 0 {
groups = auditStringSlice(ev.User.Groups)
}
}
asuser := "<self>"
asgroups := "<lookup>"
if ev.ImpersonatedUser != nil {
asuser = ev.ImpersonatedUser.Username
if ev.ImpersonatedUser.Groups != nil {
asgroups = auditStringSlice(ev.ImpersonatedUser.Groups)
}
}
namespace := "<none>"
if ev.ObjectRef != nil && len(ev.ObjectRef.Namespace) != 0 {
namespace = ev.ObjectRef.Namespace
}
response := "<deferred>"
if ev.ResponseStatus != nil {
response = strconv.Itoa(int(ev.ResponseStatus.Code))
}
ip := "<unknown>"
if len(ev.SourceIPs) > 0 {
ip = ev.SourceIPs[0]
}
line := fmt.Sprintf("%s AUDIT: id=%q ip=%q method=%q user=%q groups=%q as=%q asgroups=%q namespace=%q uri=%q response=\"%s\"\n",
ev.Timestamp.Format(time.RFC3339Nano), ev.AuditID, ip, ev.Verb, username, groups, asuser, asgroups, namespace, ev.RequestURI, response)
if _, err := fmt.Fprint(b.out, line); err != nil {
glog.Errorf("Unable to write audit log: %s, the error is: %v", line, err)
}
}
func (b *backend) Run(stopCh <-chan struct{}) error {
return nil
}
func auditStringSlice(inList []string) string {
quotedElements := make([]string, len(inList))
for i, in := range inList {
quotedElements[i] = fmt.Sprintf("%q", in)
}
return strings.Join(quotedElements, ",")
}

View File

@ -172,6 +172,7 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg
apisHandler := &apisHandler{
codecs: Codecs,
lister: s.lister,
mapper: s.contextMapper,
}
s.GenericAPIServer.Handler.PostGoRestfulMux.Handle("/apis", apisHandler)
s.GenericAPIServer.Handler.PostGoRestfulMux.UnlistedHandle("/apis/", apisHandler)
@ -242,10 +243,11 @@ func (s *APIAggregator) AddAPIService(apiService *apiregistration.APIService, de
// it's time to register the group aggregation endpoint
groupPath := "/apis/" + apiService.Spec.Group
groupDiscoveryHandler := &apiGroupHandler{
codecs: Codecs,
groupName: apiService.Spec.Group,
lister: s.lister,
delegate: s.delegateHandler,
codecs: Codecs,
groupName: apiService.Spec.Group,
lister: s.lister,
delegate: s.delegateHandler,
contextMapper: s.contextMapper,
}
// aggregation is protected
s.GenericAPIServer.Handler.PostGoRestfulMux.Handle(groupPath, groupDiscoveryHandler)

View File

@ -17,6 +17,7 @@ limitations under the License.
package apiserver
import (
"errors"
"net/http"
apierrors "k8s.io/apimachinery/pkg/api/errors"
@ -25,6 +26,7 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
"k8s.io/apiserver/pkg/endpoints/request"
apiregistrationapi "k8s.io/kube-aggregator/pkg/apis/apiregistration"
apiregistrationv1beta1api "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1beta1"
@ -36,6 +38,7 @@ import (
type apisHandler struct {
codecs serializer.CodecFactory
lister listers.APIServiceLister
mapper request.RequestContextMapper
}
var discoveryGroup = metav1.APIGroup{
@ -53,6 +56,12 @@ var discoveryGroup = metav1.APIGroup{
}
func (r *apisHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
ctx, ok := r.mapper.Get(req)
if !ok {
responsewriters.InternalError(w, req, errors.New("no context found for request"))
return
}
discoveryGroupList := &metav1.APIGroupList{
// always add OUR api group to the list first. Since we'll never have a registered APIService for it
// and since this is the crux of the API, having this first will give our names priority. It's good to be king.
@ -76,7 +85,7 @@ func (r *apisHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
}
}
responsewriters.WriteObjectNegotiated(r.codecs, schema.GroupVersion{}, w, req, http.StatusOK, discoveryGroupList)
responsewriters.WriteObjectNegotiated(ctx, r.codecs, schema.GroupVersion{}, w, req, http.StatusOK, discoveryGroupList)
}
// convertToDiscoveryAPIGroup takes apiservices in a single group and returns a discovery compatible object.
@ -115,8 +124,9 @@ func convertToDiscoveryAPIGroup(apiServices []*apiregistrationapi.APIService) *m
// apiGroupHandler serves the `/apis/<group>` endpoint.
type apiGroupHandler struct {
codecs serializer.CodecFactory
groupName string
codecs serializer.CodecFactory
groupName string
contextMapper request.RequestContextMapper
lister listers.APIServiceLister
@ -124,6 +134,12 @@ type apiGroupHandler struct {
}
func (r *apiGroupHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
ctx, ok := r.contextMapper.Get(req)
if !ok {
responsewriters.InternalError(w, req, errors.New("no context found for request"))
return
}
apiServices, err := r.lister.List(labels.Everything())
if statusErr, ok := err.(*apierrors.StatusError); ok && err != nil {
responsewriters.WriteRawJSON(int(statusErr.Status().Code), statusErr.Status(), w)
@ -151,5 +167,5 @@ func (r *apiGroupHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
http.Error(w, "", http.StatusNotFound)
return
}
responsewriters.WriteObjectNegotiated(r.codecs, schema.GroupVersion{}, w, req, http.StatusOK, discoveryGroup)
responsewriters.WriteObjectNegotiated(ctx, r.codecs, schema.GroupVersion{}, w, req, http.StatusOK, discoveryGroup)
}

View File

@ -27,6 +27,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/diff"
"k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/client-go/tools/cache"
"k8s.io/kube-aggregator/pkg/apis/apiregistration"
@ -236,16 +237,18 @@ func TestAPIs(t *testing.T) {
}
for _, tc := range tests {
mapper := request.NewRequestContextMapper()
indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
handler := &apisHandler{
codecs: Codecs,
lister: listers.NewAPIServiceLister(indexer),
mapper: mapper,
}
for _, o := range tc.apiservices {
indexer.Add(o)
}
server := httptest.NewServer(handler)
server := httptest.NewServer(request.WithRequestContext(handler, mapper))
defer server.Close()
resp, err := http.Get(server.URL + "/apis")
@ -272,6 +275,7 @@ func TestAPIs(t *testing.T) {
}
func TestAPIGroupMissing(t *testing.T) {
mapper := request.NewRequestContextMapper()
indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
handler := &apiGroupHandler{
codecs: Codecs,
@ -280,9 +284,10 @@ func TestAPIGroupMissing(t *testing.T) {
delegate: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusForbidden)
}),
contextMapper: mapper,
}
server := httptest.NewServer(handler)
server := httptest.NewServer(request.WithRequestContext(handler, mapper))
defer server.Close()
// this call should delegate
@ -415,17 +420,19 @@ func TestAPIGroup(t *testing.T) {
}
for _, tc := range tests {
mapper := request.NewRequestContextMapper()
indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
handler := &apiGroupHandler{
codecs: Codecs,
lister: listers.NewAPIServiceLister(indexer),
groupName: "foo",
codecs: Codecs,
lister: listers.NewAPIServiceLister(indexer),
groupName: "foo",
contextMapper: mapper,
}
for _, o := range tc.apiservices {
indexer.Add(o)
}
server := httptest.NewServer(handler)
server := httptest.NewServer(request.WithRequestContext(handler, mapper))
defer server.Close()
resp, err := http.Get(server.URL + "/apis/" + tc.group)

View File

@ -160,7 +160,7 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget)
s.GenericAPIServer.Handler.PostGoRestfulMux.Handle("/apis", crdHandler)
s.GenericAPIServer.Handler.PostGoRestfulMux.HandlePrefix("/apis/", crdHandler)
crdController := NewDiscoveryController(s.Informers.Apiextensions().InternalVersion().CustomResourceDefinitions(), versionDiscoveryHandler, groupDiscoveryHandler)
crdController := NewDiscoveryController(s.Informers.Apiextensions().InternalVersion().CustomResourceDefinitions(), versionDiscoveryHandler, groupDiscoveryHandler, c.GenericConfig.RequestContextMapper)
namingController := status.NewNamingConditionController(s.Informers.Apiextensions().InternalVersion().CustomResourceDefinitions(), crdClient)
finalizingController := finalizer.NewCRDFinalizer(
s.Informers.Apiextensions().InternalVersion().CustomResourceDefinitions(),

View File

@ -28,6 +28,7 @@ import (
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/endpoints/discovery"
"k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
@ -39,6 +40,7 @@ import (
type DiscoveryController struct {
versionHandler *versionDiscoveryHandler
groupHandler *groupDiscoveryHandler
contextMapper request.RequestContextMapper
crdLister listers.CustomResourceDefinitionLister
crdsSynced cache.InformerSynced
@ -49,12 +51,13 @@ type DiscoveryController struct {
queue workqueue.RateLimitingInterface
}
func NewDiscoveryController(crdInformer informers.CustomResourceDefinitionInformer, versionHandler *versionDiscoveryHandler, groupHandler *groupDiscoveryHandler) *DiscoveryController {
func NewDiscoveryController(crdInformer informers.CustomResourceDefinitionInformer, versionHandler *versionDiscoveryHandler, groupHandler *groupDiscoveryHandler, contextMapper request.RequestContextMapper) *DiscoveryController {
c := &DiscoveryController{
versionHandler: versionHandler,
groupHandler: groupHandler,
crdLister: crdInformer.Lister(),
crdsSynced: crdInformer.Informer().HasSynced,
contextMapper: contextMapper,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "DiscoveryController"),
}
@ -129,7 +132,7 @@ func (c *DiscoveryController) sync(version schema.GroupVersion) error {
// the preferred versions for a group is arbitrary since there cannot be duplicate resources
PreferredVersion: apiVersionsForDiscovery[0],
}
c.groupHandler.setDiscovery(version.Group, discovery.NewAPIGroupHandler(Codecs, apiGroup))
c.groupHandler.setDiscovery(version.Group, discovery.NewAPIGroupHandler(Codecs, apiGroup, c.contextMapper))
if !foundVersion {
c.versionHandler.unsetDiscovery(version)
@ -137,7 +140,7 @@ func (c *DiscoveryController) sync(version schema.GroupVersion) error {
}
c.versionHandler.setDiscovery(version, discovery.NewAPIVersionHandler(Codecs, version, discovery.APIResourceListerFunc(func() []metav1.APIResource {
return apiResourcesForDiscovery
})))
}), c.contextMapper))
return nil
}