Skip to content

Commit

Permalink
feat: support close generic client to avoid memory leak (cloudwego#254)
Browse files Browse the repository at this point in the history
* feat: support close generic client avoid memory leak

Change-Id: I6074a4551527420c26f5fd5b576ad778d7006a85
  • Loading branch information
SinnerA authored Dec 8, 2021
1 parent 5d81475 commit cf36195
Show file tree
Hide file tree
Showing 11 changed files with 330 additions and 8 deletions.
18 changes: 16 additions & 2 deletions client/genericclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,16 @@ package genericclient

import (
"context"
"runtime"

"github.com/cloudwego/kitex/client"
"github.com/cloudwego/kitex/client/callopt"
"github.com/cloudwego/kitex/pkg/generic"
"github.com/cloudwego/kitex/pkg/serviceinfo"
)

var _ Client = &genericServiceClient{}

// NewClient create a generic client
func NewClient(destService string, g generic.Generic, opts ...client.Option) (Client, error) {
svcInfo := generic.ServiceInfo(g.PayloadCodecType())
Expand All @@ -43,14 +46,19 @@ func NewClientWithServiceInfo(destService string, g generic.Generic, svcInfo *se
if err != nil {
return nil, err
}
return &genericServiceClient{
cli := &genericServiceClient{
kClient: kc,
g: g,
}, nil
}
runtime.SetFinalizer(cli, (*genericServiceClient).Close)

return cli, nil
}

// Client generic client
type Client interface {
generic.Closer

// GenericCall generic call
GenericCall(ctx context.Context, method string, request interface{}, callOptions ...callopt.Option) (response interface{}, err error)
}
Expand Down Expand Up @@ -78,3 +86,9 @@ func (gc *genericServiceClient) GenericCall(ctx context.Context, method string,
}
return _result.GetSuccess(), nil
}

func (gc *genericServiceClient) Close() error {
// no need a finalizer anymore
runtime.SetFinalizer(gc, nil)
return gc.g.Close()
}
64 changes: 64 additions & 0 deletions pkg/generic/binary_test/generic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"context"
"encoding/binary"
"net"
"runtime"
"runtime/debug"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -180,3 +182,65 @@ func genBinaryReqBuf(method string) []byte {
copy(buf[idx:idx+len(reqMsg)], reqMsg)
return buf
}

func TestBinaryThriftGenericClientClose(t *testing.T) {
debug.SetGCPercent(-1)
defer debug.SetGCPercent(100)

var ms runtime.MemStats
runtime.ReadMemStats(&ms)

t.Logf("Allocation: %f Mb, Number of allocation: %d\n", mb(ms.HeapAlloc), ms.HeapObjects)

clis := make([]genericclient.Client, 10000)
for i := 0; i < 10000; i++ {
g := generic.BinaryThriftGeneric()
clis[i] = newGenericClient("destServiceName", g, "127.0.0.1:9009")
}

runtime.ReadMemStats(&ms)
preHeepAlloc, preHeapObjects := mb(ms.HeapAlloc), ms.HeapObjects
t.Logf("Allocation: %f Mb, Number of allocation: %d\n", preHeepAlloc, preHeapObjects)

for _, cli := range clis {
_ = cli.Close()
}
runtime.GC()
runtime.ReadMemStats(&ms)
aferGCHeepAlloc, afterGCHeapObjects := mb(ms.HeapAlloc), ms.HeapObjects
t.Logf("Allocation: %f Mb, Number of allocation: %d\n", aferGCHeepAlloc, afterGCHeapObjects)
test.Assert(t, aferGCHeepAlloc < preHeepAlloc && afterGCHeapObjects < preHeapObjects)
}

func TestBinaryThriftGenericClientFinalizer(t *testing.T) {
debug.SetGCPercent(-1)
defer debug.SetGCPercent(100)

var ms runtime.MemStats
runtime.ReadMemStats(&ms)
t.Logf("Allocation: %f Mb, Number of allocation: %d\n", mb(ms.HeapAlloc), ms.HeapObjects)

clis := make([]genericclient.Client, 10000)
for i := 0; i < 10000; i++ {
g := generic.BinaryThriftGeneric()
clis[i] = newGenericClient("destServiceName", g, "127.0.0.1:9009")
}

runtime.ReadMemStats(&ms)
t.Logf("Allocation: %f Mb, Number of allocation: %d\n", mb(ms.HeapAlloc), ms.HeapObjects)

runtime.GC()
runtime.ReadMemStats(&ms)
firstGCHeepAlloc, firstGCHeapObjects := mb(ms.HeapAlloc), ms.HeapObjects
t.Logf("Allocation: %f Mb, Number of allocation: %d\n", firstGCHeepAlloc, firstGCHeapObjects)

runtime.GC()
runtime.ReadMemStats(&ms)
secondGCHeepAlloc, secondGCHeapObjects := mb(ms.HeapAlloc), ms.HeapObjects
t.Logf("Allocation: %f Mb, Number of allocation: %d\n", secondGCHeepAlloc, secondGCHeapObjects)
test.Assert(t, secondGCHeepAlloc < firstGCHeepAlloc && secondGCHeapObjects < firstGCHeapObjects)
}

func mb(byteSize uint64) float32 {
return float32(byteSize) / float32(1024*1024)
}
23 changes: 23 additions & 0 deletions pkg/generic/closer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright 2021 CloudWeGo Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package generic

// Closer is usually used to recycle resource.
type Closer interface {
// Close the unused resource.
Close() error
}
1 change: 1 addition & 0 deletions pkg/generic/descriptor_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import "github.com/cloudwego/kitex/pkg/generic/descriptor"

// DescriptorProvider provide service descriptor
type DescriptorProvider interface {
Closer
// Provide return a channel for provide service descriptors
Provide() <-chan *descriptor.ServiceDescriptor
}
17 changes: 17 additions & 0 deletions pkg/generic/generic.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

// Generic ...
type Generic interface {
Closer
// PayloadCodec return codec implement
PayloadCodec() remote.PayloadCodec
// PayloadCodecType return the type of codec
Expand Down Expand Up @@ -98,6 +99,10 @@ func (g *binaryThriftGeneric) GetMethod(req interface{}, method string) (*Method
return &Method{method, false}, nil
}

func (g *binaryThriftGeneric) Close() error {
return nil
}

type mapThriftGeneric struct {
codec *mapThriftCodec
}
Expand All @@ -118,6 +123,10 @@ func (g *mapThriftGeneric) GetMethod(req interface{}, method string) (*Method, e
return g.codec.getMethod(req, method)
}

func (g *mapThriftGeneric) Close() error {
return g.codec.Close()
}

type jsonThriftGeneric struct {
codec *jsonThriftCodec
}
Expand All @@ -138,6 +147,10 @@ func (g *jsonThriftGeneric) GetMethod(req interface{}, method string) (*Method,
return g.codec.getMethod(req, method)
}

func (g *jsonThriftGeneric) Close() error {
return g.codec.Close()
}

type httpThriftGeneric struct {
codec *httpThriftCodec
}
Expand All @@ -157,3 +170,7 @@ func (g *httpThriftGeneric) PayloadCodec() remote.PayloadCodec {
func (g *httpThriftGeneric) GetMethod(req interface{}, method string) (*Method, error) {
return g.codec.getMethod(req)
}

func (g *httpThriftGeneric) Close() error {
return g.codec.Close()
}
9 changes: 9 additions & 0 deletions pkg/generic/httpthrift_codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ import (
"github.com/cloudwego/kitex/pkg/serviceinfo"
)

var (
_ remote.PayloadCodec = &httpThriftCodec{}
_ Closer = &httpThriftCodec{}
)

// HTTPRequest alias of descriptor HTTPRequest
type HTTPRequest = descriptor.HTTPRequest

Expand Down Expand Up @@ -108,6 +113,10 @@ func (c *httpThriftCodec) Name() string {
return "HttpThrift"
}

func (c *httpThriftCodec) Close() error {
return c.provider.Close()
}

var json = jsoniter.Config{
EscapeHTML: true,
UseNumber: true,
Expand Down
74 changes: 72 additions & 2 deletions pkg/generic/json_test/generic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"context"
"net"
"reflect"
"runtime"
"runtime/debug"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -147,7 +149,7 @@ func initThriftMockClient(t *testing.T) genericclient.Client {
test.Assert(t, err == nil)
g, err := generic.JSONThriftGeneric(p)
test.Assert(t, err == nil)
cli := newGenericClient("destServiceName", g, "127.0.0.1:9119")
cli := newGenericClient("destServiceName", g, "127.0.0.1:9128")
test.Assert(t, err == nil)
return cli
}
Expand Down Expand Up @@ -178,7 +180,75 @@ func initThriftServer(t *testing.T, address string, handler generic.Service) ser
}

func initMockServer(t *testing.T, handler kt.Mock) server.Server {
addr, _ := net.ResolveTCPAddr("tcp", ":9119")
addr, _ := net.ResolveTCPAddr("tcp", ":9128")
svr := newMockServer(handler, addr)
return svr
}

func TestJSONThriftGenericClientClose(t *testing.T) {
debug.SetGCPercent(-1)
defer debug.SetGCPercent(100)

var ms runtime.MemStats
runtime.ReadMemStats(&ms)

t.Logf("Allocation: %f Mb, Number of allocation: %d\n", mb(ms.HeapAlloc), ms.HeapObjects)

clis := make([]genericclient.Client, 1000)
for i := 0; i < 1000; i++ {
p, err := generic.NewThriftFileProvider("./idl/mock.thrift")
test.Assert(t, err == nil)
g, err := generic.JSONThriftGeneric(p)
test.Assert(t, err == nil)
clis[i] = newGenericClient("destServiceName", g, "127.0.0.1:9129")
}

runtime.ReadMemStats(&ms)
preHeepAlloc, preHeapObjects := mb(ms.HeapAlloc), ms.HeapObjects
t.Logf("Allocation: %f Mb, Number of allocation: %d\n", preHeepAlloc, preHeapObjects)

for _, cli := range clis {
_ = cli.Close()
}
runtime.GC()
runtime.ReadMemStats(&ms)
aferGCHeepAlloc, afterGCHeapObjects := mb(ms.HeapAlloc), ms.HeapObjects
t.Logf("Allocation: %f Mb, Number of allocation: %d\n", aferGCHeepAlloc, afterGCHeapObjects)
test.Assert(t, aferGCHeepAlloc < preHeepAlloc && afterGCHeapObjects < preHeapObjects)
}

func TestJSONThriftGenericClientFinalizer(t *testing.T) {
debug.SetGCPercent(-1)
defer debug.SetGCPercent(100)

var ms runtime.MemStats
runtime.ReadMemStats(&ms)
t.Logf("Allocation: %f Mb, Number of allocation: %d\n", mb(ms.HeapAlloc), ms.HeapObjects)

clis := make([]genericclient.Client, 1000)
for i := 0; i < 1000; i++ {
p, err := generic.NewThriftFileProvider("./idl/mock.thrift")
test.Assert(t, err == nil)
g, err := generic.JSONThriftGeneric(p)
test.Assert(t, err == nil)
clis[i] = newGenericClient("destServiceName", g, "127.0.0.1:9130")
}

runtime.ReadMemStats(&ms)
t.Logf("Allocation: %f Mb, Number of allocation: %d\n", mb(ms.HeapAlloc), ms.HeapObjects)

runtime.GC()
runtime.ReadMemStats(&ms)
firstGCHeepAlloc, firstGCHeapObjects := mb(ms.HeapAlloc), ms.HeapObjects
t.Logf("Allocation: %f Mb, Number of allocation: %d\n", firstGCHeepAlloc, firstGCHeapObjects)

runtime.GC()
runtime.ReadMemStats(&ms)
secondGCHeepAlloc, secondGCHeapObjects := mb(ms.HeapAlloc), ms.HeapObjects
t.Logf("Allocation: %f Mb, Number of allocation: %d\n", secondGCHeepAlloc, secondGCHeapObjects)
test.Assert(t, secondGCHeepAlloc < firstGCHeepAlloc && secondGCHeapObjects < firstGCHeapObjects)
}

func mb(byteSize uint64) float32 {
return float32(byteSize) / float32(1024*1024)
}
9 changes: 9 additions & 0 deletions pkg/generic/jsonthrift_codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@ import (
"github.com/cloudwego/kitex/pkg/serviceinfo"
)

var (
_ remote.PayloadCodec = &jsonThriftCodec{}
_ Closer = &jsonThriftCodec{}
)

// JSONRequest alias of string
type JSONRequest = string

Expand Down Expand Up @@ -99,3 +104,7 @@ func (c *jsonThriftCodec) getMethod(req interface{}, method string) (*Method, er
func (c *jsonThriftCodec) Name() string {
return "JSONThrift"
}

func (c *jsonThriftCodec) Close() error {
return c.provider.Close()
}
Loading

0 comments on commit cf36195

Please sign in to comment.