From 00f748644e5e1df8a3ab85fa952f230ae562277c Mon Sep 17 00:00:00 2001 From: Benjamin Yolken Date: Tue, 17 Nov 2020 21:59:18 -0800 Subject: [PATCH] Misc. fixes --- alterpartitionreassignments.go | 70 +++++++++++++ createpartitions.go | 9 +- describeconfigs.go | 9 +- describegroups.go | 4 +- electleaders.go | 10 +- incrementalalterconfigs.go | 8 +- listgroups.go | 4 +- .../alterpartitionreassignments.go | 25 ++--- protocol/decode.go | 98 +++++++++++++++---- protocol/encode.go | 94 ++++++++++++++---- protocol/request.go | 16 +-- protocol/response.go | 3 +- 12 files changed, 265 insertions(+), 85 deletions(-) create mode 100644 alterpartitionreassignments.go diff --git a/alterpartitionreassignments.go b/alterpartitionreassignments.go new file mode 100644 index 000000000..63fdab2b0 --- /dev/null +++ b/alterpartitionreassignments.go @@ -0,0 +1,70 @@ +package kafka + +import ( + "context" + "net" + "time" + + "github.com/segmentio/kafka-go/protocol/alterpartitionreassignments" +) + +type AlterPartitionReassignmentsRequest struct { + // Address of the kafka broker to send the request to. + Addr net.Addr + + Topic string + Assignments []AlterPartitionReassignmentsRequestAssignment + Timeout time.Duration +} + +type AlterPartitionReassignmentsRequestAssignment struct { + PartitionID int32 + BrokerIDs []int32 +} + +type AlterPartitionReassignmentsResponse struct { + ErrorCode int16 + ErrorMessage string +} + +func (c *Client) AlterPartitionReassignments( + ctx context.Context, + req AlterPartitionReassignmentsRequest, +) (*AlterPartitionReassignmentsResponse, error) { + apiPartitions := []alterpartitionreassignments.RequestPartition{} + + for _, assignment := range req.Assignments { + apiPartitions = append( + apiPartitions, + alterpartitionreassignments.RequestPartition{ + PartitionIndex: assignment.PartitionID, + Replicas: assignment.BrokerIDs, + }, + ) + } + + apiReq := &alterpartitionreassignments.Request{ + TimeoutMs: int32(req.Timeout.Milliseconds()), + Topics: []alterpartitionreassignments.RequestTopic{ + { + Name: req.Topic, + Partitions: apiPartitions, + }, + }, + } + + protoResp, err := c.roundTrip( + ctx, + req.Addr, + apiReq, + ) + if err != nil { + return nil, err + } + apiResp := protoResp.(*alterpartitionreassignments.Response) + + return &AlterPartitionReassignmentsResponse{ + ErrorCode: apiResp.ErrorCode, + ErrorMessage: apiResp.ErrorMessage, + }, nil +} diff --git a/createpartitions.go b/createpartitions.go index 0a20f64e5..42127568a 100644 --- a/createpartitions.go +++ b/createpartitions.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "net" + "time" "github.com/segmentio/kafka-go/protocol/createpartitions" ) @@ -15,6 +16,7 @@ type CreatePartitionsRequest struct { Topic string NewPartitions []CreatePartitionsRequestPartition TotalCount int32 + Timeout time.Duration } type CreatePartitionsRequestPartition struct { @@ -45,10 +47,10 @@ func (c *Client) CreatePartitions( Assignments: assignments, }, }, - TimeoutMs: 5000, + TimeoutMs: int32(req.Timeout.Milliseconds()), } - protocolResp, err := c.roundTrip( + protoResp, err := c.roundTrip( ctx, req.Addr, apiReq, @@ -56,9 +58,8 @@ func (c *Client) CreatePartitions( if err != nil { return nil, err } - apiResp := protocolResp.(*createpartitions.Response) + apiResp := protoResp.(*createpartitions.Response) - fmt.Printf("Response: %+v", *apiResp) if len(apiResp.Results) == 0 { return nil, fmt.Errorf("Empty results") } diff --git a/describeconfigs.go b/describeconfigs.go index f7c0cf51c..53d493ed6 100644 --- a/describeconfigs.go +++ b/describeconfigs.go @@ -5,7 +5,6 @@ import ( "fmt" "net" "strconv" - "strings" "github.com/segmentio/kafka-go/protocol/describeconfigs" ) @@ -60,7 +59,7 @@ func (c *Client) DescribeConfigs( ) } - protocolResp, err := c.roundTrip( + protoResp, err := c.roundTrip( ctx, req.Addr, &describeconfigs.Request{ @@ -70,7 +69,7 @@ func (c *Client) DescribeConfigs( if err != nil { return nil, err } - apiResp := protocolResp.(*describeconfigs.Response) + apiResp := protoResp.(*describeconfigs.Response) resp := &DescribeConfigsResponse{} @@ -79,10 +78,6 @@ func (c *Client) DescribeConfigs( case describeconfigs.ResourceTypeBroker: configs := map[string]string{} for _, entry := range resource.ConfigEntries { - if resource.ResourceName == "2" && strings.Contains(entry.ConfigName, "throttled") { - fmt.Printf("Entry: %+v\n", entry) - } - if !req.IncludeDefaults && (entry.IsDefault || entry.ConfigSource == describeconfigs.ConfigSourceDefaultConfig || entry.ConfigSource == describeconfigs.ConfigSourceStaticBrokerConfig || diff --git a/describegroups.go b/describegroups.go index 0404d3ea9..17072f663 100644 --- a/describegroups.go +++ b/describegroups.go @@ -60,7 +60,7 @@ func (c *Client) DescribeGroup( ctx context.Context, req DescribeGroupsRequest, ) (*DescribeGroupsResponse, error) { - protocolResp, err := c.roundTrip( + protoResp, err := c.roundTrip( ctx, req.Addr, &describegroups.Request{ @@ -70,7 +70,7 @@ func (c *Client) DescribeGroup( if err != nil { return nil, err } - apiResp := protocolResp.(*describegroups.Response) + apiResp := protoResp.(*describegroups.Response) resp := &DescribeGroupsResponse{ Groups: []DescribeGroupsResponseGroup{}, } diff --git a/electleaders.go b/electleaders.go index 878edef23..bf23e4575 100644 --- a/electleaders.go +++ b/electleaders.go @@ -3,6 +3,7 @@ package kafka import ( "context" "net" + "time" "github.com/segmentio/kafka-go/protocol/electleaders" ) @@ -13,7 +14,8 @@ type ElectLeadersRequest struct { Topic string Partitions []int32 - TimeoutMs int32 + + Timeout time.Duration } type ElectLeadersResponse struct { @@ -36,18 +38,18 @@ func (c *Client) ElectLeaders( ) } - protocolResp, err := c.roundTrip( + protoResp, err := c.roundTrip( ctx, req.Addr, &electleaders.Request{ TopicPartitions: topicPartitions, - TimeoutMs: req.TimeoutMs, + TimeoutMs: int32(req.Timeout.Milliseconds()), }, ) if err != nil { return nil, err } - apiResp := protocolResp.(*electleaders.Response) + apiResp := protoResp.(*electleaders.Response) return &ElectLeadersResponse{ ErrorCode: apiResp.ErrorCode, diff --git a/incrementalalterconfigs.go b/incrementalalterconfigs.go index 59920592d..3222babef 100644 --- a/incrementalalterconfigs.go +++ b/incrementalalterconfigs.go @@ -49,7 +49,7 @@ func (c *Client) AlterBrokerConfigs( }, } - protocolResp, err := c.roundTrip( + protoResp, err := c.roundTrip( ctx, req.Addr, apiReq, @@ -57,7 +57,7 @@ func (c *Client) AlterBrokerConfigs( if err != nil { return nil, err } - apiResp := protocolResp.(*incrementalalterconfigs.Response) + apiResp := protoResp.(*incrementalalterconfigs.Response) if len(apiResp.Responses) == 0 { return nil, fmt.Errorf("Empty response") } @@ -83,7 +83,7 @@ func (c *Client) AlterTopicConfigs( }, } - protocolResp, err := c.roundTrip( + protoResp, err := c.roundTrip( ctx, req.Addr, apiReq, @@ -91,7 +91,7 @@ func (c *Client) AlterTopicConfigs( if err != nil { return nil, err } - apiResp := protocolResp.(*incrementalalterconfigs.Response) + apiResp := protoResp.(*incrementalalterconfigs.Response) if len(apiResp.Responses) == 0 { return nil, fmt.Errorf("Empty response") diff --git a/listgroups.go b/listgroups.go index cb412f94d..22e144d52 100644 --- a/listgroups.go +++ b/listgroups.go @@ -26,11 +26,11 @@ func (c *Client) ListGroups( ctx context.Context, req ListGroupsRequest, ) (*ListGroupsResponse, error) { - protocolResp, err := c.roundTrip(ctx, req.Addr, &listgroups.Request{}) + protoResp, err := c.roundTrip(ctx, req.Addr, &listgroups.Request{}) if err != nil { return nil, err } - apiResp := protocolResp.(*listgroups.Response) + apiResp := protoResp.(*listgroups.Response) resp := &ListGroupsResponse{} for _, apiGroupInfo := range apiResp.Groups { diff --git a/protocol/alterpartitionreassignments/alterpartitionreassignments.go b/protocol/alterpartitionreassignments/alterpartitionreassignments.go index 37d769ef9..fafe75475 100644 --- a/protocol/alterpartitionreassignments/alterpartitionreassignments.go +++ b/protocol/alterpartitionreassignments/alterpartitionreassignments.go @@ -16,13 +16,13 @@ type Request struct { } type RequestTopic struct { - Name string `kafka:"min=v0,max=v0,compact"` - Partitions []RequestPartitions `kafka:"min=v0,max=v0"` + Name string `kafka:"min=v0,max=v0"` + Partitions []RequestPartition `kafka:"min=v0,max=v0"` } -type RequestPartitions struct { - PartitionIndex int32 `kafka:"min=v0,max=v0"` - Replicas int32 `kafka:"min=v0,max=v0"` +type RequestPartition struct { + PartitionIndex int32 `kafka:"min=v0,max=v0"` + Replicas []int32 `kafka:"min=v0,max=v0"` } func (r *Request) ApiKey() protocol.ApiKey { @@ -34,9 +34,10 @@ func (r *Request) Broker(cluster protocol.Cluster) (protocol.Broker, error) { } type Response struct { - ThrottleTimeMs int32 `kafka:"min=v0,max=v0"` - ErrorCode int16 `kafka:"min=v0,max=v0"` - ErrorMessage string `kafka:"min=v0,max=v0,compact,nullable"` + ThrottleTimeMs int32 `kafka:"min=v0,max=v0"` + ErrorCode int16 `kafka:"min=v0,max=v0"` + ErrorMessage string `kafka:"min=v0,max=v0,nullable"` + Results []ResponseResult `kafka:"min=v0,max=v0"` // We need at least one tagged field to indicate that this is a "flexible" message // type. @@ -44,14 +45,14 @@ type Response struct { } type ResponseResult struct { - Name string `kafka:"min=v0,max=v0,compact"` + Name string `kafka:"min=v0,max=v0"` Partitions []ResponsePartition `kafka:"min=v0,max=v0"` } type ResponsePartition struct { - PartitionIndex int32 - ErrorCode int16 - ErrorMessage string `kafka:"min=v0,max=v0,compact,nullable"` + PartitionIndex int32 `kafka:"min=v0,max=v0"` + ErrorCode int16 `kafka:"min=v0,max=v0"` + ErrorMessage string `kafka:"min=v0,max=v0,nullable"` } func (r *Response) ApiKey() protocol.ApiKey { diff --git a/protocol/decode.go b/protocol/decode.go index 87bea34a0..0a0ad62a6 100644 --- a/protocol/decode.go +++ b/protocol/decode.go @@ -89,10 +89,18 @@ func (d *decoder) decodeString(v value) { v.setString(d.readString()) } +func (d *decoder) decodeCompactString(v value) { + v.setString(d.readCompactString()) +} + func (d *decoder) decodeBytes(v value) { v.setBytes(d.readBytes()) } +func (d *decoder) decodeCompactBytes(v value) { + v.setBytes(d.readCompactBytes()) +} + func (d *decoder) decodeArray(v value, elemType reflect.Type, decodeElem decodeFunc) { if n := d.readInt32(); n < 0 { v.setArray(array{}) @@ -105,6 +113,19 @@ func (d *decoder) decodeArray(v value, elemType reflect.Type, decodeElem decodeF } } +func (d *decoder) decodeCompactArray(v value, elemType reflect.Type, decodeElem decodeFunc) { + if n := d.readUnsignedVarInt(); n < 1 { + // TODO: Distinguish between empty and null? + v.setArray(array{}) + } else { + a := makeArray(elemType, int(n-1)) + for i := 0; i < int(n-1) && d.remain > 0; i++ { + decodeElem(d, a.index(i)) + } + v.setArray(a) + } +} + func (d *decoder) discardAll() { d.discard(d.remain) } @@ -205,10 +226,11 @@ func (d *decoder) readString() string { } func (d *decoder) readCompactString() string { - if n := d.readVarInt(); n < 0 { + if n := d.readUnsignedVarInt(); n < 1 { + // TODO: Distinguish between empty and null? return "" } else { - return bytesToString(d.read(int(n))) + return bytesToString(d.read(int(n - 1))) } } @@ -230,18 +252,18 @@ func (d *decoder) readBytesTo(w io.Writer) bool { } func (d *decoder) readCompactBytes() []byte { - if n := d.readVarInt(); n < 0 { + if n := d.readUnsignedVarInt(); n < 1 { return nil } else { - return d.read(int(n)) + return d.read(int(n - 1)) } } func (d *decoder) readCompactBytesTo(w io.Writer) bool { - if n := d.readVarInt(); n < 0 { + if n := d.readUnsignedVarInt(); n < 1 { return false } else { - d.writeTo(w, int(n)) + d.writeTo(w, int(n-1)) return d.err == nil } } @@ -273,6 +295,33 @@ func (d *decoder) readVarInt() int64 { return 0 } +func (d *decoder) readUnsignedVarInt() uint64 { + n := 11 // varints are at most 11 bytes + + if n > d.remain { + n = d.remain + } + + x := uint64(0) + s := uint(0) + + for n > 0 { + b := d.readByte() + + if (b & 0x80) == 0 { + x |= uint64(b) << s + return x + } + + x |= uint64(b&0x7f) << s + s += 7 + n-- + } + + d.setError(fmt.Errorf("cannot decode varint from input stream")) + return 0 +} + type decodeFunc func(*decoder, value) var ( @@ -298,12 +347,12 @@ func decodeFuncOf(typ reflect.Type, version int16, flexible bool, tag structTag) case reflect.Int64: return (*decoder).decodeInt64 case reflect.String: - return stringDecodeFuncOf(tag) + return stringDecodeFuncOf(flexible, tag) case reflect.Struct: return structDecodeFuncOf(typ, version, flexible) case reflect.Slice: if typ.Elem().Kind() == reflect.Uint8 { // []byte - return bytesDecodeFuncOf(tag) + return bytesDecodeFuncOf(flexible, tag) } return arrayDecodeFuncOf(typ, version, flexible, tag) default: @@ -311,12 +360,22 @@ func decodeFuncOf(typ reflect.Type, version int16, flexible bool, tag structTag) } } -func stringDecodeFuncOf(tag structTag) decodeFunc { - return (*decoder).decodeString +func stringDecodeFuncOf(flexible bool, tag structTag) decodeFunc { + if flexible { + // In flexible messages, all strings are compact + return (*decoder).decodeCompactString + } else { + return (*decoder).decodeString + } } -func bytesDecodeFuncOf(tag structTag) decodeFunc { - return (*decoder).decodeBytes +func bytesDecodeFuncOf(flexible bool, tag structTag) decodeFunc { + if flexible { + // In flexible messages, all arrays are compact + return (*decoder).decodeCompactBytes + } else { + return (*decoder).decodeBytes + } } func structDecodeFuncOf(typ reflect.Type, version int16, flexible bool) decodeFunc { @@ -327,7 +386,7 @@ func structDecodeFuncOf(typ reflect.Type, version int16, flexible bool) decodeFu } var fields []field - var taggedFields map[int]*field + taggedFields := map[int]*field{} forEachStructField(typ, func(typ reflect.Type, index index, tag string) { forEachStructTag(tag, func(tag structTag) bool { @@ -356,15 +415,13 @@ func structDecodeFuncOf(typ reflect.Type, version int16, flexible bool) decodeFu } if flexible { - fmt.Println("Decoding flexible fields") - // See https://cwiki.apache.org/confluence/display/KAFKA/KIP-482%3A+The+Kafka+Protocol+should+Support+Optional+Tagged+Fields // for details of tag buffers in "flexible" messages. - numFields := int(d.readVarInt()) + numFields := int(d.readUnsignedVarInt()) for i := 0; i < numFields; i++ { - tagID := int(d.readVarInt()) - size := int(d.readVarInt()) + tagID := int(d.readUnsignedVarInt()) + size := int(d.readUnsignedVarInt()) f, ok := taggedFields[tagID] if ok { @@ -380,6 +437,11 @@ func structDecodeFuncOf(typ reflect.Type, version int16, flexible bool) decodeFu func arrayDecodeFuncOf(typ reflect.Type, version int16, flexible bool, tag structTag) decodeFunc { elemType := typ.Elem() elemFunc := decodeFuncOf(elemType, version, flexible, tag) + if flexible { + // In flexible messages, all arrays are compact + return func(d *decoder, v value) { d.decodeCompactArray(v, elemType, elemFunc) } + } + return func(d *decoder, v value) { d.decodeArray(v, elemType, elemFunc) } } diff --git a/protocol/encode.go b/protocol/encode.go index f21e1c24a..a34f22a86 100644 --- a/protocol/encode.go +++ b/protocol/encode.go @@ -133,18 +133,34 @@ func (e *encoder) encodeString(v value) { e.writeString(v.string()) } +func (e *encoder) encodeCompactString(v value) { + e.writeCompactString(v.string()) +} + func (e *encoder) encodeNullString(v value) { e.writeNullString(v.string()) } +func (e *encoder) encodeCompactNullString(v value) { + e.writeCompactNullString(v.string()) +} + func (e *encoder) encodeBytes(v value) { e.writeBytes(v.bytes()) } +func (e *encoder) encodeCompactBytes(v value) { + e.writeCompactBytes(v.bytes()) +} + func (e *encoder) encodeNullBytes(v value) { e.writeNullBytes(v.bytes()) } +func (e *encoder) encodeCompactNullBytes(v value) { + e.writeCompactNullBytes(v.bytes()) +} + func (e *encoder) encodeArray(v value, elemType reflect.Type, encodeElem encodeFunc) { a := v.array(elemType) n := a.length() @@ -155,6 +171,22 @@ func (e *encoder) encodeArray(v value, elemType reflect.Type, encodeElem encodeF } } +func (e *encoder) encodeCompactArray(v value, elemType reflect.Type, encodeElem encodeFunc) { + a := v.array(elemType) + if a.isNil() { + e.writeUnsignedVarInt(0) + return + } + + n := a.length() + + // In compact scheme, size 0=null, size 1=0 len array, etc. + e.writeUnsignedVarInt(uint64(n + 1)) + for i := 0; i < n; i++ { + encodeElem(e, a.index(i)) + } +} + func (e *encoder) encodeNullArray(v value, elemType reflect.Type, encodeElem encodeFunc) { a := v.array(elemType) if a.isNil() { @@ -205,15 +237,15 @@ func (e *encoder) writeNullString(s string) { } func (e *encoder) writeCompactString(s string) { - e.writeVarInt(int64(len(s))) + e.writeUnsignedVarInt(uint64(len(s)) + 1) e.WriteString(s) } func (e *encoder) writeCompactNullString(s string) { if s == "" { - e.writeVarInt(-1) + e.writeUnsignedVarInt(0) } else { - e.writeVarInt(int64(len(s))) + e.writeUnsignedVarInt(uint64(len(s)) + 1) e.WriteString(s) } } @@ -233,15 +265,15 @@ func (e *encoder) writeNullBytes(b []byte) { } func (e *encoder) writeCompactBytes(b []byte) { - e.writeVarInt(int64(len(b))) + e.writeUnsignedVarInt(uint64(len(b)) + 1) e.Write(b) } func (e *encoder) writeCompactNullBytes(b []byte) { if b == nil { - e.writeVarInt(-1) + e.writeUnsignedVarInt(0) } else { - e.writeVarInt(int64(len(b))) + e.writeUnsignedVarInt(uint64(len(b)) + 1) e.Write(b) } } @@ -273,11 +305,11 @@ func (e *encoder) writeNullBytesFrom(b Bytes) error { func (e *encoder) writeCompactNullBytesFrom(b Bytes) error { if b == nil { - e.writeVarInt(-1) + e.writeUnsignedVarInt(0) return nil } else { size := int64(b.Len()) - e.writeVarInt(size) + e.writeUnsignedVarInt(uint64(size + 1)) n, err := io.Copy(e, b) if err == nil && n != size { err = fmt.Errorf("size of compact nullable bytes does not match the number of bytes that were written (size=%d, written=%d): %w", size, n, io.ErrUnexpectedEOF) @@ -305,6 +337,24 @@ func (e *encoder) writeVarInt(i int64) { e.Write(b[:n]) } +func (e *encoder) writeUnsignedVarInt(i uint64) { + b := e.buffer[:] + n := 0 + + for i > 0x80 && n < len(b) { + b[n] = byte(i)&0x7f | 0x80 + i >>= 7 + n++ + } + + if n < len(b) { + b[n] = byte(i) + n++ + } + + e.Write(b[:n]) +} + type encodeFunc func(*encoder, value) var ( @@ -332,12 +382,12 @@ func encodeFuncOf(typ reflect.Type, version int16, flexible bool, tag structTag) case reflect.Int64: return (*encoder).encodeInt64 case reflect.String: - return stringEncodeFuncOf(tag) + return stringEncodeFuncOf(flexible, tag) case reflect.Struct: return structEncodeFuncOf(typ, version, flexible) case reflect.Slice: if typ.Elem().Kind() == reflect.Uint8 { // []byte - return bytesEncodeFuncOf(tag) + return bytesEncodeFuncOf(flexible, tag) } return arrayEncodeFuncOf(typ, version, flexible, tag) default: @@ -345,8 +395,14 @@ func encodeFuncOf(typ reflect.Type, version int16, flexible bool, tag structTag) } } -func stringEncodeFuncOf(tag structTag) encodeFunc { +func stringEncodeFuncOf(flexible bool, tag structTag) encodeFunc { switch { + case flexible && tag.Nullable: + // In flexible messages, all strings are compact + return (*encoder).encodeCompactNullString + case flexible: + // In flexible messages, all strings are compact + return (*encoder).encodeCompactString case tag.Nullable: return (*encoder).encodeNullString default: @@ -354,8 +410,11 @@ func stringEncodeFuncOf(tag structTag) encodeFunc { } } -func bytesEncodeFuncOf(tag structTag) encodeFunc { +func bytesEncodeFuncOf(flexible bool, tag structTag) encodeFunc { switch { + case flexible: + // In flexible messages, all arrays are compact + return (*encoder).encodeCompactBytes case tag.Nullable: return (*encoder).encodeNullBytes default: @@ -402,20 +461,18 @@ func structEncodeFuncOf(typ reflect.Type, version int16, flexible bool) encodeFu } if flexible { - fmt.Println("Encoding flexible fields") - // See https://cwiki.apache.org/confluence/display/KAFKA/KIP-482%3A+The+Kafka+Protocol+should+Support+Optional+Tagged+Fields // for details of tag buffers in "flexible" messages. - e.writeVarInt(int64(len(taggedFields))) + e.writeUnsignedVarInt(uint64(len(taggedFields))) for i := range taggedFields { f := &fields[i] - e.writeVarInt(int64(f.tagID)) + e.writeUnsignedVarInt(uint64(f.tagID)) buf := &bytes.Buffer{} se := &encoder{writer: buf} f.encode(se, v.fieldByIndex(f.index)) - e.writeVarInt(int64(buf.Len())) + e.writeUnsignedVarInt(uint64(buf.Len())) e.Write(buf.Bytes()) } } @@ -426,6 +483,9 @@ func arrayEncodeFuncOf(typ reflect.Type, version int16, flexible bool, tag struc elemType := typ.Elem() elemFunc := encodeFuncOf(elemType, version, flexible, tag) switch { + case flexible: + // In flexible messages, all arrays are compact + return func(e *encoder, v value) { e.encodeCompactArray(v, elemType, elemFunc) } case tag.Nullable: return func(e *encoder, v value) { e.encodeNullArray(v, elemType, elemFunc) } default: diff --git a/protocol/request.go b/protocol/request.go index 0390ca35c..cdee12075 100644 --- a/protocol/request.go +++ b/protocol/request.go @@ -3,7 +3,6 @@ package protocol import ( "fmt" "io" - "log" ) func ReadRequest(r io.Reader) ( @@ -100,14 +99,6 @@ func WriteRequest( ) } - log.Printf( - "Making request with apiKey %+v, apiVersion %d, minVersion %d, msg %+v\n", - apiKey, - apiVersion, - minVersion, - msg, - ) - r := &t.requests[apiVersion-minVersion] v := valueOf(msg) b := newPageBuffer() @@ -120,16 +111,15 @@ func WriteRequest( e.writeInt32(correlationID) if r.flexible { - fmt.Println(">>>>>> Flexible request") - // Flexible uses a compact string for the client ID, then extra space for a + // Flexible messages use a nullable string for the client ID, then extra space for a // tag buffer, which begins with a size value. Since we're not writing any fields into the // latter, we can just write zero for now. // // See // https://cwiki.apache.org/confluence/display/KAFKA/KIP-482%3A+The+Kafka+Protocol+should+Support+Optional+Tagged+Fields // for details. - e.writeCompactString(clientID) - e.writeVarInt(0) + e.writeNullString(clientID) + e.writeUnsignedVarInt(0) } else { // Technically, recent versions of kafka interpret this field as a nullable // string, however kafka 0.10 expected a non-nullable string and fails with diff --git a/protocol/response.go b/protocol/response.go index f2819e8e4..4c5cc315e 100644 --- a/protocol/response.go +++ b/protocol/response.go @@ -49,11 +49,10 @@ func ReadResponse( res := &t.responses[apiVersion-minVersion] if res.flexible { - fmt.Println(">>>>>> Flexible response") // In the flexible case, there's room for tagged fields at the end // of the response header. However, we don't currently implement // anything to decode them. - tagBufferSize := int(d.readVarInt()) + tagBufferSize := int(d.readUnsignedVarInt()) if tagBufferSize > 0 { d.read(tagBufferSize) }