Skip to content

Commit

Permalink
expanding Conn to handle kafka api calls required to support consumer…
Browse files Browse the repository at this point in the history
… groups (segmentio#47)

* expanding Conn to support kafka api calls required to support consumer groups

* made handling of readFrom consistent

* added read helpers, readStringArray and readMapStringInt32

* changed Fatalf to Errorf to allow additional error cases to be reported

* removed writeHeader in favor of existing (*Conn).writeRequest

* kafka api calls returned now return zero values instead of partial values

* clarified docs around MemberID

* updated docs for MemberID

* extended test for protocol metadata to call to bytes

* removing unused protocolMetadata.  will reintroduce when adding the consumer group code
  • Loading branch information
savaki authored and achille-roussel committed Jan 4, 2018
1 parent d1a5286 commit 8547597
Show file tree
Hide file tree
Showing 27 changed files with 2,308 additions and 30 deletions.
247 changes: 247 additions & 0 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,253 @@ func NewConnWith(conn net.Conn, config ConnConfig) *Conn {
return c
}

// describeGroups retrieves the specified groups
//
// See http://kafka.apache.org/protocol.html#The_Messages_DescribeGroups
func (c *Conn) describeGroups(request describeGroupsRequestV1) (describeGroupsResponseV1, error) {
var response describeGroupsResponseV1

err := c.readOperation(
func(deadline time.Time, id int32) error {
return c.writeRequest(describeGroupsRequest, v1, id, request)
},
func(deadline time.Time, size int) error {
return expectZeroSize(func() (remain int, err error) {
return (&response).readFrom(&c.rbuf, size)
}())
},
)
if err != nil {
return describeGroupsResponseV1{}, err
}
for _, group := range response.Groups {
if group.ErrorCode != 0 {
return describeGroupsResponseV1{}, Error(group.ErrorCode)
}
}

return response, nil
}

// findCoordinator finds the coordinator for the specified group or transaction
//
// See http://kafka.apache.org/protocol.html#The_Messages_FindCoordinator
func (c *Conn) findCoordinator(request findCoordinatorRequestV1) (findCoordinatorResponseV1, error) {
var response findCoordinatorResponseV1

err := c.readOperation(
func(deadline time.Time, id int32) error {
return c.writeRequest(groupCoordinatorRequest, v1, id, request)
},
func(deadline time.Time, size int) error {
return expectZeroSize(func() (remain int, err error) {
return (&response).readFrom(&c.rbuf, size)
}())
},
)
if err != nil {
return findCoordinatorResponseV1{}, err
}
if response.ErrorCode != 0 {
return findCoordinatorResponseV1{}, Error(response.ErrorCode)
}

return response, nil
}

// heartbeat sends a heartbeat message required by consumer groups
//
// See http://kafka.apache.org/protocol.html#The_Messages_Heartbeat
func (c *Conn) heartbeat(request heartbeatRequestV1) (heartbeatResponseV1, error) {
var response heartbeatResponseV1

err := c.writeOperation(
func(deadline time.Time, id int32) error {
return c.writeRequest(heartbeatRequest, v1, id, request)
},
func(deadline time.Time, size int) error {
return expectZeroSize(func() (remain int, err error) {
return (&response).readFrom(&c.rbuf, size)
}())
},
)
if err != nil {
return heartbeatResponseV1{}, err
}
if response.ErrorCode != 0 {
return heartbeatResponseV1{}, Error(response.ErrorCode)
}

return response, nil
}

// joinGroup attempts to join a consumer group
//
// See http://kafka.apache.org/protocol.html#The_Messages_JoinGroup
func (c *Conn) joinGroup(request joinGroupRequestV2) (joinGroupResponseV2, error) {
var response joinGroupResponseV2

err := c.writeOperation(
func(deadline time.Time, id int32) error {
return c.writeRequest(joinGroupRequest, v2, id, request)
},
func(deadline time.Time, size int) error {
return expectZeroSize(func() (remain int, err error) {
return (&response).readFrom(&c.rbuf, size)
}())
},
)
if err != nil {
return joinGroupResponseV2{}, err
}
if response.ErrorCode != 0 {
return joinGroupResponseV2{}, Error(response.ErrorCode)
}

return response, nil
}

// leaveGroup leaves the consumer from the consumer group
//
// See http://kafka.apache.org/protocol.html#The_Messages_LeaveGroup
func (c *Conn) leaveGroup(request leaveGroupRequestV1) (leaveGroupResponseV1, error) {
var response leaveGroupResponseV1

err := c.writeOperation(
func(deadline time.Time, id int32) error {
return c.writeRequest(leaveGroupRequest, v1, id, request)
},
func(deadline time.Time, size int) error {
return expectZeroSize(func() (remain int, err error) {
return (&response).readFrom(&c.rbuf, size)
}())
},
)
if err != nil {
return leaveGroupResponseV1{}, err
}
if response.ErrorCode != 0 {
return leaveGroupResponseV1{}, Error(response.ErrorCode)
}

return response, nil
}

// listGroups lists all the consumer groups
//
// See http://kafka.apache.org/protocol.html#The_Messages_ListGroups
func (c *Conn) listGroups(request listGroupsRequestV1) (listGroupsResponseV1, error) {
var response listGroupsResponseV1

err := c.readOperation(
func(deadline time.Time, id int32) error {
return c.writeRequest(listGroupsRequest, v1, id, request)
},
func(deadline time.Time, size int) error {
return expectZeroSize(func() (remain int, err error) {
return (&response).readFrom(&c.rbuf, size)
}())
},
)
if err != nil {
return listGroupsResponseV1{}, err
}
if response.ErrorCode != 0 {
return listGroupsResponseV1{}, Error(response.ErrorCode)
}

return response, nil
}

// offsetCommit commits the specified topic partition offsets
//
// See http://kafka.apache.org/protocol.html#The_Messages_OffsetCommit
func (c *Conn) offsetCommit(request offsetCommitRequestV3) (offsetCommitResponseV3, error) {
var response offsetCommitResponseV3

err := c.writeOperation(
func(deadline time.Time, id int32) error {
return c.writeRequest(offsetCommitRequest, v3, id, request)
},
func(deadline time.Time, size int) error {
return expectZeroSize(func() (remain int, err error) {
return (&response).readFrom(&c.rbuf, size)
}())
},
)
if err != nil {
return offsetCommitResponseV3{}, err
}
for _, r := range response.Responses {
for _, pr := range r.PartitionResponses {
if pr.ErrorCode != 0 {
return offsetCommitResponseV3{}, Error(pr.ErrorCode)
}
}
}

return response, nil
}

// offsetFetch fetches the offsets for the specified topic partitions
//
// See http://kafka.apache.org/protocol.html#The_Messages_OffsetFetch
func (c *Conn) offsetFetch(request offsetFetchRequestV3) (offsetFetchResponseV3, error) {
var response offsetFetchResponseV3

err := c.readOperation(
func(deadline time.Time, id int32) error {
return c.writeRequest(offsetFetchRequest, v3, id, request)
},
func(deadline time.Time, size int) error {
return expectZeroSize(func() (remain int, err error) {
return (&response).readFrom(&c.rbuf, size)
}())
},
)
if err != nil {
return offsetFetchResponseV3{}, err
}
if response.ErrorCode != 0 {
return offsetFetchResponseV3{}, Error(response.ErrorCode)
}
for _, r := range response.Responses {
for _, pr := range r.PartitionResponses {
if pr.ErrorCode != 0 {
return offsetFetchResponseV3{}, Error(pr.ErrorCode)
}
}
}

return response, nil
}

// syncGroups completes the handshake to join a consumer group
//
// See http://kafka.apache.org/protocol.html#The_Messages_SyncGroup
func (c *Conn) syncGroups(request syncGroupRequestV1) (syncGroupResponseV1, error) {
var response syncGroupResponseV1

err := c.readOperation(
func(deadline time.Time, id int32) error {
return c.writeRequest(syncGroupRequest, v1, id, request)
},
func(deadline time.Time, size int) error {
return expectZeroSize(func() (remain int, err error) {
return (&response).readFrom(&c.rbuf, size)
}())
},
)
if err != nil {
return syncGroupResponseV1{}, err
}
if response.ErrorCode != 0 {
return syncGroupResponseV1{}, Error(response.ErrorCode)
}

return response, nil
}

// Close closes the kafka connection.
func (c *Conn) Close() error {
return c.conn.Close()
Expand Down
Loading

0 comments on commit 8547597

Please sign in to comment.