Skip to content

Commit

Permalink
xds/internal/server: switch to generic xDS API for LDS/RDS (#6726)
Browse files Browse the repository at this point in the history
  • Loading branch information
easwars authored Oct 13, 2023
1 parent df8fc99 commit 6fe6085
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 62 deletions.
96 changes: 52 additions & 44 deletions xds/internal/server/listener_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,7 @@ type DrainCallback func(addr net.Addr)
// XDSClient wraps the methods on the XDSClient which are required by
// the listenerWrapper.
type XDSClient interface {
WatchListener(string, func(xdsresource.ListenerUpdate, error)) func()
WatchRouteConfig(string, func(xdsresource.RouteConfigUpdate, error)) func()
WatchResource(rType xdsresource.Type, resourceName string, watcher xdsresource.ResourceWatcher) (cancel func())
BootstrapConfig() *bootstrap.Config
}

Expand Down Expand Up @@ -110,7 +109,6 @@ func NewListenerWrapper(params ListenerWrapperParams) (net.Listener, <-chan stru
mode: connectivity.ServingModeStarting,
closed: grpcsync.NewEvent(),
goodUpdate: grpcsync.NewEvent(),
ldsUpdateCh: make(chan ldsUpdateWithError, 1),
rdsUpdateCh: make(chan rdsHandlerUpdate, 1),
}
lw.logger = internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf("[xds-server-listener %p] ", lw))
Expand All @@ -120,17 +118,16 @@ func NewListenerWrapper(params ListenerWrapperParams) (net.Listener, <-chan stru
lisAddr := lw.Listener.Addr().String()
lw.addr, lw.port, _ = net.SplitHostPort(lisAddr)

lw.rdsHandler = newRDSHandler(lw.xdsC, lw.rdsUpdateCh)
lw.cancelWatch = lw.xdsC.WatchListener(lw.name, lw.handleListenerUpdate)
lw.rdsHandler = newRDSHandler(lw.xdsC, lw.logger, lw.rdsUpdateCh)
lw.cancelWatch = xdsresource.WatchListener(lw.xdsC, lw.name, &ldsWatcher{
parent: lw,
logger: lw.logger,
name: lw.name,
})
go lw.run()
return lw, lw.goodUpdate.Done()
}

type ldsUpdateWithError struct {
update xdsresource.ListenerUpdate
err error
}

// listenerWrapper wraps the net.Listener associated with the listening address
// passed to Serve(). It also contains all other state associated with this
// particular invocation of Serve().
Expand Down Expand Up @@ -181,8 +178,6 @@ type listenerWrapper struct {
// rdsUpdates are the RDS resources received from the management
// server, keyed on the RouteName of the RDS resource.
rdsUpdates unsafe.Pointer // map[string]xdsclient.RouteConfigUpdate
// ldsUpdateCh is a channel for XDSClient LDS updates.
ldsUpdateCh chan ldsUpdateWithError
// rdsUpdateCh is a channel for XDSClient RDS updates.
rdsUpdateCh chan rdsHandlerUpdate
}
Expand Down Expand Up @@ -320,31 +315,12 @@ func (l *listenerWrapper) run() {
select {
case <-l.closed.Done():
return
case u := <-l.ldsUpdateCh:
l.handleLDSUpdate(u)
case u := <-l.rdsUpdateCh:
l.handleRDSUpdate(u)
}
}
}

// handleLDSUpdate is the callback which handles LDS Updates. It writes the
// received update to the update channel, which is picked up by the run
// goroutine.
func (l *listenerWrapper) handleListenerUpdate(update xdsresource.ListenerUpdate, err error) {
if l.closed.HasFired() {
l.logger.Warningf("Resource %q received update: %v with error: %v, after listener was closed", l.name, update, err)
return
}
// Remove any existing entry in ldsUpdateCh and replace with the new one, as the only update
// listener cares about is most recent update.
select {
case <-l.ldsUpdateCh:
default:
}
l.ldsUpdateCh <- ldsUpdateWithError{update: update, err: err}
}

