From 3d8d0422218a3faf2b1f29e5132b42efb48db3f6 Mon Sep 17 00:00:00 2001 From: Jeffrey Czyz Date: Wed, 18 Mar 2026 23:11:20 -0500 Subject: [PATCH 01/10] Add rbf_channel API for fee-bumping pending splices When a splice is already pending, the user needs a way to replace its funding transaction at a higher feerate. This adds rbf_channel() to handle that case and guards splice_in/splice_out against being called while a pending splice exists, directing users to rbf_channel instead. Also fixes signing for RBF replacements, which requires accessing outputs spent by unconfirmed transactions. Co-Authored-By: Claude Opus 4.6 (1M context) --- bindings/ldk_node.udl | 2 + src/lib.rs | 71 ++++++++++++++++++- src/wallet/mod.rs | 8 ++- tests/integration_tests_rust.rs | 118 +++++++++++++++++++++++++++++++- 4 files changed, 194 insertions(+), 5 deletions(-) diff --git a/bindings/ldk_node.udl b/bindings/ldk_node.udl index 851583c5a..641469821 100644 --- a/bindings/ldk_node.udl +++ b/bindings/ldk_node.udl @@ -124,6 +124,8 @@ interface Node { [Throws=NodeError] void splice_out([ByRef]UserChannelId user_channel_id, PublicKey counterparty_node_id, [ByRef]Address address, u64 splice_amount_sats); [Throws=NodeError] + void rbf_channel([ByRef]UserChannelId user_channel_id, PublicKey counterparty_node_id); + [Throws=NodeError] void close_channel([ByRef]UserChannelId user_channel_id, PublicKey counterparty_node_id); [Throws=NodeError] void force_close_channel([ByRef]UserChannelId user_channel_id, PublicKey counterparty_node_id, string? reason); diff --git a/src/lib.rs b/src/lib.rs index b45064287..944a21e27 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1647,7 +1647,7 @@ impl Node { if funding_template.prior_contribution().is_some() { log_error!( self.logger, - "Failed to splice channel: a prior splice contribution is pending" + "Failed to splice channel: a prior splice contribution is pending; use rbf_channel instead" ); return Err(Error::ChannelSplicingFailed); } @@ -1770,7 +1770,7 @@ impl Node { if funding_template.prior_contribution().is_some() { log_error!( self.logger, - "Failed to splice channel: a prior splice contribution is pending" + "Failed to splice channel: a prior splice contribution is pending; use rbf_channel instead" ); return Err(Error::ChannelSplicingFailed); } @@ -1807,6 +1807,73 @@ impl Node { } } + /// Replace a pending splice's funding transaction with a higher-feerate version. + /// + /// If a prior splice negotiation is pending, this bumps its feerate via RBF. The prior + /// contribution is reused when possible; otherwise, coin selection is re-run. + /// + /// # Experimental API + /// + /// This API is experimental and may change in the future. + pub fn rbf_channel( + &self, user_channel_id: &UserChannelId, counterparty_node_id: PublicKey, + ) -> Result<(), Error> { + let open_channels = + self.channel_manager.list_channels_with_counterparty(&counterparty_node_id); + if let Some(channel_details) = + open_channels.iter().find(|c| c.user_channel_id == user_channel_id.0) + { + let min_feerate = + self.fee_estimator.estimate_fee_rate(ConfirmationTarget::ChannelFunding); + let max_feerate = FeeRate::from_sat_per_kwu(min_feerate.to_sat_per_kwu() * 3 / 2); + + let funding_template = self + .channel_manager + .splice_channel(&channel_details.channel_id, &counterparty_node_id) + .map_err(|e| { + log_error!(self.logger, "Failed to RBF channel: {:?}", e); + Error::ChannelSplicingFailed + })?; + + if funding_template.min_rbf_feerate().is_none() { + log_error!(self.logger, "Failed to RBF channel: no pending splice to replace"); + return Err(Error::ChannelSplicingFailed); + } + + let contribution = self + .runtime + .block_on(funding_template.rbf_prior_contribution( + None, + max_feerate, + Arc::clone(&self.wallet), + )) + .map_err(|e| { + log_error!(self.logger, "Failed to RBF channel: {}", e); + Error::ChannelSplicingFailed + })?; + + self.channel_manager + .funding_contributed( + &channel_details.channel_id, + &counterparty_node_id, + contribution, + None, + ) + .map_err(|e| { + log_error!(self.logger, "Failed to RBF channel: {:?}", e); + Error::ChannelSplicingFailed + }) + } else { + log_error!( + self.logger, + "Channel not found for user_channel_id {} and counterparty {}", + user_channel_id, + counterparty_node_id + ); + Err(Error::ChannelSplicingFailed) + } + } + /// Manually sync the LDK and BDK wallets with the current chain state and update the fee rate /// cache. /// diff --git a/src/wallet/mod.rs b/src/wallet/mod.rs index 76f2aa9ce..8eeb82af4 100644 --- a/src/wallet/mod.rs +++ b/src/wallet/mod.rs @@ -5,6 +5,7 @@ // http://opensource.org/licenses/MIT>, at your option. You may not use this file except in // accordance with one or both of these licenses. +use std::collections::HashMap; use std::future::Future; use std::ops::Deref; use std::str::FromStr; @@ -1100,9 +1101,12 @@ impl Wallet { let mut psbt = Psbt::from_unsigned_tx(unsigned_tx).map_err(|e| { log_error!(self.logger, "Failed to construct PSBT: {}", e); })?; + // Use list_output rather than get_utxo to include outputs spent by unconfirmed + // transactions (e.g., a prior splice being replaced via RBF). + let mut wallet_outputs: HashMap = + locked_wallet.list_output().map(|o| (o.outpoint, o)).collect(); for (i, txin) in psbt.unsigned_tx.input.iter().enumerate() { - if let Some(utxo) = locked_wallet.get_utxo(txin.previous_output) { - debug_assert!(!utxo.is_spent); + if let Some(utxo) = wallet_outputs.remove(&txin.previous_output) { psbt.inputs[i] = locked_wallet.get_psbt_input(utxo, None, true).map_err(|e| { log_error!(self.logger, "Failed to construct PSBT input: {}", e); })?; diff --git a/tests/integration_tests_rust.rs b/tests/integration_tests_rust.rs index 0b06716b2..fb064ddc7 100644 --- a/tests/integration_tests_rust.rs +++ b/tests/integration_tests_rust.rs @@ -26,7 +26,7 @@ use common::{ setup_bitcoind_and_electrsd, setup_builder, setup_node, setup_two_nodes, splice_in_with_all, wait_for_tx, TestChainSource, TestConfig, TestStoreType, TestSyncStore, }; -use electrsd::corepc_node::Node as BitcoinD; +use electrsd::corepc_node::{self, Node as BitcoinD}; use electrsd::ElectrsD; use ldk_node::config::{AsyncPaymentsRole, EsploraSyncConfig}; use ldk_node::entropy::NodeEntropy; @@ -1154,6 +1154,122 @@ async fn splice_channel() { ); } +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn rbf_splice_channel() { + // Use a custom bitcoind config with a lower incrementalrelayfee so that the +25 sat/kwu + // (0.1 sat/vB) RBF feerate bump satisfies BIP125's absolute fee increase requirement. + let bitcoind_exe = std::env::var("BITCOIND_EXE") + .ok() + .or_else(|| corepc_node::downloaded_exe_path().ok()) + .expect( + "you need to provide an env var BITCOIND_EXE or specify a bitcoind version feature", + ); + let mut bitcoind_conf = corepc_node::Conf::default(); + bitcoind_conf.network = "regtest"; + bitcoind_conf.args.push("-rest"); + bitcoind_conf.args.push("-incrementalrelayfee=0.00000100"); + let bitcoind = BitcoinD::with_conf(bitcoind_exe, &bitcoind_conf).unwrap(); + + let electrs_exe = std::env::var("ELECTRS_EXE") + .ok() + .or_else(electrsd::downloaded_exe_path) + .expect("you need to provide env var ELECTRS_EXE or specify an electrsd version feature"); + let mut electrsd_conf = electrsd::Conf::default(); + electrsd_conf.http_enabled = true; + electrsd_conf.network = "regtest"; + let electrsd = ElectrsD::with_conf(electrs_exe, &bitcoind, &electrsd_conf).unwrap(); + let chain_source = random_chain_source(&bitcoind, &electrsd); + + let (node_a, node_b) = setup_two_nodes(&chain_source, false, true, false); + + let address_a = node_a.onchain_payment().new_address().unwrap(); + let address_b = node_b.onchain_payment().new_address().unwrap(); + let premine_amount_sat = 5_000_000; + premine_and_distribute_funds( + &bitcoind.client, + &electrsd.client, + vec![address_a, address_b], + Amount::from_sat(premine_amount_sat), + ) + .await; + + node_a.sync_wallets().unwrap(); + node_b.sync_wallets().unwrap(); + + open_channel(&node_a, &node_b, 4_000_000, false, &electrsd).await; + + generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6).await; + + node_a.sync_wallets().unwrap(); + node_b.sync_wallets().unwrap(); + + let user_channel_id_a = expect_channel_ready_event!(node_a, node_b.node_id()); + let user_channel_id_b = expect_channel_ready_event!(node_b, node_a.node_id()); + + // rbf_channel should fail when there's no pending splice + assert_eq!( + node_b.rbf_channel(&user_channel_id_b, node_a.node_id()), + Err(NodeError::ChannelSplicingFailed), + ); + + // Initiate a splice-in to create a pending splice + node_b.splice_in(&user_channel_id_b, node_a.node_id(), 1_000_000).unwrap(); + + let original_txo = expect_splice_negotiated_event!(node_a, node_b.node_id()); + expect_splice_negotiated_event!(node_b, node_a.node_id()); + + // splice_in should fail when there's a pending splice (RBF guard) + assert_eq!( + node_b.splice_in(&user_channel_id_b, node_a.node_id(), 1_000_000), + Err(NodeError::ChannelSplicingFailed), + ); + + // splice_out should fail when there's a pending splice (RBF guard) + let address = node_a.onchain_payment().new_address().unwrap(); + assert_eq!( + node_a.splice_out(&user_channel_id_a, node_b.node_id(), &address, 100_000), + Err(NodeError::ChannelSplicingFailed), + ); + + // rbf_channel should succeed when there's a pending splice + node_b.rbf_channel(&user_channel_id_b, node_a.node_id()).unwrap(); + + let rbf_txo = expect_splice_negotiated_event!(node_a, node_b.node_id()); + expect_splice_negotiated_event!(node_b, node_a.node_id()); + + assert_ne!(original_txo, rbf_txo, "RBF should produce a different funding txo"); + + // Wait for the RBF transaction to replace the original in the mempool + wait_for_tx(&electrsd.client, rbf_txo.txid).await; + + // Mine blocks and confirm the RBF splice + generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6).await; + + node_a.sync_wallets().unwrap(); + node_b.sync_wallets().unwrap(); + + // Verify the RBF transaction is the one that locked, not the original + match node_a.next_event_async().await { + Event::ChannelReady { funding_txo, counterparty_node_id, .. } => { + assert_eq!(counterparty_node_id, Some(node_b.node_id())); + assert_eq!(funding_txo, Some(rbf_txo)); + node_a.event_handled().unwrap(); + }, + ref e => panic!("node_a got unexpected event: {:?}", e), + } + match node_b.next_event_async().await { + Event::ChannelReady { funding_txo, counterparty_node_id, .. } => { + assert_eq!(counterparty_node_id, Some(node_a.node_id())); + assert_eq!(funding_txo, Some(rbf_txo)); + node_b.event_handled().unwrap(); + }, + ref e => panic!("node_b got unexpected event: {:?}", e), + } + + node_a.stop().unwrap(); + node_b.stop().unwrap(); +} + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn simple_bolt12_send_receive() { let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); From 9e6114b003688ae1dc64ad8951a3cf583feee1a6 Mon Sep 17 00:00:00 2001 From: Jeffrey Czyz Date: Fri, 5 Jun 2026 14:28:08 -0500 Subject: [PATCH 02/10] f - Rename rbf_channel to bump_channel_funding_fee Co-Authored-By: Claude --- bindings/ldk_node.udl | 2 +- src/lib.rs | 6 +++--- tests/integration_tests_rust.rs | 8 ++++---- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/bindings/ldk_node.udl b/bindings/ldk_node.udl index 641469821..5621f1751 100644 --- a/bindings/ldk_node.udl +++ b/bindings/ldk_node.udl @@ -124,7 +124,7 @@ interface Node { [Throws=NodeError] void splice_out([ByRef]UserChannelId user_channel_id, PublicKey counterparty_node_id, [ByRef]Address address, u64 splice_amount_sats); [Throws=NodeError] - void rbf_channel([ByRef]UserChannelId user_channel_id, PublicKey counterparty_node_id); + void bump_channel_funding_fee([ByRef]UserChannelId user_channel_id, PublicKey counterparty_node_id); [Throws=NodeError] void close_channel([ByRef]UserChannelId user_channel_id, PublicKey counterparty_node_id); [Throws=NodeError] diff --git a/src/lib.rs b/src/lib.rs index 944a21e27..deec84b1e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1647,7 +1647,7 @@ impl Node { if funding_template.prior_contribution().is_some() { log_error!( self.logger, - "Failed to splice channel: a prior splice contribution is pending; use rbf_channel instead" + "Failed to splice channel: a prior splice contribution is pending; use bump_channel_funding_fee instead" ); return Err(Error::ChannelSplicingFailed); } @@ -1770,7 +1770,7 @@ impl Node { if funding_template.prior_contribution().is_some() { log_error!( self.logger, - "Failed to splice channel: a prior splice contribution is pending; use rbf_channel instead" + "Failed to splice channel: a prior splice contribution is pending; use bump_channel_funding_fee instead" ); return Err(Error::ChannelSplicingFailed); } @@ -1815,7 +1815,7 @@ impl Node { /// # Experimental API /// /// This API is experimental and may change in the future. - pub fn rbf_channel( + pub fn bump_channel_funding_fee( &self, user_channel_id: &UserChannelId, counterparty_node_id: PublicKey, ) -> Result<(), Error> { let open_channels = diff --git a/tests/integration_tests_rust.rs b/tests/integration_tests_rust.rs index fb064ddc7..4847fa3ad 100644 --- a/tests/integration_tests_rust.rs +++ b/tests/integration_tests_rust.rs @@ -1206,9 +1206,9 @@ async fn rbf_splice_channel() { let user_channel_id_a = expect_channel_ready_event!(node_a, node_b.node_id()); let user_channel_id_b = expect_channel_ready_event!(node_b, node_a.node_id()); - // rbf_channel should fail when there's no pending splice + // bump_channel_funding_fee should fail when there's no pending splice assert_eq!( - node_b.rbf_channel(&user_channel_id_b, node_a.node_id()), + node_b.bump_channel_funding_fee(&user_channel_id_b, node_a.node_id()), Err(NodeError::ChannelSplicingFailed), ); @@ -1231,8 +1231,8 @@ async fn rbf_splice_channel() { Err(NodeError::ChannelSplicingFailed), ); - // rbf_channel should succeed when there's a pending splice - node_b.rbf_channel(&user_channel_id_b, node_a.node_id()).unwrap(); + // bump_channel_funding_fee should succeed when there's a pending splice + node_b.bump_channel_funding_fee(&user_channel_id_b, node_a.node_id()).unwrap(); let rbf_txo = expect_splice_negotiated_event!(node_a, node_b.node_id()); expect_splice_negotiated_event!(node_b, node_a.node_id()); From f1a0a79737453a6534f2ece24fdb18907849d7d3 Mon Sep 17 00:00:00 2001 From: Jeffrey Czyz Date: Mon, 15 Jun 2026 14:40:13 -0500 Subject: [PATCH 03/10] f - Clarify the bump_channel_funding_fee hint for a pending splice bump_channel_funding_fee only bumps a pending splice's fee; it cannot change the splice amount, so phrasing it as an alternative to splicing was misleading. Co-Authored-By: Claude --- src/lib.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index deec84b1e..7b910aa5c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1647,7 +1647,7 @@ impl Node { if funding_template.prior_contribution().is_some() { log_error!( self.logger, - "Failed to splice channel: a prior splice contribution is pending; use bump_channel_funding_fee instead" + "Failed to splice channel: a prior splice contribution is pending; use bump_channel_funding_fee to bump its fee" ); return Err(Error::ChannelSplicingFailed); } @@ -1770,7 +1770,7 @@ impl Node { if funding_template.prior_contribution().is_some() { log_error!( self.logger, - "Failed to splice channel: a prior splice contribution is pending; use bump_channel_funding_fee instead" + "Failed to splice channel: a prior splice contribution is pending; use bump_channel_funding_fee to bump its fee" ); return Err(Error::ChannelSplicingFailed); } From c8e0713b6c41e22ed7736eaad1e5f1b92d48967d Mon Sep 17 00:00:00 2001 From: Jeffrey Czyz Date: Mon, 20 Apr 2026 19:15:37 -0500 Subject: [PATCH 04/10] Tie funding payment status transitions to Lightning lifecycle events MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Channel-opening and splice transactions transition to Succeeded when ChannelReady fires, not after ANTI_REORG_DELAY confirmations. This matches the point at which the Lightning layer considers the channel usable: a zero-conf channel graduates as soon as its counterparty signals, and a high-conf channel waits however many confirmations the peer requires, rather than always stopping at six. For splice RBF, the payment records whichever candidate actually confirmed, with that candidate's amount and this node's share of the fee — not the fee-estimate used for weight at coin-selection time, and not the whole-tx fee for a multi-contributor splice. A channel closure whose funding or splice never confirmed discards its payment record instead of leaving it pending forever. Generated with assistance from Claude Code. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/builder.rs | 2 + src/event.rs | 24 ++ src/payment/pending_payment_store.rs | 45 ++- src/tx_broadcaster.rs | 35 +- src/wallet/mod.rs | 576 +++++++++++++++++++++++++-- tests/integration_tests_rust.rs | 49 +++ 6 files changed, 695 insertions(+), 36 deletions(-) diff --git a/src/builder.rs b/src/builder.rs index 3df594b7c..efa5c50da 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -1618,6 +1618,8 @@ fn build_with_store_internal( Arc::clone(&pending_payment_store), )); + tx_broadcaster.set_wallet(Arc::downgrade(&wallet)); + // Initialize the KeysManager let cur_time = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).map_err(|e| { log_error!(logger, "Failed to get current time: {}", e); diff --git a/src/event.rs b/src/event.rs index 80acd0690..dcb117ff6 100644 --- a/src/event.rs +++ b/src/event.rs @@ -1585,6 +1585,20 @@ where ); } + if let Err(e) = self + .wallet + .handle_channel_ready(channel_id, funding_txo.map(|txo| txo.txid)) + .await + { + log_error!( + self.logger, + "Failed to graduate funding payment on ChannelReady for channel {}: {:?}", + channel_id, + e, + ); + return Err(ReplayEvent()); + } + self.liquidity_source .lsps2_service() .handle_channel_ready(user_channel_id, &channel_id, &counterparty_node_id) @@ -1613,6 +1627,16 @@ where } => { log_info!(self.logger, "Channel {} closed due to: {}", channel_id, reason); + if let Err(e) = self.wallet.handle_channel_closed(channel_id).await { + log_error!( + self.logger, + "Failed to handle ChannelClosed for channel {}: {:?}", + channel_id, + e, + ); + return Err(ReplayEvent()); + } + let event = Event::ChannelClosed { channel_id, user_channel_id: UserChannelId(user_channel_id), diff --git a/src/payment/pending_payment_store.rs b/src/payment/pending_payment_store.rs index eb72f89ec..16837d70c 100644 --- a/src/payment/pending_payment_store.rs +++ b/src/payment/pending_payment_store.rs @@ -6,6 +6,7 @@ // accordance with one or both of these licenses. use bitcoin::Txid; +use lightning::chain::chaininterface::FundingCandidate; use lightning::impl_writeable_tlv_based; use lightning::ln::channelmanager::PaymentId; @@ -13,6 +14,19 @@ use crate::data_store::{StorableObject, StorableObjectUpdate}; use crate::payment::store::PaymentDetailsUpdate; use crate::payment::PaymentDetails; +/// Marks an on-chain payment as belonging to an interactive-funding negotiation. The +/// last entry in `candidates` is the currently-broadcast tx; earlier entries are RBF +/// predecessors that may still confirm if reorgs intervene. +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct FundingDetails { + /// Every negotiated candidate, oldest first. + pub candidates: Vec, +} + +impl_writeable_tlv_based!(FundingDetails, { + (0, candidates, optional_vec), +}); + /// Represents a pending payment #[derive(Clone, Debug, PartialEq, Eq)] pub struct PendingPaymentDetails { @@ -20,11 +34,24 @@ pub struct PendingPaymentDetails { pub details: PaymentDetails, /// Transaction IDs that have replaced or conflict with this payment. pub conflicting_txids: Vec, + /// Set when the payment's transaction is an interactive-funding broadcast (channel + /// open or splice). The record transitions to [`PaymentStatus::Succeeded`] on + /// `ChannelReady` instead of after [`ANTI_REORG_DELAY`] confirmations. + /// + /// [`PaymentStatus::Succeeded`]: crate::payment::store::PaymentStatus::Succeeded + /// [`ANTI_REORG_DELAY`]: lightning::chain::channelmonitor::ANTI_REORG_DELAY + pub funding_details: Option, } impl PendingPaymentDetails { pub(crate) fn new(details: PaymentDetails, conflicting_txids: Vec) -> Self { - Self { details, conflicting_txids } + Self { details, conflicting_txids, funding_details: None } + } + + pub(crate) fn with_funding_details( + details: PaymentDetails, conflicting_txids: Vec, funding_details: FundingDetails, + ) -> Self { + Self { details, conflicting_txids, funding_details: Some(funding_details) } } /// Convert to finalized payment for the main payment store @@ -36,6 +63,7 @@ impl PendingPaymentDetails { impl_writeable_tlv_based!(PendingPaymentDetails, { (0, details, required), (2, conflicting_txids, optional_vec), + (4, funding_details, option), }); #[derive(Clone, Debug, PartialEq, Eq)] @@ -43,6 +71,7 @@ pub(crate) struct PendingPaymentDetailsUpdate { pub id: PaymentId, pub payment_update: Option, pub conflicting_txids: Option>, + pub funding_details: Option>, } impl StorableObject for PendingPaymentDetails { @@ -68,6 +97,13 @@ impl StorableObject for PendingPaymentDetails { } } + if let Some(new_funding_details) = update.funding_details { + if self.funding_details != new_funding_details { + self.funding_details = new_funding_details; + updated = true; + } + } + updated } @@ -89,6 +125,11 @@ impl From<&PendingPaymentDetails> for PendingPaymentDetailsUpdate { } else { Some(value.conflicting_txids.clone()) }; - Self { id: value.id(), payment_update: Some(value.details.to_update()), conflicting_txids } + Self { + id: value.id(), + payment_update: Some(value.details.to_update()), + conflicting_txids, + funding_details: Some(value.funding_details.clone()), + } } } diff --git a/src/tx_broadcaster.rs b/src/tx_broadcaster.rs index 7084135b0..24abf8f11 100644 --- a/src/tx_broadcaster.rs +++ b/src/tx_broadcaster.rs @@ -6,12 +6,14 @@ // accordance with one or both of these licenses. use std::ops::Deref; +use std::sync::{Mutex as StdMutex, Weak}; use bitcoin::Transaction; use lightning::chain::chaininterface::{BroadcasterInterface, TransactionType}; use tokio::sync::{mpsc, Mutex, MutexGuard}; use crate::logger::{log_error, LdkLogger}; +use crate::types::Wallet; const BCAST_PACKAGE_QUEUE_SIZE: usize = 50; @@ -21,6 +23,12 @@ where { queue_sender: mpsc::Sender>, queue_receiver: Mutex>>, + /// Weak handle to the [`Wallet`] that performs classification of funding broadcasts + /// (channel opens and splices) into payment records. Remains `None` while the + /// builder is wiring the node up, during which broadcasts are still forwarded to + /// the queue but no payment record is written. [`Self::set_wallet`] installs the + /// handle once the [`Wallet`] exists. + wallet: StdMutex>>, logger: L, } @@ -30,7 +38,19 @@ where { pub(crate) fn new(logger: L) -> Self { let (queue_sender, queue_receiver) = mpsc::channel(BCAST_PACKAGE_QUEUE_SIZE); - Self { queue_sender, queue_receiver: Mutex::new(queue_receiver), logger } + Self { + queue_sender, + queue_receiver: Mutex::new(queue_receiver), + wallet: StdMutex::new(None), + logger, + } + } + + /// Installs the [`Wallet`] handle used to classify funding broadcasts (channel + /// opens and splices) into payment records. Called once the builder has constructed + /// both the broadcaster and the wallet. + pub(crate) fn set_wallet(&self, wallet: Weak) { + *self.wallet.lock().expect("lock") = Some(wallet); } pub(crate) async fn get_broadcast_queue( @@ -45,6 +65,19 @@ where L::Target: LdkLogger, { fn broadcast_transactions(&self, txs: &[(&Transaction, TransactionType)]) { + let wallet = self.wallet.lock().expect("lock").as_ref().and_then(Weak::upgrade); + if let Some(wallet) = wallet { + for (tx, tx_type) in txs { + if let Err(e) = wallet.classify_broadcast(tx, tx_type) { + log_error!( + self.logger, + "Failed to classify broadcast tx {}: {:?}", + tx.compute_txid(), + e, + ); + } + } + } let package = txs.iter().map(|(t, _)| (*t).clone()).collect::>(); self.queue_sender.try_send(package).unwrap_or_else(|e| { log_error!(self.logger, "Failed to broadcast transactions: {}", e); diff --git a/src/wallet/mod.rs b/src/wallet/mod.rs index 8eeb82af4..658f603cf 100644 --- a/src/wallet/mod.rs +++ b/src/wallet/mod.rs @@ -33,14 +33,16 @@ use bitcoin::{ WitnessProgram, WitnessVersion, }; use lightning::chain::chaininterface::{ - BroadcasterInterface, INCREMENTAL_RELAY_FEE_SAT_PER_1000_WEIGHT, + BroadcasterInterface, TransactionType, INCREMENTAL_RELAY_FEE_SAT_PER_1000_WEIGHT, }; use lightning::chain::channelmonitor::ANTI_REORG_DELAY; use lightning::chain::{BlockLocator, ClaimId, Listen}; use lightning::ln::channelmanager::PaymentId; +use lightning::ln::funding::FundingContribution; use lightning::ln::inbound_payment::ExpandedKey; use lightning::ln::msgs::UnsignedGossipMessage; use lightning::ln::script::ShutdownScript; +use lightning::ln::types::ChannelId as LnChannelId; use lightning::sign::{ ChangeDestinationSource, EntropySource, InMemorySigner, KeysManager, NodeSigner, OutputSpender, PeerStorageKey, Recipient, SignerProvider, SpendableOutputDescriptor, @@ -55,6 +57,9 @@ use persist::KVStoreWalletPersister; use crate::config::Config; use crate::fee_estimator::{ConfirmationTarget, FeeEstimator, OnchainFeeEstimator}; use crate::logger::{log_debug, log_error, log_info, log_trace, LdkLogger, Logger}; +use lightning::chain::chaininterface::{ChannelFunding, FundingCandidate, FundingPurpose}; + +use crate::payment::pending_payment_store::FundingDetails; use crate::payment::store::ConfirmationStatus; use crate::payment::{ PaymentDetails, PaymentDirection, PaymentKind, PaymentStatus, PendingPaymentDetails, @@ -256,18 +261,9 @@ impl Wallet { for event in events { match event { WalletEvent::TxConfirmed { txid, tx, block_time, .. } => { - let cur_height = locked_wallet.latest_checkpoint().height(); - let confirmation_height = block_time.block_id.height; - let payment_status = if cur_height >= confirmation_height + ANTI_REORG_DELAY - 1 - { - PaymentStatus::Succeeded - } else { - PaymentStatus::Pending - }; - let confirmation_status = ConfirmationStatus::Confirmed { block_hash: block_time.block_id.hash, - height: confirmation_height, + height: block_time.block_id.height, timestamp: block_time.confirmation_time, }; @@ -275,6 +271,23 @@ impl Wallet { .find_payment_by_txid(txid) .unwrap_or_else(|| PaymentId(txid.to_byte_array())); + if self.apply_funding_details_status_update( + payment_id, + txid, + confirmation_status, + )? { + continue; + } + + let cur_height = locked_wallet.latest_checkpoint().height(); + let confirmation_height = block_time.block_id.height; + let payment_status = if cur_height >= confirmation_height + ANTI_REORG_DELAY - 1 + { + PaymentStatus::Succeeded + } else { + PaymentStatus::Pending + }; + let payment = self.create_payment_from_tx( locked_wallet, txid, @@ -284,15 +297,12 @@ impl Wallet { confirmation_status, ); - self.runtime.block_on(self.payment_store.insert_or_update(payment.clone()))?; - if payment_status == PaymentStatus::Pending { let pending_payment = self.create_pending_payment_from_tx(payment, Vec::new()); - - self.runtime.block_on( - self.pending_payment_store.insert_or_update(pending_payment), - )?; + self.persist_pending(pending_payment)?; + } else { + self.runtime.block_on(self.payment_store.insert_or_update(payment))?; } }, WalletEvent::ChainTipChanged { new_tip, .. } => { @@ -303,8 +313,11 @@ impl Wallet { "Non-pending payment {:?} found in pending store", p.details.id, ); + // Funding records complete on `ChannelReady`, not after + // `ANTI_REORG_DELAY` confirmations. p.details.status == PaymentStatus::Pending && matches!(p.details.kind, PaymentKind::Onchain { .. }) + && p.funding_details.is_none() }); let mut unconfirmed_outbound_txids: Vec = Vec::new(); @@ -368,6 +381,14 @@ impl Wallet { .find_payment_by_txid(txid) .unwrap_or_else(|| PaymentId(txid.to_byte_array())); + if self.apply_funding_details_status_update( + payment_id, + txid, + ConfirmationStatus::Unconfirmed, + )? { + continue; + } + let payment = self.create_payment_from_tx( locked_wallet, txid, @@ -376,11 +397,8 @@ impl Wallet { PaymentStatus::Pending, ConfirmationStatus::Unconfirmed, ); - let pending_payment = - self.create_pending_payment_from_tx(payment.clone(), Vec::new()); - self.runtime.block_on(self.payment_store.insert_or_update(payment))?; - self.runtime - .block_on(self.pending_payment_store.insert_or_update(pending_payment))?; + let pending_payment = self.create_pending_payment_from_tx(payment, Vec::new()); + self.persist_pending(pending_payment)?; }, WalletEvent::TxReplaced { txid, conflicts, .. } => { let Some(payment_id) = self.find_payment_by_txid(txid) else { @@ -418,6 +436,15 @@ impl Wallet { let payment_id = self .find_payment_by_txid(txid) .unwrap_or_else(|| PaymentId(txid.to_byte_array())); + + if self.apply_funding_details_status_update( + payment_id, + txid, + ConfirmationStatus::Unconfirmed, + )? { + continue; + } + let payment = self.create_payment_from_tx( locked_wallet, txid, @@ -426,11 +453,8 @@ impl Wallet { PaymentStatus::Pending, ConfirmationStatus::Unconfirmed, ); - let pending_payment = - self.create_pending_payment_from_tx(payment.clone(), Vec::new()); - self.runtime.block_on(self.payment_store.insert_or_update(payment))?; - self.runtime - .block_on(self.pending_payment_store.insert_or_update(pending_payment))?; + let pending_payment = self.create_pending_payment_from_tx(payment, Vec::new()); + self.persist_pending(pending_payment)?; }, _ => { continue; @@ -1164,6 +1188,41 @@ impl Wallet { Ok(tx) } + /// Computes the amount, fee, and direction of an on-chain payment from the + /// wallet's view of the transaction. Used by [`TransactionBroadcaster`] to + /// describe a single-funded channel-open, for which no [`FundingContribution`] + /// is available. + /// + /// [`TransactionBroadcaster`]: crate::tx_broadcaster::TransactionBroadcaster + /// [`FundingContribution`]: lightning::ln::funding::FundingContribution + pub(crate) fn onchain_payment_fields( + &self, tx: &Transaction, + ) -> (Option, Option, PaymentDirection) { + let locked_wallet = self.inner.lock().expect("lock"); + let fee = locked_wallet.calculate_fee(tx).unwrap_or(Amount::ZERO); + let (sent, received) = locked_wallet.sent_and_received(tx); + let fee_sat = fee.to_sat(); + + let (direction, amount_msat) = if sent > received { + ( + PaymentDirection::Outbound, + Some( + (sent.to_sat().saturating_sub(fee_sat).saturating_sub(received.to_sat())) + * 1000, + ), + ) + } else { + ( + PaymentDirection::Inbound, + Some( + received.to_sat().saturating_sub(sent.to_sat().saturating_sub(fee_sat)) * 1000, + ), + ) + }; + + (amount_msat, Some(fee_sat * 1000), direction) + } + fn create_payment_from_tx( &self, locked_wallet: &PersistedWallet, txid: Txid, payment_id: PaymentId, tx: &Transaction, payment_status: PaymentStatus, @@ -1220,6 +1279,223 @@ impl Wallet { PendingPaymentDetails::new(payment, conflicting_txids) } + /// Writes a [`PendingPaymentDetails`] and its inner [`PaymentDetails`] to their + /// respective stores in a fixed order. Callers that need to keep the two stores in + /// sync should always go through this. + fn persist_pending(&self, pending: PendingPaymentDetails) -> Result<(), Error> { + self.runtime.block_on(self.payment_store.insert_or_update(pending.details.clone()))?; + self.runtime.block_on(self.pending_payment_store.insert_or_update(pending))?; + Ok(()) + } + + /// Called on `ChannelReady` to mark a funding payment (channel open or splice) as + /// succeeded. + /// + /// If `funding_txo_txid` matches a candidate other than the currently-active one, + /// that candidate is promoted to active first and the outer [`PaymentDetails`] is + /// updated from its contribution. If no candidate matches (the confirmed funding + /// txid belongs to a broadcast this node didn't contribute to), the pending record + /// is left in place for later handling. + pub(crate) async fn handle_channel_ready( + &self, channel_id: LnChannelId, funding_txo_txid: Option, + ) -> Result<(), Error> { + let funding_txo_txid = match funding_txo_txid { + Some(t) => t, + None => return Ok(()), + }; + + let mut pending = match self + .pending_payment_store + .list_filter(|p| { + p.funding_details + .as_ref() + .map(|fd| record_includes_channel(fd, channel_id)) + .unwrap_or(false) + }) + .into_iter() + .next() + { + Some(p) => p, + None => return Ok(()), + }; + let funding_details = match pending.funding_details.clone() { + Some(fd) => fd, + None => return Ok(()), + }; + + let candidate = match funding_details.candidates.iter().find(|c| c.txid == funding_txo_txid) + { + Some(c) => c.clone(), + None => { + // Confirmed `funding_txo` wasn't produced by any of our broadcasts. The + // record is left alone; some higher-level flow decides what to do. + log_debug!( + self.logger, + "ChannelReady for channel {}: confirmed funding_txo {} is not one of our candidates", + channel_id, + funding_txo_txid, + ); + return Ok(()); + }, + }; + + let old_txid = match pending.details.kind { + PaymentKind::Onchain { txid, .. } => txid, + _ => { + debug_assert!(false, "funding record must use PaymentKind::Onchain"); + return Ok(()); + }, + }; + + if old_txid != funding_txo_txid { + if !pending.conflicting_txids.contains(&old_txid) { + pending.conflicting_txids.push(old_txid); + } + pending.conflicting_txids.retain(|t| *t != funding_txo_txid); + + let aggregate = aggregate_local_stakes(&candidate); + pending.details.amount_msat = aggregate.amount_msat; + pending.details.fee_paid_msat = aggregate.fee_paid_msat; + } + + // Preserve the confirmation status already on the record (set by wallet sync if + // it's seen the tx confirm). `ChannelReady` alone doesn't carry block details. + let existing_status = match pending.details.kind { + PaymentKind::Onchain { status, .. } => status, + _ => ConfirmationStatus::Unconfirmed, + }; + pending.details.kind = + PaymentKind::Onchain { txid: funding_txo_txid, status: existing_status }; + + pending.details.status = PaymentStatus::Succeeded; + let payment_id = pending.details.id; + self.payment_store.insert_or_update(pending.details).await?; + self.pending_payment_store.remove(&payment_id).await?; + + Ok(()) + } + + /// Called on `ChannelClosed`. Removes any funding record (channel open or splice) + /// for `channel_id` whose candidates never reached confirmed — e.g. a funding + /// transaction that never made it on-chain. A record that does reflect a confirmed + /// transaction is left alone and will transition to `Succeeded` normally. + pub(crate) async fn handle_channel_closed(&self, channel_id: LnChannelId) -> Result<(), Error> { + let pending = match self + .pending_payment_store + .list_filter(|p| { + p.funding_details + .as_ref() + .map(|fd| record_includes_channel(fd, channel_id)) + .unwrap_or(false) + }) + .into_iter() + .next() + { + Some(p) => p, + None => return Ok(()), + }; + + let is_confirmed = matches!( + pending.details.kind, + PaymentKind::Onchain { status: ConfirmationStatus::Confirmed { .. }, .. } + ); + if is_confirmed { + return Ok(()); + } + + let payment_id = pending.details.id; + self.pending_payment_store.remove(&payment_id).await?; + self.payment_store.remove(&payment_id).await?; + Ok(()) + } + + /// Updates a funding record's `kind` in response to a wallet-sync event, swapping + /// the active candidate when `event_txid` differs from the current one. + /// + /// Amount, fee, and direction are not recomputed from the wallet's view: they were + /// set at broadcast time from the `FundingContribution` and must persist until + /// `ChannelReady`. + /// + /// Returns `true` when a funding record was updated (so the caller skips the + /// default Onchain create/update path), `false` otherwise. + fn apply_funding_details_status_update( + &self, payment_id: PaymentId, event_txid: Txid, confirmation_status: ConfirmationStatus, + ) -> Result { + // `ChannelReady` may move the payment to the main store before wallet sync + // sees the tx confirm. In that case, update `kind` directly; recomputing from + // the wallet's view would overwrite the per-node fee set at broadcast time. + if let Some(mut existing) = self.payment_store.get(&payment_id) { + if existing.status == PaymentStatus::Succeeded + && matches!(existing.kind, PaymentKind::Onchain { .. }) + && self.pending_payment_store.get(&payment_id).is_none() + { + let needs_update = match existing.kind { + PaymentKind::Onchain { txid, status } => { + txid != event_txid || status != confirmation_status + }, + _ => false, + }; + if needs_update { + existing.kind = + PaymentKind::Onchain { txid: event_txid, status: confirmation_status }; + self.runtime.block_on(self.payment_store.insert_or_update(existing))?; + } + return Ok(true); + } + } + + let mut pending = match self.pending_payment_store.get(&payment_id) { + Some(p) => p, + None => return Ok(false), + }; + let funding_details = match pending.funding_details.as_ref() { + Some(fd) => fd, + None => return Ok(false), + }; + + let candidate = match funding_details.candidates.iter().find(|c| c.txid == event_txid) { + Some(c) => c.clone(), + None => { + log_debug!( + self.logger, + "Event txid {} resolved to funding_details payment {} but is not in candidates", + event_txid, + payment_id, + ); + return Ok(false); + }, + }; + + let old_txid = match pending.details.kind { + PaymentKind::Onchain { txid, .. } => txid, + _ => { + debug_assert!(false, "funding_details record must use PaymentKind::Onchain"); + return Ok(false); + }, + }; + + if old_txid != event_txid { + // A different candidate confirmed. Move the previous active txid onto + // `conflicting_txids` and re-derive amount/fee from the new candidate's + // contributions. + if !pending.conflicting_txids.contains(&old_txid) { + pending.conflicting_txids.push(old_txid); + } + pending.conflicting_txids.retain(|t| *t != event_txid); + + let aggregate = aggregate_local_stakes(&candidate); + pending.details.amount_msat = aggregate.amount_msat; + pending.details.fee_paid_msat = aggregate.fee_paid_msat; + } + + pending.details.kind = + PaymentKind::Onchain { txid: event_txid, status: confirmation_status }; + + self.persist_pending(pending)?; + + Ok(true) + } + fn find_payment_by_txid(&self, target_txid: Txid) -> Option { let direct_payment_id = PaymentId(target_txid.to_byte_array()); if self.pending_payment_store.contains_key(&direct_payment_id) { @@ -1231,12 +1507,28 @@ impl Wallet { .list_filter(|p| { matches!(p.details.kind, PaymentKind::Onchain { txid, .. } if txid == target_txid) || p.conflicting_txids.contains(&target_txid) + || p.funding_details + .as_ref() + .map(|fd| fd.candidates.iter().any(|c| c.txid == target_txid)) + .unwrap_or(false) }) .first() { return Some(replaced_details.details.id); } + // Once moved to the main store, a funding payment is still matched by its + // confirmed txid so late wallet events resolve correctly. + if let Some(p) = self + .payment_store + .list_filter( + |p| matches!(p.kind, PaymentKind::Onchain { txid, .. } if txid == target_txid), + ) + .first() + { + return Some(p.id); + } + None } @@ -1435,17 +1727,235 @@ impl Wallet { ConfirmationStatus::Unconfirmed, ); - let pending_payment_store = - self.create_pending_payment_from_tx(new_payment.clone(), Vec::new()); - - self.runtime - .block_on(self.pending_payment_store.insert_or_update(pending_payment_store))?; - self.runtime.block_on(self.payment_store.insert_or_update(new_payment))?; + let pending_payment = self.create_pending_payment_from_tx(new_payment, Vec::new()); + self.persist_pending(pending_payment)?; log_info!(self.logger, "RBF successful: replaced {} with {}", txid, new_txid); Ok(new_txid) } + + pub(crate) fn classify_broadcast( + &self, tx: &Transaction, tx_type: &TransactionType, + ) -> Result<(), Error> { + match tx_type { + TransactionType::Funding { channels } => self.classify_funding(tx, channels), + TransactionType::InteractiveFunding { candidates } => { + self.classify_interactive_funding(tx, candidates) + }, + _ => Ok(()), + } + } + + fn classify_funding( + &self, tx: &Transaction, channels: &[(PublicKey, LnChannelId)], + ) -> Result<(), Error> { + // Batch funding (one transaction funding multiple channels) isn't supported; let + // wallet sync record the payment normally so graduation still runs through + // ANTI_REORG_DELAY. + if channels.len() != 1 { + if channels.len() > 1 { + log_trace!( + self.logger, + "Skipping funding classification for batched broadcast ({} channels)", + channels.len() + ); + } + return Ok(()); + } + + let (counterparty_node_id, channel_id) = channels[0]; + let txid = tx.compute_txid(); + let (amount_msat, fee_paid_msat, direction) = self.onchain_payment_fields(tx); + + let candidate = FundingCandidate { + txid, + channels: vec![ChannelFunding { + counterparty_node_id, + channel_id, + purpose: FundingPurpose::Establishment, + contribution: None, + }], + }; + + let details = PaymentDetails::new( + PaymentId(txid.to_byte_array()), + PaymentKind::Onchain { txid, status: ConfirmationStatus::Unconfirmed }, + amount_msat, + fee_paid_msat, + direction, + PaymentStatus::Pending, + ); + + let funding_details = FundingDetails { candidates: vec![candidate] }; + + let pending = + PendingPaymentDetails::with_funding_details(details, Vec::new(), funding_details); + + self.persist_pending(pending)?; + log_debug!( + self.logger, + "Recorded channel-funding broadcast {} for channel {}", + txid, + channel_id, + ); + Ok(()) + } + + fn classify_interactive_funding( + &self, tx: &Transaction, candidates: &[FundingCandidate], + ) -> Result<(), Error> { + // `InteractiveFunding` carries the full negotiated history. The currently-broadcast + // candidate is the last entry; earlier entries are RBF predecessors. + let active = match candidates.last() { + Some(c) => c, + None => return Ok(()), + }; + let first = match candidates.first() { + Some(c) => c, + None => return Ok(()), + }; + + let txid = tx.compute_txid(); + debug_assert_eq!(active.txid, txid, "broadcast tx must match the active candidate"); + + // Aggregate amount/fee/direction across this candidate's channels by summing the + // local-stake contributions. If we didn't contribute on this candidate, leave the + // record to wallet sync — there's nothing for us to track here, and any wallet- + // visible activity (e.g. a counterparty's splice-out paid to our address) is + // better surfaced as a plain on-chain receive. + let aggregate = aggregate_local_stakes(active); + let amount_msat = match aggregate.amount_msat { + Some(amt) => Some(amt), + None => { + log_trace!( + self.logger, + "Skipping interactive-funding broadcast {}: no local contribution", + txid, + ); + return Ok(()); + }, + }; + let fee_paid_msat = aggregate.fee_paid_msat; + let direction = aggregate.direction; + + // Skip broadcasts that don't move funds in or out of our on-chain wallet — e.g. + // a splice-out we initiated toward an external address. + let (wallet_amount_msat, _wallet_fee_msat, _wallet_direction) = + self.onchain_payment_fields(tx); + if wallet_amount_msat == Some(0) { + log_trace!( + self.logger, + "Skipping interactive-funding broadcast {}: no wallet-level activity", + txid, + ); + return Ok(()); + } + + // Anchor the PaymentId to the first negotiated candidate so the record stays + // stable across RBF replacements. + let payment_id = PaymentId(first.txid.to_byte_array()); + let candidate_count = candidates.len(); + let active_channel_count = active.channels.len(); + + let details = PaymentDetails::new( + payment_id, + PaymentKind::Onchain { txid, status: ConfirmationStatus::Unconfirmed }, + amount_msat, + fee_paid_msat, + direction, + PaymentStatus::Pending, + ); + + // Funding records carry their own RBF history in `candidates`; lookup by txid + // (find_payment_by_txid) already searches that, so no separate + // `conflicting_txids` Vec is needed. + let funding_details = FundingDetails { candidates: candidates.to_vec() }; + + let pending = + PendingPaymentDetails::with_funding_details(details, Vec::new(), funding_details); + + self.persist_pending(pending)?; + log_debug!( + self.logger, + "Recorded interactive-funding broadcast {} ({} candidates, {} channels)", + txid, + candidate_count, + active_channel_count, + ); + Ok(()) + } +} + +/// Returns this node's share of the on-chain fee for a funding transaction (channel +/// open or splice), in millisatoshis. Sourced from the contribution's +/// [`FundingContribution::estimated_fee`], which upstream computes per-contributor. +fn our_actual_fee_msat(contribution: &FundingContribution) -> u64 { + contribution.estimated_fee().to_sat() * 1000 +} + +fn record_includes_channel(details: &FundingDetails, channel_id: LnChannelId) -> bool { + details.candidates.iter().any(|c| c.channels.iter().any(|ch| ch.channel_id == channel_id)) +} + +struct LocalStakeAggregate { + amount_msat: Option, + fee_paid_msat: Option, + direction: PaymentDirection, +} + +/// Aggregates local-stake amount/fee/direction across the channels of a single +/// [`FundingCandidate`]. Each channel's contribution (when present) is treated as +/// local-stake-only, so contributions across channels are summed without +/// double-counting. +fn aggregate_local_stakes(candidate: &FundingCandidate) -> LocalStakeAggregate { + let mut amount_outbound: u64 = 0; + let mut amount_inbound: u64 = 0; + let mut fee: u64 = 0; + let mut have_contribution = false; + for channel in &candidate.channels { + if let Some(c) = channel.contribution.as_ref() { + have_contribution = true; + fee = fee.saturating_add(our_actual_fee_msat(c)); + match contribution_direction(c) { + Some((PaymentDirection::Outbound, amt)) => { + amount_outbound = amount_outbound.saturating_add(amt); + }, + Some((PaymentDirection::Inbound, amt)) => { + amount_inbound = amount_inbound.saturating_add(amt); + }, + None => {}, + } + } + } + if !have_contribution { + return LocalStakeAggregate { + amount_msat: None, + fee_paid_msat: None, + direction: PaymentDirection::Outbound, + }; + } + let (direction, amount_msat) = if amount_outbound >= amount_inbound { + (PaymentDirection::Outbound, amount_outbound.saturating_sub(amount_inbound)) + } else { + (PaymentDirection::Inbound, amount_inbound.saturating_sub(amount_outbound)) + }; + LocalStakeAggregate { amount_msat: Some(amount_msat), fee_paid_msat: Some(fee), direction } +} + +/// Returns this contribution's direction and magnitude in msat, or `None` if it can't +/// be classified as a single inbound or outbound payment. +fn contribution_direction(contribution: &FundingContribution) -> Option<(PaymentDirection, u64)> { + let value_added = contribution.value_added(); + let outputs_total: Amount = contribution.outputs().iter().map(|o| o.value).sum(); + + if value_added > Amount::ZERO && outputs_total == Amount::ZERO { + Some((PaymentDirection::Outbound, value_added.to_sat() * 1000)) + } else if value_added == Amount::ZERO && outputs_total > Amount::ZERO { + Some((PaymentDirection::Inbound, outputs_total.to_sat() * 1000)) + } else { + None + } } impl Listen for Wallet { diff --git a/tests/integration_tests_rust.rs b/tests/integration_tests_rust.rs index 4847fa3ad..cb71c974b 100644 --- a/tests/integration_tests_rust.rs +++ b/tests/integration_tests_rust.rs @@ -1239,6 +1239,35 @@ async fn rbf_splice_channel() { assert_ne!(original_txo, rbf_txo, "RBF should produce a different funding txo"); + // After RBF but before confirmation, node_b (the initiator) should have a single + // on-chain payment covering both candidates: id anchored to the first broadcast, + // `kind.txid` pointing at the latest (RBF) candidate, and the original candidate + // recorded as a replaced one on the pending record. + { + let payment_id = PaymentId(original_txo.txid.to_byte_array()); + let payment = node_b.payment(&payment_id).expect("splice payment exists"); + match payment.kind { + PaymentKind::Onchain { txid, status: ConfirmationStatus::Unconfirmed } => { + assert_eq!(txid, rbf_txo.txid); + }, + ref other => panic!("expected Onchain Unconfirmed, got {:?}", other), + } + assert_eq!(payment.status, PaymentStatus::Pending); + // Only one Onchain Pending payment for this splice attempt (not one per candidate). + let splice_payments = node_b.list_payments_with_filter(|p| { + p.direction == PaymentDirection::Outbound + && matches!(p.kind, PaymentKind::Onchain { .. }) + && p.status == PaymentStatus::Pending + }); + assert_eq!( + splice_payments.len(), + 1, + "expected exactly one pending Onchain payment for the splice, got {}: {:#?}", + splice_payments.len(), + splice_payments, + ); + } + // Wait for the RBF transaction to replace the original in the mempool wait_for_tx(&electrsd.client, rbf_txo.txid).await; @@ -1266,6 +1295,26 @@ async fn rbf_splice_channel() { ref e => panic!("node_b got unexpected event: {:?}", e), } + // After `ChannelReady` we should have graduated to `Succeeded` — even though + // `ANTI_REORG_DELAY` may not have elapsed yet — and the `kind.txid` should + // reflect the winning RBF candidate, with `fee_paid_msat` matching our + // per-node `FundingContribution::estimated_fee` for that candidate. + { + let payment_id = PaymentId(original_txo.txid.to_byte_array()); + let payment = node_b.payment(&payment_id).expect("splice payment graduated"); + assert_eq!(payment.status, PaymentStatus::Succeeded); + match payment.kind { + PaymentKind::Onchain { txid, status: ConfirmationStatus::Confirmed { .. } } => { + assert_eq!(txid, rbf_txo.txid); + }, + ref other => panic!("expected Onchain Confirmed, got {:?}", other), + } + assert!( + payment.fee_paid_msat.is_some(), + "splice payment should carry a fee from its FundingContribution", + ); + } + node_a.stop().unwrap(); node_b.stop().unwrap(); } From 9a8a93847635d4181d3c385434111124ad74c855 Mon Sep 17 00:00:00 2001 From: Jeffrey Czyz Date: Wed, 3 Jun 2026 15:56:16 -0500 Subject: [PATCH 05/10] f - Preserve funding details when a splice candidate is replaced The TxReplaced wallet event rebuilt the payment record from scratch, dropping its funding details. When a wallet sync fell between a splice broadcast and its RBF, the replacement of the original candidate cleared those details, so the payment no longer graduated to Succeeded on ChannelReady. Funding records are managed by the classify path and the Lightning lifecycle handlers, so leave them untouched on replacement. Co-Authored-By: Claude --- src/wallet/mod.rs | 12 ++++++++++++ tests/integration_tests_rust.rs | 18 ++++++++++++++++++ 2 files changed, 30 insertions(+) diff --git a/src/wallet/mod.rs b/src/wallet/mod.rs index 658f603cf..880f88530 100644 --- a/src/wallet/mod.rs +++ b/src/wallet/mod.rs @@ -410,6 +410,18 @@ impl Wallet { continue; }; + // Funding records (channel opens and splices) track their active candidate and + // status through `classify_*` and the Lightning lifecycle handlers. A replaced + // candidate is expected during splice RBF and must not reset the record or drop + // its funding details, so leave such records untouched here. + if self + .pending_payment_store + .get(&payment_id) + .map_or(false, |p| p.funding_details.is_some()) + { + continue; + } + // Collect all conflict txids let mut conflict_txids: Vec = conflicts.iter().map(|(_, conflict_txid)| *conflict_txid).collect(); diff --git a/tests/integration_tests_rust.rs b/tests/integration_tests_rust.rs index cb71c974b..262745619 100644 --- a/tests/integration_tests_rust.rs +++ b/tests/integration_tests_rust.rs @@ -1218,6 +1218,24 @@ async fn rbf_splice_channel() { let original_txo = expect_splice_negotiated_event!(node_a, node_b.node_id()); expect_splice_negotiated_event!(node_b, node_a.node_id()); + // Sync so the original splice candidate is recorded as a canonical wallet transaction before + // the RBF below replaces it. This makes the post-RBF sync observe the original candidate being + // replaced (a `WalletEvent::TxReplaced`), which must not drop the payment's funding details. + // + // This is a best-effort regression guard rather than a deterministic one: with the + // funding-details-preservation fix in place the splice still graduates correctly, but without + // it the resulting inconsistency only surfaces intermittently (via a timing-dependent + // `debug_assert` in the chain-tip handler), so a reverted fix is caught probabilistically. + // + // TODO: Make this deterministic. If funding payments carried a durable classification in the + // main payment store (e.g. a `tx_type` on `PaymentKind::Onchain`, as in + // lightningdevkit/ldk-node#791), a dropped funding-details record would be a detectable + // contradiction on `ChannelReady` rather than a timing-dependent assert, letting this test + // fail reliably whenever the fix is reverted. + wait_for_tx(&electrsd.client, original_txo.txid).await; + node_a.sync_wallets().unwrap(); + node_b.sync_wallets().unwrap(); + // splice_in should fail when there's a pending splice (RBF guard) assert_eq!( node_b.splice_in(&user_channel_id_b, node_a.node_id(), 1_000_000), From b8ca9c4a242a9e40aeaa334e133af214fd38de95 Mon Sep 17 00:00:00 2001 From: Jeffrey Czyz Date: Wed, 3 Jun 2026 16:01:15 -0500 Subject: [PATCH 06/10] f - Reject on-chain RBF of funding and splice payments bump_fee_rbf accepted channel-funding and splice payments because they are recorded as outbound, unconfirmed on-chain payments. Replacing such a transaction via wallet RBF would broadcast one LDK isn't tracking, and for splices the shared input can't be wallet-signed. Reject these and leave splice fee-bumping to rbf_channel. Co-Authored-By: Claude --- src/wallet/mod.rs | 17 ++++++++++ tests/integration_tests_rust.rs | 57 +++++++++++++++++++++++++++++++++ 2 files changed, 74 insertions(+) diff --git a/src/wallet/mod.rs b/src/wallet/mod.rs index 880f88530..1284ed590 100644 --- a/src/wallet/mod.rs +++ b/src/wallet/mod.rs @@ -1553,6 +1553,23 @@ impl Wallet { Error::InvalidPaymentId })?; + // Funding transactions (channel opens and splices) are driven by LDK's funding/splice + // lifecycle, not the on-chain wallet. Replacing one via on-chain RBF would broadcast a + // transaction LDK isn't tracking (and, for splices, can't sign). Fee-bumping a pending + // splice goes through `bump_channel_funding_fee` instead. + if self + .pending_payment_store + .get(&payment_id) + .map_or(false, |p| p.funding_details.is_some()) + { + log_error!( + self.logger, + "Cannot RBF funding payment {} via bump_fee_rbf; use bump_channel_funding_fee instead", + payment_id, + ); + return Err(Error::InvalidPaymentId); + } + if let PaymentKind::Onchain { status, .. } = &payment.kind { match status { ConfirmationStatus::Confirmed { .. } => { diff --git a/tests/integration_tests_rust.rs b/tests/integration_tests_rust.rs index 262745619..e122f9345 100644 --- a/tests/integration_tests_rust.rs +++ b/tests/integration_tests_rust.rs @@ -1337,6 +1337,63 @@ async fn rbf_splice_channel() { node_b.stop().unwrap(); } +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn bump_fee_rbf_rejects_funding_payment() { + // A channel-funding or splice transaction is driven by LDK's funding/splice lifecycle, not the + // on-chain wallet. `bump_fee_rbf` must reject such payments — replacing the funding transaction + // via plain wallet RBF would broadcast a transaction LDK isn't tracking (and, for splices, + // can't even sign). Fee-bumping a pending splice goes through `bump_channel_funding_fee` instead. + let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); + let chain_source = random_chain_source(&bitcoind, &electrsd); + let (node_a, node_b) = setup_two_nodes(&chain_source, false, true, false); + + let address_a = node_a.onchain_payment().new_address().unwrap(); + let address_b = node_b.onchain_payment().new_address().unwrap(); + let premine_amount_sat = 5_000_000; + premine_and_distribute_funds( + &bitcoind.client, + &electrsd.client, + vec![address_a, address_b], + Amount::from_sat(premine_amount_sat), + ) + .await; + + node_a.sync_wallets().unwrap(); + node_b.sync_wallets().unwrap(); + + open_channel(&node_a, &node_b, 4_000_000, false, &electrsd).await; + + generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6).await; + + node_a.sync_wallets().unwrap(); + node_b.sync_wallets().unwrap(); + + let _user_channel_id_a = expect_channel_ready_event!(node_a, node_b.node_id()); + let user_channel_id_b = expect_channel_ready_event!(node_b, node_a.node_id()); + + // Splice-in to create a pending splice payment. + node_b.splice_in(&user_channel_id_b, node_a.node_id(), 1_000_000).unwrap(); + + let txo = expect_splice_negotiated_event!(node_a, node_b.node_id()); + expect_splice_negotiated_event!(node_b, node_a.node_id()); + + // Make node_b's wallet aware of the splice transaction so `bump_fee_rbf` reaches its funding + // guard rather than failing earlier for a transaction it can't find. + wait_for_tx(&electrsd.client, txo.txid).await; + node_b.sync_wallets().unwrap(); + + // The splice payment is an on-chain, outbound, unconfirmed record, so it passes + // `bump_fee_rbf`'s other guards; it must nonetheless be rejected as a funding payment. + let splice_payment_id = PaymentId(txo.txid.to_byte_array()); + assert_eq!( + node_b.onchain_payment().bump_fee_rbf(splice_payment_id, None), + Err(NodeError::InvalidPaymentId), + ); + + node_a.stop().unwrap(); + node_b.stop().unwrap(); +} + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn simple_bolt12_send_receive() { let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); From 88be9ceaba3cb0c5a2f32940e07ef598d87269d8 Mon Sep 17 00:00:00 2001 From: Jeffrey Czyz Date: Fri, 5 Jun 2026 14:16:08 -0500 Subject: [PATCH 07/10] f - Rename persist_pending to persist_pending_payment Co-Authored-By: Claude --- src/wallet/mod.rs | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/src/wallet/mod.rs b/src/wallet/mod.rs index 1284ed590..c101c373b 100644 --- a/src/wallet/mod.rs +++ b/src/wallet/mod.rs @@ -300,7 +300,7 @@ impl Wallet { if payment_status == PaymentStatus::Pending { let pending_payment = self.create_pending_payment_from_tx(payment, Vec::new()); - self.persist_pending(pending_payment)?; + self.persist_pending_payment(pending_payment)?; } else { self.runtime.block_on(self.payment_store.insert_or_update(payment))?; } @@ -398,7 +398,7 @@ impl Wallet { ConfirmationStatus::Unconfirmed, ); let pending_payment = self.create_pending_payment_from_tx(payment, Vec::new()); - self.persist_pending(pending_payment)?; + self.persist_pending_payment(pending_payment)?; }, WalletEvent::TxReplaced { txid, conflicts, .. } => { let Some(payment_id) = self.find_payment_by_txid(txid) else { @@ -466,7 +466,7 @@ impl Wallet { ConfirmationStatus::Unconfirmed, ); let pending_payment = self.create_pending_payment_from_tx(payment, Vec::new()); - self.persist_pending(pending_payment)?; + self.persist_pending_payment(pending_payment)?; }, _ => { continue; @@ -1294,7 +1294,7 @@ impl Wallet { /// Writes a [`PendingPaymentDetails`] and its inner [`PaymentDetails`] to their /// respective stores in a fixed order. Callers that need to keep the two stores in /// sync should always go through this. - fn persist_pending(&self, pending: PendingPaymentDetails) -> Result<(), Error> { + fn persist_pending_payment(&self, pending: PendingPaymentDetails) -> Result<(), Error> { self.runtime.block_on(self.payment_store.insert_or_update(pending.details.clone()))?; self.runtime.block_on(self.pending_payment_store.insert_or_update(pending))?; Ok(()) @@ -1503,7 +1503,7 @@ impl Wallet { pending.details.kind = PaymentKind::Onchain { txid: event_txid, status: confirmation_status }; - self.persist_pending(pending)?; + self.persist_pending_payment(pending)?; Ok(true) } @@ -1757,7 +1757,7 @@ impl Wallet { ); let pending_payment = self.create_pending_payment_from_tx(new_payment, Vec::new()); - self.persist_pending(pending_payment)?; + self.persist_pending_payment(pending_payment)?; log_info!(self.logger, "RBF successful: replaced {} with {}", txid, new_txid); @@ -1821,7 +1821,7 @@ impl Wallet { let pending = PendingPaymentDetails::with_funding_details(details, Vec::new(), funding_details); - self.persist_pending(pending)?; + self.persist_pending_payment(pending)?; log_debug!( self.logger, "Recorded channel-funding broadcast {} for channel {}", @@ -1904,7 +1904,7 @@ impl Wallet { let pending = PendingPaymentDetails::with_funding_details(details, Vec::new(), funding_details); - self.persist_pending(pending)?; + self.persist_pending_payment(pending)?; log_debug!( self.logger, "Recorded interactive-funding broadcast {} ({} candidates, {} channels)", From cf78007b5ea1702bfafcc9010147034caea62895 Mon Sep 17 00:00:00 2001 From: Jeffrey Czyz Date: Mon, 15 Jun 2026 13:36:26 -0500 Subject: [PATCH 08/10] f - Keep a graduated funding payment terminal across wallet sync Once ChannelReady graduates a funding payment to Succeeded (removing its pending record), a concurrent wallet-sync event for one of its candidates could re-stamp the old Pending status and revert the graduation. Treat a Succeeded on-chain payment as terminal during sync, refining only the confirmation status of the candidate that locked. Co-Authored-By: Claude --- src/wallet/mod.rs | 25 +++++++++++-------------- 1 file changed, 11 insertions(+), 14 deletions(-) diff --git a/src/wallet/mod.rs b/src/wallet/mod.rs index c101c373b..514b3c04a 100644 --- a/src/wallet/mod.rs +++ b/src/wallet/mod.rs @@ -1433,24 +1433,21 @@ impl Wallet { fn apply_funding_details_status_update( &self, payment_id: PaymentId, event_txid: Txid, confirmation_status: ConfirmationStatus, ) -> Result { - // `ChannelReady` may move the payment to the main store before wallet sync - // sees the tx confirm. In that case, update `kind` directly; recomputing from - // the wallet's view would overwrite the per-node fee set at broadcast time. + // A funding payment becomes `Succeeded` only once `handle_channel_ready` has graduated it + // on `ChannelReady`, which also removes the pending record. A wallet-sync event arriving + // during that removal (the two are awaited separately, so a sync can interleave) or after + // it must never downgrade the terminal status or adopt a replaced candidate's txid. At + // most it refines the confirmation status of the candidate that actually locked — and + // recomputing from the wallet's view would clobber the per-node fee set at broadcast time. if let Some(mut existing) = self.payment_store.get(&payment_id) { if existing.status == PaymentStatus::Succeeded && matches!(existing.kind, PaymentKind::Onchain { .. }) - && self.pending_payment_store.get(&payment_id).is_none() { - let needs_update = match existing.kind { - PaymentKind::Onchain { txid, status } => { - txid != event_txid || status != confirmation_status - }, - _ => false, - }; - if needs_update { - existing.kind = - PaymentKind::Onchain { txid: event_txid, status: confirmation_status }; - self.runtime.block_on(self.payment_store.insert_or_update(existing))?; + if let PaymentKind::Onchain { txid, status } = existing.kind { + if txid == event_txid && status != confirmation_status { + existing.kind = PaymentKind::Onchain { txid, status: confirmation_status }; + self.runtime.block_on(self.payment_store.insert_or_update(existing))?; + } } return Ok(true); } From 328bdc3de946db5999b281434b52b4224a02c4b5 Mon Sep 17 00:00:00 2001 From: Jeffrey Czyz Date: Thu, 23 Apr 2026 14:58:20 -0500 Subject: [PATCH 09/10] Persist payment transaction data without blocking LDK Previously the BroadcasterInterface implementation wrote the payment record synchronously when LDK invoked it. With a remote KV store this could block LDK's message handling for hundreds of milliseconds per call, noticeably during force-close bursts or splice broadcasts. Persistence now happens asynchronously and must complete before the transaction is sent to the chain client. If persistence fails, the broadcast is dropped: a payment record must exist for every on-chain tx we emit, otherwise a crash could leave the tx confirmed with no matching record. Generated with assistance from Claude Code. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/chain/bitcoind.rs | 8 ++++--- src/chain/electrum.rs | 6 +++-- src/chain/esplora.rs | 8 ++++--- src/chain/mod.rs | 20 ++++++++++++---- src/tx_broadcaster.rs | 56 +++++++++++++++++++++++++++---------------- 5 files changed, 66 insertions(+), 32 deletions(-) diff --git a/src/chain/bitcoind.rs b/src/chain/bitcoind.rs index 6bfa8ffd2..2aa05255d 100644 --- a/src/chain/bitcoind.rs +++ b/src/chain/bitcoind.rs @@ -571,16 +571,18 @@ impl BitcoindChainSource { Ok(()) } - pub(crate) async fn process_broadcast_package(&self, package: Vec) { + pub(crate) async fn process_broadcast_package( + &self, txs: impl IntoIterator, + ) { // While it's a bit unclear when we'd be able to lean on Bitcoin Core >v28 // features, we should eventually switch to use `submitpackage` via the // `rust-bitcoind-json-rpc` crate rather than just broadcasting individual // transactions. - for tx in &package { + for tx in txs { let txid = tx.compute_txid(); let timeout_fut = tokio::time::timeout( Duration::from_secs(DEFAULT_TX_BROADCAST_TIMEOUT_SECS), - self.api_client.broadcast_transaction(tx), + self.api_client.broadcast_transaction(&tx), ); match timeout_fut.await { Ok(res) => match res { diff --git a/src/chain/electrum.rs b/src/chain/electrum.rs index 7406f06b4..0c09a2fda 100644 --- a/src/chain/electrum.rs +++ b/src/chain/electrum.rs @@ -294,7 +294,9 @@ impl ElectrumChainSource { Ok(()) } - pub(crate) async fn process_broadcast_package(&self, package: Vec) { + pub(crate) async fn process_broadcast_package( + &self, txs: impl IntoIterator, + ) { let electrum_client: Arc = if let Some(client) = self.electrum_runtime_status.read().expect("lock").client().as_ref() { @@ -304,7 +306,7 @@ impl ElectrumChainSource { return; }; - for tx in package { + for tx in txs { electrum_client.broadcast(tx).await; } } diff --git a/src/chain/esplora.rs b/src/chain/esplora.rs index eb23a395d..94078512f 100644 --- a/src/chain/esplora.rs +++ b/src/chain/esplora.rs @@ -355,12 +355,14 @@ impl EsploraChainSource { Ok(()) } - pub(crate) async fn process_broadcast_package(&self, package: Vec) { - for tx in &package { + pub(crate) async fn process_broadcast_package( + &self, txs: impl IntoIterator, + ) { + for tx in txs { let txid = tx.compute_txid(); let timeout_fut = tokio::time::timeout( Duration::from_secs(self.sync_config.timeouts_config.tx_broadcast_timeout_secs), - self.esplora_client.broadcast(tx), + self.esplora_client.broadcast(&tx), ); match timeout_fut.await { Ok(res) => match res { diff --git a/src/chain/mod.rs b/src/chain/mod.rs index 92c4bdb64..3f9984df4 100644 --- a/src/chain/mod.rs +++ b/src/chain/mod.rs @@ -24,7 +24,7 @@ use crate::config::{ WALLET_SYNC_INTERVAL_MINIMUM_SECS, }; use crate::fee_estimator::OnchainFeeEstimator; -use crate::logger::{log_debug, log_info, log_trace, LdkLogger, Logger}; +use crate::logger::{log_debug, log_error, log_info, log_trace, LdkLogger, Logger}; use crate::runtime::Runtime; use crate::types::{Broadcaster, ChainMonitor, ChannelManager, DynStore, Sweeper, Wallet}; use crate::{Error, PersistedNodeMetrics}; @@ -453,15 +453,27 @@ impl ChainSource { return; } Some(next_package) = receiver.recv() => { + let package = match self.tx_broadcaster.classify_package(next_package).await { + Ok(p) => p, + Err(e) => { + log_error!( + tx_bcast_logger, + "Skipping broadcast: failed to persist payment records: {:?}", + e, + ); + continue; + }, + }; + let txs = package.into_iter().map(|(tx, _)| tx); match &self.kind { ChainSourceKind::Esplora(esplora_chain_source) => { - esplora_chain_source.process_broadcast_package(next_package).await + esplora_chain_source.process_broadcast_package(txs).await }, ChainSourceKind::Electrum(electrum_chain_source) => { - electrum_chain_source.process_broadcast_package(next_package).await + electrum_chain_source.process_broadcast_package(txs).await }, ChainSourceKind::Bitcoind(bitcoind_chain_source) => { - bitcoind_chain_source.process_broadcast_package(next_package).await + bitcoind_chain_source.process_broadcast_package(txs).await }, } } diff --git a/src/tx_broadcaster.rs b/src/tx_broadcaster.rs index 24abf8f11..1e65fb1aa 100644 --- a/src/tx_broadcaster.rs +++ b/src/tx_broadcaster.rs @@ -14,20 +14,26 @@ use tokio::sync::{mpsc, Mutex, MutexGuard}; use crate::logger::{log_error, LdkLogger}; use crate::types::Wallet; +use crate::Error; const BCAST_PACKAGE_QUEUE_SIZE: usize = 50; +/// A package of transactions that LDK handed to the broadcaster in one +/// `broadcast_transactions` call, along with each transaction's type. Queued until the +/// background task classifies and broadcasts it. +pub(crate) type BroadcastPackage = Vec<(Transaction, TransactionType)>; + pub(crate) struct TransactionBroadcaster where L::Target: LdkLogger, { - queue_sender: mpsc::Sender>, - queue_receiver: Mutex>>, + queue_sender: mpsc::Sender, + queue_receiver: Mutex>, /// Weak handle to the [`Wallet`] that performs classification of funding broadcasts /// (channel opens and splices) into payment records. Remains `None` while the - /// builder is wiring the node up, during which broadcasts are still forwarded to - /// the queue but no payment record is written. [`Self::set_wallet`] installs the - /// handle once the [`Wallet`] exists. + /// builder is wiring the node up, during which broadcasts are forwarded to the + /// queue but no payment record is written. [`Self::set_wallet`] installs the handle + /// once the [`Wallet`] exists. wallet: StdMutex>>, logger: L, } @@ -55,9 +61,31 @@ where pub(crate) async fn get_broadcast_queue( &self, - ) -> MutexGuard<'_, mpsc::Receiver>> { + ) -> MutexGuard<'_, mpsc::Receiver> { self.queue_receiver.lock().await } + + /// Classifies a queued package into payment records and returns the package ready + /// for the chain client. Returns `Err` if any classification fails; callers must + /// not broadcast the package in that case, since a crash would leave the tx + /// on-chain without a record. + pub(crate) async fn classify_package( + &self, package: BroadcastPackage, + ) -> Result { + let wallet_opt = self.wallet.lock().expect("lock").as_ref().and_then(Weak::upgrade); + if let Some(wallet) = wallet_opt { + tokio::task::spawn_blocking(move || { + for (tx, tx_type) in &package { + wallet.classify_broadcast(tx, tx_type)?; + } + Ok::<_, Error>(package) + }) + .await + .map_err(|_| Error::PersistenceFailed)? + } else { + Ok(package) + } + } } impl BroadcasterInterface for TransactionBroadcaster @@ -65,20 +93,8 @@ where L::Target: LdkLogger, { fn broadcast_transactions(&self, txs: &[(&Transaction, TransactionType)]) { - let wallet = self.wallet.lock().expect("lock").as_ref().and_then(Weak::upgrade); - if let Some(wallet) = wallet { - for (tx, tx_type) in txs { - if let Err(e) = wallet.classify_broadcast(tx, tx_type) { - log_error!( - self.logger, - "Failed to classify broadcast tx {}: {:?}", - tx.compute_txid(), - e, - ); - } - } - } - let package = txs.iter().map(|(t, _)| (*t).clone()).collect::>(); + let package: BroadcastPackage = + txs.iter().map(|(tx, tx_type)| ((*tx).clone(), tx_type.clone())).collect(); self.queue_sender.try_send(package).unwrap_or_else(|e| { log_error!(self.logger, "Failed to broadcast transactions: {}", e); }); From 6098d0e03b1922a15795feb9f8d531fa034d0351 Mon Sep 17 00:00:00 2001 From: Jeffrey Czyz Date: Wed, 10 Jun 2026 16:14:53 -0500 Subject: [PATCH 10/10] Retry user-initiated splices across restarts and disconnects LDK does not durably record a splice until its negotiation reaches the signature exchange, and it abandons an in-progress negotiation whenever the peer disconnects -- which includes stopping the node. A restart or an ill-timed disconnect after splice_in, splice_out, or bump_channel_funding_fee returned Ok would therefore silently drop the splice. Persist the splice intent in a new UserChannelId-keyed channel record store before handing the contribution to LDK, and resubmit it until the splice locks. A startup reconciler probes LDK's live splice state to detect dropped intents -- including those lost to a crash before LDK persisted anything -- and the SpliceNegotiationFailed handler retries recoverable failures, rebuilding the contribution with fresh parameters when the stored one has gone stale. Resubmission does not require the peer to be connected, as LDK holds the contribution and initiates quiescence once the peer reconnects. Event::SpliceNegotiationFailed is now emitted only once a splice is given up on (a non-transient failure or retries exhausted) rather than for every failed negotiation round. Generated with assistance from Claude Code. Co-Authored-By: Claude Fable 5 --- src/builder.rs | 33 +++- src/channel/mod.rs | 341 ++++++++++++++++++++++++++++++++ src/channel/store.rs | 184 +++++++++++++++++ src/event.rs | 34 +++- src/io/mod.rs | 4 + src/lib.rs | 113 ++++++++++- src/types.rs | 5 +- tests/integration_tests_rust.rs | 187 ++++++++++++++++++ 8 files changed, 885 insertions(+), 16 deletions(-) create mode 100644 src/channel/mod.rs create mode 100644 src/channel/store.rs diff --git a/src/builder.rs b/src/builder.rs index efa5c50da..cab29bd5d 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -64,7 +64,9 @@ use crate::io::utils::{ }; use crate::io::vss_store::VssStoreBuilder; use crate::io::{ - self, PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, + self, CHANNEL_RECORD_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_RECORD_PERSISTENCE_SECONDARY_NAMESPACE, PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, + PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, PENDING_PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, PENDING_PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, }; @@ -77,9 +79,9 @@ use crate::peer_store::PeerStore; use crate::runtime::{Runtime, RuntimeSpawner}; use crate::tx_broadcaster::TransactionBroadcaster; use crate::types::{ - AsyncPersister, ChainMonitor, ChannelManager, DynStore, DynStoreRef, DynStoreWrapper, - GossipSync, Graph, HRNResolver, KeysManager, MessageRouter, OnionMessenger, PaymentStore, - PeerManager, PendingPaymentStore, + AsyncPersister, ChainMonitor, ChannelManager, ChannelRecordStore, DynStore, DynStoreRef, + DynStoreWrapper, GossipSync, Graph, HRNResolver, KeysManager, MessageRouter, OnionMessenger, + PaymentStore, PeerManager, PendingPaymentStore, }; use crate::wallet::persist::KVStoreWalletPersister; use crate::wallet::Wallet; @@ -1379,7 +1381,7 @@ fn build_with_store_internal( let kv_store_ref = Arc::clone(&kv_store); let logger_ref = Arc::clone(&logger); - let (payment_store_res, node_metris_res, pending_payment_store_res) = + let (payment_store_res, node_metris_res, pending_payment_store_res, channel_record_store_res) = runtime.block_on(async move { tokio::join!( read_all_objects( @@ -1394,6 +1396,12 @@ fn build_with_store_internal( PENDING_PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, PENDING_PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, Arc::clone(&logger_ref), + ), + read_all_objects( + &*kv_store_ref, + CHANNEL_RECORD_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_RECORD_PERSISTENCE_SECONDARY_NAMESPACE, + Arc::clone(&logger_ref), ) ) }); @@ -1605,6 +1613,20 @@ fn build_with_store_internal( }, }; + let channel_record_store = match channel_record_store_res { + Ok(channel_records) => Arc::new(ChannelRecordStore::new( + channel_records, + CHANNEL_RECORD_PERSISTENCE_PRIMARY_NAMESPACE.to_string(), + CHANNEL_RECORD_PERSISTENCE_SECONDARY_NAMESPACE.to_string(), + Arc::clone(&kv_store), + Arc::clone(&logger), + )), + Err(e) => { + log_error!(logger, "Failed to read channel record data from store: {}", e); + return Err(BuildError::ReadFailed); + }, + }; + let wallet = Arc::new(Wallet::new( bdk_wallet, wallet_persister, @@ -2151,6 +2173,7 @@ fn build_with_store_internal( scorer, peer_store, payment_store, + channel_record_store, lnurl_auth, is_running, node_metrics, diff --git a/src/channel/mod.rs b/src/channel/mod.rs new file mode 100644 index 000000000..12a2c3713 --- /dev/null +++ b/src/channel/mod.rs @@ -0,0 +1,341 @@ +// This file is Copyright its original authors, visible in version control history. +// +// This file is licensed under the Apache License, Version 2.0 or the MIT license , at your option. You may not use this file except in +// accordance with one or both of these licenses. + +//! Per-channel state tracking. + +pub(crate) mod store; + +use std::ops::Deref; +use std::sync::Arc; + +use bitcoin::secp256k1::PublicKey; +use bitcoin::{Amount, FeeRate}; +use lightning::events::NegotiationFailureReason; +use lightning::ln::funding::FundingContribution; +use lightning::ln::types::ChannelId; + +use crate::event::{Event, EventQueue}; +use crate::fee_estimator::{ConfirmationTarget, FeeEstimator, OnchainFeeEstimator}; +use crate::logger::{log_error, log_info, LdkLogger}; +use crate::types::{ChannelManager, ChannelRecordStore, UserChannelId, Wallet}; +use crate::Error; + +pub(crate) use self::store::MAX_SPLICE_ATTEMPTS; +use self::store::{ChannelRecord, ChannelRecordUpdate, SpliceIntent, SpliceKind}; + +/// Resubmits user-initiated splices that LDK dropped before durably recording them. +/// +/// LDK only persists a splice once its negotiation reaches `AwaitingSignatures`, and it abandons +/// an earlier negotiation whenever the peer disconnects (which includes restarting the node). The +/// splice entry points persist a [`SpliceIntent`] before handing the contribution to LDK; this +/// type drives that intent back into [`ChannelManager::funding_contributed`] until the splice +/// either locks (clearing the intent on `ChannelReady`) or fails for a reason that retrying +/// cannot address. +/// +/// Resubmitting does not require the peer to be connected: LDK holds on to the contribution and +/// initiates quiescence once the peer reconnects. +/// +/// [`ChannelManager::funding_contributed`]: lightning::ln::channelmanager::ChannelManager::funding_contributed +pub(crate) struct SpliceRetrier +where + L::Target: LdkLogger, +{ + channel_manager: Arc, + wallet: Arc, + fee_estimator: Arc, + channel_record_store: Arc, + event_queue: Arc>, + logger: L, +} + +impl SpliceRetrier +where + L::Target: LdkLogger, +{ + pub(crate) fn new( + channel_manager: Arc, wallet: Arc, + fee_estimator: Arc, channel_record_store: Arc, + event_queue: Arc>, logger: L, + ) -> Self { + Self { channel_manager, wallet, fee_estimator, channel_record_store, event_queue, logger } + } + + /// Reconciles persisted splice intents against live channel state. Run once at startup. + pub(crate) async fn reconcile(&self) { + let records = self.channel_record_store.list_filter(|r| r.pending_splice().is_some()); + for record in records { + let ChannelRecord::Funded { + user_channel_id, counterparty_node_id, pending_splice, .. + } = record; + let intent = match pending_splice { + Some(intent) => intent, + None => continue, + }; + + let channel = self + .channel_manager + .list_channels_with_counterparty(&counterparty_node_id) + .into_iter() + .find(|c| c.user_channel_id == user_channel_id.0); + let channel = match channel { + Some(channel) => channel, + None => { + // The channel is gone; there is nothing to splice anymore. + let _ = self.channel_record_store.remove(&user_channel_id).await; + continue; + }, + }; + + if channel.funding_txo != Some(intent.pre_splice_funding_txo) { + // The funding moved on, so the splice (or a replacement) locked. + let _ = self.channel_record_store.remove(&user_channel_id).await; + continue; + } + + // `splice_channel` is a read-only probe of LDK's splice state. It fails when we + // already have a splice in flight (a held contribution, an in-progress negotiation, + // or one awaiting signatures), all of which LDK drives to completion on its own. + let template = match self + .channel_manager + .splice_channel(&channel.channel_id, &counterparty_node_id) + { + Ok(template) => template, + Err(_) => continue, + }; + + // The template's prior contribution is our last negotiated one. LDK persists a splice + // once negotiated, so its presence means the intent was carried out unless the intent + // was a fee bump at a higher feerate than what was negotiated. + let should_retry = match (&intent.kind, template.prior_contribution()) { + (SpliceKind::Rbf {}, Some(prior)) => { + prior.feerate() < intent.contribution.feerate() + }, + (SpliceKind::Rbf {}, None) => { + // The splice to bump is gone entirely; surface rather than guess. + self.abandon(user_channel_id, channel.channel_id, counterparty_node_id).await; + continue; + }, + (_, Some(_)) => false, + (_, None) => true, + }; + if !should_retry { + continue; + } + + if intent.attempts >= MAX_SPLICE_ATTEMPTS { + self.abandon(user_channel_id, channel.channel_id, counterparty_node_id).await; + continue; + } + + log_info!( + self.logger, + "Resubmitting splice for channel {} with counterparty {}", + channel.channel_id, + counterparty_node_id, + ); + let _ = self + .submit(&channel.channel_id, &counterparty_node_id, user_channel_id, intent) + .await; + } + } + + /// Applies a `SpliceNegotiationFailed` event to any matching splice intent, retrying when the + /// failure is recoverable. Returns whether the failure should be surfaced to the user. + pub(crate) async fn on_negotiation_failed( + &self, user_channel_id: UserChannelId, reason: NegotiationFailureReason, + contribution: Option, + ) -> bool { + let record = match self.channel_record_store.get(&user_channel_id) { + Some(record) => record, + None => return true, + }; + let ChannelRecord::Funded { channel_id, counterparty_node_id, pending_splice, .. } = record; + let mut intent = match pending_splice { + Some(intent) => intent, + None => return true, + }; + + // Only act on failures of the splice we are tracking. A mismatch means the failure + // concerns some other attempt (e.g., a stale event replayed after the user initiated a + // new splice), in which case the record must be left alone. + if contribution.as_ref() != Some(&intent.contribution) { + return true; + } + + if intent.attempts >= MAX_SPLICE_ATTEMPTS { + let _ = self.channel_record_store.remove(&user_channel_id).await; + return true; + } + + match reason { + NegotiationFailureReason::PeerDisconnected => { + // The same contribution remains valid. Skip if LDK already has a splice in + // flight for this channel (e.g., the startup reconciler resubmitted first). + if self.channel_manager.splice_channel(&channel_id, &counterparty_node_id).is_err() + { + return false; + } + log_info!( + self.logger, + "Resubmitting splice for channel {} with counterparty {} after disconnect", + channel_id, + counterparty_node_id, + ); + let _ = + self.submit(&channel_id, &counterparty_node_id, user_channel_id, intent).await; + false + }, + NegotiationFailureReason::FeeRateTooLow + | NegotiationFailureReason::ContributionInvalid => { + // The contribution went stale (e.g., another splice negotiation outpaced ours, + // turning the resubmission into an underpaying fee bump of it). Rebuild a fresh + // contribution from the original call's parameters. + match self + .rebuild_contribution(&channel_id, &counterparty_node_id, &intent.kind) + .await + { + Ok(contribution) => { + log_info!( + self.logger, + "Resubmitting rebuilt splice for channel {} with counterparty {}", + channel_id, + counterparty_node_id, + ); + intent.contribution = contribution; + let _ = self + .submit(&channel_id, &counterparty_node_id, user_channel_id, intent) + .await; + false + }, + Err(e) => { + log_error!( + self.logger, + "Abandoning splice for channel {}: failed to rebuild contribution: {:?}", + channel_id, + e, + ); + let _ = self.channel_record_store.remove(&user_channel_id).await; + true + }, + } + }, + _ => { + // Terminal failure; retrying cannot address it. + let _ = self.channel_record_store.remove(&user_channel_id).await; + true + }, + } + } + + /// Clears any splice intent made obsolete by a locked funding transaction. + pub(crate) async fn on_channel_ready( + &self, user_channel_id: UserChannelId, funding_txo: Option, + ) { + let record = match self.channel_record_store.get(&user_channel_id) { + Some(record) => record, + None => return, + }; + // Only clear an intent predating the locked funding transaction. An intent with a + // matching pre-splice funding outpoint was created after the lock and is still pending. + let clear = match (record.pending_splice(), funding_txo) { + (Some(intent), Some(funding_txo)) => { + intent.pre_splice_funding_txo.into_bitcoin_outpoint() != funding_txo + }, + (Some(_), None) => false, + (None, _) => true, + }; + if clear { + let _ = self.channel_record_store.remove(&user_channel_id).await; + } + } + + /// Clears any splice intent for a closed channel, as there is nothing left to splice. + pub(crate) async fn on_channel_closed(&self, user_channel_id: UserChannelId) { + let _ = self.channel_record_store.remove(&user_channel_id).await; + } + + /// Persists the incremented attempt count and hands the contribution back to LDK. The count + /// is persisted first so that a crash mid-submission cannot lead to unbounded retries. + async fn submit( + &self, channel_id: &ChannelId, counterparty_node_id: &PublicKey, + user_channel_id: UserChannelId, mut intent: SpliceIntent, + ) -> Result<(), Error> { + intent.attempts += 1; + let contribution = intent.contribution.clone(); + let update = ChannelRecordUpdate { user_channel_id, pending_splice: Some(Some(intent)) }; + self.channel_record_store.update(update).await?; + + self.channel_manager + .funding_contributed(channel_id, counterparty_node_id, contribution, None) + .map_err(|e| { + log_error!( + self.logger, + "Failed to resubmit splice for channel {} with counterparty {}: {:?}", + channel_id, + counterparty_node_id, + e, + ); + Error::ChannelSplicingFailed + }) + } + + /// Builds a fresh contribution from the parameters of the originating API call, mirroring the + /// corresponding [`Node`] method. + /// + /// [`Node`]: crate::Node + async fn rebuild_contribution( + &self, channel_id: &ChannelId, counterparty_node_id: &PublicKey, kind: &SpliceKind, + ) -> Result { + let template = self + .channel_manager + .splice_channel(channel_id, counterparty_node_id) + .map_err(|_| Error::ChannelSplicingFailed)?; + + let est_feerate = self.fee_estimator.estimate_fee_rate(ConfirmationTarget::ChannelFunding); + let min_feerate = + template.min_rbf_feerate().map_or(est_feerate, |min_rbf| est_feerate.max(min_rbf)); + let max_feerate = FeeRate::from_sat_per_kwu(min_feerate.to_sat_per_kwu() * 3 / 2); + + match kind { + SpliceKind::In { amount_sats } => template + .splice_in( + Amount::from_sat(*amount_sats), + min_feerate, + max_feerate, + Arc::clone(&self.wallet), + ) + .await + .map_err(|_| Error::ChannelSplicingFailed), + SpliceKind::Out { outputs } => template + .splice_out(outputs.clone(), min_feerate, max_feerate) + .map_err(|_| Error::ChannelSplicingFailed), + SpliceKind::Rbf {} => template + .rbf_prior_contribution(None, max_feerate, Arc::clone(&self.wallet)) + .await + .map_err(|_| Error::ChannelSplicingFailed), + } + } + + /// Gives up on a splice intent and surfaces the failure to the user. + async fn abandon( + &self, user_channel_id: UserChannelId, channel_id: ChannelId, + counterparty_node_id: PublicKey, + ) { + log_error!( + self.logger, + "Abandoning splice for channel {} with counterparty {}", + channel_id, + counterparty_node_id, + ); + let _ = self.channel_record_store.remove(&user_channel_id).await; + let event = + Event::SpliceNegotiationFailed { channel_id, user_channel_id, counterparty_node_id }; + if let Err(e) = self.event_queue.add_event(event).await { + log_error!(self.logger, "Failed to push to event queue: {}", e); + } + } +} diff --git a/src/channel/store.rs b/src/channel/store.rs new file mode 100644 index 000000000..1d4cd3cab --- /dev/null +++ b/src/channel/store.rs @@ -0,0 +1,184 @@ +// This file is Copyright its original authors, visible in version control history. +// +// This file is licensed under the Apache License, Version 2.0 or the MIT license , at your option. You may not use this file except in +// accordance with one or both of these licenses. + +use bitcoin::secp256k1::PublicKey; +use bitcoin::TxOut; +use lightning::chain::transaction::OutPoint; +use lightning::ln::funding::FundingContribution; +use lightning::ln::types::ChannelId; +use lightning::{impl_writeable_tlv_based, impl_writeable_tlv_based_enum}; + +use crate::data_store::{StorableObject, StorableObjectId, StorableObjectUpdate}; +use crate::hex_utils; +use crate::types::UserChannelId; + +/// The number of times a splice intent is resubmitted before it is abandoned and the failure is +/// surfaced to the user. +pub(crate) const MAX_SPLICE_ATTEMPTS: u8 = 3; + +/// A user-initiated splice that has been handed to LDK but is not yet guaranteed to survive a +/// restart. LDK only persists a splice once its negotiation reaches `AwaitingSignatures`, so until +/// the new funding transaction locks we keep enough state to resubmit the splice ourselves. +#[derive(Clone, Debug, PartialEq, Eq)] +pub(crate) struct SpliceIntent { + /// The channel's funding outpoint when the splice was initiated. It only changes once a splice + /// locks, so a mismatch with the channel's current funding outpoint means the splice (or a + /// replacement) completed and there is nothing left to resubmit. + pub pre_splice_funding_txo: OutPoint, + /// The contribution handed to [`ChannelManager::funding_contributed`], resubmitted verbatim. + /// + /// [`ChannelManager::funding_contributed`]: lightning::ln::channelmanager::ChannelManager::funding_contributed + pub contribution: FundingContribution, + /// The parameters of the originating API call, used to rebuild a fresh contribution when the + /// stored one has become stale (e.g., its feerate is no longer sufficient). + pub kind: SpliceKind, + /// The number of times the contribution has been resubmitted to LDK after the originating API + /// call handed it off. + pub attempts: u8, +} + +impl_writeable_tlv_based!(SpliceIntent, { + (0, pre_splice_funding_txo, required), + (2, contribution, required), + (4, kind, required), + (6, attempts, required), +}); + +/// The parameters of the API call that initiated a splice. +#[derive(Clone, Debug, PartialEq, Eq)] +pub(crate) enum SpliceKind { + /// [`Node::splice_in`] with a resolved amount. + /// + /// [`Node::splice_in`]: crate::Node::splice_in + In { amount_sats: u64 }, + /// [`Node::splice_out`] to the given outputs. + /// + /// [`Node::splice_out`]: crate::Node::splice_out + Out { outputs: Vec }, + /// [`Node::bump_channel_funding_fee`] of a pending splice. + /// + /// [`Node::bump_channel_funding_fee`]: crate::Node::bump_channel_funding_fee + Rbf {}, +} + +impl_writeable_tlv_based_enum!(SpliceKind, + (0, In) => { + (0, amount_sats, required), + }, + (2, Out) => { + (0, outputs, required_vec), + }, + (4, Rbf) => {}, +); + +/// Persistent per-channel state tracked by LDK Node, keyed by [`UserChannelId`]. +#[derive(Clone, Debug, PartialEq, Eq)] +pub(crate) enum ChannelRecord { + /// State for a channel whose funding transaction exists, currently limited to an in-flight + /// splice intent. A record without a pending splice intent is removed from the store. + Funded { + user_channel_id: UserChannelId, + counterparty_node_id: PublicKey, + channel_id: ChannelId, + pending_splice: Option, + }, +} + +impl ChannelRecord { + pub(crate) fn pending_splice(&self) -> Option<&SpliceIntent> { + match self { + ChannelRecord::Funded { pending_splice, .. } => pending_splice.as_ref(), + } + } +} + +impl_writeable_tlv_based_enum!(ChannelRecord, + (0, Funded) => { + (0, user_channel_id, required), + (2, counterparty_node_id, required), + (4, channel_id, required), + (6, pending_splice, option), + }, +); + +impl StorableObjectId for UserChannelId { + fn encode_to_hex_str(&self) -> String { + hex_utils::to_string(&self.0.to_be_bytes()) + } +} + +#[derive(Clone, Debug, PartialEq, Eq)] +pub(crate) struct ChannelRecordUpdate { + pub user_channel_id: UserChannelId, + pub pending_splice: Option>, +} + +impl StorableObject for ChannelRecord { + type Id = UserChannelId; + type Update = ChannelRecordUpdate; + + fn id(&self) -> Self::Id { + match self { + ChannelRecord::Funded { user_channel_id, .. } => *user_channel_id, + } + } + + fn update(&mut self, update: Self::Update) -> bool { + let mut updated = false; + match self { + ChannelRecord::Funded { pending_splice, .. } => { + if let Some(new_pending_splice) = update.pending_splice { + if *pending_splice != new_pending_splice { + *pending_splice = new_pending_splice; + updated = true; + } + } + }, + } + updated + } + + fn to_update(&self) -> Self::Update { + match self { + ChannelRecord::Funded { user_channel_id, pending_splice, .. } => Self::Update { + user_channel_id: *user_channel_id, + pending_splice: Some(pending_splice.clone()), + }, + } + } +} + +impl StorableObjectUpdate for ChannelRecordUpdate { + fn id(&self) -> ::Id { + self.user_channel_id + } +} + +#[cfg(test)] +mod tests { + use lightning::util::ser::{Readable, Writeable}; + + use super::*; + + #[test] + fn channel_record_is_serializable() { + let user_channel_id = UserChannelId(42); + let counterparty_node_id = bitcoin::secp256k1::PublicKey::from_slice(&[2u8; 33]).unwrap(); + let channel_id = ChannelId([3u8; 32]); + let record = ChannelRecord::Funded { + user_channel_id, + counterparty_node_id, + channel_id, + pending_splice: None, + }; + + let encoded = record.encode(); + let decoded = ChannelRecord::read(&mut &encoded[..]).unwrap(); + assert_eq!(record, decoded); + assert_eq!(decoded.id(), user_channel_id); + } +} diff --git a/src/event.rs b/src/event.rs index dcb117ff6..3ac9b6ff3 100644 --- a/src/event.rs +++ b/src/event.rs @@ -33,6 +33,7 @@ use lightning::{impl_writeable_tlv_based, impl_writeable_tlv_based_enum}; use lightning_liquidity::lsps2::utils::compute_opening_fee; use lightning_types::payment::{PaymentHash, PaymentPreimage}; +use crate::channel::SpliceRetrier; use crate::config::{may_announce_channel, Config}; use crate::connection::ConnectionManager; use crate::data_store::DataStoreUpdateResult; @@ -283,7 +284,11 @@ pub enum Event { /// The outpoint of the channel's splice funding transaction. new_funding_txo: OutPoint, }, - /// A channel splice negotiation round has failed. + /// A channel splice has failed and is no longer being pursued. + /// + /// Recoverable failures (e.g., a peer disconnecting mid-negotiation) are retried + /// automatically, including across restarts; this event is emitted only once the splice is + /// given up on. SpliceNegotiationFailed { /// The `channel_id` of the channel. channel_id: ChannelId, @@ -543,6 +548,7 @@ where static_invoice_store: Option, onion_messenger: Arc, om_mailbox: Option>, + splice_retrier: Arc>, } impl EventHandler @@ -557,8 +563,8 @@ where liquidity_source: Arc>>, payment_store: Arc, peer_store: Arc>, keys_manager: Arc, static_invoice_store: Option, onion_messenger: Arc, - om_mailbox: Option>, runtime: Arc, logger: L, - config: Arc, + om_mailbox: Option>, splice_retrier: Arc>, + runtime: Arc, logger: L, config: Arc, ) -> Self { Self { event_queue, @@ -578,6 +584,7 @@ where static_invoice_store, onion_messenger, om_mailbox, + splice_retrier, } } @@ -1604,6 +1611,10 @@ where .handle_channel_ready(user_channel_id, &channel_id, &counterparty_node_id) .await; + self.splice_retrier + .on_channel_ready(UserChannelId(user_channel_id), funding_txo) + .await; + let event = Event::ChannelReady { channel_id, user_channel_id: UserChannelId(user_channel_id), @@ -1637,6 +1648,8 @@ where return Err(ReplayEvent()); } + self.splice_retrier.on_channel_closed(UserChannelId(user_channel_id)).await; + let event = Event::ChannelClosed { channel_id, user_channel_id: UserChannelId(user_channel_id), @@ -1905,15 +1918,26 @@ where channel_id, user_channel_id, counterparty_node_id, - .. + reason, + contribution, } => { log_info!( self.logger, - "Channel {} with counterparty {} splice negotiation failed", + "Channel {} with counterparty {} splice negotiation failed: {:?}", channel_id, counterparty_node_id, + reason, ); + // Only surface failures of splices that are not (or no longer) being retried. + let surface = self + .splice_retrier + .on_negotiation_failed(UserChannelId(user_channel_id), reason, contribution) + .await; + if !surface { + return Ok(()); + } + let event = Event::SpliceNegotiationFailed { channel_id, user_channel_id: UserChannelId(user_channel_id), diff --git a/src/io/mod.rs b/src/io/mod.rs index e16a99975..df974dfe1 100644 --- a/src/io/mod.rs +++ b/src/io/mod.rs @@ -84,3 +84,7 @@ pub(crate) const STATIC_INVOICE_STORE_PRIMARY_NAMESPACE: &str = "static_invoices /// The pending payment information will be persisted under this prefix. pub(crate) const PENDING_PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE: &str = "pending_payments"; pub(crate) const PENDING_PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE: &str = ""; + +/// The per-channel records will be persisted under this prefix. +pub(crate) const CHANNEL_RECORD_PERSISTENCE_PRIMARY_NAMESPACE: &str = "channel_records"; +pub(crate) const CHANNEL_RECORD_PERSISTENCE_SECONDARY_NAMESPACE: &str = ""; diff --git a/src/lib.rs b/src/lib.rs index 7b910aa5c..a9a43d44a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -83,6 +83,7 @@ mod balance; mod builder; mod chain; +mod channel; pub mod config; mod connection; mod data_store; @@ -128,6 +129,8 @@ pub use builder::BuildError; #[cfg(not(feature = "uniffi"))] pub use builder::NodeBuilder as Builder; use chain::ChainSource; +use channel::store::{ChannelRecord, SpliceIntent, SpliceKind}; +use channel::SpliceRetrier; use config::{ default_user_config, may_announce_channel, AsyncPaymentsRole, ChannelConfig, Config, LNURL_AUTH_TIMEOUT_SECS, NODE_ANN_BCAST_INTERVAL, PEER_RECONNECTION_INTERVAL, @@ -151,6 +154,7 @@ use lightning::ln::chan_utils::FUNDING_TRANSACTION_WITNESS_WEIGHT; use lightning::ln::channel_state::ChannelDetails as LdkChannelDetails; pub use lightning::ln::channel_state::ChannelShutdownState; use lightning::ln::channelmanager::PaymentId; +use lightning::ln::funding::FundingContribution; use lightning::ln::msgs::{BaseMessageHandler, SocketAddress}; use lightning::ln::peer_handler::CustomMessageHandler; use lightning::routing::gossip::NodeAlias; @@ -175,9 +179,9 @@ use peer_store::{PeerInfo, PeerStore}; use runtime::Runtime; pub use tokio; use types::{ - Broadcaster, BumpTransactionEventHandler, ChainMonitor, ChannelManager, DynStore, Graph, - HRNResolver, KeysManager, OnionMessenger, PaymentStore, PeerManager, Router, Scorer, Sweeper, - Wallet, + Broadcaster, BumpTransactionEventHandler, ChainMonitor, ChannelManager, ChannelRecordStore, + DynStore, Graph, HRNResolver, KeysManager, OnionMessenger, PaymentStore, PeerManager, Router, + Scorer, Sweeper, Wallet, }; pub use types::{ChannelDetails, CustomTlvRecord, PeerDetails, UserChannelId}; pub use vss_client; @@ -242,6 +246,7 @@ pub struct Node { scorer: Arc>, peer_store: Arc>>, payment_store: Arc, + channel_record_store: Arc, lnurl_auth: Arc, is_running: Arc>, node_metrics: Arc, @@ -593,6 +598,15 @@ impl Node { None }; + let splice_retrier = Arc::new(SpliceRetrier::new( + Arc::clone(&self.channel_manager), + Arc::clone(&self.wallet), + Arc::clone(&self.fee_estimator), + Arc::clone(&self.channel_record_store), + Arc::clone(&self.event_queue), + Arc::clone(&self.logger), + )); + let event_handler = Arc::new(EventHandler::new( Arc::clone(&self.event_queue), Arc::clone(&self.wallet), @@ -608,11 +622,17 @@ impl Node { static_invoice_store, Arc::clone(&self.onion_messenger), self.om_mailbox.clone(), + Arc::clone(&splice_retrier), Arc::clone(&self.runtime), Arc::clone(&self.logger), Arc::clone(&self.config), )); + // Resubmit any persisted splice intents that LDK dropped before durably recording them. + self.runtime.spawn_background_task(async move { + splice_retrier.reconcile().await; + }); + // Setup background processing let background_persister = Arc::clone(&self.kv_store); let background_event_handler = Arc::clone(&event_handler); @@ -1567,6 +1587,46 @@ impl Node { ) } + /// Persists a splice intent so that the splice can be resubmitted if LDK drops it before + /// durably recording it (e.g., when restarting or disconnecting mid-negotiation). Must be + /// called before handing the contribution to [`ChannelManager::funding_contributed`] so that + /// a crash in between is also covered. + /// + /// [`ChannelManager::funding_contributed`]: lightning::ln::channelmanager::ChannelManager::funding_contributed + fn persist_splice_intent( + &self, user_channel_id: &UserChannelId, counterparty_node_id: PublicKey, + channel_details: &LdkChannelDetails, contribution: FundingContribution, kind: SpliceKind, + ) -> Result<(), Error> { + let pre_splice_funding_txo = channel_details.funding_txo.ok_or_else(|| { + log_error!(self.logger, "Failed to splice channel: channel not yet ready"); + Error::ChannelSplicingFailed + })?; + let record = ChannelRecord::Funded { + user_channel_id: *user_channel_id, + counterparty_node_id, + channel_id: channel_details.channel_id, + pending_splice: Some(SpliceIntent { + pre_splice_funding_txo, + contribution, + kind, + attempts: 0, + }), + }; + self.runtime.block_on(self.channel_record_store.insert(record)).map(|_| ()) + } + + /// Removes a splice intent if it still holds the given contribution. The guard ensures an + /// intent persisted by a newer call is left alone. + fn clear_splice_intent( + &self, user_channel_id: &UserChannelId, contribution: &FundingContribution, + ) { + if let Some(record) = self.channel_record_store.get(user_channel_id) { + if record.pending_splice().map(|intent| &intent.contribution) == Some(contribution) { + let _ = self.runtime.block_on(self.channel_record_store.remove(user_channel_id)); + } + } + } + fn splice_in_inner( &self, user_channel_id: &UserChannelId, counterparty_node_id: PublicKey, splice_amount_sats: FundingAmount, @@ -1665,6 +1725,15 @@ impl Node { Error::ChannelSplicingFailed })?; + self.persist_splice_intent( + user_channel_id, + counterparty_node_id, + channel_details, + contribution.clone(), + SpliceKind::In { amount_sats: splice_amount_sats }, + )?; + + let intent_contribution = contribution.clone(); self.channel_manager .funding_contributed( &channel_details.channel_id, @@ -1674,6 +1743,7 @@ impl Node { ) .map_err(|e| { log_error!(self.logger, "Failed to splice channel: {:?}", e); + self.clear_splice_intent(user_channel_id, &intent_contribution); Error::ChannelSplicingFailed }) } else { @@ -1693,6 +1763,10 @@ impl Node { /// it. Once negotiation with the counterparty is complete, the channel remains operational /// while waiting for a new funding transaction to confirm. /// + /// The splice is retried automatically, including across restarts, until it either completes + /// or fails for a reason retrying cannot address, at which point + /// [`Event::SpliceNegotiationFailed`] is emitted. + /// /// # Experimental API /// /// This API is experimental. Currently, a splice-in will be marked as an outbound payment, but @@ -1733,6 +1807,10 @@ impl Node { /// it. Once negotiation with the counterparty is complete, the channel remains operational /// while waiting for a new funding transaction to confirm. /// + /// The splice is retried automatically, including across restarts, until it either completes + /// or fails for a reason retrying cannot address, at which point + /// [`Event::SpliceNegotiationFailed`] is emitted. + /// /// # Experimental API /// /// This API is experimental. Currently, a splice-out will be marked as an inbound payment if @@ -1779,12 +1857,22 @@ impl Node { value: Amount::from_sat(splice_amount_sats), script_pubkey: address.script_pubkey(), }]; - let contribution = - funding_template.splice_out(outputs, min_feerate, max_feerate).map_err(|e| { + let contribution = funding_template + .splice_out(outputs.clone(), min_feerate, max_feerate) + .map_err(|e| { log_error!(self.logger, "Failed to splice channel: {}", e); Error::ChannelSplicingFailed })?; + self.persist_splice_intent( + user_channel_id, + counterparty_node_id, + channel_details, + contribution.clone(), + SpliceKind::Out { outputs }, + )?; + + let intent_contribution = contribution.clone(); self.channel_manager .funding_contributed( &channel_details.channel_id, @@ -1794,6 +1882,7 @@ impl Node { ) .map_err(|e| { log_error!(self.logger, "Failed to splice channel: {:?}", e); + self.clear_splice_intent(user_channel_id, &intent_contribution); Error::ChannelSplicingFailed }) } else { @@ -1812,6 +1901,10 @@ impl Node { /// If a prior splice negotiation is pending, this bumps its feerate via RBF. The prior /// contribution is reused when possible; otherwise, coin selection is re-run. /// + /// The fee bump is retried automatically, including across restarts, until it either + /// completes or fails for a reason retrying cannot address, at which point + /// [`Event::SpliceNegotiationFailed`] is emitted. + /// /// # Experimental API /// /// This API is experimental and may change in the future. @@ -1852,6 +1945,15 @@ impl Node { Error::ChannelSplicingFailed })?; + self.persist_splice_intent( + user_channel_id, + counterparty_node_id, + channel_details, + contribution.clone(), + SpliceKind::Rbf {}, + )?; + + let intent_contribution = contribution.clone(); self.channel_manager .funding_contributed( &channel_details.channel_id, @@ -1861,6 +1963,7 @@ impl Node { ) .map_err(|e| { log_error!(self.logger, "Failed to RBF channel: {:?}", e); + self.clear_splice_intent(user_channel_id, &intent_contribution); Error::ChannelSplicingFailed }) } else { diff --git a/src/types.rs b/src/types.rs index 64209430b..49c19e723 100644 --- a/src/types.rs +++ b/src/types.rs @@ -38,6 +38,7 @@ use lightning_net_tokio::SocketDescriptor; use crate::chain::bitcoind::UtxoSourceClient; use crate::chain::ChainSource; +use crate::channel::store::ChannelRecord; use crate::config::ChannelConfig; use crate::data_store::DataStore; use crate::fee_estimator::OnchainFeeEstimator; @@ -318,7 +319,7 @@ pub(crate) type PaymentStore = DataStore>; /// A local, potentially user-provided, identifier of a channel. /// /// By default, this will be randomly generated for the user to ensure local uniqueness. -#[derive(Debug, Copy, Clone, PartialEq, Eq)] +#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)] pub struct UserChannelId(pub u128); impl Writeable for UserChannelId { @@ -628,3 +629,5 @@ impl From<&(u64, Vec)> for CustomTlvRecord { } pub(crate) type PendingPaymentStore = DataStore>; + +pub(crate) type ChannelRecordStore = DataStore>; diff --git a/tests/integration_tests_rust.rs b/tests/integration_tests_rust.rs index e122f9345..1ab2bec54 100644 --- a/tests/integration_tests_rust.rs +++ b/tests/integration_tests_rust.rs @@ -1337,6 +1337,193 @@ async fn rbf_splice_channel() { node_b.stop().unwrap(); } +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn splice_resumed_after_restart() { + let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); + let chain_source = random_chain_source(&bitcoind, &electrsd); + + // Set up node_a manually so it can be restarted with the same config. + let mut config_a = random_config(true); + config_a.store_type = TestStoreType::Sqlite; + let config_b = random_config(true); + let node_b = setup_node(&chain_source, config_b); + + let onchain_balance_before_sat = { + let node_a = setup_node(&chain_source, config_a.clone()); + + let address_a = node_a.onchain_payment().new_address().unwrap(); + let address_b = node_b.onchain_payment().new_address().unwrap(); + let premine_amount_sat = 5_000_000; + premine_and_distribute_funds( + &bitcoind.client, + &electrsd.client, + vec![address_a, address_b], + Amount::from_sat(premine_amount_sat), + ) + .await; + + node_a.sync_wallets().unwrap(); + node_b.sync_wallets().unwrap(); + + open_channel(&node_a, &node_b, 4_000_000, false, &electrsd).await; + generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6).await; + node_a.sync_wallets().unwrap(); + node_b.sync_wallets().unwrap(); + + let user_channel_id_a = expect_channel_ready_event!(node_a, node_b.node_id()); + expect_channel_ready_event!(node_b, node_a.node_id()); + + // Initiate a splice-out while disconnected: LDK accepts the contribution but cannot make + // progress before the restart below drops it, having neither negotiated nor persisted + // anything. Only the persisted splice intent allows resuming the splice. + node_a.disconnect(node_b.node_id()).unwrap(); + let address = node_a.onchain_payment().new_address().unwrap(); + node_a.splice_out(&user_channel_id_a, node_b.node_id(), &address, 500_000).unwrap(); + + let onchain_balance_before_sat = node_a.list_balances().total_onchain_balance_sats; + node_a.stop().unwrap(); + onchain_balance_before_sat + }; + + // On restart, the reconciler resubmits the splice, which proceeds once the peer connects. + let node_a = setup_node(&chain_source, config_a.clone()); + node_a.sync_wallets().unwrap(); + let node_b_addr = node_b.listening_addresses().unwrap().first().unwrap().clone(); + node_a.connect(node_b.node_id(), node_b_addr.clone(), false).unwrap(); + + let txo = expect_splice_negotiated_event!(node_a, node_b.node_id()); + expect_splice_negotiated_event!(node_b, node_a.node_id()); + + wait_for_tx(&electrsd.client, txo.txid).await; + generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6).await; + node_a.sync_wallets().unwrap(); + node_b.sync_wallets().unwrap(); + + expect_channel_ready_event!(node_a, node_b.node_id()); + expect_channel_ready_event!(node_b, node_a.node_id()); + + assert!( + node_a.list_balances().total_onchain_balance_sats > onchain_balance_before_sat + 400_000, + "resumed splice-out should have moved ~500k sats to the on-chain balance", + ); + + // The locked splice cleared the intent, so another restart must not resubmit it. + node_a.stop().unwrap(); + let node_a = setup_node(&chain_source, config_a); + node_a.sync_wallets().unwrap(); + node_a.connect(node_b.node_id(), node_b_addr, false).unwrap(); + tokio::time::sleep(std::time::Duration::from_secs(3)).await; + assert!(node_a.next_event().is_none(), "completed splice should not be resubmitted"); + + node_a.stop().unwrap(); + node_b.stop().unwrap(); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn splice_rbf_resumed_after_restart() { + // Use a custom bitcoind config with a lower incrementalrelayfee so that the +25 sat/kwu + // (0.1 sat/vB) RBF feerate bump satisfies BIP125's absolute fee increase requirement. + let bitcoind_exe = std::env::var("BITCOIND_EXE") + .ok() + .or_else(|| corepc_node::downloaded_exe_path().ok()) + .expect( + "you need to provide an env var BITCOIND_EXE or specify a bitcoind version feature", + ); + let mut bitcoind_conf = corepc_node::Conf::default(); + bitcoind_conf.network = "regtest"; + bitcoind_conf.args.push("-rest"); + bitcoind_conf.args.push("-incrementalrelayfee=0.00000100"); + let bitcoind = BitcoinD::with_conf(bitcoind_exe, &bitcoind_conf).unwrap(); + + let electrs_exe = std::env::var("ELECTRS_EXE") + .ok() + .or_else(electrsd::downloaded_exe_path) + .expect("you need to provide env var ELECTRS_EXE or specify an electrsd version feature"); + let mut electrsd_conf = electrsd::Conf::default(); + electrsd_conf.http_enabled = true; + electrsd_conf.network = "regtest"; + let electrsd = ElectrsD::with_conf(electrs_exe, &bitcoind, &electrsd_conf).unwrap(); + let chain_source = random_chain_source(&bitcoind, &electrsd); + + // Set up node_a manually so it can be restarted with the same config. + let mut config_a = random_config(true); + config_a.store_type = TestStoreType::Sqlite; + let config_b = random_config(true); + let node_b = setup_node(&chain_source, config_b); + + let original_txo = { + let node_a = setup_node(&chain_source, config_a.clone()); + + let address_a = node_a.onchain_payment().new_address().unwrap(); + let address_b = node_b.onchain_payment().new_address().unwrap(); + let premine_amount_sat = 5_000_000; + premine_and_distribute_funds( + &bitcoind.client, + &electrsd.client, + vec![address_a, address_b], + Amount::from_sat(premine_amount_sat), + ) + .await; + + node_a.sync_wallets().unwrap(); + node_b.sync_wallets().unwrap(); + + open_channel(&node_a, &node_b, 4_000_000, false, &electrsd).await; + generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6).await; + node_a.sync_wallets().unwrap(); + node_b.sync_wallets().unwrap(); + + let user_channel_id_a = expect_channel_ready_event!(node_a, node_b.node_id()); + expect_channel_ready_event!(node_b, node_a.node_id()); + + // Negotiate a splice but leave its transaction unconfirmed so it can be fee-bumped. + node_a.splice_in(&user_channel_id_a, node_b.node_id(), 500_000).unwrap(); + let original_txo = expect_splice_negotiated_event!(node_a, node_b.node_id()); + expect_splice_negotiated_event!(node_b, node_a.node_id()); + wait_for_tx(&electrsd.client, original_txo.txid).await; + node_a.sync_wallets().unwrap(); + node_b.sync_wallets().unwrap(); + + // Bump the fee while disconnected and restart before anything could be negotiated: only + // the persisted intent knows about the fee bump, while LDK still has the negotiated + // splice at the original feerate. + node_a.disconnect(node_b.node_id()).unwrap(); + node_a.bump_channel_funding_fee(&user_channel_id_a, node_b.node_id()).unwrap(); + node_a.stop().unwrap(); + original_txo + }; + + // On restart, the reconciler sees that the negotiated splice is still at a lower feerate + // than the persisted fee-bump intent and resubmits the bump. + let node_a = setup_node(&chain_source, config_a.clone()); + node_a.sync_wallets().unwrap(); + let node_b_addr = node_b.listening_addresses().unwrap().first().unwrap().clone(); + node_a.connect(node_b.node_id(), node_b_addr.clone(), false).unwrap(); + + let rbf_txo = expect_splice_negotiated_event!(node_a, node_b.node_id()); + expect_splice_negotiated_event!(node_b, node_a.node_id()); + assert_ne!(original_txo, rbf_txo, "resubmitted RBF should produce a different funding txo"); + + // Restarting again must not resubmit the bump: the negotiated splice now carries it. + node_a.stop().unwrap(); + let node_a = setup_node(&chain_source, config_a); + node_a.sync_wallets().unwrap(); + node_a.connect(node_b.node_id(), node_b_addr, false).unwrap(); + tokio::time::sleep(std::time::Duration::from_secs(3)).await; + assert!(node_a.next_event().is_none(), "carried-out fee bump should not be resubmitted"); + + wait_for_tx(&electrsd.client, rbf_txo.txid).await; + generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6).await; + node_a.sync_wallets().unwrap(); + node_b.sync_wallets().unwrap(); + + expect_channel_ready_event!(node_a, node_b.node_id()); + expect_channel_ready_event!(node_b, node_a.node_id()); + + node_a.stop().unwrap(); + node_b.stop().unwrap(); +} + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn bump_fee_rbf_rejects_funding_payment() { // A channel-funding or splice transaction is driven by LDK's funding/splice lifecycle, not the