-
Notifications
You must be signed in to change notification settings - Fork 40k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add to controller framework; use in scheduler #6546
Changes from all commits
065a8fa
880f922
5f7715f
6835318
395d696
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -74,16 +74,17 @@ func New(c *Config) *Controller { | |
|
||
// Run begins processing items, and will continue until a value is sent down stopCh. | ||
// It's an error to call Run more than once. | ||
// Run does not block. | ||
// Run blocks; call via go. | ||
func (c *Controller) Run(stopCh <-chan struct{}) { | ||
defer util.HandleCrash() | ||
cache.NewReflector( | ||
c.config.ListerWatcher, | ||
c.config.ObjectType, | ||
c.config.Queue, | ||
c.config.FullResyncPeriod, | ||
).RunUntil(stopCh) | ||
|
||
go util.Until(c.processLoop, time.Second, stopCh) | ||
util.Until(c.processLoop, time.Second, stopCh) | ||
} | ||
|
||
// processLoop drains the work queue. | ||
|
@@ -102,3 +103,126 @@ func (c *Controller) processLoop() { | |
} | ||
} | ||
} | ||
|
||
// ResourceEventHandler can handle notifications for events that happen to a | ||
// resource. The events are informational only, so you can't return an | ||
// error. | ||
// * OnAdd is called when an object is added. | ||
// * OnUpdate is called when an object is modified. Note that oldObj is the | ||
// last known state of the object-- it is possible that several changes | ||
// were combined together, so you can't use this to see every single | ||
// change. OnUpdate is also called when a re-list happens, and it will | ||
// get called even if nothing changed. This is useful for periodically | ||
// evaluating or syncing something. | ||
// * OnDelete will get the final state of the item if it is known, otherwise | ||
// it will get an object of type cache.DeletedFinalStateUnknown. | ||
type ResourceEventHandler interface { | ||
OnAdd(obj interface{}) | ||
OnUpdate(oldObj, newObj interface{}) | ||
OnDelete(obj interface{}) | ||
} | ||
|
||
// ResourceEventHandlerFuncs is an adaptor to let you easily specify as many or | ||
// as few of the notification functions as you want while still implementing | ||
// ResourceEventHandler. | ||
type ResourceEventHandlerFuncs struct { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I didn't understand the part about "as many notification functions as you want." It looks to me like you specify at most one Add, one Update, and one Delete function? |
||
AddFunc func(obj interface{}) | ||
UpdateFunc func(oldObj, newObj interface{}) | ||
DeleteFunc func(obj interface{}) | ||
} | ||
|
||
// OnAdd calls AddFunc if it's not nil. | ||
func (r ResourceEventHandlerFuncs) OnAdd(obj interface{}) { | ||
if r.AddFunc != nil { | ||
r.AddFunc(obj) | ||
} | ||
} | ||
|
||
// OnUpdate calls UpdateFunc if it's not nil. | ||
func (r ResourceEventHandlerFuncs) OnUpdate(oldObj, newObj interface{}) { | ||
if r.UpdateFunc != nil { | ||
r.UpdateFunc(oldObj, newObj) | ||
} | ||
} | ||
|
||
// OnDelete calls DeleteFunc if it's not nil. | ||
func (r ResourceEventHandlerFuncs) OnDelete(obj interface{}) { | ||
if r.DeleteFunc != nil { | ||
r.DeleteFunc(obj) | ||
} | ||
} | ||
|
||
// DeletionHandlingMetaNamespaceKeyFunc checks for | ||
// cache.DeletedFinalStateUnknown objects before calling | ||
// cache.MetaNamespaceKeyFunc. | ||
func DeletionHandlingMetaNamespaceKeyFunc(obj interface{}) (string, error) { | ||
if d, ok := obj.(cache.DeletedFinalStateUnknown); ok { | ||
return d.Key, nil | ||
} | ||
return cache.MetaNamespaceKeyFunc(obj) | ||
} | ||
|
||
// NewInformer returns a cache.Store and a controller for populating the store | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is a great explanation of the return values but can you also explain the arguments? |
||
// while also providing event notifications. You should only used the returned | ||
// cache.Store for Get/List operations; Add/Modify/Deletes will cause the event | ||
// notifications to be faulty. | ||
// | ||
// Parameters: | ||
// * lw is list and watch functions for the source of the resource you want to | ||
// be informed of. | ||
// * objType is an object of the type that you expect to receieve. | ||
// * resyncPeriod: if non-zero, will re-list this often (you will get OnUpdate | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't understand "you will get OnUpdate calls, even if nothing changed" Or more to the point, I guess I understand it but it seems sub-optimal, especially since you don't pass both the old and new object to OnUpdate(), so the function receiving the callback may not know whether it changed (or do you assume the function receiving the callback always has a copy of the old version?). There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I will change OnUpdate to pass the old object as we knew it. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Great, thanks! |
||
// calls, even if nothing changed). Otherwise, re-list will be delayed as | ||
// long as possible (until the upstream source closes the watch or times out, | ||
// or you stop the controller). | ||
// * h is the object you want notifications sent to. | ||
// | ||
func NewInformer( | ||
lw cache.ListerWatcher, | ||
objType runtime.Object, | ||
resyncPeriod time.Duration, | ||
h ResourceEventHandler, | ||
) (cache.Store, *Controller) { | ||
// This will hold the client state, as we know it. | ||
clientState := cache.NewStore(DeletionHandlingMetaNamespaceKeyFunc) | ||
|
||
// This will hold incoming changes. Note how we pass clientState in as a | ||
// KeyLister, that way resync operations will result in the correct set | ||
// of update/delete deltas. | ||
fifo := cache.NewDeltaFIFO(cache.MetaNamespaceKeyFunc, nil, clientState) | ||
|
||
cfg := &Config{ | ||
Queue: fifo, | ||
ListerWatcher: lw, | ||
ObjectType: objType, | ||
FullResyncPeriod: resyncPeriod, | ||
RetryOnError: false, | ||
|
||
Process: func(obj interface{}) error { | ||
// from oldest to newest | ||
for _, d := range obj.(cache.Deltas) { | ||
switch d.Type { | ||
case cache.Sync, cache.Added, cache.Updated: | ||
if old, exists, err := clientState.Get(d.Object); err == nil && exists { | ||
if err := clientState.Update(d.Object); err != nil { | ||
return err | ||
} | ||
h.OnUpdate(old, d.Object) | ||
} else { | ||
if err := clientState.Add(d.Object); err != nil { | ||
return err | ||
} | ||
h.OnAdd(d.Object) | ||
} | ||
case cache.Deleted: | ||
if err := clientState.Delete(d.Object); err != nil { | ||
return err | ||
} | ||
h.OnDelete(d.Object) | ||
} | ||
} | ||
return nil | ||
}, | ||
} | ||
return clientState, New(cfg) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In a followup PR, it might be good to add some detail to this comment explaining under what circumstances the final state would be known and when it wouldn't. That will help a client know when it can depend on getting this.