Skip to content

Commit

Permalink
Merge pull request hyperledger#439 from kaleido-io/backport-438
Browse files Browse the repository at this point in the history
v0.11.x backport: Ready state changes require a bump to the message to re-sequence it
  • Loading branch information
peterbroadhurst authored Jan 24, 2022
2 parents b58663e + eab45d8 commit fc28d33
Show file tree
Hide file tree
Showing 6 changed files with 92 additions and 6 deletions.
28 changes: 28 additions & 0 deletions internal/database/sqlcommon/message_sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,34 @@ func (s *SQLCommon) UpsertMessage(ctx context.Context, message *fftypes.Message,
return s.commitTx(ctx, tx, autoCommit)
}

// In SQL update+bump is a delete+insert within a TX
func (s *SQLCommon) ReplaceMessage(ctx context.Context, message *fftypes.Message) (err error) {
ctx, tx, autoCommit, err := s.beginOrUseTx(ctx)
if err != nil {
return err
}
defer s.rollbackTx(ctx, tx, autoCommit)

if err := s.deleteTx(ctx, tx,
sq.Delete("messages").
Where(sq.And{
sq.Eq{"id": message.Header.ID},
}),
nil, // no change event
); err != nil {
return err
}

if err = s.attemptMessageInsert(ctx, tx, message); err != nil {
return err
}

// Note there is no call to updateMessageDataRefs as the data refs are not allowed to change,
// and are correlated by UUID (not sequence)

return s.commitTx(ctx, tx, autoCommit)
}

