mirror of https://github.com/k3s-io/k3s
Remove tunneler
parent
8fe1ebac0a
commit
9e464f5b6a
|
@ -27,7 +27,6 @@ import (
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/url"
|
"net/url"
|
||||||
"os"
|
"os"
|
||||||
"strconv"
|
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -66,7 +65,6 @@ import (
|
||||||
kubeserver "k8s.io/kubernetes/pkg/kubeapiserver/server"
|
kubeserver "k8s.io/kubernetes/pkg/kubeapiserver/server"
|
||||||
"k8s.io/kubernetes/pkg/master"
|
"k8s.io/kubernetes/pkg/master"
|
||||||
"k8s.io/kubernetes/pkg/master/reconcilers"
|
"k8s.io/kubernetes/pkg/master/reconcilers"
|
||||||
"k8s.io/kubernetes/pkg/master/tunneler"
|
|
||||||
"k8s.io/kubernetes/pkg/registry/cachesize"
|
"k8s.io/kubernetes/pkg/registry/cachesize"
|
||||||
rbacrest "k8s.io/kubernetes/pkg/registry/rbac/rest"
|
rbacrest "k8s.io/kubernetes/pkg/registry/rbac/rest"
|
||||||
"k8s.io/kubernetes/pkg/serviceaccount"
|
"k8s.io/kubernetes/pkg/serviceaccount"
|
||||||
|
@ -148,19 +146,19 @@ func Run(completeOptions completedServerRunOptions, stopCh <-chan struct{}) erro
|
||||||
|
|
||||||
// CreateServerChain creates the apiservers connected via delegation.
|
// CreateServerChain creates the apiservers connected via delegation.
|
||||||
func CreateServerChain(completedOptions completedServerRunOptions, stopCh <-chan struct{}) (*genericapiserver.GenericAPIServer, error) {
|
func CreateServerChain(completedOptions completedServerRunOptions, stopCh <-chan struct{}) (*genericapiserver.GenericAPIServer, error) {
|
||||||
nodeTunneler, proxyTransport, err := CreateNodeDialer(completedOptions)
|
proxyTransport, err := CreateNodeDialer(completedOptions)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
kubeAPIServerConfig, insecureServingInfo, serviceResolver, pluginInitializer, admissionPostStartHook, err := CreateKubeAPIServerConfig(completedOptions, nodeTunneler, proxyTransport)
|
kubeAPIServerConfig, insecureServingInfo, serviceResolver, pluginInitializer, admissionPostStartHook, err := CreateKubeAPIServerConfig(completedOptions, proxyTransport)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// If additional API servers are added, they should be gated.
|
// If additional API servers are added, they should be gated.
|
||||||
apiExtensionsConfig, err := createAPIExtensionsConfig(*kubeAPIServerConfig.GenericConfig, kubeAPIServerConfig.ExtraConfig.VersionedInformers, pluginInitializer, completedOptions.ServerRunOptions, completedOptions.MasterCount,
|
apiExtensionsConfig, err := createAPIExtensionsConfig(*kubeAPIServerConfig.GenericConfig, kubeAPIServerConfig.ExtraConfig.VersionedInformers, pluginInitializer, completedOptions.ServerRunOptions, completedOptions.MasterCount,
|
||||||
serviceResolver, webhook.NewDefaultAuthenticationInfoResolverWrapper(proxyTransport, kubeAPIServerConfig.GenericConfig.LoopbackClientConfig))
|
serviceResolver, webhook.NewDefaultAuthenticationInfoResolverWrapper(nil, kubeAPIServerConfig.GenericConfig.LoopbackClientConfig))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -182,7 +180,7 @@ func CreateServerChain(completedOptions completedServerRunOptions, stopCh <-chan
|
||||||
apiExtensionsServer.GenericAPIServer.PrepareRun()
|
apiExtensionsServer.GenericAPIServer.PrepareRun()
|
||||||
|
|
||||||
// aggregator comes last in the chain
|
// aggregator comes last in the chain
|
||||||
aggregatorConfig, err := createAggregatorConfig(*kubeAPIServerConfig.GenericConfig, completedOptions.ServerRunOptions, kubeAPIServerConfig.ExtraConfig.VersionedInformers, serviceResolver, proxyTransport, pluginInitializer)
|
aggregatorConfig, err := createAggregatorConfig(*kubeAPIServerConfig.GenericConfig, completedOptions.ServerRunOptions, kubeAPIServerConfig.ExtraConfig.VersionedInformers, serviceResolver, nil, pluginInitializer)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -215,45 +213,18 @@ func CreateKubeAPIServer(kubeAPIServerConfig *master.Config, delegateAPIServer g
|
||||||
}
|
}
|
||||||
|
|
||||||
// CreateNodeDialer creates the dialer infrastructure to connect to the nodes.
|
// CreateNodeDialer creates the dialer infrastructure to connect to the nodes.
|
||||||
func CreateNodeDialer(s completedServerRunOptions) (tunneler.Tunneler, *http.Transport, error) {
|
func CreateNodeDialer(s completedServerRunOptions) (*http.Transport, error) {
|
||||||
// Setup nodeTunneler if needed
|
|
||||||
var nodeTunneler tunneler.Tunneler
|
|
||||||
var proxyDialerFn utilnet.DialFunc
|
|
||||||
if len(s.SSHUser) > 0 {
|
|
||||||
// Get ssh key distribution func, if supported
|
|
||||||
var installSSHKey tunneler.InstallSSHKey
|
|
||||||
if s.KubeletConfig.Port == 0 {
|
|
||||||
return nil, nil, fmt.Errorf("must enable kubelet port if proxy ssh-tunneling is specified")
|
|
||||||
}
|
|
||||||
if s.KubeletConfig.ReadOnlyPort == 0 {
|
|
||||||
return nil, nil, fmt.Errorf("must enable kubelet readonly port if proxy ssh-tunneling is specified")
|
|
||||||
}
|
|
||||||
// Set up the nodeTunneler
|
|
||||||
// TODO(cjcullen): If we want this to handle per-kubelet ports or other
|
|
||||||
// kubelet listen-addresses, we need to plumb through options.
|
|
||||||
healthCheckPath := &url.URL{
|
|
||||||
Scheme: "http",
|
|
||||||
Host: net.JoinHostPort("127.0.0.1", strconv.FormatUint(uint64(s.KubeletConfig.ReadOnlyPort), 10)),
|
|
||||||
Path: "healthz",
|
|
||||||
}
|
|
||||||
nodeTunneler = tunneler.New(s.SSHUser, s.SSHKeyfile, healthCheckPath, installSSHKey)
|
|
||||||
|
|
||||||
// Use the nodeTunneler's dialer when proxying to pods, services, and nodes
|
|
||||||
proxyDialerFn = nodeTunneler.Dial
|
|
||||||
}
|
|
||||||
// Proxying to pods and services is IP-based... don't expect to be able to verify the hostname
|
|
||||||
proxyTLSClientConfig := &tls.Config{InsecureSkipVerify: true}
|
proxyTLSClientConfig := &tls.Config{InsecureSkipVerify: true}
|
||||||
proxyTransport := utilnet.SetTransportDefaults(&http.Transport{
|
proxyTransport := utilnet.SetTransportDefaults(&http.Transport{
|
||||||
DialContext: proxyDialerFn,
|
DialContext: nil,
|
||||||
TLSClientConfig: proxyTLSClientConfig,
|
TLSClientConfig: proxyTLSClientConfig,
|
||||||
})
|
})
|
||||||
return nodeTunneler, proxyTransport, nil
|
return proxyTransport, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// CreateKubeAPIServerConfig creates all the resources for running the API server, but runs none of them
|
// CreateKubeAPIServerConfig creates all the resources for running the API server, but runs none of them
|
||||||
func CreateKubeAPIServerConfig(
|
func CreateKubeAPIServerConfig(
|
||||||
s completedServerRunOptions,
|
s completedServerRunOptions,
|
||||||
nodeTunneler tunneler.Tunneler,
|
|
||||||
proxyTransport *http.Transport,
|
proxyTransport *http.Transport,
|
||||||
) (
|
) (
|
||||||
config *master.Config,
|
config *master.Config,
|
||||||
|
@ -322,8 +293,6 @@ func CreateKubeAPIServerConfig(
|
||||||
EnableLogsSupport: s.EnableLogsHandler,
|
EnableLogsSupport: s.EnableLogsHandler,
|
||||||
ProxyTransport: proxyTransport,
|
ProxyTransport: proxyTransport,
|
||||||
|
|
||||||
Tunneler: nodeTunneler,
|
|
||||||
|
|
||||||
ServiceIPRange: serviceIPRange,
|
ServiceIPRange: serviceIPRange,
|
||||||
APIServerServiceIP: apiServerServiceIP,
|
APIServerServiceIP: apiServerServiceIP,
|
||||||
APIServerServicePort: 443,
|
APIServerServicePort: 443,
|
||||||
|
@ -341,11 +310,6 @@ func CreateKubeAPIServerConfig(
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
if nodeTunneler != nil {
|
|
||||||
// Use the nodeTunneler's dialer to connect to the kubelet
|
|
||||||
config.ExtraConfig.KubeletClientConfig.Dial = nodeTunneler.Dial
|
|
||||||
}
|
|
||||||
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -50,7 +50,6 @@ import (
|
||||||
"k8s.io/apiserver/pkg/endpoints/discovery"
|
"k8s.io/apiserver/pkg/endpoints/discovery"
|
||||||
"k8s.io/apiserver/pkg/registry/generic"
|
"k8s.io/apiserver/pkg/registry/generic"
|
||||||
genericapiserver "k8s.io/apiserver/pkg/server"
|
genericapiserver "k8s.io/apiserver/pkg/server"
|
||||||
"k8s.io/apiserver/pkg/server/healthz"
|
|
||||||
serverstorage "k8s.io/apiserver/pkg/server/storage"
|
serverstorage "k8s.io/apiserver/pkg/server/storage"
|
||||||
storagefactory "k8s.io/apiserver/pkg/storage/storagebackend/factory"
|
storagefactory "k8s.io/apiserver/pkg/storage/storagebackend/factory"
|
||||||
"k8s.io/client-go/informers"
|
"k8s.io/client-go/informers"
|
||||||
|
@ -59,12 +58,10 @@ import (
|
||||||
kubeoptions "k8s.io/kubernetes/pkg/kubeapiserver/options"
|
kubeoptions "k8s.io/kubernetes/pkg/kubeapiserver/options"
|
||||||
kubeletclient "k8s.io/kubernetes/pkg/kubelet/client"
|
kubeletclient "k8s.io/kubernetes/pkg/kubelet/client"
|
||||||
"k8s.io/kubernetes/pkg/master/reconcilers"
|
"k8s.io/kubernetes/pkg/master/reconcilers"
|
||||||
"k8s.io/kubernetes/pkg/master/tunneler"
|
|
||||||
"k8s.io/kubernetes/pkg/routes"
|
"k8s.io/kubernetes/pkg/routes"
|
||||||
"k8s.io/kubernetes/pkg/serviceaccount"
|
"k8s.io/kubernetes/pkg/serviceaccount"
|
||||||
nodeutil "k8s.io/kubernetes/pkg/util/node"
|
nodeutil "k8s.io/kubernetes/pkg/util/node"
|
||||||
|
|
||||||
"github.com/prometheus/client_golang/prometheus"
|
|
||||||
"k8s.io/klog"
|
"k8s.io/klog"
|
||||||
|
|
||||||
// RESTStorage installers
|
// RESTStorage installers
|
||||||
|
@ -101,8 +98,6 @@ type ExtraConfig struct {
|
||||||
EventTTL time.Duration
|
EventTTL time.Duration
|
||||||
KubeletClientConfig kubeletclient.KubeletClientConfig
|
KubeletClientConfig kubeletclient.KubeletClientConfig
|
||||||
|
|
||||||
// Used to start and monitor tunneling
|
|
||||||
Tunneler tunneler.Tunneler
|
|
||||||
EnableLogsSupport bool
|
EnableLogsSupport bool
|
||||||
ProxyTransport http.RoundTripper
|
ProxyTransport http.RoundTripper
|
||||||
|
|
||||||
|
@ -337,10 +332,6 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget)
|
||||||
}
|
}
|
||||||
m.InstallAPIs(c.ExtraConfig.APIResourceConfigSource, c.GenericConfig.RESTOptionsGetter, restStorageProviders...)
|
m.InstallAPIs(c.ExtraConfig.APIResourceConfigSource, c.GenericConfig.RESTOptionsGetter, restStorageProviders...)
|
||||||
|
|
||||||
if c.ExtraConfig.Tunneler != nil {
|
|
||||||
m.installTunneler(c.ExtraConfig.Tunneler, corev1client.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig).Nodes())
|
|
||||||
}
|
|
||||||
|
|
||||||
m.GenericAPIServer.AddPostStartHookOrDie("ca-registration", c.ExtraConfig.ClientCARegistrationHook.PostStartHook)
|
m.GenericAPIServer.AddPostStartHookOrDie("ca-registration", c.ExtraConfig.ClientCARegistrationHook.PostStartHook)
|
||||||
|
|
||||||
return m, nil
|
return m, nil
|
||||||
|
@ -363,15 +354,6 @@ func (m *Master) InstallLegacyAPI(c *completedConfig, restOptionsGetter generic.
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Master) installTunneler(nodeTunneler tunneler.Tunneler, nodeClient corev1client.NodeInterface) {
|
|
||||||
nodeTunneler.Run(nodeAddressProvider{nodeClient}.externalAddresses)
|
|
||||||
m.GenericAPIServer.AddHealthzChecks(healthz.NamedCheck("SSH Tunnel Check", tunneler.TunnelSyncHealthChecker(nodeTunneler)))
|
|
||||||
prometheus.NewGaugeFunc(prometheus.GaugeOpts{
|
|
||||||
Name: "apiserver_proxy_tunnel_sync_latency_secs",
|
|
||||||
Help: "The time since the last successful synchronization of the SSH tunnels for proxy requests.",
|
|
||||||
}, func() float64 { return float64(nodeTunneler.SecondsSinceSync()) })
|
|
||||||
}
|
|
||||||
|
|
||||||
// RESTStorageProvider is a factory type for REST storage.
|
// RESTStorageProvider is a factory type for REST storage.
|
||||||
type RESTStorageProvider interface {
|
type RESTStorageProvider interface {
|
||||||
GroupName() string
|
GroupName() string
|
||||||
|
|
|
@ -1,44 +0,0 @@
|
||||||
package(default_visibility = ["//visibility:public"])
|
|
||||||
|
|
||||||
load(
|
|
||||||
"@io_bazel_rules_go//go:def.bzl",
|
|
||||||
"go_library",
|
|
||||||
"go_test",
|
|
||||||
)
|
|
||||||
|
|
||||||
go_test(
|
|
||||||
name = "go_default_test",
|
|
||||||
srcs = ["ssh_test.go"],
|
|
||||||
embed = [":go_default_library"],
|
|
||||||
deps = [
|
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library",
|
|
||||||
"//vendor/github.com/stretchr/testify/assert:go_default_library",
|
|
||||||
],
|
|
||||||
)
|
|
||||||
|
|
||||||
go_library(
|
|
||||||
name = "go_default_library",
|
|
||||||
srcs = ["ssh.go"],
|
|
||||||
importpath = "k8s.io/kubernetes/pkg/master/tunneler",
|
|
||||||
deps = [
|
|
||||||
"//pkg/ssh:go_default_library",
|
|
||||||
"//pkg/util/file:go_default_library",
|
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library",
|
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
|
|
||||||
"//vendor/github.com/prometheus/client_golang/prometheus:go_default_library",
|
|
||||||
"//vendor/k8s.io/klog:go_default_library",
|
|
||||||
],
|
|
||||||
)
|
|
||||||
|
|
||||||
filegroup(
|
|
||||||
name = "package-srcs",
|
|
||||||
srcs = glob(["**"]),
|
|
||||||
tags = ["automanaged"],
|
|
||||||
visibility = ["//visibility:private"],
|
|
||||||
)
|
|
||||||
|
|
||||||
filegroup(
|
|
||||||
name = "all-srcs",
|
|
||||||
srcs = [":package-srcs"],
|
|
||||||
tags = ["automanaged"],
|
|
||||||
)
|
|
|
@ -1,234 +0,0 @@
|
||||||
/*
|
|
||||||
Copyright 2015 The Kubernetes Authors.
|
|
||||||
|
|
||||||
Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
you may not use this file except in compliance with the License.
|
|
||||||
You may obtain a copy of the License at
|
|
||||||
|
|
||||||
http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
|
|
||||||
Unless required by applicable law or agreed to in writing, software
|
|
||||||
distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
See the License for the specific language governing permissions and
|
|
||||||
limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package tunneler
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"fmt"
|
|
||||||
"io/ioutil"
|
|
||||||
"net"
|
|
||||||
"net/http"
|
|
||||||
"net/url"
|
|
||||||
"os"
|
|
||||||
"sync/atomic"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"k8s.io/apimachinery/pkg/util/clock"
|
|
||||||
"k8s.io/apimachinery/pkg/util/wait"
|
|
||||||
"k8s.io/kubernetes/pkg/ssh"
|
|
||||||
utilfile "k8s.io/kubernetes/pkg/util/file"
|
|
||||||
|
|
||||||
"github.com/prometheus/client_golang/prometheus"
|
|
||||||
"k8s.io/klog"
|
|
||||||
)
|
|
||||||
|
|
||||||
type InstallSSHKey func(ctx context.Context, user string, data []byte) error
|
|
||||||
|
|
||||||
type AddressFunc func() (addresses []string, err error)
|
|
||||||
|
|
||||||
type Tunneler interface {
|
|
||||||
Run(AddressFunc)
|
|
||||||
Stop()
|
|
||||||
Dial(ctx context.Context, net, addr string) (net.Conn, error)
|
|
||||||
SecondsSinceSync() int64
|
|
||||||
SecondsSinceSSHKeySync() int64
|
|
||||||
}
|
|
||||||
|
|
||||||
// TunnelSyncHealthChecker returns a health func that indicates if a tunneler is healthy.
|
|
||||||
// It's compatible with healthz.NamedCheck
|
|
||||||
func TunnelSyncHealthChecker(tunneler Tunneler) func(req *http.Request) error {
|
|
||||||
return func(req *http.Request) error {
|
|
||||||
if tunneler == nil {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
lag := tunneler.SecondsSinceSync()
|
|
||||||
if lag > 600 {
|
|
||||||
return fmt.Errorf("Tunnel sync is taking too long: %d", lag)
|
|
||||||
}
|
|
||||||
sshKeyLag := tunneler.SecondsSinceSSHKeySync()
|
|
||||||
// Since we are syncing ssh-keys every 5 minutes, the allowed
|
|
||||||
// lag since last sync should be more than 2x higher than that
|
|
||||||
// to allow for single failure, which can always happen.
|
|
||||||
// For now set it to 3x, which is 15 minutes.
|
|
||||||
// For more details see: http://pr.k8s.io/59347
|
|
||||||
if sshKeyLag > 900 {
|
|
||||||
return fmt.Errorf("SSHKey sync is taking too long: %d", sshKeyLag)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
type SSHTunneler struct {
|
|
||||||
// Important: Since these two int64 fields are using sync/atomic, they have to be at the top of the struct due to a bug on 32-bit platforms
|
|
||||||
// See: https://golang.org/pkg/sync/atomic/ for more information
|
|
||||||
lastSync int64 // Seconds since Epoch
|
|
||||||
lastSSHKeySync int64 // Seconds since Epoch
|
|
||||||
|
|
||||||
SSHUser string
|
|
||||||
SSHKeyfile string
|
|
||||||
InstallSSHKey InstallSSHKey
|
|
||||||
HealthCheckURL *url.URL
|
|
||||||
|
|
||||||
tunnels *ssh.SSHTunnelList
|
|
||||||
lastSyncMetric prometheus.GaugeFunc
|
|
||||||
clock clock.Clock
|
|
||||||
|
|
||||||
getAddresses AddressFunc
|
|
||||||
stopChan chan struct{}
|
|
||||||
}
|
|
||||||
|
|
||||||
func New(sshUser, sshKeyfile string, healthCheckURL *url.URL, installSSHKey InstallSSHKey) Tunneler {
|
|
||||||
return &SSHTunneler{
|
|
||||||
SSHUser: sshUser,
|
|
||||||
SSHKeyfile: sshKeyfile,
|
|
||||||
InstallSSHKey: installSSHKey,
|
|
||||||
HealthCheckURL: healthCheckURL,
|
|
||||||
clock: clock.RealClock{},
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Run establishes tunnel loops and returns
|
|
||||||
func (c *SSHTunneler) Run(getAddresses AddressFunc) {
|
|
||||||
if c.stopChan != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
c.stopChan = make(chan struct{})
|
|
||||||
|
|
||||||
// Save the address getter
|
|
||||||
if getAddresses != nil {
|
|
||||||
c.getAddresses = getAddresses
|
|
||||||
}
|
|
||||||
|
|
||||||
// Usernames are capped @ 32
|
|
||||||
if len(c.SSHUser) > 32 {
|
|
||||||
klog.Warning("SSH User is too long, truncating to 32 chars")
|
|
||||||
c.SSHUser = c.SSHUser[0:32]
|
|
||||||
}
|
|
||||||
klog.Infof("Setting up proxy: %s %s", c.SSHUser, c.SSHKeyfile)
|
|
||||||
|
|
||||||
// public keyfile is written last, so check for that.
|
|
||||||
publicKeyFile := c.SSHKeyfile + ".pub"
|
|
||||||
exists, err := utilfile.FileExists(publicKeyFile)
|
|
||||||
if err != nil {
|
|
||||||
klog.Errorf("Error detecting if key exists: %v", err)
|
|
||||||
} else if !exists {
|
|
||||||
klog.Infof("Key doesn't exist, attempting to create")
|
|
||||||
if err := generateSSHKey(c.SSHKeyfile, publicKeyFile); err != nil {
|
|
||||||
klog.Errorf("Failed to create key pair: %v", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
c.tunnels = ssh.NewSSHTunnelList(c.SSHUser, c.SSHKeyfile, c.HealthCheckURL, c.stopChan)
|
|
||||||
// Sync loop to ensure that the SSH key has been installed.
|
|
||||||
c.lastSSHKeySync = c.clock.Now().Unix()
|
|
||||||
c.installSSHKeySyncLoop(c.SSHUser, publicKeyFile)
|
|
||||||
// Sync tunnelList w/ nodes.
|
|
||||||
c.lastSync = c.clock.Now().Unix()
|
|
||||||
c.nodesSyncLoop()
|
|
||||||
}
|
|
||||||
|
|
||||||
// Stop gracefully shuts down the tunneler
|
|
||||||
func (c *SSHTunneler) Stop() {
|
|
||||||
if c.stopChan != nil {
|
|
||||||
close(c.stopChan)
|
|
||||||
c.stopChan = nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *SSHTunneler) Dial(ctx context.Context, net, addr string) (net.Conn, error) {
|
|
||||||
return c.tunnels.Dial(ctx, net, addr)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *SSHTunneler) SecondsSinceSync() int64 {
|
|
||||||
now := c.clock.Now().Unix()
|
|
||||||
then := atomic.LoadInt64(&c.lastSync)
|
|
||||||
return now - then
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *SSHTunneler) SecondsSinceSSHKeySync() int64 {
|
|
||||||
now := c.clock.Now().Unix()
|
|
||||||
then := atomic.LoadInt64(&c.lastSSHKeySync)
|
|
||||||
return now - then
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *SSHTunneler) installSSHKeySyncLoop(user, publicKeyfile string) {
|
|
||||||
go wait.Until(func() {
|
|
||||||
if c.InstallSSHKey == nil {
|
|
||||||
klog.Error("Won't attempt to install ssh key: InstallSSHKey function is nil")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
key, err := ssh.ParsePublicKeyFromFile(publicKeyfile)
|
|
||||||
if err != nil {
|
|
||||||
klog.Errorf("Failed to load public key: %v", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
keyData, err := ssh.EncodeSSHKey(key)
|
|
||||||
if err != nil {
|
|
||||||
klog.Errorf("Failed to encode public key: %v", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if err := c.InstallSSHKey(context.TODO(), user, keyData); err != nil {
|
|
||||||
klog.Errorf("Failed to install ssh key: %v", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
atomic.StoreInt64(&c.lastSSHKeySync, c.clock.Now().Unix())
|
|
||||||
}, 5*time.Minute, c.stopChan)
|
|
||||||
}
|
|
||||||
|
|
||||||
// nodesSyncLoop lists nodes every 15 seconds, calling Update() on the TunnelList
|
|
||||||
// each time (Update() is a noop if no changes are necessary).
|
|
||||||
func (c *SSHTunneler) nodesSyncLoop() {
|
|
||||||
// TODO (cjcullen) make this watch.
|
|
||||||
go wait.Until(func() {
|
|
||||||
addrs, err := c.getAddresses()
|
|
||||||
klog.V(4).Infof("Calling update w/ addrs: %v", addrs)
|
|
||||||
if err != nil {
|
|
||||||
klog.Errorf("Failed to getAddresses: %v", err)
|
|
||||||
}
|
|
||||||
c.tunnels.Update(addrs)
|
|
||||||
atomic.StoreInt64(&c.lastSync, c.clock.Now().Unix())
|
|
||||||
}, 15*time.Second, c.stopChan)
|
|
||||||
}
|
|
||||||
|
|
||||||
func generateSSHKey(privateKeyfile, publicKeyfile string) error {
|
|
||||||
private, public, err := ssh.GenerateKey(2048)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
// If private keyfile already exists, we must have only made it halfway
|
|
||||||
// through last time, so delete it.
|
|
||||||
exists, err := utilfile.FileExists(privateKeyfile)
|
|
||||||
if err != nil {
|
|
||||||
klog.Errorf("Error detecting if private key exists: %v", err)
|
|
||||||
} else if exists {
|
|
||||||
klog.Infof("Private key exists, but public key does not")
|
|
||||||
if err := os.Remove(privateKeyfile); err != nil {
|
|
||||||
klog.Errorf("Failed to remove stale private key: %v", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if err := ioutil.WriteFile(privateKeyfile, ssh.EncodePrivateKey(private), 0600); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
publicKeyBytes, err := ssh.EncodePublicKey(public)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if err := ioutil.WriteFile(publicKeyfile+".tmp", publicKeyBytes, 0600); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
return os.Rename(publicKeyfile+".tmp", publicKeyfile)
|
|
||||||
}
|
|
|
@ -1,163 +0,0 @@
|
||||||
/*
|
|
||||||
Copyright 2015 The Kubernetes Authors.
|
|
||||||
|
|
||||||
Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
you may not use this file except in compliance with the License.
|
|
||||||
You may obtain a copy of the License at
|
|
||||||
|
|
||||||
http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
|
|
||||||
Unless required by applicable law or agreed to in writing, software
|
|
||||||
distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
See the License for the specific language governing permissions and
|
|
||||||
limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package tunneler
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"fmt"
|
|
||||||
"net"
|
|
||||||
"os"
|
|
||||||
"path/filepath"
|
|
||||||
"testing"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"k8s.io/apimachinery/pkg/util/clock"
|
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
|
||||||
)
|
|
||||||
|
|
||||||
// TestSecondsSinceSync verifies that proper results are returned
|
|
||||||
// when checking the time between syncs
|
|
||||||
func TestSecondsSinceSync(t *testing.T) {
|
|
||||||
tests := []struct {
|
|
||||||
name string
|
|
||||||
lastSync int64
|
|
||||||
clock *clock.FakeClock
|
|
||||||
want int64
|
|
||||||
}{
|
|
||||||
{
|
|
||||||
name: "Nano Second. No difference",
|
|
||||||
lastSync: time.Date(2015, time.January, 1, 1, 1, 1, 1, time.UTC).Unix(),
|
|
||||||
clock: clock.NewFakeClock(time.Date(2015, time.January, 1, 1, 1, 1, 2, time.UTC)),
|
|
||||||
want: int64(0),
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "Second",
|
|
||||||
lastSync: time.Date(2015, time.January, 1, 1, 1, 1, 1, time.UTC).Unix(),
|
|
||||||
clock: clock.NewFakeClock(time.Date(2015, time.January, 1, 1, 1, 2, 1, time.UTC)),
|
|
||||||
want: int64(1),
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "Minute",
|
|
||||||
lastSync: time.Date(2015, time.January, 1, 1, 1, 1, 1, time.UTC).Unix(),
|
|
||||||
clock: clock.NewFakeClock(time.Date(2015, time.January, 1, 1, 2, 1, 1, time.UTC)),
|
|
||||||
want: int64(60),
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "Hour",
|
|
||||||
lastSync: time.Date(2015, time.January, 1, 1, 1, 1, 1, time.UTC).Unix(),
|
|
||||||
clock: clock.NewFakeClock(time.Date(2015, time.January, 1, 2, 1, 1, 1, time.UTC)),
|
|
||||||
want: int64(3600),
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "Day",
|
|
||||||
lastSync: time.Date(2015, time.January, 1, 1, 1, 1, 1, time.UTC).Unix(),
|
|
||||||
clock: clock.NewFakeClock(time.Date(2015, time.January, 2, 1, 1, 1, 1, time.UTC)),
|
|
||||||
want: int64(86400),
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "Month",
|
|
||||||
lastSync: time.Date(2015, time.January, 1, 1, 1, 1, 1, time.UTC).Unix(),
|
|
||||||
clock: clock.NewFakeClock(time.Date(2015, time.February, 1, 1, 1, 1, 1, time.UTC)),
|
|
||||||
want: int64(2678400),
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "Future Month. Should be -Month",
|
|
||||||
lastSync: time.Date(2015, time.February, 1, 1, 1, 1, 1, time.UTC).Unix(),
|
|
||||||
clock: clock.NewFakeClock(time.Date(2015, time.January, 1, 1, 1, 1, 2, time.UTC)),
|
|
||||||
want: int64(-2678400),
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, tt := range tests {
|
|
||||||
t.Run(tt.name, func(t *testing.T) {
|
|
||||||
tunneler := &SSHTunneler{}
|
|
||||||
assert := assert.New(t)
|
|
||||||
tunneler.lastSync = tt.lastSync
|
|
||||||
tunneler.clock = tt.clock
|
|
||||||
assert.Equal(int64(tt.want), tunneler.SecondsSinceSync())
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
// generateTempFile creates a temporary file path
|
|
||||||
func generateTempFilePath(prefix string) string {
|
|
||||||
tmpPath, _ := filepath.Abs(fmt.Sprintf("%s/%s-%d", os.TempDir(), prefix, time.Now().Unix()))
|
|
||||||
return tmpPath
|
|
||||||
}
|
|
||||||
|
|
||||||
// TestGenerateSSHKey verifies that SSH key generation does indeed
|
|
||||||
// generate keys even with keys already exist.
|
|
||||||
func TestGenerateSSHKey(t *testing.T) {
|
|
||||||
assert := assert.New(t)
|
|
||||||
|
|
||||||
privateKey := generateTempFilePath("private")
|
|
||||||
publicKey := generateTempFilePath("public")
|
|
||||||
|
|
||||||
// Make sure we have no test keys laying around
|
|
||||||
os.Remove(privateKey)
|
|
||||||
os.Remove(publicKey)
|
|
||||||
|
|
||||||
// Pass case: Sunny day case
|
|
||||||
err := generateSSHKey(privateKey, publicKey)
|
|
||||||
assert.NoError(err, "generateSSHKey should not have retuend an error: %s", err)
|
|
||||||
|
|
||||||
// Pass case: PrivateKey exists test case
|
|
||||||
os.Remove(publicKey)
|
|
||||||
err = generateSSHKey(privateKey, publicKey)
|
|
||||||
assert.NoError(err, "generateSSHKey should not have retuend an error: %s", err)
|
|
||||||
|
|
||||||
// Pass case: PublicKey exists test case
|
|
||||||
os.Remove(privateKey)
|
|
||||||
err = generateSSHKey(privateKey, publicKey)
|
|
||||||
assert.NoError(err, "generateSSHKey should not have retuend an error: %s", err)
|
|
||||||
|
|
||||||
// Make sure we have no test keys laying around
|
|
||||||
os.Remove(privateKey)
|
|
||||||
os.Remove(publicKey)
|
|
||||||
|
|
||||||
// TODO: testing error cases where the file can not be removed?
|
|
||||||
}
|
|
||||||
|
|
||||||
type FakeTunneler struct {
|
|
||||||
SecondsSinceSyncValue int64
|
|
||||||
SecondsSinceSSHKeySyncValue int64
|
|
||||||
}
|
|
||||||
|
|
||||||
func (t *FakeTunneler) Run(AddressFunc) {}
|
|
||||||
func (t *FakeTunneler) Stop() {}
|
|
||||||
func (t *FakeTunneler) Dial(ctx context.Context, net, addr string) (net.Conn, error) { return nil, nil }
|
|
||||||
func (t *FakeTunneler) SecondsSinceSync() int64 { return t.SecondsSinceSyncValue }
|
|
||||||
func (t *FakeTunneler) SecondsSinceSSHKeySync() int64 { return t.SecondsSinceSSHKeySyncValue }
|
|
||||||
|
|
||||||
// TestIsTunnelSyncHealthy verifies that the 600 second lag test
|
|
||||||
// is honored.
|
|
||||||
func TestIsTunnelSyncHealthy(t *testing.T) {
|
|
||||||
tunneler := &FakeTunneler{}
|
|
||||||
|
|
||||||
// Pass case: 540 second lag
|
|
||||||
tunneler.SecondsSinceSyncValue = 540
|
|
||||||
healthFn := TunnelSyncHealthChecker(tunneler)
|
|
||||||
err := healthFn(nil)
|
|
||||||
assert.NoError(t, err, "IsTunnelSyncHealthy() should not have returned an error.")
|
|
||||||
|
|
||||||
// Fail case: 720 second lag
|
|
||||||
tunneler.SecondsSinceSyncValue = 720
|
|
||||||
err = healthFn(nil)
|
|
||||||
assert.Error(t, err, "IsTunnelSyncHealthy() should have returned an error.")
|
|
||||||
}
|
|
Loading…
Reference in New Issue