// handleRDSUpdate handles a full rds update from rds handler. On a successful
// update, the server will switch to ServingModeServing as the full
// configuration (both LDS and RDS) has been received.
Expand All @@ -354,7 +330,6 @@ func (l *listenerWrapper) handleRDSUpdate(update rdsHandlerUpdate) {
return
}
if update.err != nil {
l.logger.Warningf("Received error for rds names specified in resource %q: %+v", l.name, update.err)
if xdsresource.ErrType(update.err) == xdsresource.ErrorTypeResourceNotFound {
l.switchMode(nil, connectivity.ServingModeNotServing, update.err)
}
Expand All @@ -368,17 +343,7 @@ func (l *listenerWrapper) handleRDSUpdate(update rdsHandlerUpdate) {
l.goodUpdate.Fire()
}

func (l *listenerWrapper) handleLDSUpdate(update ldsUpdateWithError) {
if update.err != nil {
l.logger.Warningf("Received error for resource %q: %+v", l.name, update.err)
if xdsresource.ErrType(update.err) == xdsresource.ErrorTypeResourceNotFound {
l.switchMode(nil, connectivity.ServingModeNotServing, update.err)
}
// For errors which are anything other than "resource-not-found", we
// continue to use the old configuration.
return
}

func (l *listenerWrapper) handleLDSUpdate(update xdsresource.ListenerUpdate) {
// Make sure that the socket address on the received Listener resource
// matches the address of the net.Listener passed to us by the user. This
// check is done here instead of at the XDSClient layer because of the
Expand All @@ -391,7 +356,7 @@ func (l *listenerWrapper) handleLDSUpdate(update ldsUpdateWithError) {
// What this means is that the XDSClient has ACKed a resource which can push
// the server into a "not serving" mode. This is not ideal, but this is
// what we have decided to do. See gRPC A36 for more details.
ilc := update.update.InboundListenerCfg
ilc := update.InboundListenerCfg
if ilc.Address != l.addr || ilc.Port != l.port {
l.switchMode(nil, connectivity.ServingModeNotServing, fmt.Errorf("address (%s:%s) in Listener update does not match listening address: (%s:%s)", ilc.Address, ilc.Port, l.addr, l.port))
return
Expand Down Expand Up @@ -440,3 +405,46 @@ func (l *listenerWrapper) switchMode(fcs *xdsresource.FilterChainManager, newMod
l.modeCallback(l.Listener.Addr(), newMode, err)
}
}

// ldsWatcher implements the xdsresource.ListenerWatcher interface and is
// passed to the WatchListener API.
type ldsWatcher struct {
parent *listenerWrapper
logger *internalgrpclog.PrefixLogger
name string
}

func (lw *ldsWatcher) OnUpdate(update *xdsresource.ListenerResourceData) {
if lw.parent.closed.HasFired() {
lw.logger.Warningf("Resource %q received update: %#v after listener was closed", lw.name, update)
return
}
if lw.logger.V(2) {
lw.logger.Infof("LDS watch for resource %q received update: %#v", lw.name, update.Resource)
}
lw.parent.handleLDSUpdate(update.Resource)
}

func (lw *ldsWatcher) OnError(err error) {
if lw.parent.closed.HasFired() {
lw.logger.Warningf("Resource %q received error: %v after listener was closed", lw.name, err)
return
}
if lw.logger.V(2) {
lw.logger.Infof("LDS watch for resource %q reported error: %#v", lw.name, err)
}
// For errors which are anything other than "resource-not-found", we
// continue to use the old configuration.
}

func (lw *ldsWatcher) OnResourceDoesNotExist() {
if lw.parent.closed.HasFired() {
lw.logger.Warningf("Resource %q received resource-not-found-error after listener was closed", lw.name)
return
}
if lw.logger.V(2) {
lw.logger.Infof("LDS watch for resource %q reported resource-does-not-exist error: %v", lw.name)
}
err := xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "resource name %q of type Listener not found in received response", lw.name)
lw.parent.switchMode(nil, connectivity.ServingModeNotServing, err)
}
53 changes: 41 additions & 12 deletions xds/internal/server/rds_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package server
import (
"sync"

igrpclog "google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
)

Expand All @@ -34,7 +35,8 @@ type rdsHandlerUpdate struct {
// rdsHandler handles any RDS queries that need to be started for a given server
// side listeners Filter Chains (i.e. not inline).
type rdsHandler struct {
xdsC XDSClient
xdsC XDSClient
logger *igrpclog.PrefixLogger

mu sync.Mutex
updates map[string]xdsresource.RouteConfigUpdate
Expand All @@ -49,9 +51,10 @@ type rdsHandler struct {
// newRDSHandler creates a new rdsHandler to watch for RDS resources.
// listenerWrapper updates the list of route names to watch by calling
// updateRouteNamesToWatch() upon receipt of new Listener configuration.
func newRDSHandler(xdsC XDSClient, ch chan rdsHandlerUpdate) *rdsHandler {
func newRDSHandler(xdsC XDSClient, logger *igrpclog.PrefixLogger, ch chan rdsHandlerUpdate) *rdsHandler {
return &rdsHandler{
xdsC: xdsC,
logger: logger,
updateChannel: ch,
updates: make(map[string]xdsresource.RouteConfigUpdate),
cancels: make(map[string]func()),
Expand All @@ -69,11 +72,11 @@ func (rh *rdsHandler) updateRouteNamesToWatch(routeNamesToWatch map[string]bool)
// routeNamesToWatch.
for routeName := range routeNamesToWatch {
if _, ok := rh.cancels[routeName]; !ok {
func(routeName string) {
rh.cancels[routeName] = rh.xdsC.WatchRouteConfig(routeName, func(update xdsresource.RouteConfigUpdate, err error) {
rh.handleRouteUpdate(routeName, update, err)
})
}(routeName)
// The xDS client keeps a reference to the watcher until the cancel
// func is invoked. So, we don't need to keep a reference for fear
// of it being garbage collected.
w := &rdsWatcher{parent: rh, routeName: routeName}
rh.cancels[routeName] = xdsresource.WatchRouteConfig(rh.xdsC, routeName, w)
}
}

Expand All @@ -97,11 +100,7 @@ func (rh *rdsHandler) updateRouteNamesToWatch(routeNamesToWatch map[string]bool)
// handleRouteUpdate persists the route config for a given route name, and also
// sends an update to the Listener Wrapper on an error received or if the rds
// handler has a full collection of updates.
func (rh *rdsHandler) handleRouteUpdate(routeName string, update xdsresource.RouteConfigUpdate, err error) {
if err != nil {
drainAndPush(rh.updateChannel, rdsHandlerUpdate{err: err})
return
}
func (rh *rdsHandler) handleRouteUpdate(routeName string, update xdsresource.RouteConfigUpdate) {
rh.mu.Lock()
defer rh.mu.Unlock()
rh.updates[routeName] = update
Expand Down Expand Up @@ -131,3 +130,33 @@ func (rh *rdsHandler) close() {
cancel()
}
}

// rdsWatcher implements the xdsresource.RouteConfigWatcher interface and is
// passed to the WatchRouteConfig API.
type rdsWatcher struct {
parent *rdsHandler
logger *igrpclog.PrefixLogger
routeName string
}

func (rw *rdsWatcher) OnUpdate(update *xdsresource.RouteConfigResourceData) {
if rw.logger.V(2) {
rw.logger.Infof("RDS watch for resource %q received update: %#v", rw.routeName, update.Resource)
}
rw.parent.handleRouteUpdate(rw.routeName, update.Resource)
}

func (rw *rdsWatcher) OnError(err error) {
if rw.logger.V(2) {
rw.logger.Infof("RDS watch for resource %q reported error: %v", rw.routeName, err)
}
drainAndPush(rw.parent.updateChannel, rdsHandlerUpdate{err: err})
}

func (rw *rdsWatcher) OnResourceDoesNotExist() {
if rw.logger.V(2) {
rw.logger.Infof("RDS watch for resource %q reported resource-does-not-exist error: %v", rw.routeName)
}
err := xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "resource name %q of type RouteConfiguration not found in received response", rw.routeName)
drainAndPush(rw.parent.updateChannel, rdsHandlerUpdate{err: err})
}
12 changes: 6 additions & 6 deletions xds/internal/server/rds_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ func (s) TestRDSHandler_SuccessCaseOneRDSWatch(t *testing.T) {

// Create an rds handler and give it a single route to watch.
updateCh := make(chan rdsHandlerUpdate, 1)
rh := newRDSHandler(xdsC, updateCh)
rh := newRDSHandler(xdsC, nil, updateCh)
rh.updateRouteNamesToWatch(map[string]bool{route1: true})

// Verify that the given route is requested.
Expand All @@ -211,7 +211,7 @@ func (s) TestRDSHandler_SuccessCaseTwoUpdates(t *testing.T) {

// Create an rds handler and give it a single route to watch.
updateCh := make(chan rdsHandlerUpdate, 1)
rh := newRDSHandler(xdsC, updateCh)
rh := newRDSHandler(xdsC, nil, updateCh)
rh.updateRouteNamesToWatch(map[string]bool{route1: true})

// Verify that the given route is requested.
Expand Down Expand Up @@ -273,7 +273,7 @@ func (s) TestRDSHandler_SuccessCaseDeletedRoute(t *testing.T) {

// Create an rds handler and give it two routes to watch.
updateCh := make(chan rdsHandlerUpdate, 1)
rh := newRDSHandler(xdsC, updateCh)
rh := newRDSHandler(xdsC, nil, updateCh)
rh.updateRouteNamesToWatch(map[string]bool{route1: true, route2: true})

// Verify that the given routes are requested.
Expand Down Expand Up @@ -329,7 +329,7 @@ func (s) TestRDSHandler_SuccessCaseTwoUpdatesAddAndDeleteRoute(t *testing.T) {

// Create an rds handler and give it two routes to watch.
updateCh := make(chan rdsHandlerUpdate, 1)
rh := newRDSHandler(xdsC, updateCh)
rh := newRDSHandler(xdsC, nil, updateCh)
rh.updateRouteNamesToWatch(map[string]bool{route1: true, route2: true})

// Verify that the given routes are requested.
Expand Down Expand Up @@ -400,7 +400,7 @@ func (s) TestRDSHandler_SuccessCaseSecondUpdateMakesRouteFull(t *testing.T) {

// Create an rds handler and give it three routes to watch.
updateCh := make(chan rdsHandlerUpdate, 1)
rh := newRDSHandler(xdsC, updateCh)
rh := newRDSHandler(xdsC, nil, updateCh)
rh.updateRouteNamesToWatch(map[string]bool{route1: true, route2: true, route3: true})

// Verify that the given routes are requested.
Expand Down Expand Up @@ -455,7 +455,7 @@ func (s) TestErrorReceived(t *testing.T) {

// Create an rds handler and give it a single route to watch.
updateCh := make(chan rdsHandlerUpdate, 1)
rh := newRDSHandler(xdsC, updateCh)
rh := newRDSHandler(xdsC, nil, updateCh)
rh.updateRouteNamesToWatch(map[string]bool{route1: true})

// Verify that the given route is requested.
Expand Down

0 comments on commit 6fe6085

Please sign in to comment.