Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sync Relayer from previous Ethereum block #384

Merged
merged 16 commits into from
Jun 1, 2021
Merged
Prev Previous commit
Next Next commit
update beefy eth listener
  • Loading branch information
denalimarsh committed Apr 30, 2021
commit 1b743dc5ab20064ca44af14fb9a9a55efcd1ba12
299 changes: 251 additions & 48 deletions relayer/workers/beefyrelayer/beefy-ethereum-listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type BeefyEthereumListener struct {

func NewBeefyEthereumListener(ethereumConfig *ethereum.Config, ethereumConn *ethereum.Connection, beefyDB *store.Database,
beefyMessages chan<- store.BeefyRelayInfo, dbMessages chan<- store.DatabaseCmd, headers chan<- chain.Header,
log *logrus.Entry) *BeefyEthereumListener {
log *logrus.Entry) (*BeefyEthereumListener, error) {
return &BeefyEthereumListener{
ethereumConfig: ethereumConfig,
ethereumConn: ethereumConn,
Expand All @@ -45,7 +45,7 @@ func NewBeefyEthereumListener(ethereumConfig *ethereum.Config, ethereumConn *eth
headers: headers,
blockWaitPeriod: 0,
log: log,
}
}, nil
}

func (li *BeefyEthereumListener) Start(cxt context.Context, eg *errgroup.Group, descendantsUntilFinal uint64) error {
Expand All @@ -63,6 +63,22 @@ func (li *BeefyEthereumListener) Start(cxt context.Context, eg *errgroup.Group,
return err
}
li.blockWaitPeriod = blockWaitPeriod.Uint64()

// If starting block < latest block, sync the Relayer to the latest block
blockNumber, err := li.ethereumConn.GetClient().BlockNumber(cxt)
if err != nil {
return err
}
if uint64(li.ethereumConfig.StartBlock) < blockNumber {
denalimarsh marked this conversation as resolved.
Show resolved Hide resolved
li.log.Info(fmt.Sprintf("Syncing Relayer from block %d...", li.ethereumConfig.StartBlock))
err := li.pollHistoricEventsAndHeaders(cxt)
if err != nil {
return err
}
li.log.Info(fmt.Sprintf("Relayer fully synced. Starting live processing on block number %d...", blockNumber))
}

// In live mode the relayer processes blocks as they're mined and broadcast
eg.Go(func() error {
err := li.pollEventsAndHeaders(cxt, descendantsUntilFinal)
close(li.headers)
Expand All @@ -72,10 +88,21 @@ func (li *BeefyEthereumListener) Start(cxt context.Context, eg *errgroup.Group,
return nil
}

func (li *BeefyEthereumListener) pollEventsAndHeaders(
ctx context.Context,
descendantsUntilFinal uint64,
) error {
func (li *BeefyEthereumListener) pollHistoricEventsAndHeaders(ctx context.Context) error {
// Load starting block number and latest block number
blockNumber := li.ethereumConfig.StartBlock
denalimarsh marked this conversation as resolved.
Show resolved Hide resolved
latestBlockNumber, err := li.ethereumConn.GetClient().BlockNumber(ctx)
if err != nil {
return err
}

li.processHistoricalInitialVerificationSuccessfulEvents(ctx, blockNumber, latestBlockNumber)
li.processHistoricalFinalVerificationSuccessfulEvents(ctx, blockNumber, latestBlockNumber)

return nil
}

func (li *BeefyEthereumListener) pollEventsAndHeaders(ctx context.Context, descendantsUntilFinal uint64) error {
headers := make(chan *gethTypes.Header, 5)

li.ethereumConn.GetClient().SubscribeNewHead(ctx, headers)
Expand All @@ -86,48 +113,17 @@ func (li *BeefyEthereumListener) pollEventsAndHeaders(
li.log.Info("Shutting down listener...")
return ctx.Err()
case gethheader := <-headers:

// Query LightClientBridge contract's InitialVerificationSuccessful events
blockNumber := gethheader.Number.Uint64()
var lightClientBridgeEvents []*lightclientbridge.ContractInitialVerificationSuccessful

contractEvents, err := li.queryLightClientEvents(ctx, blockNumber, &blockNumber)
if err != nil {
li.log.WithError(err).Error("Failure fetching event logs")
return err
}
lightClientBridgeEvents = append(lightClientBridgeEvents, contractEvents...)

if len(lightClientBridgeEvents) > 0 {
li.log.Info(fmt.Sprintf("Found %d LightClientBridge contract events on block %d", len(lightClientBridgeEvents), blockNumber))
}
li.processLightClientEvents(ctx, lightClientBridgeEvents)

// Mark items ReadyToComplete if the current block number has passed their CompleteOnBlock number
items := li.beefyDB.GetItemsByStatus(store.InitialVerificationTxConfirmed)
if len(items) > 0 {
li.log.Info(fmt.Sprintf("Found %d item(s) in database awaiting completion block", len(items)))
}
for _, item := range items {
if item.CompleteOnBlock+descendantsUntilFinal <= blockNumber {
// Fetch intended completion block's hash
block, err := li.ethereumConn.GetClient().BlockByNumber(ctx, big.NewInt(int64(item.CompleteOnBlock)))
if err != nil {
li.log.WithError(err).Error("Failure fetching inclusion block")
}

li.log.Info("3: Updating item status from 'InitialVerificationTxConfirmed' to 'ReadyToComplete'")
item.Status = store.ReadyToComplete
item.RandomSeed = block.Hash()
li.beefyMessages <- *item
}
}
li.processInitialVerificationSuccessfulEvents(ctx, blockNumber)
li.forwardWitnessedBeefyCommitment(ctx, blockNumber, descendantsUntilFinal)
li.processInitialVerificationSuccessfulEvents(ctx, blockNumber)
}
}
}

// queryLightClientEvents queries ContractInitialVerificationSuccessful events from the LightClientBridge contract
func (li *BeefyEthereumListener) queryLightClientEvents(ctx context.Context, start uint64,
// queryInitialVerificationSuccessfulEvents queries ContractInitialVerificationSuccessful events from the LightClientBridge contract
func (li *BeefyEthereumListener) queryInitialVerificationSuccessfulEvents(ctx context.Context, start uint64,
end *uint64) ([]*lightclientbridge.ContractInitialVerificationSuccessful, error) {
var events []*lightclientbridge.ContractInitialVerificationSuccessful
filterOps := bind.FilterOpts{Start: start, End: end, Context: ctx}
Expand All @@ -153,26 +149,88 @@ func (li *BeefyEthereumListener) queryLightClientEvents(ctx context.Context, sta
return events, nil
}

// processLightClientEvents matches events to BEEFY commitment info by transaction hash
func (li *BeefyEthereumListener) processLightClientEvents(ctx context.Context, events []*lightclientbridge.ContractInitialVerificationSuccessful) {
// processHistoricalInitialVerificationSuccessfulEvents processes historical InitialVerificationSuccessful
// events, updating the status of matched BEEFY justifications in the database
func (li *BeefyEthereumListener) processHistoricalInitialVerificationSuccessfulEvents(ctx context.Context,
blockNumber, latestBlockNumber uint64) {

// Query previous InitialVerificationSuccessful events and update the status of BEEFY justifications in database
events, err := li.queryInitialVerificationSuccessfulEvents(ctx, blockNumber, &latestBlockNumber)
if err != nil {
li.log.WithError(err).Error("Failure fetching event logs")
}

li.log.Info(fmt.Sprintf(
"Found %d InitialVerificationSuccessful events between blocks %d-%d",
len(events), blockNumber, latestBlockNumber),
)

for _, event := range events {
// Only process events emitted by transactions sent from our node
if event.Prover != li.ethereumConn.GetKP().CommonAddress() {
continue
// Fetch validation data from contract using event.ID
validationData, err := li.lightClientBridge.ContractCaller.ValidationData(nil, event.Id)
if err != nil {
li.log.WithError(err).Error(fmt.Sprintf("Error querying validation data for ID %d", event.Id))
}

// Attempt to match items in database based on their payload
itemFoundInDatabase := false
items := li.beefyDB.GetItemsByStatus(store.CommitmentWitnessed)
for _, item := range items {
denalimarsh marked this conversation as resolved.
Show resolved Hide resolved
generatedPayload := li.simulatePayloadGeneration(*item)
if generatedPayload == validationData.Payload {
// Update existing database item
li.log.Info("Updating item status from 'CommitmentWitnessed' to 'InitialVerificationTxConfirmed'")
instructions := map[string]interface{}{
"status": store.InitialVerificationTxConfirmed,
"initial_verification_tx": event.Raw.TxHash.Hex(),
"complete_on_block": event.Raw.BlockNumber + li.blockWaitPeriod,
}
updateCmd := store.NewDatabaseCmd(item, store.Update, instructions)
li.dbMessages <- updateCmd

itemFoundInDatabase = true
break
}
}
if !itemFoundInDatabase {
// Don't have an existing item to update, therefore we won't be able to build the completion tx
li.log.Error("BEEFY justification data not found in database for InitialVerificationSuccessful event. Ignoring event.")
}
}
}

// processInitialVerificationSuccessfulEvents transitions matched database items from status
// InitialVerificationTxSent to InitialVerificationTxConfirmed
func (li *BeefyEthereumListener) processInitialVerificationSuccessfulEvents(ctx context.Context,
blockNumber uint64) {

events, err := li.queryInitialVerificationSuccessfulEvents(ctx, blockNumber, &blockNumber)
if err != nil {
li.log.WithError(err).Error("Failure fetching event logs")
}

if len(events) > 0 {
li.log.Info(fmt.Sprintf("Found %d InitialVerificationSuccessful events on block %d", len(events), blockNumber))
}

for _, event := range events {
li.log.WithFields(logrus.Fields{
"blockHash": event.Raw.BlockHash.Hex(),
"blockNumber": event.Raw.BlockNumber,
"txHash": event.Raw.TxHash.Hex(),
}).Info("event information")

// Only process events emitted by transactions sent from our node
if event.Prover != li.ethereumConn.GetKP().CommonAddress() {
continue
}

item := li.beefyDB.GetItemByInitialVerificationTxHash(event.Raw.TxHash)
if item.Status != store.InitialVerificationTxSent {
continue
}

li.log.Info("2: Updating item status from 'InitialVerificationTxSent' to 'InitialVerificationTxConfirmed'")
li.log.Info("3: Updating item status from 'InitialVerificationTxSent' to 'InitialVerificationTxConfirmed'")
instructions := map[string]interface{}{
"status": store.InitialVerificationTxConfirmed,
"complete_on_block": event.Raw.BlockNumber + li.blockWaitPeriod,
Expand All @@ -181,3 +239,148 @@ func (li *BeefyEthereumListener) processLightClientEvents(ctx context.Context, e
li.dbMessages <- updateCmd
}
}

// queryFinalVerificationSuccessfulEvents queries ContractFinalVerificationSuccessful events from the LightClientBridge contract
func (li *BeefyEthereumListener) queryFinalVerificationSuccessfulEvents(ctx context.Context, start uint64,
end *uint64) ([]*lightclientbridge.ContractFinalVerificationSuccessful, error) {
var events []*lightclientbridge.ContractFinalVerificationSuccessful
filterOps := bind.FilterOpts{Start: start, End: end, Context: ctx}

iter, err := li.lightClientBridge.FilterFinalVerificationSuccessful(&filterOps)
if err != nil {
return nil, err
}

for {
more := iter.Next()
if !more {
err = iter.Error()
if err != nil {
return nil, err
}
break
}

events = append(events, iter.Event)
}

return events, nil
}

// processHistoricalFinalVerificationSuccessfulEvents processes historical FinalVerificationSuccessful
// events, updating the status of matched BEEFY justifications in the database
func (li *BeefyEthereumListener) processHistoricalFinalVerificationSuccessfulEvents(ctx context.Context,
blockNumber, latestBlockNumber uint64) {
// Query previous FinalVerificationSuccessful events and update the status of BEEFY justifications in database
events, err := li.queryFinalVerificationSuccessfulEvents(ctx, blockNumber, &latestBlockNumber)
if err != nil {
li.log.WithError(err).Error("Failure fetching event logs")
}
li.log.Info(fmt.Sprintf(
"Found %d FinalVerificationSuccessful events between blocks %d-%d",
len(events), blockNumber, latestBlockNumber),
)

for _, event := range events {
// Fetch validation data from contract using event.ID
validationData, err := li.lightClientBridge.ContractCaller.ValidationData(nil, event.Id)
if err != nil {
li.log.WithError(err).Error(fmt.Sprintf("Error querying validation data for ID %d", event.Id))
}

// Attempt to match items in database based on their payload
itemFoundInDatabase := false
items := li.beefyDB.GetItemsByStatus(store.InitialVerificationTxConfirmed) // TODO: list of statuses
for _, item := range items {
denalimarsh marked this conversation as resolved.
Show resolved Hide resolved
generatedPayload := li.simulatePayloadGeneration(*item)
if generatedPayload == validationData.Payload {
li.log.Info("Deleting finalized item from the database'")
deleteCmd := store.NewDatabaseCmd(item, store.Delete, nil)
li.dbMessages <- deleteCmd

itemFoundInDatabase = true
break
}
philipstanislaus marked this conversation as resolved.
Show resolved Hide resolved
}
if !itemFoundInDatabase {
li.log.Error("BEEFY justification data not found in database for FinalVerificationSuccessful event. Ignoring event.")
}
}
}

// processFinalVerificationSuccessfulEvents removes finalized commitments from the relayer's BEEFY justification database
func (li *BeefyEthereumListener) processFinalVerificationSuccessfulEvents(ctx context.Context,
blockNumber uint64) {
events, err := li.queryFinalVerificationSuccessfulEvents(ctx, blockNumber, &blockNumber)
if err != nil {
li.log.WithError(err).Error("Failure fetching event logs")
}

if len(events) > 0 {
li.log.Info(fmt.Sprintf("Found %d FinalVerificationSuccessful events on block %d", len(events), blockNumber))
}

for _, event := range events {
li.log.WithFields(logrus.Fields{
"blockHash": event.Raw.BlockHash.Hex(),
"blockNumber": event.Raw.BlockNumber,
"txHash": event.Raw.TxHash.Hex(),
}).Info("event information")

if event.Prover != li.ethereumConn.GetKP().CommonAddress() {
continue
}

item := li.beefyDB.GetItemByFinalVerificationTxHash(event.Raw.TxHash)
if item.Status != store.CompleteVerificationTxSent {
continue
}

li.log.Info("6: Deleting finalized item from the database'")
deleteCmd := store.NewDatabaseCmd(item, store.Delete, nil)
li.dbMessages <- deleteCmd
}
}

// matchGeneratedPayload simulates msg building and payload generation
func (li *BeefyEthereumListener) simulatePayloadGeneration(item store.BeefyRelayInfo) [32]byte {
beefyJustification, err := item.ToBeefyJustification()
if err != nil {
li.log.WithError(fmt.Errorf("Error converting BeefyRelayInfo to BeefyJustification: %s", err.Error()))
}

msg, err := beefyJustification.BuildNewSignatureCommitmentMessage(0)
if err != nil {
li.log.WithError(err).Error("Error building commitment message")
}

return msg.Payload
}

// forwardWitnessedBeefyCommitment forwards witnessed BEEFY commitments to the Ethereum writer
func (li *BeefyEthereumListener) forwardWitnessedBeefyCommitment(ctx context.Context, blockNumber, descendantsUntilFinal uint64) {
witnessedItems := li.beefyDB.GetItemsByStatus(store.CommitmentWitnessed)
for _, item := range witnessedItems {
li.beefyMessages <- *item
}

// Mark items ReadyToComplete if the current block number has passed their CompleteOnBlock number
initialVerificationItems := li.beefyDB.GetItemsByStatus(store.InitialVerificationTxConfirmed)
if len(initialVerificationItems) > 0 {
li.log.Info(fmt.Sprintf("Found %d item(s) in database awaiting completion block", len(initialVerificationItems)))
}
for _, item := range initialVerificationItems {
if item.CompleteOnBlock+descendantsUntilFinal <= blockNumber {
// Fetch intended completion block's hash
block, err := li.ethereumConn.GetClient().BlockByNumber(ctx, big.NewInt(int64(item.CompleteOnBlock)))
if err != nil {
li.log.WithError(err).Error("Failure fetching inclusion block")
}

li.log.Info("4: Updating item status from 'InitialVerificationTxConfirmed' to 'ReadyToComplete'")
item.Status = store.ReadyToComplete
item.RandomSeed = block.Hash()
li.beefyMessages <- *item
}
}
}