forked from segmentio/kafka-go
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrecordbatch.go
108 lines (93 loc) · 2.52 KB
/
recordbatch.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
package kafka
import (
"bytes"
"time"
)
// recordBatchHeaderSize is the combined width, in bytes, of the fixed-size
// fields of a record batch header. Each term below is the width of the field
// named in its trailing comment.
const recordBatchHeaderSize int32 = 8 + // base offset
	4 + // batch length
	4 + // partition leader epoch
	1 + // magic
	4 + // crc
	2 + // attributes
	4 + // last offset delta
	8 + // first timestamp
	8 + // max timestamp
	8 + // producer id
	2 + // producer epoch
	4 + // base sequence
	4 // msg count
// recordBatchSize returns the number of bytes taken by an uncompressed record
// batch holding msgs: recordBatchHeaderSize plus, for every message, the size
// reported by recordSize and the length of that size encoded as a varint.
//
// Timestamp deltas are measured against the first message's Time, and the
// offset delta of a message is its index in msgs.
//
// When msgs is empty the header size alone is returned; the function does not
// index msgs[0] in that case, so it no longer panics on empty input.
func recordBatchSize(msgs ...Message) (size int32) {
	size = recordBatchHeaderSize
	if len(msgs) == 0 {
		return size
	}

	baseTime := msgs[0].Time
	for i := range msgs {
		// Take the address to avoid copying each Message.
		msg := &msgs[i]
		msz := recordSize(msg, msg.Time.Sub(baseTime), int64(i))
		size += int32(msz + varIntLen(int64(msz)))
	}
	return size
}
// compressRecordBatch writes every message in msgs as a record through a
// compressing writer created by codec and returns the resulting bytes.
//
// Results on success:
//   - compressed: the buffer obtained from acquireBuffer, now holding the
//     compressed records; the caller is expected to hand it back with
//     releaseBuffer once done.
//   - attributes: codec.Code() converted to int16.
//   - size: recordBatchHeaderSize plus the number of compressed bytes.
//
// If closing the compressor fails, the buffer is released here and the
// function returns a nil buffer, zero attributes and size, and the error. The
// nil buffer guarantees that callers cannot keep using, or release a second
// time, a buffer that has already been given back.
func compressRecordBatch(codec CompressionCodec, msgs ...Message) (compressed *bytes.Buffer, attributes int16, size int32, err error) {
	compressed = acquireBuffer()
	compressor := codec.NewWriter(compressed)
	wb := &writeBuffer{w: compressor}

	for i, msg := range msgs {
		// Every record is written relative to the first message's time and
		// carries its index in msgs.
		wb.writeRecord(0, msgs[0].Time, int64(i), msg)
	}

	// Close is what finalizes the compressed stream, so its error decides
	// whether the buffer content is usable.
	if err = compressor.Close(); err != nil {
		releaseBuffer(compressed)
		return nil, 0, 0, err
	}

	attributes = int16(codec.Code())
	size = recordBatchHeaderSize + int32(compressed.Len())
	return compressed, attributes, size, nil
}
// recordBatch carries the messages of one record batch together with the
// values precomputed for writing it. It is built by newRecordBatch and
// emitted by writeTo.
type recordBatch struct {
// required input parameters
// codec is the compression codec; newRecordBatch takes the uncompressed
// path when it is nil.
codec CompressionCodec
// attributes is filled from compressRecordBatch when a codec is set and is
// left at its zero value otherwise.
attributes int16
// msgs are the messages that make up the batch.
msgs []Message
// parameters calculated during init
// compressed holds the compressed records when a codec is set; it stays nil
// on the uncompressed path and is released by writeTo after being written.
compressed *bytes.Buffer
// size is the batch size computed by recordBatchSize (no codec) or
// compressRecordBatch (with codec).
size int32
}
// newRecordBatch builds a recordBatch for msgs and precomputes its size.
//
// With a nil codec the size comes from recordBatchSize and the records stay
// uncompressed. With a non-nil codec the records are compressed immediately
// by compressRecordBatch, which also supplies the attributes and size.
//
// If compression fails, nil and the error are returned, so a caller never
// receives a half-initialized batch whose compressed buffer has already been
// released.
func newRecordBatch(codec CompressionCodec, msgs ...Message) (r *recordBatch, err error) {
	r = &recordBatch{
		codec: codec,
		msgs:  msgs,
	}

	if r.codec == nil {
		r.size = recordBatchSize(r.msgs...)
		return r, nil
	}

	r.compressed, r.attributes, r.size, err = compressRecordBatch(r.codec, r.msgs...)
	if err != nil {
		return nil, err
	}
	return r, nil
}
// writeTo emits the batch to wb: first r.size as an int32, then the batch
// itself through writeRecordBatch, whose callback supplies the records.
//
// On the uncompressed path every message is written with writeRecord, using
// the first message's time and the message's index. On the compressed path
// the precompressed bytes are written as-is and the buffer is released
// afterwards, so a batch with a compressed buffer must not be written twice.
//
// r.msgs must contain at least one message: the times of the first and last
// message are read unconditionally.
func (r *recordBatch) writeTo(wb *writeBuffer) {
	wb.writeInt32(r.size)

	first := r.msgs[0].Time
	last := r.msgs[len(r.msgs)-1].Time
	count := len(r.msgs)

	if r.compressed == nil {
		wb.writeRecordBatch(r.attributes, r.size, count, first, last, func(out *writeBuffer) {
			for i := range r.msgs {
				out.writeRecord(0, first, int64(i), r.msgs[i])
			}
		})
		return
	}

	wb.writeRecordBatch(r.attributes, r.size, count, first, last, func(out *writeBuffer) {
		out.Write(r.compressed.Bytes())
	})
	releaseBuffer(r.compressed)
}
// recordSize returns the size of a single record for msg, summing:
//   - one byte for the record attributes,
//   - the varint length of timestampDelta expressed in milliseconds,
//   - the varint length of offsetDelta,
//   - the sizes of the key and value as reported by varBytesLen,
//   - the size of the headers as reported by varArrayLen, where each header
//     contributes varStringLen of its key plus varBytesLen of its value.
func recordSize(msg *Message, timestampDelta time.Duration, offsetDelta int64) int {
	total := 1 // attributes

	total += varIntLen(int64(milliseconds(timestampDelta)))
	total += varIntLen(offsetDelta)
	total += varBytesLen(msg.Key)
	total += varBytesLen(msg.Value)
	total += varArrayLen(len(msg.Headers), func(idx int) int {
		hdr := &msg.Headers[idx]
		return varStringLen(hdr.Key) + varBytesLen(hdr.Value)
	})

	return total
}