Merge pull request #1757 from ibuildthecloud/separate-port

Add supervisor port
pull/1765/head
Darren Shepherd 2020-05-06 21:32:45 -07:00 committed by GitHub
commit e5fe184a44
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 219 additions and 79 deletions

View File

@ -20,6 +20,7 @@ import (
"time"
"github.com/pkg/errors"
"github.com/rancher/k3s/pkg/agent/proxy"
"github.com/rancher/k3s/pkg/cli/cmds"
"github.com/rancher/k3s/pkg/clientaccess"
"github.com/rancher/k3s/pkg/daemons/config"
@ -33,9 +34,9 @@ const (
DefaultPodManifestPath = "pod-manifests"
)
func Get(ctx context.Context, agent cmds.Agent) *config.Node {
func Get(ctx context.Context, agent cmds.Agent, proxy proxy.Proxy) *config.Node {
for {
agentConfig, err := get(&agent)
agentConfig, err := get(&agent, proxy)
if err != nil {
logrus.Error(err)
select {
@ -289,17 +290,12 @@ func locateOrGenerateResolvConf(envInfo *cmds.Agent) string {
return tmpConf
}
func get(envInfo *cmds.Agent) (*config.Node, error) {
func get(envInfo *cmds.Agent, proxy proxy.Proxy) (*config.Node, error) {
if envInfo.Debug {
logrus.SetLevel(logrus.DebugLevel)
}
serverURLParsed, err := url.Parse(envInfo.ServerURL)
if err != nil {
return nil, err
}
info, err := clientaccess.ParseAndValidateToken(envInfo.ServerURL, envInfo.Token)
info, err := clientaccess.ParseAndValidateToken(proxy.SupervisorURL(), envInfo.Token)
if err != nil {
return nil, err
}
@ -309,6 +305,12 @@ func get(envInfo *cmds.Agent) (*config.Node, error) {
return nil, err
}
if controlConfig.SupervisorPort != controlConfig.HTTPSPort {
if err := proxy.StartAPIServerProxy(controlConfig.HTTPSPort); err != nil {
return nil, errors.Wrapf(err, "failed to setup access to API Server port %d on at %s", controlConfig.HTTPSPort, proxy.SupervisorURL())
}
}
var flannelIface *sysnet.Interface
if !envInfo.NoFlannel && len(envInfo.FlannelIface) > 0 {
flannelIface, err = sysnet.InterfaceByName(envInfo.FlannelIface)
@ -368,7 +370,7 @@ func get(envInfo *cmds.Agent) (*config.Node, error) {
}
kubeconfigKubelet := filepath.Join(envInfo.DataDir, "kubelet.kubeconfig")
if err := control.KubeConfig(kubeconfigKubelet, info.URL, serverCAFile, clientKubeletCert, clientKubeletKey); err != nil {
if err := control.KubeConfig(kubeconfigKubelet, proxy.APIServerURL(), serverCAFile, clientKubeletCert, clientKubeletKey); err != nil {
return nil, err
}
@ -379,7 +381,7 @@ func get(envInfo *cmds.Agent) (*config.Node, error) {
}
kubeconfigKubeproxy := filepath.Join(envInfo.DataDir, "kubeproxy.kubeconfig")
if err := control.KubeConfig(kubeconfigKubeproxy, info.URL, serverCAFile, clientKubeProxyCert, clientKubeProxyKey); err != nil {
if err := control.KubeConfig(kubeconfigKubeproxy, proxy.APIServerURL(), serverCAFile, clientKubeProxyCert, clientKubeProxyKey); err != nil {
return nil, err
}
@ -390,7 +392,7 @@ func get(envInfo *cmds.Agent) (*config.Node, error) {
}
kubeconfigK3sController := filepath.Join(envInfo.DataDir, "k3scontroller.kubeconfig")
if err := control.KubeConfig(kubeconfigK3sController, info.URL, serverCAFile, clientK3sControllerCert, clientK3sControllerKey); err != nil {
if err := control.KubeConfig(kubeconfigK3sController, proxy.APIServerURL(), serverCAFile, clientK3sControllerCert, clientK3sControllerKey); err != nil {
return nil, err
}
@ -432,7 +434,6 @@ func get(envInfo *cmds.Agent) (*config.Node, error) {
nodeConfig.Containerd.State = "/run/k3s/containerd"
nodeConfig.Containerd.Address = filepath.Join(nodeConfig.Containerd.State, "containerd.sock")
nodeConfig.Containerd.Template = filepath.Join(envInfo.DataDir, "etc/containerd/config.toml.tmpl")
nodeConfig.ServerAddress = serverURLParsed.Host
nodeConfig.Certificate = servingCert
if nodeConfig.FlannelBackend == config.FlannelBackendNone {

View File

@ -8,7 +8,6 @@ import (
"sync"
"github.com/google/tcpproxy"
"github.com/rancher/k3s/pkg/cli/cmds"
"github.com/sirupsen/logrus"
)
@ -29,14 +28,11 @@ type LoadBalancer struct {
}
const (
serviceName = "k3s-agent-load-balancer"
SupervisorServiceName = "k3s-agent-load-balancer"
APIServerServiceName = "k3s-api-server-agent-load-balancer"
)
func Setup(ctx context.Context, cfg cmds.Agent) (_lb *LoadBalancer, _err error) {
if cfg.DisableLoadBalancer {
return nil, nil
}
func New(dataDir, serviceName, serverURL string) (_lb *LoadBalancer, _err error) {
listener, err := net.Listen("tcp", "127.0.0.1:0")
defer func() {
if _err != nil {
@ -51,18 +47,18 @@ func Setup(ctx context.Context, cfg cmds.Agent) (_lb *LoadBalancer, _err error)
}
localAddress := listener.Addr().String()
originalServerAddress, localServerURL, err := parseURL(cfg.ServerURL, localAddress)
originalServerAddress, localServerURL, err := parseURL(serverURL, localAddress)
if err != nil {
return nil, err
}
lb := &LoadBalancer{
dialer: &net.Dialer{},
configFile: filepath.Join(cfg.DataDir, "etc", serviceName+".json"),
configFile: filepath.Join(dataDir, "etc", serviceName+".json"),
localAddress: localAddress,
localServerURL: localServerURL,
originalServerAddress: originalServerAddress,
ServerURL: cfg.ServerURL,
ServerURL: serverURL,
}
lb.setServers([]string{lb.originalServerAddress})

View File

@ -106,7 +106,7 @@ func TestFailOver(t *testing.T) {
DataDir: tmpDir,
}
lb, err := Setup(context.Background(), cfg)
lb, err := New(context.Background(), cfg.DataDir, SupervisorServiceName, cfg.ServerURL)
if err != nil {
assertEqual(t, err, nil)
}
@ -157,7 +157,7 @@ func TestFailFast(t *testing.T) {
DataDir: tmpDir,
}
lb, err := Setup(context.Background(), cfg)
lb, err := New(context.Background(), cfg.DataDir, SupervisorServiceName, cfg.ServerURL)
if err != nil {
assertEqual(t, err, nil)
}

133
pkg/agent/proxy/apiproxy.go Normal file
View File

@ -0,0 +1,133 @@
package proxy
import (
sysnet "net"
"net/url"
"strconv"
"github.com/sirupsen/logrus"
"github.com/pkg/errors"
"github.com/rancher/k3s/pkg/agent/loadbalancer"
)
type Proxy interface {
Update(addresses []string)
StartAPIServerProxy(port int) error
SupervisorURL() string
SupervisorAddresses() []string
APIServerURL() string
}
func NewAPIProxy(enabled bool, dataDir, supervisorURL string) (Proxy, error) {
p := &proxy{
lbEnabled: enabled,
dataDir: dataDir,
initialSupervisorURL: supervisorURL,
supervisorURL: supervisorURL,
apiServerURL: supervisorURL,
}
if enabled {
lb, err := loadbalancer.New(dataDir, loadbalancer.SupervisorServiceName, supervisorURL)
if err != nil {
return nil, err
}
p.supervisorLB = lb
p.supervisorURL = lb.LoadBalancerServerURL()
p.apiServerURL = p.supervisorURL
}
u, err := url.Parse(p.initialSupervisorURL)
if err != nil {
return nil, errors.Wrapf(err, "failed to parse %s", p.initialSupervisorURL)
}
p.fallbackSupervisorAddress = u.Host
p.supervisorPort = u.Port()
return p, nil
}
type proxy struct {
dataDir string
lbEnabled bool
initialSupervisorURL string
supervisorURL string
supervisorPort string
fallbackSupervisorAddress string
supervisorAddresses []string
supervisorLB *loadbalancer.LoadBalancer
apiServerURL string
apiServerLB *loadbalancer.LoadBalancer
apiServerEnabled bool
}
func (p *proxy) Update(addresses []string) {
apiServerAddresses := addresses
supervisorAddresses := addresses
if p.apiServerEnabled {
supervisorAddresses = p.setSupervisorPort(supervisorAddresses)
}
if p.apiServerLB != nil {
p.apiServerLB.Update(apiServerAddresses)
}
if p.supervisorLB != nil {
p.supervisorLB.Update(supervisorAddresses)
}
p.supervisorAddresses = supervisorAddresses
}
func (p *proxy) setSupervisorPort(addresses []string) []string {
var newAddresses []string
for _, address := range addresses {
h, _, err := sysnet.SplitHostPort(address)
if err != nil {
logrus.Errorf("failed to parse address %s, dropping: %v", address, err)
continue
}
newAddresses = append(newAddresses, sysnet.JoinHostPort(h, p.supervisorPort))
}
return newAddresses
}
func (p *proxy) StartAPIServerProxy(port int) error {
u, err := url.Parse(p.initialSupervisorURL)
if err != nil {
return errors.Wrapf(err, "failed to parse server URL %s", p.initialSupervisorURL)
}
u.Host = sysnet.JoinHostPort(u.Hostname(), strconv.Itoa(port))
p.apiServerURL = u.String()
p.apiServerEnabled = true
if p.lbEnabled {
lb, err := loadbalancer.New(p.dataDir, loadbalancer.APIServerServiceName, p.apiServerURL)
if err != nil {
return err
}
p.apiServerURL = lb.LoadBalancerServerURL()
p.apiServerLB = lb
}
return nil
}
func (p *proxy) SupervisorURL() string {
return p.supervisorURL
}
func (p *proxy) SupervisorAddresses() []string {
if len(p.supervisorAddresses) > 0 {
return p.supervisorAddresses
}
return []string{p.fallbackSupervisorAddress}
}
func (p *proxy) APIServerURL() string {
return p.apiServerURL
}

View File

@ -9,14 +9,12 @@ import (
"strings"
"time"
"k8s.io/apimachinery/pkg/labels"
systemd "github.com/coreos/go-systemd/daemon"
"github.com/rancher/k3s/pkg/agent/config"
"github.com/rancher/k3s/pkg/agent/containerd"
"github.com/rancher/k3s/pkg/agent/flannel"
"github.com/rancher/k3s/pkg/agent/loadbalancer"
"github.com/rancher/k3s/pkg/agent/netpol"
"github.com/rancher/k3s/pkg/agent/proxy"
"github.com/rancher/k3s/pkg/agent/syssetup"
"github.com/rancher/k3s/pkg/agent/tunnel"
"github.com/rancher/k3s/pkg/cli/cmds"
@ -28,6 +26,7 @@ import (
"github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/api/equality"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/kubernetes"
v1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/clientcmd"
@ -39,8 +38,8 @@ const (
HostnameLabel = "k3s.io/hostname"
)
func run(ctx context.Context, cfg cmds.Agent, lb *loadbalancer.LoadBalancer) error {
nodeConfig := config.Get(ctx, cfg)
func run(ctx context.Context, cfg cmds.Agent, proxy proxy.Proxy) error {
nodeConfig := config.Get(ctx, cfg, proxy)
if !nodeConfig.NoFlannel {
if err := flannel.Prepare(ctx, nodeConfig); err != nil {
@ -54,7 +53,7 @@ func run(ctx context.Context, cfg cmds.Agent, lb *loadbalancer.LoadBalancer) err
}
}
if err := tunnel.Setup(ctx, nodeConfig, lb.Update); err != nil {
if err := tunnel.Setup(ctx, nodeConfig, proxy); err != nil {
return err
}
@ -112,16 +111,13 @@ func Run(ctx context.Context, cfg cmds.Agent) error {
return err
}
lb, err := loadbalancer.Setup(ctx, cfg)
proxy, err := proxy.NewAPIProxy(!cfg.DisableLoadBalancer, cfg.DataDir, cfg.ServerURL)
if err != nil {
return err
}
if lb != nil {
cfg.ServerURL = lb.LoadBalancerServerURL()
}
for {
newToken, err := clientaccess.NormalizeAndValidateTokenForUser(cfg.ServerURL, cfg.Token, "node")
newToken, err := clientaccess.NormalizeAndValidateTokenForUser(proxy.SupervisorURL(), cfg.Token, "node")
if err != nil {
logrus.Error(err)
select {
@ -136,7 +132,7 @@ func Run(ctx context.Context, cfg cmds.Agent) error {
}
systemd.SdNotify(true, "READY=1\n")
return run(ctx, cfg, lb)
return run(ctx, cfg, proxy)
}
func validate() error {

View File

@ -11,6 +11,7 @@ import (
"time"
"github.com/gorilla/websocket"
"github.com/rancher/k3s/pkg/agent/proxy"
"github.com/rancher/k3s/pkg/daemons/config"
"github.com/rancher/remotedialer"
"github.com/sirupsen/logrus"
@ -50,7 +51,7 @@ func getAddresses(endpoint *v1.Endpoints) []string {
return serverAddresses
}
func Setup(ctx context.Context, config *config.Node, onChange func([]string)) error {
func Setup(ctx context.Context, config *config.Node, proxy proxy.Proxy) error {
restConfig, err := clientcmd.BuildConfigFromFlags("", config.AgentConfig.KubeConfigK3sController)
if err != nil {
return err
@ -71,20 +72,15 @@ func Setup(ctx context.Context, config *config.Node, onChange func([]string)) er
return err
}
addresses := []string{config.ServerAddress}
endpoint, _ := client.CoreV1().Endpoints("default").Get(ctx, "kubernetes", metav1.GetOptions{})
if endpoint != nil {
addresses = getAddresses(endpoint)
if onChange != nil {
onChange(addresses)
}
proxy.Update(getAddresses(endpoint))
}
disconnect := map[string]context.CancelFunc{}
wg := &sync.WaitGroup{}
for _, address := range addresses {
for _, address := range proxy.SupervisorAddresses() {
if _, ok := disconnect[address]; !ok {
disconnect[address] = connect(ctx, wg, address, tlsConfig)
}
@ -120,18 +116,14 @@ func Setup(ctx context.Context, config *config.Node, onChange func([]string)) er
}
newAddresses := getAddresses(endpoint)
if reflect.DeepEqual(newAddresses, addresses) {
if reflect.DeepEqual(newAddresses, proxy.SupervisorAddresses()) {
continue watching
}
addresses = newAddresses
logrus.Infof("Tunnel endpoint watch event: %v", addresses)
if onChange != nil {
onChange(addresses)
}
proxy.Update(newAddresses)
validEndpoint := map[string]bool{}
for _, address := range addresses {
for _, address := range proxy.SupervisorAddresses() {
validEndpoint[address] = true
if _, ok := disconnect[address]; !ok {
disconnect[address] = connect(ctx, nil, address, tlsConfig)

View File

@ -18,7 +18,13 @@ type Server struct {
ServiceCIDR string
ClusterDNS string
ClusterDomain string
// The port which kubectl clients can access k8s
HTTPSPort int
// The port which custom k3s API runs on
SupervisorPort int
// The port which kube-apiserver runs on
APIServerPort int
APIServerBindAddress string
DataDir string
DisableAgent bool
KubeConfigOutput string

View File

@ -83,7 +83,10 @@ func run(app *cli.Context, cfg *cmds.Server) error {
serverConfig.Rootless = cfg.Rootless
serverConfig.ControlConfig.SANs = knownIPs(cfg.TLSSan)
serverConfig.ControlConfig.BindAddress = cfg.BindAddress
serverConfig.ControlConfig.SupervisorPort = cfg.SupervisorPort
serverConfig.ControlConfig.HTTPSPort = cfg.HTTPSPort
serverConfig.ControlConfig.APIServerPort = cfg.APIServerPort
serverConfig.ControlConfig.APIServerBindAddress = cfg.APIServerBindAddress
serverConfig.ControlConfig.ExtraAPIArgs = cfg.ExtraAPIArgs
serverConfig.ControlConfig.ExtraControllerArgs = cfg.ExtraControllerArgs
serverConfig.ControlConfig.ExtraSchedulerAPIArgs = cfg.ExtraSchedulerArgs
@ -103,6 +106,10 @@ func run(app *cli.Context, cfg *cmds.Server) error {
serverConfig.ControlConfig.ClusterReset = cfg.ClusterReset
serverConfig.ControlConfig.EncryptSecrets = cfg.EncryptSecrets
if serverConfig.ControlConfig.SupervisorPort == 0 {
serverConfig.ControlConfig.SupervisorPort = serverConfig.ControlConfig.HTTPSPort
}
if cmds.AgentConfig.FlannelIface != "" && cmds.AgentConfig.NodeIP == "" {
cmds.AgentConfig.NodeIP = netutil.GetIPFromInterface(cmds.AgentConfig.FlannelIface)
}
@ -201,7 +208,7 @@ func run(app *cli.Context, cfg *cmds.Server) error {
ip = "127.0.0.1"
}
url := fmt.Sprintf("https://%s:%d", ip, serverConfig.ControlConfig.HTTPSPort)
url := fmt.Sprintf("https://%s:%d", ip, serverConfig.ControlConfig.SupervisorPort)
token, err := server.FormatToken(serverConfig.ControlConfig.Runtime.AgentToken, serverConfig.ControlConfig.Runtime.ServerCA)
if err != nil {
return err

View File

@ -148,7 +148,7 @@ func ParseAndValidateToken(server, token string) (*Info, error) {
}
func validateToken(u url.URL, cacerts []byte, username, password string) error {
u.Path = "/apis"
u.Path = "/cacerts"
_, err := get(u.String(), GetHTTPClient(cacerts), username, password)
if err != nil {
return errors.Wrap(err, "token is not valid")

View File

@ -18,7 +18,7 @@ import (
)
func (c *Cluster) newListener(ctx context.Context) (net.Listener, http.Handler, error) {
tcp, err := dynamiclistener.NewTCPListener(c.config.BindAddress, c.config.HTTPSPort)
tcp, err := dynamiclistener.NewTCPListener(c.config.BindAddress, c.config.SupervisorPort)
if err != nil {
return nil, nil, err
}

View File

@ -34,7 +34,6 @@ type Node struct {
Images string
AgentConfig Agent
CACerts []byte
ServerAddress string
Certificate *tls.Certificate
}
@ -87,10 +86,15 @@ type Agent struct {
type Control struct {
AdvertisePort int
AdvertiseIP string
ListenPort int
// The port which kubectl clients can access k8s
HTTPSPort int
AgentToken string
Token string
// The port which custom k3s API runs on
SupervisorPort int
// The port which kube-apiserver runs on
APIServerPort int
APIServerBindAddress string
AgentToken string `json:"-"`
Token string `json:"-"`
ClusterIPRange *net.IPNet
ServiceIPRange *net.IPNet
ClusterDNS net.IP

View File

@ -178,8 +178,12 @@ func apiServer(ctx context.Context, cfg *config.Control, runtime *config.Control
argsMap["advertise-address"] = cfg.AdvertiseIP
}
argsMap["insecure-port"] = "0"
argsMap["secure-port"] = strconv.Itoa(cfg.ListenPort)
argsMap["secure-port"] = strconv.Itoa(cfg.APIServerPort)
if cfg.APIServerBindAddress == "" {
argsMap["bind-address"] = localhostIP.String()
} else {
argsMap["bind-address"] = cfg.APIServerBindAddress
}
argsMap["tls-cert-file"] = runtime.ServingKubeAPICert
argsMap["tls-private-key-file"] = runtime.ServingKubeAPIKey
argsMap["service-account-key-file"] = runtime.ServiceKey
@ -227,11 +231,11 @@ func defaults(config *config.Control) {
config.AdvertisePort = config.HTTPSPort
}
if config.ListenPort == 0 {
if config.APIServerPort == 0 {
if config.HTTPSPort != 0 {
config.ListenPort = config.HTTPSPort + 1
config.APIServerPort = config.HTTPSPort + 1
} else {
config.ListenPort = 6444
config.APIServerPort = 6444
}
}
@ -471,7 +475,7 @@ func genClientCerts(config *config.Control, runtime *config.ControlRuntime) erro
factory := getSigningCertFactory(regen, nil, []x509.ExtKeyUsage{x509.ExtKeyUsageClientAuth}, runtime.ClientCA, runtime.ClientCAKey)
var certGen bool
apiEndpoint := fmt.Sprintf("https://127.0.0.1:%d", config.ListenPort)
apiEndpoint := fmt.Sprintf("https://127.0.0.1:%d", config.APIServerPort)
certGen, err = factory("system:admin", []string{"system:masters"}, runtime.ClientAdminCert, runtime.ClientAdminKey)
if err != nil {

View File

@ -223,7 +223,7 @@ func printTokens(advertiseIP string, config *config.Control) error {
}
if len(nodeFile) > 0 {
printToken(config.HTTPSPort, advertiseIP, "To join node to cluster:", "agent")
printToken(config.SupervisorPort, advertiseIP, "To join node to cluster:", "agent")
}
return nil

View File

@ -9,4 +9,5 @@ type Config struct {
DisableServiceLB bool
ControlConfig config.Control
Rootless bool
SupervisorPort int
}