[release-1.19] Add disable components flags (#3023)

* Add disable flags for control components (#2900)

* Add disable flags to control components

Signed-off-by: galal-hussein <hussein.galal.ahmed.11@gmail.com>

* golint

Signed-off-by: galal-hussein <hussein.galal.ahmed.11@gmail.com>

* more fixes

Signed-off-by: galal-hussein <hussein.galal.ahmed.11@gmail.com>

* fixes to disable flags

Signed-off-by: galal-hussein <hussein.galal.ahmed.11@gmail.com>

* Add comments to functions

Signed-off-by: galal-hussein <hussein.galal.ahmed.11@gmail.com>

* Fix joining problem

Signed-off-by: galal-hussein <hussein.galal.ahmed.11@gmail.com>

* more fixes

Signed-off-by: galal-hussein <hussein.galal.ahmed.11@gmail.com>

* golint

Signed-off-by: galal-hussein <hussein.galal.ahmed.11@gmail.com>

* fix ticker

Signed-off-by: galal-hussein <hussein.galal.ahmed.11@gmail.com>

* fix role labels

Signed-off-by: galal-hussein <hussein.galal.ahmed.11@gmail.com>

* more fixes

Signed-off-by: galal-hussein <hussein.galal.ahmed.11@gmail.com>

* update dynamiclistener

Signed-off-by: galal-hussein <hussein.galal.ahmed.11@gmail.com>

* remove etcd member if disable etcd is passed

Signed-off-by: galal-hussein <hussein.galal.ahmed.11@gmail.com>

* Mark disable components flags as experimental

Signed-off-by: galal-hussein <hussein.galal.ahmed.11@gmail.com>

* change error to warn when removing self from etcd members

Signed-off-by: galal-hussein <hussein.galal.ahmed.11@gmail.com>

* Add hidden to disable flags

Signed-off-by: galal-hussein <hussein.galal.ahmed.11@gmail.com>

* go mod

Signed-off-by: galal-hussein <hussein.galal.ahmed.11@gmail.com>
Hussein Galal committed 2021-03-05 00:28:56 +02:00 (via GitHub)
commit f621760825, parent 95fc76b206
26 changed files with 644 additions and 102 deletions

go.mod

@@ -86,7 +86,7 @@ require (
 	github.com/opencontainers/selinux v1.6.0
 	github.com/pierrec/lz4 v2.5.2+incompatible
 	github.com/pkg/errors v0.9.1
-	github.com/rancher/dynamiclistener v0.2.1
+	github.com/rancher/dynamiclistener v0.2.2
 	github.com/rancher/kine v0.5.1
 	github.com/rancher/remotedialer v0.2.0
 	github.com/rancher/wrangler v0.6.1

go.sum

@@ -700,8 +700,8 @@ github.com/rancher/cri v1.4.0-k3s.2 h1:fX9dGTD9xu6eKB2EDgla2DZHlyusZZzS/hVHvQd3U
 github.com/rancher/cri v1.4.0-k3s.2/go.mod h1:Ht5T1dIKzm+4NExmb7wDVG6qR+j0xeXIjjhCv1d9geY=
 github.com/rancher/cri-tools v1.19.0-k3s1 h1:c6lqNWyoAB5+NaUREbpZxKXCuYl9he24/DZEgHywg+A=
 github.com/rancher/cri-tools v1.19.0-k3s1/go.mod h1:bitvtZRi5F7t505Yw3zPzp22LOao1lqJKHfx6x0hnpw=
-github.com/rancher/dynamiclistener v0.2.1 h1:QiY1jxs2TOLrKB04G36vE2ehEvPMPGiWp8zEHLKB1nE=
-github.com/rancher/dynamiclistener v0.2.1/go.mod h1:9WusTANoiRr8cDWCTtf5txieulezHbpv4vhLADPp0zU=
+github.com/rancher/dynamiclistener v0.2.2 h1:70dMwOr1sqb6mQqfU2nDb/fr5cv7HJjH+kFYzoxb8KU=
+github.com/rancher/dynamiclistener v0.2.2/go.mod h1:9WusTANoiRr8cDWCTtf5txieulezHbpv4vhLADPp0zU=
 github.com/rancher/flannel v0.12.0-k3s1 h1:P23dWSk/9mGT1x2rDWW9JXNrF/0kjftiHwMau/+ZLGM=
 github.com/rancher/flannel v0.12.0-k3s1/go.mod h1:zQ/9Uhaw0yV4Wh6ljVwHVT1x5KuhenZA+6L8lRzOJEY=
 github.com/rancher/go-powershell v0.0.0-20200701182037-6845e6fcfa79 h1:UeC0rjrIel8hHz92cdVN09Cm4Hz+BhsPP/ZvQnPOr58=

@@ -402,6 +402,7 @@ func get(envInfo *cmds.Agent, proxy proxy.Proxy) (*config.Node, error) {
 		SELinux:                  envInfo.EnableSELinux,
 		ContainerRuntimeEndpoint: envInfo.ContainerRuntimeEndpoint,
 		FlannelBackend:           controlConfig.FlannelBackend,
+		ServerHTTPSPort:          controlConfig.HTTPSPort,
 	}
 	nodeConfig.FlannelIface = flannelIface
 	nodeConfig.Images = filepath.Join(envInfo.DataDir, "images")

@@ -5,6 +5,7 @@ import (
 	"errors"
 	"net"
 	"path/filepath"
+	"strconv"
 	"sync"

 	"github.com/google/tcpproxy"
@@ -26,15 +27,19 @@ type LoadBalancer struct {
 	randomServers        []string
 	currentServerAddress string
 	nextServerIndex      int
+	Listener             net.Listener
 }

+const RandomPort = 0
+
 var (
 	SupervisorServiceName = version.Program + "-agent-load-balancer"
 	APIServerServiceName  = version.Program + "-api-server-agent-load-balancer"
+	ETCDServerServiceName = version.Program + "-etcd-server-load-balancer"
 )

-func New(dataDir, serviceName, serverURL string) (_lb *LoadBalancer, _err error) {
-	listener, err := net.Listen("tcp", "127.0.0.1:0")
+func New(dataDir, serviceName, serverURL string, lbServerPort int) (_lb *LoadBalancer, _err error) {
+	listener, err := net.Listen("tcp", "127.0.0.1:"+strconv.Itoa(lbServerPort))
 	defer func() {
 		if _err != nil {
 			logrus.Warnf("Error starting load balancer: %s", _err)
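Illustrative usage, not part of the commit: a minimal sketch of how a caller might exercise the new lbServerPort parameter. RandomPort (0) keeps the previous behavior of letting the kernel pick a free port on 127.0.0.1, while a fixed port is what the etcd proxy added later in this PR passes. The dataDir variable and the server URL below are placeholders; error handling and the logrus call are assumed context.

lb, err := loadbalancer.New(dataDir, loadbalancer.SupervisorServiceName, "https://server-1:6443", loadbalancer.RandomPort)
if err != nil {
	return err
}
// LoadBalancerServerURL reports the local 127.0.0.1:<port> address chosen above.
logrus.Infof("supervisor load balancer listening on %s", lb.LoadBalancerServerURL())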

@@ -105,7 +105,7 @@ func TestFailOver(t *testing.T) {
 		DataDir:   tmpDir,
 	}

-	lb, err := New(cfg.DataDir, SupervisorServiceName, cfg.ServerURL)
+	lb, err := New(cfg.DataDir, SupervisorServiceName, cfg.ServerURL, RandomPort)
 	if err != nil {
 		assertEqual(t, err, nil)
 	}
@@ -156,7 +156,7 @@ func TestFailFast(t *testing.T) {
 		DataDir:   tmpDir,
 	}

-	lb, err := New(cfg.DataDir, SupervisorServiceName, cfg.ServerURL)
+	lb, err := New(cfg.DataDir, SupervisorServiceName, cfg.ServerURL, RandomPort)
 	if err != nil {
 		assertEqual(t, err, nil)
 	}

@@ -17,19 +17,21 @@ type Proxy interface {
 	SupervisorURL() string
 	SupervisorAddresses() []string
 	APIServerURL() string
+	IsAPIServerLBEnabled() bool
 }

-func NewAPIProxy(enabled bool, dataDir, supervisorURL string) (Proxy, error) {
-	p := &proxy{
+func NewAPIProxy(enabled bool, dataDir, supervisorURL string, lbServerPort int) (Proxy, error) {
+	p := proxy{
 		lbEnabled:            enabled,
 		dataDir:              dataDir,
 		initialSupervisorURL: supervisorURL,
 		supervisorURL:        supervisorURL,
 		apiServerURL:         supervisorURL,
+		lbServerPort:         lbServerPort,
 	}

 	if enabled {
-		lb, err := loadbalancer.New(dataDir, loadbalancer.SupervisorServiceName, supervisorURL)
+		lb, err := loadbalancer.New(dataDir, loadbalancer.SupervisorServiceName, supervisorURL, p.lbServerPort)
 		if err != nil {
 			return nil, err
 		}
@@ -45,12 +47,13 @@ func NewAPIProxy(enabled bool, dataDir, supervisorURL string) (Proxy, error) {
 	p.fallbackSupervisorAddress = u.Host
 	p.supervisorPort = u.Port()

-	return p, nil
+	return &p, nil
 }

 type proxy struct {
 	dataDir              string
 	lbEnabled            bool
+	lbServerPort         int

 	initialSupervisorURL string
 	supervisorURL        string
@@ -71,14 +74,12 @@ func (p *proxy) Update(addresses []string) {
 	if p.apiServerEnabled {
 		supervisorAddresses = p.setSupervisorPort(supervisorAddresses)
 	}
-
 	if p.apiServerLB != nil {
 		p.apiServerLB.Update(apiServerAddresses)
 	}
-
 	if p.supervisorLB != nil {
 		p.supervisorLB.Update(supervisorAddresses)
 	}
 	p.supervisorAddresses = supervisorAddresses
 }
@@ -106,7 +107,11 @@ func (p *proxy) StartAPIServerProxy(port int) error {
 	p.apiServerEnabled = true

 	if p.lbEnabled {
-		lb, err := loadbalancer.New(p.dataDir, loadbalancer.APIServerServiceName, p.apiServerURL)
+		lbServerPort := p.lbServerPort
+		if lbServerPort != 0 {
+			lbServerPort = lbServerPort + 1
+		}
+		lb, err := loadbalancer.New(p.dataDir, loadbalancer.APIServerServiceName, p.apiServerURL, lbServerPort)
 		if err != nil {
 			return err
 		}
@@ -131,3 +136,7 @@ func (p *proxy) SupervisorAddresses() []string {
 func (p *proxy) APIServerURL() string {
 	return p.apiServerURL
 }
+
+func (p *proxy) IsAPIServerLBEnabled() bool {
+	return p.apiServerLB != nil
+}
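A clarifying sketch, not part of the commit, of the resulting local port layout when a fixed load-balancer port is configured (as the server CLI does with lbServerPort = 6444 later in this PR): the supervisor load balancer takes the configured port and the API server load balancer takes the next one. The data directory and supervisor URL are hypothetical; NewAPIProxy and StartAPIServerProxy are the functions changed above.

p, err := proxy.NewAPIProxy(true, dataDir, "https://server-1:9345", 6444)
if err != nil {
	return err
}
// The supervisor load balancer now listens on 127.0.0.1:6444.
if err := p.StartAPIServerProxy(6443); err != nil {
	return err
}
// The API server load balancer now listens on 127.0.0.1:6445 (lbServerPort + 1).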

@@ -3,7 +3,9 @@ package agent
 import (
 	"context"
 	"errors"
+	"fmt"
 	"io/ioutil"
+	"net/url"
 	"os"
 	"path/filepath"
 	"strings"
@@ -87,12 +89,7 @@ func run(ctx context.Context, cfg cmds.Agent, proxy proxy.Proxy) error {
 			return err
 		}
 	}
-
-	if err := tunnel.Setup(ctx, nodeConfig, proxy); err != nil {
-		return err
-	}
-
-	if err := agent.Agent(&nodeConfig.AgentConfig); err != nil {
+	if err := setupTunnelAndRunAgent(ctx, nodeConfig, cfg, proxy); err != nil {
 		return err
 	}
@@ -146,7 +143,7 @@ func Run(ctx context.Context, cfg cmds.Agent) error {
 		return err
 	}

-	proxy, err := proxy.NewAPIProxy(!cfg.DisableLoadBalancer, cfg.DataDir, cfg.ServerURL)
+	proxy, err := proxy.NewAPIProxy(!cfg.DisableLoadBalancer, cfg.DataDir, cfg.ServerURL, cfg.LBServerPort)
 	if err != nil {
 		return err
 	}
@@ -165,7 +162,6 @@ func Run(ctx context.Context, cfg cmds.Agent) error {
 		cfg.Token = newToken.String()
 		break
 	}
-
 	systemd.SdNotify(true, "READY=1\n")
 	return run(ctx, cfg, proxy)
 }
@@ -270,3 +266,39 @@ func updateAddressLabels(agentConfig *daemonconfig.Agent, nodeLabels map[string]
 	result = labels.Merge(nodeLabels, result)
 	return result, !equality.Semantic.DeepEqual(nodeLabels, result)
 }
+
+// setupTunnelAndRunAgent sets up the tunnel before starting the kubelet and
+// kube-proxy. There is a special case for etcd-only agents: they wait until the
+// API address can be read from the address channel and then update the proxy
+// with the server addresses. In rke2 the agent must start before the tunnel is
+// set up, so that the kubelet can start first and bring up the pods.
+func setupTunnelAndRunAgent(ctx context.Context, nodeConfig *daemonconfig.Node, cfg cmds.Agent, proxy proxy.Proxy) error {
+	var agentRan bool
+	if cfg.ETCDAgent {
+		// only in rke2: run the agent before the tunnel setup; this is checked again later in the function
+		if proxy.IsAPIServerLBEnabled() {
+			if err := agent.Agent(&nodeConfig.AgentConfig); err != nil {
+				return err
+			}
+			agentRan = true
+		}
+		select {
+		case address := <-cfg.APIAddressCh:
+			cfg.ServerURL = address
+			u, err := url.Parse(cfg.ServerURL)
+			if err != nil {
+				logrus.Warn(err)
+			}
+			proxy.Update([]string{fmt.Sprintf("%s:%d", u.Hostname(), nodeConfig.ServerHTTPSPort)})
+		case <-ctx.Done():
+			return ctx.Err()
+		}
+	}
+	if err := tunnel.Setup(ctx, nodeConfig, proxy); err != nil {
+		return err
+	}
+	if !agentRan {
+		return agent.Agent(&nodeConfig.AgentConfig)
+	}
+	return nil
+}

@@ -75,7 +75,10 @@ func Setup(ctx context.Context, config *config.Node, proxy proxy.Proxy) error {
 	endpoint, _ := client.CoreV1().Endpoints("default").Get(ctx, "kubernetes", metav1.GetOptions{})
 	if endpoint != nil {
-		proxy.Update(getAddresses(endpoint))
+		addresses := getAddresses(endpoint)
+		if len(addresses) > 0 {
+			proxy.Update(getAddresses(endpoint))
+		}
 	}

 	disconnect := map[string]context.CancelFunc{}

@@ -0,0 +1,87 @@
package apiaddresses

import (
	"bytes"
	"context"
	"encoding/json"
	"net"
	"strconv"

	"github.com/rancher/k3s/pkg/daemons/config"
	"github.com/rancher/k3s/pkg/etcd"
	"github.com/rancher/k3s/pkg/version"
	controllerv1 "github.com/rancher/wrangler-api/pkg/generated/controllers/core/v1"
	etcdv3 "go.etcd.io/etcd/clientv3"
	v1 "k8s.io/api/core/v1"
)

type EndpointsControllerGetter func() controllerv1.EndpointsController

func Register(ctx context.Context, runtime *config.ControlRuntime, endpoints controllerv1.EndpointsController) error {
	h := &handler{
		endpointsController: endpoints,
		runtime:             runtime,
		ctx:                 ctx,
	}
	endpoints.OnChange(ctx, version.Program+"-apiserver-lb-controller", h.sync)

	cl, err := etcd.GetClient(h.ctx, h.runtime, "https://127.0.0.1:2379")
	if err != nil {
		return err
	}
	h.etcdClient = cl

	return nil
}

type handler struct {
	endpointsController controllerv1.EndpointsController
	runtime             *config.ControlRuntime
	ctx                 context.Context
	etcdClient          *etcdv3.Client
}

// This controller updates the version.Program/apiaddresses etcd key with the list of
// API server endpoint addresses found in the kubernetes service in the default namespace.
func (h *handler) sync(key string, endpoint *v1.Endpoints) (*v1.Endpoints, error) {
	if endpoint == nil {
		return nil, nil
	}
	if endpoint.Namespace != "default" && endpoint.Name != "kubernetes" {
		return nil, nil
	}
	w := &bytes.Buffer{}
	if err := json.NewEncoder(w).Encode(getAddresses(endpoint)); err != nil {
		return nil, err
	}
	_, err := h.etcdClient.Put(h.ctx, etcd.AddressKey, w.String())
	if err != nil {
		return nil, err
	}
	return endpoint, nil
}

func getAddresses(endpoint *v1.Endpoints) []string {
	serverAddresses := []string{}
	if endpoint == nil {
		return serverAddresses
	}
	for _, subset := range endpoint.Subsets {
		var port string
		if len(subset.Ports) > 0 {
			port = strconv.Itoa(int(subset.Ports[0].Port))
		}
		if port == "" {
			port = "443"
		}
		for _, address := range subset.Addresses {
			serverAddresses = append(serverAddresses, net.JoinHostPort(address.IP, port))
		}
	}
	return serverAddresses
}
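To make the handoff concrete, a hedged sketch (not part of the commit) of what this controller stores and how it is consumed: the key is etcd.AddressKey (version.Program + "/apiaddresses"), the value is the JSON encoding of getAddresses, and GetAPIServerURLFromETCD in pkg/etcd later reads that key and takes the first entry. The addresses below are made up.

buf := &bytes.Buffer{}
if err := json.NewEncoder(buf).Encode([]string{"10.0.0.10:6443", "10.0.0.11:6443"}); err != nil {
	return err
}
// buf now holds ["10.0.0.10:6443","10.0.0.11:6443"]; an etcd-only node turns
// the first entry into https://10.0.0.10:6443 for its agent configuration.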

@@ -14,7 +14,10 @@ type Agent struct {
 	TokenFile                string
 	ClusterSecret            string
 	ServerURL                string
+	APIAddressCh             chan string
 	DisableLoadBalancer      bool
+	ETCDAgent                bool
+	LBServerPort             int
 	ResolvConf               string
 	DataDir                  string
 	NodeIP                   string
@@ -155,6 +158,14 @@ var (
 		Destination: &AgentConfig.EnableSELinux,
 		EnvVar:      version.ProgramUpper + "_SELINUX",
 	}
+	LBServerPort = cli.IntFlag{
+		Name:        "lb-server-port",
+		Usage:       "(agent/node) Internal Loadbalancer port",
+		Hidden:      false,
+		Destination: &AgentConfig.LBServerPort,
+		EnvVar:      version.ProgramUpper + "_LB_SERVER_PORT",
+		Value:       0,
+	}
 )

 func CheckSELinuxFlags(ctx *cli.Context) error {
@@ -1,7 +0,0 @@
-// +build !no_etcd
-
-package cmds
-
-const (
-	hideClusterFlags = false
-)

@@ -12,6 +12,7 @@ const (
 	defaultSnapshotRentention    = 5
 	defaultSnapshotIntervalHours = 12
+	hideClusterFlags             = true
 )

 type Server struct {
@@ -55,6 +56,9 @@ type Server struct {
 	DisableCCM               bool
 	DisableNPC               bool
 	DisableKubeProxy         bool
+	DisableAPIServer         bool
+	DisableControllerManager bool
+	DisableETCD              bool
 	ClusterInit              bool
 	ClusterReset             bool
 	ClusterResetRestorePath  string
@@ -262,6 +266,24 @@ func NewServerCommand(action func(*cli.Context) error) cli.Command {
 			Usage:       "(components) Disable " + version.Program + " default network policy controller",
 			Destination: &ServerConfig.DisableNPC,
 		},
+		cli.BoolFlag{
+			Name:        "disable-api-server",
+			Hidden:      hideClusterFlags,
+			Usage:       "(experimental/components) Disable running api server",
+			Destination: &ServerConfig.DisableAPIServer,
+		},
+		cli.BoolFlag{
+			Name:        "disable-controller-manager",
+			Hidden:      hideClusterFlags,
+			Usage:       "(experimental/components) Disable running kube-controller-manager",
+			Destination: &ServerConfig.DisableControllerManager,
+		},
+		cli.BoolFlag{
+			Name:        "disable-etcd",
+			Hidden:      hideClusterFlags,
+			Usage:       "(experimental/components) Disable running etcd",
+			Destination: &ServerConfig.DisableETCD,
+		},
 		NodeNameFlag,
 		WithNodeIDFlag,
 		NodeLabels,
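For orientation, a minimal hedged sketch (not part of the commit) of how these hidden, experimental flags map onto the control-plane config consumed by pkg/daemons/control further down. Field names follow the diff; --disable-scheduler already existed before this change.

// Sketch: an etcd-only server disables the Kubernetes control-plane components.
serverConfig.ControlConfig.DisableAPIServer = true         // --disable-api-server
serverConfig.ControlConfig.DisableControllerManager = true // --disable-controller-manager
serverConfig.ControlConfig.DisableScheduler = true         // --disable-scheduler
// Conversely, a control-plane-only server would instead set:
// serverConfig.ControlConfig.DisableETCD = true           // --disable-etcd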

@@ -7,6 +7,7 @@ import (
 	"os"
 	"path/filepath"
 	"strings"
+	"time"

 	systemd "github.com/coreos/go-systemd/daemon"
 	"github.com/erikdubbelboer/gspt"
@@ -14,6 +15,7 @@ import (
 	"github.com/rancher/k3s/pkg/agent"
 	"github.com/rancher/k3s/pkg/cli/cmds"
 	"github.com/rancher/k3s/pkg/datadir"
+	"github.com/rancher/k3s/pkg/etcd"
 	"github.com/rancher/k3s/pkg/netutil"
 	"github.com/rancher/k3s/pkg/rootless"
 	"github.com/rancher/k3s/pkg/server"
@@ -31,6 +33,10 @@ import (
 	_ "github.com/mattn/go-sqlite3" // ensure we have sqlite
 )

+const (
+	lbServerPort = 6444
+)
+
 func Run(app *cli.Context) error {
 	if err := cmds.InitLogging(); err != nil {
 		return err
@@ -86,7 +92,6 @@ func run(app *cli.Context, cfg *cmds.Server) error {
 	serverConfig.ControlConfig.DataDir = cfg.DataDir
 	serverConfig.ControlConfig.KubeConfigOutput = cfg.KubeConfigOutput
 	serverConfig.ControlConfig.KubeConfigMode = cfg.KubeConfigMode
-	serverConfig.ControlConfig.NoScheduler = cfg.DisableScheduler
 	serverConfig.Rootless = cfg.Rootless
 	serverConfig.ControlConfig.SANs = knownIPs(cfg.TLSSan)
 	serverConfig.ControlConfig.BindAddress = cfg.BindAddress
@@ -109,6 +114,10 @@ func run(app *cli.Context, cfg *cmds.Server) error {
 	serverConfig.ControlConfig.DisableCCM = cfg.DisableCCM
 	serverConfig.ControlConfig.DisableNPC = cfg.DisableNPC
 	serverConfig.ControlConfig.DisableKubeProxy = cfg.DisableKubeProxy
+	serverConfig.ControlConfig.DisableETCD = cfg.DisableETCD
+	serverConfig.ControlConfig.DisableAPIServer = cfg.DisableAPIServer
+	serverConfig.ControlConfig.DisableScheduler = cfg.DisableScheduler
+	serverConfig.ControlConfig.DisableControllerManager = cfg.DisableControllerManager
 	serverConfig.ControlConfig.ClusterInit = cfg.ClusterInit
 	serverConfig.ControlConfig.EncryptSecrets = cfg.EncryptSecrets
 	serverConfig.ControlConfig.EtcdSnapshotCron = cfg.EtcdSnapshotCron
@@ -117,7 +126,7 @@ func run(app *cli.Context, cfg *cmds.Server) error {
 	serverConfig.ControlConfig.EtcdDisableSnapshots = cfg.EtcdDisableSnapshots

 	if cfg.ClusterResetRestorePath != "" && !cfg.ClusterReset {
-		return errors.New("Invalid flag use. --cluster-reset required with --cluster-reset-restore-path")
+		return errors.New("invalid flag use. --cluster-reset required with --cluster-reset-restore-path")
 	}

 	serverConfig.ControlConfig.ClusterReset = cfg.ClusterReset
@@ -127,6 +136,13 @@ func run(app *cli.Context, cfg *cmds.Server) error {
 		serverConfig.ControlConfig.SupervisorPort = serverConfig.ControlConfig.HTTPSPort
 	}

+	if serverConfig.ControlConfig.DisableAPIServer {
+		serverConfig.ControlConfig.APIServerPort = lbServerPort
+		if serverConfig.ControlConfig.SupervisorPort != serverConfig.ControlConfig.HTTPSPort {
+			serverConfig.ControlConfig.APIServerPort = lbServerPort + 1
+		}
+	}
+
 	if cmds.AgentConfig.FlannelIface != "" && cmds.AgentConfig.NodeIP == "" {
 		cmds.AgentConfig.NodeIP = netutil.GetIPFromInterface(cmds.AgentConfig.FlannelIface)
 	}
@@ -238,13 +254,19 @@ func run(app *cli.Context, cfg *cmds.Server) error {
 	os.Unsetenv("NOTIFY_SOCKET")

 	ctx := signals.SetupSignalHandler(context.Background())
 	if err := server.StartServer(ctx, &serverConfig); err != nil {
 		return err
 	}

 	go func() {
-		<-serverConfig.ControlConfig.Runtime.APIServerReady
-		logrus.Info("Kube API server is now running")
+		if !serverConfig.ControlConfig.DisableAPIServer {
+			<-serverConfig.ControlConfig.Runtime.APIServerReady
+			logrus.Info("Kube API server is now running")
+		} else {
+			<-serverConfig.ControlConfig.Runtime.ETCDReady
+			logrus.Info("ETCD server is now running")
+		}
 		logrus.Info(version.Program + " is up and running")
 		if notifySocket != "" {
 			os.Setenv("NOTIFY_SOCKET", notifySocket)
@@ -273,13 +295,24 @@ func run(app *cli.Context, cfg *cmds.Server) error {
 	agentConfig.DataDir = filepath.Dir(serverConfig.ControlConfig.DataDir)
 	agentConfig.ServerURL = url
 	agentConfig.Token = token
-	agentConfig.DisableLoadBalancer = true
+	agentConfig.DisableLoadBalancer = !serverConfig.ControlConfig.DisableAPIServer
+	agentConfig.ETCDAgent = serverConfig.ControlConfig.DisableAPIServer
 	agentConfig.Rootless = cfg.Rootless

 	if agentConfig.Rootless {
 		// let agent specify Rootless kubelet flags, but not unshare twice
 		agentConfig.RootlessAlreadyUnshared = true
 	}

+	if serverConfig.ControlConfig.DisableAPIServer {
+		// set LBServerPort to a pre-specified port so the kubeconfigs are initialized with the right address
+		agentConfig.LBServerPort = lbServerPort
+		// initialize the API address channel for receiving the api address from etcd
+		agentConfig.APIAddressCh = make(chan string, 1)
+		setAPIAddressChannel(ctx, &serverConfig, &agentConfig)
+		defer close(agentConfig.APIAddressCh)
+	}
+
 	return agent.Run(ctx, agentConfig)
 }
@@ -304,3 +337,30 @@ func getArgValueFromList(searchArg string, argList []string) string {
 		}
 	}
 	return value
 }
+
+// setAPIAddressChannel waits for the API address key to appear in etcd and then
+// sends its value on the APIAddressCh channel. It works for both k3s and rke2:
+// for k3s it blocks before returning to agent.Run until the API address is
+// available, while for rke2 the lookup runs in a goroutine and does not block.
+func setAPIAddressChannel(ctx context.Context, serverConfig *server.Config, agentConfig *cmds.Agent) {
+	// start a goroutine to check for the server ip if set from etcd in case of rke2
+	if serverConfig.ControlConfig.HTTPSPort != serverConfig.ControlConfig.SupervisorPort {
+		go getAPIAddressFromEtcd(ctx, serverConfig, agentConfig)
+		return
+	}
+	getAPIAddressFromEtcd(ctx, serverConfig, agentConfig)
+}
+
+func getAPIAddressFromEtcd(ctx context.Context, serverConfig *server.Config, agentConfig *cmds.Agent) {
+	t := time.NewTicker(5 * time.Second)
+	defer t.Stop()
+	for range t.C {
+		serverAddress, err := etcd.GetAPIServerURLFromETCD(ctx, &serverConfig.ControlConfig)
+		if err == nil {
+			agentConfig.ServerURL = "https://" + serverAddress
+			agentConfig.APIAddressCh <- agentConfig.ServerURL
+			break
+		}
+		logrus.Warn(err)
+	}
+}

@@ -2,14 +2,17 @@ package cluster
 import (
 	"context"
+	"net/url"
 	"strings"

 	"github.com/pkg/errors"
 	"github.com/rancher/k3s/pkg/clientaccess"
 	"github.com/rancher/k3s/pkg/cluster/managed"
 	"github.com/rancher/k3s/pkg/daemons/config"
+	"github.com/rancher/k3s/pkg/etcd"
 	"github.com/rancher/kine/pkg/client"
 	"github.com/rancher/kine/pkg/endpoint"
+	"github.com/sirupsen/logrus"
 )

 type Cluster struct {
@@ -34,6 +37,37 @@ func (c *Cluster) Start(ctx context.Context) (<-chan struct{}, error) {
 		return nil, errors.Wrap(err, "init cluster datastore and https")
 	}

+	if c.config.DisableETCD {
+		ready := make(chan struct{})
+		defer close(ready)
+
+		// try to get /db/info urls first before attempting to use join url
+		clientURLs, _, err := etcd.ClientURLs(ctx, c.clientAccessInfo, c.config.PrivateIP)
+		if err != nil {
+			return nil, err
+		}
+		if len(clientURLs) < 1 {
+			clientURL, err := url.Parse(c.config.JoinURL)
+			if err != nil {
+				return nil, err
+			}
+			clientURL.Host = clientURL.Hostname() + ":2379"
+			clientURLs = append(clientURLs, clientURL.String())
+		}
+		etcdProxy, err := etcd.NewETCDProxy(true, c.config.DataDir, clientURLs[0])
+		if err != nil {
+			return nil, err
+		}
+		c.setupEtcdProxy(ctx, etcdProxy)
+
+		// remove etcd member if it exists
+		if err := c.managedDB.RemoveSelf(ctx); err != nil {
+			logrus.Warnf("Failed to remove this node from etcd members")
+		}
+
+		return ready, nil
+	}
+
 	// start managed database (if necessary)
 	if err := c.start(ctx); err != nil {
 		return nil, errors.Wrap(err, "start managed database")
@@ -42,24 +76,14 @@
 	// get the wait channel for testing managed database readiness
 	ready, err := c.testClusterDB(ctx)
 	if err != nil {
-		return nil, err
-	}
-
-	// if necessary, store bootstrap data to datastore
-	if c.saveBootstrap {
-		if err := c.save(ctx); err != nil {
-			return nil, err
+		if c.shouldBootstrap {
+			if err := c.bootstrapped(); err != nil {
+				return nil, err
+			}
 		}
 	}

-	// if necessary, record successful bootstrap
-	if c.shouldBootstrap {
-		if err := c.bootstrapped(); err != nil {
-			return nil, err
-		}
-	}
-
 	return ready, c.startStorage(ctx)
 }

 // startStorage starts the kine listener and configures the endpoints, if necessary.

@@ -127,3 +127,23 @@ func (c *Cluster) assignManagedDriver(ctx context.Context) error {

 	return nil
 }
+
+// setupEtcdProxy
+func (c *Cluster) setupEtcdProxy(ctx context.Context, etcdProxy etcd.Proxy) {
+	if c.managedDB == nil {
+		return
+	}
+	go func() {
+		t := time.NewTicker(30 * time.Second)
+		defer t.Stop()
+		for range t.C {
+			newAddresses, err := c.managedDB.GetMembersClientURLs(ctx)
+			if err != nil {
+				logrus.Warnf("failed to get etcd client URLs: %v", err)
+				continue
+			}
+			etcdProxy.Update(newAddresses)
+		}
+	}()
+}

@@ -21,6 +21,8 @@ type Driver interface {
 	Test(ctx context.Context) error
 	Restore(ctx context.Context) error
 	EndpointName() string
+	GetMembersClientURLs(ctx context.Context) ([]string, error)
+	RemoveSelf(ctx context.Context) error
 }

 func RegisterDriver(d Driver) {

@@ -26,7 +26,6 @@ func Agent(config *config.Agent) error {
 	logs.InitLogs()
 	defer logs.FlushLogs()
-
 	if err := startKubelet(config); err != nil {
 		return err
 	}

@@ -37,6 +37,7 @@ type Node struct {
 	AgentConfig     Agent
 	CACerts         []byte
 	Certificate     *tls.Certificate
+	ServerHTTPSPort int
 }

 type Containerd struct {
@@ -111,7 +112,6 @@ type Control struct {
 	Skips                    map[string]bool
 	Disables                 map[string]bool
 	Datastore                endpoint.Config
-	NoScheduler              bool
 	ExtraAPIArgs             []string
 	ExtraControllerArgs      []string
 	ExtraCloudControllerArgs []string
@@ -124,6 +124,10 @@ type Control struct {
 	DisableCCM               bool
 	DisableNPC               bool
 	DisableKubeProxy         bool
+	DisableAPIServer         bool
+	DisableControllerManager bool
+	DisableScheduler         bool
+	DisableETCD              bool
 	ClusterInit              bool
 	ClusterReset             bool
 	ClusterResetRestorePath  string

@@ -93,15 +93,20 @@ func Server(ctx context.Context, cfg *config.Control) error {
 	cfg.Runtime.Tunnel = setupTunnel()
 	util.DisableProxyHostnameCheck = true

-	auth, handler, err := apiServer(ctx, cfg, runtime)
-	if err != nil {
-		return err
-	}
+	var auth authenticator.Request
+	var handler http.Handler
+	var err error

-	if err := waitForAPIServerInBackground(ctx, runtime); err != nil {
-		return err
-	}
+	if !cfg.DisableAPIServer {
+		auth, handler, err = apiServer(ctx, cfg, runtime)
+		if err != nil {
+			return err
+		}
+
+		if err := waitForAPIServerInBackground(ctx, runtime); err != nil {
+			return err
+		}
+	}

 	basicAuth, err := basicAuthenticator(runtime.PasswdFile)
 	if err != nil {
 		return err
@@ -110,14 +115,15 @@ func Server(ctx context.Context, cfg *config.Control) error {
 	runtime.Authenticator = combineAuthenticators(basicAuth, auth)
 	runtime.Handler = handler

-	if !cfg.NoScheduler {
+	if !cfg.DisableScheduler {
 		if err := scheduler(cfg, runtime); err != nil {
 			return err
 		}
 	}

-	if err := controllerManager(cfg, runtime); err != nil {
-		return err
+	if !cfg.DisableControllerManager {
+		if err := controllerManager(cfg, runtime); err != nil {
+			return err
+		}
 	}

 	if !cfg.DisableCCM {
@@ -935,7 +941,7 @@ func cloudControllerManager(ctx context.Context, cfg *config.Control, runtime *c
 	select {
 	case <-ctx.Done():
 		logrus.Fatalf("cloud-controller-manager context canceled: %v", ctx.Err())
-	case <-time.After(time.Second):
+	case <-time.After(5 * time.Second):
 		continue
 	}
 }

@@ -11,8 +11,8 @@
 )

 const (
-	nodeID      = "etcd.k3s.cattle.io/node-name"
-	nodeAddress = "etcd.k3s.cattle.io/node-address"
+	NodeID      = "etcd.k3s.cattle.io/node-name"
+	NodeAddress = "etcd.k3s.cattle.io/node-address"
 	master      = "node-role.kubernetes.io/master"
 	etcdRole    = "node-role.kubernetes.io/etcd"
 )
@@ -55,10 +55,11 @@ func (h *handler) sync(key string, node *v1.Node) (*v1.Node, error) {
 }

 func (h *handler) handleSelf(node *v1.Node) (*v1.Node, error) {
-	if node.Annotations[nodeID] == h.etcd.name &&
-		node.Annotations[nodeAddress] == h.etcd.address &&
+	if node.Annotations[NodeID] == h.etcd.name &&
+		node.Annotations[NodeAddress] == h.etcd.address &&
 		node.Labels[etcdRole] == "true" &&
-		node.Labels[master] == "true" {
+		node.Labels[master] == "true" ||
+		h.etcd.config.DisableETCD {
 		return node, nil
 	}
@@ -66,8 +67,8 @@ func (h *handler) handleSelf(node *v1.Node) (*v1.Node, error) {
 	if node.Annotations == nil {
 		node.Annotations = map[string]string{}
 	}
-	node.Annotations[nodeID] = h.etcd.name
-	node.Annotations[nodeAddress] = h.etcd.address
+	node.Annotations[NodeID] = h.etcd.name
+	node.Annotations[NodeAddress] = h.etcd.address
 	node.Labels[etcdRole] = "true"
 	node.Labels[master] = "true"
@@ -79,11 +80,10 @@ func (h *handler) onRemove(key string, node *v1.Node) (*v1.Node, error) {
 		return node, nil
 	}

-	id := node.Annotations[nodeID]
-	address := node.Annotations[nodeAddress]
-	if address == "" {
+	id := node.Annotations[NodeID]
+	address, ok := node.Annotations[NodeAddress]
+	if !ok {
 		return node, nil
 	}
-
-	return node, h.etcd.removePeer(h.ctx, id, address)
+	return node, h.etcd.removePeer(h.ctx, id, address, false)
 }

@@ -58,14 +58,18 @@ func NewETCD() *ETCD {
 	}
 }

-var learnerProgressKey = version.Program + "/etcd/learnerProgress"
+var (
+	learnerProgressKey = version.Program + "/etcd/learnerProgress"
+	// AddressKey is the etcd key under which the list of API server addresses is stored
+	AddressKey = version.Program + "/apiaddresses"
+)

 const (
 	snapshotPrefix      = "etcd-snapshot-"
 	endpoint            = "https://127.0.0.1:2379"
 	testTimeout         = time.Second * 10
 	manageTickerTime    = time.Second * 15
-	learnerMaxStallTime = time.Minute * 1
+	learnerMaxStallTime = time.Minute * 5

 	// defaultDialTimeout is intentionally short so that connections timeout within the testTimeout defined above
 	defaultDialTimeout = 2 * time.Second
@@ -204,11 +208,6 @@ func (e *ETCD) Start(ctx context.Context, clientAccessInfo *clientaccess.Info) e
 		return errors.Wrapf(err, "configuration validation failed")
 	}

-	e.config.Runtime.ClusterControllerStart = func(ctx context.Context) error {
-		Register(ctx, e, e.config.Runtime.Core.Core().V1().Node())
-		return nil
-	}
-
 	if !e.config.EtcdDisableSnapshots {
 		e.setSnapshotFunction(ctx)
 		e.cron.Start()
@@ -244,12 +243,12 @@ func (e *ETCD) Start(ctx context.Context, clientAccessInfo *clientaccess.Info) e
 // join attempts to add a member to an existing cluster
 func (e *ETCD) join(ctx context.Context, clientAccessInfo *clientaccess.Info) error {
-	clientURLs, memberList, err := e.clientURLs(ctx, clientAccessInfo)
+	clientURLs, memberList, err := ClientURLs(ctx, clientAccessInfo, e.config.PrivateIP)
 	if err != nil {
 		return err
 	}

-	client, err := getClient(ctx, e.runtime, clientURLs...)
+	client, err := GetClient(ctx, e.runtime, clientURLs...)
 	if err != nil {
 		return err
 	}
@@ -314,13 +313,13 @@ func (e *ETCD) Register(ctx context.Context, config *config.Control, handler htt
 	e.config = config
 	e.runtime = config.Runtime

-	client, err := getClient(ctx, e.runtime, endpoint)
+	client, err := GetClient(ctx, e.runtime, endpoint)
 	if err != nil {
 		return nil, err
 	}
 	e.client = client

-	address, err := getAdvertiseAddress(config.PrivateIP)
+	address, err := GetAdvertiseAddress(config.PrivateIP)
 	if err != nil {
 		return nil, err
 	}
@@ -333,6 +332,10 @@ func (e *ETCD) Register(ctx context.Context, config *config.Control, handler htt
 	if err := e.setName(false); err != nil {
 		return nil, err
 	}
+	e.config.Runtime.ClusterControllerStart = func(ctx context.Context) error {
+		Register(ctx, e, e.config.Runtime.Core.Core().V1().Node())
+		return nil
+	}

 	return e.handler(handler), err
 }
@@ -396,7 +399,7 @@ func (e *ETCD) infoHandler() http.Handler {
 }

 // getClient returns an etcd client connected to the specified endpoints
-func getClient(ctx context.Context, runtime *config.ControlRuntime, endpoints ...string) (*etcd.Client, error) {
+func GetClient(ctx context.Context, runtime *config.ControlRuntime, endpoints ...string) (*etcd.Client, error) {
 	cfg, err := getClientConfig(ctx, runtime, endpoints...)
 	if err != nil {
 		return nil, err
@@ -443,7 +446,7 @@ func toTLSConfig(runtime *config.ControlRuntime) (*tls.Config, error) {
 }

 // getAdvertiseAddress returns the IP address best suited for advertising to clients
-func getAdvertiseAddress(advertiseIP string) (string, error) {
+func GetAdvertiseAddress(advertiseIP string) (string, error) {
 	ip := advertiseIP
 	if ip == "" {
 		ipAddr, err := utilnet.ChooseHostInterface()
@@ -506,7 +509,7 @@ func (e *ETCD) cluster(ctx context.Context, forceNew bool, options executor.Init
 }

 // removePeer removes a peer from the cluster. The peer ID and IP address must both match.
-func (e *ETCD) removePeer(ctx context.Context, id, address string) error {
+func (e *ETCD) removePeer(ctx context.Context, id, address string, removeSelf bool) error {
 	members, err := e.client.MemberList(ctx)
 	if err != nil {
 		return err
@@ -522,8 +525,8 @@ func (e *ETCD) removePeer(ctx context.Context, id, address string) error {
 			return err
 		}
 		if u.Hostname() == address {
-			if e.address == address {
-				logrus.Fatalf("node has been delete from the cluster. Backup and delete ${datadir}/server/db if you like to rejoin the node")
+			if e.address == address && !removeSelf {
+				return errors.New("node has been deleted from the cluster")
 			}
 			logrus.Infof("Removing name=%s id=%d address=%s from etcd", member.Name, member.ID, address)
 			_, err := e.client.MemberRemove(ctx, member.ID)
@@ -669,7 +672,7 @@ func (e *ETCD) setLearnerProgress(ctx context.Context, status *learnerProgress)
 }

 // clientURLs returns a list of all non-learner etcd cluster member client access URLs
-func (e *ETCD) clientURLs(ctx context.Context, clientAccessInfo *clientaccess.Info) ([]string, Members, error) {
+func ClientURLs(ctx context.Context, clientAccessInfo *clientaccess.Info, selfIP string) ([]string, Members, error) {
 	var memberList Members
 	resp, err := clientaccess.Get("/db/info", clientAccessInfo)
 	if err != nil {
@@ -679,13 +682,22 @@ func (e *ETCD) clientURLs(ctx context.Context, clientAccessInfo *clientaccess.In
 	if err := json.Unmarshal(resp, &memberList); err != nil {
 		return nil, memberList, err
 	}
+	ip, err := GetAdvertiseAddress(selfIP)
+	if err != nil {
+		return nil, memberList, err
+	}
 	var clientURLs []string
+members:
 	for _, member := range memberList.Members {
 		// excluding learner member from the client list
 		if member.IsLearner {
 			continue
 		}
+		for _, url := range member.ClientURLs {
+			if strings.Contains(url, ip) {
+				continue members
+			}
+		}
 		clientURLs = append(clientURLs, member.ClientURLs...)
 	}
 	return clientURLs, memberList, nil
@@ -813,3 +825,54 @@ func snapshotRetention(retention int, snapshotDir string) error {
 	})
 	return os.Remove(filepath.Join(snapshotDir, snapshotFiles[0].Name()))
 }
+
+// GetAPIServerURLFromETCD fetches the version.Program/apiaddresses key from etcd;
+// on success it returns the first address in the stored list.
+func GetAPIServerURLFromETCD(ctx context.Context, cfg *config.Control) (string, error) {
+	if cfg.Runtime == nil {
+		return "", fmt.Errorf("runtime is not ready yet")
+	}
+	cl, err := GetClient(ctx, cfg.Runtime, endpoint)
+	if err != nil {
+		return "", err
+	}
+	etcdResp, err := cl.KV.Get(ctx, AddressKey)
+	if err != nil {
+		return "", err
+	}
+
+	if etcdResp.Count < 1 {
+		return "", fmt.Errorf("servers addresses are not yet set")
+	}
+	var addresses []string
+	if err := json.Unmarshal(etcdResp.Kvs[0].Value, &addresses); err != nil {
+		return "", fmt.Errorf("failed to unmarshal etcd key: %v", err)
+	}
+
+	return addresses[0], nil
+}
+
+// GetMembersClientURLs returns the combined list of client URLs for every
+// member in the etcd cluster.
+func (e *ETCD) GetMembersClientURLs(ctx context.Context) ([]string, error) {
+	ctx, cancel := context.WithTimeout(ctx, testTimeout)
+	defer cancel()
+
+	members, err := e.client.MemberList(ctx)
+	if err != nil {
+		return nil, err
+	}
+
+	var memberUrls []string
+	for _, member := range members.Members {
+		for _, clientURL := range member.ClientURLs {
+			memberUrls = append(memberUrls, string(clientURL))
+		}
+	}
+	return memberUrls, nil
+}
+
+// RemoveSelf removes this node's member from the etcd cluster if it exists.
+func (e *ETCD) RemoveSelf(ctx context.Context) error {
+	return e.removePeer(ctx, e.name, e.address, true)
+}

pkg/etcd/etcdproxy.go (new file)

@@ -0,0 +1,76 @@
package etcd

import (
	"net/url"

	"github.com/pkg/errors"
	"github.com/rancher/k3s/pkg/agent/loadbalancer"
)

type Proxy interface {
	Update(addresses []string)
	ETCDURL() string
	ETCDAddresses() []string
	ETCDServerURL() string
}

// NewETCDProxy initializes a new proxy structure that contains a load balancer
// listening on port 2379 and proxying to etcd cluster members
func NewETCDProxy(enabled bool, dataDir, etcdURL string) (Proxy, error) {
	e := &etcdproxy{
		dataDir:        dataDir,
		initialETCDURL: etcdURL,
		etcdURL:        etcdURL,
	}

	if enabled {
		lb, err := loadbalancer.New(dataDir, loadbalancer.ETCDServerServiceName, etcdURL, 2379)
		if err != nil {
			return nil, err
		}
		e.etcdLB = lb
		e.etcdLBURL = lb.LoadBalancerServerURL()
	}

	u, err := url.Parse(e.initialETCDURL)
	if err != nil {
		return nil, errors.Wrap(err, "failed to parse etcd client URL")
	}
	e.fallbackETCDAddress = u.Host
	e.etcdPort = u.Port()

	return e, nil
}

type etcdproxy struct {
	dataDir   string
	etcdLBURL string

	initialETCDURL      string
	etcdURL             string
	etcdPort            string
	fallbackETCDAddress string
	etcdAddresses       []string
	etcdLB              *loadbalancer.LoadBalancer
}

func (e *etcdproxy) Update(addresses []string) {
	if e.etcdLB != nil {
		e.etcdLB.Update(addresses)
	}
}

func (e *etcdproxy) ETCDURL() string {
	return e.etcdURL
}

func (e *etcdproxy) ETCDAddresses() []string {
	if len(e.etcdAddresses) > 0 {
		return e.etcdAddresses
	}
	return []string{e.fallbackETCDAddress}
}

func (e *etcdproxy) ETCDServerURL() string {
	return e.etcdURL
}
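Illustrative usage, not part of the commit, mirroring the cluster.Start change earlier in this PR: a server started with --disable-etcd points this proxy at a remote member and then keeps it refreshed from the member list. The data directory and member addresses below are placeholders.

// Sketch: create the local etcd load balancer on port 2379 and refresh it later.
etcdProxy, err := etcd.NewETCDProxy(true, dataDir, "https://10.0.0.10:2379")
if err != nil {
	return err
}
// Called whenever the etcd member list changes (see Cluster.setupEtcdProxy).
etcdProxy.Update([]string{"https://10.0.0.10:2379", "https://10.0.0.11:2379"})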

pkg/server/etcd.go (new file)

@@ -0,0 +1,104 @@
package server

import (
	"context"
	"io/ioutil"
	"os"
	"path/filepath"
	"time"

	"github.com/rancher/k3s/pkg/etcd"
	"github.com/sirupsen/logrus"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// setETCDLabelsAndAnnotations sets the etcd role label if it does not exist and
// sets the etcd node ID and etcd node address annotations on the node object;
// it also removes the controlplane/master role label if it exists on the node.
func setETCDLabelsAndAnnotations(ctx context.Context, config *Config) error {
	t := time.NewTicker(5 * time.Second)
	defer t.Stop()
	for range t.C {
		controlConfig := &config.ControlConfig

		sc, err := newContext(ctx, controlConfig.Runtime.KubeConfigAdmin)
		if err != nil {
			logrus.Infof("Failed to set etcd role label: %v", err)
			continue
		}

		if err := stageFiles(ctx, sc, controlConfig); err != nil {
			logrus.Infof("Failed to set etcd role label: %v", err)
			continue
		}

		if err := sc.Start(ctx); err != nil {
			logrus.Infof("Failed to set etcd role label: %v", err)
			continue
		}

		controlConfig.Runtime.Core = sc.Core
		nodes := sc.Core.Core().V1().Node()

		nodeName := os.Getenv("NODE_NAME")
		if nodeName == "" {
			logrus.Info("Failed to set etcd role label: node name not set")
			continue
		}
		node, err := nodes.Get(nodeName, metav1.GetOptions{})
		if err != nil {
			logrus.Infof("Failed to set etcd role label: %v", err)
			continue
		}

		if node.Labels == nil {
			node.Labels = make(map[string]string)
		}

		// remove controlplane label if role label exists
		var controlRoleLabelExists bool
		if _, ok := node.Labels[MasterRoleLabelKey]; ok {
			delete(node.Labels, MasterRoleLabelKey)
			controlRoleLabelExists = true
		}
		if v, ok := node.Labels[ETCDRoleLabelKey]; ok && v == "true" && !controlRoleLabelExists {
			break
		}
		node.Labels[ETCDRoleLabelKey] = "true"

		// this is a replacement for the etcd controller's handleSelf function
		if node.Annotations == nil {
			node.Annotations = map[string]string{}
		}
		fileName := filepath.Join(controlConfig.DataDir, "db", "etcd", "name")

		data, err := ioutil.ReadFile(fileName)
		if err != nil {
			logrus.Infof("Waiting for etcd node name file to be available: %v", err)
			continue
		}
		etcdNodeName := string(data)
		node.Annotations[etcd.NodeID] = etcdNodeName

		address, err := etcd.GetAdvertiseAddress(controlConfig.PrivateIP)
		if err != nil {
			logrus.Infof("Waiting for etcd node address to be available: %v", err)
			continue
		}
		node.Annotations[etcd.NodeAddress] = address

		_, err = nodes.Update(node)
		if err == nil {
			logrus.Infof("Successfully set etcd role label and annotations on node %s", nodeName)
			break
		}

		select {
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	return nil
}

@@ -17,6 +17,7 @@ import (
 	"github.com/k3s-io/helm-controller/pkg/helm"
 	"github.com/pkg/errors"
+	"github.com/rancher/k3s/pkg/apiaddresses"
 	"github.com/rancher/k3s/pkg/clientaccess"
 	"github.com/rancher/k3s/pkg/daemons/config"
 	"github.com/rancher/k3s/pkg/daemons/control"
@@ -36,7 +37,10 @@ import (
 	"k8s.io/apimachinery/pkg/util/net"
 )

-const MasterRoleLabelKey = "node-role.kubernetes.io/master"
+const (
+	MasterRoleLabelKey = "node-role.kubernetes.io/master"
+	ETCDRoleLabelKey   = "node-role.kubernetes.io/etcd"
+)

 func resolveDataDir(dataDir string) (string, error) {
 	dataDir, err := datadir.Resolve(dataDir)
@@ -56,6 +60,10 @@ func StartServer(ctx context.Context, config *Config) error {
 		return errors.Wrap(err, "starting kubernetes")
 	}

+	if config.ControlConfig.DisableAPIServer {
+		go setETCDLabelsAndAnnotations(ctx, config)
+	}
+
 	if err := startWrangler(ctx, config); err != nil {
 		return errors.Wrap(err, "starting tls server")
 	}
@@ -143,7 +151,7 @@ func runControllers(ctx context.Context, config *Config) error {
 		}
 	}
 	if !config.DisableAgent {
-		go setMasterRoleLabel(ctx, sc.Core.Core().V1().Node())
+		go setMasterRoleLabel(ctx, sc.Core.Core().V1().Node(), config)
 	}

 	go setClusterDNSConfig(ctx, config, sc.Core.Core().V1().ConfigMap())
@@ -188,6 +196,10 @@ func masterControllers(ctx context.Context, sc *Context, config *Config) error {
 		return err
 	}

+	if err := apiaddresses.Register(ctx, config.ControlConfig.Runtime, sc.Core.Core().V1().Endpoints()); err != nil {
+		return err
+	}
+
 	if config.Rootless {
 		return rootlessports.Register(ctx, sc.Core.Core().V1().Service(), !config.DisableServiceLB, config.ControlConfig.HTTPSPort)
 	}
@@ -423,7 +435,7 @@ func isSymlink(config string) bool {
 	return false
 }

-func setMasterRoleLabel(ctx context.Context, nodes v1.NodeClient) error {
+func setMasterRoleLabel(ctx context.Context, nodes v1.NodeClient, config *Config) error {
 	for {
 		nodeName := os.Getenv("NODE_NAME")
 		node, err := nodes.Get(nodeName, metav1.GetOptions{})
@@ -432,13 +444,22 @@ func setMasterRoleLabel(ctx context.Context, nodes v1.NodeClient) error {
 			time.Sleep(1 * time.Second)
 			continue
 		}
-		if v, ok := node.Labels[MasterRoleLabelKey]; ok && v == "true" {
+		// remove etcd label if etcd is disabled
+		var etcdRoleLabelExists bool
+		if config.ControlConfig.DisableETCD {
+			if _, ok := node.Labels[ETCDRoleLabelKey]; ok {
+				delete(node.Labels, ETCDRoleLabelKey)
+				etcdRoleLabelExists = true
+			}
+		}
+		if v, ok := node.Labels[MasterRoleLabelKey]; ok && v == "true" && !etcdRoleLabelExists {
 			break
 		}
 		if node.Labels == nil {
 			node.Labels = make(map[string]string)
 		}
 		node.Labels[MasterRoleLabelKey] = "true"
 		_, err = nodes.Update(node)
 		if err == nil {
 			logrus.Infof("Master role label has been set successfully on node: %s", nodeName)

@@ -32,7 +32,7 @@ func (m *memory) Get() (*v1.Secret, error) {
 }

 func (m *memory) Update(secret *v1.Secret) error {
-	if m.secret == nil || m.secret.ResourceVersion != secret.ResourceVersion {
+	if m.secret == nil || m.secret.ResourceVersion == "" || m.secret.ResourceVersion != secret.ResourceVersion {
 		if m.storage != nil {
 			if err := m.storage.Update(secret); err != nil {
 				return err

vendor/modules.txt

@@ -822,7 +822,7 @@ github.com/prometheus/procfs/internal/util
 # github.com/rakelkar/gonetsh v0.0.0-20190930180311-e5c5ffe4bdf0
 github.com/rakelkar/gonetsh/netroute
 github.com/rakelkar/gonetsh/netsh
-# github.com/rancher/dynamiclistener v0.2.1
+# github.com/rancher/dynamiclistener v0.2.2
 ## explicit
 github.com/rancher/dynamiclistener
 github.com/rancher/dynamiclistener/cert