Skip to content

Commit

Permalink
feature: newblock resumes blocks in progress
Browse files Browse the repository at this point in the history
This adds a new pact request called ContinueBlock to do this resumption
and also adds an option to NewBlock which allows creating empty blocks;
as well, the state involved in blocks in progress is encapsulated in a 
new BlockInProgress type.

Test plan is a new test that checks that a block created via NewBlock
is identical to a block created via a combination of NewBlock and
ContinueBlock.

Change-Id: Id0e6f9ac29cbef4e1b029db5be7aee0b80c2f9c8
  • Loading branch information
edmundnoble committed Jun 17, 2024
1 parent 3a25ad9 commit 38f3a3a
Show file tree
Hide file tree
Showing 24 changed files with 14,678 additions and 441 deletions.
1 change: 0 additions & 1 deletion bench/Chainweb/Pact/Backend/Bench.hs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ import Chainweb.MerkleLogHash
import Chainweb.Pact.Backend.RelationalCheckpointer
import Chainweb.Pact.Backend.Types
import Chainweb.Pact.Backend.Utils
import Chainweb.Pact.Service.Types
import Chainweb.Pact.Types
import Chainweb.Test.TestVersions
import Chainweb.Utils.Bench
Expand Down
4 changes: 3 additions & 1 deletion bench/Chainweb/Pact/Backend/ForkingBench.hs
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,9 @@ createBlock validate nonce pact = do

-- assemble block without nonce and timestamp

T2 parent payload <- newBlock noMiner pact
bip <- newBlock noMiner NewBlockFill pact
let parent = _blockInProgressParentHeader bip
let payload = blockInProgressToPayloadWithOutputs bip

let creationTime = add second $ _blockCreationTime $ _parentHeader parent
let bh = newBlockHeader
Expand Down
3 changes: 2 additions & 1 deletion src/Chainweb/Chainweb.hs
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,8 @@ import Chainweb.Mempool.P2pConfig
import Chainweb.Miner.Config
import qualified Chainweb.OpenAPIValidation as OpenAPIValidation
import Chainweb.Pact.RestAPI.Server (PactServerData(..))
import Chainweb.Pact.Service.Types (PactServiceConfig(..), IntraBlockPersistence(..))
import Chainweb.Pact.Service.Types (PactServiceConfig(..))
import Chainweb.Pact.Backend.Types (IntraBlockPersistence(..))
import Chainweb.Pact.Validations
import Chainweb.Payload.PayloadStore
import Chainweb.Payload.PayloadStore.RocksDB
Expand Down
30 changes: 17 additions & 13 deletions src/Chainweb/Chainweb/MinerResources.hs
Original file line number Diff line number Diff line change
Expand Up @@ -60,14 +60,15 @@ import Chainweb.Miner.Config
import Chainweb.Miner.Coordinator
import Chainweb.Miner.Miners
import Chainweb.Miner.Pact (Miner(..), minerId)
import Chainweb.Pact.Service.Types
import Chainweb.Pact.Utils
import Chainweb.Payload
import Chainweb.Payload.PayloadStore
import Chainweb.Sync.WebBlockHeaderStore
import Chainweb.Time (Micros, Time, minute, getCurrentTimeIntegral, scaleTimeSpan)
import Chainweb.Utils (fromJuste, runForever, thd, T2(..), T3(..))
import Chainweb.Utils
import Chainweb.Version
import Chainweb.WebPactExecutionService (_webPactExecutionService)
import Chainweb.WebPactExecutionService

import Data.LogMessage (JsonLog(..), LogFunction)

Expand Down Expand Up @@ -96,8 +97,9 @@ withMiningCoordination logger conf cdb inner
in fmap ((mid,) . HM.fromList) $
forM cids $ \cid -> do
let bh = fromMaybe (genesisBlockHeader v cid) (HM.lookup cid (_cutMap cut))
fmap ((cid,) . over _2 Just) $
getPayload (ParentHeader bh) cid miner
newBlock <- getPayload (ParentHeader bh) cid miner
return (cid, Just newBlock)

m <- newTVarIO initialPw
c503 <- newIORef 0
c403 <- newIORef 0
Expand Down Expand Up @@ -141,18 +143,20 @@ withMiningCoordination logger conf cdb inner
pw <- readTVarIO tpw
let
-- we assume that this path always exists in PrimedWork and never delete it.
ourMiner :: Traversal' PrimedWork (T2 ParentHeader (Maybe PayloadWithOutputs))
ourMiner = _Wrapped' . at (view minerId miner) . _Just . at cid . _Just
let !(T2 ph _) = fromJuste $ pw ^? ourMiner
ourMiner :: Traversal' PrimedWork (Maybe NewBlock)
ourMiner = _Wrapped' . ix (view minerId miner) . ix cid
let !nb = pw ^?! ourMiner . _Just
let ph = newBlockParentHeader nb
-- wait for a block different from what we've got primed work for
new <- awaitNewBlock cdb cid (_parentHeader ph)
-- Temporarily block this chain from being considered for queries
atomically $ modifyTVar' tpw (ourMiner . _2 .~ Nothing)
atomically $ modifyTVar' tpw (ourMiner .~ Nothing)
-- Generate new payload for this miner
newParentAndPayload <- getPayload (ParentHeader new) cid miner
atomically $ modifyTVar' tpw (ourMiner .~ over _2 Just newParentAndPayload)
newBlock <- getPayload (ParentHeader new) cid miner

atomically $ modifyTVar' tpw (ourMiner .~ Just newBlock)

getPayload :: ParentHeader -> ChainId -> Miner -> IO (T2 ParentHeader PayloadWithOutputs)
getPayload :: ParentHeader -> ChainId -> Miner -> IO NewBlock
getPayload new cid m =
if v ^. versionCheats . disablePact
-- if pact is disabled, we must keep track of the latest header
Expand All @@ -161,10 +165,10 @@ withMiningCoordination logger conf cdb inner
-- with rocksdb though that shouldn't cause a problem, just wasted work,
-- see docs for
-- Chainweb.Pact.PactService.Checkpointer.findLatestValidBlockHeader'
then return $ T2 new emptyPayload
then return $ NewBlockPayload new emptyPayload
else trace (logFunction logger)
"Chainweb.Chainweb.MinerResources.withMiningCoordination.newBlock"
() 1 (_pactNewBlock pact cid m)
() 1 (_pactNewBlock pact cid m NewBlockFill)

pact :: PactExecutionService
pact = _webPactExecutionService $ view cutDbPactService cdb
Expand Down
58 changes: 32 additions & 26 deletions src/Chainweb/Miner/Coordinator.hs
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,13 @@
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE NumericUnderscores #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE PatternGuards #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TemplateHaskell #-}
{-# LANGUAGE TypeApplications #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE ViewPatterns #-}

-- |
-- Module: Chainweb.Miner.Coordinator
Expand Down Expand Up @@ -130,14 +132,14 @@ data MiningCoordination logger tbl = MiningCoordination
-- made as often as desired, without clogging the Pact queue.
--
newtype PrimedWork =
PrimedWork (HM.HashMap MinerId (HM.HashMap ChainId (T2 ParentHeader (Maybe PayloadWithOutputs))))
PrimedWork (HM.HashMap MinerId (HM.HashMap ChainId (Maybe NewBlock)))
deriving newtype (Semigroup, Monoid)
deriving stock Generic
deriving anyclass (Wrapped)

resetPrimed :: MinerId -> ChainId -> PrimedWork -> PrimedWork
resetPrimed mid cid (PrimedWork pw) = PrimedWork
$! HM.adjust (HM.adjust (_2 .~ Nothing) cid) mid pw
$! HM.adjust (HM.adjust (\_ -> Nothing) cid) mid pw

-- | Data shared between the mining threads represented by `newWork` and
-- `publish`.
Expand Down Expand Up @@ -200,40 +202,44 @@ newWork logFun choice eminer@(Miner mid _) hdb pact tpw c = do
mpw <- atomically $ do
PrimedWork pw <- readTVar tpw
mpw <- maybe retry return (HM.lookup mid pw)
guard (any (isJust . ssnd) mpw)
guard (any isJust mpw)
return mpw
let mr = T2
<$> HM.lookup cid mpw
<*> getCutExtension c cid

case mr of
Just (T2 (T2 _ Nothing) _) -> do
Just (T2 Nothing _) -> do
logFun @T.Text Debug $ "newWork: chain " <> sshow cid <> " has stale work"
newWork logFun Anything eminer hdb pact tpw c
Nothing -> do
logFun @T.Text Debug $ "newWork: chain " <> sshow cid <> " not mineable"
newWork logFun Anything eminer hdb pact tpw c
Just (T2 (T2 (ParentHeader primedParent) (Just payload)) extension)
| _blockHash primedParent == _blockHash (_parentHeader (_cutExtensionParent extension)) -> do
let !phash = _payloadWithOutputsPayloadHash payload
!wh <- newWorkHeader hdb extension phash
pure $ Just $ T2 wh payload
| otherwise -> do
-- The cut is too old or the primed work is outdated. Probably
-- the former because it the mining coordination background job
-- is updating the primed work cache regularly. We could try
-- another chain, but it's safer to just return 'Nothing' here
-- and retry with an updated cut.
--
let !extensionParent = _parentHeader (_cutExtensionParent extension)
logFun @T.Text Info
$ "newWork: chain " <> sshow cid <> " not mineable because of parent header mismatch"
<> ". Primed parent hash: " <> toText (_blockHash primedParent)
<> ". Primed parent height: " <> sshow (_blockHeight primedParent)
<> ". Extension parent: " <> toText (_blockHash extensionParent)
<> ". Extension height: " <> sshow (_blockHeight extensionParent)

return Nothing
Just (T2 (Just newBlock) extension)
| ParentHeader primedParent <- newBlockParentHeader newBlock ->
if _blockHash primedParent ==
_blockHash (_parentHeader (_cutExtensionParent extension))
then do
let payload = newBlockToPayloadWithOutputs newBlock
let !phash = _payloadWithOutputsPayloadHash payload
!wh <- newWorkHeader hdb extension phash
pure $ Just $ T2 wh payload
else do
-- The cut is too old or the primed work is outdated. Probably
-- the former because it the mining coordination background job
-- is updating the primed work cache regularly. We could try
-- another chain, but it's safer to just return 'Nothing' here
-- and retry with an updated cut.
--
let !extensionParent = _parentHeader (_cutExtensionParent extension)
logFun @T.Text Info
$ "newWork: chain " <> sshow cid <> " not mineable because of parent header mismatch"
<> ". Primed parent hash: " <> toText (_blockHash primedParent)
<> ". Primed parent height: " <> sshow (_blockHeight primedParent)
<> ". Extension parent: " <> toText (_blockHash extensionParent)
<> ". Extension height: " <> sshow (_blockHeight extensionParent)

return Nothing

-- | Accepts a "solved" `BlockHeader` from some external source (e.g. a remote
-- mining client), attempts to reassociate it with the current best `Cut`, and
Expand Down Expand Up @@ -333,7 +339,7 @@ work mr mcid m = do
"no chains have primed work"
| otherwise ->
"all chains with primed work may be stalled. chains with primed payloads: "
<> sshow (sort [cid | (cid, T2 _ (Just _)) <- HM.toList mpw])
<> sshow (sort [cid | (cid, Just _) <- HM.toList mpw])
)

logDelays n'
Expand Down
3 changes: 1 addition & 2 deletions src/Chainweb/Pact/Backend/PactState/GrandHash/Import.hs
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,7 @@ import Chainweb.Pact.Backend.PactState.EmbeddedSnapshot (Snapshot(..))
import Chainweb.Pact.Backend.PactState.EmbeddedSnapshot.Mainnet qualified as MainnetSnapshots
import Chainweb.Pact.Backend.PactState.GrandHash.Utils (resolveLatestCutHeaders, resolveCutHeadersAtHeight, computeGrandHashesAt, exitLog, withConnections, chainwebDbFilePath, rocksParser, cwvParser)
import Chainweb.Pact.Backend.RelationalCheckpointer (withProdRelationalCheckpointer)
import Chainweb.Pact.Backend.Types (SQLiteEnv, _cpRewindTo)
import Chainweb.Pact.Service.Types (IntraBlockPersistence(..))
import Chainweb.Pact.Backend.Types (IntraBlockPersistence(..), SQLiteEnv, _cpRewindTo)
import Chainweb.Pact.Types (defaultModuleCacheLimit)
import Chainweb.Storage.Table.RocksDB (RocksDb, withReadOnlyRocksDb, modernDefaultOptions)
import Chainweb.Utils (sshow)
Expand Down
36 changes: 34 additions & 2 deletions src/Chainweb/Pact/Backend/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ module Chainweb.Pact.Backend.Types
, blockHandlerEnv
, runBlockEnv
, SQLiteEnv
, IntraBlockPersistence(..)
, BlockHandler(..)
, BlockHandlerEnv(..)
, mkBlockHandlerEnv
Expand All @@ -89,9 +90,11 @@ module Chainweb.Pact.Backend.Types
, MemPoolAccess(..)

, PactServiceException(..)
, BlockTxHistory(..)
) where

import Control.Concurrent.MVar
import Control.DeepSeq
import Control.Exception
import Control.Exception.Safe hiding (bracket)
import Control.Lens
Expand Down Expand Up @@ -124,13 +127,14 @@ import Pact.Types.Persistence
import Pact.Types.RowData (RowData)
import Pact.Types.Runtime (TableName)

import qualified Pact.JSON.Encode as J

-- internal modules
import Chainweb.BlockHash
import Chainweb.BlockHeader
import Chainweb.BlockHeight
import Chainweb.ChainId
import Chainweb.Pact.Backend.DbCache
import Chainweb.Pact.Service.Types
import Chainweb.Transaction
import Chainweb.Utils (T2)
import Chainweb.Version
Expand Down Expand Up @@ -212,7 +216,7 @@ data SQLitePendingData = SQLitePendingData
, _pendingTxLogMap :: !TxLogMap
, _pendingSuccessfulTxs :: !SQLitePendingSuccessfulTxs
}
deriving (Show)
deriving (Eq, Show)

makeLenses ''SQLitePendingData

Expand Down Expand Up @@ -250,6 +254,12 @@ initBlockState cl txid = BlockState

makeLenses ''BlockState

-- | Whether we write rows to the database that were already overwritten
-- in the same block. This is temporarily necessary to do while Rosetta uses
-- those rows to determine the contents of historic transactions.
data IntraBlockPersistence = PersistIntraBlockWrites | DoNotPersistIntraBlockWrites
deriving (Eq, Ord, Show)

data BlockHandlerEnv logger = BlockHandlerEnv
{ _blockHandlerDb :: !SQLiteEnv
, _blockHandlerLogger :: !logger
Expand Down Expand Up @@ -440,3 +450,25 @@ instance Show PactServiceException where
]

instance Exception PactServiceException

-- | Gather tx logs for a block, along with last tx for each
-- key in history, if any
-- Not intended for public API use; ToJSONs are for logging output.
data BlockTxHistory = BlockTxHistory
{ _blockTxHistory :: !(Map TxId [TxLog RowData])
, _blockPrevHistory :: !(Map RowKey (TxLog RowData))
}
deriving (Eq,Generic)
instance Show BlockTxHistory where
show = show . fmap (J.encodeText . J.Array) . _blockTxHistory
instance NFData BlockTxHistory

-- | The result of a historical lookup which might fail to even find the
-- header the history is being queried for.
data Historical a
= Historical a
| NoHistory
deriving stock (Foldable, Functor, Generic, Traversable)
deriving anyclass NFData

makePrisms ''Historical
Loading

0 comments on commit 38f3a3a

Please sign in to comment.