Skip to content

Commit

Permalink
itest: use node.rpc namespace inside harness net
Browse files Browse the repository at this point in the history
This commit adds a new struct RPCClients to better handle rpc clients.
A private field, rpc, is added to HarnessNode to prevent direct access
to its clients. Inside RPCClients, all clients are exported in case a
test case need access to a specific client.
  • Loading branch information
yyforyongyu committed Jan 5, 2022
1 parent f8cf7c8 commit f5f1289
Showing 1 changed file with 94 additions and 49 deletions.
143 changes: 94 additions & 49 deletions lntest/harness_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,8 +296,8 @@ type HarnessNode struct {
PubKey [33]byte
PubKeyStr string

cmd *exec.Cmd
logFile *os.File
// rpc holds a list of RPC clients.
rpc *RPCClients

// chanWatchRequests receives a request for watching a particular event
// for a given channel.
Expand All @@ -317,41 +317,50 @@ type HarnessNode struct {
// node and the outpoint.
policyUpdates policyUpdateMap

// backupDbDir is the path where a database backup is stored, if any.
backupDbDir string

// postgresDbName is the name of the postgres database where lnd data is
// stored in.
postgresDbName string

// runCtx is a context with cancel method. It's used to signal when the
// node needs to quit, and used as the parent context when spawning
// children contexts for RPC requests.
runCtx context.Context
cancel context.CancelFunc

wg sync.WaitGroup
wg sync.WaitGroup
cmd *exec.Cmd
logFile *os.File

// TODO(yy): remove
lnrpc.LightningClient

lnrpc.WalletUnlockerClient

invoicesrpc.InvoicesClient

// SignerClient cannot be embedded because the name collisions of the
// methods SignMessage and VerifyMessage.
SignerClient signrpc.SignerClient

// conn is the underlying connection to the grpc endpoint of the node.
conn *grpc.ClientConn

// RouterClient, WalletKitClient, WatchtowerClient cannot be embedded,
// because a name collision would occur with LightningClient.
SignerClient signrpc.SignerClient
RouterClient routerrpc.RouterClient
WalletKitClient walletrpc.WalletKitClient
Watchtower watchtowerrpc.WatchtowerClient
WatchtowerClient wtclientrpc.WatchtowerClientClient
StateClient lnrpc.StateClient
}

// backupDbDir is the path where a database backup is stored, if any.
backupDbDir string
// RPCClients wraps a list of RPC clients into a single struct for easier
// access.
type RPCClients struct {
// conn is the underlying connection to the grpc endpoint of the node.
conn *grpc.ClientConn

// postgresDbName is the name of the postgres database where lnd data is
// stored in.
postgresDbName string
LN lnrpc.LightningClient
WalletUnlocker lnrpc.WalletUnlockerClient
Invoice invoicesrpc.InvoicesClient
Signer signrpc.SignerClient
Router routerrpc.RouterClient
WalletKit walletrpc.WalletKitClient
Watchtower watchtowerrpc.WatchtowerClient
WatchtowerClient wtclientrpc.WatchtowerClientClient
State lnrpc.StateClient
}

// Assert *HarnessNode implements the lnrpc.LightningClient interface.
Expand Down Expand Up @@ -719,16 +728,20 @@ func (hn *HarnessNode) start(lndBinary string, lndError chan<- error,
return err
}

// Init all the RPC clients.
hn.initRPCClients(conn)

// If the node was created with a seed, we will need to perform an
// additional step to unlock the wallet. The connection returned will
// only use the TLS certs, and can only perform operations necessary to
// unlock the daemon.
if hn.Cfg.HasSeed {
// TODO(yy): remove
hn.WalletUnlockerClient = lnrpc.NewWalletUnlockerClient(conn)
return nil
}

return hn.initLightningClient(conn)
return hn.initLightningClient()
}

