diff --git a/pkg/agent/tunnel/tunnel.go b/pkg/agent/tunnel/tunnel.go index f7884591c5..be39e21f19 100644 --- a/pkg/agent/tunnel/tunnel.go +++ b/pkg/agent/tunnel/tunnel.go @@ -8,6 +8,7 @@ import ( "fmt" "net" "net/http" + "reflect" "sync" "time" @@ -80,7 +81,7 @@ func Setup(ctx context.Context, config *config.Node) error { wg := &sync.WaitGroup{} for _, address := range addresses { if _, ok := disconnect[address]; !ok { - disconnect[address] = connect(wg, address, config, transportConfig) + disconnect[address] = connect(ctx, wg, address, config, transportConfig) } } @@ -101,7 +102,10 @@ func Setup(ctx context.Context, config *config.Node) error { select { case ev, ok := <-watch.ResultChan(): if !ok || ev.Type == watchtypes.Error { - logrus.Errorf("Tunnel endpoint watch channel closed: %v", ev) + if ok { + logrus.Errorf("Tunnel endpoint watch channel closed: %v", ev) + } + watch.Stop() continue connect } endpoint, ok := ev.Object.(*v1.Endpoints) @@ -110,7 +114,11 @@ func Setup(ctx context.Context, config *config.Node) error { continue watching } - var addresses = getAddresses(endpoint) + newAddresses := getAddresses(endpoint) + if reflect.DeepEqual(newAddresses, addresses) { + continue watching + } + addresses = newAddresses logrus.Infof("Tunnel endpoint watch event: %v", addresses) validEndpoint := map[string]bool{} @@ -118,7 +126,7 @@ func Setup(ctx context.Context, config *config.Node) error { for _, address := range addresses { validEndpoint[address] = true if _, ok := disconnect[address]; !ok { - disconnect[address] = connect(nil, address, config, transportConfig) + disconnect[address] = connect(ctx, nil, address, config, transportConfig) } } @@ -126,6 +134,7 @@ func Setup(ctx context.Context, config *config.Node) error { if !validEndpoint[address] { cancel() delete(disconnect, address) + logrus.Infof("Stopped tunnel to %s", address) } } } @@ -149,7 +158,7 @@ func Setup(ctx context.Context, config *config.Node) error { return nil } -func connect(waitGroup *sync.WaitGroup, address string, config *config.Node, transportConfig *transport.Config) context.CancelFunc { +func connect(rootCtx context.Context, waitGroup *sync.WaitGroup, address string, config *config.Node, transportConfig *transport.Config) context.CancelFunc { wsURL := fmt.Sprintf("wss://%s/v1-k3s/connect", address) headers := map[string][]string{ "X-K3s-NodeName": {config.AgentConfig.NodeName}, @@ -175,7 +184,7 @@ func connect(waitGroup *sync.WaitGroup, address string, config *config.Node, tra waitGroup.Add(1) } - ctx, cancel := context.WithCancel(context.Background()) + ctx, cancel := context.WithCancel(rootCtx) go func() { for { @@ -193,7 +202,6 @@ func connect(waitGroup *sync.WaitGroup, address string, config *config.Node, tra if waitGroup != nil { once.Do(waitGroup.Done) } - logrus.Infof("Stopped tunnel to %s", wsURL) return } }