Skip to content

Commit

Permalink
Better handling of remote commit confirmation in TxPublisher (#1905)
Browse files Browse the repository at this point in the history
* Never retry anchor publish when commit confirmed

We used to check the feerate before checking whether the commit tx was
already confirmed: when the commit feerate was good enough, we would
respawn a publish actor every block whereas the commit tx was already
confirmed.

* Abandon evicted replaceable txs

This ensures the bitcoind wallet won't keep the transaction around and
avoid using its inputs for other transactions.

Fixes #1898
  • Loading branch information
t-bast authored Aug 23, 2021
1 parent 9a0fc14 commit c504658
Show file tree
Hide file tree
Showing 5 changed files with 157 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,15 @@ class ExtendedBitcoinClient(val rpcClient: BitcoinJsonRPCClient) extends Logging
getRawTransaction(tx.txid).map(_ => tx.txid).recoverWith { case _ => Future.failed(e) }
}

/**
* Mark a transaction as abandoned, which will allow for its wallet inputs to be re-spent.
* This method can be used to replace "stuck" or evicted transactions.
* It only works on transactions which are not included in a block and are not currently in the mempool.
*/
def abandonTransaction(txId: ByteVector32)(implicit ec: ExecutionContext): Future[Boolean] = {
rpcClient.invoke("abandontransaction", txId).map(_ => true).recover(_ => false)
}

