Skip to content

Commit

Permalink
When connections are broken on Watch, write fewer errors to logs
Browse files Browse the repository at this point in the history
Watch depends on long running connections, which intervening proxies
may break without the control of the remote server. Specific errors
handled are io.EOF, io.EOF wrapped by *url.Error, http connection
reset errors (caused by race conditions in golang http code), and
connection reset by peer (simply tolerated).
  • Loading branch information
smarterclayton committed Dec 18, 2014
1 parent af571a2 commit b2434de
Show file tree
Hide file tree
Showing 5 changed files with 103 additions and 1 deletion.
2 changes: 1 addition & 1 deletion pkg/client/cache/reflector.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *string) err

watchDuration := time.Now().Sub(start)
if watchDuration < 1*time.Second && eventCount == 0 {
glog.Errorf("unexpected watch close - watch lasted less than a second and no items received")
glog.V(4).Infof("Unexpected watch close - watch lasted less than a second and no items received")
return errors.New("very short watch")
}
glog.V(4).Infof("Watch close - %v total %v items received", r.expectedType, eventCount)
Expand Down
23 changes: 23 additions & 0 deletions pkg/client/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"net/url"
"path"
"strconv"
"strings"
"time"

"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
Expand Down Expand Up @@ -298,6 +299,9 @@ func (r *Request) Watch() (watch.Interface, error) {
}
resp, err := client.Do(req)
if err != nil {
if isProbableEOF(err) {
return watch.NewEmptyWatch(), nil
}
return nil, err
}
if resp.StatusCode != http.StatusOK {
Expand All @@ -310,6 +314,25 @@ func (r *Request) Watch() (watch.Interface, error) {
return watch.NewStreamWatcher(watchjson.NewDecoder(resp.Body, r.codec)), nil
}

// isProbableEOF returns true if the given error resembles a connection termination
// scenario that would justify assuming that the watch is empty. The watch stream
// mechanism handles many common partial data errors, so closed connections can be
// retried in many cases.
func isProbableEOF(err error) bool {
if uerr, ok := err.(*url.Error); ok {
err = uerr.Err
}
switch {
case err == io.EOF:
return true
case err.Error() == "http: can't write HTTP request on broken connection":
return true
case strings.Contains(err.Error(), "connection reset by peer"):
return true
}
return false
}

// Stream formats and executes the request, and offers streaming of the response.
// Returns io.ReadCloser which could be used for streaming of the response, or an error
func (r *Request) Stream() (io.ReadCloser, error) {
Expand Down
46 changes: 46 additions & 0 deletions pkg/client/request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"bytes"
"encoding/base64"
"errors"
"io"
"io/ioutil"
"net/http"
"net/http/httptest"
Expand Down Expand Up @@ -164,6 +165,7 @@ func TestRequestWatch(t *testing.T) {
testCases := []struct {
Request *Request
Err bool
Empty bool
}{
{
Request: &Request{err: errors.New("bail")},
Expand Down Expand Up @@ -191,15 +193,59 @@ func TestRequestWatch(t *testing.T) {
},
Err: true,
},
{
Request: &Request{
client: clientFunc(func(req *http.Request) (*http.Response, error) {
return nil, io.EOF
}),
baseURL: &url.URL{},
},
Empty: true,
},
{
Request: &Request{
client: clientFunc(func(req *http.Request) (*http.Response, error) {
return nil, &url.Error{Err: io.EOF}
}),
baseURL: &url.URL{},
},
Empty: true,
},
{
Request: &Request{
client: clientFunc(func(req *http.Request) (*http.Response, error) {
return nil, errors.New("http: can't write HTTP request on broken connection")
}),
baseURL: &url.URL{},
},
Empty: true,
},
{
Request: &Request{
client: clientFunc(func(req *http.Request) (*http.Response, error) {
return nil, errors.New("foo: connection reset by peer")
}),
baseURL: &url.URL{},
},
Empty: true,
},
}
for i, testCase := range testCases {
watch, err := testCase.Request.Watch()
hasErr := err != nil
if hasErr != testCase.Err {
t.Errorf("%d: expected %t, got %t: %v", i, testCase.Err, hasErr, err)
continue
}
if hasErr && watch != nil {
t.Errorf("%d: watch should be nil when error is returned", i)
continue
}
if testCase.Empty {
_, ok := <-watch.ResultChan()
if ok {
t.Errorf("%d: expected the watch to be empty: %#v", watch)
}
}
}
}
Expand Down
20 changes: 20 additions & 0 deletions pkg/watch/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,26 @@ type Event struct {
Object runtime.Object
}

type emptyWatch chan Event

// NewEmptyWatch returns a watch interface that returns no results and is closed.
// May be used in certain error conditions where no information is available but
// an error is not warranted.
func NewEmptyWatch() Interface {
ch := make(chan Event)
close(ch)
return emptyWatch(ch)
}

// Stop implements Interface
func (w emptyWatch) Stop() {
}

// ResultChan implements Interface
func (w emptyWatch) ResultChan() <-chan Event {
return chan Event(w)
}

// FakeWatcher lets you test anything that consumes a watch.Interface; threadsafe.
type FakeWatcher struct {
result chan Event
Expand Down
13 changes: 13 additions & 0 deletions pkg/watch/watch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,3 +70,16 @@ func TestFake(t *testing.T) {
go sender()
consumer(f)
}

func TestEmpty(t *testing.T) {
w := NewEmptyWatch()
_, ok := <-w.ResultChan()
if ok {
t.Errorf("unexpected result channel result")
}
w.Stop()
_, ok = <-w.ResultChan()
if ok {
t.Errorf("unexpected result channel result")
}
}

0 comments on commit b2434de

Please sign in to comment.