Skip to content

Commit

Permalink
clusterresolver: handle EDS nacks and resource-not-found errors corre…
Browse files Browse the repository at this point in the history
…ctly (#6436)
  • Loading branch information
easwars authored Jul 11, 2023
1 parent fc0aa46 commit bf5b7ae
Show file tree
Hide file tree
Showing 5 changed files with 571 additions and 15 deletions.
2 changes: 1 addition & 1 deletion xds/internal/balancer/clusterresolver/clusterresolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func (bb) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Bal
b.logger = prefixLogger(b)
b.logger.Infof("Created")

b.resourceWatcher = newResourceResolver(b)
b.resourceWatcher = newResourceResolver(b, b.logger)
b.cc = &ccWrapper{
ClientConn: cc,
resourceWatcher: b.resourceWatcher,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,21 @@ import (
"github.com/google/go-cmp/cmp"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/stubserver"
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/internal/testutils/xds/e2e"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/resolver/manual"
"google.golang.org/grpc/serviceconfig"
"google.golang.org/grpc/status"
xdstestutils "google.golang.org/grpc/xds/internal/testutils"
"google.golang.org/grpc/xds/internal/xdsclient"
"google.golang.org/grpc/xds/internal/xdsclient/bootstrap"
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource/version"
"google.golang.org/protobuf/types/known/wrapperspb"

v3clusterpb "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
Expand Down Expand Up @@ -771,3 +779,289 @@ func (s) TestAggregateCluster_BadEDS_BadDNS(t *testing.T) {
t.Fatalf("EmptyCall() failed with error %v, want %v", err, dnsErr)
}
}

// TestAggregateCluster_NoFallback_EDSNackedWithPreviousGoodUpdate tests the
// scenario where the top-level cluster is an aggregate cluster that resolves to
// an EDS and LOGICAL_DNS cluster. The management server first sends a good EDS
// response for the EDS cluster and the test verifies that RPCs get routed to
// the EDS cluster. The management server then sends a bad EDS response. The
// test verifies that the cluster_resolver LB policy continues to use the
// previously received good update and that RPCs still get routed to the EDS
// cluster.
func (s) TestAggregateCluster_NoFallback_EDSNackedWithPreviousGoodUpdate(t *testing.T) {
dnsTargetCh, _, _, dnsR, cleanup1 := setupDNS()
defer cleanup1()

// Start an xDS management server.
mgmtServer, nodeID, bootstrapContents, _, cleanup2 := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true})
defer cleanup2()

// Start two test backends and extract their host and port. The first
// backend is used for the EDS cluster and the second backend is used for
// the LOGICAL_DNS cluster.
servers, cleanup3 := startTestServiceBackends(t, 2)
defer cleanup3()
addrs, ports := backendAddressesAndPorts(t, servers)

// Configure an aggregate cluster pointing to an EDS and DNS cluster. Also
// configure an endpoints resource for the EDS cluster.
const (
edsClusterName = clusterName + "-eds"
dnsClusterName = clusterName + "-dns"
dnsHostName = "dns_host"
dnsPort = uint32(8080)
)
resources := e2e.UpdateOptions{
NodeID: nodeID,
Clusters: []*v3clusterpb.Cluster{
makeAggregateClusterResource(clusterName, []string{edsClusterName, dnsClusterName}),
e2e.DefaultCluster(edsClusterName, "", e2e.SecurityLevelNone),
makeLogicalDNSClusterResource(dnsClusterName, dnsHostName, dnsPort),
},
Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(edsClusterName, "localhost", []uint32{uint32(ports[0])})},
SkipValidation: true,
}
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if err := mgmtServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}

// Create xDS client, configure cds_experimental LB policy with a manual
// resolver, and dial the test backends.
cc, cleanup := setupAndDial(t, bootstrapContents)
defer cleanup()

// Ensure that the DNS resolver is started for the expected target.
select {
case <-ctx.Done():
t.Fatal("Timeout when waiting for DNS resolver to be started")
case target := <-dnsTargetCh:
got, want := target.Endpoint(), fmt.Sprintf("%s:%d", dnsHostName, dnsPort)
if got != want {
t.Fatalf("DNS resolution started for target %q, want %q", got, want)
}
}

// Update DNS resolver with test backend addresses.
dnsR.UpdateState(resolver.State{Addresses: addrs[1:]})