func (s *SQLCommon) updateMessageDataRefs(ctx context.Context, tx *txWrapper, message *fftypes.Message, recreateDatarefs bool) error {

if recreateDatarefs {
Expand Down
45 changes: 43 additions & 2 deletions internal/database/sqlcommon/message_sql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,15 +61,15 @@ func TestUpsertE2EWithDB(t *testing.T) {
TxType: fftypes.TransactionTypeNone,
},
Hash: fftypes.NewRandB32(),
State: fftypes.MessageStateReady,
State: fftypes.MessageStateStaged,
Confirmed: nil,
Data: []*fftypes.DataRef{
{ID: dataID1, Hash: rand1},
{ID: dataID2, Hash: rand2},
},
}

s.callbacks.On("OrderedUUIDCollectionNSEvent", database.CollectionMessages, fftypes.ChangeEventTypeCreated, "ns12345", msgID, mock.Anything).Return()
s.callbacks.On("OrderedUUIDCollectionNSEvent", database.CollectionMessages, fftypes.ChangeEventTypeCreated, "ns12345", msgID, mock.Anything).Return().Twice()
s.callbacks.On("OrderedUUIDCollectionNSEvent", database.CollectionMessages, fftypes.ChangeEventTypeUpdated, "ns12345", msgID, mock.Anything).Return()

err := s.UpsertMessage(ctx, msg, database.UpsertOptimizationNew)
Expand Down Expand Up @@ -205,6 +205,15 @@ func TestUpsertE2EWithDB(t *testing.T) {
assert.Equal(t, 1, len(msgs))
assert.Equal(t, *bid2, *msgs[0].BatchID)

// Bump and Update - this is for a ready transition
msgUpdated.State = fftypes.MessageStateReady
err = s.ReplaceMessage(context.Background(), msgUpdated)
assert.NoError(t, err)
msgRead, err = s.GetMessageByID(ctx, msgUpdated.Header.ID)
msgJson, _ = json.Marshal(&msgUpdated)
msgReadJson, _ = json.Marshal(msgRead)
assert.Equal(t, string(msgJson), string(msgReadJson))

s.callbacks.AssertExpectations(t)
}

Expand Down Expand Up @@ -276,6 +285,38 @@ func TestUpsertMessageFailCommit(t *testing.T) {
assert.NoError(t, mock.ExpectationsWereMet())
}

func TestReplaceMessageFailBegin(t *testing.T) {
s, mock := newMockProvider().init()
mock.ExpectBegin().WillReturnError(fmt.Errorf("pop"))
msgID := fftypes.NewUUID()
err := s.ReplaceMessage(context.Background(), &fftypes.Message{Header: fftypes.MessageHeader{ID: msgID}})
assert.Regexp(t, "FF10114", err)
assert.NoError(t, mock.ExpectationsWereMet())
}

func TestReplaceMessageFailDelete(t *testing.T) {
s, mock := newMockProvider().init()
mock.ExpectBegin()
mock.ExpectExec("DELETE .*").WillReturnError(fmt.Errorf("pop"))
mock.ExpectRollback()
msgID := fftypes.NewUUID()
err := s.ReplaceMessage(context.Background(), &fftypes.Message{Header: fftypes.MessageHeader{ID: msgID}})
assert.Regexp(t, "FF10118", err)
assert.NoError(t, mock.ExpectationsWereMet())
}

func TestReplaceMessageFailInsert(t *testing.T) {
s, mock := newMockProvider().init()
mock.ExpectBegin()
mock.ExpectExec("DELETE .*").WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectExec("INSERT .*").WillReturnError(fmt.Errorf("pop"))
mock.ExpectRollback()
msgID := fftypes.NewUUID()
err := s.ReplaceMessage(context.Background(), &fftypes.Message{Header: fftypes.MessageHeader{ID: msgID}})
assert.Regexp(t, "FF10116", err)
assert.NoError(t, mock.ExpectationsWereMet())
}

func TestUpdateMessageDataRefsNilID(t *testing.T) {
s, mock := newMockProvider().init()
msgID := fftypes.NewUUID()
Expand Down
2 changes: 1 addition & 1 deletion internal/events/tokens_transferred.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func (em *eventManager) TokensTransferred(ti tokens.Plugin, poolProtocolID strin
if msg.State == fftypes.MessageStateStaged {
// Message can now be sent
msg.State = fftypes.MessageStateReady
if err := em.database.UpsertMessage(ctx, msg, database.UpsertOptimizationExisting); err != nil {
if err := em.database.ReplaceMessage(ctx, msg); err != nil {
return err
}
} else {
Expand Down
5 changes: 2 additions & 3 deletions internal/events/tokens_transferred_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,10 +371,9 @@ func TestTokensTransferredWithMessageSend(t *testing.T) {
mdi.On("UpsertTokenTransfer", em.ctx, transfer).Return(nil).Times(2)
mdi.On("UpdateTokenBalances", em.ctx, transfer).Return(nil).Times(2)
mdi.On("GetMessageByID", em.ctx, mock.Anything).Return(message, nil).Times(2)
mdi.On("UpsertMessage", em.ctx, mock.Anything, database.UpsertOptimizationExisting).Return(fmt.Errorf("pop"))
mdi.On("UpsertMessage", em.ctx, mock.MatchedBy(func(msg *fftypes.Message) bool {
mdi.On("ReplaceMessage", em.ctx, mock.MatchedBy(func(msg *fftypes.Message) bool {
return msg.State == fftypes.MessageStateReady
}), database.UpsertOptimizationExisting).Return(nil)
})).Return(fmt.Errorf("pop"))
mdi.On("InsertEvent", em.ctx, mock.MatchedBy(func(ev *fftypes.Event) bool {
return ev.Type == fftypes.EventTypeTransferConfirmed && ev.Reference == transfer.LocalID && ev.Namespace == pool.Namespace
})).Return(nil).Once()
Expand Down
14 changes: 14 additions & 0 deletions mocks/databasemocks/plugin.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions pkg/database/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@ type iMessageCollection interface {
// UpdateMessage - Update message
UpdateMessage(ctx context.Context, id *fftypes.UUID, update Update) (err error)

// ReplaceMessage updates the message, and assigns it a new sequence number at the front of the list.
// A new event is raised for the message, with the new sequence number - as if it was brand new.
ReplaceMessage(ctx context.Context, message *fftypes.Message) (err error)

// UpdateMessages - Update messages
UpdateMessages(ctx context.Context, filter Filter, update Update) (err error)

Expand Down

0 comments on commit fc28d33

Please sign in to comment.