From 703ba5cde7946249ed82aa61162374c05f7f5357 Mon Sep 17 00:00:00 2001 From: Brad Davidson Date: Wed, 23 Sep 2020 23:29:25 -0700 Subject: [PATCH] Add a bunch of doc comments Also change identical error messages to clarify where problems are occurring. Signed-off-by: Brad Davidson --- pkg/clientaccess/clientaccess.go | 20 ++++++++++++++++++++ pkg/cluster/bootstrap.go | 12 ++++++++++++ pkg/cluster/cluster.go | 21 +++++++++++++++++++-- pkg/cluster/encrypt.go | 7 +++++++ pkg/cluster/https.go | 11 +++++++++++ pkg/cluster/managed.go | 12 +++++++++++- pkg/cluster/router.go | 4 ++++ pkg/cluster/storage.go | 6 ++++++ pkg/etcd/etcd.go | 2 +- 9 files changed, 91 insertions(+), 4 deletions(-) diff --git a/pkg/clientaccess/clientaccess.go b/pkg/clientaccess/clientaccess.go index 21a5c37a23..7c5239dae6 100644 --- a/pkg/clientaccess/clientaccess.go +++ b/pkg/clientaccess/clientaccess.go @@ -165,11 +165,14 @@ func validateCACerts(cacerts []byte, hash string) (bool, string, string) { return hash == newHash, hash, newHash } +// hashCA returns the hex-encoded SHA256 digest of a byte array. func hashCA(cacerts []byte) string { digest := sha256.Sum256(cacerts) return hex.EncodeToString(digest[:]) } +// ParseUsernamePassword returns the username and password portion of a token string, +// along with a bool indicating if the token was successfully parsed. func ParseUsernamePassword(token string) (string, string, bool) { parsed, err := parseToken(token) if err != nil { @@ -205,6 +208,10 @@ func parseToken(token string) (clientToken, error) { return result, nil } +// GetHTTPClient returns a http client that validates TLS server certificates using the provided CA bundle. +// If the CA bundle is empty, it validates using the default http client using the OS CA bundle. +// If the CA bundle is not empty but does not contain any valid certs, it validates using +// an empty CA bundle (which will always fail). func GetHTTPClient(cacerts []byte) *http.Client { if len(cacerts) == 0 { return http.DefaultClient @@ -223,6 +230,7 @@ func GetHTTPClient(cacerts []byte) *http.Client { } } +// Get makes a request to a subpath of info's BaseURL func Get(path string, info *Info) ([]byte, error) { u, err := url.Parse(info.URL) if err != nil { @@ -232,20 +240,30 @@ func Get(path string, info *Info) ([]byte, error) { return get(u.String(), GetHTTPClient(info.CACerts), info.username, info.password) } +// getCACerts retrieves the CA bundle from a server. +// An error is raised if the CA bundle cannot be retrieved, +// or if the server's cert is not signed by the returned bundle. func GetCACerts(u url.URL) ([]byte, error) { u.Path = "/cacerts" url := u.String() + // This first request is expected to fail. If the server has + // a cert that can be validated using the default CA bundle, return + // success with no CA certs. _, err := get(url, http.DefaultClient, "", "") if err == nil { return nil, nil } + // Download the CA bundle using a client that does not validate certs. cacerts, err := get(url, insecureClient, "", "") if err != nil { return nil, errors.Wrapf(err, "failed to get CA certs at %s", url) } + // Request the CA bundle again, validating that the CA bundle can be loaded + // and used to validate the server certificate. This should only fail if we somehow + // get an empty CA bundle. or if the dynamiclistener cert is incorrectly signed. _, err = get(url, GetHTTPClient(cacerts), "", "") if err != nil { return nil, errors.Wrapf(err, "server %s is not trusted", url) @@ -254,6 +272,8 @@ func GetCACerts(u url.URL) ([]byte, error) { return cacerts, nil } +// get makes a request to a url using a provided client, username, and password, +// returning the response body. func get(u string, client *http.Client, username, password string) ([]byte, error) { req, err := http.NewRequest(http.MethodGet, u, nil) if err != nil { diff --git a/pkg/cluster/bootstrap.go b/pkg/cluster/bootstrap.go index 9acf9cfe25..ca9e4e093f 100644 --- a/pkg/cluster/bootstrap.go +++ b/pkg/cluster/bootstrap.go @@ -13,6 +13,9 @@ import ( "github.com/sirupsen/logrus" ) +// Bootstrap attempts to load a managed database driver, if one has been initialized or should be created/joined. +// It then checks to see if the cluster needs to load boostrap data, and if so, loads data into the +// ControlRuntimeBoostrap struct, either via HTTP or from the datastore. func (c *Cluster) Bootstrap(ctx context.Context) error { if err := c.assignManagedDriver(ctx); err != nil { return err @@ -88,6 +91,7 @@ func (c *Cluster) shouldBootstrapLoad(ctx context.Context) (bool, error) { return true, nil } +// bootstrapped touches a file to indicate that bootstrap has been completed. func (c *Cluster) bootstrapped() error { if err := os.MkdirAll(filepath.Dir(c.bootstrapStamp()), 0700); err != nil { return err @@ -105,6 +109,9 @@ func (c *Cluster) bootstrapped() error { return f.Close() } +// httpBootstrap retrieves bootstrap data (certs and keys, etc) from the remote server via HTTP +// and loads it into the ControlRuntimeBootstrap struct. Unlike the storage bootstrap path, +// this data does not need to be decrypted since it is generated on-demand by an existing server. func (c *Cluster) httpBootstrap() error { content, err := clientaccess.Get("/v1-"+version.Program+"/server-bootstrap", c.clientAccessInfo) if err != nil { @@ -114,9 +121,11 @@ func (c *Cluster) httpBootstrap() error { return bootstrap.Read(bytes.NewBuffer(content), &c.runtime.ControlRuntimeBootstrap) } +// bootstrap performs cluster bootstrapping, either via HTTP (for managed databases) or direct load from datastore. func (c *Cluster) bootstrap(ctx context.Context) error { c.joining = true + // bootstrap managed database via HTTP if c.runtime.HTTPBootstrap { return c.httpBootstrap() } @@ -125,6 +134,9 @@ func (c *Cluster) bootstrap(ctx context.Context) error { return c.storageBootstrap(ctx) } +// bootstrapStamp returns the path to a file in datadir/db that is used to record +// that a cluster has been joined. The filename is based on a portion of the sha256 hash of the token. +// We hash the token value exactly as it is provided by the user, NOT the normalized version. func (c *Cluster) bootstrapStamp() string { return filepath.Join(c.config.DataDir, "db/joined-"+keyHash(c.config.Token)) } diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index 09e4398ef1..b34e16d716 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -25,26 +25,34 @@ type Cluster struct { storageClient client.Client } +// Start creates the dynamic tls listener, http request handler, +// handles starting and writing/reading bootstrap data, and returns a channel +// that will be closed when datastore is ready. func (c *Cluster) Start(ctx context.Context) (<-chan struct{}, error) { + // Set up the dynamiclistener and http request handlers if err := c.initClusterAndHTTPS(ctx); err != nil { - return nil, errors.Wrap(err, "start cluster and https") + return nil, errors.Wrap(err, "init cluster datastore and https") } + // start managed database (if necessary) if err := c.start(ctx); err != nil { - return nil, errors.Wrap(err, "start cluster and https") + return nil, errors.Wrap(err, "start managed database") } + // get the wait channel for testing managed database readiness ready, err := c.testClusterDB(ctx) if err != nil { return nil, err } + // if necessary, store bootstrap data to datastore if c.saveBootstrap { if err := c.save(ctx); err != nil { return nil, err } } + // if necessary, record successful bootstrap if c.shouldBootstrap { if err := c.bootstrapped(); err != nil { return nil, err @@ -54,17 +62,25 @@ func (c *Cluster) Start(ctx context.Context) (<-chan struct{}, error) { return ready, c.startStorage(ctx) } +// startStorage starts the kine listener and configures the endpoints, if necessary. +// This calls into the kine endpoint code, which sets up the database client +// and unix domain socket listener if using an external database. In the case of an etcd +// backend it just returns the user-provided etcd endpoints and tls config. func (c *Cluster) startStorage(ctx context.Context) error { if c.storageStarted { return nil } c.storageStarted = true + // start listening on the kine socket as an etcd endpoint, or return the external etcd endpoints etcdConfig, err := endpoint.Listen(ctx, c.config.Datastore) if err != nil { return errors.Wrap(err, "creating storage endpoint") } + // Persist the returned etcd configuration. We decide if we're doing leader election for embedded controllers + // based on what the kine wrapper tells us about the datastore. Single-node datastores like sqlite don't require + // leader election, while basically all others (etcd, external database, etc) do since they allow multiple servers. c.etcdConfig = etcdConfig c.config.Datastore.Config = etcdConfig.TLSConfig c.config.Datastore.Endpoint = strings.Join(etcdConfig.Endpoints, ",") @@ -72,6 +88,7 @@ func (c *Cluster) startStorage(ctx context.Context) error { return nil } +// New creates an initial cluster using the provided configuration func New(config *config.Control) *Cluster { return &Cluster{ config: config, diff --git a/pkg/cluster/encrypt.go b/pkg/cluster/encrypt.go index be44ff2363..5dd2b51a6a 100644 --- a/pkg/cluster/encrypt.go +++ b/pkg/cluster/encrypt.go @@ -16,18 +16,23 @@ import ( "golang.org/x/crypto/pbkdf2" ) +// storageKey returns the etcd key for storing bootstrap data for a given passphrase. +// The key is derived from the sha256 hash of the passphrase. func storageKey(passphrase string) string { d := sha256.New() d.Write([]byte(passphrase)) return "/bootstrap/" + hex.EncodeToString(d.Sum(nil)[:])[:12] } +// keyHash returns the first 12 characters of the sha256 sum of the passphrase. func keyHash(passphrase string) string { d := sha256.New() d.Write([]byte(passphrase)) return hex.EncodeToString(d.Sum(nil)[:])[:12] } +// encrypt encrypts a byte slice using aes+gcm with a pbkdf2 key derived from the passphrase and a random salt. +// It returns a byte slice containing the salt and base64-encoded cyphertext. func encrypt(passphrase string, plaintext []byte) ([]byte, error) { salt, err := token.Random(8) if err != nil { @@ -55,6 +60,8 @@ func encrypt(passphrase string, plaintext []byte) ([]byte, error) { return []byte(salt + ":" + base64.StdEncoding.EncodeToString(sealed)), nil } +// decrypt attempts to decrypt the byte slice using the supplied passphrase. +// The input byte slice should be the cyphertext output from the encrypt function. func decrypt(passphrase string, ciphertext []byte) ([]byte, error) { parts := strings.SplitN(string(ciphertext), ":", 2) if len(parts) != 2 { diff --git a/pkg/cluster/https.go b/pkg/cluster/https.go index a03b65038b..33cd29a577 100644 --- a/pkg/cluster/https.go +++ b/pkg/cluster/https.go @@ -20,6 +20,9 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) +// newListener returns a new TCP listener and HTTP reqest handler using dynamiclistener. +// dynamiclistener will use the cluster's Server CA to sign the dynamically generate certificate, +// and will sync the certs into the Kubernetes datastore, with a local disk cache. func (c *Cluster) newListener(ctx context.Context) (net.Listener, http.Handler, error) { tcp, err := dynamiclistener.NewTCPListener(c.config.BindAddress, c.config.SupervisorPort) if err != nil { @@ -45,12 +48,15 @@ func (c *Cluster) newListener(ctx context.Context) (net.Listener, http.Handler, }) } +// initClusterAndHTTPS sets up the dynamic tls listener, request router, +// and cluster database. Once the database is up, it starts the supervisor http server. func (c *Cluster) initClusterAndHTTPS(ctx context.Context) error { l, handler, err := c.newListener(ctx) if err != nil { return err } + // Get the base request handler handler, err = c.getHandler(handler) if err != nil { return err @@ -62,16 +68,19 @@ func (c *Cluster) initClusterAndHTTPS(ctx context.Context) error { return err } + // Create a HTTP server with the registered request handlers, using logrus for logging server := http.Server{ Handler: handler, ErrorLog: log.New(logrus.StandardLogger().Writer(), "Cluster-Http-Server ", log.LstdFlags), } + // Start the supervisor http server on the tls listener go func() { err := server.Serve(l) logrus.Fatalf("server stopped: %v", err) }() + // Shutdown the http server when the context is closed go func() { <-ctx.Done() server.Shutdown(context.Background()) @@ -80,6 +89,8 @@ func (c *Cluster) initClusterAndHTTPS(ctx context.Context) error { return nil } +// tlsStorage creates an in-memory cache for dynamiclistener's certificate, backed by a file on disk +// and the Kubernetes datastore. func tlsStorage(ctx context.Context, dataDir string, runtime *config.ControlRuntime) dynamiclistener.TLSStorage { fileStorage := file.New(filepath.Join(dataDir, "tls/dynamic-cert.json")) cache := memory.NewBacked(fileStorage) diff --git a/pkg/cluster/managed.go b/pkg/cluster/managed.go index 47d33a0a5a..88c1b354b2 100644 --- a/pkg/cluster/managed.go +++ b/pkg/cluster/managed.go @@ -1,5 +1,8 @@ package cluster +// A managed database is one whose lifecycle we control - initializing the cluster, adding/removing members, taking snapshots, etc. +// This is currently just used for the embedded etcd datastore. Kine and other external etcd clusters are NOT considered managed. + import ( "context" "net" @@ -12,6 +15,8 @@ import ( "github.com/sirupsen/logrus" ) +// testClusterDB returns a channel that will be closed when the datastore connection is available. +// The datastore is tested for readiness every 5 seconds until the test succeeds. func (c *Cluster) testClusterDB(ctx context.Context) (<-chan struct{}, error) { result := make(chan struct{}) if c.managedDB == nil { @@ -40,6 +45,8 @@ func (c *Cluster) testClusterDB(ctx context.Context) (<-chan struct{}, error) { return result, nil } +// start starts the database, unless a cluster reset has been requested, in which case +// it does that instead. func (c *Cluster) start(ctx context.Context) error { if c.managedDB == nil { return nil @@ -61,7 +68,10 @@ func (c *Cluster) initClusterDB(ctx context.Context, handler http.Handler) (http return c.managedDB.Register(ctx, c.config, handler) } +// assignManagedDriver checks to see if any managed databases are already configured or should be created/joined. +// If a driver has been initialized it is used, otherwise we create or join a cluster using the default driver. func (c *Cluster) assignManagedDriver(ctx context.Context) error { + // Check all managed drivers for an initialized database on disk; use one if found for _, driver := range managed.Registered() { if ok, err := driver.IsInitialized(ctx, c.config); err != nil { return err @@ -71,7 +81,7 @@ func (c *Cluster) assignManagedDriver(ctx context.Context) error { } } - + // If we have been asked to initialize or join a cluster, do so using the default managed database. if c.config.Datastore.Endpoint == "" && (c.config.ClusterInit || (c.config.Token != "" && c.config.JoinURL != "")) { for _, driver := range managed.Registered() { if driver.EndpointName() == managed.Default() { diff --git a/pkg/cluster/router.go b/pkg/cluster/router.go index 48b14b94ee..2a894274e4 100644 --- a/pkg/cluster/router.go +++ b/pkg/cluster/router.go @@ -4,6 +4,8 @@ import ( "net/http" ) +// getHandler returns a basic request handler that processes requests through +// the cluster's request router chain. func (c *Cluster) getHandler(handler http.Handler) (http.Handler, error) { next := c.router() @@ -13,6 +15,8 @@ func (c *Cluster) getHandler(handler http.Handler) (http.Handler, error) { }), nil } +// router is a stub request router that returns a Service Unavailable response +// if no additional handlers are available. func (c *Cluster) router() http.Handler { return http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { if c.runtime.Handler == nil { diff --git a/pkg/cluster/storage.go b/pkg/cluster/storage.go index 51e8aac967..e01a25cf9e 100644 --- a/pkg/cluster/storage.go +++ b/pkg/cluster/storage.go @@ -8,6 +8,10 @@ import ( "github.com/rancher/kine/pkg/client" ) +// save writes the current ControlRuntimeBootstrap data to the datastore. This contains a complete +// snapshot of the cluster's CA certs and keys, encryption passphrases, etc - encrypted with the join token. +// This is used when bootstrapping a cluster from a managed database or external etcd cluster. +// This is NOT used with embedded etcd, which bootstraps over HTTP. func (c *Cluster) save(ctx context.Context) error { buf := &bytes.Buffer{} if err := bootstrap.Write(buf, &c.runtime.ControlRuntimeBootstrap); err != nil { @@ -22,6 +26,8 @@ func (c *Cluster) save(ctx context.Context) error { return c.storageClient.Create(ctx, storageKey(c.config.Token), data) } +// storageBootstrap loads data from the datastore into the ControlRuntimeBootstrap struct. +// The storage key and encryption passphrase are both derived from the join token. func (c *Cluster) storageBootstrap(ctx context.Context) error { if err := c.startStorage(ctx); err != nil { return err diff --git a/pkg/etcd/etcd.go b/pkg/etcd/etcd.go index 160a0efe71..3aba89e285 100644 --- a/pkg/etcd/etcd.go +++ b/pkg/etcd/etcd.go @@ -327,7 +327,7 @@ func (e *ETCD) setName(force bool) error { return nil } -// handler handles request routing for the base http listener +// handler wraps the handler with routes for database info func (e *ETCD) handler(next http.Handler) http.Handler { mux := mux.NewRouter() mux.Handle("/db/info", e.infoHandler())