Skip to content

Commit

Permalink
Merge pull request kubernetes#535 from lavalamp/etcdWatcher
Browse files Browse the repository at this point in the history
Etcd watcher
  • Loading branch information
smarterclayton committed Jul 25, 2014
2 parents e3927b4 + b630c7b commit fbd71c9
Show file tree
Hide file tree
Showing 2 changed files with 338 additions and 0 deletions.
157 changes: 157 additions & 0 deletions pkg/tools/etcd_tools.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@ import (
"reflect"

"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
"github.com/coreos/go-etcd/etcd"
"github.com/golang/glog"
)

const (
Expand Down Expand Up @@ -53,6 +56,7 @@ type EtcdGetSet interface {
Get(key string, sort, recursive bool) (*etcd.Response, error)
Set(key, value string, ttl uint64) (*etcd.Response, error)
CompareAndSwap(key, value string, ttl uint64, prevValue string, prevIndex uint64) (*etcd.Response, error)
Watch(prefix string, waitIndex uint64, recursive bool, receiver chan *etcd.Response, stop chan bool) (*etcd.Response, error)
}

// EtcdHelper offers common object marshalling/unmarshalling operations on an etcd client.
Expand Down Expand Up @@ -228,3 +232,156 @@ func (h *EtcdHelper) AtomicUpdate(key string, ptrToType interface{}, tryUpdate E
return err
}
}

// FilterFunc is a predicate which takes an API object and returns true
// iff the object should remain in the set.
type FilterFunc func(obj interface{}) bool

// Everything is a FilterFunc which accepts all objects.
func Everything(interface{}) bool {
return true
}

// WatchList begins watching the specified key's items. Items are decoded into
// API objects, and any items passing 'filter' are sent down the returned
// watch.Interface.
func (h *EtcdHelper) WatchList(key string, filter FilterFunc) (watch.Interface, error) {
w := newEtcdWatcher(true, filter)
go w.etcdWatch(h.Client, key)
return w, nil
}

// Watch begins watching the specified key. Events are decoded into
// API objects and sent down the returned watch.Interface.
func (h *EtcdHelper) Watch(key string) (watch.Interface, error) {
w := newEtcdWatcher(false, nil)
go w.etcdWatch(h.Client, key)
return w, nil
}

// etcdWatcher converts a native etcd watch to a watch.Interface.
type etcdWatcher struct {
list bool // If we're doing a recursive watch, should be true.
filter FilterFunc

etcdIncoming chan *etcd.Response
etcdStop chan bool

outgoing chan watch.Event
userStop chan struct{}
}

// Returns a new etcdWatcher; if list is true, watch sub-nodes.
func newEtcdWatcher(list bool, filter FilterFunc) *etcdWatcher {
w := &etcdWatcher{
list: list,
filter: filter,
etcdIncoming: make(chan *etcd.Response),
etcdStop: make(chan bool),
outgoing: make(chan watch.Event),
userStop: make(chan struct{}),
}
go w.translate()
return w
}

// etcdWatch calls etcd's Watch function, and handles any errors. Meant to be called
// as a goroutine.
func (w *etcdWatcher) etcdWatch(client EtcdGetSet, key string) {
defer util.HandleCrash()
_, err := client.Watch(key, 0, w.list, w.etcdIncoming, w.etcdStop)
if err == etcd.ErrWatchStoppedByUser {
// etcd doesn't close the channel in this case.
close(w.etcdIncoming)
} else {
glog.Errorf("etcd.Watch stopped unexpectedly: %v (%#v)", err, err)
}
}

// Pull stuff from etcd, convert, and push out the outgoing channel. Meant to be
// called as a goroutine.
func (w *etcdWatcher) translate() {
defer close(w.outgoing)
defer util.HandleCrash()

for {
select {
case <-w.userStop:
w.etcdStop <- true
return
case res, ok := <-w.etcdIncoming:
if !ok {
return
}
w.sendResult(res)
}
}
}

func (w *etcdWatcher) sendResult(res *etcd.Response) {
var action watch.EventType
var data []byte
var nodes etcd.Nodes
switch res.Action {
case "set":
if res.Node == nil {
glog.Errorf("unexpected nil node: %#v", res)
return
}
data = []byte(res.Node.Value)
nodes = res.Node.Nodes
// TODO: Is this conditional correct?
if res.EtcdIndex > 0 {
action = watch.Modified
} else {
action = watch.Added
}
case "delete":
if res.PrevNode == nil {
glog.Errorf("unexpected nil prev node: %#v", res)
return
}
data = []byte(res.PrevNode.Value)
nodes = res.PrevNode.Nodes
action = watch.Deleted
}

// If listing, we're interested in sub-nodes.
if w.list {
for _, n := range nodes {
obj, err := api.Decode([]byte(n.Value))
if err != nil {
glog.Errorf("failure to decode api object: %#v", res)
continue
}
if w.filter != nil && !w.filter(obj) {
continue
}
w.outgoing <- watch.Event{
Type: action,
Object: obj,
}
}
return
}

obj, err := api.Decode(data)
if err != nil {
glog.Errorf("failure to decode api object: %#v", res)
return
}
w.outgoing <- watch.Event{
Type: action,
Object: obj,
}
}

// ResultChannel implements watch.Interface.
func (w *etcdWatcher) ResultChan() <-chan watch.Event {
return w.outgoing
}

