Merge pull request #45514 from mikedanese/cert-refactor

Automatic merge from submit-queue (batch tested with PRs 45514, 45635)

refactor certificate controller to break it into two parts

Break pkg/controller/certificates into:
* pkg/controller/certificates/approver: containing the group approver
* pkg/controller/certificates/signer: containing the local signer
* pkg/controller/certificates: containing shared infrastructure
```release-note
Break the 'certificatesigningrequests' controller into a 'csrapprover' controller and 'csrsigner' controller.
```
pull/6/head
Kubernetes Submit Queue 2017-05-23 20:52:53 -07:00 committed by GitHub
commit 5be7a6a73e
20 changed files with 341 additions and 351 deletions

View File

@ -72,7 +72,7 @@ func Run(s *GKECertificatesController) error {
sharedInformers := informers.NewSharedInformerFactory(client, time.Duration(12)*time.Hour) sharedInformers := informers.NewSharedInformerFactory(client, time.Duration(12)*time.Hour)
signer, err := NewGKESigner(s.ClusterSigningGKEKubeconfig, s.ClusterSigningGKERetryBackoff.Duration, recorder) signer, err := NewGKESigner(s.ClusterSigningGKEKubeconfig, s.ClusterSigningGKERetryBackoff.Duration, recorder, client)
if err != nil { if err != nil {
return err return err
} }
@ -80,8 +80,7 @@ func Run(s *GKECertificatesController) error {
controller, err := certificates.NewCertificateController( controller, err := certificates.NewCertificateController(
client, client,
sharedInformers.Certificates().V1beta1().CertificateSigningRequests(), sharedInformers.Certificates().V1beta1().CertificateSigningRequests(),
signer, signer.handle,
certificates.NewGroupApprover(s.ApproveAllKubeletCSRsForGroup),
) )
if err != nil { if err != nil {
return err return err

View File

@ -29,11 +29,13 @@ import (
"k8s.io/client-go/tools/record" "k8s.io/client-go/tools/record"
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
_ "k8s.io/kubernetes/pkg/apis/certificates/install" _ "k8s.io/kubernetes/pkg/apis/certificates/install"
certificates "k8s.io/kubernetes/pkg/apis/certificates/v1beta1" capi "k8s.io/kubernetes/pkg/apis/certificates/v1beta1"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
"k8s.io/kubernetes/pkg/controller/certificates"
) )
var ( var (
groupVersions = []schema.GroupVersion{certificates.SchemeGroupVersion} groupVersions = []schema.GroupVersion{capi.SchemeGroupVersion}
) )
// GKESigner uses external calls to GKE in order to sign certificate signing // GKESigner uses external calls to GKE in order to sign certificate signing
@ -43,10 +45,11 @@ type GKESigner struct {
kubeConfigFile string kubeConfigFile string
retryBackoff time.Duration retryBackoff time.Duration
recorder record.EventRecorder recorder record.EventRecorder
client clientset.Interface
} }
// NewGKESigner will create a new instance of a GKESigner. // NewGKESigner will create a new instance of a GKESigner.
func NewGKESigner(kubeConfigFile string, retryBackoff time.Duration, recorder record.EventRecorder) (*GKESigner, error) { func NewGKESigner(kubeConfigFile string, retryBackoff time.Duration, recorder record.EventRecorder, client clientset.Interface) (*GKESigner, error) {
webhook, err := webhook.NewGenericWebhook(api.Registry, api.Codecs, kubeConfigFile, groupVersions, retryBackoff) webhook, err := webhook.NewGenericWebhook(api.Registry, api.Codecs, kubeConfigFile, groupVersions, retryBackoff)
if err != nil { if err != nil {
return nil, err return nil, err
@ -57,13 +60,29 @@ func NewGKESigner(kubeConfigFile string, retryBackoff time.Duration, recorder re
kubeConfigFile: kubeConfigFile, kubeConfigFile: kubeConfigFile,
retryBackoff: retryBackoff, retryBackoff: retryBackoff,
recorder: recorder, recorder: recorder,
client: client,
}, nil }, nil
} }
func (s *GKESigner) handle(csr *capi.CertificateSigningRequest) error {
if !certificates.IsCertificateRequestApproved(csr) {
return nil
}
csr, err := s.sign(csr)
if err != nil {
return fmt.Errorf("error auto signing csr: %v", err)
}
_, err = s.client.Certificates().CertificateSigningRequests().UpdateStatus(csr)
if err != nil {
return fmt.Errorf("error updating signature for csr: %v", err)
}
return nil
}
// Sign will make an external call to GKE order to sign the given // Sign will make an external call to GKE order to sign the given
// *certificates.CertificateSigningRequest, using the GKESigner's // *capi.CertificateSigningRequest, using the GKESigner's
// kubeConfigFile. // kubeConfigFile.
func (s *GKESigner) Sign(csr *certificates.CertificateSigningRequest) (*certificates.CertificateSigningRequest, error) { func (s *GKESigner) sign(csr *capi.CertificateSigningRequest) (*capi.CertificateSigningRequest, error) {
result := s.webhook.WithExponentialBackoff(func() rest.Result { result := s.webhook.WithExponentialBackoff(func() rest.Result {
return s.webhook.RestClient.Post().Body(csr).Do() return s.webhook.RestClient.Post().Body(csr).Do()
}) })
@ -80,7 +99,7 @@ func (s *GKESigner) Sign(csr *certificates.CertificateSigningRequest) (*certific
return nil, s.webhookError(csr, fmt.Errorf("received unsuccessful response code from webhook: %d", statusCode)) return nil, s.webhookError(csr, fmt.Errorf("received unsuccessful response code from webhook: %d", statusCode))
} }
result_csr := &certificates.CertificateSigningRequest{} result_csr := &capi.CertificateSigningRequest{}
if err := result.Into(result_csr); err != nil { if err := result.Into(result_csr); err != nil {
return nil, s.webhookError(result_csr, err) return nil, s.webhookError(result_csr, err)
@ -91,7 +110,7 @@ func (s *GKESigner) Sign(csr *certificates.CertificateSigningRequest) (*certific
return csr, nil return csr, nil
} }
func (s *GKESigner) webhookError(csr *certificates.CertificateSigningRequest, err error) error { func (s *GKESigner) webhookError(csr *capi.CertificateSigningRequest, err error) error {
glog.V(2).Infof("error contacting webhook backend: %s", err) glog.V(2).Infof("error contacting webhook backend: %s", err)
s.recorder.Eventf(csr, "Warning", "SigningError", "error while calling GKE: %v", err) s.recorder.Eventf(csr, "Warning", "SigningError", "error while calling GKE: %v", err)
return err return err

View File

@ -104,12 +104,12 @@ func TestGKESigner(t *testing.T) {
t.Fatalf("error closing kubeconfig template: %v", err) t.Fatalf("error closing kubeconfig template: %v", err)
} }
signer, err := NewGKESigner(kubeConfig.Name(), time.Duration(500)*time.Millisecond, record.NewFakeRecorder(10)) signer, err := NewGKESigner(kubeConfig.Name(), time.Duration(500)*time.Millisecond, record.NewFakeRecorder(10), nil)
if err != nil { if err != nil {
t.Fatalf("error creating GKESigner: %v", err) t.Fatalf("error creating GKESigner: %v", err)
} }
cert, err := signer.Sign(&certificates.CertificateSigningRequest{}) cert, err := signer.sign(&certificates.CertificateSigningRequest{})
if c.wantErr { if c.wantErr {
if err == nil { if err == nil {

View File

@ -44,7 +44,8 @@ go_library(
"//pkg/cloudprovider/providers/vsphere:go_default_library", "//pkg/cloudprovider/providers/vsphere:go_default_library",
"//pkg/controller:go_default_library", "//pkg/controller:go_default_library",
"//pkg/controller/bootstrap:go_default_library", "//pkg/controller/bootstrap:go_default_library",
"//pkg/controller/certificates:go_default_library", "//pkg/controller/certificates/approver:go_default_library",
"//pkg/controller/certificates/signer:go_default_library",
"//pkg/controller/cronjob:go_default_library", "//pkg/controller/cronjob:go_default_library",
"//pkg/controller/daemon:go_default_library", "//pkg/controller/daemon:go_default_library",
"//pkg/controller/deployment:go_default_library", "//pkg/controller/deployment:go_default_library",

View File

@ -24,26 +24,47 @@ import (
"github.com/golang/glog" "github.com/golang/glog"
"k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/schema"
certcontroller "k8s.io/kubernetes/pkg/controller/certificates" "k8s.io/kubernetes/pkg/controller/certificates/approver"
"k8s.io/kubernetes/pkg/controller/certificates/signer"
) )
func startCSRController(ctx ControllerContext) (bool, error) { func startCSRSigningController(ctx ControllerContext) (bool, error) {
if !ctx.AvailableResources[schema.GroupVersionResource{Group: "certificates.k8s.io", Version: "v1beta1", Resource: "certificatesigningrequests"}] { if !ctx.AvailableResources[schema.GroupVersionResource{Group: "certificates.k8s.io", Version: "v1beta1", Resource: "certificatesigningrequests"}] {
return false, nil return false, nil
} }
if ctx.Options.ClusterSigningCertFile == "" || ctx.Options.ClusterSigningKeyFile == "" {
return false, nil
}
c := ctx.ClientBuilder.ClientOrDie("certificate-controller") c := ctx.ClientBuilder.ClientOrDie("certificate-controller")
signer, err := certcontroller.NewCFSSLSigner(ctx.Options.ClusterSigningCertFile, ctx.Options.ClusterSigningKeyFile) signer, err := signer.NewCSRSigningController(
c,
ctx.InformerFactory.Certificates().V1beta1().CertificateSigningRequests(),
ctx.Options.ClusterSigningCertFile,
ctx.Options.ClusterSigningKeyFile,
)
if err != nil { if err != nil {
glog.Errorf("Failed to start certificate controller: %v", err) glog.Errorf("Failed to start certificate controller: %v", err)
return false, nil return false, nil
} }
go signer.Run(1, ctx.Stop)
certController, err := certcontroller.NewCertificateController( return true, nil
}
func startCSRApprovingController(ctx ControllerContext) (bool, error) {
if !ctx.AvailableResources[schema.GroupVersionResource{Group: "certificates.k8s.io", Version: "v1beta1", Resource: "certificatesigningrequests"}] {
return false, nil
}
if ctx.Options.ApproveAllKubeletCSRsForGroup == "" {
return false, nil
}
c := ctx.ClientBuilder.ClientOrDie("certificate-controller")
approver, err := approver.NewCSRApprovingController(
c, c,
ctx.InformerFactory.Certificates().V1beta1().CertificateSigningRequests(), ctx.InformerFactory.Certificates().V1beta1().CertificateSigningRequests(),
signer, ctx.Options.ApproveAllKubeletCSRsForGroup,
certcontroller.NewGroupApprover(ctx.Options.ApproveAllKubeletCSRsForGroup),
) )
if err != nil { if err != nil {
// TODO this is failing consistently in test-cmd and local-up-cluster.sh. Fix them and make it consistent with all others which // TODO this is failing consistently in test-cmd and local-up-cluster.sh. Fix them and make it consistent with all others which
@ -51,6 +72,7 @@ func startCSRController(ctx ControllerContext) (bool, error) {
glog.Errorf("Failed to start certificate controller: %v", err) glog.Errorf("Failed to start certificate controller: %v", err)
return false, nil return false, nil
} }
go certController.Run(1, ctx.Stop) go approver.Run(1, ctx.Stop)
return true, nil return true, nil
} }

View File

@ -309,7 +309,8 @@ func NewControllerInitializers() map[string]InitFunc {
controllers["disruption"] = startDisruptionController controllers["disruption"] = startDisruptionController
controllers["statefulset"] = startStatefulSetController controllers["statefulset"] = startStatefulSetController
controllers["cronjob"] = startCronJobController controllers["cronjob"] = startCronJobController
controllers["certificatesigningrequests"] = startCSRController controllers["csrsigning"] = startCSRSigningController
controllers["csrapproving"] = startCSRApprovingController
controllers["ttl"] = startTTLController controllers["ttl"] = startTTLController
controllers["bootstrapsigner"] = startBootstrapSignerController controllers["bootstrapsigner"] = startBootstrapSignerController
controllers["tokencleaner"] = startTokenCleanerController controllers["tokencleaner"] = startTokenCleanerController

View File

@ -1,5 +1,3 @@
package(default_visibility = ["//visibility:public"])
licenses(["notice"]) licenses(["notice"])
load( load(
@ -13,21 +11,18 @@ go_library(
srcs = [ srcs = [
"certificate_controller.go", "certificate_controller.go",
"certificate_controller_utils.go", "certificate_controller_utils.go",
"cfssl_signer.go",
"doc.go",
"groupapprove.go",
], ],
tags = ["automanaged"], tags = ["automanaged"],
visibility = [
":__subpackages__",
"//cmd/gke-certificates-controller:__subpackages__",
],
deps = [ deps = [
"//pkg/apis/certificates/v1beta1:go_default_library", "//pkg/apis/certificates/v1beta1:go_default_library",
"//pkg/client/clientset_generated/clientset:go_default_library", "//pkg/client/clientset_generated/clientset:go_default_library",
"//pkg/client/informers/informers_generated/externalversions/certificates/v1beta1:go_default_library", "//pkg/client/informers/informers_generated/externalversions/certificates/v1beta1:go_default_library",
"//pkg/client/listers/certificates/v1beta1:go_default_library", "//pkg/client/listers/certificates/v1beta1:go_default_library",
"//pkg/controller:go_default_library", "//pkg/controller:go_default_library",
"//vendor/github.com/cloudflare/cfssl/config:go_default_library",
"//vendor/github.com/cloudflare/cfssl/helpers:go_default_library",
"//vendor/github.com/cloudflare/cfssl/signer:go_default_library",
"//vendor/github.com/cloudflare/cfssl/signer/local:go_default_library",
"//vendor/github.com/golang/glog:go_default_library", "//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
@ -49,22 +44,20 @@ filegroup(
filegroup( filegroup(
name = "all-srcs", name = "all-srcs",
srcs = [":package-srcs"], srcs = [
":package-srcs",
"//pkg/controller/certificates/approver:all-srcs",
"//pkg/controller/certificates/signer:all-srcs",
],
tags = ["automanaged"], tags = ["automanaged"],
visibility = [
"//pkg/controller:__pkg__",
],
) )
go_test( go_test(
name = "go_default_test", name = "go_default_test",
srcs = [ srcs = ["certificate_controller_test.go"],
"certificate_controller_test.go",
"cfssl_signer_test.go",
"groupapprove_test.go",
],
data = [
"testdata/ca.crt",
"testdata/ca.key",
"testdata/kubelet.csr",
],
library = ":go_default_library", library = ":go_default_library",
tags = ["automanaged"], tags = ["automanaged"],
deps = [ deps = [
@ -73,10 +66,5 @@ go_test(
"//pkg/client/informers/informers_generated/externalversions:go_default_library", "//pkg/client/informers/informers_generated/externalversions:go_default_library",
"//pkg/controller:go_default_library", "//pkg/controller:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/k8s.io/client-go/tools/cache:go_default_library",
"//vendor/k8s.io/client-go/util/cert:go_default_library",
"//vendor/k8s.io/client-go/util/cert/triple:go_default_library",
], ],
) )

View File

@ -0,0 +1,43 @@
package(default_visibility = ["//visibility:public"])
licenses(["notice"])
load(
"@io_bazel_rules_go//go:def.bzl",
"go_library",
"go_test",
)
go_test(
name = "go_default_test",
srcs = ["groupapprove_test.go"],
library = ":go_default_library",
tags = ["automanaged"],
deps = ["//pkg/apis/certificates/v1beta1:go_default_library"],
)
go_library(
name = "go_default_library",
srcs = ["groupapprove.go"],
tags = ["automanaged"],
deps = [
"//pkg/apis/certificates/v1beta1:go_default_library",
"//pkg/client/clientset_generated/clientset:go_default_library",
"//pkg/client/informers/informers_generated/externalversions/certificates/v1beta1:go_default_library",
"//pkg/controller/certificates:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
)

View File

@ -14,7 +14,8 @@ See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
*/ */
package certificates // Package approver implements an automated approver for kubelet certificates.
package approver
import ( import (
"fmt" "fmt"
@ -22,31 +23,51 @@ import (
"strings" "strings"
utilruntime "k8s.io/apimachinery/pkg/util/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime"
certificates "k8s.io/kubernetes/pkg/apis/certificates/v1beta1" capi "k8s.io/kubernetes/pkg/apis/certificates/v1beta1"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
certificatesinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions/certificates/v1beta1"
"k8s.io/kubernetes/pkg/controller/certificates"
) )
func NewCSRApprovingController(
client clientset.Interface,
csrInformer certificatesinformers.CertificateSigningRequestInformer,
approveAllKubeletCSRsForGroup string,
) (*certificates.CertificateController, error) {
approver := &groupApprover{
approveAllKubeletCSRsForGroup: approveAllKubeletCSRsForGroup,
client: client,
}
return certificates.NewCertificateController(
client,
csrInformer,
approver.handle,
)
}
// groupApprover implements AutoApprover for signing Kubelet certificates. // groupApprover implements AutoApprover for signing Kubelet certificates.
type groupApprover struct { type groupApprover struct {
approveAllKubeletCSRsForGroup string approveAllKubeletCSRsForGroup string
client clientset.Interface
} }
// NewGroupApprover creates an approver that accepts any CSR requests where the subject group contains approveAllKubeletCSRsForGroup. func (ga *groupApprover) handle(csr *capi.CertificateSigningRequest) error {
func NewGroupApprover(approveAllKubeletCSRsForGroup string) AutoApprover {
return &groupApprover{
approveAllKubeletCSRsForGroup: approveAllKubeletCSRsForGroup,
}
}
func (cc *groupApprover) AutoApprove(csr *certificates.CertificateSigningRequest) (*certificates.CertificateSigningRequest, error) {
// short-circuit if we're not auto-approving
if cc.approveAllKubeletCSRsForGroup == "" {
return csr, nil
}
// short-circuit if we're already approved or denied // short-circuit if we're already approved or denied
if approved, denied := getCertApprovalCondition(&csr.Status); approved || denied { if approved, denied := certificates.GetCertApprovalCondition(&csr.Status); approved || denied {
return csr, nil return nil
}
csr, err := ga.autoApprove(csr)
if err != nil {
return fmt.Errorf("error auto approving csr: %v", err)
}
_, err = ga.client.Certificates().CertificateSigningRequests().UpdateApproval(csr)
if err != nil {
return fmt.Errorf("error updating approval for csr: %v", err)
}
return nil
} }
func (cc *groupApprover) autoApprove(csr *capi.CertificateSigningRequest) (*capi.CertificateSigningRequest, error) {
isKubeletBootstrapGroup := false isKubeletBootstrapGroup := false
for _, g := range csr.Spec.Groups { for _, g := range csr.Spec.Groups {
if g == cc.approveAllKubeletCSRsForGroup { if g == cc.approveAllKubeletCSRsForGroup {
@ -58,7 +79,7 @@ func (cc *groupApprover) AutoApprove(csr *certificates.CertificateSigningRequest
return csr, nil return csr, nil
} }
x509cr, err := certificates.ParseCSR(csr) x509cr, err := capi.ParseCSR(csr)
if err != nil { if err != nil {
utilruntime.HandleError(fmt.Errorf("unable to parse csr %q: %v", csr.Name, err)) utilruntime.HandleError(fmt.Errorf("unable to parse csr %q: %v", csr.Name, err))
return csr, nil return csr, nil
@ -76,26 +97,26 @@ func (cc *groupApprover) AutoApprove(csr *certificates.CertificateSigningRequest
return csr, nil return csr, nil
} }
csr.Status.Conditions = append(csr.Status.Conditions, certificates.CertificateSigningRequestCondition{ csr.Status.Conditions = append(csr.Status.Conditions, capi.CertificateSigningRequestCondition{
Type: certificates.CertificateApproved, Type: capi.CertificateApproved,
Reason: "AutoApproved", Reason: "AutoApproved",
Message: "Auto approving of all kubelet CSRs is enabled on the controller manager", Message: "Auto approving of all kubelet CSRs is enabled on the controller manager",
}) })
return csr, nil return csr, nil
} }
var kubeletClientUsages = []certificates.KeyUsage{ var kubeletClientUsages = []capi.KeyUsage{
certificates.UsageKeyEncipherment, capi.UsageKeyEncipherment,
certificates.UsageDigitalSignature, capi.UsageDigitalSignature,
certificates.UsageClientAuth, capi.UsageClientAuth,
} }
func hasExactUsages(csr *certificates.CertificateSigningRequest, usages []certificates.KeyUsage) bool { func hasExactUsages(csr *capi.CertificateSigningRequest, usages []capi.KeyUsage) bool {
if len(usages) != len(csr.Spec.Usages) { if len(usages) != len(csr.Spec.Usages) {
return false return false
} }
usageMap := map[certificates.KeyUsage]struct{}{} usageMap := map[capi.KeyUsage]struct{}{}
for _, u := range usages { for _, u := range usages {
usageMap[u] = struct{}{} usageMap[u] = struct{}{}
} }

View File

@ -14,17 +14,17 @@ See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
*/ */
package certificates package approver
import ( import (
"testing" "testing"
certificates "k8s.io/kubernetes/pkg/apis/certificates/v1beta1" api "k8s.io/kubernetes/pkg/apis/certificates/v1beta1"
) )
func TestHasKubeletUsages(t *testing.T) { func TestHasKubeletUsages(t *testing.T) {
cases := []struct { cases := []struct {
usages []certificates.KeyUsage usages []api.KeyUsage
expected bool expected bool
}{ }{
{ {
@ -32,36 +32,36 @@ func TestHasKubeletUsages(t *testing.T) {
expected: false, expected: false,
}, },
{ {
usages: []certificates.KeyUsage{}, usages: []api.KeyUsage{},
expected: false, expected: false,
}, },
{ {
usages: []certificates.KeyUsage{ usages: []api.KeyUsage{
certificates.UsageKeyEncipherment, api.UsageKeyEncipherment,
certificates.UsageDigitalSignature, api.UsageDigitalSignature,
}, },
expected: false, expected: false,
}, },
{ {
usages: []certificates.KeyUsage{ usages: []api.KeyUsage{
certificates.UsageKeyEncipherment, api.UsageKeyEncipherment,
certificates.UsageDigitalSignature, api.UsageDigitalSignature,
certificates.UsageServerAuth, api.UsageServerAuth,
}, },
expected: false, expected: false,
}, },
{ {
usages: []certificates.KeyUsage{ usages: []api.KeyUsage{
certificates.UsageKeyEncipherment, api.UsageKeyEncipherment,
certificates.UsageDigitalSignature, api.UsageDigitalSignature,
certificates.UsageClientAuth, api.UsageClientAuth,
}, },
expected: true, expected: true,
}, },
} }
for _, c := range cases { for _, c := range cases {
if hasExactUsages(&certificates.CertificateSigningRequest{ if hasExactUsages(&api.CertificateSigningRequest{
Spec: certificates.CertificateSigningRequestSpec{ Spec: api.CertificateSigningRequestSpec{
Usages: c.usages, Usages: c.usages,
}, },
}, kubeletClientUsages) != c.expected { }, kubeletClientUsages) != c.expected {

View File

@ -14,6 +14,8 @@ See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
*/ */
// Package certificates implements an abstract controller that is useful for
// building controllers that manage CSRs
package certificates package certificates
import ( import (
@ -37,33 +39,22 @@ import (
"github.com/golang/glog" "github.com/golang/glog"
) )
// err returned from these interfaces should indicate utter failure that
// should be retried. "Buisness logic" errors should be indicated by adding
// a condition to the CSRs status, not by returning an error.
type AutoApprover interface {
AutoApprove(csr *certificates.CertificateSigningRequest) (*certificates.CertificateSigningRequest, error)
}
type Signer interface {
Sign(csr *certificates.CertificateSigningRequest) (*certificates.CertificateSigningRequest, error)
}
type CertificateController struct { type CertificateController struct {
kubeClient clientset.Interface kubeClient clientset.Interface
csrLister certificateslisters.CertificateSigningRequestLister csrLister certificateslisters.CertificateSigningRequestLister
csrsSynced cache.InformerSynced csrsSynced cache.InformerSynced
syncHandler func(csrKey string) error handler func(*certificates.CertificateSigningRequest) error
approver AutoApprover
signer Signer
queue workqueue.RateLimitingInterface queue workqueue.RateLimitingInterface
} }
func NewCertificateController(kubeClient clientset.Interface, csrInformer certificatesinformers.CertificateSigningRequestInformer, signer Signer, approver AutoApprover) (*CertificateController, error) { func NewCertificateController(
kubeClient clientset.Interface,
csrInformer certificatesinformers.CertificateSigningRequestInformer,
handler func(*certificates.CertificateSigningRequest) error,
) (*CertificateController, error) {
// Send events to the apiserver // Send events to the apiserver
eventBroadcaster := record.NewBroadcaster() eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(glog.Infof) eventBroadcaster.StartLogging(glog.Infof)
@ -72,8 +63,7 @@ func NewCertificateController(kubeClient clientset.Interface, csrInformer certif
cc := &CertificateController{ cc := &CertificateController{
kubeClient: kubeClient, kubeClient: kubeClient,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "certificate"), queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "certificate"),
signer: signer, handler: handler,
approver: approver,
} }
// Manage the addition/update of certificate requests // Manage the addition/update of certificate requests
@ -108,7 +98,7 @@ func NewCertificateController(kubeClient clientset.Interface, csrInformer certif
}) })
cc.csrLister = csrInformer.Lister() cc.csrLister = csrInformer.Lister()
cc.csrsSynced = csrInformer.Informer().HasSynced cc.csrsSynced = csrInformer.Informer().HasSynced
cc.syncHandler = cc.maybeSignCertificate cc.handler = handler
return cc, nil return cc, nil
} }
@ -145,17 +135,17 @@ func (cc *CertificateController) processNextWorkItem() bool {
} }
defer cc.queue.Done(cKey) defer cc.queue.Done(cKey)
err := cc.syncHandler(cKey.(string)) if err := cc.syncFunc(cKey.(string)); err != nil {
if err == nil {
cc.queue.Forget(cKey)
return true
}
cc.queue.AddRateLimited(cKey) cc.queue.AddRateLimited(cKey)
utilruntime.HandleError(fmt.Errorf("Sync %v failed with : %v", cKey, err)) utilruntime.HandleError(fmt.Errorf("Sync %v failed with : %v", cKey, err))
return true return true
} }
cc.queue.Forget(cKey)
return true
}
func (cc *CertificateController) enqueueCertificateRequest(obj interface{}) { func (cc *CertificateController) enqueueCertificateRequest(obj interface{}) {
key, err := controller.KeyFunc(obj) key, err := controller.KeyFunc(obj)
if err != nil { if err != nil {
@ -169,7 +159,7 @@ func (cc *CertificateController) enqueueCertificateRequest(obj interface{}) {
// been approved and meets policy expectations, generate an X509 cert using the // been approved and meets policy expectations, generate an X509 cert using the
// cluster CA assets. If successful it will update the CSR approve subresource // cluster CA assets. If successful it will update the CSR approve subresource
// with the signed certificate. // with the signed certificate.
func (cc *CertificateController) maybeSignCertificate(key string) error { func (cc *CertificateController) syncFunc(key string) error {
startTime := time.Now() startTime := time.Now()
defer func() { defer func() {
glog.V(4).Infof("Finished syncing certificate request %q (%v)", key, time.Now().Sub(startTime)) glog.V(4).Infof("Finished syncing certificate request %q (%v)", key, time.Now().Sub(startTime))
@ -195,32 +185,5 @@ func (cc *CertificateController) maybeSignCertificate(key string) error {
} }
csr = copy.(*certificates.CertificateSigningRequest) csr = copy.(*certificates.CertificateSigningRequest)
if cc.approver != nil { return cc.handler(csr)
csr, err = cc.approver.AutoApprove(csr)
if err != nil {
return fmt.Errorf("error auto approving csr: %v", err)
}
_, err = cc.kubeClient.Certificates().CertificateSigningRequests().UpdateApproval(csr)
if err != nil {
return fmt.Errorf("error updating approval for csr: %v", err)
}
}
// At this point, the controller needs to:
// 1. Check the approval conditions
// 2. Generate a signed certificate
// 3. Update the Status subresource
if cc.signer != nil && IsCertificateRequestApproved(csr) {
csr, err := cc.signer.Sign(csr)
if err != nil {
return fmt.Errorf("error auto signing csr: %v", err)
}
_, err = cc.kubeClient.Certificates().CertificateSigningRequests().UpdateStatus(csr)
if err != nil {
return fmt.Errorf("error updating signature for csr: %v", err)
}
}
return nil
} }

View File

@ -17,209 +17,70 @@ limitations under the License.
package certificates package certificates
import ( import (
"bytes"
"crypto/x509"
"crypto/x509/pkix"
"encoding/pem"
"io/ioutil"
"os"
"testing" "testing"
"time"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/cert"
"k8s.io/client-go/util/cert/triple"
certificates "k8s.io/kubernetes/pkg/apis/certificates/v1beta1" certificates "k8s.io/kubernetes/pkg/apis/certificates/v1beta1"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset/fake" "k8s.io/kubernetes/pkg/client/clientset_generated/clientset/fake"
informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions" informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions"
"k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller"
) )
type testController struct {
*CertificateController
certFile string
keyFile string
csrStore cache.Store
informerFactory informers.SharedInformerFactory
approver *fakeAutoApprover
}
func alwaysReady() bool { return true }
func newController(csrs ...runtime.Object) (*testController, error) {
client := fake.NewSimpleClientset(csrs...)
informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc())
certFile, keyFile, err := createTestCertFiles()
if err != nil {
return nil, err
}
signer, err := NewCFSSLSigner(certFile, keyFile)
if err != nil {
return nil, err
}
approver := &fakeAutoApprover{make(chan *certificates.CertificateSigningRequest, 1)}
controller, err := NewCertificateController(
client,
informerFactory.Certificates().V1beta1().CertificateSigningRequests(),
signer,
approver,
)
if err != nil {
return nil, err
}
controller.csrsSynced = alwaysReady
return &testController{
controller,
certFile,
keyFile,
informerFactory.Certificates().V1beta1().CertificateSigningRequests().Informer().GetStore(),
informerFactory,
approver,
}, nil
}
func (c *testController) cleanup() {
os.Remove(c.certFile)
os.Remove(c.keyFile)
}
func createTestCertFiles() (string, string, error) {
keyPair, err := triple.NewCA("test-ca")
if err != nil {
return "", "", err
}
// Generate cert
certBuffer := bytes.Buffer{}
if err := pem.Encode(&certBuffer, &pem.Block{Type: "CERTIFICATE", Bytes: keyPair.Cert.Raw}); err != nil {
return "", "", err
}
// Generate key
keyBuffer := bytes.Buffer{}
if err := pem.Encode(&keyBuffer, &pem.Block{Type: "RSA PRIVATE KEY", Bytes: x509.MarshalPKCS1PrivateKey(keyPair.Key)}); err != nil {
return "", "", err
}
dir, err := ioutil.TempDir("", "")
if err != nil {
return "", "", err
}
certFile, err := ioutil.TempFile(dir, "cert")
if err != nil {
return "", "", err
}
keyFile, err := ioutil.TempFile(dir, "key")
if err != nil {
return "", "", err
}
_, err = certFile.Write(certBuffer.Bytes())
if err != nil {
return "", "", err
}
certFile.Close()
_, err = keyFile.Write(keyBuffer.Bytes())
if err != nil {
return "", "", err
}
keyFile.Close()
return certFile.Name(), keyFile.Name(), nil
}
type fakeAutoApprover struct {
csr chan *certificates.CertificateSigningRequest
}
func (f *fakeAutoApprover) AutoApprove(csr *certificates.CertificateSigningRequest) (*certificates.CertificateSigningRequest, error) {
csr.Status.Conditions = append(csr.Status.Conditions, certificates.CertificateSigningRequestCondition{
Type: certificates.CertificateApproved,
Reason: "test reason",
Message: "test message",
})
f.csr <- csr
return csr, nil
}
// TODO flesh this out to cover things like not being able to find the csr in the cache, not // TODO flesh this out to cover things like not being able to find the csr in the cache, not
// auto-approving, etc. // auto-approving, etc.
func TestCertificateController(t *testing.T) { func TestCertificateController(t *testing.T) {
csrKey, err := cert.NewPrivateKey()
if err != nil {
t.Fatalf("error creating private key for csr: %v", err)
}
subject := &pkix.Name{
Organization: []string{"test org"},
CommonName: "test cn",
}
csrBytes, err := cert.MakeCSR(csrKey, subject, nil, nil)
if err != nil {
t.Fatalf("error creating csr: %v", err)
}
csr := &certificates.CertificateSigningRequest{ csr := &certificates.CertificateSigningRequest{
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
Name: "test-csr", Name: "test-csr",
}, },
Spec: certificates.CertificateSigningRequestSpec{
Request: csrBytes,
Usages: []certificates.KeyUsage{
certificates.UsageDigitalSignature,
certificates.UsageKeyEncipherment,
certificates.UsageClientAuth,
},
},
} }
controller, err := newController(csr) client := fake.NewSimpleClientset(csr)
informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc())
handler := func(csr *certificates.CertificateSigningRequest) error {
csr.Status.Conditions = append(csr.Status.Conditions, certificates.CertificateSigningRequestCondition{
Type: certificates.CertificateApproved,
Reason: "test reason",
Message: "test message",
})
_, err := client.Certificates().CertificateSigningRequests().UpdateApproval(csr)
if err != nil {
return err
}
return nil
}
controller, err := NewCertificateController(
client,
informerFactory.Certificates().V1beta1().CertificateSigningRequests(),
handler,
)
if err != nil { if err != nil {
t.Fatalf("error creating controller: %v", err) t.Fatalf("error creating controller: %v", err)
} }
defer controller.cleanup() controller.csrsSynced = func() bool { return true }
received := make(chan struct{})
controllerSyncHandler := controller.syncHandler
controller.syncHandler = func(key string) error {
defer close(received)
return controllerSyncHandler(key)
}
stopCh := make(chan struct{}) stopCh := make(chan struct{})
defer close(stopCh) defer close(stopCh)
go controller.Run(1, stopCh) go informerFactory.Start(stopCh)
go controller.informerFactory.Start(stopCh)
select { controller.processNextWorkItem()
case <-received:
case <-time.After(wait.ForeverTestTimeout): actions := client.Actions()
t.Errorf("timed out") if len(actions) != 3 {
t.Errorf("expected 3 actions")
}
if a := actions[0]; !a.Matches("list", "certificatesigningrequests") {
t.Errorf("unexpected action: %#v", a)
}
if a := actions[1]; !a.Matches("watch", "certificatesigningrequests") {
t.Errorf("unexpected action: %#v", a)
}
if a := actions[2]; !a.Matches("update", "certificatesigningrequests") ||
a.GetSubresource() != "approval" {
t.Errorf("unexpected action: %#v", a)
} }
csr = <-controller.approver.csr
if e, a := 1, len(csr.Status.Conditions); e != a {
t.Fatalf("expected %d status condition, got %d", e, a)
}
if e, a := certificates.CertificateApproved, csr.Status.Conditions[0].Type; e != a {
t.Errorf("type: expected %v, got %v", e, a)
}
if e, a := "test reason", csr.Status.Conditions[0].Reason; e != a {
t.Errorf("reason: expected %v, got %v", e, a)
}
if e, a := "test message", csr.Status.Conditions[0].Message; e != a {
t.Errorf("message: expected %v, got %v", e, a)
}
} }

View File

@ -21,11 +21,11 @@ import certificates "k8s.io/kubernetes/pkg/apis/certificates/v1beta1"
// IsCertificateRequestApproved returns true if a certificate request has the // IsCertificateRequestApproved returns true if a certificate request has the
// "Approved" condition and no "Denied" conditions; false otherwise. // "Approved" condition and no "Denied" conditions; false otherwise.
func IsCertificateRequestApproved(csr *certificates.CertificateSigningRequest) bool { func IsCertificateRequestApproved(csr *certificates.CertificateSigningRequest) bool {
approved, denied := getCertApprovalCondition(&csr.Status) approved, denied := GetCertApprovalCondition(&csr.Status)
return approved && !denied return approved && !denied
} }
func getCertApprovalCondition(status *certificates.CertificateSigningRequestStatus) (approved bool, denied bool) { func GetCertApprovalCondition(status *certificates.CertificateSigningRequestStatus) (approved bool, denied bool) {
for _, c := range status.Conditions { for _, c := range status.Conditions {
if c.Type == certificates.CertificateApproved { if c.Type == certificates.CertificateApproved {
approved = true approved = true

View File

@ -1,19 +0,0 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package certificates contains logic for watching and synchronizing
// CertificateSigningRequests.
package certificates

View File

@ -0,0 +1,54 @@
package(default_visibility = ["//visibility:public"])
licenses(["notice"])
load(
"@io_bazel_rules_go//go:def.bzl",
"go_library",
"go_test",
)
go_test(
name = "go_default_test",
srcs = ["cfssl_signer_test.go"],
data = [
"testdata/ca.crt",
"testdata/ca.key",
"testdata/kubelet.csr",
],
library = ":go_default_library",
tags = ["automanaged"],
deps = [
"//pkg/apis/certificates/v1beta1:go_default_library",
"//vendor/k8s.io/client-go/util/cert:go_default_library",
],
)
go_library(
name = "go_default_library",
srcs = ["cfssl_signer.go"],
tags = ["automanaged"],
deps = [
"//pkg/apis/certificates/v1beta1:go_default_library",
"//pkg/client/clientset_generated/clientset:go_default_library",
"//pkg/client/informers/informers_generated/externalversions/certificates/v1beta1:go_default_library",
"//pkg/controller/certificates:go_default_library",
"//vendor/github.com/cloudflare/cfssl/config:go_default_library",
"//vendor/github.com/cloudflare/cfssl/helpers:go_default_library",
"//vendor/github.com/cloudflare/cfssl/signer:go_default_library",
"//vendor/github.com/cloudflare/cfssl/signer/local:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
)

View File

@ -14,7 +14,8 @@ See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
*/ */
package certificates // Package signer implements a CA signer that uses keys stored on local disk.
package signer
import ( import (
"crypto" "crypto"
@ -23,7 +24,10 @@ import (
"io/ioutil" "io/ioutil"
"os" "os"
certificates "k8s.io/kubernetes/pkg/apis/certificates/v1beta1" capi "k8s.io/kubernetes/pkg/apis/certificates/v1beta1"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
certificatesinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions/certificates/v1beta1"
"k8s.io/kubernetes/pkg/controller/certificates"
"github.com/cloudflare/cfssl/config" "github.com/cloudflare/cfssl/config"
"github.com/cloudflare/cfssl/helpers" "github.com/cloudflare/cfssl/helpers"
@ -39,13 +43,30 @@ var onlySigningPolicy = &config.Signing{
}, },
} }
type CFSSLSigner struct { func NewCSRSigningController(
client clientset.Interface,
csrInformer certificatesinformers.CertificateSigningRequestInformer,
caFile, caKeyFile string,
) (*certificates.CertificateController, error) {
signer, err := newCFSSLSigner(caFile, caKeyFile, client)
if err != nil {
return nil, err
}
return certificates.NewCertificateController(
client,
csrInformer,
signer.handle,
)
}
type cfsslSigner struct {
ca *x509.Certificate ca *x509.Certificate
priv crypto.Signer priv crypto.Signer
sigAlgo x509.SignatureAlgorithm sigAlgo x509.SignatureAlgorithm
client clientset.Interface
} }
func NewCFSSLSigner(caFile, caKeyFile string) (*CFSSLSigner, error) { func newCFSSLSigner(caFile, caKeyFile string, client clientset.Interface) (*cfsslSigner, error) {
ca, err := ioutil.ReadFile(caFile) ca, err := ioutil.ReadFile(caFile)
if err != nil { if err != nil {
return nil, err return nil, err
@ -70,14 +91,30 @@ func NewCFSSLSigner(caFile, caKeyFile string) (*CFSSLSigner, error) {
if err != nil { if err != nil {
return nil, fmt.Errorf("Malformed private key %v", err) return nil, fmt.Errorf("Malformed private key %v", err)
} }
return &CFSSLSigner{ return &cfsslSigner{
priv: priv, priv: priv,
ca: parsedCa, ca: parsedCa,
sigAlgo: signer.DefaultSigAlgo(priv), sigAlgo: signer.DefaultSigAlgo(priv),
client: client,
}, nil }, nil
} }
func (cs *CFSSLSigner) Sign(csr *certificates.CertificateSigningRequest) (*certificates.CertificateSigningRequest, error) { func (s *cfsslSigner) handle(csr *capi.CertificateSigningRequest) error {
if !certificates.IsCertificateRequestApproved(csr) {
return nil
}
csr, err := s.sign(csr)
if err != nil {
return fmt.Errorf("error auto signing csr: %v", err)
}
_, err = s.client.Certificates().CertificateSigningRequests().UpdateStatus(csr)
if err != nil {
return fmt.Errorf("error updating signature for csr: %v", err)
}
return nil
}
func (s *cfsslSigner) sign(csr *capi.CertificateSigningRequest) (*capi.CertificateSigningRequest, error) {
var usages []string var usages []string
for _, usage := range csr.Spec.Usages { for _, usage := range csr.Spec.Usages {
usages = append(usages, string(usage)) usages = append(usages, string(usage))
@ -89,12 +126,12 @@ func (cs *CFSSLSigner) Sign(csr *certificates.CertificateSigningRequest) (*certi
ExpiryString: "8760h", ExpiryString: "8760h",
}, },
} }
s, err := local.NewSigner(cs.priv, cs.ca, cs.sigAlgo, policy) cfs, err := local.NewSigner(s.priv, s.ca, s.sigAlgo, policy)
if err != nil { if err != nil {
return nil, err return nil, err
} }
csr.Status.Certificate, err = s.Sign(signer.SignRequest{ csr.Status.Certificate, err = cfs.Sign(signer.SignRequest{
Request: string(csr.Spec.Request), Request: string(csr.Spec.Request),
}) })
if err != nil { if err != nil {

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
*/ */
package certificates package signer
import ( import (
"crypto/x509" "crypto/x509"
@ -27,7 +27,7 @@ import (
) )
func TestSigner(t *testing.T) { func TestSigner(t *testing.T) {
s, err := NewCFSSLSigner("./testdata/ca.crt", "./testdata/ca.key") s, err := newCFSSLSigner("./testdata/ca.crt", "./testdata/ca.key", nil)
if err != nil { if err != nil {
t.Fatalf("failed to create signer: %v", err) t.Fatalf("failed to create signer: %v", err)
} }
@ -49,7 +49,7 @@ func TestSigner(t *testing.T) {
}, },
} }
csr, err = s.Sign(csr) csr, err = s.sign(csr)
if err != nil { if err != nil {
t.Fatalf("failed to sign CSR: %v", err) t.Fatalf("failed to sign CSR: %v", err)
} }