Cleanup tunnel logs

pull/654/head
Erik Wilson 2019-07-18 05:00:07 -07:00
parent 0594b2bc9e
commit 8ce509ee6b
1 changed files with 15 additions and 7 deletions

View File

@ -8,6 +8,7 @@ import (
"fmt" "fmt"
"net" "net"
"net/http" "net/http"
"reflect"
"sync" "sync"
"time" "time"
@ -80,7 +81,7 @@ func Setup(ctx context.Context, config *config.Node) error {
wg := &sync.WaitGroup{} wg := &sync.WaitGroup{}
for _, address := range addresses { for _, address := range addresses {
if _, ok := disconnect[address]; !ok { if _, ok := disconnect[address]; !ok {
disconnect[address] = connect(wg, address, config, transportConfig) disconnect[address] = connect(ctx, wg, address, config, transportConfig)
} }
} }
@ -101,7 +102,10 @@ func Setup(ctx context.Context, config *config.Node) error {
select { select {
case ev, ok := <-watch.ResultChan(): case ev, ok := <-watch.ResultChan():
if !ok || ev.Type == watchtypes.Error { if !ok || ev.Type == watchtypes.Error {
if ok {
logrus.Errorf("Tunnel endpoint watch channel closed: %v", ev) logrus.Errorf("Tunnel endpoint watch channel closed: %v", ev)
}
watch.Stop()
continue connect continue connect
} }
endpoint, ok := ev.Object.(*v1.Endpoints) endpoint, ok := ev.Object.(*v1.Endpoints)
@ -110,7 +114,11 @@ func Setup(ctx context.Context, config *config.Node) error {
continue watching continue watching
} }
var addresses = getAddresses(endpoint) newAddresses := getAddresses(endpoint)
if reflect.DeepEqual(newAddresses, addresses) {
continue watching
}
addresses = newAddresses
logrus.Infof("Tunnel endpoint watch event: %v", addresses) logrus.Infof("Tunnel endpoint watch event: %v", addresses)
validEndpoint := map[string]bool{} validEndpoint := map[string]bool{}
@ -118,7 +126,7 @@ func Setup(ctx context.Context, config *config.Node) error {
for _, address := range addresses { for _, address := range addresses {
validEndpoint[address] = true validEndpoint[address] = true
if _, ok := disconnect[address]; !ok { if _, ok := disconnect[address]; !ok {
disconnect[address] = connect(nil, address, config, transportConfig) disconnect[address] = connect(ctx, nil, address, config, transportConfig)
} }
} }
@ -126,6 +134,7 @@ func Setup(ctx context.Context, config *config.Node) error {
if !validEndpoint[address] { if !validEndpoint[address] {
cancel() cancel()
delete(disconnect, address) delete(disconnect, address)
logrus.Infof("Stopped tunnel to %s", address)
} }
} }
} }
@ -149,7 +158,7 @@ func Setup(ctx context.Context, config *config.Node) error {
return nil return nil
} }
func connect(waitGroup *sync.WaitGroup, address string, config *config.Node, transportConfig *transport.Config) context.CancelFunc { func connect(rootCtx context.Context, waitGroup *sync.WaitGroup, address string, config *config.Node, transportConfig *transport.Config) context.CancelFunc {
wsURL := fmt.Sprintf("wss://%s/v1-k3s/connect", address) wsURL := fmt.Sprintf("wss://%s/v1-k3s/connect", address)
headers := map[string][]string{ headers := map[string][]string{
"X-K3s-NodeName": {config.AgentConfig.NodeName}, "X-K3s-NodeName": {config.AgentConfig.NodeName},
@ -175,7 +184,7 @@ func connect(waitGroup *sync.WaitGroup, address string, config *config.Node, tra
waitGroup.Add(1) waitGroup.Add(1)
} }
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(rootCtx)
go func() { go func() {
for { for {
@ -193,7 +202,6 @@ func connect(waitGroup *sync.WaitGroup, address string, config *config.Node, tra
if waitGroup != nil { if waitGroup != nil {
once.Do(waitGroup.Done) once.Do(waitGroup.Done)
} }
logrus.Infof("Stopped tunnel to %s", wsURL)
return return
} }
} }