diff --git a/balancer/balancer.go b/balancer/balancer.go index b181f386a1ba..21977f1fe84c 100644 --- a/balancer/balancer.go +++ b/balancer/balancer.go @@ -282,12 +282,8 @@ type ConfigParser interface { // PickInfo contains additional information for the Pick operation. type PickInfo struct { - // FullMethodName is the method name that NewClientStream() is called - // with. The canonical format is /service/Method. - FullMethodName string - // Ctx is the RPC's context, and may contain relevant RPC-level information - // like the outgoing header metadata. - Ctx context.Context + FullMethodName string // the RPC's full method name + Ctx context.Context // the RPC's context } // DoneInfo contains additional information for done. diff --git a/xds/csds/csds_e2e_test.go b/xds/csds/csds_e2e_test.go index 6bfa45ec9482..d3f343eda6f7 100644 --- a/xds/csds/csds_e2e_test.go +++ b/xds/csds/csds_e2e_test.go @@ -71,27 +71,51 @@ func Test(t *testing.T) { type unimplementedListenerWatcher struct{} -func (unimplementedListenerWatcher) OnUpdate(*xdsresource.ListenerResourceData) {} -func (unimplementedListenerWatcher) OnError(error) {} -func (unimplementedListenerWatcher) OnResourceDoesNotExist() {} +func (unimplementedListenerWatcher) OnUpdate(_ *xdsresource.ListenerResourceData, onDone xdsresource.DoneNotifier) { + onDone.Done() +} +func (unimplementedListenerWatcher) OnError(_ error, onDone xdsresource.DoneNotifier) { + onDone.Done() +} +func (unimplementedListenerWatcher) OnResourceDoesNotExist(onDone xdsresource.DoneNotifier) { + onDone.Done() +} type unimplementedRouteConfigWatcher struct{} -func (unimplementedRouteConfigWatcher) OnUpdate(*xdsresource.RouteConfigResourceData) {} -func (unimplementedRouteConfigWatcher) OnError(error) {} -func (unimplementedRouteConfigWatcher) OnResourceDoesNotExist() {} +func (unimplementedRouteConfigWatcher) OnUpdate(_ *xdsresource.RouteConfigResourceData, onDone xdsresource.DoneNotifier) { + onDone.Done() +} +func (unimplementedRouteConfigWatcher) OnError(_ error, onDone xdsresource.DoneNotifier) { + onDone.Done() +} +func (unimplementedRouteConfigWatcher) OnResourceDoesNotExist(onDone xdsresource.DoneNotifier) { + onDone.Done() +} type unimplementedClusterWatcher struct{} -func (unimplementedClusterWatcher) OnUpdate(*xdsresource.ClusterResourceData) {} -func (unimplementedClusterWatcher) OnError(error) {} -func (unimplementedClusterWatcher) OnResourceDoesNotExist() {} +func (unimplementedClusterWatcher) OnUpdate(_ *xdsresource.ClusterResourceData, onDone xdsresource.DoneNotifier) { + onDone.Done() +} +func (unimplementedClusterWatcher) OnError(_ error, onDone xdsresource.DoneNotifier) { + onDone.Done() +} +func (unimplementedClusterWatcher) OnResourceDoesNotExist(onDone xdsresource.DoneNotifier) { + onDone.Done() +} type unimplementedEndpointsWatcher struct{} -func (unimplementedEndpointsWatcher) OnUpdate(*xdsresource.EndpointsResourceData) {} -func (unimplementedEndpointsWatcher) OnError(error) {} -func (unimplementedEndpointsWatcher) OnResourceDoesNotExist() {} +func (unimplementedEndpointsWatcher) OnUpdate(_ *xdsresource.EndpointsResourceData, onDone xdsresource.DoneNotifier) { + onDone.Done() +} +func (unimplementedEndpointsWatcher) OnError(_ error, onDone xdsresource.DoneNotifier) { + onDone.Done() +} +func (unimplementedEndpointsWatcher) OnResourceDoesNotExist(onDone xdsresource.DoneNotifier) { + onDone.Done() +} // Creates a gRPC server and starts serving a CSDS service implementation on it. // Returns the address of the newly created gRPC server. diff --git a/xds/internal/balancer/cdsbalancer/cluster_watcher.go b/xds/internal/balancer/cdsbalancer/cluster_watcher.go index be01e8f07a07..5609efd4c8fe 100644 --- a/xds/internal/balancer/cdsbalancer/cluster_watcher.go +++ b/xds/internal/balancer/cdsbalancer/cluster_watcher.go @@ -32,22 +32,22 @@ type clusterWatcher struct { parent *cdsBalancer } -func (cw *clusterWatcher) OnUpdate(u *xdsresource.ClusterResourceData) { - cw.parent.serializer.TrySchedule(func(context.Context) { - cw.parent.onClusterUpdate(cw.name, u.Resource) - }) +func (cw *clusterWatcher) OnUpdate(u *xdsresource.ClusterResourceData, onDone xdsresource.DoneNotifier) { + // Ensure that the onDone callback is always called. + handleUpdate := func(context.Context) { cw.parent.onClusterUpdate(cw.name, u.Resource); onDone.Done() } + cw.parent.serializer.ScheduleOr(handleUpdate, onDone.Done) } -func (cw *clusterWatcher) OnError(err error) { - cw.parent.serializer.TrySchedule(func(context.Context) { - cw.parent.onClusterError(cw.name, err) - }) +func (cw *clusterWatcher) OnError(err error, onDone xdsresource.DoneNotifier) { + // Ensure that the onDone callback is always called. + handleError := func(context.Context) { cw.parent.onClusterError(cw.name, err); onDone.Done() } + cw.parent.serializer.ScheduleOr(handleError, onDone.Done) } -func (cw *clusterWatcher) OnResourceDoesNotExist() { - cw.parent.serializer.TrySchedule(func(context.Context) { - cw.parent.onClusterResourceNotFound(cw.name) - }) +func (cw *clusterWatcher) OnResourceDoesNotExist(onDone xdsresource.DoneNotifier) { + // Ensure that the onDone callback is always called. + handleNotFound := func(context.Context) { cw.parent.onClusterResourceNotFound(cw.name); onDone.Done() } + cw.parent.serializer.ScheduleOr(handleNotFound, onDone.Done) } // watcherState groups the state associated with a clusterWatcher. diff --git a/xds/internal/balancer/clusterresolver/clusterresolver.go b/xds/internal/balancer/clusterresolver/clusterresolver.go index 83ead92a4a69..65683a8cf16c 100644 --- a/xds/internal/balancer/clusterresolver/clusterresolver.go +++ b/xds/internal/balancer/clusterresolver/clusterresolver.go @@ -207,11 +207,6 @@ func (b *clusterResolverBalancer) handleClientConnUpdate(update *ccUpdate) { // handleResourceUpdate handles a resource update or error from the resource // resolver by propagating the same to the child LB policy. func (b *clusterResolverBalancer) handleResourceUpdate(update *resourceUpdate) { - if err := update.err; err != nil { - b.handleErrorFromUpdate(err, false) - return - } - b.watchUpdateReceived = true b.priorities = update.priorities @@ -219,6 +214,10 @@ func (b *clusterResolverBalancer) handleResourceUpdate(update *resourceUpdate) { // for all configured discovery mechanisms ordered by priority. This is used // to generate configuration for the priority LB policy. b.updateChildConfig() + + if update.onDone != nil { + update.onDone.Done() + } } // updateChildConfig builds child policy configuration using endpoint addresses diff --git a/xds/internal/balancer/clusterresolver/resource_resolver.go b/xds/internal/balancer/clusterresolver/resource_resolver.go index ab93a1fa5e6b..105bf1ee8011 100644 --- a/xds/internal/balancer/clusterresolver/resource_resolver.go +++ b/xds/internal/balancer/clusterresolver/resource_resolver.go @@ -30,8 +30,13 @@ import ( // resourceUpdate is a combined update from all the resources, in the order of // priority. For example, it can be {EDS, EDS, DNS}. type resourceUpdate struct { + // A discovery mechanism would return an empty update when it runs into + // errors, and this would result in the priority LB policy reporting + // TRANSIENT_FAILURE (if there was a single discovery mechanism), or would + // fallback to the next highest priority that is available. priorities []priorityConfig - err error + // To be invoked once the update is completely processed. + onDone xdsresource.DoneNotifier } // topLevelResolver is used by concrete endpointsResolver implementations for @@ -39,7 +44,10 @@ type resourceUpdate struct { // interface and takes appropriate actions upon receipt of updates and errors // from underlying concrete resolvers. type topLevelResolver interface { - onUpdate() + // onUpdate is called when a new update is received from the underlying + // endpointsResolver implementation. The onDone callback is to be invoked + // once the update is completely processed. + onUpdate(onDone xdsresource.DoneNotifier) } // endpointsResolver wraps the functionality to resolve a given resource name to @@ -205,7 +213,7 @@ func (rr *resourceResolver) updateMechanisms(mechanisms []DiscoveryMechanism) { } // Regenerate even if there's no change in discovery mechanism, in case // priority order changed. - rr.generateLocked() + rr.generateLocked(xdsresource.NopDoneNotifier{}) } // resolveNow is typically called to trigger re-resolve of DNS. The EDS @@ -252,7 +260,10 @@ func (rr *resourceResolver) stop(closing bool) { // after they are stopped. Therefore, we don't have to worry about another // write to this channel happening at the same time as this one. select { - case <-rr.updateChannel: + case ru := <-rr.updateChannel: + if ru.onDone != nil { + ru.onDone.Done() + } default: } rr.updateChannel <- &resourceUpdate{} @@ -263,13 +274,14 @@ func (rr *resourceResolver) stop(closing bool) { // one update. Otherwise it returns early. // // caller must hold rr.mu. -func (rr *resourceResolver) generateLocked() { +func (rr *resourceResolver) generateLocked(onDone xdsresource.DoneNotifier) { var ret []priorityConfig for _, rDM := range rr.children { u, ok := rDM.r.lastUpdate() if !ok { // Don't send updates to parent until all resolvers have update to // send. + onDone.Done() return } switch uu := u.(type) { @@ -280,16 +292,20 @@ func (rr *resourceResolver) generateLocked() { } } select { - case <-rr.updateChannel: + case ru := <-rr.updateChannel: + if ru.onDone != nil { + ru.onDone.Done() + } default: } - rr.updateChannel <- &resourceUpdate{priorities: ret} + rr.updateChannel <- &resourceUpdate{priorities: ret, onDone: onDone} } -func (rr *resourceResolver) onUpdate() { - rr.serializer.TrySchedule(func(context.Context) { +func (rr *resourceResolver) onUpdate(onDone xdsresource.DoneNotifier) { + handleUpdate := func(context.Context) { rr.mu.Lock() - rr.generateLocked() + rr.generateLocked(onDone) rr.mu.Unlock() - }) + } + rr.serializer.ScheduleOr(handleUpdate, func() { onDone.Done() }) } diff --git a/xds/internal/balancer/clusterresolver/resource_resolver_dns.go b/xds/internal/balancer/clusterresolver/resource_resolver_dns.go index efdc3088a395..b22810e22080 100644 --- a/xds/internal/balancer/clusterresolver/resource_resolver_dns.go +++ b/xds/internal/balancer/clusterresolver/resource_resolver_dns.go @@ -27,6 +27,7 @@ import ( "google.golang.org/grpc/internal/pretty" "google.golang.org/grpc/resolver" "google.golang.org/grpc/serviceconfig" + "google.golang.org/grpc/xds/internal/xdsclient/xdsresource" ) var ( @@ -79,7 +80,7 @@ func newDNSResolver(target string, topLevelResolver topLevelResolver, logger *gr ret.logger.Infof("Failed to parse dns hostname %q in clusterresolver LB policy", target) } ret.updateReceived = true - ret.topLevelResolver.onUpdate() + ret.topLevelResolver.onUpdate(xdsresource.NopDoneNotifier{}) return ret } @@ -89,7 +90,7 @@ func newDNSResolver(target string, topLevelResolver topLevelResolver, logger *gr ret.logger.Infof("Failed to build DNS resolver for target %q: %v", target, err) } ret.updateReceived = true - ret.topLevelResolver.onUpdate() + ret.topLevelResolver.onUpdate(xdsresource.NopDoneNotifier{}) return ret } ret.dnsR = r @@ -153,7 +154,7 @@ func (dr *dnsDiscoveryMechanism) UpdateState(state resolver.State) error { dr.updateReceived = true dr.mu.Unlock() - dr.topLevelResolver.onUpdate() + dr.topLevelResolver.onUpdate(xdsresource.NopDoneNotifier{}) return nil } @@ -176,7 +177,7 @@ func (dr *dnsDiscoveryMechanism) ReportError(err error) { dr.updateReceived = true dr.mu.Unlock() - dr.topLevelResolver.onUpdate() + dr.topLevelResolver.onUpdate(xdsresource.NopDoneNotifier{}) } func (dr *dnsDiscoveryMechanism) NewAddress(addresses []resolver.Address) { diff --git a/xds/internal/balancer/clusterresolver/resource_resolver_eds.go b/xds/internal/balancer/clusterresolver/resource_resolver_eds.go index 3d0ec356e93a..86efc4caf107 100644 --- a/xds/internal/balancer/clusterresolver/resource_resolver_eds.go +++ b/xds/internal/balancer/clusterresolver/resource_resolver_eds.go @@ -76,8 +76,9 @@ func newEDSResolver(nameToWatch string, producer xdsresource.Producer, topLevelR } // OnUpdate is invoked to report an update for the resource being watched. -func (er *edsDiscoveryMechanism) OnUpdate(update *xdsresource.EndpointsResourceData) { +func (er *edsDiscoveryMechanism) OnUpdate(update *xdsresource.EndpointsResourceData, onDone xdsresource.DoneNotifier) { if er.stopped.HasFired() { + onDone.Done() return } @@ -85,11 +86,12 @@ func (er *edsDiscoveryMechanism) OnUpdate(update *xdsresource.EndpointsResourceD er.update = &update.Resource er.mu.Unlock() - er.topLevelResolver.onUpdate() + er.topLevelResolver.onUpdate(onDone) } -func (er *edsDiscoveryMechanism) OnError(err error) { +func (er *edsDiscoveryMechanism) OnError(err error, onDone xdsresource.DoneNotifier) { if er.stopped.HasFired() { + onDone.Done() return } @@ -102,6 +104,7 @@ func (er *edsDiscoveryMechanism) OnError(err error) { // Continue using a previously received good configuration if one // exists. er.mu.Unlock() + onDone.Done() return } @@ -114,11 +117,12 @@ func (er *edsDiscoveryMechanism) OnError(err error) { er.update = &xdsresource.EndpointsUpdate{} er.mu.Unlock() - er.topLevelResolver.onUpdate() + er.topLevelResolver.onUpdate(onDone) } -func (er *edsDiscoveryMechanism) OnResourceDoesNotExist() { +func (er *edsDiscoveryMechanism) OnResourceDoesNotExist(onDone xdsresource.DoneNotifier) { if er.stopped.HasFired() { + onDone.Done() return } @@ -136,5 +140,5 @@ func (er *edsDiscoveryMechanism) OnResourceDoesNotExist() { er.update = &xdsresource.EndpointsUpdate{} er.mu.Unlock() - er.topLevelResolver.onUpdate() + er.topLevelResolver.onUpdate(onDone) } diff --git a/xds/internal/resolver/watch_service.go b/xds/internal/resolver/watch_service.go index a4532eccc3ec..5bfd18313930 100644 --- a/xds/internal/resolver/watch_service.go +++ b/xds/internal/resolver/watch_service.go @@ -36,22 +36,22 @@ func newListenerWatcher(resourceName string, parent *xdsResolver) *listenerWatch return lw } -func (l *listenerWatcher) OnUpdate(update *xdsresource.ListenerResourceData) { - l.parent.serializer.TrySchedule(func(context.Context) { - l.parent.onListenerResourceUpdate(update.Resource) - }) +func (l *listenerWatcher) OnUpdate(update *xdsresource.ListenerResourceData, onDone xdsresource.DoneNotifier) { + // Ensure that the onDone callback is always called. + handleUpdate := func(context.Context) { l.parent.onListenerResourceUpdate(update.Resource); onDone.Done() } + l.parent.serializer.ScheduleOr(handleUpdate, onDone.Done) } -func (l *listenerWatcher) OnError(err error) { - l.parent.serializer.TrySchedule(func(context.Context) { - l.parent.onListenerResourceError(err) - }) +func (l *listenerWatcher) OnError(err error, onDone xdsresource.DoneNotifier) { + // Ensure that the onDone callback is always called. + handleError := func(context.Context) { l.parent.onListenerResourceError(err); onDone.Done() } + l.parent.serializer.ScheduleOr(handleError, onDone.Done) } -func (l *listenerWatcher) OnResourceDoesNotExist() { - l.parent.serializer.TrySchedule(func(context.Context) { - l.parent.onListenerResourceNotFound() - }) +func (l *listenerWatcher) OnResourceDoesNotExist(onDone xdsresource.DoneNotifier) { + // Ensure that the onDone callback is always called. + handleNotFound := func(context.Context) { l.parent.onListenerResourceNotFound(); onDone.Done() } + l.parent.serializer.ScheduleOr(handleNotFound, onDone.Done) } func (l *listenerWatcher) stop() { @@ -71,22 +71,19 @@ func newRouteConfigWatcher(resourceName string, parent *xdsResolver) *routeConfi return rw } -func (r *routeConfigWatcher) OnUpdate(update *xdsresource.RouteConfigResourceData) { - r.parent.serializer.TrySchedule(func(context.Context) { - r.parent.onRouteConfigResourceUpdate(r.resourceName, update.Resource) - }) +func (r *routeConfigWatcher) OnUpdate(u *xdsresource.RouteConfigResourceData, onDone xdsresource.DoneNotifier) { + handleUpdate := func(context.Context) { r.parent.onRouteConfigResourceUpdate(r.resourceName, u.Resource); onDone.Done() } + r.parent.serializer.ScheduleOr(handleUpdate, onDone.Done) } -func (r *routeConfigWatcher) OnError(err error) { - r.parent.serializer.TrySchedule(func(context.Context) { - r.parent.onRouteConfigResourceError(r.resourceName, err) - }) +func (r *routeConfigWatcher) OnError(err error, onDone xdsresource.DoneNotifier) { + handleError := func(context.Context) { r.parent.onRouteConfigResourceError(r.resourceName, err); onDone.Done() } + r.parent.serializer.ScheduleOr(handleError, onDone.Done) } -func (r *routeConfigWatcher) OnResourceDoesNotExist() { - r.parent.serializer.TrySchedule(func(context.Context) { - r.parent.onRouteConfigResourceNotFound(r.resourceName) - }) +func (r *routeConfigWatcher) OnResourceDoesNotExist(onDone xdsresource.DoneNotifier) { + handleNotFound := func(context.Context) { r.parent.onRouteConfigResourceNotFound(r.resourceName); onDone.Done() } + r.parent.serializer.ScheduleOr(handleNotFound, onDone.Done) } func (r *routeConfigWatcher) stop() { diff --git a/xds/internal/server/listener_wrapper.go b/xds/internal/server/listener_wrapper.go index 174b54c44117..c78fb163eea7 100644 --- a/xds/internal/server/listener_wrapper.go +++ b/xds/internal/server/listener_wrapper.go @@ -160,7 +160,7 @@ func (l *listenerWrapper) handleLDSUpdate(update xdsresource.ListenerUpdate) { ilc := update.InboundListenerCfg // Make sure that the socket address on the received Listener resource // matches the address of the net.Listener passed to us by the user. This - // check is done here instead of at the XDSClient layer because of the + // check is onDone here instead of at the XDSClient layer because of the // following couple of reasons: // - XDSClient cannot know the listening address of every listener in the // system, and hence cannot perform this check. @@ -410,7 +410,8 @@ type ldsWatcher struct { name string } -func (lw *ldsWatcher) OnUpdate(update *xdsresource.ListenerResourceData) { +func (lw *ldsWatcher) OnUpdate(update *xdsresource.ListenerResourceData, onDone xdsresource.DoneNotifier) { + defer onDone.Done() if lw.parent.closed.HasFired() { lw.logger.Warningf("Resource %q received update: %#v after listener was closed", lw.name, update) return @@ -421,7 +422,8 @@ func (lw *ldsWatcher) OnUpdate(update *xdsresource.ListenerResourceData) { lw.parent.handleLDSUpdate(update.Resource) } -func (lw *ldsWatcher) OnError(err error) { +func (lw *ldsWatcher) OnError(err error, onDone xdsresource.DoneNotifier) { + defer onDone.Done() if lw.parent.closed.HasFired() { lw.logger.Warningf("Resource %q received error: %v after listener was closed", lw.name, err) return @@ -433,7 +435,8 @@ func (lw *ldsWatcher) OnError(err error) { // continue to use the old configuration. } -func (lw *ldsWatcher) OnResourceDoesNotExist() { +func (lw *ldsWatcher) OnResourceDoesNotExist(onDone xdsresource.DoneNotifier) { + defer onDone.Done() if lw.parent.closed.HasFired() { lw.logger.Warningf("Resource %q received resource-does-not-exist error after listener was closed", lw.name) return diff --git a/xds/internal/server/rds_handler.go b/xds/internal/server/rds_handler.go index 67cde4602894..969f4bfc2253 100644 --- a/xds/internal/server/rds_handler.go +++ b/xds/internal/server/rds_handler.go @@ -147,7 +147,8 @@ type rdsWatcher struct { canceled bool // eats callbacks if true } -func (rw *rdsWatcher) OnUpdate(update *xdsresource.RouteConfigResourceData) { +func (rw *rdsWatcher) OnUpdate(update *xdsresource.RouteConfigResourceData, onDone xdsresource.DoneNotifier) { + defer onDone.Done() rw.mu.Lock() if rw.canceled { rw.mu.Unlock() @@ -160,7 +161,8 @@ func (rw *rdsWatcher) OnUpdate(update *xdsresource.RouteConfigResourceData) { rw.parent.handleRouteUpdate(rw.routeName, rdsWatcherUpdate{data: &update.Resource}) } -func (rw *rdsWatcher) OnError(err error) { +func (rw *rdsWatcher) OnError(err error, onDone xdsresource.DoneNotifier) { + defer onDone.Done() rw.mu.Lock() if rw.canceled { rw.mu.Unlock() @@ -173,7 +175,8 @@ func (rw *rdsWatcher) OnError(err error) { rw.parent.handleRouteUpdate(rw.routeName, rdsWatcherUpdate{err: err}) } -func (rw *rdsWatcher) OnResourceDoesNotExist() { +func (rw *rdsWatcher) OnResourceDoesNotExist(onDone xdsresource.DoneNotifier) { + defer onDone.Done() rw.mu.Lock() if rw.canceled { rw.mu.Unlock() diff --git a/xds/internal/testutils/resource_watcher.go b/xds/internal/testutils/resource_watcher.go index 2b6b5b1ae448..d4ab3c9f195a 100644 --- a/xds/internal/testutils/resource_watcher.go +++ b/xds/internal/testutils/resource_watcher.go @@ -37,7 +37,8 @@ type TestResourceWatcher struct { // OnUpdate is invoked by the xDS client to report the latest update on the resource // being watched. -func (w *TestResourceWatcher) OnUpdate(data xdsresource.ResourceData) { +func (w *TestResourceWatcher) OnUpdate(data xdsresource.ResourceData, onDone xdsresource.DoneNotifier) { + defer onDone.Done() select { case <-w.UpdateCh: default: @@ -46,7 +47,8 @@ func (w *TestResourceWatcher) OnUpdate(data xdsresource.ResourceData) { } // OnError is invoked by the xDS client to report the latest error. -func (w *TestResourceWatcher) OnError(err error) { +func (w *TestResourceWatcher) OnError(err error, onDone xdsresource.DoneNotifier) { + defer onDone.Done() select { case <-w.ErrorCh: default: @@ -56,7 +58,8 @@ func (w *TestResourceWatcher) OnError(err error) { // OnResourceDoesNotExist is used by the xDS client to report that the resource // being watched no longer exists. -func (w *TestResourceWatcher) OnResourceDoesNotExist() { +func (w *TestResourceWatcher) OnResourceDoesNotExist(onDone xdsresource.DoneNotifier) { + defer onDone.Done() select { case <-w.ResourceDoesNotExistCh: default: diff --git a/xds/internal/xdsclient/authority.go b/xds/internal/xdsclient/authority.go index 7214eb03c9b1..4b310e1dc645 100644 --- a/xds/internal/xdsclient/authority.go +++ b/xds/internal/xdsclient/authority.go @@ -148,7 +148,7 @@ func (a *authority) transportOnSendHandler(u *transport.ResourceSendInfo) { a.startWatchTimersLocked(rType, u.ResourceNames) } -func (a *authority) handleResourceUpdate(resourceUpdate transport.ResourceUpdate) error { +func (a *authority) handleResourceUpdate(resourceUpdate transport.ResourceUpdate, fc *transport.FlowControlManager) error { rType := a.resourceTypeGetter(resourceUpdate.URL) if rType == nil { return xdsresource.NewErrorf(xdsresource.ErrorTypeResourceTypeUnsupported, "Resource URL %v unknown in response from server", resourceUpdate.URL) @@ -159,14 +159,27 @@ func (a *authority) handleResourceUpdate(resourceUpdate transport.ResourceUpdate ServerConfig: a.serverCfg, } updates, md, err := decodeAllResources(opts, rType, resourceUpdate) - a.updateResourceStateAndScheduleCallbacks(rType, updates, md) + a.updateResourceStateAndScheduleCallbacks(rType, updates, md, fc) return err } -func (a *authority) updateResourceStateAndScheduleCallbacks(rType xdsresource.Type, updates map[string]resourceDataErrTuple, md xdsresource.UpdateMetadata) { +func (a *authority) updateResourceStateAndScheduleCallbacks(rType xdsresource.Type, updates map[string]resourceDataErrTuple, md xdsresource.UpdateMetadata, fc *transport.FlowControlManager) { a.resourcesMu.Lock() defer a.resourcesMu.Unlock() + // We build a list of callback funcs to invoke, and invoke them at the end + // of this method instead of inline (when handling the update for a + // particular resource), because we want to make sure that all calls to + // `fc.Add` happen before any callbacks are invoked. This will ensure that + // the next read is never attempted before all callbacks are invoked, and + // the watchers have processed the update. + funcsToSchedule := []func(context.Context){} + defer func() { + for _, f := range funcsToSchedule { + a.serializer.ScheduleOr(f, fc.Done) + } + }() + resourceStates := a.resources[rType] for name, uErr := range updates { if state, ok := resourceStates[name]; ok { @@ -210,7 +223,8 @@ func (a *authority) updateResourceStateAndScheduleCallbacks(rType xdsresource.Ty for watcher := range state.watchers { watcher := watcher err := uErr.err - a.serializer.TrySchedule(func(context.Context) { watcher.OnError(err) }) + fc.Add(1) + funcsToSchedule = append(funcsToSchedule, func(context.Context) { watcher.OnError(err, fc) }) } continue } @@ -225,7 +239,8 @@ func (a *authority) updateResourceStateAndScheduleCallbacks(rType xdsresource.Ty for watcher := range state.watchers { watcher := watcher resource := uErr.resource - a.serializer.TrySchedule(func(context.Context) { watcher.OnUpdate(resource) }) + fc.Add(1) + funcsToSchedule = append(funcsToSchedule, func(context.Context) { watcher.OnUpdate(resource, fc) }) } } // Sync cache. @@ -300,7 +315,8 @@ func (a *authority) updateResourceStateAndScheduleCallbacks(rType xdsresource.Ty state.md = xdsresource.UpdateMetadata{Status: xdsresource.ServiceStatusNotExist} for watcher := range state.watchers { watcher := watcher - a.serializer.TrySchedule(func(context.Context) { watcher.OnResourceDoesNotExist() }) + fc.Add(1) + funcsToSchedule = append(funcsToSchedule, func(context.Context) { watcher.OnResourceDoesNotExist(fc) }) } } } @@ -429,7 +445,7 @@ func (a *authority) newConnectionError(err error) { for watcher := range state.watchers { watcher := watcher a.serializer.TrySchedule(func(context.Context) { - watcher.OnError(xdsresource.NewErrorf(xdsresource.ErrorTypeConnection, "xds: error received from xDS stream: %v", err)) + watcher.OnError(xdsresource.NewErrorf(xdsresource.ErrorTypeConnection, "xds: error received from xDS stream: %v", err), xdsresource.NopDoneNotifier{}) }) } } @@ -495,7 +511,7 @@ func (a *authority) watchResource(rType xdsresource.Type, resourceName string, w a.logger.Infof("Resource type %q with resource name %q found in cache: %s", rType.TypeName(), resourceName, state.cache.ToJSON()) } resource := state.cache - a.serializer.TrySchedule(func(context.Context) { watcher.OnUpdate(resource) }) + a.serializer.TrySchedule(func(context.Context) { watcher.OnUpdate(resource, xdsresource.NopDoneNotifier{}) }) } return func() { @@ -548,7 +564,7 @@ func (a *authority) handleWatchTimerExpiryLocked(rType xdsresource.Type, resourc state.md = xdsresource.UpdateMetadata{Status: xdsresource.ServiceStatusNotExist} for watcher := range state.watchers { watcher := watcher - a.serializer.TrySchedule(func(context.Context) { watcher.OnResourceDoesNotExist() }) + a.serializer.TrySchedule(func(context.Context) { watcher.OnResourceDoesNotExist(xdsresource.NopDoneNotifier{}) }) } } @@ -574,7 +590,7 @@ func (a *authority) triggerResourceNotFoundForTesting(rType xdsresource.Type, re state.md = xdsresource.UpdateMetadata{Status: xdsresource.ServiceStatusNotExist} for watcher := range state.watchers { watcher := watcher - a.serializer.TrySchedule(func(context.Context) { watcher.OnResourceDoesNotExist() }) + a.serializer.TrySchedule(func(context.Context) { watcher.OnResourceDoesNotExist(xdsresource.NopDoneNotifier{}) }) } } diff --git a/xds/internal/xdsclient/clientimpl_watchers.go b/xds/internal/xdsclient/clientimpl_watchers.go index f064394aa41c..7a5dddfd2b8b 100644 --- a/xds/internal/xdsclient/clientimpl_watchers.go +++ b/xds/internal/xdsclient/clientimpl_watchers.go @@ -44,7 +44,7 @@ func (c *clientImpl) WatchResource(rType xdsresource.Type, resourceName string, if err := c.resourceTypes.maybeRegister(rType); err != nil { logger.Warningf("Watch registered for name %q of type %q which is already registered", rType.TypeName(), resourceName) - c.serializer.TrySchedule(func(context.Context) { watcher.OnError(err) }) + c.serializer.TrySchedule(func(context.Context) { watcher.OnError(err, xdsresource.NopDoneNotifier{}) }) return func() {} } @@ -54,7 +54,7 @@ func (c *clientImpl) WatchResource(rType xdsresource.Type, resourceName string, a, unref, err := c.findAuthority(n) if err != nil { logger.Warningf("Watch registered for name %q of type %q, authority %q is not found", rType.TypeName(), resourceName, n.Authority) - c.serializer.TrySchedule(func(context.Context) { watcher.OnError(err) }) + c.serializer.TrySchedule(func(context.Context) { watcher.OnError(err, xdsresource.NopDoneNotifier{}) }) return func() {} } cancelF := a.watchResource(rType, n.String(), watcher) diff --git a/xds/internal/xdsclient/tests/ads_stream_flow_control_test.go b/xds/internal/xdsclient/tests/ads_stream_flow_control_test.go new file mode 100644 index 000000000000..2dd8c67310b7 --- /dev/null +++ b/xds/internal/xdsclient/tests/ads_stream_flow_control_test.go @@ -0,0 +1,53 @@ +/* + * + * Copyright 2024 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package xdsclient_test + +import "testing" + +func (s) Test_ADSFlowControl_ResourceUpdates_Multiple(t *testing.T) { + // This would be similar to the below test, but with multiple resources. +} + +func (s) Test_ADSFlowControl_ResourceUpdates_Single(t *testing.T) { + // Start an xDS management server. + + // Start an xDS client to the management server. + + // Configure two watchers that block on a channel before calling done.Done() + + // Configure resources on the management server. + + // Ensure that the ADS request is sent out on the stream, via a callback on the management server. + + // Ensure that the ACK is sent out. + + // Update the resource on the management server. + + // Unblock one watcher. + + // Ensure that there is no read on the stream. + + // Unblock the second watcher. + + // Ensure that there is a read on the stream, and the update is seen by the watchers. +} + +// Test for error callbacks + +// Test for Resource not found callbacks diff --git a/xds/internal/xdsclient/tests/cds_watchers_test.go b/xds/internal/xdsclient/tests/cds_watchers_test.go index 6ee227f85997..2b311e1be8e9 100644 --- a/xds/internal/xdsclient/tests/cds_watchers_test.go +++ b/xds/internal/xdsclient/tests/cds_watchers_test.go @@ -43,9 +43,15 @@ import ( type noopClusterWatcher struct{} -func (noopClusterWatcher) OnUpdate(update *xdsresource.ClusterResourceData) {} -func (noopClusterWatcher) OnError(err error) {} -func (noopClusterWatcher) OnResourceDoesNotExist() {} +func (noopClusterWatcher) OnUpdate(update *xdsresource.ClusterResourceData, done xdsresource.DoneNotifier) { + done.Done() +} +func (noopClusterWatcher) OnError(err error, done xdsresource.DoneNotifier) { + done.Done() +} +func (noopClusterWatcher) OnResourceDoesNotExist(done xdsresource.DoneNotifier) { + done.Done() +} type clusterUpdateErrTuple struct { update xdsresource.ClusterUpdate @@ -60,20 +66,23 @@ func newClusterWatcher() *clusterWatcher { return &clusterWatcher{updateCh: testutils.NewChannel()} } -func (cw *clusterWatcher) OnUpdate(update *xdsresource.ClusterResourceData) { +func (cw *clusterWatcher) OnUpdate(update *xdsresource.ClusterResourceData, done xdsresource.DoneNotifier) { cw.updateCh.Send(clusterUpdateErrTuple{update: update.Resource}) + done.Done() } -func (cw *clusterWatcher) OnError(err error) { +func (cw *clusterWatcher) OnError(err error, done xdsresource.DoneNotifier) { // When used with a go-control-plane management server that continuously // resends resources which are NACKed by the xDS client, using a `Replace()` // here and in OnResourceDoesNotExist() simplifies tests which will have // access to the most recently received error. cw.updateCh.Replace(clusterUpdateErrTuple{err: err}) + done.Done() } -func (cw *clusterWatcher) OnResourceDoesNotExist() { +func (cw *clusterWatcher) OnResourceDoesNotExist(done xdsresource.DoneNotifier) { cw.updateCh.Replace(clusterUpdateErrTuple{err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "Cluster not found in received response")}) + done.Done() } // badClusterResource returns a cluster resource for the given name which diff --git a/xds/internal/xdsclient/tests/eds_watchers_test.go b/xds/internal/xdsclient/tests/eds_watchers_test.go index 7f2d5571efd3..7ad2e1ca6072 100644 --- a/xds/internal/xdsclient/tests/eds_watchers_test.go +++ b/xds/internal/xdsclient/tests/eds_watchers_test.go @@ -53,9 +53,15 @@ const ( type noopEndpointsWatcher struct{} -func (noopEndpointsWatcher) OnUpdate(update *xdsresource.EndpointsResourceData) {} -func (noopEndpointsWatcher) OnError(err error) {} -func (noopEndpointsWatcher) OnResourceDoesNotExist() {} +func (noopEndpointsWatcher) OnUpdate(update *xdsresource.EndpointsResourceData, done xdsresource.DoneNotifier) { + done.Done() +} +func (noopEndpointsWatcher) OnError(err error, done xdsresource.DoneNotifier) { + done.Done() +} +func (noopEndpointsWatcher) OnResourceDoesNotExist(done xdsresource.DoneNotifier) { + done.Done() +} type endpointsUpdateErrTuple struct { update xdsresource.EndpointsUpdate @@ -70,20 +76,23 @@ func newEndpointsWatcher() *endpointsWatcher { return &endpointsWatcher{updateCh: testutils.NewChannel()} } -func (ew *endpointsWatcher) OnUpdate(update *xdsresource.EndpointsResourceData) { +func (ew *endpointsWatcher) OnUpdate(update *xdsresource.EndpointsResourceData, done xdsresource.DoneNotifier) { ew.updateCh.Send(endpointsUpdateErrTuple{update: update.Resource}) + done.Done() } -func (ew *endpointsWatcher) OnError(err error) { +func (ew *endpointsWatcher) OnError(err error, done xdsresource.DoneNotifier) { // When used with a go-control-plane management server that continuously // resends resources which are NACKed by the xDS client, using a `Replace()` // here and in OnResourceDoesNotExist() simplifies tests which will have // access to the most recently received error. ew.updateCh.Replace(endpointsUpdateErrTuple{err: err}) + done.Done() } -func (ew *endpointsWatcher) OnResourceDoesNotExist() { +func (ew *endpointsWatcher) OnResourceDoesNotExist(done xdsresource.DoneNotifier) { ew.updateCh.Replace(endpointsUpdateErrTuple{err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "Endpoints not found in received response")}) + done.Done() } // badEndpointsResource returns a endpoints resource for the given diff --git a/xds/internal/xdsclient/tests/lds_watchers_test.go b/xds/internal/xdsclient/tests/lds_watchers_test.go index 23f8f0c8d544..92788dd2d8a2 100644 --- a/xds/internal/xdsclient/tests/lds_watchers_test.go +++ b/xds/internal/xdsclient/tests/lds_watchers_test.go @@ -48,9 +48,15 @@ import ( type noopListenerWatcher struct{} -func (noopListenerWatcher) OnUpdate(update *xdsresource.ListenerResourceData) {} -func (noopListenerWatcher) OnError(err error) {} -func (noopListenerWatcher) OnResourceDoesNotExist() {} +func (noopListenerWatcher) OnUpdate(update *xdsresource.ListenerResourceData, done xdsresource.DoneNotifier) { + done.Done() +} +func (noopListenerWatcher) OnError(err error, done xdsresource.DoneNotifier) { + done.Done() +} +func (noopListenerWatcher) OnResourceDoesNotExist(done xdsresource.DoneNotifier) { + done.Done() +} type listenerUpdateErrTuple struct { update xdsresource.ListenerUpdate @@ -65,20 +71,23 @@ func newListenerWatcher() *listenerWatcher { return &listenerWatcher{updateCh: testutils.NewChannel()} } -func (cw *listenerWatcher) OnUpdate(update *xdsresource.ListenerResourceData) { +func (cw *listenerWatcher) OnUpdate(update *xdsresource.ListenerResourceData, done xdsresource.DoneNotifier) { cw.updateCh.Send(listenerUpdateErrTuple{update: update.Resource}) + done.Done() } -func (cw *listenerWatcher) OnError(err error) { +func (cw *listenerWatcher) OnError(err error, done xdsresource.DoneNotifier) { // When used with a go-control-plane management server that continuously // resends resources which are NACKed by the xDS client, using a `Replace()` // here and in OnResourceDoesNotExist() simplifies tests which will have // access to the most recently received error. cw.updateCh.Replace(listenerUpdateErrTuple{err: err}) + done.Done() } -func (cw *listenerWatcher) OnResourceDoesNotExist() { +func (cw *listenerWatcher) OnResourceDoesNotExist(done xdsresource.DoneNotifier) { cw.updateCh.Replace(listenerUpdateErrTuple{err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "Listener not found in received response")}) + done.Done() } // badListenerResource returns a listener resource for the given name which does diff --git a/xds/internal/xdsclient/tests/misc_watchers_test.go b/xds/internal/xdsclient/tests/misc_watchers_test.go index f51648907182..947ca698751c 100644 --- a/xds/internal/xdsclient/tests/misc_watchers_test.go +++ b/xds/internal/xdsclient/tests/misc_watchers_test.go @@ -69,23 +69,26 @@ func newTestRouteConfigWatcher(client xdsclient.XDSClient, name1, name2 string) } } -func (rw *testRouteConfigWatcher) OnUpdate(update *xdsresource.RouteConfigResourceData) { +func (rw *testRouteConfigWatcher) OnUpdate(update *xdsresource.RouteConfigResourceData, done xdsresource.DoneNotifier) { rw.updateCh.Send(routeConfigUpdateErrTuple{update: update.Resource}) rw.cancel1 = xdsresource.WatchRouteConfig(rw.client, rw.name1, rw.rcw1) rw.cancel2 = xdsresource.WatchRouteConfig(rw.client, rw.name2, rw.rcw2) + done.Done() } -func (rw *testRouteConfigWatcher) OnError(err error) { +func (rw *testRouteConfigWatcher) OnError(err error, done xdsresource.DoneNotifier) { // When used with a go-control-plane management server that continuously // resends resources which are NACKed by the xDS client, using a `Replace()` // here and in OnResourceDoesNotExist() simplifies tests which will have // access to the most recently received error. rw.updateCh.Replace(routeConfigUpdateErrTuple{err: err}) + done.Done() } -func (rw *testRouteConfigWatcher) OnResourceDoesNotExist() { +func (rw *testRouteConfigWatcher) OnResourceDoesNotExist(done xdsresource.DoneNotifier) { rw.updateCh.Replace(routeConfigUpdateErrTuple{err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "RouteConfiguration not found in received response")}) + done.Done() } func (rw *testRouteConfigWatcher) cancel() { diff --git a/xds/internal/xdsclient/tests/rds_watchers_test.go b/xds/internal/xdsclient/tests/rds_watchers_test.go index 524bfbe05210..a042ef1eddfb 100644 --- a/xds/internal/xdsclient/tests/rds_watchers_test.go +++ b/xds/internal/xdsclient/tests/rds_watchers_test.go @@ -43,9 +43,15 @@ import ( type noopRouteConfigWatcher struct{} -func (noopRouteConfigWatcher) OnUpdate(update *xdsresource.RouteConfigResourceData) {} -func (noopRouteConfigWatcher) OnError(err error) {} -func (noopRouteConfigWatcher) OnResourceDoesNotExist() {} +func (noopRouteConfigWatcher) OnUpdate(update *xdsresource.RouteConfigResourceData, done xdsresource.DoneNotifier) { + done.Done() +} +func (noopRouteConfigWatcher) OnError(err error, done xdsresource.DoneNotifier) { + done.Done() +} +func (noopRouteConfigWatcher) OnResourceDoesNotExist(done xdsresource.DoneNotifier) { + done.Done() +} type routeConfigUpdateErrTuple struct { update xdsresource.RouteConfigUpdate @@ -60,20 +66,23 @@ func newRouteConfigWatcher() *routeConfigWatcher { return &routeConfigWatcher{updateCh: testutils.NewChannel()} } -func (rw *routeConfigWatcher) OnUpdate(update *xdsresource.RouteConfigResourceData) { +func (rw *routeConfigWatcher) OnUpdate(update *xdsresource.RouteConfigResourceData, done xdsresource.DoneNotifier) { rw.updateCh.Send(routeConfigUpdateErrTuple{update: update.Resource}) + done.Done() } -func (rw *routeConfigWatcher) OnError(err error) { +func (rw *routeConfigWatcher) OnError(err error, done xdsresource.DoneNotifier) { // When used with a go-control-plane management server that continuously // resends resources which are NACKed by the xDS client, using a `Replace()` // here and in OnResourceDoesNotExist() simplifies tests which will have // access to the most recently received error. rw.updateCh.Replace(routeConfigUpdateErrTuple{err: err}) + done.Done() } -func (rw *routeConfigWatcher) OnResourceDoesNotExist() { +func (rw *routeConfigWatcher) OnResourceDoesNotExist(done xdsresource.DoneNotifier) { rw.updateCh.Replace(routeConfigUpdateErrTuple{err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "RouteConfiguration not found in received response")}) + done.Done() } // badRouteConfigResource returns a RouteConfiguration resource for the given diff --git a/xds/internal/xdsclient/transport/loadreport_test.go b/xds/internal/xdsclient/transport/loadreport_test.go index cd5e5e352ee1..d283df2567b7 100644 --- a/xds/internal/xdsclient/transport/loadreport_test.go +++ b/xds/internal/xdsclient/transport/loadreport_test.go @@ -68,10 +68,10 @@ func (s) TestReportLoad(t *testing.T) { tr, err := transport.New(transport.Options{ ServerCfg: serverCfg, NodeProto: nodeProto, - OnRecvHandler: func(transport.ResourceUpdate) error { return nil }, // No ADS validation. - OnErrorHandler: func(error) {}, // No ADS stream error handling. - OnSendHandler: func(*transport.ResourceSendInfo) {}, // No ADS stream update handling. - Backoff: func(int) time.Duration { return time.Duration(0) }, // No backoff. + OnRecvHandler: func(transport.ResourceUpdate, *transport.FlowControlManager) error { return nil }, // No ADS validation. + OnErrorHandler: func(error) {}, // No ADS stream error handling. + OnSendHandler: func(*transport.ResourceSendInfo) {}, // No ADS stream update handling. + Backoff: func(int) time.Duration { return time.Duration(0) }, // No backoff. }) if err != nil { t.Fatalf("Failed to create xDS transport: %v", err) diff --git a/xds/internal/xdsclient/transport/transport.go b/xds/internal/xdsclient/transport/transport.go index e0636084b49e..d4b96db70ffc 100644 --- a/xds/internal/xdsclient/transport/transport.go +++ b/xds/internal/xdsclient/transport/transport.go @@ -112,7 +112,7 @@ type Transport struct { // cause the transport layer to send an ACK to the management server. A non-nil // error is returned from this function when the data model layer believes // otherwise, and this will cause the transport layer to send a NACK. -type OnRecvHandlerFunc func(update ResourceUpdate) error +type OnRecvHandlerFunc func(update ResourceUpdate, fc *FlowControlManager) error // OnSendHandlerFunc is the implementation at the authority, which handles state // changes for the resource watch and stop watch timers accordingly. @@ -464,7 +464,10 @@ func (t *Transport) sendExisting(stream adsStream) (sentNodeProto bool, err erro // successfully received. func (t *Transport) recv(stream adsStream) bool { msgReceived := false + fc := &FlowControlManager{logger: t.logger} for { + fc.Wait() + resources, url, rVersion, nonce, err := t.recvAggregatedDiscoveryServiceResponse(stream) if err != nil { // Note that we do not consider it an error if the ADS stream was closed @@ -486,7 +489,7 @@ func (t *Transport) recv(stream adsStream) bool { Resources: resources, URL: url, Version: rVersion, - }) + }, fc) if xdsresource.ErrType(err) == xdsresource.ErrorTypeResourceTypeUnsupported { t.logger.Warningf("%v", err) continue @@ -621,3 +624,27 @@ func (t *Transport) Close() { func (t *Transport) ChannelConnectivityStateForTesting() connectivity.State { return t.cc.GetState() } + +// FlowControlManager TBD. +type FlowControlManager struct { + wg sync.WaitGroup + logger *grpclog.PrefixLogger +} + +// Add TBD. +func (fc *FlowControlManager) Add(n int) { + fc.logger.Infof("easwars: add %d", n) + fc.wg.Add(n) +} + +// Wait TBD. +func (fc *FlowControlManager) Wait() { + fc.logger.Infof("easwars: wait") + fc.wg.Wait() +} + +// Done TBD. +func (fc *FlowControlManager) Done() { + fc.logger.Infof("easwars: done") + fc.wg.Done() +} diff --git a/xds/internal/xdsclient/transport/transport_ack_nack_test.go b/xds/internal/xdsclient/transport/transport_ack_nack_test.go index 582c7fa73dca..6fb21a062d5a 100644 --- a/xds/internal/xdsclient/transport/transport_ack_nack_test.go +++ b/xds/internal/xdsclient/transport/transport_ack_nack_test.go @@ -49,7 +49,7 @@ var ( // A simple update handler for listener resources which validates only the // `use_original_dst` field. - dataModelValidator = func(update transport.ResourceUpdate) error { + dataModelValidator = func(update transport.ResourceUpdate, _ *transport.FlowControlManager) error { for _, r := range update.Resources { inner := &v3discoverypb.Resource{} if err := proto.Unmarshal(r.GetValue(), inner); err != nil { diff --git a/xds/internal/xdsclient/transport/transport_backoff_test.go b/xds/internal/xdsclient/transport/transport_backoff_test.go index 0a028ca8dca9..79179d4f7c03 100644 --- a/xds/internal/xdsclient/transport/transport_backoff_test.go +++ b/xds/internal/xdsclient/transport/transport_backoff_test.go @@ -101,7 +101,7 @@ func (s) TestTransport_BackoffAfterStreamFailure(t *testing.T) { nodeID := uuid.New().String() tr, err := transport.New(transport.Options{ ServerCfg: serverCfg, - OnRecvHandler: func(transport.ResourceUpdate) error { return nil }, // No data model layer validation. + OnRecvHandler: func(transport.ResourceUpdate, *transport.FlowControlManager) error { return nil }, // No data model layer validation. OnErrorHandler: func(err error) { select { case streamErrCh <- err: @@ -262,7 +262,7 @@ func (s) TestTransport_RetriesAfterBrokenStream(t *testing.T) { // we can pass a no-op data model layer implementation. tr, err := transport.New(transport.Options{ ServerCfg: serverCfg, - OnRecvHandler: func(transport.ResourceUpdate) error { return nil }, // No data model layer validation. + OnRecvHandler: func(transport.ResourceUpdate, *transport.FlowControlManager) error { return nil }, // No data model layer validation. OnErrorHandler: func(err error) { select { case streamErrCh <- err: @@ -394,10 +394,10 @@ func (s) TestTransport_ResourceRequestedBeforeStreamCreation(t *testing.T) { nodeID := uuid.New().String() tr, err := transport.New(transport.Options{ ServerCfg: serverCfg, - OnRecvHandler: func(transport.ResourceUpdate) error { return nil }, // No data model layer validation. - OnErrorHandler: func(error) {}, // No stream error handling. - OnSendHandler: func(*transport.ResourceSendInfo) {}, // No on send handler - Backoff: func(int) time.Duration { return time.Duration(0) }, // No backoff. + OnRecvHandler: func(transport.ResourceUpdate, *transport.FlowControlManager) error { return nil }, // No data model layer validation. + OnErrorHandler: func(error) {}, // No stream error handling. + OnSendHandler: func(*transport.ResourceSendInfo) {}, // No on send handler + Backoff: func(int) time.Duration { return time.Duration(0) }, // No backoff. NodeProto: &v3corepb.Node{Id: nodeID}, }) if err != nil { diff --git a/xds/internal/xdsclient/transport/transport_new_test.go b/xds/internal/xdsclient/transport/transport_new_test.go index ca994735f7d0..39fca43ae340 100644 --- a/xds/internal/xdsclient/transport/transport_new_test.go +++ b/xds/internal/xdsclient/transport/transport_new_test.go @@ -53,7 +53,7 @@ func (s) TestNew(t *testing.T) { opts: transport.Options{ ServerCfg: serverCfg, NodeProto: &v3corepb.Node{}, - OnRecvHandler: func(transport.ResourceUpdate) error { return nil }, + OnRecvHandler: func(transport.ResourceUpdate, *transport.FlowControlManager) error { return nil }, OnSendHandler: func(*transport.ResourceSendInfo) {}, }, wantErrStr: "missing OnError callback handler when creating a new transport", @@ -64,7 +64,7 @@ func (s) TestNew(t *testing.T) { opts: transport.Options{ ServerCfg: serverCfg, NodeProto: &v3corepb.Node{}, - OnRecvHandler: func(transport.ResourceUpdate) error { return nil }, + OnRecvHandler: func(transport.ResourceUpdate, *transport.FlowControlManager) error { return nil }, OnErrorHandler: func(error) {}, }, wantErrStr: "missing OnSend callback handler when creating a new transport", @@ -74,7 +74,7 @@ func (s) TestNew(t *testing.T) { opts: transport.Options{ ServerCfg: serverCfg, NodeProto: &v3corepb.Node{}, - OnRecvHandler: func(transport.ResourceUpdate) error { return nil }, + OnRecvHandler: func(transport.ResourceUpdate, *transport.FlowControlManager) error { return nil }, OnErrorHandler: func(error) {}, OnSendHandler: func(*transport.ResourceSendInfo) {}, }, diff --git a/xds/internal/xdsclient/transport/transport_resource_test.go b/xds/internal/xdsclient/transport/transport_resource_test.go index b1c8aaf4795e..9012631f72b4 100644 --- a/xds/internal/xdsclient/transport/transport_resource_test.go +++ b/xds/internal/xdsclient/transport/transport_resource_test.go @@ -185,7 +185,7 @@ func (s) TestHandleResponseFromManagementServer(t *testing.T) { tr, err := transport.New(transport.Options{ ServerCfg: serverCfg, // No validation. Simply push received resources on a channel. - OnRecvHandler: func(update transport.ResourceUpdate) error { + OnRecvHandler: func(update transport.ResourceUpdate, _ *transport.FlowControlManager) error { resourcesCh.Send(&resourcesWithTypeURL{ resources: update.Resources, url: update.URL, @@ -238,10 +238,8 @@ func (s) TestEmptyListenerResourceOnStreamRestart(t *testing.T) { } nodeProto := &v3corepb.Node{Id: uuid.New().String()} tr, err := transport.New(transport.Options{ - ServerCfg: serverCfg, - OnRecvHandler: func(update transport.ResourceUpdate) error { - return nil - }, + ServerCfg: serverCfg, + OnRecvHandler: func(transport.ResourceUpdate, *transport.FlowControlManager) error { return nil }, OnSendHandler: func(*transport.ResourceSendInfo) {}, // No onSend handling. OnErrorHandler: func(error) {}, // No stream error handling. Backoff: func(int) time.Duration { return time.Duration(0) }, // No backoff. @@ -331,10 +329,8 @@ func (s) TestEmptyClusterResourceOnStreamRestartWithListener(t *testing.T) { } nodeProto := &v3corepb.Node{Id: uuid.New().String()} tr, err := transport.New(transport.Options{ - ServerCfg: serverCfg, - OnRecvHandler: func(update transport.ResourceUpdate) error { - return nil - }, + ServerCfg: serverCfg, + OnRecvHandler: func(transport.ResourceUpdate, *transport.FlowControlManager) error { return nil }, OnSendHandler: func(*transport.ResourceSendInfo) {}, // No onSend handling. OnErrorHandler: func(error) {}, // No stream error handling. Backoff: func(int) time.Duration { return time.Duration(0) }, // No backoff. diff --git a/xds/internal/xdsclient/transport/transport_test.go b/xds/internal/xdsclient/transport/transport_test.go index 6ef8ccfbf7f5..3ddc38aab725 100644 --- a/xds/internal/xdsclient/transport/transport_test.go +++ b/xds/internal/xdsclient/transport/transport_test.go @@ -45,9 +45,12 @@ func (s) TestNewWithGRPCDial(t *testing.T) { } // Create a new transport and ensure that the custom dialer was called. opts := transport.Options{ - ServerCfg: serverCfg, - NodeProto: &v3corepb.Node{}, - OnRecvHandler: func(transport.ResourceUpdate) error { return nil }, + ServerCfg: serverCfg, + NodeProto: &v3corepb.Node{}, + OnRecvHandler: func(update transport.ResourceUpdate, fc *transport.FlowControlManager) error { + fc.Done() + return nil + }, OnErrorHandler: func(error) {}, OnSendHandler: func(*transport.ResourceSendInfo) {}, } diff --git a/xds/internal/xdsclient/xdsresource/cluster_resource_type.go b/xds/internal/xdsclient/xdsresource/cluster_resource_type.go index 5ac7f0312239..ed1c7fdbf536 100644 --- a/xds/internal/xdsclient/xdsresource/cluster_resource_type.go +++ b/xds/internal/xdsclient/xdsresource/cluster_resource_type.go @@ -111,7 +111,7 @@ func (c *ClusterResourceData) Raw() *anypb.Any { // corresponding to the cluster resource being watched. type ClusterWatcher interface { // OnUpdate is invoked to report an update for the resource being watched. - OnUpdate(*ClusterResourceData) + OnUpdate(*ClusterResourceData, DoneNotifier) // OnError is invoked under different error conditions including but not // limited to the following: @@ -121,28 +121,28 @@ type ClusterWatcher interface { // - resource validation error // - ADS stream failure // - connection failure - OnError(error) + OnError(error, DoneNotifier) // OnResourceDoesNotExist is invoked for a specific error condition where // the requested resource is not found on the xDS management server. - OnResourceDoesNotExist() + OnResourceDoesNotExist(DoneNotifier) } type delegatingClusterWatcher struct { watcher ClusterWatcher } -func (d *delegatingClusterWatcher) OnUpdate(data ResourceData) { +func (d *delegatingClusterWatcher) OnUpdate(data ResourceData, done DoneNotifier) { c := data.(*ClusterResourceData) - d.watcher.OnUpdate(c) + d.watcher.OnUpdate(c, done) } -func (d *delegatingClusterWatcher) OnError(err error) { - d.watcher.OnError(err) +func (d *delegatingClusterWatcher) OnError(err error, done DoneNotifier) { + d.watcher.OnError(err, done) } -func (d *delegatingClusterWatcher) OnResourceDoesNotExist() { - d.watcher.OnResourceDoesNotExist() +func (d *delegatingClusterWatcher) OnResourceDoesNotExist(done DoneNotifier) { + d.watcher.OnResourceDoesNotExist(done) } // WatchCluster uses xDS to discover the configuration associated with the diff --git a/xds/internal/xdsclient/xdsresource/endpoints_resource_type.go b/xds/internal/xdsclient/xdsresource/endpoints_resource_type.go index 775a8aa19423..a418b5423fee 100644 --- a/xds/internal/xdsclient/xdsresource/endpoints_resource_type.go +++ b/xds/internal/xdsclient/xdsresource/endpoints_resource_type.go @@ -107,7 +107,7 @@ func (e *EndpointsResourceData) Raw() *anypb.Any { // events corresponding to the endpoints resource being watched. type EndpointsWatcher interface { // OnUpdate is invoked to report an update for the resource being watched. - OnUpdate(*EndpointsResourceData) + OnUpdate(*EndpointsResourceData, DoneNotifier) // OnError is invoked under different error conditions including but not // limited to the following: @@ -117,28 +117,28 @@ type EndpointsWatcher interface { // - resource validation error // - ADS stream failure // - connection failure - OnError(error) + OnError(error, DoneNotifier) // OnResourceDoesNotExist is invoked for a specific error condition where // the requested resource is not found on the xDS management server. - OnResourceDoesNotExist() + OnResourceDoesNotExist(DoneNotifier) } type delegatingEndpointsWatcher struct { watcher EndpointsWatcher } -func (d *delegatingEndpointsWatcher) OnUpdate(data ResourceData) { +func (d *delegatingEndpointsWatcher) OnUpdate(data ResourceData, done DoneNotifier) { e := data.(*EndpointsResourceData) - d.watcher.OnUpdate(e) + d.watcher.OnUpdate(e, done) } -func (d *delegatingEndpointsWatcher) OnError(err error) { - d.watcher.OnError(err) +func (d *delegatingEndpointsWatcher) OnError(err error, done DoneNotifier) { + d.watcher.OnError(err, done) } -func (d *delegatingEndpointsWatcher) OnResourceDoesNotExist() { - d.watcher.OnResourceDoesNotExist() +func (d *delegatingEndpointsWatcher) OnResourceDoesNotExist(done DoneNotifier) { + d.watcher.OnResourceDoesNotExist(done) } // WatchEndpoints uses xDS to discover the configuration associated with the diff --git a/xds/internal/xdsclient/xdsresource/listener_resource_type.go b/xds/internal/xdsclient/xdsresource/listener_resource_type.go index 2436d72f8e54..c9360aaf877c 100644 --- a/xds/internal/xdsclient/xdsresource/listener_resource_type.go +++ b/xds/internal/xdsclient/xdsresource/listener_resource_type.go @@ -144,7 +144,7 @@ func (l *ListenerResourceData) Raw() *anypb.Any { // events corresponding to the listener resource being watched. type ListenerWatcher interface { // OnUpdate is invoked to report an update for the resource being watched. - OnUpdate(*ListenerResourceData) + OnUpdate(*ListenerResourceData, DoneNotifier) // OnError is invoked under different error conditions including but not // limited to the following: @@ -154,28 +154,28 @@ type ListenerWatcher interface { // - resource validation error // - ADS stream failure // - connection failure - OnError(error) + OnError(error, DoneNotifier) // OnResourceDoesNotExist is invoked for a specific error condition where // the requested resource is not found on the xDS management server. - OnResourceDoesNotExist() + OnResourceDoesNotExist(DoneNotifier) } type delegatingListenerWatcher struct { watcher ListenerWatcher } -func (d *delegatingListenerWatcher) OnUpdate(data ResourceData) { +func (d *delegatingListenerWatcher) OnUpdate(data ResourceData, done DoneNotifier) { l := data.(*ListenerResourceData) - d.watcher.OnUpdate(l) + d.watcher.OnUpdate(l, done) } -func (d *delegatingListenerWatcher) OnError(err error) { - d.watcher.OnError(err) +func (d *delegatingListenerWatcher) OnError(err error, done DoneNotifier) { + d.watcher.OnError(err, done) } -func (d *delegatingListenerWatcher) OnResourceDoesNotExist() { - d.watcher.OnResourceDoesNotExist() +func (d *delegatingListenerWatcher) OnResourceDoesNotExist(done DoneNotifier) { + d.watcher.OnResourceDoesNotExist(done) } // WatchListener uses xDS to discover the configuration associated with the diff --git a/xds/internal/xdsclient/xdsresource/resource_type.go b/xds/internal/xdsclient/xdsresource/resource_type.go index 3b3a8e79c2b9..82d897b0f6fc 100644 --- a/xds/internal/xdsclient/xdsresource/resource_type.go +++ b/xds/internal/xdsclient/xdsresource/resource_type.go @@ -52,13 +52,24 @@ type Producer interface { WatchResource(rType Type, resourceName string, watcher ResourceWatcher) (cancel func()) } +// DoneNotifier TBD. +type DoneNotifier interface { + Done() +} + +// NopDoneNotifier TDB. +type NopDoneNotifier struct{} + +// Done TBD. +func (NopDoneNotifier) Done() {} + // ResourceWatcher wraps the callbacks to be invoked for different events // corresponding to the resource being watched. type ResourceWatcher interface { // OnUpdate is invoked to report an update for the resource being watched. // The ResourceData parameter needs to be type asserted to the appropriate // type for the resource being watched. - OnUpdate(ResourceData) + OnUpdate(ResourceData, DoneNotifier) // OnError is invoked under different error conditions including but not // limited to the following: @@ -68,11 +79,11 @@ type ResourceWatcher interface { // - resource validation error // - ADS stream failure // - connection failure - OnError(error) + OnError(error, DoneNotifier) // OnResourceDoesNotExist is invoked for a specific error condition where // the requested resource is not found on the xDS management server. - OnResourceDoesNotExist() + OnResourceDoesNotExist(DoneNotifier) } // TODO: Once the implementation is complete, rename this interface as diff --git a/xds/internal/xdsclient/xdsresource/route_config_resource_type.go b/xds/internal/xdsclient/xdsresource/route_config_resource_type.go index 8ce5cb28596e..5cea0fa60407 100644 --- a/xds/internal/xdsclient/xdsresource/route_config_resource_type.go +++ b/xds/internal/xdsclient/xdsresource/route_config_resource_type.go @@ -108,7 +108,7 @@ func (r *RouteConfigResourceData) Raw() *anypb.Any { // events corresponding to the route configuration resource being watched. type RouteConfigWatcher interface { // OnUpdate is invoked to report an update for the resource being watched. - OnUpdate(*RouteConfigResourceData) + OnUpdate(*RouteConfigResourceData, DoneNotifier) // OnError is invoked under different error conditions including but not // limited to the following: @@ -118,28 +118,28 @@ type RouteConfigWatcher interface { // - resource validation error // - ADS stream failure // - connection failure - OnError(error) + OnError(error, DoneNotifier) // OnResourceDoesNotExist is invoked for a specific error condition where // the requested resource is not found on the xDS management server. - OnResourceDoesNotExist() + OnResourceDoesNotExist(DoneNotifier) } type delegatingRouteConfigWatcher struct { watcher RouteConfigWatcher } -func (d *delegatingRouteConfigWatcher) OnUpdate(data ResourceData) { +func (d *delegatingRouteConfigWatcher) OnUpdate(data ResourceData, done DoneNotifier) { rc := data.(*RouteConfigResourceData) - d.watcher.OnUpdate(rc) + d.watcher.OnUpdate(rc, done) } -func (d *delegatingRouteConfigWatcher) OnError(err error) { - d.watcher.OnError(err) +func (d *delegatingRouteConfigWatcher) OnError(err error, done DoneNotifier) { + d.watcher.OnError(err, done) } -func (d *delegatingRouteConfigWatcher) OnResourceDoesNotExist() { - d.watcher.OnResourceDoesNotExist() +func (d *delegatingRouteConfigWatcher) OnResourceDoesNotExist(done DoneNotifier) { + d.watcher.OnResourceDoesNotExist(done) } // WatchRouteConfig uses xDS to discover the configuration associated with the