Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Watch delivers current state for resourceVersion=0 #852

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 111 additions & 10 deletions pkg/tools/etcd_tools.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,12 +106,21 @@ func IsEtcdWatchStoppedByUser(err error) bool {
return etcd.ErrWatchStoppedByUser == err
}

// Returns true iff err is an etcd error, whose errorCode matches errorCode
// isEtcdErrorNum returns true iff err is an etcd error, whose errorCode matches errorCode
func isEtcdErrorNum(err error, errorCode int) bool {
etcdError, ok := err.(*etcd.EtcdError)
return ok && etcdError != nil && etcdError.ErrorCode == errorCode
}

// etcdErrorIndex returns the index associated with the error message and whether the
// index was available.
func etcdErrorIndex(err error) (uint64, bool) {
if etcdError, ok := err.(*etcd.EtcdError); ok {
return etcdError.Index, true
}
return 0, false
}

func (h *EtcdHelper) listEtcdNode(key string) ([]*etcd.Node, error) {
result, err := h.Client.Get(key, false, true)
if err != nil {
Expand Down Expand Up @@ -297,24 +306,48 @@ func Everything(interface{}) bool {
// WatchList begins watching the specified key's items. Items are decoded into
// API objects, and any items passing 'filter' are sent down the returned
// watch.Interface. resourceVersion may be used to specify what version to begin
// watching (e.g., for reconnecting without missing any updateds).
// watching (e.g., for reconnecting without missing any updates).
func (h *EtcdHelper) WatchList(key string, resourceVersion uint64, filter FilterFunc) (watch.Interface, error) {
w := newEtcdWatcher(true, filter, h.Codec)
w := newEtcdWatcher(true, filter, h.Codec, h.ResourceVersioner, nil)
go w.etcdWatch(h.Client, key, resourceVersion)
return w, nil
}

// Watch begins watching the specified key. Events are decoded into
// API objects and sent down the returned watch.Interface.
func (h *EtcdHelper) Watch(key string, resourceVersion uint64) (watch.Interface, error) {
w := newEtcdWatcher(false, nil, h.Codec)
return h.WatchAndTransform(key, resourceVersion, nil)
}

// WatchAndTransform begins watching the specified key. Events are decoded into
// API objects and sent down the returned watch.Interface. If the transform
// function is provided, the value decoded from etcd will be passed to the function
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just to be a pain - what is the transform function signature, and what is it supposed to do? Maybe a (textual) example?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Woops, signature is just below. Still a bit more on the intended use would help

// prior to being returned.
//
// The transform function can be used to populate data not available to etcd, or to
// change or wrap the serialized etcd object.
//
// startTime := time.Now()
// helper.WatchAndTransform(key, version, func(input interface{}) (interface{}, error) {
// value := input.(TimeAwareValue)
// value.Since = startTime
// return value, nil
// })
//
func (h *EtcdHelper) WatchAndTransform(key string, resourceVersion uint64, transform TransformFunc) (watch.Interface, error) {
w := newEtcdWatcher(false, nil, h.Codec, h.ResourceVersioner, transform)
go w.etcdWatch(h.Client, key, resourceVersion)
return w, nil
}

// TransformFunc attempts to convert an object to another object for use with a watcher
type TransformFunc func(interface{}) (interface{}, error)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It occurs to me now that TransformFunc and my own FilterFunc are actually better applied as a separate, layered watch.

e.g., in the watch package, func Filter(w Interface, f FilterFunc) Interface and func Transform(w Interface, f TransformFunc) Interface. Or they could even be combined, with FilterFunc returning the new object and whether to send it.

I can do that in another PR if you don't want to do it here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree - maybe follow on pull?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure.


// etcdWatcher converts a native etcd watch to a watch.Interface.
type etcdWatcher struct {
encoding Codec
encoding Codec
versioner ResourceVersioner
transform TransformFunc

list bool // If we're doing a recursive watch, should be true.
filter FilterFunc
Expand All @@ -332,10 +365,13 @@ type etcdWatcher struct {
emit func(watch.Event)
}

// Returns a new etcdWatcher; if list is true, watch sub-nodes.
func newEtcdWatcher(list bool, filter FilterFunc, encoding Codec) *etcdWatcher {
// newEtcdWatcher returns a new etcdWatcher; if list is true, watch sub-nodes. If you provide a transform
// and a versioner, the versioner must be able to handle the objects that transform creates.
func newEtcdWatcher(list bool, filter FilterFunc, encoding Codec, versioner ResourceVersioner, transform TransformFunc) *etcdWatcher {
w := &etcdWatcher{
encoding: encoding,
versioner: versioner,
transform: transform,
list: list,
filter: filter,
etcdIncoming: make(chan *etcd.Response),
Expand All @@ -354,13 +390,55 @@ func newEtcdWatcher(list bool, filter FilterFunc, encoding Codec) *etcdWatcher {
func (w *etcdWatcher) etcdWatch(client EtcdGetSet, key string, resourceVersion uint64) {
defer util.HandleCrash()
defer close(w.etcdCallEnded)
if resourceVersion == 0 {
latest, ok := etcdGetInitialWatchState(client, key, w.list, w.etcdIncoming)
if !ok {
return
}
resourceVersion = latest
}
_, err := client.Watch(key, resourceVersion, w.list, w.etcdIncoming, w.etcdStop)
if err != etcd.ErrWatchStoppedByUser {
glog.Errorf("etcd.Watch stopped unexpectedly: %v (%#v)", err, err)
glog.Errorf("etcd.Watch stopped unexpectedly: %v (%#v)", err, key)
}
}

// Pull stuff from etcd, convert, and push out the outgoing channel. Meant to be
// etcdGetInitialWatchState turns an etcd Get request into a watch equivalent
func etcdGetInitialWatchState(client EtcdGetSet, key string, recursive bool, incoming chan<- *etcd.Response) (resourceVersion uint64, success bool) {
success = true

resp, err := client.Get(key, false, recursive)
if err != nil {
if !IsEtcdNotFound(err) {
glog.Errorf("watch was unable to retrieve the current index for the provided key: %v (%#v)", err, key)
success = false
return
}
if index, ok := etcdErrorIndex(err); ok {
resourceVersion = index
}
return
}
resourceVersion = resp.EtcdIndex
convertRecursiveResponse(resp.Node, resp, incoming)
return
}

// convertRecursiveResponse turns a recursive get response from etcd into individual response objects
// by copying the original response. This emulates the behavior of a recursive watch.
func convertRecursiveResponse(node *etcd.Node, response *etcd.Response, incoming chan<- *etcd.Response) {
if node.Dir {
for i := range node.Nodes {
convertRecursiveResponse(node.Nodes[i], response, incoming)
}
return
}
copied := *response
copied.Node = node
incoming <- &copied
}

// translate pulls stuff from etcd, convert, and push out the outgoing channel. Meant to be
// called as a goroutine.
func (w *etcdWatcher) translate() {
defer close(w.outgoing)
Expand All @@ -385,27 +463,31 @@ func (w *etcdWatcher) translate() {
func (w *etcdWatcher) sendResult(res *etcd.Response) {
var action watch.EventType
var data []byte
var index uint64
switch res.Action {
case "create":
if res.Node == nil {
glog.Errorf("unexpected nil node: %#v", res)
return
}
data = []byte(res.Node.Value)
index = res.Node.ModifiedIndex
action = watch.Added
case "set":
case "set", "compareAndSwap", "get":
if res.Node == nil {
glog.Errorf("unexpected nil node: %#v", res)
return
}
data = []byte(res.Node.Value)
index = res.Node.ModifiedIndex
action = watch.Modified
case "delete":
if res.PrevNode == nil {
glog.Errorf("unexpected nil prev node: %#v", res)
return
}
data = []byte(res.PrevNode.Value)
index = res.PrevNode.ModifiedIndex
action = watch.Deleted
default:
glog.Errorf("unknown action: %v", res.Action)
Expand All @@ -419,6 +501,25 @@ func (w *etcdWatcher) sendResult(res *etcd.Response) {
w.Stop()
return
}

// ensure resource version is set on the object we load from etcd
if w.versioner != nil {
if err := w.versioner.SetResourceVersion(obj, index); err != nil {
glog.Errorf("failure to version api object (%d) %#v: %v", index, obj, err)
}
}

// perform any necessary transformation
if w.transform != nil {
obj, err = w.transform(obj)
if err != nil {
glog.Errorf("failure to transform api object %#v: %v", obj, err)
// TODO: expose an error through watch.Interface?
w.Stop()
return
}
}

w.emit(watch.Event{
Type: action,
Object: obj,
Expand Down
Loading