Skip to content

Commit

Permalink
writing messages now doesn't require dynamic memory allocations anymore
Browse files Browse the repository at this point in the history
  • Loading branch information
Achille Roussel committed May 31, 2017
1 parent 15fbff0 commit 9cf044a
Show file tree
Hide file tree
Showing 8 changed files with 185 additions and 124 deletions.
73 changes: 32 additions & 41 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"path/filepath"
"runtime"
"sync"
"sync/atomic"
"time"
)

Expand Down Expand Up @@ -59,7 +58,7 @@ type Conn struct {
fetchMaxBytes int32
fetchMinSize int32

// correlation ID generator (accessed through atomic operations)
// correlation ID generator (synchronized on wlock)
correlationID int32
}

Expand Down Expand Up @@ -342,7 +341,7 @@ func (c *Conn) ReadBatch(minBytes int, maxBytes int) *Batch {
return &Batch{err: err}
}

throttle, remain, err := readFetchResponseHeader(&c.rbuf, int(size))
throttle, remain, err := readFetchResponseHeader(&c.rbuf, size)
return &Batch{
conn: c,
reader: &c.rbuf,
Expand All @@ -358,7 +357,7 @@ func (c *Conn) ReadBatch(minBytes int, maxBytes int) *Batch {
// ReadOffset returns the offset of the first message with a timestamp equal or
// greater to t.
func (c *Conn) ReadOffset(t time.Time) (int64, error) {
return c.readOffset(timeToTimestamp(t))
return c.readOffset(timestamp(t))
}

// ReadOffsets returns the absolute first and last offsets of the topic used by
Expand Down Expand Up @@ -498,30 +497,24 @@ func (c *Conn) WriteMessages(msgs ...Message) (int, error) {
return 0, nil
}

var n int
var set = make(messageSet, len(msgs))

for i, msg := range msgs {
n := 0
for _, msg := range msgs {
n += len(msg.Key) + len(msg.Value)
set[i] = msg.item()
}

err := c.writeOperation(
func(deadline time.Time, id int32) error {
now := time.Now()
deadline = adjustDeadlineForRTT(deadline, now, defaultRTT)
return c.writeRequest(produceRequest, v2, id, produceRequestV2{
RequiredAcks: -1,
Timeout: milliseconds(deadlineToTimeout(deadline, now)),
Topics: []produceRequestTopicV2{{
TopicName: c.topic,
Partitions: []produceRequestPartitionV2{{
Partition: c.partition,
MessageSetSize: set.size(),
MessageSet: set,
}},
}},
})
return writeProduceRequest(
&c.wbuf,
id,
c.clientID,
c.topic,
c.partition,
deadlineToTimeout(deadline, now),
msgs...,
)
},
func(deadline time.Time, size int) error {
return expectZeroSize(readArrayWith(&c.rbuf, size, func(r *bufio.Reader, size int) (int, error) {
Expand All @@ -536,7 +529,7 @@ func (c *Conn) WriteMessages(msgs ...Message) (int, error) {
// we've produced a message to a single partition.
size, err = readArrayWith(r, size, func(r *bufio.Reader, size int) (int, error) {
var p produceResponsePartitionV2
size, err := read(r, size, &p)
size, err := p.readFrom(r, size)
if err == nil && p.ErrorCode != 0 {
err = Error(p.ErrorCode)
}
Expand All @@ -560,6 +553,12 @@ func (c *Conn) WriteMessages(msgs ...Message) (int, error) {
return n, err
}

func (c *Conn) writeRequestHeader(apiKey apiKey, apiVersion apiVersion, correlationID int32, size int32) {
hdr := c.requestHeader(apiKey, apiVersion, correlationID)
hdr.Size = (hdr.size() + size) - 4
hdr.writeTo(&c.wbuf)
}

func (c *Conn) writeRequest(apiKey apiKey, apiVersion apiVersion, correlationID int32, req request) error {
hdr := c.requestHeader(apiKey, apiVersion, correlationID)
hdr.Size = (hdr.size() + req.size()) - 4
Expand Down Expand Up @@ -594,7 +593,8 @@ func (c *Conn) skipResponseSizeAndID() {
}

func (c *Conn) generateCorrelationID() int32 {
return atomic.AddInt32(&c.correlationID, 1)
c.correlationID++
return c.correlationID
}

func (c *Conn) readDeadline() time.Time {
Expand All @@ -605,15 +605,15 @@ func (c *Conn) writeDeadline() time.Time {
return c.wdeadline.deadline()
}

func (c *Conn) readOperation(write writeFunc, read readFunc) error {
func (c *Conn) readOperation(write func(time.Time, int32) error, read func(time.Time, int) error) error {
return c.do(&c.rdeadline, write, read)
}

func (c *Conn) writeOperation(write writeFunc, read readFunc) error {
func (c *Conn) writeOperation(write func(time.Time, int32) error, read func(time.Time, int) error) error {
return c.do(&c.wdeadline, write, read)
}

func (c *Conn) do(d *connDeadline, write writeFunc, read readFunc) error {
func (c *Conn) do(d *connDeadline, write func(time.Time, int32) error, read func(time.Time, int) error) error {
id, err := c.doRequest(d, write)
if err != nil {
return err
Expand All @@ -624,7 +624,7 @@ func (c *Conn) do(d *connDeadline, write writeFunc, read readFunc) error {
return err
}

if err = read(deadline, int(size)); err != nil {
if err = read(deadline, size); err != nil {
switch err.(type) {
case Error:
default:
Expand All @@ -637,13 +637,11 @@ func (c *Conn) do(d *connDeadline, write writeFunc, read readFunc) error {
return err
}

func (c *Conn) doRequest(d *connDeadline, write writeFunc) (id int32, err error) {
id = c.generateCorrelationID()

func (c *Conn) doRequest(d *connDeadline, write func(time.Time, int32) error) (id int32, err error) {
c.wlock.Lock()
id = c.generateCorrelationID()
err = write(d.setConnWriteDeadline(c.conn), id)
d.unsetConnWriteDeadline()
c.wlock.Unlock()

if err != nil {
// When an error occurs there's no way to know if the connection is in a
Expand All @@ -652,10 +650,11 @@ func (c *Conn) doRequest(d *connDeadline, write writeFunc) (id int32, err error)
c.conn.Close()
}

c.wlock.Unlock()
return
}

func (c *Conn) waitResponse(d *connDeadline, id int32) (deadline time.Time, size int32, lock *sync.Mutex, err error) {
func (c *Conn) waitResponse(d *connDeadline, id int32) (deadline time.Time, size int, lock *sync.Mutex, err error) {
for {
var rsz int32
var rid int32
Expand All @@ -672,7 +671,7 @@ func (c *Conn) waitResponse(d *connDeadline, id int32) (deadline time.Time, size

if id == rid {
c.skipResponseSizeAndID()
size, lock = rsz-4, &c.rlock
size, lock = int(rsz-4), &c.rlock
return
}

Expand Down Expand Up @@ -752,11 +751,3 @@ func (d *connDeadline) unsetConnWriteDeadline() {
d.wconn = nil
d.mutex.Unlock()
}

// writeFunc is the type of functions passed to (*Conn).do to write a request
// to the kafka connection.
type writeFunc func(deadline time.Time, id int32) error

// readFunc is the type of functions passed to (*Conn).do to read a response
// from the kafka connection.
type readFunc func(deadline time.Time, size int) error
14 changes: 14 additions & 0 deletions crc32.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,20 @@ import (
"sync"
)

func crc32OfMessage(magicByte int8, attributes int8, timestamp int64, key []byte, value []byte) uint32 {
b := acquireCrc32Buffer()
b.writeInt8(magicByte)
b.writeInt8(attributes)
if magicByte != 0 {
b.writeInt64(timestamp)
}
b.writeBytes(key)
b.writeBytes(value)
sum := b.sum
releaseCrc32Buffer(b)
return sum
}

type crc32Buffer struct {
sum uint32
buf bytes.Buffer
Expand Down
59 changes: 1 addition & 58 deletions discard.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,6 @@
package kafka

import (
"bufio"
"fmt"
"reflect"
)
import "bufio"

func discardN(r *bufio.Reader, sz int, n int) (int, error) {
n, err := r.Discard(n)
Expand Down Expand Up @@ -44,56 +40,3 @@ func discardBytes(r *bufio.Reader, sz int) (int, error) {
return discardN(r, sz, n)
})
}

func discard(r *bufio.Reader, sz int, a interface{}) (int, error) {
switch a.(type) {
case int8:
return discardInt8(r, sz)
case int16:
return discardInt16(r, sz)
case int32:
return discardInt32(r, sz)
case int64:
return discardInt64(r, sz)
case string:
return discardString(r, sz)
case []byte:
return discardBytes(r, sz)
}
switch v := reflect.ValueOf(a); v.Kind() {
case reflect.Struct:
return discardStruct(r, sz, v)
case reflect.Slice:
return discardSlice(r, sz, v)
default:
panic(fmt.Sprintf("unsupported type: %T", a))
}
}

func discardStruct(r *bufio.Reader, sz int, v reflect.Value) (int, error) {
var err error
for i, n := 0, v.NumField(); i != n; i++ {
if sz, err = discard(r, sz, v.Field(i)); err != nil {
break
}
}
return sz, err
}

func discardSlice(r *bufio.Reader, sz int, v reflect.Value) (int, error) {
var zero = reflect.Zero(v.Type().Elem())
var err error
var len int32

if sz, err = readInt32(r, sz, &len); err != nil {
return sz, err
}

for n := int(len); n > 0; n-- {
if sz, err = discard(r, sz, zero); err != nil {
break
}
}

return sz, err
}
14 changes: 2 additions & 12 deletions message.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func (msg Message) message() message {
MagicByte: 1,
Key: msg.Key,
Value: msg.Value,
Timestamp: timeToTimestamp(msg.Time),
Timestamp: timestamp(msg.Time),
}
m.CRC = m.crc32()
return m
Expand All @@ -43,17 +43,7 @@ type message struct {
}

func (m message) crc32() int32 {
b := acquireCrc32Buffer()
b.writeInt8(m.MagicByte)
b.writeInt8(m.Attributes)
if m.MagicByte != 0 {
b.writeInt64(m.Timestamp)
}
b.writeBytes(m.Key)
b.writeBytes(m.Value)
sum := b.sum
releaseCrc32Buffer(b)
return int32(sum)
return int32(crc32OfMessage(m.MagicByte, m.Attributes, m.Timestamp, m.Key, m.Value))
}

func (m message) size() int32 {
Expand Down
14 changes: 14 additions & 0 deletions produce.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,3 +95,17 @@ func (p produceResponsePartitionV2) writeTo(w *bufio.Writer) {
writeInt64(w, p.Offset)
writeInt64(w, p.Timestamp)
}

func (p *produceResponsePartitionV2) readFrom(r *bufio.Reader, sz int) (remain int, err error) {
if remain, err = readInt32(r, sz, &p.Partition); err != nil {
return
}
if remain, err = readInt16(r, remain, &p.ErrorCode); err != nil {
return
}
if remain, err = readInt64(r, remain, &p.Offset); err != nil {
return
}
remain, err = readInt64(r, remain, &p.Timestamp)
return
}
Loading

0 comments on commit 9cf044a

Please sign in to comment.