Skip to content

Commit

Permalink
keepalive: apply minimum ping time of 10s to client and 1s to server (#…
Browse files Browse the repository at this point in the history
…2642)

* keepalive: apply minimum ping time of 10s to client and 1s to server

* review fixes
  • Loading branch information
dfawley authored and lyuxuan committed Feb 21, 2019
1 parent ae7b4f2 commit ed70822
Show file tree
Hide file tree
Showing 7 changed files with 85 additions and 30 deletions.
58 changes: 47 additions & 11 deletions clientconn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -939,25 +939,61 @@ func (s) TestClientUpdatesParamsAfterGoAway(t *testing.T) {
t.Fatalf("Failed to listen. Err: %v", err)
}
defer lis.Close()
connected := make(chan struct{})
go func() {
conn, err := lis.Accept()
if err != nil {
t.Errorf("error accepting connection: %v", err)
return
}
defer conn.Close()
f := http2.NewFramer(conn, conn)
// Start a goroutine to read from the conn to prevent the client from
// blocking after it writes its preface.
go func() {
for {
if _, err := f.ReadFrame(); err != nil {
return
}
}
}()
if err := f.WriteSettings(http2.Setting{}); err != nil {
t.Errorf("error writing settings: %v", err)
return
}
<-connected
if err := f.WriteGoAway(0, http2.ErrCodeEnhanceYourCalm, []byte("too_many_pings")); err != nil {
t.Errorf("error writing GOAWAY: %v", err)
return
}
}()
addr := lis.Addr().String()
s := NewServer()
go s.Serve(lis)
defer s.Stop()
cc, err := Dial(addr, WithBlock(), WithInsecure(), WithKeepaliveParams(keepalive.ClientParameters{
Time: 50 * time.Millisecond,
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
cc, err := DialContext(ctx, addr, WithBlock(), WithInsecure(), WithKeepaliveParams(keepalive.ClientParameters{
Time: 10 * time.Second,
Timeout: 100 * time.Millisecond,
PermitWithoutStream: true,
}))
if err != nil {
t.Fatalf("Dial(%s, _) = _, %v, want _, <nil>", addr, err)
}
defer cc.Close()
time.Sleep(1 * time.Second)
cc.mu.RLock()
defer cc.mu.RUnlock()
v := cc.mkp.Time
if v < 100*time.Millisecond {
t.Fatalf("cc.dopts.copts.Keepalive.Time = %v , want 100ms", v)
close(connected)
for {
time.Sleep(10 * time.Millisecond)
cc.mu.RLock()
v := cc.mkp.Time
if v == 20*time.Second {
// Success
cc.mu.RUnlock()
return
}
if ctx.Err() != nil {
// Timeout
t.Fatalf("cc.dopts.copts.Keepalive.Time = %v , want 20s", v)
}
cc.mu.RUnlock()
}
}

Expand Down
5 changes: 5 additions & 0 deletions dialoptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

"google.golang.org/grpc/balancer"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/backoff"
"google.golang.org/grpc/internal/envconfig"
Expand Down Expand Up @@ -388,6 +389,10 @@ func WithUserAgent(s string) DialOption {
// WithKeepaliveParams returns a DialOption that specifies keepalive parameters
// for the client transport.
func WithKeepaliveParams(kp keepalive.ClientParameters) DialOption {
if kp.Time < internal.KeepaliveMinPingTime {
grpclog.Warningf("Adjusting keepalive ping interval to minimum period of %v", internal.KeepaliveMinPingTime)
kp.Time = internal.KeepaliveMinPingTime
}
return newFuncDialOption(func(o *dialOptions) {
o.copts.KeepaliveParams = kp
})
Expand Down
8 changes: 7 additions & 1 deletion internal/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@
// symbols to avoid circular dependencies.
package internal

import "context"
import (
"context"
"time"
)

var (
// WithContextDialer is exported by dialoptions.go
Expand All @@ -33,6 +36,9 @@ var (
HealthCheckFunc HealthChecker
// BalancerUnregister is exported by package balancer to unregister a balancer.
BalancerUnregister func(name string)
// KeepaliveMinPingTime is the minimum ping interval. This must be 10s by
// default, but tests may wish to set it lower for convenience.
KeepaliveMinPingTime = 10 * time.Second
)

// HealthChecker defines the signature of the client-side LB channel health checking function.
Expand Down
2 changes: 2 additions & 0 deletions keepalive/keepalive.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
type ClientParameters struct {
// After a duration of this time if the client doesn't see any activity it
// pings the server to see if the transport is still alive.
// If set below 10s, a minimum value of 10s will be used instead.
Time time.Duration // The current default value is infinity.
// After having pinged for keepalive check, the client waits for a duration
// of Timeout and if no activity is seen even after that the connection is
Expand Down Expand Up @@ -62,6 +63,7 @@ type ServerParameters struct {
MaxConnectionAgeGrace time.Duration // The current default value is infinity.
// After a duration of this time if the server doesn't see any activity it
// pings the client to see if the transport is still alive.
// If set below 1s, a minimum value of 1s will be used instead.
Time time.Duration // The current default value is 2 hours.
// After having pinged for keepalive check, the server waits for a duration
// of Timeout and if no activity is seen even after that the connection is
Expand Down
5 changes: 5 additions & 0 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,11 @@ func InitialConnWindowSize(s int32) ServerOption {

// KeepaliveParams returns a ServerOption that sets keepalive and max-age parameters for the server.
func KeepaliveParams(kp keepalive.ServerParameters) ServerOption {
if kp.Time > 0 && kp.Time < time.Second {
grpclog.Warning("Adjusting keepalive ping interval to minimum period of 1s")
kp.Time = time.Second
}

return func(o *options) {
o.keepaliveParams = kp
}
Expand Down
28 changes: 19 additions & 9 deletions test/channelz_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/resolver"
Expand Down Expand Up @@ -847,8 +848,8 @@ func doIdleCallToInvokeKeepAlive(tc testpb.TestServiceClient, t *testing.T) {
if err != nil {
t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want <nil>", err)
}
// 2500ms allow for 2 keepalives (1000ms per round trip)
time.Sleep(2500 * time.Millisecond)
// Allow for at least 2 keepalives (1s per ping interval)
time.Sleep(4 * time.Second)
cancel()
}

Expand Down Expand Up @@ -1125,15 +1126,24 @@ func (s) TestCZClientAndServerSocketMetricsFlowControl(t *testing.T) {

func (s) TestCZClientSocketMetricsKeepAlive(t *testing.T) {
channelz.NewChannelzStorage()
defer func(t time.Duration) { internal.KeepaliveMinPingTime = t }(internal.KeepaliveMinPingTime)
internal.KeepaliveMinPingTime = time.Second
e := tcpClearRREnv
te := newTest(t, e)
te.cliKeepAlive = &keepalive.ClientParameters{Time: 500 * time.Millisecond, Timeout: 500 * time.Millisecond}
te.customDialOptions = append(te.customDialOptions, grpc.WithKeepaliveParams(
keepalive.ClientParameters{
Time: time.Second,
Timeout: 500 * time.Millisecond,
PermitWithoutStream: true,
}))
te.customServerOptions = append(te.customServerOptions, grpc.KeepaliveEnforcementPolicy(
keepalive.EnforcementPolicy{
MinTime: 500 * time.Millisecond,
PermitWithoutStream: true,
}))
te.startServer(&testServer{security: e.security})
te.clientConn() // Dial the server
defer te.tearDown()
cc := te.clientConn()
tc := testpb.NewTestServiceClient(cc)
doIdleCallToInvokeKeepAlive(tc, t)

if err := verifyResultWithDelay(func() (bool, error) {
tchan, _ := channelz.GetTopChannels(0, 0)
if len(tchan) != 1 {
Expand All @@ -1157,7 +1167,7 @@ func (s) TestCZClientSocketMetricsKeepAlive(t *testing.T) {
break
}
skt := channelz.GetSocket(id)
if skt.SocketData.KeepAlivesSent != 2 { // doIdleCallToInvokeKeepAlive func is set up to send 2 KeepAlives.
if skt.SocketData.KeepAlivesSent != 2 {
return false, fmt.Errorf("there should be 2 KeepAlives sent, not %d", skt.SocketData.KeepAlivesSent)
}
return true, nil
Expand Down Expand Up @@ -1230,7 +1240,7 @@ func (s) TestCZServerSocketMetricsKeepAlive(t *testing.T) {
channelz.NewChannelzStorage()
e := tcpClearRREnv
te := newTest(t, e)
te.svrKeepAlive = &keepalive.ServerParameters{Time: 500 * time.Millisecond, Timeout: 500 * time.Millisecond}
te.customServerOptions = append(te.customServerOptions, grpc.KeepaliveParams(keepalive.ServerParameters{Time: time.Second, Timeout: 500 * time.Millisecond}))
te.startServer(&testServer{security: e.security})
defer te.tearDown()
cc := te.clientConn()
Expand Down
9 changes: 0 additions & 9 deletions test/end2end_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ import (
"google.golang.org/grpc/internal/grpctest"
"google.golang.org/grpc/internal/leakcheck"
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/resolver"
Expand Down Expand Up @@ -510,8 +509,6 @@ type test struct {
customDialOptions []grpc.DialOption
customServerOptions []grpc.ServerOption
resolverScheme string
cliKeepAlive *keepalive.ClientParameters
svrKeepAlive *keepalive.ServerParameters

// All test dialing is blocking by default. Set this to true if dial
// should be non-blocking.
Expand Down Expand Up @@ -633,9 +630,6 @@ func (te *test) listenAndServe(ts testpb.TestServiceServer, listen func(network,
case "clientTimeoutCreds":
sopts = append(sopts, grpc.Creds(&clientTimeoutCreds{}))
}
if te.svrKeepAlive != nil {
sopts = append(sopts, grpc.KeepaliveParams(*te.svrKeepAlive))
}
sopts = append(sopts, te.customServerOptions...)
s := grpc.NewServer(sopts...)
te.srv = s
Expand Down Expand Up @@ -873,9 +867,6 @@ func (te *test) configDial(opts ...grpc.DialOption) ([]grpc.DialOption, string)
if te.srvAddr == "" {
te.srvAddr = "client.side.only.test"
}
if te.cliKeepAlive != nil {
opts = append(opts, grpc.WithKeepaliveParams(*te.cliKeepAlive))
}
opts = append(opts, te.customDialOptions...)
return opts, scheme
}
Expand Down

0 comments on commit ed70822

Please sign in to comment.