Skip to content

Commit

Permalink
enable errorlint linter and fix issues (segmentio#914)
Browse files Browse the repository at this point in the history
  • Loading branch information
dominicbarnes authored May 20, 2022
1 parent fb1504a commit 294fbdb
Show file tree
Hide file tree
Showing 19 changed files with 153 additions and 129 deletions.
2 changes: 1 addition & 1 deletion .golangci.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
linters:
enable:
- bodyclose
- errorlint
- goconst
- godot
- gofmt
Expand All @@ -10,5 +11,4 @@ linters:
disable:
# Temporarily disabling so it can be addressed in a dedicated PR.
- errcheck
- errorlint
- goerr113
3 changes: 2 additions & 1 deletion client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package kafka
import (
"bytes"
"context"
"errors"
"io"
"math/rand"
"net"
Expand Down Expand Up @@ -262,7 +263,7 @@ func TestClientProduceAndConsume(t *testing.T) {
for {
r, err := res.Records.ReadRecord()
if err != nil {
if err != io.EOF {
if !errors.Is(err, io.EOF) {
t.Fatal(err)
}
break
Expand Down
9 changes: 5 additions & 4 deletions compress/snappy/go-xerial-snappy/snappy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package snappy

import (
"bytes"
"errors"
"testing"
)

Expand Down Expand Up @@ -92,7 +93,7 @@ func TestSnappyDecodeMalformedTruncatedHeader(t *testing.T) {
for i := 0; i < len(xerialHeader); i++ {
buf := make([]byte, i)
copy(buf, xerialHeader[:i])
if _, err := Decode(buf); err != ErrMalformed {
if _, err := Decode(buf); !errors.Is(err, ErrMalformed) {
t.Errorf("expected ErrMalformed got %v", err)
}
}
Expand All @@ -104,7 +105,7 @@ func TestSnappyDecodeMalformedTruncatedSize(t *testing.T) {
for _, size := range sizes {
buf := make([]byte, size)
copy(buf, xerialHeader)
if _, err := Decode(buf); err != ErrMalformed {
if _, err := Decode(buf); !errors.Is(err, ErrMalformed) {
t.Errorf("expected ErrMalformed got %v", err)
}
}
Expand All @@ -116,7 +117,7 @@ func TestSnappyDecodeMalformedBNoData(t *testing.T) {
copy(buf, xerialHeader)
// indicate that there's one byte of data to be read
buf[len(buf)-1] = 1
if _, err := Decode(buf); err != ErrMalformed {
if _, err := Decode(buf); !errors.Is(err, ErrMalformed) {
t.Errorf("expected ErrMalformed got %v", err)
}
}
Expand All @@ -128,7 +129,7 @@ func TestSnappyMasterDecodeFailed(t *testing.T) {
buf[len(buf)-2] = 1
// A payload which will not decode
buf[len(buf)-1] = 1
if _, err := Decode(buf); err == ErrMalformed || err == nil {
if _, err := Decode(buf); errors.Is(err, ErrMalformed) || err == nil {
t.Errorf("unexpected err: %v", err)
}
}
Expand Down
14 changes: 6 additions & 8 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -1231,11 +1231,10 @@ func (c *Conn) writeRequest(apiKey apiKey, apiVersion apiVersion, correlationID

func (c *Conn) readResponse(size int, res interface{}) error {
size, err := read(&c.rbuf, size, res)
switch err.(type) {
case Error:
var e error
if size, e = discardN(&c.rbuf, size, size); e != nil {
err = e
if err != nil {
var kafkaError Error
if errors.As(err, &kafkaError) {
size, err = discardN(&c.rbuf, size, size)
}
}
return expectZeroSize(size, err)
Expand Down Expand Up @@ -1294,9 +1293,8 @@ func (c *Conn) do(d *connDeadline, write func(time.Time, int32) error, read func
}

if err = read(deadline, size); err != nil {
switch err.(type) {
case Error:
default:
var kafkaError Error
if !errors.As(err, &kafkaError) {
c.conn.Close()
}
}
Expand Down
46 changes: 26 additions & 20 deletions conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,9 +389,11 @@ func testConnWrite(t *testing.T, conn *Conn) {
func testConnCloseAndWrite(t *testing.T, conn *Conn) {
conn.Close()

switch _, err := conn.Write([]byte("Hello World!")); err.(type) {
case *net.OpError:
default:
_, err := conn.Write([]byte("Hello World!"))

// expect a network error
var netOpError *net.OpError
if !errors.As(err, &netOpError) {
t.Error(err)
}
}
Expand Down Expand Up @@ -489,7 +491,7 @@ func testConnSeekDontCheck(t *testing.T, conn *Conn) {
t.Error("bad offset:", offset)
}

if _, err := conn.ReadMessage(1024); err != OffsetOutOfRange {
if _, err := conn.ReadMessage(1024); !errors.Is(err, OffsetOutOfRange) {
t.Error("unexpected error:", err)
}
}
Expand Down Expand Up @@ -659,13 +661,15 @@ func waitForCoordinator(t *testing.T, conn *Conn, groupID string) {
_, err := conn.findCoordinator(findCoordinatorRequestV0{
CoordinatorKey: groupID,
})
switch err {
case nil:
if err != nil {
if errors.Is(err, GroupCoordinatorNotAvailable) {
time.Sleep(250 * time.Millisecond)
continue
} else {
t.Fatalf("unable to find coordinator for group: %v", err)
}
} else {
return
case GroupCoordinatorNotAvailable:
time.Sleep(250 * time.Millisecond)
default:
t.Fatalf("unable to find coordinator for group: %v", err)
}
}

Expand All @@ -690,15 +694,18 @@ func createGroup(t *testing.T, conn *Conn, groupID string) (generationID int32,
},
},
})
switch err {
case nil:
if err != nil {
if errors.Is(err, NotCoordinatorForGroup) {
time.Sleep(250 * time.Millisecond)
continue
} else {
t.Fatalf("bad joinGroup: %s", err)
}
} else {
return
case NotCoordinatorForGroup:
time.Sleep(250 * time.Millisecond)
default:
t.Fatalf("bad joinGroup: %s", err)
}
}

return
}

Expand Down Expand Up @@ -742,12 +749,11 @@ func testConnFindCoordinator(t *testing.T, conn *Conn) {
}
response, err := conn.findCoordinator(findCoordinatorRequestV0{CoordinatorKey: groupID})
if err != nil {
switch err {
case GroupCoordinatorNotAvailable:
if errors.Is(err, GroupCoordinatorNotAvailable) {
continue
default:
t.Fatalf("bad findCoordinator: %s", err)
}

t.Fatalf("bad findCoordinator: %s", err)
}

if response.Coordinator.NodeID == 0 {
Expand Down
24 changes: 15 additions & 9 deletions consumergroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -523,19 +523,21 @@ func (g *Generation) partitionWatcher(interval time.Duration, topic string) {
return
case <-ticker.C:
ops, err := g.conn.readPartitions(topic)
switch err {
case nil, UnknownTopicOrPartition:
switch {
case err == nil, errors.Is(err, UnknownTopicOrPartition):
if len(ops) != oParts {
g.log(func(l Logger) {
l.Printf("Partition changes found, reblancing group: %v.", g.GroupID)
})
return
}

default:
g.logError(func(l Logger) {
l.Printf("Problem getting partitions while checking for changes, %v", err)
})
if _, ok := err.(Error); ok {
var kafkaError Error
if errors.As(err, &kafkaError) {
continue
}
// other errors imply that we lost the connection to the coordinator, so we
Expand Down Expand Up @@ -724,20 +726,24 @@ func (cg *ConsumerGroup) run() {
// to the next generation. it will be non-nil in the case of an error
// joining or syncing the group.
var backoff <-chan time.Time
switch err {
case nil:

switch {
case err == nil:
// no error...the previous generation finished normally.
continue
case ErrGroupClosed:

case errors.Is(err, ErrGroupClosed):
// the CG has been closed...leave the group and exit loop.
_ = cg.leaveGroup(memberID)
return
case RebalanceInProgress:

case errors.Is(err, RebalanceInProgress):
// in case of a RebalanceInProgress, don't leave the group or
// change the member ID, but report the error. the next attempt
// to join the group will then be subject to the rebalance
// timeout, so the broker will be responsible for throttling
// this loop.

default:
// leave the group and report the error if we had gotten far
// enough so as to have a member ID. also clear the member id
Expand Down Expand Up @@ -984,7 +990,7 @@ func (cg *ConsumerGroup) makeJoinGroupRequestV1(memberID string) (joinGroupReque
for _, balancer := range cg.config.GroupBalancers {
userData, err := balancer.UserData()
if err != nil {
return joinGroupRequestV1{}, fmt.Errorf("unable to construct protocol metadata for member, %v: %v", balancer.ProtocolName(), err)
return joinGroupRequestV1{}, fmt.Errorf("unable to construct protocol metadata for member, %v: %w", balancer.ProtocolName(), err)
}
request.GroupProtocols = append(request.GroupProtocols, joinGroupRequestGroupProtocolV1{
ProtocolName: balancer.ProtocolName(),
Expand Down Expand Up @@ -1050,7 +1056,7 @@ func (cg *ConsumerGroup) makeMemberProtocolMetadata(in []joinGroupResponseMember
metadata := groupMetadata{}
reader := bufio.NewReader(bytes.NewReader(item.MemberMetadata))
if remain, err := (&metadata).readFrom(reader, len(item.MemberMetadata)); err != nil || remain != 0 {
return nil, fmt.Errorf("unable to read metadata for member, %v: %v", item.MemberID, err)
return nil, fmt.Errorf("unable to read metadata for member, %v: %w", item.MemberID, err)
}

members = append(members, GroupMember{
Expand Down
13 changes: 8 additions & 5 deletions createtopics.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package kafka
import (
"bufio"
"context"
"errors"
"fmt"
"net"
"time"
Expand Down Expand Up @@ -384,12 +385,14 @@ func (c *Conn) CreateTopics(topics ...TopicConfig) error {
_, err := c.createTopics(createTopicsRequestV0{
Topics: requestV0Topics,
})
if err != nil {
if errors.Is(err, TopicAlreadyExists) {
// ok
return nil
}

switch err {
case TopicAlreadyExists:
// ok
return nil
default:
return err
}

return nil
}
3 changes: 2 additions & 1 deletion dialer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"crypto/tls"
"crypto/x509"
"errors"
"fmt"
"io"
"net"
Expand Down Expand Up @@ -294,7 +295,7 @@ func TestDialerConnectTLSHonorsContext(t *testing.T) {
defer cancel()

_, err := d.connectTLS(ctx, conn, d.TLS)
if context.DeadlineExceeded != err {
if !errors.Is(err, context.DeadlineExceeded) {
t.Errorf("expected err to be %v; got %v", context.DeadlineExceeded, err)
t.FailNow()
}
Expand Down
7 changes: 4 additions & 3 deletions discard_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package kafka
import (
"bufio"
"bytes"
"errors"
"io"
"testing"
)
Expand Down Expand Up @@ -52,7 +53,7 @@ func TestDiscardN(t *testing.T) {
scenario: "discard more than available",
function: func(t *testing.T, r *bufio.Reader, sz int) {
remain, err := discardN(r, sz, sz+1)
if err != errShortRead {
if !errors.Is(err, errShortRead) {
t.Errorf("Expected errShortRead, got %v", err)
}
if remain != 0 {
Expand All @@ -64,7 +65,7 @@ func TestDiscardN(t *testing.T) {
scenario: "discard returns error",
function: func(t *testing.T, r *bufio.Reader, sz int) {
remain, err := discardN(r, sz+2, sz+1)
if err != io.EOF {
if !errors.Is(err, io.EOF) {
t.Errorf("Expected EOF, got %v", err)
}
if remain != 2 {
Expand All @@ -76,7 +77,7 @@ func TestDiscardN(t *testing.T) {
scenario: "errShortRead doesn't mask error",
function: func(t *testing.T, r *bufio.Reader, sz int) {
remain, err := discardN(r, sz+1, sz+2)
if err != io.EOF {
if !errors.Is(err, io.EOF) {
t.Errorf("Expected EOF, got %v", err)
}
if remain != 1 {
Expand Down
23 changes: 13 additions & 10 deletions example_consumergroup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package kafka_test

import (
"context"
"errors"
"fmt"
"os"

Expand Down Expand Up @@ -42,18 +43,20 @@ func ExampleGeneration_Start_consumerGroupParallelReaders() {
reader.SetOffset(offset)
for {
msg, err := reader.ReadMessage(ctx)
switch err {
case kafka.ErrGenerationEnded:
// generation has ended. commit offsets. in a real app,
// offsets would be committed periodically.
gen.CommitOffsets(map[string]map[int]int64{"my-topic": {partition: offset + 1}})
return
case nil:
fmt.Printf("received message %s/%d/%d : %s\n", msg.Topic, msg.Partition, msg.Offset, string(msg.Value))
offset = msg.Offset
default:
if err != nil {
if errors.Is(err, kafka.ErrGenerationEnded) {
// generation has ended. commit offsets. in a real app,
// offsets would be committed periodically.
gen.CommitOffsets(map[string]map[int]int64{"my-topic": {partition: offset + 1}})
return
}

fmt.Printf("error reading message: %+v\n", err)
return
}

fmt.Printf("received message %s/%d/%d : %s\n", msg.Topic, msg.Partition, msg.Offset, string(msg.Value))
offset = msg.Offset
}
})
}
Expand Down
3 changes: 2 additions & 1 deletion message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bufio"
"bytes"
"encoding/hex"
"errors"
"fmt"
"io"
"math/rand"
Expand Down Expand Up @@ -551,7 +552,7 @@ func TestMessageSetReaderEmpty(t *testing.T) {
if headers != nil {
t.Errorf("expected nil headers, got %v", headers)
}
if err != RequestTimedOut {
if !errors.Is(err, RequestTimedOut) {
t.Errorf("expected RequestTimedOut, got %v", err)
}

Expand Down
Loading

0 comments on commit 294fbdb

Please sign in to comment.