From 557d9065c6a063388d5344fb9f5fd7468df7e5fc Mon Sep 17 00:00:00 2001 From: Jeffrey Czyz Date: Wed, 18 Mar 2026 23:11:20 -0500 Subject: [PATCH 1/8] Add rbf_channel API for fee-bumping pending splices When a splice is already pending, the user needs a way to replace its funding transaction at a higher feerate. This adds rbf_channel() to handle that case and guards splice_in/splice_out against being called while a pending splice exists, directing users to rbf_channel instead. Also fixes signing for RBF replacements, which requires accessing outputs spent by unconfirmed transactions. Co-Authored-By: Claude Opus 4.6 (1M context) --- bindings/ldk_node.udl | 2 + src/lib.rs | 71 ++++++++++++++++++- src/wallet/mod.rs | 8 ++- tests/integration_tests_rust.rs | 118 +++++++++++++++++++++++++++++++- 4 files changed, 194 insertions(+), 5 deletions(-) diff --git a/bindings/ldk_node.udl b/bindings/ldk_node.udl index 7e9e61f5d..8a92b4f0c 100644 --- a/bindings/ldk_node.udl +++ b/bindings/ldk_node.udl @@ -125,6 +125,8 @@ interface Node { [Throws=NodeError] void splice_out([ByRef]UserChannelId user_channel_id, PublicKey counterparty_node_id, [ByRef]Address address, u64 splice_amount_sats); [Throws=NodeError] + void rbf_channel([ByRef]UserChannelId user_channel_id, PublicKey counterparty_node_id); + [Throws=NodeError] void close_channel([ByRef]UserChannelId user_channel_id, PublicKey counterparty_node_id); [Throws=NodeError] void force_close_channel([ByRef]UserChannelId user_channel_id, PublicKey counterparty_node_id, string? reason); diff --git a/src/lib.rs b/src/lib.rs index 614be098b..de18021ae 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1595,7 +1595,7 @@ impl Node { if funding_template.prior_contribution().is_some() { log_error!( self.logger, - "Failed to splice channel: a prior splice contribution is pending" + "Failed to splice channel: a prior splice contribution is pending; use rbf_channel instead" ); return Err(Error::ChannelSplicingFailed); } @@ -1716,7 +1716,7 @@ impl Node { if funding_template.prior_contribution().is_some() { log_error!( self.logger, - "Failed to splice channel: a prior splice contribution is pending" + "Failed to splice channel: a prior splice contribution is pending; use rbf_channel instead" ); return Err(Error::ChannelSplicingFailed); } @@ -1753,6 +1753,73 @@ impl Node { } } + /// Replace a pending splice's funding transaction with a higher-feerate version. + /// + /// If a prior splice negotiation is pending, this bumps its feerate via RBF. The prior + /// contribution is reused when possible; otherwise, coin selection is re-run. + /// + /// # Experimental API + /// + /// This API is experimental and may change in the future. + pub fn rbf_channel( + &self, user_channel_id: &UserChannelId, counterparty_node_id: PublicKey, + ) -> Result<(), Error> { + let open_channels = + self.channel_manager.list_channels_with_counterparty(&counterparty_node_id); + if let Some(channel_details) = + open_channels.iter().find(|c| c.user_channel_id == user_channel_id.0) + { + let min_feerate = + self.fee_estimator.estimate_fee_rate(ConfirmationTarget::ChannelFunding); + let max_feerate = FeeRate::from_sat_per_kwu(min_feerate.to_sat_per_kwu() * 3 / 2); + + let funding_template = self + .channel_manager + .splice_channel(&channel_details.channel_id, &counterparty_node_id) + .map_err(|e| { + log_error!(self.logger, "Failed to RBF channel: {:?}", e); + Error::ChannelSplicingFailed + })?; + + if funding_template.min_rbf_feerate().is_none() { + log_error!(self.logger, "Failed to RBF channel: no pending splice to replace"); + return Err(Error::ChannelSplicingFailed); + } + + let contribution = self + .runtime + .block_on(funding_template.rbf_prior_contribution( + None, + max_feerate, + Arc::clone(&self.wallet), + )) + .map_err(|e| { + log_error!(self.logger, "Failed to RBF channel: {}", e); + Error::ChannelSplicingFailed + })?; + + self.channel_manager + .funding_contributed( + &channel_details.channel_id, + &counterparty_node_id, + contribution, + None, + ) + .map_err(|e| { + log_error!(self.logger, "Failed to RBF channel: {:?}", e); + Error::ChannelSplicingFailed + }) + } else { + log_error!( + self.logger, + "Channel not found for user_channel_id {} and counterparty {}", + user_channel_id, + counterparty_node_id + ); + Err(Error::ChannelSplicingFailed) + } + } + /// Manually sync the LDK and BDK wallets with the current chain state and update the fee rate /// cache. /// diff --git a/src/wallet/mod.rs b/src/wallet/mod.rs index 13b1f384f..db212cd24 100644 --- a/src/wallet/mod.rs +++ b/src/wallet/mod.rs @@ -5,6 +5,7 @@ // http://opensource.org/licenses/MIT>, at your option. You may not use this file except in // accordance with one or both of these licenses. +use std::collections::HashMap; use std::future::Future; use std::ops::Deref; use std::str::FromStr; @@ -1082,9 +1083,12 @@ impl Wallet { let mut psbt = Psbt::from_unsigned_tx(unsigned_tx).map_err(|e| { log_error!(self.logger, "Failed to construct PSBT: {}", e); })?; + // Use list_output rather than get_utxo to include outputs spent by unconfirmed + // transactions (e.g., a prior splice being replaced via RBF). + let mut wallet_outputs: HashMap = + locked_wallet.list_output().map(|o| (o.outpoint, o)).collect(); for (i, txin) in psbt.unsigned_tx.input.iter().enumerate() { - if let Some(utxo) = locked_wallet.get_utxo(txin.previous_output) { - debug_assert!(!utxo.is_spent); + if let Some(utxo) = wallet_outputs.remove(&txin.previous_output) { psbt.inputs[i] = locked_wallet.get_psbt_input(utxo, None, true).map_err(|e| { log_error!(self.logger, "Failed to construct PSBT input: {}", e); })?; diff --git a/tests/integration_tests_rust.rs b/tests/integration_tests_rust.rs index 1ea6c4584..a3388f595 100644 --- a/tests/integration_tests_rust.rs +++ b/tests/integration_tests_rust.rs @@ -26,7 +26,7 @@ use common::{ setup_bitcoind_and_electrsd, setup_builder, setup_node, setup_two_nodes, splice_in_with_all, wait_for_tx, TestChainSource, TestConfig, TestStoreType, TestSyncStore, }; -use electrsd::corepc_node::Node as BitcoinD; +use electrsd::corepc_node::{self, Node as BitcoinD}; use electrsd::ElectrsD; use ldk_node::config::{AsyncPaymentsRole, EsploraSyncConfig}; use ldk_node::entropy::NodeEntropy; @@ -1142,6 +1142,122 @@ async fn splice_channel() { ); } +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn rbf_splice_channel() { + // Use a custom bitcoind config with a lower incrementalrelayfee so that the +25 sat/kwu + // (0.1 sat/vB) RBF feerate bump satisfies BIP125's absolute fee increase requirement. + let bitcoind_exe = std::env::var("BITCOIND_EXE") + .ok() + .or_else(|| corepc_node::downloaded_exe_path().ok()) + .expect( + "you need to provide an env var BITCOIND_EXE or specify a bitcoind version feature", + ); + let mut bitcoind_conf = corepc_node::Conf::default(); + bitcoind_conf.network = "regtest"; + bitcoind_conf.args.push("-rest"); + bitcoind_conf.args.push("-incrementalrelayfee=0.00000100"); + let bitcoind = BitcoinD::with_conf(bitcoind_exe, &bitcoind_conf).unwrap(); + + let electrs_exe = std::env::var("ELECTRS_EXE") + .ok() + .or_else(electrsd::downloaded_exe_path) + .expect("you need to provide env var ELECTRS_EXE or specify an electrsd version feature"); + let mut electrsd_conf = electrsd::Conf::default(); + electrsd_conf.http_enabled = true; + electrsd_conf.network = "regtest"; + let electrsd = ElectrsD::with_conf(electrs_exe, &bitcoind, &electrsd_conf).unwrap(); + let chain_source = random_chain_source(&bitcoind, &electrsd); + + let (node_a, node_b) = setup_two_nodes(&chain_source, false, true, false); + + let address_a = node_a.onchain_payment().new_address().unwrap(); + let address_b = node_b.onchain_payment().new_address().unwrap(); + let premine_amount_sat = 5_000_000; + premine_and_distribute_funds( + &bitcoind.client, + &electrsd.client, + vec![address_a, address_b], + Amount::from_sat(premine_amount_sat), + ) + .await; + + node_a.sync_wallets().unwrap(); + node_b.sync_wallets().unwrap(); + + open_channel(&node_a, &node_b, 4_000_000, false, &electrsd).await; + + generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6).await; + + node_a.sync_wallets().unwrap(); + node_b.sync_wallets().unwrap(); + + let user_channel_id_a = expect_channel_ready_event!(node_a, node_b.node_id()); + let user_channel_id_b = expect_channel_ready_event!(node_b, node_a.node_id()); + + // rbf_channel should fail when there's no pending splice + assert_eq!( + node_b.rbf_channel(&user_channel_id_b, node_a.node_id()), + Err(NodeError::ChannelSplicingFailed), + ); + + // Initiate a splice-in to create a pending splice + node_b.splice_in(&user_channel_id_b, node_a.node_id(), 1_000_000).unwrap(); + + let original_txo = expect_splice_negotiated_event!(node_a, node_b.node_id()); + expect_splice_negotiated_event!(node_b, node_a.node_id()); + + // splice_in should fail when there's a pending splice (RBF guard) + assert_eq!( + node_b.splice_in(&user_channel_id_b, node_a.node_id(), 1_000_000), + Err(NodeError::ChannelSplicingFailed), + ); + + // splice_out should fail when there's a pending splice (RBF guard) + let address = node_a.onchain_payment().new_address().unwrap(); + assert_eq!( + node_a.splice_out(&user_channel_id_a, node_b.node_id(), &address, 100_000), + Err(NodeError::ChannelSplicingFailed), + ); + + // rbf_channel should succeed when there's a pending splice + node_b.rbf_channel(&user_channel_id_b, node_a.node_id()).unwrap(); + + let rbf_txo = expect_splice_negotiated_event!(node_a, node_b.node_id()); + expect_splice_negotiated_event!(node_b, node_a.node_id()); + + assert_ne!(original_txo, rbf_txo, "RBF should produce a different funding txo"); + + // Wait for the RBF transaction to replace the original in the mempool + wait_for_tx(&electrsd.client, rbf_txo.txid).await; + + // Mine blocks and confirm the RBF splice + generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6).await; + + node_a.sync_wallets().unwrap(); + node_b.sync_wallets().unwrap(); + + // Verify the RBF transaction is the one that locked, not the original + match node_a.next_event_async().await { + Event::ChannelReady { funding_txo, counterparty_node_id, .. } => { + assert_eq!(counterparty_node_id, Some(node_b.node_id())); + assert_eq!(funding_txo, Some(rbf_txo)); + node_a.event_handled().unwrap(); + }, + ref e => panic!("node_a got unexpected event: {:?}", e), + } + match node_b.next_event_async().await { + Event::ChannelReady { funding_txo, counterparty_node_id, .. } => { + assert_eq!(counterparty_node_id, Some(node_a.node_id())); + assert_eq!(funding_txo, Some(rbf_txo)); + node_b.event_handled().unwrap(); + }, + ref e => panic!("node_b got unexpected event: {:?}", e), + } + + node_a.stop().unwrap(); + node_b.stop().unwrap(); +} + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn simple_bolt12_send_receive() { let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); From 9cbdb063e372c5038e2f4f4dee3d39cd100e9b0b Mon Sep 17 00:00:00 2001 From: Jeffrey Czyz Date: Fri, 5 Jun 2026 14:28:08 -0500 Subject: [PATCH 2/8] f - Rename rbf_channel to bump_channel_funding_fee Co-Authored-By: Claude --- bindings/ldk_node.udl | 2 +- src/lib.rs | 6 +++--- tests/integration_tests_rust.rs | 8 ++++---- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/bindings/ldk_node.udl b/bindings/ldk_node.udl index 8a92b4f0c..8198cfb34 100644 --- a/bindings/ldk_node.udl +++ b/bindings/ldk_node.udl @@ -125,7 +125,7 @@ interface Node { [Throws=NodeError] void splice_out([ByRef]UserChannelId user_channel_id, PublicKey counterparty_node_id, [ByRef]Address address, u64 splice_amount_sats); [Throws=NodeError] - void rbf_channel([ByRef]UserChannelId user_channel_id, PublicKey counterparty_node_id); + void bump_channel_funding_fee([ByRef]UserChannelId user_channel_id, PublicKey counterparty_node_id); [Throws=NodeError] void close_channel([ByRef]UserChannelId user_channel_id, PublicKey counterparty_node_id); [Throws=NodeError] diff --git a/src/lib.rs b/src/lib.rs index de18021ae..f9edc32ef 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1595,7 +1595,7 @@ impl Node { if funding_template.prior_contribution().is_some() { log_error!( self.logger, - "Failed to splice channel: a prior splice contribution is pending; use rbf_channel instead" + "Failed to splice channel: a prior splice contribution is pending; use bump_channel_funding_fee instead" ); return Err(Error::ChannelSplicingFailed); } @@ -1716,7 +1716,7 @@ impl Node { if funding_template.prior_contribution().is_some() { log_error!( self.logger, - "Failed to splice channel: a prior splice contribution is pending; use rbf_channel instead" + "Failed to splice channel: a prior splice contribution is pending; use bump_channel_funding_fee instead" ); return Err(Error::ChannelSplicingFailed); } @@ -1761,7 +1761,7 @@ impl Node { /// # Experimental API /// /// This API is experimental and may change in the future. - pub fn rbf_channel( + pub fn bump_channel_funding_fee( &self, user_channel_id: &UserChannelId, counterparty_node_id: PublicKey, ) -> Result<(), Error> { let open_channels = diff --git a/tests/integration_tests_rust.rs b/tests/integration_tests_rust.rs index a3388f595..c2f19115e 100644 --- a/tests/integration_tests_rust.rs +++ b/tests/integration_tests_rust.rs @@ -1194,9 +1194,9 @@ async fn rbf_splice_channel() { let user_channel_id_a = expect_channel_ready_event!(node_a, node_b.node_id()); let user_channel_id_b = expect_channel_ready_event!(node_b, node_a.node_id()); - // rbf_channel should fail when there's no pending splice + // bump_channel_funding_fee should fail when there's no pending splice assert_eq!( - node_b.rbf_channel(&user_channel_id_b, node_a.node_id()), + node_b.bump_channel_funding_fee(&user_channel_id_b, node_a.node_id()), Err(NodeError::ChannelSplicingFailed), ); @@ -1219,8 +1219,8 @@ async fn rbf_splice_channel() { Err(NodeError::ChannelSplicingFailed), ); - // rbf_channel should succeed when there's a pending splice - node_b.rbf_channel(&user_channel_id_b, node_a.node_id()).unwrap(); + // bump_channel_funding_fee should succeed when there's a pending splice + node_b.bump_channel_funding_fee(&user_channel_id_b, node_a.node_id()).unwrap(); let rbf_txo = expect_splice_negotiated_event!(node_a, node_b.node_id()); expect_splice_negotiated_event!(node_b, node_a.node_id()); From b91de2386f1525e9bf01ba6c33c4a1f892478177 Mon Sep 17 00:00:00 2001 From: Jeffrey Czyz Date: Mon, 20 Apr 2026 19:15:37 -0500 Subject: [PATCH 3/8] Tie funding payment status transitions to Lightning lifecycle events MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Channel-opening and splice transactions transition to Succeeded when ChannelReady fires, not after ANTI_REORG_DELAY confirmations. This matches the point at which the Lightning layer considers the channel usable: a zero-conf channel graduates as soon as its counterparty signals, and a high-conf channel waits however many confirmations the peer requires, rather than always stopping at six. For splice RBF, the payment records whichever candidate actually confirmed, with that candidate's amount and this node's share of the fee — not the fee-estimate used for weight at coin-selection time, and not the whole-tx fee for a multi-contributor splice. A channel closure whose funding or splice never confirmed discards its payment record instead of leaving it pending forever. Generated with assistance from Claude Code. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/builder.rs | 2 + src/event.rs | 22 ++ src/payment/pending_payment_store.rs | 45 ++- src/tx_broadcaster.rs | 35 +- src/wallet/mod.rs | 571 +++++++++++++++++++++++++-- tests/integration_tests_rust.rs | 49 +++ 6 files changed, 693 insertions(+), 31 deletions(-) diff --git a/src/builder.rs b/src/builder.rs index 03ded494f..1d32a8a12 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -1624,6 +1624,8 @@ fn build_with_store_internal( Arc::clone(&pending_payment_store), )); + tx_broadcaster.set_wallet(Arc::downgrade(&wallet)); + // Initialize the KeysManager let cur_time = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).map_err(|e| { log_error!(logger, "Failed to get current time: {}", e); diff --git a/src/event.rs b/src/event.rs index 7d23be99a..304737269 100644 --- a/src/event.rs +++ b/src/event.rs @@ -1580,6 +1580,18 @@ where ); } + if let Err(e) = + self.wallet.handle_channel_ready(channel_id, funding_txo.map(|txo| txo.txid)) + { + log_error!( + self.logger, + "Failed to graduate funding payment on ChannelReady for channel {}: {:?}", + channel_id, + e, + ); + return Err(ReplayEvent()); + } + if let Some(liquidity_source) = self.liquidity_source.as_ref() { liquidity_source .handle_channel_ready(user_channel_id, &channel_id, &counterparty_node_id) @@ -1609,6 +1621,16 @@ where } => { log_info!(self.logger, "Channel {} closed due to: {}", channel_id, reason); + if let Err(e) = self.wallet.handle_channel_closed(channel_id) { + log_error!( + self.logger, + "Failed to handle ChannelClosed for channel {}: {:?}", + channel_id, + e, + ); + return Err(ReplayEvent()); + } + let event = Event::ChannelClosed { channel_id, user_channel_id: UserChannelId(user_channel_id), diff --git a/src/payment/pending_payment_store.rs b/src/payment/pending_payment_store.rs index eb72f89ec..16837d70c 100644 --- a/src/payment/pending_payment_store.rs +++ b/src/payment/pending_payment_store.rs @@ -6,6 +6,7 @@ // accordance with one or both of these licenses. use bitcoin::Txid; +use lightning::chain::chaininterface::FundingCandidate; use lightning::impl_writeable_tlv_based; use lightning::ln::channelmanager::PaymentId; @@ -13,6 +14,19 @@ use crate::data_store::{StorableObject, StorableObjectUpdate}; use crate::payment::store::PaymentDetailsUpdate; use crate::payment::PaymentDetails; +/// Marks an on-chain payment as belonging to an interactive-funding negotiation. The +/// last entry in `candidates` is the currently-broadcast tx; earlier entries are RBF +/// predecessors that may still confirm if reorgs intervene. +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct FundingDetails { + /// Every negotiated candidate, oldest first. + pub candidates: Vec, +} + +impl_writeable_tlv_based!(FundingDetails, { + (0, candidates, optional_vec), +}); + /// Represents a pending payment #[derive(Clone, Debug, PartialEq, Eq)] pub struct PendingPaymentDetails { @@ -20,11 +34,24 @@ pub struct PendingPaymentDetails { pub details: PaymentDetails, /// Transaction IDs that have replaced or conflict with this payment. pub conflicting_txids: Vec, + /// Set when the payment's transaction is an interactive-funding broadcast (channel + /// open or splice). The record transitions to [`PaymentStatus::Succeeded`] on + /// `ChannelReady` instead of after [`ANTI_REORG_DELAY`] confirmations. + /// + /// [`PaymentStatus::Succeeded`]: crate::payment::store::PaymentStatus::Succeeded + /// [`ANTI_REORG_DELAY`]: lightning::chain::channelmonitor::ANTI_REORG_DELAY + pub funding_details: Option, } impl PendingPaymentDetails { pub(crate) fn new(details: PaymentDetails, conflicting_txids: Vec) -> Self { - Self { details, conflicting_txids } + Self { details, conflicting_txids, funding_details: None } + } + + pub(crate) fn with_funding_details( + details: PaymentDetails, conflicting_txids: Vec, funding_details: FundingDetails, + ) -> Self { + Self { details, conflicting_txids, funding_details: Some(funding_details) } } /// Convert to finalized payment for the main payment store @@ -36,6 +63,7 @@ impl PendingPaymentDetails { impl_writeable_tlv_based!(PendingPaymentDetails, { (0, details, required), (2, conflicting_txids, optional_vec), + (4, funding_details, option), }); #[derive(Clone, Debug, PartialEq, Eq)] @@ -43,6 +71,7 @@ pub(crate) struct PendingPaymentDetailsUpdate { pub id: PaymentId, pub payment_update: Option, pub conflicting_txids: Option>, + pub funding_details: Option>, } impl StorableObject for PendingPaymentDetails { @@ -68,6 +97,13 @@ impl StorableObject for PendingPaymentDetails { } } + if let Some(new_funding_details) = update.funding_details { + if self.funding_details != new_funding_details { + self.funding_details = new_funding_details; + updated = true; + } + } + updated } @@ -89,6 +125,11 @@ impl From<&PendingPaymentDetails> for PendingPaymentDetailsUpdate { } else { Some(value.conflicting_txids.clone()) }; - Self { id: value.id(), payment_update: Some(value.details.to_update()), conflicting_txids } + Self { + id: value.id(), + payment_update: Some(value.details.to_update()), + conflicting_txids, + funding_details: Some(value.funding_details.clone()), + } } } diff --git a/src/tx_broadcaster.rs b/src/tx_broadcaster.rs index 7084135b0..24abf8f11 100644 --- a/src/tx_broadcaster.rs +++ b/src/tx_broadcaster.rs @@ -6,12 +6,14 @@ // accordance with one or both of these licenses. use std::ops::Deref; +use std::sync::{Mutex as StdMutex, Weak}; use bitcoin::Transaction; use lightning::chain::chaininterface::{BroadcasterInterface, TransactionType}; use tokio::sync::{mpsc, Mutex, MutexGuard}; use crate::logger::{log_error, LdkLogger}; +use crate::types::Wallet; const BCAST_PACKAGE_QUEUE_SIZE: usize = 50; @@ -21,6 +23,12 @@ where { queue_sender: mpsc::Sender>, queue_receiver: Mutex>>, + /// Weak handle to the [`Wallet`] that performs classification of funding broadcasts + /// (channel opens and splices) into payment records. Remains `None` while the + /// builder is wiring the node up, during which broadcasts are still forwarded to + /// the queue but no payment record is written. [`Self::set_wallet`] installs the + /// handle once the [`Wallet`] exists. + wallet: StdMutex>>, logger: L, } @@ -30,7 +38,19 @@ where { pub(crate) fn new(logger: L) -> Self { let (queue_sender, queue_receiver) = mpsc::channel(BCAST_PACKAGE_QUEUE_SIZE); - Self { queue_sender, queue_receiver: Mutex::new(queue_receiver), logger } + Self { + queue_sender, + queue_receiver: Mutex::new(queue_receiver), + wallet: StdMutex::new(None), + logger, + } + } + + /// Installs the [`Wallet`] handle used to classify funding broadcasts (channel + /// opens and splices) into payment records. Called once the builder has constructed + /// both the broadcaster and the wallet. + pub(crate) fn set_wallet(&self, wallet: Weak) { + *self.wallet.lock().expect("lock") = Some(wallet); } pub(crate) async fn get_broadcast_queue( @@ -45,6 +65,19 @@ where L::Target: LdkLogger, { fn broadcast_transactions(&self, txs: &[(&Transaction, TransactionType)]) { + let wallet = self.wallet.lock().expect("lock").as_ref().and_then(Weak::upgrade); + if let Some(wallet) = wallet { + for (tx, tx_type) in txs { + if let Err(e) = wallet.classify_broadcast(tx, tx_type) { + log_error!( + self.logger, + "Failed to classify broadcast tx {}: {:?}", + tx.compute_txid(), + e, + ); + } + } + } let package = txs.iter().map(|(t, _)| (*t).clone()).collect::>(); self.queue_sender.try_send(package).unwrap_or_else(|e| { log_error!(self.logger, "Failed to broadcast transactions: {}", e); diff --git a/src/wallet/mod.rs b/src/wallet/mod.rs index db212cd24..3e4ac41b3 100644 --- a/src/wallet/mod.rs +++ b/src/wallet/mod.rs @@ -33,14 +33,16 @@ use bitcoin::{ WitnessProgram, WitnessVersion, }; use lightning::chain::chaininterface::{ - BroadcasterInterface, INCREMENTAL_RELAY_FEE_SAT_PER_1000_WEIGHT, + BroadcasterInterface, TransactionType, INCREMENTAL_RELAY_FEE_SAT_PER_1000_WEIGHT, }; use lightning::chain::channelmonitor::ANTI_REORG_DELAY; use lightning::chain::{BlockLocator, ClaimId, Listen}; use lightning::ln::channelmanager::PaymentId; +use lightning::ln::funding::FundingContribution; use lightning::ln::inbound_payment::ExpandedKey; use lightning::ln::msgs::UnsignedGossipMessage; use lightning::ln::script::ShutdownScript; +use lightning::ln::types::ChannelId as LnChannelId; use lightning::sign::{ ChangeDestinationSource, EntropySource, InMemorySigner, KeysManager, NodeSigner, OutputSpender, PeerStorageKey, Recipient, SignerProvider, SpendableOutputDescriptor, @@ -55,6 +57,9 @@ use persist::KVStoreWalletPersister; use crate::config::Config; use crate::fee_estimator::{ConfirmationTarget, FeeEstimator, OnchainFeeEstimator}; use crate::logger::{log_debug, log_error, log_info, log_trace, LdkLogger, Logger}; +use lightning::chain::chaininterface::{ChannelFunding, FundingCandidate, FundingPurpose}; + +use crate::payment::pending_payment_store::FundingDetails; use crate::payment::store::ConfirmationStatus; use crate::payment::{ PaymentDetails, PaymentDirection, PaymentKind, PaymentStatus, PendingPaymentDetails, @@ -251,18 +256,9 @@ impl Wallet { for event in events { match event { WalletEvent::TxConfirmed { txid, tx, block_time, .. } => { - let cur_height = locked_wallet.latest_checkpoint().height(); - let confirmation_height = block_time.block_id.height; - let payment_status = if cur_height >= confirmation_height + ANTI_REORG_DELAY - 1 - { - PaymentStatus::Succeeded - } else { - PaymentStatus::Pending - }; - let confirmation_status = ConfirmationStatus::Confirmed { block_hash: block_time.block_id.hash, - height: confirmation_height, + height: block_time.block_id.height, timestamp: block_time.confirmation_time, }; @@ -270,6 +266,23 @@ impl Wallet { .find_payment_by_txid(txid) .unwrap_or_else(|| PaymentId(txid.to_byte_array())); + if self.apply_funding_details_status_update( + payment_id, + txid, + confirmation_status, + )? { + continue; + } + + let cur_height = locked_wallet.latest_checkpoint().height(); + let confirmation_height = block_time.block_id.height; + let payment_status = if cur_height >= confirmation_height + ANTI_REORG_DELAY - 1 + { + PaymentStatus::Succeeded + } else { + PaymentStatus::Pending + }; + let payment = self.create_payment_from_tx( locked_wallet, txid, @@ -279,13 +292,12 @@ impl Wallet { confirmation_status, ); - self.payment_store.insert_or_update(payment.clone())?; - if payment_status == PaymentStatus::Pending { let pending_payment = self.create_pending_payment_from_tx(payment, Vec::new()); - - self.pending_payment_store.insert_or_update(pending_payment)?; + self.persist_pending(pending_payment)?; + } else { + self.payment_store.insert_or_update(payment)?; } }, WalletEvent::ChainTipChanged { new_tip, .. } => { @@ -296,8 +308,11 @@ impl Wallet { "Non-pending payment {:?} found in pending store", p.details.id, ); + // Funding records complete on `ChannelReady`, not after + // `ANTI_REORG_DELAY` confirmations. p.details.status == PaymentStatus::Pending && matches!(p.details.kind, PaymentKind::Onchain { .. }) + && p.funding_details.is_none() }); let mut unconfirmed_outbound_txids: Vec = Vec::new(); @@ -358,6 +373,14 @@ impl Wallet { .find_payment_by_txid(txid) .unwrap_or_else(|| PaymentId(txid.to_byte_array())); + if self.apply_funding_details_status_update( + payment_id, + txid, + ConfirmationStatus::Unconfirmed, + )? { + continue; + } + let payment = self.create_payment_from_tx( locked_wallet, txid, @@ -366,10 +389,8 @@ impl Wallet { PaymentStatus::Pending, ConfirmationStatus::Unconfirmed, ); - let pending_payment = - self.create_pending_payment_from_tx(payment.clone(), Vec::new()); - self.payment_store.insert_or_update(payment)?; - self.pending_payment_store.insert_or_update(pending_payment)?; + let pending_payment = self.create_pending_payment_from_tx(payment, Vec::new()); + self.persist_pending(pending_payment)?; }, WalletEvent::TxReplaced { txid, conflicts, .. } => { let Some(payment_id) = self.find_payment_by_txid(txid) else { @@ -405,6 +426,15 @@ impl Wallet { let payment_id = self .find_payment_by_txid(txid) .unwrap_or_else(|| PaymentId(txid.to_byte_array())); + + if self.apply_funding_details_status_update( + payment_id, + txid, + ConfirmationStatus::Unconfirmed, + )? { + continue; + } + let payment = self.create_payment_from_tx( locked_wallet, txid, @@ -413,10 +443,8 @@ impl Wallet { PaymentStatus::Pending, ConfirmationStatus::Unconfirmed, ); - let pending_payment = - self.create_pending_payment_from_tx(payment.clone(), Vec::new()); - self.payment_store.insert_or_update(payment)?; - self.pending_payment_store.insert_or_update(pending_payment)?; + let pending_payment = self.create_pending_payment_from_tx(payment, Vec::new()); + self.persist_pending(pending_payment)?; }, _ => { continue; @@ -1146,6 +1174,41 @@ impl Wallet { Ok(tx) } + /// Computes the amount, fee, and direction of an on-chain payment from the + /// wallet's view of the transaction. Used by [`TransactionBroadcaster`] to + /// describe a single-funded channel-open, for which no [`FundingContribution`] + /// is available. + /// + /// [`TransactionBroadcaster`]: crate::tx_broadcaster::TransactionBroadcaster + /// [`FundingContribution`]: lightning::ln::funding::FundingContribution + pub(crate) fn onchain_payment_fields( + &self, tx: &Transaction, + ) -> (Option, Option, PaymentDirection) { + let locked_wallet = self.inner.lock().expect("lock"); + let fee = locked_wallet.calculate_fee(tx).unwrap_or(Amount::ZERO); + let (sent, received) = locked_wallet.sent_and_received(tx); + let fee_sat = fee.to_sat(); + + let (direction, amount_msat) = if sent > received { + ( + PaymentDirection::Outbound, + Some( + (sent.to_sat().saturating_sub(fee_sat).saturating_sub(received.to_sat())) + * 1000, + ), + ) + } else { + ( + PaymentDirection::Inbound, + Some( + received.to_sat().saturating_sub(sent.to_sat().saturating_sub(fee_sat)) * 1000, + ), + ) + }; + + (amount_msat, Some(fee_sat * 1000), direction) + } + fn create_payment_from_tx( &self, locked_wallet: &PersistedWallet, txid: Txid, payment_id: PaymentId, tx: &Transaction, payment_status: PaymentStatus, @@ -1202,6 +1265,223 @@ impl Wallet { PendingPaymentDetails::new(payment, conflicting_txids) } + /// Writes a [`PendingPaymentDetails`] and its inner [`PaymentDetails`] to their + /// respective stores in a fixed order. Callers that need to keep the two stores in + /// sync should always go through this. + fn persist_pending(&self, pending: PendingPaymentDetails) -> Result<(), Error> { + self.payment_store.insert_or_update(pending.details.clone())?; + self.pending_payment_store.insert_or_update(pending)?; + Ok(()) + } + + /// Called on `ChannelReady` to mark a funding payment (channel open or splice) as + /// succeeded. + /// + /// If `funding_txo_txid` matches a candidate other than the currently-active one, + /// that candidate is promoted to active first and the outer [`PaymentDetails`] is + /// updated from its contribution. If no candidate matches (the confirmed funding + /// txid belongs to a broadcast this node didn't contribute to), the pending record + /// is left in place for later handling. + pub(crate) fn handle_channel_ready( + &self, channel_id: LnChannelId, funding_txo_txid: Option, + ) -> Result<(), Error> { + let funding_txo_txid = match funding_txo_txid { + Some(t) => t, + None => return Ok(()), + }; + + let mut pending = match self + .pending_payment_store + .list_filter(|p| { + p.funding_details + .as_ref() + .map(|fd| record_includes_channel(fd, channel_id)) + .unwrap_or(false) + }) + .into_iter() + .next() + { + Some(p) => p, + None => return Ok(()), + }; + let funding_details = match pending.funding_details.clone() { + Some(fd) => fd, + None => return Ok(()), + }; + + let candidate = match funding_details.candidates.iter().find(|c| c.txid == funding_txo_txid) + { + Some(c) => c.clone(), + None => { + // Confirmed `funding_txo` wasn't produced by any of our broadcasts. The + // record is left alone; some higher-level flow decides what to do. + log_debug!( + self.logger, + "ChannelReady for channel {}: confirmed funding_txo {} is not one of our candidates", + channel_id, + funding_txo_txid, + ); + return Ok(()); + }, + }; + + let old_txid = match pending.details.kind { + PaymentKind::Onchain { txid, .. } => txid, + _ => { + debug_assert!(false, "funding record must use PaymentKind::Onchain"); + return Ok(()); + }, + }; + + if old_txid != funding_txo_txid { + if !pending.conflicting_txids.contains(&old_txid) { + pending.conflicting_txids.push(old_txid); + } + pending.conflicting_txids.retain(|t| *t != funding_txo_txid); + + let aggregate = aggregate_local_stakes(&candidate); + pending.details.amount_msat = aggregate.amount_msat; + pending.details.fee_paid_msat = aggregate.fee_paid_msat; + } + + // Preserve the confirmation status already on the record (set by wallet sync if + // it's seen the tx confirm). `ChannelReady` alone doesn't carry block details. + let existing_status = match pending.details.kind { + PaymentKind::Onchain { status, .. } => status, + _ => ConfirmationStatus::Unconfirmed, + }; + pending.details.kind = + PaymentKind::Onchain { txid: funding_txo_txid, status: existing_status }; + + pending.details.status = PaymentStatus::Succeeded; + let payment_id = pending.details.id; + self.payment_store.insert_or_update(pending.details)?; + self.pending_payment_store.remove(&payment_id)?; + + Ok(()) + } + + /// Called on `ChannelClosed`. Removes any funding record (channel open or splice) + /// for `channel_id` whose candidates never reached confirmed — e.g. a funding + /// transaction that never made it on-chain. A record that does reflect a confirmed + /// transaction is left alone and will transition to `Succeeded` normally. + pub(crate) fn handle_channel_closed(&self, channel_id: LnChannelId) -> Result<(), Error> { + let pending = match self + .pending_payment_store + .list_filter(|p| { + p.funding_details + .as_ref() + .map(|fd| record_includes_channel(fd, channel_id)) + .unwrap_or(false) + }) + .into_iter() + .next() + { + Some(p) => p, + None => return Ok(()), + }; + + let is_confirmed = matches!( + pending.details.kind, + PaymentKind::Onchain { status: ConfirmationStatus::Confirmed { .. }, .. } + ); + if is_confirmed { + return Ok(()); + } + + let payment_id = pending.details.id; + self.pending_payment_store.remove(&payment_id)?; + self.payment_store.remove(&payment_id)?; + Ok(()) + } + + /// Updates a funding record's `kind` in response to a wallet-sync event, swapping + /// the active candidate when `event_txid` differs from the current one. + /// + /// Amount, fee, and direction are not recomputed from the wallet's view: they were + /// set at broadcast time from the `FundingContribution` and must persist until + /// `ChannelReady`. + /// + /// Returns `true` when a funding record was updated (so the caller skips the + /// default Onchain create/update path), `false` otherwise. + fn apply_funding_details_status_update( + &self, payment_id: PaymentId, event_txid: Txid, confirmation_status: ConfirmationStatus, + ) -> Result { + // `ChannelReady` may move the payment to the main store before wallet sync + // sees the tx confirm. In that case, update `kind` directly; recomputing from + // the wallet's view would overwrite the per-node fee set at broadcast time. + if let Some(mut existing) = self.payment_store.get(&payment_id) { + if existing.status == PaymentStatus::Succeeded + && matches!(existing.kind, PaymentKind::Onchain { .. }) + && self.pending_payment_store.get(&payment_id).is_none() + { + let needs_update = match existing.kind { + PaymentKind::Onchain { txid, status } => { + txid != event_txid || status != confirmation_status + }, + _ => false, + }; + if needs_update { + existing.kind = + PaymentKind::Onchain { txid: event_txid, status: confirmation_status }; + self.payment_store.insert_or_update(existing)?; + } + return Ok(true); + } + } + + let mut pending = match self.pending_payment_store.get(&payment_id) { + Some(p) => p, + None => return Ok(false), + }; + let funding_details = match pending.funding_details.as_ref() { + Some(fd) => fd, + None => return Ok(false), + }; + + let candidate = match funding_details.candidates.iter().find(|c| c.txid == event_txid) { + Some(c) => c.clone(), + None => { + log_debug!( + self.logger, + "Event txid {} resolved to funding_details payment {} but is not in candidates", + event_txid, + payment_id, + ); + return Ok(false); + }, + }; + + let old_txid = match pending.details.kind { + PaymentKind::Onchain { txid, .. } => txid, + _ => { + debug_assert!(false, "funding_details record must use PaymentKind::Onchain"); + return Ok(false); + }, + }; + + if old_txid != event_txid { + // A different candidate confirmed. Move the previous active txid onto + // `conflicting_txids` and re-derive amount/fee from the new candidate's + // contributions. + if !pending.conflicting_txids.contains(&old_txid) { + pending.conflicting_txids.push(old_txid); + } + pending.conflicting_txids.retain(|t| *t != event_txid); + + let aggregate = aggregate_local_stakes(&candidate); + pending.details.amount_msat = aggregate.amount_msat; + pending.details.fee_paid_msat = aggregate.fee_paid_msat; + } + + pending.details.kind = + PaymentKind::Onchain { txid: event_txid, status: confirmation_status }; + + self.persist_pending(pending)?; + + Ok(true) + } + fn find_payment_by_txid(&self, target_txid: Txid) -> Option { let direct_payment_id = PaymentId(target_txid.to_byte_array()); if self.pending_payment_store.contains_key(&direct_payment_id) { @@ -1213,12 +1493,28 @@ impl Wallet { .list_filter(|p| { matches!(p.details.kind, PaymentKind::Onchain { txid, .. } if txid == target_txid) || p.conflicting_txids.contains(&target_txid) + || p.funding_details + .as_ref() + .map(|fd| fd.candidates.iter().any(|c| c.txid == target_txid)) + .unwrap_or(false) }) .first() { return Some(replaced_details.details.id); } + // Once moved to the main store, a funding payment is still matched by its + // confirmed txid so late wallet events resolve correctly. + if let Some(p) = self + .payment_store + .list_filter( + |p| matches!(p.kind, PaymentKind::Onchain { txid, .. } if txid == target_txid), + ) + .first() + { + return Some(p.id); + } + None } @@ -1417,16 +1713,235 @@ impl Wallet { ConfirmationStatus::Unconfirmed, ); - let pending_payment_store = - self.create_pending_payment_from_tx(new_payment.clone(), Vec::new()); - - self.pending_payment_store.insert_or_update(pending_payment_store)?; - self.payment_store.insert_or_update(new_payment)?; + let pending_payment = self.create_pending_payment_from_tx(new_payment, Vec::new()); + self.persist_pending(pending_payment)?; log_info!(self.logger, "RBF successful: replaced {} with {}", txid, new_txid); Ok(new_txid) } + + pub(crate) fn classify_broadcast( + &self, tx: &Transaction, tx_type: &TransactionType, + ) -> Result<(), Error> { + match tx_type { + TransactionType::Funding { channels } => self.classify_funding(tx, channels), + TransactionType::InteractiveFunding { candidates } => { + self.classify_interactive_funding(tx, candidates) + }, + _ => Ok(()), + } + } + + fn classify_funding( + &self, tx: &Transaction, channels: &[(PublicKey, LnChannelId)], + ) -> Result<(), Error> { + // Batch funding (one transaction funding multiple channels) isn't supported; let + // wallet sync record the payment normally so graduation still runs through + // ANTI_REORG_DELAY. + if channels.len() != 1 { + if channels.len() > 1 { + log_trace!( + self.logger, + "Skipping funding classification for batched broadcast ({} channels)", + channels.len() + ); + } + return Ok(()); + } + + let (counterparty_node_id, channel_id) = channels[0]; + let txid = tx.compute_txid(); + let (amount_msat, fee_paid_msat, direction) = self.onchain_payment_fields(tx); + + let candidate = FundingCandidate { + txid, + channels: vec![ChannelFunding { + counterparty_node_id, + channel_id, + purpose: FundingPurpose::Establishment, + contribution: None, + }], + }; + + let details = PaymentDetails::new( + PaymentId(txid.to_byte_array()), + PaymentKind::Onchain { txid, status: ConfirmationStatus::Unconfirmed }, + amount_msat, + fee_paid_msat, + direction, + PaymentStatus::Pending, + ); + + let funding_details = FundingDetails { candidates: vec![candidate] }; + + let pending = + PendingPaymentDetails::with_funding_details(details, Vec::new(), funding_details); + + self.persist_pending(pending)?; + log_debug!( + self.logger, + "Recorded channel-funding broadcast {} for channel {}", + txid, + channel_id, + ); + Ok(()) + } + + fn classify_interactive_funding( + &self, tx: &Transaction, candidates: &[FundingCandidate], + ) -> Result<(), Error> { + // `InteractiveFunding` carries the full negotiated history. The currently-broadcast + // candidate is the last entry; earlier entries are RBF predecessors. + let active = match candidates.last() { + Some(c) => c, + None => return Ok(()), + }; + let first = match candidates.first() { + Some(c) => c, + None => return Ok(()), + }; + + let txid = tx.compute_txid(); + debug_assert_eq!(active.txid, txid, "broadcast tx must match the active candidate"); + + // Aggregate amount/fee/direction across this candidate's channels by summing the + // local-stake contributions. If we didn't contribute on this candidate, leave the + // record to wallet sync — there's nothing for us to track here, and any wallet- + // visible activity (e.g. a counterparty's splice-out paid to our address) is + // better surfaced as a plain on-chain receive. + let aggregate = aggregate_local_stakes(active); + let amount_msat = match aggregate.amount_msat { + Some(amt) => Some(amt), + None => { + log_trace!( + self.logger, + "Skipping interactive-funding broadcast {}: no local contribution", + txid, + ); + return Ok(()); + }, + }; + let fee_paid_msat = aggregate.fee_paid_msat; + let direction = aggregate.direction; + + // Skip broadcasts that don't move funds in or out of our on-chain wallet — e.g. + // a splice-out we initiated toward an external address. + let (wallet_amount_msat, _wallet_fee_msat, _wallet_direction) = + self.onchain_payment_fields(tx); + if wallet_amount_msat == Some(0) { + log_trace!( + self.logger, + "Skipping interactive-funding broadcast {}: no wallet-level activity", + txid, + ); + return Ok(()); + } + + // Anchor the PaymentId to the first negotiated candidate so the record stays + // stable across RBF replacements. + let payment_id = PaymentId(first.txid.to_byte_array()); + let candidate_count = candidates.len(); + let active_channel_count = active.channels.len(); + + let details = PaymentDetails::new( + payment_id, + PaymentKind::Onchain { txid, status: ConfirmationStatus::Unconfirmed }, + amount_msat, + fee_paid_msat, + direction, + PaymentStatus::Pending, + ); + + // Funding records carry their own RBF history in `candidates`; lookup by txid + // (find_payment_by_txid) already searches that, so no separate + // `conflicting_txids` Vec is needed. + let funding_details = FundingDetails { candidates: candidates.to_vec() }; + + let pending = + PendingPaymentDetails::with_funding_details(details, Vec::new(), funding_details); + + self.persist_pending(pending)?; + log_debug!( + self.logger, + "Recorded interactive-funding broadcast {} ({} candidates, {} channels)", + txid, + candidate_count, + active_channel_count, + ); + Ok(()) + } +} + +/// Returns this node's share of the on-chain fee for a funding transaction (channel +/// open or splice), in millisatoshis. Sourced from the contribution's +/// [`FundingContribution::estimated_fee`], which upstream computes per-contributor. +fn our_actual_fee_msat(contribution: &FundingContribution) -> u64 { + contribution.estimated_fee().to_sat() * 1000 +} + +fn record_includes_channel(details: &FundingDetails, channel_id: LnChannelId) -> bool { + details.candidates.iter().any(|c| c.channels.iter().any(|ch| ch.channel_id == channel_id)) +} + +struct LocalStakeAggregate { + amount_msat: Option, + fee_paid_msat: Option, + direction: PaymentDirection, +} + +/// Aggregates local-stake amount/fee/direction across the channels of a single +/// [`FundingCandidate`]. Each channel's contribution (when present) is treated as +/// local-stake-only, so contributions across channels are summed without +/// double-counting. +fn aggregate_local_stakes(candidate: &FundingCandidate) -> LocalStakeAggregate { + let mut amount_outbound: u64 = 0; + let mut amount_inbound: u64 = 0; + let mut fee: u64 = 0; + let mut have_contribution = false; + for channel in &candidate.channels { + if let Some(c) = channel.contribution.as_ref() { + have_contribution = true; + fee = fee.saturating_add(our_actual_fee_msat(c)); + match contribution_direction(c) { + Some((PaymentDirection::Outbound, amt)) => { + amount_outbound = amount_outbound.saturating_add(amt); + }, + Some((PaymentDirection::Inbound, amt)) => { + amount_inbound = amount_inbound.saturating_add(amt); + }, + None => {}, + } + } + } + if !have_contribution { + return LocalStakeAggregate { + amount_msat: None, + fee_paid_msat: None, + direction: PaymentDirection::Outbound, + }; + } + let (direction, amount_msat) = if amount_outbound >= amount_inbound { + (PaymentDirection::Outbound, amount_outbound.saturating_sub(amount_inbound)) + } else { + (PaymentDirection::Inbound, amount_inbound.saturating_sub(amount_outbound)) + }; + LocalStakeAggregate { amount_msat: Some(amount_msat), fee_paid_msat: Some(fee), direction } +} + +/// Returns this contribution's direction and magnitude in msat, or `None` if it can't +/// be classified as a single inbound or outbound payment. +fn contribution_direction(contribution: &FundingContribution) -> Option<(PaymentDirection, u64)> { + let value_added = contribution.value_added(); + let outputs_total: Amount = contribution.outputs().iter().map(|o| o.value).sum(); + + if value_added > Amount::ZERO && outputs_total == Amount::ZERO { + Some((PaymentDirection::Outbound, value_added.to_sat() * 1000)) + } else if value_added == Amount::ZERO && outputs_total > Amount::ZERO { + Some((PaymentDirection::Inbound, outputs_total.to_sat() * 1000)) + } else { + None + } } impl Listen for Wallet { diff --git a/tests/integration_tests_rust.rs b/tests/integration_tests_rust.rs index c2f19115e..925f2e4c3 100644 --- a/tests/integration_tests_rust.rs +++ b/tests/integration_tests_rust.rs @@ -1227,6 +1227,35 @@ async fn rbf_splice_channel() { assert_ne!(original_txo, rbf_txo, "RBF should produce a different funding txo"); + // After RBF but before confirmation, node_b (the initiator) should have a single + // on-chain payment covering both candidates: id anchored to the first broadcast, + // `kind.txid` pointing at the latest (RBF) candidate, and the original candidate + // recorded as a replaced one on the pending record. + { + let payment_id = PaymentId(original_txo.txid.to_byte_array()); + let payment = node_b.payment(&payment_id).expect("splice payment exists"); + match payment.kind { + PaymentKind::Onchain { txid, status: ConfirmationStatus::Unconfirmed } => { + assert_eq!(txid, rbf_txo.txid); + }, + ref other => panic!("expected Onchain Unconfirmed, got {:?}", other), + } + assert_eq!(payment.status, PaymentStatus::Pending); + // Only one Onchain Pending payment for this splice attempt (not one per candidate). + let splice_payments = node_b.list_payments_with_filter(|p| { + p.direction == PaymentDirection::Outbound + && matches!(p.kind, PaymentKind::Onchain { .. }) + && p.status == PaymentStatus::Pending + }); + assert_eq!( + splice_payments.len(), + 1, + "expected exactly one pending Onchain payment for the splice, got {}: {:#?}", + splice_payments.len(), + splice_payments, + ); + } + // Wait for the RBF transaction to replace the original in the mempool wait_for_tx(&electrsd.client, rbf_txo.txid).await; @@ -1254,6 +1283,26 @@ async fn rbf_splice_channel() { ref e => panic!("node_b got unexpected event: {:?}", e), } + // After `ChannelReady` we should have graduated to `Succeeded` — even though + // `ANTI_REORG_DELAY` may not have elapsed yet — and the `kind.txid` should + // reflect the winning RBF candidate, with `fee_paid_msat` matching our + // per-node `FundingContribution::estimated_fee` for that candidate. + { + let payment_id = PaymentId(original_txo.txid.to_byte_array()); + let payment = node_b.payment(&payment_id).expect("splice payment graduated"); + assert_eq!(payment.status, PaymentStatus::Succeeded); + match payment.kind { + PaymentKind::Onchain { txid, status: ConfirmationStatus::Confirmed { .. } } => { + assert_eq!(txid, rbf_txo.txid); + }, + ref other => panic!("expected Onchain Confirmed, got {:?}", other), + } + assert!( + payment.fee_paid_msat.is_some(), + "splice payment should carry a fee from its FundingContribution", + ); + } + node_a.stop().unwrap(); node_b.stop().unwrap(); } From 248ad8f3986519c52ba9910693d173620f0947c7 Mon Sep 17 00:00:00 2001 From: Jeffrey Czyz Date: Wed, 3 Jun 2026 15:56:16 -0500 Subject: [PATCH 4/8] f - Preserve funding details when a splice candidate is replaced The TxReplaced wallet event rebuilt the payment record from scratch, dropping its funding details. When a wallet sync fell between a splice broadcast and its RBF, the replacement of the original candidate cleared those details, so the payment no longer graduated to Succeeded on ChannelReady. Funding records are managed by the classify path and the Lightning lifecycle handlers, so leave them untouched on replacement. Co-Authored-By: Claude --- src/wallet/mod.rs | 12 ++++++++++++ tests/integration_tests_rust.rs | 18 ++++++++++++++++++ 2 files changed, 30 insertions(+) diff --git a/src/wallet/mod.rs b/src/wallet/mod.rs index 3e4ac41b3..958325077 100644 --- a/src/wallet/mod.rs +++ b/src/wallet/mod.rs @@ -402,6 +402,18 @@ impl Wallet { continue; }; + // Funding records (channel opens and splices) track their active candidate and + // status through `classify_*` and the Lightning lifecycle handlers. A replaced + // candidate is expected during splice RBF and must not reset the record or drop + // its funding details, so leave such records untouched here. + if self + .pending_payment_store + .get(&payment_id) + .map_or(false, |p| p.funding_details.is_some()) + { + continue; + } + // Collect all conflict txids let mut conflict_txids: Vec = conflicts.iter().map(|(_, conflict_txid)| *conflict_txid).collect(); diff --git a/tests/integration_tests_rust.rs b/tests/integration_tests_rust.rs index 925f2e4c3..a1b25e4d6 100644 --- a/tests/integration_tests_rust.rs +++ b/tests/integration_tests_rust.rs @@ -1206,6 +1206,24 @@ async fn rbf_splice_channel() { let original_txo = expect_splice_negotiated_event!(node_a, node_b.node_id()); expect_splice_negotiated_event!(node_b, node_a.node_id()); + // Sync so the original splice candidate is recorded as a canonical wallet transaction before + // the RBF below replaces it. This makes the post-RBF sync observe the original candidate being + // replaced (a `WalletEvent::TxReplaced`), which must not drop the payment's funding details. + // + // This is a best-effort regression guard rather than a deterministic one: with the + // funding-details-preservation fix in place the splice still graduates correctly, but without + // it the resulting inconsistency only surfaces intermittently (via a timing-dependent + // `debug_assert` in the chain-tip handler), so a reverted fix is caught probabilistically. + // + // TODO: Make this deterministic. If funding payments carried a durable classification in the + // main payment store (e.g. a `tx_type` on `PaymentKind::Onchain`, as in + // lightningdevkit/ldk-node#791), a dropped funding-details record would be a detectable + // contradiction on `ChannelReady` rather than a timing-dependent assert, letting this test + // fail reliably whenever the fix is reverted. + wait_for_tx(&electrsd.client, original_txo.txid).await; + node_a.sync_wallets().unwrap(); + node_b.sync_wallets().unwrap(); + // splice_in should fail when there's a pending splice (RBF guard) assert_eq!( node_b.splice_in(&user_channel_id_b, node_a.node_id(), 1_000_000), From 7b7647bcca02cab13597d13f36cfd1ae5785924d Mon Sep 17 00:00:00 2001 From: Jeffrey Czyz Date: Wed, 3 Jun 2026 16:01:15 -0500 Subject: [PATCH 5/8] f - Reject on-chain RBF of funding and splice payments bump_fee_rbf accepted channel-funding and splice payments because they are recorded as outbound, unconfirmed on-chain payments. Replacing such a transaction via wallet RBF would broadcast one LDK isn't tracking, and for splices the shared input can't be wallet-signed. Reject these and leave splice fee-bumping to rbf_channel. Co-Authored-By: Claude --- src/wallet/mod.rs | 17 ++++++++++ tests/integration_tests_rust.rs | 57 +++++++++++++++++++++++++++++++++ 2 files changed, 74 insertions(+) diff --git a/src/wallet/mod.rs b/src/wallet/mod.rs index 958325077..e5ac26bf1 100644 --- a/src/wallet/mod.rs +++ b/src/wallet/mod.rs @@ -1539,6 +1539,23 @@ impl Wallet { Error::InvalidPaymentId })?; + // Funding transactions (channel opens and splices) are driven by LDK's funding/splice + // lifecycle, not the on-chain wallet. Replacing one via on-chain RBF would broadcast a + // transaction LDK isn't tracking (and, for splices, can't sign). Fee-bumping a pending + // splice goes through `bump_channel_funding_fee` instead. + if self + .pending_payment_store + .get(&payment_id) + .map_or(false, |p| p.funding_details.is_some()) + { + log_error!( + self.logger, + "Cannot RBF funding payment {} via bump_fee_rbf; use bump_channel_funding_fee instead", + payment_id, + ); + return Err(Error::InvalidPaymentId); + } + if let PaymentKind::Onchain { status, .. } = &payment.kind { match status { ConfirmationStatus::Confirmed { .. } => { diff --git a/tests/integration_tests_rust.rs b/tests/integration_tests_rust.rs index a1b25e4d6..522e396d5 100644 --- a/tests/integration_tests_rust.rs +++ b/tests/integration_tests_rust.rs @@ -1325,6 +1325,63 @@ async fn rbf_splice_channel() { node_b.stop().unwrap(); } +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn bump_fee_rbf_rejects_funding_payment() { + // A channel-funding or splice transaction is driven by LDK's funding/splice lifecycle, not the + // on-chain wallet. `bump_fee_rbf` must reject such payments — replacing the funding transaction + // via plain wallet RBF would broadcast a transaction LDK isn't tracking (and, for splices, + // can't even sign). Fee-bumping a pending splice goes through `bump_channel_funding_fee` instead. + let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); + let chain_source = random_chain_source(&bitcoind, &electrsd); + let (node_a, node_b) = setup_two_nodes(&chain_source, false, true, false); + + let address_a = node_a.onchain_payment().new_address().unwrap(); + let address_b = node_b.onchain_payment().new_address().unwrap(); + let premine_amount_sat = 5_000_000; + premine_and_distribute_funds( + &bitcoind.client, + &electrsd.client, + vec![address_a, address_b], + Amount::from_sat(premine_amount_sat), + ) + .await; + + node_a.sync_wallets().unwrap(); + node_b.sync_wallets().unwrap(); + + open_channel(&node_a, &node_b, 4_000_000, false, &electrsd).await; + + generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6).await; + + node_a.sync_wallets().unwrap(); + node_b.sync_wallets().unwrap(); + + let _user_channel_id_a = expect_channel_ready_event!(node_a, node_b.node_id()); + let user_channel_id_b = expect_channel_ready_event!(node_b, node_a.node_id()); + + // Splice-in to create a pending splice payment. + node_b.splice_in(&user_channel_id_b, node_a.node_id(), 1_000_000).unwrap(); + + let txo = expect_splice_negotiated_event!(node_a, node_b.node_id()); + expect_splice_negotiated_event!(node_b, node_a.node_id()); + + // Make node_b's wallet aware of the splice transaction so `bump_fee_rbf` reaches its funding + // guard rather than failing earlier for a transaction it can't find. + wait_for_tx(&electrsd.client, txo.txid).await; + node_b.sync_wallets().unwrap(); + + // The splice payment is an on-chain, outbound, unconfirmed record, so it passes + // `bump_fee_rbf`'s other guards; it must nonetheless be rejected as a funding payment. + let splice_payment_id = PaymentId(txo.txid.to_byte_array()); + assert_eq!( + node_b.onchain_payment().bump_fee_rbf(splice_payment_id, None), + Err(NodeError::InvalidPaymentId), + ); + + node_a.stop().unwrap(); + node_b.stop().unwrap(); +} + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn simple_bolt12_send_receive() { let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); From 511d194e066388058fb2c7c0a84aef34b9b7761a Mon Sep 17 00:00:00 2001 From: Jeffrey Czyz Date: Fri, 5 Jun 2026 14:16:08 -0500 Subject: [PATCH 6/8] f - Rename persist_pending to persist_pending_payment Co-Authored-By: Claude --- src/wallet/mod.rs | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/src/wallet/mod.rs b/src/wallet/mod.rs index e5ac26bf1..cd8c6ec2d 100644 --- a/src/wallet/mod.rs +++ b/src/wallet/mod.rs @@ -295,7 +295,7 @@ impl Wallet { if payment_status == PaymentStatus::Pending { let pending_payment = self.create_pending_payment_from_tx(payment, Vec::new()); - self.persist_pending(pending_payment)?; + self.persist_pending_payment(pending_payment)?; } else { self.payment_store.insert_or_update(payment)?; } @@ -390,7 +390,7 @@ impl Wallet { ConfirmationStatus::Unconfirmed, ); let pending_payment = self.create_pending_payment_from_tx(payment, Vec::new()); - self.persist_pending(pending_payment)?; + self.persist_pending_payment(pending_payment)?; }, WalletEvent::TxReplaced { txid, conflicts, .. } => { let Some(payment_id) = self.find_payment_by_txid(txid) else { @@ -456,7 +456,7 @@ impl Wallet { ConfirmationStatus::Unconfirmed, ); let pending_payment = self.create_pending_payment_from_tx(payment, Vec::new()); - self.persist_pending(pending_payment)?; + self.persist_pending_payment(pending_payment)?; }, _ => { continue; @@ -1280,7 +1280,7 @@ impl Wallet { /// Writes a [`PendingPaymentDetails`] and its inner [`PaymentDetails`] to their /// respective stores in a fixed order. Callers that need to keep the two stores in /// sync should always go through this. - fn persist_pending(&self, pending: PendingPaymentDetails) -> Result<(), Error> { + fn persist_pending_payment(&self, pending: PendingPaymentDetails) -> Result<(), Error> { self.payment_store.insert_or_update(pending.details.clone())?; self.pending_payment_store.insert_or_update(pending)?; Ok(()) @@ -1489,7 +1489,7 @@ impl Wallet { pending.details.kind = PaymentKind::Onchain { txid: event_txid, status: confirmation_status }; - self.persist_pending(pending)?; + self.persist_pending_payment(pending)?; Ok(true) } @@ -1743,7 +1743,7 @@ impl Wallet { ); let pending_payment = self.create_pending_payment_from_tx(new_payment, Vec::new()); - self.persist_pending(pending_payment)?; + self.persist_pending_payment(pending_payment)?; log_info!(self.logger, "RBF successful: replaced {} with {}", txid, new_txid); @@ -1807,7 +1807,7 @@ impl Wallet { let pending = PendingPaymentDetails::with_funding_details(details, Vec::new(), funding_details); - self.persist_pending(pending)?; + self.persist_pending_payment(pending)?; log_debug!( self.logger, "Recorded channel-funding broadcast {} for channel {}", @@ -1890,7 +1890,7 @@ impl Wallet { let pending = PendingPaymentDetails::with_funding_details(details, Vec::new(), funding_details); - self.persist_pending(pending)?; + self.persist_pending_payment(pending)?; log_debug!( self.logger, "Recorded interactive-funding broadcast {} ({} candidates, {} channels)", From bec7723f663a7890c74f5e4c372bfbdc78922a13 Mon Sep 17 00:00:00 2001 From: Jeffrey Czyz Date: Thu, 23 Apr 2026 14:58:20 -0500 Subject: [PATCH 7/8] Persist payment transaction data without blocking LDK Previously the BroadcasterInterface implementation wrote the payment record synchronously when LDK invoked it. With a remote KV store this could block LDK's message handling for hundreds of milliseconds per call, noticeably during force-close bursts or splice broadcasts. Persistence now happens asynchronously and must complete before the transaction is sent to the chain client. If persistence fails, the broadcast is dropped: a payment record must exist for every on-chain tx we emit, otherwise a crash could leave the tx confirmed with no matching record. Generated with assistance from Claude Code. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/chain/bitcoind.rs | 8 ++++--- src/chain/electrum.rs | 6 +++-- src/chain/esplora.rs | 8 ++++--- src/chain/mod.rs | 20 ++++++++++++---- src/tx_broadcaster.rs | 56 +++++++++++++++++++++++++++---------------- 5 files changed, 66 insertions(+), 32 deletions(-) diff --git a/src/chain/bitcoind.rs b/src/chain/bitcoind.rs index 2582f32f6..eeed41a8d 100644 --- a/src/chain/bitcoind.rs +++ b/src/chain/bitcoind.rs @@ -568,16 +568,18 @@ impl BitcoindChainSource { Ok(()) } - pub(crate) async fn process_broadcast_package(&self, package: Vec) { + pub(crate) async fn process_broadcast_package( + &self, txs: impl IntoIterator, + ) { // While it's a bit unclear when we'd be able to lean on Bitcoin Core >v28 // features, we should eventually switch to use `submitpackage` via the // `rust-bitcoind-json-rpc` crate rather than just broadcasting individual // transactions. - for tx in &package { + for tx in txs { let txid = tx.compute_txid(); let timeout_fut = tokio::time::timeout( Duration::from_secs(DEFAULT_TX_BROADCAST_TIMEOUT_SECS), - self.api_client.broadcast_transaction(tx), + self.api_client.broadcast_transaction(&tx), ); match timeout_fut.await { Ok(res) => match res { diff --git a/src/chain/electrum.rs b/src/chain/electrum.rs index 54e7fff0c..28825b191 100644 --- a/src/chain/electrum.rs +++ b/src/chain/electrum.rs @@ -275,7 +275,9 @@ impl ElectrumChainSource { Ok(()) } - pub(crate) async fn process_broadcast_package(&self, package: Vec) { + pub(crate) async fn process_broadcast_package( + &self, txs: impl IntoIterator, + ) { let electrum_client: Arc = if let Some(client) = self.electrum_runtime_status.read().expect("lock").client().as_ref() { @@ -285,7 +287,7 @@ impl ElectrumChainSource { return; }; - for tx in package { + for tx in txs { electrum_client.broadcast(tx).await; } } diff --git a/src/chain/esplora.rs b/src/chain/esplora.rs index 5825a0984..5f88ad76e 100644 --- a/src/chain/esplora.rs +++ b/src/chain/esplora.rs @@ -352,12 +352,14 @@ impl EsploraChainSource { Ok(()) } - pub(crate) async fn process_broadcast_package(&self, package: Vec) { - for tx in &package { + pub(crate) async fn process_broadcast_package( + &self, txs: impl IntoIterator, + ) { + for tx in txs { let txid = tx.compute_txid(); let timeout_fut = tokio::time::timeout( Duration::from_secs(self.sync_config.timeouts_config.tx_broadcast_timeout_secs), - self.esplora_client.broadcast(tx), + self.esplora_client.broadcast(&tx), ); match timeout_fut.await { Ok(res) => match res { diff --git a/src/chain/mod.rs b/src/chain/mod.rs index cb8541be6..cf0c946ba 100644 --- a/src/chain/mod.rs +++ b/src/chain/mod.rs @@ -24,7 +24,7 @@ use crate::config::{ WALLET_SYNC_INTERVAL_MINIMUM_SECS, }; use crate::fee_estimator::OnchainFeeEstimator; -use crate::logger::{log_debug, log_info, log_trace, LdkLogger, Logger}; +use crate::logger::{log_debug, log_error, log_info, log_trace, LdkLogger, Logger}; use crate::runtime::Runtime; use crate::types::{Broadcaster, ChainMonitor, ChannelManager, DynStore, Sweeper, Wallet}; use crate::{Error, NodeMetrics}; @@ -453,15 +453,27 @@ impl ChainSource { return; } Some(next_package) = receiver.recv() => { + let package = match self.tx_broadcaster.classify_package(next_package).await { + Ok(p) => p, + Err(e) => { + log_error!( + tx_bcast_logger, + "Skipping broadcast: failed to persist payment records: {:?}", + e, + ); + continue; + }, + }; + let txs = package.into_iter().map(|(tx, _)| tx); match &self.kind { ChainSourceKind::Esplora(esplora_chain_source) => { - esplora_chain_source.process_broadcast_package(next_package).await + esplora_chain_source.process_broadcast_package(txs).await }, ChainSourceKind::Electrum(electrum_chain_source) => { - electrum_chain_source.process_broadcast_package(next_package).await + electrum_chain_source.process_broadcast_package(txs).await }, ChainSourceKind::Bitcoind(bitcoind_chain_source) => { - bitcoind_chain_source.process_broadcast_package(next_package).await + bitcoind_chain_source.process_broadcast_package(txs).await }, } } diff --git a/src/tx_broadcaster.rs b/src/tx_broadcaster.rs index 24abf8f11..1e65fb1aa 100644 --- a/src/tx_broadcaster.rs +++ b/src/tx_broadcaster.rs @@ -14,20 +14,26 @@ use tokio::sync::{mpsc, Mutex, MutexGuard}; use crate::logger::{log_error, LdkLogger}; use crate::types::Wallet; +use crate::Error; const BCAST_PACKAGE_QUEUE_SIZE: usize = 50; +/// A package of transactions that LDK handed to the broadcaster in one +/// `broadcast_transactions` call, along with each transaction's type. Queued until the +/// background task classifies and broadcasts it. +pub(crate) type BroadcastPackage = Vec<(Transaction, TransactionType)>; + pub(crate) struct TransactionBroadcaster where L::Target: LdkLogger, { - queue_sender: mpsc::Sender>, - queue_receiver: Mutex>>, + queue_sender: mpsc::Sender, + queue_receiver: Mutex>, /// Weak handle to the [`Wallet`] that performs classification of funding broadcasts /// (channel opens and splices) into payment records. Remains `None` while the - /// builder is wiring the node up, during which broadcasts are still forwarded to - /// the queue but no payment record is written. [`Self::set_wallet`] installs the - /// handle once the [`Wallet`] exists. + /// builder is wiring the node up, during which broadcasts are forwarded to the + /// queue but no payment record is written. [`Self::set_wallet`] installs the handle + /// once the [`Wallet`] exists. wallet: StdMutex>>, logger: L, } @@ -55,9 +61,31 @@ where pub(crate) async fn get_broadcast_queue( &self, - ) -> MutexGuard<'_, mpsc::Receiver>> { + ) -> MutexGuard<'_, mpsc::Receiver> { self.queue_receiver.lock().await } + + /// Classifies a queued package into payment records and returns the package ready + /// for the chain client. Returns `Err` if any classification fails; callers must + /// not broadcast the package in that case, since a crash would leave the tx + /// on-chain without a record. + pub(crate) async fn classify_package( + &self, package: BroadcastPackage, + ) -> Result { + let wallet_opt = self.wallet.lock().expect("lock").as_ref().and_then(Weak::upgrade); + if let Some(wallet) = wallet_opt { + tokio::task::spawn_blocking(move || { + for (tx, tx_type) in &package { + wallet.classify_broadcast(tx, tx_type)?; + } + Ok::<_, Error>(package) + }) + .await + .map_err(|_| Error::PersistenceFailed)? + } else { + Ok(package) + } + } } impl BroadcasterInterface for TransactionBroadcaster @@ -65,20 +93,8 @@ where L::Target: LdkLogger, { fn broadcast_transactions(&self, txs: &[(&Transaction, TransactionType)]) { - let wallet = self.wallet.lock().expect("lock").as_ref().and_then(Weak::upgrade); - if let Some(wallet) = wallet { - for (tx, tx_type) in txs { - if let Err(e) = wallet.classify_broadcast(tx, tx_type) { - log_error!( - self.logger, - "Failed to classify broadcast tx {}: {:?}", - tx.compute_txid(), - e, - ); - } - } - } - let package = txs.iter().map(|(t, _)| (*t).clone()).collect::>(); + let package: BroadcastPackage = + txs.iter().map(|(tx, tx_type)| ((*tx).clone(), tx_type.clone())).collect(); self.queue_sender.try_send(package).unwrap_or_else(|e| { log_error!(self.logger, "Failed to broadcast transactions: {}", e); }); From 492bd32461d512ffd2418b4ffa3f8a5aa8e1d044 Mon Sep 17 00:00:00 2001 From: Jeffrey Czyz Date: Wed, 10 Jun 2026 16:14:53 -0500 Subject: [PATCH 8/8] Retry user-initiated splices across restarts and disconnects LDK does not durably record a splice until its negotiation reaches the signature exchange, and it abandons an in-progress negotiation whenever the peer disconnects -- which includes stopping the node. A restart or an ill-timed disconnect after splice_in, splice_out, or bump_channel_funding_fee returned Ok would therefore silently drop the splice. Persist the splice intent in a new UserChannelId-keyed channel record store before handing the contribution to LDK, and resubmit it until the splice locks. A startup reconciler probes LDK's live splice state to detect dropped intents -- including those lost to a crash before LDK persisted anything -- and the SpliceNegotiationFailed handler retries recoverable failures, rebuilding the contribution with fresh parameters when the stored one has gone stale. Resubmission does not require the peer to be connected, as LDK holds the contribution and initiates quiescence once the peer reconnects. Event::SpliceNegotiationFailed is now emitted only once a splice is given up on (a non-transient failure or retries exhausted) rather than for every failed negotiation round. Generated with assistance from Claude Code. Co-Authored-By: Claude Fable 5 --- src/builder.rs | 33 ++- src/channel/mod.rs | 342 ++++++++++++++++++++++++++++++++ src/channel/store.rs | 184 +++++++++++++++++ src/event.rs | 31 ++- src/io/mod.rs | 4 + src/lib.rs | 113 ++++++++++- src/types.rs | 5 +- tests/integration_tests_rust.rs | 187 +++++++++++++++++ 8 files changed, 884 insertions(+), 15 deletions(-) create mode 100644 src/channel/mod.rs create mode 100644 src/channel/store.rs diff --git a/src/builder.rs b/src/builder.rs index 1d32a8a12..5386bf570 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -64,7 +64,9 @@ use crate::io::utils::{ }; use crate::io::vss_store::VssStoreBuilder; use crate::io::{ - self, PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, + self, CHANNEL_RECORD_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_RECORD_PERSISTENCE_SECONDARY_NAMESPACE, PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, + PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, PENDING_PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, PENDING_PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, }; @@ -79,9 +81,9 @@ use crate::peer_store::PeerStore; use crate::runtime::{Runtime, RuntimeSpawner}; use crate::tx_broadcaster::TransactionBroadcaster; use crate::types::{ - AsyncPersister, ChainMonitor, ChannelManager, DynStore, DynStoreRef, DynStoreWrapper, - GossipSync, Graph, HRNResolver, KeysManager, MessageRouter, OnionMessenger, PaymentStore, - PeerManager, PendingPaymentStore, SyncAndAsyncKVStore, + AsyncPersister, ChainMonitor, ChannelManager, ChannelRecordStore, DynStore, DynStoreRef, + DynStoreWrapper, GossipSync, Graph, HRNResolver, KeysManager, MessageRouter, OnionMessenger, + PaymentStore, PeerManager, PendingPaymentStore, SyncAndAsyncKVStore, }; use crate::wallet::persist::KVStoreWalletPersister; use crate::wallet::Wallet; @@ -1394,7 +1396,7 @@ fn build_with_store_internal( let kv_store_ref = Arc::clone(&kv_store); let logger_ref = Arc::clone(&logger); - let (payment_store_res, node_metris_res, pending_payment_store_res) = + let (payment_store_res, node_metris_res, pending_payment_store_res, channel_record_store_res) = runtime.block_on(async move { tokio::join!( read_all_objects( @@ -1409,6 +1411,12 @@ fn build_with_store_internal( PENDING_PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, PENDING_PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, Arc::clone(&logger_ref), + ), + read_all_objects( + &*kv_store_ref, + CHANNEL_RECORD_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_RECORD_PERSISTENCE_SECONDARY_NAMESPACE, + Arc::clone(&logger_ref), ) ) }); @@ -1612,6 +1620,20 @@ fn build_with_store_internal( }, }; + let channel_record_store = match channel_record_store_res { + Ok(channel_records) => Arc::new(ChannelRecordStore::new( + channel_records, + CHANNEL_RECORD_PERSISTENCE_PRIMARY_NAMESPACE.to_string(), + CHANNEL_RECORD_PERSISTENCE_SECONDARY_NAMESPACE.to_string(), + Arc::clone(&kv_store), + Arc::clone(&logger), + )), + Err(e) => { + log_error!(logger, "Failed to read channel record data from store: {}", e); + return Err(BuildError::ReadFailed); + }, + }; + let wallet = Arc::new(Wallet::new( bdk_wallet, wallet_persister, @@ -2171,6 +2193,7 @@ fn build_with_store_internal( scorer, peer_store, payment_store, + channel_record_store, lnurl_auth, is_running, node_metrics, diff --git a/src/channel/mod.rs b/src/channel/mod.rs new file mode 100644 index 000000000..3868da385 --- /dev/null +++ b/src/channel/mod.rs @@ -0,0 +1,342 @@ +// This file is Copyright its original authors, visible in version control history. +// +// This file is licensed under the Apache License, Version 2.0 or the MIT license , at your option. You may not use this file except in +// accordance with one or both of these licenses. + +//! Per-channel state tracking. + +pub(crate) mod store; + +use std::ops::Deref; +use std::sync::Arc; + +use bitcoin::secp256k1::PublicKey; +use bitcoin::{Amount, FeeRate}; +use lightning::events::NegotiationFailureReason; +use lightning::ln::funding::FundingContribution; +use lightning::ln::types::ChannelId; + +use crate::event::{Event, EventQueue}; +use crate::fee_estimator::{ConfirmationTarget, FeeEstimator, OnchainFeeEstimator}; +use crate::logger::{log_error, log_info, LdkLogger}; +use crate::types::{ChannelManager, ChannelRecordStore, UserChannelId, Wallet}; +use crate::Error; + +pub(crate) use self::store::MAX_SPLICE_ATTEMPTS; +use self::store::{ChannelRecord, ChannelRecordUpdate, SpliceIntent, SpliceKind}; + +/// Resubmits user-initiated splices that LDK dropped before durably recording them. +/// +/// LDK only persists a splice once its negotiation reaches `AwaitingSignatures`, and it abandons +/// an earlier negotiation whenever the peer disconnects (which includes restarting the node). The +/// splice entry points persist a [`SpliceIntent`] before handing the contribution to LDK; this +/// type drives that intent back into [`ChannelManager::funding_contributed`] until the splice +/// either locks (clearing the intent on `ChannelReady`) or fails for a reason that retrying +/// cannot address. +/// +/// Resubmitting does not require the peer to be connected: LDK holds on to the contribution and +/// initiates quiescence once the peer reconnects. +/// +/// [`ChannelManager::funding_contributed`]: lightning::ln::channelmanager::ChannelManager::funding_contributed +pub(crate) struct SpliceRetrier +where + L::Target: LdkLogger, +{ + channel_manager: Arc, + wallet: Arc, + fee_estimator: Arc, + channel_record_store: Arc, + event_queue: Arc>, + logger: L, +} + +impl SpliceRetrier +where + L::Target: LdkLogger, +{ + pub(crate) fn new( + channel_manager: Arc, wallet: Arc, + fee_estimator: Arc, channel_record_store: Arc, + event_queue: Arc>, logger: L, + ) -> Self { + Self { channel_manager, wallet, fee_estimator, channel_record_store, event_queue, logger } + } + + /// Reconciles persisted splice intents against live channel state. Run once at startup. + pub(crate) async fn reconcile(&self) { + let records = self.channel_record_store.list_filter(|r| r.pending_splice().is_some()); + for record in records { + let ChannelRecord::Funded { + user_channel_id, counterparty_node_id, pending_splice, .. + } = record; + let intent = match pending_splice { + Some(intent) => intent, + None => continue, + }; + + let channel = self + .channel_manager + .list_channels_with_counterparty(&counterparty_node_id) + .into_iter() + .find(|c| c.user_channel_id == user_channel_id.0); + let channel = match channel { + Some(channel) => channel, + None => { + // The channel is gone; there is nothing to splice anymore. + let _ = self.channel_record_store.remove(&user_channel_id); + continue; + }, + }; + + if channel.funding_txo != Some(intent.pre_splice_funding_txo) { + // The funding moved on, so the splice (or a replacement) locked. + let _ = self.channel_record_store.remove(&user_channel_id); + continue; + } + + // `splice_channel` is a read-only probe of LDK's splice state. It fails when we + // already have a splice in flight (a held contribution, an in-progress negotiation, + // or one awaiting signatures), all of which LDK drives to completion on its own. + let template = match self + .channel_manager + .splice_channel(&channel.channel_id, &counterparty_node_id) + { + Ok(template) => template, + Err(_) => continue, + }; + + // The template's prior contribution is our last negotiated one. LDK persists a splice + // once negotiated, so its presence means the intent was carried out unless the intent + // was a fee bump at a higher feerate than what was negotiated. + let should_retry = match (&intent.kind, template.prior_contribution()) { + (SpliceKind::Rbf {}, Some(prior)) => { + prior.feerate() < intent.contribution.feerate() + }, + (SpliceKind::Rbf {}, None) => { + // The splice to bump is gone entirely; surface rather than guess. + self.abandon(user_channel_id, channel.channel_id, counterparty_node_id).await; + continue; + }, + (_, Some(_)) => false, + (_, None) => true, + }; + if !should_retry { + continue; + } + + if intent.attempts >= MAX_SPLICE_ATTEMPTS { + self.abandon(user_channel_id, channel.channel_id, counterparty_node_id).await; + continue; + } + + log_info!( + self.logger, + "Resubmitting splice for channel {} with counterparty {}", + channel.channel_id, + counterparty_node_id, + ); + let _ = + self.submit(&channel.channel_id, &counterparty_node_id, user_channel_id, intent); + } + } + + /// Applies a `SpliceNegotiationFailed` event to any matching splice intent, retrying when the + /// failure is recoverable. Returns whether the failure should be surfaced to the user. + pub(crate) async fn on_negotiation_failed( + &self, user_channel_id: UserChannelId, reason: NegotiationFailureReason, + contribution: Option, + ) -> bool { + let record = match self.channel_record_store.get(&user_channel_id) { + Some(record) => record, + None => return true, + }; + let ChannelRecord::Funded { channel_id, counterparty_node_id, pending_splice, .. } = record; + let mut intent = match pending_splice { + Some(intent) => intent, + None => return true, + }; + + // Only act on failures of the splice we are tracking. A mismatch means the failure + // concerns some other attempt (e.g., a stale event replayed after the user initiated a + // new splice), in which case the record must be left alone. + if contribution.as_ref() != Some(&intent.contribution) { + return true; + } + + if intent.attempts >= MAX_SPLICE_ATTEMPTS { + let _ = self.channel_record_store.remove(&user_channel_id); + return true; + } + + match reason { + NegotiationFailureReason::PeerDisconnected => { + // The same contribution remains valid. Skip if LDK already has a splice in + // flight for this channel (e.g., the startup reconciler resubmitted first). + if self.channel_manager.splice_channel(&channel_id, &counterparty_node_id).is_err() + { + return false; + } + log_info!( + self.logger, + "Resubmitting splice for channel {} with counterparty {} after disconnect", + channel_id, + counterparty_node_id, + ); + let _ = self.submit(&channel_id, &counterparty_node_id, user_channel_id, intent); + false + }, + NegotiationFailureReason::FeeRateTooLow + | NegotiationFailureReason::ContributionInvalid => { + // The contribution went stale (e.g., another splice negotiation outpaced ours, + // turning the resubmission into an underpaying fee bump of it). Rebuild a fresh + // contribution from the original call's parameters. + match self + .rebuild_contribution(&channel_id, &counterparty_node_id, &intent.kind) + .await + { + Ok(contribution) => { + log_info!( + self.logger, + "Resubmitting rebuilt splice for channel {} with counterparty {}", + channel_id, + counterparty_node_id, + ); + intent.contribution = contribution; + let _ = self.submit( + &channel_id, + &counterparty_node_id, + user_channel_id, + intent, + ); + false + }, + Err(e) => { + log_error!( + self.logger, + "Abandoning splice for channel {}: failed to rebuild contribution: {:?}", + channel_id, + e, + ); + let _ = self.channel_record_store.remove(&user_channel_id); + true + }, + } + }, + _ => { + // Terminal failure; retrying cannot address it. + let _ = self.channel_record_store.remove(&user_channel_id); + true + }, + } + } + + /// Clears any splice intent made obsolete by a locked funding transaction. + pub(crate) fn on_channel_ready( + &self, user_channel_id: UserChannelId, funding_txo: Option, + ) { + let record = match self.channel_record_store.get(&user_channel_id) { + Some(record) => record, + None => return, + }; + // Only clear an intent predating the locked funding transaction. An intent with a + // matching pre-splice funding outpoint was created after the lock and is still pending. + let clear = match (record.pending_splice(), funding_txo) { + (Some(intent), Some(funding_txo)) => { + intent.pre_splice_funding_txo.into_bitcoin_outpoint() != funding_txo + }, + (Some(_), None) => false, + (None, _) => true, + }; + if clear { + let _ = self.channel_record_store.remove(&user_channel_id); + } + } + + /// Clears any splice intent for a closed channel, as there is nothing left to splice. + pub(crate) fn on_channel_closed(&self, user_channel_id: UserChannelId) { + let _ = self.channel_record_store.remove(&user_channel_id); + } + + /// Persists the incremented attempt count and hands the contribution back to LDK. The count + /// is persisted first so that a crash mid-submission cannot lead to unbounded retries. + fn submit( + &self, channel_id: &ChannelId, counterparty_node_id: &PublicKey, + user_channel_id: UserChannelId, mut intent: SpliceIntent, + ) -> Result<(), Error> { + intent.attempts += 1; + let contribution = intent.contribution.clone(); + let update = ChannelRecordUpdate { user_channel_id, pending_splice: Some(Some(intent)) }; + self.channel_record_store.update(update)?; + + self.channel_manager + .funding_contributed(channel_id, counterparty_node_id, contribution, None) + .map_err(|e| { + log_error!( + self.logger, + "Failed to resubmit splice for channel {} with counterparty {}: {:?}", + channel_id, + counterparty_node_id, + e, + ); + Error::ChannelSplicingFailed + }) + } + + /// Builds a fresh contribution from the parameters of the originating API call, mirroring the + /// corresponding [`Node`] method. + /// + /// [`Node`]: crate::Node + async fn rebuild_contribution( + &self, channel_id: &ChannelId, counterparty_node_id: &PublicKey, kind: &SpliceKind, + ) -> Result { + let template = self + .channel_manager + .splice_channel(channel_id, counterparty_node_id) + .map_err(|_| Error::ChannelSplicingFailed)?; + + let est_feerate = self.fee_estimator.estimate_fee_rate(ConfirmationTarget::ChannelFunding); + let min_feerate = + template.min_rbf_feerate().map_or(est_feerate, |min_rbf| est_feerate.max(min_rbf)); + let max_feerate = FeeRate::from_sat_per_kwu(min_feerate.to_sat_per_kwu() * 3 / 2); + + match kind { + SpliceKind::In { amount_sats } => template + .splice_in( + Amount::from_sat(*amount_sats), + min_feerate, + max_feerate, + Arc::clone(&self.wallet), + ) + .await + .map_err(|_| Error::ChannelSplicingFailed), + SpliceKind::Out { outputs } => template + .splice_out(outputs.clone(), min_feerate, max_feerate) + .map_err(|_| Error::ChannelSplicingFailed), + SpliceKind::Rbf {} => template + .rbf_prior_contribution(None, max_feerate, Arc::clone(&self.wallet)) + .await + .map_err(|_| Error::ChannelSplicingFailed), + } + } + + /// Gives up on a splice intent and surfaces the failure to the user. + async fn abandon( + &self, user_channel_id: UserChannelId, channel_id: ChannelId, + counterparty_node_id: PublicKey, + ) { + log_error!( + self.logger, + "Abandoning splice for channel {} with counterparty {}", + channel_id, + counterparty_node_id, + ); + let _ = self.channel_record_store.remove(&user_channel_id); + let event = + Event::SpliceNegotiationFailed { channel_id, user_channel_id, counterparty_node_id }; + if let Err(e) = self.event_queue.add_event(event).await { + log_error!(self.logger, "Failed to push to event queue: {}", e); + } + } +} diff --git a/src/channel/store.rs b/src/channel/store.rs new file mode 100644 index 000000000..1d4cd3cab --- /dev/null +++ b/src/channel/store.rs @@ -0,0 +1,184 @@ +// This file is Copyright its original authors, visible in version control history. +// +// This file is licensed under the Apache License, Version 2.0 or the MIT license , at your option. You may not use this file except in +// accordance with one or both of these licenses. + +use bitcoin::secp256k1::PublicKey; +use bitcoin::TxOut; +use lightning::chain::transaction::OutPoint; +use lightning::ln::funding::FundingContribution; +use lightning::ln::types::ChannelId; +use lightning::{impl_writeable_tlv_based, impl_writeable_tlv_based_enum}; + +use crate::data_store::{StorableObject, StorableObjectId, StorableObjectUpdate}; +use crate::hex_utils; +use crate::types::UserChannelId; + +/// The number of times a splice intent is resubmitted before it is abandoned and the failure is +/// surfaced to the user. +pub(crate) const MAX_SPLICE_ATTEMPTS: u8 = 3; + +/// A user-initiated splice that has been handed to LDK but is not yet guaranteed to survive a +/// restart. LDK only persists a splice once its negotiation reaches `AwaitingSignatures`, so until +/// the new funding transaction locks we keep enough state to resubmit the splice ourselves. +#[derive(Clone, Debug, PartialEq, Eq)] +pub(crate) struct SpliceIntent { + /// The channel's funding outpoint when the splice was initiated. It only changes once a splice + /// locks, so a mismatch with the channel's current funding outpoint means the splice (or a + /// replacement) completed and there is nothing left to resubmit. + pub pre_splice_funding_txo: OutPoint, + /// The contribution handed to [`ChannelManager::funding_contributed`], resubmitted verbatim. + /// + /// [`ChannelManager::funding_contributed`]: lightning::ln::channelmanager::ChannelManager::funding_contributed + pub contribution: FundingContribution, + /// The parameters of the originating API call, used to rebuild a fresh contribution when the + /// stored one has become stale (e.g., its feerate is no longer sufficient). + pub kind: SpliceKind, + /// The number of times the contribution has been resubmitted to LDK after the originating API + /// call handed it off. + pub attempts: u8, +} + +impl_writeable_tlv_based!(SpliceIntent, { + (0, pre_splice_funding_txo, required), + (2, contribution, required), + (4, kind, required), + (6, attempts, required), +}); + +/// The parameters of the API call that initiated a splice. +#[derive(Clone, Debug, PartialEq, Eq)] +pub(crate) enum SpliceKind { + /// [`Node::splice_in`] with a resolved amount. + /// + /// [`Node::splice_in`]: crate::Node::splice_in + In { amount_sats: u64 }, + /// [`Node::splice_out`] to the given outputs. + /// + /// [`Node::splice_out`]: crate::Node::splice_out + Out { outputs: Vec }, + /// [`Node::bump_channel_funding_fee`] of a pending splice. + /// + /// [`Node::bump_channel_funding_fee`]: crate::Node::bump_channel_funding_fee + Rbf {}, +} + +impl_writeable_tlv_based_enum!(SpliceKind, + (0, In) => { + (0, amount_sats, required), + }, + (2, Out) => { + (0, outputs, required_vec), + }, + (4, Rbf) => {}, +); + +/// Persistent per-channel state tracked by LDK Node, keyed by [`UserChannelId`]. +#[derive(Clone, Debug, PartialEq, Eq)] +pub(crate) enum ChannelRecord { + /// State for a channel whose funding transaction exists, currently limited to an in-flight + /// splice intent. A record without a pending splice intent is removed from the store. + Funded { + user_channel_id: UserChannelId, + counterparty_node_id: PublicKey, + channel_id: ChannelId, + pending_splice: Option, + }, +} + +impl ChannelRecord { + pub(crate) fn pending_splice(&self) -> Option<&SpliceIntent> { + match self { + ChannelRecord::Funded { pending_splice, .. } => pending_splice.as_ref(), + } + } +} + +impl_writeable_tlv_based_enum!(ChannelRecord, + (0, Funded) => { + (0, user_channel_id, required), + (2, counterparty_node_id, required), + (4, channel_id, required), + (6, pending_splice, option), + }, +); + +impl StorableObjectId for UserChannelId { + fn encode_to_hex_str(&self) -> String { + hex_utils::to_string(&self.0.to_be_bytes()) + } +} + +#[derive(Clone, Debug, PartialEq, Eq)] +pub(crate) struct ChannelRecordUpdate { + pub user_channel_id: UserChannelId, + pub pending_splice: Option>, +} + +impl StorableObject for ChannelRecord { + type Id = UserChannelId; + type Update = ChannelRecordUpdate; + + fn id(&self) -> Self::Id { + match self { + ChannelRecord::Funded { user_channel_id, .. } => *user_channel_id, + } + } + + fn update(&mut self, update: Self::Update) -> bool { + let mut updated = false; + match self { + ChannelRecord::Funded { pending_splice, .. } => { + if let Some(new_pending_splice) = update.pending_splice { + if *pending_splice != new_pending_splice { + *pending_splice = new_pending_splice; + updated = true; + } + } + }, + } + updated + } + + fn to_update(&self) -> Self::Update { + match self { + ChannelRecord::Funded { user_channel_id, pending_splice, .. } => Self::Update { + user_channel_id: *user_channel_id, + pending_splice: Some(pending_splice.clone()), + }, + } + } +} + +impl StorableObjectUpdate for ChannelRecordUpdate { + fn id(&self) -> ::Id { + self.user_channel_id + } +} + +#[cfg(test)] +mod tests { + use lightning::util::ser::{Readable, Writeable}; + + use super::*; + + #[test] + fn channel_record_is_serializable() { + let user_channel_id = UserChannelId(42); + let counterparty_node_id = bitcoin::secp256k1::PublicKey::from_slice(&[2u8; 33]).unwrap(); + let channel_id = ChannelId([3u8; 32]); + let record = ChannelRecord::Funded { + user_channel_id, + counterparty_node_id, + channel_id, + pending_splice: None, + }; + + let encoded = record.encode(); + let decoded = ChannelRecord::read(&mut &encoded[..]).unwrap(); + assert_eq!(record, decoded); + assert_eq!(decoded.id(), user_channel_id); + } +} diff --git a/src/event.rs b/src/event.rs index 304737269..beddf53d1 100644 --- a/src/event.rs +++ b/src/event.rs @@ -33,6 +33,7 @@ use lightning::{impl_writeable_tlv_based, impl_writeable_tlv_based_enum}; use lightning_liquidity::lsps2::utils::compute_opening_fee; use lightning_types::payment::{PaymentHash, PaymentPreimage}; +use crate::channel::SpliceRetrier; use crate::config::{may_announce_channel, Config}; use crate::connection::ConnectionManager; use crate::data_store::DataStoreUpdateResult; @@ -283,7 +284,11 @@ pub enum Event { /// The outpoint of the channel's splice funding transaction. new_funding_txo: OutPoint, }, - /// A channel splice negotiation round has failed. + /// A channel splice has failed and is no longer being pursued. + /// + /// Recoverable failures (e.g., a peer disconnecting mid-negotiation) are retried + /// automatically, including across restarts; this event is emitted only once the splice is + /// given up on. SpliceNegotiationFailed { /// The `channel_id` of the channel. channel_id: ChannelId, @@ -543,6 +548,7 @@ where static_invoice_store: Option, onion_messenger: Arc, om_mailbox: Option>, + splice_retrier: Arc>, } impl EventHandler @@ -558,7 +564,8 @@ where payment_store: Arc, peer_store: Arc>, keys_manager: Arc, static_invoice_store: Option, onion_messenger: Arc, om_mailbox: Option>, - runtime: Arc, logger: L, config: Arc, + splice_retrier: Arc>, runtime: Arc, logger: L, + config: Arc, ) -> Self { Self { event_queue, @@ -578,6 +585,7 @@ where static_invoice_store, onion_messenger, om_mailbox, + splice_retrier, } } @@ -1598,6 +1606,8 @@ where .await; } + self.splice_retrier.on_channel_ready(UserChannelId(user_channel_id), funding_txo); + let event = Event::ChannelReady { channel_id, user_channel_id: UserChannelId(user_channel_id), @@ -1631,6 +1641,8 @@ where return Err(ReplayEvent()); } + self.splice_retrier.on_channel_closed(UserChannelId(user_channel_id)); + let event = Event::ChannelClosed { channel_id, user_channel_id: UserChannelId(user_channel_id), @@ -1900,15 +1912,26 @@ where channel_id, user_channel_id, counterparty_node_id, - .. + reason, + contribution, } => { log_info!( self.logger, - "Channel {} with counterparty {} splice negotiation failed", + "Channel {} with counterparty {} splice negotiation failed: {:?}", channel_id, counterparty_node_id, + reason, ); + // Only surface failures of splices that are not (or no longer) being retried. + let surface = self + .splice_retrier + .on_negotiation_failed(UserChannelId(user_channel_id), reason, contribution) + .await; + if !surface { + return Ok(()); + } + let event = Event::SpliceNegotiationFailed { channel_id, user_channel_id: UserChannelId(user_channel_id), diff --git a/src/io/mod.rs b/src/io/mod.rs index e16a99975..df974dfe1 100644 --- a/src/io/mod.rs +++ b/src/io/mod.rs @@ -84,3 +84,7 @@ pub(crate) const STATIC_INVOICE_STORE_PRIMARY_NAMESPACE: &str = "static_invoices /// The pending payment information will be persisted under this prefix. pub(crate) const PENDING_PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE: &str = "pending_payments"; pub(crate) const PENDING_PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE: &str = ""; + +/// The per-channel records will be persisted under this prefix. +pub(crate) const CHANNEL_RECORD_PERSISTENCE_PRIMARY_NAMESPACE: &str = "channel_records"; +pub(crate) const CHANNEL_RECORD_PERSISTENCE_SECONDARY_NAMESPACE: &str = ""; diff --git a/src/lib.rs b/src/lib.rs index f9edc32ef..985e090b8 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -83,6 +83,7 @@ mod balance; mod builder; mod chain; +mod channel; pub mod config; mod connection; mod data_store; @@ -129,6 +130,8 @@ pub use builder::BuildError; #[cfg(not(feature = "uniffi"))] pub use builder::NodeBuilder as Builder; use chain::ChainSource; +use channel::store::{ChannelRecord, SpliceIntent, SpliceKind}; +use channel::SpliceRetrier; use config::{ default_user_config, may_announce_channel, AsyncPaymentsRole, ChannelConfig, Config, LNURL_AUTH_TIMEOUT_SECS, NODE_ANN_BCAST_INTERVAL, PEER_RECONNECTION_INTERVAL, @@ -152,6 +155,7 @@ use lightning::ln::chan_utils::FUNDING_TRANSACTION_WITNESS_WEIGHT; use lightning::ln::channel_state::ChannelDetails as LdkChannelDetails; pub use lightning::ln::channel_state::ChannelShutdownState; use lightning::ln::channelmanager::PaymentId; +use lightning::ln::funding::FundingContribution; use lightning::ln::msgs::{BaseMessageHandler, SocketAddress}; use lightning::ln::peer_handler::CustomMessageHandler; use lightning::routing::gossip::NodeAlias; @@ -176,9 +180,9 @@ use peer_store::{PeerInfo, PeerStore}; use runtime::Runtime; pub use tokio; use types::{ - Broadcaster, BumpTransactionEventHandler, ChainMonitor, ChannelManager, DynStore, Graph, - HRNResolver, KeysManager, OnionMessenger, PaymentStore, PeerManager, Router, Scorer, Sweeper, - Wallet, + Broadcaster, BumpTransactionEventHandler, ChainMonitor, ChannelManager, ChannelRecordStore, + DynStore, Graph, HRNResolver, KeysManager, OnionMessenger, PaymentStore, PeerManager, Router, + Scorer, Sweeper, Wallet, }; pub use types::{ChannelDetails, CustomTlvRecord, PeerDetails, SyncAndAsyncKVStore, UserChannelId}; pub use vss_client; @@ -241,6 +245,7 @@ pub struct Node { scorer: Arc>, peer_store: Arc>>, payment_store: Arc, + channel_record_store: Arc, lnurl_auth: Arc, is_running: Arc>, node_metrics: Arc>, @@ -589,6 +594,15 @@ impl Node { None }; + let splice_retrier = Arc::new(SpliceRetrier::new( + Arc::clone(&self.channel_manager), + Arc::clone(&self.wallet), + Arc::clone(&self.fee_estimator), + Arc::clone(&self.channel_record_store), + Arc::clone(&self.event_queue), + Arc::clone(&self.logger), + )); + let event_handler = Arc::new(EventHandler::new( Arc::clone(&self.event_queue), Arc::clone(&self.wallet), @@ -604,11 +618,17 @@ impl Node { static_invoice_store, Arc::clone(&self.onion_messenger), self.om_mailbox.clone(), + Arc::clone(&splice_retrier), Arc::clone(&self.runtime), Arc::clone(&self.logger), Arc::clone(&self.config), )); + // Resubmit any persisted splice intents that LDK dropped before durably recording them. + self.runtime.spawn_background_task(async move { + splice_retrier.reconcile().await; + }); + // Setup background processing let background_persister = Arc::clone(&self.kv_store); let background_event_handler = Arc::clone(&event_handler); @@ -1515,6 +1535,46 @@ impl Node { ) } + /// Persists a splice intent so that the splice can be resubmitted if LDK drops it before + /// durably recording it (e.g., when restarting or disconnecting mid-negotiation). Must be + /// called before handing the contribution to [`ChannelManager::funding_contributed`] so that + /// a crash in between is also covered. + /// + /// [`ChannelManager::funding_contributed`]: lightning::ln::channelmanager::ChannelManager::funding_contributed + fn persist_splice_intent( + &self, user_channel_id: &UserChannelId, counterparty_node_id: PublicKey, + channel_details: &LdkChannelDetails, contribution: FundingContribution, kind: SpliceKind, + ) -> Result<(), Error> { + let pre_splice_funding_txo = channel_details.funding_txo.ok_or_else(|| { + log_error!(self.logger, "Failed to splice channel: channel not yet ready"); + Error::ChannelSplicingFailed + })?; + let record = ChannelRecord::Funded { + user_channel_id: *user_channel_id, + counterparty_node_id, + channel_id: channel_details.channel_id, + pending_splice: Some(SpliceIntent { + pre_splice_funding_txo, + contribution, + kind, + attempts: 0, + }), + }; + self.channel_record_store.insert(record).map(|_| ()) + } + + /// Removes a splice intent if it still holds the given contribution. The guard ensures an + /// intent persisted by a newer call is left alone. + fn clear_splice_intent( + &self, user_channel_id: &UserChannelId, contribution: &FundingContribution, + ) { + if let Some(record) = self.channel_record_store.get(user_channel_id) { + if record.pending_splice().map(|intent| &intent.contribution) == Some(contribution) { + let _ = self.channel_record_store.remove(user_channel_id); + } + } + } + fn splice_in_inner( &self, user_channel_id: &UserChannelId, counterparty_node_id: PublicKey, splice_amount_sats: FundingAmount, @@ -1613,6 +1673,15 @@ impl Node { Error::ChannelSplicingFailed })?; + self.persist_splice_intent( + user_channel_id, + counterparty_node_id, + channel_details, + contribution.clone(), + SpliceKind::In { amount_sats: splice_amount_sats }, + )?; + + let intent_contribution = contribution.clone(); self.channel_manager .funding_contributed( &channel_details.channel_id, @@ -1622,6 +1691,7 @@ impl Node { ) .map_err(|e| { log_error!(self.logger, "Failed to splice channel: {:?}", e); + self.clear_splice_intent(user_channel_id, &intent_contribution); Error::ChannelSplicingFailed }) } else { @@ -1641,6 +1711,10 @@ impl Node { /// it. Once negotiation with the counterparty is complete, the channel remains operational /// while waiting for a new funding transaction to confirm. /// + /// The splice is retried automatically, including across restarts, until it either completes + /// or fails for a reason retrying cannot address, at which point + /// [`Event::SpliceNegotiationFailed`] is emitted. + /// /// # Experimental API /// /// This API is experimental. Currently, a splice-in will be marked as an outbound payment, but @@ -1681,6 +1755,10 @@ impl Node { /// it. Once negotiation with the counterparty is complete, the channel remains operational /// while waiting for a new funding transaction to confirm. /// + /// The splice is retried automatically, including across restarts, until it either completes + /// or fails for a reason retrying cannot address, at which point + /// [`Event::SpliceNegotiationFailed`] is emitted. + /// /// # Experimental API /// /// This API is experimental. Currently, a splice-out will be marked as an inbound payment if @@ -1725,12 +1803,22 @@ impl Node { value: Amount::from_sat(splice_amount_sats), script_pubkey: address.script_pubkey(), }]; - let contribution = - funding_template.splice_out(outputs, min_feerate, max_feerate).map_err(|e| { + let contribution = funding_template + .splice_out(outputs.clone(), min_feerate, max_feerate) + .map_err(|e| { log_error!(self.logger, "Failed to splice channel: {}", e); Error::ChannelSplicingFailed })?; + self.persist_splice_intent( + user_channel_id, + counterparty_node_id, + channel_details, + contribution.clone(), + SpliceKind::Out { outputs }, + )?; + + let intent_contribution = contribution.clone(); self.channel_manager .funding_contributed( &channel_details.channel_id, @@ -1740,6 +1828,7 @@ impl Node { ) .map_err(|e| { log_error!(self.logger, "Failed to splice channel: {:?}", e); + self.clear_splice_intent(user_channel_id, &intent_contribution); Error::ChannelSplicingFailed }) } else { @@ -1758,6 +1847,10 @@ impl Node { /// If a prior splice negotiation is pending, this bumps its feerate via RBF. The prior /// contribution is reused when possible; otherwise, coin selection is re-run. /// + /// The fee bump is retried automatically, including across restarts, until it either + /// completes or fails for a reason retrying cannot address, at which point + /// [`Event::SpliceNegotiationFailed`] is emitted. + /// /// # Experimental API /// /// This API is experimental and may change in the future. @@ -1798,6 +1891,15 @@ impl Node { Error::ChannelSplicingFailed })?; + self.persist_splice_intent( + user_channel_id, + counterparty_node_id, + channel_details, + contribution.clone(), + SpliceKind::Rbf {}, + )?; + + let intent_contribution = contribution.clone(); self.channel_manager .funding_contributed( &channel_details.channel_id, @@ -1807,6 +1909,7 @@ impl Node { ) .map_err(|e| { log_error!(self.logger, "Failed to RBF channel: {:?}", e); + self.clear_splice_intent(user_channel_id, &intent_contribution); Error::ChannelSplicingFailed }) } else { diff --git a/src/types.rs b/src/types.rs index 06e65fbd0..ac27fe069 100644 --- a/src/types.rs +++ b/src/types.rs @@ -38,6 +38,7 @@ use lightning_net_tokio::SocketDescriptor; use crate::chain::bitcoind::UtxoSourceClient; use crate::chain::ChainSource; +use crate::channel::store::ChannelRecord; use crate::config::ChannelConfig; use crate::data_store::DataStore; use crate::fee_estimator::OnchainFeeEstimator; @@ -392,7 +393,7 @@ pub(crate) type PaymentStore = DataStore>; /// A local, potentially user-provided, identifier of a channel. /// /// By default, this will be randomly generated for the user to ensure local uniqueness. -#[derive(Debug, Copy, Clone, PartialEq, Eq)] +#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)] pub struct UserChannelId(pub u128); impl Writeable for UserChannelId { @@ -702,3 +703,5 @@ impl From<&(u64, Vec)> for CustomTlvRecord { } pub(crate) type PendingPaymentStore = DataStore>; + +pub(crate) type ChannelRecordStore = DataStore>; diff --git a/tests/integration_tests_rust.rs b/tests/integration_tests_rust.rs index 522e396d5..91d2dd1ba 100644 --- a/tests/integration_tests_rust.rs +++ b/tests/integration_tests_rust.rs @@ -1325,6 +1325,193 @@ async fn rbf_splice_channel() { node_b.stop().unwrap(); } +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn splice_resumed_after_restart() { + let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); + let chain_source = random_chain_source(&bitcoind, &electrsd); + + // Set up node_a manually so it can be restarted with the same config. + let mut config_a = random_config(true); + config_a.store_type = TestStoreType::Sqlite; + let config_b = random_config(true); + let node_b = setup_node(&chain_source, config_b); + + let onchain_balance_before_sat = { + let node_a = setup_node(&chain_source, config_a.clone()); + + let address_a = node_a.onchain_payment().new_address().unwrap(); + let address_b = node_b.onchain_payment().new_address().unwrap(); + let premine_amount_sat = 5_000_000; + premine_and_distribute_funds( + &bitcoind.client, + &electrsd.client, + vec![address_a, address_b], + Amount::from_sat(premine_amount_sat), + ) + .await; + + node_a.sync_wallets().unwrap(); + node_b.sync_wallets().unwrap(); + + open_channel(&node_a, &node_b, 4_000_000, false, &electrsd).await; + generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6).await; + node_a.sync_wallets().unwrap(); + node_b.sync_wallets().unwrap(); + + let user_channel_id_a = expect_channel_ready_event!(node_a, node_b.node_id()); + expect_channel_ready_event!(node_b, node_a.node_id()); + + // Initiate a splice-out while disconnected: LDK accepts the contribution but cannot make + // progress before the restart below drops it, having neither negotiated nor persisted + // anything. Only the persisted splice intent allows resuming the splice. + node_a.disconnect(node_b.node_id()).unwrap(); + let address = node_a.onchain_payment().new_address().unwrap(); + node_a.splice_out(&user_channel_id_a, node_b.node_id(), &address, 500_000).unwrap(); + + let onchain_balance_before_sat = node_a.list_balances().total_onchain_balance_sats; + node_a.stop().unwrap(); + onchain_balance_before_sat + }; + + // On restart, the reconciler resubmits the splice, which proceeds once the peer connects. + let node_a = setup_node(&chain_source, config_a.clone()); + node_a.sync_wallets().unwrap(); + let node_b_addr = node_b.listening_addresses().unwrap().first().unwrap().clone(); + node_a.connect(node_b.node_id(), node_b_addr.clone(), false).unwrap(); + + let txo = expect_splice_negotiated_event!(node_a, node_b.node_id()); + expect_splice_negotiated_event!(node_b, node_a.node_id()); + + wait_for_tx(&electrsd.client, txo.txid).await; + generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6).await; + node_a.sync_wallets().unwrap(); + node_b.sync_wallets().unwrap(); + + expect_channel_ready_event!(node_a, node_b.node_id()); + expect_channel_ready_event!(node_b, node_a.node_id()); + + assert!( + node_a.list_balances().total_onchain_balance_sats > onchain_balance_before_sat + 400_000, + "resumed splice-out should have moved ~500k sats to the on-chain balance", + ); + + // The locked splice cleared the intent, so another restart must not resubmit it. + node_a.stop().unwrap(); + let node_a = setup_node(&chain_source, config_a); + node_a.sync_wallets().unwrap(); + node_a.connect(node_b.node_id(), node_b_addr, false).unwrap(); + tokio::time::sleep(std::time::Duration::from_secs(3)).await; + assert!(node_a.next_event().is_none(), "completed splice should not be resubmitted"); + + node_a.stop().unwrap(); + node_b.stop().unwrap(); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn splice_rbf_resumed_after_restart() { + // Use a custom bitcoind config with a lower incrementalrelayfee so that the +25 sat/kwu + // (0.1 sat/vB) RBF feerate bump satisfies BIP125's absolute fee increase requirement. + let bitcoind_exe = std::env::var("BITCOIND_EXE") + .ok() + .or_else(|| corepc_node::downloaded_exe_path().ok()) + .expect( + "you need to provide an env var BITCOIND_EXE or specify a bitcoind version feature", + ); + let mut bitcoind_conf = corepc_node::Conf::default(); + bitcoind_conf.network = "regtest"; + bitcoind_conf.args.push("-rest"); + bitcoind_conf.args.push("-incrementalrelayfee=0.00000100"); + let bitcoind = BitcoinD::with_conf(bitcoind_exe, &bitcoind_conf).unwrap(); + + let electrs_exe = std::env::var("ELECTRS_EXE") + .ok() + .or_else(electrsd::downloaded_exe_path) + .expect("you need to provide env var ELECTRS_EXE or specify an electrsd version feature"); + let mut electrsd_conf = electrsd::Conf::default(); + electrsd_conf.http_enabled = true; + electrsd_conf.network = "regtest"; + let electrsd = ElectrsD::with_conf(electrs_exe, &bitcoind, &electrsd_conf).unwrap(); + let chain_source = random_chain_source(&bitcoind, &electrsd); + + // Set up node_a manually so it can be restarted with the same config. + let mut config_a = random_config(true); + config_a.store_type = TestStoreType::Sqlite; + let config_b = random_config(true); + let node_b = setup_node(&chain_source, config_b); + + let original_txo = { + let node_a = setup_node(&chain_source, config_a.clone()); + + let address_a = node_a.onchain_payment().new_address().unwrap(); + let address_b = node_b.onchain_payment().new_address().unwrap(); + let premine_amount_sat = 5_000_000; + premine_and_distribute_funds( + &bitcoind.client, + &electrsd.client, + vec![address_a, address_b], + Amount::from_sat(premine_amount_sat), + ) + .await; + + node_a.sync_wallets().unwrap(); + node_b.sync_wallets().unwrap(); + + open_channel(&node_a, &node_b, 4_000_000, false, &electrsd).await; + generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6).await; + node_a.sync_wallets().unwrap(); + node_b.sync_wallets().unwrap(); + + let user_channel_id_a = expect_channel_ready_event!(node_a, node_b.node_id()); + expect_channel_ready_event!(node_b, node_a.node_id()); + + // Negotiate a splice but leave its transaction unconfirmed so it can be fee-bumped. + node_a.splice_in(&user_channel_id_a, node_b.node_id(), 500_000).unwrap(); + let original_txo = expect_splice_negotiated_event!(node_a, node_b.node_id()); + expect_splice_negotiated_event!(node_b, node_a.node_id()); + wait_for_tx(&electrsd.client, original_txo.txid).await; + node_a.sync_wallets().unwrap(); + node_b.sync_wallets().unwrap(); + + // Bump the fee while disconnected and restart before anything could be negotiated: only + // the persisted intent knows about the fee bump, while LDK still has the negotiated + // splice at the original feerate. + node_a.disconnect(node_b.node_id()).unwrap(); + node_a.bump_channel_funding_fee(&user_channel_id_a, node_b.node_id()).unwrap(); + node_a.stop().unwrap(); + original_txo + }; + + // On restart, the reconciler sees that the negotiated splice is still at a lower feerate + // than the persisted fee-bump intent and resubmits the bump. + let node_a = setup_node(&chain_source, config_a.clone()); + node_a.sync_wallets().unwrap(); + let node_b_addr = node_b.listening_addresses().unwrap().first().unwrap().clone(); + node_a.connect(node_b.node_id(), node_b_addr.clone(), false).unwrap(); + + let rbf_txo = expect_splice_negotiated_event!(node_a, node_b.node_id()); + expect_splice_negotiated_event!(node_b, node_a.node_id()); + assert_ne!(original_txo, rbf_txo, "resubmitted RBF should produce a different funding txo"); + + // Restarting again must not resubmit the bump: the negotiated splice now carries it. + node_a.stop().unwrap(); + let node_a = setup_node(&chain_source, config_a); + node_a.sync_wallets().unwrap(); + node_a.connect(node_b.node_id(), node_b_addr, false).unwrap(); + tokio::time::sleep(std::time::Duration::from_secs(3)).await; + assert!(node_a.next_event().is_none(), "carried-out fee bump should not be resubmitted"); + + wait_for_tx(&electrsd.client, rbf_txo.txid).await; + generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6).await; + node_a.sync_wallets().unwrap(); + node_b.sync_wallets().unwrap(); + + expect_channel_ready_event!(node_a, node_b.node_id()); + expect_channel_ready_event!(node_b, node_a.node_id()); + + node_a.stop().unwrap(); + node_b.stop().unwrap(); +} + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn bump_fee_rbf_rejects_funding_payment() { // A channel-funding or splice transaction is driven by LDK's funding/splice lifecycle, not the