Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add to controller framework; use in scheduler #6546

Merged
merged 5 commits into from
Apr 10, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/integration/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -928,6 +928,7 @@ func runSchedulerNoPhantomPodsTest(client *client.Client) {
}

// Delete a pod to free up room.
glog.Infof("Deleting pod %v", bar.Name)
err = client.Pods(api.NamespaceDefault).Delete(bar.Name)
if err != nil {
glog.Fatalf("FAILED: couldn't delete pod %q: %v", bar.Name, err)
Expand Down
7 changes: 7 additions & 0 deletions pkg/client/cache/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,18 @@ func (k KeyError) Error() string {
return fmt.Sprintf("couldn't create key for object %+v: %v", k.Obj, k.Err)
}

// ExplicitKey can be passed to MetaNamespaceKeyFunc if you have the key for
// the object but not the object itself.
type ExplicitKey string

// MetaNamespaceKeyFunc is a convenient default KeyFunc which knows how to make
// keys for API objects which implement meta.Interface.
// The key uses the format <namespace>/<name> unless <namespace> is empty, then
// it's just <name>.
func MetaNamespaceKeyFunc(obj interface{}) (string, error) {
if key, ok := obj.(ExplicitKey); ok {
return string(key), nil
}
meta, err := meta.Accessor(obj)
if err != nil {
return "", fmt.Errorf("object has no meta: %v", err)
Expand Down
128 changes: 126 additions & 2 deletions pkg/controller/framework/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,16 +74,17 @@ func New(c *Config) *Controller {

// Run begins processing items, and will continue until a value is sent down stopCh.
// It's an error to call Run more than once.
// Run does not block.
// Run blocks; call via go.
func (c *Controller) Run(stopCh <-chan struct{}) {
defer util.HandleCrash()
cache.NewReflector(
c.config.ListerWatcher,
c.config.ObjectType,
c.config.Queue,
c.config.FullResyncPeriod,
).RunUntil(stopCh)

go util.Until(c.processLoop, time.Second, stopCh)
util.Until(c.processLoop, time.Second, stopCh)
}

// processLoop drains the work queue.
Expand All @@ -102,3 +103,126 @@ func (c *Controller) processLoop() {
}
}
}

// ResourceEventHandler can handle notifications for events that happen to a
// resource. The events are informational only, so you can't return an
// error.
// * OnAdd is called when an object is added.
// * OnUpdate is called when an object is modified. Note that oldObj is the
// last known state of the object-- it is possible that several changes
// were combined together, so you can't use this to see every single
// change. OnUpdate is also called when a re-list happens, and it will
// get called even if nothing changed. This is useful for periodically
// evaluating or syncing something.
// * OnDelete will get the final state of the item if it is known, otherwise
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In a followup PR, it might be good to add some detail to this comment explaining under what circumstances the final state would be known and when it wouldn't. That will help a client know when it can depend on getting this.

// it will get an object of type cache.DeletedFinalStateUnknown.
type ResourceEventHandler interface {
OnAdd(obj interface{})
OnUpdate(oldObj, newObj interface{})
OnDelete(obj interface{})
}

// ResourceEventHandlerFuncs is an adaptor to let you easily specify as many or
// as few of the notification functions as you want while still implementing
// ResourceEventHandler.
type ResourceEventHandlerFuncs struct {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't understand the part about "as many notification functions as you want." It looks to me like you specify at most one Add, one Update, and one Delete function?

AddFunc func(obj interface{})
UpdateFunc func(oldObj, newObj interface{})
DeleteFunc func(obj interface{})
}

// OnAdd calls AddFunc if it's not nil.
func (r ResourceEventHandlerFuncs) OnAdd(obj interface{}) {
if r.AddFunc != nil {
r.AddFunc(obj)
}
}

// OnUpdate calls UpdateFunc if it's not nil.
func (r ResourceEventHandlerFuncs) OnUpdate(oldObj, newObj interface{}) {
if r.UpdateFunc != nil {
r.UpdateFunc(oldObj, newObj)
}
}

// OnDelete calls DeleteFunc if it's not nil.
func (r ResourceEventHandlerFuncs) OnDelete(obj interface{}) {
if r.DeleteFunc != nil {
r.DeleteFunc(obj)
}
}

// DeletionHandlingMetaNamespaceKeyFunc checks for
// cache.DeletedFinalStateUnknown objects before calling
// cache.MetaNamespaceKeyFunc.
func DeletionHandlingMetaNamespaceKeyFunc(obj interface{}) (string, error) {
if d, ok := obj.(cache.DeletedFinalStateUnknown); ok {
return d.Key, nil
}
return cache.MetaNamespaceKeyFunc(obj)
}

// NewInformer returns a cache.Store and a controller for populating the store
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a great explanation of the return values but can you also explain the arguments?

// while also providing event notifications. You should only used the returned
// cache.Store for Get/List operations; Add/Modify/Deletes will cause the event
// notifications to be faulty.
//
// Parameters:
// * lw is list and watch functions for the source of the resource you want to
// be informed of.
// * objType is an object of the type that you expect to receieve.
// * resyncPeriod: if non-zero, will re-list this often (you will get OnUpdate
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand "you will get OnUpdate calls, even if nothing changed" Or more to the point, I guess I understand it but it seems sub-optimal, especially since you don't pass both the old and new object to OnUpdate(), so the function receiving the callback may not know whether it changed (or do you assume the function receiving the callback always has a copy of the old version?).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will change OnUpdate to pass the old object as we knew it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great, thanks!

// calls, even if nothing changed). Otherwise, re-list will be delayed as
// long as possible (until the upstream source closes the watch or times out,
// or you stop the controller).
// * h is the object you want notifications sent to.
//
func NewInformer(
lw cache.ListerWatcher,
objType runtime.Object,
resyncPeriod time.Duration,
h ResourceEventHandler,
) (cache.Store, *Controller) {
// This will hold the client state, as we know it.
clientState := cache.NewStore(DeletionHandlingMetaNamespaceKeyFunc)

// This will hold incoming changes. Note how we pass clientState in as a
// KeyLister, that way resync operations will result in the correct set
// of update/delete deltas.
fifo := cache.NewDeltaFIFO(cache.MetaNamespaceKeyFunc, nil, clientState)

cfg := &Config{
Queue: fifo,
ListerWatcher: lw,
ObjectType: objType,
FullResyncPeriod: resyncPeriod,
RetryOnError: false,

Process: func(obj interface{}) error {
// from oldest to newest
for _, d := range obj.(cache.Deltas) {
switch d.Type {
case cache.Sync, cache.Added, cache.Updated:
if old, exists, err := clientState.Get(d.Object); err == nil && exists {
if err := clientState.Update(d.Object); err != nil {
return err
}
h.OnUpdate(old, d.Object)
} else {
if err := clientState.Add(d.Object); err != nil {
return err
}
h.OnAdd(d.Object)
}
case cache.Deleted:
if err := clientState.Delete(d.Object); err != nil {
return err
}
h.OnDelete(d.Object)
}
}
return nil
},
}
return clientState, New(cfg)
}
Loading