forked from segmentio/kafka-go
-
Notifications
You must be signed in to change notification settings - Fork 0
/
gzip.go
131 lines (108 loc) · 2.72 KB
/
gzip.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
package gzip
import (
"bytes"
"compress/gzip"
"io"
"io/ioutil"
"sync"
kafka "github.com/segmentio/kafka-go"
)
var (
	// emptyGzipBytes holds the gzip encoding of an empty file. gzipReader.Reset
	// feeds it to the embedded gzip.Reader whenever it is called with a nil
	// source, which is how readers are primed before entering readerPool.
	emptyGzipBytes = [...]byte{
		0x1f, 0x8b, 0x08, 0x08, 0x0d, 0x0c, 0x67, 0x5c, 0x00, 0x03, 0x66, 0x6f,
		0x6f, 0x00, 0x03, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
	}

	// readerPool recycles gzipReader values: NewReader takes readers from it
	// and reader.Close hands them back.
	readerPool = sync.Pool{
		New: func() interface{} {
			// A reader has to be given valid gzip data when it is first set
			// up; otherwise it is left invalid and fails on Reset. Priming it
			// with the empty stream (Reset(nil)) avoids that.
			z := new(gzipReader)
			z.Reset(nil)
			return z
		},
	}
)
// gzipReader couples a gzip.Reader with a reusable bytes.Reader so the gzip
// reader can be reset onto the empty gzip stream without a new allocation.
type gzipReader struct {
	// Reader is the embedded decompressor; its methods are promoted.
	gzip.Reader

	// emptyGzipFile is the source used by Reset when no reader is supplied.
	emptyGzipFile bytes.Reader
}
// Reset points the embedded gzip.Reader at r. When r is nil, the reader is
// reset onto an in-memory copy of emptyGzipBytes instead.
//
// The error returned by gzip.Reader.Reset is dropped: this method has no
// error result.
func (z *gzipReader) Reset(r io.Reader) {
	src := r
	if src == nil {
		// Rewind the private bytes.Reader over the empty gzip file and use
		// it as the source.
		z.emptyGzipFile.Reset(emptyGzipBytes[:])
		src = &z.emptyGzipFile
	}
	z.Reader.Reset(src)
}
func init() {
kafka.RegisterCompressionCodec(NewCompressionCodec())
}
const (
	// Code is the codec identifier returned by (*CompressionCodec).Code.
	Code = 1

	// DefaultCompressionLevel is the compression level used by
	// NewCompressionCodec; it equals gzip.DefaultCompression.
	DefaultCompressionLevel = gzip.DefaultCompression
)
// CompressionCodec is the gzip codec. Its methods Code, Name, NewReader and
// NewWriter implement the kafka.CompressionCodec interface.
type CompressionCodec struct {
	// writerPool holds gzip writers (or the error from creating one) for
	// reuse by NewWriter.
	writerPool sync.Pool
}
// NewCompressionCodec returns a gzip codec configured with
// DefaultCompressionLevel.
func NewCompressionCodec() *CompressionCodec {
	codec := NewCompressionCodecLevel(DefaultCompressionLevel)
	return codec
}
// NewCompressionCodecLevel returns a gzip codec whose writers compress at the
// given level.
//
// The level is not validated here. Writers are created lazily by the codec's
// pool; if gzip.NewWriterLevel rejects the level, the pool yields that error
// instead of a writer, and NewWriter reports it through an errorWriter.
func NewCompressionCodecLevel(level int) *CompressionCodec {
	codec := new(CompressionCodec)
	codec.writerPool.New = func() interface{} {
		// The destination is a placeholder; NewWriter resets the writer onto
		// the real destination before handing it out.
		z, err := gzip.NewWriterLevel(ioutil.Discard, level)
		if err != nil {
			return err
		}
		return z
	}
	return codec
}
// Code implements the kafka.CompressionCodec interface. It returns the
// package-level Code constant.
func (c *CompressionCodec) Code() int8 {
	return Code
}
// Name implements the kafka.CompressionCodec interface. It always returns
// "gzip".
func (c *CompressionCodec) Name() string {
	return "gzip"
}
// NewReader implements the kafka.CompressionCodec interface.
//
// It takes a gzipReader from readerPool, resets it onto r, and wraps it in a
// reader whose Close returns it to the pool.
func (c *CompressionCodec) NewReader(r io.Reader) io.ReadCloser {
	pooled := readerPool.Get().(*gzipReader)
	pooled.Reset(r)
	return &reader{gzipReader: pooled}
}
// NewWriter implements the kafka.CompressionCodec interface.
//
// It takes a value from the codec's writer pool. If that value is a gzip
// writer, it is reset onto w and wrapped so that Close returns it to the pool.
// Otherwise the pooled value is asserted to be an error and an errorWriter
// reporting it is returned.
func (c *CompressionCodec) NewWriter(w io.Writer) io.WriteCloser {
	pooled := c.writerPool.Get()
	if z, ok := pooled.(*gzip.Writer); ok && z != nil {
		z.Reset(w)
		return &writer{c: c, Writer: z}
	}
	// Not a usable writer: the pool is expected to have produced an error.
	return errorWriter{err: pooled.(error)}
}
// reader is the io.ReadCloser returned by NewReader. It embeds the pooled
// gzipReader; the pointer is set to nil once Close has been called.
type reader struct {
	*gzipReader
}
// Close closes the underlying gzip reader, resets it onto the empty gzip
// stream, and returns it to readerPool. The error from the gzip reader's
// Close is returned. Calling Close again is a no-op that returns nil.
func (r *reader) Close() (err error) {
	z := r.gzipReader
	if z == nil {
		// Already closed.
		return nil
	}
	// Detach first so the pooled reader cannot be released twice.
	r.gzipReader = nil
	err = z.Close()
	z.Reset(nil)
	readerPool.Put(z)
	return err
}
// writer is the io.WriteCloser returned by NewWriter. It embeds the pooled
// gzip.Writer; the pointer is set to nil once Close has been called.
type writer struct {
	// c is the codec whose writerPool the gzip writer is returned to.
	c *CompressionCodec

	*gzip.Writer
}
// Close closes the underlying gzip writer, resets it with a nil destination,
// and returns it to the owning codec's writer pool. The error from the gzip
// writer's Close is returned. Calling Close again is a no-op that returns nil.
func (w *writer) Close() (err error) {
	z := w.Writer
	if z == nil {
		// Already closed.
		return nil
	}
	// Detach first so the pooled writer cannot be released twice.
	w.Writer = nil
	err = z.Close()
	z.Reset(nil)
	w.c.writerPool.Put(z)
	return err
}
// errorWriter is a write-closer that never writes anything: both of its
// methods report the stored error.
type errorWriter struct {
	err error
}

// Close returns the stored error.
func (e errorWriter) Close() error {
	return e.err
}

// Write discards b and returns zero bytes written together with the stored
// error.
func (e errorWriter) Write(b []byte) (int, error) {
	return 0, e.err
}