forked from grpc/grpc-go
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
use proto.Buffer API for protobuf codec and cache proto.Buffer structs (
grpc#1010) * use a global sharded pool of proto.Buffer caches in protoCodec * fix goimports * make global buffer pool index counter atomic * hack to remove alloc in encode_len_struct * remove extra slice alloc in proto codec marshal * replce magic number for proto size field length with constant * replace custom cache with sync.Pool * remove 1 line functions in codec.go and add protoCodec microbenchmarks * add concurrent usage test for protoCodec * fix golint.gofmt,goimport checks * fix issues in codec.go and codec_test.go * use go parallel benchmark helpers * replace proto.Codec with a guess of size needed * update Fatalf -> Errorf in tests * wrap proto.Buffer along with cached last size into larger struct for pool use * make wrapped proto buffer only a literal * fix style and imports * move b.Run into inner function * reverse micro benchmark op order to unmarshal-marshal and fix benchmark setup-in-test bug * add test for large message * remove use of defer in codec.marshal * revert recent changes to codec bencmarks * move sub-benchmarks into >= go-1.7 only file * add commentfor marshaler and tweak benchmark subtests for easier usage * move build tag for go1.7 on benchmarks to inside file * move build tag to top of file * comment Codec, embed proto.Buffer into cached struct and add an int32 cap
- Loading branch information
Showing
4 changed files
with
376 additions
and
27 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,118 @@ | ||
/* | ||
* | ||
* Copyright 2014, Google Inc. | ||
* All rights reserved. | ||
* | ||
* Redistribution and use in source and binary forms, with or without | ||
* modification, are permitted provided that the following conditions are | ||
* met: | ||
* | ||
* * Redistributions of source code must retain the above copyright | ||
* notice, this list of conditions and the following disclaimer. | ||
* * Redistributions in binary form must reproduce the above | ||
* copyright notice, this list of conditions and the following disclaimer | ||
* in the documentation and/or other materials provided with the | ||
* distribution. | ||
* * Neither the name of Google Inc. nor the names of its | ||
* contributors may be used to endorse or promote products derived from | ||
* this software without specific prior written permission. | ||
* | ||
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS | ||
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT | ||
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR | ||
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT | ||
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, | ||
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT | ||
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, | ||
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY | ||
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT | ||
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE | ||
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. | ||
* | ||
*/ | ||
|
||
package grpc | ||
|
||
import ( | ||
"math" | ||
"sync" | ||
|
||
"github.com/golang/protobuf/proto" | ||
) | ||
|
||
// Codec defines the interface gRPC uses to encode and decode messages. | ||
// Note that implementations of this interface must be thread safe; | ||
// a Codec's methods can be called from concurrent goroutines. | ||
type Codec interface { | ||
// Marshal returns the wire format of v. | ||
Marshal(v interface{}) ([]byte, error) | ||
// Unmarshal parses the wire format into v. | ||
Unmarshal(data []byte, v interface{}) error | ||
// String returns the name of the Codec implementation. The returned | ||
// string will be used as part of content type in transmission. | ||
String() string | ||
} | ||
|
||
// protoCodec is a Codec implementation with protobuf. It is the default codec for gRPC. | ||
type protoCodec struct { | ||
} | ||
|
||
type cachedProtoBuffer struct { | ||
lastMarshaledSize uint32 | ||
proto.Buffer | ||
} | ||
|
||
func capToMaxInt32(val int) uint32 { | ||
if val > math.MaxInt32 { | ||
return uint32(math.MaxInt32) | ||
} | ||
return uint32(val) | ||
} | ||
|
||
func (p protoCodec) marshal(v interface{}, cb *cachedProtoBuffer) ([]byte, error) { | ||
protoMsg := v.(proto.Message) | ||
newSlice := make([]byte, 0, cb.lastMarshaledSize) | ||
|
||
cb.SetBuf(newSlice) | ||
cb.Reset() | ||
if err := cb.Marshal(protoMsg); err != nil { | ||
return nil, err | ||
} | ||
out := cb.Bytes() | ||
cb.lastMarshaledSize = capToMaxInt32(len(out)) | ||
return out, nil | ||
} | ||
|
||
func (p protoCodec) Marshal(v interface{}) ([]byte, error) { | ||
cb := protoBufferPool.Get().(*cachedProtoBuffer) | ||
out, err := p.marshal(v, cb) | ||
|
||
// put back buffer and lose the ref to the slice | ||
cb.SetBuf(nil) | ||
protoBufferPool.Put(cb) | ||
return out, err | ||
} | ||
|
||
func (p protoCodec) Unmarshal(data []byte, v interface{}) error { | ||
cb := protoBufferPool.Get().(*cachedProtoBuffer) | ||
cb.SetBuf(data) | ||
err := cb.Unmarshal(v.(proto.Message)) | ||
cb.SetBuf(nil) | ||
protoBufferPool.Put(cb) | ||
return err | ||
} | ||
|
||
func (protoCodec) String() string { | ||
return "proto" | ||
} | ||
|
||
var ( | ||
protoBufferPool = &sync.Pool{ | ||
New: func() interface{} { | ||
return &cachedProtoBuffer{ | ||
Buffer: proto.Buffer{}, | ||
lastMarshaledSize: 16, | ||
} | ||
}, | ||
} | ||
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,115 @@ | ||
// +build go1.7 | ||
|
||
/* | ||
* | ||
* Copyright 2014, Google Inc. | ||
* All rights reserved. | ||
* | ||
* Redistribution and use in source and binary forms, with or without | ||
* modification, are permitted provided that the following conditions are | ||
* met: | ||
* | ||
* * Redistributions of source code must retain the above copyright | ||
* notice, this list of conditions and the following disclaimer. | ||
* * Redistributions in binary form must reproduce the above | ||
* copyright notice, this list of conditions and the following disclaimer | ||
* in the documentation and/or other materials provided with the | ||
* distribution. | ||
* * Neither the name of Google Inc. nor the names of its | ||
* contributors may be used to endorse or promote products derived from | ||
* this software without specific prior written permission. | ||
* | ||
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS | ||
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT | ||
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR | ||
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT | ||
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, | ||
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT | ||
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, | ||
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY | ||
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT | ||
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE | ||
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. | ||
* | ||
*/ | ||
|
||
package grpc | ||
|
||
import ( | ||
"fmt" | ||
"testing" | ||
|
||
"github.com/golang/protobuf/proto" | ||
"google.golang.org/grpc/test/codec_perf" | ||
) | ||
|
||
func setupBenchmarkProtoCodecInputs(b *testing.B, payloadBaseSize uint32) []proto.Message { | ||
payloadBase := make([]byte, payloadBaseSize) | ||
// arbitrary byte slices | ||
payloadSuffixes := [][]byte{ | ||
[]byte("one"), | ||
[]byte("two"), | ||
[]byte("three"), | ||
[]byte("four"), | ||
[]byte("five"), | ||
} | ||
protoStructs := make([]proto.Message, 0) | ||
|
||
for _, p := range payloadSuffixes { | ||
ps := &codec_perf.Buffer{} | ||
ps.Body = append(payloadBase, p...) | ||
protoStructs = append(protoStructs, ps) | ||
} | ||
|
||
return protoStructs | ||
} | ||
|
||
// The possible use of certain protobuf APIs like the proto.Buffer API potentially involves caching | ||
// on our side. This can add checks around memory allocations and possible contention. | ||
// Example run: go test -v -run=^$ -bench=BenchmarkProtoCodec -benchmem | ||
func BenchmarkProtoCodec(b *testing.B) { | ||
// range of message sizes | ||
payloadBaseSizes := make([]uint32, 0) | ||
for i := uint32(0); i <= 12; i += 4 { | ||
payloadBaseSizes = append(payloadBaseSizes, 1<<i) | ||
} | ||
// range of SetParallelism | ||
parallelisms := make([]uint32, 0) | ||
for i := uint32(0); i <= 16; i += 4 { | ||
parallelisms = append(parallelisms, 1<<i) | ||
} | ||
for _, s := range payloadBaseSizes { | ||
for _, p := range parallelisms { | ||
func(parallelism int, payloadBaseSize uint32) { | ||
protoStructs := setupBenchmarkProtoCodecInputs(b, payloadBaseSize) | ||
name := fmt.Sprintf("MinPayloadSize:%v/SetParallelism(%v)", payloadBaseSize, parallelism) | ||
b.Run(name, func(b *testing.B) { | ||
codec := &protoCodec{} | ||
b.SetParallelism(parallelism) | ||
b.RunParallel(func(pb *testing.PB) { | ||
benchmarkProtoCodec(codec, protoStructs, pb, b) | ||
}) | ||
}) | ||
}(int(p), s) | ||
} | ||
} | ||
} | ||
|
||
func benchmarkProtoCodec(codec *protoCodec, protoStructs []proto.Message, pb *testing.PB, b *testing.B) { | ||
counter := 0 | ||
for pb.Next() { | ||
counter++ | ||
ps := protoStructs[counter%len(protoStructs)] | ||
fastMarshalAndUnmarshal(codec, ps, b) | ||
} | ||
} | ||
|
||
func fastMarshalAndUnmarshal(protoCodec Codec, protoStruct proto.Message, b *testing.B) { | ||
marshaledBytes, err := protoCodec.Marshal(protoStruct) | ||
if err != nil { | ||
b.Errorf("protoCodec.Marshal(_) returned an error") | ||
} | ||
if err := protoCodec.Unmarshal(marshaledBytes, protoStruct); err != nil { | ||
b.Errorf("protoCodec.Unmarshal(_) returned an error") | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,143 @@ | ||
/* | ||
* | ||
* Copyright 2014, Google Inc. | ||
* All rights reserved. | ||
* | ||
* Redistribution and use in source and binary forms, with or without | ||
* modification, are permitted provided that the following conditions are | ||
* met: | ||
* | ||
* * Redistributions of source code must retain the above copyright | ||
* notice, this list of conditions and the following disclaimer. | ||
* * Redistributions in binary form must reproduce the above | ||
* copyright notice, this list of conditions and the following disclaimer | ||
* in the documentation and/or other materials provided with the | ||
* distribution. | ||
* * Neither the name of Google Inc. nor the names of its | ||
* contributors may be used to endorse or promote products derived from | ||
* this software without specific prior written permission. | ||
* | ||
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS | ||
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT | ||
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR | ||
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT | ||
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, | ||
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT | ||
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, | ||
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY | ||
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT | ||
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE | ||
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. | ||
* | ||
*/ | ||
|
||
package grpc | ||
|
||
import ( | ||
"bytes" | ||
"sync" | ||
"testing" | ||
|
||
"google.golang.org/grpc/test/codec_perf" | ||
) | ||
|
||
func marshalAndUnmarshal(t *testing.T, protoCodec Codec, expectedBody []byte) { | ||
p := &codec_perf.Buffer{} | ||
p.Body = expectedBody | ||
|
||
marshalledBytes, err := protoCodec.Marshal(p) | ||
if err != nil { | ||
t.Errorf("protoCodec.Marshal(_) returned an error") | ||
} | ||
|
||
if err := protoCodec.Unmarshal(marshalledBytes, p); err != nil { | ||
t.Errorf("protoCodec.Unmarshal(_) returned an error") | ||
} | ||
|
||
if bytes.Compare(p.GetBody(), expectedBody) != 0 { | ||
t.Errorf("Unexpected body; got %v; want %v", p.GetBody(), expectedBody) | ||
} | ||
} | ||
|
||
func TestBasicProtoCodecMarshalAndUnmarshal(t *testing.T) { | ||
marshalAndUnmarshal(t, protoCodec{}, []byte{1, 2, 3}) | ||
} | ||
|
||
// Try to catch possible race conditions around use of pools | ||
func TestConcurrentUsage(t *testing.T) { | ||
const ( | ||
numGoRoutines = 100 | ||
numMarshUnmarsh = 1000 | ||
) | ||
|
||
// small, arbitrary byte slices | ||
protoBodies := [][]byte{ | ||
[]byte("one"), | ||
[]byte("two"), | ||
[]byte("three"), | ||
[]byte("four"), | ||
[]byte("five"), | ||
} | ||
|
||
var wg sync.WaitGroup | ||
codec := protoCodec{} | ||
|
||
for i := 0; i < numGoRoutines; i++ { | ||
wg.Add(1) | ||
go func() { | ||
defer wg.Done() | ||
for k := 0; k < numMarshUnmarsh; k++ { | ||
marshalAndUnmarshal(t, codec, protoBodies[k%len(protoBodies)]) | ||
} | ||
}() | ||
} | ||
|
||
wg.Wait() | ||
} | ||
|
||
// TestStaggeredMarshalAndUnmarshalUsingSamePool tries to catch potential errors in which slices get | ||
// stomped on during reuse of a proto.Buffer. | ||
func TestStaggeredMarshalAndUnmarshalUsingSamePool(t *testing.T) { | ||
codec1 := protoCodec{} | ||
codec2 := protoCodec{} | ||
|
||
expectedBody1 := []byte{1, 2, 3} | ||
expectedBody2 := []byte{4, 5, 6} | ||
|
||
proto1 := codec_perf.Buffer{Body: expectedBody1} | ||
proto2 := codec_perf.Buffer{Body: expectedBody2} | ||
|
||
var m1, m2 []byte | ||
var err error | ||
|
||
if m1, err = codec1.Marshal(&proto1); err != nil { | ||
t.Errorf("protoCodec.Marshal(%v) failed", proto1) | ||
} | ||
|
||
if m2, err = codec2.Marshal(&proto2); err != nil { | ||
t.Errorf("protoCodec.Marshal(%v) failed", proto2) | ||
} | ||
|
||
if err = codec1.Unmarshal(m1, &proto1); err != nil { | ||
t.Errorf("protoCodec.Unmarshal(%v) failed", m1) | ||
} | ||
|
||
if err = codec2.Unmarshal(m2, &proto2); err != nil { | ||
t.Errorf("protoCodec.Unmarshal(%v) failed", m2) | ||
} | ||
|
||
b1 := proto1.GetBody() | ||
b2 := proto2.GetBody() | ||
|
||
for i, v := range b1 { | ||
if expectedBody1[i] != v { | ||
t.Errorf("expected %v at index %v but got %v", i, expectedBody1[i], v) | ||
} | ||
} | ||
|
||
for i, v := range b2 { | ||
if expectedBody2[i] != v { | ||
t.Errorf("expected %v at index %v but got %v", i, expectedBody2[i], v) | ||
} | ||
} | ||
} |
Oops, something went wrong.