Skip to content

Commit

Permalink
feat: print detail loginfo by ctx (cloudwego#250)
Browse files Browse the repository at this point in the history
  • Loading branch information
SinnerA authored Dec 8, 2021
1 parent 059a156 commit 5d81475
Show file tree
Hide file tree
Showing 13 changed files with 41 additions and 42 deletions.
1 change: 0 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghf
github.com/choleraehyq/pid v0.0.12 h1:JLiTCsz2gStQZ3YWet+p9hktRnWzk7VJigpzvGV+I2o=
github.com/choleraehyq/pid v0.0.12/go.mod h1:uhzeFgxJZWQsZulelVQZwdASxQ9TIPZYL4TPkQMtL/U=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/cloudwego/netpoll v0.1.0 h1:UC4vCS05d+jxkqMY+QE5lNpc9Cx+TUl6wUOGMtrpO34=
github.com/cloudwego/netpoll v0.1.0/go.mod h1:rZOiNI0FYjuvNybXKKhAPUja03loJi/cdv2F55AE6E8=
github.com/cloudwego/netpoll v0.1.1-0.20211206100800-69f5d8e997b2 h1:0BJ/lX4Z5AdLS5amBAWOBdFYD9J0YUZSDcFAcAy2mmo=
github.com/cloudwego/netpoll v0.1.1-0.20211206100800-69f5d8e997b2/go.mod h1:rZOiNI0FYjuvNybXKKhAPUja03loJi/cdv2F55AE6E8=
Expand Down
4 changes: 2 additions & 2 deletions internal/mocks/transhandlerclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,9 @@ func (t *MockCliTransHandler) OnInactive(ctx context.Context, conn net.Conn) {
// OnError implements the remote.TransHandler interface.
func (t *MockCliTransHandler) OnError(ctx context.Context, err error, conn net.Conn) {
if pe, ok := err.(*kerrors.DetailedError); ok {
klog.Errorf("KITEX: send request error, remote=%s, error=%s\nstack=%s", conn.RemoteAddr(), err.Error(), pe.Stack())
klog.CtxErrorf(ctx, "KITEX: send request error, remote=%s, error=%s\nstack=%s", conn.RemoteAddr(), err.Error(), pe.Stack())
} else {
klog.Errorf("KITEX: send request error, remote=%s, error=%s", conn.RemoteAddr(), err.Error())
klog.CtxErrorf(ctx, "KITEX: send request error, remote=%s, error=%s", conn.RemoteAddr(), err.Error())
}
}

Expand Down
4 changes: 2 additions & 2 deletions internal/mocks/transhandlerserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,9 @@ func (t *MockSvrTransHandler) OnInactive(ctx context.Context, conn net.Conn) {
// OnError implements the remote.TransHandler interface.
func (t *MockSvrTransHandler) OnError(ctx context.Context, err error, conn net.Conn) {
if pe, ok := err.(*kerrors.DetailedError); ok {
klog.Errorf("KITEX: send request error, remote=%s, error=%s\nstack=%s", conn.RemoteAddr(), err.Error(), pe.Stack())
klog.CtxErrorf(ctx, "KITEX: send request error, remote=%s, error=%s\nstack=%s", conn.RemoteAddr(), err.Error(), pe.Stack())
} else {
klog.Errorf("KITEX: send request error, remote=%s, error=%s", conn.RemoteAddr(), err.Error())
klog.CtxErrorf(ctx, "KITEX: send request error, remote=%s, error=%s", conn.RemoteAddr(), err.Error())
}
}

Expand Down
8 changes: 4 additions & 4 deletions internal/stats/tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func (c *Controller) Append(col stats.Tracer) {

// DoStart starts the tracers.
func (c *Controller) DoStart(ctx context.Context, ri rpcinfo.RPCInfo) context.Context {
defer c.tryRecover()
defer c.tryRecover(ctx)
Record(ctx, ri, stats.RPCStart, nil)

for _, col := range c.tracers {
Expand All @@ -48,7 +48,7 @@ func (c *Controller) DoStart(ctx context.Context, ri rpcinfo.RPCInfo) context.Co

// DoFinish calls the tracers in reversed order.
func (c *Controller) DoFinish(ctx context.Context, ri rpcinfo.RPCInfo, err error) {
defer c.tryRecover()
defer c.tryRecover(ctx)
Record(ctx, ri, stats.RPCFinish, err)
if err != nil {
rpcStats := rpcinfo.AsMutableRPCStats(ri.Stats())
Expand All @@ -66,8 +66,8 @@ func (c *Controller) HasTracer() bool {
return c != nil && len(c.tracers) > 0
}

func (c *Controller) tryRecover() {
func (c *Controller) tryRecover(ctx context.Context) {
if err := recover(); err != nil {
klog.Warnf("Panic happened during tracer call. This doesn't affect the rpc call, but may lead to lack of monitor data such as metrics and logs: error=%s, stack=%s", err, string(debug.Stack()))
klog.CtxWarnf(ctx, "Panic happened during tracer call. This doesn't affect the rpc call, but may lead to lack of monitor data such as metrics and logs: error=%s, stack=%s", err, string(debug.Stack()))
}
}
4 changes: 2 additions & 2 deletions pkg/remote/trans/default_client_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,9 @@ func (t *cliTransHandler) OnInactive(ctx context.Context, conn net.Conn) {
// OnError implements the remote.ClientTransHandler interface.
func (t *cliTransHandler) OnError(ctx context.Context, err error, conn net.Conn) {
if pe, ok := err.(*kerrors.DetailedError); ok {
klog.Errorf("KITEX: send request error, remote=%s, error=%s\nstack=%s", conn.RemoteAddr(), err.Error(), pe.Stack())
klog.CtxErrorf(ctx, "KITEX: send request error, remote=%s, error=%s\nstack=%s", conn.RemoteAddr(), err.Error(), pe.Stack())
} else {
klog.Errorf("KITEX: send request error, remote=%s, error=%s", conn.RemoteAddr(), err.Error())
klog.CtxErrorf(ctx, "KITEX: send request error, remote=%s, error=%s", conn.RemoteAddr(), err.Error())
}
}

Expand Down
10 changes: 5 additions & 5 deletions pkg/remote/trans/default_server_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,9 +112,9 @@ func (t *svrTransHandler) OnRead(ctx context.Context, conn net.Conn) error {
if conn != nil {
ri := rpcinfo.GetRPCInfo(ctx)
rService, rAddr := getRemoteInfo(ri, conn)
klog.Errorf("KITEX: panic happened, close conn, remoteAddress=%s, remoteService=%s, error=%v\nstack=%s", rAddr, rService, panicErr, string(debug.Stack()))
klog.CtxErrorf(ctx, "KITEX: panic happened, close conn, remoteAddress=%s, remoteService=%s, error=%v\nstack=%s", rAddr, rService, panicErr, string(debug.Stack()))
} else {
klog.Errorf("KITEX: panic happened, error=%v\nstack=%s", panicErr, string(debug.Stack()))
klog.CtxErrorf(ctx, "KITEX: panic happened, error=%v\nstack=%s", panicErr, string(debug.Stack()))
}
}
if closeConn && conn != nil {
Expand Down Expand Up @@ -199,9 +199,9 @@ func (t *svrTransHandler) OnError(ctx context.Context, err error, conn net.Conn)
remote := rpcinfo.AsMutableEndpointInfo(ri.From())
remote.SetTag(rpcinfo.RemoteClosedTag, "1")
} else if pe, ok := err.(*kerrors.DetailedError); ok {
klog.Errorf("KITEX: processing request error, remoteService=%s, remoteAddr=%v, error=%s\nstack=%s", rService, rAddr, err.Error(), pe.Stack())
klog.CtxErrorf(ctx, "KITEX: processing request error, remoteService=%s, remoteAddr=%v, error=%s\nstack=%s", rService, rAddr, err.Error(), pe.Stack())
} else {
klog.Errorf("KITEX: processing request error, remoteService=%s, remoteAddr=%v, error=%s", rService, rAddr, err.Error())
klog.CtxErrorf(ctx, "KITEX: processing request error, remoteService=%s, remoteAddr=%v, error=%s", rService, rAddr, err.Error())
}
}

Expand Down Expand Up @@ -237,7 +237,7 @@ func (t *svrTransHandler) writeErrorReplyIfNeeded(ctx context.Context, recvMsg r
}
err = t.transPipe.Write(ctx, conn, errMsg)
if err != nil {
klog.Errorf("KITEX: write error reply failed, remote=%s, error=%s", conn.RemoteAddr(), err.Error())
klog.CtxErrorf(ctx, "KITEX: write error reply failed, remote=%s, error=%s", conn.RemoteAddr(), err.Error())
}
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/remote/trans/netpoll/http_client_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,9 +141,9 @@ func (t *httpCliTransHandler) OnInactive(ctx context.Context, conn net.Conn) {
// This is called when panic happens.
func (t *httpCliTransHandler) OnError(ctx context.Context, err error, conn net.Conn) {
if pe, ok := err.(*kerrors.DetailedError); ok {
klog.Errorf("KITEX: send http request error, remote=%s, error=%s\nstack=%s", conn.RemoteAddr(), err.Error(), pe.Stack())
klog.CtxErrorf(ctx, "KITEX: send http request error, remote=%s, error=%s\nstack=%s", conn.RemoteAddr(), err.Error(), pe.Stack())
} else {
klog.Errorf("KITEX: send http request error, remote=%s, error=%s", conn.RemoteAddr(), err.Error())
klog.CtxErrorf(ctx, "KITEX: send http request error, remote=%s, error=%s", conn.RemoteAddr(), err.Error())
}
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/remote/trans/netpollmux/client_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,9 +161,9 @@ func (t *cliTransHandler) OnInactive(ctx context.Context, conn net.Conn) {
// OnError implements the remote.ClientTransHandler interface.
func (t *cliTransHandler) OnError(ctx context.Context, err error, conn net.Conn) {
if pe, ok := err.(*kerrors.DetailedError); ok {
klog.Errorf("KITEX: send request error, remote=%s, error=%s\nstack=%s", conn.RemoteAddr(), err.Error(), pe.Stack())
klog.CtxErrorf(ctx, "KITEX: send request error, remote=%s, error=%s\nstack=%s", conn.RemoteAddr(), err.Error(), pe.Stack())
} else {
klog.Errorf("KITEX: send request error, remote=%s, error=%s", conn.RemoteAddr(), err.Error())
klog.CtxErrorf(ctx, "KITEX: send request error, remote=%s, error=%s", conn.RemoteAddr(), err.Error())
}
}

Expand Down
8 changes: 4 additions & 4 deletions pkg/remote/trans/netpollmux/mux_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,13 @@ func (c *muxCliConn) OnRequest(ctx context.Context, connection netpoll.Connectio
length, seqID, err := parseHeader(connection.Reader())
if err != nil {
err = fmt.Errorf("%w: addr(%s)", err, connection.RemoteAddr())
return c.onError(err, connection)
return c.onError(ctx, err, connection)
}
// reader is nil if return error
reader, err := connection.Reader().Slice(length)
if err != nil {
err = fmt.Errorf("mux read package slice failed: addr(%s), %w", connection.RemoteAddr(), err)
return c.onError(err, connection)
return c.onError(ctx, err, connection)
}
go func() {
asyncCallback, ok := c.seqIDMap.load(seqID)
Expand All @@ -90,8 +90,8 @@ func (c *muxCliConn) close() error {
return nil
}

func (c *muxCliConn) onError(err error, connection netpoll.Connection) error {
klog.Errorf("KITEX: error=%s", err.Error())
func (c *muxCliConn) onError(ctx context.Context, err error, connection netpoll.Connection) error {
klog.CtxErrorf(ctx, "KITEX: error=%s", err.Error())
connection.Close()
return err
}
Expand Down
10 changes: 5 additions & 5 deletions pkg/remote/trans/netpollmux/server_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,9 +295,9 @@ func (t *svrTransHandler) OnError(ctx context.Context, err error, conn net.Conn)
remote := rpcinfo.AsMutableEndpointInfo(ri.From())
remote.SetTag(rpcinfo.RemoteClosedTag, "1")
} else if pe, ok := err.(*kerrors.DetailedError); ok {
klog.Errorf("KITEX: processing request error, remoteService=%s, remoteAddr=%v, error=%s\nstack=%s", rService, rAddr, err.Error(), pe.Stack())
klog.CtxErrorf(ctx, "KITEX: processing request error, remoteService=%s, remoteAddr=%v, error=%s\nstack=%s", rService, rAddr, err.Error(), pe.Stack())
} else {
klog.Errorf("KITEX: processing request error, remoteService=%s, remoteAddr=%v, error=%s", rService, rAddr, err.Error())
klog.CtxErrorf(ctx, "KITEX: processing request error, remoteService=%s, remoteAddr=%v, error=%s", rService, rAddr, err.Error())
}
}

Expand Down Expand Up @@ -334,7 +334,7 @@ func (t *svrTransHandler) writeErrorReplyIfNeeded(ctx context.Context, recvMsg r
}
err = t.transPipe.Write(ctx, conn, errMsg)
if err != nil {
klog.Errorf("KITEX: write error reply failed, remote=%s, error=%s", conn.RemoteAddr(), err.Error())
klog.CtxErrorf(ctx, "KITEX: write error reply failed, remote=%s, error=%s", conn.RemoteAddr(), err.Error())
}
}

Expand All @@ -347,9 +347,9 @@ func (t *svrTransHandler) tryRecover(ctx context.Context, conn net.Conn) {

if conn != nil {
conn.Close()
klog.Errorf("KITEX: panic happened, close conn[%s], %s\n%s", conn.RemoteAddr(), err, string(debug.Stack()))
klog.CtxErrorf(ctx, "KITEX: panic happened, close conn[%s], %s\n%s", conn.RemoteAddr(), err, string(debug.Stack()))
} else {
klog.Errorf("KITEX: panic happened, %s\n%s", err, string(debug.Stack()))
klog.CtxErrorf(ctx, "KITEX: panic happened, %s\n%s", err, string(debug.Stack()))
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/remote/trans/nphttp2/grpc/http2_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ func newHTTP2Client(ctx context.Context, conn netpoll.Connection, onGoAway func(
go func() {
err := t.loopy.run()
if err != nil {
klog.Errorf("transport: loopyWriter.run returning. Err: %v", err)
klog.CtxErrorf(ctx, "transport: loopyWriter.run returning. Err: %v", err)
}
// If it's a connection error, let reader goroutine handle it
// since there might be data in the buffers.
Expand Down
16 changes: 8 additions & 8 deletions pkg/remote/trans/nphttp2/grpc/http2_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ func newHTTP2Server(ctx context.Context, conn netpoll.Connection) (_ ServerTrans
t.loopy = newLoopyWriter(serverSide, t.framer, t.controlBuf, t.bdpEst)
t.loopy.ssGoAwayHandler = t.outgoingGoAwayHandler
if err := t.loopy.run(); err != nil {
klog.Errorf("transport: loopyWriter.run returning. Err: %v", err)
klog.CtxErrorf(ctx, "transport: loopyWriter.run returning. Err: %v", err)
}
t.conn.Close()
close(t.writerDone)
Expand Down Expand Up @@ -270,7 +270,7 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
if streamID%2 != 1 || streamID <= t.maxStreamID {
t.mu.Unlock()
// illegal gRPC stream id.
klog.Errorf("transport: http2Server.HandleStreams received an illegal stream id: %v", streamID)
klog.CtxErrorf(s.ctx, "transport: http2Server.HandleStreams received an illegal stream id: %v", streamID)
s.cancel()
return true
}
Expand Down Expand Up @@ -317,7 +317,7 @@ func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.
atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
if err != nil {
if se, ok := err.(http2.StreamError); ok {
klog.Warnf("transport: http2Server.HandleStreams encountered http2.StreamError: %v", se)
klog.CtxWarnf(t.ctx, "transport: http2Server.HandleStreams encountered http2.StreamError: %v", se)
t.mu.Lock()
s := t.activeStreams[se.StreamID]
t.mu.Unlock()
Expand All @@ -337,7 +337,7 @@ func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.
t.Close()
return
}
klog.Warnf("transport: http2Server.HandleStreams failed to read frame: %v", err)
klog.CtxWarnf(t.ctx, "transport: http2Server.HandleStreams failed to read frame: %v", err)
t.Close()
return
}
Expand All @@ -360,7 +360,7 @@ func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.
case *http2.GoAwayFrame:
// TODO: Handle GoAway from the client appropriately.
default:
klog.Errorf("transport: http2Server.HandleStreams found unhandled frame type %v.", frame)
klog.CtxErrorf(t.ctx, "transport: http2Server.HandleStreams found unhandled frame type %v.", frame)
}
}
}
Expand Down Expand Up @@ -557,7 +557,7 @@ func (t *http2Server) handlePing(f *http2.PingFrame) {

if t.pingStrikes > maxPingStrikes {
// Send goaway and close the connection.
klog.Errorf("transport: Got too many pings from the client, closing the connection.")
klog.CtxErrorf(t.ctx, "transport: Got too many pings from the client, closing the connection.")
t.controlBuf.put(&goAway{code: http2.ErrCodeEnhanceYourCalm, debugData: []byte("too_many_pings"), closeConn: true})
}
}
Expand Down Expand Up @@ -590,7 +590,7 @@ func (t *http2Server) checkForHeaderListSize(it interface{}) bool {
var sz int64
for _, f := range hdrFrame.hf {
if sz += int64(f.Size()); sz > int64(*t.maxSendHeaderListSize) {
klog.Errorf("header list size to send violates the maximum size (%d bytes) set by client", *t.maxSendHeaderListSize)
klog.CtxErrorf(t.ctx, "header list size to send violates the maximum size (%d bytes) set by client", *t.maxSendHeaderListSize)
return false
}
}
Expand Down Expand Up @@ -677,7 +677,7 @@ func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error {
stBytes, err := proto.Marshal(p)
if err != nil {
// TODO: return error instead, when callers are able to handle it.
klog.Errorf("transport: failed to marshal rpc status: %v, error: %v", p, err)
klog.CtxErrorf(t.ctx, "transport: failed to marshal rpc status: %v, error: %v", p, err)
} else {
headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status-details-bin", Value: encodeBinHeader(stBytes)})
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/remote/trans/nphttp2/server_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,9 @@ func (t *svrTransHandler) OnRead(ctx context.Context, conn net.Conn) error {
panicErr := recover()
if panicErr != nil {
if conn != nil {
klog.Errorf("KITEX: panic happened, close conn, remoteAddress=%s, error=%s\nstack=%s", conn.RemoteAddr(), panicErr, string(debug.Stack()))
klog.CtxErrorf(ctx, "KITEX: panic happened, close conn, remoteAddress=%s, error=%s\nstack=%s", conn.RemoteAddr(), panicErr, string(debug.Stack()))
} else {
klog.Errorf("KITEX: panic happened, error=%v\nstack=%s", panicErr, string(debug.Stack()))
klog.CtxErrorf(ctx, "KITEX: panic happened, error=%v\nstack=%s", panicErr, string(debug.Stack()))
}
}
t.finishTracer(ctx, ri, err, panicErr)
Expand Down Expand Up @@ -183,9 +183,9 @@ func (t *svrTransHandler) OnInactive(ctx context.Context, conn net.Conn) {
// 传输层 error 回调
func (t *svrTransHandler) OnError(ctx context.Context, err error, conn net.Conn) {
if pe, ok := err.(*kerrors.DetailedError); ok {
klog.Errorf("KITEX: processing request error, remote=%s, error=%s\nstack=%s", conn.RemoteAddr(), err.Error(), pe.Stack())
klog.CtxErrorf(ctx, "KITEX: processing request error, remote=%s, error=%s\nstack=%s", conn.RemoteAddr(), err.Error(), pe.Stack())
} else {
klog.Errorf("KITEX: processing request error, remote=%s, error=%s", conn.RemoteAddr(), err.Error())
klog.CtxErrorf(ctx, "KITEX: processing request error, remote=%s, error=%s", conn.RemoteAddr(), err.Error())
}
}

Expand Down

0 comments on commit 5d81475

Please sign in to comment.