Skip to content

Commit

Permalink
balancer/rls: Add picker and cache unit tests for RLS Metrics (#7614)
Browse files Browse the repository at this point in the history
  • Loading branch information
zasweq authored Sep 25, 2024
1 parent a9ff62d commit 218811e
Show file tree
Hide file tree
Showing 6 changed files with 348 additions and 100 deletions.
58 changes: 58 additions & 0 deletions balancer/rls/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,3 +242,61 @@ func (s) TestDataCache_ResetBackoffState(t *testing.T) {
t.Fatalf("unexpected diff in backoffState for cache entry after dataCache.resetBackoffState(): %s", diff)
}
}

func (s) TestDataCache_Metrics(t *testing.T) {
cacheEntriesMetricsTests := []*cacheEntry{
{size: 1},
{size: 2},
{size: 3},
{size: 4},
{size: 5},
}
tmr := stats.NewTestMetricsRecorder()
dc := newDataCache(50, nil, tmr, "")

dc.updateRLSServerTarget("rls-server-target")
for i, k := range cacheKeys {
dc.addEntry(k, cacheEntriesMetricsTests[i])
}

const cacheEntriesKey = "grpc.lb.rls.cache_entries"
const cacheSizeKey = "grpc.lb.rls.cache_size"
// 5 total entries which add up to 15 size, so should record that.
if got, _ := tmr.Metric(cacheEntriesKey); got != 5 {
t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", cacheEntriesKey, got, 5)
}
if got, _ := tmr.Metric(cacheSizeKey); got != 15 {
t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", cacheSizeKey, got, 15)
}

// Resize down the cache to 2 entries (deterministic as based of LRU).
dc.resize(9)
if got, _ := tmr.Metric(cacheEntriesKey); got != 2 {
t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", cacheEntriesKey, got, 2)
}
if got, _ := tmr.Metric(cacheSizeKey); got != 9 {
t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", cacheSizeKey, got, 9)
}

// Update an entry to have size 6. This should reflect in the size metrics,
// which will increase by 1 to 11, while the number of cache entries should
// stay same. This write is deterministic and writes to the last one.
dc.updateEntrySize(cacheEntriesMetricsTests[4], 6)

if got, _ := tmr.Metric(cacheEntriesKey); got != 2 {
t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", cacheEntriesKey, got, 2)
}
if got, _ := tmr.Metric(cacheSizeKey); got != 10 {
t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", cacheSizeKey, got, 10)
}

// Delete this scaled up cache key. This should scale down the cache to 1
// entries, and remove 6 size so cache size should be 4.
dc.deleteAndCleanup(cacheKeys[4], cacheEntriesMetricsTests[4])
if got, _ := tmr.Metric(cacheEntriesKey); got != 1 {
t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", cacheEntriesKey, got, 1)
}
if got, _ := tmr.Metric(cacheSizeKey); got != 4 {
t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", cacheSizeKey, got, 4)
}
}
168 changes: 167 additions & 1 deletion balancer/rls/picker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,13 @@ import (
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/internal/stubserver"
rlstest "google.golang.org/grpc/internal/testutils/rls"
"google.golang.org/grpc/internal/testutils/stats"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/durationpb"
Expand Down Expand Up @@ -246,6 +248,133 @@ func (s) TestPick_DataCacheMiss_PendingEntryExists(t *testing.T) {
}
}

