Merge pull request #34703 from taimir/kubeadm

Automatic merge from submit-queue

kubeadm join: wait for API endpoints

**What this PR does / why we need it**: enhance kubeadm to allow for parallel provisioning of API endpoints and slave nodes, continued from https://github.com/kubernetes/kubernetes/pull/33543

**Fixes**: https://github.com/kubernetes/kubernetes/issues/33542

**Special notes for your reviewer**:

* Introduces a concurrent retry mechanism for bootstrapping with a single API endpoint during `kubeadm join` (this was left out in https://github.com/kubernetes/kubernetes/pull/33543 so that it can be implemented in a separate PR). The polling of the discovery service API itself is yet to come.

@errordeveloper @pires
pull/6/head
Kubernetes Submit Queue 2016-10-14 20:00:27 -07:00 committed by GitHub
commit 3e9e507a9b
2 changed files with 68 additions and 47 deletions

View File

@ -19,13 +19,17 @@ package node
import (
"fmt"
"os"
"sync"
"time"
kubeadmapi "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm"
kubeadmutil "k8s.io/kubernetes/cmd/kubeadm/app/util"
"k8s.io/kubernetes/pkg/apis/certificates"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
certclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/certificates/unversioned"
"k8s.io/kubernetes/pkg/client/unversioned/clientcmd"
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util/wait"
)
// ConnectionDetails represents a master API endpoint connection
@ -36,7 +40,14 @@ type ConnectionDetails struct {
NodeName types.NodeName
}
// EstablishMasterConnection establishes a connection with exactly one of the provided API endpoints or errors.
// retryTimeout between the subsequent attempts to connect
// to an API endpoint
const retryTimeout = 5
// EstablishMasterConnection establishes a connection with exactly one of the provided API endpoints.
// The function builds a client for every endpoint and concurrently keeps trying to connect to any one
// of the provided endpoints. Blocks until at least one connection is established, then it stops the
// connection attempts for other endpoints.
func EstablishMasterConnection(s *kubeadmapi.NodeConfiguration, clusterInfo *kubeadmapi.ClusterInfo) (*ConnectionDetails, error) {
hostName, err := os.Hostname()
if err != nil {
@ -48,42 +59,53 @@ func EstablishMasterConnection(s *kubeadmapi.NodeConfiguration, clusterInfo *kub
endpoints := clusterInfo.Endpoints
caCert := []byte(clusterInfo.CertificateAuthorities[0])
var establishedConnection *ConnectionDetails
// TODO: add a wait mechanism for the API endpoints (retrying to connect to at least one)
stopChan := make(chan struct{})
result := make(chan *ConnectionDetails)
var wg sync.WaitGroup
for _, endpoint := range endpoints {
clientSet, err := createClients(caCert, endpoint, s.Secrets.BearerToken, nodeName)
if err != nil {
fmt.Printf("<node/bootstrap> warning: %s. Skipping endpoint %s\n", err, endpoint)
continue
}
fmt.Printf("<node/bootstrap> trying to connect to endpoint %s\n", endpoint)
// TODO: add a simple GET /version request to fail early if needed before attempting
// to connect with a discovery client.
if err := checkCertsAPI(clientSet.DiscoveryClient); err != nil {
fmt.Printf("<node/bootstrap> warning: failed to connect to %s: %v\n", endpoint, err)
continue
}
fmt.Printf("<node/bootstrap> successfully established connection with endpoint %s\n", endpoint)
// connection established
establishedConnection = &ConnectionDetails{
CertClient: clientSet.CertificatesClient,
Endpoint: endpoint,
CACert: caCert,
NodeName: nodeName,
}
break
wg.Add(1)
go func(apiEndpoint string) {
defer wg.Done()
wait.Until(func() {
fmt.Printf("<node/bootstrap> trying to connect to endpoint %s\n", apiEndpoint)
err := checkAPIEndpoint(clientSet, apiEndpoint)
if err != nil {
fmt.Printf("<node/bootstrap> endpoint check failed [%v]\n", err)
return
}
fmt.Printf("<node/bootstrap> successfully established connection with endpoint %s\n", apiEndpoint)
// connection established, stop all wait threads
close(stopChan)
result <- &ConnectionDetails{
CertClient: clientSet.CertificatesClient,
Endpoint: apiEndpoint,
CACert: caCert,
NodeName: nodeName,
}
}, retryTimeout*time.Second, stopChan)
}(endpoint)
}
if establishedConnection == nil {
go func() {
wg.Wait()
// all wait.Until() calls have finished now
close(result)
}()
establishedConnection, ok := <-result
if !ok {
return nil, fmt.Errorf("<node/bootstrap> failed to create bootstrap clients " +
"for any of the provided API endpoints")
}
return establishedConnection, nil
}
// Creates a set of clients for this endpoint
// creates a set of clients for this endpoint
func createClients(caCert []byte, endpoint, token string, nodeName types.NodeName) (*clientset.Clientset, error) {
bareClientConfig := kubeadmutil.CreateBasicClientConfig("kubernetes", endpoint, caCert)
bootstrapClientConfig, err := clientcmd.NewDefaultClientConfig(
@ -101,3 +123,26 @@ func createClients(caCert []byte, endpoint, token string, nodeName types.NodeNam
}
return clientSet, nil
}
// checks the connection requirements for a specific API endpoint
func checkAPIEndpoint(clientSet *clientset.Clientset, endpoint string) error {
// check general connectivity
version, err := clientSet.DiscoveryClient.ServerVersion()
if err != nil {
return fmt.Errorf("failed to connect to %s [%v]", endpoint, err)
}
fmt.Printf("<node/bootstrap> detected server version %s\n", version.String())
// check certificates API
serverGroups, err := clientSet.DiscoveryClient.ServerGroups()
if err != nil {
return fmt.Errorf("certificate API check failed: failed to retrieve a list of supported API objects [%v]", err)
}
for _, group := range serverGroups.Groups {
if group.Name == certificates.GroupName {
return nil
}
}
return fmt.Errorf("certificate API check failed: API version %s does not support certificates API, use v1.4.0 or newer",
version.String())
}

View File

@ -20,8 +20,6 @@ import (
"fmt"
kubeadmutil "k8s.io/kubernetes/cmd/kubeadm/app/util"
"k8s.io/kubernetes/pkg/apis/certificates"
"k8s.io/kubernetes/pkg/client/typed/discovery"
clientcmdapi "k8s.io/kubernetes/pkg/client/unversioned/clientcmd/api"
"k8s.io/kubernetes/pkg/kubelet/util/csr"
certutil "k8s.io/kubernetes/pkg/util/cert"
@ -57,25 +55,3 @@ func PerformTLSBootstrap(connection *ConnectionDetails) (*clientcmdapi.Config, e
return finalConfig, nil
}
// Checks if the certificates API for this endpoint is functional
func checkCertsAPI(discoveryClient *discovery.DiscoveryClient) error {
serverGroups, err := discoveryClient.ServerGroups()
if err != nil {
return fmt.Errorf("failed to retrieve a list of supported API objects [%v]", err)
}
for _, group := range serverGroups.Groups {
if group.Name == certificates.GroupName {
return nil
}
}
version, err := discoveryClient.ServerVersion()
if err != nil {
return fmt.Errorf("unable to obtain API version [%v]", err)
}
return fmt.Errorf("API version %s does not support certificates API, use v1.4.0 or newer", version.String())
}