Skip to content

Commit

Permalink
Sync Relayer from previous Ethereum block (#384)
Browse files Browse the repository at this point in the history
* add start block to eth config

* add delete cmd to database writer

* remove hex root print

* add INCENTIVIZED_CHANNEL_FEE env var to script

* update beefy eth listener

* update beefy eth writer

* update beefy relayer init

* uncomment relayer from start script

* revert beefy init changes

* call processFinalVerificationSuccessfulEvents

* send txs for populated database before going live

* update types for main

* add code comment

* minor fixes

Co-authored-by: Aidan Musnitzky <musnit@gmail.com>
  • Loading branch information
denalimarsh and musnit authored Jun 1, 2021
1 parent 3e137de commit 0eec3df
Show file tree
Hide file tree
Showing 8 changed files with 292 additions and 61 deletions.
1 change: 1 addition & 0 deletions ethereum/scripts/dumpTestConfig.js
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ const dump = (tmpDir, channels, bridge) => {
const config = {
ethereum: {
endpoint: "ws://localhost:8545/",
startblock: 1,
"descendants-until-final": 3,
channels: {
basic: {
Expand Down
1 change: 1 addition & 0 deletions relayer/chain/ethereum/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ type Config struct {
DescendantsUntilFinal byte `mapstructure:"descendants-until-final"`
Channels ChannelsConfig `mapstructure:"channels"`
LightClientBridge string `mapstructure:"lightclientbridge"`
StartBlock uint64 `mapstructure:"startblock"`
}

type ChannelsConfig struct {
Expand Down
308 changes: 259 additions & 49 deletions relayer/workers/beefyrelayer/beefy-ethereum-listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func NewBeefyEthereumListener(ethereumConfig *ethereum.Config, ethereumConn *eth
}
}

func (li *BeefyEthereumListener) Start(cxt context.Context, eg *errgroup.Group, descendantsUntilFinal uint64) error {
func (li *BeefyEthereumListener) Start(ctx context.Context, eg *errgroup.Group, descendantsUntilFinal uint64) error {

// Set up light client bridge contract
lightClientBridgeContract, err := lightclientbridge.NewContract(common.HexToAddress(li.ethereumConfig.LightClientBridge), li.ethereumConn.GetClient())
Expand All @@ -63,19 +63,49 @@ func (li *BeefyEthereumListener) Start(cxt context.Context, eg *errgroup.Group,
return err
}
li.blockWaitPeriod = blockWaitPeriod.Uint64()

// If starting block < latest block, sync the Relayer to the latest block
blockNumber, err := li.ethereumConn.GetClient().BlockNumber(ctx)
if err != nil {
return err
}
// Relayer config StartBlock config variable must be updated to the latest Ethereum block number
if uint64(li.ethereumConfig.StartBlock) < blockNumber {
li.log.Info(fmt.Sprintf("Syncing Relayer from block %d...", li.ethereumConfig.StartBlock))
err := li.pollHistoricEventsAndHeaders(ctx, descendantsUntilFinal)
if err != nil {
return err
}
li.log.Info(fmt.Sprintf("Relayer fully synced. Starting live processing on block number %d...", blockNumber))
}

// In live mode the relayer processes blocks as they're mined and broadcast
eg.Go(func() error {
err := li.pollEventsAndHeaders(cxt, descendantsUntilFinal)
err := li.pollEventsAndHeaders(ctx, descendantsUntilFinal)
close(li.headers)
return err
})

return nil
}

func (li *BeefyEthereumListener) pollEventsAndHeaders(
ctx context.Context,
descendantsUntilFinal uint64,
) error {
func (li *BeefyEthereumListener) pollHistoricEventsAndHeaders(ctx context.Context, descendantsUntilFinal uint64) error {
// Load starting block number and latest block number
blockNumber := li.ethereumConfig.StartBlock
latestBlockNumber, err := li.ethereumConn.GetClient().BlockNumber(ctx)
if err != nil {
return err
}
// Populate database
li.processHistoricalInitialVerificationSuccessfulEvents(ctx, blockNumber, latestBlockNumber)
li.processHistoricalFinalVerificationSuccessfulEvents(ctx, blockNumber, latestBlockNumber)
// Send transactions for items in database based on their statuses
li.forwardWitnessedBeefyJustifications()
li.forwardReadyToCompleteItems(ctx, blockNumber, descendantsUntilFinal)
return nil
}

func (li *BeefyEthereumListener) pollEventsAndHeaders(ctx context.Context, descendantsUntilFinal uint64) error {
headers := make(chan *gethTypes.Header, 5)

li.ethereumConn.GetClient().SubscribeNewHead(ctx, headers)
Expand All @@ -86,48 +116,17 @@ func (li *BeefyEthereumListener) pollEventsAndHeaders(
li.log.Info("Shutting down listener...")
return ctx.Err()
case gethheader := <-headers:

// Query LightClientBridge contract's InitialVerificationSuccessful events
blockNumber := gethheader.Number.Uint64()
var lightClientBridgeEvents []*lightclientbridge.ContractInitialVerificationSuccessful

contractEvents, err := li.queryLightClientEvents(ctx, blockNumber, &blockNumber)
if err != nil {
li.log.WithError(err).Error("Failure fetching event logs")
return err
}
lightClientBridgeEvents = append(lightClientBridgeEvents, contractEvents...)

if len(lightClientBridgeEvents) > 0 {
li.log.Info(fmt.Sprintf("Found %d LightClientBridge contract events on block %d", len(lightClientBridgeEvents), blockNumber))
}
li.processLightClientEvents(ctx, lightClientBridgeEvents)

// Mark items ReadyToComplete if the current block number has passed their CompleteOnBlock number
items := li.beefyDB.GetItemsByStatus(store.InitialVerificationTxConfirmed)
if len(items) > 0 {
li.log.Info(fmt.Sprintf("Found %d item(s) in database awaiting completion block", len(items)))
}
for _, item := range items {
if item.CompleteOnBlock+descendantsUntilFinal <= blockNumber {
// Fetch intended completion block's hash
block, err := li.ethereumConn.GetClient().BlockByNumber(ctx, big.NewInt(int64(item.CompleteOnBlock)))
if err != nil {
li.log.WithError(err).Error("Failure fetching inclusion block")
}

li.log.Info("3: Updating item status from 'InitialVerificationTxConfirmed' to 'ReadyToComplete'")
item.Status = store.ReadyToComplete
item.RandomSeed = block.Hash()
li.beefyMessages <- *item
}
}
li.forwardWitnessedBeefyJustifications()
li.processInitialVerificationSuccessfulEvents(ctx, blockNumber)
li.forwardReadyToCompleteItems(ctx, blockNumber, descendantsUntilFinal)
li.processFinalVerificationSuccessfulEvents(ctx, blockNumber)
}
}
}

// queryLightClientEvents queries ContractInitialVerificationSuccessful events from the LightClientBridge contract
func (li *BeefyEthereumListener) queryLightClientEvents(ctx context.Context, start uint64,
// queryInitialVerificationSuccessfulEvents queries ContractInitialVerificationSuccessful events from the LightClientBridge contract
func (li *BeefyEthereumListener) queryInitialVerificationSuccessfulEvents(ctx context.Context, start uint64,
end *uint64) ([]*lightclientbridge.ContractInitialVerificationSuccessful, error) {
var events []*lightclientbridge.ContractInitialVerificationSuccessful
filterOps := bind.FilterOpts{Start: start, End: end, Context: ctx}
Expand All @@ -153,26 +152,88 @@ func (li *BeefyEthereumListener) queryLightClientEvents(ctx context.Context, sta
return events, nil
}

// processLightClientEvents matches events to BEEFY commitment info by transaction hash
func (li *BeefyEthereumListener) processLightClientEvents(ctx context.Context, events []*lightclientbridge.ContractInitialVerificationSuccessful) {
// processHistoricalInitialVerificationSuccessfulEvents processes historical InitialVerificationSuccessful
// events, updating the status of matched BEEFY justifications in the database
func (li *BeefyEthereumListener) processHistoricalInitialVerificationSuccessfulEvents(ctx context.Context,
blockNumber, latestBlockNumber uint64) {

// Query previous InitialVerificationSuccessful events and update the status of BEEFY justifications in database
events, err := li.queryInitialVerificationSuccessfulEvents(ctx, blockNumber, &latestBlockNumber)
if err != nil {
li.log.WithError(err).Error("Failure fetching event logs")
}

li.log.Info(fmt.Sprintf(
"Found %d InitialVerificationSuccessful events between blocks %d-%d",
len(events), blockNumber, latestBlockNumber),
)

for _, event := range events {
// Only process events emitted by transactions sent from our node
if event.Prover != li.ethereumConn.GetKP().CommonAddress() {
continue
// Fetch validation data from contract using event.ID
validationData, err := li.lightClientBridge.ContractCaller.ValidationData(nil, event.Id)
if err != nil {
li.log.WithError(err).Error(fmt.Sprintf("Error querying validation data for ID %d", event.Id))
}

// Attempt to match items in database based on their payload
itemFoundInDatabase := false
items := li.beefyDB.GetItemsByStatus(store.CommitmentWitnessed)
for _, item := range items {
generatedPayload := li.simulatePayloadGeneration(*item)
if generatedPayload == validationData.CommitmentHash {
// Update existing database item
li.log.Info("Updating item status from 'CommitmentWitnessed' to 'InitialVerificationTxConfirmed'")
instructions := map[string]interface{}{
"status": store.InitialVerificationTxConfirmed,
"initial_verification_tx": event.Raw.TxHash.Hex(),
"complete_on_block": event.Raw.BlockNumber + li.blockWaitPeriod,
}
updateCmd := store.NewDatabaseCmd(item, store.Update, instructions)
li.dbMessages <- updateCmd

itemFoundInDatabase = true
break
}
}
if !itemFoundInDatabase {
// Don't have an existing item to update, therefore we won't be able to build the completion tx
li.log.Error("BEEFY justification data not found in database for InitialVerificationSuccessful event. Ignoring event.")
}
}
}

// processInitialVerificationSuccessfulEvents transitions matched database items from status
// InitialVerificationTxSent to InitialVerificationTxConfirmed
func (li *BeefyEthereumListener) processInitialVerificationSuccessfulEvents(ctx context.Context,
blockNumber uint64) {

events, err := li.queryInitialVerificationSuccessfulEvents(ctx, blockNumber, &blockNumber)
if err != nil {
li.log.WithError(err).Error("Failure fetching event logs")
}

if len(events) > 0 {
li.log.Info(fmt.Sprintf("Found %d InitialVerificationSuccessful events on block %d", len(events), blockNumber))
}

for _, event := range events {
li.log.WithFields(logrus.Fields{
"blockHash": event.Raw.BlockHash.Hex(),
"blockNumber": event.Raw.BlockNumber,
"txHash": event.Raw.TxHash.Hex(),
}).Info("event information")

// Only process events emitted by transactions sent from our node
if event.Prover != li.ethereumConn.GetKP().CommonAddress() {
continue
}

item := li.beefyDB.GetItemByInitialVerificationTxHash(event.Raw.TxHash)
if item.Status != store.InitialVerificationTxSent {
continue
}

li.log.Info("2: Updating item status from 'InitialVerificationTxSent' to 'InitialVerificationTxConfirmed'")
li.log.Info("3: Updating item status from 'InitialVerificationTxSent' to 'InitialVerificationTxConfirmed'")
instructions := map[string]interface{}{
"status": store.InitialVerificationTxConfirmed,
"complete_on_block": event.Raw.BlockNumber + li.blockWaitPeriod,
Expand All @@ -181,3 +242,152 @@ func (li *BeefyEthereumListener) processLightClientEvents(ctx context.Context, e
li.dbMessages <- updateCmd
}
}

// queryFinalVerificationSuccessfulEvents queries ContractFinalVerificationSuccessful events from the LightClientBridge contract
func (li *BeefyEthereumListener) queryFinalVerificationSuccessfulEvents(ctx context.Context, start uint64,
end *uint64) ([]*lightclientbridge.ContractFinalVerificationSuccessful, error) {
var events []*lightclientbridge.ContractFinalVerificationSuccessful
filterOps := bind.FilterOpts{Start: start, End: end, Context: ctx}

iter, err := li.lightClientBridge.FilterFinalVerificationSuccessful(&filterOps)
if err != nil {
return nil, err
}

for {
more := iter.Next()
if !more {
err = iter.Error()
if err != nil {
return nil, err
}
break
}

events = append(events, iter.Event)
}

return events, nil
}

// processHistoricalFinalVerificationSuccessfulEvents processes historical FinalVerificationSuccessful
// events, updating the status of matched BEEFY justifications in the database
func (li *BeefyEthereumListener) processHistoricalFinalVerificationSuccessfulEvents(ctx context.Context,
blockNumber, latestBlockNumber uint64) {
// Query previous FinalVerificationSuccessful events and update the status of BEEFY justifications in database
events, err := li.queryFinalVerificationSuccessfulEvents(ctx, blockNumber, &latestBlockNumber)
if err != nil {
li.log.WithError(err).Error("Failure fetching event logs")
}
li.log.Info(fmt.Sprintf(
"Found %d FinalVerificationSuccessful events between blocks %d-%d",
len(events), blockNumber, latestBlockNumber),
)

for _, event := range events {
// Fetch validation data from contract using event.ID
validationData, err := li.lightClientBridge.ContractCaller.ValidationData(nil, event.Id)
if err != nil {
li.log.WithError(err).Error(fmt.Sprintf("Error querying validation data for ID %d", event.Id))
}

// Attempt to match items in database based on their payload
itemFoundInDatabase := false
items := li.beefyDB.GetItemsByStatus(store.InitialVerificationTxConfirmed)
for _, item := range items {
generatedPayload := li.simulatePayloadGeneration(*item)
if generatedPayload == validationData.CommitmentHash {
li.log.Info("Deleting finalized item from the database'")
deleteCmd := store.NewDatabaseCmd(item, store.Delete, nil)
li.dbMessages <- deleteCmd

itemFoundInDatabase = true
break
}
}
if !itemFoundInDatabase {
li.log.Error("BEEFY justification data not found in database for FinalVerificationSuccessful event. Ignoring event.")
}
}
}

// processFinalVerificationSuccessfulEvents removes finalized commitments from the relayer's BEEFY justification database
func (li *BeefyEthereumListener) processFinalVerificationSuccessfulEvents(ctx context.Context,
blockNumber uint64) {
events, err := li.queryFinalVerificationSuccessfulEvents(ctx, blockNumber, &blockNumber)
if err != nil {
li.log.WithError(err).Error("Failure fetching event logs")
}

if len(events) > 0 {
li.log.Info(fmt.Sprintf("Found %d FinalVerificationSuccessful events on block %d", len(events), blockNumber))
}

for _, event := range events {
li.log.WithFields(logrus.Fields{
"blockHash": event.Raw.BlockHash.Hex(),
"blockNumber": event.Raw.BlockNumber,
"txHash": event.Raw.TxHash.Hex(),
}).Info("event information")

if event.Prover != li.ethereumConn.GetKP().CommonAddress() {
continue
}

item := li.beefyDB.GetItemByFinalVerificationTxHash(event.Raw.TxHash)
if item.Status != store.CompleteVerificationTxSent {
continue
}

li.log.Info("6: Deleting finalized item from the database'")
deleteCmd := store.NewDatabaseCmd(item, store.Delete, nil)
li.dbMessages <- deleteCmd
}
}

// matchGeneratedPayload simulates msg building and payload generation
func (li *BeefyEthereumListener) simulatePayloadGeneration(item store.BeefyRelayInfo) [32]byte {
beefyJustification, err := item.ToBeefyJustification()
if err != nil {
li.log.WithError(fmt.Errorf("Error converting BeefyRelayInfo to BeefyJustification: %s", err.Error()))
}

msg, err := beefyJustification.BuildNewSignatureCommitmentMessage(0)
if err != nil {
li.log.WithError(err).Error("Error building commitment message")
}

return msg.CommitmentHash
}

// forwardWitnessedBeefyJustifications forwards witnessed BEEFY commitments to the Ethereum writer
func (li *BeefyEthereumListener) forwardWitnessedBeefyJustifications() {
witnessedItems := li.beefyDB.GetItemsByStatus(store.CommitmentWitnessed)
for _, item := range witnessedItems {
li.beefyMessages <- *item
}
}

// forwardReadyToCompleteItems updates the status of items in the databse to ReadyToComplete if the
// current block number has passed their CompleteOnBlock number
func (li *BeefyEthereumListener) forwardReadyToCompleteItems(ctx context.Context, blockNumber, descendantsUntilFinal uint64) {
// Mark items ReadyToComplete if the current block number has passed their CompleteOnBlock number
initialVerificationItems := li.beefyDB.GetItemsByStatus(store.InitialVerificationTxConfirmed)
if len(initialVerificationItems) > 0 {
li.log.Info(fmt.Sprintf("Found %d item(s) in database awaiting completion block", len(initialVerificationItems)))
}
for _, item := range initialVerificationItems {
if item.CompleteOnBlock+descendantsUntilFinal <= blockNumber {
// Fetch intended completion block's hash
block, err := li.ethereumConn.GetClient().BlockByNumber(ctx, big.NewInt(int64(item.CompleteOnBlock)))
if err != nil {
li.log.WithError(err).Error("Failure fetching inclusion block")
}

li.log.Info("4: Updating item status from 'InitialVerificationTxConfirmed' to 'ReadyToComplete'")
item.Status = store.ReadyToComplete
item.RandomSeed = block.Hash()
li.beefyMessages <- *item
}
}
}
Loading

0 comments on commit 0eec3df

Please sign in to comment.