// Test_RLSDefaultTargetPicksMetric tests the default target picks metric. It
// configures an RLS Balancer which specifies to route to the default target in
// the RLS Configuration, and makes an RPC on a Channel containing this RLS
// Balancer. This test then asserts a default target picks metric is emitted,
// and target pick or failed pick metric is not emitted.
func (s) Test_RLSDefaultTargetPicksMetric(t *testing.T) {
// Start an RLS server and set the throttler to always throttle requests.
rlsServer, _ := rlstest.SetupFakeRLSServer(t, nil)
overrideAdaptiveThrottler(t, alwaysThrottlingThrottler())

// Build RLS service config with a default target.
rlsConfig := buildBasicRLSConfigWithChildPolicy(t, t.Name(), rlsServer.Address)
defBackendCh, defBackendAddress := startBackend(t)
rlsConfig.RouteLookupConfig.DefaultTarget = defBackendAddress

// Register a manual resolver and push the RLS service config through it.
r := startManualResolverWithConfig(t, rlsConfig)

tmr := stats.NewTestMetricsRecorder()
cc, err := grpc.Dial(r.Scheme()+":///", grpc.WithResolvers(r), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithStatsHandler(tmr))
if err != nil {
t.Fatalf("grpc.Dial() failed: %v", err)
}
defer cc.Close()

// Make an RPC and ensure it gets routed to the default target.
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
makeTestRPCAndExpectItToReachBackend(ctx, t, cc, defBackendCh)

if got, _ := tmr.Metric("grpc.lb.rls.default_target_picks"); got != 1 {
t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", "grpc.lb.rls.default_target_picks", got, 1)
}
if _, ok := tmr.Metric("grpc.lb.rls.target_picks"); ok {
t.Fatalf("Data is present for metric %v", "grpc.lb.rls.target_picks")
}
if _, ok := tmr.Metric("grpc.lb.rls.failed_picks"); ok {
t.Fatalf("Data is present for metric %v", "grpc.lb.rls.failed_picks")
}
}

// Test_RLSTargetPicksMetric tests the target picks metric. It configures an RLS
// Balancer which specifies to route to a target through a RouteLookupResponse,
// and makes an RPC on a Channel containing this RLS Balancer. This test then
// asserts a target picks metric is emitted, and default target pick or failed
// pick metric is not emitted.
func (s) Test_RLSTargetPicksMetric(t *testing.T) {
// Start an RLS server and set the throttler to never throttle requests.
rlsServer, _ := rlstest.SetupFakeRLSServer(t, nil)
overrideAdaptiveThrottler(t, neverThrottlingThrottler())

// Build the RLS config without a default target.
rlsConfig := buildBasicRLSConfigWithChildPolicy(t, t.Name(), rlsServer.Address)

// Start a test backend, and setup the fake RLS server to return this as a
// target in the RLS response.
testBackendCh, testBackendAddress := startBackend(t)
rlsServer.SetResponseCallback(func(context.Context, *rlspb.RouteLookupRequest) *rlstest.RouteLookupResponse {
return &rlstest.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{testBackendAddress}}}
})

// Register a manual resolver and push the RLS service config through it.
r := startManualResolverWithConfig(t, rlsConfig)

tmr := stats.NewTestMetricsRecorder()
// Dial the backend.
cc, err := grpc.Dial(r.Scheme()+":///", grpc.WithResolvers(r), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithStatsHandler(tmr))
if err != nil {
t.Fatalf("grpc.Dial() failed: %v", err)
}
defer cc.Close()

// Make an RPC and ensure it gets routed to the test backend.
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
makeTestRPCAndExpectItToReachBackend(ctx, t, cc, testBackendCh)
if got, _ := tmr.Metric("grpc.lb.rls.target_picks"); got != 1 {
t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", "grpc.lb.rls.target_picks", got, 1)
}
if _, ok := tmr.Metric("grpc.lb.rls.default_target_picks"); ok {
t.Fatalf("Data is present for metric %v", "grpc.lb.rls.default_target_picks")
}
if _, ok := tmr.Metric("grpc.lb.rls.failed_picks"); ok {
t.Fatalf("Data is present for metric %v", "grpc.lb.rls.failed_picks")
}
}

// Test_RLSFailedPicksMetric tests the failed picks metric. It configures an RLS
// Balancer to fail a pick with unavailable, and makes an RPC on a Channel
// containing this RLS Balancer. This test then asserts a failed picks metric is
// emitted, and default target pick or target pick metric is not emitted.
func (s) Test_RLSFailedPicksMetric(t *testing.T) {
// Start an RLS server and set the throttler to never throttle requests.
rlsServer, _ := rlstest.SetupFakeRLSServer(t, nil)
overrideAdaptiveThrottler(t, neverThrottlingThrottler())

// Build an RLS config without a default target.
rlsConfig := buildBasicRLSConfigWithChildPolicy(t, t.Name(), rlsServer.Address)

// Register a manual resolver and push the RLS service config through it.
r := startManualResolverWithConfig(t, rlsConfig)

tmr := stats.NewTestMetricsRecorder()
// Dial the backend.
cc, err := grpc.Dial(r.Scheme()+":///", grpc.WithResolvers(r), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithStatsHandler(tmr))
if err != nil {
t.Fatalf("grpc.Dial() failed: %v", err)
}
defer cc.Close()

// Make an RPC and expect it to fail with deadline exceeded error. We use a
// smaller timeout to ensure that the test doesn't run very long.
ctx, cancel := context.WithTimeout(context.Background(), defaultTestShortTimeout)
defer cancel()
makeTestRPCAndVerifyError(ctx, t, cc, codes.Unavailable, errors.New("RLS response's target list does not contain any entries for key"))

if got, _ := tmr.Metric("grpc.lb.rls.failed_picks"); got != 1 {
t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", "grpc.lb.rls.failed_picks", got, 1)
}
if _, ok := tmr.Metric("grpc.lb.rls.target_picks"); ok {
t.Fatalf("Data is present for metric %v", "grpc.lb.rls.target_picks")
}
if _, ok := tmr.Metric("grpc.lb.rls.default_target_picks"); ok {
t.Fatalf("Data is present for metric %v", "grpc.lb.rls.default_target_picks")
}
}

