Skip to content

Commit

Permalink
Add ApiVersions request (segmentio#160)
Browse files Browse the repository at this point in the history
* Add ApiVersions request

* Retrieve api versions as connection is created

* Store api versions in a map

* Use meaningful default value for api versions
  • Loading branch information
VictorDenisov authored and achille-roussel committed Jan 9, 2019
1 parent 963714c commit 266d14d
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 1 deletion.
3 changes: 2 additions & 1 deletion batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,12 @@ func TestBatchDontExpectEOF(t *testing.T) {
if err != nil {
t.Fatalf("cannot connect to partition leader at %s:%d: %s", broker.Host, broker.Port, err)
}
nc.(*net.TCPConn).CloseRead()

conn := NewConn(nc, topic, 0)
defer conn.Close()

nc.(*net.TCPConn).CloseRead()

batch := conn.ReadBatch(1024, 8192)

if _, err := batch.ReadMessage(); err != io.ErrUnexpectedEOF {
Expand Down
91 changes: 91 additions & 0 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ type Conn struct {

// number of replica acks required when publishing to a partition
requiredAcks int32
apiVersions map[apiKey]ApiVersion
}

// ConnConfig is a configuration object used to create new instances of Conn.
Expand Down Expand Up @@ -135,6 +136,16 @@ func NewConnWith(conn net.Conn, config ConnConfig) *Conn {
}},
}).size()
c.fetchMaxBytes = math.MaxInt32 - c.fetchMinSize
var err error
apiVersions, err := c.ApiVersions()
if err != nil {
c.apiVersions = defaultApiVersions
} else {
c.apiVersions = make(map[apiKey]ApiVersion)
for _, v := range apiVersions {
c.apiVersions[apiKey(v.ApiKey)] = v
}
}
return c
}

Expand Down Expand Up @@ -183,6 +194,7 @@ func (c *Conn) findCoordinator(request findCoordinatorRequestV0) (findCoordinato
err := c.readOperation(
func(deadline time.Time, id int32) error {
return c.writeRequest(groupCoordinatorRequest, v0, id, request)

},
func(deadline time.Time, size int) error {
return expectZeroSize(func() (remain int, err error) {
Expand Down Expand Up @@ -1012,6 +1024,85 @@ func (c *Conn) requestHeader(apiKey apiKey, apiVersion apiVersion, correlationID
}
}

type ApiVersion struct {
ApiKey int16
MinVersion int16
MaxVersion int16
}

var defaultApiVersions map[apiKey]ApiVersion = map[apiKey]ApiVersion{
produceRequest: ApiVersion{int16(produceRequest), int16(v2), int16(v2)},
fetchRequest: ApiVersion{int16(fetchRequest), int16(v2), int16(v2)},
listOffsetRequest: ApiVersion{int16(listOffsetRequest), int16(v1), int16(v1)},
metadataRequest: ApiVersion{int16(metadataRequest), int16(v1), int16(v1)},
offsetCommitRequest: ApiVersion{int16(offsetCommitRequest), int16(v2), int16(v2)},
offsetFetchRequest: ApiVersion{int16(offsetFetchRequest), int16(v1), int16(v1)},
groupCoordinatorRequest: ApiVersion{int16(groupCoordinatorRequest), int16(v0), int16(v0)},
joinGroupRequest: ApiVersion{int16(joinGroupRequest), int16(v1), int16(v1)},
heartbeatRequest: ApiVersion{int16(heartbeatRequest), int16(v0), int16(v0)},
leaveGroupRequest: ApiVersion{int16(leaveGroupRequest), int16(v0), int16(v0)},
syncGroupRequest: ApiVersion{int16(syncGroupRequest), int16(v0), int16(v0)},
describeGroupsRequest: ApiVersion{int16(describeGroupsRequest), int16(v1), int16(v1)},
listGroupsRequest: ApiVersion{int16(listGroupsRequest), int16(v1), int16(v1)},
apiVersionsRequest: ApiVersion{int16(apiVersionsRequest), int16(v0), int16(v0)},
createTopicsRequest: ApiVersion{int16(createTopicsRequest), int16(v0), int16(v0)},
deleteTopicsRequest: ApiVersion{int16(deleteTopicsRequest), int16(v1), int16(v1)},
}

func (c *Conn) ApiVersions() ([]ApiVersion, error) {
id, err := c.doRequest(&c.rdeadline, func(deadline time.Time, id int32) error {
now := time.Now()
deadline = adjustDeadlineForRTT(deadline, now, defaultRTT)

h := requestHeader{
ApiKey: int16(apiVersionsRequest),
ApiVersion: int16(v0),
CorrelationID: id,
ClientID: c.clientID,
}
h.Size = (h.size() - 4)

h.writeTo(&c.wbuf)
return c.wbuf.Flush()
})
if err != nil {
return nil, err
}

_, size, lock, err := c.waitResponse(&c.rdeadline, id)
if err != nil {
return nil, err
}
defer lock.Unlock()

var errorCode int16
if size, err = readInt16(&c.rbuf, size, &errorCode); err != nil {
return nil, err
}
var arrSize int32
if size, err = readInt32(&c.rbuf, size, &arrSize); err != nil {
return nil, err
}
r := make([]ApiVersion, arrSize)
for i := 0; i < int(arrSize); i++ {
if size, err = readInt16(&c.rbuf, size, &r[i].ApiKey); err != nil {
return nil, err
}
if size, err = readInt16(&c.rbuf, size, &r[i].MinVersion); err != nil {
return nil, err
}
if size, err = readInt16(&c.rbuf, size, &r[i].MaxVersion); err != nil {
return nil, err
}
}

if errorCode != 0 {
return r, Error(errorCode)
}

return r, nil
}

// connDeadline is a helper type to implement read/write deadline management on
// the kafka connection.
type connDeadline struct {
Expand Down
1 change: 1 addition & 0 deletions protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ const (
syncGroupRequest apiKey = 14
describeGroupsRequest apiKey = 15
listGroupsRequest apiKey = 16
apiVersionsRequest apiKey = 18
createTopicsRequest apiKey = 19
deleteTopicsRequest apiKey = 20
)
Expand Down

0 comments on commit 266d14d

Please sign in to comment.