diff --git a/balancer/grpclb/grpclb_test.go b/balancer/grpclb/grpclb_test.go index 48082e2069fa..5413613375a2 100644 --- a/balancer/grpclb/grpclb_test.go +++ b/balancer/grpclb/grpclb_test.go @@ -495,7 +495,7 @@ func (s) TestGRPCLBWeighted(t *testing.T) { tss.ls.sls <- &lbpb.ServerList{Servers: bes} for i := 0; i < 1000; i++ { - if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true), grpc.Peer(&p)); err != nil { + if _, err := testC.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true), grpc.Peer(&p)); err != nil { t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, ", testC, err) } result += strconv.Itoa(portsToIndex[p.Addr.(*net.TCPAddr).Port]) @@ -605,18 +605,18 @@ func (s) TestDropRequest(t *testing.T) { // 1st RPCs pick the first item in server list. They should succeed // since they choose the non-drop-request backend according to the // round robin policy. - if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(!failfast)); err != nil { + if _, err := testC.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(!failfast)); err != nil { t.Errorf("%v.EmptyCall(_, _) = _, %v, want _, ", testC, err) } // 2nd RPCs pick the second item in server list. They should succeed // since they choose the non-drop-request backend according to the // round robin policy. - if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(!failfast)); err != nil { + if _, err := testC.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(!failfast)); err != nil { t.Errorf("%v.EmptyCall(_, _) = _, %v, want _, ", testC, err) } // 3rd RPCs should fail, because they pick last item in server list, // with Drop set to true. - if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(!failfast)); status.Code(err) != codes.Unavailable { + if _, err := testC.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(!failfast)); status.Code(err) != codes.Unavailable { t.Errorf("%v.EmptyCall(_, _) = _, %v, want _, %s", testC, err, codes.Unavailable) } } @@ -625,7 +625,7 @@ func (s) TestDropRequest(t *testing.T) { // Make one more RPC to move the picker index one step further, so it's not // 0. The following RPCs will test that drop index is not reset. If picker // index is at 0, we cannot tell whether it's reset or not. 
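// The recurring change in this patch: an unbounded context.Background() is
// replaced by a context.WithTimeout shared across a test's RPCs, so a
// WaitForReady RPC that never becomes ready fails the test at the deadline
// instead of hanging forever. A minimal sketch of the shape, reusing the
// patch's own identifiers (testC, defaultTestTimeout); illustrative only:
//
//	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
//	defer cancel()
//	for i := 0; i < 1000; i++ {
//		if _, err := testC.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
//			t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, <nil>", testC, err)
//		}
//	}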
- if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true)); err != nil { + if _, err := testC.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil { t.Errorf("%v.EmptyCall(_, _) = _, %v, want _, ", testC, err) } @@ -636,18 +636,18 @@ func (s) TestDropRequest(t *testing.T) { time.Sleep(time.Second) for i := 0; i < 3; i++ { var p peer.Peer - if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true), grpc.Peer(&p)); err != nil { + if _, err := testC.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true), grpc.Peer(&p)); err != nil { t.Errorf("%v.EmptyCall(_, _) = _, %v, want _, ", testC, err) } if want := tss.bePorts[1]; p.Addr.(*net.TCPAddr).Port != want { t.Errorf("got peer: %v, want peer port: %v", p.Addr, want) } - if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true)); status.Code(err) != codes.Unavailable { + if _, err := testC.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); status.Code(err) != codes.Unavailable { t.Errorf("%v.EmptyCall(_, _) = _, %v, want _, %s", testC, err, codes.Unavailable) } - if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true), grpc.Peer(&p)); err != nil { + if _, err := testC.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true), grpc.Peer(&p)); err != nil { t.Errorf("%v.EmptyCall(_, _) = _, %v, want _, ", testC, err) } if want := tss.bePorts[1]; p.Addr.(*net.TCPAddr).Port != want { @@ -709,7 +709,7 @@ func (s) TestBalancerDisconnects(t *testing.T) { }}}) var p peer.Peer - if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true), grpc.Peer(&p)); err != nil { + if _, err := testC.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true), grpc.Peer(&p)); err != nil { t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, ", testC, err) } if p.Addr.(*net.TCPAddr).Port != tests[0].bePorts[0] { @@ -720,7 +720,7 @@ func (s) TestBalancerDisconnects(t *testing.T) { // Stop balancer[0], balancer[1] should be used by grpclb. // Check peer address to see if that happened. 
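// These hunks assert which backend served each RPC via the grpc.Peer call
// option, which fills in a peer.Peer once the RPC completes. A minimal
// sketch of that idiom (wantPort is illustrative; the rest mirrors the
// surrounding tests):
//
//	var p peer.Peer
//	if _, err := testC.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(&p)); err != nil {
//		t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, <nil>", testC, err)
//	}
//	if got := p.Addr.(*net.TCPAddr).Port; got != wantPort {
//		t.Errorf("RPC served by port %d, want %d", got, wantPort)
//	}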
for i := 0; i < 1000; i++ { - if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true), grpc.Peer(&p)); err != nil { + if _, err := testC.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true), grpc.Peer(&p)); err != nil { t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, ", testC, err) } if p.Addr.(*net.TCPAddr).Port == tests[1].bePorts[0] { @@ -784,7 +784,7 @@ func (s) TestFallback(t *testing.T) { }}}) var p peer.Peer - if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true), grpc.Peer(&p)); err != nil { + if _, err := testC.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true), grpc.Peer(&p)); err != nil { t.Fatalf("_.EmptyCall(_, _) = _, %v, want _, ", err) } if p.Addr.String() != beLis.Addr().String() { @@ -802,7 +802,7 @@ func (s) TestFallback(t *testing.T) { var backendUsed bool for i := 0; i < 1000; i++ { - if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true), grpc.Peer(&p)); err != nil { + if _, err := testC.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true), grpc.Peer(&p)); err != nil { t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, ", testC, err) } if p.Addr.(*net.TCPAddr).Port == tss.bePorts[0] { @@ -821,7 +821,7 @@ func (s) TestFallback(t *testing.T) { var fallbackUsed bool for i := 0; i < 2000; i++ { - if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true), grpc.Peer(&p)); err != nil { + if _, err := testC.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true), grpc.Peer(&p)); err != nil { // Because we are hard-closing the connection, above, it's possible // for the first RPC attempt to be sent on the old connection, // which will lead to an Unavailable error when it is closed. @@ -847,7 +847,7 @@ func (s) TestFallback(t *testing.T) { var backendUsed2 bool for i := 0; i < 2000; i++ { - if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true), grpc.Peer(&p)); err != nil { + if _, err := testC.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true), grpc.Peer(&p)); err != nil { t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, ", testC, err) } if p.Addr.(*net.TCPAddr).Port == tss.bePorts[0] { @@ -913,7 +913,7 @@ func (s) TestExplicitFallback(t *testing.T) { var p peer.Peer var backendUsed bool for i := 0; i < 2000; i++ { - if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true), grpc.Peer(&p)); err != nil { + if _, err := testC.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true), grpc.Peer(&p)); err != nil { t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, ", testC, err) } if p.Addr.(*net.TCPAddr).Port == tss.bePorts[0] { @@ -931,7 +931,7 @@ func (s) TestExplicitFallback(t *testing.T) { var fallbackUsed bool for i := 0; i < 2000; i++ { - if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true), grpc.Peer(&p)); err != nil { + if _, err := testC.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true), grpc.Peer(&p)); err != nil { t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, ", testC, err) } if p.Addr.String() == beLis.Addr().String() { @@ -949,7 +949,7 @@ func (s) TestExplicitFallback(t *testing.T) { backendUsed = false for i := 0; i < 2000; i++ { - if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true), grpc.Peer(&p)); err != nil { + if _, err := testC.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true), grpc.Peer(&p)); err != nil { t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, ", testC, 
err) } if p.Addr.(*net.TCPAddr).Port == tss.bePorts[0] { @@ -1067,7 +1067,7 @@ func (s) TestFallBackWithNoServerAddress(t *testing.T) { var backendUsed bool for i := 0; i < 1000; i++ { - if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true), grpc.Peer(&p)); err != nil { + if _, err := testC.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true), grpc.Peer(&p)); err != nil { t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, ", testC, err) } if p.Addr.(*net.TCPAddr).Port == tss.bePorts[0] { @@ -1144,7 +1144,7 @@ func (s) TestGRPCLBPickFirst(t *testing.T) { result = "" for i := 0; i < 1000; i++ { - if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true), grpc.Peer(&p)); err != nil { + if _, err := testC.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true), grpc.Peer(&p)); err != nil { t.Fatalf("_.EmptyCall(_, _) = _, %v, want _, ", err) } result += strconv.Itoa(portsToIndex[p.Addr.(*net.TCPAddr).Port]) @@ -1156,7 +1156,7 @@ func (s) TestGRPCLBPickFirst(t *testing.T) { tss.ls.sls <- &lbpb.ServerList{Servers: beServers[2:]} result = "" for i := 0; i < 1000; i++ { - if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true), grpc.Peer(&p)); err != nil { + if _, err := testC.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true), grpc.Peer(&p)); err != nil { t.Fatalf("_.EmptyCall(_, _) = _, %v, want _, ", err) } result += strconv.Itoa(portsToIndex[p.Addr.(*net.TCPAddr).Port]) @@ -1168,7 +1168,7 @@ func (s) TestGRPCLBPickFirst(t *testing.T) { tss.ls.sls <- &lbpb.ServerList{Servers: beServers[1:]} result = "" for i := 0; i < 1000; i++ { - if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true), grpc.Peer(&p)); err != nil { + if _, err := testC.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true), grpc.Peer(&p)); err != nil { t.Fatalf("_.EmptyCall(_, _) = _, %v, want _, ", err) } result += strconv.Itoa(portsToIndex[p.Addr.(*net.TCPAddr).Port]) @@ -1194,7 +1194,7 @@ func (s) TestGRPCLBPickFirst(t *testing.T) { result = "" for i := 0; i < 1000; i++ { - if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true), grpc.Peer(&p)); err != nil { + if _, err := testC.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true), grpc.Peer(&p)); err != nil { t.Fatalf("_.EmptyCall(_, _) = _, %v, want _, ", err) } result += strconv.Itoa(portsToIndex[p.Addr.(*net.TCPAddr).Port]) @@ -1206,7 +1206,7 @@ func (s) TestGRPCLBPickFirst(t *testing.T) { tss.ls.sls <- &lbpb.ServerList{Servers: beServers[0:3]} result = "" for i := 0; i < 1000; i++ { - if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true), grpc.Peer(&p)); err != nil { + if _, err := testC.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true), grpc.Peer(&p)); err != nil { t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, ", testC, err) } result += strconv.Itoa(portsToIndex[p.Addr.(*net.TCPAddr).Port]) @@ -1295,12 +1295,14 @@ const ( func (s) TestGRPCLBStatsUnarySuccess(t *testing.T) { if err := runAndCheckStats(t, false, nil, func(cc *grpc.ClientConn) { testC := testpb.NewTestServiceClient(cc) + ctx, cancel := context.WithTimeout(context.Background(), defaultFallbackTimeout) + defer cancel() // The first non-failfast RPC succeeds, all connections are up. 
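// In the stats tests below, the bounded context is created inside the
// closure handed to runAndCheckStats, so the deferred cancel fires when the
// closure returns, before the collected rpcStats are compared. A sketch of
// that shape (runAndCheckStats and defaultFallbackTimeout come from this
// file; the body is illustrative):
//
//	runAndCheckStats(t, false, nil, func(cc *grpc.ClientConn) {
//		ctx, cancel := context.WithTimeout(context.Background(), defaultFallbackTimeout)
//		defer cancel() // runs when this closure returns
//		testC := testpb.NewTestServiceClient(cc)
//		testC.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true))
//	}, wantStats)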
- if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true)); err != nil { + if _, err := testC.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil { t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, ", testC, err) } for i := 0; i < countRPC-1; i++ { - testC.EmptyCall(context.Background(), &testpb.Empty{}) + testC.EmptyCall(ctx, &testpb.Empty{}) } }, &rpcStats{ numCallsStarted: int64(countRPC), @@ -1314,12 +1316,14 @@ func (s) TestGRPCLBStatsUnarySuccess(t *testing.T) { func (s) TestGRPCLBStatsUnaryDrop(t *testing.T) { if err := runAndCheckStats(t, true, nil, func(cc *grpc.ClientConn) { testC := testpb.NewTestServiceClient(cc) + ctx, cancel := context.WithTimeout(context.Background(), defaultFallbackTimeout) + defer cancel() // The first non-failfast RPC succeeds, all connections are up. - if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true)); err != nil { + if _, err := testC.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil { t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, ", testC, err) } for i := 0; i < countRPC-1; i++ { - testC.EmptyCall(context.Background(), &testpb.Empty{}) + testC.EmptyCall(ctx, &testpb.Empty{}) } }, &rpcStats{ numCallsStarted: int64(countRPC), @@ -1334,12 +1338,14 @@ func (s) TestGRPCLBStatsUnaryDrop(t *testing.T) { func (s) TestGRPCLBStatsUnaryFailedToSend(t *testing.T) { if err := runAndCheckStats(t, false, nil, func(cc *grpc.ClientConn) { testC := testpb.NewTestServiceClient(cc) + ctx, cancel := context.WithTimeout(context.Background(), defaultFallbackTimeout) + defer cancel() // The first non-failfast RPC succeeds, all connections are up. - if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true)); err != nil { + if _, err := testC.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil { t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, ", testC, err) } for i := 0; i < countRPC-1; i++ { - cc.Invoke(context.Background(), failtosendURI, &testpb.Empty{}, nil) + cc.Invoke(ctx, failtosendURI, &testpb.Empty{}, nil) } }, &rpcStats{ numCallsStarted: int64(countRPC), @@ -1354,8 +1360,10 @@ func (s) TestGRPCLBStatsUnaryFailedToSend(t *testing.T) { func (s) TestGRPCLBStatsStreamingSuccess(t *testing.T) { if err := runAndCheckStats(t, false, nil, func(cc *grpc.ClientConn) { testC := testpb.NewTestServiceClient(cc) + ctx, cancel := context.WithTimeout(context.Background(), defaultFallbackTimeout) + defer cancel() // The first non-failfast RPC succeeds, all connections are up. - stream, err := testC.FullDuplexCall(context.Background(), grpc.WaitForReady(true)) + stream, err := testC.FullDuplexCall(ctx, grpc.WaitForReady(true)) if err != nil { t.Fatalf("%v.FullDuplexCall(_, _) = _, %v, want _, ", testC, err) } @@ -1365,7 +1373,7 @@ func (s) TestGRPCLBStatsStreamingSuccess(t *testing.T) { } } for i := 0; i < countRPC-1; i++ { - stream, err = testC.FullDuplexCall(context.Background()) + stream, err = testC.FullDuplexCall(ctx) if err == nil { // Wait for stream to end if err is nil. for { @@ -1387,8 +1395,10 @@ func (s) TestGRPCLBStatsStreamingSuccess(t *testing.T) { func (s) TestGRPCLBStatsStreamingDrop(t *testing.T) { if err := runAndCheckStats(t, true, nil, func(cc *grpc.ClientConn) { testC := testpb.NewTestServiceClient(cc) + ctx, cancel := context.WithTimeout(context.Background(), defaultFallbackTimeout) + defer cancel() // The first non-failfast RPC succeeds, all connections are up. 
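// For streaming RPCs the context governs the whole stream, not just its
// creation: canceling ctx aborts any stream still open on it. These tests
// therefore drain each stream with Recv until a non-nil error (io.EOF on a
// clean finish) before the deferred cancel runs. A minimal sketch:
//
//	stream, err := testC.FullDuplexCall(ctx)
//	if err != nil {
//		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want _, <nil>", testC, err)
//	}
//	for {
//		if _, err := stream.Recv(); err != nil {
//			break // io.EOF on success; any other error also ends the stream
//		}
//	}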
- stream, err := testC.FullDuplexCall(context.Background(), grpc.WaitForReady(true)) + stream, err := testC.FullDuplexCall(ctx, grpc.WaitForReady(true)) if err != nil { t.Fatalf("%v.FullDuplexCall(_, _) = _, %v, want _, ", testC, err) } @@ -1398,7 +1408,7 @@ func (s) TestGRPCLBStatsStreamingDrop(t *testing.T) { } } for i := 0; i < countRPC-1; i++ { - stream, err = testC.FullDuplexCall(context.Background()) + stream, err = testC.FullDuplexCall(ctx) if err == nil { // Wait for stream to end if err is nil. for { @@ -1421,8 +1431,10 @@ func (s) TestGRPCLBStatsStreamingDrop(t *testing.T) { func (s) TestGRPCLBStatsStreamingFailedToSend(t *testing.T) { if err := runAndCheckStats(t, false, nil, func(cc *grpc.ClientConn) { testC := testpb.NewTestServiceClient(cc) + ctx, cancel := context.WithTimeout(context.Background(), defaultFallbackTimeout) + defer cancel() // The first non-failfast RPC succeeds, all connections are up. - stream, err := testC.FullDuplexCall(context.Background(), grpc.WaitForReady(true)) + stream, err := testC.FullDuplexCall(ctx, grpc.WaitForReady(true)) if err != nil { t.Fatalf("%v.FullDuplexCall(_, _) = _, %v, want _, ", testC, err) } @@ -1432,7 +1444,7 @@ func (s) TestGRPCLBStatsStreamingFailedToSend(t *testing.T) { } } for i := 0; i < countRPC-1; i++ { - cc.NewStream(context.Background(), &grpc.StreamDesc{}, failtosendURI) + cc.NewStream(ctx, &grpc.StreamDesc{}, failtosendURI) } }, &rpcStats{ numCallsStarted: int64(countRPC), diff --git a/balancer_switching_test.go b/balancer_switching_test.go index ed132121280c..2c6ed576620f 100644 --- a/balancer_switching_test.go +++ b/balancer_switching_test.go @@ -83,8 +83,10 @@ func checkPickFirst(cc *ClientConn, servers []*server) error { err error ) connected := false + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() for i := 0; i < 5000; i++ { - if err = cc.Invoke(context.Background(), "/foo/bar", &req, &reply); errorDesc(err) == servers[0].port { + if err = cc.Invoke(ctx, "/foo/bar", &req, &reply); errorDesc(err) == servers[0].port { if connected { // connected is set to false if peer is not server[0]. So if // connected is true here, this is the second time we saw @@ -100,9 +102,10 @@ func checkPickFirst(cc *ClientConn, servers []*server) error { if !connected { return fmt.Errorf("pickfirst is not in effect after 5 second, EmptyCall() = _, %v, want _, %v", err, servers[0].port) } + // The following RPCs should all succeed with the first server. for i := 0; i < 3; i++ { - err = cc.Invoke(context.Background(), "/foo/bar", &req, &reply) + err = cc.Invoke(ctx, "/foo/bar", &req, &reply) if errorDesc(err) != servers[0].port { return fmt.Errorf("index %d: want peer %v, got peer %v", i, servers[0].port, err) } @@ -117,6 +120,8 @@ func checkRoundRobin(cc *ClientConn, servers []*server) error { err error ) + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() // Make sure connections to all servers are up. 
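// The checkPickFirst/checkRoundRobin helpers poll with a fixed iteration
// budget while the shared ctx bounds total wall time; whichever limit is hit
// first ends the wait. A deadline-driven variant of the same polling loop
// (illustrative only, not part of this patch):
//
//	for ctx.Err() == nil {
//		if err = cc.Invoke(ctx, "/foo/bar", &req, &reply); errorDesc(err) == s.port {
//			up = true
//			break
//		}
//		time.Sleep(time.Millisecond)
//	}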
for i := 0; i < 2; i++ { // Do this check twice, otherwise the first RPC's transport may still be @@ -124,7 +129,7 @@ func checkRoundRobin(cc *ClientConn, servers []*server) error { for _, s := range servers { var up bool for i := 0; i < 5000; i++ { - if err = cc.Invoke(context.Background(), "/foo/bar", &req, &reply); errorDesc(err) == s.port { + if err = cc.Invoke(ctx, "/foo/bar", &req, &reply); errorDesc(err) == s.port { up = true break } @@ -138,7 +143,7 @@ func checkRoundRobin(cc *ClientConn, servers []*server) error { serverCount := len(servers) for i := 0; i < 3*serverCount; i++ { - err = cc.Invoke(context.Background(), "/foo/bar", &req, &reply) + err = cc.Invoke(ctx, "/foo/bar", &req, &reply) if errorDesc(err) != servers[i%serverCount].port { return fmt.Errorf("index %d: want peer %v, got peer %v", i, servers[i%serverCount].port, err) } diff --git a/benchmark/primitives/context_test.go b/benchmark/primitives/context_test.go index c94acd74597c..faae50703e7d 100644 --- a/benchmark/primitives/context_test.go +++ b/benchmark/primitives/context_test.go @@ -24,6 +24,8 @@ import ( "time" ) +const defaultTestTimeout = 10 * time.Second + func BenchmarkCancelContextErrNoErr(b *testing.B) { ctx, cancel := context.WithCancel(context.Background()) for i := 0; i < b.N; i++ { @@ -72,7 +74,7 @@ func BenchmarkCancelContextChannelGotErr(b *testing.B) { } func BenchmarkTimerContextErrNoErr(b *testing.B) { - ctx, cancel := context.WithTimeout(context.Background(), 24*time.Hour) + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) for i := 0; i < b.N; i++ { if err := ctx.Err(); err != nil { b.Fatal("error") @@ -92,7 +94,7 @@ func BenchmarkTimerContextErrGotErr(b *testing.B) { } func BenchmarkTimerContextChannelNoErr(b *testing.B) { - ctx, cancel := context.WithTimeout(context.Background(), 24*time.Hour) + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) for i := 0; i < b.N; i++ { select { case <-ctx.Done(): diff --git a/call_test.go b/call_test.go index 78760ba5297a..abc4537ddb7d 100644 --- a/call_test.go +++ b/call_test.go @@ -43,6 +43,8 @@ var ( canceled = 0 ) +const defaultTestTimeout = 10 * time.Second + type testCodec struct { } @@ -237,7 +239,8 @@ func (s) TestUnaryClientInterceptor(t *testing.T) { }() var reply string - ctx := context.Background() + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() parentCtx := context.WithValue(ctx, ctxKey("parentKey"), 0) if err := cc.Invoke(parentCtx, "/foo/bar", &expectedRequest, &reply); err != nil || reply != expectedResponse { t.Fatalf("grpc.Invoke(_, _, _, _, _) = %v, want ", err) @@ -305,7 +308,8 @@ func (s) TestChainUnaryClientInterceptor(t *testing.T) { }() var reply string - ctx := context.Background() + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() parentCtx := context.WithValue(ctx, ctxKey("parentKey"), 0) if err := cc.Invoke(parentCtx, "/foo/bar", &expectedRequest, &reply); err != nil || reply != expectedResponse+"321" { t.Fatalf("grpc.Invoke(_, _, _, _, _) = %v, want ", err) @@ -346,7 +350,8 @@ func (s) TestChainOnBaseUnaryClientInterceptor(t *testing.T) { }() var reply string - ctx := context.Background() + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() parentCtx := context.WithValue(ctx, ctxKey("parentKey"), 0) if err := cc.Invoke(parentCtx, "/foo/bar", &expectedRequest, &reply); err != nil || reply != expectedResponse { t.Fatalf("grpc.Invoke(_, _, _, _, 
_) = %v, want ", err) @@ -407,7 +412,8 @@ func (s) TestChainStreamClientInterceptor(t *testing.T) { server.stop() }() - ctx := context.Background() + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() parentCtx := context.WithValue(ctx, ctxKey("parentKey"), 0) _, err := cc.NewStream(parentCtx, &StreamDesc{}, "/foo/bar") if err != nil { @@ -418,7 +424,9 @@ func (s) TestChainStreamClientInterceptor(t *testing.T) { func (s) TestInvoke(t *testing.T) { server, cc := setUp(t, 0, math.MaxUint32) var reply string - if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply); err != nil || reply != expectedResponse { + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + if err := cc.Invoke(ctx, "/foo/bar", &expectedRequest, &reply); err != nil || reply != expectedResponse { t.Fatalf("grpc.Invoke(_, _, _, _, _) = %v, want ", err) } cc.Close() @@ -429,7 +437,9 @@ func (s) TestInvokeLargeErr(t *testing.T) { server, cc := setUp(t, 0, math.MaxUint32) var reply string req := "hello" - err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply) + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + err := cc.Invoke(ctx, "/foo/bar", &req, &reply) if _, ok := status.FromError(err); !ok { t.Fatalf("grpc.Invoke(_, _, _, _, _) receives non rpc error.") } @@ -445,7 +455,9 @@ func (s) TestInvokeErrorSpecialChars(t *testing.T) { server, cc := setUp(t, 0, math.MaxUint32) var reply string req := "weird error" - err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply) + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + err := cc.Invoke(ctx, "/foo/bar", &req, &reply) if _, ok := status.FromError(err); !ok { t.Fatalf("grpc.Invoke(_, _, _, _, _) receives non rpc error.") } diff --git a/channelz/service/service_sktopt_test.go b/channelz/service/service_sktopt_test.go index e2d024f83652..ecd4a2ad05f9 100644 --- a/channelz/service/service_sktopt_test.go +++ b/channelz/service/service_sktopt_test.go @@ -145,8 +145,10 @@ func (s) TestGetSocketOptions(t *testing.T) { ids[i] = channelz.RegisterNormalSocket(s, svrID, strconv.Itoa(i)) defer channelz.RemoveEntry(ids[i]) } + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() for i, s := range ss { - resp, _ := svr.GetSocket(context.Background(), &channelzpb.GetSocketRequest{SocketId: ids[i]}) + resp, _ := svr.GetSocket(ctx, &channelzpb.GetSocketRequest{SocketId: ids[i]}) metrics := resp.GetSocket() if !reflect.DeepEqual(metrics.GetRef(), &channelzpb.SocketRef{SocketId: ids[i], Name: strconv.Itoa(i)}) || !reflect.DeepEqual(socketProtoToStruct(metrics), s) { t.Fatalf("resp.GetSocket() want: metrics.GetRef() = %#v and %#v, got: metrics.GetRef() = %#v and %#v", &channelzpb.SocketRef{SocketId: ids[i], Name: strconv.Itoa(i)}, s, metrics.GetRef(), socketProtoToStruct(metrics)) diff --git a/channelz/service/service_test.go b/channelz/service/service_test.go index 467ceb4f431c..03d2b29c27b4 100644 --- a/channelz/service/service_test.go +++ b/channelz/service/service_test.go @@ -69,6 +69,8 @@ var protoToSocketOpt protoToSocketOptFunc // TODO: Go1.7 is no longer supported - does this need a change? 
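// The channelz tests call the service implementation directly rather than
// over a connection, and these methods do not appear to block on the
// context, so the bound here is mostly defensive. A sketch mirroring the
// calls below:
//
//	svr := newCZServer()
//	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
//	defer cancel()
//	resp, _ := svr.GetTopChannels(ctx, &channelzpb.GetTopChannelsRequest{StartChannelId: 0})
//	if !resp.GetEnd() {
//		t.Fatalf("resp.GetEnd() want true, got %v", resp.GetEnd())
//	}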
var emptyTime time.Time +const defaultTestTimeout = 10 * time.Second + type dummyChannel struct { state connectivity.State target string @@ -327,7 +329,9 @@ func (s) TestGetTopChannels(t *testing.T) { defer channelz.RemoveEntry(id) } s := newCZServer() - resp, _ := s.GetTopChannels(context.Background(), &channelzpb.GetTopChannelsRequest{StartChannelId: 0}) + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + resp, _ := s.GetTopChannels(ctx, &channelzpb.GetTopChannelsRequest{StartChannelId: 0}) if !resp.GetEnd() { t.Fatalf("resp.GetEnd() want true, got %v", resp.GetEnd()) } @@ -340,7 +344,7 @@ func (s) TestGetTopChannels(t *testing.T) { id := channelz.RegisterChannel(tcs[0], 0, "") defer channelz.RemoveEntry(id) } - resp, _ = s.GetTopChannels(context.Background(), &channelzpb.GetTopChannelsRequest{StartChannelId: 0}) + resp, _ = s.GetTopChannels(ctx, &channelzpb.GetTopChannelsRequest{StartChannelId: 0}) if resp.GetEnd() { t.Fatalf("resp.GetEnd() want false, got %v", resp.GetEnd()) } @@ -374,7 +378,9 @@ func (s) TestGetServers(t *testing.T) { defer channelz.RemoveEntry(id) } svr := newCZServer() - resp, _ := svr.GetServers(context.Background(), &channelzpb.GetServersRequest{StartServerId: 0}) + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + resp, _ := svr.GetServers(ctx, &channelzpb.GetServersRequest{StartServerId: 0}) if !resp.GetEnd() { t.Fatalf("resp.GetEnd() want true, got %v", resp.GetEnd()) } @@ -387,7 +393,7 @@ func (s) TestGetServers(t *testing.T) { id := channelz.RegisterServer(ss[0], "") defer channelz.RemoveEntry(id) } - resp, _ = svr.GetServers(context.Background(), &channelzpb.GetServersRequest{StartServerId: 0}) + resp, _ = svr.GetServers(ctx, &channelzpb.GetServersRequest{StartServerId: 0}) if resp.GetEnd() { t.Fatalf("resp.GetEnd() want false, got %v", resp.GetEnd()) } @@ -407,7 +413,9 @@ func (s) TestGetServerSockets(t *testing.T) { defer channelz.RemoveEntry(id) } svr := newCZServer() - resp, _ := svr.GetServerSockets(context.Background(), &channelzpb.GetServerSocketsRequest{ServerId: svrID, StartSocketId: 0}) + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + resp, _ := svr.GetServerSockets(ctx, &channelzpb.GetServerSocketsRequest{ServerId: svrID, StartSocketId: 0}) if !resp.GetEnd() { t.Fatalf("resp.GetEnd() want: true, got: %v", resp.GetEnd()) } @@ -424,7 +432,7 @@ func (s) TestGetServerSockets(t *testing.T) { id := channelz.RegisterNormalSocket(&dummySocket{}, svrID, "") defer channelz.RemoveEntry(id) } - resp, _ = svr.GetServerSockets(context.Background(), &channelzpb.GetServerSocketsRequest{ServerId: svrID, StartSocketId: 0}) + resp, _ = svr.GetServerSockets(ctx, &channelzpb.GetServerSocketsRequest{ServerId: svrID, StartSocketId: 0}) if resp.GetEnd() { t.Fatalf("resp.GetEnd() want false, got %v", resp.GetEnd()) } @@ -446,9 +454,11 @@ func (s) TestGetServerSocketsNonZeroStartID(t *testing.T) { defer channelz.RemoveEntry(id) } svr := newCZServer() + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() // Make GetServerSockets with startID = ids[1]+1, so socket-1 won't be // included in the response. 
- resp, _ := svr.GetServerSockets(context.Background(), &channelzpb.GetServerSocketsRequest{ServerId: svrID, StartSocketId: ids[1] + 1}) + resp, _ := svr.GetServerSockets(ctx, &channelzpb.GetServerSocketsRequest{ServerId: svrID, StartSocketId: ids[1] + 1}) if !resp.GetEnd() { t.Fatalf("resp.GetEnd() want: true, got: %v", resp.GetEnd()) } @@ -512,7 +522,9 @@ func (s) TestGetChannel(t *testing.T) { defer channelz.RemoveEntry(id) } svr := newCZServer() - resp, _ := svr.GetChannel(context.Background(), &channelzpb.GetChannelRequest{ChannelId: ids[0]}) + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + resp, _ := svr.GetChannel(ctx, &channelzpb.GetChannelRequest{ChannelId: ids[0]}) metrics := resp.GetChannel() subChans := metrics.GetSubchannelRef() if len(subChans) != 1 || subChans[0].GetName() != refNames[2] || subChans[0].GetSubchannelId() != ids[2] { @@ -552,7 +564,7 @@ func (s) TestGetChannel(t *testing.T) { } } } - resp, _ = svr.GetChannel(context.Background(), &channelzpb.GetChannelRequest{ChannelId: ids[1]}) + resp, _ = svr.GetChannel(ctx, &channelzpb.GetChannelRequest{ChannelId: ids[1]}) metrics = resp.GetChannel() nestedChans = metrics.GetChannelRef() if len(nestedChans) != 1 || nestedChans[0].GetName() != refNames[3] || nestedChans[0].GetChannelId() != ids[3] { @@ -598,7 +610,9 @@ func (s) TestGetSubChannel(t *testing.T) { defer channelz.RemoveEntry(id) } svr := newCZServer() - resp, _ := svr.GetSubchannel(context.Background(), &channelzpb.GetSubchannelRequest{SubchannelId: ids[1]}) + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + resp, _ := svr.GetSubchannel(ctx, &channelzpb.GetSubchannelRequest{SubchannelId: ids[1]}) metrics := resp.GetSubchannel() want := map[int64]string{ ids[2]: refNames[2], @@ -719,8 +733,10 @@ func (s) TestGetSocket(t *testing.T) { ids[i] = channelz.RegisterNormalSocket(s, svrID, strconv.Itoa(i)) defer channelz.RemoveEntry(ids[i]) } + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() for i, s := range ss { - resp, _ := svr.GetSocket(context.Background(), &channelzpb.GetSocketRequest{SocketId: ids[i]}) + resp, _ := svr.GetSocket(ctx, &channelzpb.GetSocketRequest{SocketId: ids[i]}) metrics := resp.GetSocket() if !reflect.DeepEqual(metrics.GetRef(), &channelzpb.SocketRef{SocketId: ids[i], Name: strconv.Itoa(i)}) || !reflect.DeepEqual(socketProtoToStruct(metrics), s) { t.Fatalf("resp.GetSocket() want: metrics.GetRef() = %#v and %#v, got: metrics.GetRef() = %#v and %#v", &channelzpb.SocketRef{SocketId: ids[i], Name: strconv.Itoa(i)}, s, metrics.GetRef(), socketProtoToStruct(metrics)) diff --git a/credentials/alts/internal/handshaker/handshaker_test.go b/credentials/alts/internal/handshaker/handshaker_test.go index 9214f647c853..bf516dc53c87 100644 --- a/credentials/alts/internal/handshaker/handshaker_test.go +++ b/credentials/alts/internal/handshaker/handshaker_test.go @@ -56,6 +56,8 @@ var ( } ) +const defaultTestTimeout = 10 * time.Second + // testRPCStream mimics a altspb.HandshakerService_DoHandshakeClient object. 
type testRPCStream struct { grpc.ClientStream @@ -133,6 +135,10 @@ func (s) TestClientHandshake(t *testing.T) { } { errc := make(chan error) stat.Reset() + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + for i := 0; i < testCase.numberOfHandshakes; i++ { stream := &testRPCStream{ t: t, @@ -155,7 +161,7 @@ func (s) TestClientHandshake(t *testing.T) { side: core.ClientSide, } go func() { - _, context, err := chs.ClientHandshake(context.Background()) + _, context, err := chs.ClientHandshake(ctx) if err == nil && context == nil { panic("expected non-nil ALTS context") } @@ -188,6 +194,10 @@ func (s) TestServerHandshake(t *testing.T) { } { errc := make(chan error) stat.Reset() + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + for i := 0; i < testCase.numberOfHandshakes; i++ { stream := &testRPCStream{ t: t, @@ -207,7 +217,7 @@ func (s) TestServerHandshake(t *testing.T) { side: core.ServerSide, } go func() { - _, context, err := shs.ServerHandshake(context.Background()) + _, context, err := shs.ServerHandshake(ctx) if err == nil && context == nil { panic("expected non-nil ALTS context") } @@ -258,7 +268,10 @@ func (s) TestPeerNotResponding(t *testing.T) { }, side: core.ClientSide, } - _, context, err := chs.ClientHandshake(context.Background()) + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + _, context, err := chs.ClientHandshake(ctx) chs.Close() if context != nil { t.Error("expected non-nil ALTS context") diff --git a/credentials/alts/utils_test.go b/credentials/alts/utils_test.go index b9e752ebbac9..5b54b1d5f77c 100644 --- a/credentials/alts/utils_test.go +++ b/credentials/alts/utils_test.go @@ -26,6 +26,7 @@ import ( "os" "strings" "testing" + "time" "google.golang.org/grpc/codes" altspb "google.golang.org/grpc/credentials/alts/internal/proto/grpc_gcp" @@ -37,6 +38,8 @@ const ( testServiceAccount1 = "service_account1" testServiceAccount2 = "service_account2" testServiceAccount3 = "service_account3" + + defaultTestTimeout = 10 * time.Second ) func setupManufacturerReader(testOS string, reader func() (io.Reader, error)) func() { @@ -101,7 +104,8 @@ func (s) TestIsRunningOnGCPNoProductNameFile(t *testing.T) { } func (s) TestAuthInfoFromContext(t *testing.T) { - ctx := context.Background() + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() altsAuthInfo := &fakeALTSAuthInfo{} p := &peer.Peer{ AuthInfo: altsAuthInfo, @@ -158,7 +162,8 @@ func (s) TestAuthInfoFromPeer(t *testing.T) { } func (s) TestClientAuthorizationCheck(t *testing.T) { - ctx := context.Background() + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() altsAuthInfo := &fakeALTSAuthInfo{testServiceAccount1} p := &peer.Peer{ AuthInfo: altsAuthInfo, diff --git a/credentials/credentials_test.go b/credentials/credentials_test.go index ea0cf5819ff8..dee0f2ca8304 100644 --- a/credentials/credentials_test.go +++ b/credentials/credentials_test.go @@ -24,12 +24,15 @@ import ( "net" "strings" "testing" + "time" "google.golang.org/grpc/internal" "google.golang.org/grpc/internal/grpctest" "google.golang.org/grpc/testdata" ) +const defaultTestTimeout = 10 * time.Second + type s struct { grpctest.Tester } @@ -60,7 +63,9 @@ func createTestContext(s SecurityLevel) context.Context { Method: "testInfo", AuthInfo: auth, } - return internal.NewRequestInfoContext.(func(context.Context, RequestInfo) 
context.Context)(context.Background(), ri) + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + return internal.NewRequestInfoContext.(func(context.Context, RequestInfo) context.Context)(ctx, ri) } func (s) TestCheckSecurityLevel(t *testing.T) { @@ -112,7 +117,9 @@ func (s) TestCheckSecurityLevelNoGetCommonAuthInfoMethod(t *testing.T) { Method: "testInfo", AuthInfo: auth, } - ctxWithRequestInfo := internal.NewRequestInfoContext.(func(context.Context, RequestInfo) context.Context)(context.Background(), ri) + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + ctxWithRequestInfo := internal.NewRequestInfoContext.(func(context.Context, RequestInfo) context.Context)(ctx, ri) if err := CheckSecurityLevel(ctxWithRequestInfo, PrivacyAndIntegrity); err != nil { t.Fatalf("CheckSeurityLevel() returned failure but want success") } @@ -296,7 +303,9 @@ func gRPCServerHandshake(conn net.Conn) (AuthInfo, error) { // Client handshake implementation in gRPC. func gRPCClientHandshake(conn net.Conn, lisAddr string) (AuthInfo, error) { clientTLS := NewTLS(&tls.Config{InsecureSkipVerify: true}) - _, authInfo, err := clientTLS.ClientHandshake(context.Background(), lisAddr, conn) + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + _, authInfo, err := clientTLS.ClientHandshake(ctx, lisAddr, conn) if err != nil { return nil, err } diff --git a/credentials/local/local_test.go b/credentials/local/local_test.go index 3c65010e8b2a..64e2ec3e7fc6 100644 --- a/credentials/local/local_test.go +++ b/credentials/local/local_test.go @@ -31,6 +31,8 @@ import ( "google.golang.org/grpc/internal/grpctest" ) +const defaultTestTimeout = 10 * time.Second + type s struct { grpctest.Tester } @@ -89,7 +91,10 @@ func serverLocalHandshake(conn net.Conn) (credentials.AuthInfo, error) { // Client local handshake implementation. func clientLocalHandshake(conn net.Conn, lisAddr string) (credentials.AuthInfo, error) { cred := NewCredentials() - _, authInfo, err := cred.ClientHandshake(context.Background(), lisAddr, conn) + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + _, authInfo, err := cred.ClientHandshake(ctx, lisAddr, conn) if err != nil { return nil, err } diff --git a/credentials/sts/sts_test.go b/credentials/sts/sts_test.go index 9cfa12097d60..ac680e001112 100644 --- a/credentials/sts/sts_test.go +++ b/credentials/sts/sts_test.go @@ -255,7 +255,10 @@ func (s) TestGetRequestMetadataSuccess(t *testing.T) { errCh := make(chan error, 1) go receiveAndCompareRequest(fc.ReqChan, errCh) - gotMetadata, err := creds.GetRequestMetadata(createTestContext(context.Background(), credentials.PrivacyAndIntegrity), "") + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + gotMetadata, err := creds.GetRequestMetadata(createTestContext(ctx, credentials.PrivacyAndIntegrity), "") if err != nil { t.Fatalf("creds.GetRequestMetadata() = %v", err) } @@ -270,7 +273,7 @@ func (s) TestGetRequestMetadataSuccess(t *testing.T) { // from the cache. This will fail if the credentials tries to send a fresh // request here since we have not configured our fakeClient to return any // response on retries. 
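// One subtlety in credentials_test above: createTestContext defers cancel
// and then returns the derived context, so the returned context is already
// canceled by the time the caller receives it. The callers only read
// request-info values from it, so the tests are unaffected, but a helper
// that hands a bounded context to its caller is safer returning the
// CancelFunc too, or using t.Cleanup where the Go version allows. A sketch
// of that alternative (the t parameter is an assumption, not part of this
// patch):
//
//	func createTestContext(t *testing.T, s SecurityLevel) context.Context {
//		ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
//		t.Cleanup(cancel) // cancel when the test ends, not when this helper returns
//		...
//	}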
- gotMetadata, err = creds.GetRequestMetadata(createTestContext(context.Background(), credentials.PrivacyAndIntegrity), "") + gotMetadata, err = creds.GetRequestMetadata(createTestContext(ctx, credentials.PrivacyAndIntegrity), "") if err != nil { t.Fatalf("creds.GetRequestMetadata() = %v", err) } @@ -290,7 +293,9 @@ func (s) TestGetRequestMetadataBadSecurityLevel(t *testing.T) { t.Fatalf("NewCredentials(%v) = %v", goodOptions, err) } - gotMetadata, err := creds.GetRequestMetadata(createTestContext(context.Background(), credentials.IntegrityOnly), "") + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + gotMetadata, err := creds.GetRequestMetadata(createTestContext(ctx, credentials.IntegrityOnly), "") if err == nil { t.Fatalf("creds.GetRequestMetadata() succeeded with metadata %v, expected to fail", gotMetadata) } @@ -335,7 +340,9 @@ func (s) TestGetRequestMetadataCacheExpiry(t *testing.T) { } fc.RespChan.Send(resp) - gotMetadata, err := creds.GetRequestMetadata(createTestContext(context.Background(), credentials.PrivacyAndIntegrity), "") + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + gotMetadata, err := creds.GetRequestMetadata(createTestContext(ctx, credentials.PrivacyAndIntegrity), "") if err != nil { t.Fatalf("creds.GetRequestMetadata() = %v", err) } @@ -374,6 +381,8 @@ func (s) TestGetRequestMetadataBadResponses(t *testing.T) { }, } + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() for _, test := range tests { t.Run(test.name, func(t *testing.T) { defer overrideSubjectTokenGood()() @@ -393,7 +402,7 @@ func (s) TestGetRequestMetadataBadResponses(t *testing.T) { go receiveAndCompareRequest(fc.ReqChan, errCh) fc.RespChan.Send(test.response) - if _, err := creds.GetRequestMetadata(createTestContext(context.Background(), credentials.PrivacyAndIntegrity), ""); err == nil { + if _, err := creds.GetRequestMetadata(createTestContext(ctx, credentials.PrivacyAndIntegrity), ""); err == nil { t.Fatal("creds.GetRequestMetadata() succeeded when expected to fail") } if err := <-errCh; err != nil { @@ -426,7 +435,9 @@ func (s) TestGetRequestMetadataBadSubjectTokenRead(t *testing.T) { errCh <- nil }() - if _, err := creds.GetRequestMetadata(createTestContext(context.Background(), credentials.PrivacyAndIntegrity), ""); err == nil { + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + if _, err := creds.GetRequestMetadata(createTestContext(ctx, credentials.PrivacyAndIntegrity), ""); err == nil { t.Fatal("creds.GetRequestMetadata() succeeded when expected to fail") } if err := <-errCh; err != nil { @@ -604,6 +615,9 @@ func (s) TestConstructRequest(t *testing.T) { }, }, } + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() for _, test := range tests { t.Run(test.name, func(t *testing.T) { if test.subjectTokenReadErr { @@ -618,7 +632,7 @@ func (s) TestConstructRequest(t *testing.T) { defer overrideActorTokenGood()() } - gotRequest, err := constructRequest(context.Background(), test.opts) + gotRequest, err := constructRequest(ctx, test.opts) if (err != nil) != test.wantErr { t.Fatalf("constructRequest(%v) = %v, wantErr: %v", test.opts, err, test.wantErr) } @@ -634,7 +648,9 @@ func (s) TestConstructRequest(t *testing.T) { func (s) TestSendRequest(t *testing.T) { defer overrideSubjectTokenGood()() - req, err := constructRequest(context.Background(), goodOptions) + ctx, cancel := 
context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + req, err := constructRequest(ctx, goodOptions) if err != nil { t.Fatal(err) } diff --git a/health/client_test.go b/health/client_test.go index fa218afada72..ba933f95b84f 100644 --- a/health/client_test.go +++ b/health/client_test.go @@ -28,6 +28,8 @@ import ( "google.golang.org/grpc/connectivity" ) +const defaultTestTimeout = 10 * time.Second + func (s) TestClientHealthCheckBackoff(t *testing.T) { const maxRetries = 5 @@ -51,7 +53,9 @@ func (s) TestClientHealthCheckBackoff(t *testing.T) { } defer func() { backoffFunc = oldBackoffFunc }() - clientHealthCheck(context.Background(), newStream, func(connectivity.State, error) {}, "test") + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + clientHealthCheck(ctx, newStream, func(connectivity.State, error) {}, "test") if !reflect.DeepEqual(got, want) { t.Fatalf("Backoff durations for %v retries are %v. (expected: %v)", maxRetries, got, want) diff --git a/internal/transport/keepalive_test.go b/internal/transport/keepalive_test.go index 37b77bb539c4..c8f177fecf1b 100644 --- a/internal/transport/keepalive_test.go +++ b/internal/transport/keepalive_test.go @@ -34,6 +34,8 @@ import ( "google.golang.org/grpc/keepalive" ) +const defaultTestTimeout = 10 * time.Second + // TestMaxConnectionIdle tests that a server will send GoAway to an idle // client. An idle client is one who doesn't make any RPC calls for a duration // of MaxConnectionIdle time. @@ -50,7 +52,9 @@ func (s) TestMaxConnectionIdle(t *testing.T) { cancel() }() - stream, err := client.NewStream(context.Background(), &CallHdr{}) + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + stream, err := client.NewStream(ctx, &CallHdr{}) if err != nil { t.Fatalf("client.NewStream() failed: %v", err) } @@ -87,7 +91,9 @@ func (s) TestMaxConnectionIdleBusyClient(t *testing.T) { cancel() }() - _, err := client.NewStream(context.Background(), &CallHdr{}) + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + _, err := client.NewStream(ctx, &CallHdr{}) if err != nil { t.Fatalf("client.NewStream() failed: %v", err) } @@ -121,7 +127,9 @@ func (s) TestMaxConnectionAge(t *testing.T) { cancel() }() - _, err := client.NewStream(context.Background(), &CallHdr{}) + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + _, err := client.NewStream(ctx, &CallHdr{}) if err != nil { t.Fatalf("client.NewStream() failed: %v", err) } @@ -228,8 +236,10 @@ func (s) TestKeepaliveServerWithResponsiveClient(t *testing.T) { // Give keepalive logic some time by sleeping. time.Sleep(4 * time.Second) + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() // Make sure the client transport is healthy. - if _, err := client.NewStream(context.Background(), &CallHdr{}); err != nil { + if _, err := client.NewStream(ctx, &CallHdr{}); err != nil { t.Fatalf("client.NewStream() failed: %v", err) } } @@ -258,8 +268,10 @@ func (s) TestKeepaliveClientClosesUnresponsiveServer(t *testing.T) { // Sleep for keepalive to close the connection. time.Sleep(4 * time.Second) + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() // Make sure the client transport is not healthy. 
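// Note the ordering in these keepalive tests: the bounded context is created
// after the multi-second sleeps, so the sleeps do not eat into the
// 10-second RPC budget. A sketch of the ordering used here:
//
//	time.Sleep(4 * time.Second) // give keepalive time to act
//	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
//	defer cancel()
//	if _, err := client.NewStream(ctx, &CallHdr{}); err == nil {
//		t.Fatal("client.NewStream() should have failed, but succeeded")
//	}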
- if _, err := client.NewStream(context.Background(), &CallHdr{}); err == nil { + if _, err := client.NewStream(ctx, &CallHdr{}); err == nil { t.Fatal("client.NewStream() should have failed, but succeeded") } } @@ -287,8 +299,10 @@ func (s) TestKeepaliveClientOpenWithUnresponsiveServer(t *testing.T) { // Give keepalive some time. time.Sleep(4 * time.Second) + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() // Make sure the client transport is healthy. - if _, err := client.NewStream(context.Background(), &CallHdr{}); err != nil { + if _, err := client.NewStream(ctx, &CallHdr{}); err != nil { t.Fatalf("client.NewStream() failed: %v", err) } } @@ -311,8 +325,10 @@ func (s) TestKeepaliveClientClosesWithActiveStreams(t *testing.T) { } defer conn.Close() + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() // Create a stream, but send no data on it. - if _, err := client.NewStream(context.Background(), &CallHdr{}); err != nil { + if _, err := client.NewStream(ctx, &CallHdr{}); err != nil { t.Fatalf("client.NewStream() failed: %v", err) } @@ -320,7 +336,7 @@ func (s) TestKeepaliveClientClosesWithActiveStreams(t *testing.T) { time.Sleep(4 * time.Second) // Make sure the client transport is not healthy. - if _, err := client.NewStream(context.Background(), &CallHdr{}); err == nil { + if _, err := client.NewStream(ctx, &CallHdr{}); err == nil { t.Fatal("client.NewStream() should have failed, but succeeded") } } @@ -344,8 +360,10 @@ func (s) TestKeepaliveClientStaysHealthyWithResponsiveServer(t *testing.T) { // Give keepalive some time. time.Sleep(4 * time.Second) + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() // Make sure the client transport is healthy. - if _, err := client.NewStream(context.Background(), &CallHdr{}); err != nil { + if _, err := client.NewStream(ctx, &CallHdr{}); err != nil { t.Fatalf("client.NewStream() failed: %v", err) } } @@ -391,8 +409,10 @@ func (s) TestKeepaliveClientFrequency(t *testing.T) { t.Fatalf("client transport still healthy; expected GoAway from the server.") } + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() // Make sure the client transport is not healthy. - if _, err := client.NewStream(context.Background(), &CallHdr{}); err == nil { + if _, err := client.NewStream(ctx, &CallHdr{}); err == nil { t.Fatal("client.NewStream() should have failed, but succeeded") } } @@ -434,8 +454,10 @@ func (s) TestKeepaliveServerEnforcementWithAbusiveClientNoRPC(t *testing.T) { t.Fatalf("client transport still healthy; expected GoAway from the server.") } + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() // Make sure the client transport is not healthy. 
- if _, err := client.NewStream(context.Background(), &CallHdr{}); err == nil { + if _, err := client.NewStream(ctx, &CallHdr{}); err == nil { t.Fatal("client.NewStream() should have failed, but succeeded") } } @@ -463,7 +485,9 @@ func (s) TestKeepaliveServerEnforcementWithAbusiveClientWithRPC(t *testing.T) { cancel() }() - if _, err := client.NewStream(context.Background(), &CallHdr{}); err != nil { + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + if _, err := client.NewStream(ctx, &CallHdr{}); err != nil { t.Fatalf("client.NewStream() failed: %v", err) } @@ -481,7 +505,7 @@ func (s) TestKeepaliveServerEnforcementWithAbusiveClientWithRPC(t *testing.T) { } // Make sure the client transport is not healthy. - if _, err := client.NewStream(context.Background(), &CallHdr{}); err == nil { + if _, err := client.NewStream(ctx, &CallHdr{}); err == nil { t.Fatal("client.NewStream() should have failed, but succeeded") } } @@ -514,8 +538,10 @@ func (s) TestKeepaliveServerEnforcementWithObeyingClientNoRPC(t *testing.T) { // Give keepalive enough time. time.Sleep(3 * time.Second) + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() // Make sure the client transport is healthy. - if _, err := client.NewStream(context.Background(), &CallHdr{}); err != nil { + if _, err := client.NewStream(ctx, &CallHdr{}); err != nil { t.Fatalf("client.NewStream() failed: %v", err) } } @@ -543,7 +569,9 @@ func (s) TestKeepaliveServerEnforcementWithObeyingClientWithRPC(t *testing.T) { cancel() }() - if _, err := client.NewStream(context.Background(), &CallHdr{}); err != nil { + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + if _, err := client.NewStream(ctx, &CallHdr{}); err != nil { t.Fatalf("client.NewStream() failed: %v", err) } @@ -551,7 +579,7 @@ func (s) TestKeepaliveServerEnforcementWithObeyingClientWithRPC(t *testing.T) { time.Sleep(3 * time.Second) // Make sure the client transport is healthy. - if _, err := client.NewStream(context.Background(), &CallHdr{}); err != nil { + if _, err := client.NewStream(ctx, &CallHdr{}); err != nil { t.Fatalf("client.NewStream() failed: %v", err) } } @@ -584,8 +612,10 @@ func (s) TestKeepaliveServerEnforcementWithDormantKeepaliveOnClient(t *testing.T // No active streams on the client. Give keepalive enough time. time.Sleep(5 * time.Second) + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() // Make sure the client transport is healthy. 
- if _, err := client.NewStream(context.Background(), &CallHdr{}); err != nil { + if _, err := client.NewStream(ctx, &CallHdr{}); err != nil { t.Fatalf("client.NewStream() failed: %v", err) } } @@ -633,7 +663,9 @@ func (s) TestTCPUserTimeout(t *testing.T) { cancel() }() - stream, err := client.NewStream(context.Background(), &CallHdr{}) + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + stream, err := client.NewStream(ctx, &CallHdr{}) if err != nil { t.Fatalf("client.NewStream() failed: %v", err) } diff --git a/internal/transport/proxy_test.go b/internal/transport/proxy_test.go index 628b1fddc494..a2f1aa438546 100644 --- a/internal/transport/proxy_test.go +++ b/internal/transport/proxy_test.go @@ -210,8 +210,11 @@ func (s) TestMapAddressEnv(t *testing.T) { } defer overwrite(hpfe)() + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + // envTestAddr should be handled by ProxyFromEnvironment. - got, err := mapAddress(context.Background(), envTestAddr) + got, err := mapAddress(ctx, envTestAddr) if err != nil { t.Error(err) } diff --git a/internal/transport/transport_test.go b/internal/transport/transport_test.go index 391ad9925c1a..0058df0d806f 100644 --- a/internal/transport/transport_test.go +++ b/internal/transport/transport_test.go @@ -483,7 +483,9 @@ func (s) TestInflightStreamClosing(t *testing.T) { defer server.stop() defer client.Close() - stream, err := client.NewStream(context.Background(), &CallHdr{}) + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + stream, err := client.NewStream(ctx, &CallHdr{}) if err != nil { t.Fatalf("Client failed to create RPC request: %v", err) } @@ -519,14 +521,16 @@ func (s) TestClientSendAndReceive(t *testing.T) { Host: "localhost", Method: "foo.Small", } - s1, err1 := ct.NewStream(context.Background(), callHdr) + ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer ctxCancel() + s1, err1 := ct.NewStream(ctx, callHdr) if err1 != nil { t.Fatalf("failed to open stream: %v", err1) } if s1.id != 1 { t.Fatalf("wrong stream id: %d", s1.id) } - s2, err2 := ct.NewStream(context.Background(), callHdr) + s2, err2 := ct.NewStream(ctx, callHdr) if err2 != nil { t.Fatalf("failed to open stream: %v", err2) } @@ -564,7 +568,9 @@ func performOneRPC(ct ClientTransport) { Host: "localhost", Method: "foo.Small", } - s, err := ct.NewStream(context.Background(), callHdr) + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + s, err := ct.NewStream(ctx, callHdr) if err != nil { return } @@ -606,12 +612,14 @@ func (s) TestLargeMessage(t *testing.T) { Host: "localhost", Method: "foo.Large", } + ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer ctxCancel() var wg sync.WaitGroup for i := 0; i < 2; i++ { wg.Add(1) go func() { defer wg.Done() - s, err := ct.NewStream(context.Background(), callHdr) + s, err := ct.NewStream(ctx, callHdr) if err != nil { t.Errorf("%v.NewStream(_, _) = _, %v, want _, ", ct, err) } @@ -771,7 +779,7 @@ func (s) TestGracefulClose(t *testing.T) { wg.Add(1) go func() { defer wg.Done() - str, err := ct.NewStream(context.Background(), &CallHdr{}) + str, err := ct.NewStream(ctx, &CallHdr{}) if err == ErrConnClosing { return } else if err != nil { @@ -839,7 +847,9 @@ func (s) TestMaxStreams(t *testing.T) { Host: "localhost", Method: "foo.Large", } - s, err := ct.NewStream(context.Background(), callHdr) + ctx, 
cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + s, err := ct.NewStream(ctx, callHdr) if err != nil { t.Fatalf("Failed to open stream: %v", err) } @@ -924,7 +934,9 @@ func (s) TestServerContextCanceledOnClosedConnection(t *testing.T) { server.mu.Unlock() break } - s, err := ct.NewStream(context.Background(), callHdr) + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + s, err := ct.NewStream(ctx, callHdr) if err != nil { t.Fatalf("Failed to open stream: %v", err) } @@ -988,7 +1000,9 @@ func (s) TestClientConnDecoupledFromApplicationRead(t *testing.T) { notifyChan := make(chan struct{}) server.h.notify = notifyChan server.mu.Unlock() - cstream1, err := client.NewStream(context.Background(), &CallHdr{}) + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + cstream1, err := client.NewStream(ctx, &CallHdr{}) if err != nil { t.Fatalf("Client failed to create first stream. Err: %v", err) } @@ -1015,7 +1029,7 @@ func (s) TestClientConnDecoupledFromApplicationRead(t *testing.T) { server.h.notify = notifyChan server.mu.Unlock() // Create another stream on client. - cstream2, err := client.NewStream(context.Background(), &CallHdr{}) + cstream2, err := client.NewStream(ctx, &CallHdr{}) if err != nil { t.Fatalf("Client failed to create second stream. Err: %v", err) } @@ -1070,8 +1084,10 @@ func (s) TestServerConnDecoupledFromApplicationRead(t *testing.T) { for k := range server.conns { st = k.(*http2Server) } + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() server.mu.Unlock() - cstream1, err := client.NewStream(context.Background(), &CallHdr{}) + cstream1, err := client.NewStream(ctx, &CallHdr{}) if err != nil { t.Fatalf("Failed to create 1st stream. Err: %v", err) } @@ -1080,7 +1096,7 @@ func (s) TestServerConnDecoupledFromApplicationRead(t *testing.T) { t.Fatalf("Client failed to write data. Err: %v", err) } //Client should be able to create another stream and send data on it. - cstream2, err := client.NewStream(context.Background(), &CallHdr{}) + cstream2, err := client.NewStream(ctx, &CallHdr{}) if err != nil { t.Fatalf("Failed to create 2nd stream. 
Err: %v", err) } @@ -1287,7 +1303,7 @@ func (s) TestClientWithMisbehavedServer(t *testing.T) { t.Fatalf("Error while creating client transport: %v", err) } defer ct.Close() - str, err := ct.NewStream(context.Background(), &CallHdr{}) + str, err := ct.NewStream(connectCtx, &CallHdr{}) if err != nil { t.Fatalf("Error while creating stream: %v", err) } @@ -1312,7 +1328,9 @@ func (s) TestEncodingRequiredStatus(t *testing.T) { Host: "localhost", Method: "foo", } - s, err := ct.NewStream(context.Background(), callHdr) + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + s, err := ct.NewStream(ctx, callHdr) if err != nil { return } @@ -1338,7 +1356,9 @@ func (s) TestInvalidHeaderField(t *testing.T) { Host: "localhost", Method: "foo", } - s, err := ct.NewStream(context.Background(), callHdr) + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + s, err := ct.NewStream(ctx, callHdr) if err != nil { return } @@ -1356,7 +1376,9 @@ func (s) TestHeaderChanClosedAfterReceivingAnInvalidHeader(t *testing.T) { defer cancel() defer server.stop() defer ct.Close() - s, err := ct.NewStream(context.Background(), &CallHdr{Host: "localhost", Method: "foo"}) + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + s, err := ct.NewStream(ctx, &CallHdr{Host: "localhost", Method: "foo"}) if err != nil { t.Fatalf("failed to create the stream") } @@ -1473,12 +1495,14 @@ func testFlowControlAccountCheck(t *testing.T, msgSize int, wc windowSizeConfig) for k := range server.conns { st = k.(*http2Server) } + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() server.mu.Unlock() const numStreams = 10 clientStreams := make([]*Stream, numStreams) for i := 0; i < numStreams; i++ { var err error - clientStreams[i], err = client.NewStream(context.Background(), &CallHdr{}) + clientStreams[i], err = client.NewStream(ctx, &CallHdr{}) if err != nil { t.Fatalf("Failed to create stream. Err: %v", err) } @@ -1669,7 +1693,9 @@ func runPingPongTest(t *testing.T, msgSize int) { } return false, nil }) - stream, err := client.NewStream(context.Background(), &CallHdr{}) + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + stream, err := client.NewStream(ctx, &CallHdr{}) if err != nil { t.Fatalf("Failed to create stream. 
Err: %v", err) } @@ -1748,7 +1774,9 @@ func (s) TestHeaderTblSize(t *testing.T) { defer cancel() defer ct.Close() defer server.stop() - _, err := ct.NewStream(context.Background(), &CallHdr{}) + ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer ctxCancel() + _, err := ct.NewStream(ctx, &CallHdr{}) if err != nil { t.Fatalf("failed to open stream: %v", err) } diff --git a/metadata/metadata_test.go b/metadata/metadata_test.go index 84845d5b1278..f1fb5f6d324e 100644 --- a/metadata/metadata_test.go +++ b/metadata/metadata_test.go @@ -23,10 +23,13 @@ import ( "reflect" "strconv" "testing" + "time" "google.golang.org/grpc/internal/grpctest" ) +const defaultTestTimeout = 10 * time.Second + type s struct { grpctest.Tester } @@ -168,7 +171,9 @@ func (s) TestAppend(t *testing.T) { func (s) TestAppendToOutgoingContext(t *testing.T) { // Pre-existing metadata - ctx := NewOutgoingContext(context.Background(), Pairs("k1", "v1", "k2", "v2")) + tCtx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + ctx := NewOutgoingContext(tCtx, Pairs("k1", "v1", "k2", "v2")) ctx = AppendToOutgoingContext(ctx, "k1", "v3") ctx = AppendToOutgoingContext(ctx, "k1", "v4") md, ok := FromOutgoingContext(ctx) @@ -181,7 +186,7 @@ func (s) TestAppendToOutgoingContext(t *testing.T) { } // No existing metadata - ctx = AppendToOutgoingContext(context.Background(), "k1", "v1") + ctx = AppendToOutgoingContext(tCtx, "k1", "v1") md, ok = FromOutgoingContext(ctx) if !ok { t.Errorf("Expected MD to exist in ctx, but got none") @@ -193,7 +198,8 @@ func (s) TestAppendToOutgoingContext(t *testing.T) { } func (s) TestAppendToOutgoingContext_Repeated(t *testing.T) { - ctx := context.Background() + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() for i := 0; i < 100; i = i + 2 { ctx1 := AppendToOutgoingContext(ctx, "k", strconv.Itoa(i)) @@ -213,7 +219,9 @@ func (s) TestAppendToOutgoingContext_Repeated(t *testing.T) { func (s) TestAppendToOutgoingContext_FromKVSlice(t *testing.T) { const k, v = "a", "b" kv := []string{k, v} - ctx := AppendToOutgoingContext(context.Background(), kv...) + tCtx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + ctx := AppendToOutgoingContext(tCtx, kv...) md, _ := FromOutgoingContext(ctx) if md[k][0] != v { t.Fatalf("md[%q] = %q; want %q", k, md[k], v) @@ -230,7 +238,8 @@ func Benchmark_AddingMetadata_ContextManipulationApproach(b *testing.B) { // TODO: Add in N=1-100 tests once Go1.6 support is removed. 
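Every hunk in this change applies the same transformation, so it is worth sketching once. A hypothetical illustration of the shape each converted test takes (newTestClient is an assumed helper standing in for each test's own setup; defaultTestTimeout is the 10-second constant the diff adds per package):

func TestRPCWithBoundedContext(t *testing.T) {
	tc, cleanup := newTestClient(t) // assumed helper: connected TestServiceClient plus teardown
	defer cleanup()

	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
	defer cancel() // always pair WithTimeout with cancel so the deadline timer is released

	if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); err != nil {
		t.Fatalf("EmptyCall(_, _) = _, %v, want _, <nil>", err)
	}
}

One caveat in the benchmark hunks that follow: Benchmark_AddingMetadata_ContextManipulationApproach places defer cancel() inside the for n loop, so all b.N contexts and their timers stay alive until the benchmark function returns; BenchmarkAppendToOutgoingContext hoists the context above the loop, which avoids that buildup.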
const num = 10 for n := 0; n < b.N; n++ { - ctx := context.Background() + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() for i := 0; i < num; i++ { md, _ := FromOutgoingContext(ctx) NewOutgoingContext(ctx, Join(Pairs("k1", "v1", "k2", "v2"), md)) @@ -241,8 +250,9 @@ func Benchmark_AddingMetadata_ContextManipulationApproach(b *testing.B) { // Newer/faster approach to adding metadata to context func BenchmarkAppendToOutgoingContext(b *testing.B) { const num = 10 + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() for n := 0; n < b.N; n++ { - ctx := context.Background() for i := 0; i < num; i++ { ctx = AppendToOutgoingContext(ctx, "k1", "v1", "k2", "v2") } @@ -250,7 +260,8 @@ func BenchmarkAppendToOutgoingContext(b *testing.B) { } func BenchmarkFromOutgoingContext(b *testing.B) { - ctx := context.Background() + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() ctx = NewOutgoingContext(ctx, MD{"k3": {"v3", "v4"}}) ctx = AppendToOutgoingContext(ctx, "k1", "v1", "k2", "v2") diff --git a/reflection/serverreflection_test.go b/reflection/serverreflection_test.go index 55d1840fdc3a..24070141c2f2 100644 --- a/reflection/serverreflection_test.go +++ b/reflection/serverreflection_test.go @@ -25,6 +25,7 @@ import ( "reflect" "sort" "testing" + "time" "github.com/golang/protobuf/proto" dpb "github.com/golang/protobuf/protoc-gen-go/descriptor" @@ -51,6 +52,8 @@ var ( fdProto2Ext2Byte []byte ) +const defaultTestTimeout = 10 * time.Second + type x struct { grpctest.Tester } @@ -209,7 +212,9 @@ func (x) TestReflectionEnd2end(t *testing.T) { defer conn.Close() c := rpb.NewServerReflectionClient(conn) - stream, err := c.ServerReflectionInfo(context.Background(), grpc.WaitForReady(true)) + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + stream, err := c.ServerReflectionInfo(ctx, grpc.WaitForReady(true)) if err != nil { t.Fatalf("cannot get ServerReflectionInfo: %v", err) } diff --git a/stats/stats_test.go b/stats/stats_test.go index d047d48bc5e2..875a57eeddfc 100644 --- a/stats/stats_test.go +++ b/stats/stats_test.go @@ -37,6 +37,8 @@ import ( "google.golang.org/grpc/status" ) +const defaultTestTimeout = 10 * time.Second + type s struct { grpctest.Tester } @@ -281,8 +283,10 @@ func (te *test) doUnaryCall(c *rpcConfig) (*testpb.SimpleRequest, *testpb.Simple } else { req = &testpb.SimpleRequest{Id: errorID} } - ctx := metadata.NewOutgoingContext(context.Background(), testMetadata) - resp, err = tc.UnaryCall(ctx, req, grpc.WaitForReady(!c.failfast)) + + tCtx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + resp, err = tc.UnaryCall(metadata.NewOutgoingContext(tCtx, testMetadata), req, grpc.WaitForReady(!c.failfast)) return req, resp, err } @@ -293,7 +297,9 @@ func (te *test) doFullDuplexCallRoundtrip(c *rpcConfig) ([]*testpb.SimpleRequest err error ) tc := testpb.NewTestServiceClient(te.clientConn()) - stream, err := tc.FullDuplexCall(metadata.NewOutgoingContext(context.Background(), testMetadata), grpc.WaitForReady(!c.failfast)) + tCtx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + stream, err := tc.FullDuplexCall(metadata.NewOutgoingContext(tCtx, testMetadata), grpc.WaitForReady(!c.failfast)) if err != nil { return reqs, resps, err } @@ -332,7 +338,9 @@ func (te *test) doClientStreamCall(c *rpcConfig) ([]*testpb.SimpleRequest, *test err error ) tc 
:= testpb.NewTestServiceClient(te.clientConn()) - stream, err := tc.ClientStreamCall(metadata.NewOutgoingContext(context.Background(), testMetadata), grpc.WaitForReady(!c.failfast)) + tCtx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + stream, err := tc.ClientStreamCall(metadata.NewOutgoingContext(tCtx, testMetadata), grpc.WaitForReady(!c.failfast)) if err != nil { return reqs, resp, err }
@@ -367,7 +375,9 @@ func (te *test) doServerStreamCall(c *rpcConfig) (*testpb.SimpleRequest, []*test startID = errorID } req = &testpb.SimpleRequest{Id: startID} - stream, err := tc.ServerStreamCall(metadata.NewOutgoingContext(context.Background(), testMetadata), req, grpc.WaitForReady(!c.failfast)) + tCtx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + stream, err := tc.ServerStreamCall(metadata.NewOutgoingContext(tCtx, testMetadata), req, grpc.WaitForReady(!c.failfast)) if err != nil { return req, resps, err }
@@ -1286,7 +1296,9 @@ func (s) TestClientStatsFullDuplexRPCError(t *testing.T) { func (s) TestTags(t *testing.T) { b := []byte{5, 2, 4, 3, 1} - ctx := stats.SetTags(context.Background(), b) + tCtx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + ctx := stats.SetTags(tCtx, b) if tg := stats.OutgoingTags(ctx); !reflect.DeepEqual(tg, b) { t.Errorf("OutgoingTags(%v) = %v; want %v", ctx, tg, b) }
@@ -1294,7 +1306,7 @@ func (s) TestTags(t *testing.T) { t.Errorf("Tags(%v) = %v; want nil", ctx, tg) } - ctx = stats.SetIncomingTags(context.Background(), b) + ctx = stats.SetIncomingTags(tCtx, b) if tg := stats.Tags(ctx); !reflect.DeepEqual(tg, b) { t.Errorf("Tags(%v) = %v; want %v", ctx, tg, b) }
@@ -1305,7 +1317,9 @@ func (s) TestTrace(t *testing.T) { b := []byte{5, 2, 4, 3, 1} - ctx := stats.SetTrace(context.Background(), b) + tCtx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + ctx := stats.SetTrace(tCtx, b) if tr := stats.OutgoingTrace(ctx); !reflect.DeepEqual(tr, b) { t.Errorf("OutgoingTrace(%v) = %v; want %v", ctx, tr, b) }
@@ -1313,7 +1327,7 @@ func (s) TestTrace(t *testing.T) { t.Errorf("Trace(%v) = %v; want nil", ctx, tr) } - ctx = stats.SetIncomingTrace(context.Background(), b) + ctx = stats.SetIncomingTrace(tCtx, b) if tr := stats.Trace(ctx); !reflect.DeepEqual(tr, b) { t.Errorf("Trace(%v) = %v; want %v", ctx, tr, b) }
diff --git a/test/channelz_test.go b/test/channelz_test.go index 9f8af01e7c74..7c074961d771 100644 --- a/test/channelz_test.go +++ b/test/channelz_test.go
@@ -695,13 +695,17 @@ func (t *testServiceClientWrapper) HalfDuplexCall(ctx context.Context, opts ...g } func doSuccessfulUnaryCall(tc testpb.TestServiceClient, t *testing.T) { - if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil { + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); err != nil { t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err) } } func doStreamingInputCallWithLargePayload(tc testpb.TestServiceClient, t *testing.T) { - s, err := tc.StreamingInputCall(context.Background()) + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + s, err := tc.StreamingInputCall(ctx) if err != nil { t.Fatalf("TestService/StreamingInputCall(_) = _, %v, want <nil>", err) }
@@ -725,7 +729,9 @@ func doServerSideFailedUnaryCall(tc testpb.TestServiceClient, t *testing.T) { ResponseSize: int32(smallSize), Payload: largePayload, } - if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted { + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + if _, err := tc.UnaryCall(ctx, req); err == nil || status.Code(err) != codes.ResourceExhausted { t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted) } }
diff --git a/test/creds_test.go b/test/creds_test.go index 46bdd30dc85e..6b3fc2a46076 100644 --- a/test/creds_test.go +++ b/test/creds_test.go
@@ -92,7 +92,9 @@ func (s) TestCredsBundleBoth(t *testing.T) { cc := te.clientConn() tc := testpb.NewTestServiceClient(cc) - if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil { + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); err != nil { t.Fatalf("Test failed. Reason: %v", err) } }
@@ -114,7 +116,9 @@ func (s) TestCredsBundleTransportCredentials(t *testing.T) { cc := te.clientConn() tc := testpb.NewTestServiceClient(cc) - if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil { + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); err != nil { t.Fatalf("Test failed. Reason: %v", err) } }
@@ -130,7 +134,9 @@ func (s) TestCredsBundlePerRPCCredentials(t *testing.T) { cc := te.clientConn() tc := testpb.NewTestServiceClient(cc) - if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil { + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); err != nil { t.Fatalf("Test failed. Reason: %v", err) } }
@@ -164,8 +170,10 @@ func (s) TestNonFailFastRPCSucceedOnTimeoutCreds(t *testing.T) { cc := te.clientConn(grpc.WithTransportCredentials(&clientTimeoutCreds{})) tc := testpb.NewTestServiceClient(cc) + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() // This unary call should succeed, because ClientHandshake will succeed for the second time. - if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true)); err != nil { + if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil { te.t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want <nil>", err) } }
@@ -236,7 +244,7 @@ func (s) TestFailFastRPCErrorOnBadCertificates(t *testing.T) { // with Unavailable because the connection hasn't started. When the // first connection failed with creds error, the next RPC should also // fail with the expected error. - if _, err = tc.EmptyCall(context.Background(), &testpb.Empty{}); strings.Contains(err.Error(), clientAlwaysFailCredErrorMsg) { + if _, err = tc.EmptyCall(ctx, &testpb.Empty{}); strings.Contains(err.Error(), clientAlwaysFailCredErrorMsg) { return } time.Sleep(time.Millisecond)
@@ -317,7 +325,9 @@ func testPerRPCCredentialsViaDialOptions(t *testing.T, e env) { cc := te.clientConn() tc := testpb.NewTestServiceClient(cc) - if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil { + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); err != nil { t.Fatalf("Test failed. Reason: %v", err) } }
@@ -336,7 +346,9 @@ func testPerRPCCredentialsViaCallOptions(t *testing.T, e env) { cc := te.clientConn() tc := testpb.NewTestServiceClient(cc) - if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.PerRPCCredentials(testPerRPCCredentials{})); err != nil { + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.PerRPCCredentials(testPerRPCCredentials{})); err != nil { t.Fatalf("Test failed. Reason: %v", err) } }
@@ -376,7 +388,9 @@ func testPerRPCCredentialsViaDialOptionsAndCallOptions(t *testing.T, e env) { cc := te.clientConn() tc := testpb.NewTestServiceClient(cc) - if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.PerRPCCredentials(testPerRPCCredentials{})); err != nil { + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.PerRPCCredentials(testPerRPCCredentials{})); err != nil { t.Fatalf("Test failed. Reason: %v", err) } }
diff --git a/test/end2end_test.go b/test/end2end_test.go index 0842dccaad01..07efccd0ac76 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go
@@ -917,11 +917,13 @@ func (s) TestContextDeadlineNotIgnored(t *testing.T) { cc := te.clientConn() tc := testpb.NewTestServiceClient(cc) - if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil { + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); err != nil { t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err) } + cancel() atomic.StoreInt32(&(lc.beLazy), 1) - ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond) + ctx, cancel = context.WithTimeout(context.Background(), 50*time.Millisecond) defer cancel() t1 := time.Now() if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); status.Code(err) != codes.DeadlineExceeded {
@@ -951,13 +953,15 @@ func testTimeoutOnDeadServer(t *testing.T, e env) { cc := te.clientConn() tc := testpb.NewTestServiceClient(cc) - if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true)); err != nil { + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil { t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err) } te.srv.Stop() + cancel() // Wait for the client to notice the connection is gone. - ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) + ctx, cancel = context.WithTimeout(context.Background(), 500*time.Millisecond) state := cc.GetState() for ; state == connectivity.Ready && cc.WaitForStateChange(ctx, state); state = cc.GetState() { }
@@ -1071,7 +1075,7 @@ func testServerGoAwayPendingRPC(t *testing.T, e env) { t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err) } // Finish an RPC to make sure the connection is good. - if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true)); err != nil { + if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil { t.Fatalf("%v.EmptyCall(_, _, _) = _, %v, want _, <nil>", tc, err) } ch := make(chan struct{})
@@ -1145,7 +1149,7 @@ func testServerMultipleGoAwayPendingRPC(t *testing.T, e env) { t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err) } // Finish an RPC to make sure the connection is good.
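TestContextDeadlineNotIgnored and testTimeoutOnDeadServer above cannot simply defer a single cancel, because they rebind ctx and cancel partway through the test. A condensed sketch of that shape (tc and the imports come from the surrounding test file):

ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); err != nil {
	t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
}
cancel() // release the first deadline explicitly before reusing the variables
ctx, cancel = context.WithTimeout(context.Background(), 50*time.Millisecond)
defer cancel()

Reassigning with = rather than := keeps one pair of variables, so the single deferred cancel covers whichever context is live when the test returns.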
- if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true)); err != nil { + if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil { t.Fatalf("%v.EmptyCall(_, _, _) = _, %v, want _, ", tc, err) } ch1 := make(chan struct{}) @@ -1226,7 +1230,9 @@ func testConcurrentClientConnCloseAndServerGoAway(t *testing.T, e env) { cc := te.clientConn() tc := testpb.NewTestServiceClient(cc) - if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true)); err != nil { + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil { t.Fatalf("%v.EmptyCall(_, _, _) = _, %v, want _, ", tc, err) } ch := make(chan struct{}) @@ -1263,14 +1269,18 @@ func testConcurrentServerStopAndGoAway(t *testing.T, e env) { cc := te.clientConn() tc := testpb.NewTestServiceClient(cc) - stream, err := tc.FullDuplexCall(context.Background(), grpc.WaitForReady(true)) + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + stream, err := tc.FullDuplexCall(ctx, grpc.WaitForReady(true)) if err != nil { t.Fatalf("%v.FullDuplexCall(_) = _, %v, want ", tc, err) } + // Finish an RPC to make sure the connection is good. - if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true)); err != nil { + if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil { t.Fatalf("%v.EmptyCall(_, _, _) = _, %v, want _, ", tc, err) } + ch := make(chan struct{}) go func() { te.srv.GracefulStop() @@ -1396,10 +1406,10 @@ func testFailFast(t *testing.T, e env) { time.Sleep(10 * time.Millisecond) } // The client keeps reconnecting and ongoing fail-fast RPCs should fail with code.Unavailable. - if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); status.Code(err) != codes.Unavailable { + if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); status.Code(err) != codes.Unavailable { t.Fatalf("TestService/EmptyCall(_, _, _) = _, %v, want _, error code: %s", err, codes.Unavailable) } - if _, err := tc.StreamingInputCall(context.Background()); status.Code(err) != codes.Unavailable { + if _, err := tc.StreamingInputCall(ctx); status.Code(err) != codes.Unavailable { t.Fatalf("TestService/StreamingInputCall(_) = _, %v, want _, error code: %s", err, codes.Unavailable) } @@ -1475,9 +1485,11 @@ func (s) TestGetMethodConfig(t *testing.T) { time.Sleep(time.Millisecond) } + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() // The following RPCs are expected to become non-fail-fast ones with 1ms deadline. var err error - if _, err = tc.EmptyCall(context.Background(), &testpb.Empty{}); status.Code(err) != codes.DeadlineExceeded { + if _, err = tc.EmptyCall(ctx, &testpb.Empty{}); status.Code(err) != codes.DeadlineExceeded { t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded) } @@ -1512,7 +1524,7 @@ func (s) TestGetMethodConfig(t *testing.T) { time.Sleep(time.Millisecond) } // The following RPCs are expected to become fail-fast. 
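A note on the assertion style used throughout these hunks: status.Code is nil-safe, returning codes.OK for a nil error and codes.Unknown for errors that carry no gRPC status, so the explicit err == nil guards seen in the max-message-size checks are defensive rather than required. Illustration (tc, ctx, and req as in the surrounding tests):

// status.Code(nil) == codes.OK, which already fails the comparison below,
// so a successful RPC can never be mistaken for a ResourceExhausted one.
if _, err := tc.UnaryCall(ctx, req); status.Code(err) != codes.ResourceExhausted {
	t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
}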
- if _, err = tc.EmptyCall(context.Background(), &testpb.Empty{}); status.Code(err) != codes.Unavailable { + if _, err = tc.EmptyCall(ctx, &testpb.Empty{}); status.Code(err) != codes.Unavailable { t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.Unavailable) } } @@ -1556,13 +1568,14 @@ func (s) TestServiceConfigWaitForReady(t *testing.T) { } time.Sleep(time.Millisecond) } - + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() // The following RPCs are expected to become non-fail-fast ones with 1ms deadline. var err error - if _, err = tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true)); status.Code(err) != codes.DeadlineExceeded { + if _, err = tc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); status.Code(err) != codes.DeadlineExceeded { t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded) } - if _, err := tc.FullDuplexCall(context.Background(), grpc.WaitForReady(true)); status.Code(err) != codes.DeadlineExceeded { + if _, err := tc.FullDuplexCall(ctx, grpc.WaitForReady(true)); status.Code(err) != codes.DeadlineExceeded { t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want %s", err, codes.DeadlineExceeded) } @@ -1597,10 +1610,10 @@ func (s) TestServiceConfigWaitForReady(t *testing.T) { time.Sleep(time.Millisecond) } // The following RPCs are expected to become non-fail-fast ones with 1ms deadline. - if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); status.Code(err) != codes.DeadlineExceeded { + if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); status.Code(err) != codes.DeadlineExceeded { t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded) } - if _, err := tc.FullDuplexCall(context.Background()); status.Code(err) != codes.DeadlineExceeded { + if _, err := tc.FullDuplexCall(ctx); status.Code(err) != codes.DeadlineExceeded { t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want %s", err, codes.DeadlineExceeded) } } @@ -1768,16 +1781,17 @@ func (s) TestServiceConfigMaxMsgSize(t *testing.T) { } time.Sleep(time.Millisecond) } - + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() // Test for unary RPC recv. - if _, err = tc.UnaryCall(context.Background(), req, grpc.WaitForReady(true)); err == nil || status.Code(err) != codes.ResourceExhausted { + if _, err = tc.UnaryCall(ctx, req, grpc.WaitForReady(true)); err == nil || status.Code(err) != codes.ResourceExhausted { t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted) } // Test for unary RPC send. 
req.Payload = extraLargePayload req.ResponseSize = int32(smallSize) - if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted { + if _, err := tc.UnaryCall(ctx, req); err == nil || status.Code(err) != codes.ResourceExhausted { t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted) } @@ -1838,14 +1852,14 @@ func (s) TestServiceConfigMaxMsgSize(t *testing.T) { req.Payload = smallPayload req.ResponseSize = int32(largeSize) - if _, err = tc.UnaryCall(context.Background(), req, grpc.WaitForReady(true)); err == nil || status.Code(err) != codes.ResourceExhausted { + if _, err = tc.UnaryCall(ctx, req, grpc.WaitForReady(true)); err == nil || status.Code(err) != codes.ResourceExhausted { t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted) } // Test for unary RPC send. req.Payload = largePayload req.ResponseSize = int32(smallSize) - if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted { + if _, err := tc.UnaryCall(ctx, req); err == nil || status.Code(err) != codes.ResourceExhausted { t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted) } @@ -1899,24 +1913,24 @@ func (s) TestServiceConfigMaxMsgSize(t *testing.T) { req.Payload = smallPayload req.ResponseSize = int32(largeSize) - if _, err = tc.UnaryCall(context.Background(), req, grpc.WaitForReady(true)); err != nil { + if _, err = tc.UnaryCall(ctx, req, grpc.WaitForReady(true)); err != nil { t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want ", err) } req.ResponseSize = int32(extraLargeSize) - if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted { + if _, err := tc.UnaryCall(ctx, req); err == nil || status.Code(err) != codes.ResourceExhausted { t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted) } // Test for unary RPC send. req.Payload = largePayload req.ResponseSize = int32(smallSize) - if _, err := tc.UnaryCall(context.Background(), req); err != nil { + if _, err := tc.UnaryCall(ctx, req); err != nil { t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want ", err) } req.Payload = extraLargePayload - if _, err = tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted { + if _, err = tc.UnaryCall(ctx, req); err == nil || status.Code(err) != codes.ResourceExhausted { t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted) } @@ -2137,8 +2151,11 @@ func testMaxMsgSizeClientDefault(t *testing.T, e env) { ResponseSize: int32(largeSize), Payload: smallPayload, } + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() // Test for unary RPC recv. - if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted { + if _, err := tc.UnaryCall(ctx, req); err == nil || status.Code(err) != codes.ResourceExhausted { t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted) } @@ -2206,15 +2223,18 @@ func testMaxMsgSizeClientAPI(t *testing.T, e env) { ResponseSize: int32(largeSize), Payload: smallPayload, } + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() // Test for unary RPC recv. 
- if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted { + if _, err := tc.UnaryCall(ctx, req); err == nil || status.Code(err) != codes.ResourceExhausted { t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted) } // Test for unary RPC send. req.Payload = largePayload req.ResponseSize = int32(smallSize) - if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted { + if _, err := tc.UnaryCall(ctx, req); err == nil || status.Code(err) != codes.ResourceExhausted { t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted) } @@ -2291,15 +2311,18 @@ func testMaxMsgSizeServerAPI(t *testing.T, e env) { ResponseSize: int32(largeSize), Payload: smallPayload, } + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() // Test for unary RPC send. - if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted { + if _, err := tc.UnaryCall(ctx, req); err == nil || status.Code(err) != codes.ResourceExhausted { t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted) } // Test for unary RPC recv. req.Payload = largePayload req.ResponseSize = int32(smallSize) - if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted { + if _, err := tc.UnaryCall(ctx, req); err == nil || status.Code(err) != codes.ResourceExhausted { t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted) } @@ -2380,7 +2403,9 @@ func testTap(t *testing.T, e env) { cc := te.clientConn() tc := testpb.NewTestServiceClient(cc) - if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil { + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); err != nil { t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, ", err) } if ttap.cnt != 1 { @@ -2397,7 +2422,7 @@ func testTap(t *testing.T, e env) { ResponseSize: 45, Payload: payload, } - if _, err := tc.UnaryCall(context.Background(), req); status.Code(err) != codes.Unavailable { + if _, err := tc.UnaryCall(ctx, req); status.Code(err) != codes.Unavailable { t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, %s", err, codes.Unavailable) } } @@ -2727,7 +2752,9 @@ func testEmptyUnaryWithUserAgent(t *testing.T, e env) { cc := te.clientConn() tc := testpb.NewTestServiceClient(cc) var header metadata.MD - reply, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Header(&header)) + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + reply, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.Header(&header)) if err != nil || !proto.Equal(&testpb.Empty{}, reply) { t.Fatalf("TestService/EmptyCall(_, _) = %v, %v, want %v, ", reply, err, &testpb.Empty{}) } @@ -2788,7 +2815,10 @@ func testLargeUnary(t *testing.T, e env) { ResponseSize: respSize, Payload: payload, } - reply, err := tc.UnaryCall(context.Background(), req) + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + reply, err := tc.UnaryCall(ctx, req) if err != nil { t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, ", err) } @@ -2832,13 +2862,16 @@ func testExceedMsgLimit(t *testing.T, e env) { ResponseSize: 
smallSize, Payload: largePayload, } - if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted { + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + if _, err := tc.UnaryCall(ctx, req); err == nil || status.Code(err) != codes.ResourceExhausted { t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted) } // Make sure the client cannot receive a unary RPC of largeSize. req.ResponseSize = largeSize req.Payload = smallPayload - if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted { + if _, err := tc.UnaryCall(ctx, req); err == nil || status.Code(err) != codes.ResourceExhausted { t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted) } @@ -2893,7 +2926,9 @@ func testPeerClientSide(t *testing.T, e env) { defer te.tearDown() tc := testpb.NewTestServiceClient(te.clientConn()) peer := new(peer.Peer) - if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(peer), grpc.WaitForReady(true)); err != nil { + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(peer), grpc.WaitForReady(true)); err != nil { t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, ", err) } pa := peer.Addr.String() @@ -2952,8 +2987,10 @@ func testPeerFailedRPC(t *testing.T, e env) { defer te.tearDown() tc := testpb.NewTestServiceClient(te.clientConn()) + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() // first make a successful request to the server - if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil { + if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); err != nil { t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, ", err) } @@ -2969,7 +3006,7 @@ func testPeerFailedRPC(t *testing.T, e env) { } peer := new(peer.Peer) - if _, err := tc.UnaryCall(context.Background(), req, grpc.Peer(peer)); err == nil || status.Code(err) != codes.ResourceExhausted { + if _, err := tc.UnaryCall(ctx, req, grpc.Peer(peer)); err == nil || status.Code(err) != codes.ResourceExhausted { t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted) } else { pa := peer.Addr.String() @@ -3822,7 +3859,10 @@ func testServerStreaming(t *testing.T, e env) { ResponseType: testpb.PayloadType_COMPRESSABLE, ResponseParameters: respParam, } - stream, err := tc.StreamingOutputCall(context.Background(), req) + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + stream, err := tc.StreamingOutputCall(ctx, req) if err != nil { t.Fatalf("%v.StreamingOutputCall(_) = _, %v, want ", tc, err) } @@ -3930,7 +3970,9 @@ func testServerStreamingConcurrent(t *testing.T, e env) { doStreamingCall := func() { req := &testpb.StreamingOutputCallRequest{} - stream, err := tc.StreamingOutputCall(context.Background(), req) + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + stream, err := tc.StreamingOutputCall(ctx, req) if err != nil { t.Errorf("%v.StreamingOutputCall(_) = _, %v, want ", tc, err) return @@ -4219,11 +4261,14 @@ func testCompressServerHasNoSupport(t *testing.T, e env) { ResponseSize: respSize, Payload: payload, } - if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) 
!= codes.Unimplemented { + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + if _, err := tc.UnaryCall(ctx, req); err == nil || status.Code(err) != codes.Unimplemented { t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code %s", err, codes.Unimplemented) } // Streaming RPC - stream, err := tc.FullDuplexCall(context.Background()) + stream, err := tc.FullDuplexCall(ctx) if err != nil { t.Fatalf("%v.FullDuplexCall(_) = _, %v, want ", tc, err) } @@ -4371,7 +4416,9 @@ func testUnaryClientInterceptor(t *testing.T, e env) { defer te.tearDown() tc := testpb.NewTestServiceClient(te.clientConn()) - if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); status.Code(err) != codes.NotFound { + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); status.Code(err) != codes.NotFound { t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, error code %s", tc, err, codes.NotFound) } } @@ -4411,7 +4458,9 @@ func testStreamClientInterceptor(t *testing.T, e env) { ResponseParameters: respParam, Payload: payload, } - if _, err := tc.StreamingOutputCall(context.Background(), req); status.Code(err) != codes.NotFound { + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + if _, err := tc.StreamingOutputCall(ctx, req); status.Code(err) != codes.NotFound { t.Fatalf("%v.StreamingOutputCall(_) = _, %v, want _, error code %s", tc, err, codes.NotFound) } } @@ -4433,7 +4482,9 @@ func testUnaryServerInterceptor(t *testing.T, e env) { defer te.tearDown() tc := testpb.NewTestServiceClient(te.clientConn()) - if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); status.Code(err) != codes.PermissionDenied { + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); status.Code(err) != codes.PermissionDenied { t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, error code %s", tc, err, codes.PermissionDenied) } } @@ -4477,14 +4528,16 @@ func testStreamServerInterceptor(t *testing.T, e env) { ResponseParameters: respParam, Payload: payload, } - s1, err := tc.StreamingOutputCall(context.Background(), req) + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + s1, err := tc.StreamingOutputCall(ctx, req) if err != nil { t.Fatalf("%v.StreamingOutputCall(_) = _, %v, want _, ", tc, err) } if _, err := s1.Recv(); status.Code(err) != codes.PermissionDenied { t.Fatalf("%v.StreamingInputCall(_) = _, %v, want _, error code %s", tc, err, codes.PermissionDenied) } - s2, err := tc.FullDuplexCall(context.Background()) + s2, err := tc.FullDuplexCall(ctx) if err != nil { t.Fatalf("%v.FullDuplexCall(_) = _, %v, want ", tc, err) } @@ -4788,7 +4841,10 @@ func testClientResourceExhaustedCancelFullDuplex(t *testing.T, e env) { te.maxClientReceiveMsgSize = newInt(10) cc := te.clientConn() tc := testpb.NewTestServiceClient(cc) - stream, err := tc.FullDuplexCall(context.Background()) + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + stream, err := tc.FullDuplexCall(ctx) if err != nil { t.Fatalf("%v.FullDuplexCall(_) = _, %v, want ", tc, err) } @@ -4881,7 +4937,6 @@ func (s) TestFlowControlLogicalRace(t *testing.T) { go s.Serve(lis) ctx := context.Background() - cc, err := grpc.Dial(lis.Addr().String(), grpc.WithInsecure(), grpc.WithBlock()) if err != nil { 
t.Fatalf("grpc.Dial(%q) = %v", lis.Addr().String(), err) @@ -5413,7 +5468,7 @@ func (s) TestTapTimeout(t *testing.T) { res, err := ss.client.EmptyCall(ctx, &testpb.Empty{}) cancel() if s, ok := status.FromError(err); !ok || s.Code() != codes.Canceled { - t.Fatalf("ss.client.EmptyCall(context.Background(), _) = %v, %v; want nil, ", res, err) + t.Fatalf("ss.client.EmptyCall(ctx, _) = %v, %v; want nil, ", res, err) } } @@ -5497,7 +5552,9 @@ func testConfigurableWindowSize(t *testing.T, e env, wc windowSizeConfig) { cc := te.clientConn() tc := testpb.NewTestServiceClient(cc) - stream, err := tc.FullDuplexCall(context.Background()) + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + stream, err := tc.FullDuplexCall(ctx) if err != nil { t.Fatalf("%v.FullDuplexCall(_) = _, %v, want ", tc, err) } @@ -5597,11 +5654,13 @@ func testEncodeDoesntPanic(t *testing.T, e env) { defer te.tearDown() te.customCodec = nil tc := testpb.NewTestServiceClient(te.clientConn()) + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() // Failure case, should not panic. - tc.EmptyCall(context.Background(), &testpb.Empty{}) + tc.EmptyCall(ctx, &testpb.Empty{}) erc.noError = true // Passing case. - if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil { + if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); err != nil { t.Fatalf("EmptyCall(_, _) = _, %v, want _, ", err) } } @@ -5709,8 +5768,10 @@ func testGetMethodConfigTD(t *testing.T, e env) { cc := te.clientConn() tc := testpb.NewTestServiceClient(cc) + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() // The following RPCs are expected to become non-fail-fast ones with 1ms deadline. - if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); status.Code(err) != codes.DeadlineExceeded { + if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); status.Code(err) != codes.DeadlineExceeded { t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded) } @@ -5723,13 +5784,13 @@ func testGetMethodConfigTD(t *testing.T, e env) { ch <- sc // Wait for the new service config to propagate. for { - if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); status.Code(err) == codes.DeadlineExceeded { + if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); status.Code(err) == codes.DeadlineExceeded { continue } break } // The following RPCs are expected to become fail-fast. - if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); status.Code(err) != codes.Unavailable { + if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); status.Code(err) != codes.Unavailable { t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.Unavailable) } } @@ -5759,11 +5820,13 @@ func testServiceConfigWaitForReadyTD(t *testing.T, e env) { cc := te.clientConn() tc := testpb.NewTestServiceClient(cc) + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() // The following RPCs are expected to become non-fail-fast ones with 1ms deadline. 
- if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true)); status.Code(err) != codes.DeadlineExceeded { + if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); status.Code(err) != codes.DeadlineExceeded { t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded) } - if _, err := tc.FullDuplexCall(context.Background(), grpc.WaitForReady(true)); status.Code(err) != codes.DeadlineExceeded { + if _, err := tc.FullDuplexCall(ctx, grpc.WaitForReady(true)); status.Code(err) != codes.DeadlineExceeded { t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want %s", err, codes.DeadlineExceeded) } @@ -5789,10 +5852,10 @@ func testServiceConfigWaitForReadyTD(t *testing.T, e env) { break } // The following RPCs are expected to become non-fail-fast ones with 1ms deadline. - if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); status.Code(err) != codes.DeadlineExceeded { + if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); status.Code(err) != codes.DeadlineExceeded { t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded) } - if _, err := tc.FullDuplexCall(context.Background()); status.Code(err) != codes.DeadlineExceeded { + if _, err := tc.FullDuplexCall(ctx); status.Code(err) != codes.DeadlineExceeded { t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want %s", err, codes.DeadlineExceeded) } } @@ -5917,15 +5980,18 @@ func testServiceConfigMaxMsgSizeTD(t *testing.T, e env) { ResponseSize: int32(extraLargeSize), Payload: smallPayload, } + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() // Test for unary RPC recv. - if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted { + if _, err := tc.UnaryCall(ctx, req); err == nil || status.Code(err) != codes.ResourceExhausted { t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted) } // Test for unary RPC send. req.Payload = extraLargePayload req.ResponseSize = int32(smallSize) - if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted { + if _, err := tc.UnaryCall(ctx, req); err == nil || status.Code(err) != codes.ResourceExhausted { t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted) } @@ -5975,14 +6041,14 @@ func testServiceConfigMaxMsgSizeTD(t *testing.T, e env) { req.Payload = smallPayload req.ResponseSize = int32(largeSize) - if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted { + if _, err := tc.UnaryCall(ctx, req); err == nil || status.Code(err) != codes.ResourceExhausted { t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted) } // Test for unary RPC send. 
req.Payload = largePayload req.ResponseSize = int32(smallSize) - if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted { + if _, err := tc.UnaryCall(ctx, req); err == nil || status.Code(err) != codes.ResourceExhausted { t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted) } @@ -6024,24 +6090,24 @@ func testServiceConfigMaxMsgSizeTD(t *testing.T, e env) { req.Payload = smallPayload req.ResponseSize = int32(largeSize) - if _, err := tc.UnaryCall(context.Background(), req); err != nil { + if _, err := tc.UnaryCall(ctx, req); err != nil { t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want ", err) } req.ResponseSize = int32(extraLargeSize) - if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted { + if _, err := tc.UnaryCall(ctx, req); err == nil || status.Code(err) != codes.ResourceExhausted { t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted) } // Test for unary RPC send. req.Payload = largePayload req.ResponseSize = int32(smallSize) - if _, err := tc.UnaryCall(context.Background(), req); err != nil { + if _, err := tc.UnaryCall(ctx, req); err != nil { t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want ", err) } req.Payload = extraLargePayload - if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted { + if _, err := tc.UnaryCall(ctx, req); err == nil || status.Code(err) != codes.ResourceExhausted { t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted) } @@ -6098,7 +6164,9 @@ func (s) TestMethodFromServerStream(t *testing.T) { te.startServer(nil) defer te.tearDown() - _ = te.clientConn().Invoke(context.Background(), testMethod, nil, nil) + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + _ = te.clientConn().Invoke(ctx, testMethod, nil, nil) if !ok || method != testMethod { t.Fatalf("Invoke with method %q, got %q, %v, want %q, true", testMethod, method, ok, testMethod) } @@ -6165,7 +6233,9 @@ func (s) TestInterceptorCanAccessCallOptions(t *testing.T) { var headers metadata.MD var trailers metadata.MD var pr peer.Peer - tc.UnaryCall(context.Background(), &testpb.SimpleRequest{}, + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + tc.UnaryCall(ctx, &testpb.SimpleRequest{}, grpc.MaxCallRecvMsgSize(100), grpc.MaxCallSendMsgSize(200), grpc.PerRPCCredentials(testPerRPCCredentials{}), @@ -6188,7 +6258,7 @@ func (s) TestInterceptorCanAccessCallOptions(t *testing.T) { observedOpts = observedOptions{} // reset - tc.StreamingInputCall(context.Background(), + tc.StreamingInputCall(ctx, grpc.WaitForReady(false), grpc.MaxCallSendMsgSize(2020), grpc.UseCompressor("comp-type"), diff --git a/test/healthcheck_test.go b/test/healthcheck_test.go index 0a60f8c927ac..99f7d8951ebd 100644 --- a/test/healthcheck_test.go +++ b/test/healthcheck_test.go @@ -315,9 +315,12 @@ func (s) TestHealthCheckWithGoAway(t *testing.T) { } }`)}) + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + // make some rpcs to make sure connection is working. 
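The healthcheck tests below route their RPC probes through verifyResultWithDelay. Its implementation is not part of this diff; a polling helper of roughly this shape is assumed (retry count and sleep interval are illustrative guesses, not the real values):

func verifyResultWithDelay(f func() (bool, error)) error {
	var err error
	for i := 0; i < 20; i++ {
		var ok bool
		if ok, err = f(); ok {
			return nil
		}
		time.Sleep(250 * time.Millisecond) // assumed interval
	}
	// Return the last error the probe reported so the caller can surface it.
	return err
}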
if err := verifyResultWithDelay(func() (bool, error) { - if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil { + if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); err != nil { return false, fmt.Errorf("TestService/EmptyCall(_, _) = _, %v, want _, ", err) } return true, nil @@ -326,8 +329,6 @@ func (s) TestHealthCheckWithGoAway(t *testing.T) { } // the stream rpc will persist through goaway event. - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() stream, err := tc.FullDuplexCall(ctx, grpc.WaitForReady(true)) if err != nil { t.Fatalf("%v.FullDuplexCall(_) = _, %v, want ", tc, err) @@ -407,9 +408,11 @@ func (s) TestHealthCheckWithConnClose(t *testing.T) { } }`)}) + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() // make some rpcs to make sure connection is working. if err := verifyResultWithDelay(func() (bool, error) { - if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil { + if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); err != nil { return false, fmt.Errorf("TestService/EmptyCall(_, _) = _, %v, want _, ", err) } return true, nil @@ -470,9 +473,11 @@ func (s) TestHealthCheckWithAddrConnDrain(t *testing.T) { ServiceConfig: sc, }) + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() // make some rpcs to make sure connection is working. if err := verifyResultWithDelay(func() (bool, error) { - if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil { + if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); err != nil { return false, fmt.Errorf("TestService/EmptyCall(_, _) = _, %v, want _, ", err) } return true, nil @@ -481,8 +486,6 @@ func (s) TestHealthCheckWithAddrConnDrain(t *testing.T) { } // the stream rpc will persist through goaway event. - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() stream, err := tc.FullDuplexCall(ctx, grpc.WaitForReady(true)) if err != nil { t.Fatalf("%v.FullDuplexCall(_) = _, %v, want ", tc, err) @@ -561,9 +564,11 @@ func (s) TestHealthCheckWithClientConnClose(t *testing.T) { } }`)}) + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() // make some rpcs to make sure connection is working. if err := verifyResultWithDelay(func() (bool, error) { - if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil { + if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); err != nil { return false, fmt.Errorf("TestService/EmptyCall(_, _) = _, %v, want _, ", err) } return true, nil @@ -765,9 +770,11 @@ func testHealthCheckDisableWithDialOption(t *testing.T, addr string) { } }`)}) + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() // send some rpcs to make sure transport has been created and is ready for use. if err := verifyResultWithDelay(func() (bool, error) { - if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil { + if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); err != nil { return false, fmt.Errorf("TestService/EmptyCall(_, _) = _, %v, want _, ", err) } return true, nil @@ -804,9 +811,11 @@ func testHealthCheckDisableWithBalancer(t *testing.T, addr string) { } }`)}) + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() // send some rpcs to make sure transport has been created and is ready for use. 
if err := verifyResultWithDelay(func() (bool, error) { - if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil { + if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); err != nil { return false, fmt.Errorf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err) } return true, nil
@@ -837,9 +846,11 @@ func testHealthCheckDisableWithServiceConfig(t *testing.T, addr string) { r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: addr}}}) + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() // send some rpcs to make sure transport has been created and is ready for use. if err := verifyResultWithDelay(func() (bool, error) { - if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil { + if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); err != nil { return false, fmt.Errorf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err) } return true, nil
diff --git a/test/server_test.go b/test/server_test.go index c6a5fe74bd55..41466157a19f 100644 --- a/test/server_test.go +++ b/test/server_test.go
@@ -135,9 +135,11 @@ func (s) TestChainUnaryServerInterceptor(t *testing.T) { } defer ss.Stop() - resp, err := ss.client.UnaryCall(context.Background(), &testpb.SimpleRequest{}) + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + resp, err := ss.client.UnaryCall(ctx, &testpb.SimpleRequest{}) if s, ok := status.FromError(err); !ok || s.Code() != codes.OK { - t.Fatalf("ss.client.UnaryCall(context.Background(), _) = %v, %v; want nil, <nil>", resp, err) + t.Fatalf("ss.client.UnaryCall(ctx, _) = %v, %v; want nil, <nil>", resp, err) } respBytes := resp.Payload.GetBody()
@@ -181,9 +183,11 @@ func (s) TestChainOnBaseUnaryServerInterceptor(t *testing.T) { } defer ss.Stop() - resp, err := ss.client.EmptyCall(context.Background(), &testpb.Empty{}) + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + resp, err := ss.client.EmptyCall(ctx, &testpb.Empty{}) if s, ok := status.FromError(err); !ok || s.Code() != codes.OK { - t.Fatalf("ss.client.EmptyCall(context.Background(), _) = %v, %v; want nil, <nil>", resp, err) + t.Fatalf("ss.client.EmptyCall(ctx, _) = %v, %v; want nil, <nil>", resp, err) } }
@@ -268,7 +272,9 @@ func (s) TestChainStreamServerInterceptor(t *testing.T) { } defer ss.Stop() - stream, err := ss.client.FullDuplexCall(context.Background()) + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + stream, err := ss.client.FullDuplexCall(ctx) if err != nil { t.Fatalf("failed to FullDuplexCall: %v", err) }
diff --git a/test/stream_cleanup_test.go b/test/stream_cleanup_test.go index cb31b4eb2876..77d9477cf17e 100644 --- a/test/stream_cleanup_test.go +++ b/test/stream_cleanup_test.go
@@ -50,10 +50,12 @@ func (s) TestStreamCleanup(t *testing.T) { } defer ss.Stop() - if _, err := ss.client.UnaryCall(context.Background(), &testpb.SimpleRequest{}); status.Code(err) != codes.ResourceExhausted { + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + if _, err := ss.client.UnaryCall(ctx, &testpb.SimpleRequest{}); status.Code(err) != codes.ResourceExhausted { t.Fatalf("should fail with ResourceExhausted, message's body size: %v, maximum message size the client can receive: %v", bodySize, callRecvMsgSize) } - if _, err := ss.client.EmptyCall(context.Background(), &testpb.Empty{}); err != nil { + if _, err := ss.client.EmptyCall(ctx, &testpb.Empty{}); err != nil { t.Fatalf("should succeed, err: %v", err) } }
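Taken together, every file touched by this change adopts one small recipe. A self-contained rendering of it, runnable on its own (package and test names are illustrative, not from the diff):

package timeoutpattern

import (
	"context"
	"testing"
	"time"
)

// Each converted package declares this constant once.
const defaultTestTimeout = 10 * time.Second

func TestBoundedContextPattern(t *testing.T) {
	// Derive one bounded context at the top of the test...
	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
	// ...and always defer its cancellation.
	defer cancel()

	// Every RPC in the body then takes ctx instead of context.Background(),
	// so a wedged call fails the test after ten seconds instead of hanging CI.
	select {
	case <-time.After(10 * time.Millisecond): // stands in for an RPC
	case <-ctx.Done():
		t.Fatal(ctx.Err())
	}
}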