From 1ae8801387d6bc7bc1fce9a90a873d3ff72939cf Mon Sep 17 00:00:00 2001 From: CJ Cullen Date: Tue, 2 Jun 2015 09:52:35 -0700 Subject: [PATCH] Fix transport creation logic. Refactor loadTunnels to allow one path for load, another for refresh. Make SSHTunnelList.Close sleep for a minute before actually closing each tunnel. --- cluster/kubectl.sh | 3 -- pkg/client/kubelet.go | 16 +++++------ pkg/master/master.go | 64 +++++++++++++++++++++++++++++-------------- pkg/util/ssh.go | 51 ++++++---------------------------- 4 files changed, 60 insertions(+), 74 deletions(-) diff --git a/cluster/kubectl.sh b/cluster/kubectl.sh index e62be51c42..b62280bfb6 100755 --- a/cluster/kubectl.sh +++ b/cluster/kubectl.sh @@ -102,9 +102,6 @@ kubectl="${KUBECTL_PATH:-${kubectl}}" if [[ "$KUBERNETES_PROVIDER" == "gke" ]]; then detect-project &> /dev/null - config=( - "--context=gke_${PROJECT}_${ZONE}_${CLUSTER_NAME}" - ) elif [[ "$KUBERNETES_PROVIDER" == "ubuntu" || "$KUBERNETES_PROVIDER" == "juju" ]]; then detect-master > /dev/null config=( diff --git a/pkg/client/kubelet.go b/pkg/client/kubelet.go index fbdeff1677..468a25961b 100644 --- a/pkg/client/kubelet.go +++ b/pkg/client/kubelet.go @@ -51,14 +51,7 @@ type HTTPKubeletClient struct { } func MakeTransport(config *KubeletConfig) (http.RoundTripper, error) { - var transport http.RoundTripper - if config.Dial == nil { - transport = http.DefaultTransport - } else { - transport = &http.Transport{ - Dial: config.Dial, - } - } + cfg := &Config{TLSClientConfig: config.TLSClientConfig} if config.EnableHttps { hasCA := len(config.CAFile) > 0 || len(config.CAData) > 0 @@ -70,10 +63,15 @@ func MakeTransport(config *KubeletConfig) (http.RoundTripper, error) { if err != nil { return nil, err } - if tlsConfig != nil { + + var transport http.RoundTripper + if config.Dial != nil || tlsConfig != nil { transport = &http.Transport{ + Dial: config.Dial, TLSClientConfig: tlsConfig, } + } else { + transport = http.DefaultTransport } return transport, nil } diff --git a/pkg/master/master.go b/pkg/master/master.go index c95af6769d..2929c05232 100644 --- a/pkg/master/master.go +++ b/pkg/master/master.go @@ -767,7 +767,7 @@ func (m *Master) Dial(net, addr string) (net.Conn, error) { return m.tunnels.Dial(net, addr) } -func (m *Master) detectTunnelChanges(addrs []string) bool { +func (m *Master) needToReplaceTunnels(addrs []string) bool { if len(m.tunnels) != len(addrs) { return true } @@ -779,27 +779,25 @@ func (m *Master) detectTunnelChanges(addrs []string) bool { return false } -func (m *Master) loadTunnels(user, keyfile string) error { +func (m *Master) getNodeAddresses() ([]string, error) { nodes, err := m.nodeRegistry.ListMinions(api.NewDefaultContext(), labels.Everything(), fields.Everything()) if err != nil { - return err + return nil, err } - result := []string{} + addrs := []string{} for ix := range nodes.Items { node := &nodes.Items[ix] addr, err := findExternalAddress(node) if err != nil { - return err + return nil, err } - result = append(result, addr) - } - changesExist := m.detectTunnelChanges(result) - if !changesExist { - return nil + addrs = append(addrs, addr) } + return addrs, nil +} - // TODO: This is going to drop connections in the middle. See comment about using Watch above. - tunnels, err := util.MakeSSHTunnels(user, keyfile, result) +func (m *Master) replaceTunnels(user, keyfile string, newAddrs []string) error { + tunnels, err := util.MakeSSHTunnels(user, keyfile, newAddrs) if err != nil { return err } @@ -811,28 +809,54 @@ func (m *Master) loadTunnels(user, keyfile string) error { return nil } -func (m *Master) setupSecureProxy(user, keyfile string) { - loadTunnelsPrintError := func() { - if err := m.loadTunnels(user, keyfile); err != nil { - glog.Errorf("Failed to load SSH Tunnels: %v", err) - } +func (m *Master) loadTunnels(user, keyfile string) error { + addrs, err := m.getNodeAddresses() + if err != nil { + return err } + if !m.needToReplaceTunnels(addrs) { + return nil + } + // TODO: This is going to unnecessarily close connections to unchanged nodes. + // See comment about using Watch above. + return m.replaceTunnels(user, keyfile, addrs) +} +func (m *Master) refreshTunnels(user, keyfile string) error { + addrs, err := m.getNodeAddresses() + if err != nil { + return err + } + return m.replaceTunnels(user, keyfile, addrs) +} + +func (m *Master) setupSecureProxy(user, keyfile string) { // Sync loop for tunnels // TODO: switch this to watch. go func() { for { - loadTunnelsPrintError() - + if err := m.loadTunnels(user, keyfile); err != nil { + glog.Errorf("Failed to load SSH Tunnels: %v", err) + } var sleep time.Duration if len(m.tunnels) == 0 { sleep = time.Second } else { - sleep = time.Second * 120 + sleep = time.Minute } time.Sleep(sleep) } }() + // Refresh loop for tunnels + // TODO: could make this more controller-ish + go func() { + for { + if err := m.refreshTunnels(user, keyfile); err != nil { + glog.Errorf("Failed to refresh SSH Tunnels: %v", err) + } + time.Sleep(5 * time.Minute) + } + }() } func (m *Master) generateSSHKey(user, keyfile string) error { diff --git a/pkg/util/ssh.go b/pkg/util/ssh.go index 2ed99e74bc..28ba337fc5 100644 --- a/pkg/util/ssh.go +++ b/pkg/util/ssh.go @@ -28,6 +28,7 @@ import ( mathrand "math/rand" "net" "os" + "time" "github.com/golang/glog" "golang.org/x/crypto/ssh" @@ -185,43 +186,6 @@ func MakePrivateKeySigner(key string) (ssh.Signer, error) { return signer, nil } -/* -if len(r.tunnels) == 0 { - list, err := listNodes() - if err != nil { - glog.Errorf("unexpected error making tunnels: %v", err) - return - } - tunnels, err := MakeNodeSSHTunnels(list) - if err != nil { - status := errToAPIStatus(err) - writeJSON(status.Code, r.codec, status, w) - httpCode = status.Code - return - } - r.tunnels = tunnels - } - // TODO: round robin here - tunnel := r.tunnels[0] - if err != nil { - status := errToAPIStatus(err) - writeJSON(status.Code, r.codec, status, w) - httpCode = status.Code - return - } - defer func() { - if err := tunnel.Close(); err != nil { - glog.Errorf("Error closing ssh tunnel: %v", err) - } - }() - if err := tunnel.Open(); err != nil { - status := errToAPIStatus(err) - writeJSON(status.Code, r.codec, status, w) - httpCode = status.Code - return - } -*/ - type SSHTunnelEntry struct { Address string Tunnel *SSHTunnel @@ -251,13 +215,16 @@ func (l SSHTunnelList) Open() error { return nil } -func (l SSHTunnelList) Close() error { +func (l SSHTunnelList) Close() { for ix := range l { - if err := l[ix].Tunnel.Close(); err != nil { - return err - } + entry := l[ix] + go func() { + time.Sleep(1 * time.Minute) + if err := entry.Tunnel.Close(); err != nil { + glog.Errorf("Failed to close tunnel %v: %v", entry, err) + } + }() } - return nil } func (l SSHTunnelList) Dial(network, addr string) (net.Conn, error) {