Skip to content

Commit

Permalink
Fix for panic when RequiredAcks is set to RequireNone (segmentio#504)
Browse files Browse the repository at this point in the history
* Fix panic in async wait() method when RequiredAcks is None

When RequiredAcks is None, the producer does not wait for a
response from the broker, therefore the response is nil.
The async wait() method was not handling this case, leading
to a panic.

* Add regression test for RequiredAcks == RequireNone

This new test is required because all the other Writer tests use
NewWriter() to create Writers, which sets RequiredAcks to
RequireAll when 0 (None) was specified.
  • Loading branch information
neilcook authored Sep 29, 2020
1 parent 6bd4c78 commit e7d5971
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 0 deletions.
2 changes: 2 additions & 0 deletions transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -778,6 +778,8 @@ func (p async) await(ctx context.Context) (Response, error) {
select {
case x := <-p:
switch v := x.(type) {
case nil:
return nil, nil // A nil response is ok (e.g. when RequiredAcks is None)
case Response:
return v, nil
case error:
Expand Down
27 changes: 27 additions & 0 deletions writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ func TestWriter(t *testing.T) {
scenario: "setting a non default balancer on the writer",
function: testWriterSetsRightBalancer,
},
{
scenario: "setting RequiredAcks to None in Writer doesn't cause a panic",
function: testWriterRequiredAcksNone,
},
}

for _, test := range tests {
Expand Down Expand Up @@ -83,6 +87,29 @@ func testWriterClose(t *testing.T) {
}
}

func testWriterRequiredAcksNone(t *testing.T) {
topic := makeTopic()
createTopic(t, topic, 1)
defer deleteTopic(t, topic)

w := &Writer{
Addr: TCP("localhost:9092"),
Topic: topic,
Balancer: &RoundRobin{},
RequiredAcks: RequireNone,
}
defer w.Close()

msg := Message{
Key: []byte("ThisIsAKey"),
Value: []byte("Test message for required acks test")}

err := w.WriteMessages(context.Background(), msg)
if err != nil {
t.Fatal(err)
}
}

func testWriterSetsRightBalancer(t *testing.T) {
const topic = "test-writer-1"
balancer := &CRC32Balancer{}
Expand Down

0 comments on commit e7d5971

Please sign in to comment.