From e67bd5f0c63ac69bb9942d5aeb7e8ed281e811c6 Mon Sep 17 00:00:00 2001 From: Cyril David Date: Tue, 19 Apr 2022 23:59:16 -0700 Subject: [PATCH] fix: handle error in response --- protocol/describeconfigs/describeconfigs.go | 19 +++++-- .../describeconfigs/describeconfigs_test.go | 57 +++++++++++++++++++ 2 files changed, 71 insertions(+), 5 deletions(-) create mode 100644 protocol/describeconfigs/describeconfigs_test.go diff --git a/protocol/describeconfigs/describeconfigs.go b/protocol/describeconfigs/describeconfigs.go index fb9708a7c..c28840565 100644 --- a/protocol/describeconfigs/describeconfigs.go +++ b/protocol/describeconfigs/describeconfigs.go @@ -1,6 +1,7 @@ package describeconfigs import ( + "fmt" "strconv" "github.com/segmentio/kafka-go/protocol" @@ -87,11 +88,19 @@ func (r *Response) Merge(requests []protocol.Message, results []interface{}) ( response := &Response{} for _, result := range results { - brokerResp := result.(*Response) - response.Resources = append( - response.Resources, - brokerResp.Resources..., - ) + switch v := result.(type) { + case *Response: + response.Resources = append( + response.Resources, + v.Resources..., + ) + + case error: + return nil, v + + default: + return nil, fmt.Errorf("Unknown result type: %T", result) + } } return response, nil diff --git a/protocol/describeconfigs/describeconfigs_test.go b/protocol/describeconfigs/describeconfigs_test.go new file mode 100644 index 000000000..ec6a6a68f --- /dev/null +++ b/protocol/describeconfigs/describeconfigs_test.go @@ -0,0 +1,57 @@ +package describeconfigs + +import ( + "io" + "reflect" + "testing" + + "github.com/segmentio/kafka-go/protocol" +) + +func TestResponse_Merge(t *testing.T) { + t.Run("happy path", func(t *testing.T) { + r := &Response{} + + r1 := &Response{ + Resources: []ResponseResource{ + {ResourceName: "r1"}, + }, + } + r2 := &Response{ + Resources: []ResponseResource{ + {ResourceName: "r2"}, + }, + } + + got, err := r.Merge([]protocol.Message{&Request{}}, []interface{}{r1, r2}) + if err != nil { + t.Fatal(err) + } + + want := &Response{ + Resources: []ResponseResource{ + {ResourceName: "r1"}, + {ResourceName: "r2"}, + }, + } + + if !reflect.DeepEqual(want, got) { + t.Fatalf("wanted response: \n%+v, got \n%+v", want, got) + } + }) + + t.Run("with errors", func(t *testing.T) { + r := &Response{} + + r1 := &Response{ + Resources: []ResponseResource{ + {ResourceName: "r1"}, + }, + } + + _, err := r.Merge([]protocol.Message{&Request{}}, []interface{}{r1, io.EOF}) + if err != io.EOF { + t.Fatalf("wanted err io.EOF, got %v", err) + } + }) +}