Skip to content

Commit

Permalink
Reset the resourceVersion so that we poll again for non-timeout errors.
Browse files Browse the repository at this point in the history
  • Loading branch information
brendandburns committed Jan 6, 2015
1 parent 4432ba0 commit 0f60d7b
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 0 deletions.
25 changes: 25 additions & 0 deletions pkg/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ package client
import (
"encoding/json"
"fmt"
"net"
"net/url"
"strings"

"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/version"
Expand Down Expand Up @@ -107,3 +110,25 @@ func (c *Client) ServerAPIVersions() (*api.APIVersions, error) {
}
return &v, nil
}

// IsTimeout tests if this is a timeout error in the underlying transport.
// This is unbelievably ugly.
// See: http://stackoverflow.com/questions/23494950/specifically-check-for-timeout-error for details
func IsTimeout(err error) bool {
if err == nil {
return false
}
switch err := err.(type) {
case *url.Error:
if err, ok := err.Err.(net.Error); ok {
return err.Timeout()
}
case net.Error:
return err.Timeout()
}

if strings.Contains(err.Error(), "use of closed network connection") {
return true
}
return false
}
10 changes: 10 additions & 0 deletions pkg/proxy/config/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"time"

"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/wait"
Expand Down Expand Up @@ -93,6 +94,10 @@ func (s *SourceAPI) runServices(resourceVersion *string) {
watcher, err := s.servicesWatcher.Watch(labels.Everything(), labels.Everything(), *resourceVersion)
if err != nil {
glog.Errorf("Unable to watch for services changes: %v", err)
if !client.IsTimeout(err) {
// Reset so that we do a fresh get request
*resourceVersion = ""
}
time.Sleep(wait.Jitter(s.waitDuration, 0.0))
return
}
Expand Down Expand Up @@ -157,6 +162,11 @@ func (s *SourceAPI) runEndpoints(resourceVersion *string) {
watcher, err := s.endpointsWatcher.Watch(labels.Everything(), labels.Everything(), *resourceVersion)
if err != nil {
glog.Errorf("Unable to watch for endpoints changes: %v", err)
if !client.IsTimeout(err) {
// Reset so that we do a fresh get request
*resourceVersion = ""
}

time.Sleep(wait.Jitter(s.waitDuration, 0.0))
return
}
Expand Down
42 changes: 42 additions & 0 deletions pkg/proxy/config/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,27 @@ func TestServicesError(t *testing.T) {
close(ch)
}()

// should have listed only
<-ch
if resourceVersion != "" {
t.Errorf("unexpected resource version, got %#v", resourceVersion)
}
if !reflect.DeepEqual(fakeClient.Actions, []client.FakeAction{{"watch-services", "1"}}) {
t.Errorf("unexpected actions, got %#v", fakeClient)
}
}

func TestServicesErrorTimeout(t *testing.T) {
fakeClient := &client.Fake{Err: errors.New("use of closed network connection")}
services := make(chan ServiceUpdate)
source := SourceAPI{servicesWatcher: fakeClient.Services(api.NamespaceAll), endpointsWatcher: fakeClient.Endpoints(api.NamespaceAll), services: services}
resourceVersion := "1"
ch := make(chan struct{})
go func() {
source.runServices(&resourceVersion)
close(ch)
}()

// should have listed only
<-ch
if resourceVersion != "1" {
Expand Down Expand Up @@ -245,6 +266,27 @@ func TestEndpointsError(t *testing.T) {
close(ch)
}()

// should have listed only
<-ch
if resourceVersion != "" {
t.Errorf("unexpected resource version, got %#v", resourceVersion)
}
if !reflect.DeepEqual(fakeClient.Actions, []client.FakeAction{{"watch-endpoints", "1"}}) {
t.Errorf("unexpected actions, got %#v", fakeClient)
}
}

func TestEndpointsErrorTimeout(t *testing.T) {
fakeClient := &client.Fake{Err: errors.New("use of closed network connection")}
endpoints := make(chan EndpointsUpdate)
source := SourceAPI{servicesWatcher: fakeClient.Services(api.NamespaceAll), endpointsWatcher: fakeClient.Endpoints(api.NamespaceAll), endpoints: endpoints}
resourceVersion := "1"
ch := make(chan struct{})
go func() {
source.runEndpoints(&resourceVersion)
close(ch)
}()

// should have listed only
<-ch
if resourceVersion != "1" {
Expand Down

0 comments on commit 0f60d7b

Please sign in to comment.