diff --git a/balancer/weightedroundrobin/balancer.go b/balancer/weightedroundrobin/balancer.go index e957b91b1966..a164d1bedd7e 100644 --- a/balancer/weightedroundrobin/balancer.go +++ b/balancer/weightedroundrobin/balancer.go @@ -419,7 +419,11 @@ func (w *weightedSubConn) OnLoadReport(load *v3orcapb.OrcaLoadReport) { w.logger.Infof("Received load report for subchannel %v: %v", w.SubConn, load) } // Update weights of this subchannel according to the reported load - if load.CpuUtilization == 0 || load.RpsFractional == 0 { + utilization := load.ApplicationUtilization + if utilization == 0 { + utilization = load.CpuUtilization + } + if utilization == 0 || load.RpsFractional == 0 { if w.logger.V(2) { w.logger.Infof("Ignoring empty load report for subchannel %v", w.SubConn) } @@ -430,7 +434,7 @@ func (w *weightedSubConn) OnLoadReport(load *v3orcapb.OrcaLoadReport) { defer w.mu.Unlock() errorRate := load.Eps / load.RpsFractional - w.weightVal = load.RpsFractional / (load.CpuUtilization + errorRate*w.cfg.ErrorUtilizationPenalty) + w.weightVal = load.RpsFractional / (utilization + errorRate*w.cfg.ErrorUtilizationPenalty) if w.logger.V(2) { w.logger.Infof("New weight for subchannel %v: %v", w.SubConn, w.weightVal) } diff --git a/balancer/weightedroundrobin/balancer_test.go b/balancer/weightedroundrobin/balancer_test.go index a0a84a7f057b..1d67bcf1f008 100644 --- a/balancer/weightedroundrobin/balancer_test.go +++ b/balancer/weightedroundrobin/balancer_test.go @@ -110,7 +110,7 @@ func startServer(t *testing.T, r reportType) *testServer { if r := orca.CallMetricsRecorderFromContext(ctx); r != nil { // Copy metrics from what the test set in cmr into r. sm := cmr.(orca.ServerMetricsProvider).ServerMetrics() - r.SetCPUUtilization(sm.CPUUtilization) + r.SetApplicationUtilization(sm.AppUtilization) r.SetQPS(sm.QPS) r.SetEPS(sm.EPS) } @@ -230,10 +230,10 @@ func (s) TestBalancer_TwoAddresses_ReportingEnabledPerCall(t *testing.T) { // srv1 starts loaded and srv2 starts without load; ensure RPCs are routed // disproportionately to srv2 (10:1). srv1.callMetrics.SetQPS(10.0) - srv1.callMetrics.SetCPUUtilization(1.0) + srv1.callMetrics.SetApplicationUtilization(1.0) srv2.callMetrics.SetQPS(10.0) - srv2.callMetrics.SetCPUUtilization(.1) + srv2.callMetrics.SetApplicationUtilization(.1) sc := svcConfig(t, perCallConfig) if err := srv1.StartClient(grpc.WithDefaultServiceConfig(sc)); err != nil { @@ -253,33 +253,58 @@ func (s) TestBalancer_TwoAddresses_ReportingEnabledPerCall(t *testing.T) { // Tests two addresses with OOB ORCA reporting enabled. Checks the backends // are called in the appropriate ratios. func (s) TestBalancer_TwoAddresses_ReportingEnabledOOB(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) - defer cancel() + testCases := []struct { + name string + utilSetter func(orca.ServerMetricsRecorder, float64) + }{{ + name: "application_utilization", + utilSetter: func(smr orca.ServerMetricsRecorder, val float64) { + smr.SetApplicationUtilization(val) + }, + }, { + name: "cpu_utilization", + utilSetter: func(smr orca.ServerMetricsRecorder, val float64) { + smr.SetCPUUtilization(val) + }, + }, { + name: "application over cpu", + utilSetter: func(smr orca.ServerMetricsRecorder, val float64) { + smr.SetApplicationUtilization(val) + smr.SetCPUUtilization(2.0) // ignored because ApplicationUtilization is set + }, + }} - srv1 := startServer(t, reportOOB) - srv2 := startServer(t, reportOOB) + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() - // srv1 starts loaded and srv2 starts without load; ensure RPCs are routed - // disproportionately to srv2 (10:1). - srv1.oobMetrics.SetQPS(10.0) - srv1.oobMetrics.SetCPUUtilization(1.0) + srv1 := startServer(t, reportOOB) + srv2 := startServer(t, reportOOB) - srv2.oobMetrics.SetQPS(10.0) - srv2.oobMetrics.SetCPUUtilization(.1) + // srv1 starts loaded and srv2 starts without load; ensure RPCs are routed + // disproportionately to srv2 (10:1). + srv1.oobMetrics.SetQPS(10.0) + tc.utilSetter(srv1.oobMetrics, 1.0) - sc := svcConfig(t, oobConfig) - if err := srv1.StartClient(grpc.WithDefaultServiceConfig(sc)); err != nil { - t.Fatalf("Error starting client: %v", err) - } - addrs := []resolver.Address{{Addr: srv1.Address}, {Addr: srv2.Address}} - srv1.R.UpdateState(resolver.State{Addresses: addrs}) + srv2.oobMetrics.SetQPS(10.0) + tc.utilSetter(srv2.oobMetrics, 0.1) - // Call each backend once to ensure the weights have been received. - ensureReached(ctx, t, srv1.Client, 2) + sc := svcConfig(t, oobConfig) + if err := srv1.StartClient(grpc.WithDefaultServiceConfig(sc)); err != nil { + t.Fatalf("Error starting client: %v", err) + } + addrs := []resolver.Address{{Addr: srv1.Address}, {Addr: srv2.Address}} + srv1.R.UpdateState(resolver.State{Addresses: addrs}) - // Wait for the weight update period to allow the new weights to be processed. - time.Sleep(weightUpdatePeriod) - checkWeights(ctx, t, srvWeight{srv1, 1}, srvWeight{srv2, 10}) + // Call each backend once to ensure the weights have been received. + ensureReached(ctx, t, srv1.Client, 2) + + // Wait for the weight update period to allow the new weights to be processed. + time.Sleep(weightUpdatePeriod) + checkWeights(ctx, t, srvWeight{srv1, 1}, srvWeight{srv2, 10}) + }) + } } // Tests two addresses with OOB ORCA reporting enabled, where the reports @@ -295,10 +320,10 @@ func (s) TestBalancer_TwoAddresses_UpdateLoads(t *testing.T) { // srv1 starts loaded and srv2 starts without load; ensure RPCs are routed // disproportionately to srv2 (10:1). srv1.oobMetrics.SetQPS(10.0) - srv1.oobMetrics.SetCPUUtilization(1.0) + srv1.oobMetrics.SetApplicationUtilization(1.0) srv2.oobMetrics.SetQPS(10.0) - srv2.oobMetrics.SetCPUUtilization(.1) + srv2.oobMetrics.SetApplicationUtilization(.1) sc := svcConfig(t, oobConfig) if err := srv1.StartClient(grpc.WithDefaultServiceConfig(sc)); err != nil { @@ -317,10 +342,10 @@ func (s) TestBalancer_TwoAddresses_UpdateLoads(t *testing.T) { // Update the loads so srv2 is loaded and srv1 is not; ensure RPCs are // routed disproportionately to srv1. srv1.oobMetrics.SetQPS(10.0) - srv1.oobMetrics.SetCPUUtilization(.1) + srv1.oobMetrics.SetApplicationUtilization(.1) srv2.oobMetrics.SetQPS(10.0) - srv2.oobMetrics.SetCPUUtilization(1.0) + srv2.oobMetrics.SetApplicationUtilization(1.0) // Wait for the weight update period to allow the new weights to be processed. time.Sleep(weightUpdatePeriod + oobReportingInterval) @@ -340,19 +365,19 @@ func (s) TestBalancer_TwoAddresses_OOBThenPerCall(t *testing.T) { // srv1 starts loaded and srv2 starts without load; ensure RPCs are routed // disproportionately to srv2 (10:1). srv1.oobMetrics.SetQPS(10.0) - srv1.oobMetrics.SetCPUUtilization(1.0) + srv1.oobMetrics.SetApplicationUtilization(1.0) srv2.oobMetrics.SetQPS(10.0) - srv2.oobMetrics.SetCPUUtilization(.1) + srv2.oobMetrics.SetApplicationUtilization(.1) // For per-call metrics (not used initially), srv2 reports that it is // loaded and srv1 reports low load. After confirming OOB works, switch to // per-call and confirm the new routing weights are applied. srv1.callMetrics.SetQPS(10.0) - srv1.callMetrics.SetCPUUtilization(.1) + srv1.callMetrics.SetApplicationUtilization(.1) srv2.callMetrics.SetQPS(10.0) - srv2.callMetrics.SetCPUUtilization(1.0) + srv2.callMetrics.SetApplicationUtilization(1.0) sc := svcConfig(t, oobConfig) if err := srv1.StartClient(grpc.WithDefaultServiceConfig(sc)); err != nil { @@ -396,13 +421,13 @@ func (s) TestBalancer_TwoAddresses_ErrorPenalty(t *testing.T) { // to 0.9 which will cause the weights to be equal and RPCs to be routed // 50/50. srv1.oobMetrics.SetQPS(10.0) - srv1.oobMetrics.SetCPUUtilization(1.0) + srv1.oobMetrics.SetApplicationUtilization(1.0) srv1.oobMetrics.SetEPS(0) // srv1 weight before: 10.0 / 1.0 = 10.0 // srv1 weight after: 10.0 / 1.0 = 10.0 srv2.oobMetrics.SetQPS(10.0) - srv2.oobMetrics.SetCPUUtilization(.1) + srv2.oobMetrics.SetApplicationUtilization(.1) srv2.oobMetrics.SetEPS(10.0) // srv2 weight before: 10.0 / 0.1 = 100.0 // srv2 weight after: 10.0 / 1.0 = 10.0 @@ -476,10 +501,10 @@ func (s) TestBalancer_TwoAddresses_BlackoutPeriod(t *testing.T) { // srv1 starts loaded and srv2 starts without load; ensure RPCs are routed // disproportionately to srv2 (10:1). srv1.oobMetrics.SetQPS(10.0) - srv1.oobMetrics.SetCPUUtilization(1.0) + srv1.oobMetrics.SetApplicationUtilization(1.0) srv2.oobMetrics.SetQPS(10.0) - srv2.oobMetrics.SetCPUUtilization(.1) + srv2.oobMetrics.SetApplicationUtilization(.1) cfg := oobConfig cfg.BlackoutPeriod = tc.blackoutPeriodCfg @@ -544,10 +569,10 @@ func (s) TestBalancer_TwoAddresses_WeightExpiration(t *testing.T) { // is 1 minute but the weights expire in 1 second, routing will go to 50/50 // after the weights expire. srv1.oobMetrics.SetQPS(10.0) - srv1.oobMetrics.SetCPUUtilization(1.0) + srv1.oobMetrics.SetApplicationUtilization(1.0) srv2.oobMetrics.SetQPS(10.0) - srv2.oobMetrics.SetCPUUtilization(.1) + srv2.oobMetrics.SetApplicationUtilization(.1) cfg := oobConfig cfg.OOBReportingPeriod = stringp("60s") @@ -594,16 +619,16 @@ func (s) TestBalancer_AddressesChanging(t *testing.T) { // srv1: weight 10 srv1.oobMetrics.SetQPS(10.0) - srv1.oobMetrics.SetCPUUtilization(1.0) + srv1.oobMetrics.SetApplicationUtilization(1.0) // srv2: weight 100 srv2.oobMetrics.SetQPS(10.0) - srv2.oobMetrics.SetCPUUtilization(.1) + srv2.oobMetrics.SetApplicationUtilization(.1) // srv3: weight 20 srv3.oobMetrics.SetQPS(20.0) - srv3.oobMetrics.SetCPUUtilization(1.0) + srv3.oobMetrics.SetApplicationUtilization(1.0) // srv4: weight 200 srv4.oobMetrics.SetQPS(20.0) - srv4.oobMetrics.SetCPUUtilization(.1) + srv4.oobMetrics.SetApplicationUtilization(.1) sc := svcConfig(t, oobConfig) if err := srv1.StartClient(grpc.WithDefaultServiceConfig(sc)); err != nil {