mirror of https://github.com/k3s-io/k3s
Merge pull request #27993 from cjcullen/webhook
Automatic merge from submit-queue Add retries on 429s & 5xxs to the Auth webhooks This gets some retry logic above the restclient (as proposed in https://github.com/kubernetes/kubernetes/issues/27965#issuecomment-228188192). I'd like to add a unit test, but I'm not coming with a good way without either gross stubbing or making a few fields in restclient.Result public. Any thoughts?pull/6/head
commit
8571bb1e2c
|
@ -25,6 +25,7 @@ import (
|
||||||
"k8s.io/kubernetes/pkg/apis/authentication.k8s.io/v1beta1"
|
"k8s.io/kubernetes/pkg/apis/authentication.k8s.io/v1beta1"
|
||||||
"k8s.io/kubernetes/pkg/auth/authenticator"
|
"k8s.io/kubernetes/pkg/auth/authenticator"
|
||||||
"k8s.io/kubernetes/pkg/auth/user"
|
"k8s.io/kubernetes/pkg/auth/user"
|
||||||
|
"k8s.io/kubernetes/pkg/client/restclient"
|
||||||
"k8s.io/kubernetes/pkg/util/cache"
|
"k8s.io/kubernetes/pkg/util/cache"
|
||||||
"k8s.io/kubernetes/plugin/pkg/webhook"
|
"k8s.io/kubernetes/plugin/pkg/webhook"
|
||||||
|
|
||||||
|
@ -35,6 +36,8 @@ var (
|
||||||
groupVersions = []unversioned.GroupVersion{v1beta1.SchemeGroupVersion}
|
groupVersions = []unversioned.GroupVersion{v1beta1.SchemeGroupVersion}
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const retryBackoff = 500 * time.Millisecond
|
||||||
|
|
||||||
// Ensure WebhookTokenAuthenticator implements the authenticator.Token interface.
|
// Ensure WebhookTokenAuthenticator implements the authenticator.Token interface.
|
||||||
var _ authenticator.Token = (*WebhookTokenAuthenticator)(nil)
|
var _ authenticator.Token = (*WebhookTokenAuthenticator)(nil)
|
||||||
|
|
||||||
|
@ -46,7 +49,12 @@ type WebhookTokenAuthenticator struct {
|
||||||
|
|
||||||
// New creates a new WebhookTokenAuthenticator from the provided kubeconfig file.
|
// New creates a new WebhookTokenAuthenticator from the provided kubeconfig file.
|
||||||
func New(kubeConfigFile string, ttl time.Duration) (*WebhookTokenAuthenticator, error) {
|
func New(kubeConfigFile string, ttl time.Duration) (*WebhookTokenAuthenticator, error) {
|
||||||
gw, err := webhook.NewGenericWebhook(kubeConfigFile, groupVersions)
|
return newWithBackoff(kubeConfigFile, ttl, retryBackoff)
|
||||||
|
}
|
||||||
|
|
||||||
|
// newWithBackoff allows tests to skip the sleep.
|
||||||
|
func newWithBackoff(kubeConfigFile string, ttl, initialBackoff time.Duration) (*WebhookTokenAuthenticator, error) {
|
||||||
|
gw, err := webhook.NewGenericWebhook(kubeConfigFile, groupVersions, initialBackoff)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -61,7 +69,9 @@ func (w *WebhookTokenAuthenticator) AuthenticateToken(token string) (user.Info,
|
||||||
if entry, ok := w.responseCache.Get(r.Spec); ok {
|
if entry, ok := w.responseCache.Get(r.Spec); ok {
|
||||||
r.Status = entry.(v1beta1.TokenReviewStatus)
|
r.Status = entry.(v1beta1.TokenReviewStatus)
|
||||||
} else {
|
} else {
|
||||||
result := w.RestClient.Post().Body(r).Do()
|
result := w.WithExponentialBackoff(func() restclient.Result {
|
||||||
|
return w.RestClient.Post().Body(r).Do()
|
||||||
|
})
|
||||||
if err := result.Error(); err != nil {
|
if err := result.Error(); err != nil {
|
||||||
return nil, false, err
|
return nil, false, err
|
||||||
}
|
}
|
||||||
|
|
|
@ -148,7 +148,7 @@ func newTokenAuthenticator(serverURL string, clientCert, clientKey, ca []byte, c
|
||||||
if err := json.NewEncoder(tempfile).Encode(config); err != nil {
|
if err := json.NewEncoder(tempfile).Encode(config); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return New(p, cacheTime)
|
return newWithBackoff(p, cacheTime, 0)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestTLSConfig(t *testing.T) {
|
func TestTLSConfig(t *testing.T) {
|
||||||
|
|
|
@ -26,6 +26,7 @@ import (
|
||||||
"k8s.io/kubernetes/pkg/api/unversioned"
|
"k8s.io/kubernetes/pkg/api/unversioned"
|
||||||
"k8s.io/kubernetes/pkg/apis/authorization/v1beta1"
|
"k8s.io/kubernetes/pkg/apis/authorization/v1beta1"
|
||||||
"k8s.io/kubernetes/pkg/auth/authorizer"
|
"k8s.io/kubernetes/pkg/auth/authorizer"
|
||||||
|
"k8s.io/kubernetes/pkg/client/restclient"
|
||||||
"k8s.io/kubernetes/pkg/util/cache"
|
"k8s.io/kubernetes/pkg/util/cache"
|
||||||
"k8s.io/kubernetes/plugin/pkg/webhook"
|
"k8s.io/kubernetes/plugin/pkg/webhook"
|
||||||
|
|
||||||
|
@ -36,6 +37,8 @@ var (
|
||||||
groupVersions = []unversioned.GroupVersion{v1beta1.SchemeGroupVersion}
|
groupVersions = []unversioned.GroupVersion{v1beta1.SchemeGroupVersion}
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const retryBackoff = 500 * time.Millisecond
|
||||||
|
|
||||||
// Ensure Webhook implements the authorizer.Authorizer interface.
|
// Ensure Webhook implements the authorizer.Authorizer interface.
|
||||||
var _ authorizer.Authorizer = (*WebhookAuthorizer)(nil)
|
var _ authorizer.Authorizer = (*WebhookAuthorizer)(nil)
|
||||||
|
|
||||||
|
@ -67,7 +70,12 @@ type WebhookAuthorizer struct {
|
||||||
// For additional HTTP configuration, refer to the kubeconfig documentation
|
// For additional HTTP configuration, refer to the kubeconfig documentation
|
||||||
// http://kubernetes.io/v1.1/docs/user-guide/kubeconfig-file.html.
|
// http://kubernetes.io/v1.1/docs/user-guide/kubeconfig-file.html.
|
||||||
func New(kubeConfigFile string, authorizedTTL, unauthorizedTTL time.Duration) (*WebhookAuthorizer, error) {
|
func New(kubeConfigFile string, authorizedTTL, unauthorizedTTL time.Duration) (*WebhookAuthorizer, error) {
|
||||||
gw, err := webhook.NewGenericWebhook(kubeConfigFile, groupVersions)
|
return newWithBackoff(kubeConfigFile, authorizedTTL, unauthorizedTTL, retryBackoff)
|
||||||
|
}
|
||||||
|
|
||||||
|
// newWithBackoff allows tests to skip the sleep.
|
||||||
|
func newWithBackoff(kubeConfigFile string, authorizedTTL, unauthorizedTTL, initialBackoff time.Duration) (*WebhookAuthorizer, error) {
|
||||||
|
gw, err := webhook.NewGenericWebhook(kubeConfigFile, groupVersions, initialBackoff)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -148,7 +156,9 @@ func (w *WebhookAuthorizer) Authorize(attr authorizer.Attributes) (err error) {
|
||||||
if entry, ok := w.responseCache.Get(string(key)); ok {
|
if entry, ok := w.responseCache.Get(string(key)); ok {
|
||||||
r.Status = entry.(v1beta1.SubjectAccessReviewStatus)
|
r.Status = entry.(v1beta1.SubjectAccessReviewStatus)
|
||||||
} else {
|
} else {
|
||||||
result := w.RestClient.Post().Body(r).Do()
|
result := w.WithExponentialBackoff(func() restclient.Result {
|
||||||
|
return w.RestClient.Post().Body(r).Do()
|
||||||
|
})
|
||||||
if err := result.Error(); err != nil {
|
if err := result.Error(); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
@ -183,7 +183,7 @@ current-context: default
|
||||||
return fmt.Errorf("failed to execute test template: %v", err)
|
return fmt.Errorf("failed to execute test template: %v", err)
|
||||||
}
|
}
|
||||||
// Create a new authorizer
|
// Create a new authorizer
|
||||||
_, err = New(p, 0, 0)
|
_, err = newWithBackoff(p, 0, 0, 0)
|
||||||
return err
|
return err
|
||||||
}()
|
}()
|
||||||
if err != nil && !tt.wantErr {
|
if err != nil && !tt.wantErr {
|
||||||
|
@ -291,7 +291,7 @@ func newAuthorizer(callbackURL string, clientCert, clientKey, ca []byte, cacheTi
|
||||||
if err := json.NewEncoder(tempfile).Encode(config); err != nil {
|
if err := json.NewEncoder(tempfile).Encode(config); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return New(p, cacheTime, cacheTime)
|
return newWithBackoff(p, cacheTime, cacheTime, 0)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestTLSConfig(t *testing.T) {
|
func TestTLSConfig(t *testing.T) {
|
||||||
|
|
|
@ -19,6 +19,7 @@ package webhook
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"time"
|
||||||
|
|
||||||
"k8s.io/kubernetes/pkg/api"
|
"k8s.io/kubernetes/pkg/api"
|
||||||
"k8s.io/kubernetes/pkg/api/unversioned"
|
"k8s.io/kubernetes/pkg/api/unversioned"
|
||||||
|
@ -27,16 +28,18 @@ import (
|
||||||
"k8s.io/kubernetes/pkg/client/unversioned/clientcmd"
|
"k8s.io/kubernetes/pkg/client/unversioned/clientcmd"
|
||||||
"k8s.io/kubernetes/pkg/runtime"
|
"k8s.io/kubernetes/pkg/runtime"
|
||||||
runtimeserializer "k8s.io/kubernetes/pkg/runtime/serializer"
|
runtimeserializer "k8s.io/kubernetes/pkg/runtime/serializer"
|
||||||
|
"k8s.io/kubernetes/pkg/util/wait"
|
||||||
|
|
||||||
_ "k8s.io/kubernetes/pkg/apis/authorization/install"
|
_ "k8s.io/kubernetes/pkg/apis/authorization/install"
|
||||||
)
|
)
|
||||||
|
|
||||||
type GenericWebhook struct {
|
type GenericWebhook struct {
|
||||||
RestClient *restclient.RESTClient
|
RestClient *restclient.RESTClient
|
||||||
|
initialBackoff time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
// New creates a new GenericWebhook from the provided kubeconfig file.
|
// New creates a new GenericWebhook from the provided kubeconfig file.
|
||||||
func NewGenericWebhook(kubeConfigFile string, groupVersions []unversioned.GroupVersion) (*GenericWebhook, error) {
|
func NewGenericWebhook(kubeConfigFile string, groupVersions []unversioned.GroupVersion, initialBackoff time.Duration) (*GenericWebhook, error) {
|
||||||
for _, groupVersion := range groupVersions {
|
for _, groupVersion := range groupVersions {
|
||||||
if !registered.IsEnabledVersion(groupVersion) {
|
if !registered.IsEnabledVersion(groupVersion) {
|
||||||
return nil, fmt.Errorf("webhook plugin requires enabling extension resource: %s", groupVersion)
|
return nil, fmt.Errorf("webhook plugin requires enabling extension resource: %s", groupVersion)
|
||||||
|
@ -64,5 +67,31 @@ func NewGenericWebhook(kubeConfigFile string, groupVersions []unversioned.GroupV
|
||||||
|
|
||||||
// TODO(ericchiang): Can we ensure remote service is reachable?
|
// TODO(ericchiang): Can we ensure remote service is reachable?
|
||||||
|
|
||||||
return &GenericWebhook{restClient}, nil
|
return &GenericWebhook{restClient, initialBackoff}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// WithExponentialBackoff will retry webhookFn 5 times w/ exponentially
|
||||||
|
// increasing backoff when a 429 or a 5xx response code is returned.
|
||||||
|
func (g *GenericWebhook) WithExponentialBackoff(webhookFn func() restclient.Result) restclient.Result {
|
||||||
|
backoff := wait.Backoff{
|
||||||
|
Duration: g.initialBackoff,
|
||||||
|
Factor: 1.5,
|
||||||
|
Jitter: 0.2,
|
||||||
|
Steps: 5,
|
||||||
|
}
|
||||||
|
var result restclient.Result
|
||||||
|
wait.ExponentialBackoff(backoff, func() (bool, error) {
|
||||||
|
result = webhookFn()
|
||||||
|
// Return from Request.Do() errors immediately.
|
||||||
|
if err := result.Error(); err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
// Retry 429s, and 5xxs.
|
||||||
|
var statusCode int
|
||||||
|
if result.StatusCode(&statusCode); statusCode == 429 || statusCode >= 500 {
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
return true, nil
|
||||||
|
})
|
||||||
|
return result
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue