Skip to content

Commit

Permalink
leveldb-2pc: use transaction for block delete
Browse files Browse the repository at this point in the history
  • Loading branch information
jamieabc authored and hxw committed May 28, 2019
1 parent 69f055b commit 319e40c
Show file tree
Hide file tree
Showing 15 changed files with 237 additions and 261 deletions.
93 changes: 47 additions & 46 deletions block/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,10 @@ func DeleteDownToBlock(finalBlockNumber uint64) error {
packedBlock := last.Value

// start db transaction by block & index db
storage.Pool.Blocks.Begin()
storage.Pool.Transactions.Begin()
trx, err := storage.NewDBTransaction()
if nil != err {
return err
}

outer_loop:
for {
Expand Down Expand Up @@ -83,29 +85,29 @@ outer_loop:

case *transactionrecord.AssetData:
assetId := tx.AssetId()
storage.Pool.Assets.Delete(assetId[:])
trx.Delete(storage.Pool.Assets, assetId[:])
asset.Delete(assetId)

case *transactionrecord.BitmarkIssue:
txId := packedTransaction.MakeLink()
reservoir.DeleteByTxId(txId)
if storage.Pool.Transactions.Has(txId[:]) {
storage.Pool.Transactions.Delete(txId[:])
ownership.Transfer(txId, txId, 0, tx.Owner, nil)
trx.Delete(storage.Pool.Transactions, txId[:])
ownership.Transfer(trx, txId, txId, 0, tx.Owner, nil)
}

case *transactionrecord.BitmarkTransferUnratified, *transactionrecord.BitmarkTransferCountersigned:
tr := tx.(transactionrecord.BitmarkTransfer)
txId := packedTransaction.MakeLink()
storage.Pool.Transactions.Delete(txId[:])
trx.Delete(storage.Pool.Transactions, txId[:])
reservoir.DeleteByTxId(txId)
link := tr.GetLink()
blockNumber, linkOwner := ownership.OwnerOf(link)
blockNumber, linkOwner := ownership.OwnerOf(trx, link)
if nil == linkOwner {
log.Criticalf("missing transaction record for: %v", link)
logger.Panic("Transactions database is corrupt")
}
ownership.Transfer(txId, link, blockNumber, tr.GetOwner(), linkOwner)
ownership.Transfer(trx, txId, link, blockNumber, tr.GetOwner(), linkOwner)

case *transactionrecord.BlockFoundation:
if nil == blockOwner {
Expand All @@ -115,14 +117,14 @@ outer_loop:

case *transactionrecord.BlockOwnerTransfer:
txId := packedTransaction.MakeLink()
storage.Pool.Transactions.Delete(txId[:])
trx.Delete(storage.Pool.Transactions, txId[:])
reservoir.DeleteByTxId(txId)
blockNumber, linkOwner := ownership.OwnerOf(tx.Link)
blockNumber, linkOwner := ownership.OwnerOf(trx, tx.Link)
if nil == linkOwner {
log.Criticalf("missing transaction record for: %v", tx.Link)
logger.Panic("Transactions database is corrupt")
}
ownerdata, err := ownership.GetOwnerDataB(txId[:])
ownerdata, err := ownership.GetOwnerDataB(trx, txId[:])
if nil != err {
log.Criticalf("missing ownership for: %s", txId)
logger.Panic("Ownership database is corrupt")
Expand All @@ -133,13 +135,13 @@ outer_loop:
logger.Panic("Ownership database is corrupt")
}

ownership.Transfer(txId, tx.Link, blockNumber, tx.Owner, linkOwner)
ownership.Transfer(trx, txId, tx.Link, blockNumber, tx.Owner, linkOwner)

blockNumberKey := make([]byte, 8)
binary.BigEndian.PutUint64(blockNumberKey, blockOwnerdata.IssueBlockNumber())

// put block ownership back
_, previous := storage.Pool.Transactions.GetNB(tx.Link[:])
_, previous, _ := trx.GetNB(storage.Pool.Transactions, tx.Link[:])

blockTransaction, _, err := transactionrecord.Packed(previous).Unpack(mode.IsTesting())
if nil != err {
Expand Down Expand Up @@ -175,9 +177,9 @@ outer_loop:
logger.Panic("Transaction database is corrupt")
}
// payment data
storage.Pool.BlockOwnerPayment.Put(blockNumberKey, packedPayments)
storage.Pool.BlockOwnerTxIndex.Put(tx.Link[:], blockNumberKey)
storage.Pool.BlockOwnerTxIndex.Delete(txId[:])
trx.Put(storage.Pool.BlockOwnerPayment, blockNumberKey, packedPayments, []byte{})
trx.Put(storage.Pool.BlockOwnerTxIndex, tx.Link[:], blockNumberKey, []byte{})
trx.Delete(storage.Pool.BlockOwnerTxIndex, txId[:])

default:
logger.Criticalf("invalid block transfer link: %+v", prevTx)
Expand All @@ -186,13 +188,13 @@ outer_loop:

case *transactionrecord.BitmarkShare:
txId := packedTransaction.MakeLink()
blockNumber, linkOwner := ownership.OwnerOf(tx.Link)
blockNumber, linkOwner := ownership.OwnerOf(trx, tx.Link)
if nil == linkOwner {
log.Criticalf("missing transaction record for: %v", tx.Link)
logger.Panic("Transactions database is corrupt")
}

ownerData, err := ownership.GetOwnerData(txId)
ownerData, err := ownership.GetOwnerData(trx, txId)
if nil != err {
logger.Criticalf("invalid ownerData for tx id: %s", txId)
logger.Panic("Ownership database is corrupt")
Expand All @@ -203,32 +205,32 @@ outer_loop:
logger.Panic("Ownership database is corrupt")
}

storage.Pool.Transactions.Delete(txId[:])
trx.Delete(storage.Pool.Transactions, txId[:])
reservoir.DeleteByTxId(txId)

shareId := shareData.IssueTxId()

fKey := append(linkOwner.Bytes(), shareId[:]...)
storage.Pool.Shares.Delete(shareId[:])
trx.Delete(storage.Pool.Shares, shareId[:])
storage.Pool.ShareQuantity.Delete(fKey)

ownership.Transfer(txId, tx.Link, blockNumber, linkOwner, linkOwner)
ownership.Transfer(trx, txId, tx.Link, blockNumber, linkOwner, linkOwner)

case *transactionrecord.ShareGrant:

txId := packedTransaction.MakeLink()

storage.Pool.Transactions.Delete(txId[:])
trx.Delete(storage.Pool.Transactions, txId[:])
reservoir.DeleteByTxId(txId)

oKey := append(tx.Owner.Bytes(), tx.ShareId[:]...)
rKey := append(tx.Recipient.Bytes(), tx.ShareId[:]...)

// this could be zero
oAccountBalance, _ := storage.Pool.ShareQuantity.GetN(oKey)
oAccountBalance, _, _ := trx.GetN(storage.Pool.ShareQuantity, oKey)

// this cannot be zero
rAccountBalance, ok := storage.Pool.ShareQuantity.GetN(rKey)
rAccountBalance, ok, _ := trx.GetN(storage.Pool.ShareQuantity, rKey)
if !ok {
log.Criticalf("missing balance record for: %v share id: %x", tx.Recipient, tx.ShareId)
logger.Panic("ShareQuantity database is corrupt")
Expand All @@ -240,17 +242,17 @@ outer_loop:

// update balances
if 0 == rAccountBalance {
storage.Pool.ShareQuantity.Delete(rKey)
trx.Delete(storage.Pool.ShareQuantity, rKey)
} else {
storage.Pool.ShareQuantity.PutN(rKey, rAccountBalance)
trx.PutN(storage.Pool.ShareQuantity, rKey, rAccountBalance)
}
storage.Pool.ShareQuantity.PutN(oKey, oAccountBalance)
trx.PutN(storage.Pool.ShareQuantity, oKey, oAccountBalance)

case *transactionrecord.ShareSwap:

txId := packedTransaction.MakeLink()

storage.Pool.Transactions.Delete(txId[:])
trx.Delete(storage.Pool.Transactions, txId[:])
reservoir.DeleteByTxId(txId)

ownerOneShareOneKey := append(tx.OwnerOne.Bytes(), tx.ShareIdOne[:]...)
Expand All @@ -259,16 +261,16 @@ outer_loop:
ownerTwoShareTwoKey := append(tx.OwnerTwo.Bytes(), tx.ShareIdTwo[:]...)

// either of these balances could be zero
ownerOneShareOneAccountBalance, _ := storage.Pool.ShareQuantity.GetN(ownerOneShareOneKey)
ownerTwoShareTwoAccountBalance, _ := storage.Pool.ShareQuantity.GetN(ownerTwoShareTwoKey)
ownerOneShareOneAccountBalance, _, _ := trx.GetN(storage.Pool.ShareQuantity, ownerOneShareOneKey)
ownerTwoShareTwoAccountBalance, _, _ := trx.GetN(storage.Pool.ShareQuantity, ownerTwoShareTwoKey)

// these balances cannot be zero
ownerOneShareTwoAccountBalance, ok := storage.Pool.ShareQuantity.GetN(ownerOneShareTwoKey)
ownerOneShareTwoAccountBalance, ok, _ := trx.GetN(storage.Pool.ShareQuantity, ownerOneShareTwoKey)
if !ok {
log.Criticalf("missing balance record for owner 1: %v share id 2: %x", tx.OwnerOne, tx.ShareIdTwo)
logger.Panic("ShareQuantity database is corrupt")
}
ownerTwoShareOneAccountBalance, ok := storage.Pool.ShareQuantity.GetN(ownerTwoShareOneKey)
ownerTwoShareOneAccountBalance, ok, _ := trx.GetN(storage.Pool.ShareQuantity, ownerTwoShareOneKey)
if !ok {
log.Criticalf("missing balance record for owner 2: %v share id 1: %x", tx.OwnerTwo, tx.ShareIdOne)
logger.Panic("ShareQuantity database is corrupt")
Expand All @@ -284,19 +286,19 @@ outer_loop:

// update database share one
if 0 == ownerTwoShareOneAccountBalance {
storage.Pool.ShareQuantity.Delete(ownerTwoShareOneKey)
trx.Delete(storage.Pool.ShareQuantity, ownerTwoShareOneKey)
} else {
storage.Pool.ShareQuantity.PutN(ownerTwoShareOneKey, ownerTwoShareOneAccountBalance)
trx.PutN(storage.Pool.ShareQuantity, ownerTwoShareOneKey, ownerTwoShareOneAccountBalance)
}
storage.Pool.ShareQuantity.PutN(ownerOneShareOneKey, ownerOneShareOneAccountBalance)
trx.PutN(storage.Pool.ShareQuantity, ownerOneShareOneKey, ownerOneShareOneAccountBalance)

// update database share two
if 0 == ownerOneShareTwoAccountBalance {
storage.Pool.ShareQuantity.Delete(ownerOneShareTwoKey)
trx.Delete(storage.Pool.ShareQuantity, ownerOneShareTwoKey)
} else {
storage.Pool.ShareQuantity.PutN(ownerOneShareTwoKey, ownerOneShareTwoAccountBalance)
trx.PutN(storage.Pool.ShareQuantity, ownerOneShareTwoKey, ownerOneShareTwoAccountBalance)
}
storage.Pool.ShareQuantity.PutN(ownerTwoShareTwoKey, ownerTwoShareTwoAccountBalance)
trx.PutN(storage.Pool.ShareQuantity, ownerTwoShareTwoKey, ownerTwoShareTwoAccountBalance)

default:
logger.Panicf("unexpected transaction: %v", transaction)
Expand All @@ -314,23 +316,22 @@ outer_loop:

// block ownership remove
foundationTxId := blockrecord.FoundationTxId(header, digest)
storage.Pool.Transactions.Delete(foundationTxId[:])
trx.Delete(storage.Pool.Transactions, foundationTxId[:])
if nil == blockOwner {
log.Criticalf("nil block owner for block: %d", header.Number)
} else {
ownership.Transfer(foundationTxId, foundationTxId, 0, blockOwner, nil)
ownership.Transfer(trx, foundationTxId, foundationTxId, 0, blockOwner, nil)
}
// remove remaining block data
storage.Pool.BlockOwnerTxIndex.Delete(foundationTxId[:])
storage.Pool.BlockOwnerPayment.Delete(blockNumberKey)
storage.Pool.Blocks.Delete(blockNumberKey)
trx.Delete(storage.Pool.BlockOwnerTxIndex, foundationTxId[:])
trx.Delete(storage.Pool.BlockOwnerPayment, blockNumberKey)
trx.Delete(storage.Pool.Blocks, blockNumberKey)

// and delete its hash
storage.Pool.BlockHeaderHash.Delete(blockNumberKey)
trx.Delete(storage.Pool.BlockHeaderHash, blockNumberKey)

// commit db transactions
storage.Pool.Blocks.Commit()
storage.Pool.Transactions.Commit()
trx.Commit()

// fetch previous block number
binary.BigEndian.PutUint64(blockNumberKey, header.Number-1)
Expand Down
20 changes: 10 additions & 10 deletions block/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ func StoreIncoming(packedBlock []byte, performRescan rescanType) error {
case *transactionrecord.BitmarkTransferUnratified, *transactionrecord.BitmarkTransferCountersigned:
tr := tx.(transactionrecord.BitmarkTransfer)
link := tr.GetLink()
_, linkOwner := ownership.OwnerOf(link)
_, linkOwner := ownership.OwnerOf(nil, link)
if nil == linkOwner {
return fault.ErrLinkToInvalidOrUnconfirmedTransaction
}
Expand All @@ -172,7 +172,7 @@ func StoreIncoming(packedBlock []byte, performRescan rescanType) error {
return fault.ErrDoubleTransferAttempt
}

ownerData, err := ownership.GetOwnerData(link)
ownerData, err := ownership.GetOwnerData(nil, link)
if nil != err {
return fault.ErrDoubleTransferAttempt
}
Expand All @@ -191,7 +191,7 @@ func StoreIncoming(packedBlock []byte, performRescan rescanType) error {

case *transactionrecord.BlockOwnerTransfer:
link := tx.Link
_, linkOwner := ownership.OwnerOf(link)
_, linkOwner := ownership.OwnerOf(nil, link)
_, err = tx.Pack(linkOwner)
if nil != err {
return err
Expand All @@ -216,7 +216,7 @@ func StoreIncoming(packedBlock []byte, performRescan rescanType) error {

case *transactionrecord.BitmarkShare:
link := tx.Link
_, linkOwner := ownership.OwnerOf(link)
_, linkOwner := ownership.OwnerOf(nil, link)
if nil == linkOwner {
return fault.ErrLinkToInvalidOrUnconfirmedTransaction
}
Expand All @@ -225,7 +225,7 @@ func StoreIncoming(packedBlock []byte, performRescan rescanType) error {
return err
}

ownerData, err := ownership.GetOwnerData(link)
ownerData, err := ownership.GetOwnerData(nil, link)
if nil != err {
return fault.ErrDoubleTransferAttempt
}
Expand Down Expand Up @@ -371,7 +371,7 @@ func StoreIncoming(packedBlock []byte, performRescan rescanType) error {
issues := storage.Pool.Transactions
if !issues.Has(item.txId[:]) {
issues.Put(item.txId[:], thisBlockNumberKey, item.packed)
ownership.CreateAsset(item.txId, header.Number, tx.AssetId, tx.Owner)
ownership.CreateAsset(nil, item.txId, header.Number, tx.AssetId, tx.Owner)
}

case *transactionrecord.BitmarkTransferUnratified, *transactionrecord.BitmarkTransferCountersigned:
Expand All @@ -387,7 +387,7 @@ func StoreIncoming(packedBlock []byte, performRescan rescanType) error {

txrs := storage.Pool.Transactions
txrs.Put(item.txId[:], thisBlockNumberKey, item.packed)
ownership.Transfer(link, item.txId, header.Number, item.linkOwner, tr.GetOwner())
ownership.Transfer(nil, link, item.txId, header.Number, item.linkOwner, tr.GetOwner())

case *transactionrecord.BlockFoundation:
// already processed
Expand Down Expand Up @@ -416,7 +416,7 @@ func StoreIncoming(packedBlock []byte, performRescan rescanType) error {
storage.Pool.BlockOwnerPayment.Put(item.blockNumberKey, pkPayments)
storage.Pool.BlockOwnerTxIndex.Put(item.txId[:], item.blockNumberKey)
storage.Pool.BlockOwnerTxIndex.Delete(link[:])
ownership.Transfer(link, item.txId, header.Number, item.linkOwner, tx.Owner)
ownership.Transfer(nil, link, item.txId, header.Number, item.linkOwner, tx.Owner)

case *transactionrecord.BitmarkShare:

Expand All @@ -431,7 +431,7 @@ func StoreIncoming(packedBlock []byte, performRescan rescanType) error {

txrs := storage.Pool.Transactions
txrs.Put(item.txId[:], thisBlockNumberKey, item.packed)
ownership.Share(link, item.txId, header.Number, item.linkOwner, tx.Quantity)
ownership.Share(nil, link, item.txId, header.Number, item.linkOwner, tx.Quantity)

case *transactionrecord.ShareGrant:

Expand Down Expand Up @@ -533,7 +533,7 @@ func StoreIncoming(packedBlock []byte, performRescan rescanType) error {
// current owner: either foundation or block owner transfer: tx id → owned block
storage.Pool.BlockOwnerTxIndex.Put(foundationTxId[:], thisBlockNumberKey)

ownership.CreateBlock(foundationTxId, header.Number, blockOwner)
ownership.CreateBlock(nil, foundationTxId, header.Number, blockOwner)

expectedBlockNumber := height + 1
if expectedBlockNumber != header.Number {
Expand Down
18 changes: 14 additions & 4 deletions ownership/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,9 +104,14 @@ type OwnerData interface {
type PackedOwnerData []byte

// GetOwnerData - fetch and unpack owner data
func GetOwnerData(txId merkle.Digest) (OwnerData, error) {
func GetOwnerData(trx storage.Transaction, txId merkle.Digest) (OwnerData, error) {
var packed []byte
if nil == trx {
packed = storage.Pool.OwnerData.Get(txId[:])
} else {
packed, _ = trx.Get(storage.Pool.OwnerData, txId[:])
}

packed := storage.Pool.OwnerData.Get(txId[:])
if nil == packed {
return nil, fault.ErrMissingOwnerData
}
Expand All @@ -115,9 +120,14 @@ func GetOwnerData(txId merkle.Digest) (OwnerData, error) {
}

// GetOwnerDataB - fetch and unpack owner data
func GetOwnerDataB(txId []byte) (OwnerData, error) {
func GetOwnerDataB(trx storage.Transaction, txId []byte) (OwnerData, error) {
var packed []byte
if nil == trx {
packed = storage.Pool.OwnerData.Get(txId)
} else {
packed, _ = trx.Get(storage.Pool.OwnerData, txId)
}

packed := storage.Pool.OwnerData.Get(txId)
if nil == packed {
return nil, fault.ErrMissingOwnerData
}
Expand Down
2 changes: 1 addition & 1 deletion ownership/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ loop:

merkle.DigestFromBytes(&record.TxId, item.Value)

ownerData, err := GetOwnerData(record.TxId)
ownerData, err := GetOwnerData(nil, record.TxId)
if nil != err {
return nil, err
}
Expand Down
Loading

0 comments on commit 319e40c

Please sign in to comment.