Skip to content

Commit

Permalink
[xDS] Delay ADS read when watchers need time (grpc#34942)
Browse files Browse the repository at this point in the history
Closes grpc#34942
Fixes grpc#34099

COPYBARA_INTEGRATE_REVIEW=grpc#34942 from eugeneo:tasks/ads-delay cb43dce
PiperOrigin-RevId: 594279268
  • Loading branch information
eugeneo authored and copybara-github committed Dec 28, 2023
1 parent c12a564 commit a2c7a70
Show file tree
Hide file tree
Showing 16 changed files with 436 additions and 147 deletions.
1 change: 1 addition & 0 deletions BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -3674,6 +3674,7 @@ grpc_cc_library(
],
external_deps = [
"absl/base:core_headers",
"absl/cleanup",
"absl/memory",
"absl/status",
"absl/status:statusor",
Expand Down
3 changes: 3 additions & 0 deletions src/core/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -4455,6 +4455,7 @@ grpc_cc_library(
],
external_deps = [
"absl/base:core_headers",
"absl/cleanup",
"absl/functional:bind_front",
"absl/memory",
"absl/random",
Expand Down Expand Up @@ -4664,6 +4665,7 @@ grpc_cc_library(
"//:ref_counted_ptr",
"//:sockaddr_utils",
"//:uri_parser",
"//:xds_client",
],
)

Expand Down Expand Up @@ -4732,6 +4734,7 @@ grpc_cc_library(
"//:orphanable",
"//:ref_counted_ptr",
"//:work_serializer",
"//:xds_client",
],
)

Expand Down
21 changes: 14 additions & 7 deletions src/core/ext/filters/client_channel/lb_policy/xds/cds.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
#include "src/core/ext/filters/client_channel/lb_policy/outlier_detection/outlier_detection.h"
#include "src/core/ext/xds/certificate_provider_store.h"
#include "src/core/ext/xds/xds_certificate_provider.h"
#include "src/core/ext/xds/xds_client.h"
#include "src/core/ext/xds/xds_client_grpc.h"
#include "src/core/ext/xds/xds_cluster.h"
#include "src/core/ext/xds/xds_common_types.h"
Expand Down Expand Up @@ -123,26 +124,32 @@ class CdsLb : public LoadBalancingPolicy {
: parent_(std::move(parent)), name_(std::move(name)) {}

void OnResourceChanged(
std::shared_ptr<const XdsClusterResource> cluster_data) override {
std::shared_ptr<const XdsClusterResource> cluster_data,
RefCountedPtr<XdsClient::ReadDelayHandle> read_delay_handle) override {
parent_->work_serializer()->Run(
[self = RefAsSubclass<ClusterWatcher>(),
cluster_data = std::move(cluster_data)]() mutable {
cluster_data = std::move(cluster_data),
read_handle = std::move(read_delay_handle)]() mutable {
self->parent_->OnClusterChanged(self->name_,
std::move(cluster_data));
},
DEBUG_LOCATION);
}
void OnError(absl::Status status) override {
void OnError(
absl::Status status,
RefCountedPtr<XdsClient::ReadDelayHandle> read_delay_handle) override {
parent_->work_serializer()->Run(
[self = RefAsSubclass<ClusterWatcher>(),
status = std::move(status)]() mutable {
[self = RefAsSubclass<ClusterWatcher>(), status = std::move(status),
read_handle = std::move(read_delay_handle)]() mutable {
self->parent_->OnError(self->name_, std::move(status));
},
DEBUG_LOCATION);
}
void OnResourceDoesNotExist() override {
void OnResourceDoesNotExist(
RefCountedPtr<XdsClient::ReadDelayHandle> read_delay_handle) override {
parent_->work_serializer()->Run(
[self = RefAsSubclass<ClusterWatcher>()]() {
[self = RefAsSubclass<ClusterWatcher>(),
read_handle = std::move(read_delay_handle)]() {
self->parent_->OnResourceDoesNotExist(self->name_);
},
DEBUG_LOCATION);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,26 +215,33 @@ class XdsClusterResolverLb : public LoadBalancingPolicy {
~EndpointWatcher() override {
discovery_mechanism_.reset(DEBUG_LOCATION, "EndpointWatcher");
}
void OnResourceChanged(
std::shared_ptr<const XdsEndpointResource> update) override {
void OnResourceChanged(std::shared_ptr<const XdsEndpointResource> update,
RefCountedPtr<XdsClient::ReadDelayHandle>
read_delay_handle) override {
discovery_mechanism_->parent()->work_serializer()->Run(
[self = RefAsSubclass<EndpointWatcher>(),
update = std::move(update)]() mutable {
update = std::move(update),
read_delay_handle = std::move(read_delay_handle)]() mutable {
self->OnResourceChangedHelper(std::move(update));
},
DEBUG_LOCATION);
}
void OnError(absl::Status status) override {
void OnError(absl::Status status,
RefCountedPtr<XdsClient::ReadDelayHandle> read_delay_handle)
override {
discovery_mechanism_->parent()->work_serializer()->Run(
[self = RefAsSubclass<EndpointWatcher>(),
status = std::move(status)]() mutable {
status = std::move(status),
read_delay_handle = std::move(read_delay_handle)]() mutable {
self->OnErrorHelper(std::move(status));
},
DEBUG_LOCATION);
}
void OnResourceDoesNotExist() override {
void OnResourceDoesNotExist(RefCountedPtr<XdsClient::ReadDelayHandle>
read_delay_handle) override {
discovery_mechanism_->parent()->work_serializer()->Run(
[self = RefAsSubclass<EndpointWatcher>()]() {
[self = RefAsSubclass<EndpointWatcher>(),
read_delay_handle = std::move(read_delay_handle)]() {
self->OnResourceDoesNotExistHelper();
},
DEBUG_LOCATION);
Expand Down
40 changes: 27 additions & 13 deletions src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
#include "src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.h"
#include "src/core/ext/xds/xds_bootstrap.h"
#include "src/core/ext/xds/xds_bootstrap_grpc.h"
#include "src/core/ext/xds/xds_client.h"
#include "src/core/ext/xds/xds_client_grpc.h"
#include "src/core/ext/xds/xds_http_filters.h"
#include "src/core/ext/xds/xds_listener.h"
Expand Down Expand Up @@ -99,6 +100,8 @@ TraceFlag grpc_xds_resolver_trace(false, "xds_resolver");

namespace {

using ReadDelayHandle = XdsClient::ReadDelayHandle;

//
// XdsResolver
//
Expand Down Expand Up @@ -141,26 +144,31 @@ class XdsResolver : public Resolver {
explicit ListenerWatcher(RefCountedPtr<XdsResolver> resolver)
: resolver_(std::move(resolver)) {}
void OnResourceChanged(
std::shared_ptr<const XdsListenerResource> listener) override {
std::shared_ptr<const XdsListenerResource> listener,
RefCountedPtr<ReadDelayHandle> read_delay_handle) override {
resolver_->work_serializer_->Run(
[self = RefAsSubclass<ListenerWatcher>(),
listener = std::move(listener)]() mutable {
listener = std::move(listener),
read_delay_handle = std::move(read_delay_handle)]() mutable {
self->resolver_->OnListenerUpdate(std::move(listener));
},
DEBUG_LOCATION);
}
void OnError(absl::Status status) override {
void OnError(absl::Status status,
RefCountedPtr<ReadDelayHandle> read_delay_handle) override {
resolver_->work_serializer_->Run(
[self = RefAsSubclass<ListenerWatcher>(),
status = std::move(status)]() mutable {
[self = RefAsSubclass<ListenerWatcher>(), status = std::move(status),
read_delay_handle = std::move(read_delay_handle)]() mutable {
self->resolver_->OnError(self->resolver_->lds_resource_name_,
std::move(status));
},
DEBUG_LOCATION);
}
void OnResourceDoesNotExist() override {
void OnResourceDoesNotExist(
RefCountedPtr<ReadDelayHandle> read_delay_handle) override {
resolver_->work_serializer_->Run(
[self = RefAsSubclass<ListenerWatcher>()]() {
[self = RefAsSubclass<ListenerWatcher>(),
read_delay_handle = std::move(read_delay_handle)]() {
self->resolver_->OnResourceDoesNotExist(
absl::StrCat(self->resolver_->lds_resource_name_,
": xDS listener resource does not exist"));
Expand All @@ -178,28 +186,34 @@ class XdsResolver : public Resolver {
explicit RouteConfigWatcher(RefCountedPtr<XdsResolver> resolver)
: resolver_(std::move(resolver)) {}
void OnResourceChanged(
std::shared_ptr<const XdsRouteConfigResource> route_config) override {
std::shared_ptr<const XdsRouteConfigResource> route_config,
RefCountedPtr<ReadDelayHandle> read_delay_handle) override {
resolver_->work_serializer_->Run(
[self = RefAsSubclass<RouteConfigWatcher>(),
route_config = std::move(route_config)]() mutable {
route_config = std::move(route_config),
read_delay_handle = std::move(read_delay_handle)]() mutable {
if (self != self->resolver_->route_config_watcher_) return;
self->resolver_->OnRouteConfigUpdate(std::move(route_config));
},
DEBUG_LOCATION);
}
void OnError(absl::Status status) override {
void OnError(absl::Status status,
RefCountedPtr<ReadDelayHandle> read_delay_handle) override {
resolver_->work_serializer_->Run(
[self = RefAsSubclass<RouteConfigWatcher>(),
status = std::move(status)]() mutable {
status = std::move(status),
read_delay_handle = std::move(read_delay_handle)]() mutable {
if (self != self->resolver_->route_config_watcher_) return;
self->resolver_->OnError(self->resolver_->route_config_name_,
std::move(status));
},
DEBUG_LOCATION);
}
void OnResourceDoesNotExist() override {
void OnResourceDoesNotExist(
RefCountedPtr<ReadDelayHandle> read_delay_handle) override {
resolver_->work_serializer_->Run(
[self = RefAsSubclass<RouteConfigWatcher>()]() {
[self = RefAsSubclass<RouteConfigWatcher>(),
read_delay_handle = std::move(read_delay_handle)]() {
if (self != self->resolver_->route_config_watcher_) return;
self->resolver_->OnResourceDoesNotExist(absl::StrCat(
self->resolver_->route_config_name_,
Expand Down
Loading

0 comments on commit a2c7a70

Please sign in to comment.