You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
portainer/api/kubernetes/cli/client.go

398 lines
12 KiB

package cli
import (
"fmt"
"net/http"
"strconv"
"strings"
"sync"
"time"
portainer "github.com/portainer/portainer/api"
"github.com/portainer/portainer/api/dataservices"
"github.com/rs/zerolog/log"
"github.com/patrickmn/go-cache"
"github.com/pkg/errors"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
metricsv "k8s.io/metrics/pkg/client/clientset/versioned"
)
const (
DefaultKubeClientQPS = 30
DefaultKubeClientBurst = 100
)
type (
// ClientFactory is used to create Kubernetes clients
ClientFactory struct {
dataStore dataservices.DataStore
reverseTunnelService portainer.ReverseTunnelService
signatureService portainer.DigitalSignatureService
instanceID string
endpointClients map[string]*KubeClient
endpointProxyClients *cache.Cache
AddrHTTPS string
mu sync.Mutex
}
// KubeClient represent a service used to execute Kubernetes operations
KubeClient struct {
cli kubernetes.Interface
instanceID string
mu sync.Mutex
}
)
// NewClientFactory returns a new instance of a ClientFactory
func NewClientFactory(signatureService portainer.DigitalSignatureService, reverseTunnelService portainer.ReverseTunnelService, dataStore dataservices.DataStore, instanceID, addrHTTPS, userSessionTimeout string) (*ClientFactory, error) {
if userSessionTimeout == "" {
userSessionTimeout = portainer.DefaultUserSessionTimeout
}
timeout, err := time.ParseDuration(userSessionTimeout)
if err != nil {
return nil, err
}
return &ClientFactory{
dataStore: dataStore,
signatureService: signatureService,
reverseTunnelService: reverseTunnelService,
instanceID: instanceID,
endpointClients: make(map[string]*KubeClient),
endpointProxyClients: cache.New(timeout, timeout),
AddrHTTPS: addrHTTPS,
}, nil
}
func (factory *ClientFactory) GetInstanceID() (instanceID string) {
return factory.instanceID
}
// Remove the cached kube client so a new one can be created
func (factory *ClientFactory) RemoveKubeClient(endpointID portainer.EndpointID) {
factory.mu.Lock()
delete(factory.endpointClients, strconv.Itoa(int(endpointID)))
factory.mu.Unlock()
}
// GetKubeClient checks if an existing client is already registered for the environment(endpoint) and returns it if one is found.
// If no client is registered, it will create a new client, register it, and returns it.
func (factory *ClientFactory) GetKubeClient(endpoint *portainer.Endpoint) (*KubeClient, error) {
factory.mu.Lock()
key := strconv.Itoa(int(endpoint.ID))
if client, ok := factory.endpointClients[key]; ok {
factory.mu.Unlock()
return client, nil
}
factory.mu.Unlock()
// EE-6901: Do not lock
client, err := factory.createCachedAdminKubeClient(endpoint)
if err != nil {
return nil, err
}
factory.mu.Lock()
defer factory.mu.Unlock()
// The lock was released before the client was created,
// so we need to check again
if c, ok := factory.endpointClients[key]; ok {
return c, nil
}
factory.endpointClients[key] = client
return client, nil
}
// GetProxyKubeClient retrieves a KubeClient from the cache. You should be
// calling SetProxyKubeClient before first. It is normally, called the
// kubernetes middleware.
func (factory *ClientFactory) GetProxyKubeClient(endpointID, userID string) (*KubeClient, bool) {
client, ok := factory.endpointProxyClients.Get(endpointID + "." + userID)
if !ok {
return nil, false
}
return client.(*KubeClient), true
}
// SetProxyKubeClient stores a kubeclient in the cache.
func (factory *ClientFactory) SetProxyKubeClient(endpointID, userID string, cli *KubeClient) {
factory.endpointProxyClients.Set(endpointID+"."+userID, cli, 0)
}
// CreateKubeClientFromKubeConfig creates a KubeClient from a clusterID, and
// Kubernetes config.
func (factory *ClientFactory) CreateKubeClientFromKubeConfig(clusterID string, kubeConfig []byte) (*KubeClient, error) {
config, err := clientcmd.NewClientConfigFromBytes(kubeConfig)
if err != nil {
return nil, err
}
cliConfig, err := config.ClientConfig()
if err != nil {
return nil, err
}
cliConfig.QPS = DefaultKubeClientQPS
cliConfig.Burst = DefaultKubeClientBurst
cli, err := kubernetes.NewForConfig(cliConfig)
if err != nil {
return nil, err
}
return &KubeClient{
cli: cli,
instanceID: factory.instanceID,
}, nil
}
func (factory *ClientFactory) createCachedAdminKubeClient(endpoint *portainer.Endpoint) (*KubeClient, error) {
cli, err := factory.CreateClient(endpoint)
if err != nil {
return nil, err
}
return &KubeClient{
cli: cli,
instanceID: factory.instanceID,
}, nil
}
// CreateClient returns a pointer to a new Clientset instance.
func (factory *ClientFactory) CreateClient(endpoint *portainer.Endpoint) (*kubernetes.Clientset, error) {
switch endpoint.Type {
case portainer.KubernetesLocalEnvironment, portainer.AgentOnKubernetesEnvironment, portainer.EdgeAgentOnKubernetesEnvironment:
c, err := factory.CreateConfig(endpoint)
if err != nil {
return nil, err
}
return kubernetes.NewForConfig(c)
}
return nil, errors.New("unsupported environment type")
}
// CreateConfig returns a pointer to a new kubeconfig ready to create a client.
func (factory *ClientFactory) CreateConfig(endpoint *portainer.Endpoint) (*rest.Config, error) {
switch endpoint.Type {
case portainer.KubernetesLocalEnvironment:
return buildLocalConfig()
case portainer.AgentOnKubernetesEnvironment:
return factory.buildAgentConfig(endpoint)
case portainer.EdgeAgentOnKubernetesEnvironment:
return factory.buildEdgeConfig(endpoint)
}
return nil, errors.New("unsupported environment type")
}
type agentHeaderRoundTripper struct {
signatureHeader string
publicKeyHeader string
roundTripper http.RoundTripper
}
// RoundTrip is the implementation of the http.RoundTripper interface.
// It decorates the request with specific agent headers
func (rt *agentHeaderRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
req.Header.Add(portainer.PortainerAgentPublicKeyHeader, rt.publicKeyHeader)
req.Header.Add(portainer.PortainerAgentSignatureHeader, rt.signatureHeader)
return rt.roundTripper.RoundTrip(req)
}
func (factory *ClientFactory) buildAgentConfig(endpoint *portainer.Endpoint) (*rest.Config, error) {
var clientURL strings.Builder
if !strings.HasPrefix(endpoint.URL, "http") {
clientURL.WriteString("https://")
}
clientURL.WriteString(endpoint.URL)
clientURL.WriteString("/kubernetes")
signature, err := factory.signatureService.CreateSignature(portainer.PortainerAgentSignatureMessage)
if err != nil {
return nil, err
}
config, err := clientcmd.BuildConfigFromFlags(clientURL.String(), "")
if err != nil {
return nil, err
}
config.Insecure = true
config.QPS = DefaultKubeClientQPS
config.Burst = DefaultKubeClientBurst
config.Wrap(func(rt http.RoundTripper) http.RoundTripper {
return &agentHeaderRoundTripper{
signatureHeader: signature,
publicKeyHeader: factory.signatureService.EncodedPublicKey(),
roundTripper: rt,
}
})
return config, nil
}
func (factory *ClientFactory) buildEdgeConfig(endpoint *portainer.Endpoint) (*rest.Config, error) {
tunnelAddr, err := factory.reverseTunnelService.TunnelAddr(endpoint)
if err != nil {
return nil, errors.Wrap(err, "failed activating tunnel")
}
endpointURL := fmt.Sprintf("http://%s/kubernetes", tunnelAddr)
config, err := clientcmd.BuildConfigFromFlags(endpointURL, "")
if err != nil {
return nil, err
}
signature, err := factory.signatureService.CreateSignature(portainer.PortainerAgentSignatureMessage)
config.Insecure = true
config.QPS = DefaultKubeClientQPS
config.Burst = DefaultKubeClientBurst
config.Wrap(func(rt http.RoundTripper) http.RoundTripper {
return &agentHeaderRoundTripper{
signatureHeader: signature,
publicKeyHeader: factory.signatureService.EncodedPublicKey(),
roundTripper: rt,
}
})
return config, nil
}
func (factory *ClientFactory) CreateRemoteMetricsClient(endpoint *portainer.Endpoint) (*metricsv.Clientset, error) {
config, err := factory.CreateConfig(endpoint)
if err != nil {
return nil, fmt.Errorf("failed to create metrics KubeConfig")
}
return metricsv.NewForConfig(config)
}
func buildLocalConfig() (*rest.Config, error) {
config, err := rest.InClusterConfig()
if err != nil {
return nil, err
}
config.QPS = DefaultKubeClientQPS
config.Burst = DefaultKubeClientBurst
return config, nil
}
func (factory *ClientFactory) MigrateEndpointIngresses(e *portainer.Endpoint, datastore dataservices.DataStore, cli *KubeClient) error {
return datastore.UpdateTx(func(tx dataservices.DataStoreTx) error {
environment, err := tx.Endpoint().Endpoint(e.ID)
if err != nil {
log.Error().Err(err).Msgf("Error retrieving environment %d", e.ID)
return err
}
// classes is a list of controllers which have been manually added to the
// cluster setup view. These need to all be allowed globally, but then
// blocked in specific namespaces which they were not previously allowed in.
classes := environment.Kubernetes.Configuration.IngressClasses
// In pre-2.16 versions of portainer, the namespace level permissions were stored by
// creating an actual ingress rule in the cluster with a particular
// annotation indicating that it's name (the class name) should be allowed.
detected, err := cli.GetIngressControllers()
if err != nil {
log.Error().Err(err).Msgf("Error getting ingress controllers in environment %d", environment.ID)
return err
}
// newControllers is a set of all currently detected controllers.
newControllers := make(map[string]struct{})
for _, controller := range detected {
newControllers[controller.ClassName] = struct{}{}
}
namespaces, err := cli.GetNamespaces()
if err != nil {
log.Error().Err(err).Msgf("Error getting namespaces in environment %d", environment.ID)
return err
}
// Set of namespaces, if any, in which "allow none" should be true.
allow := make(map[string]map[string]struct{})
for _, c := range classes {
allow[c.Name] = make(map[string]struct{})
}
allow["none"] = make(map[string]struct{})
for namespace := range namespaces {
// Compare old annotations with currently detected controllers.
ingresses, err := cli.GetIngresses(namespace)
if err != nil {
log.Error().Err(err).Msgf("Error getting ingresses in environment %d", environment.ID)
return err
}
for _, ingress := range ingresses {
oldController, ok := ingress.Annotations["ingress.portainer.io/ingress-type"]
if !ok {
// Skip rules without our old annotation.
continue
}
if _, ok := newControllers[oldController]; ok {
// Skip rules which match a detected controller.
// TODO: Allow this particular controller.
allow[oldController][ingress.Namespace] = struct{}{}
continue
}
allow["none"][ingress.Namespace] = struct{}{}
}
}
// Locally, disable "allow none" for namespaces not inside shouldAllowNone.
var newClasses []portainer.KubernetesIngressClassConfig
for _, c := range classes {
var blocked []string
for namespace := range namespaces {
if _, ok := allow[c.Name][namespace]; ok {
continue
}
blocked = append(blocked, namespace)
}
newClasses = append(newClasses, portainer.KubernetesIngressClassConfig{
Name: c.Name,
Type: c.Type,
GloballyBlocked: false,
BlockedNamespaces: blocked,
})
}
// Handle "none".
if len(allow["none"]) != 0 {
environment.Kubernetes.Configuration.AllowNoneIngressClass = true
var disallowNone []string
for namespace := range namespaces {
if _, ok := allow["none"][namespace]; ok {
continue
}
disallowNone = append(disallowNone, namespace)
}
newClasses = append(newClasses, portainer.KubernetesIngressClassConfig{
Name: "none",
Type: "custom",
GloballyBlocked: false,
BlockedNamespaces: disallowNone,
})
}
environment.Kubernetes.Configuration.IngressClasses = newClasses
environment.PostInitMigrations.MigrateIngresses = false
return tx.Endpoint().UpdateEndpoint(environment.ID, environment)
})
}