Skip to content

Commit

Permalink
Fix behavior of Conn.ReadPartitions() when no topics specified (segme…
Browse files Browse the repository at this point in the history
…ntio#383)

Ensure all topic partitions are returned when the connection does
not have a configured topic and the caller does not pass any
topics as arguments.

Fixes segmentio#381
  • Loading branch information
Steve van Loben Sels authored Dec 5, 2019
1 parent 3a767ae commit 3b34bbf
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 4 deletions.
12 changes: 9 additions & 3 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -956,10 +956,16 @@ func (c *Conn) readOffset(t int64) (offset int64, err error) {
// connection. If there are none, the method fetches all partitions of the kafka
// cluster.
func (c *Conn) ReadPartitions(topics ...string) (partitions []Partition, err error) {
defaultTopics := [...]string{c.topic}

if len(topics) == 0 && len(c.topic) != 0 {
topics = defaultTopics[:]
if len(topics) == 0 {
if len(c.topic) != 0 {
defaultTopics := [...]string{c.topic}
topics = defaultTopics[:]
} else {
// topics needs to be explicitly nil-ed out or the broker will
// interpret it as a request for 0 partitions instead of all.
topics = nil
}
}

err = c.readOperation(
Expand Down
17 changes: 17 additions & 0 deletions conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1024,6 +1024,23 @@ func testBrokers(t *testing.T, conn *Conn) {
}
}

func TestReadPartitionsNoTopic(t *testing.T) {
conn, err := Dial("tcp", "127.0.0.1:9092")
if err != nil {
t.Error(err)
}
defer conn.Close()

parts, err := conn.ReadPartitions()
if err != nil {
t.Error(err)
}

if len(parts) == 0 {
t.Errorf("no partitions were returned")
}
}

func TestUnsupportedSASLMechanism(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
Expand Down
9 changes: 8 additions & 1 deletion metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,14 @@ func (r topicMetadataRequestV1) size() int32 {
}

func (r topicMetadataRequestV1) writeTo(wb *writeBuffer) {
wb.writeStringArray([]string(r))
// communicate nil-ness to the broker by passing -1 as the array length.
// for this particular request, the broker interpets a zero length array
// as a request for no topics whereas a nil array is for all topics.
if r == nil {
wb.writeArrayLen(-1)
} else {
wb.writeStringArray([]string(r))
}
}

type metadataResponseV1 struct {
Expand Down

0 comments on commit 3b34bbf

Please sign in to comment.