// Test verifies the scenario where there is a matching entry in the data cache
// which is valid and there is no pending request. The pick is expected to be
// delegated to the child policy.
Expand All @@ -256,7 +385,6 @@ func (s) TestPick_DataCacheHit_NoPendingEntry_ValidEntry(t *testing.T) {

// Build the RLS config without a default target.
rlsConfig := buildBasicRLSConfigWithChildPolicy(t, t.Name(), rlsServer.Address)

// Start a test backend, and setup the fake RLS server to return this as a
// target in the RLS response.
testBackendCh, testBackendAddress := startBackend(t)
Expand Down Expand Up @@ -881,3 +1009,41 @@ func TestIsFullMethodNameValid(t *testing.T) {
})
}
}

// Tests the conversion of the child pickers error to the pick result attribute.
func (s) TestChildPickResultError(t *testing.T) {
tests := []struct {
name string
err error
want string
}{
{
name: "nil",
err: nil,
want: "complete",
},
{
name: "errNoSubConnAvailable",
err: balancer.ErrNoSubConnAvailable,
want: "queue",
},
{
name: "status error",
err: status.Error(codes.Unimplemented, "unimplemented"),
want: "drop",
},
{
name: "other error",
err: errors.New("some error"),
want: "fail",
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
if got := errToPickResult(test.err); got != test.want {
t.Fatalf("errToPickResult(%q) = %v, want %v", test.err, got, test.want)
}
})
}
}
22 changes: 15 additions & 7 deletions balancer/weightedroundrobin/balancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ var (
OOBReportingPeriod: stringp("0.005s"),
BlackoutPeriod: stringp("0s"),
WeightExpirationPeriod: stringp("60s"),
WeightUpdatePeriod: stringp(".050s"),
WeightUpdatePeriod: stringp("30s"),
ErrorUtilizationPenalty: float64p(0),
}
)
Expand Down Expand Up @@ -224,8 +224,8 @@ func (s) TestWRRMetricsBasic(t *testing.T) {
srv := startServer(t, reportCall)
sc := svcConfig(t, testMetricsConfig)

mr := stats.NewTestMetricsRecorder(t)
if err := srv.StartClient(grpc.WithDefaultServiceConfig(sc), grpc.WithStatsHandler(mr)); err != nil {
tmr := stats.NewTestMetricsRecorder()
if err := srv.StartClient(grpc.WithDefaultServiceConfig(sc), grpc.WithStatsHandler(tmr)); err != nil {
t.Fatalf("Error starting client: %v", err)
}
srv.callMetrics.SetQPS(float64(1))
Expand All @@ -234,12 +234,20 @@ func (s) TestWRRMetricsBasic(t *testing.T) {
t.Fatalf("Error from EmptyCall: %v", err)
}

mr.AssertDataForMetric("grpc.lb.wrr.rr_fallback", 1) // Falls back because only one SubConn.
mr.AssertDataForMetric("grpc.lb.wrr.endpoint_weight_stale", 0) // The endpoint weight has not expired so this is 0 (never emitted).
mr.AssertDataForMetric("grpc.lb.wrr.endpoint_weight_not_yet_usable", 1)
if got, _ := tmr.Metric("grpc.lb.wrr.rr_fallback"); got != 1 {
t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", "grpc.lb.wrr.rr_fallback", got, 1)
}
if got, _ := tmr.Metric("grpc.lb.wrr.endpoint_weight_stale"); got != 0 {
t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", "grpc.lb.wrr.endpoint_weight_stale", got, 0)
}
if got, _ := tmr.Metric("grpc.lb.wrr.endpoint_weight_not_yet_usable"); got != 1 {
t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", "grpc.lb.wrr.endpoint_weight_not_yet_usable", got, 1)
}
// Unusable, so no endpoint weight. Due to only one SubConn, this will never
// update the weight. Thus, this will stay 0.
mr.AssertDataForMetric("grpc.lb.wrr.endpoint_weights", 0)
if got, _ := tmr.Metric("grpc.lb.wrr.endpoint_weight_stale"); got != 0 {
t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", "grpc.lb.wrr.endpoint_weight_stale", got, 0)
}
}

// Tests two addresses with ORCA reporting disabled (should fall back to pure
Expand Down
24 changes: 17 additions & 7 deletions balancer/weightedroundrobin/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func (s) TestWRR_Metrics_SubConnWeight(t *testing.T) {

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
tmr := stats.NewTestMetricsRecorder(t)
tmr := stats.NewTestMetricsRecorder()
wsc := &weightedSubConn{
metricsRecorder: tmr,
weightVal: 3,
Expand All @@ -117,9 +117,15 @@ func (s) TestWRR_Metrics_SubConnWeight(t *testing.T) {
}
wsc.weight(test.nowTime, test.weightExpirationPeriod, test.blackoutPeriod, true)

tmr.AssertDataForMetric("grpc.lb.wrr.endpoint_weight_stale", test.endpointWeightStaleWant)
tmr.AssertDataForMetric("grpc.lb.wrr.endpoint_weight_not_yet_usable", test.endpointWeightNotYetUsableWant)
tmr.AssertDataForMetric("grpc.lb.wrr.endpoint_weights", test.endpointWeightWant)
if got, _ := tmr.Metric("grpc.lb.wrr.endpoint_weight_stale"); got != test.endpointWeightStaleWant {
t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", "grpc.lb.wrr.endpoint_weight_stale", got, test.endpointWeightStaleWant)
}
if got, _ := tmr.Metric("grpc.lb.wrr.endpoint_weight_not_yet_usable"); got != test.endpointWeightNotYetUsableWant {
t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", "grpc.lb.wrr.endpoint_weight_not_yet_usable", got, test.endpointWeightNotYetUsableWant)
}
if got, _ := tmr.Metric("grpc.lb.wrr.endpoint_weight_stale"); got != test.endpointWeightStaleWant {
t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", "grpc.lb.wrr.endpoint_weight_stale", got, test.endpointWeightStaleWant)
}
})
}

