Skip to content

Commit

Permalink
DA-442 tx receipt handler impl
Browse files Browse the repository at this point in the history
Signed-off-by: Dzianis Andreyenka <andreenkodn@gmail.com>
  • Loading branch information
denisandreenko committed Sep 18, 2023
1 parent 9db60eb commit a20f2c0
Show file tree
Hide file tree
Showing 3 changed files with 433 additions and 12 deletions.
16 changes: 16 additions & 0 deletions internal/blockchain/tezos/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ const (

defaultAddressResolverMethod = "GET"
defaultAddressResolverResponseField = "address"

defaultBackgroundInitialDelay = "5s"
defaultBackgroundRetryFactor = 2.0
defaultBackgroundMaxDelay = "1m"
)

const (
Expand All @@ -35,6 +39,14 @@ const (
TezosconnectConfigInstanceDeprecated = "instance"
// TezosconnectConfigFromBlockDeprecated is the configuration of the first block to listen to when creating the listener for the FireFly contract
TezosconnectConfigFromBlockDeprecated = "fromBlock"
// TezosconnectBackgroundStart is used to not fail the tezos plugin on init and retry to start it in the background
TezosconnectBackgroundStart = "backgroundStart.enabled"
// TezosconnectBackgroundStartInitialDelay is delay between restarts in the case where we retry to restart in the tezos plugin
TezosconnectBackgroundStartInitialDelay = "backgroundStart.initialDelay"
// TezosconnectBackgroundStartMaxDelay is the max delay between restarts in the case where we retry to restart in the tezos plugin
TezosconnectBackgroundStartMaxDelay = "backgroundStart.maxDelay"
// TezosconnectBackgroundStartFactor is to set the factor by which the delay increases when retrying
TezosconnectBackgroundStartFactor = "backgroundStart.factor"

// AddressResolverConfigKey is a sub-key in the config to contain an address resolver config.
AddressResolverConfigKey = "addressResolver"
Expand All @@ -59,6 +71,10 @@ func (t *Tezos) InitConfig(config config.Section) {
t.tezosconnectConf = config.SubSection(TezosconnectConfigKey)
wsclient.InitConfig(t.tezosconnectConf)
t.tezosconnectConf.AddKnownKey(TezosconnectConfigTopic)
t.tezosconnectConf.AddKnownKey(TezosconnectBackgroundStart)
t.tezosconnectConf.AddKnownKey(TezosconnectBackgroundStartInitialDelay, defaultBackgroundInitialDelay)
t.tezosconnectConf.AddKnownKey(TezosconnectBackgroundStartFactor, defaultBackgroundRetryFactor)
t.tezosconnectConf.AddKnownKey(TezosconnectBackgroundStartMaxDelay, defaultBackgroundMaxDelay)
t.tezosconnectConf.AddKnownKey(TezosconnectConfigBatchSize, defaultBatchSize)
t.tezosconnectConf.AddKnownKey(TezosconnectConfigBatchTimeout, defaultBatchTimeout)
t.tezosconnectConf.AddKnownKey(TezosconnectPrefixShort, defaultPrefixShort)
Expand Down
217 changes: 217 additions & 0 deletions internal/blockchain/tezos/eventstream.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,217 @@
// Copyright © 2023 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package tezos

import (
"context"
"fmt"

"github.com/go-resty/resty/v2"
"github.com/hyperledger/firefly-common/pkg/ffresty"
"github.com/hyperledger/firefly/internal/cache"
"github.com/hyperledger/firefly/internal/coremsgs"
)

type streamManager struct {
client *resty.Client
cache cache.CInterface
batchSize uint
batchTimeout uint
}

type eventStream struct {
ID string `json:"id"`
Name string `json:"name"`
ErrorHandling string `json:"errorHandling"`
BatchSize uint `json:"batchSize"`
BatchTimeoutMS uint `json:"batchTimeoutMS"`
Type string `json:"type"`
WebSocket eventStreamWebsocket `json:"websocket"`
Timestamps bool `json:"timestamps"`
}

type subscription struct {
ID string `json:"id"`
Name string `json:"name,omitempty"`
Stream string `json:"stream"`
FromBlock string `json:"fromBlock"`
TezosCompatAddress string `json:"address,omitempty"`
// EthCompatEvent *abi.Entry `json:"event,omitempty"`
// Filters []fftypes.JSONAny `json:"filters"`
// subscriptionCheckpoint
}

func newStreamManager(client *resty.Client, cache cache.CInterface, batchSize, batchTimeout uint) *streamManager {
return &streamManager{
client: client,
cache: cache,
batchSize: batchSize,
batchTimeout: batchTimeout,
}
}

func (s *streamManager) getEventStreams(ctx context.Context) (streams []*eventStream, err error) {
res, err := s.client.R().
SetContext(ctx).
SetResult(&streams).
Get("/eventstreams")
if err != nil || !res.IsSuccess() {
return nil, ffresty.WrapRestErr(ctx, res, err, coremsgs.MsgTezosconnectRESTErr)
}
return streams, nil
}

func buildEventStream(topic string, batchSize, batchTimeout uint) *eventStream {
return &eventStream{
Name: topic,
ErrorHandling: "block",
BatchSize: batchSize,
BatchTimeoutMS: batchTimeout,
Type: "websocket",
// Some implementations require a "topic" to be set separately, while others rely only on the name.
// We set them to the same thing for cross compatibility.
WebSocket: eventStreamWebsocket{Topic: topic},
Timestamps: true,
}
}

func (s *streamManager) createEventStream(ctx context.Context, topic string) (*eventStream, error) {
stream := buildEventStream(topic, s.batchSize, s.batchTimeout)
res, err := s.client.R().
SetContext(ctx).
SetBody(stream).
SetResult(stream).
Post("/eventstreams")
if err != nil || !res.IsSuccess() {
return nil, ffresty.WrapRestErr(ctx, res, err, coremsgs.MsgTezosconnectRESTErr)
}
return stream, nil
}

func (s *streamManager) updateEventStream(ctx context.Context, topic string, batchSize, batchTimeout uint, eventStreamID string) (*eventStream, error) {
stream := buildEventStream(topic, batchSize, batchTimeout)
res, err := s.client.R().
SetContext(ctx).
SetBody(stream).
SetResult(stream).
Patch("/eventstreams/" + eventStreamID)
if err != nil || !res.IsSuccess() {
return nil, ffresty.WrapRestErr(ctx, res, err, coremsgs.MsgTezosconnectRESTErr)
}
return stream, nil
}

func (s *streamManager) ensureEventStream(ctx context.Context, topic string) (*eventStream, error) {
existingStreams, err := s.getEventStreams(ctx)
if err != nil {
return nil, err
}
for _, stream := range existingStreams {
if stream.Name == topic {
stream, err = s.updateEventStream(ctx, topic, s.batchSize, s.batchTimeout, stream.ID)
if err != nil {
return nil, err
}
return stream, nil
}
}
return s.createEventStream(ctx, topic)
}

func (s *streamManager) getSubscriptions(ctx context.Context) (subs []*subscription, err error) {
res, err := s.client.R().
SetContext(ctx).
SetResult(&subs).
Get("/subscriptions")
if err != nil || !res.IsSuccess() {
return nil, ffresty.WrapRestErr(ctx, res, err, coremsgs.MsgTezosconnectRESTErr)
}
return subs, nil
}

func (s *streamManager) getSubscription(ctx context.Context, subID string, okNotFound bool) (sub *subscription, err error) {
res, err := s.client.R().
SetContext(ctx).
SetResult(&sub).
Get(fmt.Sprintf("/subscriptions/%s", subID))
if err != nil || !res.IsSuccess() {
if okNotFound && res.StatusCode() == 404 {
return nil, nil
}
return nil, ffresty.WrapRestErr(ctx, res, err, coremsgs.MsgTezosconnectRESTErr)
}
return sub, nil
}

func (s *streamManager) getSubscriptionName(ctx context.Context, subID string) (string, error) {
if cachedValue := s.cache.GetString("sub:" + subID); cachedValue != "" {
return cachedValue, nil
}
sub, err := s.getSubscription(ctx, subID, false)
if err != nil {
return "", err
}
s.cache.SetString("sub:"+subID, sub.Name)
return sub.Name, nil
}

// func (s *streamManager) createSubscription(ctx context.Context, location *Location, stream, name, event, fromBlock string) (*subscription, error) {
// // Map FireFly "firstEvent" values to Tezos "fromBlock" values
// switch fromBlock {
// case string(core.SubOptsFirstEventOldest):
// fromBlock = "0"
// case string(core.SubOptsFirstEventNewest):
// fromBlock = "latest"
// }
// sub := subscription{
// Name: name,
// Stream: stream,
// FromBlock: fromBlock,
// // EthCompatEvent: abi,
// }

// if location != nil {
// sub.TezosCompatAddress = location.Address
// }

// res, err := s.client.R().
// SetContext(ctx).
// SetBody(&sub).
// SetResult(&sub).
// Post("/subscriptions")
// if err != nil || !res.IsSuccess() {
// return nil, ffresty.WrapRestErr(ctx, res, err, coremsgs.MsgTezosconnectRESTErr)
// }
// return &sub, nil
// }

func (s *streamManager) deleteSubscription(ctx context.Context, subID string, okNotFound bool) error {
res, err := s.client.R().
SetContext(ctx).
Delete("/subscriptions/" + subID)
if err != nil || !res.IsSuccess() {
if okNotFound && res.StatusCode() == 404 {
return nil
}
return ffresty.WrapRestErr(ctx, res, err, coremsgs.MsgTezosconnectRESTErr)
}
return nil
}

// func (s *streamManager) ensureFireFlySubscription(ctx context.Context, namespace string, version int, instancePath, fromBlock, stream, event string) (sub *subscription, err error) {
// return nil, nil
// }
Loading

0 comments on commit a20f2c0

Please sign in to comment.