Skip to content

Commit

Permalink
sqldb: cleanup scope state reset by adding reset closure to ExecTx
Browse files Browse the repository at this point in the history
For SQL transactions, we often accumulate results in variables declared
outside the closure's scope. To eliminate the need for manually clearing
these containers, we introduce a reset function to ExecTx, mirroring the
approach already adopted in kvdb.
  • Loading branch information
bhandras committed Apr 11, 2024
1 parent c6073a1 commit 478ae1e
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 11 deletions.
22 changes: 13 additions & 9 deletions invoices/sql_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ func (i *SQLStore) AddInvoice(ctx context.Context,
AddedAt: newInvoice.CreationDate.UTC(),
InvoiceID: invoiceID,
})
})
}, func() {})
if err != nil {
mappedSQLErr := sqldb.MapSQLError(err)
var uniqueConstraintErr *sqldb.ErrSQLUniqueConstraintViolation
Expand Down Expand Up @@ -599,7 +599,7 @@ func (i *SQLStore) LookupInvoice(ctx context.Context,
invoice, err = i.fetchInvoice(ctx, db, ref)

return err
})
}, func() {})
if txErr != nil {
return Invoice{}, txErr
}
Expand All @@ -617,7 +617,6 @@ func (i *SQLStore) FetchPendingInvoices(ctx context.Context) (

readTxOpt := NewSQLInvoiceQueryReadTx()
err := i.db.ExecTx(ctx, &readTxOpt, func(db SQLInvoiceQueries) error {
invoices = make(map[lntypes.Hash]Invoice)
limit := queryPaginationLimit

return queryWithLimit(func(offset int) (int, error) {
Expand Down Expand Up @@ -647,6 +646,8 @@ func (i *SQLStore) FetchPendingInvoices(ctx context.Context) (

return len(rows), nil
}, limit)
}, func() {
invoices = make(map[lntypes.Hash]Invoice)
})
if err != nil {
return nil, fmt.Errorf("unable to fetch pending invoices: %w",
Expand Down Expand Up @@ -674,7 +675,6 @@ func (i *SQLStore) InvoicesSettledSince(ctx context.Context, idx uint64) (

readTxOpt := NewSQLInvoiceQueryReadTx()
err := i.db.ExecTx(ctx, &readTxOpt, func(db SQLInvoiceQueries) error {
invoices = nil
settleIdx := idx
limit := queryPaginationLimit

Expand Down Expand Up @@ -762,6 +762,8 @@ func (i *SQLStore) InvoicesSettledSince(ctx context.Context, idx uint64) (
}

return nil
}, func() {
invoices = nil
})
if err != nil {
return nil, fmt.Errorf("unable to get invoices settled since "+
Expand All @@ -788,7 +790,6 @@ func (i *SQLStore) InvoicesAddedSince(ctx context.Context, idx uint64) (

readTxOpt := NewSQLInvoiceQueryReadTx()
err := i.db.ExecTx(ctx, &readTxOpt, func(db SQLInvoiceQueries) error {
result = nil
addIdx := idx
limit := queryPaginationLimit

Expand Down Expand Up @@ -821,6 +822,8 @@ func (i *SQLStore) InvoicesAddedSince(ctx context.Context, idx uint64) (

return len(rows), nil
}, limit)
}, func() {
result = nil
})

if err != nil {
Expand All @@ -845,7 +848,6 @@ func (i *SQLStore) QueryInvoices(ctx context.Context,

readTxOpt := NewSQLInvoiceQueryReadTx()
err := i.db.ExecTx(ctx, &readTxOpt, func(db SQLInvoiceQueries) error {
invoices = nil
limit := queryPaginationLimit

return queryWithLimit(func(offset int) (int, error) {
Expand Down Expand Up @@ -919,6 +921,8 @@ func (i *SQLStore) QueryInvoices(ctx context.Context,

return len(rows), nil
}, limit)
}, func() {
invoices = nil
})
if err != nil {
return InvoiceSlice{}, fmt.Errorf("unable to query "+
Expand Down Expand Up @@ -1306,7 +1310,7 @@ func (i *SQLStore) UpdateInvoice(ctx context.Context, ref InvoiceRef,
)

return err
})
}, func() {})
if txErr != nil {
// If the invoice is already settled, we'll return the
// (unchanged) invoice and the ErrInvoiceAlreadySettled error.
Expand Down Expand Up @@ -1370,7 +1374,7 @@ func (i *SQLStore) DeleteInvoice(ctx context.Context,
}

return nil
})
}, func() {})

if err != nil {
return fmt.Errorf("unable to delete invoices: %w", err)
Expand All @@ -1390,7 +1394,7 @@ func (i *SQLStore) DeleteCanceledInvoices(ctx context.Context) error {
}

return nil
})
}, func() {})
if err != nil {
return fmt.Errorf("unable to delete invoices: %w", err)
}
Expand Down
6 changes: 4 additions & 2 deletions sqldb/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ type BatchedTx[Q any] interface {
// specify if a transaction should be read-only and optionally what
// type of concurrency control should be used.
ExecTx(ctx context.Context, txOptions TxOptions,
txBody func(Q) error) error
txBody func(Q) error, reset func()) error
}

// Tx represents a database transaction that can be committed or rolled back.
Expand Down Expand Up @@ -317,7 +317,7 @@ func ExecuteSQLTransactionWithRetry(ctx context.Context, makeTx MakeTx,
// type of query and options run, in order to have access to batched operations
// related to a storage object.
func (t *TransactionExecutor[Q]) ExecTx(ctx context.Context,
txOptions TxOptions, txBody func(Q) error) error {
txOptions TxOptions, txBody func(Q) error, reset func()) error {

makeTx := func() (Tx, error) {
return t.BatchedQuerier.BeginTx(ctx, txOptions)
Expand All @@ -328,6 +328,8 @@ func (t *TransactionExecutor[Q]) ExecTx(ctx context.Context,
if !ok {
return fmt.Errorf("expected *sql.Tx, got %T", tx)
}

reset()
return txBody(t.createQuery(sqlTx))
}

Expand Down

0 comments on commit 478ae1e

Please sign in to comment.