-
Notifications
You must be signed in to change notification settings - Fork 40.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Watch delivers current state for resourceVersion=0 #852
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -106,12 +106,21 @@ func IsEtcdWatchStoppedByUser(err error) bool { | |
return etcd.ErrWatchStoppedByUser == err | ||
} | ||
|
||
// Returns true iff err is an etcd error, whose errorCode matches errorCode | ||
// isEtcdErrorNum returns true iff err is an etcd error, whose errorCode matches errorCode | ||
func isEtcdErrorNum(err error, errorCode int) bool { | ||
etcdError, ok := err.(*etcd.EtcdError) | ||
return ok && etcdError != nil && etcdError.ErrorCode == errorCode | ||
} | ||
|
||
// etcdErrorIndex returns the index associated with the error message and whether the | ||
// index was available. | ||
func etcdErrorIndex(err error) (uint64, bool) { | ||
if etcdError, ok := err.(*etcd.EtcdError); ok { | ||
return etcdError.Index, true | ||
} | ||
return 0, false | ||
} | ||
|
||
func (h *EtcdHelper) listEtcdNode(key string) ([]*etcd.Node, error) { | ||
result, err := h.Client.Get(key, false, true) | ||
if err != nil { | ||
|
@@ -297,24 +306,48 @@ func Everything(interface{}) bool { | |
// WatchList begins watching the specified key's items. Items are decoded into | ||
// API objects, and any items passing 'filter' are sent down the returned | ||
// watch.Interface. resourceVersion may be used to specify what version to begin | ||
// watching (e.g., for reconnecting without missing any updateds). | ||
// watching (e.g., for reconnecting without missing any updates). | ||
func (h *EtcdHelper) WatchList(key string, resourceVersion uint64, filter FilterFunc) (watch.Interface, error) { | ||
w := newEtcdWatcher(true, filter, h.Codec) | ||
w := newEtcdWatcher(true, filter, h.Codec, h.ResourceVersioner, nil) | ||
go w.etcdWatch(h.Client, key, resourceVersion) | ||
return w, nil | ||
} | ||
|
||
// Watch begins watching the specified key. Events are decoded into | ||
// API objects and sent down the returned watch.Interface. | ||
func (h *EtcdHelper) Watch(key string, resourceVersion uint64) (watch.Interface, error) { | ||
w := newEtcdWatcher(false, nil, h.Codec) | ||
return h.WatchAndTransform(key, resourceVersion, nil) | ||
} | ||
|
||
// WatchAndTransform begins watching the specified key. Events are decoded into | ||
// API objects and sent down the returned watch.Interface. If the transform | ||
// function is provided, the value decoded from etcd will be passed to the function | ||
// prior to being returned. | ||
// | ||
// The transform function can be used to populate data not available to etcd, or to | ||
// change or wrap the serialized etcd object. | ||
// | ||
// startTime := time.Now() | ||
// helper.WatchAndTransform(key, version, func(input interface{}) (interface{}, error) { | ||
// value := input.(TimeAwareValue) | ||
// value.Since = startTime | ||
// return value, nil | ||
// }) | ||
// | ||
func (h *EtcdHelper) WatchAndTransform(key string, resourceVersion uint64, transform TransformFunc) (watch.Interface, error) { | ||
w := newEtcdWatcher(false, nil, h.Codec, h.ResourceVersioner, transform) | ||
go w.etcdWatch(h.Client, key, resourceVersion) | ||
return w, nil | ||
} | ||
|
||
// TransformFunc attempts to convert an object to another object for use with a watcher | ||
type TransformFunc func(interface{}) (interface{}, error) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It occurs to me now that TransformFunc and my own FilterFunc are actually better applied as a separate, layered watch. e.g., in the watch package, I can do that in another PR if you don't want to do it here. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Agree - maybe follow on pull? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sure. |
||
|
||
// etcdWatcher converts a native etcd watch to a watch.Interface. | ||
type etcdWatcher struct { | ||
encoding Codec | ||
encoding Codec | ||
versioner ResourceVersioner | ||
transform TransformFunc | ||
|
||
list bool // If we're doing a recursive watch, should be true. | ||
filter FilterFunc | ||
|
@@ -332,10 +365,13 @@ type etcdWatcher struct { | |
emit func(watch.Event) | ||
} | ||
|
||
// Returns a new etcdWatcher; if list is true, watch sub-nodes. | ||
func newEtcdWatcher(list bool, filter FilterFunc, encoding Codec) *etcdWatcher { | ||
// newEtcdWatcher returns a new etcdWatcher; if list is true, watch sub-nodes. If you provide a transform | ||
// and a versioner, the versioner must be able to handle the objects that transform creates. | ||
func newEtcdWatcher(list bool, filter FilterFunc, encoding Codec, versioner ResourceVersioner, transform TransformFunc) *etcdWatcher { | ||
w := &etcdWatcher{ | ||
encoding: encoding, | ||
versioner: versioner, | ||
transform: transform, | ||
list: list, | ||
filter: filter, | ||
etcdIncoming: make(chan *etcd.Response), | ||
|
@@ -354,13 +390,55 @@ func newEtcdWatcher(list bool, filter FilterFunc, encoding Codec) *etcdWatcher { | |
func (w *etcdWatcher) etcdWatch(client EtcdGetSet, key string, resourceVersion uint64) { | ||
defer util.HandleCrash() | ||
defer close(w.etcdCallEnded) | ||
if resourceVersion == 0 { | ||
latest, ok := etcdGetInitialWatchState(client, key, w.list, w.etcdIncoming) | ||
if !ok { | ||
return | ||
} | ||
resourceVersion = latest | ||
} | ||
_, err := client.Watch(key, resourceVersion, w.list, w.etcdIncoming, w.etcdStop) | ||
if err != etcd.ErrWatchStoppedByUser { | ||
glog.Errorf("etcd.Watch stopped unexpectedly: %v (%#v)", err, err) | ||
glog.Errorf("etcd.Watch stopped unexpectedly: %v (%#v)", err, key) | ||
} | ||
} | ||
|
||
// Pull stuff from etcd, convert, and push out the outgoing channel. Meant to be | ||
// etcdGetInitialWatchState turns an etcd Get request into a watch equivalent | ||
func etcdGetInitialWatchState(client EtcdGetSet, key string, recursive bool, incoming chan<- *etcd.Response) (resourceVersion uint64, success bool) { | ||
success = true | ||
|
||
resp, err := client.Get(key, false, recursive) | ||
if err != nil { | ||
if !IsEtcdNotFound(err) { | ||
glog.Errorf("watch was unable to retrieve the current index for the provided key: %v (%#v)", err, key) | ||
success = false | ||
return | ||
} | ||
if index, ok := etcdErrorIndex(err); ok { | ||
resourceVersion = index | ||
} | ||
return | ||
} | ||
resourceVersion = resp.EtcdIndex | ||
convertRecursiveResponse(resp.Node, resp, incoming) | ||
return | ||
} | ||
|
||
// convertRecursiveResponse turns a recursive get response from etcd into individual response objects | ||
// by copying the original response. This emulates the behavior of a recursive watch. | ||
func convertRecursiveResponse(node *etcd.Node, response *etcd.Response, incoming chan<- *etcd.Response) { | ||
if node.Dir { | ||
for i := range node.Nodes { | ||
convertRecursiveResponse(node.Nodes[i], response, incoming) | ||
} | ||
return | ||
} | ||
copied := *response | ||
copied.Node = node | ||
incoming <- &copied | ||
} | ||
|
||
// translate pulls stuff from etcd, convert, and push out the outgoing channel. Meant to be | ||
// called as a goroutine. | ||
func (w *etcdWatcher) translate() { | ||
defer close(w.outgoing) | ||
|
@@ -385,27 +463,31 @@ func (w *etcdWatcher) translate() { | |
func (w *etcdWatcher) sendResult(res *etcd.Response) { | ||
var action watch.EventType | ||
var data []byte | ||
var index uint64 | ||
switch res.Action { | ||
case "create": | ||
if res.Node == nil { | ||
glog.Errorf("unexpected nil node: %#v", res) | ||
return | ||
} | ||
data = []byte(res.Node.Value) | ||
index = res.Node.ModifiedIndex | ||
action = watch.Added | ||
case "set": | ||
case "set", "compareAndSwap", "get": | ||
if res.Node == nil { | ||
glog.Errorf("unexpected nil node: %#v", res) | ||
return | ||
} | ||
data = []byte(res.Node.Value) | ||
index = res.Node.ModifiedIndex | ||
action = watch.Modified | ||
case "delete": | ||
if res.PrevNode == nil { | ||
glog.Errorf("unexpected nil prev node: %#v", res) | ||
return | ||
} | ||
data = []byte(res.PrevNode.Value) | ||
index = res.PrevNode.ModifiedIndex | ||
action = watch.Deleted | ||
default: | ||
glog.Errorf("unknown action: %v", res.Action) | ||
|
@@ -419,6 +501,25 @@ func (w *etcdWatcher) sendResult(res *etcd.Response) { | |
w.Stop() | ||
return | ||
} | ||
|
||
// ensure resource version is set on the object we load from etcd | ||
if w.versioner != nil { | ||
if err := w.versioner.SetResourceVersion(obj, index); err != nil { | ||
glog.Errorf("failure to version api object (%d) %#v: %v", index, obj, err) | ||
} | ||
} | ||
|
||
// perform any necessary transformation | ||
if w.transform != nil { | ||
obj, err = w.transform(obj) | ||
if err != nil { | ||
glog.Errorf("failure to transform api object %#v: %v", obj, err) | ||
// TODO: expose an error through watch.Interface? | ||
w.Stop() | ||
return | ||
} | ||
} | ||
|
||
w.emit(watch.Event{ | ||
Type: action, | ||
Object: obj, | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
just to be a pain - what is the transform function signature, and what is it supposed to do? Maybe a (textual) example?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Woops, signature is just below. Still a bit more on the intended use would help