Skip to content

Commit

Permalink
feat: make insertMoves return post commit volumes
Browse files Browse the repository at this point in the history
  • Loading branch information
gfyrag committed Oct 23, 2024
1 parent 8a0c554 commit e2fe629
Show file tree
Hide file tree
Showing 7 changed files with 187 additions and 130 deletions.
1 change: 1 addition & 0 deletions internal/storage/bucket/migrations/11-stateless.sql
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ create function "{{.Bucket}}".set_volumes()
as
$$
begin
--todo: use balances table directly...
new.post_commit_volumes = coalesce((
select (
(post_commit_volumes).inputs + case when new.is_source then 0 else new.amount end,
Expand Down
24 changes: 14 additions & 10 deletions internal/storage/ledger/accounts.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,19 @@ import (
"github.com/uptrace/bun"
)

type Volume struct {
Asset string
Inputs *big.Int
Outputs *big.Int
type Volumes struct {
Inputs *big.Int `bun:"inputs" json:"inputs"`
Outputs *big.Int `bun:"outputs" json:"outputs"`
}

type Volumes []Volume
type AggregatedAccountVolume struct {
Volumes
Asset string `bun:"asset"`
}

type AggregatedAccountVolumes []AggregatedAccountVolume

func (volumes Volumes) toCore() ledger.VolumesByAssets {
func (volumes AggregatedAccountVolumes) toCore() ledger.VolumesByAssets {
if volumes == nil {
return ledger.VolumesByAssets{}
}
Expand Down Expand Up @@ -69,9 +73,9 @@ type Account struct {
UpdatedAt time.Time `bun:"updated_at"`
FirstUsage time.Time `bun:"first_usage"`

PostCommitVolumes Volumes `bun:"pcv,scanonly"`
PostCommitEffectiveVolumes Volumes `bun:"pcev,scanonly"`
Sequence int `bun:"seq,scanonly"`
PostCommitVolumes AggregatedAccountVolumes `bun:"pcv,scanonly"`
PostCommitEffectiveVolumes AggregatedAccountVolumes `bun:"pcev,scanonly"`
Seq int `bun:"seq,scanonly"`
}

func (account Account) toCore() ledger.ExpandedAccount {
Expand Down Expand Up @@ -434,7 +438,7 @@ func (s *Store) upsertAccount(ctx context.Context, account *Account) (bool, erro
return err
}

account.Sequence = upserted.Sequence
account.Seq = upserted.Seq
account.FirstUsage = upserted.FirstUsage
account.InsertionDate = upserted.InsertionDate
account.UpdatedAt = upserted.UpdatedAt
Expand Down
8 changes: 4 additions & 4 deletions internal/storage/ledger/accounts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,8 +355,8 @@ func TestGetAccount(t *testing.T) {
// Metadata: metadata.Metadata{},
// FirstUsage: now.Add(-time.Minute),
// },
// Volumes: map[string]ledger.Volumes{},
// EffectiveVolumes: map[string]ledger.Volumes{},
// AggregatedAccountVolumes: map[string]ledger.AggregatedAccountVolumes{},
// EffectiveVolumes: map[string]ledger.AggregatedAccountVolumes{},
// }, *account)
//})

Expand Down Expand Up @@ -450,7 +450,7 @@ func TestUpsertAccount(t *testing.T) {
upserted, err := store.upsertAccount(ctx, &account)
require.NoError(t, err)
require.True(t, upserted)
require.NotZero(t, account.Sequence)
require.NotZero(t, account.Seq)

// reset the account model
account = Account{
Expand All @@ -469,5 +469,5 @@ func TestUpsertAccount(t *testing.T) {
upserted, err = store.upsertAccount(ctx, &account)
require.NoError(t, err)
require.False(t, upserted)
require.NotZero(t, account.Sequence)
require.NotZero(t, account.Seq)
}
2 changes: 1 addition & 1 deletion internal/storage/ledger/balances.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ func (s *Store) SelectAggregatedBalances(date *time.Time, useInsertionDate bool,

func (s *Store) GetAggregatedBalances(ctx context.Context, q ledgercontroller.GetAggregatedBalanceQuery) (ledger.BalancesByAssets, error) {
type AggregatedVolumes struct {
Aggregated Volumes `bun:"aggregated,type:jsonb"`
Aggregated AggregatedAccountVolumes `bun:"aggregated,type:jsonb"`
}
aggregatedVolumes := AggregatedVolumes{}
if err := s.SelectAggregatedBalances(q.PIT, q.UseInsertionDate, q.QueryBuilder).
Expand Down
28 changes: 15 additions & 13 deletions internal/storage/ledger/moves.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,12 @@ func (s *Store) SelectDistinctMovesByEffectiveDate(date *time.Time) *bun.SelectQ
return ret
}

func (s *Store) insertMoves(ctx context.Context, moves ...Move) error {

func (s *Store) insertMoves(ctx context.Context, moves ...*Move) error {
_, err := tracing.TraceWithLatency(ctx, "InsertMoves", tracing.NoResult(func(ctx context.Context) error {
_, err := s.db.NewInsert().
Model(&moves).
ModelTableExpr(s.GetPrefixedRelationName("moves")).
Returning("to_json(post_commit_volumes) as post_commit_volumes, to_json(post_commit_effective_volumes) as post_commit_effective_volumes").
Exec(ctx)

return postgres.ResolveError(err)
Expand All @@ -74,19 +74,21 @@ func (s *Store) insertMoves(ctx context.Context, moves ...Move) error {
type Move struct {
bun.BaseModel `bun:"table:moves"`

Ledger string `bun:"ledger,type:varchar"`
IsSource bool `bun:"is_source,type:bool"`
Account string `bun:"account_address,type:varchar"`
AccountAddressArray []string `bun:"account_address_array,type:jsonb"`
Amount *bunpaginate.BigInt `bun:"amount,type:numeric"`
Asset string `bun:"asset,type:varchar"`
TransactionSeq int `bun:"transactions_seq,type:int"`
AccountSeq int `bun:"accounts_seq,type:int"`
InsertionDate time.Time `bun:"insertion_date,type:timestamp"`
EffectiveDate time.Time `bun:"effective_date,type:timestamp"`
Ledger string `bun:"ledger,type:varchar"`
IsSource bool `bun:"is_source,type:bool"`
Account string `bun:"account_address,type:varchar"`
AccountAddressArray []string `bun:"account_address_array,type:jsonb"`
Amount *bunpaginate.BigInt `bun:"amount,type:numeric"`
Asset string `bun:"asset,type:varchar"`
TransactionSeq int `bun:"transactions_seq,type:int"`
AccountSeq int `bun:"accounts_seq,type:int"`
InsertionDate time.Time `bun:"insertion_date,type:timestamp"`
EffectiveDate time.Time `bun:"effective_date,type:timestamp"`
PostCommitVolumes Volumes `bun:"post_commit_volumes,type:jsonb,scanonly"`
PostCommitEffectiveVolumes Volumes `bun:"post_commit_effective_volumes,type:jsonb,scanonly"`
}

type Moves []Move
type Moves []*Move

func (m Moves) BalanceUpdates() map[string]map[string]*big.Int {
ret := make(map[string]map[string]*big.Int)
Expand Down
236 changes: 143 additions & 93 deletions internal/storage/ledger/moves_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@ import (
"database/sql"
"fmt"
"github.com/alitto/pond"
"github.com/formancehq/go-libs/bun/bunpaginate"
"github.com/formancehq/go-libs/logging"
"github.com/formancehq/go-libs/platform/postgres"
"github.com/formancehq/go-libs/time"
ledger "github.com/formancehq/ledger/internal"
ledgercontroller "github.com/formancehq/ledger/internal/controller/ledger"
"github.com/pkg/errors"
Expand All @@ -17,99 +19,7 @@ import (
"testing"
)

//func TestMoves(t *testing.T) {
// t.Parallel()
//
// store := newLedgerStore(t)
// ctx := logging.TestingContext()
//
// now := time.Now()
// _, err := store.upsertAccount(ctx, ledger.Account{
// BaseModel: bun.BaseModel{},
// Address: "world",
// Metadata: metadata.Metadata{},
// FirstUsage: now,
// InsertionDate: now,
// UpdatedAt: now,
// })
// require.NoError(t, err)
//
// _, err = store.upsertAccount(ctx, ledger.Account{
// BaseModel: bun.BaseModel{},
// Address: "bank",
// Metadata: metadata.Metadata{},
// FirstUsage: now,
// InsertionDate: now,
// UpdatedAt: now,
// })
// require.NoError(t, err)
//
// _, err = store.upsertAccount(ctx, ledger.Account{
// BaseModel: bun.BaseModel{},
// Address: "bank2",
// Metadata: metadata.Metadata{},
// FirstUsage: now,
// InsertionDate: now,
// UpdatedAt: now,
// })
// require.NoError(t, err)
//
// // Insert first tx
// tx1, err := store.CommitTransaction(ctx, ledger.NewTransactionData().WithPostings(
// ledger.NewPosting("world", "bank", "USD/2", big.NewInt(100)),
// ).WithTimestamp(now))
// require.NoError(t, err)
//
// for _, move := range tx1.GetMoves() {
// require.NoError(t, store.insertMoves(ctx, move))
// }
//
// balance, err := store.GetBalance(ctx, "world", "USD/2")
// require.NoError(t, err)
// require.Equal(t, big.NewInt(-100), balance)
//
// balance, err = store.GetBalance(ctx, "bank", "USD/2")
// require.NoError(t, err)
// require.Equal(t, big.NewInt(100), balance)
//
// // Insert second tx
// tx2, err := store.CommitTransaction(ctx, ledger.NewTransactionData().WithPostings(
// ledger.NewPosting("world", "bank2", "USD/2", big.NewInt(100)),
// ).WithTimestamp(now.Add(time.Minute)))
// require.NoError(t, err)
//
// for _, move := range tx2.GetMoves() {
// require.NoError(t, store.insertMoves(ctx, move))
// }
//
// balance, err = store.GetBalance(ctx, "world", "USD/2")
// require.NoError(t, err)
// require.Equal(t, big.NewInt(-200), balance)
//
// balance, err = store.GetBalance(ctx, "bank2", "USD/2")
// require.NoError(t, err)
// require.Equal(t, big.NewInt(100), balance)
//
// // Insert backdated tx
// tx3, err := store.CommitTransaction(ctx, ledger.NewTransactionData().WithPostings(
// ledger.NewPosting("world", "bank", "USD/2", big.NewInt(100)),
// ).WithTimestamp(now.Add(30*time.Second)))
// require.NoError(t, err)
//
// for _, move := range tx3.GetMoves() {
// require.NoError(t, store.insertMoves(ctx, move))
// }
//
// balance, err = store.GetBalance(ctx, "world", "USD/2")
// require.NoError(t, err)
// require.Equal(t, big.NewInt(-300), balance)
//
// balance, err = store.GetBalance(ctx, "bank", "USD/2")
// require.NoError(t, err)
// require.Equal(t, big.NewInt(200), balance)
//}

func TestPostCommitVolumesComputation(t *testing.T) {
func TestMovesPostCommitVolumesComputation(t *testing.T) {
t.Parallel()

store := newLedgerStore(t)
Expand Down Expand Up @@ -148,3 +58,143 @@ func TestPostCommitVolumesComputation(t *testing.T) {
"USD": big.NewInt(0),
}, aggregatedBalances)
}

func TestMovesInsert(t *testing.T) {
t.Parallel()

store := newLedgerStore(t)
ctx := logging.TestingContext()

tx := Transaction{}
require.NoError(t, store.insertTransaction(ctx, &tx))

account := &Account{}
_, err := store.upsertAccount(ctx, account)
require.NoError(t, err)

now := time.Now()

// we will insert 5 tx at five different timestamps
// t0 ---------> t1 ---------> t2 ---------> t3 ----------> t4
// m1 ---------> m3 ---------> m4 ---------> m2 ----------> m5
t0 := now
t1 := t0.Add(time.Hour)
t2 := t1.Add(time.Hour)
t3 := t2.Add(time.Hour)
t4 := t3.Add(time.Hour)

// insert a first tx at t0
m1 := Move{
Ledger: store.ledger.Name,
IsSource: true,
Account: "world",
AccountAddressArray: []string{"world"},
Amount: (*bunpaginate.BigInt)(big.NewInt(100)),
Asset: "USD",
TransactionSeq: tx.Seq,
AccountSeq: account.Seq,
InsertionDate: t0,
EffectiveDate: t0,
}
require.NoError(t, store.insertMoves(ctx, &m1))
require.Equal(t, Volumes{
Inputs: big.NewInt(0),
Outputs: big.NewInt(100),
}, m1.PostCommitVolumes)
require.Equal(t, Volumes{
Inputs: big.NewInt(0),
Outputs: big.NewInt(100),
}, m1.PostCommitEffectiveVolumes)

// add a second move at t3
m2 := Move{
Ledger: store.ledger.Name,
IsSource: false,
Account: "world",
AccountAddressArray: []string{"world"},
Amount: (*bunpaginate.BigInt)(big.NewInt(50)),
Asset: "USD",
TransactionSeq: tx.Seq,
AccountSeq: account.Seq,
InsertionDate: t3,
EffectiveDate: t3,
}
require.NoError(t, store.insertMoves(ctx, &m2))
require.Equal(t, Volumes{
Inputs: big.NewInt(50),
Outputs: big.NewInt(100),
}, m2.PostCommitVolumes)
require.Equal(t, Volumes{
Inputs: big.NewInt(50),
Outputs: big.NewInt(100),
}, m2.PostCommitEffectiveVolumes)

// add a third move at t1
m3 := Move{
Ledger: store.ledger.Name,
IsSource: true,
Account: "world",
AccountAddressArray: []string{"world"},
Amount: (*bunpaginate.BigInt)(big.NewInt(200)),
Asset: "USD",
TransactionSeq: tx.Seq,
AccountSeq: account.Seq,
InsertionDate: t1,
EffectiveDate: t1,
}
require.NoError(t, store.insertMoves(ctx, &m3))
require.Equal(t, Volumes{
Inputs: big.NewInt(50),
Outputs: big.NewInt(300),
}, m3.PostCommitVolumes)
require.Equal(t, Volumes{
Inputs: big.NewInt(0),
Outputs: big.NewInt(300),
}, m3.PostCommitEffectiveVolumes)

// add a fourth move at t2
m4 := Move{
Ledger: store.ledger.Name,
IsSource: false,
Account: "world",
AccountAddressArray: []string{"world"},
Amount: (*bunpaginate.BigInt)(big.NewInt(50)),
Asset: "USD",
TransactionSeq: tx.Seq,
AccountSeq: account.Seq,
InsertionDate: t2,
EffectiveDate: t2,
}
require.NoError(t, store.insertMoves(ctx, &m4))
require.Equal(t, Volumes{
Inputs: big.NewInt(100),
Outputs: big.NewInt(300),
}, m4.PostCommitVolumes)
require.Equal(t, Volumes{
Inputs: big.NewInt(50),
Outputs: big.NewInt(300),
}, m4.PostCommitEffectiveVolumes)

// add a fifth move at t4
m5 := Move{
Ledger: store.ledger.Name,
IsSource: false,
Account: "world",
AccountAddressArray: []string{"world"},
Amount: (*bunpaginate.BigInt)(big.NewInt(50)),
Asset: "USD",
TransactionSeq: tx.Seq,
AccountSeq: account.Seq,
InsertionDate: t4,
EffectiveDate: t4,
}
require.NoError(t, store.insertMoves(ctx, &m5))
require.Equal(t, Volumes{
Inputs: big.NewInt(150),
Outputs: big.NewInt(300),
}, m5.PostCommitVolumes)
require.Equal(t, Volumes{
Inputs: big.NewInt(150),
Outputs: big.NewInt(300),
}, m5.PostCommitEffectiveVolumes)
}
Loading

0 comments on commit e2fe629

Please sign in to comment.