From 1d98b48da2c3b204cb0a0304491ef63ced827449 Mon Sep 17 00:00:00 2001 From: Peter Broadhurst Date: Sun, 23 Jan 2022 12:40:27 -0500 Subject: [PATCH 1/2] Ready state changes require a bump to the message to re-sequence it Signed-off-by: Peter Broadhurst --- internal/database/sqlcommon/message_sql.go | 28 ++++++++++++ .../database/sqlcommon/message_sql_test.go | 45 ++++++++++++++++++- internal/events/tokens_transferred.go | 2 +- internal/events/tokens_transferred_test.go | 5 +-- mocks/databasemocks/plugin.go | 14 ++++++ pkg/database/plugin.go | 4 ++ 6 files changed, 92 insertions(+), 6 deletions(-) diff --git a/internal/database/sqlcommon/message_sql.go b/internal/database/sqlcommon/message_sql.go index 2c62326d6..fcfedf73b 100644 --- a/internal/database/sqlcommon/message_sql.go +++ b/internal/database/sqlcommon/message_sql.go @@ -183,6 +183,34 @@ func (s *SQLCommon) UpsertMessage(ctx context.Context, message *fftypes.Message, return s.commitTx(ctx, tx, autoCommit) } +// In SQL update+bump is a delete+insert within a TX +func (s *SQLCommon) UpdateAndBumpMessage(ctx context.Context, message *fftypes.Message) (err error) { + ctx, tx, autoCommit, err := s.beginOrUseTx(ctx) + if err != nil { + return err + } + defer s.rollbackTx(ctx, tx, autoCommit) + + if err := s.deleteTx(ctx, tx, + sq.Delete("messages"). + Where(sq.And{ + sq.Eq{"id": message.Header.ID}, + }), + nil, // no change event + ); err != nil { + return err + } + + if err = s.attemptMessageInsert(ctx, tx, message); err != nil { + return err + } + + // Note there is no call to updateMessageDataRefs as the data refs are not allowed to change, + // and are correlated by UUID (not sequence) + + return s.commitTx(ctx, tx, autoCommit) +} + func (s *SQLCommon) updateMessageDataRefs(ctx context.Context, tx *txWrapper, message *fftypes.Message, recreateDatarefs bool) error { if recreateDatarefs { diff --git a/internal/database/sqlcommon/message_sql_test.go b/internal/database/sqlcommon/message_sql_test.go index af258ef29..df8ea2b9f 100644 --- a/internal/database/sqlcommon/message_sql_test.go +++ b/internal/database/sqlcommon/message_sql_test.go @@ -61,7 +61,7 @@ func TestUpsertE2EWithDB(t *testing.T) { TxType: fftypes.TransactionTypeNone, }, Hash: fftypes.NewRandB32(), - State: fftypes.MessageStateReady, + State: fftypes.MessageStateStaged, Confirmed: nil, Data: []*fftypes.DataRef{ {ID: dataID1, Hash: rand1}, @@ -69,7 +69,7 @@ func TestUpsertE2EWithDB(t *testing.T) { }, } - s.callbacks.On("OrderedUUIDCollectionNSEvent", database.CollectionMessages, fftypes.ChangeEventTypeCreated, "ns12345", msgID, mock.Anything).Return() + s.callbacks.On("OrderedUUIDCollectionNSEvent", database.CollectionMessages, fftypes.ChangeEventTypeCreated, "ns12345", msgID, mock.Anything).Return().Twice() s.callbacks.On("OrderedUUIDCollectionNSEvent", database.CollectionMessages, fftypes.ChangeEventTypeUpdated, "ns12345", msgID, mock.Anything).Return() err := s.UpsertMessage(ctx, msg, database.UpsertOptimizationNew) @@ -205,6 +205,15 @@ func TestUpsertE2EWithDB(t *testing.T) { assert.Equal(t, 1, len(msgs)) assert.Equal(t, *bid2, *msgs[0].BatchID) + // Bump and Update - this is for a ready transition + msgUpdated.State = fftypes.MessageStateReady + err = s.UpdateAndBumpMessage(context.Background(), msgUpdated) + assert.NoError(t, err) + msgRead, err = s.GetMessageByID(ctx, msgUpdated.Header.ID) + msgJson, _ = json.Marshal(&msgUpdated) + msgReadJson, _ = json.Marshal(msgRead) + assert.Equal(t, string(msgJson), string(msgReadJson)) + s.callbacks.AssertExpectations(t) } @@ -276,6 +285,38 @@ func TestUpsertMessageFailCommit(t *testing.T) { assert.NoError(t, mock.ExpectationsWereMet()) } +func TestUpdateAndBumpMessageFailBegin(t *testing.T) { + s, mock := newMockProvider().init() + mock.ExpectBegin().WillReturnError(fmt.Errorf("pop")) + msgID := fftypes.NewUUID() + err := s.UpdateAndBumpMessage(context.Background(), &fftypes.Message{Header: fftypes.MessageHeader{ID: msgID}}) + assert.Regexp(t, "FF10114", err) + assert.NoError(t, mock.ExpectationsWereMet()) +} + +func TestUpdateAndBumpMessageFailDelete(t *testing.T) { + s, mock := newMockProvider().init() + mock.ExpectBegin() + mock.ExpectExec("DELETE .*").WillReturnError(fmt.Errorf("pop")) + mock.ExpectRollback() + msgID := fftypes.NewUUID() + err := s.UpdateAndBumpMessage(context.Background(), &fftypes.Message{Header: fftypes.MessageHeader{ID: msgID}}) + assert.Regexp(t, "FF10118", err) + assert.NoError(t, mock.ExpectationsWereMet()) +} + +func TestUpdateAndBumpMessageFailInsert(t *testing.T) { + s, mock := newMockProvider().init() + mock.ExpectBegin() + mock.ExpectExec("DELETE .*").WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectExec("INSERT .*").WillReturnError(fmt.Errorf("pop")) + mock.ExpectRollback() + msgID := fftypes.NewUUID() + err := s.UpdateAndBumpMessage(context.Background(), &fftypes.Message{Header: fftypes.MessageHeader{ID: msgID}}) + assert.Regexp(t, "FF10116", err) + assert.NoError(t, mock.ExpectationsWereMet()) +} + func TestUpdateMessageDataRefsNilID(t *testing.T) { s, mock := newMockProvider().init() msgID := fftypes.NewUUID() diff --git a/internal/events/tokens_transferred.go b/internal/events/tokens_transferred.go index d8fe87fda..4bd08b1bc 100644 --- a/internal/events/tokens_transferred.go +++ b/internal/events/tokens_transferred.go @@ -128,7 +128,7 @@ func (em *eventManager) TokensTransferred(ti tokens.Plugin, poolProtocolID strin if msg.State == fftypes.MessageStateStaged { // Message can now be sent msg.State = fftypes.MessageStateReady - if err := em.database.UpsertMessage(ctx, msg, database.UpsertOptimizationExisting); err != nil { + if err := em.database.UpdateAndBumpMessage(ctx, msg); err != nil { return err } } else { diff --git a/internal/events/tokens_transferred_test.go b/internal/events/tokens_transferred_test.go index a7e71913e..2b1b42ee9 100644 --- a/internal/events/tokens_transferred_test.go +++ b/internal/events/tokens_transferred_test.go @@ -371,10 +371,9 @@ func TestTokensTransferredWithMessageSend(t *testing.T) { mdi.On("UpsertTokenTransfer", em.ctx, transfer).Return(nil).Times(2) mdi.On("UpdateTokenBalances", em.ctx, transfer).Return(nil).Times(2) mdi.On("GetMessageByID", em.ctx, mock.Anything).Return(message, nil).Times(2) - mdi.On("UpsertMessage", em.ctx, mock.Anything, database.UpsertOptimizationExisting).Return(fmt.Errorf("pop")) - mdi.On("UpsertMessage", em.ctx, mock.MatchedBy(func(msg *fftypes.Message) bool { + mdi.On("UpdateAndBumpMessage", em.ctx, mock.MatchedBy(func(msg *fftypes.Message) bool { return msg.State == fftypes.MessageStateReady - }), database.UpsertOptimizationExisting).Return(nil) + })).Return(fmt.Errorf("pop")) mdi.On("InsertEvent", em.ctx, mock.MatchedBy(func(ev *fftypes.Event) bool { return ev.Type == fftypes.EventTypeTransferConfirmed && ev.Reference == transfer.LocalID && ev.Namespace == pool.Namespace })).Return(nil).Once() diff --git a/mocks/databasemocks/plugin.go b/mocks/databasemocks/plugin.go index 120eef897..a1e263e43 100644 --- a/mocks/databasemocks/plugin.go +++ b/mocks/databasemocks/plugin.go @@ -1786,6 +1786,20 @@ func (_m *Plugin) SetPinDispatched(ctx context.Context, sequence int64) error { return r0 } +// UpdateAndBumpMessage provides a mock function with given fields: ctx, message +func (_m *Plugin) UpdateAndBumpMessage(ctx context.Context, message *fftypes.Message) error { + ret := _m.Called(ctx, message) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, *fftypes.Message) error); ok { + r0 = rf(ctx, message) + } else { + r0 = ret.Error(0) + } + + return r0 +} + // UpdateBatch provides a mock function with given fields: ctx, id, update func (_m *Plugin) UpdateBatch(ctx context.Context, id *fftypes.UUID, update database.Update) error { ret := _m.Called(ctx, id, update) diff --git a/pkg/database/plugin.go b/pkg/database/plugin.go index 56a1779ae..7d2e3e68d 100644 --- a/pkg/database/plugin.go +++ b/pkg/database/plugin.go @@ -80,6 +80,10 @@ type iMessageCollection interface { // UpdateMessage - Update message UpdateMessage(ctx context.Context, id *fftypes.UUID, update Update) (err error) + // UpdateAndBumpMessage updates the message, and assigns it a new sequence number at the front of the list. + // A new event is raised for the message, with the new sequence number - as if it was brand new. + UpdateAndBumpMessage(ctx context.Context, message *fftypes.Message) (err error) + // UpdateMessages - Update messages UpdateMessages(ctx context.Context, filter Filter, update Update) (err error) From eab45d8346961cfef750ee7dd01c3011d4deaa76 Mon Sep 17 00:00:00 2001 From: Peter Broadhurst Date: Sun, 23 Jan 2022 16:26:51 -0500 Subject: [PATCH 2/2] Improved naming from review Signed-off-by: Peter Broadhurst --- internal/database/sqlcommon/message_sql.go | 2 +- .../database/sqlcommon/message_sql_test.go | 14 +++++----- internal/events/tokens_transferred.go | 2 +- internal/events/tokens_transferred_test.go | 2 +- mocks/databasemocks/plugin.go | 28 +++++++++---------- pkg/database/plugin.go | 4 +-- 6 files changed, 26 insertions(+), 26 deletions(-) diff --git a/internal/database/sqlcommon/message_sql.go b/internal/database/sqlcommon/message_sql.go index fcfedf73b..ce7ab1f46 100644 --- a/internal/database/sqlcommon/message_sql.go +++ b/internal/database/sqlcommon/message_sql.go @@ -184,7 +184,7 @@ func (s *SQLCommon) UpsertMessage(ctx context.Context, message *fftypes.Message, } // In SQL update+bump is a delete+insert within a TX -func (s *SQLCommon) UpdateAndBumpMessage(ctx context.Context, message *fftypes.Message) (err error) { +func (s *SQLCommon) ReplaceMessage(ctx context.Context, message *fftypes.Message) (err error) { ctx, tx, autoCommit, err := s.beginOrUseTx(ctx) if err != nil { return err diff --git a/internal/database/sqlcommon/message_sql_test.go b/internal/database/sqlcommon/message_sql_test.go index df8ea2b9f..bac9284dc 100644 --- a/internal/database/sqlcommon/message_sql_test.go +++ b/internal/database/sqlcommon/message_sql_test.go @@ -207,7 +207,7 @@ func TestUpsertE2EWithDB(t *testing.T) { // Bump and Update - this is for a ready transition msgUpdated.State = fftypes.MessageStateReady - err = s.UpdateAndBumpMessage(context.Background(), msgUpdated) + err = s.ReplaceMessage(context.Background(), msgUpdated) assert.NoError(t, err) msgRead, err = s.GetMessageByID(ctx, msgUpdated.Header.ID) msgJson, _ = json.Marshal(&msgUpdated) @@ -285,34 +285,34 @@ func TestUpsertMessageFailCommit(t *testing.T) { assert.NoError(t, mock.ExpectationsWereMet()) } -func TestUpdateAndBumpMessageFailBegin(t *testing.T) { +func TestReplaceMessageFailBegin(t *testing.T) { s, mock := newMockProvider().init() mock.ExpectBegin().WillReturnError(fmt.Errorf("pop")) msgID := fftypes.NewUUID() - err := s.UpdateAndBumpMessage(context.Background(), &fftypes.Message{Header: fftypes.MessageHeader{ID: msgID}}) + err := s.ReplaceMessage(context.Background(), &fftypes.Message{Header: fftypes.MessageHeader{ID: msgID}}) assert.Regexp(t, "FF10114", err) assert.NoError(t, mock.ExpectationsWereMet()) } -func TestUpdateAndBumpMessageFailDelete(t *testing.T) { +func TestReplaceMessageFailDelete(t *testing.T) { s, mock := newMockProvider().init() mock.ExpectBegin() mock.ExpectExec("DELETE .*").WillReturnError(fmt.Errorf("pop")) mock.ExpectRollback() msgID := fftypes.NewUUID() - err := s.UpdateAndBumpMessage(context.Background(), &fftypes.Message{Header: fftypes.MessageHeader{ID: msgID}}) + err := s.ReplaceMessage(context.Background(), &fftypes.Message{Header: fftypes.MessageHeader{ID: msgID}}) assert.Regexp(t, "FF10118", err) assert.NoError(t, mock.ExpectationsWereMet()) } -func TestUpdateAndBumpMessageFailInsert(t *testing.T) { +func TestReplaceMessageFailInsert(t *testing.T) { s, mock := newMockProvider().init() mock.ExpectBegin() mock.ExpectExec("DELETE .*").WillReturnResult(sqlmock.NewResult(1, 1)) mock.ExpectExec("INSERT .*").WillReturnError(fmt.Errorf("pop")) mock.ExpectRollback() msgID := fftypes.NewUUID() - err := s.UpdateAndBumpMessage(context.Background(), &fftypes.Message{Header: fftypes.MessageHeader{ID: msgID}}) + err := s.ReplaceMessage(context.Background(), &fftypes.Message{Header: fftypes.MessageHeader{ID: msgID}}) assert.Regexp(t, "FF10116", err) assert.NoError(t, mock.ExpectationsWereMet()) } diff --git a/internal/events/tokens_transferred.go b/internal/events/tokens_transferred.go index 4bd08b1bc..e31b09bba 100644 --- a/internal/events/tokens_transferred.go +++ b/internal/events/tokens_transferred.go @@ -128,7 +128,7 @@ func (em *eventManager) TokensTransferred(ti tokens.Plugin, poolProtocolID strin if msg.State == fftypes.MessageStateStaged { // Message can now be sent msg.State = fftypes.MessageStateReady - if err := em.database.UpdateAndBumpMessage(ctx, msg); err != nil { + if err := em.database.ReplaceMessage(ctx, msg); err != nil { return err } } else { diff --git a/internal/events/tokens_transferred_test.go b/internal/events/tokens_transferred_test.go index 2b1b42ee9..b528a20a8 100644 --- a/internal/events/tokens_transferred_test.go +++ b/internal/events/tokens_transferred_test.go @@ -371,7 +371,7 @@ func TestTokensTransferredWithMessageSend(t *testing.T) { mdi.On("UpsertTokenTransfer", em.ctx, transfer).Return(nil).Times(2) mdi.On("UpdateTokenBalances", em.ctx, transfer).Return(nil).Times(2) mdi.On("GetMessageByID", em.ctx, mock.Anything).Return(message, nil).Times(2) - mdi.On("UpdateAndBumpMessage", em.ctx, mock.MatchedBy(func(msg *fftypes.Message) bool { + mdi.On("ReplaceMessage", em.ctx, mock.MatchedBy(func(msg *fftypes.Message) bool { return msg.State == fftypes.MessageStateReady })).Return(fmt.Errorf("pop")) mdi.On("InsertEvent", em.ctx, mock.MatchedBy(func(ev *fftypes.Event) bool { diff --git a/mocks/databasemocks/plugin.go b/mocks/databasemocks/plugin.go index a1e263e43..9db954b5b 100644 --- a/mocks/databasemocks/plugin.go +++ b/mocks/databasemocks/plugin.go @@ -1758,6 +1758,20 @@ func (_m *Plugin) Name() string { return r0 } +// ReplaceMessage provides a mock function with given fields: ctx, message +func (_m *Plugin) ReplaceMessage(ctx context.Context, message *fftypes.Message) error { + ret := _m.Called(ctx, message) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, *fftypes.Message) error); ok { + r0 = rf(ctx, message) + } else { + r0 = ret.Error(0) + } + + return r0 +} + // RunAsGroup provides a mock function with given fields: ctx, fn func (_m *Plugin) RunAsGroup(ctx context.Context, fn func(context.Context) error) error { ret := _m.Called(ctx, fn) @@ -1786,20 +1800,6 @@ func (_m *Plugin) SetPinDispatched(ctx context.Context, sequence int64) error { return r0 } -// UpdateAndBumpMessage provides a mock function with given fields: ctx, message -func (_m *Plugin) UpdateAndBumpMessage(ctx context.Context, message *fftypes.Message) error { - ret := _m.Called(ctx, message) - - var r0 error - if rf, ok := ret.Get(0).(func(context.Context, *fftypes.Message) error); ok { - r0 = rf(ctx, message) - } else { - r0 = ret.Error(0) - } - - return r0 -} - // UpdateBatch provides a mock function with given fields: ctx, id, update func (_m *Plugin) UpdateBatch(ctx context.Context, id *fftypes.UUID, update database.Update) error { ret := _m.Called(ctx, id, update) diff --git a/pkg/database/plugin.go b/pkg/database/plugin.go index 7d2e3e68d..a89d8fbd1 100644 --- a/pkg/database/plugin.go +++ b/pkg/database/plugin.go @@ -80,9 +80,9 @@ type iMessageCollection interface { // UpdateMessage - Update message UpdateMessage(ctx context.Context, id *fftypes.UUID, update Update) (err error) - // UpdateAndBumpMessage updates the message, and assigns it a new sequence number at the front of the list. + // ReplaceMessage updates the message, and assigns it a new sequence number at the front of the list. // A new event is raised for the message, with the new sequence number - as if it was brand new. - UpdateAndBumpMessage(ctx context.Context, message *fftypes.Message) (err error) + ReplaceMessage(ctx context.Context, message *fftypes.Message) (err error) // UpdateMessages - Update messages UpdateMessages(ctx context.Context, filter Filter, update Update) (err error)