Skip to content

Commit

Permalink
Add finalised event watcher (#84)
Browse files Browse the repository at this point in the history
* add ability to observe finalised events

* test reorg scenarios

* create deterministic id scheme for events

* minor refactoring

* fix comment typo
  • Loading branch information
ermyas authored Jan 17, 2022
1 parent 9028b88 commit 0d6c68c
Show file tree
Hide file tree
Showing 6 changed files with 271 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"encoding/json"
"errors"
"fmt"
"github.com/ethereum/go-ethereum/core/types"

"github.com/consensys/gpact/messaging/relayer/internal/contracts/functioncall"
v1 "github.com/consensys/gpact/messaging/relayer/internal/messages/v1"
Expand Down Expand Up @@ -55,7 +56,7 @@ func (t *SFCEventTransformer) ToMessage(event interface{}) (*v1.Message, error)
}

message := v1.Message{
ID: hex.EncodeToString(randomBytes(16)), // TODO: replace with a proper message id scheme
ID: t.getIDForEvent(sfcEvent.Raw),
Timestamp: sfcEvent.Timestamp.Int64(),
MsgType: v1.MessageType,
Version: v1.Version,
Expand All @@ -80,6 +81,11 @@ func (t *SFCEventTransformer) validate(event *functioncall.SfcCrossCall) error {
return nil
}

// getIDForEvent generates a deterministic ID for an event of the format {network_id}/{contract_address}/{block_number}/{tx_index}/{log_index}
func (t *SFCEventTransformer) getIDForEvent(event types.Log) string {
return fmt.Sprintf("%s/%s/%d/%d/%d", t.Source, t.SourceAddr, event.BlockNumber, event.TxIndex, event.Index)
}

func NewSFCEventTransformer(sourceNetwork string, sourceAddr string) *SFCEventTransformer {
return &SFCEventTransformer{sourceNetwork, sourceAddr}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ package observer
import (
"encoding/hex"
"encoding/json"
"fmt"
"github.com/ethereum/go-ethereum/core/types"
"math/big"
"testing"

Expand All @@ -26,12 +28,20 @@ import (
"github.com/stretchr/testify/assert"
)

var fixLog = types.Log{
BlockNumber: uint64(12234),
TxIndex: uint(2),
Index: uint(1),
}

var fixValidEvent = functioncall.SfcCrossCall{
DestBcId: big.NewInt(1),
DestContract: common.HexToAddress("0x8e215d06ea7ec1fdb4fc5fd21768f4b34ee92ef4"),
Timestamp: big.NewInt(1639527190),
DestFunctionCall: randomBytes(10),
Raw: fixLog,
}

var transformer = NewSFCEventTransformer("network-001", "0x8e215d06ea7ec1fdb4fc5fd21768f4b34ee92ef4")

func TestSFCTransformer(t *testing.T) {
Expand All @@ -45,18 +55,23 @@ func TestSFCTransformer(t *testing.T) {
assert.Equal(t, fixValidEvent.DestContract.String(), message.Destination.ContractAddress)
assert.Equal(t, fixValidEvent.Timestamp, big.NewInt(message.Timestamp))
assert.Equal(t, hex.EncodeToString(data), message.Payload)

expectedID := fmt.Sprintf("%s/%s/%d/%d/%d", transformer.Source, transformer.SourceAddr,
fixLog.BlockNumber, fixLog.TxIndex, fixLog.Index)
assert.Equal(t, expectedID, message.ID)
}

func TestSFCTransformerFailsOnInvalidEventType(t *testing.T) {
assert.Panics(t, func() { transformer.ToMessage("invalid event") })
}

func TestSFCTransformerFailsOnInvalidTimestamp(t *testing.T) {
invalidTimestamp := fixValidEvent
invalidTimestamp.Timestamp = big.NewInt(-1)

_, err := transformer.ToMessage(&invalidTimestamp)
assert.NotNil(t, err)
assert.Regexp(t, "invalid timestamp", err.Error())

}

func TestSFCTransformerFailsOnInvalidDestination(t *testing.T) {
Expand All @@ -67,5 +82,3 @@ func TestSFCTransformerFailsOnInvalidDestination(t *testing.T) {
assert.NotNil(t, err)
assert.Regexp(t, "destination network id", err.Error())
}

// TODO: verify encoded payload
133 changes: 118 additions & 15 deletions messaging/relayer/internal/msgobserver/eth/observer/event_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@ package observer

import (
"context"
"log"

"fmt"
"github.com/consensys/gpact/messaging/relayer/internal/contracts/functioncall"
"github.com/consensys/gpact/messaging/relayer/internal/logging"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/event"
"log"
)

// EventWatcher listens to blockchain events
Expand All @@ -31,24 +33,25 @@ type EventWatcher interface {
}

type EventWatcherConfig struct {
Start *uint64
Start uint64
Context context.Context
Handler EventHandler
}

// SFCCrossCallWatcher subscribes and listens to events from a simple-function-call bridge contract
type SFCCrossCallWatcher struct {
// SFCCrossCallRealtimeEventWatcher subscribes and listens to events from a Simple Function Call bridge contract.
// The events produced by this watcher are generated the instant they are mined (i.e. 1 confirmation).
// Note: The watcher does not check to see if the event is affected by any reorgs.
type SFCCrossCallRealtimeEventWatcher struct {
EventWatcherConfig
SfcContract *functioncall.Sfc

end chan bool
end chan bool
}

// Watch subscribes and starts listening to 'CrossCall' events from a given simple-function-call contract.
// Watch subscribes and starts listening to 'CrossCall' events from a given Simple Function Call contract.
// Events received are passed to an event handler for processing.
// The method fails if subscribing to the event with the underlying network is not successful.
func (l *SFCCrossCallWatcher) Watch() {
opts := bind.WatchOpts{Start: l.Start, Context: l.Context}
func (l *SFCCrossCallRealtimeEventWatcher) Watch() {
opts := bind.WatchOpts{Start: &l.Start, Context: l.Context}
chanEvents := make(chan *functioncall.SfcCrossCall)
sub, err := l.SfcContract.WatchCrossCall(&opts, chanEvents)
if err != nil {
Expand All @@ -57,22 +60,122 @@ func (l *SFCCrossCallWatcher) Watch() {
l.start(sub, chanEvents)
}

func (l *SFCCrossCallWatcher) start(sub event.Subscription, chanEvents <-chan *functioncall.SfcCrossCall) {
func (l *SFCCrossCallRealtimeEventWatcher) start(sub event.Subscription, chanEvents <-chan *functioncall.SfcCrossCall) {
logging.Info("Start watching %v...", l.SfcContract)
for {
select {
case err := <-sub.Err():
// TODO: communicate this to the calling context
logging.Error("error in log subscription %v", err)
case log := <-chanEvents:
l.Handler.Handle(log)
case ev := <-chanEvents:
l.Handler.Handle(ev)
case <-l.end:
logging.Info("Stop watching %v.", l.SfcContract)
return
}
}
}

func NewSFCCrossCallWatcher(context context.Context, handler EventHandler, contract *functioncall.Sfc, end chan bool) *SFCCrossCallWatcher {
return &SFCCrossCallWatcher{EventWatcherConfig: EventWatcherConfig{Context: context, Handler: handler}, SfcContract: contract, end: end}
func NewSFCCrossCallRealtimeEventWatcher(context context.Context, handler EventHandler, contract *functioncall.Sfc, end chan bool) *SFCCrossCallRealtimeEventWatcher {
return &SFCCrossCallRealtimeEventWatcher{EventWatcherConfig: EventWatcherConfig{Context: context, Handler: handler}, SfcContract: contract, end: end}
}

type BlockHeadProducer interface {
SubscribeNewHead(ctx context.Context, ch chan<- *types.Header) (ethereum.Subscription, error)
}

// SFCCrossCallFinalisedEventWatcher listens for events from a Simple Function Call bridge and processes these events only once they are
// finalised. An event is considered finalised once it receives a configurable number of block confirmations.
// One block confirmation means the instant the transaction generating the event is mined,
// and is equivalent in behaviour to the SFCCrossCallRealtimeEventWatcher
type SFCCrossCallFinalisedEventWatcher struct {
EventWatcherConfig
// confirmationsForFinality refers to the number of block confirmations required before an event is considered finalised.
confirmationsForFinality uint64
SfcContract *functioncall.Sfc
nextBlockToProcess uint64
client BlockHeadProducer
end chan bool
}

// Watch subscribes and starts listening to 'CrossCall' events from a given Simple Function Call contract.
// Once an events receives sufficient block confirmations, it is passed to an event handler for processing.
func (l *SFCCrossCallFinalisedEventWatcher) Watch() {
l.nextBlockToProcess = l.Start
headers := make(chan *types.Header)

sub, err := l.client.SubscribeNewHead(l.Context, headers)
if err != nil {
log.Fatalf("failed to subscribe to new block headers %v", err)
}
for {
select {
case err := <-sub.Err():
// TODO: communicate this to the calling context
logging.Error("error in log subscription %v", err)
case latestHead := <-headers:
// TODO: communicate err to the calling context
l.processFinalisedEvents(latestHead)
}
}
}

func (l *SFCCrossCallFinalisedEventWatcher) processFinalisedEvents(latest *types.Header) error {
// TODO: maintain persistent state about last finalised block and observed messages
latestBlock := latest.Number.Uint64()
confirmations := (latestBlock - l.nextBlockToProcess) + 1

logging.Debug("latest: '%d', next to process: '%d', confirmations: '%d', required confirmations: %d", latestBlock, l.nextBlockToProcess,
confirmations, l.confirmationsForFinality)

if latestBlock >= l.nextBlockToProcess && confirmations >= l.confirmationsForFinality {
numFinalisedBlocks := confirmations - l.confirmationsForFinality
startFinalisedBlock := l.nextBlockToProcess
lastFinalisedBlock := startFinalisedBlock + numFinalisedBlocks

logging.Debug("Finalising blocks '%d' to '%d'", startFinalisedBlock, lastFinalisedBlock)

filterOpts := &bind.FilterOpts{Start: startFinalisedBlock, End: &lastFinalisedBlock, Context: l.Context}
finalisedEvs, err := l.SfcContract.FilterCrossCall(filterOpts)
if err != nil {
logging.Error("error filtering logs from block: %d to %d, error: %v", startFinalisedBlock, lastFinalisedBlock, err)
return err
}

// TODO: Handle partial failure scenarios when handling events.
// Three cases to consider:
// 1. A range of blocks are being processed and some blocks fail
// 2. A block is being processed and some events in the block fail processing
// 3. Combination of 1 and 2
err = l.handleEvents(finalisedEvs)
if err != nil {
return err
}

l.nextBlockToProcess = lastFinalisedBlock + 1
}
return nil
}

func (l *SFCCrossCallFinalisedEventWatcher) handleEvents(events *functioncall.SfcCrossCallIterator) error {
for events.Next() {
ev := events.Event
err := l.Handler.Handle(ev)
if err != nil {
logging.Error("failed to handle event: %v, error: %v", ev, err)
return err
}
}
return nil
}

// NewSFCCrossCallFinalisedEventWatcher creates an SFCCrossCall event watcher that only returns events once they receive a configured number of
// confirmations. Note: 1 block confirmation means the instant the transaction generating the event is mined
func NewSFCCrossCallFinalisedEventWatcher(context context.Context, blockConfirmations uint64, handler EventHandler, contract *functioncall.Sfc,
start uint64, client BlockHeadProducer, end chan bool) (*SFCCrossCallFinalisedEventWatcher, error) {
if blockConfirmations < 1 {
return nil, fmt.Errorf("block confirmationsForFinality cannot be less than 1. supplied value: %d", blockConfirmations)
}
return &SFCCrossCallFinalisedEventWatcher{EventWatcherConfig: EventWatcherConfig{Context: context, Handler: handler, Start: start},
SfcContract: contract, confirmationsForFinality: blockConfirmations, client: client, end: end}, nil
}
Loading

0 comments on commit 0d6c68c

Please sign in to comment.