Skip to content

Commit

Permalink
Merge pull request kubernetes#1103 from smarterclayton/get_then_watch
Browse files Browse the repository at this point in the history
Services and Endpoints aren't syncing properly
  • Loading branch information
lavalamp committed Sep 3, 2014
2 parents 6c55682 + 01e6681 commit 0db7989
Show file tree
Hide file tree
Showing 14 changed files with 326 additions and 30 deletions.
1 change: 1 addition & 0 deletions pkg/api/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ func init() {
ServerOp{},
ContainerManifestList{},
Endpoints{},
EndpointsList{},
Binding{},
)
}
6 changes: 6 additions & 0 deletions pkg/api/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,12 @@ type Endpoints struct {
Endpoints []string `json:"endpoints,omitempty" yaml:"endpoints,omitempty"`
}

// EndpointsList is a list of endpoints.
type EndpointsList struct {
JSONBase `json:",inline" yaml:",inline"`
Items []Endpoints `json:"items,omitempty" yaml:"items,omitempty"`
}

// Minion is a worker node in Kubernetenes.
// The name of the minion according to etcd is in JSONBase.ID.
type Minion struct {
Expand Down
1 change: 1 addition & 0 deletions pkg/api/v1beta1/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ func init() {
ServerOp{},
ContainerManifestList{},
Endpoints{},
EndpointsList{},
Binding{},
)
}
6 changes: 6 additions & 0 deletions pkg/api/v1beta1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,12 @@ type Endpoints struct {
Endpoints []string `json:"endpoints,omitempty" yaml:"endpoints,omitempty"`
}

// EndpointsList is a list of endpoints.
type EndpointsList struct {
JSONBase `json:",inline" yaml:",inline"`
Items []Endpoints `json:"items,omitempty" yaml:"items,omitempty"`
}

