Skip to content

Commit

Permalink
fix record timestamp delta (segmentio#266)
Browse files Browse the repository at this point in the history
* fix record timestamp delta

* better tests

* fix go vet complains

* truncate timestamp to milliseconds in generated message sequences
  • Loading branch information
Achille authored May 7, 2019
1 parent b4a5812 commit c616928
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 11 deletions.
18 changes: 12 additions & 6 deletions conn_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package kafka

import (
"bytes"
"context"
"fmt"
"io"
Expand Down Expand Up @@ -463,7 +464,9 @@ func testConnWriteReadSequentially(t *testing.T, conn *Conn) {
}

func testConnWriteBatchReadSequentially(t *testing.T, conn *Conn) {
if _, err := conn.WriteMessages(makeTestSequence(10)...); err != nil {
msgs := makeTestSequence(10)

if _, err := conn.WriteMessages(msgs...); err != nil {
t.Fatal(err)
}

Expand All @@ -473,11 +476,14 @@ func testConnWriteBatchReadSequentially(t *testing.T, conn *Conn) {
t.Error(err)
continue
}
s := string(msg.Value)
if v, err := strconv.Atoi(s); err != nil {
t.Error(err)
} else if v != i {
t.Errorf("bad message read at offset %d: %s", i, s)
if !bytes.Equal(msg.Key, msgs[i].Key) {
t.Errorf("bad message key at offset %d: %q != %q", i, msg.Key, msgs[i].Key)
}
if !bytes.Equal(msg.Value, msgs[i].Value) {
t.Errorf("bad message value at offset %d: %q != %q", i, msg.Value, msgs[i].Value)
}
if !msg.Time.Equal(msgs[i].Time) {
t.Errorf("bad message time at offset %d: %s != %s", i, msg.Time, msgs[i].Time)
}
}
}
Expand Down
12 changes: 12 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
module github.com/segmentio/kafka-go

go 1.11

require (
github.com/DataDog/zstd v1.4.0
github.com/golang/snappy v0.0.1
github.com/pierrec/lz4 v2.0.5+incompatible
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c
github.com/xdg/stringprep v1.0.0 // indirect
golang.org/x/crypto v0.0.0-20190506204251-e1dfcc566284 // indirect
)
18 changes: 18 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
github.com/DataDog/zstd v1.4.0 h1:vhoV+DUHnRZdKW1i5UMjAk2G4JY8wN4ayRfYDNdEhwo=
github.com/DataDog/zstd v1.4.0/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo=
github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/pierrec/lz4 v2.0.5+incompatible h1:2xWsjqPFWcplujydGg4WmhC/6fZqK42wMM8aXeqhl0I=
github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c h1:u40Z8hqBAAQyv+vATcGgV0YCnDjqSL7/q/JyPhhJSPk=
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I=
github.com/xdg/stringprep v1.0.0 h1:d9X0esnoa3dFsV0FG35rAT0RIhYFlPq7MiP+DW89La0=
github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190506204251-e1dfcc566284 h1:rlLehGeYg6jfoyz/eDqDU1iRXLKfR42nnNh57ytKEWo=
golang.org/x/crypto v0.0.0-20190506204251-e1dfcc566284/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
2 changes: 2 additions & 0 deletions reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -472,9 +472,11 @@ func TestReadTruncatedMessages(t *testing.T) {
}

func makeTestSequence(n int) []Message {
base := time.Now()
msgs := make([]Message, n)
for i := 0; i != n; i++ {
msgs[i] = Message{
Time: base.Add(time.Duration(i) * time.Millisecond).Truncate(time.Millisecond),
Value: []byte(strconv.Itoa(i)),
}
}
Expand Down
10 changes: 5 additions & 5 deletions write.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,10 +348,10 @@ func writeProduceRequestV2(w *bufio.Writer, codec CompressionCodec, correlationI
}

func writeProduceRequestV3(w *bufio.Writer, codec CompressionCodec, correlationID int32, clientID, topic string, partition int32, timeout time.Duration, requiredAcks int16, transactionalID *string, msgs ...Message) (err error) {

var size int32
var compressed []byte
var attributes int16

if codec != nil {
attributes = int16(codec.Code())
recordBuf := &bytes.Buffer{}
Expand Down Expand Up @@ -542,6 +542,7 @@ func recordBatchSize(msgs ...Message) (size int32) {
func writeRecordBatch(w *bufio.Writer, attributes int16, size int32, write func(*bufio.Writer), msgs ...Message) error {

baseTime := msgs[0].Time
lastTime := msgs[len(msgs)-1].Time

writeInt64(w, int64(0))

Expand All @@ -557,8 +558,7 @@ func writeRecordBatch(w *bufio.Writer, attributes int16, size int32, write func(
writeInt16(crcWriter, attributes) // attributes, timestamp type 0 - create time, not part of a transaction, no control messages
writeInt32(crcWriter, int32(len(msgs)-1)) // max offset
writeInt64(crcWriter, timestamp(baseTime))
lastTime := timestamp(msgs[len(msgs)-1].Time)
writeInt64(crcWriter, int64(lastTime))
writeInt64(crcWriter, timestamp(lastTime))
writeInt64(crcWriter, -1) // default producer id for now
writeInt16(crcWriter, -1) // default producer epoch for now
writeInt32(crcWriter, -1) // default base sequence
Expand All @@ -584,7 +584,7 @@ var maxDate = time.Date(5000, time.January, 0, 0, 0, 0, 0, time.UTC)

func recordSize(msg *Message, timestampDelta time.Duration, offsetDelta int64) (size int) {
size += 1 + // attributes
varIntLen(int64(timestampDelta)) +
varIntLen(int64(milliseconds(timestampDelta))) +
varIntLen(offsetDelta) +
varIntLen(int64(len(msg.Key))) +
len(msg.Key) +
Expand Down Expand Up @@ -656,7 +656,7 @@ func writeRecord(w *bufio.Writer, attributes int8, baseTime time.Time, offset in
writeVarInt(w, int64(recordSize(&msg, timestampDelta, offsetDelta)))

writeInt8(w, attributes)
writeVarInt(w, int64(timestampDelta))
writeVarInt(w, int64(milliseconds(timestampDelta)))
writeVarInt(w, offsetDelta)

writeVarInt(w, int64(len(msg.Key)))
Expand Down

0 comments on commit c616928

Please sign in to comment.