Skip to content

Commit

Permalink
kafka.writeBuffer (segmentio#318)
Browse files Browse the repository at this point in the history
* add kafka.writeBuffer

* fix crc32 computation
  • Loading branch information
Achille authored Jul 22, 2019
1 parent e31ffda commit ad7818b
Show file tree
Hide file tree
Showing 40 changed files with 811 additions and 838 deletions.
37 changes: 17 additions & 20 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ type Conn struct {
// write buffer (synchronized on wlock)
wlock sync.Mutex
wbuf bufio.Writer
wb writeBuffer

// deadline management
wdeadline connDeadline
Expand Down Expand Up @@ -160,6 +161,8 @@ func NewConnWith(conn net.Conn, config ConnConfig) *Conn {
transactionalID: emptyToNullable(config.TransactionalID),
}

c.wb.w = &c.wbuf

// The fetch request needs to ask for a MaxBytes value that is at least
// enough to load the control data of the response. To avoid having to
// recompute it on every read, it is cached here in the Conn value.
Expand Down Expand Up @@ -767,8 +770,7 @@ func (c *Conn) ReadBatchWith(cfg ReadBatchConfig) *Batch {
adjustedDeadline = deadline
switch c.fetchVersion {
case v10:
return writeFetchRequestV10(
&c.wbuf,
return c.wb.writeFetchRequestV10(
id,
c.clientID,
c.topic,
Expand All @@ -780,8 +782,7 @@ func (c *Conn) ReadBatchWith(cfg ReadBatchConfig) *Batch {
int8(cfg.IsolationLevel),
)
case v5:
return writeFetchRequestV5(
&c.wbuf,
return c.wb.writeFetchRequestV5(
id,
c.clientID,
c.topic,
Expand All @@ -793,8 +794,7 @@ func (c *Conn) ReadBatchWith(cfg ReadBatchConfig) *Batch {
int8(cfg.IsolationLevel),
)
default:
return writeFetchRequestV2(
&c.wbuf,
return c.wb.writeFetchRequestV2(
id,
c.clientID,
c.topic,
Expand Down Expand Up @@ -891,7 +891,7 @@ func (c *Conn) ReadOffsets() (first, last int64, err error) {
func (c *Conn) readOffset(t int64) (offset int64, err error) {
err = c.readOperation(
func(deadline time.Time, id int32) error {
return writeListOffsetRequestV1(&c.wbuf, id, c.clientID, c.topic, c.partition, t)
return c.wb.writeListOffsetRequestV1(id, c.clientID, c.topic, c.partition, t)
},
func(deadline time.Time, size int) error {
return expectZeroSize(readArrayWith(&c.rbuf, size, func(r *bufio.Reader, size int) (int, error) {
Expand Down Expand Up @@ -1058,8 +1058,7 @@ func (c *Conn) writeCompressedMessages(codec CompressionCodec, msgs ...Message)
deadline = adjustDeadlineForRTT(deadline, now, defaultRTT)
switch version := c.apiVersions[produceRequest].MaxVersion; {
case version >= 7:
return writeProduceRequestV7(
&c.wbuf,
return c.wb.writeProduceRequestV7(
codec,
id,
c.clientID,
Expand All @@ -1071,8 +1070,7 @@ func (c *Conn) writeCompressedMessages(codec CompressionCodec, msgs ...Message)
msgs...,
)
case version >= 3:
return writeProduceRequestV3(
&c.wbuf,
return c.wb.writeProduceRequestV3(
codec,
id,
c.clientID,
Expand All @@ -1084,8 +1082,7 @@ func (c *Conn) writeCompressedMessages(codec CompressionCodec, msgs ...Message)
msgs...,
)
default:
return writeProduceRequestV2(
&c.wbuf,
return c.wb.writeProduceRequestV2(
codec,
id,
c.clientID,
Expand Down Expand Up @@ -1170,14 +1167,14 @@ func (c *Conn) SetRequiredAcks(n int) error {
func (c *Conn) writeRequestHeader(apiKey apiKey, apiVersion apiVersion, correlationID int32, size int32) {
hdr := c.requestHeader(apiKey, apiVersion, correlationID)
hdr.Size = (hdr.size() + size) - 4
hdr.writeTo(&c.wbuf)
hdr.writeTo(&c.wb)
}

func (c *Conn) writeRequest(apiKey apiKey, apiVersion apiVersion, correlationID int32, req request) error {
hdr := c.requestHeader(apiKey, apiVersion, correlationID)
hdr.Size = (hdr.size() + req.size()) - 4
hdr.writeTo(&c.wbuf)
req.writeTo(&c.wbuf)
hdr.writeTo(&c.wb)
req.writeTo(&c.wb)
return c.wbuf.Flush()
}

Expand Down Expand Up @@ -1369,7 +1366,7 @@ func (c *Conn) ApiVersions() ([]ApiVersion, error) {
}
h.Size = (h.size() - 4)

h.writeTo(&c.wbuf)
h.writeTo(&c.wb)
return c.wbuf.Flush()
})
if err != nil {
Expand Down Expand Up @@ -1538,11 +1535,11 @@ func (c *Conn) saslAuthenticate(data []byte) ([]byte, error) {

// fall back to opaque bytes on the wire. the broker is expecting these if
// it just processed a v0 sasl handshake.
writeInt32(&c.wbuf, int32(len(data)))
if _, err := c.wbuf.Write(data); err != nil {
c.wb.writeInt32(int32(len(data)))
if _, err := c.wb.Write(data); err != nil {
return nil, err
}
if err := c.wbuf.Flush(); err != nil {
if err := c.wb.Flush(); err != nil {
return nil, err
}

Expand Down
91 changes: 36 additions & 55 deletions crc32.go
Original file line number Diff line number Diff line change
@@ -1,80 +1,61 @@
package kafka

import (
"bytes"
"encoding/binary"
"hash/crc32"
"sync"
"io"
)

// crc32OfMessage computes the CRC32 checksum of a kafka message built from
// the given magic byte, attributes, timestamp, key, and value. The timestamp
// only participates in the checksum for message format versions other than 0.
func crc32OfMessage(magicByte int8, attributes int8, timestamp int64, key []byte, value []byte) uint32 {
	buf := acquireCrc32Buffer()
	defer releaseCrc32Buffer(buf)

	buf.writeInt8(magicByte)
	buf.writeInt8(attributes)
	if magicByte != 0 {
		buf.writeInt64(timestamp)
	}
	buf.writeBytes(key)
	buf.writeBytes(value)

	// The return value is evaluated before the deferred release runs, so the
	// buffer is never read after it goes back to the pool.
	return buf.sum
}

type crc32Buffer struct {
sum uint32
buf bytes.Buffer
// crc32Writer computes a running CRC32 checksum of everything written to it,
// without retaining the bytes themselves. It satisfies io.Writer and
// io.StringWriter (see the interface assertions below in this file).
type crc32Writer struct {
	table *crc32.Table // polynomial table used for checksum updates (e.g. crc32.IEEETable in the tests)
	buffer [8]byte // scratch space for encoding fixed-size integers before hashing
	crc32 uint32 // running checksum, updated on every write
}

func (c *crc32Buffer) writeInt8(i int8) {
c.buf.Truncate(0)
c.buf.WriteByte(byte(i))
c.update()
// update folds b into the running checksum using the writer's table.
func (w *crc32Writer) update(b []byte) {
	sum := crc32.Update(w.crc32, w.table, b)
	w.crc32 = sum
}

func (c *crc32Buffer) writeInt32(i int32) {
a := [4]byte{}
binary.BigEndian.PutUint32(a[:], uint32(i))
c.buf.Truncate(0)
c.buf.Write(a[:])
c.update()
// writeInt8 hashes a single byte holding the given int8 value.
func (w *crc32Writer) writeInt8(i int8) {
	b := w.buffer[:1]
	b[0] = byte(i)
	w.update(b)
}

func (c *crc32Buffer) writeInt64(i int64) {
a := [8]byte{}
binary.BigEndian.PutUint64(a[:], uint64(i))
c.buf.Truncate(0)
c.buf.Write(a[:])
c.update()
// writeInt16 hashes the big-endian encoding of the given int16 value.
func (w *crc32Writer) writeInt16(i int16) {
	b := w.buffer[:2]
	binary.BigEndian.PutUint16(b, uint16(i))
	w.update(b)
}

func (c *crc32Buffer) writeBytes(b []byte) {
if b == nil {
c.writeInt32(-1)
} else {
c.writeInt32(int32(len(b)))
}
c.sum = crc32Update(c.sum, b)
// writeInt32 hashes the big-endian encoding of the given int32 value.
func (w *crc32Writer) writeInt32(i int32) {
	b := w.buffer[:4]
	binary.BigEndian.PutUint32(b, uint32(i))
	w.update(b)
}

func (c *crc32Buffer) update() {
c.sum = crc32Update(c.sum, c.buf.Bytes())
// writeInt64 hashes the big-endian encoding of the given int64 value.
func (w *crc32Writer) writeInt64(i int64) {
	b := w.buffer[:8]
	binary.BigEndian.PutUint64(b, uint64(i))
	w.update(b)
}

func crc32Update(sum uint32, b []byte) uint32 {
return crc32.Update(sum, crc32.IEEETable, b)
// writeBytes hashes a kafka byte sequence: a 4-byte big-endian length prefix
// followed by the bytes themselves.
func (w *crc32Writer) writeBytes(b []byte) {
	size := int32(len(b))
	if b == nil {
		size = -1 // nil slices are encoded with a length of -1
	}
	w.writeInt32(size)
	w.update(b)
}

var crc32BufferPool = sync.Pool{
New: func() interface{} { return &crc32Buffer{} },
// Write implements io.Writer by folding b into the checksum; it always
// reports a full write and never fails.
func (w *crc32Writer) Write(b []byte) (int, error) {
	n := len(b)
	w.update(b)
	return n, nil
}

func acquireCrc32Buffer() *crc32Buffer {
c := crc32BufferPool.Get().(*crc32Buffer)
c.sum = 0
return c
// WriteString implements io.StringWriter by folding the bytes of s into the
// checksum; it always reports a full write and never fails.
func (w *crc32Writer) WriteString(s string) (int, error) {
	b := []byte(s)
	w.update(b)
	return len(b), nil
}

// releaseCrc32Buffer returns b to the shared pool so a later
// acquireCrc32Buffer call can reuse it.
func releaseCrc32Buffer(b *crc32Buffer) {
	crc32BufferPool.Put(b)
}
// Compile-time checks that *crc32Writer satisfies the standard writer
// interfaces it is used through.
var (
	_ io.Writer = (*crc32Writer)(nil)
	_ io.StringWriter = (*crc32Writer)(nil)
)
10 changes: 4 additions & 6 deletions crc32_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package kafka

import (
"bufio"
"bytes"
"hash/crc32"
"testing"
Expand All @@ -18,15 +17,14 @@ func TestMessageCRC32(t *testing.T) {
}

b := &bytes.Buffer{}
w := bufio.NewWriter(b)
write(w, m)
w.Flush()
w := &writeBuffer{w: b}
w.write(m)

h := crc32.NewIEEE()
h := crc32.New(crc32.IEEETable)
h.Write(b.Bytes()[4:])

sum1 := h.Sum32()
sum2 := uint32(m.crc32())
sum2 := uint32(m.crc32(&crc32Writer{table: crc32.IEEETable}))

if sum1 != sum2 {
t.Error("bad CRC32:")
Expand Down
40 changes: 20 additions & 20 deletions createtopics.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ func (t createTopicsRequestV0ConfigEntry) size() int32 {
sizeofString(t.ConfigValue)
}

func (t createTopicsRequestV0ConfigEntry) writeTo(w *bufio.Writer) {
writeString(w, t.ConfigName)
writeString(w, t.ConfigValue)
// writeTo serializes the config entry as two kafka strings: the config name
// followed by the config value.
func (t createTopicsRequestV0ConfigEntry) writeTo(wb *writeBuffer) {
	for _, s := range []string{t.ConfigName, t.ConfigValue} {
		wb.writeString(s)
	}
}

type ReplicaAssignment struct {
Expand All @@ -54,9 +54,9 @@ func (t createTopicsRequestV0ReplicaAssignment) size() int32 {
sizeofInt32(t.Replicas)
}

func (t createTopicsRequestV0ReplicaAssignment) writeTo(w *bufio.Writer) {
writeInt32(w, t.Partition)
writeInt32(w, t.Replicas)
// writeTo serializes the replica assignment as two int32 values: the
// partition id followed by the replicas field.
func (t createTopicsRequestV0ReplicaAssignment) writeTo(wb *writeBuffer) {
	for _, v := range [...]int32{t.Partition, t.Replicas} {
		wb.writeInt32(v)
	}
}

type TopicConfig struct {
Expand Down Expand Up @@ -126,12 +126,12 @@ func (t createTopicsRequestV0Topic) size() int32 {
sizeofArray(len(t.ConfigEntries), func(i int) int32 { return t.ConfigEntries[i].size() })
}

func (t createTopicsRequestV0Topic) writeTo(w *bufio.Writer) {
writeString(w, t.Topic)
writeInt32(w, t.NumPartitions)
writeInt16(w, t.ReplicationFactor)
writeArray(w, len(t.ReplicaAssignments), func(i int) { t.ReplicaAssignments[i].writeTo(w) })
writeArray(w, len(t.ConfigEntries), func(i int) { t.ConfigEntries[i].writeTo(w) })
// writeTo serializes the topic entry: topic name, partition count,
// replication factor, then the replica assignment and config entry arrays.
func (t createTopicsRequestV0Topic) writeTo(wb *writeBuffer) {
	wb.writeString(t.Topic)
	wb.writeInt32(t.NumPartitions)
	wb.writeInt16(t.ReplicationFactor)
	wb.writeArray(len(t.ReplicaAssignments), func(n int) {
		t.ReplicaAssignments[n].writeTo(wb)
	})
	wb.writeArray(len(t.ConfigEntries), func(n int) {
		t.ConfigEntries[n].writeTo(wb)
	})
}

// See http://kafka.apache.org/protocol.html#The_Messages_CreateTopics
Expand All @@ -150,9 +150,9 @@ func (t createTopicsRequestV0) size() int32 {
sizeofInt32(t.Timeout)
}

func (t createTopicsRequestV0) writeTo(w *bufio.Writer) {
writeArray(w, len(t.Topics), func(i int) { t.Topics[i].writeTo(w) })
writeInt32(w, t.Timeout)
// writeTo serializes the request body: the topics array followed by the
// request timeout.
func (t createTopicsRequestV0) writeTo(wb *writeBuffer) {
	wb.writeArray(len(t.Topics), func(n int) {
		t.Topics[n].writeTo(wb)
	})
	wb.writeInt32(t.Timeout)
}

type createTopicsResponseV0TopicError struct {
Expand All @@ -168,9 +168,9 @@ func (t createTopicsResponseV0TopicError) size() int32 {
sizeofInt16(t.ErrorCode)
}

func (t createTopicsResponseV0TopicError) writeTo(w *bufio.Writer) {
writeString(w, t.Topic)
writeInt16(w, t.ErrorCode)
// writeTo serializes the per-topic error: topic name followed by the error
// code.
func (t createTopicsResponseV0TopicError) writeTo(wb *writeBuffer) {
	topic, code := t.Topic, t.ErrorCode
	wb.writeString(topic)
	wb.writeInt16(code)
}

func (t *createTopicsResponseV0TopicError) readFrom(r *bufio.Reader, size int) (remain int, err error) {
Expand All @@ -192,8 +192,8 @@ func (t createTopicsResponseV0) size() int32 {
return sizeofArray(len(t.TopicErrors), func(i int) int32 { return t.TopicErrors[i].size() })
}

func (t createTopicsResponseV0) writeTo(w *bufio.Writer) {
writeArray(w, len(t.TopicErrors), func(i int) { t.TopicErrors[i].writeTo(w) })
// writeTo serializes the response body as an array of per-topic error
// entries.
func (t createTopicsResponseV0) writeTo(wb *writeBuffer) {
	errs := t.TopicErrors
	wb.writeArray(len(errs), func(n int) {
		errs[n].writeTo(wb)
	})
}

func (t *createTopicsResponseV0) readFrom(r *bufio.Reader, size int) (remain int, err error) {
Expand Down
7 changes: 3 additions & 4 deletions createtopics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,12 @@ func TestCreateTopicsResponseV0(t *testing.T) {
},
}

buf := bytes.NewBuffer(nil)
w := bufio.NewWriter(buf)
b := bytes.NewBuffer(nil)
w := &writeBuffer{w: b}
item.writeTo(w)
w.Flush()

var found createTopicsResponseV0
remain, err := (&found).readFrom(bufio.NewReader(buf), buf.Len())
remain, err := (&found).readFrom(bufio.NewReader(b), b.Len())
if err != nil {
t.Error(err)
t.FailNow()
Expand Down
Loading

0 comments on commit ad7818b

Please sign in to comment.