diff --git a/client/callopt/options.go b/client/callopt/options.go index 78ae24e56d..24a5eb879d 100644 --- a/client/callopt/options.go +++ b/client/callopt/options.go @@ -51,15 +51,11 @@ func newOptions() interface{} { } } -func (co *callOptions) zero() { +// Recycle zeros the call option and put it to the pool. +func (co *callOptions) Recycle() { co.configs = nil co.svr = nil co.locks.Zero() -} - -// Recycle zeros the call option and put it to the pool. -func (co *callOptions) Recycle() { - co.zero() callOptionsPool.Put(co) } diff --git a/client/client.go b/client/client.go index a1a738c94a..383eff2c04 100644 --- a/client/client.go +++ b/client/client.go @@ -337,7 +337,8 @@ func (kc *kClient) invokeHandleEndpoint() (endpoint.Endpoint, error) { return } config := ri.Config() - if kc.svcInfo.MethodInfo(methodName).OneWay() { + m := kc.svcInfo.MethodInfo(methodName) + if m.OneWay() { sendMsg = remote.NewMessage(req, kc.svcInfo, ri, remote.Oneway, remote.Client) } else { sendMsg = remote.NewMessage(req, kc.svcInfo, ri, remote.Call, remote.Client) @@ -347,7 +348,7 @@ func (kc *kClient) invokeHandleEndpoint() (endpoint.Endpoint, error) { if err = cli.Send(ctx, ri, sendMsg); err != nil { return } - if resp == nil || kc.svcInfo.MethodInfo(methodName).OneWay() { + if resp == nil || m.OneWay() { cli.Recv(ctx, ri, nil) return nil } diff --git a/pkg/remote/message.go b/pkg/remote/message.go index 4adf0449f8..85130fad1b 100644 --- a/pkg/remote/message.go +++ b/pkg/remote/message.go @@ -54,6 +54,7 @@ const ( ) const ( + // ReadFailed . ReadFailed string = "RFailed" // MeshHeader use in message.Tag to check MeshHeader @@ -277,7 +278,10 @@ type TransInfo interface { } func newTransInfo() interface{} { - return &transInfo{} + return &transInfo{ + intInfo: make(map[uint16]string), + strInfo: make(map[string]string), + } } type transInfo struct { @@ -286,15 +290,16 @@ type transInfo struct { } func (ti *transInfo) zero() { - ti.intInfo = nil - ti.strInfo = nil + for k := range ti.intInfo { + delete(ti.intInfo, k) + } + for k := range ti.strInfo { + delete(ti.strInfo, k) + } } // TransIntInfo implements the TransInfo interface. func (ti *transInfo) TransIntInfo() map[uint16]string { - if ti.intInfo == nil { - ti.intInfo = make(map[uint16]string) - } return ti.intInfo } @@ -314,9 +319,6 @@ func (ti *transInfo) PutTransIntInfo(kvInfo map[uint16]string) { // TransStrInfo implements the TransInfo interface. func (ti *transInfo) TransStrInfo() map[string]string { - if ti.strInfo == nil { - ti.strInfo = make(map[string]string) - } return ti.strInfo } diff --git a/pkg/rpcinfo/ctx.go b/pkg/rpcinfo/ctx.go index b836ee4195..f7456dcc54 100644 --- a/pkg/rpcinfo/ctx.go +++ b/pkg/rpcinfo/ctx.go @@ -45,25 +45,7 @@ func GetRPCInfo(ctx context.Context) RPCInfo { // PutRPCInfo recycles the RPCInfo. This function is for internal use only. func PutRPCInfo(ri RPCInfo) { - if ri == nil { - return - } - if r, ok := ri.From().(internal.Reusable); ok { - r.Recycle() - } - if r, ok := ri.To().(internal.Reusable); ok { - r.Recycle() - } - if r, ok := ri.Invocation().(internal.Reusable); ok { - r.Recycle() - } - if r, ok := ri.Config().(internal.Reusable); ok { - r.Recycle() - } - if r, ok := ri.Stats().(internal.Reusable); ok { - r.Recycle() - } - if r, ok := ri.(internal.Reusable); ok { - r.Recycle() + if v, ok := ri.(internal.Reusable); ok { + v.Recycle() } } diff --git a/pkg/rpcinfo/endpointInfo.go b/pkg/rpcinfo/endpointInfo.go index 2592554ffe..a8257a3789 100644 --- a/pkg/rpcinfo/endpointInfo.go +++ b/pkg/rpcinfo/endpointInfo.go @@ -39,7 +39,7 @@ func init() { } func newEndpointInfo() interface{} { - return &endpointInfo{} + return &endpointInfo{tags: make(map[string]string)} } // ServiceName implements the EndpointInfo interface. @@ -91,9 +91,6 @@ func (ei *endpointInfo) SetAddress(addr net.Addr) error { // SetTag implements the MutableEndpointInfo interface. func (ei *endpointInfo) SetTag(key, value string) error { - if ei.tags == nil { - ei.tags = make(map[string]string) - } ei.tags[key] = value return nil } @@ -107,7 +104,9 @@ func (ei *endpointInfo) zero() { ei.serviceName = "" ei.method = "" ei.address = nil - ei.tags = nil + for k := range ei.tags { + delete(ei.tags, k) + } } // Recycle is used to recycle the endpointInfo. @@ -122,7 +121,9 @@ func NewMutableEndpointInfo(serviceName, method string, address net.Addr, tags m ei.serviceName = serviceName ei.method = method ei.address = address - ei.tags = tags + for k, v := range tags { + ei.tags[k] = v + } return ei } @@ -133,14 +134,10 @@ func NewEndpointInfo(serviceName, method string, address net.Addr, tags map[stri // FromBasicInfo converts an EndpointBasicInfo into EndpointInfo. func FromBasicInfo(bi *EndpointBasicInfo) EndpointInfo { - tags := make(map[string]string) - for k, v := range bi.Tags { - tags[k] = v - } - return NewEndpointInfo(bi.ServiceName, bi.Method, nil, tags) + return NewEndpointInfo(bi.ServiceName, bi.Method, nil, bi.Tags) } // EmptyEndpointInfo creates an empty EndpointInfo. func EmptyEndpointInfo() EndpointInfo { - return NewEndpointInfo("", "", nil, make(map[string]string)) + return NewEndpointInfo("", "", nil, nil) } diff --git a/pkg/rpcinfo/rpcinfo.go b/pkg/rpcinfo/rpcinfo.go index ad5e308022..b25e8108ab 100644 --- a/pkg/rpcinfo/rpcinfo.go +++ b/pkg/rpcinfo/rpcinfo.go @@ -18,6 +18,8 @@ package rpcinfo import ( "sync" + + "github.com/cloudwego/kitex/internal" ) var rpcInfoPool sync.Pool @@ -55,6 +57,21 @@ func (r *rpcInfo) zero() { // Recycle reuses the rpcInfo. func (r *rpcInfo) Recycle() { + if v, ok := r.from.(internal.Reusable); ok { + v.Recycle() + } + if v, ok := r.to.(internal.Reusable); ok { + v.Recycle() + } + if v, ok := r.invocation.(internal.Reusable); ok { + v.Recycle() + } + if v, ok := r.config.(internal.Reusable); ok { + v.Recycle() + } + if v, ok := r.stats.(internal.Reusable); ok { + v.Recycle() + } r.zero() rpcInfoPool.Put(r) } diff --git a/pkg/rpcinfo/rpcstats.go b/pkg/rpcinfo/rpcstats.go index 4ddd1c30fb..53ebe23a4b 100644 --- a/pkg/rpcinfo/rpcstats.go +++ b/pkg/rpcinfo/rpcstats.go @@ -21,7 +21,6 @@ import ( "sync" "time" - "github.com/cloudwego/kitex/internal" "github.com/cloudwego/kitex/pkg/stats" ) @@ -193,10 +192,10 @@ func (r *rpcStats) Reset() { r.recvSize = 0 r.sendSize = 0 for i := range r.eventMap { - if t, ok := r.eventMap[i].(internal.Reusable); ok { - t.Recycle() + if r.eventMap[i] != nil { + r.eventMap[i].(*event).Recycle() + r.eventMap[i] = nil } - r.eventMap[i] = nil } }