Cleanup tunnel

pull/636/head
Erik Wilson 2019-07-14 00:29:21 -07:00
parent 34fc4d0336
commit e77dc568bb
1 changed files with 6 additions and 8 deletions

View File

@ -18,6 +18,7 @@ import (
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/fields"
watchtypes "k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd" "k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/transport" "k8s.io/client-go/transport"
@ -68,12 +69,10 @@ func Setup(config *config.Node) error {
} }
addresses := []string{config.ServerAddress} addresses := []string{config.ServerAddress}
endpointResourceVersion := ""
endpoint, _ := client.CoreV1().Endpoints("default").Get("kubernetes", metav1.GetOptions{}) endpoint, _ := client.CoreV1().Endpoints("default").Get("kubernetes", metav1.GetOptions{})
if endpoint != nil { if endpoint != nil {
addresses = getAddresses(endpoint) addresses = getAddresses(endpoint)
endpointResourceVersion = endpoint.ResourceVersion
} }
disconnect := map[string]context.CancelFunc{} disconnect := map[string]context.CancelFunc{}
@ -89,29 +88,28 @@ func Setup(config *config.Node) error {
go func() { go func() {
connect: connect:
for { for {
time.Sleep(5 * time.Second)
watch, err := client.CoreV1().Endpoints("default").Watch(metav1.ListOptions{ watch, err := client.CoreV1().Endpoints("default").Watch(metav1.ListOptions{
FieldSelector: fields.Set{"metadata.name": "kubernetes"}.String(), FieldSelector: fields.Set{"metadata.name": "kubernetes"}.String(),
ResourceVersion: endpointResourceVersion, ResourceVersion: "0",
}) })
if err != nil { if err != nil {
logrus.Errorf("Unable to watch for tunnel endpoints: %v", err) logrus.Errorf("Unable to watch for tunnel endpoints: %v", err)
time.Sleep(5 * time.Second)
continue connect continue connect
} }
watching: watching:
for { for {
select { select {
case ev, ok := <-watch.ResultChan(): case ev, ok := <-watch.ResultChan():
if !ok { if !ok || ev.Type == watchtypes.Error {
logrus.Error("Tunnel endpoint watch channel closed") logrus.Errorf("Tunnel endpoint watch channel closed: %v", ev)
continue connect continue connect
} }
endpoint, ok := ev.Object.(*v1.Endpoints) endpoint, ok := ev.Object.(*v1.Endpoints)
if !ok { if !ok {
logrus.Error("Tunnel could not case event object to endpoint") logrus.Errorf("Tunnel could not case event object to endpoint: %v", ev)
continue watching continue watching
} }
endpointResourceVersion = endpoint.ResourceVersion
var addresses = getAddresses(endpoint) var addresses = getAddresses(endpoint)
logrus.Infof("Tunnel endpoint watch event: %v", addresses) logrus.Infof("Tunnel endpoint watch event: %v", addresses)