Skip to content

Commit

Permalink
xds: implement ADS stream flow control mechanism
Browse files Browse the repository at this point in the history
  • Loading branch information
easwars committed Jul 29, 2024
1 parent 3eb0145 commit ba9f269
Show file tree
Hide file tree
Showing 31 changed files with 387 additions and 196 deletions.
8 changes: 2 additions & 6 deletions balancer/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,12 +282,8 @@ type ConfigParser interface {

// PickInfo contains additional information for the Pick operation.
type PickInfo struct {
// FullMethodName is the method name that NewClientStream() is called
// with. The canonical format is /service/Method.
FullMethodName string
// Ctx is the RPC's context, and may contain relevant RPC-level information
// like the outgoing header metadata.
Ctx context.Context
FullMethodName string // the RPC's full method name
Ctx context.Context // the RPC's context
}

// DoneInfo contains additional information for done.
Expand Down
48 changes: 36 additions & 12 deletions xds/csds/csds_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,27 +71,51 @@ func Test(t *testing.T) {

type unimplementedListenerWatcher struct{}

func (unimplementedListenerWatcher) OnUpdate(*xdsresource.ListenerResourceData) {}
func (unimplementedListenerWatcher) OnError(error) {}
func (unimplementedListenerWatcher) OnResourceDoesNotExist() {}
func (unimplementedListenerWatcher) OnUpdate(_ *xdsresource.ListenerResourceData, onDone xdsresource.DoneNotifier) {
onDone.Done()
}
func (unimplementedListenerWatcher) OnError(_ error, onDone xdsresource.DoneNotifier) {
onDone.Done()
}
func (unimplementedListenerWatcher) OnResourceDoesNotExist(onDone xdsresource.DoneNotifier) {
onDone.Done()
}

type unimplementedRouteConfigWatcher struct{}

func (unimplementedRouteConfigWatcher) OnUpdate(*xdsresource.RouteConfigResourceData) {}
func (unimplementedRouteConfigWatcher) OnError(error) {}
func (unimplementedRouteConfigWatcher) OnResourceDoesNotExist() {}
func (unimplementedRouteConfigWatcher) OnUpdate(_ *xdsresource.RouteConfigResourceData, onDone xdsresource.DoneNotifier) {
onDone.Done()
}
func (unimplementedRouteConfigWatcher) OnError(_ error, onDone xdsresource.DoneNotifier) {
onDone.Done()
}
func (unimplementedRouteConfigWatcher) OnResourceDoesNotExist(onDone xdsresource.DoneNotifier) {
onDone.Done()
}

type unimplementedClusterWatcher struct{}

func (unimplementedClusterWatcher) OnUpdate(*xdsresource.ClusterResourceData) {}
func (unimplementedClusterWatcher) OnError(error) {}
func (unimplementedClusterWatcher) OnResourceDoesNotExist() {}
func (unimplementedClusterWatcher) OnUpdate(_ *xdsresource.ClusterResourceData, onDone xdsresource.DoneNotifier) {
onDone.Done()
}
func (unimplementedClusterWatcher) OnError(_ error, onDone xdsresource.DoneNotifier) {
onDone.Done()
}
func (unimplementedClusterWatcher) OnResourceDoesNotExist(onDone xdsresource.DoneNotifier) {
onDone.Done()
}

type unimplementedEndpointsWatcher struct{}

func (unimplementedEndpointsWatcher) OnUpdate(*xdsresource.EndpointsResourceData) {}
func (unimplementedEndpointsWatcher) OnError(error) {}
func (unimplementedEndpointsWatcher) OnResourceDoesNotExist() {}
func (unimplementedEndpointsWatcher) OnUpdate(_ *xdsresource.EndpointsResourceData, onDone xdsresource.DoneNotifier) {
onDone.Done()
}
func (unimplementedEndpointsWatcher) OnError(_ error, onDone xdsresource.DoneNotifier) {
onDone.Done()
}
func (unimplementedEndpointsWatcher) OnResourceDoesNotExist(onDone xdsresource.DoneNotifier) {
onDone.Done()
}

// Creates a gRPC server and starts serving a CSDS service implementation on it.
// Returns the address of the newly created gRPC server.
Expand Down
24 changes: 12 additions & 12 deletions xds/internal/balancer/cdsbalancer/cluster_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,22 +32,22 @@ type clusterWatcher struct {
parent *cdsBalancer
}

func (cw *clusterWatcher) OnUpdate(u *xdsresource.ClusterResourceData) {
cw.parent.serializer.TrySchedule(func(context.Context) {
cw.parent.onClusterUpdate(cw.name, u.Resource)
})
func (cw *clusterWatcher) OnUpdate(u *xdsresource.ClusterResourceData, onDone xdsresource.DoneNotifier) {
// Ensure that the onDone callback is always called.
handleUpdate := func(context.Context) { cw.parent.onClusterUpdate(cw.name, u.Resource); onDone.Done() }
cw.parent.serializer.ScheduleOr(handleUpdate, onDone.Done)
}

func (cw *clusterWatcher) OnError(err error) {
cw.parent.serializer.TrySchedule(func(context.Context) {
cw.parent.onClusterError(cw.name, err)
})
func (cw *clusterWatcher) OnError(err error, onDone xdsresource.DoneNotifier) {
// Ensure that the onDone callback is always called.
handleError := func(context.Context) { cw.parent.onClusterError(cw.name, err); onDone.Done() }
cw.parent.serializer.ScheduleOr(handleError, onDone.Done)
}

func (cw *clusterWatcher) OnResourceDoesNotExist() {
cw.parent.serializer.TrySchedule(func(context.Context) {
cw.parent.onClusterResourceNotFound(cw.name)
})
func (cw *clusterWatcher) OnResourceDoesNotExist(onDone xdsresource.DoneNotifier) {
// Ensure that the onDone callback is always called.
handleNotFound := func(context.Context) { cw.parent.onClusterResourceNotFound(cw.name); onDone.Done() }
cw.parent.serializer.ScheduleOr(handleNotFound, onDone.Done)
}

// watcherState groups the state associated with a clusterWatcher.
Expand Down
9 changes: 4 additions & 5 deletions xds/internal/balancer/clusterresolver/clusterresolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,18 +207,17 @@ func (b *clusterResolverBalancer) handleClientConnUpdate(update *ccUpdate) {
// handleResourceUpdate handles a resource update or error from the resource
// resolver by propagating the same to the child LB policy.
func (b *clusterResolverBalancer) handleResourceUpdate(update *resourceUpdate) {
if err := update.err; err != nil {
b.handleErrorFromUpdate(err, false)
return
}

b.watchUpdateReceived = true
b.priorities = update.priorities

// An update from the resource resolver contains resolved endpoint addresses
// for all configured discovery mechanisms ordered by priority. This is used
// to generate configuration for the priority LB policy.
b.updateChildConfig()

if update.onDone != nil {
update.onDone.Done()
}
}

// updateChildConfig builds child policy configuration using endpoint addresses
Expand Down
38 changes: 27 additions & 11 deletions xds/internal/balancer/clusterresolver/resource_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,24 @@ import (
// resourceUpdate is a combined update from all the resources, in the order of
// priority. For example, it can be {EDS, EDS, DNS}.
type resourceUpdate struct {
// A discovery mechanism would return an empty update when it runs into
// errors, and this would result in the priority LB policy reporting
// TRANSIENT_FAILURE (if there was a single discovery mechanism), or would
// fallback to the next highest priority that is available.
priorities []priorityConfig
err error
// To be invoked once the update is completely processed.
onDone xdsresource.DoneNotifier
}

// topLevelResolver is used by concrete endpointsResolver implementations for
// reporting updates and errors. The `resourceResolver` type implements this
// interface and takes appropriate actions upon receipt of updates and errors
// from underlying concrete resolvers.
type topLevelResolver interface {
onUpdate()
// onUpdate is called when a new update is received from the underlying
// endpointsResolver implementation. The onDone callback is to be invoked
// once the update is completely processed.
onUpdate(onDone xdsresource.DoneNotifier)
}

// endpointsResolver wraps the functionality to resolve a given resource name to
Expand Down Expand Up @@ -205,7 +213,7 @@ func (rr *resourceResolver) updateMechanisms(mechanisms []DiscoveryMechanism) {
}
// Regenerate even if there's no change in discovery mechanism, in case
// priority order changed.
rr.generateLocked()
rr.generateLocked(xdsresource.NopDoneNotifier{})
}

// resolveNow is typically called to trigger re-resolve of DNS. The EDS
Expand Down Expand Up @@ -252,7 +260,10 @@ func (rr *resourceResolver) stop(closing bool) {
// after they are stopped. Therefore, we don't have to worry about another
// write to this channel happening at the same time as this one.
select {
case <-rr.updateChannel:
case ru := <-rr.updateChannel:
if ru.onDone != nil {
ru.onDone.Done()
}
default:
}
rr.updateChannel <- &resourceUpdate{}
Expand All @@ -263,13 +274,14 @@ func (rr *resourceResolver) stop(closing bool) {
// one update. Otherwise it returns early.
//
// caller must hold rr.mu.
func (rr *resourceResolver) generateLocked() {
func (rr *resourceResolver) generateLocked(onDone xdsresource.DoneNotifier) {
var ret []priorityConfig
for _, rDM := range rr.children {
u, ok := rDM.r.lastUpdate()
if !ok {
// Don't send updates to parent until all resolvers have update to
// send.
onDone.Done()
return
}
switch uu := u.(type) {
Expand All @@ -280,16 +292,20 @@ func (rr *resourceResolver) generateLocked() {
}
}
select {
case <-rr.updateChannel:
case ru := <-rr.updateChannel:
if ru.onDone != nil {
ru.onDone.Done()
}
default:
}
rr.updateChannel <- &resourceUpdate{priorities: ret}
rr.updateChannel <- &resourceUpdate{priorities: ret, onDone: onDone}
}

func (rr *resourceResolver) onUpdate() {
rr.serializer.TrySchedule(func(context.Context) {
func (rr *resourceResolver) onUpdate(onDone xdsresource.DoneNotifier) {
handleUpdate := func(context.Context) {
rr.mu.Lock()
rr.generateLocked()
rr.generateLocked(onDone)
rr.mu.Unlock()
})
}
rr.serializer.ScheduleOr(handleUpdate, func() { onDone.Done() })
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"google.golang.org/grpc/internal/pretty"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig"
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
)

var (
Expand Down Expand Up @@ -79,7 +80,7 @@ func newDNSResolver(target string, topLevelResolver topLevelResolver, logger *gr
ret.logger.Infof("Failed to parse dns hostname %q in clusterresolver LB policy", target)
}
ret.updateReceived = true
ret.topLevelResolver.onUpdate()
ret.topLevelResolver.onUpdate(xdsresource.NopDoneNotifier{})
return ret
}

Expand All @@ -89,7 +90,7 @@ func newDNSResolver(target string, topLevelResolver topLevelResolver, logger *gr
ret.logger.Infof("Failed to build DNS resolver for target %q: %v", target, err)
}
ret.updateReceived = true
ret.topLevelResolver.onUpdate()
ret.topLevelResolver.onUpdate(xdsresource.NopDoneNotifier{})
return ret
}
ret.dnsR = r
Expand Down Expand Up @@ -153,7 +154,7 @@ func (dr *dnsDiscoveryMechanism) UpdateState(state resolver.State) error {
dr.updateReceived = true
dr.mu.Unlock()

dr.topLevelResolver.onUpdate()
dr.topLevelResolver.onUpdate(xdsresource.NopDoneNotifier{})
return nil
}

Expand All @@ -176,7 +177,7 @@ func (dr *dnsDiscoveryMechanism) ReportError(err error) {
dr.updateReceived = true
dr.mu.Unlock()

dr.topLevelResolver.onUpdate()
dr.topLevelResolver.onUpdate(xdsresource.NopDoneNotifier{})
}

func (dr *dnsDiscoveryMechanism) NewAddress(addresses []resolver.Address) {
Expand Down
16 changes: 10 additions & 6 deletions xds/internal/balancer/clusterresolver/resource_resolver_eds.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,20 +76,22 @@ func newEDSResolver(nameToWatch string, producer xdsresource.Producer, topLevelR
}

// OnUpdate is invoked to report an update for the resource being watched.
func (er *edsDiscoveryMechanism) OnUpdate(update *xdsresource.EndpointsResourceData) {
func (er *edsDiscoveryMechanism) OnUpdate(update *xdsresource.EndpointsResourceData, onDone xdsresource.DoneNotifier) {
if er.stopped.HasFired() {
onDone.Done()
return
}

er.mu.Lock()
er.update = &update.Resource
er.mu.Unlock()

er.topLevelResolver.onUpdate()
er.topLevelResolver.onUpdate(onDone)
}

func (er *edsDiscoveryMechanism) OnError(err error) {
func (er *edsDiscoveryMechanism) OnError(err error, onDone xdsresource.DoneNotifier) {
if er.stopped.HasFired() {
onDone.Done()
return
}

Expand All @@ -102,6 +104,7 @@ func (er *edsDiscoveryMechanism) OnError(err error) {
// Continue using a previously received good configuration if one
// exists.
er.mu.Unlock()
onDone.Done()
return
}

Expand All @@ -114,11 +117,12 @@ func (er *edsDiscoveryMechanism) OnError(err error) {
er.update = &xdsresource.EndpointsUpdate{}
er.mu.Unlock()

er.topLevelResolver.onUpdate()
er.topLevelResolver.onUpdate(onDone)
}

func (er *edsDiscoveryMechanism) OnResourceDoesNotExist() {
func (er *edsDiscoveryMechanism) OnResourceDoesNotExist(onDone xdsresource.DoneNotifier) {
if er.stopped.HasFired() {
onDone.Done()
return
}

Expand All @@ -136,5 +140,5 @@ func (er *edsDiscoveryMechanism) OnResourceDoesNotExist() {
er.update = &xdsresource.EndpointsUpdate{}
er.mu.Unlock()

er.topLevelResolver.onUpdate()
er.topLevelResolver.onUpdate(onDone)
}
45 changes: 21 additions & 24 deletions xds/internal/resolver/watch_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,22 +36,22 @@ func newListenerWatcher(resourceName string, parent *xdsResolver) *listenerWatch
return lw
}

func (l *listenerWatcher) OnUpdate(update *xdsresource.ListenerResourceData) {
l.parent.serializer.TrySchedule(func(context.Context) {
l.parent.onListenerResourceUpdate(update.Resource)
})
func (l *listenerWatcher) OnUpdate(update *xdsresource.ListenerResourceData, onDone xdsresource.DoneNotifier) {
// Ensure that the onDone callback is always called.
handleUpdate := func(context.Context) { l.parent.onListenerResourceUpdate(update.Resource); onDone.Done() }
l.parent.serializer.ScheduleOr(handleUpdate, onDone.Done)
}

func (l *listenerWatcher) OnError(err error) {
l.parent.serializer.TrySchedule(func(context.Context) {
l.parent.onListenerResourceError(err)
})
func (l *listenerWatcher) OnError(err error, onDone xdsresource.DoneNotifier) {
// Ensure that the onDone callback is always called.
handleError := func(context.Context) { l.parent.onListenerResourceError(err); onDone.Done() }
l.parent.serializer.ScheduleOr(handleError, onDone.Done)
}

func (l *listenerWatcher) OnResourceDoesNotExist() {
l.parent.serializer.TrySchedule(func(context.Context) {
l.parent.onListenerResourceNotFound()
})
func (l *listenerWatcher) OnResourceDoesNotExist(onDone xdsresource.DoneNotifier) {
// Ensure that the onDone callback is always called.
handleNotFound := func(context.Context) { l.parent.onListenerResourceNotFound(); onDone.Done() }
l.parent.serializer.ScheduleOr(handleNotFound, onDone.Done)
}

func (l *listenerWatcher) stop() {
Expand All @@ -71,22 +71,19 @@ func newRouteConfigWatcher(resourceName string, parent *xdsResolver) *routeConfi
return rw
}

func (r *routeConfigWatcher) OnUpdate(update *xdsresource.RouteConfigResourceData) {
r.parent.serializer.TrySchedule(func(context.Context) {
r.parent.onRouteConfigResourceUpdate(r.resourceName, update.Resource)
})
func (r *routeConfigWatcher) OnUpdate(u *xdsresource.RouteConfigResourceData, onDone xdsresource.DoneNotifier) {
handleUpdate := func(context.Context) { r.parent.onRouteConfigResourceUpdate(r.resourceName, u.Resource); onDone.Done() }
r.parent.serializer.ScheduleOr(handleUpdate, onDone.Done)
}

func (r *routeConfigWatcher) OnError(err error) {
r.parent.serializer.TrySchedule(func(context.Context) {
r.parent.onRouteConfigResourceError(r.resourceName, err)
})
func (r *routeConfigWatcher) OnError(err error, onDone xdsresource.DoneNotifier) {
handleError := func(context.Context) { r.parent.onRouteConfigResourceError(r.resourceName, err); onDone.Done() }
r.parent.serializer.ScheduleOr(handleError, onDone.Done)
}

func (r *routeConfigWatcher) OnResourceDoesNotExist() {
r.parent.serializer.TrySchedule(func(context.Context) {
r.parent.onRouteConfigResourceNotFound(r.resourceName)
})
func (r *routeConfigWatcher) OnResourceDoesNotExist(onDone xdsresource.DoneNotifier) {
handleNotFound := func(context.Context) { r.parent.onRouteConfigResourceNotFound(r.resourceName); onDone.Done() }
r.parent.serializer.ScheduleOr(handleNotFound, onDone.Done)
}

func (r *routeConfigWatcher) stop() {
Expand Down
Loading

0 comments on commit ba9f269

Please sign in to comment.