From 0f60d7bca30fb8506f5e116bb7a85792b6e99a4b Mon Sep 17 00:00:00 2001 From: Brendan Burns Date: Tue, 6 Jan 2015 11:36:03 -0800 Subject: [PATCH] Reset the resourceVersion so that we poll again for non-timeout errors. --- pkg/client/client.go | 25 +++++++++++++++++++++ pkg/proxy/config/api.go | 10 +++++++++ pkg/proxy/config/api_test.go | 42 ++++++++++++++++++++++++++++++++++++ 3 files changed, 77 insertions(+) diff --git a/pkg/client/client.go b/pkg/client/client.go index dc6d7f3e3e..b60e7880cf 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -19,6 +19,9 @@ package client import ( "encoding/json" "fmt" + "net" + "net/url" + "strings" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/version" @@ -107,3 +110,25 @@ func (c *Client) ServerAPIVersions() (*api.APIVersions, error) { } return &v, nil } + +// IsTimeout tests if this is a timeout error in the underlying transport. +// This is unbelievably ugly. +// See: http://stackoverflow.com/questions/23494950/specifically-check-for-timeout-error for details +func IsTimeout(err error) bool { + if err == nil { + return false + } + switch err := err.(type) { + case *url.Error: + if err, ok := err.Err.(net.Error); ok { + return err.Timeout() + } + case net.Error: + return err.Timeout() + } + + if strings.Contains(err.Error(), "use of closed network connection") { + return true + } + return false +} diff --git a/pkg/proxy/config/api.go b/pkg/proxy/config/api.go index 68519b6758..87ce090247 100644 --- a/pkg/proxy/config/api.go +++ b/pkg/proxy/config/api.go @@ -20,6 +20,7 @@ import ( "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/util/wait" @@ -93,6 +94,10 @@ func (s *SourceAPI) runServices(resourceVersion *string) { watcher, err := s.servicesWatcher.Watch(labels.Everything(), labels.Everything(), *resourceVersion) if err != nil { glog.Errorf("Unable to watch for services changes: %v", err) + if !client.IsTimeout(err) { + // Reset so that we do a fresh get request + *resourceVersion = "" + } time.Sleep(wait.Jitter(s.waitDuration, 0.0)) return } @@ -157,6 +162,11 @@ func (s *SourceAPI) runEndpoints(resourceVersion *string) { watcher, err := s.endpointsWatcher.Watch(labels.Everything(), labels.Everything(), *resourceVersion) if err != nil { glog.Errorf("Unable to watch for endpoints changes: %v", err) + if !client.IsTimeout(err) { + // Reset so that we do a fresh get request + *resourceVersion = "" + } + time.Sleep(wait.Jitter(s.waitDuration, 0.0)) return } diff --git a/pkg/proxy/config/api_test.go b/pkg/proxy/config/api_test.go index bbca7a499f..ebb0b18d3c 100644 --- a/pkg/proxy/config/api_test.go +++ b/pkg/proxy/config/api_test.go @@ -120,6 +120,27 @@ func TestServicesError(t *testing.T) { close(ch) }() + // should have listed only + <-ch + if resourceVersion != "" { + t.Errorf("unexpected resource version, got %#v", resourceVersion) + } + if !reflect.DeepEqual(fakeClient.Actions, []client.FakeAction{{"watch-services", "1"}}) { + t.Errorf("unexpected actions, got %#v", fakeClient) + } +} + +func TestServicesErrorTimeout(t *testing.T) { + fakeClient := &client.Fake{Err: errors.New("use of closed network connection")} + services := make(chan ServiceUpdate) + source := SourceAPI{servicesWatcher: fakeClient.Services(api.NamespaceAll), endpointsWatcher: fakeClient.Endpoints(api.NamespaceAll), services: services} + resourceVersion := "1" + ch := make(chan struct{}) + go func() { + source.runServices(&resourceVersion) + close(ch) + }() + // should have listed only <-ch if resourceVersion != "1" { @@ -245,6 +266,27 @@ func TestEndpointsError(t *testing.T) { close(ch) }() + // should have listed only + <-ch + if resourceVersion != "" { + t.Errorf("unexpected resource version, got %#v", resourceVersion) + } + if !reflect.DeepEqual(fakeClient.Actions, []client.FakeAction{{"watch-endpoints", "1"}}) { + t.Errorf("unexpected actions, got %#v", fakeClient) + } +} + +func TestEndpointsErrorTimeout(t *testing.T) { + fakeClient := &client.Fake{Err: errors.New("use of closed network connection")} + endpoints := make(chan EndpointsUpdate) + source := SourceAPI{servicesWatcher: fakeClient.Services(api.NamespaceAll), endpointsWatcher: fakeClient.Endpoints(api.NamespaceAll), endpoints: endpoints} + resourceVersion := "1" + ch := make(chan struct{}) + go func() { + source.runEndpoints(&resourceVersion) + close(ch) + }() + // should have listed only <-ch if resourceVersion != "1" {