// WaitUntilStarted waits until the wallet state flips from "WAITING_TO_START".
Expand Down Expand Up @@ -757,7 +770,7 @@ func (hn *HarnessNode) waitForState(conn grpc.ClientConnInterface,
predicate func(state lnrpc.WalletState) bool) error {

stateClient := lnrpc.NewStateClient(conn)
ctx, cancel := context.WithCancel(hn.runCtx)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

stateStream, err := stateClient.SubscribeState(
Expand Down Expand Up @@ -820,16 +833,21 @@ func (hn *HarnessNode) WaitUntilLeader(timeout time.Duration) error {
return err
}

// Init all the RPC clients.
hn.initRPCClients(conn)

// If the node was created with a seed, we will need to perform an
// additional step to unlock the wallet. The connection returned will
// only use the TLS certs, and can only perform operations necessary to
// unlock the daemon.
if hn.Cfg.HasSeed {
// TODO(yy): remove
hn.WalletUnlockerClient = lnrpc.NewWalletUnlockerClient(conn)

return nil
}

return hn.initLightningClient(conn)
return hn.initLightningClient()
}

// initClientWhenReady waits until the main gRPC server is detected as active,
Expand All @@ -847,7 +865,10 @@ func (hn *HarnessNode) initClientWhenReady(timeout time.Duration) error {
return err
}

return hn.initLightningClient(conn)
// Init all the RPC clients.
hn.initRPCClients(conn)

return hn.initLightningClient()
}

// Init initializes a harness node by passing the init request via rpc. After
Expand Down Expand Up @@ -889,7 +910,10 @@ func (hn *HarnessNode) Init(
return nil, err
}

return response, hn.initLightningClient(conn)
// Init all the RPC clients.
hn.initRPCClients(conn)

return response, hn.initLightningClient()
}

// InitChangePassword initializes a harness node by passing the change password
Expand Down Expand Up @@ -932,7 +956,10 @@ func (hn *HarnessNode) InitChangePassword(
return nil, err
}

return response, hn.initLightningClient(conn)
// Init all the RPC clients.
hn.initRPCClients(conn)

return response, hn.initLightningClient()
}

