Skip to content

Commit

Permalink
More neutrino integration into btcwallet.
Browse files Browse the repository at this point in the history
  • Loading branch information
aakselrod committed May 21, 2017
1 parent 4d479d4 commit 2065d2f
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 29 deletions.
8 changes: 8 additions & 0 deletions chain/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,14 @@ type (
// best chain.
BlockConnected wtxmgr.BlockMeta

// FilteredBlockConnected is an alternate notification that contains
// both block and relevant transaction information in one struct, which
// allows atomic updates.
FilteredBlockConnected struct {
Block *wtxmgr.BlockMeta
RelevantTxs []*wtxmgr.TxRecord
}

// BlockDisconnected is a notifcation that the block described by the
// BlockStamp was reorganized out of the best chain.
BlockDisconnected wtxmgr.BlockMeta
Expand Down
94 changes: 66 additions & 28 deletions chain/neutrino.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ type SPVChain struct {
cs *neutrino.ChainService

// We currently support one rescan/notifiction goroutine per client
rescan *neutrino.Rescan
rescan neutrino.Rescan

enqueueNotification chan interface{}
dequeueNotification chan interface{}
Expand All @@ -31,6 +31,7 @@ type SPVChain struct {
wg sync.WaitGroup
started bool
scanning bool
finished bool

clientMtx sync.Mutex
}
Expand Down Expand Up @@ -131,23 +132,59 @@ func (s *SPVChain) Rescan(startHash *chainhash.Hash, addrs []btcutil.Address,
}
s.rescanQuit = make(chan struct{})
s.scanning = true
s.finished = false
s.clientMtx.Unlock()
return s.cs.Rescan(
watchOutPoints := make([]wire.OutPoint, 0, len(outPoints))
for _, op := range outPoints {
watchOutPoints = append(watchOutPoints, *op)
}
s.rescan = s.cs.NewRescan(
neutrino.NotificationHandlers(btcrpcclient.NotificationHandlers{
OnFilteredBlockConnected: s.onFilteredBlockConnected,
OnBlockDisconnected: s.onBlockDisconnected,
}),
neutrino.QuitChan(s.rescanQuit),
neutrino.WatchAddrs(addrs...),
neutrino.WatchOutPoints(watchOutPoints...),
)
return nil
}

// NotifyBlocks replicates the RPC client's NotifyBlocks command.
func (s *SPVChain) NotifyBlocks() error {
s.clientMtx.Lock()
defer s.clientMtx.Unlock()
// If we're scanning, we're already notifying on blocks. Otherwise,
// start a rescan without watching any addresses.
if !s.scanning {
return s.NotifyReceived([]btcutil.Address{})
}
return nil
}

// NotifyReceived replicates the RPC client's NotifyReceived command.
func (s *SPVChain) NotifyReceived() error {
func (s *SPVChain) NotifyReceived(addrs []btcutil.Address) error {
// If we have a rescan running, we just need to add the appropriate
// addresses to the watch list.
s.clientMtx.Lock()
if s.scanning {
s.clientMtx.Unlock()
return s.rescan.Update(neutrino.AddAddrs(addrs...))
}
s.rescanQuit = make(chan struct{})
s.scanning = true
// Don't need RescanFinished notifications.
s.finished = true
s.clientMtx.Unlock()
// Rescan with just the specified addresses.
s.rescan = s.cs.NewRescan(
neutrino.NotificationHandlers(btcrpcclient.NotificationHandlers{
OnFilteredBlockConnected: s.onFilteredBlockConnected,
OnBlockDisconnected: s.onBlockDisconnected,
}),
neutrino.QuitChan(s.rescanQuit),
neutrino.WatchAddrs(addrs...),
)
return nil
}

Expand All @@ -160,46 +197,47 @@ func (s *SPVChain) Notifications() <-chan interface{} {
// channel.
func (s *SPVChain) onFilteredBlockConnected(height int32,
header *wire.BlockHeader, relevantTxs []*btcutil.Tx) {
blockMeta := wtxmgr.BlockMeta{
Block: wtxmgr.Block{
Hash: header.BlockHash(),
Height: height,
ntfn := FilteredBlockConnected{
Block: &wtxmgr.BlockMeta{
Block: wtxmgr.Block{
Hash: header.BlockHash(),
Height: height,
},
Time: header.Timestamp,
},
Time: header.Timestamp,
}
select {
case s.enqueueNotification <- BlockConnected(blockMeta):
case <-s.quit:
return
case <-s.rescanQuit:
return
}
for _, tx := range relevantTxs {
rec, err := wtxmgr.NewTxRecordFromMsgTx(tx.MsgTx(),
blockMeta.Time)
header.Timestamp)
if err != nil {
log.Errorf("Cannot create transaction record for "+
"relevant tx: %s", err)
// TODO(aakselrod): Continue?
return
}
select {
case s.enqueueNotification <- RelevantTx{
TxRecord: rec,
Block: &blockMeta,
}:
case <-s.quit:
return
case <-s.rescanQuit:
return
// TODO(aakselrod): Return?
continue
}
ntfn.RelevantTxs = append(ntfn.RelevantTxs, rec)
}
select {
case s.enqueueNotification <- ntfn:
case <-s.quit:
return
case <-s.rescanQuit:
return
}
bs, err := s.cs.SyncedTo()
if err != nil {
log.Errorf("Can't get chain service's best block: %s", err)
return
}
if bs.Hash == header.BlockHash() {
// Only send the RescanFinished notification once.
s.clientMtx.Lock()
if s.finished {
s.clientMtx.Unlock()
return
}
s.finished = true
s.clientMtx.Unlock()
select {
case s.enqueueNotification <- RescanFinished{
Hash: &bs.Hash,
Expand Down
2 changes: 1 addition & 1 deletion glide.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 15 additions & 0 deletions wallet/chainntfns.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,21 @@ func (w *Wallet) handleChainNotifications() {
return w.addRelevantTx(tx, n.TxRecord, n.Block)
})
notificationName = "recvtx/redeemingtx"
case chain.FilteredBlockConnected:
// Atomically update for the whole block.
err = walletdb.Update(w.db, func(tx walletdb.ReadWriteTx) error {
err := w.connectBlock(tx, *n.Block)
if err != nil {
return err
}
for _, rec := range n.RelevantTxs {
err := w.addRelevantTx(tx, rec, n.Block)
if err != nil {
return err
}
}
return nil
})

// The following are handled by the wallet's rescan
// goroutines, so just pass them there.
Expand Down

0 comments on commit 2065d2f

Please sign in to comment.