Skip to content

Commit

Permalink
Nettest fix (segmentio#788)
Browse files Browse the repository at this point in the history
* don't skip messages when offset == highwatermark
  • Loading branch information
rhansen2 authored Nov 15, 2021
1 parent eebea66 commit 2e02f37
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 51 deletions.
33 changes: 0 additions & 33 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -176,17 +176,6 @@ jobs:
working_directory: *working_directory
environment:
KAFKA_VERSION: "2.4.1"

# Need to skip nettest to avoid these kinds of errors:
# --- FAIL: TestConn/nettest (17.56s)
# --- FAIL: TestConn/nettest/PingPong (7.40s)
# conntest.go:112: unexpected Read error: [7] Request Timed Out: the request exceeded the user-specified time limit in the request
# conntest.go:118: mismatching value: got 77, want 78
# conntest.go:118: mismatching value: got 78, want 79
# ...
#
# TODO: Figure out why these are happening and fix them (they don't appear to be new).
KAFKA_SKIP_NETTEST: "1"
docker:
- image: circleci/golang
- image: wurstmeister/zookeeper
Expand All @@ -203,17 +192,6 @@ jobs:
working_directory: *working_directory
environment:
KAFKA_VERSION: "2.6.0"

# Need to skip nettest to avoid these kinds of errors:
# --- FAIL: TestConn/nettest (17.56s)
# --- FAIL: TestConn/nettest/PingPong (7.40s)
# conntest.go:112: unexpected Read error: [7] Request Timed Out: the request exceeded the user-specified time limit in the request
# conntest.go:118: mismatching value: got 77, want 78
# conntest.go:118: mismatching value: got 78, want 79
# ...
#
# TODO: Figure out why these are happening and fix them (they don't appear to be new).
KAFKA_SKIP_NETTEST: "1"
docker:
- image: circleci/golang
- image: wurstmeister/zookeeper
Expand All @@ -230,17 +208,6 @@ jobs:
working_directory: *working_directory
environment:
KAFKA_VERSION: "2.7.1"

# Need to skip nettest to avoid these kinds of errors:
# --- FAIL: TestConn/nettest (17.56s)
# --- FAIL: TestConn/nettest/PingPong (7.40s)
# conntest.go:112: unexpected Read error: [7] Request Timed Out: the request exceeded the user-specified time limit in the request
# conntest.go:118: mismatching value: got 77, want 78
# conntest.go:118: mismatching value: got 78, want 79
# ...
#
# TODO: Figure out why these are happening and fix them (they don't appear to be new).
KAFKA_SKIP_NETTEST: "1"
docker:
- image: circleci/golang
- image: wurstmeister/zookeeper
Expand Down
28 changes: 10 additions & 18 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,11 +133,9 @@ const (
ReadCommitted IsolationLevel = 1
)

var (
// DefaultClientID is the default value used as ClientID of kafka
// connections.
DefaultClientID string
)
// DefaultClientID is the default value used as ClientID of kafka
// connections.
var DefaultClientID string

func init() {
progname := filepath.Base(os.Args[0])
Expand Down Expand Up @@ -263,10 +261,12 @@ func (c *Conn) Controller() (broker Broker, err error) {
}
for _, brokerMeta := range res.Brokers {
if brokerMeta.NodeID == res.ControllerID {
broker = Broker{ID: int(brokerMeta.NodeID),
broker = Broker{
ID: int(brokerMeta.NodeID),
Port: int(brokerMeta.Port),
Host: brokerMeta.Host,
Rack: brokerMeta.Rack}
Rack: brokerMeta.Rack,
}
break
}
}
Expand Down Expand Up @@ -322,7 +322,6 @@ func (c *Conn) findCoordinator(request findCoordinatorRequestV0) (findCoordinato
err := c.readOperation(
func(deadline time.Time, id int32) error {
return c.writeRequest(findCoordinator, v0, id, request)

},
func(deadline time.Time, size int) error {
return expectZeroSize(func() (remain int, err error) {
Expand Down Expand Up @@ -752,9 +751,8 @@ func (c *Conn) ReadBatch(minBytes, maxBytes int) *Batch {
// ReadBatchWith in every way is similar to ReadBatch. ReadBatch is configured
// with the default values in ReadBatchConfig except for minBytes and maxBytes.
func (c *Conn) ReadBatchWith(cfg ReadBatchConfig) *Batch {

var adjustedDeadline time.Time
var maxFetch = int(c.fetchMaxBytes)
maxFetch := int(c.fetchMaxBytes)

if cfg.MinBytes < 0 || cfg.MinBytes > maxFetch {
return &Batch{err: fmt.Errorf("kafka.(*Conn).ReadBatch: minBytes of %d out of [1,%d] bounds", cfg.MinBytes, maxFetch)}
Expand Down Expand Up @@ -859,11 +857,7 @@ func (c *Conn) ReadBatchWith(cfg ReadBatchConfig) *Batch {

var msgs *messageSetReader
if err == nil {
if highWaterMark == offset {
msgs = &messageSetReader{empty: true}
} else {
msgs, err = newMessageSetReader(&c.rbuf, remain)
}
msgs, err = newMessageSetReader(&c.rbuf, remain)
}
if err == errShortRead {
err = checkTimeoutErr(adjustedDeadline)
Expand Down Expand Up @@ -959,7 +953,6 @@ func (c *Conn) readOffset(t int64) (offset int64, err error) {
// connection. If there are none, the method fetches all partitions of the kafka
// cluster.
func (c *Conn) ReadPartitions(topics ...string) (partitions []Partition, err error) {

if len(topics) == 0 {
if len(c.topic) != 0 {
defaultTopics := [...]string{c.topic}
Expand Down Expand Up @@ -1188,7 +1181,6 @@ func (c *Conn) writeCompressedMessages(codec CompressionCodec, msgs ...Message)
}
return size, err
}

})
if err != nil {
return size, err
Expand Down Expand Up @@ -1556,7 +1548,7 @@ func (c *Conn) saslAuthenticate(data []byte) ([]byte, error) {
return nil, err
}
if version == v1 {
var request = saslAuthenticateRequestV0{Data: data}
request := saslAuthenticateRequestV0{Data: data}
var response saslAuthenticateResponseV0

err := c.writeOperation(
Expand Down

0 comments on commit 2e02f37

Please sign in to comment.