Skip to content

Commit

Permalink
use proto.Buffer API for protobuf codec and cache proto.Buffer structs (
Browse files Browse the repository at this point in the history
grpc#1010)

* use a global sharded pool of proto.Buffer caches in protoCodec

* fix goimports

* make global buffer pool index counter atomic

* hack to remove alloc in encode_len_struct

* remove extra slice alloc in proto codec marshal

* replce magic number for proto size field length with constant

* replace custom cache with sync.Pool

* remove 1 line functions in codec.go and add protoCodec microbenchmarks

* add concurrent usage test for protoCodec

* fix golint.gofmt,goimport checks

* fix issues in codec.go and codec_test.go

* use go parallel benchmark helpers

* replace proto.Codec with a guess of size needed

* update Fatalf -> Errorf in tests

* wrap proto.Buffer along with cached last size into larger struct for pool use

* make wrapped proto buffer only a literal

* fix style and imports

* move b.Run into inner function

* reverse micro benchmark op order to unmarshal-marshal and fix benchmark setup-in-test bug

* add test for large message

* remove use of defer in codec.marshal

* revert recent changes to codec bencmarks

* move sub-benchmarks into >= go-1.7 only file

* add commentfor marshaler and tweak benchmark subtests for easier usage

* move build tag for go1.7 on benchmarks to inside file

* move build tag to top of file

* comment Codec, embed proto.Buffer into cached struct and add an int32 cap
  • Loading branch information
apolcyn authored Apr 13, 2017
1 parent cd8432e commit 0e8b58d
Show file tree
Hide file tree
Showing 4 changed files with 376 additions and 27 deletions.
118 changes: 118 additions & 0 deletions codec.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/*
*
* Copyright 2014, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/

package grpc

import (
"math"
"sync"

"github.com/golang/protobuf/proto"
)

// Codec defines the interface gRPC uses to encode and decode messages.
// Note that implementations of this interface must be thread safe;
// a Codec's methods can be called from concurrent goroutines.
type Codec interface {
// Marshal returns the wire format of v.
Marshal(v interface{}) ([]byte, error)
// Unmarshal parses the wire format into v.
Unmarshal(data []byte, v interface{}) error
// String returns the name of the Codec implementation. The returned
// string will be used as part of content type in transmission.
String() string
}

// protoCodec is a Codec implementation with protobuf. It is the default codec for gRPC.
type protoCodec struct {
}

type cachedProtoBuffer struct {
lastMarshaledSize uint32
proto.Buffer
}

func capToMaxInt32(val int) uint32 {
if val > math.MaxInt32 {
return uint32(math.MaxInt32)
}
return uint32(val)
}

func (p protoCodec) marshal(v interface{}, cb *cachedProtoBuffer) ([]byte, error) {
protoMsg := v.(proto.Message)
newSlice := make([]byte, 0, cb.lastMarshaledSize)

cb.SetBuf(newSlice)
cb.Reset()
if err := cb.Marshal(protoMsg); err != nil {
return nil, err
}
out := cb.Bytes()
cb.lastMarshaledSize = capToMaxInt32(len(out))
return out, nil
}

func (p protoCodec) Marshal(v interface{}) ([]byte, error) {
cb := protoBufferPool.Get().(*cachedProtoBuffer)
out, err := p.marshal(v, cb)

// put back buffer and lose the ref to the slice
cb.SetBuf(nil)
protoBufferPool.Put(cb)
return out, err
}

func (p protoCodec) Unmarshal(data []byte, v interface{}) error {
cb := protoBufferPool.Get().(*cachedProtoBuffer)
cb.SetBuf(data)
err := cb.Unmarshal(v.(proto.Message))
cb.SetBuf(nil)
protoBufferPool.Put(cb)
return err
}

func (protoCodec) String() string {
return "proto"
}

var (
protoBufferPool = &sync.Pool{
New: func() interface{} {
return &cachedProtoBuffer{
Buffer: proto.Buffer{},
lastMarshaledSize: 16,
}
},
}
)
115 changes: 115 additions & 0 deletions codec_benchmark_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
// +build go1.7

/*
*
* Copyright 2014, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/

package grpc

import (
"fmt"
"testing"

"github.com/golang/protobuf/proto"
"google.golang.org/grpc/test/codec_perf"
)

func setupBenchmarkProtoCodecInputs(b *testing.B, payloadBaseSize uint32) []proto.Message {
payloadBase := make([]byte, payloadBaseSize)
// arbitrary byte slices
payloadSuffixes := [][]byte{
[]byte("one"),
[]byte("two"),
[]byte("three"),
[]byte("four"),
[]byte("five"),
}
protoStructs := make([]proto.Message, 0)

for _, p := range payloadSuffixes {
ps := &codec_perf.Buffer{}
ps.Body = append(payloadBase, p...)
protoStructs = append(protoStructs, ps)
}

return protoStructs
}

// The possible use of certain protobuf APIs like the proto.Buffer API potentially involves caching
// on our side. This can add checks around memory allocations and possible contention.
// Example run: go test -v -run=^$ -bench=BenchmarkProtoCodec -benchmem
func BenchmarkProtoCodec(b *testing.B) {
// range of message sizes
payloadBaseSizes := make([]uint32, 0)
for i := uint32(0); i <= 12; i += 4 {
payloadBaseSizes = append(payloadBaseSizes, 1<<i)
}
// range of SetParallelism
parallelisms := make([]uint32, 0)
for i := uint32(0); i <= 16; i += 4 {
parallelisms = append(parallelisms, 1<<i)
}
for _, s := range payloadBaseSizes {
for _, p := range parallelisms {
func(parallelism int, payloadBaseSize uint32) {
protoStructs := setupBenchmarkProtoCodecInputs(b, payloadBaseSize)
name := fmt.Sprintf("MinPayloadSize:%v/SetParallelism(%v)", payloadBaseSize, parallelism)
b.Run(name, func(b *testing.B) {
codec := &protoCodec{}
b.SetParallelism(parallelism)
b.RunParallel(func(pb *testing.PB) {
benchmarkProtoCodec(codec, protoStructs, pb, b)
})
})
}(int(p), s)
}
}
}

func benchmarkProtoCodec(codec *protoCodec, protoStructs []proto.Message, pb *testing.PB, b *testing.B) {
counter := 0
for pb.Next() {
counter++
ps := protoStructs[counter%len(protoStructs)]
fastMarshalAndUnmarshal(codec, ps, b)
}
}

func fastMarshalAndUnmarshal(protoCodec Codec, protoStruct proto.Message, b *testing.B) {
marshaledBytes, err := protoCodec.Marshal(protoStruct)
if err != nil {
b.Errorf("protoCodec.Marshal(_) returned an error")
}
if err := protoCodec.Unmarshal(marshaledBytes, protoStruct); err != nil {
b.Errorf("protoCodec.Unmarshal(_) returned an error")
}
}
143 changes: 143 additions & 0 deletions codec_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
/*
*
* Copyright 2014, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/

package grpc

import (
"bytes"
"sync"
"testing"

"google.golang.org/grpc/test/codec_perf"
)

func marshalAndUnmarshal(t *testing.T, protoCodec Codec, expectedBody []byte) {
p := &codec_perf.Buffer{}
p.Body = expectedBody

marshalledBytes, err := protoCodec.Marshal(p)
if err != nil {
t.Errorf("protoCodec.Marshal(_) returned an error")
}

if err := protoCodec.Unmarshal(marshalledBytes, p); err != nil {
t.Errorf("protoCodec.Unmarshal(_) returned an error")
}

if bytes.Compare(p.GetBody(), expectedBody) != 0 {
t.Errorf("Unexpected body; got %v; want %v", p.GetBody(), expectedBody)
}
}

func TestBasicProtoCodecMarshalAndUnmarshal(t *testing.T) {
marshalAndUnmarshal(t, protoCodec{}, []byte{1, 2, 3})
}

// Try to catch possible race conditions around use of pools
func TestConcurrentUsage(t *testing.T) {
const (
numGoRoutines = 100
numMarshUnmarsh = 1000
)

// small, arbitrary byte slices
protoBodies := [][]byte{
[]byte("one"),
[]byte("two"),
[]byte("three"),
[]byte("four"),
[]byte("five"),
}

var wg sync.WaitGroup
codec := protoCodec{}

for i := 0; i < numGoRoutines; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for k := 0; k < numMarshUnmarsh; k++ {
marshalAndUnmarshal(t, codec, protoBodies[k%len(protoBodies)])
}
}()
}

wg.Wait()
}

// TestStaggeredMarshalAndUnmarshalUsingSamePool tries to catch potential errors in which slices get
// stomped on during reuse of a proto.Buffer.
func TestStaggeredMarshalAndUnmarshalUsingSamePool(t *testing.T) {
codec1 := protoCodec{}
codec2 := protoCodec{}

expectedBody1 := []byte{1, 2, 3}
expectedBody2 := []byte{4, 5, 6}

proto1 := codec_perf.Buffer{Body: expectedBody1}
proto2 := codec_perf.Buffer{Body: expectedBody2}

var m1, m2 []byte
var err error

if m1, err = codec1.Marshal(&proto1); err != nil {
t.Errorf("protoCodec.Marshal(%v) failed", proto1)
}

if m2, err = codec2.Marshal(&proto2); err != nil {
t.Errorf("protoCodec.Marshal(%v) failed", proto2)
}

if err = codec1.Unmarshal(m1, &proto1); err != nil {
t.Errorf("protoCodec.Unmarshal(%v) failed", m1)
}

if err = codec2.Unmarshal(m2, &proto2); err != nil {
t.Errorf("protoCodec.Unmarshal(%v) failed", m2)
}

b1 := proto1.GetBody()
b2 := proto2.GetBody()

for i, v := range b1 {
if expectedBody1[i] != v {
t.Errorf("expected %v at index %v but got %v", i, expectedBody1[i], v)
}
}

for i, v := range b2 {
if expectedBody2[i] != v {
t.Errorf("expected %v at index %v but got %v", i, expectedBody2[i], v)
}
}
}
Loading

0 comments on commit 0e8b58d

Please sign in to comment.