Skip to content

Commit

Permalink
Finish integration of Neutrino; still untested.
Browse files Browse the repository at this point in the history
  • Loading branch information
aakselrod committed May 24, 2017
1 parent 2065d2f commit 3b48247
Show file tree
Hide file tree
Showing 6 changed files with 132 additions and 54 deletions.
50 changes: 45 additions & 5 deletions btcwallet.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,15 @@ import (
"net/http"
_ "net/http/pprof"
"os"
"path/filepath"
"runtime"
"sync"

"github.com/btcsuite/btcwallet/chain"
"github.com/btcsuite/btcwallet/rpc/legacyrpc"
"github.com/btcsuite/btcwallet/wallet"
"github.com/btcsuite/btcwallet/walletdb"
"github.com/lightninglabs/neutrino"
)

var (
Expand Down Expand Up @@ -136,13 +139,50 @@ func walletMain() error {
// associated with the server for RPC passthrough and to enable additional
// methods.
func rpcClientConnectLoop(legacyRPCServer *legacyrpc.Server, loader *wallet.Loader) {
certs := readCAFile()
var certs []byte
if !cfg.UseSPV {
certs = readCAFile()
}

for {
chainClient, err := startChainRPC(certs)
if err != nil {
log.Errorf("Unable to open connection to consensus RPC server: %v", err)
continue
var (
chainClient chain.Interface
err error
)

if cfg.UseSPV {
var (
chainService *neutrino.ChainService
spvdb walletdb.DB
)
netDir := networkDir(cfg.AppDataDir.Value, activeNet.Params)
spvdb, err = walletdb.Create("bdb",
filepath.Join(netDir, "neutrino.db"))
defer spvdb.Close()
if err != nil {
log.Errorf("Unable to create Neutrino DB: %s", err)
continue
}
chainService, err = neutrino.NewChainService(
neutrino.Config{
DataDir: netDir,
Database: spvdb,
ChainParams: *activeNet.Params,
ConnectPeers: cfg.ConnectPeers,
AddPeers: cfg.AddPeers,
})
if err != nil {
log.Errorf("Couldn't create Neutrino ChainService: %s", err)
continue
}
chainService.Start()
chainClient = chain.NewSPVChain(chainService)
} else {
chainClient, err = startChainRPC(certs)
if err != nil {
log.Errorf("Unable to open connection to consensus RPC server: %v", err)
continue
}
}

// Rather than inlining this logic directly into the loader
Expand Down
13 changes: 13 additions & 0 deletions chain/neutrino.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,13 +78,26 @@ func (s *SPVChain) WaitForShutdown() {
func (s *SPVChain) GetBlock(hash *chainhash.Hash) (*wire.MsgBlock, error) {
// TODO(roasbeef): add a block cache?
// * which evication strategy? depends on use case
// Should the block cache be INSIDE neutrino instead of in btcwallet?
block, err := s.cs.GetBlockFromNetwork(*hash)
if err != nil {
return nil, err
}
return block.MsgBlock(), nil
}

// GetBlockHeight gets the height of a block by its hash. It serves as a
// replacement for the use of GetBlockVerboseTxAsync for the wallet package
// since we can't actually return a FutureGetBlockVerboseResult because the
// underlying type is private to btcrpcclient.
func (s *SPVChain) GetBlockHeight(hash *chainhash.Hash) (int32, error) {
_, height, err := s.cs.GetBlockByHash(*hash)
if err != nil {
return 0, err
}
return int32(height), nil
}

// GetBestBlock replicates the RPC client's GetBestBlock command.
func (s *SPVChain) GetBestBlock() (*chainhash.Hash, int32, error) {
header, height, err := s.cs.LatestBlock()
Expand Down
93 changes: 50 additions & 43 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -502,60 +502,67 @@ func loadConfig() (*config, []string, error) {
return nil, nil, err
}

if cfg.RPCConnect == "" {
cfg.RPCConnect = net.JoinHostPort("localhost", activeNet.RPCClientPort)
}

// Add default port to connect flag if missing.
cfg.RPCConnect, err = cfgutil.NormalizeAddress(cfg.RPCConnect,
activeNet.RPCClientPort)
if err != nil {
fmt.Fprintf(os.Stderr,
"Invalid rpcconnect network address: %v\n", err)
return nil, nil, err
}

localhostListeners := map[string]struct{}{
"localhost": {},
"127.0.0.1": {},
"::1": {},
}
RPCHost, _, err := net.SplitHostPort(cfg.RPCConnect)
if err != nil {
return nil, nil, err
}
if cfg.DisableClientTLS {
if _, ok := localhostListeners[RPCHost]; !ok {
str := "%s: the --noclienttls option may not be used " +
"when connecting RPC to non localhost " +
"addresses: %s"
err := fmt.Errorf(str, funcName, cfg.RPCConnect)
fmt.Fprintln(os.Stderr, err)
fmt.Fprintln(os.Stderr, usageMessage)

if cfg.UseSPV {
neutrino.MaxPeers = cfg.MaxPeers
neutrino.BanDuration = cfg.BanDuration
neutrino.BanThreshold = cfg.BanThreshold
} else {
if cfg.RPCConnect == "" {
cfg.RPCConnect = net.JoinHostPort("localhost", activeNet.RPCClientPort)
}

// Add default port to connect flag if missing.
cfg.RPCConnect, err = cfgutil.NormalizeAddress(cfg.RPCConnect,
activeNet.RPCClientPort)
if err != nil {
fmt.Fprintf(os.Stderr,
"Invalid rpcconnect network address: %v\n", err)
return nil, nil, err
}
} else {
// If CAFile is unset, choose either the copy or local btcd cert.
if !cfg.CAFile.ExplicitlySet() {
cfg.CAFile.Value = filepath.Join(cfg.AppDataDir.Value, defaultCAFilename)

// If the CA copy does not exist, check if we're connecting to
// a local btcd and switch to its RPC cert if it exists.
certExists, err := cfgutil.FileExists(cfg.CAFile.Value)
if err != nil {
RPCHost, _, err := net.SplitHostPort(cfg.RPCConnect)
if err != nil {
return nil, nil, err
}
if cfg.DisableClientTLS {
if _, ok := localhostListeners[RPCHost]; !ok {
str := "%s: the --noclienttls option may not be used " +
"when connecting RPC to non localhost " +
"addresses: %s"
err := fmt.Errorf(str, funcName, cfg.RPCConnect)
fmt.Fprintln(os.Stderr, err)
fmt.Fprintln(os.Stderr, usageMessage)
return nil, nil, err
}
if !certExists {
if _, ok := localhostListeners[RPCHost]; ok {
btcdCertExists, err := cfgutil.FileExists(
btcdDefaultCAFile)
if err != nil {
fmt.Fprintln(os.Stderr, err)
return nil, nil, err
}
if btcdCertExists {
cfg.CAFile.Value = btcdDefaultCAFile
} else {
// If CAFile is unset, choose either the copy or local btcd cert.
if !cfg.CAFile.ExplicitlySet() {
cfg.CAFile.Value = filepath.Join(cfg.AppDataDir.Value, defaultCAFilename)

// If the CA copy does not exist, check if we're connecting to
// a local btcd and switch to its RPC cert if it exists.
certExists, err := cfgutil.FileExists(cfg.CAFile.Value)
if err != nil {
fmt.Fprintln(os.Stderr, err)
return nil, nil, err
}
if !certExists {
if _, ok := localhostListeners[RPCHost]; ok {
btcdCertExists, err := cfgutil.FileExists(
btcdDefaultCAFile)
if err != nil {
fmt.Fprintln(os.Stderr, err)
return nil, nil, err
}
if btcdCertExists {
cfg.CAFile.Value = btcdDefaultCAFile
}
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion glide.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion rpc/legacyrpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ func (s *Server) Stop() {
// functional bitcoin wallet RPC server. This can be called to enable RPC
// passthrough even before a loaded wallet is set, but the wallet's RPC client
// is preferred.
func (s *Server) SetChainServer(chainClient *chain.RPCClient) {
func (s *Server) SetChainServer(chainClient chain.Interface) {
s.handlerMu.Lock()
s.chainClient = chainClient
s.handlerMu.Unlock()
Expand Down
26 changes: 22 additions & 4 deletions wallet/wallet.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ type Wallet struct {
Manager *waddrmgr.Manager
TxStore *wtxmgr.Store

chainClient *chain.RPCClient
chainClient chain.Interface
chainClientLock sync.Mutex
chainClientSynced bool
chainClientSyncMtx sync.Mutex
Expand Down Expand Up @@ -139,7 +139,7 @@ func (w *Wallet) Start() {
//
// This method is unstable and will be removed when all syncing logic is moved
// outside of the wallet package.
func (w *Wallet) SynchronizeRPC(chainClient *chain.RPCClient) {
func (w *Wallet) SynchronizeRPC(chainClient chain.Interface) {
w.quitMu.Lock()
select {
case <-w.quit:
Expand Down Expand Up @@ -1350,7 +1350,16 @@ func (w *Wallet) GetTransactions(startBlock, endBlock *BlockIdentifier, cancel <
if chainClient == nil {
return nil, errors.New("no chain server client")
}
startResp = chainClient.GetBlockVerboseTxAsync(startBlock.hash)
switch client := chainClient.(type) {
case *chain.RPCClient:
startResp = client.GetBlockVerboseTxAsync(startBlock.hash)
case *chain.SPVChain:
var err error
start, err = client.GetBlockHeight(startBlock.hash)
if err != nil {
return nil, err
}
}
}
}
if endBlock != nil {
Expand All @@ -1360,7 +1369,16 @@ func (w *Wallet) GetTransactions(startBlock, endBlock *BlockIdentifier, cancel <
if chainClient == nil {
return nil, errors.New("no chain server client")
}
endResp = chainClient.GetBlockVerboseTxAsync(endBlock.hash)
switch client := chainClient.(type) {
case *chain.RPCClient:
endResp = client.GetBlockVerboseTxAsync(endBlock.hash)
case *chain.SPVChain:
var err error
end, err = client.GetBlockHeight(endBlock.hash)
if err != nil {
return nil, err
}
}
}
}
if startResp != nil {
Expand Down

0 comments on commit 3b48247

Please sign in to comment.