Allow webhook authenticator to use TokenReviewsInterface

pull/6/head
Jordan Liggitt 2016-09-12 18:04:42 -04:00
parent dd6f8bfb31
commit e9914f2c4a
No known key found for this signature in database
GPG Key ID: 24E7ADF9A3B42012
4 changed files with 272 additions and 81 deletions

View File

@ -410,6 +410,11 @@ func IsServerTimeout(err error) bool {
return reasonForError(err) == unversioned.StatusReasonServerTimeout
}
// IsInternalError determines if err is an error which indicates an internal server error.
func IsInternalError(err error) bool {
return reasonForError(err) == unversioned.StatusReasonInternalError
}
// IsUnexpectedServerError returns true if the server response was not in the expected API format,
// and may be the result of another HTTP actor.
func IsUnexpectedServerError(err error) bool {

View File

@ -18,18 +18,17 @@ limitations under the License.
package webhook
import (
"fmt"
"time"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/apis/authentication"
_ "k8s.io/kubernetes/pkg/apis/authentication/install"
"k8s.io/kubernetes/pkg/apis/authentication/v1beta1"
"k8s.io/kubernetes/pkg/auth/authenticator"
"k8s.io/kubernetes/pkg/auth/user"
"k8s.io/kubernetes/pkg/client/restclient"
authenticationclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/authentication/unversioned"
"k8s.io/kubernetes/pkg/util/cache"
"k8s.io/kubernetes/plugin/pkg/webhook"
_ "k8s.io/kubernetes/pkg/apis/authentication/install"
)
var (
@ -42,55 +41,90 @@ const retryBackoff = 500 * time.Millisecond
var _ authenticator.Token = (*WebhookTokenAuthenticator)(nil)
type WebhookTokenAuthenticator struct {
*webhook.GenericWebhook
responseCache *cache.LRUExpireCache
ttl time.Duration
tokenReview authenticationclient.TokenReviewInterface
responseCache *cache.LRUExpireCache
ttl time.Duration
initialBackoff time.Duration
}
// NewFromInterface creates a webhook authenticator using the given tokenReview client
func NewFromInterface(tokenReview authenticationclient.TokenReviewInterface, ttl time.Duration) (*WebhookTokenAuthenticator, error) {
return newWithBackoff(tokenReview, ttl, retryBackoff)
}
// New creates a new WebhookTokenAuthenticator from the provided kubeconfig file.
func New(kubeConfigFile string, ttl time.Duration) (*WebhookTokenAuthenticator, error) {
return newWithBackoff(kubeConfigFile, ttl, retryBackoff)
}
// newWithBackoff allows tests to skip the sleep.
func newWithBackoff(kubeConfigFile string, ttl, initialBackoff time.Duration) (*WebhookTokenAuthenticator, error) {
gw, err := webhook.NewGenericWebhook(kubeConfigFile, groupVersions, initialBackoff)
tokenReview, err := tokenReviewInterfaceFromKubeconfig(kubeConfigFile)
if err != nil {
return nil, err
}
return &WebhookTokenAuthenticator{gw, cache.NewLRUExpireCache(1024), ttl}, nil
return newWithBackoff(tokenReview, ttl, retryBackoff)
}
// newWithBackoff allows tests to skip the sleep.
func newWithBackoff(tokenReview authenticationclient.TokenReviewInterface, ttl, initialBackoff time.Duration) (*WebhookTokenAuthenticator, error) {
return &WebhookTokenAuthenticator{tokenReview, cache.NewLRUExpireCache(1024), ttl, initialBackoff}, nil
}
// AuthenticateToken implements the authenticator.Token interface.
func (w *WebhookTokenAuthenticator) AuthenticateToken(token string) (user.Info, bool, error) {
r := &v1beta1.TokenReview{
Spec: v1beta1.TokenReviewSpec{Token: token},
r := &authentication.TokenReview{
Spec: authentication.TokenReviewSpec{Token: token},
}
if entry, ok := w.responseCache.Get(r.Spec); ok {
r.Status = entry.(v1beta1.TokenReviewStatus)
r.Status = entry.(authentication.TokenReviewStatus)
} else {
result := w.WithExponentialBackoff(func() restclient.Result {
return w.RestClient.Post().Body(r).Do()
var (
result *authentication.TokenReview
err error
)
webhook.WithExponentialBackoff(w.initialBackoff, func() error {
result, err = w.tokenReview.Create(r)
return err
})
if err := result.Error(); err != nil {
if err != nil {
return nil, false, err
}
var statusCode int
if result.StatusCode(&statusCode); statusCode < 200 || statusCode >= 300 {
return nil, false, fmt.Errorf("Error contacting webhook: %d", statusCode)
}
spec := r.Spec
if err := result.Into(r); err != nil {
return nil, false, err
}
w.responseCache.Add(spec, r.Status, w.ttl)
r.Status = result.Status
w.responseCache.Add(r.Spec, result.Status, w.ttl)
}
if !r.Status.Authenticated {
return nil, false, nil
}
var extra map[string][]string
if r.Status.User.Extra != nil {
extra = map[string][]string{}
for k, v := range r.Status.User.Extra {
extra[k] = v
}
}
return &user.DefaultInfo{
Name: r.Status.User.Username,
UID: r.Status.User.UID,
Groups: r.Status.User.Groups,
Extra: extra,
}, true, nil
}
// tokenReviewInterfaceFromKubeconfig builds a client from the specified kubeconfig file,
// and returns a TokenReviewInterface that uses that client. Note that the client submits TokenReview
// requests to the exact path specified in the kubeconfig file, so arbitrary non-API servers can be targeted.
func tokenReviewInterfaceFromKubeconfig(kubeConfigFile string) (authenticationclient.TokenReviewInterface, error) {
gw, err := webhook.NewGenericWebhook(kubeConfigFile, groupVersions, 0)
if err != nil {
return nil, err
}
return &tokenReviewClient{gw}, nil
}
type tokenReviewClient struct {
w *webhook.GenericWebhook
}
func (t *tokenReviewClient) Create(tokenReview *authentication.TokenReview) (*authentication.TokenReview, error) {
result := &authentication.TokenReview{}
err := t.w.RestClient.Post().Body(tokenReview).Do().Into(result)
return result, err
}

View File

@ -24,6 +24,7 @@ import (
"io/ioutil"
"net/http"
"net/http/httptest"
"net/url"
"os"
"reflect"
"testing"
@ -45,6 +46,7 @@ type Service interface {
// NewTestServer wraps a Service as an httptest.Server.
func NewTestServer(s Service, cert, key, caCert []byte) (*httptest.Server, error) {
const webhookPath = "/testserver"
var tlsConfig *tls.Config
if cert != nil {
cert, err := tls.X509KeyPair(cert, key)
@ -65,25 +67,51 @@ func NewTestServer(s Service, cert, key, caCert []byte) (*httptest.Server, error
}
serveHTTP := func(w http.ResponseWriter, r *http.Request) {
if r.Method != "POST" {
http.Error(w, fmt.Sprintf("unexpected method: %v", r.Method), http.StatusMethodNotAllowed)
return
}
if r.URL.Path != webhookPath {
http.Error(w, fmt.Sprintf("unexpected path: %v", r.URL.Path), http.StatusNotFound)
return
}
var review v1beta1.TokenReview
if err := json.NewDecoder(r.Body).Decode(&review); err != nil {
bodyData, _ := ioutil.ReadAll(r.Body)
if err := json.Unmarshal(bodyData, &review); err != nil {
http.Error(w, fmt.Sprintf("failed to decode body: %v", err), http.StatusBadRequest)
return
}
// ensure we received the serialized tokenreview as expected
if review.APIVersion != "authentication.k8s.io/v1beta1" {
http.Error(w, fmt.Sprintf("wrong api version: %s", string(bodyData)), http.StatusBadRequest)
return
}
// once we have a successful request, always call the review to record that we were called
s.Review(&review)
if s.HTTPStatusCode() < 200 || s.HTTPStatusCode() >= 300 {
http.Error(w, "HTTP Error", s.HTTPStatusCode())
return
}
s.Review(&review)
type userInfo struct {
Username string `json:"username"`
UID string `json:"uid"`
Groups []string `json:"groups"`
Username string `json:"username"`
UID string `json:"uid"`
Groups []string `json:"groups"`
Extra map[string][]string `json:"extra"`
}
type status struct {
Authenticated bool `json:"authenticated"`
User userInfo `json:"user"`
}
var extra map[string][]string
if review.Status.User.Extra != nil {
extra = map[string][]string{}
for k, v := range review.Status.User.Extra {
extra[k] = v
}
}
resp := struct {
APIVersion string `json:"apiVersion"`
Status status `json:"status"`
@ -95,6 +123,7 @@ func NewTestServer(s Service, cert, key, caCert []byte) (*httptest.Server, error
Username: review.Status.User.Username,
UID: review.Status.User.UID,
Groups: review.Status.User.Groups,
Extra: extra,
},
},
}
@ -105,6 +134,12 @@ func NewTestServer(s Service, cert, key, caCert []byte) (*httptest.Server, error
server := httptest.NewUnstartedServer(http.HandlerFunc(serveHTTP))
server.TLS = tlsConfig
server.StartTLS()
// Adjust the path to point to our custom path
serverURL, _ := url.Parse(server.URL)
serverURL.Path = webhookPath
server.URL = serverURL.String()
return server, nil
}
@ -112,9 +147,11 @@ func NewTestServer(s Service, cert, key, caCert []byte) (*httptest.Server, error
type mockService struct {
allow bool
statusCode int
called int
}
func (m *mockService) Review(r *v1beta1.TokenReview) {
m.called++
r.Status.Authenticated = m.allow
if m.allow {
r.Status.User.Username = "realHooman@email.com"
@ -148,7 +185,13 @@ func newTokenAuthenticator(serverURL string, clientCert, clientKey, ca []byte, c
if err := json.NewEncoder(tempfile).Encode(config); err != nil {
return nil, err
}
return newWithBackoff(p, cacheTime, 0)
c, err := tokenReviewInterfaceFromKubeconfig(p)
if err != nil {
return nil, err
}
return newWithBackoff(c, cacheTime, 0)
}
func TestTLSConfig(t *testing.T) {
@ -294,6 +337,7 @@ func TestWebhookTokenAuthenticator(t *testing.T) {
Username: "person@place.com",
UID: "abcd-1234",
Groups: []string{"stuff-dev", "main-eng"},
Extra: map[string]v1beta1.ExtraValue{"foo": {"bar", "baz"}},
},
},
expectedAuthenticated: true,
@ -301,6 +345,7 @@ func TestWebhookTokenAuthenticator(t *testing.T) {
Name: "person@place.com",
UID: "abcd-1234",
Groups: []string{"stuff-dev", "main-eng"},
Extra: map[string][]string{"foo": {"bar", "baz"}},
},
},
// Unauthenticated shouldn't even include extra provided info.
@ -345,7 +390,7 @@ func TestWebhookTokenAuthenticator(t *testing.T) {
i, authenticated, tt.expectedAuthenticated)
}
if user != nil && tt.expectedUser != nil && !reflect.DeepEqual(user, tt.expectedUser) {
t.Errorf("case %d: Plugin returned incorrect user. Got %v, expected %v",
t.Errorf("case %d: Plugin returned incorrect user. Got %#v, expected %#v",
i, user, tt.expectedUser)
}
}
@ -374,8 +419,9 @@ func (a *authenticationUserInfo) GetExtra() map[string][]string {
var _ user.Info = (*authenticationUserInfo)(nil)
// TestWebhookCache verifies that error responses from the server are not
// cached, but successful responses are.
func TestWebhookCache(t *testing.T) {
// cached, but successful responses are. It also ensures that the webhook
// call is retried on 429 and 500+ errors
func TestWebhookCacheAndRetry(t *testing.T) {
serv := new(mockService)
s, err := NewTestServer(serv, serverCert, serverKey, caCert)
if err != nil {
@ -388,36 +434,129 @@ func TestWebhookCache(t *testing.T) {
if err != nil {
t.Fatal(err)
}
token := "t0k3n"
serv.allow = true
serv.statusCode = 500
if _, _, err := wh.AuthenticateToken(token); err == nil {
t.Errorf("Webhook returned HTTP 500, but authorizer reported success.")
testcases := []struct {
description string
token string
allow bool
code int
expectError bool
expectOk bool
expectCalls int
}{
{
description: "t0k3n, 500 error, retries and fails",
token: "t0k3n",
allow: false,
code: 500,
expectError: true,
expectOk: false,
expectCalls: 5,
},
{
description: "t0k3n, 404 error, fails (but no retry)",
token: "t0k3n",
allow: false,
code: 404,
expectError: true,
expectOk: false,
expectCalls: 1,
},
{
description: "t0k3n, 200 response, allowed, succeeds with a single call",
token: "t0k3n",
allow: true,
code: 200,
expectError: false,
expectOk: true,
expectCalls: 1,
},
{
description: "t0k3n, 500 response, disallowed, but never called because previous 200 response was cached",
token: "t0k3n",
allow: false,
code: 500,
expectError: false,
expectOk: true,
expectCalls: 0,
},
{
description: "an0th3r_t0k3n, 500 response, disallowed, should be called again with retries",
token: "an0th3r_t0k3n",
allow: false,
code: 500,
expectError: true,
expectOk: false,
expectCalls: 5,
},
{
description: "an0th3r_t0k3n, 429 response, disallowed, should be called again with retries",
token: "an0th3r_t0k3n",
allow: false,
code: 429,
expectError: true,
expectOk: false,
expectCalls: 5,
},
{
description: "an0th3r_t0k3n, 200 response, allowed, succeeds with a single call",
token: "an0th3r_t0k3n",
allow: true,
code: 200,
expectError: false,
expectOk: true,
expectCalls: 1,
},
{
description: "an0th3r_t0k3n, 500 response, disallowed, but never called because previous 200 response was cached",
token: "an0th3r_t0k3n",
allow: false,
code: 500,
expectError: false,
expectOk: true,
expectCalls: 0,
},
}
serv.statusCode = 404
if _, _, err := wh.AuthenticateToken(token); err == nil {
t.Errorf("Webhook returned HTTP 404, but authorizer reported success.")
}
serv.statusCode = 200
if _, _, err := wh.AuthenticateToken(token); err != nil {
t.Errorf("Webhook returned HTTP 200, but authorizer reported unauthorized.")
}
serv.statusCode = 500
if _, _, err := wh.AuthenticateToken(token); err != nil {
t.Errorf("Webhook should have successful response cached, but authorizer reported unauthorized.")
}
// For a different request, webhook should be called again.
token = "an0th3r_t0k3n"
serv.statusCode = 500
if _, _, err := wh.AuthenticateToken(token); err == nil {
t.Errorf("Webhook returned HTTP 500, but authorizer reported success.")
}
serv.statusCode = 200
if _, _, err := wh.AuthenticateToken(token); err != nil {
t.Errorf("Webhook returned HTTP 200, but authorizer reported unauthorized.")
}
serv.statusCode = 500
if _, _, err := wh.AuthenticateToken(token); err != nil {
t.Errorf("Webhook should have successful response cached, but authorizer reported unauthorized.")
for _, testcase := range testcases {
func() {
serv.allow = testcase.allow
serv.statusCode = testcase.code
serv.called = 0
_, ok, err := wh.AuthenticateToken(testcase.token)
hasError := err != nil
if hasError != testcase.expectError {
t.Log(testcase.description)
t.Errorf("Webhook returned HTTP %d, expected error=%v, but got error %v", testcase.code, testcase.expectError, err)
}
if serv.called != testcase.expectCalls {
t.Log(testcase.description)
t.Errorf("Expected %d calls, got %d", testcase.expectCalls, serv.called)
}
if ok != testcase.expectOk {
t.Log(testcase.description)
t.Errorf("Expected ok=%v, got %v", testcase.expectOk, ok)
}
}()
}
}

View File

@ -22,6 +22,7 @@ import (
"time"
"k8s.io/kubernetes/pkg/api"
apierrors "k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/apimachinery/registered"
"k8s.io/kubernetes/pkg/client/restclient"
@ -70,28 +71,40 @@ func NewGenericWebhook(kubeConfigFile string, groupVersions []unversioned.GroupV
return &GenericWebhook{restClient, initialBackoff}, nil
}
// WithExponentialBackoff will retry webhookFn 5 times w/ exponentially
// increasing backoff when a 429 or a 5xx response code is returned.
// WithExponentialBackoff will retry webhookFn() up to 5 times with exponentially increasing backoff when
// it returns an error for which apierrors.SuggestsClientDelay() or apierrors.IsInternalError() returns true.
func (g *GenericWebhook) WithExponentialBackoff(webhookFn func() restclient.Result) restclient.Result {
var result restclient.Result
WithExponentialBackoff(g.initialBackoff, func() error {
result = webhookFn()
return result.Error()
})
return result
}
// WithExponentialBackoff will retry webhookFn() up to 5 times with exponentially increasing backoff when
// it returns an error for which apierrors.SuggestsClientDelay() or apierrors.IsInternalError() returns true.
func WithExponentialBackoff(initialBackoff time.Duration, webhookFn func() error) error {
backoff := wait.Backoff{
Duration: g.initialBackoff,
Duration: initialBackoff,
Factor: 1.5,
Jitter: 0.2,
Steps: 5,
}
var result restclient.Result
var err error
wait.ExponentialBackoff(backoff, func() (bool, error) {
result = webhookFn()
// Return from Request.Do() errors immediately.
if err := result.Error(); err != nil {
return false, err
}
// Retry 429s, and 5xxs.
var statusCode int
if result.StatusCode(&statusCode); statusCode == 429 || statusCode >= 500 {
err = webhookFn()
if _, shouldRetry := apierrors.SuggestsClientDelay(err); shouldRetry {
return false, nil
}
if apierrors.IsInternalError(err) {
return false, nil
}
if err != nil {
return false, err
}
return true, nil
})
return result
return err
}