// Make an RPC and ensure that it gets routed to the first backend since the
// EDS cluster is of higher priority than the LOGICAL_DNS cluster.
client := testgrpc.NewTestServiceClient(cc)
peer := &peer.Peer{}
if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(peer), grpc.WaitForReady(true)); err != nil {
t.Fatalf("EmptyCall() failed: %v", err)
}
if peer.Addr.String() != addrs[0].Addr {
t.Fatalf("EmptyCall() routed to backend %q, want %q", peer.Addr, addrs[0].Addr)
}

// Push an EDS resource from the management server that is expected to be
// NACKed by the xDS client. Since the cluster_resolver LB policy has a
// previously received good EDS resource, it will continue to use that.
resources.Endpoints[0].Endpoints[0].LbEndpoints[0].LoadBalancingWeight = &wrapperspb.UInt32Value{Value: 0}
if err := mgmtServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}

// Ensure that RPCs continue to get routed to the EDS cluster for the next
// second.
for end := time.Now().Add(time.Second); time.Now().Before(end); <-time.After(defaultTestShortTimeout) {
if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(peer)); err != nil {
t.Fatalf("EmptyCall() failed: %v", err)
}
if peer.Addr.String() != addrs[0].Addr {
t.Fatalf("EmptyCall() routed to backend %q, want %q", peer.Addr, addrs[0].Addr)
}
}
}

// TestAggregateCluster_Fallback_EDSNackedWithoutPreviousGoodUpdate tests the
// scenario where the top-level cluster is an aggregate cluster that resolves to
// an EDS and LOGICAL_DNS cluster. The management server sends a bad EDS
// response. The test verifies that the cluster_resolver LB policy falls back to
// the LOGICAL_DNS cluster, because it is supposed to treat the bad EDS response
// as though it received an update with no endpoints.
func (s) TestAggregateCluster_Fallback_EDSNackedWithoutPreviousGoodUpdate(t *testing.T) {
dnsTargetCh, _, _, dnsR, cleanup1 := setupDNS()
defer cleanup1()

// Start an xDS management server.
mgmtServer, nodeID, bootstrapContents, _, cleanup2 := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true})
defer cleanup2()

// Start two test backends and extract their host and port. The first
// backend is used for the EDS cluster and the second backend is used for
// the LOGICAL_DNS cluster.
servers, cleanup3 := startTestServiceBackends(t, 2)
defer cleanup3()
addrs, ports := backendAddressesAndPorts(t, servers)

// Configure an aggregate cluster pointing to an EDS and DNS cluster.
const (
edsClusterName = clusterName + "-eds"
dnsClusterName = clusterName + "-dns"
dnsHostName = "dns_host"
dnsPort = uint32(8080)
)
resources := e2e.UpdateOptions{
NodeID: nodeID,
Clusters: []*v3clusterpb.Cluster{
makeAggregateClusterResource(clusterName, []string{edsClusterName, dnsClusterName}),
e2e.DefaultCluster(edsClusterName, "", e2e.SecurityLevelNone),
makeLogicalDNSClusterResource(dnsClusterName, dnsHostName, dnsPort),
},
Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(edsClusterName, "localhost", []uint32{uint32(ports[0])})},
SkipValidation: true,
}

// Set a load balancing weight of 0 for the backend in the EDS resource.
// This is expected to be NACKed by the xDS client. Since the
// cluster_resolver LB policy has no previously received good EDS resource,
// it will treat this as though it received an update with no endpoints.
resources.Endpoints[0].Endpoints[0].LbEndpoints[0].LoadBalancingWeight = &wrapperspb.UInt32Value{Value: 0}
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if err := mgmtServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}

// Create xDS client, configure cds_experimental LB policy with a manual
// resolver, and dial the test backends.
cc, cleanup := setupAndDial(t, bootstrapContents)
defer cleanup()

// Ensure that the DNS resolver is started for the expected target.
select {
case <-ctx.Done():
t.Fatal("Timeout when waiting for DNS resolver to be started")
case target := <-dnsTargetCh:
got, want := target.Endpoint(), fmt.Sprintf("%s:%d", dnsHostName, dnsPort)
if got != want {
t.Fatalf("DNS resolution started for target %q, want %q", got, want)
}
}

// Update DNS resolver with test backend addresses.
dnsR.UpdateState(resolver.State{Addresses: addrs[1:]})

// Make an RPC and ensure that it gets routed to the LOGICAL_DNS cluster.
peer := &peer.Peer{}
client := testgrpc.NewTestServiceClient(cc)
if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(peer), grpc.WaitForReady(true)); err != nil {
t.Fatalf("EmptyCall() failed: %v", err)
}
if peer.Addr.String() != addrs[1].Addr {
t.Fatalf("EmptyCall() routed to backend %q, want %q", peer.Addr, addrs[1].Addr)
}
}