// Minion is a worker node in Kubernetenes.
// The name of the minion according to etcd is in JSONBase.ID.
type Minion struct {
Expand Down
16 changes: 14 additions & 2 deletions pkg/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,17 @@ type ReplicationControllerInterface interface {

// ServiceInterface has methods to work with Service resources.
type ServiceInterface interface {
ListServices(selector labels.Selector) (api.ServiceList, error)
GetService(id string) (api.Service, error)
CreateService(api.Service) (api.Service, error)
UpdateService(api.Service) (api.Service, error)
DeleteService(string) error
WatchServices(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error)
}

// EndpointsInterface has methods to work with Endpoints resources
type EndpointsInterface interface {
ListEndpoints(selector labels.Selector) (api.EndpointsList, error)
WatchEndpoints(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error)
}

Expand Down Expand Up @@ -318,8 +323,9 @@ func (c *Client) WatchReplicationControllers(label, field labels.Selector, resou
Watch()
}

func (c *Client) ListServices(selector labels.Selector) (list api.ServiceList, err error) {
err = c.Get().Path("services").SelectorParam("labels", selector).Do().Into(&list)
// ListServices takes a selector, and returns the list of services that match that selector
func (c *Client) ListServices(selector labels.Selector) (result api.ServiceList, err error) {
err = c.Get().Path("services").SelectorParam("labels", selector).Do().Into(&result)
return
}

Expand Down Expand Up @@ -361,6 +367,12 @@ func (c *Client) WatchServices(label, field labels.Selector, resourceVersion uin
Watch()
}

// ListEndpoints takes a selector, and returns the list of endpoints that match that selector
func (c *Client) ListEndpoints(selector labels.Selector) (result api.EndpointsList, err error) {
err = c.Get().Path("endpoints").SelectorParam("labels", selector).Do().Into(&result)
return
}

// WatchEndpoints returns a watch.Interface that watches the requested endpoints for a service.
func (c *Client) WatchEndpoints(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) {
return c.Get().
Expand Down
25 changes: 19 additions & 6 deletions pkg/client/fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,13 @@ type FakeAction struct {
// implementation. This makes faking out just the method you want to test easier.
type Fake struct {
// Fake by default keeps a simple list of the methods that have been called.
Actions []FakeAction
Pods api.PodList
Ctrl api.ReplicationController
Watch watch.Interface
Actions []FakeAction
Pods api.PodList
Ctrl api.ReplicationController
ServiceList api.ServiceList
EndpointsList api.EndpointsList
Err error
Watch watch.Interface
}

func (c *Fake) ListPods(selector labels.Selector) (api.PodList, error) {
Expand Down Expand Up @@ -93,6 +96,11 @@ func (c *Fake) WatchReplicationControllers(label, field labels.Selector, resourc
return c.Watch, nil
}

func (c *Fake) ListServices(selector labels.Selector) (api.ServiceList, error) {
c.Actions = append(c.Actions, FakeAction{Action: "list-services"})
return c.ServiceList, c.Err
}

func (c *Fake) GetService(name string) (api.Service, error) {
c.Actions = append(c.Actions, FakeAction{Action: "get-service", Value: name})
return api.Service{}, nil
Expand All @@ -115,12 +123,17 @@ func (c *Fake) DeleteService(service string) error {

func (c *Fake) WatchServices(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) {
c.Actions = append(c.Actions, FakeAction{Action: "watch-services", Value: resourceVersion})
return c.Watch, nil
return c.Watch, c.Err
}

func (c *Fake) ListEndpoints(selector labels.Selector) (api.EndpointsList, error) {
c.Actions = append(c.Actions, FakeAction{Action: "list-endpoints"})
return c.EndpointsList, c.Err
}

func (c *Fake) WatchEndpoints(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) {
c.Actions = append(c.Actions, FakeAction{Action: "watch-endpoints", Value: resourceVersion})
return c.Watch, nil
return c.Watch, c.Err
}

func (c *Fake) ServerVersion() (*version.Info, error) {
Expand Down
32 changes: 28 additions & 4 deletions pkg/proxy/config/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ import (

// Watcher is the interface needed to receive changes to services and endpoints.
type Watcher interface {
ListServices(label labels.Selector) (api.ServiceList, error)
ListEndpoints(label labels.Selector) (api.EndpointsList, error)
WatchServices(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error)
WatchEndpoints(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error)
}
Expand Down Expand Up @@ -70,6 +72,17 @@ func NewSourceAPI(client Watcher, period time.Duration, services chan<- ServiceU

// runServices loops forever looking for changes to services.
func (s *SourceAPI) runServices(resourceVersion *uint64) {
if *resourceVersion == 0 {
services, err := s.client.ListServices(labels.Everything())
if err != nil {
glog.Errorf("Unable to load services: %v", err)
time.Sleep(wait.Jitter(s.waitDuration, 0.0))
return
}
*resourceVersion = services.ResourceVersion
s.services <- ServiceUpdate{Op: SET, Services: services.Items}
}

watcher, err := s.client.WatchServices(labels.Everything(), labels.Everything(), *resourceVersion)
if err != nil {
glog.Errorf("Unable to watch for services changes: %v", err)
Expand Down Expand Up @@ -97,17 +110,28 @@ func handleServicesWatch(resourceVersion *uint64, ch <-chan watch.Event, updates

switch event.Type {
case watch.Added, watch.Modified:
updates <- ServiceUpdate{Op: SET, Services: []api.Service{*service}}
updates <- ServiceUpdate{Op: ADD, Services: []api.Service{*service}}

case watch.Deleted:
updates <- ServiceUpdate{Op: SET}
updates <- ServiceUpdate{Op: REMOVE, Services: []api.Service{*service}}
}
}
}
}

// runEndpoints loops forever looking for changes to endpoints.
func (s *SourceAPI) runEndpoints(resourceVersion *uint64) {
if *resourceVersion == 0 {
endpoints, err := s.client.ListEndpoints(labels.Everything())
if err != nil {
glog.Errorf("Unable to load endpoints: %v", err)
time.Sleep(wait.Jitter(s.waitDuration, 0.0))
return
}
*resourceVersion = endpoints.ResourceVersion
s.endpoints <- EndpointsUpdate{Op: SET, Endpoints: endpoints.Items}
}

watcher, err := s.client.WatchEndpoints(labels.Everything(), labels.Everything(), *resourceVersion)
if err != nil {
glog.Errorf("Unable to watch for endpoints changes: %v", err)
Expand Down Expand Up @@ -135,10 +159,10 @@ func handleEndpointsWatch(resourceVersion *uint64, ch <-chan watch.Event, update

switch event.Type {
case watch.Added, watch.Modified:
updates <- EndpointsUpdate{Op: SET, Endpoints: []api.Endpoints{*endpoints}}
updates <- EndpointsUpdate{Op: ADD, Endpoints: []api.Endpoints{*endpoints}}

case watch.Deleted:
updates <- EndpointsUpdate{Op: SET}
updates <- EndpointsUpdate{Op: REMOVE, Endpoints: []api.Endpoints{*endpoints}}
}
}
}
Expand Down
Loading

0 comments on commit 0db7989

Please sign in to comment.