Skip to content

Commit

Permalink
Merge pull request kubernetes#852 from smarterclayton/deliver_state_o…
Browse files Browse the repository at this point in the history
…n_nil_version

Watch delivers current state for resourceVersion=0
  • Loading branch information
lavalamp committed Aug 11, 2014
2 parents 8666781 + c5630a9 commit 5e34a97
Show file tree
Hide file tree
Showing 3 changed files with 281 additions and 16 deletions.
121 changes: 111 additions & 10 deletions pkg/tools/etcd_tools.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,12 +106,21 @@ func IsEtcdWatchStoppedByUser(err error) bool {
return etcd.ErrWatchStoppedByUser == err
}

// Returns true iff err is an etcd error, whose errorCode matches errorCode
// isEtcdErrorNum returns true iff err is an etcd error, whose errorCode matches errorCode
func isEtcdErrorNum(err error, errorCode int) bool {
etcdError, ok := err.(*etcd.EtcdError)
return ok && etcdError != nil && etcdError.ErrorCode == errorCode
}

// etcdErrorIndex returns the index associated with the error message and whether the
// index was available.
func etcdErrorIndex(err error) (uint64, bool) {
if etcdError, ok := err.(*etcd.EtcdError); ok {
return etcdError.Index, true
}
return 0, false
}

