package kafka
import (
"bufio"
"bytes"
"encoding/binary"
"fmt"
"hash/crc32"
"time"
)
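// writable is implemented by types that know how to serialize themselves to
// the Kafka wire format through a *bufio.Writer.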
type writable interface {
writeTo(*bufio.Writer)
}
func writeInt8(w *bufio.Writer, i int8) {
w.WriteByte(byte(i))
}
func writeInt16(w *bufio.Writer, i int16) {
var b [2]byte
binary.BigEndian.PutUint16(b[:], uint16(i))
w.WriteByte(b[0])
w.WriteByte(b[1])
}
func writeInt32(w *bufio.Writer, i int32) {
var b [4]byte
binary.BigEndian.PutUint32(b[:], uint32(i))
w.WriteByte(b[0])
w.WriteByte(b[1])
w.WriteByte(b[2])
w.WriteByte(b[3])
}
func writeInt64(w *bufio.Writer, i int64) {
var b [8]byte
binary.BigEndian.PutUint64(b[:], uint64(i))
w.WriteByte(b[0])
w.WriteByte(b[1])
w.WriteByte(b[2])
w.WriteByte(b[3])
w.WriteByte(b[4])
w.WriteByte(b[5])
w.WriteByte(b[6])
w.WriteByte(b[7])
}
func writeVarInt(w *bufio.Writer, i int64) {
	u := uint64((i << 1) ^ (i >> 63)) // zig-zag encode the sign into the low bit
for u >= 0x80 {
w.WriteByte(byte(u) | 0x80)
u >>= 7
}
w.WriteByte(byte(u))
}
func varIntLen(i int64) (l int) {
	i = (i << 1) ^ (i >> 63) // zig-zag encode, mirroring writeVarInt
for i&0x7f != i {
l++
i >>= 7
}
l++
return l
}
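// Note: the zig-zag transform (i<<1)^(i>>63) used by both varint helpers
// interleaves signed values so that small magnitudes encode into few bytes:
//
//	 0 -> 0    -1 -> 1    1 -> 2    -2 -> 3    2 -> 4
//
// For example varIntLen(-1) == 1, while varIntLen(64) == 2 because zig-zag
// maps 64 to 128, which no longer fits in a single 7-bit group.
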
func writeString(w *bufio.Writer, s string) {
writeInt16(w, int16(len(s)))
w.WriteString(s)
}
func writeNullableString(w *bufio.Writer, s *string) {
if s == nil {
writeInt16(w, -1)
} else {
writeString(w, *s)
}
}
func writeBytes(w *bufio.Writer, b []byte) {
n := len(b)
if b == nil {
n = -1
}
writeInt32(w, int32(n))
w.Write(b)
}
func writeBool(w *bufio.Writer, b bool) {
v := int8(0)
if b {
v = 1
}
writeInt8(w, v)
}
func writeArrayLen(w *bufio.Writer, n int) {
writeInt32(w, int32(n))
}
func writeArray(w *bufio.Writer, n int, f func(int)) {
writeArrayLen(w, n)
for i := 0; i != n; i++ {
f(i)
}
}
func writeStringArray(w *bufio.Writer, a []string) {
writeArray(w, len(a), func(i int) { writeString(w, a[i]) })
}
func writeInt32Array(w *bufio.Writer, a []int32) {
writeArray(w, len(a), func(i int) { writeInt32(w, a[i]) })
}
func write(w *bufio.Writer, a interface{}) {
switch v := a.(type) {
case int8:
writeInt8(w, v)
case int16:
writeInt16(w, v)
case int32:
writeInt32(w, v)
case int64:
writeInt64(w, v)
case string:
writeString(w, v)
case []byte:
writeBytes(w, v)
case bool:
writeBool(w, v)
case writable:
v.writeTo(w)
default:
panic(fmt.Sprintf("unsupported type: %T", a))
}
}
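// A hypothetical use of the generic write dispatcher above (the calls are
// illustrative, not taken from this package):
//
//	write(w, int16(1))    // api key
//	write(w, "client-id") // client id
//	write(w, []byte(nil)) // nullable bytes
//
// The typed helpers are preferred on hot paths because passing values
// through interface{} may heap-allocate.
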
// The functions below are used as optimizations to avoid the dynamic memory
// allocations that occur when building the data structures representing the
// Kafka protocol requests.
func writeFetchRequestV2(w *bufio.Writer, correlationID int32, clientID, topic string, partition int32, offset int64, minBytes, maxBytes int, maxWait time.Duration) error {
h := requestHeader{
ApiKey: int16(fetchRequest),
ApiVersion: int16(v2),
CorrelationID: correlationID,
ClientID: clientID,
}
h.Size = (h.size() - 4) +
4 + // replica ID
4 + // max wait time
4 + // min bytes
4 + // topic array length
sizeofString(topic) +
4 + // partition array length
4 + // partition
8 + // offset
4 // max bytes
h.writeTo(w)
writeInt32(w, -1) // replica ID
writeInt32(w, milliseconds(maxWait))
writeInt32(w, int32(minBytes))
// topic array
writeArrayLen(w, 1)
writeString(w, topic)
// partition array
writeArrayLen(w, 1)
writeInt32(w, partition)
writeInt64(w, offset)
writeInt32(w, int32(maxBytes))
return w.Flush()
}
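// A hypothetical call site for the fetch writers (connection setup and
// argument values assumed, not taken from this file):
//
//	w := bufio.NewWriter(conn)
//	err := writeFetchRequestV2(w, correlationID, "client-id", "topic-a", 0,
//		offset, 1, 1<<20, 100*time.Millisecond)
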
func writeFetchRequestV5(w *bufio.Writer, correlationID int32, clientID, topic string, partition int32, offset int64, minBytes, maxBytes int, maxWait time.Duration, isolationLevel int8) error {
h := requestHeader{
ApiKey: int16(fetchRequest),
ApiVersion: int16(v5),
CorrelationID: correlationID,
ClientID: clientID,
}
h.Size = (h.size() - 4) +
4 + // replica ID
4 + // max wait time
4 + // min bytes
4 + // max bytes
1 + // isolation level
4 + // topic array length
sizeofString(topic) +
4 + // partition array length
4 + // partition
8 + // offset
8 + // log start offset
4 // max bytes
h.writeTo(w)
writeInt32(w, -1) // replica ID
writeInt32(w, milliseconds(maxWait))
writeInt32(w, int32(minBytes))
writeInt32(w, int32(maxBytes))
	writeInt8(w, isolationLevel) // isolation level: 0 = read uncommitted, 1 = read committed
// topic array
writeArrayLen(w, 1)
writeString(w, topic)
// partition array
writeArrayLen(w, 1)
writeInt32(w, partition)
writeInt64(w, offset)
	writeInt64(w, int64(0)) // log start offset, only used when the request comes from a follower
writeInt32(w, int32(maxBytes))
return w.Flush()
}
func writeFetchRequestV10(w *bufio.Writer, correlationID int32, clientID, topic string, partition int32, offset int64, minBytes, maxBytes int, maxWait time.Duration, isolationLevel int8) error {
h := requestHeader{
ApiKey: int16(fetchRequest),
ApiVersion: int16(v10),
CorrelationID: correlationID,
ClientID: clientID,
}
h.Size = (h.size() - 4) +
4 + // replica ID
4 + // max wait time
4 + // min bytes
4 + // max bytes
1 + // isolation level
4 + // session ID
4 + // session epoch
4 + // topic array length
sizeofString(topic) +
4 + // partition array length
4 + // partition
4 + // current leader epoch
8 + // fetch offset
8 + // log start offset
4 + // partition max bytes
4 // forgotten topics data
h.writeTo(w)
writeInt32(w, -1) // replica ID
writeInt32(w, milliseconds(maxWait))
writeInt32(w, int32(minBytes))
writeInt32(w, int32(maxBytes))
	writeInt8(w, isolationLevel) // isolation level: 0 = read uncommitted, 1 = read committed
	writeInt32(w, 0)  // session ID: 0 = sessionless fetch (FIXME: fetch sessions unsupported)
	writeInt32(w, -1) // session epoch: -1 disables incremental fetch sessions (FIXME)
// topic array
writeArrayLen(w, 1)
writeString(w, topic)
// partition array
writeArrayLen(w, 1)
writeInt32(w, partition)
	writeInt32(w, -1) // current leader epoch: -1 = unknown (FIXME)
writeInt64(w, offset)
	writeInt64(w, int64(0)) // log start offset, only used when the request comes from a follower
writeInt32(w, int32(maxBytes))
// forgotten topics array
writeArrayLen(w, 0) // forgotten topics not supported yet
return w.Flush()
}
func writeListOffsetRequestV1(w *bufio.Writer, correlationID int32, clientID, topic string, partition int32, time int64) error {
h := requestHeader{
ApiKey: int16(listOffsetRequest),
ApiVersion: int16(v1),
CorrelationID: correlationID,
ClientID: clientID,
}
h.Size = (h.size() - 4) +
4 + // replica ID
4 + // topic array length
sizeofString(topic) + // topic
4 + // partition array length
4 + // partition
8 // time
h.writeTo(w)
writeInt32(w, -1) // replica ID
// topic array
writeArrayLen(w, 1)
writeString(w, topic)
// partition array
writeArrayLen(w, 1)
writeInt32(w, partition)
writeInt64(w, time)
return w.Flush()
}
func writeProduceRequestV2(w *bufio.Writer, codec CompressionCodec, correlationID int32, clientID, topic string, partition int32, timeout time.Duration, requiredAcks int16, msgs ...Message) (err error) {
attributes := int8(CompressionNoneCode)
if codec != nil {
if msgs, err = compressMessageSet(codec, msgs...); err != nil {
return err
}
attributes = codec.Code()
}
size := messageSetSize(msgs...)
h := requestHeader{
ApiKey: int16(produceRequest),
ApiVersion: int16(v2),
CorrelationID: correlationID,
ClientID: clientID,
}
h.Size = (h.size() - 4) +
2 + // required acks
4 + // timeout
4 + // topic array length
sizeofString(topic) + // topic
4 + // partition array length
4 + // partition
4 + // message set size
size
h.writeTo(w)
writeInt16(w, requiredAcks) // required acks
writeInt32(w, milliseconds(timeout))
// topic array
writeArrayLen(w, 1)
writeString(w, topic)
// partition array
writeArrayLen(w, 1)
writeInt32(w, partition)
writeInt32(w, size)
for _, msg := range msgs {
writeMessage(w, msg.Offset, attributes, msg.Time, msg.Key, msg.Value)
}
return w.Flush()
}
func writeProduceRequestV3(w *bufio.Writer, codec CompressionCodec, correlationID int32, clientID, topic string, partition int32, timeout time.Duration, requiredAcks int16, transactionalID *string, msgs ...Message) (err error) {
var size int32
var compressed []byte
var attributes int16
if codec == nil {
size = recordBatchSize(msgs...)
} else {
compressed, attributes, size, err = compressRecordBatch(codec, msgs...)
if err != nil {
return
}
}
h := requestHeader{
ApiKey: int16(produceRequest),
ApiVersion: int16(v3),
CorrelationID: correlationID,
ClientID: clientID,
}
h.Size = (h.size() - 4) +
sizeofNullableString(transactionalID) +
2 + // required acks
4 + // timeout
4 + // topic array length
sizeofString(topic) + // topic
4 + // partition array length
4 + // partition
4 + // message set size
size
h.writeTo(w)
writeNullableString(w, transactionalID)
writeInt16(w, requiredAcks) // required acks
writeInt32(w, milliseconds(timeout))
// topic array
writeArrayLen(w, 1)
writeString(w, topic)
// partition array
writeArrayLen(w, 1)
writeInt32(w, partition)
writeInt32(w, size)
if codec != nil {
err = writeRecordBatch(w, attributes, size, func(w *bufio.Writer) {
w.Write(compressed)
}, msgs...)
} else {
err = writeRecordBatch(w, attributes, size, func(w *bufio.Writer) {
for i, msg := range msgs {
writeRecord(w, 0, msgs[0].Time, int64(i), msg)
}
}, msgs...)
}
if err != nil {
return
}
return w.Flush()
}
func writeProduceRequestV7(w *bufio.Writer, codec CompressionCodec, correlationID int32, clientID, topic string, partition int32, timeout time.Duration, requiredAcks int16, transactionalID *string, msgs ...Message) (err error) {
var size int32
var compressed []byte
var attributes int16
if codec == nil {
size = recordBatchSize(msgs...)
} else {
compressed, attributes, size, err = compressRecordBatch(codec, msgs...)
if err != nil {
return
}
}
h := requestHeader{
ApiKey: int16(produceRequest),
ApiVersion: int16(v7),
CorrelationID: correlationID,
ClientID: clientID,
}
h.Size = (h.size() - 4) +
sizeofNullableString(transactionalID) +
2 + // required acks
4 + // timeout
4 + // topic array length
sizeofString(topic) + // topic
4 + // partition array length
4 + // partition
4 + // message set size
size
h.writeTo(w)
writeNullableString(w, transactionalID)
writeInt16(w, requiredAcks) // required acks
writeInt32(w, milliseconds(timeout))
// topic array
writeArrayLen(w, 1)
writeString(w, topic)
// partition array
writeArrayLen(w, 1)
writeInt32(w, partition)
writeInt32(w, size)
if codec != nil {
err = writeRecordBatch(w, attributes, size, func(w *bufio.Writer) {
w.Write(compressed)
}, msgs...)
} else {
err = writeRecordBatch(w, attributes, size, func(w *bufio.Writer) {
for i, msg := range msgs {
writeRecord(w, 0, msgs[0].Time, int64(i), msg)
}
}, msgs...)
}
if err != nil {
return
}
return w.Flush()
}
func messageSetSize(msgs ...Message) (size int32) {
for _, msg := range msgs {
size += 8 + // offset
4 + // message size
4 + // crc
1 + // magic byte
1 + // attributes
8 + // timestamp
sizeofBytes(msg.Key) +
sizeofBytes(msg.Value)
}
return
}
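// Per the layout above, every v1 message costs 26 bytes of fixed overhead
// (8+4+4+1+1+8) in addition to its length-prefixed key and value.
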
const recordBatchHeaderSize int32 = 0 +
8 + // base offset
4 + // batch length
4 + // partition leader epoch
1 + // magic
4 + // crc
2 + // attributes
4 + // last offset delta
8 + // first timestamp
8 + // max timestamp
8 + // producer id
2 + // producer epoch
4 + // base sequence
	4 // record count
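// The fields above add up to 61 bytes. The batch length field excludes the
// base offset and the batch length itself, which is why writeRecordBatch
// records size-12.
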
func recordBatchSize(msgs ...Message) (size int32) {
size = recordBatchHeaderSize
baseTime := msgs[0].Time
for i, msg := range msgs {
sz := recordSize(&msg, msg.Time.Sub(baseTime), int64(i))
size += int32(sz + varIntLen(int64(sz)))
}
return
}
func writeRecordBatch(w *bufio.Writer, attributes int16, size int32, write func(*bufio.Writer), msgs ...Message) error {
baseTime := msgs[0].Time
lastTime := msgs[len(msgs)-1].Time
	writeInt64(w, int64(0)) // base offset
	writeInt32(w, size-12)  // batch length (12 = batch length + base offset sizes)
	writeInt32(w, -1)       // partition leader epoch
	writeInt8(w, 2)         // magic byte
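	// The CRC-32C checksum covers every field after it in the batch, so the
	// remainder is staged in an in-memory buffer, checksummed, and only then
	// copied to the output writer.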
crcBuf := &bytes.Buffer{}
crcBuf.Grow(int(size - 12)) // 12 = batch length + base offset sizes
crcWriter := bufio.NewWriter(crcBuf)
writeInt16(crcWriter, attributes) // attributes, timestamp type 0 - create time, not part of a transaction, no control messages
	writeInt32(crcWriter, int32(len(msgs)-1)) // last offset delta
writeInt64(crcWriter, timestamp(baseTime))
writeInt64(crcWriter, timestamp(lastTime))
writeInt64(crcWriter, -1) // default producer id for now
writeInt16(crcWriter, -1) // default producer epoch for now
writeInt32(crcWriter, -1) // default base sequence
writeInt32(crcWriter, int32(len(msgs))) // record count
write(crcWriter)
if err := crcWriter.Flush(); err != nil {
return err
}
	crcTable := crc32.MakeTable(crc32.Castagnoli) // message format v2 uses CRC-32C
crcChecksum := crc32.Checksum(crcBuf.Bytes(), crcTable)
writeInt32(w, int32(crcChecksum))
if _, err := w.Write(crcBuf.Bytes()); err != nil {
return err
}
return nil
}
var maxDate = time.Date(5000, time.January, 0, 0, 0, 0, 0, time.UTC)
func recordSize(msg *Message, timestampDelta time.Duration, offsetDelta int64) (size int) {
size += 1 + // attributes
varIntLen(int64(milliseconds(timestampDelta))) +
varIntLen(offsetDelta) +
varIntLen(int64(len(msg.Key))) +
len(msg.Key) +
varIntLen(int64(len(msg.Value))) +
len(msg.Value) +
varIntLen(int64(len(msg.Headers)))
for _, h := range msg.Headers {
size += varIntLen(int64(len([]byte(h.Key)))) +
len([]byte(h.Key)) +
varIntLen(int64(len(h.Value))) +
len(h.Value)
}
return
}
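// In the pre-0.11 message set format, compression wraps the cleartext
// message set: the messages are written through the codec and the result is
// shipped as the value of a single outer message whose attributes carry the
// codec bits (set by the caller, see writeProduceRequestV2).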
func compressMessageSet(codec CompressionCodec, msgs ...Message) ([]Message, error) {
estimatedLen := 0
for _, msg := range msgs {
estimatedLen += int(msgSize(msg.Key, msg.Value))
}
buffer := &bytes.Buffer{}
	buffer.Grow(estimatedLen / 2) // pre-size optimistically, assuming roughly 2x compression
compressor := codec.NewWriter(buffer)
compressedWriter := bufio.NewWriterSize(compressor, 512)
for offset, msg := range msgs {
writeMessage(compressedWriter, int64(offset), CompressionNoneCode, msg.Time, msg.Key, msg.Value)
}
if err := compressedWriter.Flush(); err != nil {
compressor.Close()
return nil, err
}
if err := compressor.Close(); err != nil {
return nil, err
}
return []Message{{Value: buffer.Bytes()}}, nil
}
func compressRecordBatch(codec CompressionCodec, msgs ...Message) (compressed []byte, attributes int16, size int32, err error) {
recordBuf := new(bytes.Buffer)
	recordBuf.Grow(int(recordBatchSize(msgs...)) / 2) // pre-size assuming roughly 2x compression
compressor := codec.NewWriter(recordBuf)
compressedWriter := bufio.NewWriterSize(compressor, 512)
for i, msg := range msgs {
writeRecord(compressedWriter, 0, msgs[0].Time, int64(i), msg)
}
if err = compressedWriter.Flush(); err != nil {
compressor.Close()
return
}
if err = compressor.Close(); err != nil {
return
}
compressed = recordBuf.Bytes()
attributes = int16(codec.Code())
size = recordBatchHeaderSize + int32(len(compressed))
return
}
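// In message format v2 only the records section is compressed; the 61-byte
// batch header stays in cleartext, hence size = recordBatchHeaderSize +
// len(compressed) above.
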
const magicByte = 1 // compatible with Kafka 0.10.0.0+
func writeMessage(w *bufio.Writer, offset int64, attributes int8, time time.Time, key, value []byte) {
	ts := timestamp(time)
	checksum := crc32OfMessage(magicByte, attributes, ts, key, value)
	size := msgSize(key, value)
	writeInt64(w, offset)
	writeInt32(w, size)
	writeInt32(w, int32(checksum))
	writeInt8(w, magicByte)
	writeInt8(w, attributes)
	writeInt64(w, ts)
	writeBytes(w, key)
	writeBytes(w, value)
}
func msgSize(key, value []byte) int32 {
return 4 + // crc
1 + // magic byte
1 + // attributes
8 + // timestamp
sizeofBytes(key) +
sizeofBytes(value)
}
// Messages with magic 2 and above are called records. This function writes
// messages using message format version 2.
func writeRecord(w *bufio.Writer, attributes int8, baseTime time.Time, offset int64, msg Message) {
timestampDelta := msg.Time.Sub(baseTime)
offsetDelta := int64(offset)
writeVarInt(w, int64(recordSize(&msg, timestampDelta, offsetDelta)))
writeInt8(w, attributes)
writeVarInt(w, int64(milliseconds(timestampDelta)))
writeVarInt(w, offsetDelta)
writeVarInt(w, int64(len(msg.Key)))
w.Write(msg.Key)
writeVarInt(w, int64(len(msg.Value)))
w.Write(msg.Value)
writeVarInt(w, int64(len(msg.Headers)))
for _, h := range msg.Headers {
writeVarInt(w, int64(len(h.Key)))
w.Write([]byte(h.Key))
writeVarInt(w, int64(len(h.Value)))
w.Write(h.Value)
}
}
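// Worked example (illustrative): a record with an empty key, the 5-byte
// value "hello", no headers, and zero timestamp/offset deltas serializes
// into 12 bytes:
//
//	0x16                      length (zig-zag varint of the 11-byte body)
//	0x00                      attributes
//	0x00 0x00                 timestamp delta, offset delta
//	0x00                      key length
//	0x0a 'h' 'e' 'l' 'l' 'o'  value length (zig-zag of 5), value
//	0x00                      header count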