Merge pull request #73030 from tnozicka/fix-csr-list-watch

Switch WaitForCertificate to informers to avoid broken watches
pull/564/head
Kubernetes Prow Robot 2019-02-07 01:45:33 -08:00 committed by GitHub
commit 1b26097e1e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 34 additions and 52 deletions

View File

@ -254,7 +254,13 @@ func (s *csrSimulator) ServeHTTP(w http.ResponseWriter, req *http.Request) {
defer s.lock.Unlock() defer s.lock.Unlock()
t := s.t t := s.t
t.Logf("Request %s %s %s", req.Method, req.URL, req.UserAgent()) // filter out timeouts as csrSimulator don't support them
q := req.URL.Query()
q.Del("timeout")
q.Del("timeoutSeconds")
req.URL.RawQuery = q.Encode()
t.Logf("Request %q %q %q", req.Method, req.URL, req.UserAgent())
if len(s.expectUserAgent) > 0 && req.UserAgent() != s.expectUserAgent { if len(s.expectUserAgent) > 0 && req.UserAgent() != s.expectUserAgent {
t.Errorf("Unexpected user agent: %s", req.UserAgent()) t.Errorf("Unexpected user agent: %s", req.UserAgent())
@ -305,7 +311,7 @@ func (s *csrSimulator) ServeHTTP(w http.ResponseWriter, req *http.Request) {
} }
s.csr = csr s.csr = csr
case req.Method == "GET" && req.URL.Path == "/apis/certificates.k8s.io/v1beta1/certificatesigningrequests" && req.URL.RawQuery == "fieldSelector=metadata.name%3Dtest-csr&limit=500": case req.Method == "GET" && req.URL.Path == "/apis/certificates.k8s.io/v1beta1/certificatesigningrequests" && req.URL.RawQuery == "fieldSelector=metadata.name%3Dtest-csr&limit=500&resourceVersion=0":
if s.csr == nil { if s.csr == nil {
t.Fatalf("no csr") t.Fatalf("no csr")
} }

View File

@ -20,7 +20,6 @@ go_test(
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/typed/certificates/v1beta1:go_default_library", "//staging/src/k8s.io/client-go/kubernetes/typed/certificates/v1beta1:go_default_library",
], ],

View File

@ -40,9 +40,9 @@ import (
"k8s.io/client-go/util/certificate/csr" "k8s.io/client-go/util/certificate/csr"
) )
// certificateWaitBackoff controls the amount and timing of retries when the // certificateWaitTimeout controls the amount of time we wait for certificate
// watch for certificate approval is interrupted. // approval in one iteration.
var certificateWaitBackoff = wait.Backoff{Duration: 30 * time.Second, Steps: 4, Factor: 1.5, Jitter: 0.1} var certificateWaitTimeout = 15 * time.Minute
// Manager maintains and updates the certificates in use by this certificate // Manager maintains and updates the certificates in use by this certificate
// manager. In the background it communicates with the API server to get new // manager. In the background it communicates with the API server to get new
@ -388,29 +388,10 @@ func (m *manager) rotateCerts() (bool, error) {
// Once we've successfully submitted a CSR for this template, record that we did so // Once we've successfully submitted a CSR for this template, record that we did so
m.setLastRequest(template) m.setLastRequest(template)
// Wait for the certificate to be signed. Instead of one long watch, we retry with slightly longer // Wait for the certificate to be signed. This interface and internal timout
// intervals each time in order to tolerate failures from the server AND to preserve the liveliness // is a remainder after the old design using raw watch wrapped with backoff.
// of the cert manager loop. This creates slightly more traffic against the API server in return crtPEM, err := csr.WaitForCertificate(client, req, certificateWaitTimeout)
// for bounding the amount of time we wait when a certificate expires. if err != nil {
var crtPEM []byte
watchDuration := time.Minute
if err := wait.ExponentialBackoff(certificateWaitBackoff, func() (bool, error) {
data, err := csr.WaitForCertificate(client, req, watchDuration)
switch {
case err == nil:
crtPEM = data
return true, nil
case err == wait.ErrWaitTimeout:
watchDuration += time.Minute
if watchDuration > 5*time.Minute {
watchDuration = 5 * time.Minute
}
return false, nil
default:
utilruntime.HandleError(fmt.Errorf("Unable to check certificate signing status: %v", err))
return false, m.updateServerError(err)
}
}); err != nil {
utilruntime.HandleError(fmt.Errorf("Certificate request was not signed: %v", err)) utilruntime.HandleError(fmt.Errorf("Certificate request was not signed: %v", err))
return false, nil return false, nil
} }

