Skip to content

Commit

Permalink
Make scheduler not miss deletion events even in the case of a resync.
Browse files Browse the repository at this point in the history
  • Loading branch information
lavalamp committed Apr 9, 2015
1 parent 40c6e97 commit 54be979
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 41 deletions.
3 changes: 3 additions & 0 deletions pkg/client/cache/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@ type ExplicitKey string
// The key uses the format <namespace>/<name> unless <namespace> is empty, then
// it's just <name>.
func MetaNamespaceKeyFunc(obj interface{}) (string, error) {
if key, ok := obj.(ExplicitKey); ok {
return string(key), nil
}
meta, err := meta.Accessor(obj)
if err != nil {
return "", fmt.Errorf("object has no meta: %v", err)
Expand Down
80 changes: 42 additions & 38 deletions plugin/pkg/scheduler/factory/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache"
"github.com/GoogleCloudPlatform/kubernetes/pkg/controller/framework"
"github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
algorithm "github.com/GoogleCloudPlatform/kubernetes/pkg/scheduler"
Expand All @@ -51,21 +52,52 @@ type ConfigFactory struct {
// a means to list all services
ServiceLister *cache.StoreToServiceLister

modeler scheduler.SystemModeler
// Close this to stop all reflectors
StopEverything chan struct{}

scheduledPodPopulator *framework.Controller
modeler scheduler.SystemModeler
}

// Initializes the factory.
func NewConfigFactory(client *client.Client) *ConfigFactory {
c := &ConfigFactory{
Client: client,
PodQueue: cache.NewFIFO(cache.MetaNamespaceKeyFunc),
ScheduledPodLister: &cache.StoreToPodLister{cache.NewStore(cache.MetaNamespaceKeyFunc)},
ScheduledPodLister: &cache.StoreToPodLister{},
NodeLister: &cache.StoreToNodeLister{cache.NewStore(cache.MetaNamespaceKeyFunc)},
ServiceLister: &cache.StoreToServiceLister{cache.NewStore(cache.MetaNamespaceKeyFunc)},
StopEverything: make(chan struct{}),
}
modeler := scheduler.NewSimpleModeler(&cache.StoreToPodLister{c.PodQueue}, c.ScheduledPodLister)
c.modeler = modeler
c.PodLister = modeler.PodLister()

// On add/delete to the scheduled pods, remove from the assumed pods.
// We construct this here instead of in CreateFromKeys because
// ScheduledPodLister is something we provide to plug in functions that
// they may need to call.
c.ScheduledPodLister.Store, c.scheduledPodPopulator = framework.NewInformer(
c.createAssignedPodLW(),
&api.Pod{},
0,
framework.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
if pod, ok := obj.(*api.Pod); ok {
c.modeler.ForgetPod(pod)
}
},
DeleteFunc: func(obj interface{}) {
switch t := obj.(type) {
case *api.Pod:
c.modeler.ForgetPod(t)
case cache.DeletedFinalStateUnknown:
c.modeler.ForgetPodByKey(t.Key)
}
},
},
)

return c
}

Expand Down Expand Up @@ -109,21 +141,6 @@ func (f *ConfigFactory) CreateFromConfig(policy schedulerapi.Policy) (*scheduler
return f.CreateFromKeys(predicateKeys, priorityKeys)
}

// ReflectorDeletionHook passes all operations through to Store, but calls
// OnDelete in a goroutine if there is a deletion.
type ReflectorDeletionHook struct {
// Store is embedded; operations not overridden on this type go straight to it.
cache.Store
// OnDelete receives the object that was passed to Delete. Delete invokes it
// unconditionally, so it is expected to be non-nil.
OnDelete func(obj interface{})
}

// Delete starts a goroutine that calls OnDelete(obj) and then returns the
// result of the embedded Store's Delete. The hook is not synchronized with the
// store deletion, so it may run before or after the object has been removed.
func (r ReflectorDeletionHook) Delete(obj interface{}) error {
go func() {
// NOTE(review): presumably HandleCrash recovers a panic raised by OnDelete
// so that it cannot take down the process — confirm against pkg/util.
defer util.HandleCrash()
r.OnDelete(obj)
}()
return r.Store.Delete(obj)
}

// Creates a scheduler from a set of registered fit predicate keys and priority keys.
func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys util.StringSet) (*scheduler.Config, error) {
glog.V(2).Infof("creating scheduler with fit predicates '%v' and priority functions '%v", predicateKeys, priorityKeys)
Expand All @@ -144,39 +161,25 @@ func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys util.StringSe
}

// Watch and queue pods that need scheduling.
cache.NewReflector(f.createUnassignedPodLW(), &api.Pod{}, f.PodQueue, 0).Run()

// Pass through all events to the scheduled pod store, but on a deletion,
// also remove from the assumed pods.
assumedPodDeleter := ReflectorDeletionHook{
Store: f.ScheduledPodLister.Store,
OnDelete: func(obj interface{}) {
if pod, ok := obj.(*api.Pod); ok {
f.modeler.LockedAction(func() {
f.modeler.ForgetPod(pod)
})
}
},
}
cache.NewReflector(f.createUnassignedPodLW(), &api.Pod{}, f.PodQueue, 0).RunUntil(f.StopEverything)