// Stop implements watch.Interface.
func (w *etcdWatcher) Stop() {
close(w.userStop)
}
181 changes: 181 additions & 0 deletions pkg/tools/etcd_tools_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,11 @@ import (
"fmt"
"reflect"
"testing"
"time"

"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
"github.com/coreos/go-etcd/etcd"
)

Expand Down Expand Up @@ -150,3 +153,181 @@ func TestSetObj(t *testing.T) {
t.Errorf("Wanted %v, got %v", expect, got)
}
}

func TestWatchInterpretation_ListAdd(t *testing.T) {
called := false
w := newEtcdWatcher(true, func(interface{}) bool {
called = true
return true
})
pod := &api.Pod{JSONBase: api.JSONBase{ID: "foo"}}
podBytes, _ := api.Encode(pod)

go w.sendResult(&etcd.Response{
Action: "set",
Node: &etcd.Node{
Nodes: etcd.Nodes{
{
Value: string(podBytes),
},
},
},
})

got := <-w.outgoing
if e, a := watch.Added, got.Type; e != a {
t.Errorf("Expected %v, got %v", e, a)
}
if e, a := pod, got.Object; !reflect.DeepEqual(e, a) {
t.Errorf("Expected %v, got %v", e, a)
}
if !called {
t.Errorf("filter never called")
}
}

func TestWatchInterpretation_ListDelete(t *testing.T) {
called := false
w := newEtcdWatcher(true, func(interface{}) bool {
called = true
return true
})
pod := &api.Pod{JSONBase: api.JSONBase{ID: "foo"}}
podBytes, _ := api.Encode(pod)

go w.sendResult(&etcd.Response{
Action: "delete",
PrevNode: &etcd.Node{
Nodes: etcd.Nodes{
{
Value: string(podBytes),
},
},
},
})

got := <-w.outgoing
if e, a := watch.Deleted, got.Type; e != a {
t.Errorf("Expected %v, got %v", e, a)
}
if e, a := pod, got.Object; !reflect.DeepEqual(e, a) {
t.Errorf("Expected %v, got %v", e, a)
}
if !called {
t.Errorf("filter never called")
}
}

func TestWatchInterpretation_SingleAdd(t *testing.T) {
w := newEtcdWatcher(false, func(interface{}) bool {
t.Errorf("unexpected filter call")
return true
})
pod := &api.Pod{JSONBase: api.JSONBase{ID: "foo"}}
podBytes, _ := api.Encode(pod)

go w.sendResult(&etcd.Response{
Action: "set",
Node: &etcd.Node{
Value: string(podBytes),
},
})

got := <-w.outgoing
if e, a := watch.Added, got.Type; e != a {
t.Errorf("Expected %v, got %v", e, a)
}
if e, a := pod, got.Object; !reflect.DeepEqual(e, a) {
t.Errorf("Expected %v, got %v", e, a)
}
}

func TestWatchInterpretation_SingleDelete(t *testing.T) {
w := newEtcdWatcher(false, func(interface{}) bool {
t.Errorf("unexpected filter call")
return true
})
pod := &api.Pod{JSONBase: api.JSONBase{ID: "foo"}}
podBytes, _ := api.Encode(pod)

go w.sendResult(&etcd.Response{
Action: "delete",
PrevNode: &etcd.Node{
Value: string(podBytes),
},
})

got := <-w.outgoing
if e, a := watch.Deleted, got.Type; e != a {
t.Errorf("Expected %v, got %v", e, a)
}
if e, a := pod, got.Object; !reflect.DeepEqual(e, a) {
t.Errorf("Expected %v, got %v", e, a)
}
}

func TestWatch(t *testing.T) {
fakeEtcd := MakeFakeEtcdClient(t)
h := EtcdHelper{fakeEtcd}

watching, err := h.Watch("/some/key")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

fakeEtcd.WaitForWatchCompletion()

// Test normal case
pod := &api.Pod{JSONBase: api.JSONBase{ID: "foo"}}
podBytes, _ := api.Encode(pod)
fakeEtcd.WatchResponse <- &etcd.Response{
Action: "set",
Node: &etcd.Node{
Value: string(podBytes),
},
}

select {
case event := <-watching.ResultChan():
if e, a := watch.Added, event.Type; e != a {
t.Errorf("Expected %v, got %v", e, a)
}
if e, a := pod, event.Object; !reflect.DeepEqual(e, a) {
t.Errorf("Expected %v, got %v", e, a)
}
case <-time.After(10 * time.Millisecond):
t.Errorf("Expected 1 call but got 0")
}

// Test error case
fakeEtcd.WatchInjectError <- fmt.Errorf("Injected error")

// Did everything shut down?
if _, open := <-fakeEtcd.WatchResponse; open {
t.Errorf("An injected error did not cause a graceful shutdown")
}
if _, open := <-watching.ResultChan(); open {
t.Errorf("An injected error did not cause a graceful shutdown")
}
}

func TestWatchPurposefulShutdown(t *testing.T) {
fakeEtcd := MakeFakeEtcdClient(t)
h := EtcdHelper{fakeEtcd}

// Test purposeful shutdown
watching, err := h.Watch("/some/key")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
fakeEtcd.WaitForWatchCompletion()
watching.Stop()

// Did everything shut down?
if _, open := <-fakeEtcd.WatchResponse; open {
t.Errorf("A stop did not cause a graceful shutdown")
}
if _, open := <-watching.ResultChan(); open {
t.Errorf("An injected error did not cause a graceful shutdown")
}
}

0 comments on commit fbd71c9

Please sign in to comment.