func (h *EtcdHelper) listEtcdNode(key string) ([]*etcd.Node, error) {
result, err := h.Client.Get(key, false, true)
if err != nil {
Expand Down Expand Up @@ -301,24 +310,48 @@ func Everything(interface{}) bool {
// WatchList begins watching the specified key's items. Items are decoded into
// API objects, and any items passing 'filter' are sent down the returned
// watch.Interface. resourceVersion may be used to specify what version to begin
// watching (e.g., for reconnecting without missing any updateds).
// watching (e.g., for reconnecting without missing any updates).
func (h *EtcdHelper) WatchList(key string, resourceVersion uint64, filter FilterFunc) (watch.Interface, error) {
w := newEtcdWatcher(true, filter, h.Codec)
w := newEtcdWatcher(true, filter, h.Codec, h.ResourceVersioner, nil)
go w.etcdWatch(h.Client, key, resourceVersion)
return w, nil
}

// Watch begins watching the specified key. Events are decoded into
// API objects and sent down the returned watch.Interface.
func (h *EtcdHelper) Watch(key string, resourceVersion uint64) (watch.Interface, error) {
w := newEtcdWatcher(false, nil, h.Codec)
return h.WatchAndTransform(key, resourceVersion, nil)
}

// WatchAndTransform begins watching the specified key. Events are decoded into
// API objects and sent down the returned watch.Interface. If the transform
// function is provided, the value decoded from etcd will be passed to the function
// prior to being returned.
//
// The transform function can be used to populate data not available to etcd, or to
// change or wrap the serialized etcd object.
//
// startTime := time.Now()
// helper.WatchAndTransform(key, version, func(input interface{}) (interface{}, error) {
// value := input.(TimeAwareValue)
// value.Since = startTime
// return value, nil
// })
//
func (h *EtcdHelper) WatchAndTransform(key string, resourceVersion uint64, transform TransformFunc) (watch.Interface, error) {
w := newEtcdWatcher(false, nil, h.Codec, h.ResourceVersioner, transform)
go w.etcdWatch(h.Client, key, resourceVersion)
return w, nil
}

// TransformFunc attempts to convert an object to another object for use with a watcher
type TransformFunc func(interface{}) (interface{}, error)

// etcdWatcher converts a native etcd watch to a watch.Interface.
type etcdWatcher struct {
encoding Codec
encoding Codec
versioner ResourceVersioner
transform TransformFunc

list bool // If we're doing a recursive watch, should be true.
filter FilterFunc
Expand All @@ -336,10 +369,13 @@ type etcdWatcher struct {
emit func(watch.Event)
}

// Returns a new etcdWatcher; if list is true, watch sub-nodes.
func newEtcdWatcher(list bool, filter FilterFunc, encoding Codec) *etcdWatcher {
// newEtcdWatcher returns a new etcdWatcher; if list is true, watch sub-nodes. If you provide a transform
// and a versioner, the versioner must be able to handle the objects that transform creates.
func newEtcdWatcher(list bool, filter FilterFunc, encoding Codec, versioner ResourceVersioner, transform TransformFunc) *etcdWatcher {
w := &etcdWatcher{
encoding: encoding,
versioner: versioner,
transform: transform,
list: list,
filter: filter,
etcdIncoming: make(chan *etcd.Response),
Expand All @@ -358,13 +394,55 @@ func newEtcdWatcher(list bool, filter FilterFunc, encoding Codec) *etcdWatcher {
func (w *etcdWatcher) etcdWatch(client EtcdGetSet, key string, resourceVersion uint64) {
defer util.HandleCrash()
defer close(w.etcdCallEnded)
if resourceVersion == 0 {
latest, ok := etcdGetInitialWatchState(client, key, w.list, w.etcdIncoming)
if !ok {
return
}
resourceVersion = latest
}
_, err := client.Watch(key, resourceVersion, w.list, w.etcdIncoming, w.etcdStop)
if err != etcd.ErrWatchStoppedByUser {
glog.Errorf("etcd.Watch stopped unexpectedly: %v (%#v)", err, err)
glog.Errorf("etcd.Watch stopped unexpectedly: %v (%#v)", err, key)
}
}

// Pull stuff from etcd, convert, and push out the outgoing channel. Meant to be
// etcdGetInitialWatchState turns an etcd Get request into a watch equivalent
func etcdGetInitialWatchState(client EtcdGetSet, key string, recursive bool, incoming chan<- *etcd.Response) (resourceVersion uint64, success bool) {
success = true

resp, err := client.Get(key, false, recursive)
if err != nil {
if !IsEtcdNotFound(err) {
glog.Errorf("watch was unable to retrieve the current index for the provided key: %v (%#v)", err, key)
success = false
return
}
if index, ok := etcdErrorIndex(err); ok {
resourceVersion = index
}
return
}
resourceVersion = resp.EtcdIndex
convertRecursiveResponse(resp.Node, resp, incoming)
return
}

// convertRecursiveResponse turns a recursive get response from etcd into individual response objects
// by copying the original response. This emulates the behavior of a recursive watch.
func convertRecursiveResponse(node *etcd.Node, response *etcd.Response, incoming chan<- *etcd.Response) {
if node.Dir {
for i := range node.Nodes {
convertRecursiveResponse(node.Nodes[i], response, incoming)
}
return
}
copied := *response
copied.Node = node
incoming <- &copied
}

// translate pulls stuff from etcd, convert, and push out the outgoing channel. Meant to be
// called as a goroutine.
func (w *etcdWatcher) translate() {
defer close(w.outgoing)
Expand All @@ -389,27 +467,31 @@ func (w *etcdWatcher) translate() {
func (w *etcdWatcher) sendResult(res *etcd.Response) {
var action watch.EventType
var data []byte
var index uint64
switch res.Action {
case "create":
if res.Node == nil {
glog.Errorf("unexpected nil node: %#v", res)
return
}
data = []byte(res.Node.Value)
index = res.Node.ModifiedIndex
action = watch.Added
case "set":
case "set", "compareAndSwap", "get":
if res.Node == nil {
glog.Errorf("unexpected nil node: %#v", res)
return
}
data = []byte(res.Node.Value)
index = res.Node.ModifiedIndex
action = watch.Modified
case "delete":
if res.PrevNode == nil {
glog.Errorf("unexpected nil prev node: %#v", res)
return
}
data = []byte(res.PrevNode.Value)
index = res.PrevNode.ModifiedIndex
action = watch.Deleted
default:
glog.Errorf("unknown action: %v", res.Action)
Expand All @@ -423,6 +505,25 @@ func (w *etcdWatcher) sendResult(res *etcd.Response) {
w.Stop()
return
}

// ensure resource version is set on the object we load from etcd
if w.versioner != nil {
if err := w.versioner.SetResourceVersion(obj, index); err != nil {
glog.Errorf("failure to version api object (%d) %#v: %v", index, obj, err)
}
}

// perform any necessary transformation
if w.transform != nil {
obj, err = w.transform(obj)
if err != nil {
glog.Errorf("failure to transform api object %#v: %v", obj, err)
// TODO: expose an error through watch.Interface?
w.Stop()
return
}
}

w.emit(watch.Event{
Type: action,
Object: obj,
Expand Down
Loading

0 comments on commit 5e34a97

Please sign in to comment.