optimize fetch request
Achille Roussel committed May 31, 2017
1 parent e1cd444 commit 38c2d1a
Showing 4 changed files with 93 additions and 31 deletions.
37 changes: 15 additions & 22 deletions conn.go
@@ -315,22 +315,19 @@ func (c *Conn) ReadBatch(minBytes int, maxBytes int) *Batch {

id, err := c.doRequest(&c.rdeadline, func(deadline time.Time, id int32) error {
now := time.Now()
adj := adjustDeadlineForRTT(deadline, now, defaultRTT)
err := c.writeRequest(fetchRequest, v1, id, fetchRequestV1{
ReplicaID: -1,
MinBytes: int32(minBytes),
MaxWaitTime: milliseconds(deadlineToTimeout(adj, now)),
Topics: []fetchRequestTopicV1{{
TopicName: c.topic,
Partitions: []fetchRequestPartitionV1{{
Partition: c.partition,
MaxBytes: c.fetchMinSize + int32(maxBytes),
FetchOffset: offset,
}},
}},
})
adjustedDeadline = adj
return err
deadline = adjustDeadlineForRTT(deadline, now, defaultRTT)
adjustedDeadline = deadline
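// Serialize the fetch request directly into the connection's write buffer,
// avoiding the intermediate fetchRequestV1 allocation used previously.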
return writeFetchRequestV1(
&c.wbuf,
id,
c.clientID,
c.topic,
c.partition,
offset,
minBytes,
maxBytes+int(c.fetchMinSize),
deadlineToTimeout(deadline, now),
)
})
if err != nil {
return &Batch{err: err}
@@ -586,11 +583,6 @@ func (c *Conn) skipResponseSizeAndID() {
c.rbuf.Discard(8)
}

func (c *Conn) generateCorrelationID() int32 {
c.correlationID++
return c.correlationID
}

func (c *Conn) readDeadline() time.Time {
return c.rdeadline.deadline()
}
@@ -633,7 +625,8 @@ func (c *Conn) do(d *connDeadline, write func(time.Time, int32) error, read func

func (c *Conn) doRequest(d *connDeadline, write func(time.Time, int32) error) (id int32, err error) {
c.wlock.Lock()
id = c.generateCorrelationID()
c.correlationID++
id = c.correlationID
err = write(d.setConnWriteDeadline(c.conn), id)
d.unsetConnWriteDeadline()

8 changes: 5 additions & 3 deletions conn_test.go
@@ -442,6 +442,8 @@ func testConnReadEmptyWithDeadline(t *testing.T, conn *Conn) {
}
}

const benchmarkMessageCount = 100

func BenchmarkConn(b *testing.B) {
benchmarks := []struct {
scenario string
@@ -475,7 +477,7 @@ func BenchmarkConn(b *testing.B) {

topic := makeTopic()
value := make([]byte, 10e3) // 10 KB
msgs := make([]Message, 1000)
msgs := make([]Message, benchmarkMessageCount)

for i := range msgs {
msgs[i].Value = value
@@ -501,7 +503,7 @@

func benchmarkConnSeek(b *testing.B, conn *Conn, _ []byte) {
for i := 0; i != b.N; i++ {
if _, err := conn.Seek(int64(i%1000), 1); err != nil {
if _, err := conn.Seek(int64(i%benchmarkMessageCount), 1); err != nil {
b.Error(err)
return
}
@@ -513,7 +515,7 @@ func benchmarkConnRead(b *testing.B, conn *Conn, a []byte) {
i := 0

for i != b.N {
if (i % 1000) == 0 {
if (i % benchmarkMessageCount) == 0 {
if _, err := conn.Seek(0, 0); err != nil {
b.Error(err)
return
45 changes: 40 additions & 5 deletions write.go
@@ -98,8 +98,46 @@ func write(w *bufio.Writer, a interface{}) {
}
}

// This function is used as an optimization to avoid dynamic memory allocations
// in the common case of reading an offset on a single topic and partition.
// The functions below are used as optimizations to avoid dynamic memory
// allocations that occur when building the data structures representing the
// kafka protocol requests.

func writeFetchRequestV1(w *bufio.Writer, correlationID int32, clientID string, topic string, partition int32, offset int64, minBytes int, maxBytes int, maxWait time.Duration) error {
h := requestHeader{
ApiKey: int16(fetchRequest),
ApiVersion: int16(v1),
CorrelationID: correlationID,
ClientID: clientID,
}
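// The request Size field counts every byte that follows it, so subtract the
// 4-byte size prefix from the header size and add the fixed-width fetch
// request body laid out below.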
h.Size = (h.size() - 4) +
4 + // replica ID
4 + // max wait time
4 + // min bytes
4 + // topic array length
sizeofString(topic) +
4 + // partition array length
4 + // partition
8 + // offset
4 // max bytes

h.writeTo(w)
writeInt32(w, -1) // replica ID
writeInt32(w, milliseconds(maxWait)) // max wait time
writeInt32(w, int32(minBytes))

// topic array
writeArrayLen(w, 1)
writeString(w, topic)

// partition array
writeArrayLen(w, 1)
writeInt32(w, partition)
writeInt64(w, offset)
writeInt32(w, int32(maxBytes))

return w.Flush()
}

func writeListOffsetRequestV1(w *bufio.Writer, correlationID int32, clientID string, topic string, partition int32, time int64) error {
h := requestHeader{
ApiKey: int16(listOffsetRequest),
@@ -130,9 +168,6 @@ func writeListOffsetRequestV1(correleationID int32, clientID st
return w.Flush()
}

// This function is used as an optimization to avoid dynamic memory allocations
// in the common case of sending a batch of messages to a kafka server for a single
// topic and partition.
func writeProduceRequestV2(w *bufio.Writer, correlationID int32, clientID string, topic string, partition int32, timeout time.Duration, msgs ...Message) error {
var size int32

34 changes: 33 additions & 1 deletion write_test.go
@@ -16,10 +16,42 @@ const (

func TestWriteOptimizations(t *testing.T) {
t.Parallel()
t.Run("writeFetchRequestV1", testWriteFetchRequestV1)
t.Run("writeListOffsetRequestV1", testWriteListOffsetRequestV1)
t.Run("writeProduceRequestV2", testWriteProduceRequestV2)
}

func testWriteFetchRequestV1(t *testing.T) {
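// Check that the hand-rolled writer emits exactly the same bytes as the
// generic encoding of an equivalent fetchRequestV1 value.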
const offset = 42
const minBytes = 10
const maxBytes = 1000
const maxWait = 100 * time.Millisecond
testWriteOptimization(t,
requestHeader{
ApiKey: int16(fetchRequest),
ApiVersion: int16(v1),
CorrelationID: testCorrelationID,
ClientID: testClientID,
},
fetchRequestV1{
ReplicaID: -1,
MaxWaitTime: milliseconds(maxWait),
MinBytes: minBytes,
Topics: []fetchRequestTopicV1{{
TopicName: testTopic,
Partitions: []fetchRequestPartitionV1{{
Partition: testPartition,
FetchOffset: offset,
MaxBytes: maxBytes,
}},
}},
},
func(w *bufio.Writer) {
writeFetchRequestV1(w, testCorrelationID, testClientID, testTopic, testPartition, offset, minBytes, maxBytes, maxWait)
},
)
}

func testWriteListOffsetRequestV1(t *testing.T) {
const time = -1
testWriteOptimization(t,
@@ -121,7 +153,7 @@ func testWriteOptimization(t *testing.T, h requestHeader, r request, f func(*buf
} else {
for i := 0; i != n1; i++ {
if c1[i] != c2[i] {
t.Logf("byte at offset %d: %#x != %#x", i, c1[i], c2[i])
t.Logf("byte at offset %d/%d: %#x != %#x", i, n1, c1[i], c2[i])
break
}
}
