xds/internal/resolver: switch to generic xDS API for LDS/RDS (#6729)
easwars authored Dec 7, 2023
1 parent a03c7f1 commit 477bd62
Showing 5 changed files with 442 additions and 395 deletions.
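Editor's note: the core of this commit is visible in watch_service.go below. The resolver stops passing per-watch callback functions to the xDS client (the old `c.WatchListener(serviceName, w.handleLDSResp)`) and instead registers small watcher types through the generic resource-watching API (`xdsresource.WatchListener(parent.xdsClient, resourceName, lw)`). The sketch below shows the rough shape of such a generic watcher interface; the names mirror the diff, but every type is a simplified stand-in, not the exported grpc-go API (the real packages are internal).

package sketch

// ListenerUpdate is a stand-in for xdsresource.ListenerResourceData.
type ListenerUpdate struct {
    Resource string // the decoded Listener resource, simplified to a string here
}

// ListenerWatcher is the shape of the generic watcher interface: instead of
// handing the client one callback func per watch, the caller implements these
// three methods on a type of its own.
type ListenerWatcher interface {
    OnUpdate(update *ListenerUpdate)
    OnError(err error)
    OnResourceDoesNotExist()
}

// WatchListener registers w for updates to resourceName and returns a func
// that cancels the watch, mirroring the
// xdsresource.WatchListener(client, name, watcher) calls in the diff below.
func WatchListener(resourceName string, w ListenerWatcher) (cancel func()) {
    // Registration with an actual xDS client is elided in this sketch.
    return func() {}
}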
2 changes: 1 addition & 1 deletion test/xds/xds_client_federation_test.go
@@ -188,7 +188,7 @@ func (s) TestFederation_UnknownAuthorityInDialTarget(t *testing.T) {
 
     target = fmt.Sprintf("xds://unknown-authority/%s", serviceName)
     t.Logf("Dialing target %q with unknown authority which is expected to fail", target)
-    const wantErr = `authority "unknown-authority" is not found in the bootstrap file`
+    wantErr := fmt.Sprintf("authority \"unknown-authority\" specified in dial target %q is not found in the bootstrap file", target)
     _, err = grpc.Dial(target, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(resolver))
     if err == nil || !strings.Contains(err.Error(), wantErr) {
         t.Fatalf("grpc.Dial(%q) returned %v, want: %s", target, err, wantErr)
111 changes: 6 additions & 105 deletions xds/internal/resolver/serviceconfig.go
@@ -39,7 +39,6 @@ import (
     "google.golang.org/grpc/xds/internal/balancer/clustermanager"
     "google.golang.org/grpc/xds/internal/balancer/ringhash"
     "google.golang.org/grpc/xds/internal/httpfilter"
-    rinternal "google.golang.org/grpc/xds/internal/resolver/internal"
     "google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
 )

@@ -72,16 +71,6 @@ type xdsClusterManagerConfig struct {
     Children map[string]xdsChildConfig `json:"children"`
 }
 
-// pruneActiveClusters deletes entries in r.activeClusters with zero
-// references.
-func (r *xdsResolver) pruneActiveClusters() {
-    for cluster, ci := range r.activeClusters {
-        if atomic.LoadInt32(&ci.refCount) == 0 {
-            delete(r.activeClusters, cluster)
-        }
-    }
-}
-
 // serviceConfigJSON produces a service config in JSON format representing all
 // the clusters referenced in activeClusters. This includes clusters with zero
 // references, so they must be pruned first.
@@ -193,10 +182,9 @@ func (cs *configSelector) SelectConfig(rpcInfo iresolver.RPCInfo) (*iresolver.RPCConfig, error) {
             if v := atomic.AddInt32(ref, -1); v == 0 {
                 // This entry will be removed from activeClusters when
                 // producing the service config for the empty update.
-                select {
-                case cs.r.updateCh <- suWithError{emptyUpdate: true}:
-                default:
-                }
+                cs.r.serializer.Schedule(func(context.Context) {
+                    cs.r.onClusterRefDownToZero()
+                })
             }
         },
         Interceptor: interceptor,
@@ -338,97 +326,10 @@ func (cs *configSelector) stop() {
     // selector; we need another update to delete clusters from the config (if
     // we don't have another update pending already).
     if needUpdate {
-        select {
-        case cs.r.updateCh <- suWithError{emptyUpdate: true}:
-        default:
-        }
+        cs.r.serializer.Schedule(func(context.Context) {
+            cs.r.onClusterRefDownToZero()
+        })
     }
 }
-
-// newConfigSelector creates the config selector for su; may add entries to
-// r.activeClusters for previously-unseen clusters.
-func (r *xdsResolver) newConfigSelector(su serviceUpdate) (*configSelector, error) {
-    cs := &configSelector{
-        r: r,
-        virtualHost: virtualHost{
-            httpFilterConfigOverride: su.virtualHost.HTTPFilterConfigOverride,
-            retryConfig:              su.virtualHost.RetryConfig,
-        },
-        routes:           make([]route, len(su.virtualHost.Routes)),
-        clusters:         make(map[string]*clusterInfo),
-        httpFilterConfig: su.ldsConfig.httpFilterConfig,
-    }
-
-    for i, rt := range su.virtualHost.Routes {
-        clusters := rinternal.NewWRR.(func() wrr.WRR)()
-        if rt.ClusterSpecifierPlugin != "" {
-            clusterName := clusterSpecifierPluginPrefix + rt.ClusterSpecifierPlugin
-            clusters.Add(&routeCluster{
-                name: clusterName,
-            }, 1)
-            cs.initializeCluster(clusterName, xdsChildConfig{
-                ChildPolicy: balancerConfig(su.clusterSpecifierPlugins[rt.ClusterSpecifierPlugin]),
-            })
-        } else {
-            for cluster, wc := range rt.WeightedClusters {
-                clusterName := clusterPrefix + cluster
-                clusters.Add(&routeCluster{
-                    name:                     clusterName,
-                    httpFilterConfigOverride: wc.HTTPFilterConfigOverride,
-                }, int64(wc.Weight))
-                cs.initializeCluster(clusterName, xdsChildConfig{
-                    ChildPolicy: newBalancerConfig(cdsName, cdsBalancerConfig{Cluster: cluster}),
-                })
-            }
-        }
-        cs.routes[i].clusters = clusters
-
-        var err error
-        cs.routes[i].m, err = xdsresource.RouteToMatcher(rt)
-        if err != nil {
-            return nil, err
-        }
-        cs.routes[i].actionType = rt.ActionType
-        if rt.MaxStreamDuration == nil {
-            cs.routes[i].maxStreamDuration = su.ldsConfig.maxStreamDuration
-        } else {
-            cs.routes[i].maxStreamDuration = *rt.MaxStreamDuration
-        }
-
-        cs.routes[i].httpFilterConfigOverride = rt.HTTPFilterConfigOverride
-        cs.routes[i].retryConfig = rt.RetryConfig
-        cs.routes[i].hashPolicies = rt.HashPolicies
-    }
-
-    // Account for this config selector's clusters. Do this after no further
-    // errors may occur. Note: cs.clusters are pointers to entries in
-    // activeClusters.
-    for _, ci := range cs.clusters {
-        atomic.AddInt32(&ci.refCount, 1)
-    }
-
-    return cs, nil
-}
-
-// initializeCluster initializes entries in cs.clusters map, creating entries in
-// r.activeClusters as necessary. Any created entries will have a ref count set
-// to zero as their ref count will be incremented by incRefs.
-func (cs *configSelector) initializeCluster(clusterName string, cfg xdsChildConfig) {
-    ci := cs.r.activeClusters[clusterName]
-    if ci == nil {
-        ci = &clusterInfo{refCount: 0}
-        cs.r.activeClusters[clusterName] = ci
-    }
-    cs.clusters[clusterName] = ci
-    cs.clusters[clusterName].cfg = cfg
-}
-
-type clusterInfo struct {
-    // number of references to this cluster; accessed atomically
-    refCount int32
-    // cfg is the child configuration for this cluster, containing either the
-    // csp config or the cds cluster config.
-    cfg xdsChildConfig
-}
 
 type interceptorList struct {
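Both hunks above replace a best-effort, non-blocking send on cs.r.updateCh (which deliberately dropped the notification when an update was already queued) with scheduling cs.r.onClusterRefDownToZero on the resolver's callback serializer. Below is a minimal, self-contained stand-in for such a serializer; grpc-go's real one is the internal grpcsync.CallbackSerializer, and this toy model keeps only the property the new code relies on: callbacks run one at a time, in scheduling order, on a single goroutine.

package main

import (
    "context"
    "fmt"
)

// callbackSerializer is a stand-in for the resolver's serializer.
type callbackSerializer struct {
    callbacks chan func(context.Context)
}

func newCallbackSerializer(ctx context.Context) *callbackSerializer {
    s := &callbackSerializer{callbacks: make(chan func(context.Context), 64)}
    go func() {
        for {
            select {
            case <-ctx.Done():
                return
            case cb := <-s.callbacks:
                cb(ctx)
            }
        }
    }()
    return s
}

// Schedule queues cb for execution. Unlike the old non-blocking send on
// updateCh, a notification is never silently dropped (though this toy
// version blocks once its buffer of 64 pending callbacks is full).
func (s *callbackSerializer) Schedule(cb func(context.Context)) {
    s.callbacks <- cb
}

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    s := newCallbackSerializer(ctx)
    done := make(chan struct{})
    s.Schedule(func(context.Context) { fmt.Println("cluster ref count dropped to zero") })
    s.Schedule(func(context.Context) { close(done) })
    <-done
}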
210 changes: 51 additions & 159 deletions xds/internal/resolver/watch_service.go
@@ -19,185 +19,77 @@
 package resolver
 
 import (
-    "fmt"
-    "sync"
-    "time"
+    "context"
 
-    "google.golang.org/grpc/internal/grpclog"
-    "google.golang.org/grpc/internal/pretty"
-    "google.golang.org/grpc/xds/internal/clusterspecifier"
-    "google.golang.org/grpc/xds/internal/xdsclient"
     "google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
 )
 
-// serviceUpdate contains information received from the LDS/RDS responses which
-// are of interest to the xds resolver. The RDS request is built by first
-// making a LDS to get the RouteConfig name.
-type serviceUpdate struct {
-    // virtualHost contains routes and other configuration to route RPCs.
-    virtualHost *xdsresource.VirtualHost
-    // clusterSpecifierPlugins contains the configurations for any cluster
-    // specifier plugins emitted by the xdsclient.
-    clusterSpecifierPlugins map[string]clusterspecifier.BalancerConfig
-    // ldsConfig contains configuration that applies to all routes.
-    ldsConfig ldsConfig
+type listenerWatcher struct {
+    resourceName string
+    cancel       func()
+    parent       *xdsResolver
 }
 
-// ldsConfig contains information received from the LDS responses which are of
-// interest to the xds resolver.
-type ldsConfig struct {
-    // maxStreamDuration is from the HTTP connection manager's
-    // common_http_protocol_options field.
-    maxStreamDuration time.Duration
-    httpFilterConfig  []xdsresource.HTTPFilter
+func newListenerWatcher(resourceName string, parent *xdsResolver) *listenerWatcher {
+    lw := &listenerWatcher{resourceName: resourceName, parent: parent}
+    lw.cancel = xdsresource.WatchListener(parent.xdsClient, resourceName, lw)
+    return lw
 }
 
-// watchService uses LDS and RDS to discover information about the provided
-// serviceName.
-//
-// Note that during race (e.g. an xDS response is received while the user is
-// calling cancel()), there's a small window where the callback can be called
-// after the watcher is canceled. The caller needs to handle this case.
-//
-// TODO(easwars): Make this function a method on the xdsResolver type.
-// Currently, there is a single call site for this function, and all arguments
-// passed to it are fields of the xdsResolver type.
-func watchService(c xdsclient.XDSClient, serviceName string, cb func(serviceUpdate, error), logger *grpclog.PrefixLogger) (cancel func()) {
-    w := &serviceUpdateWatcher{
-        logger:      logger,
-        c:           c,
-        serviceName: serviceName,
-        serviceCb:   cb,
-    }
-    w.ldsCancel = c.WatchListener(serviceName, w.handleLDSResp)
-
-    return w.close
+func (l *listenerWatcher) OnUpdate(update *xdsresource.ListenerResourceData) {
+    l.parent.serializer.Schedule(func(context.Context) {
+        l.parent.onListenerResourceUpdate(update.Resource)
+    })
 }
 
-// serviceUpdateWatcher handles LDS and RDS response, and calls the service
-// callback at the right time.
-type serviceUpdateWatcher struct {
-    logger      *grpclog.PrefixLogger
-    c           xdsclient.XDSClient
-    serviceName string
-    ldsCancel   func()
-    serviceCb   func(serviceUpdate, error)
-    lastUpdate  serviceUpdate
-
-    mu        sync.Mutex
-    closed    bool
-    rdsName   string
-    rdsCancel func()
+func (l *listenerWatcher) OnError(err error) {
+    l.parent.serializer.Schedule(func(context.Context) {
+        l.parent.onListenerResourceError(err)
+    })
 }
 
-func (w *serviceUpdateWatcher) handleLDSResp(update xdsresource.ListenerUpdate, err error) {
-    w.logger.Infof("received LDS update: %+v, err: %v", pretty.ToJSON(update), err)
-    w.mu.Lock()
-    defer w.mu.Unlock()
-    if w.closed {
-        return
-    }
-    if err != nil {
-        // We check the error type and do different things. For now, the only
-        // type we check is ResourceNotFound, which indicates the LDS resource
-        // was removed, and besides sending the error to callback, we also
-        // cancel the RDS watch.
-        if xdsresource.ErrType(err) == xdsresource.ErrorTypeResourceNotFound && w.rdsCancel != nil {
-            w.rdsCancel()
-            w.rdsName = ""
-            w.rdsCancel = nil
-            w.lastUpdate = serviceUpdate{}
-        }
-        // The other error cases still return early without canceling the
-        // existing RDS watch.
-        w.serviceCb(serviceUpdate{}, err)
-        return
-    }
-
-    w.lastUpdate.ldsConfig = ldsConfig{
-        maxStreamDuration: update.MaxStreamDuration,
-        httpFilterConfig:  update.HTTPFilters,
-    }
-
-    if update.InlineRouteConfig != nil {
-        // If there was an RDS watch, cancel it.
-        w.rdsName = ""
-        if w.rdsCancel != nil {
-            w.rdsCancel()
-            w.rdsCancel = nil
-        }
+func (l *listenerWatcher) OnResourceDoesNotExist() {
+    l.parent.serializer.Schedule(func(context.Context) {
+        l.parent.onListenerResourceNotFound()
+    })
+}
 
-        // Handle the inline RDS update as if it's from an RDS watch.
-        w.applyRouteConfigUpdate(*update.InlineRouteConfig)
-        return
-    }
+func (l *listenerWatcher) stop() {
+    l.cancel()
+    l.parent.logger.Infof("Canceling watch on Listener resource %q", l.resourceName)
+}
 
-    // RDS name from update is not an empty string, need RDS to fetch the
-    // routes.
+type routeConfigWatcher struct {
+    resourceName string
+    cancel       func()
+    parent       *xdsResolver
+}
 
-    if w.rdsName == update.RouteConfigName {
-        // If the new RouteConfigName is same as the previous, don't cancel and
-        // restart the RDS watch.
-        //
-        // If the route name did change, then we must wait until the first RDS
-        // update before reporting this LDS config.
-        if w.lastUpdate.virtualHost != nil {
-            // We want to send an update with the new fields from the new LDS
-            // (e.g. max stream duration), and old fields from the previous
-            // RDS.
-            //
-            // But note that this should only happen when virtual host is set,
-            // which means an RDS was received.
-            w.serviceCb(w.lastUpdate, nil)
-        }
-        return
-    }
-    w.rdsName = update.RouteConfigName
-    if w.rdsCancel != nil {
-        w.rdsCancel()
-    }
-    w.rdsCancel = w.c.WatchRouteConfig(update.RouteConfigName, w.handleRDSResp)
+func newRouteConfigWatcher(resourceName string, parent *xdsResolver) *routeConfigWatcher {
+    rw := &routeConfigWatcher{resourceName: resourceName, parent: parent}
+    rw.cancel = xdsresource.WatchRouteConfig(parent.xdsClient, resourceName, rw)
+    return rw
 }
 
-func (w *serviceUpdateWatcher) applyRouteConfigUpdate(update xdsresource.RouteConfigUpdate) {
-    matchVh := xdsresource.FindBestMatchingVirtualHost(w.serviceName, update.VirtualHosts)
-    if matchVh == nil {
-        // No matching virtual host found.
-        w.serviceCb(serviceUpdate{}, fmt.Errorf("no matching virtual host found for %q", w.serviceName))
-        return
-    }
-
-    w.lastUpdate.virtualHost = matchVh
-    w.lastUpdate.clusterSpecifierPlugins = update.ClusterSpecifierPlugins
-    w.serviceCb(w.lastUpdate, nil)
+func (r *routeConfigWatcher) OnUpdate(update *xdsresource.RouteConfigResourceData) {
+    r.parent.serializer.Schedule(func(context.Context) {
+        r.parent.onRouteConfigResourceUpdate(r.resourceName, update.Resource)
+    })
 }
 
-func (w *serviceUpdateWatcher) handleRDSResp(update xdsresource.RouteConfigUpdate, err error) {
-    w.logger.Infof("received RDS update: %+v, err: %v", pretty.ToJSON(update), err)
-    w.mu.Lock()
-    defer w.mu.Unlock()
-    if w.closed {
-        return
-    }
-    if w.rdsCancel == nil {
-        // This means only the RDS watch was canceled, which can happen if the
-        // LDS resource is removed.
-        return
-    }
-    if err != nil {
-        w.serviceCb(serviceUpdate{}, err)
-        return
-    }
-    w.applyRouteConfigUpdate(update)
+func (r *routeConfigWatcher) OnError(err error) {
+    r.parent.serializer.Schedule(func(context.Context) {
+        r.parent.onRouteConfigResourceError(r.resourceName, err)
+    })
 }
 
-func (w *serviceUpdateWatcher) close() {
-    w.mu.Lock()
-    defer w.mu.Unlock()
-    w.closed = true
-    w.ldsCancel()
-    if w.rdsCancel != nil {
-        w.rdsCancel()
-        w.rdsCancel = nil
-    }
+func (r *routeConfigWatcher) OnResourceDoesNotExist() {
+    r.parent.serializer.Schedule(func(context.Context) {
+        r.parent.onRouteConfigResourceNotFound(r.resourceName)
+    })
+}
+
+func (r *routeConfigWatcher) stop() {
+    r.cancel()
+    r.parent.logger.Infof("Canceling watch on RouteConfiguration resource %q", r.resourceName)
 }
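Taken together, the new watcher types are pure adapters: each OnUpdate/OnError/OnResourceDoesNotExist method hands the event to the resolver's serializer, and all real handling happens in onListenerResourceUpdate and friends on a single goroutine, replacing the mutex-guarded state machine of the old serviceUpdateWatcher. The self-contained model below shows that wiring; every type in it is a stand-in shaped after the diff, not the real (internal) grpc-go API, and it runs callbacks inline for brevity instead of using the asynchronous serializer sketched earlier.

package main

import (
    "errors"
    "fmt"
)

// inlineSerializer is a trivial stand-in that runs callbacks immediately.
type inlineSerializer struct{}

func (inlineSerializer) Schedule(f func()) { f() }

// xdsResolver is a stand-in; the onListenerResource* method names are modeled
// on the handlers referenced in the diff, which live outside this file.
type xdsResolver struct {
    serializer inlineSerializer
}

func (r *xdsResolver) onListenerResourceUpdate(resource string) {
    fmt.Println("LDS update:", resource)
}

func (r *xdsResolver) onListenerResourceError(err error) {
    fmt.Println("LDS error:", err)
}

func (r *xdsResolver) onListenerResourceNotFound() {
    fmt.Println("LDS resource removed")
}

// listenerWatcher mirrors the type added in this commit: its methods do no
// work themselves and only forward events to the parent's serializer.
type listenerWatcher struct {
    resourceName string
    parent       *xdsResolver
}

func (l *listenerWatcher) OnUpdate(resource string) {
    l.parent.serializer.Schedule(func() { l.parent.onListenerResourceUpdate(resource) })
}

func (l *listenerWatcher) OnError(err error) {
    l.parent.serializer.Schedule(func() { l.parent.onListenerResourceError(err) })
}

func (l *listenerWatcher) OnResourceDoesNotExist() {
    l.parent.serializer.Schedule(func() { l.parent.onListenerResourceNotFound() })
}

func main() {
    r := &xdsResolver{}
    lw := &listenerWatcher{resourceName: "example-listener", parent: r}

    // Simulate events arriving from an xDS client:
    lw.OnUpdate("listener-config-v1")
    lw.OnError(errors.New("transient ADS stream failure"))
    lw.OnResourceDoesNotExist()
}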