Skip to content

Commit

Permalink
Move spvchain into neutrino and start integration w/btcwallet
Browse files Browse the repository at this point in the history
  • Loading branch information
aakselrod committed May 19, 2017
1 parent 8af0920 commit 4d479d4
Show file tree
Hide file tree
Showing 19 changed files with 408 additions and 6,604 deletions.
44 changes: 44 additions & 0 deletions chain/interface.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
package chain

import (
"time"

"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/wire"
"github.com/btcsuite/btcutil"
"github.com/btcsuite/btcwallet/waddrmgr"
"github.com/btcsuite/btcwallet/wtxmgr"
)

// Interface allows more than one backing blockchain source, such as a
Expand All @@ -23,3 +26,44 @@ type Interface interface {
NotifyBlocks() error
Notifications() <-chan interface{}
}

// Notification types. These are defined here and processed from from reading
// a notificationChan to avoid handling these notifications directly in
// btcrpcclient callbacks, which isn't very Go-like and doesn't allow
// blocking client calls.
type (
// ClientConnected is a notification for when a client connection is
// opened or reestablished to the chain server.
ClientConnected struct{}

// BlockConnected is a notification for a newly-attached block to the
// best chain.
BlockConnected wtxmgr.BlockMeta

// BlockDisconnected is a notifcation that the block described by the
// BlockStamp was reorganized out of the best chain.
BlockDisconnected wtxmgr.BlockMeta

// RelevantTx is a notification for a transaction which spends wallet
// inputs or pays to a watched address.
RelevantTx struct {
TxRecord *wtxmgr.TxRecord
Block *wtxmgr.BlockMeta // nil if unmined
}

// RescanProgress is a notification describing the current status
// of an in-progress rescan.
RescanProgress struct {
Hash *chainhash.Hash
Height int32
Time time.Time
}

// RescanFinished is a notification that a previous rescan request
// has finished.
RescanFinished struct {
Hash *chainhash.Hash
Height int32
Time time.Time
}
)
312 changes: 312 additions & 0 deletions chain/neutrino.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,312 @@
package chain

import (
"errors"
"fmt"
"sync"
"time"

"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/wire"
"github.com/btcsuite/btcrpcclient"
"github.com/btcsuite/btcutil"
"github.com/btcsuite/btcwallet/waddrmgr"
"github.com/btcsuite/btcwallet/wtxmgr"
"github.com/lightninglabs/neutrino"
)

// SPVChain is an implementation of the btcwalet chain.Interface interface.
type SPVChain struct {
cs *neutrino.ChainService

// We currently support one rescan/notifiction goroutine per client
rescan *neutrino.Rescan

enqueueNotification chan interface{}
dequeueNotification chan interface{}
currentBlock chan *waddrmgr.BlockStamp

quit chan struct{}
rescanQuit chan struct{}
wg sync.WaitGroup
started bool
scanning bool

clientMtx sync.Mutex
}

// NewSPVChain creates a new SPVChain struct with a backing ChainService
func NewSPVChain(chainService *neutrino.ChainService) *SPVChain {
return &SPVChain{cs: chainService}
}

// Start replicates the RPC client's Start method.
func (s *SPVChain) Start() error {
s.cs.Start()
s.clientMtx.Lock()
defer s.clientMtx.Unlock()
if !s.started {
s.enqueueNotification = make(chan interface{})
s.dequeueNotification = make(chan interface{})
s.currentBlock = make(chan *waddrmgr.BlockStamp)
s.quit = make(chan struct{})
s.started = true
s.wg.Add(1)
go s.notificationHandler()
}
return nil
}

// Stop replicates the RPC client's Stop method.
func (s *SPVChain) Stop() {
s.clientMtx.Lock()
defer s.clientMtx.Unlock()
if !s.started {
return
}
close(s.quit)
s.started = false
}

// WaitForShutdown replicates the RPC client's WaitForShutdown method.
func (s *SPVChain) WaitForShutdown() {
s.wg.Wait()
}

// GetBlock replicates the RPC client's GetBlock command.
func (s *SPVChain) GetBlock(hash *chainhash.Hash) (*wire.MsgBlock, error) {
// TODO(roasbeef): add a block cache?
// * which evication strategy? depends on use case
block, err := s.cs.GetBlockFromNetwork(*hash)
if err != nil {
return nil, err
}
return block.MsgBlock(), nil
}

// GetBestBlock replicates the RPC client's GetBestBlock command.
func (s *SPVChain) GetBestBlock() (*chainhash.Hash, int32, error) {
header, height, err := s.cs.LatestBlock()
if err != nil {
return nil, 0, err
}
hash := header.BlockHash()
return &hash, int32(height), nil
}

// BlockStamp returns the latest block notified by the client, or an error
// if the client has been shut down.
func (s *SPVChain) BlockStamp() (*waddrmgr.BlockStamp, error) {
select {
case bs := <-s.currentBlock:
return bs, nil
case <-s.quit:
return nil, errors.New("disconnected")
}
}

// SendRawTransaction replicates the RPC client's SendRawTransaction command.
func (s *SPVChain) SendRawTransaction(tx *wire.MsgTx, allowHighFees bool) (
*chainhash.Hash, error) {
err := s.cs.SendTransaction(tx)
if err != nil {
return nil, err
}
hash := tx.TxHash()
return &hash, nil
}