View File

@ -31,7 +31,6 @@ import (
"k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/errors"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1" v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/wait"
watch "k8s.io/apimachinery/pkg/watch" watch "k8s.io/apimachinery/pkg/watch"
certificatesclient "k8s.io/client-go/kubernetes/typed/certificates/v1beta1" certificatesclient "k8s.io/client-go/kubernetes/typed/certificates/v1beta1"
) )
@ -433,7 +432,8 @@ func TestRotateCertWaitingForResultError(t *testing.T) {
}, },
} }
certificateWaitBackoff = wait.Backoff{Steps: 1} defer func(t time.Duration) { certificateWaitTimeout = t }(certificateWaitTimeout)
certificateWaitTimeout = 1 * time.Millisecond
if success, err := m.rotateCerts(); success { if success, err := m.rotateCerts(); success {
t.Errorf("Got success from 'rotateCerts', wanted failure.") t.Errorf("Got success from 'rotateCerts', wanted failure.")
} else if err != nil { } else if err != nil {
@ -880,15 +880,6 @@ func TestServerHealth(t *testing.T) {
expectRotateFail: true, expectRotateFail: true,
expectHealthy: true, expectHealthy: true,
}, },
{
description: "Conflict error on watch",
certs: currentCerts,
failureType: watchError,
clientErr: errors.NewGenericServerResponse(409, "POST", schema.GroupResource{}, "", "", 0, false),
expectRotateFail: true,
expectHealthy: false,
},
} }
for _, tc := range testCases { for _, tc := range testCases {

View File

@ -17,6 +17,7 @@ limitations under the License.
package csr package csr
import ( import (
"context"
"crypto" "crypto"
"crypto/x509" "crypto/x509"
"encoding/pem" "encoding/pem"
@ -83,19 +84,23 @@ func RequestCertificate(client certificatesclient.CertificateSigningRequestInter
// WaitForCertificate waits for a certificate to be issued until timeout, or returns an error. // WaitForCertificate waits for a certificate to be issued until timeout, or returns an error.
func WaitForCertificate(client certificatesclient.CertificateSigningRequestInterface, req *certificates.CertificateSigningRequest, timeout time.Duration) (certData []byte, err error) { func WaitForCertificate(client certificatesclient.CertificateSigningRequestInterface, req *certificates.CertificateSigningRequest, timeout time.Duration) (certData []byte, err error) {
fieldSelector := fields.OneTermEqualSelector("metadata.name", req.Name).String() fieldSelector := fields.OneTermEqualSelector("metadata.name", req.Name).String()
lw := &cache.ListWatch{
event, err := watchtools.ListWatchUntil( ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
timeout, options.FieldSelector = fieldSelector
&cache.ListWatch{ return client.List(options)
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
options.FieldSelector = fieldSelector
return client.List(options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
options.FieldSelector = fieldSelector
return client.Watch(options)
},
}, },
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
options.FieldSelector = fieldSelector
return client.Watch(options)
},
}
ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), timeout)
defer cancel()
event, err := watchtools.UntilWithSync(
ctx,
lw,
&certificates.CertificateSigningRequest{},
nil,
func(event watch.Event) (bool, error) { func(event watch.Event) (bool, error) {
switch event.Type { switch event.Type {
case watch.Modified, watch.Added: case watch.Modified, watch.Added: