Skip to content

Commit

Permalink
feature: miner update stream reports refreshed blocks
Browse files Browse the repository at this point in the history
Github PR: #1907

Change-Id: I16a7fc2fed6c5e09e73cbcb704c81da41b7ffa3b
  • Loading branch information
edmundnoble committed Jun 17, 2024
1 parent 6e409ac commit dc7279f
Show file tree
Hide file tree
Showing 3 changed files with 126 additions and 29 deletions.
11 changes: 6 additions & 5 deletions src/Chainweb/Chainweb/MinerResources.hs
Original file line number Diff line number Diff line change
Expand Up @@ -130,14 +130,16 @@ withMiningCoordination logger conf cdb inner
!miners = S.toList (_coordinationMiners coordConf)
<> [ _nodeMiner inNodeConf | _nodeMiningEnabled inNodeConf ]

chainLogger cid = addLabel ("chain", toText cid)

-- | THREAD: Keep a live-updated cache of Payloads for specific miners, such
-- that when they request new work, the block can be instantly constructed
-- without interacting with the Pact Queue.
--
primeWork :: TVar PrimedWork -> ChainId -> IO ()
primeWork tpw cid =
forConcurrently_ miners $ \miner ->
runForever (logFunction logger) "primeWork" (go miner)
runForever (logFunction (chainLogger cid logger)) "primeWork" (go miner)
where
go :: Miner -> IO ()
go miner = do
Expand Down Expand Up @@ -168,9 +170,8 @@ withMiningCoordination logger conf cdb inner
NoHistory -> continuableBlockInProgress
Historical b -> b

logFunctionText logger Info
$ "refreshed block on chain " <> sshow cid
<> ", old and new tx count "
logFunctionText (chainLogger cid logger) Debug
$ "refreshed block, old and new tx count: "
<> sshow (V.length $ _transactionPairs $ _blockInProgressTransactions continuableBlockInProgress, V.length $ _transactionPairs $ _blockInProgressTransactions newBlock)

atomically $ modifyTVar' tpw $
Expand Down Expand Up @@ -204,7 +205,7 @@ withMiningCoordination logger conf cdb inner
-- Chainweb.Pact.PactService.Checkpointer.findLatestValidBlockHeader'
then return $
NewBlockPayload ph emptyPayload
else trace (logFunction logger)
else trace (logFunction (chainLogger cid logger))
"Chainweb.Chainweb.MinerResources.withMiningCoordination.newBlock"
() 1 (_pactNewBlock pact cid m NewBlockFill)

Expand Down
139 changes: 117 additions & 22 deletions src/Chainweb/Miner/RestAPI/Server.hs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DerivingStrategies #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE KindSignatures #-}
{-# LANGUAGE LambdaCase #-}
Expand All @@ -25,18 +26,21 @@ module Chainweb.Miner.RestAPI.Server

import Control.Concurrent.STM.TVar
(TVar, readTVar, readTVarIO, registerDelay)
import Control.Lens
import Control.Monad (when, unless)
import Control.Monad.Catch (bracket, try, catches)
import qualified Control.Monad.Catch as E
import Control.Monad.Except (throwError)
import Control.Monad.IO.Class (liftIO)
import Control.Monad.STM (atomically)
import Control.Monad.STM

import Data.Binary.Builder (fromByteString)
import qualified Data.HashMap.Strict as HM
import Data.IORef (IORef, atomicModifyIORef', newIORef, readIORef, writeIORef)
import qualified Data.Map.Strict as M
import Data.Proxy (Proxy(..))
import qualified Data.Set as S
import qualified Data.Vector as V

import Network.HTTP.Types.Status
import Network.Wai (responseLBS)
Expand All @@ -45,23 +49,26 @@ import Network.Wai.EventSource (ServerEvent(..), eventSourceAppIO)
import Servant.API
import Servant.Server

import System.LogLevel
import System.Random

-- internal modules

import Chainweb.Cut (Cut)
import Chainweb.BlockHeader
import Chainweb.Cut.Create
import Chainweb.CutDB (awaitNewCutByChainIdStm, _cut)
import Chainweb.Logger (Logger)
import Chainweb.Logger
import Chainweb.Miner.Config
import Chainweb.Miner.Coordinator
import Chainweb.Miner.Core
import Chainweb.Miner.Pact
import Chainweb.Miner.RestAPI (MiningApi)
import Chainweb.Pact.Service.Types(BlockInProgress(..), Transactions(..))
import Chainweb.Payload
import Chainweb.RestAPI.Utils
import Chainweb.Utils (EncodingException(..))
import Chainweb.Utils
import Chainweb.Utils.Serialization
import Chainweb.Version
import Chainweb.WebPactExecutionService

-- -------------------------------------------------------------------------- --
-- Work Handler
Expand Down Expand Up @@ -114,14 +121,33 @@ solvedHandler mr (HeaderBytes bytes) = do
-- -------------------------------------------------------------------------- --
-- Updates Handler

-- | Whether the work is outdated and should be thrown out immediately,
-- or there's just fresher work available and the old work is still valid.
data WorkChange = WorkOutdated | WorkRefreshed | WorkRegressed
deriving stock (Show, Eq)

updatesHandler
:: Logger l
=> MiningCoordination l tbl
-> ChainBytes
-> Tagged Handler Application
updatesHandler mr (ChainBytes cbytes) = Tagged $ \req resp -> withLimit resp $ do
cid <- runGetS decodeChainId cbytes
cv <- _cut (_coordCutDb mr) >>= newIORef
watchedChain <- runGetS decodeChainId cbytes
(watchedMiner, blockOnChain) <- atomically $ do
PrimedWork pw <- readTVar (_coordPrimedWork mr)
-- we check if `watchedMiner` has new work. we ignore
-- all other miner IDs, because we don't know which miner
-- is asking for updates, and if we watched all of them at once
-- we'd send too many messages.
-- this is deliberately partial, primed work will always have
-- at least one miner or that's an error
let (watchedMiner, minerBlocks) = HM.toList pw ^?! _head
-- and that miner will always have this chain(?)
-- if this chain doesn't exist yet just wait
blockOnChain <- maybe retry return
$ minerBlocks ^? ix watchedChain
return (watchedMiner, blockOnChain)
blockOnChainRef <- newIORef blockOnChain

-- An update stream is closed after @timeout@ seconds. We add some jitter to
-- availablility of streams is uniformily distributed over time and not
Expand All @@ -130,36 +156,105 @@ updatesHandler mr (ChainBytes cbytes) = Tagged $ \req resp -> withLimit resp $ d
jitter <- randomRIO @Double (0.9, 1.1)
timer <- registerDelay (round $ jitter * realToFrac timeout * 1_000_000)

eventSourceAppIO (go timer cid cv) req resp
eventSourceAppIO (go timer watchedChain watchedMiner blockOnChainRef) req resp
where
timeout = _coordinationUpdateStreamTimeout $ _coordConf mr

-- | A nearly empty `ServerEvent` that signals the discovery of a new
-- `Cut`. Currently there is no need to actually send any information over
-- to the caller.
-- | A nearly empty `ServerEvent` that signals the work on this chain has
-- changed, and how.
--
f :: ServerEvent
f = ServerEvent (Just $ fromByteString "New Cut") Nothing []
eventForWorkChangeType :: WorkChange -> ServerEvent
eventForWorkChangeType WorkOutdated = ServerEvent (Just $ fromByteString "New Cut") Nothing []
eventForWorkChangeType WorkRefreshed = ServerEvent (Just $ fromByteString "Refreshed Block") Nothing []
-- this is only different from WorkRefreshed for logging
eventForWorkChangeType WorkRegressed = ServerEvent (Just $ fromByteString "Refreshed Block") Nothing []

go :: TVar Bool -> ChainId -> IORef Cut -> IO ServerEvent
go timer cid cv = do
c <- readIORef cv
go :: TVar Bool -> ChainId -> MinerId -> IORef (Maybe NewBlock) -> IO ServerEvent
go timer watchedChain watchedMiner blockOnChainRef = do
lastBlockOnChain <- readIORef blockOnChainRef

-- await either a timeout or a new event
maybeCut <- atomically $ do
maybeNewBlock <- atomically $ do
t <- readTVar timer
if t
then return Nothing
else Just <$> awaitNewCutByChainIdStm (_coordCutDb mr) cid c
else Just <$> awaitNewPrimedWork watchedChain watchedMiner lastBlockOnChain

case maybeCut of
case maybeNewBlock of
Nothing -> return CloseEvent
Just c' -> do
writeIORef cv $! c'
return f
Just (workChange, currentBlockOnChain) -> do
writeIORef blockOnChainRef currentBlockOnChain
logFunctionText logger Debug $
"sent update to miner on chain " <> toText watchedChain <> ": " <> sshow workChange
when (workChange == WorkRegressed) $
logFunctionText logger Warn $
"miner block regressed: " <> sshow currentBlockOnChain
return (eventForWorkChangeType workChange)
where
logger = addLabel ("chain", toText watchedChain) (_coordLogger mr)

count = _coordUpdateStreamCount mr

awaitNewPrimedWork watchedChain watchedMiner lastBlockOnChain = do
PrimedWork pw <- readTVar (_coordPrimedWork mr)
let currentBlockOnChain = pw ^?! ix watchedMiner . ix watchedChain
case (lastBlockOnChain, currentBlockOnChain) of

-- we just got new PrimedWork after it was outdated;
-- should only happen in case of a race, where the miner
-- subscribes for updates and its work becomes stale before
-- it receives an update
(Nothing, Just _) -> return (WorkOutdated, currentBlockOnChain)

-- we just lost our PrimedWork because it's outdated,
-- miner should grab new work.
(Just _, Nothing) -> return (WorkOutdated, currentBlockOnChain)

-- there was no work, and that hasn't changed.
(Nothing, Nothing) -> retry

(Just (NewBlockInProgress lastBip), Just (NewBlockInProgress currentBip))
| ParentHeader lastPh <- _blockInProgressParentHeader lastBip
, ParentHeader currentPh <- _blockInProgressParentHeader currentBip
, lastPh /= currentPh ->
-- we've got a new block on a new parent, we must've missed
-- the update where the old block became outdated.
-- miner should restart
return (WorkOutdated, currentBlockOnChain)

| lastTlen <- V.length (_transactionPairs $ _blockInProgressTransactions lastBip)
, currentTlen <- V.length (_transactionPairs $ _blockInProgressTransactions currentBip)
, lastTlen /= currentTlen ->
if currentTlen < lastTlen
then
-- our refreshed block somehow has less transactions,
-- but the same parent header, log this as a bizarre case
return (WorkRegressed, currentBlockOnChain)
else
-- we've got a block that's been extended with new transactions
-- miner should restart
return (WorkRefreshed, currentBlockOnChain)

-- no apparent change
| otherwise -> retry
(Just (NewBlockPayload lastPh lastPwo), Just (NewBlockPayload currentPh currentPwo))
| lastPh /= currentPh ->
-- we've got a new block on a new parent, we must've missed
-- the update where the old block became outdated.
-- miner should restart.
return (WorkOutdated, currentBlockOnChain)

| _payloadWithOutputsPayloadHash lastPwo /= _payloadWithOutputsPayloadHash currentPwo ->
-- this should be impossible because NewBlockPayload is for
-- when Pact is off so blocks can't be refreshed, but we've got
-- a different block with the same parent, so the miner should restart.
return (WorkRefreshed, currentBlockOnChain)

-- no apparent change
| otherwise -> retry
(Just _, Just _) ->
error "awaitNewPrimedWork: impossible: NewBlockInProgress replaced by a NewBlockPayload"

withLimit resp inner = bracket
(atomicModifyIORef' count $ \x -> (x - 1, x - 1))
(const $ atomicModifyIORef' count $ \x -> (x + 1, ()))
Expand Down
5 changes: 3 additions & 2 deletions src/Chainweb/WebPactExecutionService.hs
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,9 @@ import Pact.Types.RowData (RowData)
-- PactExecutionService

data NewBlock
= NewBlockInProgress !BlockInProgress
| NewBlockPayload !ParentHeader !PayloadWithOutputs
= NewBlockInProgress !BlockInProgress
| NewBlockPayload !ParentHeader !PayloadWithOutputs
deriving Show

newBlockToPayloadWithOutputs :: NewBlock -> PayloadWithOutputs
newBlockToPayloadWithOutputs (NewBlockInProgress bip)
Expand Down

0 comments on commit dc7279f

Please sign in to comment.