Merge pull request #654 from erikwilson/cleanup-tunnel

Cleanup tunnel
pull/655/head
Erik Wilson 2019-07-18 05:23:00 -07:00 committed by GitHub
commit bb44211f51
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 32 additions and 22 deletions

View File

@ -8,6 +8,7 @@ import (
"fmt" "fmt"
"net" "net"
"net/http" "net/http"
"reflect"
"sync" "sync"
"time" "time"
@ -80,7 +81,7 @@ func Setup(ctx context.Context, config *config.Node) error {
wg := &sync.WaitGroup{} wg := &sync.WaitGroup{}
for _, address := range addresses { for _, address := range addresses {
if _, ok := disconnect[address]; !ok { if _, ok := disconnect[address]; !ok {
disconnect[address] = connect(wg, address, config, transportConfig) disconnect[address] = connect(ctx, wg, address, config, transportConfig)
} }
} }
@ -101,7 +102,10 @@ func Setup(ctx context.Context, config *config.Node) error {
select { select {
case ev, ok := <-watch.ResultChan(): case ev, ok := <-watch.ResultChan():
if !ok || ev.Type == watchtypes.Error { if !ok || ev.Type == watchtypes.Error {
logrus.Errorf("Tunnel endpoint watch channel closed: %v", ev) if ok {
logrus.Errorf("Tunnel endpoint watch channel closed: %v", ev)
}
watch.Stop()
continue connect continue connect
} }
endpoint, ok := ev.Object.(*v1.Endpoints) endpoint, ok := ev.Object.(*v1.Endpoints)
@ -110,7 +114,11 @@ func Setup(ctx context.Context, config *config.Node) error {
continue watching continue watching
} }
var addresses = getAddresses(endpoint) newAddresses := getAddresses(endpoint)
if reflect.DeepEqual(newAddresses, addresses) {
continue watching
}
addresses = newAddresses
logrus.Infof("Tunnel endpoint watch event: %v", addresses) logrus.Infof("Tunnel endpoint watch event: %v", addresses)
validEndpoint := map[string]bool{} validEndpoint := map[string]bool{}
@ -118,7 +126,7 @@ func Setup(ctx context.Context, config *config.Node) error {
for _, address := range addresses { for _, address := range addresses {
validEndpoint[address] = true validEndpoint[address] = true
if _, ok := disconnect[address]; !ok { if _, ok := disconnect[address]; !ok {
disconnect[address] = connect(nil, address, config, transportConfig) disconnect[address] = connect(ctx, nil, address, config, transportConfig)
} }
} }
@ -126,6 +134,7 @@ func Setup(ctx context.Context, config *config.Node) error {
if !validEndpoint[address] { if !validEndpoint[address] {
cancel() cancel()
delete(disconnect, address) delete(disconnect, address)
logrus.Infof("Stopped tunnel to %s", address)
} }
} }
} }
@ -149,7 +158,7 @@ func Setup(ctx context.Context, config *config.Node) error {
return nil return nil
} }
func connect(waitGroup *sync.WaitGroup, address string, config *config.Node, transportConfig *transport.Config) context.CancelFunc { func connect(rootCtx context.Context, waitGroup *sync.WaitGroup, address string, config *config.Node, transportConfig *transport.Config) context.CancelFunc {
wsURL := fmt.Sprintf("wss://%s/v1-k3s/connect", address) wsURL := fmt.Sprintf("wss://%s/v1-k3s/connect", address)
headers := map[string][]string{ headers := map[string][]string{
"X-K3s-NodeName": {config.AgentConfig.NodeName}, "X-K3s-NodeName": {config.AgentConfig.NodeName},
@ -175,7 +184,7 @@ func connect(waitGroup *sync.WaitGroup, address string, config *config.Node, tra
waitGroup.Add(1) waitGroup.Add(1)
} }
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(rootCtx)
go func() { go func() {
for { for {
@ -193,7 +202,6 @@ func connect(waitGroup *sync.WaitGroup, address string, config *config.Node, tra
if waitGroup != nil { if waitGroup != nil {
once.Do(waitGroup.Done) once.Do(waitGroup.Done)
} }
logrus.Infof("Stopped tunnel to %s", wsURL)
return return
} }
} }

View File

@ -142,7 +142,7 @@ import:
- package: github.com/hashicorp/golang-lru - package: github.com/hashicorp/golang-lru
version: v0.5.0 version: v0.5.0
- package: github.com/ibuildthecloud/kvsql - package: github.com/ibuildthecloud/kvsql
version: 79f1f6881e28b90976f070aad6edad8e259057c1 version: 9f00ccc82235f0433c736306d091abd2939b7449
repo: https://github.com/erikwilson/rancher-kvsql.git repo: https://github.com/erikwilson/rancher-kvsql.git
- package: github.com/imdario/mergo - package: github.com/imdario/mergo
version: v0.3.5 version: v0.3.5
@ -230,7 +230,8 @@ import:
- package: github.com/rancher/helm-controller - package: github.com/rancher/helm-controller
version: v0.2.1 version: v0.2.1
- package: github.com/rancher/remotedialer - package: github.com/rancher/remotedialer
version: 4a5a661be67697d6369df54ef62d5a30b0385697 version: 7c71ffa8f5d7a181704d92bb8a33b0c7d07dccaa
repo: https://github.com/erikwilson/rancher-remotedialer.git
- package: github.com/rancher/wrangler - package: github.com/rancher/wrangler
version: 7737c167e16514a38229bc64c839cee8cd14e6d3 version: 7737c167e16514a38229bc64c839cee8cd14e6d3
- package: github.com/rancher/wrangler-api - package: github.com/rancher/wrangler-api

View File

@ -14,7 +14,7 @@ k8s.io/kubernetes v1.14.4-k3s.1 ht
github.com/rancher/wrangler 7737c167e16514a38229bc64c839cee8cd14e6d3 github.com/rancher/wrangler 7737c167e16514a38229bc64c839cee8cd14e6d3
github.com/rancher/wrangler-api v0.1.4 github.com/rancher/wrangler-api v0.1.4
github.com/rancher/dynamiclistener c08b499d17195fbc2c1764b21c322951811629a5 https://github.com/erikwilson/rancher-dynamiclistener.git github.com/rancher/dynamiclistener c08b499d17195fbc2c1764b21c322951811629a5 https://github.com/erikwilson/rancher-dynamiclistener.git
github.com/rancher/remotedialer 4a5a661be67697d6369df54ef62d5a30b0385697 github.com/rancher/remotedialer 7c71ffa8f5d7a181704d92bb8a33b0c7d07dccaa https://github.com/erikwilson/rancher-remotedialer.git
github.com/rancher/helm-controller v0.2.1 github.com/rancher/helm-controller v0.2.1
github.com/matryer/moq ee5226d43009 https://github.com/rancher/moq.git github.com/matryer/moq ee5226d43009 https://github.com/rancher/moq.git
github.com/coreos/flannel 823afe66b2266bf71f5bec24e6e28b26d70cfc7c https://github.com/ibuildthecloud/flannel.git github.com/coreos/flannel 823afe66b2266bf71f5bec24e6e28b26d70cfc7c https://github.com/ibuildthecloud/flannel.git

View File

@ -18,7 +18,7 @@ func ClientConnect(ctx context.Context, wsURL string, headers http.Header, diale
} }
} }
func connectToProxy(ctx context.Context, proxyURL string, headers http.Header, auth ConnectAuthorizer, dialer *websocket.Dialer, onConnect func(context.Context) error) error { func connectToProxy(rootCtx context.Context, proxyURL string, headers http.Header, auth ConnectAuthorizer, dialer *websocket.Dialer, onConnect func(context.Context) error) error {
logrus.WithField("url", proxyURL).Info("Connecting to proxy") logrus.WithField("url", proxyURL).Info("Connecting to proxy")
if dialer == nil { if dialer == nil {
@ -31,11 +31,11 @@ func connectToProxy(ctx context.Context, proxyURL string, headers http.Header, a
} }
defer ws.Close() defer ws.Close()
if onConnect != nil { ctx, cancel := context.WithCancel(rootCtx)
ctxOnConnect, cancel := context.WithCancel(context.Background()) defer cancel()
defer cancel()
if err := onConnect(ctxOnConnect); err != nil { if onConnect != nil {
if err := onConnect(ctx); err != nil {
return err return err
} }
} }
@ -45,7 +45,7 @@ func connectToProxy(ctx context.Context, proxyURL string, headers http.Header, a
result := make(chan error, 1) result := make(chan error, 1)
go func() { go func() {
_, err = session.Serve() _, err = session.Serve(ctx)
result <- err result <- err
}() }()

View File

@ -106,7 +106,7 @@ outer:
} }
s.sessions.addListener(session) s.sessions.addListener(session)
_, err = session.Serve() _, err = session.Serve(context.Background())
s.sessions.removeListener(session) s.sessions.removeListener(session)
session.Close() session.Close()

View File

@ -1,6 +1,7 @@
package remotedialer package remotedialer
import ( import (
"context"
"net/http" "net/http"
"sync" "sync"
"time" "time"
@ -71,7 +72,7 @@ func (s *Server) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
defer s.sessions.remove(session) defer s.sessions.remove(session)
// Don't need to associate req.Context() to the Session, it will cancel otherwise // Don't need to associate req.Context() to the Session, it will cancel otherwise
code, err := session.Serve() code, err := session.Serve(context.Background())
if err != nil { if err != nil {
// Hijacked so we can't write to the client // Hijacked so we can't write to the client
logrus.Infof("error in remotedialer server [%d]: %v", code, err) logrus.Infof("error in remotedialer server [%d]: %v", code, err)

View File

@ -63,8 +63,8 @@ func newSession(sessionKey int64, clientKey string, conn *websocket.Conn) *Sessi
} }
} }
func (s *Session) startPings() { func (s *Session) startPings(rootCtx context.Context) {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(rootCtx)
s.pingCancel = cancel s.pingCancel = cancel
s.pingWait.Add(1) s.pingWait.Add(1)
@ -99,9 +99,9 @@ func (s *Session) stopPings() {
s.pingWait.Wait() s.pingWait.Wait()
} }
func (s *Session) Serve() (int, error) { func (s *Session) Serve(ctx context.Context) (int, error) {
if s.client { if s.client {
s.startPings() s.startPings(ctx)
} }
for { for {