diff --git a/internal/database/sqlcommon/message_sql.go b/internal/database/sqlcommon/message_sql.go index 2c62326d67..ce7ab1f46d 100644 --- a/internal/database/sqlcommon/message_sql.go +++ b/internal/database/sqlcommon/message_sql.go @@ -183,6 +183,34 @@ func (s *SQLCommon) UpsertMessage(ctx context.Context, message *fftypes.Message, return s.commitTx(ctx, tx, autoCommit) } +// In SQL update+bump is a delete+insert within a TX +func (s *SQLCommon) ReplaceMessage(ctx context.Context, message *fftypes.Message) (err error) { + ctx, tx, autoCommit, err := s.beginOrUseTx(ctx) + if err != nil { + return err + } + defer s.rollbackTx(ctx, tx, autoCommit) + + if err := s.deleteTx(ctx, tx, + sq.Delete("messages"). + Where(sq.And{ + sq.Eq{"id": message.Header.ID}, + }), + nil, // no change event + ); err != nil { + return err + } + + if err = s.attemptMessageInsert(ctx, tx, message); err != nil { + return err + } + + // Note there is no call to updateMessageDataRefs as the data refs are not allowed to change, + // and are correlated by UUID (not sequence) + + return s.commitTx(ctx, tx, autoCommit) +} + func (s *SQLCommon) updateMessageDataRefs(ctx context.Context, tx *txWrapper, message *fftypes.Message, recreateDatarefs bool) error { if recreateDatarefs { diff --git a/internal/database/sqlcommon/message_sql_test.go b/internal/database/sqlcommon/message_sql_test.go index af258ef29b..bac9284dc9 100644 --- a/internal/database/sqlcommon/message_sql_test.go +++ b/internal/database/sqlcommon/message_sql_test.go @@ -61,7 +61,7 @@ func TestUpsertE2EWithDB(t *testing.T) { TxType: fftypes.TransactionTypeNone, }, Hash: fftypes.NewRandB32(), - State: fftypes.MessageStateReady, + State: fftypes.MessageStateStaged, Confirmed: nil, Data: []*fftypes.DataRef{ {ID: dataID1, Hash: rand1}, @@ -69,7 +69,7 @@ func TestUpsertE2EWithDB(t *testing.T) { }, } - s.callbacks.On("OrderedUUIDCollectionNSEvent", database.CollectionMessages, fftypes.ChangeEventTypeCreated, "ns12345", msgID, mock.Anything).Return() + s.callbacks.On("OrderedUUIDCollectionNSEvent", database.CollectionMessages, fftypes.ChangeEventTypeCreated, "ns12345", msgID, mock.Anything).Return().Twice() s.callbacks.On("OrderedUUIDCollectionNSEvent", database.CollectionMessages, fftypes.ChangeEventTypeUpdated, "ns12345", msgID, mock.Anything).Return() err := s.UpsertMessage(ctx, msg, database.UpsertOptimizationNew) @@ -205,6 +205,15 @@ func TestUpsertE2EWithDB(t *testing.T) { assert.Equal(t, 1, len(msgs)) assert.Equal(t, *bid2, *msgs[0].BatchID) + // Bump and Update - this is for a ready transition + msgUpdated.State = fftypes.MessageStateReady + err = s.ReplaceMessage(context.Background(), msgUpdated) + assert.NoError(t, err) + msgRead, err = s.GetMessageByID(ctx, msgUpdated.Header.ID) + msgJson, _ = json.Marshal(&msgUpdated) + msgReadJson, _ = json.Marshal(msgRead) + assert.Equal(t, string(msgJson), string(msgReadJson)) + s.callbacks.AssertExpectations(t) } @@ -276,6 +285,38 @@ func TestUpsertMessageFailCommit(t *testing.T) { assert.NoError(t, mock.ExpectationsWereMet()) } +func TestReplaceMessageFailBegin(t *testing.T) { + s, mock := newMockProvider().init() + mock.ExpectBegin().WillReturnError(fmt.Errorf("pop")) + msgID := fftypes.NewUUID() + err := s.ReplaceMessage(context.Background(), &fftypes.Message{Header: fftypes.MessageHeader{ID: msgID}}) + assert.Regexp(t, "FF10114", err) + assert.NoError(t, mock.ExpectationsWereMet()) +} + +func TestReplaceMessageFailDelete(t *testing.T) { + s, mock := newMockProvider().init() + mock.ExpectBegin() + mock.ExpectExec("DELETE .*").WillReturnError(fmt.Errorf("pop")) + mock.ExpectRollback() + msgID := fftypes.NewUUID() + err := s.ReplaceMessage(context.Background(), &fftypes.Message{Header: fftypes.MessageHeader{ID: msgID}}) + assert.Regexp(t, "FF10118", err) + assert.NoError(t, mock.ExpectationsWereMet()) +} + +func TestReplaceMessageFailInsert(t *testing.T) { + s, mock := newMockProvider().init() + mock.ExpectBegin() + mock.ExpectExec("DELETE .*").WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectExec("INSERT .*").WillReturnError(fmt.Errorf("pop")) + mock.ExpectRollback() + msgID := fftypes.NewUUID() + err := s.ReplaceMessage(context.Background(), &fftypes.Message{Header: fftypes.MessageHeader{ID: msgID}}) + assert.Regexp(t, "FF10116", err) + assert.NoError(t, mock.ExpectationsWereMet()) +} + func TestUpdateMessageDataRefsNilID(t *testing.T) { s, mock := newMockProvider().init() msgID := fftypes.NewUUID() diff --git a/internal/events/tokens_transferred.go b/internal/events/tokens_transferred.go index d8fe87fdae..e31b09bba6 100644 --- a/internal/events/tokens_transferred.go +++ b/internal/events/tokens_transferred.go @@ -128,7 +128,7 @@ func (em *eventManager) TokensTransferred(ti tokens.Plugin, poolProtocolID strin if msg.State == fftypes.MessageStateStaged { // Message can now be sent msg.State = fftypes.MessageStateReady - if err := em.database.UpsertMessage(ctx, msg, database.UpsertOptimizationExisting); err != nil { + if err := em.database.ReplaceMessage(ctx, msg); err != nil { return err } } else { diff --git a/internal/events/tokens_transferred_test.go b/internal/events/tokens_transferred_test.go index a7e71913ef..b528a20a8b 100644 --- a/internal/events/tokens_transferred_test.go +++ b/internal/events/tokens_transferred_test.go @@ -371,10 +371,9 @@ func TestTokensTransferredWithMessageSend(t *testing.T) { mdi.On("UpsertTokenTransfer", em.ctx, transfer).Return(nil).Times(2) mdi.On("UpdateTokenBalances", em.ctx, transfer).Return(nil).Times(2) mdi.On("GetMessageByID", em.ctx, mock.Anything).Return(message, nil).Times(2) - mdi.On("UpsertMessage", em.ctx, mock.Anything, database.UpsertOptimizationExisting).Return(fmt.Errorf("pop")) - mdi.On("UpsertMessage", em.ctx, mock.MatchedBy(func(msg *fftypes.Message) bool { + mdi.On("ReplaceMessage", em.ctx, mock.MatchedBy(func(msg *fftypes.Message) bool { return msg.State == fftypes.MessageStateReady - }), database.UpsertOptimizationExisting).Return(nil) + })).Return(fmt.Errorf("pop")) mdi.On("InsertEvent", em.ctx, mock.MatchedBy(func(ev *fftypes.Event) bool { return ev.Type == fftypes.EventTypeTransferConfirmed && ev.Reference == transfer.LocalID && ev.Namespace == pool.Namespace })).Return(nil).Once() diff --git a/mocks/databasemocks/plugin.go b/mocks/databasemocks/plugin.go index 120eef8975..9db954b5be 100644 --- a/mocks/databasemocks/plugin.go +++ b/mocks/databasemocks/plugin.go @@ -1758,6 +1758,20 @@ func (_m *Plugin) Name() string { return r0 } +// ReplaceMessage provides a mock function with given fields: ctx, message +func (_m *Plugin) ReplaceMessage(ctx context.Context, message *fftypes.Message) error { + ret := _m.Called(ctx, message) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, *fftypes.Message) error); ok { + r0 = rf(ctx, message) + } else { + r0 = ret.Error(0) + } + + return r0 +} + // RunAsGroup provides a mock function with given fields: ctx, fn func (_m *Plugin) RunAsGroup(ctx context.Context, fn func(context.Context) error) error { ret := _m.Called(ctx, fn) diff --git a/pkg/database/plugin.go b/pkg/database/plugin.go index 56a1779ae8..a89d8fbd1e 100644 --- a/pkg/database/plugin.go +++ b/pkg/database/plugin.go @@ -80,6 +80,10 @@ type iMessageCollection interface { // UpdateMessage - Update message UpdateMessage(ctx context.Context, id *fftypes.UUID, update Update) (err error) + // ReplaceMessage updates the message, and assigns it a new sequence number at the front of the list. + // A new event is raised for the message, with the new sequence number - as if it was brand new. + ReplaceMessage(ctx context.Context, message *fftypes.Message) (err error) + // UpdateMessages - Update messages UpdateMessages(ctx context.Context, filter Filter, update Update) (err error)