Merge pull request #54861 from hzxuzhonghu/admission-webhook

Automatic merge from submit-queue (batch tested with PRs 53047, 54861, 55413, 55395, 55308). If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>.

cache admission webhook restClient

**What this PR does / why we need it**:

**Which issue(s) this PR fixes** *(optional, in `fixes #<issue number>(, fixes #<issue_number>, ...)` format, will close the issue(s) when PR gets merged)*:
Fixes #54860

**Special notes for your reviewer**:

**Release note**:

```release-note
NONE
```
pull/6/head
Kubernetes Submit Queue 2017-11-10 07:00:40 -08:00 committed by GitHub
commit 331c52c0fd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 249 additions and 77 deletions

View File

@ -15,6 +15,7 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/github.com/hashicorp/golang-lru:go_default_library",
"//vendor/k8s.io/api/admission/v1alpha1:go_default_library",
"//vendor/k8s.io/api/admissionregistration/v1alpha1:go_default_library",
"//vendor/k8s.io/api/authentication/v1:go_default_library",

View File

@ -19,13 +19,15 @@ package webhook
import (
"context"
"encoding/json"
"fmt"
"io"
"net"
"net/url"
"path"
"sync"
"github.com/golang/glog"
lru "github.com/hashicorp/golang-lru"
admissionv1alpha1 "k8s.io/api/admission/v1alpha1"
"k8s.io/api/admissionregistration/v1alpha1"
@ -46,6 +48,7 @@ import (
const (
// Name of admission plug-in
PluginName = "GenericAdmissionWebhook"
defaultCacheSize = 200
)
type ErrCallingWebhook struct {
@ -96,6 +99,11 @@ func NewGenericAdmissionWebhook(configFile io.Reader) (*GenericAdmissionWebhook,
return nil, err
}
cache, err := lru.New(defaultCacheSize)
if err != nil {
return nil, err
}
return &GenericAdmissionWebhook{
Handler: admission.NewHandler(
admission.Connect,
@ -105,6 +113,7 @@ func NewGenericAdmissionWebhook(configFile io.Reader) (*GenericAdmissionWebhook,
),
authInfoResolver: authInfoResolver,
serviceResolver: defaultServiceResolver{},
cache: cache,
}, nil
}
@ -116,6 +125,7 @@ type GenericAdmissionWebhook struct {
negotiatedSerializer runtime.NegotiatedSerializer
authInfoResolver AuthenticationInfoResolver
cache *lru.Cache
}
// serviceResolver knows how to convert a service reference into an actual location.
@ -300,23 +310,48 @@ func toStatusErr(name string, result *metav1.Status) *apierrors.StatusError {
}
func (a *GenericAdmissionWebhook) hookClient(h *v1alpha1.Webhook) (*rest.RESTClient, error) {
serverName := h.ClientConfig.Service.Name + "." + h.ClientConfig.Service.Namespace + ".svc"
u, err := a.serviceResolver.ResolveEndpoint(h.ClientConfig.Service.Namespace, h.ClientConfig.Service.Name)
cacheKey, err := json.Marshal(h.ClientConfig)
if err != nil {
return nil, err
}
if client, ok := a.cache.Get(string(cacheKey)); ok {
return client.(*rest.RESTClient), nil
}
// TODO: cache these instead of constructing one each time
serverName := h.ClientConfig.Service.Name + "." + h.ClientConfig.Service.Namespace + ".svc"
restConfig, err := a.authInfoResolver.ClientConfigFor(serverName)
if err != nil {
return nil, err
}
cfg := rest.CopyConfig(restConfig)
cfg.Host = u.Host
cfg.APIPath = path.Join(u.Path, h.ClientConfig.URLPath)
host := serverName + ":443"
cfg.Host = "https://" + host
cfg.APIPath = h.ClientConfig.URLPath
cfg.TLSClientConfig.ServerName = serverName
cfg.TLSClientConfig.CAData = h.ClientConfig.CABundle
cfg.ContentConfig.NegotiatedSerializer = a.negotiatedSerializer
cfg.ContentConfig.ContentType = runtime.ContentTypeJSON
return rest.UnversionedRESTClientFor(cfg)
delegateDialer := cfg.Dial
if delegateDialer == nil {
delegateDialer = net.Dial
}
cfg.Dial = func(network, addr string) (net.Conn, error) {
if addr == host {
u, err := a.serviceResolver.ResolveEndpoint(h.ClientConfig.Service.Namespace, h.ClientConfig.Service.Name)
if err != nil {
return nil, err
}
addr = u.Host
}
return delegateDialer(network, addr)
}
client, err := rest.UnversionedRESTClientFor(cfg)
if err == nil {
a.cache.Add(string(cacheKey), client)
}
return client, err
}

View File

@ -25,6 +25,7 @@ import (
"net/http/httptest"
"net/url"
"strings"
"sync/atomic"
"testing"
"k8s.io/api/admission/v1alpha1"
@ -70,19 +71,7 @@ func TestAdmit(t *testing.T) {
v1alpha1.AddToScheme(scheme)
api.AddToScheme(scheme)
// Create the test webhook server
sCert, err := tls.X509KeyPair(serverCert, serverKey)
if err != nil {
t.Fatal(err)
}
rootCAs := x509.NewCertPool()
rootCAs.AppendCertsFromPEM(caCert)
testServer := httptest.NewUnstartedServer(http.HandlerFunc(webhookHandler))
testServer.TLS = &tls.Config{
Certificates: []tls.Certificate{sCert},
ClientCAs: rootCAs,
ClientAuth: tls.RequireAndVerifyClientCert,
}
testServer := newTestServer(t)
testServer.StartTLS()
defer testServer.Close()
serverURL, err := url.ParseRequestURI(testServer.URL)
@ -93,15 +82,9 @@ func TestAdmit(t *testing.T) {
if err != nil {
t.Fatal(err)
}
wh.authInfoResolver = &fakeAuthenticationInfoResolver{
restConfig: &rest.Config{
TLSClientConfig: rest.TLSClientConfig{
CAData: caCert,
CertData: clientCert,
KeyData: clientKey,
},
},
}
wh.authInfoResolver = newFakeAuthenticationInfoResolver()
wh.serviceResolver = fakeServiceResolver{base: *serverURL}
wh.SetScheme(scheme)
// Set up a test object for the call
kind := api.SchemeGroupVersion.WithKind("Pod")
@ -137,25 +120,6 @@ func TestAdmit(t *testing.T) {
expectAllow bool
errorContains string
}
ccfg := func(urlPath string) registrationv1alpha1.WebhookClientConfig {
return registrationv1alpha1.WebhookClientConfig{
Service: registrationv1alpha1.ServiceReference{
Name: "webhook-test",
Namespace: "default",
},
URLPath: urlPath,
CABundle: caCert,
}
}
matchEverythingRules := []registrationv1alpha1.RuleWithOperations{{
Operations: []registrationv1alpha1.OperationType{registrationv1alpha1.OperationAll},
Rule: registrationv1alpha1.Rule{
APIGroups: []string{"*"},
APIVersions: []string{"*"},
Resources: []string{"*/*"},
},
}}
policyFail := registrationv1alpha1.Fail
policyIgnore := registrationv1alpha1.Ignore
@ -165,7 +129,7 @@ func TestAdmit(t *testing.T) {
hookSource: fakeHookSource{
hooks: []registrationv1alpha1.Webhook{{
Name: "nomatch",
ClientConfig: ccfg("disallow"),
ClientConfig: newFakeHookClientConfig("disallow"),
Rules: []registrationv1alpha1.RuleWithOperations{{
Operations: []registrationv1alpha1.OperationType{registrationv1alpha1.Create},
}},
@ -177,8 +141,8 @@ func TestAdmit(t *testing.T) {
hookSource: fakeHookSource{
hooks: []registrationv1alpha1.Webhook{{
Name: "allow",
ClientConfig: ccfg("allow"),
Rules: matchEverythingRules,
ClientConfig: newFakeHookClientConfig("allow"),
Rules: newMatchEverythingRules(),
}},
},
expectAllow: true,
@ -187,8 +151,8 @@ func TestAdmit(t *testing.T) {
hookSource: fakeHookSource{
hooks: []registrationv1alpha1.Webhook{{
Name: "disallow",
ClientConfig: ccfg("disallow"),
Rules: matchEverythingRules,
ClientConfig: newFakeHookClientConfig("disallow"),
Rules: newMatchEverythingRules(),
}},
},
errorContains: "without explanation",
@ -197,8 +161,8 @@ func TestAdmit(t *testing.T) {
hookSource: fakeHookSource{
hooks: []registrationv1alpha1.Webhook{{
Name: "disallowReason",
ClientConfig: ccfg("disallowReason"),
Rules: matchEverythingRules,
ClientConfig: newFakeHookClientConfig("disallowReason"),
Rules: newMatchEverythingRules(),
}},
},
errorContains: "you shall not pass",
@ -207,18 +171,18 @@ func TestAdmit(t *testing.T) {
hookSource: fakeHookSource{
hooks: []registrationv1alpha1.Webhook{{
Name: "internalErr A",
ClientConfig: ccfg("internalErr"),
Rules: matchEverythingRules,
ClientConfig: newFakeHookClientConfig("internalErr"),
Rules: newMatchEverythingRules(),
FailurePolicy: &policyIgnore,
}, {
Name: "internalErr B",
ClientConfig: ccfg("internalErr"),
Rules: matchEverythingRules,
ClientConfig: newFakeHookClientConfig("internalErr"),
Rules: newMatchEverythingRules(),
FailurePolicy: &policyIgnore,
}, {
Name: "internalErr C",
ClientConfig: ccfg("internalErr"),
Rules: matchEverythingRules,
ClientConfig: newFakeHookClientConfig("internalErr"),
Rules: newMatchEverythingRules(),
FailurePolicy: &policyIgnore,
}},
},
@ -228,16 +192,16 @@ func TestAdmit(t *testing.T) {
hookSource: fakeHookSource{
hooks: []registrationv1alpha1.Webhook{{
Name: "internalErr A",
ClientConfig: ccfg("internalErr"),
Rules: matchEverythingRules,
ClientConfig: newFakeHookClientConfig("internalErr"),
Rules: newMatchEverythingRules(),
}, {
Name: "internalErr B",
ClientConfig: ccfg("internalErr"),
Rules: matchEverythingRules,
ClientConfig: newFakeHookClientConfig("internalErr"),
Rules: newMatchEverythingRules(),
}, {
Name: "internalErr C",
ClientConfig: ccfg("internalErr"),
Rules: matchEverythingRules,
ClientConfig: newFakeHookClientConfig("internalErr"),
Rules: newMatchEverythingRules(),
}},
},
expectAllow: false,
@ -246,18 +210,18 @@ func TestAdmit(t *testing.T) {
hookSource: fakeHookSource{
hooks: []registrationv1alpha1.Webhook{{
Name: "internalErr A",
ClientConfig: ccfg("internalErr"),
Rules: matchEverythingRules,
ClientConfig: newFakeHookClientConfig("internalErr"),
Rules: newMatchEverythingRules(),
FailurePolicy: &policyFail,
}, {
Name: "internalErr B",
ClientConfig: ccfg("internalErr"),
Rules: matchEverythingRules,
ClientConfig: newFakeHookClientConfig("internalErr"),
Rules: newMatchEverythingRules(),
FailurePolicy: &policyFail,
}, {
Name: "internalErr C",
ClientConfig: ccfg("internalErr"),
Rules: matchEverythingRules,
ClientConfig: newFakeHookClientConfig("internalErr"),
Rules: newMatchEverythingRules(),
FailurePolicy: &policyFail,
}},
},
@ -268,8 +232,6 @@ func TestAdmit(t *testing.T) {
for name, tt := range table {
t.Run(name, func(t *testing.T) {
wh.hookSource = &tt.hookSource
wh.serviceResolver = fakeServiceResolver{base: *serverURL}
wh.SetScheme(scheme)
err = wh.Admit(admission.NewAttributesRecord(&object, &oldObject, kind, namespace, name, resource, subResource, operation, &userInfo))
if tt.expectAllow != (err == nil) {
@ -288,6 +250,144 @@ func TestAdmit(t *testing.T) {
}
}
// TestAdmitCachedClient tests that GenericAdmissionWebhook#Admit should cache restClient
func TestAdmitCachedClient(t *testing.T) {
scheme := runtime.NewScheme()
v1alpha1.AddToScheme(scheme)
api.AddToScheme(scheme)
testServer := newTestServer(t)
testServer.StartTLS()
defer testServer.Close()
serverURL, err := url.ParseRequestURI(testServer.URL)
if err != nil {
t.Fatalf("this should never happen? %v", err)
}
wh, err := NewGenericAdmissionWebhook(nil)
if err != nil {
t.Fatal(err)
}
wh.authInfoResolver = newFakeAuthenticationInfoResolver()
wh.serviceResolver = fakeServiceResolver{base: *serverURL}
wh.SetScheme(scheme)
// Set up a test object for the call
kind := api.SchemeGroupVersion.WithKind("Pod")
name := "my-pod"
namespace := "webhook-test"
object := api.Pod{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"pod.name": name,
},
Name: name,
Namespace: namespace,
},
TypeMeta: metav1.TypeMeta{
APIVersion: "v1",
Kind: "Pod",
},
}
oldObject := api.Pod{
ObjectMeta: metav1.ObjectMeta{Name: name, Namespace: namespace},
}
operation := admission.Update
resource := api.Resource("pods").WithVersion("v1")
subResource := ""
userInfo := user.DefaultInfo{
Name: "webhook-test",
UID: "webhook-test",
}
type test struct {
name string
hookSource fakeHookSource
expectAllow bool
expectCache bool
}
policyIgnore := registrationv1alpha1.Ignore
cases := []test{
{
name: "cache 1",
hookSource: fakeHookSource{
hooks: []registrationv1alpha1.Webhook{{
Name: "cache1",
ClientConfig: newFakeHookClientConfig("allow"),
Rules: newMatchEverythingRules(),
FailurePolicy: &policyIgnore,
}},
},
expectAllow: true,
expectCache: true,
},
{
name: "cache 2",
hookSource: fakeHookSource{
hooks: []registrationv1alpha1.Webhook{{
Name: "cache2",
ClientConfig: newFakeHookClientConfig("internalErr"),
Rules: newMatchEverythingRules(),
FailurePolicy: &policyIgnore,
}},
},
expectAllow: true,
expectCache: true,
},
{
name: "cache 3",
hookSource: fakeHookSource{
hooks: []registrationv1alpha1.Webhook{{
Name: "cache3",
ClientConfig: newFakeHookClientConfig("allow"),
Rules: newMatchEverythingRules(),
FailurePolicy: &policyIgnore,
}},
},
expectAllow: true,
expectCache: false,
},
}
for _, testcase := range cases {
t.Run(testcase.name, func(t *testing.T) {
wh.hookSource = &testcase.hookSource
wh.authInfoResolver.(*fakeAuthenticationInfoResolver).cachedCount = 0
err = wh.Admit(admission.NewAttributesRecord(&object, &oldObject, kind, namespace, testcase.name, resource, subResource, operation, &userInfo))
if testcase.expectAllow != (err == nil) {
t.Errorf("expected allowed=%v, but got err=%v", testcase.expectAllow, err)
}
if testcase.expectCache && wh.authInfoResolver.(*fakeAuthenticationInfoResolver).cachedCount != 1 {
t.Errorf("expected cacheclient, but got none")
}
if !testcase.expectCache && wh.authInfoResolver.(*fakeAuthenticationInfoResolver).cachedCount != 0 {
t.Errorf("expected not cacheclient, but got cache")
}
})
}
}
func newTestServer(t *testing.T) *httptest.Server {
// Create the test webhook server
sCert, err := tls.X509KeyPair(serverCert, serverKey)
if err != nil {
t.Fatal(err)
}
rootCAs := x509.NewCertPool()
rootCAs.AppendCertsFromPEM(caCert)
testServer := httptest.NewUnstartedServer(http.HandlerFunc(webhookHandler))
testServer.TLS = &tls.Config{
Certificates: []tls.Certificate{sCert},
ClientCAs: rootCAs,
ClientAuth: tls.RequireAndVerifyClientCert,
}
return testServer
}
func webhookHandler(w http.ResponseWriter, r *http.Request) {
fmt.Printf("got req: %v\n", r.URL.Path)
switch r.URL.Path {
@ -330,11 +430,25 @@ func webhookHandler(w http.ResponseWriter, r *http.Request) {
}
}
func newFakeAuthenticationInfoResolver() *fakeAuthenticationInfoResolver {
return &fakeAuthenticationInfoResolver{
restConfig: &rest.Config{
TLSClientConfig: rest.TLSClientConfig{
CAData: caCert,
CertData: clientCert,
KeyData: clientKey,
},
},
}
}
type fakeAuthenticationInfoResolver struct {
restConfig *rest.Config
cachedCount int32
}
func (c *fakeAuthenticationInfoResolver) ClientConfigFor(server string) (*rest.Config, error) {
atomic.AddInt32(&c.cachedCount, 1)
return c.restConfig, nil
}
@ -386,3 +500,25 @@ func TestToStatusErr(t *testing.T) {
}
}
}
func newFakeHookClientConfig(urlPath string) registrationv1alpha1.WebhookClientConfig {
return registrationv1alpha1.WebhookClientConfig{
Service: registrationv1alpha1.ServiceReference{
Name: "webhook-test",
Namespace: "default",
},
URLPath: urlPath,
CABundle: caCert,
}
}
func newMatchEverythingRules() []registrationv1alpha1.RuleWithOperations {
return []registrationv1alpha1.RuleWithOperations{{
Operations: []registrationv1alpha1.OperationType{registrationv1alpha1.OperationAll},
Rule: registrationv1alpha1.Rule{
APIGroups: []string{"*"},
APIVersions: []string{"*"},
Resources: []string{"*/*"},
},
}}
}