Skip to content

Commit

Permalink
Move DB logic to firefly-common impl
Browse files Browse the repository at this point in the history
Signed-off-by: Peter Broadhurst <peter.broadhurst@kaleido.io>
  • Loading branch information
peterbroadhurst committed Dec 5, 2022
1 parent 057a7af commit a570149
Show file tree
Hide file tree
Showing 96 changed files with 1,402 additions and 4,487 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ require (
github.com/golang-migrate/migrate/v4 v4.15.2
github.com/gorilla/mux v1.8.0
github.com/gorilla/websocket v1.5.0
github.com/hyperledger/firefly-common v1.1.4
github.com/hyperledger/firefly-common v1.1.5-0.20221205013211-db8cd88a141f
github.com/hyperledger/firefly-signer v1.1.2
github.com/jarcoal/httpmock v1.2.0
github.com/karlseguin/ccache v2.0.3+incompatible
Expand Down
12 changes: 12 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -677,6 +677,18 @@ github.com/hashicorp/serf v0.8.2/go.mod h1:6hOLApaqBFA1NXqRQAsxw9QxuDEvNxSQRwA/J
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/hyperledger/firefly-common v1.1.4 h1:7Oqe7FFOSjt8Uo3/i/ujD4wke2kD2Xr2Kouq4QmgWns=
github.com/hyperledger/firefly-common v1.1.4/go.mod h1:taWRM7vsramcM7iWjeOmIgm3yd+RoWvaXSWxZKpSXM8=
github.com/hyperledger/firefly-common v1.1.5-0.20221203042833-800b9fd4d10a h1:QQj909iAkIYl+eT7hJSzj2+ICo84i3coBc3JrY7Mylg=
github.com/hyperledger/firefly-common v1.1.5-0.20221203042833-800b9fd4d10a/go.mod h1:3ubN46/dB+xurCPvdfqMKjB/CJU3I/DsfOoS7dY2SyQ=
github.com/hyperledger/firefly-common v1.1.5-0.20221205003343-e0ddbf5dc33d h1:VuGCjaBtRvhUg/m9Sl5T1jvIpU6QsugBg6sNucugjGU=
github.com/hyperledger/firefly-common v1.1.5-0.20221205003343-e0ddbf5dc33d/go.mod h1:3ubN46/dB+xurCPvdfqMKjB/CJU3I/DsfOoS7dY2SyQ=
github.com/hyperledger/firefly-common v1.1.5-0.20221205003709-9aa8a5f5bd55 h1:CPWQEodCUg9rSWUj9RTeY6b86SMqo0aaI2mve/zPzAw=
github.com/hyperledger/firefly-common v1.1.5-0.20221205003709-9aa8a5f5bd55/go.mod h1:3ubN46/dB+xurCPvdfqMKjB/CJU3I/DsfOoS7dY2SyQ=
github.com/hyperledger/firefly-common v1.1.5-0.20221205005833-d82dd5500c51 h1:8ZXqf9MvInNdCMjSlXlMUT379VuKyc1dH1t9facqGaA=
github.com/hyperledger/firefly-common v1.1.5-0.20221205005833-d82dd5500c51/go.mod h1:3ubN46/dB+xurCPvdfqMKjB/CJU3I/DsfOoS7dY2SyQ=
github.com/hyperledger/firefly-common v1.1.5-0.20221205010550-085676278a41 h1:Tbz2Fbyq4fTk5rNXwiHk2W7wqZt1lrf/8wV4KGou2/w=
github.com/hyperledger/firefly-common v1.1.5-0.20221205010550-085676278a41/go.mod h1:3ubN46/dB+xurCPvdfqMKjB/CJU3I/DsfOoS7dY2SyQ=
github.com/hyperledger/firefly-common v1.1.5-0.20221205013211-db8cd88a141f h1:EfkvaVkVldy0tmJUjN0pnR83DpZYfG1ARR60c+Q4NJ4=
github.com/hyperledger/firefly-common v1.1.5-0.20221205013211-db8cd88a141f/go.mod h1:3ubN46/dB+xurCPvdfqMKjB/CJU3I/DsfOoS7dY2SyQ=
github.com/hyperledger/firefly-signer v1.1.2 h1:QuS3M5w9px3BnPa4jIWMDg+z2ySK76MoO5Egh0G+tFg=
github.com/hyperledger/firefly-signer v1.1.2/go.mod h1:4h2MN910A2knrWGYCT+aWjBDlhptgQn/9WcT1N/Ct8s=
github.com/iancoleman/strcase v0.2.0/go.mod h1:iwCmte+B7n89clKwxIoIXy/HfoL7AsD47ZCWhYzw7ho=
Expand Down
8 changes: 4 additions & 4 deletions internal/batch/batch_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ type DispatchState struct {
Data core.DataArray
Pins []*fftypes.Bytes32
noncesAssigned map[fftypes.Bytes32]*nonceState
msgPins map[fftypes.UUID]core.FFStringArray
msgPins map[fftypes.UUID]fftypes.FFStringArray
}

const batchSizeEstimateBase = int64(512)
Expand Down Expand Up @@ -484,9 +484,9 @@ func (bp *batchProcessor) maskContexts(ctx context.Context, state *DispatchState
log.L(ctx).Debugf("Message %s already has %d pins allocated", msg.Header.ID, len(msg.Pins))
continue
}
var pins core.FFStringArray
var pins fftypes.FFStringArray
if isPrivate {
pins = make(core.FFStringArray, len(msg.Header.Topics))
pins = make(fftypes.FFStringArray, len(msg.Header.Topics))
state.msgPins[*msg.Header.ID] = pins
}
for i, topic := range msg.Header.Topics {
Expand Down Expand Up @@ -548,7 +548,7 @@ func (bp *batchProcessor) sealBatch(state *DispatchState) (err error) {

// Clear state from any previous retry. We need to do fresh queries against the DB for nonces.
state.noncesAssigned = make(map[fftypes.Bytes32]*nonceState)
state.msgPins = make(map[fftypes.UUID]core.FFStringArray)
state.msgPins = make(map[fftypes.UUID]fftypes.FFStringArray)

if bp.conf.txType == core.TransactionTypeBatchPin {
// Generate a new Transaction, which will be used to record status of the associated transaction as it happens
Expand Down
16 changes: 8 additions & 8 deletions internal/batch/batch_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ func TestInsertNewNonceFail(t *testing.T) {
{Header: core.MessageHeader{
ID: fftypes.NewUUID(),
Group: gid,
Topics: core.FFStringArray{"topic1"},
Topics: fftypes.FFStringArray{"topic1"},
}},
},
})
Expand Down Expand Up @@ -291,7 +291,7 @@ func TestUpdateExistingNonceFail(t *testing.T) {
{Header: core.MessageHeader{
ID: fftypes.NewUUID(),
Group: gid,
Topics: core.FFStringArray{"topic1"},
Topics: fftypes.FFStringArray{"topic1"},
}},
},
})
Expand Down Expand Up @@ -326,7 +326,7 @@ func TestGetNonceFail(t *testing.T) {
{Header: core.MessageHeader{
ID: fftypes.NewUUID(),
Group: gid,
Topics: core.FFStringArray{"topic1"},
Topics: fftypes.FFStringArray{"topic1"},
}},
},
})
Expand Down Expand Up @@ -362,7 +362,7 @@ func TestGetNonceMigrationFail(t *testing.T) {
{Header: core.MessageHeader{
ID: fftypes.NewUUID(),
Group: gid,
Topics: core.FFStringArray{"topic1"},
Topics: fftypes.FFStringArray{"topic1"},
}},
},
})
Expand Down Expand Up @@ -437,7 +437,7 @@ func TestMarkMessageDispatchedUnpinnedOK(t *testing.T) {
for i := 0; i < 5; i++ {
msgid := fftypes.NewUUID()
bp.newWork <- &batchWork{
msg: &core.Message{Header: core.MessageHeader{ID: msgid, Topics: core.FFStringArray{"topic1"}}, Sequence: int64(1000 + i)},
msg: &core.Message{Header: core.MessageHeader{ID: msgid, Topics: fftypes.FFStringArray{"topic1"}}, Sequence: int64(1000 + i)},
}
}
}()
Expand Down Expand Up @@ -493,15 +493,15 @@ func TestMaskContextsRetryAfterPinsAssigned(t *testing.T) {
ID: fftypes.NewUUID(),
Type: core.MessageTypePrivate,
Group: groupID,
Topics: core.FFStringArray{"topic1"},
Topics: fftypes.FFStringArray{"topic1"},
},
}
msg2 := &core.Message{
Header: core.MessageHeader{
ID: fftypes.NewUUID(),
Type: core.MessageTypePrivate,
Group: groupID,
Topics: core.FFStringArray{"topic1"},
Topics: fftypes.FFStringArray{"topic1"},
},
}

