Add a bunch of doc comments

Also change identical error messages to clarify where problems are
occurring.

Signed-off-by: Brad Davidson <brad.davidson@rancher.com>
pull/2300/head
Brad Davidson 2020-09-23 23:29:25 -07:00
parent ae916c2dec
commit 703ba5cde7
9 changed files with 91 additions and 4 deletions

View File

@ -165,11 +165,14 @@ func validateCACerts(cacerts []byte, hash string) (bool, string, string) {
return hash == newHash, hash, newHash return hash == newHash, hash, newHash
} }
// hashCA returns the hex-encoded SHA256 digest of a byte array.
func hashCA(cacerts []byte) string { func hashCA(cacerts []byte) string {
digest := sha256.Sum256(cacerts) digest := sha256.Sum256(cacerts)
return hex.EncodeToString(digest[:]) return hex.EncodeToString(digest[:])
} }
// ParseUsernamePassword returns the username and password portion of a token string,
// along with a bool indicating if the token was successfully parsed.
func ParseUsernamePassword(token string) (string, string, bool) { func ParseUsernamePassword(token string) (string, string, bool) {
parsed, err := parseToken(token) parsed, err := parseToken(token)
if err != nil { if err != nil {
@ -205,6 +208,10 @@ func parseToken(token string) (clientToken, error) {
return result, nil return result, nil
} }
// GetHTTPClient returns a http client that validates TLS server certificates using the provided CA bundle.
// If the CA bundle is empty, it validates using the default http client using the OS CA bundle.
// If the CA bundle is not empty but does not contain any valid certs, it validates using
// an empty CA bundle (which will always fail).
func GetHTTPClient(cacerts []byte) *http.Client { func GetHTTPClient(cacerts []byte) *http.Client {
if len(cacerts) == 0 { if len(cacerts) == 0 {
return http.DefaultClient return http.DefaultClient
@ -223,6 +230,7 @@ func GetHTTPClient(cacerts []byte) *http.Client {
} }
} }
// Get makes a request to a subpath of info's BaseURL
func Get(path string, info *Info) ([]byte, error) { func Get(path string, info *Info) ([]byte, error) {
u, err := url.Parse(info.URL) u, err := url.Parse(info.URL)
if err != nil { if err != nil {
@ -232,20 +240,30 @@ func Get(path string, info *Info) ([]byte, error) {
return get(u.String(), GetHTTPClient(info.CACerts), info.username, info.password) return get(u.String(), GetHTTPClient(info.CACerts), info.username, info.password)
} }
// getCACerts retrieves the CA bundle from a server.
// An error is raised if the CA bundle cannot be retrieved,
// or if the server's cert is not signed by the returned bundle.
func GetCACerts(u url.URL) ([]byte, error) { func GetCACerts(u url.URL) ([]byte, error) {
u.Path = "/cacerts" u.Path = "/cacerts"
url := u.String() url := u.String()
// This first request is expected to fail. If the server has
// a cert that can be validated using the default CA bundle, return
// success with no CA certs.
_, err := get(url, http.DefaultClient, "", "") _, err := get(url, http.DefaultClient, "", "")
if err == nil { if err == nil {
return nil, nil return nil, nil
} }
// Download the CA bundle using a client that does not validate certs.
cacerts, err := get(url, insecureClient, "", "") cacerts, err := get(url, insecureClient, "", "")
if err != nil { if err != nil {
return nil, errors.Wrapf(err, "failed to get CA certs at %s", url) return nil, errors.Wrapf(err, "failed to get CA certs at %s", url)
} }
// Request the CA bundle again, validating that the CA bundle can be loaded
// and used to validate the server certificate. This should only fail if we somehow
// get an empty CA bundle. or if the dynamiclistener cert is incorrectly signed.
_, err = get(url, GetHTTPClient(cacerts), "", "") _, err = get(url, GetHTTPClient(cacerts), "", "")
if err != nil { if err != nil {
return nil, errors.Wrapf(err, "server %s is not trusted", url) return nil, errors.Wrapf(err, "server %s is not trusted", url)
@ -254,6 +272,8 @@ func GetCACerts(u url.URL) ([]byte, error) {
return cacerts, nil return cacerts, nil
} }
// get makes a request to a url using a provided client, username, and password,
// returning the response body.
func get(u string, client *http.Client, username, password string) ([]byte, error) { func get(u string, client *http.Client, username, password string) ([]byte, error) {
req, err := http.NewRequest(http.MethodGet, u, nil) req, err := http.NewRequest(http.MethodGet, u, nil)
if err != nil { if err != nil {

View File

@ -13,6 +13,9 @@ import (
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
) )
// Bootstrap attempts to load a managed database driver, if one has been initialized or should be created/joined.
// It then checks to see if the cluster needs to load boostrap data, and if so, loads data into the
// ControlRuntimeBoostrap struct, either via HTTP or from the datastore.
func (c *Cluster) Bootstrap(ctx context.Context) error { func (c *Cluster) Bootstrap(ctx context.Context) error {
if err := c.assignManagedDriver(ctx); err != nil { if err := c.assignManagedDriver(ctx); err != nil {
return err return err
@ -88,6 +91,7 @@ func (c *Cluster) shouldBootstrapLoad(ctx context.Context) (bool, error) {
return true, nil return true, nil
} }
// bootstrapped touches a file to indicate that bootstrap has been completed.
func (c *Cluster) bootstrapped() error { func (c *Cluster) bootstrapped() error {
if err := os.MkdirAll(filepath.Dir(c.bootstrapStamp()), 0700); err != nil { if err := os.MkdirAll(filepath.Dir(c.bootstrapStamp()), 0700); err != nil {
return err return err
@ -105,6 +109,9 @@ func (c *Cluster) bootstrapped() error {
return f.Close() return f.Close()
} }
// httpBootstrap retrieves bootstrap data (certs and keys, etc) from the remote server via HTTP
// and loads it into the ControlRuntimeBootstrap struct. Unlike the storage bootstrap path,
// this data does not need to be decrypted since it is generated on-demand by an existing server.
func (c *Cluster) httpBootstrap() error { func (c *Cluster) httpBootstrap() error {
content, err := clientaccess.Get("/v1-"+version.Program+"/server-bootstrap", c.clientAccessInfo) content, err := clientaccess.Get("/v1-"+version.Program+"/server-bootstrap", c.clientAccessInfo)
if err != nil { if err != nil {
@ -114,9 +121,11 @@ func (c *Cluster) httpBootstrap() error {
return bootstrap.Read(bytes.NewBuffer(content), &c.runtime.ControlRuntimeBootstrap) return bootstrap.Read(bytes.NewBuffer(content), &c.runtime.ControlRuntimeBootstrap)
} }
// bootstrap performs cluster bootstrapping, either via HTTP (for managed databases) or direct load from datastore.
func (c *Cluster) bootstrap(ctx context.Context) error { func (c *Cluster) bootstrap(ctx context.Context) error {
c.joining = true c.joining = true
// bootstrap managed database via HTTP
if c.runtime.HTTPBootstrap { if c.runtime.HTTPBootstrap {
return c.httpBootstrap() return c.httpBootstrap()
} }
@ -125,6 +134,9 @@ func (c *Cluster) bootstrap(ctx context.Context) error {
return c.storageBootstrap(ctx) return c.storageBootstrap(ctx)
} }
// bootstrapStamp returns the path to a file in datadir/db that is used to record
// that a cluster has been joined. The filename is based on a portion of the sha256 hash of the token.
// We hash the token value exactly as it is provided by the user, NOT the normalized version.
func (c *Cluster) bootstrapStamp() string { func (c *Cluster) bootstrapStamp() string {
return filepath.Join(c.config.DataDir, "db/joined-"+keyHash(c.config.Token)) return filepath.Join(c.config.DataDir, "db/joined-"+keyHash(c.config.Token))
} }

View File

@ -25,26 +25,34 @@ type Cluster struct {
storageClient client.Client storageClient client.Client
} }
// Start creates the dynamic tls listener, http request handler,
// handles starting and writing/reading bootstrap data, and returns a channel
// that will be closed when datastore is ready.
func (c *Cluster) Start(ctx context.Context) (<-chan struct{}, error) { func (c *Cluster) Start(ctx context.Context) (<-chan struct{}, error) {
// Set up the dynamiclistener and http request handlers
if err := c.initClusterAndHTTPS(ctx); err != nil { if err := c.initClusterAndHTTPS(ctx); err != nil {
return nil, errors.Wrap(err, "start cluster and https") return nil, errors.Wrap(err, "init cluster datastore and https")
} }
// start managed database (if necessary)
if err := c.start(ctx); err != nil { if err := c.start(ctx); err != nil {
return nil, errors.Wrap(err, "start cluster and https") return nil, errors.Wrap(err, "start managed database")
} }
// get the wait channel for testing managed database readiness
ready, err := c.testClusterDB(ctx) ready, err := c.testClusterDB(ctx)
if err != nil { if err != nil {
return nil, err return nil, err
} }
// if necessary, store bootstrap data to datastore
if c.saveBootstrap { if c.saveBootstrap {
if err := c.save(ctx); err != nil { if err := c.save(ctx); err != nil {
return nil, err return nil, err
} }
} }
// if necessary, record successful bootstrap
if c.shouldBootstrap { if c.shouldBootstrap {
if err := c.bootstrapped(); err != nil { if err := c.bootstrapped(); err != nil {
return nil, err return nil, err
@ -54,17 +62,25 @@ func (c *Cluster) Start(ctx context.Context) (<-chan struct{}, error) {
return ready, c.startStorage(ctx) return ready, c.startStorage(ctx)
} }
// startStorage starts the kine listener and configures the endpoints, if necessary.
// This calls into the kine endpoint code, which sets up the database client
// and unix domain socket listener if using an external database. In the case of an etcd
// backend it just returns the user-provided etcd endpoints and tls config.
func (c *Cluster) startStorage(ctx context.Context) error { func (c *Cluster) startStorage(ctx context.Context) error {
if c.storageStarted { if c.storageStarted {
return nil return nil
} }
c.storageStarted = true c.storageStarted = true
// start listening on the kine socket as an etcd endpoint, or return the external etcd endpoints
etcdConfig, err := endpoint.Listen(ctx, c.config.Datastore) etcdConfig, err := endpoint.Listen(ctx, c.config.Datastore)
if err != nil { if err != nil {
return errors.Wrap(err, "creating storage endpoint") return errors.Wrap(err, "creating storage endpoint")
} }
// Persist the returned etcd configuration. We decide if we're doing leader election for embedded controllers
// based on what the kine wrapper tells us about the datastore. Single-node datastores like sqlite don't require
// leader election, while basically all others (etcd, external database, etc) do since they allow multiple servers.
c.etcdConfig = etcdConfig c.etcdConfig = etcdConfig
c.config.Datastore.Config = etcdConfig.TLSConfig c.config.Datastore.Config = etcdConfig.TLSConfig
c.config.Datastore.Endpoint = strings.Join(etcdConfig.Endpoints, ",") c.config.Datastore.Endpoint = strings.Join(etcdConfig.Endpoints, ",")
@ -72,6 +88,7 @@ func (c *Cluster) startStorage(ctx context.Context) error {
return nil return nil
} }
// New creates an initial cluster using the provided configuration
func New(config *config.Control) *Cluster { func New(config *config.Control) *Cluster {
return &Cluster{ return &Cluster{
config: config, config: config,

View File

@ -16,18 +16,23 @@ import (
"golang.org/x/crypto/pbkdf2" "golang.org/x/crypto/pbkdf2"
) )
// storageKey returns the etcd key for storing bootstrap data for a given passphrase.
// The key is derived from the sha256 hash of the passphrase.
func storageKey(passphrase string) string { func storageKey(passphrase string) string {
d := sha256.New() d := sha256.New()
d.Write([]byte(passphrase)) d.Write([]byte(passphrase))
return "/bootstrap/" + hex.EncodeToString(d.Sum(nil)[:])[:12] return "/bootstrap/" + hex.EncodeToString(d.Sum(nil)[:])[:12]
} }
// keyHash returns the first 12 characters of the sha256 sum of the passphrase.
func keyHash(passphrase string) string { func keyHash(passphrase string) string {
d := sha256.New() d := sha256.New()
d.Write([]byte(passphrase)) d.Write([]byte(passphrase))
return hex.EncodeToString(d.Sum(nil)[:])[:12] return hex.EncodeToString(d.Sum(nil)[:])[:12]
} }
// encrypt encrypts a byte slice using aes+gcm with a pbkdf2 key derived from the passphrase and a random salt.
// It returns a byte slice containing the salt and base64-encoded cyphertext.
func encrypt(passphrase string, plaintext []byte) ([]byte, error) { func encrypt(passphrase string, plaintext []byte) ([]byte, error) {
salt, err := token.Random(8) salt, err := token.Random(8)
if err != nil { if err != nil {
@ -55,6 +60,8 @@ func encrypt(passphrase string, plaintext []byte) ([]byte, error) {
return []byte(salt + ":" + base64.StdEncoding.EncodeToString(sealed)), nil return []byte(salt + ":" + base64.StdEncoding.EncodeToString(sealed)), nil
} }
// decrypt attempts to decrypt the byte slice using the supplied passphrase.
// The input byte slice should be the cyphertext output from the encrypt function.
func decrypt(passphrase string, ciphertext []byte) ([]byte, error) { func decrypt(passphrase string, ciphertext []byte) ([]byte, error) {
parts := strings.SplitN(string(ciphertext), ":", 2) parts := strings.SplitN(string(ciphertext), ":", 2)
if len(parts) != 2 { if len(parts) != 2 {

View File

@ -20,6 +20,9 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
) )
// newListener returns a new TCP listener and HTTP reqest handler using dynamiclistener.
// dynamiclistener will use the cluster's Server CA to sign the dynamically generate certificate,
// and will sync the certs into the Kubernetes datastore, with a local disk cache.
func (c *Cluster) newListener(ctx context.Context) (net.Listener, http.Handler, error) { func (c *Cluster) newListener(ctx context.Context) (net.Listener, http.Handler, error) {
tcp, err := dynamiclistener.NewTCPListener(c.config.BindAddress, c.config.SupervisorPort) tcp, err := dynamiclistener.NewTCPListener(c.config.BindAddress, c.config.SupervisorPort)
if err != nil { if err != nil {
@ -45,12 +48,15 @@ func (c *Cluster) newListener(ctx context.Context) (net.Listener, http.Handler,
}) })
} }
// initClusterAndHTTPS sets up the dynamic tls listener, request router,
// and cluster database. Once the database is up, it starts the supervisor http server.
func (c *Cluster) initClusterAndHTTPS(ctx context.Context) error { func (c *Cluster) initClusterAndHTTPS(ctx context.Context) error {
l, handler, err := c.newListener(ctx) l, handler, err := c.newListener(ctx)
if err != nil { if err != nil {
return err return err
} }
// Get the base request handler
handler, err = c.getHandler(handler) handler, err = c.getHandler(handler)
if err != nil { if err != nil {
return err return err
@ -62,16 +68,19 @@ func (c *Cluster) initClusterAndHTTPS(ctx context.Context) error {
return err return err
} }
// Create a HTTP server with the registered request handlers, using logrus for logging
server := http.Server{ server := http.Server{
Handler: handler, Handler: handler,
ErrorLog: log.New(logrus.StandardLogger().Writer(), "Cluster-Http-Server ", log.LstdFlags), ErrorLog: log.New(logrus.StandardLogger().Writer(), "Cluster-Http-Server ", log.LstdFlags),
} }
// Start the supervisor http server on the tls listener
go func() { go func() {
err := server.Serve(l) err := server.Serve(l)
logrus.Fatalf("server stopped: %v", err) logrus.Fatalf("server stopped: %v", err)
}() }()
// Shutdown the http server when the context is closed
go func() { go func() {
<-ctx.Done() <-ctx.Done()
server.Shutdown(context.Background()) server.Shutdown(context.Background())
@ -80,6 +89,8 @@ func (c *Cluster) initClusterAndHTTPS(ctx context.Context) error {
return nil return nil
} }
// tlsStorage creates an in-memory cache for dynamiclistener's certificate, backed by a file on disk
// and the Kubernetes datastore.
func tlsStorage(ctx context.Context, dataDir string, runtime *config.ControlRuntime) dynamiclistener.TLSStorage { func tlsStorage(ctx context.Context, dataDir string, runtime *config.ControlRuntime) dynamiclistener.TLSStorage {
fileStorage := file.New(filepath.Join(dataDir, "tls/dynamic-cert.json")) fileStorage := file.New(filepath.Join(dataDir, "tls/dynamic-cert.json"))
cache := memory.NewBacked(fileStorage) cache := memory.NewBacked(fileStorage)

View File

@ -1,5 +1,8 @@
package cluster package cluster
// A managed database is one whose lifecycle we control - initializing the cluster, adding/removing members, taking snapshots, etc.
// This is currently just used for the embedded etcd datastore. Kine and other external etcd clusters are NOT considered managed.
import ( import (
"context" "context"
"net" "net"
@ -12,6 +15,8 @@ import (
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
) )
// testClusterDB returns a channel that will be closed when the datastore connection is available.
// The datastore is tested for readiness every 5 seconds until the test succeeds.
func (c *Cluster) testClusterDB(ctx context.Context) (<-chan struct{}, error) { func (c *Cluster) testClusterDB(ctx context.Context) (<-chan struct{}, error) {
result := make(chan struct{}) result := make(chan struct{})
if c.managedDB == nil { if c.managedDB == nil {
@ -40,6 +45,8 @@ func (c *Cluster) testClusterDB(ctx context.Context) (<-chan struct{}, error) {
return result, nil return result, nil
} }
// start starts the database, unless a cluster reset has been requested, in which case
// it does that instead.
func (c *Cluster) start(ctx context.Context) error { func (c *Cluster) start(ctx context.Context) error {
if c.managedDB == nil { if c.managedDB == nil {
return nil return nil
@ -61,7 +68,10 @@ func (c *Cluster) initClusterDB(ctx context.Context, handler http.Handler) (http
return c.managedDB.Register(ctx, c.config, handler) return c.managedDB.Register(ctx, c.config, handler)
} }
// assignManagedDriver checks to see if any managed databases are already configured or should be created/joined.
// If a driver has been initialized it is used, otherwise we create or join a cluster using the default driver.
func (c *Cluster) assignManagedDriver(ctx context.Context) error { func (c *Cluster) assignManagedDriver(ctx context.Context) error {
// Check all managed drivers for an initialized database on disk; use one if found
for _, driver := range managed.Registered() { for _, driver := range managed.Registered() {
if ok, err := driver.IsInitialized(ctx, c.config); err != nil { if ok, err := driver.IsInitialized(ctx, c.config); err != nil {
return err return err
@ -71,7 +81,7 @@ func (c *Cluster) assignManagedDriver(ctx context.Context) error {
} }
} }
// If we have been asked to initialize or join a cluster, do so using the default managed database.
if c.config.Datastore.Endpoint == "" && (c.config.ClusterInit || (c.config.Token != "" && c.config.JoinURL != "")) { if c.config.Datastore.Endpoint == "" && (c.config.ClusterInit || (c.config.Token != "" && c.config.JoinURL != "")) {
for _, driver := range managed.Registered() { for _, driver := range managed.Registered() {
if driver.EndpointName() == managed.Default() { if driver.EndpointName() == managed.Default() {

View File

@ -4,6 +4,8 @@ import (
"net/http" "net/http"
) )
// getHandler returns a basic request handler that processes requests through
// the cluster's request router chain.
func (c *Cluster) getHandler(handler http.Handler) (http.Handler, error) { func (c *Cluster) getHandler(handler http.Handler) (http.Handler, error) {
next := c.router() next := c.router()
@ -13,6 +15,8 @@ func (c *Cluster) getHandler(handler http.Handler) (http.Handler, error) {
}), nil }), nil
} }
// router is a stub request router that returns a Service Unavailable response
// if no additional handlers are available.
func (c *Cluster) router() http.Handler { func (c *Cluster) router() http.Handler {
return http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { return http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
if c.runtime.Handler == nil { if c.runtime.Handler == nil {

View File

@ -8,6 +8,10 @@ import (
"github.com/rancher/kine/pkg/client" "github.com/rancher/kine/pkg/client"
) )
// save writes the current ControlRuntimeBootstrap data to the datastore. This contains a complete
// snapshot of the cluster's CA certs and keys, encryption passphrases, etc - encrypted with the join token.
// This is used when bootstrapping a cluster from a managed database or external etcd cluster.
// This is NOT used with embedded etcd, which bootstraps over HTTP.
func (c *Cluster) save(ctx context.Context) error { func (c *Cluster) save(ctx context.Context) error {
buf := &bytes.Buffer{} buf := &bytes.Buffer{}
if err := bootstrap.Write(buf, &c.runtime.ControlRuntimeBootstrap); err != nil { if err := bootstrap.Write(buf, &c.runtime.ControlRuntimeBootstrap); err != nil {
@ -22,6 +26,8 @@ func (c *Cluster) save(ctx context.Context) error {
return c.storageClient.Create(ctx, storageKey(c.config.Token), data) return c.storageClient.Create(ctx, storageKey(c.config.Token), data)
} }
// storageBootstrap loads data from the datastore into the ControlRuntimeBootstrap struct.
// The storage key and encryption passphrase are both derived from the join token.
func (c *Cluster) storageBootstrap(ctx context.Context) error { func (c *Cluster) storageBootstrap(ctx context.Context) error {
if err := c.startStorage(ctx); err != nil { if err := c.startStorage(ctx); err != nil {
return err return err

View File

@ -327,7 +327,7 @@ func (e *ETCD) setName(force bool) error {
return nil return nil
} }
// handler handles request routing for the base http listener // handler wraps the handler with routes for database info
func (e *ETCD) handler(next http.Handler) http.Handler { func (e *ETCD) handler(next http.Handler) http.Handler {
mux := mux.NewRouter() mux := mux.NewRouter()
mux.Handle("/db/info", e.infoHandler()) mux.Handle("/db/info", e.infoHandler())