From 07715b4648c8009f51336b05a7b09b814e503d7b Mon Sep 17 00:00:00 2001 From: Thomas Holmes Date: Fri, 19 Oct 2018 16:39:53 -0700 Subject: [PATCH] Downgrade various api versions to be compatible with kafka 0.10.1 (#81) --- .circleci/config.yml | 3 +- conn.go | 73 +++++++++++------------ conn_test.go | 48 +++++++-------- createtopics.go | 129 +++++++++++++++++----------------------- createtopics_test.go | 14 ++--- docker-compose.yml | 3 +- findcoordinator.go | 54 +++++------------ findcoordinator_test.go | 12 ++-- heartbeat.go | 28 +++------ heartbeat_test.go | 9 ++- joingroup.go | 48 ++++++--------- joingroup_test.go | 17 +++--- leavegroup.go | 28 +++------ leavegroup_test.go | 9 ++- offsetcommit.go | 68 +++++++++------------ offsetcommit_test.go | 11 ++-- offsetfetch.go | 72 ++++++++-------------- offsetfetch_test.go | 12 ++-- reader.go | 60 +++++++++++-------- reader_test.go | 28 ++++----- syncgroup.go | 36 ++++------- syncgroup_test.go | 14 ++--- 22 files changed, 330 insertions(+), 446 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index 14141db88..4bd6f725e 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -6,11 +6,12 @@ jobs: - image: circleci/golang - image: wurstmeister/zookeeper ports: ['2181:2181'] - - image: wurstmeister/kafka + - image: wurstmeister/kafka:0.11.0.1 ports: ['9092:9092'] environment: KAFKA_VERSION: '0.11.0.1' KAFKA_CREATE_TOPICS: 'test-writer-0:3:1,test-writer-1:3:1' + KAFKA_DELETE_TOPIC_ENABLE: 'true' KAFKA_ADVERTISED_HOST_NAME: 'localhost' KAFKA_ADVERTISED_PORT: '9092' KAFKA_ZOOKEEPER_CONNECT: 'localhost:2181' diff --git a/conn.go b/conn.go index ca8f39abb..a9a004987 100644 --- a/conn.go +++ b/conn.go @@ -176,12 +176,12 @@ func (c *Conn) describeGroups(request describeGroupsRequestV1) (describeGroupsRe // findCoordinator finds the coordinator for the specified group or transaction // // See http://kafka.apache.org/protocol.html#The_Messages_FindCoordinator -func (c *Conn) findCoordinator(request findCoordinatorRequestV1) (findCoordinatorResponseV1, error) { - var response findCoordinatorResponseV1 +func (c *Conn) findCoordinator(request findCoordinatorRequestV0) (findCoordinatorResponseV0, error) { + var response findCoordinatorResponseV0 err := c.readOperation( func(deadline time.Time, id int32) error { - return c.writeRequest(groupCoordinatorRequest, v1, id, request) + return c.writeRequest(groupCoordinatorRequest, v0, id, request) }, func(deadline time.Time, size int) error { return expectZeroSize(func() (remain int, err error) { @@ -190,10 +190,10 @@ func (c *Conn) findCoordinator(request findCoordinatorRequestV1) (findCoordinato }, ) if err != nil { - return findCoordinatorResponseV1{}, err + return findCoordinatorResponseV0{}, err } if response.ErrorCode != 0 { - return findCoordinatorResponseV1{}, Error(response.ErrorCode) + return findCoordinatorResponseV0{}, Error(response.ErrorCode) } return response, nil @@ -202,12 +202,12 @@ func (c *Conn) findCoordinator(request findCoordinatorRequestV1) (findCoordinato // heartbeat sends a heartbeat message required by consumer groups // // See http://kafka.apache.org/protocol.html#The_Messages_Heartbeat -func (c *Conn) heartbeat(request heartbeatRequestV1) (heartbeatResponseV1, error) { - var response heartbeatResponseV1 +func (c *Conn) heartbeat(request heartbeatRequestV0) (heartbeatResponseV0, error) { + var response heartbeatResponseV0 err := c.writeOperation( func(deadline time.Time, id int32) error { - return c.writeRequest(heartbeatRequest, v1, id, request) + return c.writeRequest(heartbeatRequest, v0, id, request) }, func(deadline time.Time, size int) error { return expectZeroSize(func() (remain int, err error) { @@ -216,10 +216,10 @@ func (c *Conn) heartbeat(request heartbeatRequestV1) (heartbeatResponseV1, error }, ) if err != nil { - return heartbeatResponseV1{}, err + return heartbeatResponseV0{}, err } if response.ErrorCode != 0 { - return heartbeatResponseV1{}, Error(response.ErrorCode) + return heartbeatResponseV0{}, Error(response.ErrorCode) } return response, nil @@ -228,12 +228,12 @@ func (c *Conn) heartbeat(request heartbeatRequestV1) (heartbeatResponseV1, error // joinGroup attempts to join a consumer group // // See http://kafka.apache.org/protocol.html#The_Messages_JoinGroup -func (c *Conn) joinGroup(request joinGroupRequestV2) (joinGroupResponseV2, error) { - var response joinGroupResponseV2 +func (c *Conn) joinGroup(request joinGroupRequestV1) (joinGroupResponseV1, error) { + var response joinGroupResponseV1 err := c.writeOperation( func(deadline time.Time, id int32) error { - return c.writeRequest(joinGroupRequest, v2, id, request) + return c.writeRequest(joinGroupRequest, v1, id, request) }, func(deadline time.Time, size int) error { return expectZeroSize(func() (remain int, err error) { @@ -242,10 +242,10 @@ func (c *Conn) joinGroup(request joinGroupRequestV2) (joinGroupResponseV2, error }, ) if err != nil { - return joinGroupResponseV2{}, err + return joinGroupResponseV1{}, err } if response.ErrorCode != 0 { - return joinGroupResponseV2{}, Error(response.ErrorCode) + return joinGroupResponseV1{}, Error(response.ErrorCode) } return response, nil @@ -254,12 +254,12 @@ func (c *Conn) joinGroup(request joinGroupRequestV2) (joinGroupResponseV2, error // leaveGroup leaves the consumer from the consumer group // // See http://kafka.apache.org/protocol.html#The_Messages_LeaveGroup -func (c *Conn) leaveGroup(request leaveGroupRequestV1) (leaveGroupResponseV1, error) { - var response leaveGroupResponseV1 +func (c *Conn) leaveGroup(request leaveGroupRequestV0) (leaveGroupResponseV0, error) { + var response leaveGroupResponseV0 err := c.writeOperation( func(deadline time.Time, id int32) error { - return c.writeRequest(leaveGroupRequest, v1, id, request) + return c.writeRequest(leaveGroupRequest, v0, id, request) }, func(deadline time.Time, size int) error { return expectZeroSize(func() (remain int, err error) { @@ -268,10 +268,10 @@ func (c *Conn) leaveGroup(request leaveGroupRequestV1) (leaveGroupResponseV1, er }, ) if err != nil { - return leaveGroupResponseV1{}, err + return leaveGroupResponseV0{}, err } if response.ErrorCode != 0 { - return leaveGroupResponseV1{}, Error(response.ErrorCode) + return leaveGroupResponseV0{}, Error(response.ErrorCode) } return response, nil @@ -306,12 +306,12 @@ func (c *Conn) listGroups(request listGroupsRequestV1) (listGroupsResponseV1, er // offsetCommit commits the specified topic partition offsets // // See http://kafka.apache.org/protocol.html#The_Messages_OffsetCommit -func (c *Conn) offsetCommit(request offsetCommitRequestV3) (offsetCommitResponseV3, error) { - var response offsetCommitResponseV3 +func (c *Conn) offsetCommit(request offsetCommitRequestV2) (offsetCommitResponseV2, error) { + var response offsetCommitResponseV2 err := c.writeOperation( func(deadline time.Time, id int32) error { - return c.writeRequest(offsetCommitRequest, v3, id, request) + return c.writeRequest(offsetCommitRequest, v2, id, request) }, func(deadline time.Time, size int) error { return expectZeroSize(func() (remain int, err error) { @@ -320,12 +320,12 @@ func (c *Conn) offsetCommit(request offsetCommitRequestV3) (offsetCommitResponse }, ) if err != nil { - return offsetCommitResponseV3{}, err + return offsetCommitResponseV2{}, err } for _, r := range response.Responses { for _, pr := range r.PartitionResponses { if pr.ErrorCode != 0 { - return offsetCommitResponseV3{}, Error(pr.ErrorCode) + return offsetCommitResponseV2{}, Error(pr.ErrorCode) } } } @@ -336,12 +336,12 @@ func (c *Conn) offsetCommit(request offsetCommitRequestV3) (offsetCommitResponse // offsetFetch fetches the offsets for the specified topic partitions // // See http://kafka.apache.org/protocol.html#The_Messages_OffsetFetch -func (c *Conn) offsetFetch(request offsetFetchRequestV3) (offsetFetchResponseV3, error) { - var response offsetFetchResponseV3 +func (c *Conn) offsetFetch(request offsetFetchRequestV1) (offsetFetchResponseV1, error) { + var response offsetFetchResponseV1 err := c.readOperation( func(deadline time.Time, id int32) error { - return c.writeRequest(offsetFetchRequest, v3, id, request) + return c.writeRequest(offsetFetchRequest, v1, id, request) }, func(deadline time.Time, size int) error { return expectZeroSize(func() (remain int, err error) { @@ -350,15 +350,12 @@ func (c *Conn) offsetFetch(request offsetFetchRequestV3) (offsetFetchResponseV3, }, ) if err != nil { - return offsetFetchResponseV3{}, err - } - if response.ErrorCode != 0 { - return offsetFetchResponseV3{}, Error(response.ErrorCode) + return offsetFetchResponseV1{}, err } for _, r := range response.Responses { for _, pr := range r.PartitionResponses { if pr.ErrorCode != 0 { - return offsetFetchResponseV3{}, Error(pr.ErrorCode) + return offsetFetchResponseV1{}, Error(pr.ErrorCode) } } } @@ -369,12 +366,12 @@ func (c *Conn) offsetFetch(request offsetFetchRequestV3) (offsetFetchResponseV3, // syncGroups completes the handshake to join a consumer group // // See http://kafka.apache.org/protocol.html#The_Messages_SyncGroup -func (c *Conn) syncGroups(request syncGroupRequestV1) (syncGroupResponseV1, error) { - var response syncGroupResponseV1 +func (c *Conn) syncGroups(request syncGroupRequestV0) (syncGroupResponseV0, error) { + var response syncGroupResponseV0 err := c.readOperation( func(deadline time.Time, id int32) error { - return c.writeRequest(syncGroupRequest, v1, id, request) + return c.writeRequest(syncGroupRequest, v0, id, request) }, func(deadline time.Time, size int) error { return expectZeroSize(func() (remain int, err error) { @@ -383,10 +380,10 @@ func (c *Conn) syncGroups(request syncGroupRequestV1) (syncGroupResponseV1, erro }, ) if err != nil { - return syncGroupResponseV1{}, err + return syncGroupResponseV0{}, err } if response.ErrorCode != 0 { - return syncGroupResponseV1{}, Error(response.ErrorCode) + return syncGroupResponseV0{}, Error(response.ErrorCode) } return response, nil diff --git a/conn_test.go b/conn_test.go index 45aeb50bf..9f17500b4 100644 --- a/conn_test.go +++ b/conn_test.go @@ -38,7 +38,7 @@ func (c *connPipe) Close() error { func (c *connPipe) Read(b []byte) (int, error) { // See comments in Write. time.Sleep(time.Millisecond) - if t := c.rconn.readDeadline(); !t.IsZero() && t.Sub(time.Now()) <= (10*time.Millisecond) { + if t := c.rconn.readDeadline(); !t.IsZero() { return 0, &timeout{} } n, err := c.rconn.Read(b) @@ -58,10 +58,10 @@ func (c *connPipe) Write(b []byte) (int, error) { // goroutines a chance to start and set the deadline. time.Sleep(time.Millisecond) - // Some tests set very short deadlines which end up aborting requests and - // closing the connection. To prevent this from happening we check how far - // the deadline is and if it's too close we timeout. - if t := c.wconn.writeDeadline(); !t.IsZero() && t.Sub(time.Now()) <= (10*time.Millisecond) { + // The nettest code only sets deadlines when it expects the write to time + // out. The broker connection is alive and able to accept data, so we need + // to simulate the timeout in order to get the tests to pass. + if t := c.wconn.writeDeadline(); !t.IsZero() { return 0, &timeout{} } @@ -498,7 +498,7 @@ func waitForCoordinator(t *testing.T, conn *Conn, groupID string) { // appear to happen if the kafka been running for a while. const maxAttempts = 20 for attempt := 1; attempt <= maxAttempts; attempt++ { - _, err := conn.findCoordinator(findCoordinatorRequestV1{ + _, err := conn.findCoordinator(findCoordinatorRequestV0{ CoordinatorKey: groupID, }) switch err { @@ -517,15 +517,15 @@ func waitForCoordinator(t *testing.T, conn *Conn, groupID string) { func createGroup(t *testing.T, conn *Conn, groupID string) (generationID int32, memberID string, stop func()) { waitForCoordinator(t, conn, groupID) - join := func() (joinGroup joinGroupResponseV2) { + join := func() (joinGroup joinGroupResponseV1) { var err error for attempt := 0; attempt < 10; attempt++ { - joinGroup, err = conn.joinGroup(joinGroupRequestV2{ + joinGroup, err = conn.joinGroup(joinGroupRequestV1{ GroupID: groupID, SessionTimeout: int32(time.Minute / time.Millisecond), RebalanceTimeout: int32(time.Second / time.Millisecond), ProtocolType: "roundrobin", - GroupProtocols: []joinGroupRequestGroupProtocolV2{ + GroupProtocols: []joinGroupRequestGroupProtocolV1{ { ProtocolName: "roundrobin", ProtocolMetadata: []byte("blah"), @@ -548,11 +548,11 @@ func createGroup(t *testing.T, conn *Conn, groupID string) (generationID int32, joinGroup := join() // sync the group - _, err := conn.syncGroups(syncGroupRequestV1{ + _, err := conn.syncGroups(syncGroupRequestV0{ GroupID: groupID, GenerationID: joinGroup.GenerationID, MemberID: joinGroup.MemberID, - GroupAssignments: []syncGroupRequestGroupAssignmentV1{ + GroupAssignments: []syncGroupRequestGroupAssignmentV0{ { MemberID: joinGroup.MemberID, MemberAssignments: []byte("blah"), @@ -566,7 +566,7 @@ func createGroup(t *testing.T, conn *Conn, groupID string) (generationID int32, generationID = joinGroup.GenerationID memberID = joinGroup.MemberID stop = func() { - conn.leaveGroup(leaveGroupRequestV1{ + conn.leaveGroup(leaveGroupRequestV0{ GroupID: groupID, MemberID: joinGroup.MemberID, }) @@ -602,7 +602,7 @@ func testConnFindCoordinator(t *testing.T, conn *Conn) { if attempt != 0 { time.Sleep(time.Millisecond * 50) } - response, err := conn.findCoordinator(findCoordinatorRequestV1{CoordinatorKey: groupID}) + response, err := conn.findCoordinator(findCoordinatorRequestV0{CoordinatorKey: groupID}) if err != nil { switch err { case GroupCoordinatorNotAvailable: @@ -626,7 +626,7 @@ func testConnFindCoordinator(t *testing.T, conn *Conn) { } func testConnJoinGroupInvalidGroupID(t *testing.T, conn *Conn) { - _, err := conn.joinGroup(joinGroupRequestV2{}) + _, err := conn.joinGroup(joinGroupRequestV1{}) if err != InvalidGroupId && err != NotCoordinatorForGroup { t.Fatalf("expected %v or %v; got %v", InvalidGroupId, NotCoordinatorForGroup, err) } @@ -636,7 +636,7 @@ func testConnJoinGroupInvalidSessionTimeout(t *testing.T, conn *Conn) { groupID := makeGroupID() waitForCoordinator(t, conn, groupID) - _, err := conn.joinGroup(joinGroupRequestV2{ + _, err := conn.joinGroup(joinGroupRequestV1{ GroupID: groupID, }) if err != InvalidSessionTimeout && err != NotCoordinatorForGroup { @@ -648,7 +648,7 @@ func testConnJoinGroupInvalidRefreshTimeout(t *testing.T, conn *Conn) { groupID := makeGroupID() waitForCoordinator(t, conn, groupID) - _, err := conn.joinGroup(joinGroupRequestV2{ + _, err := conn.joinGroup(joinGroupRequestV1{ GroupID: groupID, SessionTimeout: int32(3 * time.Second / time.Millisecond), }) @@ -661,7 +661,7 @@ func testConnHeartbeatErr(t *testing.T, conn *Conn) { groupID := makeGroupID() createGroup(t, conn, groupID) - _, err := conn.syncGroups(syncGroupRequestV1{ + _, err := conn.syncGroups(syncGroupRequestV0{ GroupID: groupID, }) if err != UnknownMemberId && err != NotCoordinatorForGroup { @@ -673,7 +673,7 @@ func testConnLeaveGroupErr(t *testing.T, conn *Conn) { groupID := makeGroupID() waitForCoordinator(t, conn, groupID) - _, err := conn.leaveGroup(leaveGroupRequestV1{ + _, err := conn.leaveGroup(leaveGroupRequestV0{ GroupID: groupID, }) if err != UnknownMemberId && err != NotCoordinatorForGroup { @@ -685,7 +685,7 @@ func testConnSyncGroupErr(t *testing.T, conn *Conn) { groupID := makeGroupID() waitForCoordinator(t, conn, groupID) - _, err := conn.syncGroups(syncGroupRequestV1{ + _, err := conn.syncGroups(syncGroupRequestV0{ GroupID: groupID, }) if err != UnknownMemberId && err != NotCoordinatorForGroup { @@ -735,9 +735,9 @@ func testConnFetchAndCommitOffsets(t *testing.T, conn *Conn) { generationID, memberID, stop := createGroup(t, conn, groupID) defer stop() - request := offsetFetchRequestV3{ + request := offsetFetchRequestV1{ GroupID: groupID, - Topics: []offsetFetchRequestV3Topic{ + Topics: []offsetFetchRequestV1Topic{ { Topic: conn.topic, Partitions: []int32{0}, @@ -762,15 +762,15 @@ func testConnFetchAndCommitOffsets(t *testing.T, conn *Conn) { } committedOffset := int64(N - 1) - _, err = conn.offsetCommit(offsetCommitRequestV3{ + _, err = conn.offsetCommit(offsetCommitRequestV2{ GroupID: groupID, GenerationID: generationID, MemberID: memberID, RetentionTime: int64(time.Hour / time.Millisecond), - Topics: []offsetCommitRequestV3Topic{ + Topics: []offsetCommitRequestV2Topic{ { Topic: conn.topic, - Partitions: []offsetCommitRequestV3Partition{ + Partitions: []offsetCommitRequestV2Partition{ { Partition: 0, Offset: committedOffset, diff --git a/createtopics.go b/createtopics.go index a3cccb03d..51bb21fb1 100644 --- a/createtopics.go +++ b/createtopics.go @@ -10,24 +10,24 @@ type ConfigEntry struct { ConfigValue string } -func (c ConfigEntry) toCreateTopicsRequestV2ConfigEntry() createTopicsRequestV2ConfigEntry { - return createTopicsRequestV2ConfigEntry{ +func (c ConfigEntry) toCreateTopicsRequestV0ConfigEntry() createTopicsRequestV0ConfigEntry { + return createTopicsRequestV0ConfigEntry{ ConfigName: c.ConfigName, ConfigValue: c.ConfigValue, } } -type createTopicsRequestV2ConfigEntry struct { +type createTopicsRequestV0ConfigEntry struct { ConfigName string ConfigValue string } -func (t createTopicsRequestV2ConfigEntry) size() int32 { +func (t createTopicsRequestV0ConfigEntry) size() int32 { return sizeofString(t.ConfigName) + sizeofString(t.ConfigValue) } -func (t createTopicsRequestV2ConfigEntry) writeTo(w *bufio.Writer) { +func (t createTopicsRequestV0ConfigEntry) writeTo(w *bufio.Writer) { writeString(w, t.ConfigName) writeString(w, t.ConfigValue) } @@ -37,24 +37,24 @@ type ReplicaAssignment struct { Replicas int } -func (a ReplicaAssignment) toCreateTopicsRequestV2ReplicaAssignment() createTopicsRequestV2ReplicaAssignment { - return createTopicsRequestV2ReplicaAssignment{ +func (a ReplicaAssignment) toCreateTopicsRequestV0ReplicaAssignment() createTopicsRequestV0ReplicaAssignment { + return createTopicsRequestV0ReplicaAssignment{ Partition: int32(a.Partition), Replicas: int32(a.Replicas), } } -type createTopicsRequestV2ReplicaAssignment struct { +type createTopicsRequestV0ReplicaAssignment struct { Partition int32 Replicas int32 } -func (t createTopicsRequestV2ReplicaAssignment) size() int32 { +func (t createTopicsRequestV0ReplicaAssignment) size() int32 { return sizeofInt32(t.Partition) + sizeofInt32(t.Replicas) } -func (t createTopicsRequestV2ReplicaAssignment) writeTo(w *bufio.Writer) { +func (t createTopicsRequestV0ReplicaAssignment) writeTo(w *bufio.Writer) { writeInt32(w, t.Partition) writeInt32(w, t.Replicas) } @@ -77,30 +77,30 @@ type TopicConfig struct { ConfigEntries []ConfigEntry } -func (t TopicConfig) toCreateTopicsRequestV2Topic() createTopicsRequestV2Topic { - var requestV2ReplicaAssignments []createTopicsRequestV2ReplicaAssignment +func (t TopicConfig) toCreateTopicsRequestV0Topic() createTopicsRequestV0Topic { + var requestV0ReplicaAssignments []createTopicsRequestV0ReplicaAssignment for _, a := range t.ReplicaAssignments { - requestV2ReplicaAssignments = append( - requestV2ReplicaAssignments, - a.toCreateTopicsRequestV2ReplicaAssignment()) + requestV0ReplicaAssignments = append( + requestV0ReplicaAssignments, + a.toCreateTopicsRequestV0ReplicaAssignment()) } - var requestV2ConfigEntries []createTopicsRequestV2ConfigEntry + var requestV0ConfigEntries []createTopicsRequestV0ConfigEntry for _, c := range t.ConfigEntries { - requestV2ConfigEntries = append( - requestV2ConfigEntries, - c.toCreateTopicsRequestV2ConfigEntry()) + requestV0ConfigEntries = append( + requestV0ConfigEntries, + c.toCreateTopicsRequestV0ConfigEntry()) } - return createTopicsRequestV2Topic{ + return createTopicsRequestV0Topic{ Topic: t.Topic, NumPartitions: int32(t.NumPartitions), ReplicationFactor: int16(t.ReplicationFactor), - ReplicaAssignments: requestV2ReplicaAssignments, - ConfigEntries: requestV2ConfigEntries, + ReplicaAssignments: requestV0ReplicaAssignments, + ConfigEntries: requestV0ConfigEntries, } } -type createTopicsRequestV2Topic struct { +type createTopicsRequestV0Topic struct { // Topic name Topic string @@ -112,13 +112,13 @@ type createTopicsRequestV2Topic struct { // ReplicaAssignments among kafka brokers for this topic partitions. If this // is set num_partitions and replication_factor must be unset. - ReplicaAssignments []createTopicsRequestV2ReplicaAssignment + ReplicaAssignments []createTopicsRequestV0ReplicaAssignment // ConfigEntries holds topic level configuration for topic to be set. - ConfigEntries []createTopicsRequestV2ConfigEntry + ConfigEntries []createTopicsRequestV0ConfigEntry } -func (t createTopicsRequestV2Topic) size() int32 { +func (t createTopicsRequestV0Topic) size() int32 { return sizeofString(t.Topic) + sizeofInt32(t.NumPartitions) + sizeofInt16(t.ReplicationFactor) + @@ -126,7 +126,7 @@ func (t createTopicsRequestV2Topic) size() int32 { sizeofArray(len(t.ConfigEntries), func(i int) int32 { return t.ConfigEntries[i].size() }) } -func (t createTopicsRequestV2Topic) writeTo(w *bufio.Writer) { +func (t createTopicsRequestV0Topic) writeTo(w *bufio.Writer) { writeString(w, t.Topic) writeInt32(w, t.NumPartitions) writeInt16(w, t.ReplicationFactor) @@ -135,106 +135,85 @@ func (t createTopicsRequestV2Topic) writeTo(w *bufio.Writer) { } // See http://kafka.apache.org/protocol.html#The_Messages_CreateTopics -type createTopicsRequestV2 struct { +type createTopicsRequestV0 struct { // Topics contains n array of single topic creation requests. Can not // have multiple entries for the same topic. - Topics []createTopicsRequestV2Topic + Topics []createTopicsRequestV0Topic // Timeout ms to wait for a topic to be completely created on the // controller node. Values <= 0 will trigger topic creation and return immediately Timeout int32 - - // ValidateOnly if true, the request will be validated, but the topic won - // 't be created. - ValidateOnly bool } -func (t createTopicsRequestV2) size() int32 { +func (t createTopicsRequestV0) size() int32 { return sizeofArray(len(t.Topics), func(i int) int32 { return t.Topics[i].size() }) + - sizeofInt32(t.Timeout) + - sizeofBool(t.ValidateOnly) + sizeofInt32(t.Timeout) } -func (t createTopicsRequestV2) writeTo(w *bufio.Writer) { +func (t createTopicsRequestV0) writeTo(w *bufio.Writer) { writeArray(w, len(t.Topics), func(i int) { t.Topics[i].writeTo(w) }) writeInt32(w, t.Timeout) - writeBool(w, t.ValidateOnly) } -type createTopicsResponseV2TopicError struct { +type createTopicsResponseV0TopicError struct { // Topic name Topic string // ErrorCode holds response error code ErrorCode int16 - - // ErrorMessage holds the response error message - ErrorMessage string } -func (t createTopicsResponseV2TopicError) size() int32 { +func (t createTopicsResponseV0TopicError) size() int32 { return sizeofString(t.Topic) + - sizeofInt16(t.ErrorCode) + - sizeofString(t.ErrorMessage) + sizeofInt16(t.ErrorCode) } -func (t createTopicsResponseV2TopicError) writeTo(w *bufio.Writer) { +func (t createTopicsResponseV0TopicError) writeTo(w *bufio.Writer) { writeString(w, t.Topic) writeInt16(w, t.ErrorCode) - writeString(w, t.ErrorMessage) } -func (t *createTopicsResponseV2TopicError) readFrom(r *bufio.Reader, size int) (remain int, err error) { +func (t *createTopicsResponseV0TopicError) readFrom(r *bufio.Reader, size int) (remain int, err error) { if remain, err = readString(r, size, &t.Topic); err != nil { return } if remain, err = readInt16(r, remain, &t.ErrorCode); err != nil { return } - if remain, err = readString(r, remain, &t.ErrorMessage); err != nil { - return - } return } // See http://kafka.apache.org/protocol.html#The_Messages_CreateTopics -type createTopicsResponseV2 struct { - ThrottleTimeMS int32 - TopicErrors []createTopicsResponseV2TopicError +type createTopicsResponseV0 struct { + TopicErrors []createTopicsResponseV0TopicError } -func (t createTopicsResponseV2) size() int32 { - return sizeofInt32(t.ThrottleTimeMS) + - sizeofArray(len(t.TopicErrors), func(i int) int32 { return t.TopicErrors[i].size() }) +func (t createTopicsResponseV0) size() int32 { + return sizeofArray(len(t.TopicErrors), func(i int) int32 { return t.TopicErrors[i].size() }) } -func (t createTopicsResponseV2) writeTo(w *bufio.Writer) { - writeInt32(w, t.ThrottleTimeMS) +func (t createTopicsResponseV0) writeTo(w *bufio.Writer) { writeArray(w, len(t.TopicErrors), func(i int) { t.TopicErrors[i].writeTo(w) }) } -func (t *createTopicsResponseV2) readFrom(r *bufio.Reader, size int) (remain int, err error) { - if remain, err = readInt32(r, size, &t.ThrottleTimeMS); err != nil { - return - } - +func (t *createTopicsResponseV0) readFrom(r *bufio.Reader, size int) (remain int, err error) { fn := func(r *bufio.Reader, size int) (fnRemain int, fnErr error) { - var topic createTopicsResponseV2TopicError + var topic createTopicsResponseV0TopicError if fnRemain, fnErr = (&topic).readFrom(r, size); err != nil { return } t.TopicErrors = append(t.TopicErrors, topic) return } - if remain, err = readArrayWith(r, remain, fn); err != nil { + if remain, err = readArrayWith(r, size, fn); err != nil { return } return } -func (c *Conn) createTopics(request createTopicsRequestV2) (createTopicsResponseV2, error) { - var response createTopicsResponseV2 +func (c *Conn) createTopics(request createTopicsRequestV0) (createTopicsResponseV0, error) { + var response createTopicsResponseV0 err := c.writeOperation( func(deadline time.Time, id int32) error { @@ -243,7 +222,7 @@ func (c *Conn) createTopics(request createTopicsRequestV2) (createTopicsResponse deadline = adjustDeadlineForRTT(deadline, now, defaultRTT) request.Timeout = milliseconds(deadlineToTimeout(deadline, now)) } - return c.writeRequest(createTopicsRequest, v2, id, request) + return c.writeRequest(createTopicsRequest, v0, id, request) }, func(deadline time.Time, size int) error { return expectZeroSize(func() (remain int, err error) { @@ -267,15 +246,15 @@ func (c *Conn) createTopics(request createTopicsRequestV2) (createTopicsResponse // operational semantics. In other words, if CreateTopics is invoked with a // configuration for an existing topic, it will have no effect. func (c *Conn) CreateTopics(topics ...TopicConfig) error { - var requestV2Topics []createTopicsRequestV2Topic + var requestV0Topics []createTopicsRequestV0Topic for _, t := range topics { - requestV2Topics = append( - requestV2Topics, - t.toCreateTopicsRequestV2Topic()) + requestV0Topics = append( + requestV0Topics, + t.toCreateTopicsRequestV0Topic()) } - _, err := c.createTopics(createTopicsRequestV2{ - Topics: requestV2Topics, + _, err := c.createTopics(createTopicsRequestV0{ + Topics: requestV0Topics, }) switch err { diff --git a/createtopics_test.go b/createtopics_test.go index 2214e546c..7b71b31c4 100644 --- a/createtopics_test.go +++ b/createtopics_test.go @@ -7,14 +7,12 @@ import ( "testing" ) -func TestCreateTopicsResponseV2(t *testing.T) { - item := createTopicsResponseV2{ - ThrottleTimeMS: 1, - TopicErrors: []createTopicsResponseV2TopicError{ +func TestCreateTopicsResponseV0(t *testing.T) { + item := createTopicsResponseV0{ + TopicErrors: []createTopicsResponseV0TopicError{ { - Topic: "topic", - ErrorCode: 2, - ErrorMessage: "topic error", + Topic: "topic", + ErrorCode: 2, }, }, } @@ -24,7 +22,7 @@ func TestCreateTopicsResponseV2(t *testing.T) { item.writeTo(w) w.Flush() - var found createTopicsResponseV2 + var found createTopicsResponseV0 remain, err := (&found).readFrom(bufio.NewReader(buf), buf.Len()) if err != nil { t.Error(err) diff --git a/docker-compose.yml b/docker-compose.yml index 24fd0de56..7714b7dcb 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,11 +1,12 @@ kafka: - image: wurstmeister/kafka:0.10.2.1 + image: wurstmeister/kafka:0.11.0.1 links: - zookeeper ports: - "9092:9092" environment: KAFKA_CREATE_TOPICS: "test-writer-0:3:1,test-writer-1:3:1" + KAFKA_DELETE_TOPIC_ENABLE: 'true' KAFKA_ADVERTISED_HOST_NAME: "localhost" KAFKA_ADVERTISED_PORT: "9092" KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181" diff --git a/findcoordinator.go b/findcoordinator.go index 750ba7c87..d0b47728e 100644 --- a/findcoordinator.go +++ b/findcoordinator.go @@ -4,28 +4,24 @@ import ( "bufio" ) -// FindCoordinatorRequestV1 requests the coordinator for the specified group or transaction +// FindCoordinatorRequestV0 requests the coordinator for the specified group or transaction // // See http://kafka.apache.org/protocol.html#The_Messages_FindCoordinator -type findCoordinatorRequestV1 struct { +type findCoordinatorRequestV0 struct { // CoordinatorKey holds id to use for finding the coordinator (for groups, this is // the groupId, for transactional producers, this is the transactional id) CoordinatorKey string - - // CoordinatorType indicates type of coordinator to find (0 = group, 1 = transaction) - CoordinatorType int8 } -func (t findCoordinatorRequestV1) size() int32 { - return sizeofString(t.CoordinatorKey) + sizeof(t.CoordinatorType) +func (t findCoordinatorRequestV0) size() int32 { + return sizeofString(t.CoordinatorKey) } -func (t findCoordinatorRequestV1) writeTo(w *bufio.Writer) { +func (t findCoordinatorRequestV0) writeTo(w *bufio.Writer) { writeString(w, t.CoordinatorKey) - writeInt8(w, t.CoordinatorType) } -type findCoordinatorResponseCoordinatorV1 struct { +type findCoordinatorResponseCoordinatorV0 struct { // NodeID holds the broker id. NodeID int32 @@ -36,19 +32,19 @@ type findCoordinatorResponseCoordinatorV1 struct { Port int32 } -func (t findCoordinatorResponseCoordinatorV1) size() int32 { +func (t findCoordinatorResponseCoordinatorV0) size() int32 { return sizeofInt32(t.NodeID) + sizeofString(t.Host) + sizeofInt32(t.Port) } -func (t findCoordinatorResponseCoordinatorV1) writeTo(w *bufio.Writer) { +func (t findCoordinatorResponseCoordinatorV0) writeTo(w *bufio.Writer) { writeInt32(w, t.NodeID) writeString(w, t.Host) writeInt32(w, t.Port) } -func (t *findCoordinatorResponseCoordinatorV1) readFrom(r *bufio.Reader, size int) (remain int, err error) { +func (t *findCoordinatorResponseCoordinatorV0) readFrom(r *bufio.Reader, size int) (remain int, err error) { if remain, err = readInt32(r, size, &t.NodeID); err != nil { return } @@ -61,44 +57,26 @@ func (t *findCoordinatorResponseCoordinatorV1) readFrom(r *bufio.Reader, size in return } -type findCoordinatorResponseV1 struct { - // ThrottleTimeMS holds the duration in milliseconds for which the request - // was throttled due to quota violation (Zero if the request did not violate - // any quota) - ThrottleTimeMS int32 - +type findCoordinatorResponseV0 struct { // ErrorCode holds response error code ErrorCode int16 - // ErrorMessage holds response error message - ErrorMessage string - // Coordinator holds host and port information for the coordinator - Coordinator findCoordinatorResponseCoordinatorV1 + Coordinator findCoordinatorResponseCoordinatorV0 } -func (t findCoordinatorResponseV1) size() int32 { - return sizeofInt32(t.ThrottleTimeMS) + - sizeofInt16(t.ErrorCode) + - sizeofString(t.ErrorMessage) + +func (t findCoordinatorResponseV0) size() int32 { + return sizeofInt16(t.ErrorCode) + t.Coordinator.size() } -func (t findCoordinatorResponseV1) writeTo(w *bufio.Writer) { - writeInt32(w, t.ThrottleTimeMS) +func (t findCoordinatorResponseV0) writeTo(w *bufio.Writer) { writeInt16(w, t.ErrorCode) - writeString(w, t.ErrorMessage) t.Coordinator.writeTo(w) } -func (t *findCoordinatorResponseV1) readFrom(r *bufio.Reader, size int) (remain int, err error) { - if remain, err = readInt32(r, size, &t.ThrottleTimeMS); err != nil { - return - } - if remain, err = readInt16(r, remain, &t.ErrorCode); err != nil { - return - } - if remain, err = readString(r, remain, &t.ErrorMessage); err != nil { +func (t *findCoordinatorResponseV0) readFrom(r *bufio.Reader, size int) (remain int, err error) { + if remain, err = readInt16(r, size, &t.ErrorCode); err != nil { return } if remain, err = (&t.Coordinator).readFrom(r, remain); err != nil { diff --git a/findcoordinator_test.go b/findcoordinator_test.go index d44fe3811..18c5592ff 100644 --- a/findcoordinator_test.go +++ b/findcoordinator_test.go @@ -7,12 +7,10 @@ import ( "testing" ) -func TestFindCoordinatorResponseV1(t *testing.T) { - item := findCoordinatorResponseV1{ - ThrottleTimeMS: 1, - ErrorCode: 2, - ErrorMessage: "a", - Coordinator: findCoordinatorResponseCoordinatorV1{ +func TestFindCoordinatorResponseV0(t *testing.T) { + item := findCoordinatorResponseV0{ + ErrorCode: 2, + Coordinator: findCoordinatorResponseCoordinatorV0{ NodeID: 3, Host: "b", Port: 4, @@ -24,7 +22,7 @@ func TestFindCoordinatorResponseV1(t *testing.T) { item.writeTo(w) w.Flush() - var found findCoordinatorResponseV1 + var found findCoordinatorResponseV0 remain, err := (&found).readFrom(bufio.NewReader(buf), buf.Len()) if err != nil { t.Error(err) diff --git a/heartbeat.go b/heartbeat.go index 5a683adcf..fd95049a1 100644 --- a/heartbeat.go +++ b/heartbeat.go @@ -2,7 +2,7 @@ package kafka import "bufio" -type heartbeatRequestV1 struct { +type heartbeatRequestV0 struct { // GroupID holds the unique group identifier GroupID string @@ -13,43 +13,33 @@ type heartbeatRequestV1 struct { MemberID string } -func (t heartbeatRequestV1) size() int32 { +func (t heartbeatRequestV0) size() int32 { return sizeofString(t.GroupID) + sizeofInt32(t.GenerationID) + sizeofString(t.MemberID) } -func (t heartbeatRequestV1) writeTo(w *bufio.Writer) { +func (t heartbeatRequestV0) writeTo(w *bufio.Writer) { writeString(w, t.GroupID) writeInt32(w, t.GenerationID) writeString(w, t.MemberID) } -type heartbeatResponseV1 struct { - // ThrottleTimeMS holds the duration in milliseconds for which the request - // was throttled due to quota violation (Zero if the request did not violate - // any quota) - ThrottleTimeMS int32 - +type heartbeatResponseV0 struct { // ErrorCode holds response error code ErrorCode int16 } -func (t heartbeatResponseV1) size() int32 { - return sizeofInt32(t.ThrottleTimeMS) + - sizeofInt16(t.ErrorCode) +func (t heartbeatResponseV0) size() int32 { + return sizeofInt16(t.ErrorCode) } -func (t heartbeatResponseV1) writeTo(w *bufio.Writer) { - writeInt32(w, t.ThrottleTimeMS) +func (t heartbeatResponseV0) writeTo(w *bufio.Writer) { writeInt16(w, t.ErrorCode) } -func (t *heartbeatResponseV1) readFrom(r *bufio.Reader, sz int) (remain int, err error) { - if remain, err = readInt32(r, sz, &t.ThrottleTimeMS); err != nil { - return - } - if remain, err = readInt16(r, remain, &t.ErrorCode); err != nil { +func (t *heartbeatResponseV0) readFrom(r *bufio.Reader, sz int) (remain int, err error) { + if remain, err = readInt16(r, sz, &t.ErrorCode); err != nil { return } return diff --git a/heartbeat_test.go b/heartbeat_test.go index d90eaed3f..89d536a5c 100644 --- a/heartbeat_test.go +++ b/heartbeat_test.go @@ -7,10 +7,9 @@ import ( "testing" ) -func TestHeartbeatRequestV1(t *testing.T) { - item := heartbeatResponseV1{ - ThrottleTimeMS: 1, - ErrorCode: 2, +func TestHeartbeatRequestV0(t *testing.T) { + item := heartbeatResponseV0{ + ErrorCode: 2, } buf := bytes.NewBuffer(nil) @@ -18,7 +17,7 @@ func TestHeartbeatRequestV1(t *testing.T) { item.writeTo(w) w.Flush() - var found heartbeatResponseV1 + var found heartbeatResponseV0 remain, err := (&found).readFrom(bufio.NewReader(buf), buf.Len()) if err != nil { t.Error(err) diff --git a/joingroup.go b/joingroup.go index 2e1e06006..81d3ca973 100644 --- a/joingroup.go +++ b/joingroup.go @@ -51,22 +51,22 @@ func (t *groupMetadata) readFrom(r *bufio.Reader, size int) (remain int, err err return } -type joinGroupRequestGroupProtocolV2 struct { +type joinGroupRequestGroupProtocolV1 struct { ProtocolName string ProtocolMetadata []byte } -func (t joinGroupRequestGroupProtocolV2) size() int32 { +func (t joinGroupRequestGroupProtocolV1) size() int32 { return sizeofString(t.ProtocolName) + sizeofBytes(t.ProtocolMetadata) } -func (t joinGroupRequestGroupProtocolV2) writeTo(w *bufio.Writer) { +func (t joinGroupRequestGroupProtocolV1) writeTo(w *bufio.Writer) { writeString(w, t.ProtocolName) writeBytes(w, t.ProtocolMetadata) } -type joinGroupRequestV2 struct { +type joinGroupRequestV1 struct { // GroupID holds the unique group identifier GroupID string @@ -86,10 +86,10 @@ type joinGroupRequestV2 struct { ProtocolType string // GroupProtocols holds the list of protocols that the member supports - GroupProtocols []joinGroupRequestGroupProtocolV2 + GroupProtocols []joinGroupRequestGroupProtocolV1 } -func (t joinGroupRequestV2) size() int32 { +func (t joinGroupRequestV1) size() int32 { return sizeofString(t.GroupID) + sizeofInt32(t.SessionTimeout) + sizeofInt32(t.RebalanceTimeout) + @@ -98,7 +98,7 @@ func (t joinGroupRequestV2) size() int32 { sizeofArray(len(t.GroupProtocols), func(i int) int32 { return t.GroupProtocols[i].size() }) } -func (t joinGroupRequestV2) writeTo(w *bufio.Writer) { +func (t joinGroupRequestV1) writeTo(w *bufio.Writer) { writeString(w, t.GroupID) writeInt32(w, t.SessionTimeout) writeInt32(w, t.RebalanceTimeout) @@ -107,23 +107,23 @@ func (t joinGroupRequestV2) writeTo(w *bufio.Writer) { writeArray(w, len(t.GroupProtocols), func(i int) { t.GroupProtocols[i].writeTo(w) }) } -type joinGroupResponseMemberV2 struct { +type joinGroupResponseMemberV1 struct { // MemberID assigned by the group coordinator MemberID string MemberMetadata []byte } -func (t joinGroupResponseMemberV2) size() int32 { +func (t joinGroupResponseMemberV1) size() int32 { return sizeofString(t.MemberID) + sizeofBytes(t.MemberMetadata) } -func (t joinGroupResponseMemberV2) writeTo(w *bufio.Writer) { +func (t joinGroupResponseMemberV1) writeTo(w *bufio.Writer) { writeString(w, t.MemberID) writeBytes(w, t.MemberMetadata) } -func (t *joinGroupResponseMemberV2) readFrom(r *bufio.Reader, size int) (remain int, err error) { +func (t *joinGroupResponseMemberV1) readFrom(r *bufio.Reader, size int) (remain int, err error) { if remain, err = readString(r, size, &t.MemberID); err != nil { return } @@ -133,12 +133,7 @@ func (t *joinGroupResponseMemberV2) readFrom(r *bufio.Reader, size int) (remain return } -type joinGroupResponseV2 struct { - // ThrottleTimeMS holds the duration in milliseconds for which the request - // was throttled due to quota violation (Zero if the request did not violate - // any quota) - ThrottleTimeMS int32 - +type joinGroupResponseV1 struct { // ErrorCode holds response error code ErrorCode int16 @@ -153,12 +148,11 @@ type joinGroupResponseV2 struct { // MemberID assigned by the group coordinator MemberID string - Members []joinGroupResponseMemberV2 + Members []joinGroupResponseMemberV1 } -func (t joinGroupResponseV2) size() int32 { - return sizeofInt32(t.ThrottleTimeMS) + - sizeofInt16(t.ErrorCode) + +func (t joinGroupResponseV1) size() int32 { + return sizeofInt16(t.ErrorCode) + sizeofInt32(t.GenerationID) + sizeofString(t.GroupProtocol) + sizeofString(t.LeaderID) + @@ -166,8 +160,7 @@ func (t joinGroupResponseV2) size() int32 { sizeofArray(len(t.MemberID), func(i int) int32 { return t.Members[i].size() }) } -func (t joinGroupResponseV2) writeTo(w *bufio.Writer) { - writeInt32(w, t.ThrottleTimeMS) +func (t joinGroupResponseV1) writeTo(w *bufio.Writer) { writeInt16(w, t.ErrorCode) writeInt32(w, t.GenerationID) writeString(w, t.GroupProtocol) @@ -176,11 +169,8 @@ func (t joinGroupResponseV2) writeTo(w *bufio.Writer) { writeArray(w, len(t.Members), func(i int) { t.Members[i].writeTo(w) }) } -func (t *joinGroupResponseV2) readFrom(r *bufio.Reader, size int) (remain int, err error) { - if remain, err = readInt32(r, size, &t.ThrottleTimeMS); err != nil { - return - } - if remain, err = readInt16(r, remain, &t.ErrorCode); err != nil { +func (t *joinGroupResponseV1) readFrom(r *bufio.Reader, size int) (remain int, err error) { + if remain, err = readInt16(r, size, &t.ErrorCode); err != nil { return } if remain, err = readInt32(r, remain, &t.GenerationID); err != nil { @@ -197,7 +187,7 @@ func (t *joinGroupResponseV2) readFrom(r *bufio.Reader, size int) (remain int, e } fn := func(r *bufio.Reader, size int) (fnRemain int, fnErr error) { - var item joinGroupResponseMemberV2 + var item joinGroupResponseMemberV1 if fnRemain, fnErr = (&item).readFrom(r, size); fnErr != nil { return } diff --git a/joingroup_test.go b/joingroup_test.go index 463a1a698..8af976fa5 100644 --- a/joingroup_test.go +++ b/joingroup_test.go @@ -102,14 +102,13 @@ func TestMemberMetadata(t *testing.T) { } func TestJoinGroupResponseV1(t *testing.T) { - item := joinGroupResponseV2{ - ThrottleTimeMS: 1, - ErrorCode: 2, - GenerationID: 3, - GroupProtocol: "a", - LeaderID: "b", - MemberID: "c", - Members: []joinGroupResponseMemberV2{ + item := joinGroupResponseV1{ + ErrorCode: 2, + GenerationID: 3, + GroupProtocol: "a", + LeaderID: "b", + MemberID: "c", + Members: []joinGroupResponseMemberV1{ { MemberID: "d", MemberMetadata: []byte("blah"), @@ -122,7 +121,7 @@ func TestJoinGroupResponseV1(t *testing.T) { item.writeTo(w) w.Flush() - var found joinGroupResponseV2 + var found joinGroupResponseV1 remain, err := (&found).readFrom(bufio.NewReader(buf), buf.Len()) if err != nil { t.Error(err) diff --git a/leavegroup.go b/leavegroup.go index e3625d473..c87c0dbfc 100644 --- a/leavegroup.go +++ b/leavegroup.go @@ -2,7 +2,7 @@ package kafka import "bufio" -type leaveGroupRequestV1 struct { +type leaveGroupRequestV0 struct { // GroupID holds the unique group identifier GroupID string @@ -11,41 +11,31 @@ type leaveGroupRequestV1 struct { MemberID string } -func (t leaveGroupRequestV1) size() int32 { +func (t leaveGroupRequestV0) size() int32 { return sizeofString(t.GroupID) + sizeofString(t.MemberID) } -func (t leaveGroupRequestV1) writeTo(w *bufio.Writer) { +func (t leaveGroupRequestV0) writeTo(w *bufio.Writer) { writeString(w, t.GroupID) writeString(w, t.MemberID) } -type leaveGroupResponseV1 struct { - // ThrottleTimeMS holds the duration in milliseconds for which the request - // was throttled due to quota violation (Zero if the request did not violate - // any quota) - ThrottleTimeMS int32 - +type leaveGroupResponseV0 struct { // ErrorCode holds response error code ErrorCode int16 } -func (t leaveGroupResponseV1) size() int32 { - return sizeofInt32(t.ThrottleTimeMS) + - sizeofInt16(t.ErrorCode) +func (t leaveGroupResponseV0) size() int32 { + return sizeofInt16(t.ErrorCode) } -func (t leaveGroupResponseV1) writeTo(w *bufio.Writer) { - writeInt32(w, t.ThrottleTimeMS) +func (t leaveGroupResponseV0) writeTo(w *bufio.Writer) { writeInt16(w, t.ErrorCode) } -func (t *leaveGroupResponseV1) readFrom(r *bufio.Reader, size int) (remain int, err error) { - if remain, err = readInt32(r, size, &t.ThrottleTimeMS); err != nil { - return - } - if remain, err = readInt16(r, remain, &t.ErrorCode); err != nil { +func (t *leaveGroupResponseV0) readFrom(r *bufio.Reader, size int) (remain int, err error) { + if remain, err = readInt16(r, size, &t.ErrorCode); err != nil { return } return diff --git a/leavegroup_test.go b/leavegroup_test.go index 79db6448c..90adb254b 100644 --- a/leavegroup_test.go +++ b/leavegroup_test.go @@ -7,10 +7,9 @@ import ( "testing" ) -func TestLeaveGroupResponseV1(t *testing.T) { - item := leaveGroupResponseV1{ - ThrottleTimeMS: 1, - ErrorCode: 2, +func TestLeaveGroupResponseV0(t *testing.T) { + item := leaveGroupResponseV0{ + ErrorCode: 2, } buf := bytes.NewBuffer(nil) @@ -18,7 +17,7 @@ func TestLeaveGroupResponseV1(t *testing.T) { item.writeTo(w) w.Flush() - var found leaveGroupResponseV1 + var found leaveGroupResponseV0 remain, err := (&found).readFrom(bufio.NewReader(buf), buf.Len()) if err != nil { t.Error(err) diff --git a/offsetcommit.go b/offsetcommit.go index 4de7814ab..cbd024caa 100644 --- a/offsetcommit.go +++ b/offsetcommit.go @@ -2,7 +2,7 @@ package kafka import "bufio" -type offsetCommitRequestV3Partition struct { +type offsetCommitRequestV2Partition struct { // Partition ID Partition int32 @@ -13,37 +13,37 @@ type offsetCommitRequestV3Partition struct { Metadata string } -func (t offsetCommitRequestV3Partition) size() int32 { +func (t offsetCommitRequestV2Partition) size() int32 { return sizeofInt32(t.Partition) + sizeofInt64(t.Offset) + sizeofString(t.Metadata) } -func (t offsetCommitRequestV3Partition) writeTo(w *bufio.Writer) { +func (t offsetCommitRequestV2Partition) writeTo(w *bufio.Writer) { writeInt32(w, t.Partition) writeInt64(w, t.Offset) writeString(w, t.Metadata) } -type offsetCommitRequestV3Topic struct { +type offsetCommitRequestV2Topic struct { // Topic name Topic string // Partitions to commit offsets - Partitions []offsetCommitRequestV3Partition + Partitions []offsetCommitRequestV2Partition } -func (t offsetCommitRequestV3Topic) size() int32 { +func (t offsetCommitRequestV2Topic) size() int32 { return sizeofString(t.Topic) + sizeofArray(len(t.Partitions), func(i int) int32 { return t.Partitions[i].size() }) } -func (t offsetCommitRequestV3Topic) writeTo(w *bufio.Writer) { +func (t offsetCommitRequestV2Topic) writeTo(w *bufio.Writer) { writeString(w, t.Topic) writeArray(w, len(t.Partitions), func(i int) { t.Partitions[i].writeTo(w) }) } -type offsetCommitRequestV3 struct { +type offsetCommitRequestV2 struct { // GroupID holds the unique group identifier GroupID string @@ -57,10 +57,10 @@ type offsetCommitRequestV3 struct { RetentionTime int64 // Topics to commit offsets - Topics []offsetCommitRequestV3Topic + Topics []offsetCommitRequestV2Topic } -func (t offsetCommitRequestV3) size() int32 { +func (t offsetCommitRequestV2) size() int32 { return sizeofString(t.GroupID) + sizeofInt32(t.GenerationID) + sizeofString(t.MemberID) + @@ -68,7 +68,7 @@ func (t offsetCommitRequestV3) size() int32 { sizeofArray(len(t.Topics), func(i int) int32 { return t.Topics[i].size() }) } -func (t offsetCommitRequestV3) writeTo(w *bufio.Writer) { +func (t offsetCommitRequestV2) writeTo(w *bufio.Writer) { writeString(w, t.GroupID) writeInt32(w, t.GenerationID) writeString(w, t.MemberID) @@ -76,24 +76,24 @@ func (t offsetCommitRequestV3) writeTo(w *bufio.Writer) { writeArray(w, len(t.Topics), func(i int) { t.Topics[i].writeTo(w) }) } -type offsetCommitResponseV3PartitionResponse struct { +type offsetCommitResponseV2PartitionResponse struct { Partition int32 // ErrorCode holds response error code ErrorCode int16 } -func (t offsetCommitResponseV3PartitionResponse) size() int32 { +func (t offsetCommitResponseV2PartitionResponse) size() int32 { return sizeofInt32(t.Partition) + sizeofInt16(t.ErrorCode) } -func (t offsetCommitResponseV3PartitionResponse) writeTo(w *bufio.Writer) { +func (t offsetCommitResponseV2PartitionResponse) writeTo(w *bufio.Writer) { writeInt32(w, t.Partition) writeInt16(w, t.ErrorCode) } -func (t *offsetCommitResponseV3PartitionResponse) readFrom(r *bufio.Reader, size int) (remain int, err error) { +func (t *offsetCommitResponseV2PartitionResponse) readFrom(r *bufio.Reader, size int) (remain int, err error) { if remain, err = readInt32(r, size, &t.Partition); err != nil { return } @@ -103,28 +103,28 @@ func (t *offsetCommitResponseV3PartitionResponse) readFrom(r *bufio.Reader, size return } -type offsetCommitResponseV3Response struct { +type offsetCommitResponseV2Response struct { Topic string - PartitionResponses []offsetCommitResponseV3PartitionResponse + PartitionResponses []offsetCommitResponseV2PartitionResponse } -func (t offsetCommitResponseV3Response) size() int32 { +func (t offsetCommitResponseV2Response) size() int32 { return sizeofString(t.Topic) + sizeofArray(len(t.PartitionResponses), func(i int) int32 { return t.PartitionResponses[i].size() }) } -func (t offsetCommitResponseV3Response) writeTo(w *bufio.Writer) { +func (t offsetCommitResponseV2Response) writeTo(w *bufio.Writer) { writeString(w, t.Topic) writeArray(w, len(t.PartitionResponses), func(i int) { t.PartitionResponses[i].writeTo(w) }) } -func (t *offsetCommitResponseV3Response) readFrom(r *bufio.Reader, size int) (remain int, err error) { +func (t *offsetCommitResponseV2Response) readFrom(r *bufio.Reader, size int) (remain int, err error) { if remain, err = readString(r, size, &t.Topic); err != nil { return } fn := func(r *bufio.Reader, withSize int) (fnRemain int, fnErr error) { - item := offsetCommitResponseV3PartitionResponse{} + item := offsetCommitResponseV2PartitionResponse{} if fnRemain, fnErr = (&item).readFrom(r, withSize); fnErr != nil { return } @@ -138,38 +138,28 @@ func (t *offsetCommitResponseV3Response) readFrom(r *bufio.Reader, size int) (re return } -type offsetCommitResponseV3 struct { - // ThrottleTimeMS holds the duration in milliseconds for which the request - // was throttled due to quota violation (Zero if the request did not violate - // any quota) - ThrottleTimeMS int32 - Responses []offsetCommitResponseV3Response +type offsetCommitResponseV2 struct { + Responses []offsetCommitResponseV2Response } -func (t offsetCommitResponseV3) size() int32 { - return sizeofInt32(t.ThrottleTimeMS) + - sizeofArray(len(t.Responses), func(i int) int32 { return t.Responses[i].size() }) +func (t offsetCommitResponseV2) size() int32 { + return sizeofArray(len(t.Responses), func(i int) int32 { return t.Responses[i].size() }) } -func (t offsetCommitResponseV3) writeTo(w *bufio.Writer) { - writeInt32(w, t.ThrottleTimeMS) +func (t offsetCommitResponseV2) writeTo(w *bufio.Writer) { writeArray(w, len(t.Responses), func(i int) { t.Responses[i].writeTo(w) }) } -func (t *offsetCommitResponseV3) readFrom(r *bufio.Reader, size int) (remain int, err error) { - if remain, err = readInt32(r, size, &t.ThrottleTimeMS); err != nil { - return - } - +func (t *offsetCommitResponseV2) readFrom(r *bufio.Reader, size int) (remain int, err error) { fn := func(r *bufio.Reader, withSize int) (fnRemain int, fnErr error) { - item := offsetCommitResponseV3Response{} + item := offsetCommitResponseV2Response{} if fnRemain, fnErr = (&item).readFrom(r, withSize); fnErr != nil { return } t.Responses = append(t.Responses, item) return } - if remain, err = readArrayWith(r, remain, fn); err != nil { + if remain, err = readArrayWith(r, size, fn); err != nil { return } diff --git a/offsetcommit_test.go b/offsetcommit_test.go index d70cf15ca..54e8c295f 100644 --- a/offsetcommit_test.go +++ b/offsetcommit_test.go @@ -7,13 +7,12 @@ import ( "testing" ) -func TestOffsetCommitResponseV3(t *testing.T) { - item := offsetCommitResponseV3{ - ThrottleTimeMS: 1, - Responses: []offsetCommitResponseV3Response{ +func TestOffsetCommitResponseV2(t *testing.T) { + item := offsetCommitResponseV2{ + Responses: []offsetCommitResponseV2Response{ { Topic: "a", - PartitionResponses: []offsetCommitResponseV3PartitionResponse{ + PartitionResponses: []offsetCommitResponseV2PartitionResponse{ { Partition: 1, ErrorCode: 2, @@ -28,7 +27,7 @@ func TestOffsetCommitResponseV3(t *testing.T) { item.writeTo(w) w.Flush() - var found offsetCommitResponseV3 + var found offsetCommitResponseV2 remain, err := (&found).readFrom(bufio.NewReader(buf), buf.Len()) if err != nil { t.Error(err) diff --git a/offsetfetch.go b/offsetfetch.go index 7931490da..8b822149c 100644 --- a/offsetfetch.go +++ b/offsetfetch.go @@ -4,7 +4,7 @@ import ( "bufio" ) -type offsetFetchRequestV3Topic struct { +type offsetFetchRequestV1Topic struct { // Topic name Topic string @@ -12,35 +12,35 @@ type offsetFetchRequestV3Topic struct { Partitions []int32 } -func (t offsetFetchRequestV3Topic) size() int32 { +func (t offsetFetchRequestV1Topic) size() int32 { return sizeofString(t.Topic) + sizeofInt32Array(t.Partitions) } -func (t offsetFetchRequestV3Topic) writeTo(w *bufio.Writer) { +func (t offsetFetchRequestV1Topic) writeTo(w *bufio.Writer) { writeString(w, t.Topic) writeInt32Array(w, t.Partitions) } -type offsetFetchRequestV3 struct { +type offsetFetchRequestV1 struct { // GroupID holds the unique group identifier GroupID string // Topics to fetch offsets. - Topics []offsetFetchRequestV3Topic + Topics []offsetFetchRequestV1Topic } -func (t offsetFetchRequestV3) size() int32 { +func (t offsetFetchRequestV1) size() int32 { return sizeofString(t.GroupID) + sizeofArray(len(t.Topics), func(i int) int32 { return t.Topics[i].size() }) } -func (t offsetFetchRequestV3) writeTo(w *bufio.Writer) { +func (t offsetFetchRequestV1) writeTo(w *bufio.Writer) { writeString(w, t.GroupID) writeArray(w, len(t.Topics), func(i int) { t.Topics[i].writeTo(w) }) } -type offsetFetchResponseV3PartitionResponse struct { +type offsetFetchResponseV1PartitionResponse struct { // Partition ID Partition int32 @@ -54,21 +54,21 @@ type offsetFetchResponseV3PartitionResponse struct { ErrorCode int16 } -func (t offsetFetchResponseV3PartitionResponse) size() int32 { +func (t offsetFetchResponseV1PartitionResponse) size() int32 { return sizeofInt32(t.Partition) + sizeofInt64(t.Offset) + sizeofString(t.Metadata) + sizeofInt16(t.ErrorCode) } -func (t offsetFetchResponseV3PartitionResponse) writeTo(w *bufio.Writer) { +func (t offsetFetchResponseV1PartitionResponse) writeTo(w *bufio.Writer) { writeInt32(w, t.Partition) writeInt64(w, t.Offset) writeString(w, t.Metadata) writeInt16(w, t.ErrorCode) } -func (t *offsetFetchResponseV3PartitionResponse) readFrom(r *bufio.Reader, size int) (remain int, err error) { +func (t *offsetFetchResponseV1PartitionResponse) readFrom(r *bufio.Reader, size int) (remain int, err error) { if remain, err = readInt32(r, size, &t.Partition); err != nil { return } @@ -84,31 +84,31 @@ func (t *offsetFetchResponseV3PartitionResponse) readFrom(r *bufio.Reader, size return } -type offsetFetchResponseV3Response struct { +type offsetFetchResponseV1Response struct { // Topic name Topic string // PartitionResponses holds offsets by partition - PartitionResponses []offsetFetchResponseV3PartitionResponse + PartitionResponses []offsetFetchResponseV1PartitionResponse } -func (t offsetFetchResponseV3Response) size() int32 { +func (t offsetFetchResponseV1Response) size() int32 { return sizeofString(t.Topic) + sizeofArray(len(t.PartitionResponses), func(i int) int32 { return t.PartitionResponses[i].size() }) } -func (t offsetFetchResponseV3Response) writeTo(w *bufio.Writer) { +func (t offsetFetchResponseV1Response) writeTo(w *bufio.Writer) { writeString(w, t.Topic) writeArray(w, len(t.PartitionResponses), func(i int) { t.PartitionResponses[i].writeTo(w) }) } -func (t *offsetFetchResponseV3Response) readFrom(r *bufio.Reader, size int) (remain int, err error) { +func (t *offsetFetchResponseV1Response) readFrom(r *bufio.Reader, size int) (remain int, err error) { if remain, err = readString(r, size, &t.Topic); err != nil { return } fn := func(r *bufio.Reader, size int) (fnRemain int, fnErr error) { - item := offsetFetchResponseV3PartitionResponse{} + item := offsetFetchResponseV1PartitionResponse{} if fnRemain, fnErr = (&item).readFrom(r, size); err != nil { return } @@ -122,56 +122,36 @@ func (t *offsetFetchResponseV3Response) readFrom(r *bufio.Reader, size int) (rem return } -type offsetFetchResponseV3 struct { - // ThrottleTimeMS holds the duration in milliseconds for which the request - // was throttled due to quota violation (Zero if the request did not violate - // any quota) - ThrottleTimeMS int32 - +type offsetFetchResponseV1 struct { // Responses holds topic partition offsets - Responses []offsetFetchResponseV3Response - - // ErrorCode holds response error code - ErrorCode int16 + Responses []offsetFetchResponseV1Response } -func (t offsetFetchResponseV3) size() int32 { - return sizeofInt32(t.ThrottleTimeMS) + - sizeofArray(len(t.Responses), func(i int) int32 { return t.Responses[i].size() }) + - sizeofInt16(t.ErrorCode) +func (t offsetFetchResponseV1) size() int32 { + return sizeofArray(len(t.Responses), func(i int) int32 { return t.Responses[i].size() }) } -func (t offsetFetchResponseV3) writeTo(w *bufio.Writer) { - writeInt32(w, t.ThrottleTimeMS) +func (t offsetFetchResponseV1) writeTo(w *bufio.Writer) { writeArray(w, len(t.Responses), func(i int) { t.Responses[i].writeTo(w) }) - writeInt16(w, t.ErrorCode) } -func (t *offsetFetchResponseV3) readFrom(r *bufio.Reader, size int) (remain int, err error) { - if remain, err = readInt32(r, size, &t.ThrottleTimeMS); err != nil { - return - } - +func (t *offsetFetchResponseV1) readFrom(r *bufio.Reader, size int) (remain int, err error) { fn := func(r *bufio.Reader, withSize int) (fnRemain int, fnErr error) { - item := offsetFetchResponseV3Response{} + item := offsetFetchResponseV1Response{} if fnRemain, fnErr = (&item).readFrom(r, withSize); fnErr != nil { return } t.Responses = append(t.Responses, item) return } - if remain, err = readArrayWith(r, remain, fn); err != nil { - return - } - - if remain, err = readInt16(r, remain, &t.ErrorCode); err != nil { + if remain, err = readArrayWith(r, size, fn); err != nil { return } return } -func findOffset(topic string, partition int32, response offsetFetchResponseV3) (int64, bool) { +func findOffset(topic string, partition int32, response offsetFetchResponseV1) (int64, bool) { for _, r := range response.Responses { if r.Topic != topic { continue diff --git a/offsetfetch_test.go b/offsetfetch_test.go index 58f49cffe..79195696d 100644 --- a/offsetfetch_test.go +++ b/offsetfetch_test.go @@ -7,13 +7,12 @@ import ( "testing" ) -func TestOffsetFetchResponseV3(t *testing.T) { - item := offsetFetchResponseV3{ - ThrottleTimeMS: 1, - Responses: []offsetFetchResponseV3Response{ +func TestOffsetFetchResponseV1(t *testing.T) { + item := offsetFetchResponseV1{ + Responses: []offsetFetchResponseV1Response{ { Topic: "a", - PartitionResponses: []offsetFetchResponseV3PartitionResponse{ + PartitionResponses: []offsetFetchResponseV1PartitionResponse{ { Partition: 2, Offset: 3, @@ -23,7 +22,6 @@ func TestOffsetFetchResponseV3(t *testing.T) { }, }, }, - ErrorCode: 5, } buf := bytes.NewBuffer(nil) @@ -31,7 +29,7 @@ func TestOffsetFetchResponseV3(t *testing.T) { item.writeTo(w) w.Flush() - var found offsetFetchResponseV3 + var found offsetFetchResponseV1 remain, err := (&found).readFrom(bufio.NewReader(buf), buf.Len()) if err != nil { t.Error(err) diff --git a/reader.go b/reader.go index a99bafab4..a26e6b5fd 100644 --- a/reader.go +++ b/reader.go @@ -127,7 +127,7 @@ func (r *Reader) lookupCoordinator() (string, error) { } defer conn.Close() - out, err := conn.findCoordinator(findCoordinatorRequestV1{ + out, err := conn.findCoordinator(findCoordinatorRequestV0{ CoordinatorKey: r.config.GroupID, }) if err != nil { @@ -174,12 +174,12 @@ func (r *Reader) refreshCoordinator() (err error) { return nil } -// makeJoinGroupRequestV2 handles the logic of constructing a joinGroup +// makejoinGroupRequestV1 handles the logic of constructing a joinGroup // request -func (r *Reader) makeJoinGroupRequestV2() (joinGroupRequestV2, error) { +func (r *Reader) makejoinGroupRequestV1() (joinGroupRequestV1, error) { _, memberID := r.membership() - request := joinGroupRequestV2{ + request := joinGroupRequestV1{ GroupID: r.config.GroupID, MemberID: memberID, SessionTimeout: int32(r.config.SessionTimeout / time.Millisecond), @@ -190,10 +190,10 @@ func (r *Reader) makeJoinGroupRequestV2() (joinGroupRequestV2, error) { for _, strategy := range allStrategies { meta, err := strategy.GroupMetadata([]string{r.config.Topic}) if err != nil { - return joinGroupRequestV2{}, fmt.Errorf("unable to construct protocol metadata for member, %v: %v\n", strategy.ProtocolName(), err) + return joinGroupRequestV1{}, fmt.Errorf("unable to construct protocol metadata for member, %v: %v\n", strategy.ProtocolName(), err) } - request.GroupProtocols = append(request.GroupProtocols, joinGroupRequestGroupProtocolV2{ + request.GroupProtocols = append(request.GroupProtocols, joinGroupRequestGroupProtocolV1{ ProtocolName: strategy.ProtocolName(), ProtocolMetadata: meta.bytes(), }) @@ -203,7 +203,7 @@ func (r *Reader) makeJoinGroupRequestV2() (joinGroupRequestV2, error) { } // makeMemberProtocolMetadata maps encoded member metadata ([]byte) into memberGroupMetadata -func (r *Reader) makeMemberProtocolMetadata(in []joinGroupResponseMemberV2) ([]memberGroupMetadata, error) { +func (r *Reader) makeMemberProtocolMetadata(in []joinGroupResponseMemberV1) ([]memberGroupMetadata, error) { members := make([]memberGroupMetadata, 0, len(in)) for _, item := range in { metadata := groupMetadata{} @@ -229,7 +229,7 @@ type partitionReader interface { // assignTopicPartitions uses the selected strategy to assign members to their // various partitions -func (r *Reader) assignTopicPartitions(conn partitionReader, group joinGroupResponseV2) (memberGroupAssignments, error) { +func (r *Reader) assignTopicPartitions(conn partitionReader, group joinGroupResponseV1) (memberGroupAssignments, error) { r.withLogger(func(l *log.Logger) { l.Println("selected as leader for group,", r.config.GroupID) }) @@ -252,6 +252,9 @@ func (r *Reader) assignTopicPartitions(conn partitionReader, group joinGroupResp r.withLogger(func(l *log.Logger) { l.Printf("using '%v' strategy to assign group, %v\n", group.GroupProtocol, r.config.GroupID) + for _, member := range members { + l.Printf("found member: %v/%#v", member.MemberID, member.Metadata) + } for _, partition := range partitions { l.Printf("found topic/partition: %v/%v", partition.Topic, partition.ID) } @@ -262,7 +265,7 @@ func (r *Reader) assignTopicPartitions(conn partitionReader, group joinGroupResp func (r *Reader) leaveGroup(conn *Conn) error { _, memberID := r.membership() - _, err := conn.leaveGroup(leaveGroupRequestV1{ + _, err := conn.leaveGroup(leaveGroupRequestV0{ GroupID: r.config.GroupID, MemberID: memberID, }) @@ -291,7 +294,7 @@ func (r *Reader) joinGroup() (memberGroupAssignments, error) { } defer conn.Close() - request, err := r.makeJoinGroupRequestV2() + request, err := r.makejoinGroupRequestV1() if err != nil { return nil, err } @@ -354,19 +357,19 @@ func (r *Reader) joinGroup() (memberGroupAssignments, error) { return assignments, nil } -func (r *Reader) makeSyncGroupRequestV1(memberAssignments memberGroupAssignments) syncGroupRequestV1 { +func (r *Reader) makesyncGroupRequestV0(memberAssignments memberGroupAssignments) syncGroupRequestV0 { generationID, memberID := r.membership() - request := syncGroupRequestV1{ + request := syncGroupRequestV0{ GroupID: r.config.GroupID, GenerationID: generationID, MemberID: memberID, } if memberAssignments != nil { - request.GroupAssignments = make([]syncGroupRequestGroupAssignmentV1, 0, 1) + request.GroupAssignments = make([]syncGroupRequestGroupAssignmentV0, 0, 1) for memberID, topics := range memberAssignments { - request.GroupAssignments = append(request.GroupAssignments, syncGroupRequestGroupAssignmentV1{ + request.GroupAssignments = append(request.GroupAssignments, syncGroupRequestGroupAssignmentV0{ MemberID: memberID, MemberAssignments: groupAssignment{ Version: 1, @@ -374,6 +377,10 @@ func (r *Reader) makeSyncGroupRequestV1(memberAssignments memberGroupAssignments }.bytes(), }) } + + r.withErrorLogger(func(logger *log.Logger) { + logger.Printf("Syncing %d assignments for generation %d as member %s", len(request.GroupAssignments), generationID, memberID) + }) } return request @@ -396,7 +403,7 @@ func (r *Reader) syncGroup(memberAssignments memberGroupAssignments) (map[string } defer conn.Close() - request := r.makeSyncGroupRequestV1(memberAssignments) + request := r.makesyncGroupRequestV0(memberAssignments) response, err := conn.syncGroups(request) if err != nil { switch err { @@ -424,6 +431,11 @@ func (r *Reader) syncGroup(memberAssignments memberGroupAssignments) (map[string return nil, fmt.Errorf("unable to read SyncGroup response for group, %v: %v\n", r.config.GroupID, err) } + if len(assignments.Topics) == 0 { + generation, memberID := r.membership() + return nil, fmt.Errorf("received empty assignments for group, %v as member %s for generation %d", r.config.GroupID, memberID, generation) + } + r.withLogger(func(l *log.Logger) { l.Printf("sync group finished for group, %v\n", r.config.GroupID) }) @@ -467,9 +479,9 @@ func (r *Reader) fetchOffsets(subs map[string][]int32) (map[int]int64, error) { defer conn.Close() partitions := subs[r.config.Topic] - offsets, err := conn.offsetFetch(offsetFetchRequestV3{ + offsets, err := conn.offsetFetch(offsetFetchRequestV1{ GroupID: r.config.GroupID, - Topics: []offsetFetchRequestV3Topic{ + Topics: []offsetFetchRequestV1Topic{ { Topic: r.config.Topic, Partitions: partitions, @@ -567,7 +579,7 @@ func (r *Reader) heartbeat(conn *Conn) error { return nil } - resp, err := conn.heartbeat(heartbeatRequestV1{ + _, err := conn.heartbeat(heartbeatRequestV0{ GroupID: r.config.GroupID, GenerationID: generationID, MemberID: memberID, @@ -576,8 +588,6 @@ func (r *Reader) heartbeat(conn *Conn) error { return fmt.Errorf("heartbeat failed: %v", err) } - r.waitThrottleTime(resp.ThrottleTimeMS) - return nil } @@ -608,7 +618,7 @@ func (r *Reader) heartbeatLoop(conn *Conn) func(stop <-chan struct{}) { } type offsetCommitter interface { - offsetCommit(request offsetCommitRequestV3) (offsetCommitResponseV3, error) + offsetCommit(request offsetCommitRequestV2) (offsetCommitResponseV2, error) } func (r *Reader) commitOffsets(conn offsetCommitter, offsetStash offsetStash) error { @@ -617,7 +627,7 @@ func (r *Reader) commitOffsets(conn offsetCommitter, offsetStash offsetStash) er } generationID, memberID := r.membership() - request := offsetCommitRequestV3{ + request := offsetCommitRequestV2{ GroupID: r.config.GroupID, GenerationID: generationID, MemberID: memberID, @@ -625,9 +635,9 @@ func (r *Reader) commitOffsets(conn offsetCommitter, offsetStash offsetStash) er } for topic, partitions := range offsetStash { - t := offsetCommitRequestV3Topic{Topic: topic} + t := offsetCommitRequestV2Topic{Topic: topic} for partition, offset := range partitions { - t.Partitions = append(t.Partitions, offsetCommitRequestV3Partition{ + t.Partitions = append(t.Partitions, offsetCommitRequestV2Partition{ Partition: int32(partition), Offset: offset, }) @@ -1068,7 +1078,7 @@ func NewReader(config ReaderConfig) *Reader { msgs: make(chan readerMessage, config.QueueCapacity), cancel: func() {}, done: make(chan struct{}), - commits: make(chan commitRequest), + commits: make(chan commitRequest, config.QueueCapacity), stop: stop, offset: firstOffset, stctx: stctx, diff --git a/reader_test.go b/reader_test.go index 1b372d9b8..ba517a28e 100644 --- a/reader_test.go +++ b/reader_test.go @@ -264,8 +264,8 @@ func createTopic(t *testing.T, topic string, partitions int) { } defer conn.Close() - _, err = conn.createTopics(createTopicsRequestV2{ - Topics: []createTopicsRequestV2Topic{ + _, err = conn.createTopics(createTopicsRequestV0{ + Topics: []createTopicsRequestV0Topic{ { Topic: topic, NumPartitions: int32(partitions), @@ -622,13 +622,13 @@ func TestReaderAssignTopicPartitions(t *testing.T) { }, } - newJoinGroupResponseV2 := func(topicsByMemberID map[string][]string) joinGroupResponseV2 { - resp := joinGroupResponseV2{ + newJoinGroupResponseV1 := func(topicsByMemberID map[string][]string) joinGroupResponseV1 { + resp := joinGroupResponseV1{ GroupProtocol: roundrobinStrategy{}.ProtocolName(), } for memberID, topics := range topicsByMemberID { - resp.Members = append(resp.Members, joinGroupResponseMemberV2{ + resp.Members = append(resp.Members, joinGroupResponseMemberV1{ MemberID: memberID, MemberMetadata: groupMetadata{ Topics: topics, @@ -640,15 +640,15 @@ func TestReaderAssignTopicPartitions(t *testing.T) { } testCases := map[string]struct { - Members joinGroupResponseV2 + Members joinGroupResponseV1 Assignments memberGroupAssignments }{ "nil": { - Members: newJoinGroupResponseV2(nil), + Members: newJoinGroupResponseV1(nil), Assignments: memberGroupAssignments{}, }, "one member, one topic": { - Members: newJoinGroupResponseV2(map[string][]string{ + Members: newJoinGroupResponseV1(map[string][]string{ "member-1": {"topic-1"}, }), Assignments: memberGroupAssignments{ @@ -658,7 +658,7 @@ func TestReaderAssignTopicPartitions(t *testing.T) { }, }, "one member, two topics": { - Members: newJoinGroupResponseV2(map[string][]string{ + Members: newJoinGroupResponseV1(map[string][]string{ "member-1": {"topic-1", "topic-2"}, }), Assignments: memberGroupAssignments{ @@ -669,7 +669,7 @@ func TestReaderAssignTopicPartitions(t *testing.T) { }, }, "two members, one topic": { - Members: newJoinGroupResponseV2(map[string][]string{ + Members: newJoinGroupResponseV1(map[string][]string{ "member-1": {"topic-1"}, "member-2": {"topic-1"}, }), @@ -683,7 +683,7 @@ func TestReaderAssignTopicPartitions(t *testing.T) { }, }, "two members, two unshared topics": { - Members: newJoinGroupResponseV2(map[string][]string{ + Members: newJoinGroupResponseV1(map[string][]string{ "member-1": {"topic-1"}, "member-2": {"topic-2"}, }), @@ -1165,15 +1165,15 @@ type mockOffsetCommitter struct { err error } -func (m *mockOffsetCommitter) offsetCommit(request offsetCommitRequestV3) (offsetCommitResponseV3, error) { +func (m *mockOffsetCommitter) offsetCommit(request offsetCommitRequestV2) (offsetCommitResponseV2, error) { m.invocations++ if m.failCount > 0 { m.failCount-- - return offsetCommitResponseV3{}, io.EOF + return offsetCommitResponseV2{}, io.EOF } - return offsetCommitResponseV3{}, nil + return offsetCommitResponseV2{}, nil } func TestCommitOffsetsWithRetry(t *testing.T) { diff --git a/syncgroup.go b/syncgroup.go index 241a9ea9c..131469421 100644 --- a/syncgroup.go +++ b/syncgroup.go @@ -66,7 +66,7 @@ func (t groupAssignment) bytes() []byte { return buf.Bytes() } -type syncGroupRequestGroupAssignmentV1 struct { +type syncGroupRequestGroupAssignmentV0 struct { // MemberID assigned by the group coordinator MemberID string @@ -76,17 +76,17 @@ type syncGroupRequestGroupAssignmentV1 struct { MemberAssignments []byte } -func (t syncGroupRequestGroupAssignmentV1) size() int32 { +func (t syncGroupRequestGroupAssignmentV0) size() int32 { return sizeofString(t.MemberID) + sizeofBytes(t.MemberAssignments) } -func (t syncGroupRequestGroupAssignmentV1) writeTo(w *bufio.Writer) { +func (t syncGroupRequestGroupAssignmentV0) writeTo(w *bufio.Writer) { writeString(w, t.MemberID) writeBytes(w, t.MemberAssignments) } -type syncGroupRequestV1 struct { +type syncGroupRequestV0 struct { // GroupID holds the unique group identifier GroupID string @@ -96,29 +96,24 @@ type syncGroupRequestV1 struct { // MemberID assigned by the group coordinator MemberID string - GroupAssignments []syncGroupRequestGroupAssignmentV1 + GroupAssignments []syncGroupRequestGroupAssignmentV0 } -func (t syncGroupRequestV1) size() int32 { +func (t syncGroupRequestV0) size() int32 { return sizeofString(t.GroupID) + sizeofInt32(t.GenerationID) + sizeofString(t.MemberID) + sizeofArray(len(t.GroupAssignments), func(i int) int32 { return t.GroupAssignments[i].size() }) } -func (t syncGroupRequestV1) writeTo(w *bufio.Writer) { +func (t syncGroupRequestV0) writeTo(w *bufio.Writer) { writeString(w, t.GroupID) writeInt32(w, t.GenerationID) writeString(w, t.MemberID) writeArray(w, len(t.GroupAssignments), func(i int) { t.GroupAssignments[i].writeTo(w) }) } -type syncGroupResponseV1 struct { - // ThrottleTimeMS holds the duration in milliseconds for which the request - // was throttled due to quota violation (Zero if the request did not violate - // any quota) - ThrottleTimeMS int32 - +type syncGroupResponseV0 struct { // ErrorCode holds response error code ErrorCode int16 @@ -128,23 +123,18 @@ type syncGroupResponseV1 struct { MemberAssignments []byte } -func (t syncGroupResponseV1) size() int32 { - return sizeofInt32(t.ThrottleTimeMS) + - sizeofInt16(t.ErrorCode) + +func (t syncGroupResponseV0) size() int32 { + return sizeofInt16(t.ErrorCode) + sizeofBytes(t.MemberAssignments) } -func (t syncGroupResponseV1) writeTo(w *bufio.Writer) { - writeInt32(w, t.ThrottleTimeMS) +func (t syncGroupResponseV0) writeTo(w *bufio.Writer) { writeInt16(w, t.ErrorCode) writeBytes(w, t.MemberAssignments) } -func (t *syncGroupResponseV1) readFrom(r *bufio.Reader, sz int) (remain int, err error) { - if remain, err = readInt32(r, sz, &t.ThrottleTimeMS); err != nil { - return - } - if remain, err = readInt16(r, remain, &t.ErrorCode); err != nil { +func (t *syncGroupResponseV0) readFrom(r *bufio.Reader, sz int) (remain int, err error) { + if remain, err = readInt16(r, sz, &t.ErrorCode); err != nil { return } if remain, err = readBytes(r, remain, &t.MemberAssignments); err != nil { diff --git a/syncgroup_test.go b/syncgroup_test.go index e16304d37..55aa659af 100644 --- a/syncgroup_test.go +++ b/syncgroup_test.go @@ -55,9 +55,8 @@ func TestGroupAssignmentReadsFromZeroSize(t *testing.T) { } } -func TestSyncGroupResponseV1(t *testing.T) { - item := syncGroupResponseV1{ - ThrottleTimeMS: 1, +func TestSyncGroupResponseV0(t *testing.T) { + item := syncGroupResponseV0{ ErrorCode: 2, MemberAssignments: []byte(`blah`), } @@ -67,7 +66,7 @@ func TestSyncGroupResponseV1(t *testing.T) { item.writeTo(w) w.Flush() - var found syncGroupResponseV1 + var found syncGroupResponseV0 remain, err := (&found).readFrom(bufio.NewReader(buf), buf.Len()) if err != nil { t.Error(err) @@ -83,9 +82,8 @@ func TestSyncGroupResponseV1(t *testing.T) { } } -func BenchmarkSyncGroupResponseV1(t *testing.B) { - item := syncGroupResponseV1{ - ThrottleTimeMS: 1, +func BenchmarkSyncGroupResponseV0(t *testing.B) { + item := syncGroupResponseV0{ ErrorCode: 2, MemberAssignments: []byte(`blah`), } @@ -101,7 +99,7 @@ func BenchmarkSyncGroupResponseV1(t *testing.B) { for i := 0; i < t.N; i++ { r.Seek(0, io.SeekStart) - var found syncGroupResponseV1 + var found syncGroupResponseV0 remain, err := (&found).readFrom(reader, size) if err != nil { t.Error(err)