Expand Down Expand Up @@ -547,7 +547,7 @@ func TestMaskContextsUpdateMessageFail(t *testing.T) {
ID: fftypes.NewUUID(),
Type: core.MessageTypePrivate,
Group: fftypes.NewRandB32(),
Topics: core.FFStringArray{"topic1"},
Topics: fftypes.FFStringArray{"topic1"},
},
}

Expand Down
6 changes: 3 additions & 3 deletions internal/data/data_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1083,14 +1083,14 @@ func TestUpdateMessageCacheCRORequirePins(t *testing.T) {
msgNoPins := &core.Message{
Header: core.MessageHeader{
ID: fftypes.NewUUID(),
Topics: core.FFStringArray{"topic1"},
Topics: fftypes.FFStringArray{"topic1"},
},
Data: data.Refs(),
}
msgWithPins := &core.Message{
Header: msgNoPins.Header,
Data: data.Refs(),
Pins: core.FFStringArray{"pin1"},
Pins: fftypes.FFStringArray{"pin1"},
}

msg, _ := dm.PeekMessageCache(ctx, msgWithPins.Header.ID)
Expand Down Expand Up @@ -1127,7 +1127,7 @@ func TestUpdateMessageCacheCRORequireBatchID(t *testing.T) {
msgNoPins := &core.Message{
Header: core.MessageHeader{
ID: fftypes.NewUUID(),
Topics: core.FFStringArray{"topic1"},
Topics: fftypes.FFStringArray{"topic1"},
},
Data: data.Refs(),
}
Expand Down
35 changes: 18 additions & 17 deletions internal/database/sqlcommon/batch_sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"database/sql"

sq "github.com/Masterminds/squirrel"
"github.com/hyperledger/firefly-common/pkg/ffapi"
"github.com/hyperledger/firefly-common/pkg/fftypes"
"github.com/hyperledger/firefly-common/pkg/i18n"
"github.com/hyperledger/firefly-common/pkg/log"
Expand Down Expand Up @@ -57,14 +58,14 @@ var (
const batchesTable = "batches"

func (s *SQLCommon) UpsertBatch(ctx context.Context, batch *core.BatchPersisted) (err error) {
ctx, tx, autoCommit, err := s.beginOrUseTx(ctx)
ctx, tx, autoCommit, err := s.BeginOrUseTx(ctx)
if err != nil {
return err
}
defer s.rollbackTx(ctx, tx, autoCommit)
defer s.RollbackTx(ctx, tx, autoCommit)

// Do a select within the transaction to detemine if the UUID already exists
batchRows, _, err := s.queryTx(ctx, batchesTable, tx,
batchRows, _, err := s.QueryTx(ctx, batchesTable, tx,
sq.Select("hash").
From(batchesTable).
Where(sq.Eq{"id": batch.ID, "namespace": batch.Namespace}),
Expand All @@ -88,7 +89,7 @@ func (s *SQLCommon) UpsertBatch(ctx context.Context, batch *core.BatchPersisted)
if existing {

// Update the batch
if _, err = s.updateTx(ctx, batchesTable, tx,
if _, err = s.UpdateTx(ctx, batchesTable, tx,
sq.Update(batchesTable).
Set("btype", string(batch.Type)).
Set("author", batch.Author).
Expand All @@ -110,7 +111,7 @@ func (s *SQLCommon) UpsertBatch(ctx context.Context, batch *core.BatchPersisted)
}
} else {

if _, err = s.insertTx(ctx, batchesTable, tx,
if _, err = s.InsertTx(ctx, batchesTable, tx,
sq.Insert(batchesTable).
Columns(batchColumns...).
Values(
Expand All @@ -136,7 +137,7 @@ func (s *SQLCommon) UpsertBatch(ctx context.Context, batch *core.BatchPersisted)
}
}

return s.commitTx(ctx, tx, autoCommit)
return s.CommitTx(ctx, tx, autoCommit)
}

func (s *SQLCommon) batchResult(ctx context.Context, row *sql.Rows) (*core.BatchPersisted, error) {
Expand Down Expand Up @@ -164,7 +165,7 @@ func (s *SQLCommon) batchResult(ctx context.Context, row *sql.Rows) (*core.Batch

func (s *SQLCommon) GetBatchByID(ctx context.Context, namespace string, id *fftypes.UUID) (message *core.BatchPersisted, err error) {

rows, _, err := s.query(ctx, batchesTable,
rows, _, err := s.Query(ctx, batchesTable,
sq.Select(batchColumns...).
From(batchesTable).
Where(sq.Eq{"id": id, "namespace": namespace}),
Expand All @@ -187,14 +188,14 @@ func (s *SQLCommon) GetBatchByID(ctx context.Context, namespace string, id *ffty
return batch, nil
}

func (s *SQLCommon) GetBatches(ctx context.Context, namespace string, filter database.Filter) (message []*core.BatchPersisted, res *database.FilterResult, err error) {
func (s *SQLCommon) GetBatches(ctx context.Context, namespace string, filter ffapi.Filter) (message []*core.BatchPersisted, res *ffapi.FilterResult, err error) {

query, fop, fi, err := s.filterSelect(ctx, "", sq.Select(batchColumns...).From(batchesTable), filter, batchFilterFieldMap, []interface{}{"sequence"}, sq.Eq{"namespace": namespace})
query, fop, fi, err := s.FilterSelect(ctx, "", sq.Select(batchColumns...).From(batchesTable), filter, batchFilterFieldMap, []interface{}{"sequence"}, sq.Eq{"namespace": namespace})
if err != nil {
return nil, nil, err
}

rows, tx, err := s.query(ctx, batchesTable, query)
rows, tx, err := s.Query(ctx, batchesTable, query)
if err != nil {
return nil, nil, err
}
Expand All @@ -209,28 +210,28 @@ func (s *SQLCommon) GetBatches(ctx context.Context, namespace string, filter dat
batches = append(batches, batch)
}

return batches, s.queryRes(ctx, batchesTable, tx, fop, fi), err
return batches, s.QueryRes(ctx, batchesTable, tx, fop, fi), err

}

func (s *SQLCommon) UpdateBatch(ctx context.Context, namespace string, id *fftypes.UUID, update database.Update) (err error) {
func (s *SQLCommon) UpdateBatch(ctx context.Context, namespace string, id *fftypes.UUID, update ffapi.Update) (err error) {

ctx, tx, autoCommit, err := s.beginOrUseTx(ctx)
ctx, tx, autoCommit, err := s.BeginOrUseTx(ctx)
if err != nil {
return err
}
defer s.rollbackTx(ctx, tx, autoCommit)
defer s.RollbackTx(ctx, tx, autoCommit)

query, err := s.buildUpdate(sq.Update(batchesTable), update, batchFilterFieldMap)
query, err := s.BuildUpdate(sq.Update(batchesTable), update, batchFilterFieldMap)
if err != nil {
return err
}
query = query.Where(sq.Eq{"id": id, "namespace": namespace})

_, err = s.updateTx(ctx, batchesTable, tx, query, nil /* no change events on filter update */)
_, err = s.UpdateTx(ctx, batchesTable, tx, query, nil /* no change events on filter update */)
if err != nil {
return err
}

return s.commitTx(ctx, tx, autoCommit)
return s.CommitTx(ctx, tx, autoCommit)
}
18 changes: 9 additions & 9 deletions internal/database/sqlcommon/batch_sql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ func TestUpsertBatchFailBegin(t *testing.T) {
s, mock := newMockProvider().init()
mock.ExpectBegin().WillReturnError(fmt.Errorf("pop"))
err := s.UpsertBatch(context.Background(), &core.BatchPersisted{})
assert.Regexp(t, "FF10114", err)
assert.Regexp(t, "FF00175", err)
assert.NoError(t, mock.ExpectationsWereMet())
}

Expand All @@ -178,7 +178,7 @@ func TestUpsertBatchFailSelect(t *testing.T) {
mock.ExpectRollback()
batchID := fftypes.NewUUID()
err := s.UpsertBatch(context.Background(), &core.BatchPersisted{BatchHeader: core.BatchHeader{ID: batchID}})
assert.Regexp(t, "FF10115", err)
assert.Regexp(t, "FF00176", err)
assert.NoError(t, mock.ExpectationsWereMet())
}

Expand All @@ -190,7 +190,7 @@ func TestUpsertBatchFailInsert(t *testing.T) {
mock.ExpectRollback()
batchID := fftypes.NewUUID()
err := s.UpsertBatch(context.Background(), &core.BatchPersisted{BatchHeader: core.BatchHeader{ID: batchID}})
assert.Regexp(t, "FF10116", err)
assert.Regexp(t, "FF00177", err)
assert.NoError(t, mock.ExpectationsWereMet())
}

Expand All @@ -203,7 +203,7 @@ func TestUpsertBatchFailUpdate(t *testing.T) {
mock.ExpectExec("UPDATE .*").WillReturnError(fmt.Errorf("pop"))
mock.ExpectRollback()
err := s.UpsertBatch(context.Background(), &core.BatchPersisted{BatchHeader: core.BatchHeader{ID: batchID}, Hash: hash})
assert.Regexp(t, "FF10117", err)
assert.Regexp(t, "FF00178", err)
assert.NoError(t, mock.ExpectationsWereMet())
}

Expand All @@ -215,7 +215,7 @@ func TestUpsertBatchFailCommit(t *testing.T) {
mock.ExpectExec("INSERT .*").WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectCommit().WillReturnError(fmt.Errorf("pop"))
err := s.UpsertBatch(context.Background(), &core.BatchPersisted{BatchHeader: core.BatchHeader{ID: batchID}})
assert.Regexp(t, "FF10119", err)
assert.Regexp(t, "FF00180", err)
assert.NoError(t, mock.ExpectationsWereMet())
}

Expand All @@ -224,7 +224,7 @@ func TestGetBatchByIDSelectFail(t *testing.T) {
batchID := fftypes.NewUUID()
mock.ExpectQuery("SELECT .*").WillReturnError(fmt.Errorf("pop"))
_, err := s.GetBatchByID(context.Background(), "ns1", batchID)
assert.Regexp(t, "FF10115", err)
assert.Regexp(t, "FF00176", err)
assert.NoError(t, mock.ExpectationsWereMet())
}

Expand Down Expand Up @@ -252,7 +252,7 @@ func TestGetBatchesQueryFail(t *testing.T) {
mock.ExpectQuery("SELECT .*").WillReturnError(fmt.Errorf("pop"))
f := database.BatchQueryFactory.NewFilter(context.Background()).Eq("id", "")
_, _, err := s.GetBatches(context.Background(), "ns1", f)
assert.Regexp(t, "FF10115", err)
assert.Regexp(t, "FF00176", err)
assert.NoError(t, mock.ExpectationsWereMet())
}

Expand All @@ -277,7 +277,7 @@ func TestBatchUpdateBeginFail(t *testing.T) {
mock.ExpectBegin().WillReturnError(fmt.Errorf("pop"))
u := database.BatchQueryFactory.NewUpdate(context.Background()).Set("id", "anything")
err := s.UpdateBatch(context.Background(), "ns1", fftypes.NewUUID(), u)
assert.Regexp(t, "FF10114", err)
assert.Regexp(t, "FF00175", err)
}

func TestBatchUpdateBuildQueryFail(t *testing.T) {
Expand All @@ -295,5 +295,5 @@ func TestBatchUpdateFail(t *testing.T) {
mock.ExpectRollback()
u := database.BatchQueryFactory.NewUpdate(context.Background()).Set("id", fftypes.NewUUID())
err := s.UpdateBatch(context.Background(), "ns1", fftypes.NewUUID(), u)
assert.Regexp(t, "FF10117", err)
assert.Regexp(t, "FF00178", err)
}
Loading

0 comments on commit a570149

Please sign in to comment.