Skip to content

Commit

Permalink
make watch actually return an error when there's an error
Browse files Browse the repository at this point in the history
  • Loading branch information
lavalamp committed Sep 20, 2014
1 parent 334f9dc commit 0d69393
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 12 deletions.
42 changes: 30 additions & 12 deletions pkg/tools/etcd_tools_watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package tools

import (
"sync"
"time"

"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
Expand Down Expand Up @@ -69,7 +70,7 @@ func (h *EtcdHelper) Watch(key string, resourceVersion uint64) (watch.Interface,
func (h *EtcdHelper) WatchAndTransform(key string, resourceVersion uint64, transform TransformFunc) (watch.Interface, error) {
w := newEtcdWatcher(false, Everything, h.Codec, h.ResourceVersioner, transform)
go w.etcdWatch(h.Client, key, resourceVersion)
return w, nil
return w, <-w.immediateError
}

// TransformFunc attempts to convert an object to another object for use with a watcher.
Expand All @@ -88,6 +89,11 @@ type etcdWatcher struct {
etcdStop chan bool
etcdCallEnded chan struct{}

// etcdWatch will send an error down this channel if the Watch fails.
// Otherwise, a nil will be sent down this channel watchWaitDuration
// after the watch starts.
immediateError chan error

outgoing chan watch.Event
userStop chan struct{}
stopped bool
Expand All @@ -97,31 +103,42 @@ type etcdWatcher struct {
emit func(watch.Event)
}

// watchWaitDuration is the amount of time to wait for an error from watch.
const watchWaitDuration = 100 * time.Millisecond

// newEtcdWatcher returns a new etcdWatcher; if list is true, watch sub-nodes. If you provide a transform
// and a versioner, the versioner must be able to handle the objects that transform creates.
func newEtcdWatcher(list bool, filter FilterFunc, encoding runtime.Codec, versioner runtime.ResourceVersioner, transform TransformFunc) *etcdWatcher {
w := &etcdWatcher{
encoding: encoding,
versioner: versioner,
transform: transform,
list: list,
filter: filter,
etcdIncoming: make(chan *etcd.Response),
etcdStop: make(chan bool),
etcdCallEnded: make(chan struct{}),
outgoing: make(chan watch.Event),
userStop: make(chan struct{}),
encoding: encoding,
versioner: versioner,
transform: transform,
list: list,
filter: filter,
etcdIncoming: make(chan *etcd.Response),
etcdStop: make(chan bool),
etcdCallEnded: make(chan struct{}),
immediateError: make(chan error),
outgoing: make(chan watch.Event),
userStop: make(chan struct{}),
}
w.emit = func(e watch.Event) { w.outgoing <- e }
go w.translate()
return w
}

// etcdWatch calls etcd's Watch function, and handles any errors. Meant to be called
// as a goroutine.
// as a goroutine. Will either send an error over w.immediateError if Watch fails, or in 100ms will
func (w *etcdWatcher) etcdWatch(client EtcdGetSet, key string, resourceVersion uint64) {
defer util.HandleCrash()
defer close(w.etcdCallEnded)
go func() {
// This is racy; assume that Watch will fail within 100ms if it is going to fail.
// It's still more useful than blocking until the first result shows up.
// Trying to detect the 401: watch window expired error.
<-time.After(watchWaitDuration)
w.immediateError <- nil
}()
if resourceVersion == 0 {
latest, ok := etcdGetInitialWatchState(client, key, w.list, w.etcdIncoming)
if !ok {
Expand All @@ -132,6 +149,7 @@ func (w *etcdWatcher) etcdWatch(client EtcdGetSet, key string, resourceVersion u
_, err := client.Watch(key, resourceVersion, w.list, w.etcdIncoming, w.etcdStop)
if err != etcd.ErrWatchStoppedByUser {
glog.Errorf("etcd.Watch stopped unexpectedly: %v (%#v)", err, key)
w.immediateError <- err
}
}

Expand Down
13 changes: 13 additions & 0 deletions pkg/tools/etcd_tools_watch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,19 @@ func TestWatchInterpretation_ResponseBadData(t *testing.T) {
}
}

func TestWatchEtcdError(t *testing.T) {
codec := latest.Codec
fakeClient := NewFakeEtcdClient(t)
fakeClient.expectNotFoundGetSet["/some/key"] = struct{}{}
fakeClient.WatchImmediateError = fmt.Errorf("immediate error")
h := EtcdHelper{fakeClient, codec, versioner}

_, err := h.Watch("/some/key", 0)
if err == nil {
t.Fatalf("Unexpected non-error")
}
}

func TestWatch(t *testing.T) {
codec := latest.Codec
fakeClient := NewFakeEtcdClient(t)
Expand Down
5 changes: 5 additions & 0 deletions pkg/tools/fake_etcd_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ type FakeEtcdClient struct {
// Write to this to prematurely stop a Watch that is running in a goroutine.
WatchInjectError chan<- error
WatchStop chan<- bool
// If non-nil, will be returned immediately when Watch is called.
WatchImmediateError error
}

func NewFakeEtcdClient(t TestLogger) *FakeEtcdClient {
Expand Down Expand Up @@ -250,6 +252,9 @@ func (f *FakeEtcdClient) WaitForWatchCompletion() {
}

func (f *FakeEtcdClient) Watch(prefix string, waitIndex uint64, recursive bool, receiver chan *etcd.Response, stop chan bool) (*etcd.Response, error) {
if f.WatchImmediateError != nil {
return nil, f.WatchImmediateError
}
f.WatchResponse = receiver
f.WatchStop = stop
f.WatchIndex = waitIndex
Expand Down

0 comments on commit 0d69393

Please sign in to comment.