forked from segmentio/kafka-go
-
Notifications
You must be signed in to change notification settings - Fork 0
/
response.go
151 lines (125 loc) · 3.86 KB
/
response.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
package protocol
import (
"crypto/tls"
"encoding/binary"
"errors"
"fmt"
"io"
)
func ReadResponse(r io.Reader, apiKey ApiKey, apiVersion int16) (correlationID int32, msg Message, err error) {
if i := int(apiKey); i < 0 || i >= len(apiTypes) {
err = fmt.Errorf("unsupported api key: %d", i)
return
}
t := &apiTypes[apiKey]
if t == nil {
err = fmt.Errorf("unsupported api: %s", apiNames[apiKey])
return
}
minVersion := t.minVersion()
maxVersion := t.maxVersion()
if apiVersion < minVersion || apiVersion > maxVersion {
err = fmt.Errorf("unsupported %s version: v%d not in range v%d-v%d", apiKey, apiVersion, minVersion, maxVersion)
return
}
d := &decoder{reader: r, remain: 4}
size := d.readInt32()
if err = d.err; err != nil {
err = dontExpectEOF(err)
return
}
d.remain = int(size)
correlationID = d.readInt32()
if err = d.err; err != nil {
if errors.Is(err, io.ErrUnexpectedEOF) {
// If a Writer/Reader is configured without TLS and connects
// to a broker expecting TLS the only message we return to the
// caller is io.ErrUnexpetedEOF which is opaque. This section
// tries to determine if that's what has happened.
// We first deconstruct the initial 4 bytes of the message
// from the size which was read earlier.
// Next, we examine those bytes to see if they looks like a TLS
// error message. If they do we wrap the io.ErrUnexpectedEOF
// with some context.
if looksLikeUnexpectedTLS(size) {
err = fmt.Errorf("%w: broker appears to be expecting TLS", io.ErrUnexpectedEOF)
}
return
}
err = dontExpectEOF(err)
return
}
res := &t.responses[apiVersion-minVersion]
if res.flexible {
// In the flexible case, there's a tag buffer at the end of the response header
taggedCount := int(d.readUnsignedVarInt())
for i := 0; i < taggedCount; i++ {
d.readUnsignedVarInt() // tagID
size := d.readUnsignedVarInt()
// Just throw away the values for now
d.read(int(size))
}
}
msg = res.new()
res.decode(d, valueOf(msg))
d.discardAll()
if err = d.err; err != nil {
err = dontExpectEOF(err)
}
return
}
func WriteResponse(w io.Writer, apiVersion int16, correlationID int32, msg Message) error {
apiKey := msg.ApiKey()
if i := int(apiKey); i < 0 || i >= len(apiTypes) {
return fmt.Errorf("unsupported api key: %d", i)
}
t := &apiTypes[apiKey]
if t == nil {
return fmt.Errorf("unsupported api: %s", apiNames[apiKey])
}
minVersion := t.minVersion()
maxVersion := t.maxVersion()
if apiVersion < minVersion || apiVersion > maxVersion {
return fmt.Errorf("unsupported %s version: v%d not in range v%d-v%d", apiKey, apiVersion, minVersion, maxVersion)
}
r := &t.responses[apiVersion-minVersion]
v := valueOf(msg)
b := newPageBuffer()
defer b.unref()
e := &encoder{writer: b}
e.writeInt32(0) // placeholder for the response size
e.writeInt32(correlationID)
if r.flexible {
// Flexible messages use extra space for a tag buffer,
// which begins with a size value. Since we're not writing any fields into the
// latter, we can just write zero for now.
//
// See
// https://cwiki.apache.org/confluence/display/KAFKA/KIP-482%3A+The+Kafka+Protocol+should+Support+Optional+Tagged+Fields
// for details.
e.writeUnsignedVarInt(0)
}
r.encode(e, v)
err := e.err
if err == nil {
size := packUint32(uint32(b.Size()) - 4)
b.WriteAt(size[:], 0)
_, err = b.WriteTo(w)
}
return err
}
const (
tlsAlertByte byte = 0x15
)
// looksLikeUnexpectedTLS returns true if the size passed in resemble
// the TLS alert message that is returned to a client which sends
// an invalid ClientHello message.
func looksLikeUnexpectedTLS(size int32) bool {
var sizeBytes [4]byte
binary.BigEndian.PutUint32(sizeBytes[:], uint32(size))
if sizeBytes[0] != tlsAlertByte {
return false
}
version := int(sizeBytes[1])<<8 | int(sizeBytes[2])
return version <= tls.VersionTLS13 && version >= tls.VersionTLS10
}