mirror of https://github.com/k3s-io/k3s
Merge pull request #3240 from brendandburns/tap
Fix the service proxy to re-poll on watch closure no matter what.pull/6/head
commit
937d88d829
|
@ -19,6 +19,9 @@ package client
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"net"
|
||||||
|
"net/url"
|
||||||
|
"strings"
|
||||||
|
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/version"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/version"
|
||||||
|
@ -107,3 +110,25 @@ func (c *Client) ServerAPIVersions() (*api.APIVersions, error) {
|
||||||
}
|
}
|
||||||
return &v, nil
|
return &v, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// IsTimeout tests if this is a timeout error in the underlying transport.
|
||||||
|
// This is unbelievably ugly.
|
||||||
|
// See: http://stackoverflow.com/questions/23494950/specifically-check-for-timeout-error for details
|
||||||
|
func IsTimeout(err error) bool {
|
||||||
|
if err == nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
switch err := err.(type) {
|
||||||
|
case *url.Error:
|
||||||
|
if err, ok := err.Err.(net.Error); ok {
|
||||||
|
return err.Timeout()
|
||||||
|
}
|
||||||
|
case net.Error:
|
||||||
|
return err.Timeout()
|
||||||
|
}
|
||||||
|
|
||||||
|
if strings.Contains(err.Error(), "use of closed network connection") {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
|
@ -20,6 +20,7 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||||
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/wait"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/wait"
|
||||||
|
@ -93,6 +94,10 @@ func (s *SourceAPI) runServices(resourceVersion *string) {
|
||||||
watcher, err := s.servicesWatcher.Watch(labels.Everything(), labels.Everything(), *resourceVersion)
|
watcher, err := s.servicesWatcher.Watch(labels.Everything(), labels.Everything(), *resourceVersion)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Errorf("Unable to watch for services changes: %v", err)
|
glog.Errorf("Unable to watch for services changes: %v", err)
|
||||||
|
if !client.IsTimeout(err) {
|
||||||
|
// Reset so that we do a fresh get request
|
||||||
|
*resourceVersion = ""
|
||||||
|
}
|
||||||
time.Sleep(wait.Jitter(s.waitDuration, 0.0))
|
time.Sleep(wait.Jitter(s.waitDuration, 0.0))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -157,6 +162,11 @@ func (s *SourceAPI) runEndpoints(resourceVersion *string) {
|
||||||
watcher, err := s.endpointsWatcher.Watch(labels.Everything(), labels.Everything(), *resourceVersion)
|
watcher, err := s.endpointsWatcher.Watch(labels.Everything(), labels.Everything(), *resourceVersion)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Errorf("Unable to watch for endpoints changes: %v", err)
|
glog.Errorf("Unable to watch for endpoints changes: %v", err)
|
||||||
|
if !client.IsTimeout(err) {
|
||||||
|
// Reset so that we do a fresh get request
|
||||||
|
*resourceVersion = ""
|
||||||
|
}
|
||||||
|
|
||||||
time.Sleep(wait.Jitter(s.waitDuration, 0.0))
|
time.Sleep(wait.Jitter(s.waitDuration, 0.0))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
|
@ -120,6 +120,27 @@ func TestServicesError(t *testing.T) {
|
||||||
close(ch)
|
close(ch)
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
// should have listed only
|
||||||
|
<-ch
|
||||||
|
if resourceVersion != "" {
|
||||||
|
t.Errorf("unexpected resource version, got %#v", resourceVersion)
|
||||||
|
}
|
||||||
|
if !reflect.DeepEqual(fakeClient.Actions, []client.FakeAction{{"watch-services", "1"}}) {
|
||||||
|
t.Errorf("unexpected actions, got %#v", fakeClient)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestServicesErrorTimeout(t *testing.T) {
|
||||||
|
fakeClient := &client.Fake{Err: errors.New("use of closed network connection")}
|
||||||
|
services := make(chan ServiceUpdate)
|
||||||
|
source := SourceAPI{servicesWatcher: fakeClient.Services(api.NamespaceAll), endpointsWatcher: fakeClient.Endpoints(api.NamespaceAll), services: services}
|
||||||
|
resourceVersion := "1"
|
||||||
|
ch := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
source.runServices(&resourceVersion)
|
||||||
|
close(ch)
|
||||||
|
}()
|
||||||
|
|
||||||
// should have listed only
|
// should have listed only
|
||||||
<-ch
|
<-ch
|
||||||
if resourceVersion != "1" {
|
if resourceVersion != "1" {
|
||||||
|
@ -245,6 +266,27 @@ func TestEndpointsError(t *testing.T) {
|
||||||
close(ch)
|
close(ch)
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
// should have listed only
|
||||||
|
<-ch
|
||||||
|
if resourceVersion != "" {
|
||||||
|
t.Errorf("unexpected resource version, got %#v", resourceVersion)
|
||||||
|
}
|
||||||
|
if !reflect.DeepEqual(fakeClient.Actions, []client.FakeAction{{"watch-endpoints", "1"}}) {
|
||||||
|
t.Errorf("unexpected actions, got %#v", fakeClient)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestEndpointsErrorTimeout(t *testing.T) {
|
||||||
|
fakeClient := &client.Fake{Err: errors.New("use of closed network connection")}
|
||||||
|
endpoints := make(chan EndpointsUpdate)
|
||||||
|
source := SourceAPI{servicesWatcher: fakeClient.Services(api.NamespaceAll), endpointsWatcher: fakeClient.Endpoints(api.NamespaceAll), endpoints: endpoints}
|
||||||
|
resourceVersion := "1"
|
||||||
|
ch := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
source.runEndpoints(&resourceVersion)
|
||||||
|
close(ch)
|
||||||
|
}()
|
||||||
|
|
||||||
// should have listed only
|
// should have listed only
|
||||||
<-ch
|
<-ch
|
||||||
if resourceVersion != "1" {
|
if resourceVersion != "1" {
|
||||||
|
|
Loading…
Reference in New Issue