Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

transport: simplify httpClient by moving onGoAway func to onClose #5885

Merged
merged 1 commit into from
Dec 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
transport: simplify httpClient by moving onGoAway func to onClose
  • Loading branch information
arvindbr8 committed Dec 21, 2022
commit 22f8f77fe21662ada85d2a097aca91a8b9b2b5f0
12 changes: 4 additions & 8 deletions clientconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -1237,9 +1237,11 @@ func (ac *addrConn) createTransport(addr resolver.Address, copts transport.Conne
addr.ServerName = ac.cc.getServerName(addr)
hctx, hcancel := context.WithCancel(ac.ctx)

onClose := grpcsync.OnceFunc(func() {
onClose := func(r transport.GoAwayReason) {
ac.mu.Lock()
defer ac.mu.Unlock()
// adjust params based on GoAwayReason
ac.adjustParams(r)
if ac.state == connectivity.Shutdown {
// Already shut down. tearDown() already cleared the transport and
// canceled hctx via ac.ctx, and we expected this connection to be
Expand All @@ -1260,19 +1262,13 @@ func (ac *addrConn) createTransport(addr resolver.Address, copts transport.Conne
// Always go idle and wait for the LB policy to initiate a new
// connection attempt.
ac.updateConnectivityState(connectivity.Idle, nil)
})
onGoAway := func(r transport.GoAwayReason) {
ac.mu.Lock()
ac.adjustParams(r)
ac.mu.Unlock()
onClose()
}

connectCtx, cancel := context.WithDeadline(ac.ctx, connectDeadline)
defer cancel()
copts.ChannelzParentID = ac.channelzID

newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, addr, copts, onGoAway, onClose)
newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, addr, copts, onClose)
if err != nil {
if logger.V(2) {
logger.Infof("Creating new client transport to %q: %v", addr, err)
Expand Down
17 changes: 10 additions & 7 deletions internal/transport/http2_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,7 @@ type http2Client struct {
channelzID *channelz.Identifier
czData *channelzData

onGoAway func(GoAwayReason)
onClose func()
onClose func(GoAwayReason)

bufferPool *bufferPool

Expand Down Expand Up @@ -197,7 +196,7 @@ func isTemporary(err error) bool {
// newHTTP2Client constructs a connected ClientTransport to addr based on HTTP2
// and starts to receive messages on it. Non-nil error returns if construction
// fails.
func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts ConnectOptions, onGoAway func(GoAwayReason), onClose func()) (_ *http2Client, err error) {
func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts ConnectOptions, onClose func(GoAwayReason)) (_ *http2Client, err error) {
scheme := "http"
ctx, cancel := context.WithCancel(ctx)
defer func() {
Expand Down Expand Up @@ -343,7 +342,6 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
streamQuota: defaultMaxStreamsClient,
streamsQuotaAvailable: make(chan struct{}, 1),
czData: new(channelzData),
onGoAway: onGoAway,
keepaliveEnabled: keepaliveEnabled,
bufferPool: newBufferPool(),
onClose: onClose,
Expand Down Expand Up @@ -957,7 +955,9 @@ func (t *http2Client) Close(err error) {
}
// Call t.onClose ASAP to prevent the client from attempting to create new
// streams.
t.onClose()
if t.state != draining {
t.onClose(GoAwayInvalid)
}
t.state = closing
streams := t.activeStreams
t.activeStreams = nil
Expand Down Expand Up @@ -1010,6 +1010,7 @@ func (t *http2Client) GracefulClose() {
if logger.V(logLevel) {
logger.Infof("transport: GracefulClose called")
}
t.onClose(GoAwayInvalid)
t.state = draining
active := len(t.activeStreams)
t.mu.Unlock()
Expand Down Expand Up @@ -1290,8 +1291,10 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
// Notify the clientconn about the GOAWAY before we set the state to
// draining, to allow the client to stop attempting to create streams
// before disallowing new streams on this connection.
t.onGoAway(t.goAwayReason)
t.state = draining
if t.state != draining {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this a behavior change that needs to be release noted?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There should be no user-visible behavior change here, because of the corresponding changes in grpc's usage of the transport.

t.onClose(t.goAwayReason)
t.state = draining
}
}
// All streams with IDs greater than the GoAwayId
// and smaller than the previous GoAway ID should be killed.
Expand Down
4 changes: 2 additions & 2 deletions internal/transport/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -583,8 +583,8 @@ type ConnectOptions struct {

// NewClientTransport establishes the transport with the required ConnectOptions
// and returns it to the caller.
func NewClientTransport(connectCtx, ctx context.Context, addr resolver.Address, opts ConnectOptions, onGoAway func(GoAwayReason), onClose func()) (ClientTransport, error) {
return newHTTP2Client(connectCtx, ctx, addr, opts, onGoAway, onClose)
func NewClientTransport(connectCtx, ctx context.Context, addr resolver.Address, opts ConnectOptions, onClose func(GoAwayReason)) (ClientTransport, error) {
return newHTTP2Client(connectCtx, ctx, addr, opts, onClose)
}

// Options provides additional hints and information for message
Expand Down
14 changes: 7 additions & 7 deletions internal/transport/transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -452,7 +452,7 @@ func setUpWithOptions(t *testing.T, port int, sc *ServerConfig, ht hType, copts
copts.ChannelzParentID = channelz.NewIdentifierForTesting(channelz.RefSubChannel, time.Now().Unix(), nil)

connectCtx, cancel := context.WithDeadline(context.Background(), time.Now().Add(2*time.Second))
ct, connErr := NewClientTransport(connectCtx, context.Background(), addr, copts, func(GoAwayReason) {}, func() {})
ct, connErr := NewClientTransport(connectCtx, context.Background(), addr, copts, func(GoAwayReason) {})
if connErr != nil {
cancel() // Do not cancel in success path.
t.Fatalf("failed to create transport: %v", connErr)
Expand Down Expand Up @@ -483,7 +483,7 @@ func setUpWithNoPingServer(t *testing.T, copts ConnectOptions, connCh chan net.C
connCh <- conn
}()
connectCtx, cancel := context.WithDeadline(context.Background(), time.Now().Add(2*time.Second))
tr, err := NewClientTransport(connectCtx, context.Background(), resolver.Address{Addr: lis.Addr().String()}, copts, func(GoAwayReason) {}, func() {})
tr, err := NewClientTransport(connectCtx, context.Background(), resolver.Address{Addr: lis.Addr().String()}, copts, func(GoAwayReason) {})
if err != nil {
cancel() // Do not cancel in success path.
// Server clean-up.
Expand Down Expand Up @@ -1287,7 +1287,7 @@ func (s) TestClientHonorsConnectContext(t *testing.T) {
time.AfterFunc(100*time.Millisecond, cancel)

copts := ConnectOptions{ChannelzParentID: channelz.NewIdentifierForTesting(channelz.RefSubChannel, time.Now().Unix(), nil)}
_, err = NewClientTransport(connectCtx, context.Background(), resolver.Address{Addr: lis.Addr().String()}, copts, func(GoAwayReason) {}, func() {})
_, err = NewClientTransport(connectCtx, context.Background(), resolver.Address{Addr: lis.Addr().String()}, copts, func(GoAwayReason) {})
if err == nil {
t.Fatalf("NewClientTransport() returned successfully; wanted error")
}
Expand All @@ -1299,7 +1299,7 @@ func (s) TestClientHonorsConnectContext(t *testing.T) {
// Test context deadline.
connectCtx, cancel = context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel()
_, err = NewClientTransport(connectCtx, context.Background(), resolver.Address{Addr: lis.Addr().String()}, copts, func(GoAwayReason) {}, func() {})
_, err = NewClientTransport(connectCtx, context.Background(), resolver.Address{Addr: lis.Addr().String()}, copts, func(GoAwayReason) {})
if err == nil {
t.Fatalf("NewClientTransport() returned successfully; wanted error")
}
Expand Down Expand Up @@ -1378,7 +1378,7 @@ func (s) TestClientWithMisbehavedServer(t *testing.T) {
defer cancel()

copts := ConnectOptions{ChannelzParentID: channelz.NewIdentifierForTesting(channelz.RefSubChannel, time.Now().Unix(), nil)}
ct, err := NewClientTransport(connectCtx, context.Background(), resolver.Address{Addr: lis.Addr().String()}, copts, func(GoAwayReason) {}, func() {})
ct, err := NewClientTransport(connectCtx, context.Background(), resolver.Address{Addr: lis.Addr().String()}, copts, func(GoAwayReason) {})
if err != nil {
t.Fatalf("Error while creating client transport: %v", err)
}
Expand Down Expand Up @@ -2282,7 +2282,7 @@ func (s) TestClientHandshakeInfo(t *testing.T) {
TransportCredentials: creds,
ChannelzParentID: channelz.NewIdentifierForTesting(channelz.RefSubChannel, time.Now().Unix(), nil),
}
tr, err := NewClientTransport(ctx, context.Background(), addr, copts, func(GoAwayReason) {}, func() {})
tr, err := NewClientTransport(ctx, context.Background(), addr, copts, func(GoAwayReason) {})
if err != nil {
t.Fatalf("NewClientTransport(): %v", err)
}
Expand Down Expand Up @@ -2323,7 +2323,7 @@ func (s) TestClientHandshakeInfoDialer(t *testing.T) {
Dialer: dialer,
ChannelzParentID: channelz.NewIdentifierForTesting(channelz.RefSubChannel, time.Now().Unix(), nil),
}
tr, err := NewClientTransport(ctx, context.Background(), addr, copts, func(GoAwayReason) {}, func() {})
tr, err := NewClientTransport(ctx, context.Background(), addr, copts, func(GoAwayReason) {})
if err != nil {
t.Fatalf("NewClientTransport(): %v", err)
}
Expand Down