0.4: fix short writes (segmentio#479)
Achille authored Aug 12, 2020
1 parent a37955f commit ee2efb7
Showing 4 changed files with 151 additions and 5 deletions.
117 changes: 117 additions & 0 deletions client_test.go
@@ -1,11 +1,16 @@
package kafka

import (
"bytes"
"context"
"io"
"math/rand"
"net"
"sync"
"testing"
"time"

"github.com/segmentio/kafka-go/compress"
)

func newLocalClientAndTopic() (*Client, string, func()) {
@@ -183,3 +188,115 @@ func testConsumerGroupFetchOffsets(t *testing.T, ctx context.Context, c *Client)
}
}
}

func TestClientProduceAndConsume(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
// Tests a typical kafka use case: data is produced to a partition,
// then consumed back sequentially. We use snappy compression because
// kafka streams are often compressed, and we verify that each record
// produced is exposed to the consumer and that order is preserved.
client, topic, shutdown := newLocalClientAndTopic()
defer shutdown()

epoch := time.Now()
seed := int64(0) // deterministic
prng := rand.New(rand.NewSource(seed))
offset := int64(0)

const numBatches = 100
const recordsPerBatch = 320
t.Logf("producing %d batches of %d records...", numBatches, recordsPerBatch)

for i := 0; i < numBatches; i++ { // produce 100 batches
records := make([]Record, recordsPerBatch)

for i := range records {
v := make([]byte, prng.Intn(999)+1)
io.ReadFull(prng, v)
records[i].Time = epoch
records[i].Value = NewBytes(v)
}

res, err := client.Produce(ctx, &ProduceRequest{
Topic: topic,
Partition: 0,
RequiredAcks: -1,
Records: NewRecordReader(records...),
Compression: compress.Snappy,
})
if err != nil {
t.Fatal(err)
}
if res.Error != nil {
t.Fatal(res.Error)
}
if res.BaseOffset != offset {
t.Fatalf("records were produced at an unexpected offset, want %d but got %d", offset, res.BaseOffset)
}
offset += int64(len(records))
}

prng.Seed(seed)
offset = 0 // reset
numFetches := 0
numRecords := 0

for numRecords < (numBatches * recordsPerBatch) {
res, err := client.Fetch(ctx, &FetchRequest{
Topic: topic,
Partition: 0,
Offset: offset,
MinBytes: 1,
MaxBytes: 256 * 1024,
MaxWait: 100 * time.Millisecond, // should only hit on the last fetch
})
if err != nil {
t.Fatal(err)
}
if res.Error != nil {
t.Fatal(res.Error)
}

for {
r, err := res.Records.ReadRecord()
if err != nil {
if err != io.EOF {
t.Fatal(err)
}
break
}

if r.Key != nil {
r.Key.Close()
t.Error("unexpected non-null key on record at offset", r.Offset)
}

n := prng.Intn(999) + 1
a := make([]byte, n)
b := make([]byte, n)
io.ReadFull(prng, a)

_, err = io.ReadFull(r.Value, b)
r.Value.Close()
if err != nil {
t.Fatalf("reading record at offset %d: %v", r.Offset, err)
}

if !bytes.Equal(a, b) {
t.Fatalf("value of record at offset %d mismatches", r.Offset)
}

if r.Offset != offset {
t.Fatalf("record at offset %d was expected to have offset %d", r.Offset, offset)
}

offset = r.Offset + 1
numRecords++
}

numFetches++
}

t.Logf("%d records were read in %d fetches", numRecords, numFetches)
}
29 changes: 29 additions & 0 deletions docker-compose.010.yml
@@ -0,0 +1,29 @@
version: "3"
services:
kafka:
image: wurstmeister/kafka:0.10.1.1
links:
- zookeeper
ports:
- 9092:9092
- 9093:9093
environment:
KAFKA_BROKER_ID: '1'
KAFKA_CREATE_TOPICS: 'test-writer-0:3:1,test-writer-1:3:1'
KAFKA_DELETE_TOPIC_ENABLE: 'true'
KAFKA_ADVERTISED_HOST_NAME: 'localhost'
KAFKA_ADVERTISED_PORT: '9092'
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
KAFKA_MESSAGE_MAX_BYTES: '200000000'
KAFKA_LISTENERS: 'PLAINTEXT://:9092,SASL_PLAINTEXT://:9093'
KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://localhost:9092,SASL_PLAINTEXT://localhost:9093'
KAFKA_SASL_ENABLED_MECHANISMS: 'PLAIN'
KAFKA_OPTS: "-Djava.security.auth.login.config=/opt/kafka/config/kafka_server_jaas.conf"
CUSTOM_INIT_SCRIPT: |-
echo -e 'KafkaServer {\norg.apache.kafka.common.security.plain.PlainLoginModule required\n username="adminplain"\n password="admin-secret"\n user_adminplain="admin-secret";\n };' > /opt/kafka/config/kafka_server_jaas.conf;
zookeeper:
image: wurstmeister/zookeeper
ports:
- 2181:2181
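
The listener on port 9093 above is SASL_PLAINTEXT with the PLAIN credentials injected by CUSTOM_INIT_SCRIPT. As a rough sketch (not part of this commit), a kafka-go client could authenticate against that listener along these lines, assuming the sasl/plain mechanism and the credentials from the JAAS snippet:

package main

import (
	"context"
	"fmt"
	"time"

	kafka "github.com/segmentio/kafka-go"
	"github.com/segmentio/kafka-go/sasl/plain"
)

func main() {
	// Credentials and port match the compose file above; adjust as needed.
	dialer := &kafka.Dialer{
		Timeout:       10 * time.Second,
		SASLMechanism: plain.Mechanism{Username: "adminplain", Password: "admin-secret"},
	}

	conn, err := dialer.DialContext(context.Background(), "tcp", "localhost:9093")
	if err != nil {
		fmt.Println("dial failed:", err)
		return
	}
	defer conn.Close()

	fmt.Println("SASL/PLAIN connection established")
}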
8 changes: 4 additions & 4 deletions protocol/buffer.go
@@ -5,7 +5,6 @@ import (
"fmt"
"io"
"math"
"sort"
"sync"
"sync/atomic"
)
@@ -461,9 +460,10 @@ func (pages contiguousPages) slice(begin, end int64) contiguousPages {
}

func (pages contiguousPages) indexOf(offset int64) int {
return sort.Search(len(pages), func(i int) bool {
return offset < (pages[i].offset + pages[i].Size())
})
if len(pages) == 0 {
return 0
}
return int((offset - pages[0].offset) / pageSize)
}

func (pages contiguousPages) scan(begin, end int64, f func([]byte) bool) {
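The indexOf change above replaces a binary search with constant-time arithmetic: since every page is the same size and pages are laid out contiguously, the index of the page containing an offset follows directly from the first page's offset. A minimal standalone sketch of that idea (the page type and pageSize value here are illustrative stand-ins, not the kafka-go definitions):

package main

import "fmt"

// Illustrative stand-ins; the real protocol package defines its own.
const pageSize = 65536

type page struct{ offset int64 }

// indexOf mirrors the new arithmetic: with fixed-size, contiguous pages, the
// page holding `offset` can be computed directly instead of binary-searched.
func indexOf(pages []page, offset int64) int {
	if len(pages) == 0 {
		return 0
	}
	return int((offset - pages[0].offset) / pageSize)
}

func main() {
	pages := []page{{0}, {pageSize}, {2 * pageSize}}
	fmt.Println(indexOf(pages, 70000)) // 1: offset 70000 falls in the second page
}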
2 changes: 1 addition & 1 deletion protocol/record_v1.go
@@ -174,7 +174,7 @@ func (rs *RecordSet) writeToVersion1(buffer *pageBuffer, bufferOffset int64) err
var err error
buffer.pages.scan(bufferOffset, buffer.Size(), func(b []byte) bool {
_, err = compressor.Write(b)
return err != nil
return err == nil
})
if err != nil {
return err
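This one-character change is the short-write fix the commit title refers to: pages.scan treats the callback's return value as "keep scanning", so returning err != nil stopped after the first successful write and compressed only part of the record set. A minimal standalone sketch of that contract (scanChunks is a hypothetical stand-in, not the kafka-go API):

package main

import (
	"bytes"
	"fmt"
)

// scanChunks feeds chunks to the callback and stops as soon as the callback
// returns false, mirroring the scan contract assumed above.
func scanChunks(chunks [][]byte, f func([]byte) bool) {
	for _, c := range chunks {
		if !f(c) {
			return
		}
	}
}

func main() {
	chunks := [][]byte{[]byte("hello "), []byte("world")}
	var out bytes.Buffer

	var err error
	scanChunks(chunks, func(b []byte) bool {
		_, err = out.Write(b)
		// Returning err != nil (the old code) would stop after the first
		// successful write and silently drop the rest: a short write.
		return err == nil
	})
	fmt.Println(out.String(), err) // hello world <nil>
}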
