Commit

add occasional polling to reflector

lavalamp committed Feb 28, 2015
1 parent 156c505 commit 554b1c8

Showing 7 changed files with 110 additions and 47 deletions.
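In short: cache.NewReflector gains a resyncPeriod argument. When it is non-zero, watchHandler stops the watch after that interval and listAndWatch starts over with a fresh list, so a reflector can periodically reprocess everything while still consuming incremental watch events; passing 0 keeps the old watch-only behavior. A minimal caller sketch (the kubeClient setup and the 30-second period are illustrative assumptions, not part of this commit):

	// Hypothetical caller: re-list pods every 30 seconds on top of watching.
	store := cache.NewStore(cache.MetaNamespaceKeyFunc)
	lw := cache.NewListWatchFromClient(kubeClient, "pods", api.NamespaceAll, labels.Everything())
	cache.NewReflector(lw, &api.Pod{}, store, 30*time.Second).Run()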
100 changes: 65 additions & 35 deletions pkg/client/cache/reflector.go
Expand Up @@ -49,18 +49,23 @@ type Reflector struct {
listerWatcher ListerWatcher
// period controls timing between one watch ending and
// the beginning of the next one.
period time.Duration
period time.Duration
resyncPeriod time.Duration
}

// NewReflector creates a new Reflector object which will keep the given store up to
// date with the server's contents for the given resource. Reflector promises to
// only put things in the store that have the type of expectedType.
func NewReflector(lw ListerWatcher, expectedType interface{}, store Store) *Reflector {
// If resyncPeriod is non-zero, then lists will be executed after every resyncPeriod,
// so that you can use reflectors to periodically process everything as well as
// incrementally processing the things that change.
func NewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
r := &Reflector{
listerWatcher: lw,
store: store,
expectedType: reflect.TypeOf(expectedType),
period: time.Second,
resyncPeriod: resyncPeriod,
}
return r
}
@@ -77,8 +82,25 @@ func (r *Reflector) RunUntil(stopCh <-chan struct{}) {
 	go util.Until(func() { r.listAndWatch() }, r.period, stopCh)
 }
 
+var (
+	// nothing will ever be sent down this channel
+	neverExitWatch <-chan time.Time = make(chan time.Time)
+
+	// Used to indicate that watching stopped so that a resync could happen.
+	errorResyncRequested = errors.New("resync channel fired")
+)
+
+// resyncChan returns a channel which will receive something when a resync is required.
+func (r *Reflector) resyncChan() <-chan time.Time {
+	if r.resyncPeriod == 0 {
+		return neverExitWatch
+	}
+	return time.After(r.resyncPeriod)
+}
+
 func (r *Reflector) listAndWatch() {
 	var resourceVersion string
+	exitWatch := r.resyncChan()
 
 	list, err := r.listerWatcher.List()
 	if err != nil {
@@ -114,8 +136,10 @@ func (r *Reflector) listAndWatch() {
 			}
 			return
 		}
-		if err := r.watchHandler(w, &resourceVersion); err != nil {
-			glog.Errorf("watch of %v ended with error: %v", r.expectedType, err)
+		if err := r.watchHandler(w, &resourceVersion, exitWatch); err != nil {
+			if err != errorResyncRequested {
+				glog.Errorf("watch of %v ended with error: %v", r.expectedType, err)
+			}
 			return
 		}
 	}
@@ -132,41 +156,47 @@ func (r *Reflector) syncWith(items []runtime.Object) error {
 }
 
 // watchHandler watches w and keeps *resourceVersion up to date.
-func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *string) error {
+func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *string, exitWatch <-chan time.Time) error {
 	start := time.Now()
 	eventCount := 0
+loop:
 	for {
-		event, ok := <-w.ResultChan()
-		if !ok {
-			break
-		}
-		if event.Type == watch.Error {
-			return apierrs.FromObject(event.Object)
-		}
-		if e, a := r.expectedType, reflect.TypeOf(event.Object); e != a {
-			glog.Errorf("expected type %v, but watch event object had type %v", e, a)
-			continue
-		}
-		meta, err := meta.Accessor(event.Object)
-		if err != nil {
-			glog.Errorf("unable to understand watch event %#v", event)
-			continue
-		}
-		switch event.Type {
-		case watch.Added:
-			r.store.Add(event.Object)
-		case watch.Modified:
-			r.store.Update(event.Object)
-		case watch.Deleted:
-			// TODO: Will any consumers need access to the "last known
-			// state", which is passed in event.Object? If so, may need
-			// to change this.
-			r.store.Delete(event.Object)
-		default:
-			glog.Errorf("unable to understand watch event %#v", event)
+		select {
+		case <-exitWatch:
+			w.Stop()
+			return errorResyncRequested
+		case event, ok := <-w.ResultChan():
+			if !ok {
+				break loop
+			}
+			if event.Type == watch.Error {
+				return apierrs.FromObject(event.Object)
+			}
+			if e, a := r.expectedType, reflect.TypeOf(event.Object); e != a {
+				glog.Errorf("expected type %v, but watch event object had type %v", e, a)
+				continue
+			}
+			meta, err := meta.Accessor(event.Object)
+			if err != nil {
+				glog.Errorf("unable to understand watch event %#v", event)
+				continue
+			}
+			switch event.Type {
+			case watch.Added:
+				r.store.Add(event.Object)
+			case watch.Modified:
+				r.store.Update(event.Object)
+			case watch.Deleted:
+				// TODO: Will any consumers need access to the "last known
+				// state", which is passed in event.Object? If so, may need
+				// to change this.
+				r.store.Delete(event.Object)
+			default:
+				glog.Errorf("unable to understand watch event %#v", event)
+			}
+			*resourceVersion = meta.ResourceVersion()
+			eventCount++
 		}
-		*resourceVersion = meta.ResourceVersion()
-		eventCount++
 	}
 
 	watchDuration := time.Now().Sub(start)
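A note on the new loop label above: inside a select, a bare break leaves only the select statement, so the watch loop would keep spinning on a closed result channel; "break loop" exits the for loop itself. A standalone sketch of the difference (not from this commit, runnable as-is):

	package main

	import "fmt"

	func main() {
		ch := make(chan int)
		close(ch) // receives now return immediately with ok == false
	loop:
		for {
			select {
			case _, ok := <-ch:
				if !ok {
					break loop // a bare "break" here would exit only the select
				}
			}
		}
		fmt.Println("left the for loop") // reached on the first iteration
	}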
38 changes: 32 additions & 6 deletions pkg/client/cache/reflector_test.go
@@ -20,6 +20,7 @@ import (
 	"fmt"
 	"strconv"
 	"testing"
+	"time"
 
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
@@ -36,23 +37,35 @@ func (t *testLW) Watch(resourceVersion string) (watch.Interface, error) {
 	return t.WatchFunc(resourceVersion)
 }
 
+func TestReflector_resyncChan(t *testing.T) {
+	s := NewStore(MetaNamespaceKeyFunc)
+	g := NewReflector(&testLW{}, &api.Pod{}, s, time.Millisecond)
+	a, b := g.resyncChan(), time.After(100*time.Millisecond)
+	select {
+	case <-a:
+		t.Logf("got timeout as expected")
+	case <-b:
+		t.Errorf("resyncChan() is at least 99 milliseconds late??")
+	}
+}
+
 func TestReflector_watchHandlerError(t *testing.T) {
 	s := NewStore(MetaNamespaceKeyFunc)
-	g := NewReflector(&testLW{}, &api.Pod{}, s)
+	g := NewReflector(&testLW{}, &api.Pod{}, s, 0)
 	fw := watch.NewFake()
 	go func() {
 		fw.Stop()
 	}()
 	var resumeRV string
-	err := g.watchHandler(fw, &resumeRV)
+	err := g.watchHandler(fw, &resumeRV, neverExitWatch)
 	if err == nil {
 		t.Errorf("unexpected non-error")
 	}
 }
 
 func TestReflector_watchHandler(t *testing.T) {
 	s := NewStore(MetaNamespaceKeyFunc)
-	g := NewReflector(&testLW{}, &api.Pod{}, s)
+	g := NewReflector(&testLW{}, &api.Pod{}, s, 0)
 	fw := watch.NewFake()
 	s.Add(&api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}})
 	s.Add(&api.Pod{ObjectMeta: api.ObjectMeta{Name: "bar"}})
@@ -64,7 +77,7 @@ func TestReflector_watchHandler(t *testing.T) {
 		fw.Stop()
 	}()
 	var resumeRV string
-	err := g.watchHandler(fw, &resumeRV)
+	err := g.watchHandler(fw, &resumeRV, neverExitWatch)
 	if err != nil {
 		t.Errorf("unexpected error %v", err)
 	}
@@ -101,6 +114,19 @@ func TestReflector_watchHandler(t *testing.T) {
 	}
 }
 
+func TestReflector_watchHandlerTimeout(t *testing.T) {
+	s := NewStore(MetaNamespaceKeyFunc)
+	g := NewReflector(&testLW{}, &api.Pod{}, s, 0)
+	fw := watch.NewFake()
+	var resumeRV string
+	exit := make(chan time.Time, 1)
+	exit <- time.Now()
+	err := g.watchHandler(fw, &resumeRV, exit)
+	if err != errorResyncRequested {
+		t.Errorf("expected timeout error, but got %q", err)
+	}
+}
+
 func TestReflector_listAndWatch(t *testing.T) {
 	createdFakes := make(chan *watch.FakeWatcher)
 
@@ -125,7 +151,7 @@ func TestReflector_listAndWatch(t *testing.T) {
 		},
 	}
 	s := NewFIFO(MetaNamespaceKeyFunc)
-	r := NewReflector(lw, &api.Pod{}, s)
+	r := NewReflector(lw, &api.Pod{}, s, 0)
 	go r.listAndWatch()
 
 	ids := []string{"foo", "bar", "baz", "qux", "zoo"}
@@ -242,7 +268,7 @@ func TestReflector_listAndWatchWithErrors(t *testing.T) {
 			return item.list, item.listErr
 		},
 	}
-	r := NewReflector(lw, &api.Pod{}, s)
+	r := NewReflector(lw, &api.Pod{}, s, 0)
 	r.listAndWatch()
 	}
 }
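The tests pass neverExitWatch wherever no resync should fire: receiving from a channel that is never sent to or closed blocks forever, so that select case is simply inert. A self-contained sketch of the idiom (standalone; the 50-millisecond guard timer is an illustrative assumption):

	package main

	import (
		"fmt"
		"time"
	)

	// nothing is ever sent down this channel, so receives block forever
	var never <-chan time.Time = make(chan time.Time)

	func resyncChan(period time.Duration) <-chan time.Time {
		if period == 0 {
			return never // zero period: the resync case can never fire
		}
		return time.After(period)
	}

	func main() {
		select {
		case <-resyncChan(0):
			fmt.Println("unreachable")
		case <-time.After(50 * time.Millisecond):
			fmt.Println("zero resync period never fires")
		}
	}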
2 changes: 1 addition & 1 deletion pkg/kubelet/config/apiserver.go
@@ -49,7 +49,7 @@ func newSourceApiserverFromLW(lw cache.ListerWatcher, updates chan<- interface{}
 		}
 		updates <- kubelet.PodUpdate{bpods, kubelet.SET, kubelet.ApiserverSource}
 	}
-	cache.NewReflector(lw, &api.Pod{}, cache.NewUndeltaStore(send, cache.MetaNamespaceKeyFunc)).Run()
+	cache.NewReflector(lw, &api.Pod{}, cache.NewUndeltaStore(send, cache.MetaNamespaceKeyFunc), 0).Run()
 }
 
 func getHostFieldLabel(apiVersion string) string {
7 changes: 6 additions & 1 deletion pkg/kubelet/kubelet.go
@@ -102,7 +102,12 @@ func NewMainKubelet(
 
 	serviceStore := cache.NewStore(cache.MetaNamespaceKeyFunc)
 	if kubeClient != nil {
-		cache.NewReflector(cache.NewListWatchFromClient(kubeClient, "services", api.NamespaceAll, labels.Everything()), &api.Service{}, serviceStore).Run()
+		cache.NewReflector(
+			cache.NewListWatchFromClient(kubeClient, "services", api.NamespaceAll, labels.Everything()),
+			&api.Service{},
+			serviceStore,
+			0,
+		).Run()
 	}
 	serviceLister := &cache.StoreToServiceLister{serviceStore}
 
1 change: 1 addition & 0 deletions plugin/pkg/admission/namespace/autoprovision/admission.go
@@ -90,6 +90,7 @@ func NewProvision(c client.Interface) admission.Interface {
 		},
 		&api.Namespace{},
 		store,
+		0,
 	)
 	reflector.Run()
 	return &provision{
1 change: 1 addition & 0 deletions plugin/pkg/admission/namespace/exists/admission.go
@@ -94,6 +94,7 @@ func NewExists(c client.Interface) admission.Interface {
 		},
 		&api.Namespace{},
 		store,
+		0,
 	)
 	reflector.Run()
 	return &exists{
8 changes: 4 additions & 4 deletions plugin/pkg/scheduler/factory/factory.go
@@ -94,26 +94,26 @@ func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys util.StringSe
 	}
 
 	// Watch and queue pods that need scheduling.
-	cache.NewReflector(f.createUnassignedPodLW(), &api.Pod{}, f.PodQueue).Run()
+	cache.NewReflector(f.createUnassignedPodLW(), &api.Pod{}, f.PodQueue, 0).Run()
 
 	// Watch and cache all running pods. Scheduler needs to find all pods
 	// so it knows where it's safe to place a pod. Cache this locally.
-	cache.NewReflector(f.createAssignedPodLW(), &api.Pod{}, f.PodLister.Store).Run()
+	cache.NewReflector(f.createAssignedPodLW(), &api.Pod{}, f.PodLister.Store, 0).Run()
 
 	// Watch minions.
 	// Minions may be listed frequently, so provide a local up-to-date cache.
 	if false {
 		// Disable this code until minions support watches. Note when this code is enabled,
 		// we need to make sure minion ListWatcher has proper FieldSelector.
-		cache.NewReflector(f.createMinionLW(), &api.Node{}, f.MinionLister.Store).Run()
+		cache.NewReflector(f.createMinionLW(), &api.Node{}, f.MinionLister.Store, 0).Run()
 	} else {
 		cache.NewPoller(f.pollMinions, 10*time.Second, f.MinionLister.Store).Run()
 	}
 
 	// Watch and cache all service objects. Scheduler needs to find all pods
 	// created by the same service, so that it can spread them correctly.
 	// Cache this locally.
-	cache.NewReflector(f.createServiceLW(), &api.Service{}, f.ServiceLister.Store).Run()
+	cache.NewReflector(f.createServiceLW(), &api.Service{}, f.ServiceLister.Store, 0).Run()
 
 	r := rand.New(rand.NewSource(time.Now().UnixNano()))
 
