Fix transport creation logic.

Refactor loadTunnels to allow one path for load, another for refresh.
Make SSHTunnelList.Close sleep for a minute before actually closing each tunnel.
pull/6/head
CJ Cullen 2015-06-02 09:52:35 -07:00
parent 7ea533d871
commit 1ae8801387
4 changed files with 60 additions and 74 deletions

View File

@ -102,9 +102,6 @@ kubectl="${KUBECTL_PATH:-${kubectl}}"
if [[ "$KUBERNETES_PROVIDER" == "gke" ]]; then
detect-project &> /dev/null
config=(
"--context=gke_${PROJECT}_${ZONE}_${CLUSTER_NAME}"
)
elif [[ "$KUBERNETES_PROVIDER" == "ubuntu" || "$KUBERNETES_PROVIDER" == "juju" ]]; then
detect-master > /dev/null
config=(

View File

@ -51,14 +51,7 @@ type HTTPKubeletClient struct {
}
func MakeTransport(config *KubeletConfig) (http.RoundTripper, error) {
var transport http.RoundTripper
if config.Dial == nil {
transport = http.DefaultTransport
} else {
transport = &http.Transport{
Dial: config.Dial,
}
}
cfg := &Config{TLSClientConfig: config.TLSClientConfig}
if config.EnableHttps {
hasCA := len(config.CAFile) > 0 || len(config.CAData) > 0
@ -70,10 +63,15 @@ func MakeTransport(config *KubeletConfig) (http.RoundTripper, error) {
if err != nil {
return nil, err
}
if tlsConfig != nil {
var transport http.RoundTripper
if config.Dial != nil || tlsConfig != nil {
transport = &http.Transport{
Dial: config.Dial,
TLSClientConfig: tlsConfig,
}
} else {
transport = http.DefaultTransport
}
return transport, nil
}

View File

@ -767,7 +767,7 @@ func (m *Master) Dial(net, addr string) (net.Conn, error) {
return m.tunnels.Dial(net, addr)
}
func (m *Master) detectTunnelChanges(addrs []string) bool {
func (m *Master) needToReplaceTunnels(addrs []string) bool {
if len(m.tunnels) != len(addrs) {
return true
}
@ -779,27 +779,25 @@ func (m *Master) detectTunnelChanges(addrs []string) bool {
return false
}
func (m *Master) loadTunnels(user, keyfile string) error {
func (m *Master) getNodeAddresses() ([]string, error) {
nodes, err := m.nodeRegistry.ListMinions(api.NewDefaultContext(), labels.Everything(), fields.Everything())
if err != nil {
return err
return nil, err
}
result := []string{}
addrs := []string{}
for ix := range nodes.Items {
node := &nodes.Items[ix]
addr, err := findExternalAddress(node)
if err != nil {
return err
return nil, err
}
result = append(result, addr)
}
changesExist := m.detectTunnelChanges(result)
if !changesExist {
return nil
addrs = append(addrs, addr)
}
return addrs, nil
}
// TODO: This is going to drop connections in the middle. See comment about using Watch above.
tunnels, err := util.MakeSSHTunnels(user, keyfile, result)
func (m *Master) replaceTunnels(user, keyfile string, newAddrs []string) error {
tunnels, err := util.MakeSSHTunnels(user, keyfile, newAddrs)
if err != nil {
return err
}
@ -811,28 +809,54 @@ func (m *Master) loadTunnels(user, keyfile string) error {
return nil
}
func (m *Master) setupSecureProxy(user, keyfile string) {
loadTunnelsPrintError := func() {
if err := m.loadTunnels(user, keyfile); err != nil {
glog.Errorf("Failed to load SSH Tunnels: %v", err)
}
func (m *Master) loadTunnels(user, keyfile string) error {
addrs, err := m.getNodeAddresses()
if err != nil {
return err
}
if !m.needToReplaceTunnels(addrs) {
return nil
}
// TODO: This is going to unnecessarily close connections to unchanged nodes.
// See comment about using Watch above.
return m.replaceTunnels(user, keyfile, addrs)
}
func (m *Master) refreshTunnels(user, keyfile string) error {
addrs, err := m.getNodeAddresses()
if err != nil {
return err
}
return m.replaceTunnels(user, keyfile, addrs)
}
func (m *Master) setupSecureProxy(user, keyfile string) {
// Sync loop for tunnels
// TODO: switch this to watch.
go func() {
for {
loadTunnelsPrintError()
if err := m.loadTunnels(user, keyfile); err != nil {
glog.Errorf("Failed to load SSH Tunnels: %v", err)
}
var sleep time.Duration
if len(m.tunnels) == 0 {
sleep = time.Second
} else {
sleep = time.Second * 120
sleep = time.Minute
}
time.Sleep(sleep)
}
}()
// Refresh loop for tunnels
// TODO: could make this more controller-ish
go func() {
for {
if err := m.refreshTunnels(user, keyfile); err != nil {
glog.Errorf("Failed to refresh SSH Tunnels: %v", err)
}
time.Sleep(5 * time.Minute)
}
}()
}
func (m *Master) generateSSHKey(user, keyfile string) error {

View File

@ -28,6 +28,7 @@ import (
mathrand "math/rand"
"net"
"os"
"time"
"github.com/golang/glog"
"golang.org/x/crypto/ssh"
@ -185,43 +186,6 @@ func MakePrivateKeySigner(key string) (ssh.Signer, error) {
return signer, nil
}
/*
if len(r.tunnels) == 0 {
list, err := listNodes()
if err != nil {
glog.Errorf("unexpected error making tunnels: %v", err)
return
}
tunnels, err := MakeNodeSSHTunnels(list)
if err != nil {
status := errToAPIStatus(err)
writeJSON(status.Code, r.codec, status, w)
httpCode = status.Code
return
}
r.tunnels = tunnels
}
// TODO: round robin here
tunnel := r.tunnels[0]
if err != nil {
status := errToAPIStatus(err)
writeJSON(status.Code, r.codec, status, w)
httpCode = status.Code
return
}
defer func() {
if err := tunnel.Close(); err != nil {
glog.Errorf("Error closing ssh tunnel: %v", err)
}
}()
if err := tunnel.Open(); err != nil {
status := errToAPIStatus(err)
writeJSON(status.Code, r.codec, status, w)
httpCode = status.Code
return
}
*/
type SSHTunnelEntry struct {
Address string
Tunnel *SSHTunnel
@ -251,13 +215,16 @@ func (l SSHTunnelList) Open() error {
return nil
}
func (l SSHTunnelList) Close() error {
func (l SSHTunnelList) Close() {
for ix := range l {
if err := l[ix].Tunnel.Close(); err != nil {
return err
}
entry := l[ix]
go func() {
time.Sleep(1 * time.Minute)
if err := entry.Tunnel.Close(); err != nil {
glog.Errorf("Failed to close tunnel %v: %v", entry, err)
}
}()
}
return nil
}
func (l SSHTunnelList) Dial(network, addr string) (net.Conn, error) {