// TestAggregateCluster_Fallback_EDS_ResourceNotFound tests the scenario where
// the top-level cluster is an aggregate cluster that resolves to an EDS and
// LOGICAL_DNS cluster. The management server does not respond with the EDS
// cluster. The test verifies that the cluster_resolver LB policy falls back to
// the LOGICAL_DNS cluster in this case.
func (s) TestAggregateCluster_Fallback_EDS_ResourceNotFound(t *testing.T) {
dnsTargetCh, _, _, dnsR, cleanup1 := setupDNS()
defer cleanup1()

// Start an xDS management server.
mgmtServer, nodeID, _, _, cleanup2 := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true})
defer cleanup2()

// Start a test backend for the LOGICAL_DNS cluster.
server := stubserver.StartTestService(t, nil)
defer server.Stop()

// Configure an aggregate cluster pointing to an EDS and DNS cluster. No
// endpoints are configured for the EDS cluster.
const (
edsClusterName = clusterName + "-eds"
dnsClusterName = clusterName + "-dns"
dnsHostName = "dns_host"
dnsPort = uint32(8080)
)
resources := e2e.UpdateOptions{
NodeID: nodeID,
Clusters: []*v3clusterpb.Cluster{
makeAggregateClusterResource(clusterName, []string{edsClusterName, dnsClusterName}),
e2e.DefaultCluster(edsClusterName, "", e2e.SecurityLevelNone),
makeLogicalDNSClusterResource(dnsClusterName, dnsHostName, dnsPort),
},
SkipValidation: true,
}
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if err := mgmtServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}

// Create an xDS client talking to the above management server, configured
// with a short watch expiry timeout.
xdsClient, close, err := xdsclient.NewWithConfigForTesting(&bootstrap.Config{
XDSServer: xdstestutils.ServerConfigForAddress(t, mgmtServer.Address),
NodeProto: &v3corepb.Node{Id: nodeID},
}, defaultTestWatchExpiryTimeout, time.Duration(0))
if err != nil {
t.Fatalf("failed to create xds client: %v", err)
}
defer close()

// Create a manual resolver and push a service config specifying the use of
// the cds LB policy as the top-level LB policy, and a corresponding config
// with a single cluster.
r := manual.NewBuilderWithScheme("whatever")
jsonSC := fmt.Sprintf(`{
"loadBalancingConfig":[{
"cds_experimental":{
"cluster": "%s"
}
}]
}`, clusterName)
scpr := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(jsonSC)
r.InitialState(xdsclient.SetClient(resolver.State{ServiceConfig: scpr}, xdsClient))

// Create a ClientConn.
cc, err := grpc.Dial(r.Scheme()+":///test.service", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r))
if err != nil {
t.Fatalf("failed to dial local test server: %v", err)
}
defer cc.Close()

// Make an RPC with a short deadline. We expect this RPC to not succeed
// because the DNS resolver has not responded with endpoint addresses.
client := testgrpc.NewTestServiceClient(cc)
sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
defer sCancel()
if _, err := client.EmptyCall(sCtx, &testpb.Empty{}); status.Code(err) != codes.DeadlineExceeded {
t.Fatalf("EmptyCall() code %s, want %s", status.Code(err), codes.DeadlineExceeded)
}

// Ensure that the DNS resolver is started for the expected target.
select {
case <-ctx.Done():
t.Fatal("Timeout when waiting for DNS resolver to be started")
case target := <-dnsTargetCh:
got, want := target.Endpoint(), fmt.Sprintf("%s:%d", dnsHostName, dnsPort)
if got != want {
t.Fatalf("DNS resolution started for target %q, want %q", got, want)
}
}

// Update DNS resolver with test backend addresses.
dnsR.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: server.Address}}})

// Make an RPC and ensure that it gets routed to the LOGICAL_DNS cluster.
// Even though the EDS cluster is of higher priority, since the management
// server does not respond with an EDS resource, the cluster_resolver LB
// policy is expected to fallback to the LOGICAL_DNS cluster once the watch
// timeout expires.
peer := &peer.Peer{}
if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(peer), grpc.WaitForReady(true)); err != nil {
t.Fatalf("EmptyCall() failed: %v", err)
}
if peer.Addr.String() != server.Address {
t.Fatalf("EmptyCall() routed to backend %q, want %q", peer.Addr, server.Address)
}
}
Loading

0 comments on commit bf5b7ae

Please sign in to comment.