certificates: implement certificates controller

pull/6/head
George Tankersley 2016-05-15 19:18:18 -07:00
parent 1c5af3e72f
commit 803c7ac299
8 changed files with 314 additions and 0 deletions

View File

@ -46,6 +46,7 @@ import (
"k8s.io/kubernetes/pkg/client/unversioned/clientcmd"
"k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/controller"
certcontroller "k8s.io/kubernetes/pkg/controller/certificates"
"k8s.io/kubernetes/pkg/controller/daemon"
"k8s.io/kubernetes/pkg/controller/deployment"
endpointcontroller "k8s.io/kubernetes/pkg/controller/endpoint"
@ -423,6 +424,29 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
}
groupVersion = "certificates/v1alpha1"
resources, found = resourceMap[groupVersion]
glog.Infof("Attempting to start certificates, full resource map %+v", resourceMap)
if containsVersion(versions, groupVersion) && found {
glog.Infof("Starting %s apis", groupVersion)
if containsResource(resources, "certificatesigningrequests") {
glog.Infof("Starting certificate request controller")
resyncPeriod := ResyncPeriod(s)()
certController, err := certcontroller.NewCertificateController(
clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "certificate-controller")),
resyncPeriod,
s.ClusterSigningCertFile,
s.ClusterSigningKeyFile,
)
if err != nil {
glog.Errorf("Failed to start certificate controller: %v", err)
} else {
go certController.Run(1, wait.NeverStop)
}
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
}
}
var rootCA []byte
if s.RootCAFile != "" {

View File

@ -93,6 +93,8 @@ func NewCMServer() *CMServer {
ControllerStartInterval: unversioned.Duration{Duration: 0 * time.Second},
EnableGarbageCollector: false,
ConcurrentGCSyncs: 5,
ClusterSigningCertFile: "/etc/kubernetes/ca/ca.pem",
ClusterSigningKeyFile: "/etc/kubernetes/ca/ca.key",
},
}
return &s
@ -150,6 +152,8 @@ func (s *CMServer) AddFlags(fs *pflag.FlagSet) {
fs.DurationVar(&s.NodeMonitorPeriod.Duration, "node-monitor-period", s.NodeMonitorPeriod.Duration,
"The period for syncing NodeStatus in NodeController.")
fs.StringVar(&s.ServiceAccountKeyFile, "service-account-private-key-file", s.ServiceAccountKeyFile, "Filename containing a PEM-encoded private RSA key used to sign service account tokens.")
fs.StringVar(&s.ClusterSigningCertFile, "cluster-signing-cert-file", s.ClusterSigningCertFile, "Filename containing a PEM-encoded X509 CA certificate used to issue cluster-scoped certificates")
fs.StringVar(&s.ClusterSigningKeyFile, "cluster-signing-key-file", s.ClusterSigningKeyFile, "Filename containing a PEM-encoded RSA or ECDSA private key used to sign cluster-scoped certificates")
fs.BoolVar(&s.EnableProfiling, "profiling", true, "Enable profiling via web interface host:port/debug/pprof/")
fs.StringVar(&s.ClusterName, "cluster-name", s.ClusterName, "The instance prefix for the cluster")
fs.StringVar(&s.ClusterCIDR, "cluster-cidr", s.ClusterCIDR, "CIDR Range for Pods in cluster.")

View File

@ -61,6 +61,8 @@ cluster-domain
cluster-name
cluster-tag
cluster-monitor-period
cluster-signing-cert-file
cluster-signing-key-file
concurrent-deployment-syncs
concurrent-endpoint-syncs
concurrent-namespace-syncs

View File

@ -560,6 +560,12 @@ type KubeControllerManagerConfiguration struct {
// serviceAccountKeyFile is the filename containing a PEM-encoded private RSA key
// used to sign service account tokens.
ServiceAccountKeyFile string `json:"serviceAccountKeyFile"`
// clusterSigningCertFile is the filename containing a PEM-encoded
// X509 CA certificate used to issue cluster-scoped certificates
ClusterSigningCertFile string `json:"clusterSigningCertFile"`
// clusterSigningCertFile is the filename containing a PEM-encoded
// RSA or ECDSA private key used to issue cluster-scoped certificates
ClusterSigningKeyFile string `json:"clusterSigningKeyFile"`
// enableProfiling enables profiling via web interface host:port/debug/pprof/
EnableProfiling bool `json:"enableProfiling"`
// clusterName is the instance prefix for the cluster.

View File

@ -24,6 +24,7 @@ import (
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/apis/apps"
"k8s.io/kubernetes/pkg/apis/batch"
"k8s.io/kubernetes/pkg/apis/certificates"
"k8s.io/kubernetes/pkg/apis/extensions"
"k8s.io/kubernetes/pkg/labels"
)
@ -670,3 +671,25 @@ func (s *StoreToPetSetLister) GetPodPetSets(pod *api.Pod) (psList []apps.PetSet,
}
return
}
// StoreToCertificateRequestLister gives a store List and Exists methods. The store must contain only CertificateRequests.
type StoreToCertificateRequestLister struct {
Store
}
// Exists checks if the given csr exists in the store.
func (s *StoreToCertificateRequestLister) Exists(csr *certificates.CertificateSigningRequest) (bool, error) {
_, exists, err := s.Store.Get(csr)
if err != nil {
return false, err
}
return exists, nil
}
// StoreToCertificateRequestLister lists all csrs in the store.
func (s *StoreToCertificateRequestLister) List() (csrs certificates.CertificateSigningRequestList, err error) {
for _, c := range s.Store.List() {
csrs.Items = append(csrs.Items, *(c.(*certificates.CertificateSigningRequest)))
}
return csrs, nil
}

View File

@ -0,0 +1,200 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package certificates
import (
"time"
"github.com/cloudflare/cfssl/config"
"github.com/cloudflare/cfssl/signer"
"github.com/cloudflare/cfssl/signer/local"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/apis/certificates"
"k8s.io/kubernetes/pkg/client/cache"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
unversionedcore "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/unversioned"
"k8s.io/kubernetes/pkg/client/record"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/framework"
"k8s.io/kubernetes/pkg/runtime"
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/util/workqueue"
"k8s.io/kubernetes/pkg/watch"
)
type CertificateController struct {
kubeClient clientset.Interface
// CSR framework and store
csrController *framework.Controller
csrStore cache.StoreToCertificateRequestLister
// To allow injection of updateCertificateRequestStatus for testing.
updateHandler func(csr *certificates.CertificateSigningRequest) error
syncHandler func(csrKey string) error
signer *local.Signer
queue *workqueue.Type
}
func NewCertificateController(kubeClient clientset.Interface, syncPeriod time.Duration, caCertFile, caKeyFile string) (*CertificateController, error) {
// Send events to the apiserver
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(glog.Infof)
eventBroadcaster.StartRecordingToSink(&unversionedcore.EventSinkImpl{Interface: kubeClient.Core().Events("")})
// Configure cfssl signer
// TODO: support non-default policy and remote/pkcs11 signing
policy := &config.Signing{
Default: config.DefaultConfig(),
}
ca, err := local.NewSignerFromFile(caCertFile, caKeyFile, policy)
if err != nil {
return nil, err
}
cc := &CertificateController{
kubeClient: kubeClient,
queue: workqueue.New(),
signer: ca,
}
// Manage the addition/update of certificate requests
cc.csrStore.Store, cc.csrController = framework.NewInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return cc.kubeClient.Certificates().CertificateSigningRequests().List(options)
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
return cc.kubeClient.Certificates().CertificateSigningRequests().Watch(options)
},
},
&certificates.CertificateSigningRequest{},
syncPeriod,
framework.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
csr := obj.(*certificates.CertificateSigningRequest)
glog.V(4).Infof("Adding certificate request %s", csr.Name)
cc.enqueueCertificateRequest(obj)
},
UpdateFunc: func(old, new interface{}) {
oldCSR := old.(*certificates.CertificateSigningRequest)
glog.V(4).Infof("Updating certificate request %s", oldCSR.Name)
cc.enqueueCertificateRequest(new)
},
DeleteFunc: func(obj interface{}) {
csr := obj.(*certificates.CertificateSigningRequest)
glog.V(4).Infof("Deleting certificate request %s", csr.Name)
cc.enqueueCertificateRequest(obj)
},
},
)
cc.syncHandler = cc.maybeSignCertificate
return cc, nil
}
// Run the main goroutine responsible for watching and syncing jobs.
func (cc *CertificateController) Run(workers int, stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
go cc.csrController.Run(stopCh)
glog.Infof("Starting certificate controller manager")
for i := 0; i < workers; i++ {
go wait.Until(cc.worker, time.Second, stopCh)
}
<-stopCh
glog.Infof("Shutting down certificate controller")
cc.queue.ShutDown()
}
// worker runs a thread that dequeues CSRs, handles them, and marks them done.
func (cc *CertificateController) worker() {
for {
func() {
key, quit := cc.queue.Get()
if quit {
return
}
defer cc.queue.Done(key)
err := cc.syncHandler(key.(string))
if err != nil {
glog.Errorf("Error syncing CSR: %v", err)
}
}()
}
}
func (cc *CertificateController) enqueueCertificateRequest(obj interface{}) {
key, err := controller.KeyFunc(obj)
if err != nil {
glog.Errorf("Couldn't get key for object %+v: %v", obj, err)
return
}
cc.queue.Add(key)
}
func (cc *CertificateController) updateCertificateRequestStatus(csr *certificates.CertificateSigningRequest) error {
_, updateErr := cc.kubeClient.Certificates().CertificateSigningRequests().UpdateStatus(csr)
if updateErr == nil {
// success!
return nil
}
// retry on failure
cc.enqueueCertificateRequest(csr)
return updateErr
}
// maybeSignCertificate will inspect the certificate request and, if it has
// been approved and meets policy expectations, generate an X509 cert using the
// cluster CA assets. If successful it will update the CSR approve subresource
// with the signed certificate.
func (cc *CertificateController) maybeSignCertificate(key string) error {
startTime := time.Now()
defer func() {
glog.V(4).Infof("Finished syncing certificate request %q (%v)", key, time.Now().Sub(startTime))
}()
obj, exists, err := cc.csrStore.Store.GetByKey(key)
if err != nil {
cc.queue.Add(key)
return err
}
if !exists {
glog.V(3).Infof("csr has been deleted: %v", key)
return nil
}
csr := obj.(*certificates.CertificateSigningRequest)
// At this point, the controller needs to:
// 1. Check the approval conditions
// 2. Generate a signed certificate
// 3. Update the Status subresource
if csr.Status.Certificate == nil && IsCertificateRequestApproved(csr) {
pemBytes := csr.Spec.Request
req := signer.SignRequest{Request: string(pemBytes)}
certBytes, err := cc.signer.Sign(req)
if err != nil {
return err
}
csr.Status.Certificate = certBytes
}
return cc.updateCertificateRequestStatus(csr)
}

View File

@ -0,0 +1,36 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package certificates
import "k8s.io/kubernetes/pkg/apis/certificates"
// IsCertificateRequestApproved returns true if a certificate request has the
// "Approved" condition and no "Denied" conditions; false otherwise.
func IsCertificateRequestApproved(csr *certificates.CertificateSigningRequest) bool {
status := csr.Status
var approved, denied bool
// TODO: incorporate timestamps
for _, c := range status.Conditions {
if c.Type == certificates.CertificateApproved {
approved = true
}
if c.Type == certificates.CertificateDenied {
denied = true
}
}
return approved && !denied
}

View File

@ -0,0 +1,19 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package certificates contains logic for watching and synchronizing
// CertificateSigningRequests.
package certificates