// Unlock attempts to unlock the wallet of the target HarnessNode. This method
Expand All @@ -945,7 +972,8 @@ func (hn *HarnessNode) Unlock(unlockReq *lnrpc.UnlockWalletRequest) error {

// Otherwise, we'll need to unlock the node before it's able to start
// up properly.
if _, err := hn.UnlockWallet(ctxt, unlockReq); err != nil {
_, err := hn.rpc.WalletUnlocker.UnlockWallet(ctxt, unlockReq)
if err != nil {
return err
}

Expand All @@ -960,7 +988,7 @@ func (hn *HarnessNode) waitTillServerStarted() error {
ctxt, cancel := context.WithTimeout(hn.runCtx, NodeStartTimeout)
defer cancel()

client, err := hn.StateClient.SubscribeState(
client, err := hn.rpc.State.SubscribeState(
ctxt, &lnrpc.SubscribeStateRequest{},
)
if err != nil {
Expand All @@ -978,17 +1006,33 @@ func (hn *HarnessNode) waitTillServerStarted() error {
return nil
}
}
}

// initRPCClients initializes a list of RPC clients for the node.
func (hn *HarnessNode) initRPCClients(c *grpc.ClientConn) {
hn.rpc = &RPCClients{
conn: c,
LN: lnrpc.NewLightningClient(c),
Invoice: invoicesrpc.NewInvoicesClient(c),
Router: routerrpc.NewRouterClient(c),
WalletKit: walletrpc.NewWalletKitClient(c),
WalletUnlocker: lnrpc.NewWalletUnlockerClient(c),
Watchtower: watchtowerrpc.NewWatchtowerClient(c),
WatchtowerClient: wtclientrpc.NewWatchtowerClientClient(c),
Signer: signrpc.NewSignerClient(c),
State: lnrpc.NewStateClient(c),
}
}

// initLightningClient constructs the grpc LightningClient from the given client
// connection and subscribes the harness node to graph topology updates.
// This method also spawns a lightning network watcher for this node,
// which watches for topology changes.
func (hn *HarnessNode) initLightningClient(conn *grpc.ClientConn) error {
// initLightningClient blocks until the lnd server is fully started and
// subscribes the harness node to graph topology updates. This method also
// spawns a lightning network watcher for this node, which watches for topology
// changes.
func (hn *HarnessNode) initLightningClient() error {
// TODO(yy): remove
// Construct the LightningClient that will allow us to use the
// HarnessNode directly for normal rpc operations.
hn.conn = conn
conn := hn.rpc.conn
hn.LightningClient = lnrpc.NewLightningClient(conn)
hn.InvoicesClient = invoicesrpc.NewInvoicesClient(conn)
hn.RouterClient = routerrpc.NewRouterClient(conn)
Expand Down Expand Up @@ -1021,8 +1065,7 @@ func (hn *HarnessNode) initLightningClient(conn *grpc.ClientConn) error {
// FetchNodeInfo queries an unlocked node to retrieve its public key.
func (hn *HarnessNode) FetchNodeInfo() error {
// Obtain the lnid of this node for quick identification purposes.
ctxb := context.Background()
info, err := hn.GetInfo(ctxb, &lnrpc.GetInfoRequest{})
info, err := hn.rpc.LN.GetInfo(hn.runCtx, &lnrpc.GetInfoRequest{})
if err != nil {
return err
}
Expand Down Expand Up @@ -1163,21 +1206,23 @@ func (hn *HarnessNode) stop() error {
return nil
}

// If start() failed before creating a client, we will just wait for the
// If start() failed before creating clients, we will just wait for the
// child process to die.
if hn.LightningClient != nil {
// Don't watch for error because sometimes the RPC connection gets
// closed before a response is returned.
if hn.rpc != nil && hn.rpc.LN != nil {
// Don't watch for error because sometimes the RPC connection
// gets closed before a response is returned.
req := lnrpc.StopRequest{}

err := wait.NoError(func() error {
_, err := hn.LightningClient.StopDaemon(hn.runCtx, &req)
_, err := hn.rpc.LN.StopDaemon(hn.runCtx, &req)
switch {
case err == nil:
return nil

// Try again if a recovery/rescan is in progress.
case strings.Contains(err.Error(), "recovery in progress"):
case strings.Contains(
err.Error(), "recovery in progress",
):
return err

default:
Expand Down Expand Up @@ -1217,8 +1262,8 @@ func (hn *HarnessNode) stop() error {
hn.WatchtowerClient = nil

// Close any attempts at further grpc connections.
if hn.conn != nil {
err := status.Code(hn.conn.Close())
if hn.rpc.conn != nil {
err := status.Code(hn.rpc.conn.Close())
switch err {
case codes.OK:
return nil
Expand Down Expand Up @@ -1497,7 +1542,7 @@ func (hn *HarnessNode) WaitForBlockchainSync() error {
defer ticker.Stop()

for {
resp, err := hn.GetInfo(ctxt, &lnrpc.GetInfoRequest{})
resp, err := hn.rpc.LN.GetInfo(ctxt, &lnrpc.GetInfoRequest{})
if err != nil {
return err
}
Expand Down Expand Up @@ -1525,7 +1570,7 @@ func (hn *HarnessNode) WaitForBalance(expectedBalance btcutil.Amount,

var lastBalance btcutil.Amount
doesBalanceMatch := func() bool {
balance, err := hn.WalletBalance(hn.runCtx, req)
balance, err := hn.rpc.LN.WalletBalance(hn.runCtx, req)
if err != nil {
return false
}
Expand Down Expand Up @@ -1695,7 +1740,7 @@ func (hn *HarnessNode) newTopologyClient(
ctx context.Context) (topologyClient, error) {

req := &lnrpc.GraphTopologySubscription{}
client, err := hn.SubscribeChannelGraph(ctx, req)
client, err := hn.rpc.LN.SubscribeChannelGraph(ctx, req)
if err != nil {
return nil, fmt.Errorf("%s(%d): unable to create topology "+
"client: %v (%s)", hn.Name(), hn.NodeID, err,
Expand Down Expand Up @@ -1765,7 +1810,7 @@ func (hn *HarnessNode) receiveTopologyClientStream(
default:
// An expected error is returned, return and leave it
// to be handled by the caller.
return fmt.Errorf("graph subscription err: %v", err)
return fmt.Errorf("graph subscription err: %w", err)
}

// Send the update or quit.
Expand Down Expand Up @@ -1814,7 +1859,7 @@ func (hn *HarnessNode) getChannelPolicies(include bool) policyUpdateMap {
ctxt, cancel := context.WithTimeout(hn.runCtx, DefaultTimeout)
defer cancel()

graph, err := hn.DescribeGraph(ctxt, &lnrpc.ChannelGraphRequest{
graph, err := hn.rpc.LN.DescribeGraph(ctxt, &lnrpc.ChannelGraphRequest{
IncludeUnannounced: include,
})
if err != nil {
Expand Down

0 comments on commit f5f1289

Please sign in to comment.