Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master' into clarify_special_off…
Browse files Browse the repository at this point in the history
…sets
  • Loading branch information
jnjackins committed Oct 23, 2018
2 parents 3b9595a + 51bf2aa commit 2602bcb
Show file tree
Hide file tree
Showing 30 changed files with 668 additions and 712 deletions.
4 changes: 3 additions & 1 deletion .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@ jobs:
- image: circleci/golang
- image: wurstmeister/zookeeper
ports: ['2181:2181']
- image: wurstmeister/kafka
- image: wurstmeister/kafka:0.11.0.1
ports: ['9092:9092']
environment:
KAFKA_VERSION: '0.11.0.1'
KAFKA_BROKER_ID: '1'
KAFKA_CREATE_TOPICS: 'test-writer-0:3:1,test-writer-1:3:1'
KAFKA_DELETE_TOPIC_ENABLE: 'true'
KAFKA_ADVERTISED_HOST_NAME: 'localhost'
KAFKA_ADVERTISED_PORT: '9092'
KAFKA_ZOOKEEPER_CONNECT: 'localhost:2181'
Expand Down
79 changes: 39 additions & 40 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ type Broker struct {
Host string
Port int
ID int
Rack string
}

// Partition carries the metadata associated with a kafka partition.
Expand Down Expand Up @@ -176,12 +177,12 @@ func (c *Conn) describeGroups(request describeGroupsRequestV1) (describeGroupsRe
// findCoordinator finds the coordinator for the specified group or transaction
//
// See http://kafka.apache.org/protocol.html#The_Messages_FindCoordinator
func (c *Conn) findCoordinator(request findCoordinatorRequestV1) (findCoordinatorResponseV1, error) {
var response findCoordinatorResponseV1
func (c *Conn) findCoordinator(request findCoordinatorRequestV0) (findCoordinatorResponseV0, error) {
var response findCoordinatorResponseV0

err := c.readOperation(
func(deadline time.Time, id int32) error {
return c.writeRequest(groupCoordinatorRequest, v1, id, request)
return c.writeRequest(groupCoordinatorRequest, v0, id, request)
},
func(deadline time.Time, size int) error {
return expectZeroSize(func() (remain int, err error) {
Expand All @@ -190,10 +191,10 @@ func (c *Conn) findCoordinator(request findCoordinatorRequestV1) (findCoordinato
},
)
if err != nil {
return findCoordinatorResponseV1{}, err
return findCoordinatorResponseV0{}, err
}
if response.ErrorCode != 0 {
return findCoordinatorResponseV1{}, Error(response.ErrorCode)
return findCoordinatorResponseV0{}, Error(response.ErrorCode)
}

return response, nil
Expand All @@ -202,12 +203,12 @@ func (c *Conn) findCoordinator(request findCoordinatorRequestV1) (findCoordinato
// heartbeat sends a heartbeat message required by consumer groups
//
// See http://kafka.apache.org/protocol.html#The_Messages_Heartbeat
func (c *Conn) heartbeat(request heartbeatRequestV1) (heartbeatResponseV1, error) {
var response heartbeatResponseV1
func (c *Conn) heartbeat(request heartbeatRequestV0) (heartbeatResponseV0, error) {
var response heartbeatResponseV0

err := c.writeOperation(
func(deadline time.Time, id int32) error {
return c.writeRequest(heartbeatRequest, v1, id, request)
return c.writeRequest(heartbeatRequest, v0, id, request)
},
func(deadline time.Time, size int) error {
return expectZeroSize(func() (remain int, err error) {
Expand All @@ -216,10 +217,10 @@ func (c *Conn) heartbeat(request heartbeatRequestV1) (heartbeatResponseV1, error
},
)
if err != nil {
return heartbeatResponseV1{}, err
return heartbeatResponseV0{}, err
}
if response.ErrorCode != 0 {
return heartbeatResponseV1{}, Error(response.ErrorCode)
return heartbeatResponseV0{}, Error(response.ErrorCode)
}

return response, nil
Expand All @@ -228,12 +229,12 @@ func (c *Conn) heartbeat(request heartbeatRequestV1) (heartbeatResponseV1, error
// joinGroup attempts to join a consumer group
//
// See http://kafka.apache.org/protocol.html#The_Messages_JoinGroup
func (c *Conn) joinGroup(request joinGroupRequestV2) (joinGroupResponseV2, error) {
var response joinGroupResponseV2
func (c *Conn) joinGroup(request joinGroupRequestV1) (joinGroupResponseV1, error) {
var response joinGroupResponseV1

err := c.writeOperation(
func(deadline time.Time, id int32) error {
return c.writeRequest(joinGroupRequest, v2, id, request)
return c.writeRequest(joinGroupRequest, v1, id, request)
},
func(deadline time.Time, size int) error {
return expectZeroSize(func() (remain int, err error) {
Expand All @@ -242,10 +243,10 @@ func (c *Conn) joinGroup(request joinGroupRequestV2) (joinGroupResponseV2, error
},
)
if err != nil {
return joinGroupResponseV2{}, err
return joinGroupResponseV1{}, err
}
if response.ErrorCode != 0 {
return joinGroupResponseV2{}, Error(response.ErrorCode)
return joinGroupResponseV1{}, Error(response.ErrorCode)
}

return response, nil
Expand All @@ -254,12 +255,12 @@ func (c *Conn) joinGroup(request joinGroupRequestV2) (joinGroupResponseV2, error
// leaveGroup leaves the consumer from the consumer group
//
// See http://kafka.apache.org/protocol.html#The_Messages_LeaveGroup
func (c *Conn) leaveGroup(request leaveGroupRequestV1) (leaveGroupResponseV1, error) {
var response leaveGroupResponseV1
func (c *Conn) leaveGroup(request leaveGroupRequestV0) (leaveGroupResponseV0, error) {
var response leaveGroupResponseV0

err := c.writeOperation(
func(deadline time.Time, id int32) error {
return c.writeRequest(leaveGroupRequest, v1, id, request)
return c.writeRequest(leaveGroupRequest, v0, id, request)
},
func(deadline time.Time, size int) error {
return expectZeroSize(func() (remain int, err error) {
Expand All @@ -268,10 +269,10 @@ func (c *Conn) leaveGroup(request leaveGroupRequestV1) (leaveGroupResponseV1, er
},
)
if err != nil {
return leaveGroupResponseV1{}, err
return leaveGroupResponseV0{}, err
}
if response.ErrorCode != 0 {
return leaveGroupResponseV1{}, Error(response.ErrorCode)
return leaveGroupResponseV0{}, Error(response.ErrorCode)
}

return response, nil
Expand Down Expand Up @@ -306,12 +307,12 @@ func (c *Conn) listGroups(request listGroupsRequestV1) (listGroupsResponseV1, er
// offsetCommit commits the specified topic partition offsets
//
// See http://kafka.apache.org/protocol.html#The_Messages_OffsetCommit
func (c *Conn) offsetCommit(request offsetCommitRequestV3) (offsetCommitResponseV3, error) {
var response offsetCommitResponseV3
func (c *Conn) offsetCommit(request offsetCommitRequestV2) (offsetCommitResponseV2, error) {
var response offsetCommitResponseV2

err := c.writeOperation(
func(deadline time.Time, id int32) error {
return c.writeRequest(offsetCommitRequest, v3, id, request)
return c.writeRequest(offsetCommitRequest, v2, id, request)
},
func(deadline time.Time, size int) error {
return expectZeroSize(func() (remain int, err error) {
Expand All @@ -320,12 +321,12 @@ func (c *Conn) offsetCommit(request offsetCommitRequestV3) (offsetCommitResponse
},
)
if err != nil {
return offsetCommitResponseV3{}, err
return offsetCommitResponseV2{}, err
}
for _, r := range response.Responses {
for _, pr := range r.PartitionResponses {
if pr.ErrorCode != 0 {
return offsetCommitResponseV3{}, Error(pr.ErrorCode)
return offsetCommitResponseV2{}, Error(pr.ErrorCode)
}
}
}
Expand All @@ -336,12 +337,12 @@ func (c *Conn) offsetCommit(request offsetCommitRequestV3) (offsetCommitResponse
// offsetFetch fetches the offsets for the specified topic partitions
//
// See http://kafka.apache.org/protocol.html#The_Messages_OffsetFetch
func (c *Conn) offsetFetch(request offsetFetchRequestV3) (offsetFetchResponseV3, error) {
var response offsetFetchResponseV3
func (c *Conn) offsetFetch(request offsetFetchRequestV1) (offsetFetchResponseV1, error) {
var response offsetFetchResponseV1

err := c.readOperation(
func(deadline time.Time, id int32) error {
return c.writeRequest(offsetFetchRequest, v3, id, request)
return c.writeRequest(offsetFetchRequest, v1, id, request)
},
func(deadline time.Time, size int) error {
return expectZeroSize(func() (remain int, err error) {
Expand All @@ -350,15 +351,12 @@ func (c *Conn) offsetFetch(request offsetFetchRequestV3) (offsetFetchResponseV3,
},
)
if err != nil {
return offsetFetchResponseV3{}, err
}
if response.ErrorCode != 0 {
return offsetFetchResponseV3{}, Error(response.ErrorCode)
return offsetFetchResponseV1{}, err
}
for _, r := range response.Responses {
for _, pr := range r.PartitionResponses {
if pr.ErrorCode != 0 {
return offsetFetchResponseV3{}, Error(pr.ErrorCode)
return offsetFetchResponseV1{}, Error(pr.ErrorCode)
}
}
}
Expand All @@ -369,12 +367,12 @@ func (c *Conn) offsetFetch(request offsetFetchRequestV3) (offsetFetchResponseV3,
// syncGroups completes the handshake to join a consumer group
//
// See http://kafka.apache.org/protocol.html#The_Messages_SyncGroup
func (c *Conn) syncGroups(request syncGroupRequestV1) (syncGroupResponseV1, error) {
var response syncGroupResponseV1
func (c *Conn) syncGroups(request syncGroupRequestV0) (syncGroupResponseV0, error) {
var response syncGroupResponseV0

err := c.readOperation(
func(deadline time.Time, id int32) error {
return c.writeRequest(syncGroupRequest, v1, id, request)
return c.writeRequest(syncGroupRequest, v0, id, request)
},
func(deadline time.Time, size int) error {
return expectZeroSize(func() (remain int, err error) {
Expand All @@ -383,10 +381,10 @@ func (c *Conn) syncGroups(request syncGroupRequestV1) (syncGroupResponseV1, erro
},
)
if err != nil {
return syncGroupResponseV1{}, err
return syncGroupResponseV0{}, err
}
if response.ErrorCode != 0 {
return syncGroupResponseV1{}, Error(response.ErrorCode)
return syncGroupResponseV0{}, Error(response.ErrorCode)
}

return response, nil
Expand Down Expand Up @@ -714,10 +712,10 @@ func (c *Conn) ReadPartitions(topics ...string) (partitions []Partition, err err

err = c.readOperation(
func(deadline time.Time, id int32) error {
return c.writeRequest(metadataRequest, v0, id, topicMetadataRequestV0(topics))
return c.writeRequest(metadataRequest, v1, id, topicMetadataRequestV1(topics))
},
func(deadline time.Time, size int) error {
var res metadataResponseV0
var res metadataResponseV1

if err := c.readResponse(size, &res); err != nil {
return err
Expand All @@ -729,6 +727,7 @@ func (c *Conn) ReadPartitions(topics ...string) (partitions []Partition, err err
Host: b.Host,
Port: int(b.Port),
ID: int(b.NodeID),
Rack: b.Rack,
}
}

Expand Down
Loading

0 comments on commit 2602bcb

Please sign in to comment.