Skip to content

Commit

Permalink
xds: switch EDS watch to new generic xdsClient API (#6414)
Browse files Browse the repository at this point in the history
  • Loading branch information
easwars authored Jun 27, 2023
1 parent e859984 commit 7eb5727
Show file tree
Hide file tree
Showing 8 changed files with 156 additions and 193 deletions.
46 changes: 28 additions & 18 deletions xds/internal/balancer/clusterresolver/resource_resolver_eds.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,8 @@ import (
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
)

type edsResourceWatcher interface {
WatchEndpoints(string, func(xdsresource.EndpointsUpdate, error)) func()
}

type edsDiscoveryMechanism struct {
nameToWatch string
cancelWatch func()
topLevelResolver topLevelResolver
stopped *grpcsync.Event
Expand Down Expand Up @@ -64,31 +61,44 @@ func (er *edsDiscoveryMechanism) stop() {
er.cancelWatch()
}

func (er *edsDiscoveryMechanism) handleEndpointsUpdate(update xdsresource.EndpointsUpdate, err error) {
if er.stopped.HasFired() {
return
// newEDSResolver returns an implementation of the endpointsResolver interface
// that uses EDS to resolve the given name to endpoints.
func newEDSResolver(nameToWatch string, producer xdsresource.Producer, topLevelResolver topLevelResolver) *edsDiscoveryMechanism {
ret := &edsDiscoveryMechanism{
nameToWatch: nameToWatch,
topLevelResolver: topLevelResolver,
stopped: grpcsync.NewEvent(),
}
ret.cancelWatch = xdsresource.WatchEndpoints(producer, nameToWatch, ret)
return ret
}

if err != nil {
er.topLevelResolver.onError(err)
// OnUpdate is invoked to report an update for the resource being watched.
func (er *edsDiscoveryMechanism) OnUpdate(update *xdsresource.EndpointsResourceData) {
if er.stopped.HasFired() {
return
}

er.mu.Lock()
er.update = update
er.update = update.Resource
er.updateReceived = true
er.mu.Unlock()

er.topLevelResolver.onUpdate()
}

// newEDSResolver returns an implementation of the endpointsResolver interface
// that uses EDS to resolve the given name to endpoints.
func newEDSResolver(nameToWatch string, watcher edsResourceWatcher, topLevelResolver topLevelResolver) *edsDiscoveryMechanism {
ret := &edsDiscoveryMechanism{
topLevelResolver: topLevelResolver,
stopped: grpcsync.NewEvent(),
func (er *edsDiscoveryMechanism) OnError(err error) {
if er.stopped.HasFired() {
return
}
ret.cancelWatch = watcher.WatchEndpoints(nameToWatch, ret.handleEndpointsUpdate)
return ret

er.topLevelResolver.onError(err)
}

func (er *edsDiscoveryMechanism) OnResourceDoesNotExist() {
if er.stopped.HasFired() {
return
}

er.topLevelResolver.onError(xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "resource name %q of type Endpoints not found in received response", er.nameToWatch))
}
1 change: 0 additions & 1 deletion xds/internal/xdsclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ type XDSClient interface {
WatchListener(string, func(xdsresource.ListenerUpdate, error)) func()
WatchRouteConfig(string, func(xdsresource.RouteConfigUpdate, error)) func()
WatchCluster(string, func(xdsresource.ClusterUpdate, error)) func()
WatchEndpoints(string, func(xdsresource.EndpointsUpdate, error)) func()

// WatchResource uses xDS to discover the resource associated with the
// provided resource name. The resource type implementation determines how
Expand Down
31 changes: 0 additions & 31 deletions xds/internal/xdsclient/clientimpl_watchers.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,37 +112,6 @@ func (c *clientImpl) WatchCluster(resourceName string, cb func(xdsresource.Clust
return xdsresource.WatchCluster(c, resourceName, watcher)
}

// This is only required temporarily, while we modify the
// clientImpl.WatchEndpoints API to be implemented via the wrapper
// WatchEndpoints() API which calls the WatchResource() API.
type endpointsWatcher struct {
resourceName string
cb func(xdsresource.EndpointsUpdate, error)
}

func (c *endpointsWatcher) OnUpdate(update *xdsresource.EndpointsResourceData) {
c.cb(update.Resource, nil)
}

func (c *endpointsWatcher) OnError(err error) {
c.cb(xdsresource.EndpointsUpdate{}, err)
}

func (c *endpointsWatcher) OnResourceDoesNotExist() {
err := xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "resource name %q of type Endpoints not found in received response", c.resourceName)
c.cb(xdsresource.EndpointsUpdate{}, err)
}

// WatchEndpoints uses EDS to discover information about the
// ClusterLoadAssignment resource identified by resourceName.
//
// WatchEndpoints can be called multiple times, with same or different
// clusterNames. Each call will start an independent watcher for the resource.
func (c *clientImpl) WatchEndpoints(resourceName string, cb func(xdsresource.EndpointsUpdate, error)) (cancel func()) {
watcher := &endpointsWatcher{resourceName: resourceName, cb: cb}
return xdsresource.WatchEndpoints(c, resourceName, watcher)
}

// WatchResource uses xDS to discover the resource associated with the provided
// resource name. The resource type implementation determines how xDS requests
// are sent out and how responses are deserialized and validated. Upon receipt
Expand Down
8 changes: 7 additions & 1 deletion xds/internal/xdsclient/tests/dump_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,12 @@ func compareDump(ctx context.Context, client xdsclient.XDSClient, want map[strin
}
}

type noopEndpointsWatcher struct{}

func (noopEndpointsWatcher) OnUpdate(update *xdsresource.EndpointsResourceData) {}
func (noopEndpointsWatcher) OnError(err error) {}
func (noopEndpointsWatcher) OnResourceDoesNotExist() {}

func (s) TestDumpResources(t *testing.T) {
// Initialize the xDS resources to be used in this test.
ldsTargets := []string{"lds.target.good:0000", "lds.target.good:1111"}
Expand Down Expand Up @@ -122,7 +128,7 @@ func (s) TestDumpResources(t *testing.T) {
client.WatchCluster(target, func(xdsresource.ClusterUpdate, error) {})
}
for _, target := range edsTargets {
client.WatchEndpoints(target, func(xdsresource.EndpointsUpdate, error) {})
xdsresource.WatchEndpoints(client, target, noopEndpointsWatcher{})
}
want := map[string]map[string]xdsresource.UpdateWithMD{
"type.googleapis.com/envoy.config.listener.v3.Listener": {
Expand Down
Loading

0 comments on commit 7eb5727

Please sign in to comment.