Skip to content

Commit

Permalink
Merge pull request segmentio#125 from segmentio/read-broker-rack
Browse files Browse the repository at this point in the history
Upgrade Metadata Request from v0 to v1
  • Loading branch information
stevevls authored Oct 16, 2018
2 parents 48c37f7 + e201ffc commit f8858a2
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 29 deletions.
6 changes: 4 additions & 2 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ type Broker struct {
Host string
Port int
ID int
Rack string
}

// Partition carries the metadata associated with a kafka partition.
Expand Down Expand Up @@ -715,10 +716,10 @@ func (c *Conn) ReadPartitions(topics ...string) (partitions []Partition, err err

err = c.readOperation(
func(deadline time.Time, id int32) error {
return c.writeRequest(metadataRequest, v0, id, topicMetadataRequestV0(topics))
return c.writeRequest(metadataRequest, v1, id, topicMetadataRequestV1(topics))
},
func(deadline time.Time, size int) error {
var res metadataResponseV0
var res metadataResponseV1

if err := c.readResponse(size, &res); err != nil {
return err
Expand All @@ -730,6 +731,7 @@ func (c *Conn) ReadPartitions(topics ...string) (partitions []Partition, err err
Host: b.Host,
Port: int(b.Port),
ID: int(b.NodeID),
Rack: b.Rack,
}
}

Expand Down
48 changes: 27 additions & 21 deletions metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,79 +2,85 @@ package kafka

import "bufio"

type topicMetadataRequestV0 []string
type topicMetadataRequestV1 []string

func (r topicMetadataRequestV0) size() int32 {
func (r topicMetadataRequestV1) size() int32 {
return sizeofStringArray([]string(r))
}

func (r topicMetadataRequestV0) writeTo(w *bufio.Writer) {
func (r topicMetadataRequestV1) writeTo(w *bufio.Writer) {
writeStringArray(w, []string(r))
}

type metadataResponseV0 struct {
Brokers []brokerMetadataV0
Topics []topicMetadataV0
type metadataResponseV1 struct {
Brokers []brokerMetadataV1
ControllerID int32
Topics []topicMetadataV1
}

func (r metadataResponseV0) size() int32 {
func (r metadataResponseV1) size() int32 {
n1 := sizeofArray(len(r.Brokers), func(i int) int32 { return r.Brokers[i].size() })
n2 := sizeofArray(len(r.Topics), func(i int) int32 { return r.Topics[i].size() })
return n1 + n2
return 4 + n1 + n2
}

func (r metadataResponseV0) writeTo(w *bufio.Writer) {
func (r metadataResponseV1) writeTo(w *bufio.Writer) {
writeArray(w, len(r.Brokers), func(i int) { r.Brokers[i].writeTo(w) })
writeInt32(w, r.ControllerID)
writeArray(w, len(r.Topics), func(i int) { r.Topics[i].writeTo(w) })
}

type brokerMetadataV0 struct {
type brokerMetadataV1 struct {
NodeID int32
Host string
Port int32
Rack string
}

func (b brokerMetadataV0) size() int32 {
return 4 + 4 + sizeofString(b.Host)
func (b brokerMetadataV1) size() int32 {
return 4 + 4 + sizeofString(b.Host) + sizeofString(b.Rack)
}

func (b brokerMetadataV0) writeTo(w *bufio.Writer) {
func (b brokerMetadataV1) writeTo(w *bufio.Writer) {
writeInt32(w, b.NodeID)
writeString(w, b.Host)
writeInt32(w, b.Port)
writeString(w, b.Rack)
}

type topicMetadataV0 struct {
type topicMetadataV1 struct {
TopicErrorCode int16
TopicName string
Partitions []partitionMetadataV0
Internal bool
Partitions []partitionMetadataV1
}

func (t topicMetadataV0) size() int32 {
return 2 +
func (t topicMetadataV1) size() int32 {
return 2 + 1 +
sizeofString(t.TopicName) +
sizeofArray(len(t.Partitions), func(i int) int32 { return t.Partitions[i].size() })
}

func (t topicMetadataV0) writeTo(w *bufio.Writer) {
func (t topicMetadataV1) writeTo(w *bufio.Writer) {
writeInt16(w, t.TopicErrorCode)
writeString(w, t.TopicName)
writeBool(w, t.Internal)
writeArray(w, len(t.Partitions), func(i int) { t.Partitions[i].writeTo(w) })
}

type partitionMetadataV0 struct {
type partitionMetadataV1 struct {
PartitionErrorCode int16
PartitionID int32
Leader int32
Replicas []int32
Isr []int32
}

func (p partitionMetadataV0) size() int32 {
func (p partitionMetadataV1) size() int32 {
return 2 + 4 + 4 + sizeofInt32Array(p.Replicas) + sizeofInt32Array(p.Isr)
}

func (p partitionMetadataV0) writeTo(w *bufio.Writer) {
func (p partitionMetadataV1) writeTo(w *bufio.Writer) {
writeInt16(w, p.PartitionErrorCode)
writeInt32(w, p.PartitionID)
writeInt32(w, p.Leader)
Expand Down
13 changes: 7 additions & 6 deletions protocol_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,16 @@ func TestProtocol(t *testing.T) {
Value: []byte("Hello World!"),
},

topicMetadataRequestV0{"A", "B", "C"},
topicMetadataRequestV1{"A", "B", "C"},

metadataResponseV0{
Brokers: []brokerMetadataV0{
metadataResponseV1{
Brokers: []brokerMetadataV1{
{NodeID: 1, Host: "localhost", Port: 9001},
{NodeID: 2, Host: "localhost", Port: 9002},
{NodeID: 2, Host: "localhost", Port: 9002, Rack:"rack2"},
},
Topics: []topicMetadataV0{
{TopicErrorCode: 0, Partitions: []partitionMetadataV0{{
ControllerID: 2,
Topics: []topicMetadataV1{
{TopicErrorCode: 0, Internal: true, Partitions: []partitionMetadataV1{{
PartitionErrorCode: 0,
PartitionID: 1,
Leader: 2,
Expand Down
6 changes: 6 additions & 0 deletions read.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ func readInt64(r *bufio.Reader, sz int, v *int64) (int, error) {
return peekRead(r, sz, 8, func(b []byte) { *v = makeInt64(b) })
}

func readBool(r *bufio.Reader, sz int, v *bool) (int, error) {
return peekRead(r, sz, 1, func(b []byte) { *v = b[0] != 0 })
}

func readString(r *bufio.Reader, sz int, v *string) (int, error) {
return readStringWith(r, sz, func(r *bufio.Reader, sz int, n int) (remain int, err error) {
*v, remain, err = readNewString(r, sz, n)
Expand Down Expand Up @@ -186,6 +190,8 @@ func read(r *bufio.Reader, sz int, a interface{}) (int, error) {
return readInt32(r, sz, v)
case *int64:
return readInt64(r, sz, v)
case *bool:
return readBool(r, sz, v)
case *string:
return readString(r, sz, v)
case *[]byte:
Expand Down

0 comments on commit f8858a2

Please sign in to comment.