Expand All @@ -130,7 +136,7 @@ func (s) TestWRR_Metrics_SubConnWeight(t *testing.T) {
// with no weights. Both of these should emit a count metric for round robin
// fallback.
func (s) TestWRR_Metrics_Scheduler_RR_Fallback(t *testing.T) {
tmr := stats.NewTestMetricsRecorder(t)
tmr := stats.NewTestMetricsRecorder()
wsc := &weightedSubConn{
metricsRecorder: tmr,
weightVal: 0,
Expand All @@ -147,7 +153,9 @@ func (s) TestWRR_Metrics_Scheduler_RR_Fallback(t *testing.T) {
// There is only one SubConn, so no matter if the SubConn has a weight or
// not will fallback to round robin.
p.regenerateScheduler()
tmr.AssertDataForMetric("grpc.lb.wrr.rr_fallback", 1)
if got, _ := tmr.Metric("grpc.lb.wrr.rr_fallback"); got != 1 {
t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", "grpc.lb.wrr.rr_fallback", got, 1)
}
tmr.ClearMetrics()

// With two SubConns, if neither of them have weights, it will also fallback
Expand All @@ -159,5 +167,7 @@ func (s) TestWRR_Metrics_Scheduler_RR_Fallback(t *testing.T) {
}
p.subConns = append(p.subConns, wsc2)
p.regenerateScheduler()
tmr.AssertDataForMetric("grpc.lb.wrr.rr_fallback", 1)
if got, _ := tmr.Metric("grpc.lb.wrr.rr_fallback"); got != 1 {
t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", "grpc.lb.wrr.rr_fallback", got, 1)
}
}
Loading

0 comments on commit 218811e

Please sign in to comment.