Skip to content

Commit

Permalink
Merge pull request #6546 from lavalamp/fix
Browse files Browse the repository at this point in the history
Add to controller framework; use in scheduler
  • Loading branch information
lavalamp committed Apr 10, 2015
2 parents 52d7395 + 395d696 commit 66d55e0
Show file tree
Hide file tree
Showing 9 changed files with 532 additions and 62 deletions.
1 change: 1 addition & 0 deletions cmd/integration/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -924,6 +924,7 @@ func runSchedulerNoPhantomPodsTest(client *client.Client) {
}

// Delete a pod to free up room.
glog.Infof("Deleting pod %v", bar.Name)
err = client.Pods(api.NamespaceDefault).Delete(bar.Name)
if err != nil {
glog.Fatalf("FAILED: couldn't delete pod %q: %v", bar.Name, err)
Expand Down
7 changes: 7 additions & 0 deletions pkg/client/cache/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,18 @@ func (k KeyError) Error() string {
return fmt.Sprintf("couldn't create key for object %+v: %v", k.Obj, k.Err)
}

// ExplicitKey can be passed to MetaNamespaceKeyFunc if you have the key for
// the object but not the object itself.
type ExplicitKey string

// MetaNamespaceKeyFunc is a convenient default KeyFunc which knows how to make
// keys for API objects which implement meta.Interface.
// The key uses the format <namespace>/<name> unless <namespace> is empty, then
// it's just <name>.
func MetaNamespaceKeyFunc(obj interface{}) (string, error) {
if key, ok := obj.(ExplicitKey); ok {
return string(key), nil
}
meta, err := meta.Accessor(obj)
if err != nil {
return "", fmt.Errorf("object has no meta: %v", err)
Expand Down
128 changes: 126 additions & 2 deletions pkg/controller/framework/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,16 +74,17 @@ func New(c *Config) *Controller {

// Run begins processing items, and will continue until a value is sent down stopCh.
// It's an error to call Run more than once.
// Run does not block.
// Run blocks; call via go.
func (c *Controller) Run(stopCh <-chan struct{}) {
defer util.HandleCrash()
cache.NewReflector(
c.config.ListerWatcher,
c.config.ObjectType,
c.config.Queue,
c.config.FullResyncPeriod,
).RunUntil(stopCh)

go util.Until(c.processLoop, time.Second, stopCh)
util.Until(c.processLoop, time.Second, stopCh)
}

// processLoop drains the work queue.
Expand All @@ -102,3 +103,126 @@ func (c *Controller) processLoop() {
}
}
}

// ResourceEventHandler can handle notifications for events that happen to a
// resource. The events are informational only, so you can't return an
// error.
// * OnAdd is called when an object is added.
// * OnUpdate is called when an object is modified. Note that oldObj is the
// last known state of the object-- it is possible that several changes
// were combined together, so you can't use this to see every single
// change. OnUpdate is also called when a re-list happens, and it will
// get called even if nothing changed. This is useful for periodically
// evaluating or syncing something.
// * OnDelete will get the final state of the item if it is known, otherwise
// it will get an object of type cache.DeletedFinalStateUnknown.
type ResourceEventHandler interface {
OnAdd(obj interface{})
OnUpdate(oldObj, newObj interface{})
OnDelete(obj interface{})
}

// ResourceEventHandlerFuncs is an adaptor to let you easily specify as many or
// as few of the notification functions as you want while still implementing
// ResourceEventHandler.
type ResourceEventHandlerFuncs struct {
AddFunc func(obj interface{})
UpdateFunc func(oldObj, newObj interface{})
DeleteFunc func(obj interface{})
}

// OnAdd calls AddFunc if it's not nil.
func (r ResourceEventHandlerFuncs) OnAdd(obj interface{}) {
if r.AddFunc != nil {
r.AddFunc(obj)
}
}

// OnUpdate calls UpdateFunc if it's not nil.
func (r ResourceEventHandlerFuncs) OnUpdate(oldObj, newObj interface{}) {
if r.UpdateFunc != nil {
r.UpdateFunc(oldObj, newObj)
}
}

// OnDelete calls DeleteFunc if it's not nil.
func (r ResourceEventHandlerFuncs) OnDelete(obj interface{}) {
if r.DeleteFunc != nil {
r.DeleteFunc(obj)
}
}

// DeletionHandlingMetaNamespaceKeyFunc checks for
// cache.DeletedFinalStateUnknown objects before calling
// cache.MetaNamespaceKeyFunc.
func DeletionHandlingMetaNamespaceKeyFunc(obj interface{}) (string, error) {
if d, ok := obj.(cache.DeletedFinalStateUnknown); ok {
return d.Key, nil
}
return cache.MetaNamespaceKeyFunc(obj)
}

// NewInformer returns a cache.Store and a controller for populating the store
// while also providing event notifications. You should only used the returned
// cache.Store for Get/List operations; Add/Modify/Deletes will cause the event
// notifications to be faulty.
//
// Parameters:
// * lw is list and watch functions for the source of the resource you want to
// be informed of.
// * objType is an object of the type that you expect to receieve.
// * resyncPeriod: if non-zero, will re-list this often (you will get OnUpdate
// calls, even if nothing changed). Otherwise, re-list will be delayed as
// long as possible (until the upstream source closes the watch or times out,
// or you stop the controller).
// * h is the object you want notifications sent to.
//
func NewInformer(
lw cache.ListerWatcher,
objType runtime.Object,
resyncPeriod time.Duration,
h ResourceEventHandler,
) (cache.Store, *Controller) {
// This will hold the client state, as we know it.
clientState := cache.NewStore(DeletionHandlingMetaNamespaceKeyFunc)

// This will hold incoming changes. Note how we pass clientState in as a
// KeyLister, that way resync operations will result in the correct set
// of update/delete deltas.
fifo := cache.NewDeltaFIFO(cache.MetaNamespaceKeyFunc, nil, clientState)

cfg := &Config{
Queue: fifo,
ListerWatcher: lw,
ObjectType: objType,
FullResyncPeriod: resyncPeriod,
RetryOnError: false,

Process: func(obj interface{}) error {
// from oldest to newest
for _, d := range obj.(cache.Deltas) {
switch d.Type {
case cache.Sync, cache.Added, cache.Updated:
if old, exists, err := clientState.Get(d.Object); err == nil && exists {
if err := clientState.Update(d.Object); err != nil {
return err
}
h.OnUpdate(old, d.Object)
} else {
if err := clientState.Add(d.Object); err != nil {
return err
}
h.OnAdd(d.Object)
}
case cache.Deleted:
if err := clientState.Delete(d.Object); err != nil {
return err
}
h.OnDelete(d.Object)
}
}
return nil
},
}
return clientState, New(cfg)
}
Loading

0 comments on commit 66d55e0

Please sign in to comment.