From 3b34bbf9eff0c49b524e115eed40452014e9c2da Mon Sep 17 00:00:00 2001 From: Steve van Loben Sels Date: Wed, 4 Dec 2019 16:44:01 -0800 Subject: [PATCH] Fix behavior of Conn.ReadPartitions() when no topics specified (#383) Ensure all topic partitions are returned when the connection does not have a configured topic and the caller does not pass any topics as arguments. Fixes #381 --- conn.go | 12 +++++++++--- conn_test.go | 17 +++++++++++++++++ metadata.go | 9 ++++++++- 3 files changed, 34 insertions(+), 4 deletions(-) diff --git a/conn.go b/conn.go index f9ca8e0c9..f7ee1736d 100644 --- a/conn.go +++ b/conn.go @@ -956,10 +956,16 @@ func (c *Conn) readOffset(t int64) (offset int64, err error) { // connection. If there are none, the method fetches all partitions of the kafka // cluster. func (c *Conn) ReadPartitions(topics ...string) (partitions []Partition, err error) { - defaultTopics := [...]string{c.topic} - if len(topics) == 0 && len(c.topic) != 0 { - topics = defaultTopics[:] + if len(topics) == 0 { + if len(c.topic) != 0 { + defaultTopics := [...]string{c.topic} + topics = defaultTopics[:] + } else { + // topics needs to be explicitly nil-ed out or the broker will + // interpret it as a request for 0 partitions instead of all. + topics = nil + } } err = c.readOperation( diff --git a/conn_test.go b/conn_test.go index 55e8c3fcc..5c8bece15 100644 --- a/conn_test.go +++ b/conn_test.go @@ -1024,6 +1024,23 @@ func testBrokers(t *testing.T, conn *Conn) { } } +func TestReadPartitionsNoTopic(t *testing.T) { + conn, err := Dial("tcp", "127.0.0.1:9092") + if err != nil { + t.Error(err) + } + defer conn.Close() + + parts, err := conn.ReadPartitions() + if err != nil { + t.Error(err) + } + + if len(parts) == 0 { + t.Errorf("no partitions were returned") + } +} + func TestUnsupportedSASLMechanism(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() diff --git a/metadata.go b/metadata.go index 56e12d0e9..d524b9fd8 100644 --- a/metadata.go +++ b/metadata.go @@ -7,7 +7,14 @@ func (r topicMetadataRequestV1) size() int32 { } func (r topicMetadataRequestV1) writeTo(wb *writeBuffer) { - wb.writeStringArray([]string(r)) + // communicate nil-ness to the broker by passing -1 as the array length. + // for this particular request, the broker interpets a zero length array + // as a request for no topics whereas a nil array is for all topics. + if r == nil { + wb.writeArrayLen(-1) + } else { + wb.writeStringArray([]string(r)) + } } type metadataResponseV1 struct {