Skip to content

Commit

Permalink
Merge pull request segmentio#212 from segmentio/brokers
Browse files Browse the repository at this point in the history
Add Brokers function
  • Loading branch information
Pryz authored Feb 19, 2019
2 parents 192634d + b5b8bef commit fcb01ba
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 2 deletions.
31 changes: 30 additions & 1 deletion conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func NewConnWith(conn net.Conn, config ConnConfig) *Conn {
return c
}

// GetController requests kafka for the current controller and returns its URL
// Controller requests kafka for the current controller and returns its URL
func (c *Conn) Controller() (broker Broker, err error) {
err = c.readOperation(
func(deadline time.Time, id int32) error {
Expand All @@ -176,6 +176,35 @@ func (c *Conn) Controller() (broker Broker, err error) {
return broker, err
}

// Brokers retrieve the broker list from the Kafka metadata
func (c *Conn) Brokers() ([]Broker, error) {
var brokers []Broker
err := c.readOperation(
func(deadline time.Time, id int32) error {
return c.writeRequest(metadataRequest, v1, id, topicMetadataRequestV1([]string{}))
},
func(deadline time.Time, size int) error {
var res metadataResponseV1

if err := c.readResponse(size, &res); err != nil {
return err
}

brokers = make([]Broker, len(res.Brokers))
for i, brokerMeta := range res.Brokers {
brokers[i] = Broker{
ID: int(brokerMeta.NodeID),
Port: int(brokerMeta.Port),
Host: brokerMeta.Host,
Rack: brokerMeta.Rack,
}
}
return nil
},
)
return brokers, err
}

// DeleteTopics deletes the specified topics.
func (c *Conn) DeleteTopics(topics ...string) error {
_, err := c.deleteTopics(deleteTopicsRequestV0{
Expand Down
21 changes: 20 additions & 1 deletion conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,10 @@ func TestConn(t *testing.T) {
scenario: "test retrieve controller",
function: testController,
},
{
scenario: "test list brokers",
function: testBrokers,
},
}

const (
Expand Down Expand Up @@ -954,13 +958,28 @@ func testController(t *testing.T, conn *Conn) {
t.Errorf("expected 9092 received %d", b.Port)
}
if b.ID != 1 {
t.Errorf("expected 0 received %d", b.ID)
t.Errorf("expected 1 received %d", b.ID)
}
if b.Rack != "" {
t.Errorf("expected empty string for rack received %s", b.Rack)
}
}

func testBrokers(t *testing.T, conn *Conn) {
brokers, err := conn.Brokers()
if err != nil {
t.Error(err)
}

if len(brokers) != 1 {
t.Errorf("expected 1 broker in %+v", brokers)
}

if brokers[0].ID != 1 {
t.Errorf("expected ID 1 received %d", brokers[0].ID)
}
}

const benchmarkMessageCount = 100

func BenchmarkConn(b *testing.B) {
Expand Down

0 comments on commit fcb01ba

Please sign in to comment.