// Rescan replicates the RPC client's Rescan command.
func (s *SPVChain) Rescan(startHash *chainhash.Hash, addrs []btcutil.Address,
outPoints []*wire.OutPoint) error {
s.clientMtx.Lock()
if !s.started {
s.clientMtx.Unlock()
return fmt.Errorf("can't do a rescan when the chain client " +
"is not started")
}
if s.scanning {
// Restart the rescan by killing the existing rescan.
close(s.rescanQuit)
}
s.rescanQuit = make(chan struct{})
s.scanning = true
s.clientMtx.Unlock()
return s.cs.Rescan(
neutrino.NotificationHandlers(btcrpcclient.NotificationHandlers{
OnFilteredBlockConnected: s.onFilteredBlockConnected,
OnBlockDisconnected: s.onBlockDisconnected,
}),
neutrino.QuitChan(s.rescanQuit),
)
}

// NotifyBlocks replicates the RPC client's NotifyBlocks command.
func (s *SPVChain) NotifyBlocks() error {
return nil
}

// NotifyReceived replicates the RPC client's NotifyReceived command.
func (s *SPVChain) NotifyReceived() error {
return nil
}

// Notifications replicates the RPC client's Notifications method.
func (s *SPVChain) Notifications() <-chan interface{} {
return s.dequeueNotification
}

// onFilteredBlockConnected sends appropriate notifications to the notification
// channel.
func (s *SPVChain) onFilteredBlockConnected(height int32,
header *wire.BlockHeader, relevantTxs []*btcutil.Tx) {
blockMeta := wtxmgr.BlockMeta{
Block: wtxmgr.Block{
Hash: header.BlockHash(),
Height: height,
},
Time: header.Timestamp,
}
select {
case s.enqueueNotification <- BlockConnected(blockMeta):
case <-s.quit:
return
case <-s.rescanQuit:
return
}
for _, tx := range relevantTxs {
rec, err := wtxmgr.NewTxRecordFromMsgTx(tx.MsgTx(),
blockMeta.Time)
if err != nil {
log.Errorf("Cannot create transaction record for "+
"relevant tx: %s", err)
// TODO(aakselrod): Continue?
return
}
select {
case s.enqueueNotification <- RelevantTx{
TxRecord: rec,
Block: &blockMeta,
}:
case <-s.quit:
return
case <-s.rescanQuit:
return
}
}
bs, err := s.cs.SyncedTo()
if err != nil {
log.Errorf("Can't get chain service's best block: %s", err)
return
}
if bs.Hash == header.BlockHash() {
select {
case s.enqueueNotification <- RescanFinished{
Hash: &bs.Hash,
Height: bs.Height,
Time: header.Timestamp,
}:
case <-s.quit:
return
case <-s.rescanQuit:
return

}
}
}

// onBlockDisconnected sends appropriate notifications to the notification
// channel.
func (s *SPVChain) onBlockDisconnected(hash *chainhash.Hash, height int32,
t time.Time) {
select {
case s.enqueueNotification <- BlockDisconnected{
Block: wtxmgr.Block{
Hash: *hash,
Height: height,
},
Time: t,
}:
case <-s.quit:
case <-s.rescanQuit:
}
}

// notificationHandler queues and dequeues notifications. There are currently
// no bounds on the queue, so the dequeue channel should be read continually to
// avoid running out of memory.
func (s *SPVChain) notificationHandler() {
hash, height, err := s.GetBestBlock()
if err != nil {
log.Errorf("Failed to get best block from chain service: %s",
err)
s.Stop()
s.wg.Done()
return
}

bs := &waddrmgr.BlockStamp{Hash: *hash, Height: height}

// TODO: Rather than leaving this as an unbounded queue for all types of
// notifications, try dropping ones where a later enqueued notification
// can fully invalidate one waiting to be processed. For example,
// blockconnected notifications for greater block heights can remove the
// need to process earlier blockconnected notifications still waiting
// here.

var notifications []interface{}
enqueue := s.enqueueNotification
var dequeue chan interface{}
var next interface{}
out:
for {
select {
case n, ok := <-enqueue:
if !ok {
// If no notifications are queued for handling,
// the queue is finished.
if len(notifications) == 0 {
break out
}
// nil channel so no more reads can occur.
enqueue = nil
continue
}
if len(notifications) == 0 {
next = n
dequeue = s.dequeueNotification
}
notifications = append(notifications, n)

case dequeue <- next:
if n, ok := next.(BlockConnected); ok {
bs = &waddrmgr.BlockStamp{
Height: n.Height,
Hash: n.Hash,
}
}

notifications[0] = nil
notifications = notifications[1:]
if len(notifications) != 0 {
next = notifications[0]
} else {
// If no more notifications can be enqueued, the
// queue is finished.
if enqueue == nil {
break out
}
dequeue = nil
}

case s.currentBlock <- bs:

case <-s.quit:
break out
}
}

s.Stop()
close(s.dequeueNotification)
s.wg.Done()
}
Loading

0 comments on commit 4d479d4

Please sign in to comment.