Skip to content

Commit

Permalink
Add Transactionalmessage interface for request routing (segmentio#744)
Browse files Browse the repository at this point in the history
* Add TransactionalMessage interface.

Add TransactionMessage interface so the Transport
automatically routes messages to Transaction Coordinators
if required.
  • Loading branch information
rhansen2 authored Sep 20, 2021
1 parent bedcf4a commit 392c450
Show file tree
Hide file tree
Showing 7 changed files with 38 additions and 0 deletions.
4 changes: 4 additions & 0 deletions protocol/addoffsetstotxn/addoffsetstotxn.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ type Request struct {

func (r *Request) ApiKey() protocol.ApiKey { return protocol.AddOffsetsToTxn }

func (r *Request) Transaction() string { return r.TransactionalID }

var _ protocol.TransactionalMessage = (*Request)(nil)

type Response struct {
// We need at least one tagged field to indicate that this is a "flexible" message
// type.
Expand Down
4 changes: 4 additions & 0 deletions protocol/addpartitionstotxn/addpartitionstotxn.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ type RequestTopic struct {

func (r *Request) ApiKey() protocol.ApiKey { return protocol.AddPartitionsToTxn }

func (r *Request) Transaction() string { return r.TransactionalID }

var _ protocol.TransactionalMessage = (*Request)(nil)

type Response struct {
// We need at least one tagged field to indicate that this is a "flexible" message
// type.
Expand Down
4 changes: 4 additions & 0 deletions protocol/endtxn/endtxn.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ type Request struct {

func (r *Request) ApiKey() protocol.ApiKey { return protocol.EndTxn }

func (r *Request) Transaction() string { return r.TransactionalID }

var _ protocol.TransactionalMessage = (*Request)(nil)

type Response struct {
// We need at least one tagged field to indicate that this is a "flexible" message
// type.
Expand Down
4 changes: 4 additions & 0 deletions protocol/initproducerid/initproducerid.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ type Request struct {

func (r *Request) ApiKey() protocol.ApiKey { return protocol.InitProducerId }

func (r *Request) Transaction() string { return r.TransactionalID }

var _ protocol.TransactionalMessage = (*Request)(nil)

type Response struct {
// We need at least one tagged field to indicate that this is a "flexible" message
// type.
Expand Down
8 changes: 8 additions & 0 deletions protocol/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,14 @@ type GroupMessage interface {
Group() string
}

// TransactionalMessage is an extension of the Message interface implemented by some
// request types to inform the program that they should be reouted to a transaction
// coordinator.
type TransactionalMessage interface {
// Returns the transactional id configured on the message.
Transaction() string
}

// PreparedMessage is an extension of the Message interface implemented by some
// request types which may need to run some pre-processing on their state before
// being sent.
Expand Down
4 changes: 4 additions & 0 deletions protocol/txnoffsetcommit/txnoffsetcommit.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ type RequestPartition struct {

func (r *Request) ApiKey() protocol.ApiKey { return protocol.TxnOffsetCommit }

func (r *Request) Group() string { return r.GroupID }

var _ protocol.GroupMessage = (*Request)(nil)

type Response struct {
// We need at least one tagged field to indicate that this is a "flexible" message
// type.
Expand Down
10 changes: 10 additions & 0 deletions transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -649,6 +649,16 @@ func (p *connPool) sendRequest(ctx context.Context, req Request, state connPoolS
return reject(err)
}
brokerID = r.(*findcoordinator.Response).NodeID
case protocol.TransactionalMessage:
p := p.sendRequest(ctx, &findcoordinator.Request{
Key: m.Transaction(),
KeyType: int8(CoordinatorKeyTypeTransaction),
}, state)
r, err := p.await(ctx)
if err != nil {
return reject(err)
}
brokerID = r.(*findcoordinator.Response).NodeID
}

var c *conn
Expand Down

0 comments on commit 392c450

Please sign in to comment.