Skip to content

Commit

Permalink
interop: hold lock on server for OOB metrics updates; share 30s timeout (grpc#6277)
Browse files Browse the repository at this point in the history
  • Loading branch information
dfawley authored May 12, 2023
1 parent 68381e7 commit 5dcfb37
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 12 deletions.
23 changes: 22 additions & 1 deletion interop/interop_test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,20 @@ pass () {
echo "$(tput setaf 2) $(date): $1 $(tput sgr 0)"
}

# withTimeout runs a command and kills it if it is still running after a
# time limit.
#
# Usage: withTimeout SECONDS COMMAND [ARG...]
#
# Returns COMMAND's exit status. If the timer had to kill COMMAND, that is
# the status bash reports for a process terminated by SIGTERM (143).
withTimeout () {
    local timer=$1
    shift

    # Run the command in the background. Expanding "$@" directly preserves
    # the argument boundaries without a quote-and-eval round trip, and makes
    # $! refer to the command itself rather than to a wrapper subshell.
    "$@" &
    local wpid=$!

    # Kill the command after $timer seconds. The timer's output is discarded
    # so that it never holds the caller's stdout/stderr open (which would
    # stall a caller that captures output with $(...)).
    (sleep "$timer" && kill "$wpid") >/dev/null 2>&1 &
    local kpid=$!

    # Wait for the command and remember its exit status. The "|| rc=$?" form
    # keeps a failing command from aborting the script under "set -e".
    local rc=0
    wait "$wpid" || rc=$?

    # Cancel the timer if it is still pending; otherwise it would later send
    # a signal to a PID that no longer belongs to our command. Reap it so no
    # job-termination notice leaks into later output.
    kill "$kpid" 2>/dev/null || true
    wait "$kpid" 2>/dev/null || true

    return "$rc"
}

# Don't run some tests that need a special environment:
# "google_default_credentials"
# "compute_engine_channel_credentials"
Expand All @@ -70,6 +84,8 @@ CASES=(
"custom_metadata"
"unimplemented_method"
"unimplemented_service"
"orca_per_rpc"
"orca_oob"
)

# Build server
Expand All @@ -96,7 +112,12 @@ for case in ${CASES[@]}; do
echo "$(tput setaf 4) $(date): testing: ${case} $(tput sgr 0)"

CLIENT_LOG="$(mktemp)"
if ! GRPC_GO_LOG_SEVERITY_LEVEL=info timeout 20 go run ./interop/client --use_tls --server_host_override=foo.test.google.fr --use_test_ca --test_case="${case}" &> $CLIENT_LOG; then
if ! GRPC_GO_LOG_SEVERITY_LEVEL=info withTimeout 20 go run ./interop/client \
--use_tls \
--server_host_override=foo.test.google.fr \
--use_test_ca --test_case="${case}" \
--service_config_json='{ "loadBalancingConfig": [{ "test_backend_metrics_load_balancer": {} }]}' \
&> $CLIENT_LOG; then
fail "FAIL: test case ${case}
got server log:
$(cat $SERVER_LOG)
Expand Down
21 changes: 10 additions & 11 deletions interop/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"io"
"os"
"strings"
"sync"
"time"

"github.com/golang/protobuf/proto"
Expand Down Expand Up @@ -779,6 +780,7 @@ func DoSoakTest(tc testgrpc.TestServiceClient, serverAddr string, dopts []grpc.D
// testServer implements the TestService handlers defined in this file (for
// example UnaryCall and FullDuplexCall).
type testServer struct {
testgrpc.UnimplementedTestServiceServer

// orcaMu is acquired by FullDuplexCall the first time a stream delivers a
// request carrying an OOB ORCA report, and is held until that handler
// returns, so only one such stream at a time updates metricsRecorder.
orcaMu sync.Mutex
// metricsRecorder, when non-nil, receives the OOB ORCA metrics that
// clients send in their request messages.
metricsRecorder orca.ServerMetricsRecorder
}

Expand Down Expand Up @@ -842,11 +844,6 @@ func (s *testServer) UnaryCall(ctx context.Context, in *testpb.SimpleRequest) (*
// recorder in the context, if present.
setORCAMetrics(r, orcaData)
}
if r, orcaData := s.metricsRecorder, in.GetOrcaOobReport(); r != nil && orcaData != nil {
// Transfer the request's OOB ORCA data to the server metrics recorder
// in the server, if present.
setORCAMetrics(r, orcaData)
}
return &testpb.SimpleResponse{
Payload: pl,
}, nil
Expand Down Expand Up @@ -912,6 +909,7 @@ func (s *testServer) FullDuplexCall(stream testgrpc.TestService_FullDuplexCallSe
stream.SetTrailer(trailer)
}
}
hasORCALock := false
for {
in, err := stream.Recv()
if err == io.EOF {
Expand All @@ -929,6 +927,11 @@ func (s *testServer) FullDuplexCall(stream testgrpc.TestService_FullDuplexCallSe
if r, orcaData := s.metricsRecorder, in.GetOrcaOobReport(); r != nil && orcaData != nil {
// Transfer the request's OOB ORCA data to the server metrics recorder
// in the server, if present.
if !hasORCALock {
s.orcaMu.Lock()
defer s.orcaMu.Unlock()
hasORCALock = true
}
setORCAMetrics(r, orcaData)
}

Expand Down Expand Up @@ -1036,14 +1039,12 @@ func DoORCAOOBTest(tc testgrpc.TestServiceClient) {
logger.Fatalf("/TestService/FullDuplexCall received error receiving: %v", err)
}

ctx2, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
want := &v3orcapb.OrcaLoadReport{
CpuUtilization: 0.8210,
MemUtilization: 0.5847,
Utilization: map[string]float64{"util": 0.30499},
}
checkORCAMetrics(ctx2, tc, want)
checkORCAMetrics(ctx, tc, want)

err = stream.Send(&testpb.StreamingOutputCallRequest{
OrcaOobReport: &testpb.TestOrcaReport{
Expand All @@ -1061,14 +1062,12 @@ func DoORCAOOBTest(tc testgrpc.TestServiceClient) {
logger.Fatalf("/TestService/FullDuplexCall received error receiving: %v", err)
}

ctx3, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
want = &v3orcapb.OrcaLoadReport{
CpuUtilization: 0.29309,
MemUtilization: 0.2,
Utilization: map[string]float64{"util": 0.2039},
}
checkORCAMetrics(ctx3, tc, want)
checkORCAMetrics(ctx, tc, want)
}

func checkORCAMetrics(ctx context.Context, tc testgrpc.TestServiceClient, want *v3orcapb.OrcaLoadReport) {
Expand Down

0 comments on commit 5dcfb37

Please sign in to comment.