// Watch and cache all running pods. Scheduler needs to find all pods
// so it knows where it's safe to place a pod. Cache this locally.
cache.NewReflector(f.createAssignedPodLW(), &api.Pod{}, assumedPodDeleter, 0).Run()
// Begin populating scheduled pods.
f.scheduledPodPopulator.Run(f.StopEverything)

// Watch minions.
// Minions may be listed frequently, so provide a local up-to-date cache.
if false {
// Disable this code until minions support watches. Note when this code is enabled,
// we need to make sure minion ListWatcher has proper FieldSelector.
cache.NewReflector(f.createMinionLW(), &api.Node{}, f.NodeLister.Store, 0).Run()
cache.NewReflector(f.createMinionLW(), &api.Node{}, f.NodeLister.Store, 10*time.Second).RunUntil(f.StopEverything)
} else {
cache.NewPoller(f.pollMinions, 10*time.Second, f.NodeLister.Store).Run()
cache.NewPoller(f.pollMinions, 10*time.Second, f.NodeLister.Store).RunUntil(f.StopEverything)
}

// Watch and cache all service objects. Scheduler needs to find all pods
// created by the same service, so that it can spread them correctly.
// Cache this locally.
cache.NewReflector(f.createServiceLW(), &api.Service{}, f.ServiceLister.Store, 0).Run()
cache.NewReflector(f.createServiceLW(), &api.Service{}, f.ServiceLister.Store, 0).RunUntil(f.StopEverything)

r := rand.New(rand.NewSource(time.Now().UnixNano()))

Expand All @@ -200,7 +203,8 @@ func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys util.StringSe
glog.V(2).Infof("About to try and schedule pod %v", pod.Name)
return pod
},
Error: f.makeDefaultErrorFunc(&podBackoff, f.PodQueue),
Error: f.makeDefaultErrorFunc(&podBackoff, f.PodQueue),
StopEverything: f.StopEverything,
}, nil
}

Expand Down
16 changes: 14 additions & 2 deletions plugin/pkg/scheduler/modeler.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,9 @@ func (a *actionLocker) LockedAction(do func()) {

// FakeModeler implements the SystemModeler interface.
type FakeModeler struct {
AssumePodFunc func(pod *api.Pod)
ForgetPodFunc func(pod *api.Pod)
AssumePodFunc func(pod *api.Pod)
ForgetPodFunc func(pod *api.Pod)
ForgetPodByKeyFunc func(key string)
actionLocker
}

Expand All @@ -76,6 +77,13 @@ func (f *FakeModeler) ForgetPod(pod *api.Pod) {
}
}

// ForgetPodByKey calls ForgetPodByKeyFunc with key if that function variable
// is set, and does nothing otherwise.
func (f *FakeModeler) ForgetPodByKey(key string) {
	// Guard on the same field that is invoked. Checking ForgetPodFunc here
	// would panic when only ForgetPodFunc is set, and would skip the callback
	// when only ForgetPodByKeyFunc is set.
	if f.ForgetPodByKeyFunc != nil {
		f.ForgetPodByKeyFunc(key)
	}
}

// SimpleModeler implements the SystemModeler interface with a timed pod cache.
type SimpleModeler struct {
queuedPods ExtendedPodLister
Expand Down Expand Up @@ -110,6 +118,10 @@ func (s *SimpleModeler) ForgetPod(pod *api.Pod) {
s.assumedPods.Delete(pod)
}

// ForgetPodByKey deletes the assumed-pod entry identified by key. The string
// is wrapped in cache.ExplicitKey before being handed to assumedPods.Delete.
// NOTE(review): presumably assumedPods is keyed by a function that returns an
// ExplicitKey unchanged (e.g. cache.MetaNamespaceKeyFunc) — confirm.
func (s *SimpleModeler) ForgetPodByKey(key string) {
	explicit := cache.ExplicitKey(key)
	s.assumedPods.Delete(explicit)
}

// Extract names for readable logging.
func podNames(pods []api.Pod) []string {
out := make([]string, len(pods))
Expand Down
6 changes: 5 additions & 1 deletion plugin/pkg/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ type SystemModeler interface {
// show the absence of the given pod if the pod is in the scheduled
// pods list!)
ForgetPod(pod *api.Pod)
ForgetPodByKey(key string)

// For serializing calls to Assume/ForgetPod: imagine you want to add
// a pod iff a bind succeeds, but also remove a pod if it is deleted.
Expand Down Expand Up @@ -85,6 +86,9 @@ type Config struct {

// Recorder is the EventRecorder to use
Recorder record.EventRecorder

// Close this to shut down the scheduler.
StopEverything chan struct{}
}

// New returns a new scheduler.
Expand All @@ -98,7 +102,7 @@ func New(c *Config) *Scheduler {

// Run begins watching and scheduling. It starts a goroutine and returns immediately.
func (s *Scheduler) Run() {
go util.Forever(s.scheduleOne, 0)
go util.Until(s.scheduleOne, 0, s.config.StopEverything)
}

func (s *Scheduler) scheduleOne() {
Expand Down

0 comments on commit 54be979

Please sign in to comment.