Skip to content

Commit

Permalink
Make pkg/proxy/config more like pkg/kubelet/config
Browse files Browse the repository at this point in the history
Split SourceAPI into two subobjects.

Parallel structure for endpoints, services will allow
changing to use generic code in pkg/client/cache/reflector.go.

Rename some funcs to be more like pkg/client/cache.
  • Loading branch information
erictune committed Jan 12, 2015
1 parent fa152ab commit 2958002
Show file tree
Hide file tree
Showing 2 changed files with 101 additions and 58 deletions.
95 changes: 59 additions & 36 deletions pkg/proxy/config/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/golang/glog"
)

// TODO: to use Reflector, need to change the ServicesWatcher to a generic ListerWatcher.
// ServicesWatcher is capable of listing and watching for changes to services across ALL namespaces
type ServicesWatcher interface {
List(label labels.Selector) (*api.ServiceList, error)
Expand All @@ -43,55 +44,77 @@ type EndpointsWatcher interface {
// SourceAPI implements a configuration source for services and endpoints that
// uses the client watch API to efficiently detect changes.
type SourceAPI struct {
servicesWatcher ServicesWatcher
endpointsWatcher EndpointsWatcher
s servicesReflector
e endpointsReflector
}

services chan<- ServiceUpdate
endpoints chan<- EndpointsUpdate
type servicesReflector struct {
watcher ServicesWatcher
services chan<- ServiceUpdate
resourceVersion string
waitDuration time.Duration
reconnectDuration time.Duration
}

type endpointsReflector struct {
watcher EndpointsWatcher
endpoints chan<- EndpointsUpdate
resourceVersion string
waitDuration time.Duration
reconnectDuration time.Duration
}

// NewSourceAPI creates a config source that watches for changes to the services and endpoints.
func NewSourceAPI(servicesWatcher ServicesWatcher, endpointsWatcher EndpointsWatcher, period time.Duration, services chan<- ServiceUpdate, endpoints chan<- EndpointsUpdate) *SourceAPI {
config := &SourceAPI{
servicesWatcher: servicesWatcher,
endpointsWatcher: endpointsWatcher,
services: services,
endpoints: endpoints,

waitDuration: period,
// prevent hot loops if the server starts to misbehave
reconnectDuration: time.Second * 1,
s: servicesReflector{
watcher: servicesWatcher,
services: services,
resourceVersion: "",
waitDuration: period,
// prevent hot loops if the server starts to misbehave
reconnectDuration: time.Second * 1,
},
e: endpointsReflector{
watcher: endpointsWatcher,
endpoints: endpoints,
resourceVersion: "",
waitDuration: period,
// prevent hot loops if the server starts to misbehave
reconnectDuration: time.Second * 1,
},
}
serviceVersion := ""
go util.Forever(func() {
config.runServices(&serviceVersion)
time.Sleep(wait.Jitter(config.reconnectDuration, 0.0))
}, period)
endpointVersion := ""
go util.Forever(func() {
config.runEndpoints(&endpointVersion)
time.Sleep(wait.Jitter(config.reconnectDuration, 0.0))
}, period)
go util.Forever(func() { config.s.listAndWatch() }, period)
go util.Forever(func() { config.e.listAndWatch() }, period)
return config
}

// runServices loops forever looking for changes to services.
func (s *SourceAPI) runServices(resourceVersion *string) {
func (r *servicesReflector) listAndWatch() {
r.run(&r.resourceVersion)
time.Sleep(wait.Jitter(r.reconnectDuration, 0.0))
}

func (r *endpointsReflector) listAndWatch() {
r.run(&r.resourceVersion)
time.Sleep(wait.Jitter(r.reconnectDuration, 0.0))
}

// run loops forever looking for changes to services.
func (s *servicesReflector) run(resourceVersion *string) {
if len(*resourceVersion) == 0 {
services, err := s.servicesWatcher.List(labels.Everything())
services, err := s.watcher.List(labels.Everything())
if err != nil {
glog.Errorf("Unable to load services: %v", err)
// TODO: reconcile with pkg/client/cache which doesn't use reflector.
time.Sleep(wait.Jitter(s.waitDuration, 0.0))
return
}
*resourceVersion = services.ResourceVersion
// TODO: replace with code to update the
s.services <- ServiceUpdate{Op: SET, Services: services.Items}
}

watcher, err := s.servicesWatcher.Watch(labels.Everything(), labels.Everything(), *resourceVersion)
watcher, err := s.watcher.Watch(labels.Everything(), labels.Everything(), *resourceVersion)
if err != nil {
glog.Errorf("Unable to watch for services changes: %v", err)
if !client.IsTimeout(err) {
Expand All @@ -104,11 +127,11 @@ func (s *SourceAPI) runServices(resourceVersion *string) {
defer watcher.Stop()

ch := watcher.ResultChan()
handleServicesWatch(resourceVersion, ch, s.services)
s.watchHandler(resourceVersion, ch, s.services)
}

// handleServicesWatch loops over an event channel and delivers config changes to an update channel.
func handleServicesWatch(resourceVersion *string, ch <-chan watch.Event, updates chan<- ServiceUpdate) {
// watchHandler loops over an event channel and delivers config changes to an update channel.
func (s *servicesReflector) watchHandler(resourceVersion *string, ch <-chan watch.Event, updates chan<- ServiceUpdate) {
for {
select {
case event, ok := <-ch:
Expand Down Expand Up @@ -146,10 +169,10 @@ func handleServicesWatch(resourceVersion *string, ch <-chan watch.Event, updates
}
}

// runEndpoints loops forever looking for changes to endpoints.
func (s *SourceAPI) runEndpoints(resourceVersion *string) {
// run loops forever looking for changes to endpoints.
func (s *endpointsReflector) run(resourceVersion *string) {
if len(*resourceVersion) == 0 {
endpoints, err := s.endpointsWatcher.List(labels.Everything())
endpoints, err := s.watcher.List(labels.Everything())
if err != nil {
glog.Errorf("Unable to load endpoints: %v", err)
time.Sleep(wait.Jitter(s.waitDuration, 0.0))
Expand All @@ -159,7 +182,7 @@ func (s *SourceAPI) runEndpoints(resourceVersion *string) {
s.endpoints <- EndpointsUpdate{Op: SET, Endpoints: endpoints.Items}
}

watcher, err := s.endpointsWatcher.Watch(labels.Everything(), labels.Everything(), *resourceVersion)
watcher, err := s.watcher.Watch(labels.Everything(), labels.Everything(), *resourceVersion)
if err != nil {
glog.Errorf("Unable to watch for endpoints changes: %v", err)
if !client.IsTimeout(err) {
Expand All @@ -173,11 +196,11 @@ func (s *SourceAPI) runEndpoints(resourceVersion *string) {
defer watcher.Stop()

ch := watcher.ResultChan()
handleEndpointsWatch(resourceVersion, ch, s.endpoints)
s.watchHandler(resourceVersion, ch, s.endpoints)
}

// handleEndpointsWatch loops over an event channel and delivers config changes to an update channel.
func handleEndpointsWatch(resourceVersion *string, ch <-chan watch.Event, updates chan<- EndpointsUpdate) {
// watchHandler loops over an event channel and delivers config changes to an update channel.
func (s *endpointsReflector) watchHandler(resourceVersion *string, ch <-chan watch.Event, updates chan<- EndpointsUpdate) {
for {
select {
case event, ok := <-ch:
Expand Down
64 changes: 42 additions & 22 deletions pkg/proxy/config/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,14 @@ func TestServices(t *testing.T) {
fakeWatch := watch.NewFake()
fakeClient := &client.Fake{Watch: fakeWatch}
services := make(chan ServiceUpdate)
source := SourceAPI{servicesWatcher: fakeClient.Services(api.NamespaceAll), endpointsWatcher: fakeClient.Endpoints(api.NamespaceAll), services: services}
source := SourceAPI{
s: servicesReflector{watcher: fakeClient.Services(api.NamespaceAll), services: services},
e: endpointsReflector{watcher: fakeClient.Endpoints(api.NamespaceAll)}}
resourceVersion := "1"
go func() {
// called twice
source.runServices(&resourceVersion)
source.runServices(&resourceVersion)
source.s.run(&resourceVersion)
source.s.run(&resourceVersion)
}()

// test adding a service to the watch
Expand Down Expand Up @@ -84,11 +86,13 @@ func TestServicesFromZero(t *testing.T) {
},
}
services := make(chan ServiceUpdate)
source := SourceAPI{servicesWatcher: fakeClient.Services(api.NamespaceAll), endpointsWatcher: fakeClient.Endpoints(api.NamespaceAll), services: services}
source := SourceAPI{
s: servicesReflector{watcher: fakeClient.Services(api.NamespaceAll), services: services},
e: endpointsReflector{watcher: fakeClient.Endpoints(api.NamespaceAll)}}
resourceVersion := ""
ch := make(chan struct{})
go func() {
source.runServices(&resourceVersion)
source.s.run(&resourceVersion)
close(ch)
}()

Expand All @@ -112,11 +116,13 @@ func TestServicesFromZero(t *testing.T) {
func TestServicesError(t *testing.T) {
fakeClient := &client.Fake{Err: errors.New("test")}
services := make(chan ServiceUpdate)
source := SourceAPI{servicesWatcher: fakeClient.Services(api.NamespaceAll), endpointsWatcher: fakeClient.Endpoints(api.NamespaceAll), services: services}
source := SourceAPI{
s: servicesReflector{watcher: fakeClient.Services(api.NamespaceAll), services: services},
e: endpointsReflector{watcher: fakeClient.Endpoints(api.NamespaceAll)}}
resourceVersion := "1"
ch := make(chan struct{})
go func() {
source.runServices(&resourceVersion)
source.s.run(&resourceVersion)
close(ch)
}()

Expand All @@ -133,11 +139,13 @@ func TestServicesError(t *testing.T) {
func TestServicesErrorTimeout(t *testing.T) {
fakeClient := &client.Fake{Err: errors.New("use of closed network connection")}
services := make(chan ServiceUpdate)
source := SourceAPI{servicesWatcher: fakeClient.Services(api.NamespaceAll), endpointsWatcher: fakeClient.Endpoints(api.NamespaceAll), services: services}
source := SourceAPI{
s: servicesReflector{watcher: fakeClient.Services(api.NamespaceAll), services: services},
e: endpointsReflector{watcher: fakeClient.Endpoints(api.NamespaceAll)}}
resourceVersion := "1"
ch := make(chan struct{})
go func() {
source.runServices(&resourceVersion)
source.s.run(&resourceVersion)
close(ch)
}()

Expand All @@ -154,11 +162,13 @@ func TestServicesErrorTimeout(t *testing.T) {
func TestServicesFromZeroError(t *testing.T) {
fakeClient := &client.Fake{Err: errors.New("test")}
services := make(chan ServiceUpdate)
source := SourceAPI{servicesWatcher: fakeClient.Services(api.NamespaceAll), endpointsWatcher: fakeClient.Endpoints(api.NamespaceAll), services: services}
source := SourceAPI{
s: servicesReflector{watcher: fakeClient.Services(api.NamespaceAll), services: services},
e: endpointsReflector{watcher: fakeClient.Endpoints(api.NamespaceAll)}}
resourceVersion := ""
ch := make(chan struct{})
go func() {
source.runServices(&resourceVersion)
source.s.run(&resourceVersion)
close(ch)
}()

Expand All @@ -178,12 +188,14 @@ func TestEndpoints(t *testing.T) {
fakeWatch := watch.NewFake()
fakeClient := &client.Fake{Watch: fakeWatch}
endpoints := make(chan EndpointsUpdate)
source := SourceAPI{servicesWatcher: fakeClient.Services(api.NamespaceAll), endpointsWatcher: fakeClient.Endpoints(api.NamespaceAll), endpoints: endpoints}
source := SourceAPI{
s: servicesReflector{watcher: fakeClient.Services(api.NamespaceAll)},
e: endpointsReflector{watcher: fakeClient.Endpoints(api.NamespaceAll), endpoints: endpoints}}
resourceVersion := "1"
go func() {
// called twice
source.runEndpoints(&resourceVersion)
source.runEndpoints(&resourceVersion)
source.e.run(&resourceVersion)
source.e.run(&resourceVersion)
}()

// test adding an endpoint to the watch
Expand Down Expand Up @@ -230,11 +242,13 @@ func TestEndpointsFromZero(t *testing.T) {
},
}
endpoints := make(chan EndpointsUpdate)
source := SourceAPI{servicesWatcher: fakeClient.Services(api.NamespaceAll), endpointsWatcher: fakeClient.Endpoints(api.NamespaceAll), endpoints: endpoints}
source := SourceAPI{
s: servicesReflector{watcher: fakeClient.Services(api.NamespaceAll)},
e: endpointsReflector{watcher: fakeClient.Endpoints(api.NamespaceAll), endpoints: endpoints}}
resourceVersion := ""
ch := make(chan struct{})
go func() {
source.runEndpoints(&resourceVersion)
source.e.run(&resourceVersion)
close(ch)
}()

Expand All @@ -258,11 +272,13 @@ func TestEndpointsFromZero(t *testing.T) {
func TestEndpointsError(t *testing.T) {
fakeClient := &client.Fake{Err: errors.New("test")}
endpoints := make(chan EndpointsUpdate)
source := SourceAPI{servicesWatcher: fakeClient.Services(api.NamespaceAll), endpointsWatcher: fakeClient.Endpoints(api.NamespaceAll), endpoints: endpoints}
source := SourceAPI{
s: servicesReflector{watcher: fakeClient.Services(api.NamespaceAll)},
e: endpointsReflector{watcher: fakeClient.Endpoints(api.NamespaceAll), endpoints: endpoints}}
resourceVersion := "1"
ch := make(chan struct{})
go func() {
source.runEndpoints(&resourceVersion)
source.e.run(&resourceVersion)
close(ch)
}()

Expand All @@ -279,11 +295,13 @@ func TestEndpointsError(t *testing.T) {
func TestEndpointsErrorTimeout(t *testing.T) {
fakeClient := &client.Fake{Err: errors.New("use of closed network connection")}
endpoints := make(chan EndpointsUpdate)
source := SourceAPI{servicesWatcher: fakeClient.Services(api.NamespaceAll), endpointsWatcher: fakeClient.Endpoints(api.NamespaceAll), endpoints: endpoints}
source := SourceAPI{
s: servicesReflector{watcher: fakeClient.Services(api.NamespaceAll)},
e: endpointsReflector{watcher: fakeClient.Endpoints(api.NamespaceAll), endpoints: endpoints}}
resourceVersion := "1"
ch := make(chan struct{})
go func() {
source.runEndpoints(&resourceVersion)
source.e.run(&resourceVersion)
close(ch)
}()

Expand All @@ -300,11 +318,13 @@ func TestEndpointsErrorTimeout(t *testing.T) {
func TestEndpointsFromZeroError(t *testing.T) {
fakeClient := &client.Fake{Err: errors.New("test")}
endpoints := make(chan EndpointsUpdate)
source := SourceAPI{servicesWatcher: fakeClient.Services(api.NamespaceAll), endpointsWatcher: fakeClient.Endpoints(api.NamespaceAll), endpoints: endpoints}
source := SourceAPI{
s: servicesReflector{watcher: fakeClient.Services(api.NamespaceAll)},
e: endpointsReflector{watcher: fakeClient.Endpoints(api.NamespaceAll), endpoints: endpoints}}
resourceVersion := ""
ch := make(chan struct{})
go func() {
source.runEndpoints(&resourceVersion)
source.e.run(&resourceVersion)
close(ch)
}()

Expand Down

0 comments on commit 2958002

Please sign in to comment.