Skip to content

Commit

Permalink
test: allow set request/response size in interop soak test (grpc#6513)
Browse files Browse the repository at this point in the history
  • Loading branch information
mohanli-ml authored Aug 9, 2023
1 parent 07609e1 commit 92b481a
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 10 deletions.
6 changes: 4 additions & 2 deletions interop/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ var (
soakPerIterationMaxAcceptableLatencyMs = flag.Int("soak_per_iteration_max_acceptable_latency_ms", 1000, "The number of milliseconds a single iteration in the two soak tests (rpc_soak and channel_soak) should take.")
soakOverallTimeoutSeconds = flag.Int("soak_overall_timeout_seconds", 10, "The overall number of seconds after which a soak test should stop and fail, if the desired number of iterations have not yet completed.")
soakMinTimeMsBetweenRPCs = flag.Int("soak_min_time_ms_between_rpcs", 0, "The minimum time in milliseconds between consecutive RPCs in a soak test (rpc_soak or channel_soak), useful for limiting QPS")
soakRequestSize = flag.Int("soak_request_size", 271828, "The request size in a soak RPC. The default value is set based on the interop large unary test case.")
soakResponseSize = flag.Int("soak_response_size", 314159, "The response size in a soak RPC. The default value is set based on the interop large unary test case.")
tlsServerName = flag.String("server_host_override", "", "The server name used to verify the hostname returned by TLS handshake if it is not empty. Otherwise, --server_host is used.")
additionalMetadata = flag.String("additional_metadata", "", "Additional metadata to send in each request, as a semicolon-separated list of key:value pairs.")
testCase = flag.String("test_case", "large_unary",
Expand Down Expand Up @@ -352,10 +354,10 @@ func main() {
interop.DoPickFirstUnary(tc)
logger.Infoln("PickFirstUnary done")
case "rpc_soak":
interop.DoSoakTest(tc, serverAddr, opts, false /* resetChannel */, *soakIterations, *soakMaxFailures, time.Duration(*soakPerIterationMaxAcceptableLatencyMs)*time.Millisecond, time.Duration(*soakMinTimeMsBetweenRPCs)*time.Millisecond, time.Now().Add(time.Duration(*soakOverallTimeoutSeconds)*time.Second))
interop.DoSoakTest(tc, serverAddr, opts, false /* resetChannel */, *soakIterations, *soakMaxFailures, *soakRequestSize, *soakResponseSize, time.Duration(*soakPerIterationMaxAcceptableLatencyMs)*time.Millisecond, time.Duration(*soakMinTimeMsBetweenRPCs)*time.Millisecond, time.Now().Add(time.Duration(*soakOverallTimeoutSeconds)*time.Second))
logger.Infoln("RpcSoak done")
case "channel_soak":
interop.DoSoakTest(tc, serverAddr, opts, true /* resetChannel */, *soakIterations, *soakMaxFailures, time.Duration(*soakPerIterationMaxAcceptableLatencyMs)*time.Millisecond, time.Duration(*soakMinTimeMsBetweenRPCs)*time.Millisecond, time.Now().Add(time.Duration(*soakOverallTimeoutSeconds)*time.Second))
interop.DoSoakTest(tc, serverAddr, opts, true /* resetChannel */, *soakIterations, *soakMaxFailures, *soakRequestSize, *soakResponseSize, time.Duration(*soakPerIterationMaxAcceptableLatencyMs)*time.Millisecond, time.Duration(*soakMinTimeMsBetweenRPCs)*time.Millisecond, time.Now().Add(time.Duration(*soakOverallTimeoutSeconds)*time.Second))
logger.Infoln("ChannelSoak done")
case "orca_per_rpc":
interop.DoORCAPerRPCTest(tc)
Expand Down
15 changes: 8 additions & 7 deletions interop/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -684,7 +684,7 @@ func DoPickFirstUnary(tc testgrpc.TestServiceClient) {
}
}

func doOneSoakIteration(ctx context.Context, tc testgrpc.TestServiceClient, resetChannel bool, serverAddr string, dopts []grpc.DialOption, copts []grpc.CallOption) (latency time.Duration, err error) {
func doOneSoakIteration(ctx context.Context, tc testgrpc.TestServiceClient, resetChannel bool, serverAddr string, soakRequestSize int, soakResponseSize int, dopts []grpc.DialOption, copts []grpc.CallOption) (latency time.Duration, err error) {
start := time.Now()
client := tc
if resetChannel {
Expand All @@ -699,10 +699,10 @@ func doOneSoakIteration(ctx context.Context, tc testgrpc.TestServiceClient, rese
// per test spec, don't include channel shutdown in latency measurement
defer func() { latency = time.Since(start) }()
// do a large-unary RPC
pl := ClientNewPayload(testpb.PayloadType_COMPRESSABLE, largeReqSize)
pl := ClientNewPayload(testpb.PayloadType_COMPRESSABLE, soakRequestSize)
req := &testpb.SimpleRequest{
ResponseType: testpb.PayloadType_COMPRESSABLE,
ResponseSize: int32(largeRespSize),
ResponseSize: int32(soakResponseSize),
Payload: pl,
}
var reply *testpb.SimpleResponse
Expand All @@ -713,8 +713,8 @@ func doOneSoakIteration(ctx context.Context, tc testgrpc.TestServiceClient, rese
}
t := reply.GetPayload().GetType()
s := len(reply.GetPayload().GetBody())
if t != testpb.PayloadType_COMPRESSABLE || s != largeRespSize {
err = fmt.Errorf("got the reply with type %d len %d; want %d, %d", t, s, testpb.PayloadType_COMPRESSABLE, largeRespSize)
if t != testpb.PayloadType_COMPRESSABLE || s != soakResponseSize {
err = fmt.Errorf("got the reply with type %d len %d; want %d, %d", t, s, testpb.PayloadType_COMPRESSABLE, soakResponseSize)
return
}
return
Expand All @@ -723,7 +723,8 @@ func doOneSoakIteration(ctx context.Context, tc testgrpc.TestServiceClient, rese
// DoSoakTest runs large unary RPCs in a loop for a configurable number of times, with configurable failure thresholds.
// If resetChannel is false, then each RPC will be performed on tc. Otherwise, each RPC will be performed on a new
// stub that is created with the provided server address and dial options.
func DoSoakTest(tc testgrpc.TestServiceClient, serverAddr string, dopts []grpc.DialOption, resetChannel bool, soakIterations int, maxFailures int, perIterationMaxAcceptableLatency time.Duration, minTimeBetweenRPCs time.Duration, overallDeadline time.Time) {
// TODO(mohanli-ml): Create SoakTestOptions as a parameter for this method.
func DoSoakTest(tc testgrpc.TestServiceClient, serverAddr string, dopts []grpc.DialOption, resetChannel bool, soakIterations int, maxFailures int, soakRequestSize int, soakResponseSize int, perIterationMaxAcceptableLatency time.Duration, minTimeBetweenRPCs time.Duration, overallDeadline time.Time) {
start := time.Now()
ctx, cancel := context.WithDeadline(context.Background(), overallDeadline)
defer cancel()
Expand All @@ -743,7 +744,7 @@ func DoSoakTest(tc testgrpc.TestServiceClient, serverAddr string, dopts []grpc.D
earliestNextStart := time.After(minTimeBetweenRPCs)
iterationsDone++
var p peer.Peer
latency, err := doOneSoakIteration(ctx, tc, resetChannel, serverAddr, dopts, []grpc.CallOption{grpc.Peer(&p)})
latency, err := doOneSoakIteration(ctx, tc, resetChannel, serverAddr, soakRequestSize, soakResponseSize, dopts, []grpc.CallOption{grpc.Peer(&p)})
latencyMs := int64(latency / time.Millisecond)
h.Add(latencyMs)
if err != nil {
Expand Down
4 changes: 3 additions & 1 deletion interop/xds_federation/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ var (
soakPerIterationMaxAcceptableLatencyMs = flag.Int("soak_per_iteration_max_acceptable_latency_ms", 1000, "The number of milliseconds a single iteration in the two soak tests (rpc_soak and channel_soak) should take.")
soakOverallTimeoutSeconds = flag.Int("soak_overall_timeout_seconds", 10, "The overall number of seconds after which a soak test should stop and fail, if the desired number of iterations have not yet completed.")
soakMinTimeMsBetweenRPCs = flag.Int("soak_min_time_ms_between_rpcs", 0, "The minimum time in milliseconds between consecutive RPCs in a soak test (rpc_soak or channel_soak), useful for limiting QPS")
soakRequestSize = flag.Int("soak_request_size", 271828, "The request size in a soak RPC. The default value is set based on the interop large unary test case.")
soakResponseSize = flag.Int("soak_response_size", 314159, "The response size in a soak RPC. The default value is set based on the interop large unary test case.")
testCase = flag.String("test_case", "rpc_soak",
`Configure different test cases. Valid options are:
rpc_soak: sends --soak_iterations large_unary RPCs;
Expand Down Expand Up @@ -116,7 +118,7 @@ func main() {
for i := range clients {
wg.Add(1)
go func(c clientConfig) {
interop.DoSoakTest(c.tc, c.uri, c.opts, resetChannel, *soakIterations, *soakMaxFailures, time.Duration(*soakPerIterationMaxAcceptableLatencyMs)*time.Millisecond, time.Duration(*soakMinTimeMsBetweenRPCs)*time.Millisecond, time.Now().Add(time.Duration(*soakOverallTimeoutSeconds)*time.Second))
interop.DoSoakTest(c.tc, c.uri, c.opts, resetChannel, *soakIterations, *soakMaxFailures, *soakRequestSize, *soakResponseSize, time.Duration(*soakPerIterationMaxAcceptableLatencyMs)*time.Millisecond, time.Duration(*soakMinTimeMsBetweenRPCs)*time.Millisecond, time.Now().Add(time.Duration(*soakOverallTimeoutSeconds)*time.Second))
logger.Infof("%s test done for server: %s", *testCase, c.uri)
wg.Done()
}(clients[i])
Expand Down
Binary file added interop/xds_federation/xds_federation
Binary file not shown.

0 comments on commit 92b481a

Please sign in to comment.