Skip to content

Commit

Permalink
add kafka.ApiVersion.Format (segmentio#395)
Browse files Browse the repository at this point in the history
* add kafka.ApiVersion.Format

* set explicit constant values intead of using iota
  • Loading branch information
Achille authored Jan 7, 2020
1 parent a44a65c commit 9a956db
Show file tree
Hide file tree
Showing 7 changed files with 205 additions and 81 deletions.
75 changes: 25 additions & 50 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,14 +78,14 @@ type Conn struct {
requiredAcks int32

// lazily loaded API versions used by this connection
apiVersions atomic.Value // apiVersions
apiVersions atomic.Value // apiVersionMap

transactionalID *string
}

type apiVersions map[apiKey]ApiVersion
type apiVersionMap map[apiKey]ApiVersion

func (v apiVersions) negotiate(key apiKey, sortedSupportedVersions ...apiVersion) apiVersion {
func (v apiVersionMap) negotiate(key apiKey, sortedSupportedVersions ...apiVersion) apiVersion {
x := v[key]

for i := len(sortedSupportedVersions) - 1; i >= 0; i-- {
Expand Down Expand Up @@ -210,8 +210,8 @@ func (c *Conn) negotiateVersion(key apiKey, sortedSupportedVersions ...apiVersio
return a, nil
}

func (c *Conn) loadVersions() (apiVersions, error) {
v, _ := c.apiVersions.Load().(apiVersions)
func (c *Conn) loadVersions() (apiVersionMap, error) {
v, _ := c.apiVersions.Load().(apiVersionMap)
if v != nil {
return v, nil
}
Expand All @@ -221,7 +221,7 @@ func (c *Conn) loadVersions() (apiVersions, error) {
return nil, err
}

v = make(apiVersions, len(brokerVersions))
v = make(apiVersionMap, len(brokerVersions))

for _, a := range brokerVersions {
v[apiKey(a.ApiKey)] = a
Expand All @@ -235,7 +235,7 @@ func (c *Conn) loadVersions() (apiVersions, error) {
func (c *Conn) Controller() (broker Broker, err error) {
err = c.readOperation(
func(deadline time.Time, id int32) error {
return c.writeRequest(metadataRequest, v1, id, topicMetadataRequestV1([]string{}))
return c.writeRequest(metadata, v1, id, topicMetadataRequestV1([]string{}))
},
func(deadline time.Time, size int) error {
var res metadataResponseV1
Expand Down Expand Up @@ -263,7 +263,7 @@ func (c *Conn) Brokers() ([]Broker, error) {
var brokers []Broker
err := c.readOperation(
func(deadline time.Time, id int32) error {
return c.writeRequest(metadataRequest, v1, id, topicMetadataRequestV1([]string{}))
return c.writeRequest(metadata, v1, id, topicMetadataRequestV1([]string{}))
},
func(deadline time.Time, size int) error {
var res metadataResponseV1
Expand Down Expand Up @@ -303,7 +303,7 @@ func (c *Conn) describeGroups(request describeGroupsRequestV0) (describeGroupsRe

err := c.readOperation(
func(deadline time.Time, id int32) error {
return c.writeRequest(describeGroupsRequest, v0, id, request)
return c.writeRequest(describeGroups, v0, id, request)
},
func(deadline time.Time, size int) error {
return expectZeroSize(func() (remain int, err error) {
Expand Down Expand Up @@ -331,7 +331,7 @@ func (c *Conn) findCoordinator(request findCoordinatorRequestV0) (findCoordinato

err := c.readOperation(
func(deadline time.Time, id int32) error {
return c.writeRequest(groupCoordinatorRequest, v0, id, request)
return c.writeRequest(findCoordinator, v0, id, request)

},
func(deadline time.Time, size int) error {
Expand All @@ -358,7 +358,7 @@ func (c *Conn) heartbeat(request heartbeatRequestV0) (heartbeatResponseV0, error

err := c.writeOperation(
func(deadline time.Time, id int32) error {
return c.writeRequest(heartbeatRequest, v0, id, request)
return c.writeRequest(heartbeat, v0, id, request)
},
func(deadline time.Time, size int) error {
return expectZeroSize(func() (remain int, err error) {
Expand All @@ -384,7 +384,7 @@ func (c *Conn) joinGroup(request joinGroupRequestV1) (joinGroupResponseV1, error

err := c.writeOperation(
func(deadline time.Time, id int32) error {
return c.writeRequest(joinGroupRequest, v1, id, request)
return c.writeRequest(joinGroup, v1, id, request)
},
func(deadline time.Time, size int) error {
return expectZeroSize(func() (remain int, err error) {
Expand All @@ -410,7 +410,7 @@ func (c *Conn) leaveGroup(request leaveGroupRequestV0) (leaveGroupResponseV0, er

err := c.writeOperation(
func(deadline time.Time, id int32) error {
return c.writeRequest(leaveGroupRequest, v0, id, request)
return c.writeRequest(leaveGroup, v0, id, request)
},
func(deadline time.Time, size int) error {
return expectZeroSize(func() (remain int, err error) {
Expand All @@ -436,7 +436,7 @@ func (c *Conn) listGroups(request listGroupsRequestV1) (listGroupsResponseV1, er

err := c.readOperation(
func(deadline time.Time, id int32) error {
return c.writeRequest(listGroupsRequest, v1, id, request)
return c.writeRequest(listGroups, v1, id, request)
},
func(deadline time.Time, size int) error {
return expectZeroSize(func() (remain int, err error) {
Expand All @@ -462,7 +462,7 @@ func (c *Conn) offsetCommit(request offsetCommitRequestV2) (offsetCommitResponse

err := c.writeOperation(
func(deadline time.Time, id int32) error {
return c.writeRequest(offsetCommitRequest, v2, id, request)
return c.writeRequest(offsetCommit, v2, id, request)
},
func(deadline time.Time, size int) error {
return expectZeroSize(func() (remain int, err error) {
Expand Down Expand Up @@ -493,7 +493,7 @@ func (c *Conn) offsetFetch(request offsetFetchRequestV1) (offsetFetchResponseV1,

err := c.readOperation(
func(deadline time.Time, id int32) error {
return c.writeRequest(offsetFetchRequest, v1, id, request)
return c.writeRequest(offsetFetch, v1, id, request)
},
func(deadline time.Time, size int) error {
return expectZeroSize(func() (remain int, err error) {
Expand Down Expand Up @@ -523,7 +523,7 @@ func (c *Conn) syncGroup(request syncGroupRequestV0) (syncGroupResponseV0, error

err := c.readOperation(
func(deadline time.Time, id int32) error {
return c.writeRequest(syncGroupRequest, v0, id, request)
return c.writeRequest(syncGroup, v0, id, request)
},
func(deadline time.Time, size int) error {
return expectZeroSize(func() (remain int, err error) {
Expand Down Expand Up @@ -783,7 +783,7 @@ func (c *Conn) ReadBatchWith(cfg ReadBatchConfig) *Batch {
return &Batch{err: dontExpectEOF(err)}
}

fetchVersion, err := c.negotiateVersion(fetchRequest, v2, v5, v10)
fetchVersion, err := c.negotiateVersion(fetch, v2, v5, v10)
if err != nil {
return &Batch{err: dontExpectEOF(err)}
}
Expand Down Expand Up @@ -970,7 +970,7 @@ func (c *Conn) ReadPartitions(topics ...string) (partitions []Partition, err err

err = c.readOperation(
func(deadline time.Time, id int32) error {
return c.writeRequest(metadataRequest, v1, id, topicMetadataRequestV1(topics))
return c.writeRequest(metadata, v1, id, topicMetadataRequestV1(topics))
},
func(deadline time.Time, size int) error {
var res metadataResponseV1
Expand Down Expand Up @@ -1085,7 +1085,7 @@ func (c *Conn) writeCompressedMessages(codec CompressionCodec, msgs ...Message)
}

var produceVersion apiVersion
if produceVersion, err = c.negotiateVersion(produceRequest, v2, v3, v7); err != nil {
if produceVersion, err = c.negotiateVersion(produce, v2, v3, v7); err != nil {
return
}

Expand Down Expand Up @@ -1378,31 +1378,6 @@ func (c *Conn) requestHeader(apiKey apiKey, apiVersion apiVersion, correlationID
}
}

type ApiVersion struct {
ApiKey int16
MinVersion int16
MaxVersion int16
}

var defaultApiVersions map[apiKey]ApiVersion = map[apiKey]ApiVersion{
produceRequest: ApiVersion{int16(produceRequest), int16(v2), int16(v2)},
fetchRequest: ApiVersion{int16(fetchRequest), int16(v2), int16(v2)},
listOffsetRequest: ApiVersion{int16(listOffsetRequest), int16(v1), int16(v1)},
metadataRequest: ApiVersion{int16(metadataRequest), int16(v1), int16(v1)},
offsetCommitRequest: ApiVersion{int16(offsetCommitRequest), int16(v2), int16(v2)},
offsetFetchRequest: ApiVersion{int16(offsetFetchRequest), int16(v1), int16(v1)},
groupCoordinatorRequest: ApiVersion{int16(groupCoordinatorRequest), int16(v0), int16(v0)},
joinGroupRequest: ApiVersion{int16(joinGroupRequest), int16(v1), int16(v1)},
heartbeatRequest: ApiVersion{int16(heartbeatRequest), int16(v0), int16(v0)},
leaveGroupRequest: ApiVersion{int16(leaveGroupRequest), int16(v0), int16(v0)},
syncGroupRequest: ApiVersion{int16(syncGroupRequest), int16(v0), int16(v0)},
describeGroupsRequest: ApiVersion{int16(describeGroupsRequest), int16(v1), int16(v1)},
listGroupsRequest: ApiVersion{int16(listGroupsRequest), int16(v1), int16(v1)},
apiVersionsRequest: ApiVersion{int16(apiVersionsRequest), int16(v0), int16(v0)},
createTopicsRequest: ApiVersion{int16(createTopicsRequest), int16(v0), int16(v0)},
deleteTopicsRequest: ApiVersion{int16(deleteTopicsRequest), int16(v1), int16(v1)},
}

func (c *Conn) ApiVersions() ([]ApiVersion, error) {
deadline := &c.rdeadline

Expand All @@ -1417,7 +1392,7 @@ func (c *Conn) ApiVersions() ([]ApiVersion, error) {

id, err := c.doRequest(deadline, func(_ time.Time, id int32) error {
h := requestHeader{
ApiKey: int16(apiVersionsRequest),
ApiKey: int16(apiVersions),
ApiVersion: int16(v0),
CorrelationID: id,
ClientID: c.clientID,
Expand Down Expand Up @@ -1542,14 +1517,14 @@ func (c *Conn) saslHandshake(mechanism string) error {
// challenge/responses are sent
var resp saslHandshakeResponseV0

version, err := c.negotiateVersion(saslHandshakeRequest, v0, v1)
version, err := c.negotiateVersion(saslHandshake, v0, v1)
if err != nil {
return err
}

err = c.writeOperation(
func(deadline time.Time, id int32) error {
return c.writeRequest(saslHandshakeRequest, version, id, &saslHandshakeRequestV0{Mechanism: mechanism})
return c.writeRequest(saslHandshake, version, id, &saslHandshakeRequestV0{Mechanism: mechanism})
},
func(deadline time.Time, size int) error {
return expectZeroSize(func() (int, error) {
Expand All @@ -1571,7 +1546,7 @@ func (c *Conn) saslAuthenticate(data []byte) ([]byte, error) {
// if we sent a v1 handshake, then we must encapsulate the authentication
// request in a saslAuthenticateRequest. otherwise, we read and write raw
// bytes.
version, err := c.negotiateVersion(saslHandshakeRequest, v0, v1)
version, err := c.negotiateVersion(saslHandshake, v0, v1)
if err != nil {
return nil, err
}
Expand All @@ -1581,7 +1556,7 @@ func (c *Conn) saslAuthenticate(data []byte) ([]byte, error) {

err := c.writeOperation(
func(deadline time.Time, id int32) error {
return c.writeRequest(saslAuthenticateRequest, v0, id, request)
return c.writeRequest(saslAuthenticate, v0, id, request)
},
func(deadline time.Time, size int) error {
return expectZeroSize(func() (remain int, err error) {
Expand Down
2 changes: 1 addition & 1 deletion createtopics.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ func (c *Conn) createTopics(request createTopicsRequestV0) (createTopicsResponse
deadline = adjustDeadlineForRTT(deadline, now, defaultRTT)
request.Timeout = milliseconds(deadlineToTimeout(deadline, now))
}
return c.writeRequest(createTopicsRequest, v0, id, request)
return c.writeRequest(createTopics, v0, id, request)
},
func(deadline time.Time, size int) error {
return expectZeroSize(func() (remain int, err error) {
Expand Down
2 changes: 1 addition & 1 deletion deletetopics.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func (c *Conn) deleteTopics(request deleteTopicsRequestV0) (deleteTopicsResponse
deadline = adjustDeadlineForRTT(deadline, now, defaultRTT)
request.Timeout = milliseconds(deadlineToTimeout(deadline, now))
}
return c.writeRequest(deleteTopicsRequest, v0, id, request)
return c.writeRequest(deleteTopics, v0, id, request)
},
func(deadline time.Time, size int) error {
return expectZeroSize(func() (remain int, err error) {
Expand Down
Loading

0 comments on commit 9a956db

Please sign in to comment.