/**
* @param outPoints outpoints to unlock.
* @return true if all outpoints were successfully unlocked, false otherwise.
Expand Down Expand Up @@ -282,7 +291,7 @@ class ExtendedBitcoinClient(val rpcClient: BitcoinJsonRPCClient) extends Logging
val JBool(safe) = utxo \ "safe"
val JDecimal(amount) = utxo \ "amount"
val JString(txid) = utxo \ "txid"
val label = utxo \ "label" match {
val label = utxo \ "label" match {
case JString(label) => Some(label)
case _ => None
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher
import fr.acinq.eclair.blockchain.bitcoind.rpc.ExtendedBitcoinClient
import fr.acinq.eclair.blockchain.bitcoind.rpc.ExtendedBitcoinClient.FundTransactionOptions
import fr.acinq.eclair.blockchain.fee.FeeratePerKw
import fr.acinq.eclair.channel.publish.TxPublisher.TxPublishLogContext
import fr.acinq.eclair.channel.publish.TxPublisher.{TxPublishLogContext, TxRejectedReason}
import fr.acinq.eclair.channel.publish.TxTimeLocksMonitor.CheckTx
import fr.acinq.eclair.channel.{Commitments, HtlcTxAndRemoteSig}
import fr.acinq.eclair.transactions.Transactions
Expand Down Expand Up @@ -195,45 +195,46 @@ private class ReplaceableTxPublisher(nodeParams: NodeParams,
}

def checkAnchorPreconditions(replyTo: ActorRef[TxPublisher.PublishTxResult], cmd: TxPublisher.PublishReplaceableTx, targetFeerate: FeeratePerKw): Behavior[Command] = {
val commitFeerate = cmd.commitments.localCommit.spec.feeratePerKw
if (targetFeerate <= commitFeerate) {
log.info("skipping {}: commit feerate is high enough (feerate={})", cmd.desc, commitFeerate)
// We set retry = true in case the on-chain feerate rises before the commit tx is confirmed: if that happens we'll
// want to claim our anchor to raise the feerate of the commit tx and get it confirmed faster.
sendResult(replyTo, TxPublisher.TxRejected(loggingInfo.id, cmd, TxPublisher.TxRejectedReason.TxSkipped(retryNextBlock = true)))
} else {
// We verify that:
// - our commit is not confirmed (if it is, no need to claim our anchor)
// - their commit is not confirmed (if it is, no need to claim our anchor either)
// - our commit tx is in the mempool (otherwise we can't claim our anchor)
val commitTx = cmd.commitments.fullySignedLocalCommitTx(nodeParams.channelKeyManager).tx
val fundingOutpoint = cmd.commitments.commitInput.outPoint
context.pipeToSelf(bitcoinClient.isTransactionOutputSpendable(fundingOutpoint.txid, fundingOutpoint.index.toInt, includeMempool = false).flatMap {
case false => Future.failed(CommitTxAlreadyConfirmed)
case true =>
// We must ensure our local commit tx is in the mempool before publishing the anchor transaction.
// If it's already published, this call will be a no-op.
bitcoinClient.publishTransaction(commitTx)
}) {
case Success(_) => PreconditionsOk
case Failure(CommitTxAlreadyConfirmed) => CommitTxAlreadyConfirmed
case Failure(reason) if reason.getMessage.contains("rejecting replacement") => RemoteCommitTxPublished
case Failure(reason) => UnknownFailure(reason)
}
Behaviors.receiveMessagePartial {
case PreconditionsOk => fund(replyTo, cmd, targetFeerate)
case CommitTxAlreadyConfirmed =>
log.debug("commit tx is already confirmed, no need to claim our anchor")
sendResult(replyTo, TxPublisher.TxRejected(loggingInfo.id, cmd, TxPublisher.TxRejectedReason.TxSkipped(retryNextBlock = false)))
case RemoteCommitTxPublished =>
log.warn("cannot publish commit tx: there is a conflicting tx in the mempool")
// We retry until that conflicting commit tx is confirmed or we're able to publish our local commit tx.
// We verify that:
// - our commit is not confirmed (if it is, no need to claim our anchor)
// - their commit is not confirmed (if it is, no need to claim our anchor either)
// - our commit tx is in the mempool (otherwise we can't claim our anchor)
val commitTx = cmd.commitments.fullySignedLocalCommitTx(nodeParams.channelKeyManager).tx
val fundingOutpoint = cmd.commitments.commitInput.outPoint
context.pipeToSelf(bitcoinClient.isTransactionOutputSpendable(fundingOutpoint.txid, fundingOutpoint.index.toInt, includeMempool = false).flatMap {
case false => Future.failed(CommitTxAlreadyConfirmed)
case true =>
// We must ensure our local commit tx is in the mempool before publishing the anchor transaction.
// If it's already published, this call will be a no-op.
bitcoinClient.publishTransaction(commitTx)
}) {
case Success(_) => PreconditionsOk
case Failure(CommitTxAlreadyConfirmed) => CommitTxAlreadyConfirmed
case Failure(reason) if reason.getMessage.contains("rejecting replacement") => RemoteCommitTxPublished
case Failure(reason) => UnknownFailure(reason)
}
Behaviors.receiveMessagePartial {
case PreconditionsOk =>
val commitFeerate = cmd.commitments.localCommit.spec.feeratePerKw
if (targetFeerate <= commitFeerate) {
log.info("skipping {}: commit feerate is high enough (feerate={})", cmd.desc, commitFeerate)
// We set retry = true in case the on-chain feerate rises before the commit tx is confirmed: if that happens we'll
// want to claim our anchor to raise the feerate of the commit tx and get it confirmed faster.
sendResult(replyTo, TxPublisher.TxRejected(loggingInfo.id, cmd, TxPublisher.TxRejectedReason.TxSkipped(retryNextBlock = true)))
case UnknownFailure(reason) =>
log.error(s"could not check ${cmd.desc} preconditions", reason)
sendResult(replyTo, TxPublisher.TxRejected(loggingInfo.id, cmd, TxPublisher.TxRejectedReason.UnknownTxFailure))
case Stop => Behaviors.stopped
}
} else {
fund(replyTo, cmd, targetFeerate)
}
case CommitTxAlreadyConfirmed =>
log.debug("commit tx is already confirmed, no need to claim our anchor")
sendResult(replyTo, TxPublisher.TxRejected(loggingInfo.id, cmd, TxPublisher.TxRejectedReason.TxSkipped(retryNextBlock = false)))
case RemoteCommitTxPublished =>
log.warn("cannot publish commit tx: there is a conflicting tx in the mempool")
// We retry until that conflicting commit tx is confirmed or we're able to publish our local commit tx.
sendResult(replyTo, TxPublisher.TxRejected(loggingInfo.id, cmd, TxPublisher.TxRejectedReason.TxSkipped(retryNextBlock = true)))
case UnknownFailure(reason) =>
log.error(s"could not check ${cmd.desc} preconditions", reason)
sendResult(replyTo, TxPublisher.TxRejected(loggingInfo.id, cmd, TxPublisher.TxRejectedReason.UnknownTxFailure))
case Stop => Behaviors.stopped
}
}

Expand Down Expand Up @@ -328,6 +329,14 @@ private class ReplaceableTxPublisher(nodeParams: NodeParams,
Behaviors.receiveMessagePartial {
case WrappedTxResult(MempoolTxMonitor.TxConfirmed) => sendResult(replyTo, TxPublisher.TxConfirmed(cmd, tx))
case WrappedTxResult(MempoolTxMonitor.TxRejected(reason)) =>
reason match {
case TxRejectedReason.WalletInputGone =>
// The transaction now has an unknown input from bitcoind's point of view, so it will keep it in the wallet in
// case that input appears later in the mempool or the blockchain. In our case, we know it won't happen so we
// abandon that transaction and will retry with a different set of inputs (if it still makes sense to publish).
bitcoinClient.abandonTransaction(tx.txid)
case _ => // nothing to do
}
replyTo ! TxPublisher.TxRejected(loggingInfo.id, cmd, reason)
// We wait for our parent to stop us: when that happens we will unlock utxos.
Behaviors.same
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,44 @@ class ExtendedBitcoinClientSpec extends TestKitBaseClass with BitcoindService wi
assert(mempoolTx3.ancestorFees === mempoolTx1.fees + 12500.sat)
}

test("abandon transaction") {
val sender = TestProbe()
val bitcoinClient = new ExtendedBitcoinClient(bitcoinrpcclient)

// Broadcast a wallet transaction.
val opts = FundTransactionOptions(TestConstants.feeratePerKw, changePosition = Some(1))
bitcoinClient.fundTransaction(Transaction(2, Nil, Seq(TxOut(250000 sat, Script.pay2wpkh(randomKey().publicKey))), 0), opts).pipeTo(sender.ref)
val fundedTx1 = sender.expectMsgType[FundTransactionResponse].tx
bitcoinClient.signTransaction(fundedTx1, Nil).pipeTo(sender.ref)
val signedTx1 = sender.expectMsgType[SignTransactionResponse].tx
bitcoinClient.publishTransaction(signedTx1).pipeTo(sender.ref)
sender.expectMsg(signedTx1.txid)

// Double-spend that transaction.
val fundedTx2 = fundedTx1.copy(txOut = TxOut(200000 sat, Script.pay2wpkh(randomKey().publicKey)) +: fundedTx1.txOut.tail)
bitcoinClient.signTransaction(fundedTx2, Nil).pipeTo(sender.ref)
val signedTx2 = sender.expectMsgType[SignTransactionResponse].tx
assert(signedTx2.txid != signedTx1.txid)
bitcoinClient.publishTransaction(signedTx2).pipeTo(sender.ref)
sender.expectMsg(signedTx2.txid)

// Abandon the first wallet transaction.
bitcoinClient.abandonTransaction(signedTx1.txid).pipeTo(sender.ref)
sender.expectMsg(true)

// Abandoning an already-abandoned transaction is a no-op.
bitcoinClient.abandonTransaction(signedTx1.txid).pipeTo(sender.ref)
sender.expectMsg(true)

// We can't abandon the second transaction (it's in the mempool).
bitcoinClient.abandonTransaction(signedTx2.txid).pipeTo(sender.ref)
sender.expectMsg(false)

// We can't abandon a confirmed transaction.
bitcoinClient.abandonTransaction(signedTx2.txIn.head.outPoint.txid).pipeTo(sender.ref)
sender.expectMsg(false)
}

test("detect if tx has been double-spent") {
val sender = TestProbe()
val bitcoinClient = new ExtendedBitcoinClient(bitcoinrpcclient)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import fr.acinq.eclair.blockchain.bitcoind.{BitcoinCoreWallet, BitcoindService}
import fr.acinq.eclair.blockchain.fee.FeeratePerKw
import fr.acinq.eclair.channel._
import fr.acinq.eclair.channel.publish.ReplaceableTxPublisher.{Publish, Stop}
import fr.acinq.eclair.channel.publish.TxPublisher.TxRejectedReason.{ConflictingTxUnconfirmed, CouldNotFund, TxSkipped}
import fr.acinq.eclair.channel.publish.TxPublisher.TxRejectedReason.{ConflictingTxUnconfirmed, CouldNotFund, TxSkipped, WalletInputGone}
import fr.acinq.eclair.channel.publish.TxPublisher._
import fr.acinq.eclair.channel.states.{ChannelStateTestsHelperMethods, ChannelStateTestsTags}
import fr.acinq.eclair.transactions.Transactions.{ClaimLocalAnchorOutputTx, HtlcSuccessTx, HtlcTimeoutTx}
Expand Down Expand Up @@ -115,7 +115,6 @@ class ReplaceableTxPublisherSpec extends TestKitBaseClass with AnyFunSuiteLike w

// Generate blocks to ensure the funding tx is confirmed.
generateBlocks(1)
// TODO: do I need a method that generateBlocks AND sets blockCount AND sends event?

// Execute our test.
val publisher = system.spawn(ReplaceableTxPublisher(aliceNodeParams, bitcoinClient, alice2blockchain.ref, TxPublishLogContext(testId, TestConstants.Bob.nodeParams.nodeId, None)), testId.toString)
Expand Down Expand Up @@ -173,6 +172,23 @@ class ReplaceableTxPublisherSpec extends TestKitBaseClass with AnyFunSuiteLike w
})
}

test("commit tx feerate high enough and commit tx confirmed, not spending anchor output") {
withFixture(Seq(500 millibtc), f => {
import f._

val commitFeerate = alice.stateData.asInstanceOf[DATA_NORMAL].commitments.localCommit.spec.feeratePerKw
val (commitTx, anchorTx) = closeChannelWithoutHtlcs(f)
walletClient.publishTransaction(commitTx.tx).pipeTo(probe.ref)
probe.expectMsg(commitTx.tx.txid)
generateBlocks(1)

publisher ! Publish(probe.ref, anchorTx, commitFeerate)
val result = probe.expectMsgType[TxRejected]
assert(result.cmd === anchorTx)
assert(result.reason === TxSkipped(retryNextBlock = false))
})
}

test("remote commit tx confirmed, not spending anchor output") {
withFixture(Seq(500 millibtc), f => {
import f._
Expand Down Expand Up @@ -208,6 +224,33 @@ class ReplaceableTxPublisherSpec extends TestKitBaseClass with AnyFunSuiteLike w
})
}

test("remote commit tx replaces local commit tx, not spending anchor output") {
withFixture(Seq(500 millibtc), f => {
import f._

val remoteCommit = bob.stateData.asInstanceOf[DATA_NORMAL].commitments.fullySignedLocalCommitTx(bob.underlyingActor.nodeParams.channelKeyManager)
assert(bob.stateData.asInstanceOf[DATA_NORMAL].commitments.localCommit.spec.feeratePerKw === FeeratePerKw(2500 sat))

// We lower the feerate to make it easy to replace our commit tx by theirs in the mempool.
val lowFeerate = FeeratePerKw(500 sat)
updateFee(lowFeerate, alice, bob, alice2bob, bob2alice)
val (localCommit, anchorTx) = closeChannelWithoutHtlcs(f)
publisher ! Publish(probe.ref, anchorTx, FeeratePerKw(600 sat))
val mempoolTxs = getMempoolTxs(2)
assert(mempoolTxs.map(_.txid).contains(localCommit.tx.txid))

// Our commit tx is replaced by theirs.
walletClient.publishTransaction(remoteCommit.tx).pipeTo(probe.ref)
probe.expectMsg(remoteCommit.tx.txid)
generateBlocks(1)
system.eventStream.publish(CurrentBlockCount(currentBlockHeight(probe)))

val result = probe.expectMsgType[TxRejected]
assert(result.cmd === anchorTx)
assert(result.reason === WalletInputGone)
})
}

test("not enough funds to increase commit tx feerate") {
withFixture(Seq(10.5 millibtc), f => {
import f._
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import fr.acinq.bitcoin.{ByteVector32, Crypto, SatoshiLong, ScriptFlags, Transac
import fr.acinq.eclair.TestConstants.{Alice, Bob, TestFeeEstimator}
import fr.acinq.eclair._
import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher._
import fr.acinq.eclair.blockchain.fee.FeeTargets
import fr.acinq.eclair.blockchain.fee.{FeeTargets, FeeratePerKw}
import fr.acinq.eclair.blockchain.{EclairWallet, TestWallet}
import fr.acinq.eclair.channel._
import fr.acinq.eclair.channel.publish.TxPublisher
Expand Down Expand Up @@ -270,6 +270,21 @@ trait ChannelStateTestsHelperMethods extends TestKitBase {
}
}

def updateFee(feerate: FeeratePerKw, s: TestFSMRef[ChannelState, ChannelData, Channel], r: TestFSMRef[ChannelState, ChannelData, Channel], s2r: TestProbe, r2s: TestProbe): Unit = {
s ! CMD_UPDATE_FEE(feerate, commit = true)
s2r.expectMsgType[UpdateFee]
s2r.forward(r)
s2r.expectMsgType[CommitSig]
s2r.forward(r)
r2s.expectMsgType[RevokeAndAck]
r2s.forward(s)
r2s.expectMsgType[CommitSig]
r2s.forward(s)
s2r.expectMsgType[RevokeAndAck]
s2r.forward(r)
awaitCond(s.stateData.asInstanceOf[HasCommitments].commitments.localCommit.spec.feeratePerKw == feerate)
}

def mutualClose(s: TestFSMRef[ChannelState, ChannelData, Channel], r: TestFSMRef[ChannelState, ChannelData, Channel], s2r: TestProbe, r2s: TestProbe, s2blockchain: TestProbe, r2blockchain: TestProbe): Unit = {
val sender = TestProbe()
// s initiates a closing
Expand Down

0 comments on commit c504658

Please sign in to comment.