From 478ae1e9b0c7110ac9c8fcacadc3000cfefcb7f0 Mon Sep 17 00:00:00 2001 From: Andras Banki-Horvath Date: Tue, 2 Apr 2024 16:46:31 +0200 Subject: [PATCH] sqldb: cleanup scope state reset by adding reset closure to ExecTx For SQL transactions, we often accumulate results in variables declared outside the closure's scope. To eliminate the need for manually clearing these containers, we introduce a reset function to ExecTx, mirroring the approach already adopted in kvdb. --- invoices/sql_store.go | 22 +++++++++++++--------- sqldb/interfaces.go | 6 ++++-- 2 files changed, 17 insertions(+), 11 deletions(-) diff --git a/invoices/sql_store.go b/invoices/sql_store.go index 94575ede0c..cf829cd43a 100644 --- a/invoices/sql_store.go +++ b/invoices/sql_store.go @@ -259,7 +259,7 @@ func (i *SQLStore) AddInvoice(ctx context.Context, AddedAt: newInvoice.CreationDate.UTC(), InvoiceID: invoiceID, }) - }) + }, func() {}) if err != nil { mappedSQLErr := sqldb.MapSQLError(err) var uniqueConstraintErr *sqldb.ErrSQLUniqueConstraintViolation @@ -599,7 +599,7 @@ func (i *SQLStore) LookupInvoice(ctx context.Context, invoice, err = i.fetchInvoice(ctx, db, ref) return err - }) + }, func() {}) if txErr != nil { return Invoice{}, txErr } @@ -617,7 +617,6 @@ func (i *SQLStore) FetchPendingInvoices(ctx context.Context) ( readTxOpt := NewSQLInvoiceQueryReadTx() err := i.db.ExecTx(ctx, &readTxOpt, func(db SQLInvoiceQueries) error { - invoices = make(map[lntypes.Hash]Invoice) limit := queryPaginationLimit return queryWithLimit(func(offset int) (int, error) { @@ -647,6 +646,8 @@ func (i *SQLStore) FetchPendingInvoices(ctx context.Context) ( return len(rows), nil }, limit) + }, func() { + invoices = make(map[lntypes.Hash]Invoice) }) if err != nil { return nil, fmt.Errorf("unable to fetch pending invoices: %w", @@ -674,7 +675,6 @@ func (i *SQLStore) InvoicesSettledSince(ctx context.Context, idx uint64) ( readTxOpt := NewSQLInvoiceQueryReadTx() err := i.db.ExecTx(ctx, &readTxOpt, func(db SQLInvoiceQueries) error { - invoices = nil settleIdx := idx limit := queryPaginationLimit @@ -762,6 +762,8 @@ func (i *SQLStore) InvoicesSettledSince(ctx context.Context, idx uint64) ( } return nil + }, func() { + invoices = nil }) if err != nil { return nil, fmt.Errorf("unable to get invoices settled since "+ @@ -788,7 +790,6 @@ func (i *SQLStore) InvoicesAddedSince(ctx context.Context, idx uint64) ( readTxOpt := NewSQLInvoiceQueryReadTx() err := i.db.ExecTx(ctx, &readTxOpt, func(db SQLInvoiceQueries) error { - result = nil addIdx := idx limit := queryPaginationLimit @@ -821,6 +822,8 @@ func (i *SQLStore) InvoicesAddedSince(ctx context.Context, idx uint64) ( return len(rows), nil }, limit) + }, func() { + result = nil }) if err != nil { @@ -845,7 +848,6 @@ func (i *SQLStore) QueryInvoices(ctx context.Context, readTxOpt := NewSQLInvoiceQueryReadTx() err := i.db.ExecTx(ctx, &readTxOpt, func(db SQLInvoiceQueries) error { - invoices = nil limit := queryPaginationLimit return queryWithLimit(func(offset int) (int, error) { @@ -919,6 +921,8 @@ func (i *SQLStore) QueryInvoices(ctx context.Context, return len(rows), nil }, limit) + }, func() { + invoices = nil }) if err != nil { return InvoiceSlice{}, fmt.Errorf("unable to query "+ @@ -1306,7 +1310,7 @@ func (i *SQLStore) UpdateInvoice(ctx context.Context, ref InvoiceRef, ) return err - }) + }, func() {}) if txErr != nil { // If the invoice is already settled, we'll return the // (unchanged) invoice and the ErrInvoiceAlreadySettled error. @@ -1370,7 +1374,7 @@ func (i *SQLStore) DeleteInvoice(ctx context.Context, } return nil - }) + }, func() {}) if err != nil { return fmt.Errorf("unable to delete invoices: %w", err) @@ -1390,7 +1394,7 @@ func (i *SQLStore) DeleteCanceledInvoices(ctx context.Context) error { } return nil - }) + }, func() {}) if err != nil { return fmt.Errorf("unable to delete invoices: %w", err) } diff --git a/sqldb/interfaces.go b/sqldb/interfaces.go index 690414bb6d..f81799c577 100644 --- a/sqldb/interfaces.go +++ b/sqldb/interfaces.go @@ -52,7 +52,7 @@ type BatchedTx[Q any] interface { // specify if a transaction should be read-only and optionally what // type of concurrency control should be used. ExecTx(ctx context.Context, txOptions TxOptions, - txBody func(Q) error) error + txBody func(Q) error, reset func()) error } // Tx represents a database transaction that can be committed or rolled back. @@ -317,7 +317,7 @@ func ExecuteSQLTransactionWithRetry(ctx context.Context, makeTx MakeTx, // type of query and options run, in order to have access to batched operations // related to a storage object. func (t *TransactionExecutor[Q]) ExecTx(ctx context.Context, - txOptions TxOptions, txBody func(Q) error) error { + txOptions TxOptions, txBody func(Q) error, reset func()) error { makeTx := func() (Tx, error) { return t.BatchedQuerier.BeginTx(ctx, txOptions) @@ -328,6 +328,8 @@ func (t *TransactionExecutor[Q]) ExecTx(ctx context.Context, if !ok { return fmt.Errorf("expected *sql.Tx, got %T", tx) } + + reset() return txBody(t.createQuery(sqlTx)) }