Skip to content

Commit

Permalink
fix: rpcinfo pool disable also for the server
Browse files Browse the repository at this point in the history
  • Loading branch information
felix.fengmin committed Dec 12, 2023
1 parent 4db8525 commit 00b0718
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 8 deletions.
6 changes: 4 additions & 2 deletions pkg/remote/trans/nphttp2/server_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,11 @@ func (t *svrTransHandler) OnRead(ctx context.Context, conn net.Conn) error {
ri := svrTrans.pool.Get().(rpcinfo.RPCInfo)
rCtx := rpcinfo.NewCtxWithRPCInfo(s.Context(), ri)
defer func() {
// reset rpcinfo
// reset rpcinfo for performance (PR #584)
ri = t.opt.InitOrResetRPCInfoFunc(ri, conn.RemoteAddr())
svrTrans.pool.Put(ri)
if rpcinfo.PoolEnabled() {
svrTrans.pool.Put(ri)
}
}()

// set grpc transport flag before execute metahandler
Expand Down
5 changes: 3 additions & 2 deletions pkg/rpcinfo/rpcinfo.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ func EnablePool(enable bool) {
}
}

func poolEnabled() bool {
// PoolEnabled returns true if rpcInfoPool is enabled.
func PoolEnabled() bool {
return atomic.LoadInt32(&enablePool) == 1
}

Expand Down Expand Up @@ -88,7 +89,7 @@ func (r *rpcInfo) zero() {

// Recycle reuses the rpcInfo.
func (r *rpcInfo) Recycle() {
if !poolEnabled() {
if !PoolEnabled() {
return
}
if v, ok := r.from.(internal.Reusable); ok {
Expand Down
4 changes: 2 additions & 2 deletions pkg/rpcinfo/rpcinfo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@ import (
func TestEnablePool(t *testing.T) {
t.Run("disable", func(t *testing.T) {
EnablePool(false)
test.Assert(t, !poolEnabled())
test.Assert(t, !PoolEnabled())
})

t.Run("disable-enable", func(t *testing.T) {
EnablePool(false)
EnablePool(true)
test.Assert(t, poolEnabled())
test.Assert(t, PoolEnabled())
})
}
6 changes: 4 additions & 2 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,8 @@ func newErrorHandleMW(errHandle func(context.Context, error) error) endpoint.Mid

func (s *server) initOrResetRPCInfoFunc() func(rpcinfo.RPCInfo, net.Addr) rpcinfo.RPCInfo {
return func(ri rpcinfo.RPCInfo, rAddr net.Addr) rpcinfo.RPCInfo {
// Reset rpcinfo if it exists in ctx.
if ri != nil {
// Reset existing rpcinfo to improve performance for long connections (PR #584).
if ri != nil && rpcinfo.PoolEnabled() {
fi := rpcinfo.AsMutableEndpointInfo(ri.From())
fi.Reset()
fi.SetAddress(rAddr)
Expand All @@ -155,6 +155,8 @@ func (s *server) initOrResetRPCInfoFunc() func(rpcinfo.RPCInfo, net.Addr) rpcinf
}
return ri
}

// allocate a new rpcinfo if it's the connection's first request or rpcInfoPool is disabled
rpcStats := rpcinfo.AsMutableRPCStats(rpcinfo.NewRPCStats())
if s.opt.StatsLevel != nil {
rpcStats.SetLevel(*s.opt.StatsLevel)
Expand Down
14 changes: 14 additions & 0 deletions server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,10 @@ func TestInitOrResetRPCInfo(t *testing.T) {
test.Assert(t, ri.Stats().Level() == stats.LevelDetailed)

// test reset
pOld := reflect.ValueOf(ri).Pointer()
ri = rpcInfoInitFunc(ri, conn.RemoteAddr())
pNew := reflect.ValueOf(ri).Pointer()
test.Assert(t, pOld == pNew, pOld, pNew)
test.Assert(t, ri.From().Address().String() == remoteAddr.String())
test.Assert(t, ri.Config().ReadWriteTimeout() == rwTimeout)

Expand Down Expand Up @@ -226,6 +229,17 @@ func TestInitOrResetRPCInfo(t *testing.T) {
test.Assert(t, panicked == nil)
test.Assert(t, ri.Stats().Error() == nil)
test.Assert(t, ri.Stats().Level() == 0)

// test reset after rpcInfoPool is disabled
t.Run("reset with pool disabled", func(t *testing.T) {
backupState := rpcinfo.PoolEnabled()
defer rpcinfo.EnablePool(backupState)
rpcinfo.EnablePool(false)

riNew := rpcInfoInitFunc(ri, conn.RemoteAddr())
pOld, pNew := reflect.ValueOf(ri).Pointer(), reflect.ValueOf(riNew).Pointer()
test.Assert(t, pOld != pNew, pOld, pNew)
})
}

func TestServiceRegisterFailed(t *testing.T) {
Expand Down

0 comments on commit 00b0718

Please sign in to comment.