Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added logs for reasons causing connection and transport close #5840

Merged
merged 8 commits into from
Dec 9, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Responded to Doug and Easwar's comments
  • Loading branch information
zasweq committed Dec 8, 2022
commit 30d51f7ba6c8eeae2add360f1a277a46208944ff
2 changes: 1 addition & 1 deletion clientconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -1275,7 +1275,7 @@ func (ac *addrConn) createTransport(addr resolver.Address, copts transport.Conne
newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, addr, copts, onGoAway, onClose)
if err != nil {
if logger.V(2) {
logger.Errorf("Creating new client transport to %q: %v", addr, err)
logger.Infof("Creating new client transport to %q: %v", addr, err)
}
// newTr is either nil, or closed.
hcancel()
Expand Down
4 changes: 2 additions & 2 deletions internal/transport/controlbuf.go
Original file line number Diff line number Diff line change
Expand Up @@ -758,7 +758,7 @@ func (l *loopyWriter) cleanupStreamHandler(c *cleanupStream) error {
}
}
if l.side == clientSide && l.draining && len(l.estdStreams) == 0 {
return errors.New("finished processing active streams while in draining mode, no more streams to finish processing")
return errors.New("finished processing active streams while in draining mode")
}
return nil
}
Expand Down Expand Up @@ -793,7 +793,7 @@ func (l *loopyWriter) incomingGoAwayHandler(*incomingGoAway) error {
if l.side == clientSide {
l.draining = true
if len(l.estdStreams) == 0 {
return errors.New("no active streams left to process when GOAWAY frame received, so can trigger closure")
return errors.New("received GOAWAY with no active streams")
}
}
return nil
Expand Down
10 changes: 6 additions & 4 deletions internal/transport/handler_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,10 +142,12 @@ type serverHandlerTransport struct {
}

func (ht *serverHandlerTransport) Close(err error) {
if logger.V(logLevel) {
logger.Infof("Closing serverHandlerTransport: %v", err)
}
ht.closeOnce.Do(ht.closeCloseChanOnce)
ht.closeOnce.Do(func() {
if logger.V(logLevel) {
logger.Infof("Closing serverHandlerTransport: %v", err)
}
ht.closeCloseChanOnce()
})
dfawley marked this conversation as resolved.
Show resolved Hide resolved
}

func (ht *serverHandlerTransport) closeCloseChanOnce() { close(ht.closedCh) }
Expand Down
12 changes: 5 additions & 7 deletions internal/transport/http2_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
if connectCtx.Err() != nil {
// connectCtx expired before exiting the function. Hard close the connection.
if logger.V(logLevel) {
logger.Infof("transport: closing connection due to connect context expiring.")
logger.Infof("newClientTransport: closing connection due to connect context expiring.")
dfawley marked this conversation as resolved.
Show resolved Hide resolved
}
conn.Close()
}
Expand Down Expand Up @@ -448,10 +448,8 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
go func() {
t.loopy = newLoopyWriter(clientSide, t.framer, t.controlBuf, t.bdpEst)
err := t.loopy.run()
if err != nil {
if logger.V(logLevel) {
logger.Infof("transport: loopyWriter exited. Closing connection. Err: %v", err)
}
if logger.V(logLevel) {
logger.Infof("transport: loopyWriter exited. Closing connection. Err: %v", err)
}
// Do not close the transport. Let reader goroutine handle it since
// there might be data in the buffers.
Expand Down Expand Up @@ -955,7 +953,7 @@ func (t *http2Client) Close(err error) {
return
}
if logger.V(logLevel) {
logger.Infof("Closing transport, will close the connection. Err: %v", err)
logger.Infof("Closing transport, will close the connection: %v", err)
}
// Call t.onClose ASAP to prevent the client from attempting to create new
// streams.
Expand Down Expand Up @@ -1016,7 +1014,7 @@ func (t *http2Client) GracefulClose() {
active := len(t.activeStreams)
t.mu.Unlock()
if active == 0 {
t.Close(errNoStreamsDraining)
t.Close(connectionErrorf(true, nil, "no active streams left to process while draining"))
return
}
t.controlBuf.put(&incomingGoAway{})
Expand Down
15 changes: 7 additions & 8 deletions internal/transport/http2_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,10 +332,9 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
go func() {
t.loopy = newLoopyWriter(serverSide, t.framer, t.controlBuf, t.bdpEst)
t.loopy.ssGoAwayHandler = t.outgoingGoAwayHandler
if err := t.loopy.run(); err != nil {
if logger.V(logLevel) {
logger.Infof("transport: loopyWriter exited. Closing connection. Err: %v", err)
}
err := t.loopy.run()
if logger.V(logLevel) {
logger.Infof("transport: loopyWriter exited. Closing connection. Err: %v", err)
}
t.conn.Close()
t.controlBuf.finish()
Expand All @@ -345,7 +344,7 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
return t, nil
}

// operateHeader takes action on the decoded headers. Returns an error if fatal
// operateHeaders takes action on the decoded headers. Returns an error if fatal
// error encountered and transport needs to close, otherwise returns nil.
func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream), traceCtx func(context.Context, string) context.Context) error {
// Acquire max stream ID lock for entire duration
Expand All @@ -368,7 +367,7 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(

if streamID%2 != 1 || streamID <= t.maxStreamID {
// illegal gRPC stream id.
return fmt.Errorf("transport: http2Server.HandleStreams received an illegal stream id: %v", streamID)
return fmt.Errorf("received an illegal stream id: %v", streamID)
}
t.maxStreamID = streamID

Expand Down Expand Up @@ -1196,7 +1195,7 @@ func (t *http2Server) Close(err error) {
return
}
if logger.V(logLevel) {
logger.Infof("Closing transport, will close the connection. Err: %v", err)
logger.Infof("Closing transport, will close the connection: %v", err)
}
t.state = closing
streams := t.activeStreams
Expand Down Expand Up @@ -1314,7 +1313,7 @@ func (t *http2Server) outgoingGoAwayHandler(g *goAway) (bool, error) {
sid := t.maxStreamID
retErr := g.closeConn
if len(t.activeStreams) == 0 {
retErr = errors.New("second goaway written and no active streams left to process")
retErr = errors.New("second GOAWAY written and no active streams left to process")
}
t.mu.Unlock()
t.maxStreamMu.Unlock()
Expand Down
4 changes: 0 additions & 4 deletions internal/transport/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -761,10 +761,6 @@ func (e ConnectionError) Unwrap() error {
var (
// ErrConnClosing indicates that the transport is closing.
ErrConnClosing = connectionErrorf(true, nil, "transport is closing")
// errNoStreamsDraining indicates that this transport is finished processing
// while in draining mode due to no more active streams to process, thus
// making the transport and connection immediately ready to close.
errNoStreamsDraining = connectionErrorf(true, nil, "no active streams left to process while draining")
// errStreamDrain indicates that the stream is rejected because the
// connection is draining. This could be caused by goaway or balancer
// removing the address.
Expand Down
2 changes: 1 addition & 1 deletion server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1046,7 +1046,7 @@ func (s *Server) addConn(addr string, st transport.ServerTransport) bool {
s.mu.Lock()
defer s.mu.Unlock()
if s.conns == nil {
st.Close(errors.New("s.conns (representing active server transports) is nil"))
st.Close(errors.New("Server.addConn called when server has already been stopped"))
return false
}
if s.drain {
Expand Down