diff --git a/benchmark/worker/benchmark_client.go b/benchmark/worker/benchmark_client.go index 7f02728709ca..cc1af4f418a5 100644 --- a/benchmark/worker/benchmark_client.go +++ b/benchmark/worker/benchmark_client.go @@ -32,6 +32,7 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/internal/grpcrand" "google.golang.org/grpc/internal/syscall" "google.golang.org/grpc/status" "google.golang.org/grpc/testdata" @@ -185,11 +186,21 @@ func performRPCs(config *testpb.ClientConfig, conns []*grpc.ClientConn, bc *benc } } - // TODO add open loop distribution. - switch config.LoadParams.Load.(type) { + // If set, perform an open loop, if not perform a closed loop. An open loop + // asynchronously starts RPCs based on random start times derived from a + // Poisson distribution. A closed loop performs RPCs in a blocking manner, + // and runs the next RPC after the previous RPC completes and returns. + var poissonLambda *float64 + switch t := config.LoadParams.Load.(type) { case *testpb.LoadParams_ClosedLoop: case *testpb.LoadParams_Poisson: - return status.Errorf(codes.Unimplemented, "unsupported load params: %v", config.LoadParams) + if t.Poisson == nil { + return status.Errorf(codes.InvalidArgument, "poisson is nil, needs to be set") + } + if t.Poisson.OfferedLoad <= 0 { + return status.Errorf(codes.InvalidArgument, "poisson.offered is <= 0: %v, needs to be >0", t.Poisson.OfferedLoad) + } + poissonLambda = &t.Poisson.OfferedLoad default: return status.Errorf(codes.InvalidArgument, "unknown load params: %v", config.LoadParams) } @@ -198,11 +209,9 @@ func performRPCs(config *testpb.ClientConfig, conns []*grpc.ClientConn, bc *benc switch config.RpcType { case testpb.RpcType_UNARY: - bc.doCloseLoopUnary(conns, rpcCountPerConn, payloadReqSize, payloadRespSize) - // TODO open loop. + bc.unaryLoop(conns, rpcCountPerConn, payloadReqSize, payloadRespSize, poissonLambda) case testpb.RpcType_STREAMING: - bc.doCloseLoopStreaming(conns, rpcCountPerConn, payloadReqSize, payloadRespSize, payloadType) - // TODO open loop. + bc.streamingLoop(conns, rpcCountPerConn, payloadReqSize, payloadRespSize, payloadType, poissonLambda) default: return status.Errorf(codes.InvalidArgument, "unknown rpc type: %v", config.RpcType) } @@ -246,7 +255,7 @@ func startBenchmarkClient(config *testpb.ClientConfig) (*benchmarkClient, error) return bc, nil } -func (bc *benchmarkClient) doCloseLoopUnary(conns []*grpc.ClientConn, rpcCountPerConn int, reqSize int, respSize int) { +func (bc *benchmarkClient) unaryLoop(conns []*grpc.ClientConn, rpcCountPerConn int, reqSize int, respSize int, poissonLambda *float64) { for ic, conn := range conns { client := testgrpc.NewBenchmarkServiceClient(conn) // For each connection, create rpcCountPerConn goroutines to do rpc. @@ -260,36 +269,44 @@ func (bc *benchmarkClient) doCloseLoopUnary(conns []*grpc.ClientConn, rpcCountPe // Now relying on worker client to reserve time to do warm up. // The worker client needs to wait for some time after client is created, // before starting benchmark. - done := make(chan bool) - for { - go func() { - start := time.Now() - if err := benchmark.DoUnaryCall(client, reqSize, respSize); err != nil { + if poissonLambda == nil { // Closed loop. + done := make(chan bool) + for { + go func() { + start := time.Now() + if err := benchmark.DoUnaryCall(client, reqSize, respSize); err != nil { + select { + case <-bc.stop: + case done <- false: + } + return + } + elapse := time.Since(start) + bc.lockingHistograms[idx].add(int64(elapse)) select { case <-bc.stop: - case done <- false: + case done <- true: } - return - } - elapse := time.Since(start) - bc.lockingHistograms[idx].add(int64(elapse)) + }() select { case <-bc.stop: - case done <- true: + return + case <-done: } - }() - select { - case <-bc.stop: - return - case <-done: } + } else { // Open loop. + timeBetweenRPCs := time.Duration((grpcrand.ExpFloat64() / *poissonLambda) * float64(time.Second)) + time.AfterFunc(timeBetweenRPCs, func() { + bc.poissonUnary(client, idx, reqSize, respSize, *poissonLambda) + }) } + }(idx) } } } -func (bc *benchmarkClient) doCloseLoopStreaming(conns []*grpc.ClientConn, rpcCountPerConn int, reqSize int, respSize int, payloadType string) { +func (bc *benchmarkClient) streamingLoop(conns []*grpc.ClientConn, rpcCountPerConn int, reqSize int, respSize int, payloadType string, poissonLambda *float64) { var doRPC func(testgrpc.BenchmarkService_StreamingCallClient, int, int) error if payloadType == "bytebuf" { doRPC = benchmark.DoByteBufStreamingRoundTrip @@ -304,33 +321,69 @@ func (bc *benchmarkClient) doCloseLoopStreaming(conns []*grpc.ClientConn, rpcCou if err != nil { logger.Fatalf("%v.StreamingCall(_) = _, %v", c, err) } - // Create histogram for each goroutine. idx := ic*rpcCountPerConn + j bc.lockingHistograms[idx].histogram = stats.NewHistogram(bc.histogramOptions) - // Start goroutine on the created mutex and histogram. - go func(idx int) { - // TODO: do warm up if necessary. - // Now relying on worker client to reserve time to do warm up. - // The worker client needs to wait for some time after client is created, - // before starting benchmark. - for { - start := time.Now() - if err := doRPC(stream, reqSize, respSize); err != nil { - return - } - elapse := time.Since(start) - bc.lockingHistograms[idx].add(int64(elapse)) - select { - case <-bc.stop: - return - default: + if poissonLambda == nil { // Closed loop. + // Start goroutine on the created mutex and histogram. + go func(idx int) { + // TODO: do warm up if necessary. + // Now relying on worker client to reserve time to do warm up. + // The worker client needs to wait for some time after client is created, + // before starting benchmark. + for { + start := time.Now() + if err := doRPC(stream, reqSize, respSize); err != nil { + return + } + elapse := time.Since(start) + bc.lockingHistograms[idx].add(int64(elapse)) + select { + case <-bc.stop: + return + default: + } } - } - }(idx) + }(idx) + } else { // Open loop. + timeBetweenRPCs := time.Duration((grpcrand.ExpFloat64() / *poissonLambda) * float64(time.Second)) + time.AfterFunc(timeBetweenRPCs, func() { + bc.poissonStreaming(stream, idx, reqSize, respSize, *poissonLambda, doRPC) + }) + } } } } +func (bc *benchmarkClient) poissonUnary(client testgrpc.BenchmarkServiceClient, idx int, reqSize int, respSize int, lambda float64) { + go func() { + start := time.Now() + if err := benchmark.DoUnaryCall(client, reqSize, respSize); err != nil { + return + } + elapse := time.Since(start) + bc.lockingHistograms[idx].add(int64(elapse)) + }() + timeBetweenRPCs := time.Duration((grpcrand.ExpFloat64() / lambda) * float64(time.Second)) + time.AfterFunc(timeBetweenRPCs, func() { + bc.poissonUnary(client, idx, reqSize, respSize, lambda) + }) +} + +func (bc *benchmarkClient) poissonStreaming(stream testgrpc.BenchmarkService_StreamingCallClient, idx int, reqSize int, respSize int, lambda float64, doRPC func(testgrpc.BenchmarkService_StreamingCallClient, int, int) error) { + go func() { + start := time.Now() + if err := doRPC(stream, reqSize, respSize); err != nil { + return + } + elapse := time.Since(start) + bc.lockingHistograms[idx].add(int64(elapse)) + }() + timeBetweenRPCs := time.Duration((grpcrand.ExpFloat64() / lambda) * float64(time.Second)) + time.AfterFunc(timeBetweenRPCs, func() { + bc.poissonStreaming(stream, idx, reqSize, respSize, lambda, doRPC) + }) +} + // getStats returns the stats for benchmark client. // It resets lastResetTime and all histograms if argument reset is true. func (bc *benchmarkClient) getStats(reset bool) *testpb.ClientStats { diff --git a/internal/grpcrand/grpcrand.go b/internal/grpcrand/grpcrand.go index d08e3e907666..aa97273e7d13 100644 --- a/internal/grpcrand/grpcrand.go +++ b/internal/grpcrand/grpcrand.go @@ -80,6 +80,13 @@ func Uint32() uint32 { return r.Uint32() } +// ExpFloat64 implements rand.ExpFloat64 on the grpcrand global source. +func ExpFloat64() float64 { + mu.Lock() + defer mu.Unlock() + return r.ExpFloat64() +} + // Shuffle implements rand.Shuffle on the grpcrand global source. var Shuffle = func(n int, f func(int, int)) { mu.Lock()