mirror of https://github.com/k3s-io/k3s
Merge pull request #53037 from smarterclayton/verify_client_cert
Automatic merge from submit-queue (batch tested with PRs 53978, 54008, 53037). If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>. Verify the bootstrap client cert before using itpull/6/head
commit
d91e97f243
|
@ -342,7 +342,9 @@ func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies) (err error) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if err := kubeletcertificate.UpdateTransport(wait.NeverStop, clientConfig, clientCertificateManager); err != nil {
|
// we set exitIfExpired to true because we use this client configuration to request new certs - if we are unable
|
||||||
|
// to request new certs, we will be unable to continue normal operation
|
||||||
|
if err := kubeletcertificate.UpdateTransport(wait.NeverStop, clientConfig, clientCertificateManager, true); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -22,14 +22,16 @@ go_library(
|
||||||
srcs = ["bootstrap.go"],
|
srcs = ["bootstrap.go"],
|
||||||
importpath = "k8s.io/kubernetes/pkg/kubelet/certificate/bootstrap",
|
importpath = "k8s.io/kubernetes/pkg/kubelet/certificate/bootstrap",
|
||||||
deps = [
|
deps = [
|
||||||
"//pkg/kubelet/util/csr:go_default_library",
|
|
||||||
"//vendor/github.com/golang/glog:go_default_library",
|
"//vendor/github.com/golang/glog:go_default_library",
|
||||||
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
|
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
|
||||||
|
"//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
|
||||||
"//vendor/k8s.io/client-go/kubernetes/typed/certificates/v1beta1:go_default_library",
|
"//vendor/k8s.io/client-go/kubernetes/typed/certificates/v1beta1:go_default_library",
|
||||||
"//vendor/k8s.io/client-go/rest:go_default_library",
|
"//vendor/k8s.io/client-go/rest:go_default_library",
|
||||||
"//vendor/k8s.io/client-go/tools/clientcmd:go_default_library",
|
"//vendor/k8s.io/client-go/tools/clientcmd:go_default_library",
|
||||||
"//vendor/k8s.io/client-go/tools/clientcmd/api:go_default_library",
|
"//vendor/k8s.io/client-go/tools/clientcmd/api:go_default_library",
|
||||||
|
"//vendor/k8s.io/client-go/transport:go_default_library",
|
||||||
"//vendor/k8s.io/client-go/util/cert:go_default_library",
|
"//vendor/k8s.io/client-go/util/cert:go_default_library",
|
||||||
|
"//vendor/k8s.io/client-go/util/certificate/csr:go_default_library",
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
|
@ -20,16 +20,19 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/golang/glog"
|
"github.com/golang/glog"
|
||||||
|
|
||||||
"k8s.io/apimachinery/pkg/types"
|
"k8s.io/apimachinery/pkg/types"
|
||||||
|
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||||
certificates "k8s.io/client-go/kubernetes/typed/certificates/v1beta1"
|
certificates "k8s.io/client-go/kubernetes/typed/certificates/v1beta1"
|
||||||
restclient "k8s.io/client-go/rest"
|
restclient "k8s.io/client-go/rest"
|
||||||
"k8s.io/client-go/tools/clientcmd"
|
"k8s.io/client-go/tools/clientcmd"
|
||||||
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
|
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
|
||||||
|
"k8s.io/client-go/transport"
|
||||||
certutil "k8s.io/client-go/util/cert"
|
certutil "k8s.io/client-go/util/cert"
|
||||||
"k8s.io/kubernetes/pkg/kubelet/util/csr"
|
"k8s.io/client-go/util/certificate/csr"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
@ -42,17 +45,15 @@ const (
|
||||||
// On success, a kubeconfig file referencing the generated key and obtained certificate is written to kubeconfigPath.
|
// On success, a kubeconfig file referencing the generated key and obtained certificate is written to kubeconfigPath.
|
||||||
// The certificate and key file are stored in certDir.
|
// The certificate and key file are stored in certDir.
|
||||||
func LoadClientCert(kubeconfigPath string, bootstrapPath string, certDir string, nodeName types.NodeName) error {
|
func LoadClientCert(kubeconfigPath string, bootstrapPath string, certDir string, nodeName types.NodeName) error {
|
||||||
// Short-circuit if the kubeconfig file already exists.
|
// Short-circuit if the kubeconfig file exists and is valid.
|
||||||
// TODO: inspect the kubeconfig, ensure a rest client can be built from it, verify client cert expiration, etc.
|
ok, err := verifyBootstrapClientConfig(kubeconfigPath)
|
||||||
_, err := os.Stat(kubeconfigPath)
|
if err != nil {
|
||||||
if err == nil {
|
|
||||||
glog.V(2).Infof("Kubeconfig %s exists, skipping bootstrap", kubeconfigPath)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
if !os.IsNotExist(err) {
|
|
||||||
glog.Errorf("Error reading kubeconfig %s, skipping bootstrap: %v", kubeconfigPath, err)
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
if ok {
|
||||||
|
glog.V(2).Infof("Kubeconfig %s exists and is valid, skipping bootstrap", kubeconfigPath)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
glog.V(2).Info("Using bootstrap kubeconfig to generate TLS client cert, key and kubeconfig file")
|
glog.V(2).Info("Using bootstrap kubeconfig to generate TLS client cert, key and kubeconfig file")
|
||||||
|
|
||||||
|
@ -72,6 +73,17 @@ func LoadClientCert(kubeconfigPath string, bootstrapPath string, certDir string,
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("unable to build bootstrap key path: %v", err)
|
return fmt.Errorf("unable to build bootstrap key path: %v", err)
|
||||||
}
|
}
|
||||||
|
// If we are unable to generate a CSR, we remove our key file and start fresh.
|
||||||
|
// This method is used before enabling client rotation and so we must ensure we
|
||||||
|
// can make forward progress if we crash and exit when a CSR exists but the cert
|
||||||
|
// it is signed for has expired.
|
||||||
|
defer func() {
|
||||||
|
if !success {
|
||||||
|
if err := os.Remove(keyPath); err != nil && !os.IsNotExist(err) {
|
||||||
|
glog.Warningf("Cannot clean up the key file %q: %v", keyPath, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
keyData, _, err := certutil.LoadOrGenerateKeyFile(keyPath)
|
keyData, _, err := certutil.LoadOrGenerateKeyFile(keyPath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -82,6 +94,13 @@ func LoadClientCert(kubeconfigPath string, bootstrapPath string, certDir string,
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("unable to build bootstrap client cert path: %v", err)
|
return fmt.Errorf("unable to build bootstrap client cert path: %v", err)
|
||||||
}
|
}
|
||||||
|
defer func() {
|
||||||
|
if !success {
|
||||||
|
if err := os.Remove(certPath); err != nil && !os.IsNotExist(err) {
|
||||||
|
glog.Warningf("Cannot clean up the cert file %q: %v", certPath, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
certData, err := csr.RequestNodeCertificate(bootstrapClient.CertificateSigningRequests(), keyData, nodeName)
|
certData, err := csr.RequestNodeCertificate(bootstrapClient.CertificateSigningRequests(), keyData, nodeName)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -89,13 +108,6 @@ func LoadClientCert(kubeconfigPath string, bootstrapPath string, certDir string,
|
||||||
if err := certutil.WriteCert(certPath, certData); err != nil {
|
if err := certutil.WriteCert(certPath, certData); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
defer func() {
|
|
||||||
if !success {
|
|
||||||
if err := os.Remove(certPath); err != nil {
|
|
||||||
glog.Warningf("Cannot clean up the cert file %q: %v", certPath, err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
// Get the CA data from the bootstrap client config.
|
// Get the CA data from the bootstrap client config.
|
||||||
caFile, caData := bootstrapClientConfig.CAFile, []byte{}
|
caFile, caData := bootstrapClientConfig.CAFile, []byte{}
|
||||||
|
@ -150,3 +162,48 @@ func loadRESTClientConfig(kubeconfig string) (*restclient.Config, error) {
|
||||||
loader,
|
loader,
|
||||||
).ClientConfig()
|
).ClientConfig()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// verifyBootstrapClientConfig checks the provided kubeconfig to see if it has a valid
|
||||||
|
// client certificate. It returns true if the kubeconfig is valid, or an error if bootstrapping
|
||||||
|
// should stop immediately.
|
||||||
|
func verifyBootstrapClientConfig(kubeconfigPath string) (bool, error) {
|
||||||
|
_, err := os.Stat(kubeconfigPath)
|
||||||
|
if os.IsNotExist(err) {
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
return false, fmt.Errorf("error reading existing bootstrap kubeconfig %s: %v", kubeconfigPath, err)
|
||||||
|
}
|
||||||
|
bootstrapClientConfig, err := loadRESTClientConfig(kubeconfigPath)
|
||||||
|
if err != nil {
|
||||||
|
utilruntime.HandleError(fmt.Errorf("Unable to read existing bootstrap client config: %v", err))
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
transportConfig, err := bootstrapClientConfig.TransportConfig()
|
||||||
|
if err != nil {
|
||||||
|
utilruntime.HandleError(fmt.Errorf("Unable to load transport configuration from existing bootstrap client config: %v", err))
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
// has side effect of populating transport config data fields
|
||||||
|
if _, err := transport.TLSConfigFor(transportConfig); err != nil {
|
||||||
|
utilruntime.HandleError(fmt.Errorf("Unable to load TLS configuration from existing bootstrap client config: %v", err))
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
certs, err := certutil.ParseCertsPEM(transportConfig.TLS.CertData)
|
||||||
|
if err != nil {
|
||||||
|
utilruntime.HandleError(fmt.Errorf("Unable to load TLS certificates from existing bootstrap client config: %v", err))
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
if len(certs) == 0 {
|
||||||
|
utilruntime.HandleError(fmt.Errorf("Unable to read TLS certificates from existing bootstrap client config: %v", err))
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
now := time.Now()
|
||||||
|
for _, cert := range certs {
|
||||||
|
if now.After(cert.NotAfter) {
|
||||||
|
utilruntime.HandleError(fmt.Errorf("Part of the existing bootstrap client certificate is expired: %s", cert.NotAfter))
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
|
|
@ -45,13 +45,13 @@ import (
|
||||||
//
|
//
|
||||||
// stopCh should be used to indicate when the transport is unused and doesn't need
|
// stopCh should be used to indicate when the transport is unused and doesn't need
|
||||||
// to continue checking the manager.
|
// to continue checking the manager.
|
||||||
func UpdateTransport(stopCh <-chan struct{}, clientConfig *restclient.Config, clientCertificateManager certificate.Manager) error {
|
func UpdateTransport(stopCh <-chan struct{}, clientConfig *restclient.Config, clientCertificateManager certificate.Manager, exitIfExpired bool) error {
|
||||||
return updateTransport(stopCh, 10*time.Second, clientConfig, clientCertificateManager)
|
return updateTransport(stopCh, 10*time.Second, clientConfig, clientCertificateManager, exitIfExpired)
|
||||||
}
|
}
|
||||||
|
|
||||||
// updateTransport is an internal method that exposes how often this method checks that the
|
// updateTransport is an internal method that exposes how often this method checks that the
|
||||||
// client cert has changed. Intended for testing.
|
// client cert has changed. Intended for testing.
|
||||||
func updateTransport(stopCh <-chan struct{}, period time.Duration, clientConfig *restclient.Config, clientCertificateManager certificate.Manager) error {
|
func updateTransport(stopCh <-chan struct{}, period time.Duration, clientConfig *restclient.Config, clientCertificateManager certificate.Manager, exitIfExpired bool) error {
|
||||||
if clientConfig.Transport != nil {
|
if clientConfig.Transport != nil {
|
||||||
return fmt.Errorf("there is already a transport configured")
|
return fmt.Errorf("there is already a transport configured")
|
||||||
}
|
}
|
||||||
|
@ -80,6 +80,13 @@ func updateTransport(stopCh <-chan struct{}, period time.Duration, clientConfig
|
||||||
lastCert := clientCertificateManager.Current()
|
lastCert := clientCertificateManager.Current()
|
||||||
go wait.Until(func() {
|
go wait.Until(func() {
|
||||||
curr := clientCertificateManager.Current()
|
curr := clientCertificateManager.Current()
|
||||||
|
if exitIfExpired && curr != nil && time.Now().After(curr.Leaf.NotAfter) {
|
||||||
|
if clientCertificateManager.ServerHealthy() {
|
||||||
|
glog.Fatalf("The currently active client certificate has expired and the server is responsive, exiting.")
|
||||||
|
} else {
|
||||||
|
glog.Errorf("The currently active client certificate has expired, but the server is not responsive. A restart may be necessary to retrieve new initial credentials.")
|
||||||
|
}
|
||||||
|
}
|
||||||
if curr == nil || lastCert == curr {
|
if curr == nil || lastCert == curr {
|
||||||
// Cert hasn't been rotated.
|
// Cert hasn't been rotated.
|
||||||
return
|
return
|
||||||
|
|
|
@ -114,13 +114,16 @@ func newCertificateData(certificatePEM string, keyPEM string) *certificateData {
|
||||||
}
|
}
|
||||||
|
|
||||||
type fakeManager struct {
|
type fakeManager struct {
|
||||||
cert atomic.Value // Always a *tls.Certificate
|
cert atomic.Value // Always a *tls.Certificate
|
||||||
|
healthy bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *fakeManager) SetCertificateSigningRequestClient(certificatesclient.CertificateSigningRequestInterface) error {
|
func (f *fakeManager) SetCertificateSigningRequestClient(certificatesclient.CertificateSigningRequestInterface) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (f *fakeManager) ServerHealthy() bool { return f.healthy }
|
||||||
|
|
||||||
func (f *fakeManager) Start() {}
|
func (f *fakeManager) Start() {}
|
||||||
|
|
||||||
func (f *fakeManager) Current() *tls.Certificate {
|
func (f *fakeManager) Current() *tls.Certificate {
|
||||||
|
@ -184,7 +187,7 @@ func TestRotateShutsDownConnections(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check for a new cert every 10 milliseconds
|
// Check for a new cert every 10 milliseconds
|
||||||
if err := updateTransport(stop, 10*time.Millisecond, c, m); err != nil {
|
if err := updateTransport(stop, 10*time.Millisecond, c, m, false); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -53,7 +53,6 @@ filegroup(
|
||||||
srcs = [
|
srcs = [
|
||||||
":package-srcs",
|
":package-srcs",
|
||||||
"//pkg/kubelet/util/cache:all-srcs",
|
"//pkg/kubelet/util/cache:all-srcs",
|
||||||
"//pkg/kubelet/util/csr:all-srcs",
|
|
||||||
"//pkg/kubelet/util/format:all-srcs",
|
"//pkg/kubelet/util/format:all-srcs",
|
||||||
"//pkg/kubelet/util/ioutils:all-srcs",
|
"//pkg/kubelet/util/ioutils:all-srcs",
|
||||||
"//pkg/kubelet/util/queue:all-srcs",
|
"//pkg/kubelet/util/queue:all-srcs",
|
||||||
|
|
|
@ -25,6 +25,7 @@ import (
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
"k8s.io/apimachinery/pkg/fields"
|
"k8s.io/apimachinery/pkg/fields"
|
||||||
"k8s.io/apimachinery/pkg/runtime"
|
"k8s.io/apimachinery/pkg/runtime"
|
||||||
|
"k8s.io/apimachinery/pkg/util/wait"
|
||||||
"k8s.io/apimachinery/pkg/watch"
|
"k8s.io/apimachinery/pkg/watch"
|
||||||
restclient "k8s.io/client-go/rest"
|
restclient "k8s.io/client-go/rest"
|
||||||
"k8s.io/client-go/tools/pager"
|
"k8s.io/client-go/tools/pager"
|
||||||
|
@ -103,6 +104,8 @@ func (lw *ListWatch) Watch(options metav1.ListOptions) (watch.Interface, error)
|
||||||
return lw.WatchFunc(options)
|
return lw.WatchFunc(options)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ListWatchUntil checks the provided conditions against the items returned by the list watcher, returning wait.ErrWaitTimeout
|
||||||
|
// if timeout is exceeded without all conditions returning true, or an error if an error occurs.
|
||||||
// TODO: check for watch expired error and retry watch from latest point? Same issue exists for Until.
|
// TODO: check for watch expired error and retry watch from latest point? Same issue exists for Until.
|
||||||
func ListWatchUntil(timeout time.Duration, lw ListerWatcher, conditions ...watch.ConditionFunc) (*watch.Event, error) {
|
func ListWatchUntil(timeout time.Duration, lw ListerWatcher, conditions ...watch.ConditionFunc) (*watch.Event, error) {
|
||||||
if len(conditions) == 0 {
|
if len(conditions) == 0 {
|
||||||
|
@ -166,5 +169,10 @@ func ListWatchUntil(timeout time.Duration, lw ListerWatcher, conditions ...watch
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return watch.Until(timeout, watchInterface, remainingConditions...)
|
evt, err := watch.Until(timeout, watchInterface, remainingConditions...)
|
||||||
|
if err == watch.ErrWatchClosed {
|
||||||
|
// present a consistent error interface to callers
|
||||||
|
err = wait.ErrWaitTimeout
|
||||||
|
}
|
||||||
|
return evt, err
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,7 +19,10 @@ go_test(
|
||||||
tags = ["automanaged"],
|
tags = ["automanaged"],
|
||||||
deps = [
|
deps = [
|
||||||
"//vendor/k8s.io/api/certificates/v1beta1:go_default_library",
|
"//vendor/k8s.io/api/certificates/v1beta1:go_default_library",
|
||||||
|
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
|
||||||
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
||||||
|
"//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
|
||||||
|
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
|
||||||
"//vendor/k8s.io/apimachinery/pkg/watch:go_default_library",
|
"//vendor/k8s.io/apimachinery/pkg/watch:go_default_library",
|
||||||
"//vendor/k8s.io/client-go/kubernetes/typed/certificates/v1beta1:go_default_library",
|
"//vendor/k8s.io/client-go/kubernetes/typed/certificates/v1beta1:go_default_library",
|
||||||
"//vendor/k8s.io/client-go/util/cert:go_default_library",
|
"//vendor/k8s.io/client-go/util/cert:go_default_library",
|
||||||
|
@ -37,12 +40,12 @@ go_library(
|
||||||
deps = [
|
deps = [
|
||||||
"//vendor/github.com/golang/glog:go_default_library",
|
"//vendor/github.com/golang/glog:go_default_library",
|
||||||
"//vendor/k8s.io/api/certificates/v1beta1:go_default_library",
|
"//vendor/k8s.io/api/certificates/v1beta1:go_default_library",
|
||||||
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
|
||||||
"//vendor/k8s.io/apimachinery/pkg/fields:go_default_library",
|
"//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
|
||||||
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
|
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
|
||||||
"//vendor/k8s.io/apimachinery/pkg/watch:go_default_library",
|
|
||||||
"//vendor/k8s.io/client-go/kubernetes/typed/certificates/v1beta1:go_default_library",
|
"//vendor/k8s.io/client-go/kubernetes/typed/certificates/v1beta1:go_default_library",
|
||||||
"//vendor/k8s.io/client-go/util/cert:go_default_library",
|
"//vendor/k8s.io/client-go/util/cert:go_default_library",
|
||||||
|
"//vendor/k8s.io/client-go/util/certificate/csr:go_default_library",
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -55,7 +58,10 @@ filegroup(
|
||||||
|
|
||||||
filegroup(
|
filegroup(
|
||||||
name = "all-srcs",
|
name = "all-srcs",
|
||||||
srcs = [":package-srcs"],
|
srcs = [
|
||||||
|
":package-srcs",
|
||||||
|
"//staging/src/k8s.io/client-go/util/certificate/csr:all-srcs",
|
||||||
|
],
|
||||||
tags = ["automanaged"],
|
tags = ["automanaged"],
|
||||||
visibility = ["//visibility:public"],
|
visibility = ["//visibility:public"],
|
||||||
)
|
)
|
||||||
|
|
|
@ -30,14 +30,18 @@ import (
|
||||||
"github.com/golang/glog"
|
"github.com/golang/glog"
|
||||||
|
|
||||||
certificates "k8s.io/api/certificates/v1beta1"
|
certificates "k8s.io/api/certificates/v1beta1"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
"k8s.io/apimachinery/pkg/api/errors"
|
||||||
"k8s.io/apimachinery/pkg/fields"
|
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||||
"k8s.io/apimachinery/pkg/util/wait"
|
"k8s.io/apimachinery/pkg/util/wait"
|
||||||
"k8s.io/apimachinery/pkg/watch"
|
|
||||||
certificatesclient "k8s.io/client-go/kubernetes/typed/certificates/v1beta1"
|
certificatesclient "k8s.io/client-go/kubernetes/typed/certificates/v1beta1"
|
||||||
"k8s.io/client-go/util/cert"
|
"k8s.io/client-go/util/cert"
|
||||||
|
"k8s.io/client-go/util/certificate/csr"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// certificateWaitBackoff controls the amount and timing of retries when the
|
||||||
|
// watch for certificate approval is interrupted.
|
||||||
|
var certificateWaitBackoff = wait.Backoff{Duration: 30 * time.Second, Steps: 4, Factor: 1.5, Jitter: 0.1}
|
||||||
|
|
||||||
// Manager maintains and updates the certificates in use by this certificate
|
// Manager maintains and updates the certificates in use by this certificate
|
||||||
// manager. In the background it communicates with the API server to get new
|
// manager. In the background it communicates with the API server to get new
|
||||||
// certificates for certificates about to expire.
|
// certificates for certificates about to expire.
|
||||||
|
@ -51,6 +55,12 @@ type Manager interface {
|
||||||
// certificate manager, as well as the associated certificate and key data
|
// certificate manager, as well as the associated certificate and key data
|
||||||
// in PEM format.
|
// in PEM format.
|
||||||
Current() *tls.Certificate
|
Current() *tls.Certificate
|
||||||
|
// ServerHealthy returns true if the manager is able to communicate with
|
||||||
|
// the server. This allows a caller to determine whether the cert manager
|
||||||
|
// thinks it can potentially talk to the API server. The cert manager may
|
||||||
|
// be very conservative and only return true if recent communication has
|
||||||
|
// occurred with the server.
|
||||||
|
ServerHealthy() bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// Config is the set of configuration parameters available for a new Manager.
|
// Config is the set of configuration parameters available for a new Manager.
|
||||||
|
@ -134,6 +144,7 @@ type manager struct {
|
||||||
rotationDeadline time.Time
|
rotationDeadline time.Time
|
||||||
forceRotation bool
|
forceRotation bool
|
||||||
certificateExpiration Gauge
|
certificateExpiration Gauge
|
||||||
|
serverHealth bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewManager returns a new certificate manager. A certificate manager is
|
// NewManager returns a new certificate manager. A certificate manager is
|
||||||
|
@ -171,6 +182,14 @@ func (m *manager) Current() *tls.Certificate {
|
||||||
return m.cert
|
return m.cert
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ServerHealthy returns true if the cert manager believes the server
|
||||||
|
// is currently alive.
|
||||||
|
func (m *manager) ServerHealthy() bool {
|
||||||
|
m.certAccessLock.RLock()
|
||||||
|
defer m.certAccessLock.RUnlock()
|
||||||
|
return m.serverHealth
|
||||||
|
}
|
||||||
|
|
||||||
// SetCertificateSigningRequestClient sets the client interface that is used
|
// SetCertificateSigningRequestClient sets the client interface that is used
|
||||||
// for signing new certificates generated as part of rotation. It must be
|
// for signing new certificates generated as part of rotation. It must be
|
||||||
// called before Start() and can not be used to change the
|
// called before Start() and can not be used to change the
|
||||||
|
@ -205,9 +224,8 @@ func (m *manager) Start() {
|
||||||
// doesn't have a certificate at all yet.
|
// doesn't have a certificate at all yet.
|
||||||
if m.shouldRotate() {
|
if m.shouldRotate() {
|
||||||
glog.V(1).Infof("shouldRotate() is true, forcing immediate rotation")
|
glog.V(1).Infof("shouldRotate() is true, forcing immediate rotation")
|
||||||
_, err := m.rotateCerts()
|
if _, err := m.rotateCerts(); err != nil {
|
||||||
if err != nil {
|
utilruntime.HandleError(fmt.Errorf("Could not rotate certificates: %v", err))
|
||||||
glog.Errorf("Could not rotate certificates: %v", err)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
backoff := wait.Backoff{
|
backoff := wait.Backoff{
|
||||||
|
@ -221,7 +239,7 @@ func (m *manager) Start() {
|
||||||
glog.V(2).Infof("Waiting %v for next certificate rotation", sleepInterval)
|
glog.V(2).Infof("Waiting %v for next certificate rotation", sleepInterval)
|
||||||
time.Sleep(sleepInterval)
|
time.Sleep(sleepInterval)
|
||||||
if err := wait.ExponentialBackoff(backoff, m.rotateCerts); err != nil {
|
if err := wait.ExponentialBackoff(backoff, m.rotateCerts); err != nil {
|
||||||
glog.Errorf("Reached backoff limit, still unable to rotate certs: %v", err)
|
utilruntime.HandleError(fmt.Errorf("Reached backoff limit, still unable to rotate certs: %v", err))
|
||||||
wait.PollInfinite(128*time.Second, m.rotateCerts)
|
wait.PollInfinite(128*time.Second, m.rotateCerts)
|
||||||
}
|
}
|
||||||
}, 0)
|
}, 0)
|
||||||
|
@ -275,26 +293,58 @@ func (m *manager) shouldRotate() bool {
|
||||||
return time.Now().After(m.rotationDeadline)
|
return time.Now().After(m.rotationDeadline)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// rotateCerts attempts to request a client cert from the server, wait a reasonable
|
||||||
|
// period of time for it to be signed, and then update the cert on disk. If it cannot
|
||||||
|
// retrieve a cert, it will return false. It will only return error in exceptional cases.
|
||||||
|
// This method also keeps track of "server health" by interpreting the responses it gets
|
||||||
|
// from the server on the various calls it makes.
|
||||||
func (m *manager) rotateCerts() (bool, error) {
|
func (m *manager) rotateCerts() (bool, error) {
|
||||||
glog.V(2).Infof("Rotating certificates")
|
glog.V(2).Infof("Rotating certificates")
|
||||||
|
|
||||||
csrPEM, keyPEM, err := m.generateCSR()
|
csrPEM, keyPEM, privateKey, err := m.generateCSR()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Errorf("Unable to generate a certificate signing request: %v", err)
|
utilruntime.HandleError(fmt.Errorf("Unable to generate a certificate signing request: %v", err))
|
||||||
return false, nil
|
return false, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Call the Certificate Signing Request API to get a certificate for the
|
// Call the Certificate Signing Request API to get a certificate for the
|
||||||
// new private key.
|
// new private key.
|
||||||
crtPEM, err := requestCertificate(m.certSigningRequestClient, csrPEM, m.usages)
|
req, err := csr.RequestCertificate(m.certSigningRequestClient, csrPEM, "", m.usages, privateKey)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Errorf("Failed while requesting a signed certificate from the master: %v", err)
|
utilruntime.HandleError(fmt.Errorf("Failed while requesting a signed certificate from the master: %v", err))
|
||||||
|
return false, m.updateServerError(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wait for the certificate to be signed. Instead of one long watch, we retry with slighly longer
|
||||||
|
// intervals each time in order to tolerate failures from the server AND to preserve the liveliness
|
||||||
|
// of the cert manager loop. This creates slightly more traffic against the API server in return
|
||||||
|
// for bounding the amount of time we wait when a certificate expires.
|
||||||
|
var crtPEM []byte
|
||||||
|
watchDuration := time.Minute
|
||||||
|
if err := wait.ExponentialBackoff(certificateWaitBackoff, func() (bool, error) {
|
||||||
|
data, err := csr.WaitForCertificate(m.certSigningRequestClient, req, watchDuration)
|
||||||
|
switch {
|
||||||
|
case err == nil:
|
||||||
|
crtPEM = data
|
||||||
|
return true, nil
|
||||||
|
case err == wait.ErrWaitTimeout:
|
||||||
|
watchDuration += time.Minute
|
||||||
|
if watchDuration > 5*time.Minute {
|
||||||
|
watchDuration = 5 * time.Minute
|
||||||
|
}
|
||||||
|
return false, nil
|
||||||
|
default:
|
||||||
|
utilruntime.HandleError(fmt.Errorf("Unable to check certificate signing status: %v", err))
|
||||||
|
return false, m.updateServerError(err)
|
||||||
|
}
|
||||||
|
}); err != nil {
|
||||||
|
utilruntime.HandleError(fmt.Errorf("Certificate request was not signed: %v", err))
|
||||||
return false, nil
|
return false, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
cert, err := m.certStore.Update(crtPEM, keyPEM)
|
cert, err := m.certStore.Update(crtPEM, keyPEM)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Errorf("Unable to store the new cert/key pair: %v", err)
|
utilruntime.HandleError(fmt.Errorf("Unable to store the new cert/key pair: %v", err))
|
||||||
return false, nil
|
return false, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -337,91 +387,54 @@ var jitteryDuration = func(totalDuration float64) time.Duration {
|
||||||
return wait.Jitter(time.Duration(totalDuration), 0.2) - time.Duration(totalDuration*0.3)
|
return wait.Jitter(time.Duration(totalDuration), 0.2) - time.Duration(totalDuration*0.3)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// updateCached sets the most recent retrieved cert. It also sets the server
|
||||||
|
// as assumed healthy.
|
||||||
func (m *manager) updateCached(cert *tls.Certificate) {
|
func (m *manager) updateCached(cert *tls.Certificate) {
|
||||||
m.certAccessLock.Lock()
|
m.certAccessLock.Lock()
|
||||||
defer m.certAccessLock.Unlock()
|
defer m.certAccessLock.Unlock()
|
||||||
|
m.serverHealth = true
|
||||||
m.cert = cert
|
m.cert = cert
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *manager) generateCSR() (csrPEM []byte, keyPEM []byte, err error) {
|
// updateServerError takes an error returned by the server and infers
|
||||||
|
// the health of the server based on the error. It will return nil if
|
||||||
|
// the error does not require immediate termination of any wait loops,
|
||||||
|
// and otherwise it will return the error.
|
||||||
|
func (m *manager) updateServerError(err error) error {
|
||||||
|
m.certAccessLock.Lock()
|
||||||
|
defer m.certAccessLock.Unlock()
|
||||||
|
switch {
|
||||||
|
case errors.IsUnauthorized(err):
|
||||||
|
// SSL terminating proxies may report this error instead of the master
|
||||||
|
m.serverHealth = true
|
||||||
|
case errors.IsUnexpectedServerError(err):
|
||||||
|
// generally indicates a proxy or other load balancer problem, rather than a problem coming
|
||||||
|
// from the master
|
||||||
|
m.serverHealth = false
|
||||||
|
default:
|
||||||
|
// Identify known errors that could be expected for a cert request that
|
||||||
|
// indicate everything is working normally
|
||||||
|
m.serverHealth = errors.IsNotFound(err) || errors.IsForbidden(err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *manager) generateCSR() (csrPEM []byte, keyPEM []byte, key interface{}, err error) {
|
||||||
// Generate a new private key.
|
// Generate a new private key.
|
||||||
privateKey, err := ecdsa.GenerateKey(elliptic.P256(), cryptorand.Reader)
|
privateKey, err := ecdsa.GenerateKey(elliptic.P256(), cryptorand.Reader)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, fmt.Errorf("unable to generate a new private key: %v", err)
|
return nil, nil, nil, fmt.Errorf("unable to generate a new private key: %v", err)
|
||||||
}
|
}
|
||||||
der, err := x509.MarshalECPrivateKey(privateKey)
|
der, err := x509.MarshalECPrivateKey(privateKey)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, fmt.Errorf("unable to marshal the new key to DER: %v", err)
|
return nil, nil, nil, fmt.Errorf("unable to marshal the new key to DER: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
keyPEM = pem.EncodeToMemory(&pem.Block{Type: cert.ECPrivateKeyBlockType, Bytes: der})
|
keyPEM = pem.EncodeToMemory(&pem.Block{Type: cert.ECPrivateKeyBlockType, Bytes: der})
|
||||||
|
|
||||||
csrPEM, err = cert.MakeCSRFromTemplate(privateKey, m.template)
|
csrPEM, err = cert.MakeCSRFromTemplate(privateKey, m.template)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, fmt.Errorf("unable to create a csr from the private key: %v", err)
|
return nil, nil, nil, fmt.Errorf("unable to create a csr from the private key: %v", err)
|
||||||
}
|
}
|
||||||
return csrPEM, keyPEM, nil
|
return csrPEM, keyPEM, privateKey, nil
|
||||||
}
|
|
||||||
|
|
||||||
// requestCertificate will create a certificate signing request using the PEM
|
|
||||||
// encoded CSR and send it to API server, then it will watch the object's
|
|
||||||
// status, once approved by API server, it will return the API server's issued
|
|
||||||
// certificate (pem-encoded). If there is any errors, or the watch timeouts, it
|
|
||||||
// will return an error.
|
|
||||||
//
|
|
||||||
// NOTE This is a copy of a function with the same name in
|
|
||||||
// k8s.io/kubernetes/pkg/kubelet/util/csr/csr.go, changing only the package that
|
|
||||||
// CertificateSigningRequestInterface and KeyUsage are imported from.
|
|
||||||
func requestCertificate(client certificatesclient.CertificateSigningRequestInterface, csrData []byte, usages []certificates.KeyUsage) (certData []byte, err error) {
|
|
||||||
glog.Infof("Requesting new certificate.")
|
|
||||||
req, err := client.Create(&certificates.CertificateSigningRequest{
|
|
||||||
// Username, UID, Groups will be injected by API server.
|
|
||||||
TypeMeta: metav1.TypeMeta{Kind: "CertificateSigningRequest"},
|
|
||||||
ObjectMeta: metav1.ObjectMeta{GenerateName: "csr-"},
|
|
||||||
|
|
||||||
Spec: certificates.CertificateSigningRequestSpec{
|
|
||||||
Request: csrData,
|
|
||||||
Usages: usages,
|
|
||||||
},
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("cannot create certificate signing request: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Make a default timeout = 3600s.
|
|
||||||
var defaultTimeoutSeconds int64 = 3600
|
|
||||||
certWatch, err := client.Watch(metav1.ListOptions{
|
|
||||||
Watch: true,
|
|
||||||
TimeoutSeconds: &defaultTimeoutSeconds,
|
|
||||||
FieldSelector: fields.OneTermEqualSelector("metadata.name", req.Name).String(),
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("cannot watch on the certificate signing request: %v", err)
|
|
||||||
}
|
|
||||||
defer certWatch.Stop()
|
|
||||||
ch := certWatch.ResultChan()
|
|
||||||
|
|
||||||
for {
|
|
||||||
event, ok := <-ch
|
|
||||||
if !ok {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
|
|
||||||
if event.Type == watch.Modified || event.Type == watch.Added {
|
|
||||||
if event.Object.(*certificates.CertificateSigningRequest).UID != req.UID {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
status := event.Object.(*certificates.CertificateSigningRequest).Status
|
|
||||||
for _, c := range status.Conditions {
|
|
||||||
if c.Type == certificates.CertificateDenied {
|
|
||||||
return nil, fmt.Errorf("certificate signing request is not approved, reason: %v, message: %v", c.Reason, c.Message)
|
|
||||||
}
|
|
||||||
if c.Type == certificates.CertificateApproved && status.Certificate != nil {
|
|
||||||
return status.Certificate, nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil, fmt.Errorf("watch channel closed")
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -27,7 +27,10 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
certificates "k8s.io/api/certificates/v1beta1"
|
certificates "k8s.io/api/certificates/v1beta1"
|
||||||
|
"k8s.io/apimachinery/pkg/api/errors"
|
||||||
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
|
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||||
|
"k8s.io/apimachinery/pkg/util/wait"
|
||||||
watch "k8s.io/apimachinery/pkg/watch"
|
watch "k8s.io/apimachinery/pkg/watch"
|
||||||
certificatesclient "k8s.io/client-go/kubernetes/typed/certificates/v1beta1"
|
certificatesclient "k8s.io/client-go/kubernetes/typed/certificates/v1beta1"
|
||||||
)
|
)
|
||||||
|
@ -288,6 +291,7 @@ func TestRotateCertWaitingForResultError(t *testing.T) {
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
certificateWaitBackoff = wait.Backoff{Steps: 1}
|
||||||
if success, err := m.rotateCerts(); success {
|
if success, err := m.rotateCerts(); success {
|
||||||
t.Errorf("Got success from 'rotateCerts', wanted failure.")
|
t.Errorf("Got success from 'rotateCerts', wanted failure.")
|
||||||
} else if err != nil {
|
} else if err != nil {
|
||||||
|
@ -612,11 +616,170 @@ func TestInitializeOtherRESTClients(t *testing.T) {
|
||||||
} else {
|
} else {
|
||||||
m.setRotationDeadline()
|
m.setRotationDeadline()
|
||||||
if m.shouldRotate() {
|
if m.shouldRotate() {
|
||||||
if success, err := certificateManager.(*manager).rotateCerts(); !success {
|
success, err := certificateManager.(*manager).rotateCerts()
|
||||||
t.Errorf("Got failure from 'rotateCerts', expected success")
|
if err != nil {
|
||||||
} else if err != nil {
|
|
||||||
t.Errorf("Got error %v, expected none.", err)
|
t.Errorf("Got error %v, expected none.", err)
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
if !success {
|
||||||
|
t.Errorf("Unexpected response 'rotateCerts': %t", success)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
certificate = certificateManager.Current()
|
||||||
|
if !certificatesEqual(certificate, tc.expectedCertAfterStart.certificate) {
|
||||||
|
t.Errorf("Got %v, wanted %v", certificateString(certificate), certificateString(tc.expectedCertAfterStart.certificate))
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestServerHealth(t *testing.T) {
|
||||||
|
type certs struct {
|
||||||
|
storeCert *certificateData
|
||||||
|
bootstrapCert *certificateData
|
||||||
|
apiCert *certificateData
|
||||||
|
expectedCertBeforeStart *certificateData
|
||||||
|
expectedCertAfterStart *certificateData
|
||||||
|
}
|
||||||
|
|
||||||
|
updatedCerts := certs{
|
||||||
|
storeCert: storeCertData,
|
||||||
|
bootstrapCert: bootstrapCertData,
|
||||||
|
apiCert: apiServerCertData,
|
||||||
|
expectedCertBeforeStart: storeCertData,
|
||||||
|
expectedCertAfterStart: apiServerCertData,
|
||||||
|
}
|
||||||
|
|
||||||
|
currentCerts := certs{
|
||||||
|
storeCert: storeCertData,
|
||||||
|
bootstrapCert: bootstrapCertData,
|
||||||
|
apiCert: apiServerCertData,
|
||||||
|
expectedCertBeforeStart: storeCertData,
|
||||||
|
expectedCertAfterStart: storeCertData,
|
||||||
|
}
|
||||||
|
|
||||||
|
testCases := []struct {
|
||||||
|
description string
|
||||||
|
certs
|
||||||
|
|
||||||
|
failureType fakeClientFailureType
|
||||||
|
clientErr error
|
||||||
|
|
||||||
|
expectRotateFail bool
|
||||||
|
expectHealthy bool
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
description: "Current certificate, bootstrap certificate",
|
||||||
|
certs: updatedCerts,
|
||||||
|
expectHealthy: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
description: "Generic error on create",
|
||||||
|
certs: currentCerts,
|
||||||
|
|
||||||
|
failureType: createError,
|
||||||
|
expectRotateFail: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
description: "Unauthorized error on create",
|
||||||
|
certs: currentCerts,
|
||||||
|
|
||||||
|
failureType: createError,
|
||||||
|
clientErr: errors.NewUnauthorized("unauthorized"),
|
||||||
|
expectRotateFail: true,
|
||||||
|
expectHealthy: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
description: "Generic unauthorized error on create",
|
||||||
|
certs: currentCerts,
|
||||||
|
|
||||||
|
failureType: createError,
|
||||||
|
clientErr: errors.NewGenericServerResponse(401, "POST", schema.GroupResource{}, "", "", 0, true),
|
||||||
|
expectRotateFail: true,
|
||||||
|
expectHealthy: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
description: "Generic not found error on create",
|
||||||
|
certs: currentCerts,
|
||||||
|
|
||||||
|
failureType: createError,
|
||||||
|
clientErr: errors.NewGenericServerResponse(404, "POST", schema.GroupResource{}, "", "", 0, true),
|
||||||
|
expectRotateFail: true,
|
||||||
|
expectHealthy: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
description: "Not found error on create",
|
||||||
|
certs: currentCerts,
|
||||||
|
|
||||||
|
failureType: createError,
|
||||||
|
clientErr: errors.NewGenericServerResponse(404, "POST", schema.GroupResource{}, "", "", 0, false),
|
||||||
|
expectRotateFail: true,
|
||||||
|
expectHealthy: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
description: "Conflict error on watch",
|
||||||
|
certs: currentCerts,
|
||||||
|
|
||||||
|
failureType: watchError,
|
||||||
|
clientErr: errors.NewGenericServerResponse(409, "POST", schema.GroupResource{}, "", "", 0, false),
|
||||||
|
expectRotateFail: true,
|
||||||
|
expectHealthy: false,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tc := range testCases {
|
||||||
|
t.Run(tc.description, func(t *testing.T) {
|
||||||
|
certificateStore := &fakeStore{
|
||||||
|
cert: tc.storeCert.certificate,
|
||||||
|
}
|
||||||
|
|
||||||
|
certificateManager, err := NewManager(&Config{
|
||||||
|
Template: &x509.CertificateRequest{
|
||||||
|
Subject: pkix.Name{
|
||||||
|
Organization: []string{"system:nodes"},
|
||||||
|
CommonName: "system:node:fake-node-name",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Usages: []certificates.KeyUsage{
|
||||||
|
certificates.UsageDigitalSignature,
|
||||||
|
certificates.UsageKeyEncipherment,
|
||||||
|
certificates.UsageClientAuth,
|
||||||
|
},
|
||||||
|
CertificateStore: certificateStore,
|
||||||
|
BootstrapCertificatePEM: tc.bootstrapCert.certificatePEM,
|
||||||
|
BootstrapKeyPEM: tc.bootstrapCert.keyPEM,
|
||||||
|
CertificateSigningRequestClient: &fakeClient{
|
||||||
|
certificatePEM: tc.apiCert.certificatePEM,
|
||||||
|
failureType: tc.failureType,
|
||||||
|
err: tc.clientErr,
|
||||||
|
},
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("Got %v, wanted no error.", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
certificate := certificateManager.Current()
|
||||||
|
if !certificatesEqual(certificate, tc.expectedCertBeforeStart.certificate) {
|
||||||
|
t.Errorf("Got %v, wanted %v", certificateString(certificate), certificateString(tc.expectedCertBeforeStart.certificate))
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, ok := certificateManager.(*manager); !ok {
|
||||||
|
t.Errorf("Expected a '*manager' from 'NewManager'")
|
||||||
|
} else {
|
||||||
|
success, err := certificateManager.(*manager).rotateCerts()
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("Got error %v, expected none.", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if !success != tc.expectRotateFail {
|
||||||
|
t.Errorf("Unexpected response 'rotateCerts': %t", success)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if actual := certificateManager.(*manager).ServerHealthy(); actual != tc.expectHealthy {
|
||||||
|
t.Errorf("Unexpected manager server health: %t", actual)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -641,10 +804,29 @@ type fakeClient struct {
|
||||||
certificatesclient.CertificateSigningRequestInterface
|
certificatesclient.CertificateSigningRequestInterface
|
||||||
failureType fakeClientFailureType
|
failureType fakeClientFailureType
|
||||||
certificatePEM []byte
|
certificatePEM []byte
|
||||||
|
err error
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c fakeClient) List(opts v1.ListOptions) (*certificates.CertificateSigningRequestList, error) {
|
||||||
|
if c.failureType == watchError {
|
||||||
|
if c.err != nil {
|
||||||
|
return nil, c.err
|
||||||
|
}
|
||||||
|
return nil, fmt.Errorf("Watch error")
|
||||||
|
}
|
||||||
|
csrReply := certificates.CertificateSigningRequestList{
|
||||||
|
Items: []certificates.CertificateSigningRequest{
|
||||||
|
{ObjectMeta: v1.ObjectMeta{UID: "fake-uid"}},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
return &csrReply, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c fakeClient) Create(*certificates.CertificateSigningRequest) (*certificates.CertificateSigningRequest, error) {
|
func (c fakeClient) Create(*certificates.CertificateSigningRequest) (*certificates.CertificateSigningRequest, error) {
|
||||||
if c.failureType == createError {
|
if c.failureType == createError {
|
||||||
|
if c.err != nil {
|
||||||
|
return nil, c.err
|
||||||
|
}
|
||||||
return nil, fmt.Errorf("Create error")
|
return nil, fmt.Errorf("Create error")
|
||||||
}
|
}
|
||||||
csrReply := certificates.CertificateSigningRequest{}
|
csrReply := certificates.CertificateSigningRequest{}
|
||||||
|
@ -654,6 +836,9 @@ func (c fakeClient) Create(*certificates.CertificateSigningRequest) (*certificat
|
||||||
|
|
||||||
func (c fakeClient) Watch(opts v1.ListOptions) (watch.Interface, error) {
|
func (c fakeClient) Watch(opts v1.ListOptions) (watch.Interface, error) {
|
||||||
if c.failureType == watchError {
|
if c.failureType == watchError {
|
||||||
|
if c.err != nil {
|
||||||
|
return nil, c.err
|
||||||
|
}
|
||||||
return nil, fmt.Errorf("Watch error")
|
return nil, fmt.Errorf("Watch error")
|
||||||
}
|
}
|
||||||
return &fakeWatch{
|
return &fakeWatch{
|
||||||
|
|
|
@ -9,9 +9,8 @@ load(
|
||||||
go_library(
|
go_library(
|
||||||
name = "go_default_library",
|
name = "go_default_library",
|
||||||
srcs = ["csr.go"],
|
srcs = ["csr.go"],
|
||||||
importpath = "k8s.io/kubernetes/pkg/kubelet/util/csr",
|
importpath = "k8s.io/client-go/util/certificate/csr",
|
||||||
deps = [
|
deps = [
|
||||||
"//pkg/apis/certificates/v1beta1:go_default_library",
|
|
||||||
"//vendor/github.com/golang/glog:go_default_library",
|
"//vendor/github.com/golang/glog:go_default_library",
|
||||||
"//vendor/k8s.io/api/certificates/v1beta1:go_default_library",
|
"//vendor/k8s.io/api/certificates/v1beta1:go_default_library",
|
||||||
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
|
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
|
||||||
|
@ -19,6 +18,7 @@ go_library(
|
||||||
"//vendor/k8s.io/apimachinery/pkg/fields:go_default_library",
|
"//vendor/k8s.io/apimachinery/pkg/fields:go_default_library",
|
||||||
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
|
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
|
||||||
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
|
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
|
||||||
|
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
|
||||||
"//vendor/k8s.io/apimachinery/pkg/watch:go_default_library",
|
"//vendor/k8s.io/apimachinery/pkg/watch:go_default_library",
|
||||||
"//vendor/k8s.io/client-go/kubernetes/typed/certificates/v1beta1:go_default_library",
|
"//vendor/k8s.io/client-go/kubernetes/typed/certificates/v1beta1:go_default_library",
|
||||||
"//vendor/k8s.io/client-go/tools/cache:go_default_library",
|
"//vendor/k8s.io/client-go/tools/cache:go_default_library",
|
||||||
|
@ -42,7 +42,7 @@ filegroup(
|
||||||
go_test(
|
go_test(
|
||||||
name = "go_default_test",
|
name = "go_default_test",
|
||||||
srcs = ["csr_test.go"],
|
srcs = ["csr_test.go"],
|
||||||
importpath = "k8s.io/kubernetes/pkg/kubelet/util/csr",
|
importpath = "k8s.io/client-go/util/certificate/csr",
|
||||||
library = ":go_default_library",
|
library = ":go_default_library",
|
||||||
deps = [
|
deps = [
|
||||||
"//vendor/k8s.io/api/certificates/v1beta1:go_default_library",
|
"//vendor/k8s.io/api/certificates/v1beta1:go_default_library",
|
|
@ -19,25 +19,26 @@ package csr
|
||||||
import (
|
import (
|
||||||
"crypto"
|
"crypto"
|
||||||
"crypto/sha512"
|
"crypto/sha512"
|
||||||
|
"crypto/x509"
|
||||||
"crypto/x509/pkix"
|
"crypto/x509/pkix"
|
||||||
"encoding/base64"
|
"encoding/base64"
|
||||||
|
"encoding/pem"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"github.com/golang/glog"
|
||||||
"reflect"
|
"reflect"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/golang/glog"
|
|
||||||
|
|
||||||
certificates "k8s.io/api/certificates/v1beta1"
|
certificates "k8s.io/api/certificates/v1beta1"
|
||||||
"k8s.io/apimachinery/pkg/api/errors"
|
"k8s.io/apimachinery/pkg/api/errors"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
"k8s.io/apimachinery/pkg/fields"
|
"k8s.io/apimachinery/pkg/fields"
|
||||||
"k8s.io/apimachinery/pkg/runtime"
|
"k8s.io/apimachinery/pkg/runtime"
|
||||||
"k8s.io/apimachinery/pkg/types"
|
"k8s.io/apimachinery/pkg/types"
|
||||||
|
"k8s.io/apimachinery/pkg/util/wait"
|
||||||
"k8s.io/apimachinery/pkg/watch"
|
"k8s.io/apimachinery/pkg/watch"
|
||||||
certificatesclient "k8s.io/client-go/kubernetes/typed/certificates/v1beta1"
|
certificatesclient "k8s.io/client-go/kubernetes/typed/certificates/v1beta1"
|
||||||
"k8s.io/client-go/tools/cache"
|
"k8s.io/client-go/tools/cache"
|
||||||
certutil "k8s.io/client-go/util/cert"
|
certutil "k8s.io/client-go/util/cert"
|
||||||
certhelper "k8s.io/kubernetes/pkg/apis/certificates/v1beta1"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// RequestNodeCertificate will create a certificate signing request for a node
|
// RequestNodeCertificate will create a certificate signing request for a node
|
||||||
|
@ -68,16 +69,20 @@ func RequestNodeCertificate(client certificatesclient.CertificateSigningRequestI
|
||||||
certificates.UsageClientAuth,
|
certificates.UsageClientAuth,
|
||||||
}
|
}
|
||||||
name := digestedName(privateKeyData, subject, usages)
|
name := digestedName(privateKeyData, subject, usages)
|
||||||
return requestCertificate(client, csrData, name, usages, privateKey)
|
req, err := RequestCertificate(client, csrData, name, usages, privateKey)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return WaitForCertificate(client, req, 3600*time.Second)
|
||||||
}
|
}
|
||||||
|
|
||||||
// requestCertificate will either use an existing (if this process has run
|
// RequestCertificate will either use an existing (if this process has run
|
||||||
// before but not to completion) or create a certificate signing request using the
|
// before but not to completion) or create a certificate signing request using the
|
||||||
// PEM encoded CSR and send it to API server, then it will watch the object's
|
// PEM encoded CSR and send it to API server, then it will watch the object's
|
||||||
// status, once approved by API server, it will return the API server's issued
|
// status, once approved by API server, it will return the API server's issued
|
||||||
// certificate (pem-encoded). If there is any errors, or the watch timeouts, it
|
// certificate (pem-encoded). If there is any errors, or the watch timeouts, it
|
||||||
// will return an error.
|
// will return an error.
|
||||||
func requestCertificate(client certificatesclient.CertificateSigningRequestInterface, csrData []byte, name string, usages []certificates.KeyUsage, privateKey interface{}) (certData []byte, err error) {
|
func RequestCertificate(client certificatesclient.CertificateSigningRequestInterface, csrData []byte, name string, usages []certificates.KeyUsage, privateKey interface{}) (req *certificates.CertificateSigningRequest, err error) {
|
||||||
csr := &certificates.CertificateSigningRequest{
|
csr := &certificates.CertificateSigningRequest{
|
||||||
// Username, UID, Groups will be injected by API server.
|
// Username, UID, Groups will be injected by API server.
|
||||||
TypeMeta: metav1.TypeMeta{Kind: "CertificateSigningRequest"},
|
TypeMeta: metav1.TypeMeta{Kind: "CertificateSigningRequest"},
|
||||||
|
@ -89,28 +94,35 @@ func requestCertificate(client certificatesclient.CertificateSigningRequestInter
|
||||||
Usages: usages,
|
Usages: usages,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
if len(csr.Name) == 0 {
|
||||||
|
csr.GenerateName = "csr-"
|
||||||
|
}
|
||||||
|
|
||||||
req, err := client.Create(csr)
|
req, err = client.Create(csr)
|
||||||
switch {
|
switch {
|
||||||
case err == nil:
|
case err == nil:
|
||||||
case errors.IsAlreadyExists(err):
|
case errors.IsAlreadyExists(err) && len(name) > 0:
|
||||||
glog.Infof("csr for this node already exists, reusing")
|
glog.Infof("csr for this node already exists, reusing")
|
||||||
req, err = client.Get(name, metav1.GetOptions{})
|
req, err = client.Get(name, metav1.GetOptions{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("cannot retrieve certificate signing request: %v", err)
|
return nil, formatError("cannot retrieve certificate signing request: %v", err)
|
||||||
}
|
}
|
||||||
if err := ensureCompatible(req, csr, privateKey); err != nil {
|
if err := ensureCompatible(req, csr, privateKey); err != nil {
|
||||||
return nil, fmt.Errorf("retrieved csr is not compatible: %v", err)
|
return nil, fmt.Errorf("retrieved csr is not compatible: %v", err)
|
||||||
}
|
}
|
||||||
glog.Infof("csr for this node is still valid")
|
glog.Infof("csr for this node is still valid")
|
||||||
default:
|
default:
|
||||||
return nil, fmt.Errorf("cannot create certificate signing request: %v", err)
|
return nil, formatError("cannot create certificate signing request: %v", err)
|
||||||
}
|
}
|
||||||
|
return req, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// WaitForCertificate waits for a certificate to be issued until timeout, or returns an error.
|
||||||
|
func WaitForCertificate(client certificatesclient.CertificateSigningRequestInterface, req *certificates.CertificateSigningRequest, timeout time.Duration) (certData []byte, err error) {
|
||||||
fieldSelector := fields.OneTermEqualSelector("metadata.name", req.Name).String()
|
fieldSelector := fields.OneTermEqualSelector("metadata.name", req.Name).String()
|
||||||
|
|
||||||
event, err := cache.ListWatchUntil(
|
event, err := cache.ListWatchUntil(
|
||||||
3600*time.Second,
|
timeout,
|
||||||
&cache.ListWatch{
|
&cache.ListWatch{
|
||||||
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
|
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
|
||||||
options.FieldSelector = fieldSelector
|
options.FieldSelector = fieldSelector
|
||||||
|
@ -124,6 +136,8 @@ func requestCertificate(client certificatesclient.CertificateSigningRequestInter
|
||||||
func(event watch.Event) (bool, error) {
|
func(event watch.Event) (bool, error) {
|
||||||
switch event.Type {
|
switch event.Type {
|
||||||
case watch.Modified, watch.Added:
|
case watch.Modified, watch.Added:
|
||||||
|
case watch.Deleted:
|
||||||
|
return false, fmt.Errorf("csr %q was deleted", req.Name)
|
||||||
default:
|
default:
|
||||||
return false, nil
|
return false, nil
|
||||||
}
|
}
|
||||||
|
@ -142,12 +156,14 @@ func requestCertificate(client certificatesclient.CertificateSigningRequestInter
|
||||||
return false, nil
|
return false, nil
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
if err == wait.ErrWaitTimeout {
|
||||||
|
return nil, wait.ErrWaitTimeout
|
||||||
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("cannot watch on the certificate signing request: %v", err)
|
return nil, formatError("cannot watch on the certificate signing request: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return event.Object.(*certificates.CertificateSigningRequest).Status.Certificate, nil
|
return event.Object.(*certificates.CertificateSigningRequest).Status.Certificate, nil
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// This digest should include all the relevant pieces of the CSR we care about.
|
// This digest should include all the relevant pieces of the CSR we care about.
|
||||||
|
@ -184,11 +200,11 @@ func digestedName(privateKeyData []byte, subject *pkix.Name, usages []certificat
|
||||||
|
|
||||||
// ensureCompatible ensures that a CSR object is compatible with an original CSR
|
// ensureCompatible ensures that a CSR object is compatible with an original CSR
|
||||||
func ensureCompatible(new, orig *certificates.CertificateSigningRequest, privateKey interface{}) error {
|
func ensureCompatible(new, orig *certificates.CertificateSigningRequest, privateKey interface{}) error {
|
||||||
newCsr, err := certhelper.ParseCSR(new)
|
newCsr, err := ParseCSR(new)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("unable to parse new csr: %v", err)
|
return fmt.Errorf("unable to parse new csr: %v", err)
|
||||||
}
|
}
|
||||||
origCsr, err := certhelper.ParseCSR(orig)
|
origCsr, err := ParseCSR(orig)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("unable to parse original csr: %v", err)
|
return fmt.Errorf("unable to parse original csr: %v", err)
|
||||||
}
|
}
|
||||||
|
@ -203,5 +219,43 @@ func ensureCompatible(new, orig *certificates.CertificateSigningRequest, private
|
||||||
if err := newCsr.CheckSignature(); err != nil {
|
if err := newCsr.CheckSignature(); err != nil {
|
||||||
return fmt.Errorf("error validating signature new CSR against old key: %v", err)
|
return fmt.Errorf("error validating signature new CSR against old key: %v", err)
|
||||||
}
|
}
|
||||||
|
if len(new.Status.Certificate) > 0 {
|
||||||
|
certs, err := certutil.ParseCertsPEM(new.Status.Certificate)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("error parsing signed certificate for CSR: %v", err)
|
||||||
|
}
|
||||||
|
now := time.Now()
|
||||||
|
for _, cert := range certs {
|
||||||
|
if now.After(cert.NotAfter) {
|
||||||
|
return fmt.Errorf("one of the certificates for the CSR has expired: %s", cert.NotAfter)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// formatError preserves the type of an API message but alters the message. Expects
|
||||||
|
// a single argument format string, and returns the wrapped error.
|
||||||
|
func formatError(format string, err error) error {
|
||||||
|
if s, ok := err.(errors.APIStatus); ok {
|
||||||
|
se := &errors.StatusError{ErrStatus: s.Status()}
|
||||||
|
se.ErrStatus.Message = fmt.Sprintf(format, se.ErrStatus.Message)
|
||||||
|
return se
|
||||||
|
}
|
||||||
|
return fmt.Errorf(format, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// ParseCSR extracts the CSR from the API object and decodes it.
|
||||||
|
func ParseCSR(obj *certificates.CertificateSigningRequest) (*x509.CertificateRequest, error) {
|
||||||
|
// extract PEM from request object
|
||||||
|
pemBytes := obj.Spec.Request
|
||||||
|
block, _ := pem.Decode(pemBytes)
|
||||||
|
if block == nil || block.Type != "CERTIFICATE REQUEST" {
|
||||||
|
return nil, fmt.Errorf("PEM block type must be CERTIFICATE REQUEST")
|
||||||
|
}
|
||||||
|
csr, err := x509.ParseCertificateRequest(block.Bytes)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return csr, nil
|
||||||
|
}
|
Loading…
Reference in New Issue