From 9a956db8bd00245835f16007fbfe8ec58b31b8b9 Mon Sep 17 00:00:00 2001 From: Achille Date: Tue, 7 Jan 2020 13:52:11 -0800 Subject: [PATCH] add kafka.ApiVersion.Format (#395) * add kafka.ApiVersion.Format * set explicit constant values intead of using iota --- conn.go | 75 ++++++++-------------- createtopics.go | 2 +- deletetopics.go | 2 +- protocol.go | 162 +++++++++++++++++++++++++++++++++++++++++------ protocol_test.go | 25 +++++++- write.go | 14 ++-- write_test.go | 6 +- 7 files changed, 205 insertions(+), 81 deletions(-) diff --git a/conn.go b/conn.go index f7ee1736d..dcac23c9b 100644 --- a/conn.go +++ b/conn.go @@ -78,14 +78,14 @@ type Conn struct { requiredAcks int32 // lazily loaded API versions used by this connection - apiVersions atomic.Value // apiVersions + apiVersions atomic.Value // apiVersionMap transactionalID *string } -type apiVersions map[apiKey]ApiVersion +type apiVersionMap map[apiKey]ApiVersion -func (v apiVersions) negotiate(key apiKey, sortedSupportedVersions ...apiVersion) apiVersion { +func (v apiVersionMap) negotiate(key apiKey, sortedSupportedVersions ...apiVersion) apiVersion { x := v[key] for i := len(sortedSupportedVersions) - 1; i >= 0; i-- { @@ -210,8 +210,8 @@ func (c *Conn) negotiateVersion(key apiKey, sortedSupportedVersions ...apiVersio return a, nil } -func (c *Conn) loadVersions() (apiVersions, error) { - v, _ := c.apiVersions.Load().(apiVersions) +func (c *Conn) loadVersions() (apiVersionMap, error) { + v, _ := c.apiVersions.Load().(apiVersionMap) if v != nil { return v, nil } @@ -221,7 +221,7 @@ func (c *Conn) loadVersions() (apiVersions, error) { return nil, err } - v = make(apiVersions, len(brokerVersions)) + v = make(apiVersionMap, len(brokerVersions)) for _, a := range brokerVersions { v[apiKey(a.ApiKey)] = a @@ -235,7 +235,7 @@ func (c *Conn) loadVersions() (apiVersions, error) { func (c *Conn) Controller() (broker Broker, err error) { err = c.readOperation( func(deadline time.Time, id int32) error { - return c.writeRequest(metadataRequest, v1, id, topicMetadataRequestV1([]string{})) + return c.writeRequest(metadata, v1, id, topicMetadataRequestV1([]string{})) }, func(deadline time.Time, size int) error { var res metadataResponseV1 @@ -263,7 +263,7 @@ func (c *Conn) Brokers() ([]Broker, error) { var brokers []Broker err := c.readOperation( func(deadline time.Time, id int32) error { - return c.writeRequest(metadataRequest, v1, id, topicMetadataRequestV1([]string{})) + return c.writeRequest(metadata, v1, id, topicMetadataRequestV1([]string{})) }, func(deadline time.Time, size int) error { var res metadataResponseV1 @@ -303,7 +303,7 @@ func (c *Conn) describeGroups(request describeGroupsRequestV0) (describeGroupsRe err := c.readOperation( func(deadline time.Time, id int32) error { - return c.writeRequest(describeGroupsRequest, v0, id, request) + return c.writeRequest(describeGroups, v0, id, request) }, func(deadline time.Time, size int) error { return expectZeroSize(func() (remain int, err error) { @@ -331,7 +331,7 @@ func (c *Conn) findCoordinator(request findCoordinatorRequestV0) (findCoordinato err := c.readOperation( func(deadline time.Time, id int32) error { - return c.writeRequest(groupCoordinatorRequest, v0, id, request) + return c.writeRequest(findCoordinator, v0, id, request) }, func(deadline time.Time, size int) error { @@ -358,7 +358,7 @@ func (c *Conn) heartbeat(request heartbeatRequestV0) (heartbeatResponseV0, error err := c.writeOperation( func(deadline time.Time, id int32) error { - return c.writeRequest(heartbeatRequest, v0, id, request) + return c.writeRequest(heartbeat, v0, id, request) }, func(deadline time.Time, size int) error { return expectZeroSize(func() (remain int, err error) { @@ -384,7 +384,7 @@ func (c *Conn) joinGroup(request joinGroupRequestV1) (joinGroupResponseV1, error err := c.writeOperation( func(deadline time.Time, id int32) error { - return c.writeRequest(joinGroupRequest, v1, id, request) + return c.writeRequest(joinGroup, v1, id, request) }, func(deadline time.Time, size int) error { return expectZeroSize(func() (remain int, err error) { @@ -410,7 +410,7 @@ func (c *Conn) leaveGroup(request leaveGroupRequestV0) (leaveGroupResponseV0, er err := c.writeOperation( func(deadline time.Time, id int32) error { - return c.writeRequest(leaveGroupRequest, v0, id, request) + return c.writeRequest(leaveGroup, v0, id, request) }, func(deadline time.Time, size int) error { return expectZeroSize(func() (remain int, err error) { @@ -436,7 +436,7 @@ func (c *Conn) listGroups(request listGroupsRequestV1) (listGroupsResponseV1, er err := c.readOperation( func(deadline time.Time, id int32) error { - return c.writeRequest(listGroupsRequest, v1, id, request) + return c.writeRequest(listGroups, v1, id, request) }, func(deadline time.Time, size int) error { return expectZeroSize(func() (remain int, err error) { @@ -462,7 +462,7 @@ func (c *Conn) offsetCommit(request offsetCommitRequestV2) (offsetCommitResponse err := c.writeOperation( func(deadline time.Time, id int32) error { - return c.writeRequest(offsetCommitRequest, v2, id, request) + return c.writeRequest(offsetCommit, v2, id, request) }, func(deadline time.Time, size int) error { return expectZeroSize(func() (remain int, err error) { @@ -493,7 +493,7 @@ func (c *Conn) offsetFetch(request offsetFetchRequestV1) (offsetFetchResponseV1, err := c.readOperation( func(deadline time.Time, id int32) error { - return c.writeRequest(offsetFetchRequest, v1, id, request) + return c.writeRequest(offsetFetch, v1, id, request) }, func(deadline time.Time, size int) error { return expectZeroSize(func() (remain int, err error) { @@ -523,7 +523,7 @@ func (c *Conn) syncGroup(request syncGroupRequestV0) (syncGroupResponseV0, error err := c.readOperation( func(deadline time.Time, id int32) error { - return c.writeRequest(syncGroupRequest, v0, id, request) + return c.writeRequest(syncGroup, v0, id, request) }, func(deadline time.Time, size int) error { return expectZeroSize(func() (remain int, err error) { @@ -783,7 +783,7 @@ func (c *Conn) ReadBatchWith(cfg ReadBatchConfig) *Batch { return &Batch{err: dontExpectEOF(err)} } - fetchVersion, err := c.negotiateVersion(fetchRequest, v2, v5, v10) + fetchVersion, err := c.negotiateVersion(fetch, v2, v5, v10) if err != nil { return &Batch{err: dontExpectEOF(err)} } @@ -970,7 +970,7 @@ func (c *Conn) ReadPartitions(topics ...string) (partitions []Partition, err err err = c.readOperation( func(deadline time.Time, id int32) error { - return c.writeRequest(metadataRequest, v1, id, topicMetadataRequestV1(topics)) + return c.writeRequest(metadata, v1, id, topicMetadataRequestV1(topics)) }, func(deadline time.Time, size int) error { var res metadataResponseV1 @@ -1085,7 +1085,7 @@ func (c *Conn) writeCompressedMessages(codec CompressionCodec, msgs ...Message) } var produceVersion apiVersion - if produceVersion, err = c.negotiateVersion(produceRequest, v2, v3, v7); err != nil { + if produceVersion, err = c.negotiateVersion(produce, v2, v3, v7); err != nil { return } @@ -1378,31 +1378,6 @@ func (c *Conn) requestHeader(apiKey apiKey, apiVersion apiVersion, correlationID } } -type ApiVersion struct { - ApiKey int16 - MinVersion int16 - MaxVersion int16 -} - -var defaultApiVersions map[apiKey]ApiVersion = map[apiKey]ApiVersion{ - produceRequest: ApiVersion{int16(produceRequest), int16(v2), int16(v2)}, - fetchRequest: ApiVersion{int16(fetchRequest), int16(v2), int16(v2)}, - listOffsetRequest: ApiVersion{int16(listOffsetRequest), int16(v1), int16(v1)}, - metadataRequest: ApiVersion{int16(metadataRequest), int16(v1), int16(v1)}, - offsetCommitRequest: ApiVersion{int16(offsetCommitRequest), int16(v2), int16(v2)}, - offsetFetchRequest: ApiVersion{int16(offsetFetchRequest), int16(v1), int16(v1)}, - groupCoordinatorRequest: ApiVersion{int16(groupCoordinatorRequest), int16(v0), int16(v0)}, - joinGroupRequest: ApiVersion{int16(joinGroupRequest), int16(v1), int16(v1)}, - heartbeatRequest: ApiVersion{int16(heartbeatRequest), int16(v0), int16(v0)}, - leaveGroupRequest: ApiVersion{int16(leaveGroupRequest), int16(v0), int16(v0)}, - syncGroupRequest: ApiVersion{int16(syncGroupRequest), int16(v0), int16(v0)}, - describeGroupsRequest: ApiVersion{int16(describeGroupsRequest), int16(v1), int16(v1)}, - listGroupsRequest: ApiVersion{int16(listGroupsRequest), int16(v1), int16(v1)}, - apiVersionsRequest: ApiVersion{int16(apiVersionsRequest), int16(v0), int16(v0)}, - createTopicsRequest: ApiVersion{int16(createTopicsRequest), int16(v0), int16(v0)}, - deleteTopicsRequest: ApiVersion{int16(deleteTopicsRequest), int16(v1), int16(v1)}, -} - func (c *Conn) ApiVersions() ([]ApiVersion, error) { deadline := &c.rdeadline @@ -1417,7 +1392,7 @@ func (c *Conn) ApiVersions() ([]ApiVersion, error) { id, err := c.doRequest(deadline, func(_ time.Time, id int32) error { h := requestHeader{ - ApiKey: int16(apiVersionsRequest), + ApiKey: int16(apiVersions), ApiVersion: int16(v0), CorrelationID: id, ClientID: c.clientID, @@ -1542,14 +1517,14 @@ func (c *Conn) saslHandshake(mechanism string) error { // challenge/responses are sent var resp saslHandshakeResponseV0 - version, err := c.negotiateVersion(saslHandshakeRequest, v0, v1) + version, err := c.negotiateVersion(saslHandshake, v0, v1) if err != nil { return err } err = c.writeOperation( func(deadline time.Time, id int32) error { - return c.writeRequest(saslHandshakeRequest, version, id, &saslHandshakeRequestV0{Mechanism: mechanism}) + return c.writeRequest(saslHandshake, version, id, &saslHandshakeRequestV0{Mechanism: mechanism}) }, func(deadline time.Time, size int) error { return expectZeroSize(func() (int, error) { @@ -1571,7 +1546,7 @@ func (c *Conn) saslAuthenticate(data []byte) ([]byte, error) { // if we sent a v1 handshake, then we must encapsulate the authentication // request in a saslAuthenticateRequest. otherwise, we read and write raw // bytes. - version, err := c.negotiateVersion(saslHandshakeRequest, v0, v1) + version, err := c.negotiateVersion(saslHandshake, v0, v1) if err != nil { return nil, err } @@ -1581,7 +1556,7 @@ func (c *Conn) saslAuthenticate(data []byte) ([]byte, error) { err := c.writeOperation( func(deadline time.Time, id int32) error { - return c.writeRequest(saslAuthenticateRequest, v0, id, request) + return c.writeRequest(saslAuthenticate, v0, id, request) }, func(deadline time.Time, size int) error { return expectZeroSize(func() (remain int, err error) { diff --git a/createtopics.go b/createtopics.go index a38e0801d..95619da2d 100644 --- a/createtopics.go +++ b/createtopics.go @@ -222,7 +222,7 @@ func (c *Conn) createTopics(request createTopicsRequestV0) (createTopicsResponse deadline = adjustDeadlineForRTT(deadline, now, defaultRTT) request.Timeout = milliseconds(deadlineToTimeout(deadline, now)) } - return c.writeRequest(createTopicsRequest, v0, id, request) + return c.writeRequest(createTopics, v0, id, request) }, func(deadline time.Time, size int) error { return expectZeroSize(func() (remain int, err error) { diff --git a/deletetopics.go b/deletetopics.go index c0af87db1..687c380c3 100644 --- a/deletetopics.go +++ b/deletetopics.go @@ -94,7 +94,7 @@ func (c *Conn) deleteTopics(request deleteTopicsRequestV0) (deleteTopicsResponse deadline = adjustDeadlineForRTT(deadline, now, defaultRTT) request.Timeout = milliseconds(deadlineToTimeout(deadline, now)) } - return c.writeRequest(deleteTopicsRequest, v0, id, request) + return c.writeRequest(deleteTopics, v0, id, request) }, func(deadline time.Time, size int) error { return expectZeroSize(func() (remain int, err error) { diff --git a/protocol.go b/protocol.go index 449ad6e40..727cb2cf8 100644 --- a/protocol.go +++ b/protocol.go @@ -3,31 +3,102 @@ package kafka import ( "encoding/binary" "fmt" + "strconv" ) +type ApiVersion struct { + ApiKey int16 + MinVersion int16 + MaxVersion int16 +} + +func (v ApiVersion) Format(w fmt.State, r rune) { + switch r { + case 's': + fmt.Fprint(w, apiKey(v.ApiKey)) + case 'd': + switch { + case w.Flag('-'): + fmt.Fprint(w, v.MinVersion) + case w.Flag('+'): + fmt.Fprint(w, v.MaxVersion) + default: + fmt.Fprint(w, v.ApiKey) + } + case 'v': + switch { + case w.Flag('-'): + fmt.Fprintf(w, "v%d", v.MinVersion) + case w.Flag('+'): + fmt.Fprintf(w, "v%d", v.MaxVersion) + case w.Flag('#'): + fmt.Fprintf(w, "kafka.ApiVersion{ApiKey:%d MinVersion:%d MaxVersion:%d}", v.ApiKey, v.MinVersion, v.MaxVersion) + default: + fmt.Fprintf(w, "%s[v%d:v%d]", apiKey(v.ApiKey), v.MinVersion, v.MaxVersion) + } + } +} + type apiKey int16 const ( - produceRequest apiKey = 0 - fetchRequest apiKey = 1 - listOffsetRequest apiKey = 2 - metadataRequest apiKey = 3 - offsetCommitRequest apiKey = 8 - offsetFetchRequest apiKey = 9 - groupCoordinatorRequest apiKey = 10 - joinGroupRequest apiKey = 11 - heartbeatRequest apiKey = 12 - leaveGroupRequest apiKey = 13 - syncGroupRequest apiKey = 14 - describeGroupsRequest apiKey = 15 - listGroupsRequest apiKey = 16 - saslHandshakeRequest apiKey = 17 - apiVersionsRequest apiKey = 18 - createTopicsRequest apiKey = 19 - deleteTopicsRequest apiKey = 20 - saslAuthenticateRequest apiKey = 36 + produce apiKey = 0 + fetch apiKey = 1 + listOffsets apiKey = 2 + metadata apiKey = 3 + leaderAndIsr apiKey = 4 + stopReplica apiKey = 5 + updateMetadata apiKey = 6 + controlledShutdown apiKey = 7 + offsetCommit apiKey = 8 + offsetFetch apiKey = 9 + findCoordinator apiKey = 10 + joinGroup apiKey = 11 + heartbeat apiKey = 12 + leaveGroup apiKey = 13 + syncGroup apiKey = 14 + describeGroups apiKey = 15 + listGroups apiKey = 16 + saslHandshake apiKey = 17 + apiVersions apiKey = 18 + createTopics apiKey = 19 + deleteTopics apiKey = 20 + deleteRecords apiKey = 21 + initProducerId apiKey = 22 + offsetForLeaderEpoch apiKey = 23 + addPartitionsToTxn apiKey = 24 + addOffsetsToTxn apiKey = 25 + endTxn apiKey = 26 + writeTxnMarkers apiKey = 27 + txnOffsetCommit apiKey = 28 + describeAcls apiKey = 29 + createAcls apiKey = 30 + deleteAcls apiKey = 31 + describeConfigs apiKey = 32 + alterConfigs apiKey = 33 + alterReplicaLogDirs apiKey = 34 + describeLogDirs apiKey = 35 + saslAuthenticate apiKey = 36 + createPartitions apiKey = 37 + createDelegationToken apiKey = 38 + renewDelegationToken apiKey = 39 + expireDelegationToken apiKey = 40 + describeDelegationToken apiKey = 41 + deleteGroups apiKey = 42 + electLeaders apiKey = 43 + incrementalAlterConfigs apiKey = 44 + alterPartitionReassignments apiKey = 45 + listPartitionReassignments apiKey = 46 + offsetDelete apiKey = 47 ) +func (k apiKey) String() string { + if i := int(k); i >= 0 && i < len(apiKeyStrings) { + return apiKeyStrings[i] + } + return strconv.Itoa(int(k)) +} + type apiVersion int16 const ( @@ -35,11 +106,66 @@ const ( v1 apiVersion = 1 v2 apiVersion = 2 v3 apiVersion = 3 + v4 apiVersion = 4 v5 apiVersion = 5 + v6 apiVersion = 6 v7 apiVersion = 7 + v8 apiVersion = 8 + v9 apiVersion = 9 v10 apiVersion = 10 ) +var apiKeyStrings = [...]string{ + produce: "Produce", + fetch: "Fetch", + listOffsets: "ListOffsets", + metadata: "Metadata", + leaderAndIsr: "LeaderAndIsr", + stopReplica: "StopReplica", + updateMetadata: "UpdateMetadata", + controlledShutdown: "ControlledShutdown", + offsetCommit: "OffsetCommit", + offsetFetch: "OffsetFetch", + findCoordinator: "FindCoordinator", + joinGroup: "JoinGroup", + heartbeat: "Heartbeat", + leaveGroup: "LeaveGroup", + syncGroup: "SyncGroup", + describeGroups: "DescribeGroups", + listGroups: "ListGroups", + saslHandshake: "SaslHandshake", + apiVersions: "ApiVersions", + createTopics: "CreateTopics", + deleteTopics: "DeleteTopics", + deleteRecords: "DeleteRecords", + initProducerId: "InitProducerId", + offsetForLeaderEpoch: "OffsetForLeaderEpoch", + addPartitionsToTxn: "AddPartitionsToTxn", + addOffsetsToTxn: "AddOffsetsToTxn", + endTxn: "EndTxn", + writeTxnMarkers: "WriteTxnMarkers", + txnOffsetCommit: "TxnOffsetCommit", + describeAcls: "DescribeAcls", + createAcls: "CreateAcls", + deleteAcls: "DeleteAcls", + describeConfigs: "DescribeConfigs", + alterConfigs: "AlterConfigs", + alterReplicaLogDirs: "AlterReplicaLogDirs", + describeLogDirs: "DescribeLogDirs", + saslAuthenticate: "SaslAuthenticate", + createPartitions: "CreatePartitions", + createDelegationToken: "CreateDelegationToken", + renewDelegationToken: "RenewDelegationToken", + expireDelegationToken: "ExpireDelegationToken", + describeDelegationToken: "DescribeDelegationToken", + deleteGroups: "DeleteGroups", + electLeaders: "ElectLeaders", + incrementalAlterConfigs: "IncrementalAlfterConfigs", + alterPartitionReassignments: "AlterPartitionReassignments", + listPartitionReassignments: "ListPartitionReassignments", + offsetDelete: "OffsetDelete", +} + type requestHeader struct { Size int32 ApiKey int16 diff --git a/protocol_test.go b/protocol_test.go index f57cc075b..0870c001c 100644 --- a/protocol_test.go +++ b/protocol_test.go @@ -8,6 +8,29 @@ import ( "testing" ) +func TestApiVersionsFormat(t *testing.T) { + for _, test := range []struct { + version ApiVersion + format string + output string + }{ + {version: ApiVersion{1, 2, 5}, format: "%s", output: "Fetch"}, + {version: ApiVersion{1, 2, 5}, format: "%d", output: "1"}, + {version: ApiVersion{1, 2, 5}, format: "%-d", output: "2"}, + {version: ApiVersion{1, 2, 5}, format: "%+d", output: "5"}, + {version: ApiVersion{1, 2, 5}, format: "%v", output: "Fetch[v2:v5]"}, + {version: ApiVersion{1, 2, 5}, format: "%-v", output: "v2"}, + {version: ApiVersion{1, 2, 5}, format: "%+v", output: "v5"}, + {version: ApiVersion{1, 2, 5}, format: "%#v", output: "kafka.ApiVersion{ApiKey:1 MinVersion:2 MaxVersion:5}"}, + } { + t.Run(test.output, func(t *testing.T) { + if s := fmt.Sprintf(test.format, test.version); s != test.output { + t.Error("output mismatch:", s, "!=", test.output) + } + }) + } +} + func TestProtocol(t *testing.T) { t.Parallel() @@ -23,7 +46,7 @@ func TestProtocol(t *testing.T) { requestHeader{ Size: 26, - ApiKey: int16(offsetCommitRequest), + ApiKey: int16(offsetCommit), ApiVersion: int16(v2), CorrelationID: 42, ClientID: "Hello World!", diff --git a/write.go b/write.go index d8296d603..3b806509c 100644 --- a/write.go +++ b/write.go @@ -167,7 +167,7 @@ type writable interface { func (wb *writeBuffer) writeFetchRequestV2(correlationID int32, clientID, topic string, partition int32, offset int64, minBytes, maxBytes int, maxWait time.Duration) error { h := requestHeader{ - ApiKey: int16(fetchRequest), + ApiKey: int16(fetch), ApiVersion: int16(v2), CorrelationID: correlationID, ClientID: clientID, @@ -203,7 +203,7 @@ func (wb *writeBuffer) writeFetchRequestV2(correlationID int32, clientID, topic func (wb *writeBuffer) writeFetchRequestV5(correlationID int32, clientID, topic string, partition int32, offset int64, minBytes, maxBytes int, maxWait time.Duration, isolationLevel int8) error { h := requestHeader{ - ApiKey: int16(fetchRequest), + ApiKey: int16(fetch), ApiVersion: int16(v5), CorrelationID: correlationID, ClientID: clientID, @@ -245,7 +245,7 @@ func (wb *writeBuffer) writeFetchRequestV5(correlationID int32, clientID, topic func (wb *writeBuffer) writeFetchRequestV10(correlationID int32, clientID, topic string, partition int32, offset int64, minBytes, maxBytes int, maxWait time.Duration, isolationLevel int8) error { h := requestHeader{ - ApiKey: int16(fetchRequest), + ApiKey: int16(fetch), ApiVersion: int16(v10), CorrelationID: correlationID, ClientID: clientID, @@ -297,7 +297,7 @@ func (wb *writeBuffer) writeFetchRequestV10(correlationID int32, clientID, topic func (wb *writeBuffer) writeListOffsetRequestV1(correlationID int32, clientID, topic string, partition int32, time int64) error { h := requestHeader{ - ApiKey: int16(listOffsetRequest), + ApiKey: int16(listOffsets), ApiVersion: int16(v1), CorrelationID: correlationID, ClientID: clientID, @@ -341,7 +341,7 @@ func (wb *writeBuffer) writeProduceRequestV2(codec CompressionCodec, correlation } h := requestHeader{ - ApiKey: int16(produceRequest), + ApiKey: int16(produce), ApiVersion: int16(v2), CorrelationID: correlationID, ClientID: clientID, @@ -382,7 +382,7 @@ func (wb *writeBuffer) writeProduceRequestV2(codec CompressionCodec, correlation func (wb *writeBuffer) writeProduceRequestV3(correlationID int32, clientID, topic string, partition int32, timeout time.Duration, requiredAcks int16, transactionalID *string, recordBatch *recordBatch) (err error) { h := requestHeader{ - ApiKey: int16(produceRequest), + ApiKey: int16(produce), ApiVersion: int16(v3), CorrelationID: correlationID, ClientID: clientID, @@ -420,7 +420,7 @@ func (wb *writeBuffer) writeProduceRequestV3(correlationID int32, clientID, topi func (wb *writeBuffer) writeProduceRequestV7(correlationID int32, clientID, topic string, partition int32, timeout time.Duration, requiredAcks int16, transactionalID *string, recordBatch *recordBatch) (err error) { h := requestHeader{ - ApiKey: int16(produceRequest), + ApiKey: int16(produce), ApiVersion: int16(v7), CorrelationID: correlationID, ClientID: clientID, diff --git a/write_test.go b/write_test.go index 85ec5e1ec..20f7b2563 100644 --- a/write_test.go +++ b/write_test.go @@ -60,7 +60,7 @@ func testWriteFetchRequestV2(t *testing.T) { const maxWait = 100 * time.Millisecond testWriteOptimization(t, requestHeader{ - ApiKey: int16(fetchRequest), + ApiKey: int16(fetch), ApiVersion: int16(v2), CorrelationID: testCorrelationID, ClientID: testClientID, @@ -88,7 +88,7 @@ func testWriteListOffsetRequestV1(t *testing.T) { const time = -1 testWriteOptimization(t, requestHeader{ - ApiKey: int16(listOffsetRequest), + ApiKey: int16(listOffsets), ApiVersion: int16(v1), CorrelationID: testCorrelationID, ClientID: testClientID, @@ -130,7 +130,7 @@ func testWriteProduceRequestV2(t *testing.T) { const timeout = 100 testWriteOptimization(t, requestHeader{ - ApiKey: int16(produceRequest), + ApiKey: int16(produce), ApiVersion: int16(v2), CorrelationID: testCorrelationID, ClientID: testClientID,