Commit 07715b4

Downgrade various api versions to be compatible with kafka 0.10.1 (se…
tsholmes authored and stevevls committed Oct 19, 2018
1 parent f8858a2 commit 07715b4
Showing 22 changed files with 330 additions and 446 deletions.
3 changes: 2 additions & 1 deletion .circleci/config.yml
@@ -6,11 +6,12 @@ jobs:
      - image: circleci/golang
      - image: wurstmeister/zookeeper
        ports: ['2181:2181']
-     - image: wurstmeister/kafka
+     - image: wurstmeister/kafka:0.11.0.1
        ports: ['9092:9092']
        environment:
+         KAFKA_VERSION: '0.11.0.1'
          KAFKA_CREATE_TOPICS: 'test-writer-0:3:1,test-writer-1:3:1'
          KAFKA_DELETE_TOPIC_ENABLE: 'true'
          KAFKA_ADVERTISED_HOST_NAME: 'localhost'
          KAFKA_ADVERTISED_PORT: '9092'
          KAFKA_ZOOKEEPER_CONNECT: 'localhost:2181'
73 changes: 35 additions & 38 deletions conn.go
@@ -176,12 +176,12 @@ func (c *Conn) describeGroups(request describeGroupsRequestV1) (describeGroupsRe
// findCoordinator finds the coordinator for the specified group or transaction
//
// See http://kafka.apache.org/protocol.html#The_Messages_FindCoordinator
- func (c *Conn) findCoordinator(request findCoordinatorRequestV1) (findCoordinatorResponseV1, error) {
- var response findCoordinatorResponseV1
+ func (c *Conn) findCoordinator(request findCoordinatorRequestV0) (findCoordinatorResponseV0, error) {
+ var response findCoordinatorResponseV0

err := c.readOperation(
func(deadline time.Time, id int32) error {
- return c.writeRequest(groupCoordinatorRequest, v1, id, request)
+ return c.writeRequest(groupCoordinatorRequest, v0, id, request)
},
func(deadline time.Time, size int) error {
return expectZeroSize(func() (remain int, err error) {
@@ -190,10 +190,10 @@ func (c *Conn) findCoordinator(request findCoordinatorRequestV1) (findCoordinato
},
)
if err != nil {
- return findCoordinatorResponseV1{}, err
+ return findCoordinatorResponseV0{}, err
}
if response.ErrorCode != 0 {
- return findCoordinatorResponseV1{}, Error(response.ErrorCode)
+ return findCoordinatorResponseV0{}, Error(response.ErrorCode)
}

return response, nil
@@ -202,12 +202,12 @@ func (c *Conn) findCoordinator(request findCoordinatorRequestV1) (findCoordinato
// heartbeat sends a heartbeat message required by consumer groups
//
// See http://kafka.apache.org/protocol.html#The_Messages_Heartbeat
- func (c *Conn) heartbeat(request heartbeatRequestV1) (heartbeatResponseV1, error) {
- var response heartbeatResponseV1
+ func (c *Conn) heartbeat(request heartbeatRequestV0) (heartbeatResponseV0, error) {
+ var response heartbeatResponseV0

err := c.writeOperation(
func(deadline time.Time, id int32) error {
- return c.writeRequest(heartbeatRequest, v1, id, request)
+ return c.writeRequest(heartbeatRequest, v0, id, request)
},
func(deadline time.Time, size int) error {
return expectZeroSize(func() (remain int, err error) {
@@ -216,10 +216,10 @@ func (c *Conn) heartbeat(request heartbeatRequestV1) (heartbeatResponseV1, error
},
)
if err != nil {
- return heartbeatResponseV1{}, err
+ return heartbeatResponseV0{}, err
}
if response.ErrorCode != 0 {
- return heartbeatResponseV1{}, Error(response.ErrorCode)
+ return heartbeatResponseV0{}, Error(response.ErrorCode)
}

return response, nil
@@ -228,12 +228,12 @@ func (c *Conn) heartbeat(request heartbeatRequestV1) (heartbeatResponseV1, error
// joinGroup attempts to join a consumer group
//
// See http://kafka.apache.org/protocol.html#The_Messages_JoinGroup
- func (c *Conn) joinGroup(request joinGroupRequestV2) (joinGroupResponseV2, error) {
- var response joinGroupResponseV2
+ func (c *Conn) joinGroup(request joinGroupRequestV1) (joinGroupResponseV1, error) {
+ var response joinGroupResponseV1

err := c.writeOperation(
func(deadline time.Time, id int32) error {
- return c.writeRequest(joinGroupRequest, v2, id, request)
+ return c.writeRequest(joinGroupRequest, v1, id, request)
},
func(deadline time.Time, size int) error {
return expectZeroSize(func() (remain int, err error) {
@@ -242,10 +242,10 @@ func (c *Conn) joinGroup(request joinGroupRequestV2) (joinGroupResponseV2, error
},
)
if err != nil {
- return joinGroupResponseV2{}, err
+ return joinGroupResponseV1{}, err
}
if response.ErrorCode != 0 {
- return joinGroupResponseV2{}, Error(response.ErrorCode)
+ return joinGroupResponseV1{}, Error(response.ErrorCode)
}

return response, nil
@@ -254,12 +254,12 @@ func (c *Conn) joinGroup(request joinGroupRequestV2) (joinGroupResponseV2, error
// leaveGroup leaves the consumer from the consumer group
//
// See http://kafka.apache.org/protocol.html#The_Messages_LeaveGroup
- func (c *Conn) leaveGroup(request leaveGroupRequestV1) (leaveGroupResponseV1, error) {
- var response leaveGroupResponseV1
+ func (c *Conn) leaveGroup(request leaveGroupRequestV0) (leaveGroupResponseV0, error) {
+ var response leaveGroupResponseV0

err := c.writeOperation(
func(deadline time.Time, id int32) error {
- return c.writeRequest(leaveGroupRequest, v1, id, request)
+ return c.writeRequest(leaveGroupRequest, v0, id, request)
},
func(deadline time.Time, size int) error {
return expectZeroSize(func() (remain int, err error) {
@@ -268,10 +268,10 @@ func (c *Conn) leaveGroup(request leaveGroupRequestV1) (leaveGroupResponseV1, er
},
)
if err != nil {
- return leaveGroupResponseV1{}, err
+ return leaveGroupResponseV0{}, err
}
if response.ErrorCode != 0 {
- return leaveGroupResponseV1{}, Error(response.ErrorCode)
+ return leaveGroupResponseV0{}, Error(response.ErrorCode)
}

return response, nil
@@ -306,12 +306,12 @@ func (c *Conn) listGroups(request listGroupsRequestV1) (listGroupsResponseV1, er
// offsetCommit commits the specified topic partition offsets
//
// See http://kafka.apache.org/protocol.html#The_Messages_OffsetCommit
- func (c *Conn) offsetCommit(request offsetCommitRequestV3) (offsetCommitResponseV3, error) {
- var response offsetCommitResponseV3
+ func (c *Conn) offsetCommit(request offsetCommitRequestV2) (offsetCommitResponseV2, error) {
+ var response offsetCommitResponseV2

err := c.writeOperation(
func(deadline time.Time, id int32) error {
- return c.writeRequest(offsetCommitRequest, v3, id, request)
+ return c.writeRequest(offsetCommitRequest, v2, id, request)
},
func(deadline time.Time, size int) error {
return expectZeroSize(func() (remain int, err error) {
@@ -320,12 +320,12 @@ func (c *Conn) offsetCommit(request offsetCommitRequestV3) (offsetCommitResponse
},
)
if err != nil {
- return offsetCommitResponseV3{}, err
+ return offsetCommitResponseV2{}, err
}
for _, r := range response.Responses {
for _, pr := range r.PartitionResponses {
if pr.ErrorCode != 0 {
- return offsetCommitResponseV3{}, Error(pr.ErrorCode)
+ return offsetCommitResponseV2{}, Error(pr.ErrorCode)
}
}
}
@@ -336,12 +336,12 @@ func (c *Conn) offsetCommit(request offsetCommitRequestV3) (offsetCommitResponse
// offsetFetch fetches the offsets for the specified topic partitions
//
// See http://kafka.apache.org/protocol.html#The_Messages_OffsetFetch
- func (c *Conn) offsetFetch(request offsetFetchRequestV3) (offsetFetchResponseV3, error) {
- var response offsetFetchResponseV3
+ func (c *Conn) offsetFetch(request offsetFetchRequestV1) (offsetFetchResponseV1, error) {
+ var response offsetFetchResponseV1

err := c.readOperation(
func(deadline time.Time, id int32) error {
- return c.writeRequest(offsetFetchRequest, v3, id, request)
+ return c.writeRequest(offsetFetchRequest, v1, id, request)
},
func(deadline time.Time, size int) error {
return expectZeroSize(func() (remain int, err error) {
@@ -350,15 +350,12 @@ func (c *Conn) offsetFetch(request offsetFetchRequestV3) (offsetFetchResponseV3,
},
)
if err != nil {
- return offsetFetchResponseV3{}, err
- }
- if response.ErrorCode != 0 {
- return offsetFetchResponseV3{}, Error(response.ErrorCode)
+ return offsetFetchResponseV1{}, err
}
for _, r := range response.Responses {
for _, pr := range r.PartitionResponses {
if pr.ErrorCode != 0 {
- return offsetFetchResponseV3{}, Error(pr.ErrorCode)
+ return offsetFetchResponseV1{}, Error(pr.ErrorCode)
}
}
}
@@ -369,12 +366,12 @@ func (c *Conn) offsetFetch(request offsetFetchRequestV3) (offsetFetchResponseV3,
// syncGroups completes the handshake to join a consumer group
//
// See http://kafka.apache.org/protocol.html#The_Messages_SyncGroup
- func (c *Conn) syncGroups(request syncGroupRequestV1) (syncGroupResponseV1, error) {
- var response syncGroupResponseV1
+ func (c *Conn) syncGroups(request syncGroupRequestV0) (syncGroupResponseV0, error) {
+ var response syncGroupResponseV0

err := c.readOperation(
func(deadline time.Time, id int32) error {
- return c.writeRequest(syncGroupRequest, v1, id, request)
+ return c.writeRequest(syncGroupRequest, v0, id, request)
},
func(deadline time.Time, size int) error {
return expectZeroSize(func() (remain int, err error) {
@@ -383,10 +380,10 @@ func (c *Conn) syncGroups(request syncGroupRequestV1) (syncGroupResponseV1, erro
},
)
if err != nil {
- return syncGroupResponseV1{}, err
+ return syncGroupResponseV0{}, err
}
if response.ErrorCode != 0 {
- return syncGroupResponseV1{}, Error(response.ErrorCode)
+ return syncGroupResponseV0{}, Error(response.ErrorCode)
}

return response, nil
48 changes: 24 additions & 24 deletions conn_test.go
@@ -38,7 +38,7 @@ func (c *connPipe) Close() error {
func (c *connPipe) Read(b []byte) (int, error) {
// See comments in Write.
time.Sleep(time.Millisecond)
- if t := c.rconn.readDeadline(); !t.IsZero() && t.Sub(time.Now()) <= (10*time.Millisecond) {
+ if t := c.rconn.readDeadline(); !t.IsZero() {
return 0, &timeout{}
}
n, err := c.rconn.Read(b)
@@ -58,10 +58,10 @@ func (c *connPipe) Write(b []byte) (int, error) {
// goroutines a chance to start and set the deadline.
time.Sleep(time.Millisecond)

- // Some tests set very short deadlines which end up aborting requests and
- // closing the connection. To prevent this from happening we check how far
- // the deadline is and if it's too close we timeout.
- if t := c.wconn.writeDeadline(); !t.IsZero() && t.Sub(time.Now()) <= (10*time.Millisecond) {
+ // The nettest code only sets deadlines when it expects the write to time
+ // out. The broker connection is alive and able to accept data, so we need
+ // to simulate the timeout in order to get the tests to pass.
+ if t := c.wconn.writeDeadline(); !t.IsZero() {
return 0, &timeout{}
}

@@ -498,7 +498,7 @@ func waitForCoordinator(t *testing.T, conn *Conn, groupID string) {
// appear to happen if the kafka been running for a while.
const maxAttempts = 20
for attempt := 1; attempt <= maxAttempts; attempt++ {
- _, err := conn.findCoordinator(findCoordinatorRequestV1{
+ _, err := conn.findCoordinator(findCoordinatorRequestV0{
CoordinatorKey: groupID,
})
switch err {
@@ -517,15 +517,15 @@ func waitForCoordinator(t *testing.T, conn *Conn, groupID string) {
func createGroup(t *testing.T, conn *Conn, groupID string) (generationID int32, memberID string, stop func()) {
waitForCoordinator(t, conn, groupID)

- join := func() (joinGroup joinGroupResponseV2) {
+ join := func() (joinGroup joinGroupResponseV1) {
var err error
for attempt := 0; attempt < 10; attempt++ {
- joinGroup, err = conn.joinGroup(joinGroupRequestV2{
+ joinGroup, err = conn.joinGroup(joinGroupRequestV1{
GroupID: groupID,
SessionTimeout: int32(time.Minute / time.Millisecond),
RebalanceTimeout: int32(time.Second / time.Millisecond),
ProtocolType: "roundrobin",
- GroupProtocols: []joinGroupRequestGroupProtocolV2{
+ GroupProtocols: []joinGroupRequestGroupProtocolV1{
{
ProtocolName: "roundrobin",
ProtocolMetadata: []byte("blah"),
@@ -548,11 +548,11 @@ func createGroup(t *testing.T, conn *Conn, groupID string) (generationID int32,
joinGroup := join()

// sync the group
- _, err := conn.syncGroups(syncGroupRequestV1{
+ _, err := conn.syncGroups(syncGroupRequestV0{
GroupID: groupID,
GenerationID: joinGroup.GenerationID,
MemberID: joinGroup.MemberID,
- GroupAssignments: []syncGroupRequestGroupAssignmentV1{
+ GroupAssignments: []syncGroupRequestGroupAssignmentV0{
{
MemberID: joinGroup.MemberID,
MemberAssignments: []byte("blah"),
@@ -566,7 +566,7 @@ func createGroup(t *testing.T, conn *Conn, groupID string) (generationID int32,
generationID = joinGroup.GenerationID
memberID = joinGroup.MemberID
stop = func() {
- conn.leaveGroup(leaveGroupRequestV1{
+ conn.leaveGroup(leaveGroupRequestV0{
GroupID: groupID,
MemberID: joinGroup.MemberID,
})
@@ -602,7 +602,7 @@ func testConnFindCoordinator(t *testing.T, conn *Conn) {
if attempt != 0 {
time.Sleep(time.Millisecond * 50)
}
- response, err := conn.findCoordinator(findCoordinatorRequestV1{CoordinatorKey: groupID})
+ response, err := conn.findCoordinator(findCoordinatorRequestV0{CoordinatorKey: groupID})
if err != nil {
switch err {
case GroupCoordinatorNotAvailable:
@@ -626,7 +626,7 @@ func testConnFindCoordinator(t *testing.T, conn *Conn) {
}

func testConnJoinGroupInvalidGroupID(t *testing.T, conn *Conn) {
- _, err := conn.joinGroup(joinGroupRequestV2{})
+ _, err := conn.joinGroup(joinGroupRequestV1{})
if err != InvalidGroupId && err != NotCoordinatorForGroup {
t.Fatalf("expected %v or %v; got %v", InvalidGroupId, NotCoordinatorForGroup, err)
}
@@ -636,7 +636,7 @@ func testConnJoinGroupInvalidSessionTimeout(t *testing.T, conn *Conn) {
groupID := makeGroupID()
waitForCoordinator(t, conn, groupID)

- _, err := conn.joinGroup(joinGroupRequestV2{
+ _, err := conn.joinGroup(joinGroupRequestV1{
GroupID: groupID,
})
if err != InvalidSessionTimeout && err != NotCoordinatorForGroup {
@@ -648,7 +648,7 @@ func testConnJoinGroupInvalidRefreshTimeout(t *testing.T, conn *Conn) {
groupID := makeGroupID()
waitForCoordinator(t, conn, groupID)

- _, err := conn.joinGroup(joinGroupRequestV2{
+ _, err := conn.joinGroup(joinGroupRequestV1{
GroupID: groupID,
SessionTimeout: int32(3 * time.Second / time.Millisecond),
})
@@ -661,7 +661,7 @@ func testConnHeartbeatErr(t *testing.T, conn *Conn) {
groupID := makeGroupID()
createGroup(t, conn, groupID)

- _, err := conn.syncGroups(syncGroupRequestV1{
+ _, err := conn.syncGroups(syncGroupRequestV0{
GroupID: groupID,
})
if err != UnknownMemberId && err != NotCoordinatorForGroup {
@@ -673,7 +673,7 @@ func testConnLeaveGroupErr(t *testing.T, conn *Conn) {
groupID := makeGroupID()
waitForCoordinator(t, conn, groupID)

- _, err := conn.leaveGroup(leaveGroupRequestV1{
+ _, err := conn.leaveGroup(leaveGroupRequestV0{
GroupID: groupID,
})
if err != UnknownMemberId && err != NotCoordinatorForGroup {
@@ -685,7 +685,7 @@ func testConnSyncGroupErr(t *testing.T, conn *Conn) {
groupID := makeGroupID()
waitForCoordinator(t, conn, groupID)

- _, err := conn.syncGroups(syncGroupRequestV1{
+ _, err := conn.syncGroups(syncGroupRequestV0{
GroupID: groupID,
})
if err != UnknownMemberId && err != NotCoordinatorForGroup {
@@ -735,9 +735,9 @@ func testConnFetchAndCommitOffsets(t *testing.T, conn *Conn) {
generationID, memberID, stop := createGroup(t, conn, groupID)
defer stop()

- request := offsetFetchRequestV3{
+ request := offsetFetchRequestV1{
GroupID: groupID,
- Topics: []offsetFetchRequestV3Topic{
+ Topics: []offsetFetchRequestV1Topic{
{
Topic: conn.topic,
Partitions: []int32{0},
@@ -762,15 +762,15 @@ func testConnFetchAndCommitOffsets(t *testing.T, conn *Conn) {
}

committedOffset := int64(N - 1)
- _, err = conn.offsetCommit(offsetCommitRequestV3{
+ _, err = conn.offsetCommit(offsetCommitRequestV2{
GroupID: groupID,
GenerationID: generationID,
MemberID: memberID,
RetentionTime: int64(time.Hour / time.Millisecond),
- Topics: []offsetCommitRequestV3Topic{
+ Topics: []offsetCommitRequestV2Topic{
{
Topic: conn.topic,
- Partitions: []offsetCommitRequestV3Partition{
+ Partitions: []offsetCommitRequestV2Partition{
{
Partition: 0,
Offset: committedOffset,