Skip to content

Commit

Permalink
cdsbalancer: switch cluster watch to generic xDS client API (#6600)
Browse files Browse the repository at this point in the history
  • Loading branch information
easwars authored Sep 12, 2023
1 parent 0317200 commit 82a568d
Show file tree
Hide file tree
Showing 4 changed files with 564 additions and 639 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func makeLogicalDNSClusterResource(name, dnsHost string, dnsPort uint32) *v3clus
// cluster resource. The test verifies that the load balancing configuration
// pushed to the cluster_resolver LB policy is contains the expected discovery
// mechanism corresponding to the leaf cluster, on both occasions.
func (s) TestClusterHandlerSuccess_LeafNode(t *testing.T) {
func (s) TestAggregateClusterSuccess_LeafNode(t *testing.T) {
tests := []struct {
name string
firstClusterResource *v3clusterpb.Cluster
Expand Down Expand Up @@ -657,9 +657,11 @@ func (s) TestAggregatedClusterSuccess_IgnoreDups(t *testing.T) {
// Tests the scenario where the aggregate cluster graph has a node that has
// child node of itself. The case for this is A -> A, and since there is no base
// cluster (EDS or Logical DNS), no configuration should be pushed to the child
// policy. Then the test updates A -> B, where B is a leaf EDS cluster. Verifies
// that configuration is pushed to the child policy and that an RPC can be
// successfully made.
// policy. The channel is expected to move to TRANSIENT_FAILURE and RPCs are
// expected to fail with code UNAVAILABLE and an error message specifying that
// the aggregate cluster grpah no leaf clusters. Then the test updates A -> B,
// where B is a leaf EDS cluster. Verifies that configuration is pushed to the
// child policy and that an RPC can be successfully made.
func (s) TestAggregatedCluster_NodeChildOfItself(t *testing.T) {
lbCfgCh, _, _, _ := registerWrappedClusterResolverPolicy(t)
mgmtServer, nodeID, cc, _, _, _, _ := setupWithManagementServer(t)
Expand Down Expand Up @@ -687,6 +689,19 @@ func (s) TestAggregatedCluster_NodeChildOfItself(t *testing.T) {
case <-time.After(defaultTestShortTimeout):
}

testutils.AwaitState(ctx, t, cc, connectivity.TransientFailure)

// Verify that the RPC fails with expected code.
client := testgrpc.NewTestServiceClient(cc)
_, err := client.EmptyCall(ctx, &testpb.Empty{})
if gotCode, wantCode := status.Code(err), codes.Unavailable; gotCode != wantCode {
t.Fatalf("EmptyCall() failed with code: %v, want %v", gotCode, wantCode)
}
const wantErr = "aggregate cluster graph has no leaf clusters"
if !strings.Contains(err.Error(), wantErr) {
t.Fatalf("EmptyCall() failed with err: %v, want error containing %s", err, wantErr)
}

// Start a test service backend.
server := stubserver.StartTestService(t, nil)
t.Cleanup(server.Stop)
Expand Down Expand Up @@ -719,6 +734,111 @@ func (s) TestAggregatedCluster_NodeChildOfItself(t *testing.T) {
t.Fatal(err)
}

// Verify that a successful RPC can be made.
if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
t.Fatalf("EmptyCall() failed: %v", err)
}
}

// Tests the scenario where the aggregate cluster graph contains a cycle and
// contains no leaf clusters. The case used here is [A -> B, B -> A]. As there
// are no leaf clusters in this graph, no configuration should be pushed to the
// child policy. The channel is expected to move to TRANSIENT_FAILURE and RPCs
// are expected to fail with code UNAVAILABLE and an error message specifying
// that the aggregate cluster graph has no leaf clusters.
func (s) TestAggregatedCluster_CycleWithNoLeafNode(t *testing.T) {
lbCfgCh, _, _, _ := registerWrappedClusterResolverPolicy(t)
mgmtServer, nodeID, cc, _, _, _, _ := setupWithManagementServer(t)

const (
clusterNameA = clusterName // cluster name in cds LB policy config
clusterNameB = clusterName + "-B"
)
// Configure the management server with an aggregate cluster resource graph
// that contains a cycle and no leaf clusters.
resources := e2e.UpdateOptions{
NodeID: nodeID,
Clusters: []*v3clusterpb.Cluster{
makeAggregateClusterResource(clusterNameA, []string{clusterNameB}),
makeAggregateClusterResource(clusterNameB, []string{clusterNameA}),
},
SkipValidation: true,
}
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if err := mgmtServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}

select {
case cfg := <-lbCfgCh:
t.Fatalf("Child policy received configuration when not expected to: %s", pretty.ToJSON(cfg))
case <-time.After(defaultTestShortTimeout):
}

testutils.AwaitState(ctx, t, cc, connectivity.TransientFailure)

// Verify that the RPC fails with expected code.
client := testgrpc.NewTestServiceClient(cc)
_, err := client.EmptyCall(ctx, &testpb.Empty{})
if gotCode, wantCode := status.Code(err), codes.Unavailable; gotCode != wantCode {
t.Fatalf("EmptyCall() failed with code: %v, want %v", gotCode, wantCode)
}
const wantErr = "aggregate cluster graph has no leaf clusters"
if !strings.Contains(err.Error(), wantErr) {
t.Fatalf("EmptyCall() failed with err: %v, want %s", err, wantErr)
}
}

// Tests the scenario where the aggregate cluster graph contains a cycle and
// also contains a leaf cluster. The case used here is [A -> B, B -> A, C]. As
// there is a leaf cluster in this graph , configuration should be pushed to the
// child policy and RPCs should get routed to that leaf cluster.
func (s) TestAggregatedCluster_CycleWithLeafNode(t *testing.T) {
lbCfgCh, _, _, _ := registerWrappedClusterResolverPolicy(t)
mgmtServer, nodeID, cc, _, _, _, _ := setupWithManagementServer(t)

// Start a test service backend.
server := stubserver.StartTestService(t, nil)
t.Cleanup(server.Stop)

const (
clusterNameA = clusterName // cluster name in cds LB policy config
clusterNameB = clusterName + "-B"
clusterNameC = clusterName + "-C"
)
// Configure the management server with an aggregate cluster resource graph
// that contains a cycle, but also contains a leaf cluster.
resources := e2e.UpdateOptions{
NodeID: nodeID,
Clusters: []*v3clusterpb.Cluster{
makeAggregateClusterResource(clusterNameA, []string{clusterNameB}),
makeAggregateClusterResource(clusterNameB, []string{clusterNameA, clusterNameC}),
e2e.DefaultCluster(clusterNameC, serviceName, e2e.SecurityLevelNone),
},
Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(serviceName, "localhost", []uint32{testutils.ParsePort(t, server.Address)})},
SkipValidation: true,
}
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if err := mgmtServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}

// Verify the configuration pushed to the child policy.
wantChildCfg := &clusterresolver.LBConfig{
DiscoveryMechanisms: []clusterresolver.DiscoveryMechanism{{
Cluster: clusterNameC,
Type: clusterresolver.DiscoveryMechanismTypeEDS,
EDSServiceName: serviceName,
OutlierDetection: json.RawMessage(`{}`),
}},
XDSLBPolicy: json.RawMessage(`[{"xds_wrr_locality_experimental": {"childPolicy": [{"round_robin": {}}]}}]`),
}
if err := compareLoadBalancingConfig(ctx, lbCfgCh, wantChildCfg); err != nil {
t.Fatal(err)
}

// Verify that a successful RPC can be made.
client := testgrpc.NewTestServiceClient(cc)
if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
Expand Down
Loading

0 comments on commit 82a568d

Please sign in to comment.