mirror of https://github.com/portainer/portainer
347 lines
10 KiB
Go
package cli

import (
	"fmt"
	"net/http"
	"strconv"
	"sync"
	"time"

	cmap "github.com/orcaman/concurrent-map"
	"github.com/patrickmn/go-cache"
	"github.com/pkg/errors"
	portainer "github.com/portainer/portainer/api"
	"github.com/portainer/portainer/api/dataservices"
	"github.com/rs/zerolog/log"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/clientcmd"
)

type (
	// ClientFactory is used to create Kubernetes clients
	ClientFactory struct {
		dataStore            dataservices.DataStore
		reverseTunnelService portainer.ReverseTunnelService
		signatureService     portainer.DigitalSignatureService
		instanceID           string
		endpointClients      cmap.ConcurrentMap
		endpointProxyClients *cache.Cache
		AddrHTTPS            string
	}

	// KubeClient represents a service used to execute Kubernetes operations
	KubeClient struct {
		cli        kubernetes.Interface
		instanceID string
		lock       *sync.Mutex
	}
)

// NewClientFactory returns a new instance of a ClientFactory
func NewClientFactory(signatureService portainer.DigitalSignatureService, reverseTunnelService portainer.ReverseTunnelService, dataStore dataservices.DataStore, instanceID, addrHTTPS, userSessionTimeout string) (*ClientFactory, error) {
	if userSessionTimeout == "" {
		userSessionTimeout = portainer.DefaultUserSessionTimeout
	}
	timeout, err := time.ParseDuration(userSessionTimeout)
	if err != nil {
		return nil, err
	}

	return &ClientFactory{
		dataStore:            dataStore,
		signatureService:     signatureService,
		reverseTunnelService: reverseTunnelService,
		instanceID:           instanceID,
		endpointClients:      cmap.New(),
		endpointProxyClients: cache.New(timeout, timeout),
		AddrHTTPS:            addrHTTPS,
	}, nil
}
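
// newFactoryExample is a hedged usage sketch, not part of the upstream file.
// It shows how NewClientFactory is typically wired up: an empty
// userSessionTimeout falls back to portainer.DefaultUserSessionTimeout, and
// the parsed duration bounds how long proxy clients stay cached. The instance
// ID "1", the HTTPS address ":9443", and the "24h" timeout are illustrative
// values only; the signature service, tunnel service, and datastore are
// assumed to come from the server bootstrap.
func newFactoryExample(sig portainer.DigitalSignatureService, tun portainer.ReverseTunnelService, store dataservices.DataStore) (*ClientFactory, error) {
	return NewClientFactory(sig, tun, store, "1", ":9443", "24h")
}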

func (factory *ClientFactory) GetInstanceID() (instanceID string) {
	return factory.instanceID
}

// RemoveKubeClient removes the cached kube client so a new one can be created
func (factory *ClientFactory) RemoveKubeClient(endpointID portainer.EndpointID) {
	factory.endpointClients.Remove(strconv.Itoa(int(endpointID)))
}

// GetKubeClient checks if an existing client is already registered for the environment (endpoint) and returns it if one is found.
// If no client is registered, it will create a new client, register it, and return it.
func (factory *ClientFactory) GetKubeClient(endpoint *portainer.Endpoint) (portainer.KubeClient, error) {
	key := strconv.Itoa(int(endpoint.ID))
	client, ok := factory.endpointClients.Get(key)
	if !ok {
		client, err := factory.createCachedAdminKubeClient(endpoint)
		if err != nil {
			return nil, err
		}

		factory.endpointClients.Set(key, client)
		return client, nil
	}

	return client.(portainer.KubeClient), nil
}
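
// refreshKubeClientExample is a hedged sketch, not part of the upstream file.
// It illustrates the intended cache-invalidation pattern: after an
// environment's URL or credentials change, drop the cached admin client so
// the next GetKubeClient call rebuilds one against the new configuration.
func refreshKubeClientExample(factory *ClientFactory, endpoint *portainer.Endpoint) (portainer.KubeClient, error) {
	factory.RemoveKubeClient(endpoint.ID)
	return factory.GetKubeClient(endpoint)
}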

// GetProxyKubeClient retrieves a KubeClient from the cache. SetProxyKubeClient
// should be called first to store the client; this is normally done by the
// kubernetes middleware.
func (factory *ClientFactory) GetProxyKubeClient(endpointID, token string) (portainer.KubeClient, bool) {
	client, ok := factory.endpointProxyClients.Get(endpointID + "." + token)
	if !ok {
		return nil, false
	}
	return client.(portainer.KubeClient), true
}

// SetProxyKubeClient stores a kubeclient in the cache.
func (factory *ClientFactory) SetProxyKubeClient(endpointID, token string, cli portainer.KubeClient) {
	factory.endpointProxyClients.Set(endpointID+"."+token, cli, 0)
}
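
// proxyClientExample is a hedged sketch, not part of the upstream file. It
// shows the lookup-then-store pattern the proxy cache is built for: clients
// are keyed by "<endpointID>.<token>", so each user token gets its own
// client, and entries expire after the user session timeout. The kubeConfig
// parameter and the fallback path are illustrative assumptions; the real
// middleware builds its client differently.
func proxyClientExample(factory *ClientFactory, endpointID, token string, kubeConfig []byte) (portainer.KubeClient, error) {
	if cli, ok := factory.GetProxyKubeClient(endpointID, token); ok {
		return cli, nil
	}

	cli, err := factory.CreateKubeClientFromKubeConfig(endpointID, kubeConfig)
	if err != nil {
		return nil, err
	}

	factory.SetProxyKubeClient(endpointID, token, cli)
	return cli, nil
}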

// CreateKubeClientFromKubeConfig creates a KubeClient from a clusterID and a
// Kubernetes config.
func (factory *ClientFactory) CreateKubeClientFromKubeConfig(clusterID string, kubeConfig []byte) (portainer.KubeClient, error) {
	config, err := clientcmd.NewClientConfigFromBytes(kubeConfig)
	if err != nil {
		return nil, err
	}
	cliConfig, err := config.ClientConfig()
	if err != nil {
		return nil, err
	}

	cli, err := kubernetes.NewForConfig(cliConfig)
	if err != nil {
		return nil, err
	}

	kubecli := &KubeClient{
		cli:        cli,
		instanceID: factory.instanceID,
		lock:       &sync.Mutex{},
	}

	return kubecli, nil
}
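
// exampleKubeConfig is a hedged sketch, not part of the upstream file: a
// minimal kubeconfig document of the shape CreateKubeClientFromKubeConfig
// expects. The server address, names, and token are placeholder values; a
// real config would carry genuine cluster coordinates and credentials.
const exampleKubeConfig = `apiVersion: v1
kind: Config
clusters:
- name: local
  cluster:
    server: https://127.0.0.1:6443
    insecure-skip-tls-verify: true
contexts:
- name: local
  context:
    cluster: local
    user: admin
current-context: local
users:
- name: admin
  user:
    token: placeholder-token
`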

func (factory *ClientFactory) createCachedAdminKubeClient(endpoint *portainer.Endpoint) (portainer.KubeClient, error) {
	cli, err := factory.CreateClient(endpoint)
	if err != nil {
		return nil, err
	}

	kubecli := &KubeClient{
		cli:        cli,
		instanceID: factory.instanceID,
		lock:       &sync.Mutex{},
	}

	return kubecli, nil
}

// CreateClient returns a pointer to a new Clientset instance
func (factory *ClientFactory) CreateClient(endpoint *portainer.Endpoint) (*kubernetes.Clientset, error) {
	switch endpoint.Type {
	case portainer.KubernetesLocalEnvironment:
		return buildLocalClient()
	case portainer.AgentOnKubernetesEnvironment:
		return factory.buildAgentClient(endpoint)
	case portainer.EdgeAgentOnKubernetesEnvironment:
		return factory.buildEdgeClient(endpoint)
	}

	return nil, errors.New("unsupported environment type")
}

type agentHeaderRoundTripper struct {
	signatureHeader string
	publicKeyHeader string

	roundTripper http.RoundTripper
}

// RoundTrip is the implementation of the http.RoundTripper interface.
// It decorates the request with the agent-specific signature and public-key
// headers before delegating to the wrapped transport.
func (rt *agentHeaderRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
	req.Header.Add(portainer.PortainerAgentPublicKeyHeader, rt.publicKeyHeader)
	req.Header.Add(portainer.PortainerAgentSignatureHeader, rt.signatureHeader)

	return rt.roundTripper.RoundTrip(req)
}
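
// stubTransport and roundTripperExample are a hedged sketch, not part of the
// upstream file. They demonstrate what agentHeaderRoundTripper does: every
// request passed through it carries the Portainer agent public-key and
// signature headers. The stub below stands in for a real network transport,
// and the header values are placeholders.
type stubTransport struct{ lastReq *http.Request }

func (s *stubTransport) RoundTrip(req *http.Request) (*http.Response, error) {
	s.lastReq = req
	return &http.Response{StatusCode: http.StatusOK, Request: req}, nil
}

func roundTripperExample() (*http.Request, error) {
	stub := &stubTransport{}
	rt := &agentHeaderRoundTripper{
		signatureHeader: "example-signature",
		publicKeyHeader: "example-public-key",
		roundTripper:    stub,
	}

	req, err := http.NewRequest(http.MethodGet, "https://agent.example/kubernetes", nil)
	if err != nil {
		return nil, err
	}
	if _, err := rt.RoundTrip(req); err != nil {
		return nil, err
	}

	// stub.lastReq now carries portainer.PortainerAgentPublicKeyHeader and
	// portainer.PortainerAgentSignatureHeader.
	return stub.lastReq, nil
}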

func (factory *ClientFactory) buildAgentClient(endpoint *portainer.Endpoint) (*kubernetes.Clientset, error) {
	endpointURL := fmt.Sprintf("https://%s/kubernetes", endpoint.URL)

	return factory.createRemoteClient(endpointURL)
}

func (factory *ClientFactory) buildEdgeClient(endpoint *portainer.Endpoint) (*kubernetes.Clientset, error) {
	tunnel, err := factory.reverseTunnelService.GetActiveTunnel(endpoint)
	if err != nil {
		return nil, errors.Wrap(err, "failed activating tunnel")
	}
	endpointURL := fmt.Sprintf("http://127.0.0.1:%d/kubernetes", tunnel.Port)

	return factory.createRemoteClient(endpointURL)
}

func (factory *ClientFactory) createRemoteClient(endpointURL string) (*kubernetes.Clientset, error) {
	signature, err := factory.signatureService.CreateSignature(portainer.PortainerAgentSignatureMessage)
	if err != nil {
		return nil, err
	}

	config, err := clientcmd.BuildConfigFromFlags(endpointURL, "")
	if err != nil {
		return nil, err
	}
	config.Insecure = true

	config.Wrap(func(rt http.RoundTripper) http.RoundTripper {
		return &agentHeaderRoundTripper{
			signatureHeader: signature,
			publicKeyHeader: factory.signatureService.EncodedPublicKey(),
			roundTripper:    rt,
		}
	})

	return kubernetes.NewForConfig(config)
}

func buildLocalClient() (*kubernetes.Clientset, error) {
	config, err := rest.InClusterConfig()
	if err != nil {
		return nil, err
	}

	return kubernetes.NewForConfig(config)
}

// PostInitMigrateIngresses runs the ingress-class migration for every
// environment that still has the MigrateIngresses post-init flag set.
func (factory *ClientFactory) PostInitMigrateIngresses() error {
	endpoints, err := factory.dataStore.Endpoint().Endpoints()
	if err != nil {
		return err
	}
	for i := range endpoints {
		// Skip environments that have already been migrated; returning here
		// would wrongly skip every remaining environment.
		if !endpoints[i].PostInitMigrations.MigrateIngresses {
			continue
		}

		err := factory.migrateEndpointIngresses(&endpoints[i])
		if err != nil {
			log.Debug().Err(err).Msg("failure migrating endpoint ingresses")
		}
	}

	return nil
}

func (factory *ClientFactory) migrateEndpointIngresses(e *portainer.Endpoint) error {
	// classes is a list of controllers which have been manually added to the
	// cluster setup view. These need to all be allowed globally, but then
	// blocked in specific namespaces which they were not previously allowed in.
	classes := e.Kubernetes.Configuration.IngressClasses

	// We need a kube client to gather namespace level permissions. In pre-2.16
	// versions of portainer, the namespace level permissions were stored by
	// creating an actual ingress rule in the cluster with a particular
	// annotation indicating that its name (the class name) should be allowed.
	cli, err := factory.GetKubeClient(e)
	if err != nil {
		return err
	}

	detected, err := cli.GetIngressControllers()
	if err != nil {
		return err
	}

	// newControllers is a set of all currently detected controllers.
	newControllers := make(map[string]struct{})
	for _, controller := range detected {
		newControllers[controller.ClassName] = struct{}{}
	}

	namespaces, err := cli.GetNamespaces()
	if err != nil {
		return err
	}

	// allow maps each ingress class name to the set of namespaces in which it
	// should be allowed; the special "none" key tracks the namespaces, if any,
	// in which "allow none" should be true.
	allow := make(map[string]map[string]struct{})
	for _, c := range classes {
		allow[c.Name] = make(map[string]struct{})
	}
	allow["none"] = make(map[string]struct{})

	for namespace := range namespaces {
		// Compare old annotations with currently detected controllers.
		ingresses, err := cli.GetIngresses(namespace)
		if err != nil {
			return fmt.Errorf("failure getting ingresses during migration: %w", err)
		}
		for _, ingress := range ingresses {
			oldController, ok := ingress.Annotations["ingress.portainer.io/ingress-type"]
			if !ok {
				// Skip rules without our old annotation.
				continue
			}

			if _, ok := newControllers[oldController]; ok {
				// The rule matches a detected controller: allow that
				// controller in this namespace.
				allow[oldController][ingress.Namespace] = struct{}{}
				continue
			}

			// The old controller is no longer detected; fall back to "none".
			allow["none"][ingress.Namespace] = struct{}{}
		}
	}

	// For each class, block it in every namespace where it was not previously
	// allowed.
	var newClasses []portainer.KubernetesIngressClassConfig
	for _, c := range classes {
		var blocked []string
		for namespace := range namespaces {
			if _, ok := allow[c.Name][namespace]; ok {
				continue
			}
			blocked = append(blocked, namespace)
		}

		newClasses = append(newClasses, portainer.KubernetesIngressClassConfig{
			Name:              c.Name,
			Type:              c.Type,
			GloballyBlocked:   false,
			BlockedNamespaces: blocked,
		})
	}

	// Handle "none": if any namespace used an undetected controller, enable
	// the "none" class globally and block it in every other namespace.
	if len(allow["none"]) != 0 {
		e.Kubernetes.Configuration.AllowNoneIngressClass = true
		var disallowNone []string
		for namespace := range namespaces {
			if _, ok := allow["none"][namespace]; ok {
				continue
			}
			disallowNone = append(disallowNone, namespace)
		}
		newClasses = append(newClasses, portainer.KubernetesIngressClassConfig{
			Name:              "none",
			Type:              "custom",
			GloballyBlocked:   false,
			BlockedNamespaces: disallowNone,
		})
	}

	e.Kubernetes.Configuration.IngressClasses = newClasses
	e.PostInitMigrations.MigrateIngresses = false
	return factory.dataStore.Endpoint().UpdateEndpoint(e.ID, e)
}