Merge pull request #60824 from hzxuzhonghu/requestContextMap-rwlock

Automatic merge from submit-queue (batch tested with PRs 62425, 62212, 60824, 62383, 62384). If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>.

optimize requestcontext: use RWMutex to improve r/w performance

RequestContextMapper is one of the mostly used interface by every request, and the underlying struct is a map with Mutex protect. So here we should use RWMutex.

**Release note**:

```release-note
NONE
```
pull/8/head
Kubernetes Submit Queue 2018-04-11 16:36:19 -07:00 committed by GitHub
commit 14fca16a39
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 224 additions and 46 deletions

View File

@ -541,6 +541,7 @@ staging/src/k8s.io/apiserver/pkg/endpoints/handlers
staging/src/k8s.io/apiserver/pkg/endpoints/handlers/negotiation
staging/src/k8s.io/apiserver/pkg/endpoints/metrics
staging/src/k8s.io/apiserver/pkg/endpoints/openapi/testing
staging/src/k8s.io/apiserver/pkg/endpoints/request
staging/src/k8s.io/apiserver/pkg/endpoints/testing
staging/src/k8s.io/apiserver/pkg/features
staging/src/k8s.io/apiserver/pkg/registry/generic

View File

@ -8,11 +8,17 @@ load(
go_test(
name = "go_default_test",
srcs = ["requestinfo_test.go"],
srcs = [
"context_test.go",
"requestcontext_test.go",
"requestinfo_test.go",
],
embed = [":go_default_library"],
deps = [
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//vendor/k8s.io/apiserver/pkg/authentication/user:go_default_library",
],
)
@ -35,17 +41,6 @@ go_library(
],
)
go_test(
name = "go_default_xtest",
srcs = ["context_test.go"],
deps = [
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/k8s.io/apiserver/pkg/authentication/user:go_default_library",
"//vendor/k8s.io/apiserver/pkg/endpoints/request:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),

View File

@ -18,7 +18,7 @@ package request
import (
"context"
stderrs "errors"
"errors"
"time"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -83,7 +83,7 @@ func NewDefaultContext() Context {
func WithValue(parent Context, key interface{}, val interface{}) Context {
internalCtx, ok := parent.(context.Context)
if !ok {
panic(stderrs.New("Invalid context type"))
panic(errors.New("Invalid context type"))
}
return context.WithValue(internalCtx, key, val)
}

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package request_test
package request
import (
"testing"
@ -22,13 +22,12 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apiserver/pkg/authentication/user"
genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
)
// TestNamespaceContext validates that a namespace can be get/set on a context object
func TestNamespaceContext(t *testing.T) {
ctx := genericapirequest.NewDefaultContext()
result, ok := genericapirequest.NamespaceFrom(ctx)
ctx := NewDefaultContext()
result, ok := NamespaceFrom(ctx)
if !ok {
t.Fatalf("Error getting namespace")
}
@ -36,8 +35,8 @@ func TestNamespaceContext(t *testing.T) {
t.Fatalf("Expected: %s, Actual: %s", metav1.NamespaceDefault, result)
}
ctx = genericapirequest.NewContext()
result, ok = genericapirequest.NamespaceFrom(ctx)
ctx = NewContext()
result, ok = NamespaceFrom(ctx)
if ok {
t.Fatalf("Should not be ok because there is no namespace on the context")
}
@ -45,12 +44,12 @@ func TestNamespaceContext(t *testing.T) {
//TestUserContext validates that a userinfo can be get/set on a context object
func TestUserContext(t *testing.T) {
ctx := genericapirequest.NewContext()
_, ok := genericapirequest.UserFrom(ctx)
ctx := NewContext()
_, ok := UserFrom(ctx)
if ok {
t.Fatalf("Should not be ok because there is no user.Info on the context")
}
ctx = genericapirequest.WithUser(
ctx = WithUser(
ctx,
&user.DefaultInfo{
Name: "bob",
@ -60,7 +59,7 @@ func TestUserContext(t *testing.T) {
},
)
result, ok := genericapirequest.UserFrom(ctx)
result, ok := UserFrom(ctx)
if !ok {
t.Fatalf("Error getting user info")
}
@ -96,16 +95,16 @@ func TestUserContext(t *testing.T) {
//TestUIDContext validates that a UID can be get/set on a context object
func TestUIDContext(t *testing.T) {
ctx := genericapirequest.NewContext()
_, ok := genericapirequest.UIDFrom(ctx)
ctx := NewContext()
_, ok := UIDFrom(ctx)
if ok {
t.Fatalf("Should not be ok because there is no UID on the context")
}
ctx = genericapirequest.WithUID(
ctx = WithUID(
ctx,
types.UID("testUID"),
)
_, ok = genericapirequest.UIDFrom(ctx)
_, ok = UIDFrom(ctx)
if !ok {
t.Fatalf("Error getting UID")
}
@ -113,17 +112,17 @@ func TestUIDContext(t *testing.T) {
//TestUserAgentContext validates that a useragent can be get/set on a context object
func TestUserAgentContext(t *testing.T) {
ctx := genericapirequest.NewContext()
_, ok := genericapirequest.UserAgentFrom(ctx)
ctx := NewContext()
_, ok := UserAgentFrom(ctx)
if ok {
t.Fatalf("Should not be ok because there is no UserAgent on the context")
}
ctx = genericapirequest.WithUserAgent(
ctx = WithUserAgent(
ctx,
"TestUserAgent",
)
result, ok := genericapirequest.UserAgentFrom(ctx)
result, ok := UserAgentFrom(ctx)
if !ok {
t.Fatalf("Error getting UserAgent")
}

View File

@ -20,6 +20,7 @@ import (
"errors"
"net/http"
"sync"
"sync/atomic"
"github.com/golang/glog"
)
@ -40,37 +41,62 @@ type RequestContextMapper interface {
}
type requestContextMap struct {
contexts map[*http.Request]Context
lock sync.Mutex
// contexts contains a request Context map
// atomic.Value has a very good read performance compared to sync.RWMutex
// almost all requests have 3-4 context updates associated with them,
// and they can use only read lock to protect updating context, which is of higher performance with higher burst.
contexts map[*http.Request]*atomic.Value
lock sync.RWMutex
}
// NewRequestContextMapper returns a new RequestContextMapper.
// The returned mapper must be added as a request filter using NewRequestContextFilter.
func NewRequestContextMapper() RequestContextMapper {
return &requestContextMap{
contexts: make(map[*http.Request]Context),
contexts: make(map[*http.Request]*atomic.Value),
}
}
func (c *requestContextMap) getValue(req *http.Request) (*atomic.Value, bool) {
c.lock.RLock()
defer c.lock.RUnlock()
value, ok := c.contexts[req]
return value, ok
}
// contextWrap is a wrapper of Context to prevent atomic.Value to be copied
type contextWrap struct {
Context
}
// Get returns the context associated with the given request (if any), and true if the request has an associated context, and false if it does not.
// Get will only return a valid context when called from inside the filter chain set up by NewRequestContextFilter()
func (c *requestContextMap) Get(req *http.Request) (Context, bool) {
c.lock.Lock()
defer c.lock.Unlock()
context, ok := c.contexts[req]
return context, ok
value, ok := c.getValue(req)
if !ok {
return nil, false
}
if context, ok := value.Load().(contextWrap); ok {
return context.Context, ok
}
return nil, false
}
// Update maps the request to the given context.
// If no context was previously associated with the request, an error is returned and the context is ignored.
func (c *requestContextMap) Update(req *http.Request, context Context) error {
c.lock.Lock()
defer c.lock.Unlock()
if _, ok := c.contexts[req]; !ok {
return errors.New("No context associated")
value, ok := c.getValue(req)
if !ok {
return errors.New("no context associated")
}
// TODO: ensure the new context is a descendant of the existing one
c.contexts[req] = context
wrapper, ok := value.Load().(contextWrap)
if !ok {
return errors.New("value type does not match")
}
wrapper.Context = context
value.Store(wrapper)
return nil
}
@ -83,7 +109,10 @@ func (c *requestContextMap) init(req *http.Request, context Context) bool {
if _, exists := c.contexts[req]; exists {
return false
}
c.contexts[req] = context
value := &atomic.Value{}
value.Store(contextWrap{context})
c.contexts[req] = value
return true
}

View File

@ -0,0 +1,154 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package request
import (
"net/http"
"sync"
"testing"
)
func TestRequestContextMapperGet(t *testing.T) {
mapper := NewRequestContextMapper()
context := NewContext()
req, _ := http.NewRequest("GET", "/api/version/resource", nil)
// empty mapper
if _, ok := mapper.Get(req); ok {
t.Fatalf("got unexpected context")
}
// init mapper
mapper.(*requestContextMap).init(req, context)
if _, ok := mapper.Get(req); !ok {
t.Fatalf("got no context")
}
// remove request context
mapper.(*requestContextMap).remove(req)
if _, ok := mapper.Get(req); ok {
t.Fatalf("got unexpected context")
}
}
func TestRequestContextMapperUpdate(t *testing.T) {
mapper := NewRequestContextMapper()
context := NewContext()
req, _ := http.NewRequest("GET", "/api/version/resource", nil)
// empty mapper
if err := mapper.Update(req, context); err == nil {
t.Fatalf("got no error")
}
// init mapper
if !mapper.(*requestContextMap).init(req, context) {
t.Fatalf("unexpected error, should init mapper")
}
context = WithNamespace(context, "default")
if err := mapper.Update(req, context); err != nil {
t.Fatalf("unexpected error")
}
if context, ok := mapper.Get(req); !ok {
t.Fatalf("go no context")
} else {
if ns, _ := NamespaceFrom(context); ns != "default" {
t.Fatalf("unexpected namespace %s", ns)
}
}
}
func TestRequestContextMapperConcurrent(t *testing.T) {
mapper := NewRequestContextMapper()
testCases := []struct{ url, namespace string }{
{"/api/version/resource1", "ns1"},
{"/api/version/resource2", "ns2"},
{"/api/version/resource3", "ns3"},
{"/api/version/resource4", "ns4"},
{"/api/version/resource5", "ns5"},
}
wg := sync.WaitGroup{}
for _, testcase := range testCases {
wg.Add(1)
go func(testcase struct{ url, namespace string }) {
defer wg.Done()
context := NewContext()
req, _ := http.NewRequest("GET", testcase.url, nil)
if !mapper.(*requestContextMap).init(req, context) {
t.Errorf("unexpected init error")
return
}
if _, ok := mapper.Get(req); !ok {
t.Errorf("got no context")
return
}
context2 := WithNamespace(context, testcase.namespace)
if err := mapper.Update(req, context2); err != nil {
t.Errorf("unexpected update error")
return
}
if context, ok := mapper.Get(req); !ok {
t.Errorf("got no context")
return
} else {
if ns, _ := NamespaceFrom(context); ns != testcase.namespace {
t.Errorf("unexpected namespace %s", ns)
return
}
}
}(testcase)
}
wg.Wait()
}
func BenchmarkRequestContextMapper(b *testing.B) {
mapper := NewRequestContextMapper()
b.SetParallelism(500)
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
context := NewContext()
req, _ := http.NewRequest("GET", "/api/version/resource", nil)
// 1 init
mapper.(*requestContextMap).init(req, context)
// 5 Get + 4 Update
mapper.Get(req)
context = WithNamespace(context, "default1")
mapper.Update(req, context)
mapper.Get(req)
context = WithNamespace(context, "default2")
mapper.Update(req, context)
mapper.Get(req)
context = WithNamespace(context, "default3")
mapper.Update(req, context)
mapper.Get(req)
context = WithNamespace(context, "default4")
mapper.Update(req, context)
mapper.Get(req)
// 1 remove
mapper.(*requestContextMap).remove(req)
}
})
}