diff --git a/bindings/ldk_node.udl b/bindings/ldk_node.udl index 7e9e61f5d..8198cfb34 100644 --- a/bindings/ldk_node.udl +++ b/bindings/ldk_node.udl @@ -125,6 +125,8 @@ interface Node { [Throws=NodeError] void splice_out([ByRef]UserChannelId user_channel_id, PublicKey counterparty_node_id, [ByRef]Address address, u64 splice_amount_sats); [Throws=NodeError] + void bump_channel_funding_fee([ByRef]UserChannelId user_channel_id, PublicKey counterparty_node_id); + [Throws=NodeError] void close_channel([ByRef]UserChannelId user_channel_id, PublicKey counterparty_node_id); [Throws=NodeError] void force_close_channel([ByRef]UserChannelId user_channel_id, PublicKey counterparty_node_id, string? reason); diff --git a/src/builder.rs b/src/builder.rs index 03ded494f..5386bf570 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -64,7 +64,9 @@ use crate::io::utils::{ }; use crate::io::vss_store::VssStoreBuilder; use crate::io::{ - self, PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, + self, CHANNEL_RECORD_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_RECORD_PERSISTENCE_SECONDARY_NAMESPACE, PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, + PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, PENDING_PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, PENDING_PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, }; @@ -79,9 +81,9 @@ use crate::peer_store::PeerStore; use crate::runtime::{Runtime, RuntimeSpawner}; use crate::tx_broadcaster::TransactionBroadcaster; use crate::types::{ - AsyncPersister, ChainMonitor, ChannelManager, DynStore, DynStoreRef, DynStoreWrapper, - GossipSync, Graph, HRNResolver, KeysManager, MessageRouter, OnionMessenger, PaymentStore, - PeerManager, PendingPaymentStore, SyncAndAsyncKVStore, + AsyncPersister, ChainMonitor, ChannelManager, ChannelRecordStore, DynStore, DynStoreRef, + DynStoreWrapper, GossipSync, Graph, HRNResolver, KeysManager, MessageRouter, OnionMessenger, + PaymentStore, PeerManager, PendingPaymentStore, SyncAndAsyncKVStore, }; use crate::wallet::persist::KVStoreWalletPersister; use crate::wallet::Wallet; @@ -1394,7 +1396,7 @@ fn build_with_store_internal( let kv_store_ref = Arc::clone(&kv_store); let logger_ref = Arc::clone(&logger); - let (payment_store_res, node_metris_res, pending_payment_store_res) = + let (payment_store_res, node_metris_res, pending_payment_store_res, channel_record_store_res) = runtime.block_on(async move { tokio::join!( read_all_objects( @@ -1409,6 +1411,12 @@ fn build_with_store_internal( PENDING_PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, PENDING_PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, Arc::clone(&logger_ref), + ), + read_all_objects( + &*kv_store_ref, + CHANNEL_RECORD_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_RECORD_PERSISTENCE_SECONDARY_NAMESPACE, + Arc::clone(&logger_ref), ) ) }); @@ -1612,6 +1620,20 @@ fn build_with_store_internal( }, }; + let channel_record_store = match channel_record_store_res { + Ok(channel_records) => Arc::new(ChannelRecordStore::new( + channel_records, + CHANNEL_RECORD_PERSISTENCE_PRIMARY_NAMESPACE.to_string(), + CHANNEL_RECORD_PERSISTENCE_SECONDARY_NAMESPACE.to_string(), + Arc::clone(&kv_store), + Arc::clone(&logger), + )), + Err(e) => { + log_error!(logger, "Failed to read channel record data from store: {}", e); + return Err(BuildError::ReadFailed); + }, + }; + let wallet = Arc::new(Wallet::new( bdk_wallet, wallet_persister, @@ -1624,6 +1646,8 @@ fn build_with_store_internal( Arc::clone(&pending_payment_store), )); + tx_broadcaster.set_wallet(Arc::downgrade(&wallet)); + // Initialize the KeysManager let cur_time = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).map_err(|e| { log_error!(logger, "Failed to get current time: {}", e); @@ -2169,6 +2193,7 @@ fn build_with_store_internal( scorer, peer_store, payment_store, + channel_record_store, lnurl_auth, is_running, node_metrics, diff --git a/src/chain/bitcoind.rs b/src/chain/bitcoind.rs index 2582f32f6..eeed41a8d 100644 --- a/src/chain/bitcoind.rs +++ b/src/chain/bitcoind.rs @@ -568,16 +568,18 @@ impl BitcoindChainSource { Ok(()) } - pub(crate) async fn process_broadcast_package(&self, package: Vec) { + pub(crate) async fn process_broadcast_package( + &self, txs: impl IntoIterator, + ) { // While it's a bit unclear when we'd be able to lean on Bitcoin Core >v28 // features, we should eventually switch to use `submitpackage` via the // `rust-bitcoind-json-rpc` crate rather than just broadcasting individual // transactions. - for tx in &package { + for tx in txs { let txid = tx.compute_txid(); let timeout_fut = tokio::time::timeout( Duration::from_secs(DEFAULT_TX_BROADCAST_TIMEOUT_SECS), - self.api_client.broadcast_transaction(tx), + self.api_client.broadcast_transaction(&tx), ); match timeout_fut.await { Ok(res) => match res { diff --git a/src/chain/electrum.rs b/src/chain/electrum.rs index 54e7fff0c..28825b191 100644 --- a/src/chain/electrum.rs +++ b/src/chain/electrum.rs @@ -275,7 +275,9 @@ impl ElectrumChainSource { Ok(()) } - pub(crate) async fn process_broadcast_package(&self, package: Vec) { + pub(crate) async fn process_broadcast_package( + &self, txs: impl IntoIterator, + ) { let electrum_client: Arc = if let Some(client) = self.electrum_runtime_status.read().expect("lock").client().as_ref() { @@ -285,7 +287,7 @@ impl ElectrumChainSource { return; }; - for tx in package { + for tx in txs { electrum_client.broadcast(tx).await; } } diff --git a/src/chain/esplora.rs b/src/chain/esplora.rs index 5825a0984..5f88ad76e 100644 --- a/src/chain/esplora.rs +++ b/src/chain/esplora.rs @@ -352,12 +352,14 @@ impl EsploraChainSource { Ok(()) } - pub(crate) async fn process_broadcast_package(&self, package: Vec) { - for tx in &package { + pub(crate) async fn process_broadcast_package( + &self, txs: impl IntoIterator, + ) { + for tx in txs { let txid = tx.compute_txid(); let timeout_fut = tokio::time::timeout( Duration::from_secs(self.sync_config.timeouts_config.tx_broadcast_timeout_secs), - self.esplora_client.broadcast(tx), + self.esplora_client.broadcast(&tx), ); match timeout_fut.await { Ok(res) => match res { diff --git a/src/chain/mod.rs b/src/chain/mod.rs index cb8541be6..cf0c946ba 100644 --- a/src/chain/mod.rs +++ b/src/chain/mod.rs @@ -24,7 +24,7 @@ use crate::config::{ WALLET_SYNC_INTERVAL_MINIMUM_SECS, }; use crate::fee_estimator::OnchainFeeEstimator; -use crate::logger::{log_debug, log_info, log_trace, LdkLogger, Logger}; +use crate::logger::{log_debug, log_error, log_info, log_trace, LdkLogger, Logger}; use crate::runtime::Runtime; use crate::types::{Broadcaster, ChainMonitor, ChannelManager, DynStore, Sweeper, Wallet}; use crate::{Error, NodeMetrics}; @@ -453,15 +453,27 @@ impl ChainSource { return; } Some(next_package) = receiver.recv() => { + let package = match self.tx_broadcaster.classify_package(next_package).await { + Ok(p) => p, + Err(e) => { + log_error!( + tx_bcast_logger, + "Skipping broadcast: failed to persist payment records: {:?}", + e, + ); + continue; + }, + }; + let txs = package.into_iter().map(|(tx, _)| tx); match &self.kind { ChainSourceKind::Esplora(esplora_chain_source) => { - esplora_chain_source.process_broadcast_package(next_package).await + esplora_chain_source.process_broadcast_package(txs).await }, ChainSourceKind::Electrum(electrum_chain_source) => { - electrum_chain_source.process_broadcast_package(next_package).await + electrum_chain_source.process_broadcast_package(txs).await }, ChainSourceKind::Bitcoind(bitcoind_chain_source) => { - bitcoind_chain_source.process_broadcast_package(next_package).await + bitcoind_chain_source.process_broadcast_package(txs).await }, } } diff --git a/src/channel/mod.rs b/src/channel/mod.rs new file mode 100644 index 000000000..3868da385 --- /dev/null +++ b/src/channel/mod.rs @@ -0,0 +1,342 @@ +// This file is Copyright its original authors, visible in version control history. +// +// This file is licensed under the Apache License, Version 2.0 or the MIT license , at your option. You may not use this file except in +// accordance with one or both of these licenses. + +//! Per-channel state tracking. + +pub(crate) mod store; + +use std::ops::Deref; +use std::sync::Arc; + +use bitcoin::secp256k1::PublicKey; +use bitcoin::{Amount, FeeRate}; +use lightning::events::NegotiationFailureReason; +use lightning::ln::funding::FundingContribution; +use lightning::ln::types::ChannelId; + +use crate::event::{Event, EventQueue}; +use crate::fee_estimator::{ConfirmationTarget, FeeEstimator, OnchainFeeEstimator}; +use crate::logger::{log_error, log_info, LdkLogger}; +use crate::types::{ChannelManager, ChannelRecordStore, UserChannelId, Wallet}; +use crate::Error; + +pub(crate) use self::store::MAX_SPLICE_ATTEMPTS; +use self::store::{ChannelRecord, ChannelRecordUpdate, SpliceIntent, SpliceKind}; + +/// Resubmits user-initiated splices that LDK dropped before durably recording them. +/// +/// LDK only persists a splice once its negotiation reaches `AwaitingSignatures`, and it abandons +/// an earlier negotiation whenever the peer disconnects (which includes restarting the node). The +/// splice entry points persist a [`SpliceIntent`] before handing the contribution to LDK; this +/// type drives that intent back into [`ChannelManager::funding_contributed`] until the splice +/// either locks (clearing the intent on `ChannelReady`) or fails for a reason that retrying +/// cannot address. +/// +/// Resubmitting does not require the peer to be connected: LDK holds on to the contribution and +/// initiates quiescence once the peer reconnects. +/// +/// [`ChannelManager::funding_contributed`]: lightning::ln::channelmanager::ChannelManager::funding_contributed +pub(crate) struct SpliceRetrier +where + L::Target: LdkLogger, +{ + channel_manager: Arc, + wallet: Arc, + fee_estimator: Arc, + channel_record_store: Arc, + event_queue: Arc>, + logger: L, +} + +impl SpliceRetrier +where + L::Target: LdkLogger, +{ + pub(crate) fn new( + channel_manager: Arc, wallet: Arc, + fee_estimator: Arc, channel_record_store: Arc, + event_queue: Arc>, logger: L, + ) -> Self { + Self { channel_manager, wallet, fee_estimator, channel_record_store, event_queue, logger } + } + + /// Reconciles persisted splice intents against live channel state. Run once at startup. + pub(crate) async fn reconcile(&self) { + let records = self.channel_record_store.list_filter(|r| r.pending_splice().is_some()); + for record in records { + let ChannelRecord::Funded { + user_channel_id, counterparty_node_id, pending_splice, .. + } = record; + let intent = match pending_splice { + Some(intent) => intent, + None => continue, + }; + + let channel = self + .channel_manager + .list_channels_with_counterparty(&counterparty_node_id) + .into_iter() + .find(|c| c.user_channel_id == user_channel_id.0); + let channel = match channel { + Some(channel) => channel, + None => { + // The channel is gone; there is nothing to splice anymore. + let _ = self.channel_record_store.remove(&user_channel_id); + continue; + }, + }; + + if channel.funding_txo != Some(intent.pre_splice_funding_txo) { + // The funding moved on, so the splice (or a replacement) locked. + let _ = self.channel_record_store.remove(&user_channel_id); + continue; + } + + // `splice_channel` is a read-only probe of LDK's splice state. It fails when we + // already have a splice in flight (a held contribution, an in-progress negotiation, + // or one awaiting signatures), all of which LDK drives to completion on its own. + let template = match self + .channel_manager + .splice_channel(&channel.channel_id, &counterparty_node_id) + { + Ok(template) => template, + Err(_) => continue, + }; + + // The template's prior contribution is our last negotiated one. LDK persists a splice + // once negotiated, so its presence means the intent was carried out unless the intent + // was a fee bump at a higher feerate than what was negotiated. + let should_retry = match (&intent.kind, template.prior_contribution()) { + (SpliceKind::Rbf {}, Some(prior)) => { + prior.feerate() < intent.contribution.feerate() + }, + (SpliceKind::Rbf {}, None) => { + // The splice to bump is gone entirely; surface rather than guess. + self.abandon(user_channel_id, channel.channel_id, counterparty_node_id).await; + continue; + }, + (_, Some(_)) => false, + (_, None) => true, + }; + if !should_retry { + continue; + } + + if intent.attempts >= MAX_SPLICE_ATTEMPTS { + self.abandon(user_channel_id, channel.channel_id, counterparty_node_id).await; + continue; + } + + log_info!( + self.logger, + "Resubmitting splice for channel {} with counterparty {}", + channel.channel_id, + counterparty_node_id, + ); + let _ = + self.submit(&channel.channel_id, &counterparty_node_id, user_channel_id, intent); + } + } + + /// Applies a `SpliceNegotiationFailed` event to any matching splice intent, retrying when the + /// failure is recoverable. Returns whether the failure should be surfaced to the user. + pub(crate) async fn on_negotiation_failed( + &self, user_channel_id: UserChannelId, reason: NegotiationFailureReason, + contribution: Option, + ) -> bool { + let record = match self.channel_record_store.get(&user_channel_id) { + Some(record) => record, + None => return true, + }; + let ChannelRecord::Funded { channel_id, counterparty_node_id, pending_splice, .. } = record; + let mut intent = match pending_splice { + Some(intent) => intent, + None => return true, + }; + + // Only act on failures of the splice we are tracking. A mismatch means the failure + // concerns some other attempt (e.g., a stale event replayed after the user initiated a + // new splice), in which case the record must be left alone. + if contribution.as_ref() != Some(&intent.contribution) { + return true; + } + + if intent.attempts >= MAX_SPLICE_ATTEMPTS { + let _ = self.channel_record_store.remove(&user_channel_id); + return true; + } + + match reason { + NegotiationFailureReason::PeerDisconnected => { + // The same contribution remains valid. Skip if LDK already has a splice in + // flight for this channel (e.g., the startup reconciler resubmitted first). + if self.channel_manager.splice_channel(&channel_id, &counterparty_node_id).is_err() + { + return false; + } + log_info!( + self.logger, + "Resubmitting splice for channel {} with counterparty {} after disconnect", + channel_id, + counterparty_node_id, + ); + let _ = self.submit(&channel_id, &counterparty_node_id, user_channel_id, intent); + false + }, + NegotiationFailureReason::FeeRateTooLow + | NegotiationFailureReason::ContributionInvalid => { + // The contribution went stale (e.g., another splice negotiation outpaced ours, + // turning the resubmission into an underpaying fee bump of it). Rebuild a fresh + // contribution from the original call's parameters. + match self + .rebuild_contribution(&channel_id, &counterparty_node_id, &intent.kind) + .await + { + Ok(contribution) => { + log_info!( + self.logger, + "Resubmitting rebuilt splice for channel {} with counterparty {}", + channel_id, + counterparty_node_id, + ); + intent.contribution = contribution; + let _ = self.submit( + &channel_id, + &counterparty_node_id, + user_channel_id, + intent, + ); + false + }, + Err(e) => { + log_error!( + self.logger, + "Abandoning splice for channel {}: failed to rebuild contribution: {:?}", + channel_id, + e, + ); + let _ = self.channel_record_store.remove(&user_channel_id); + true + }, + } + }, + _ => { + // Terminal failure; retrying cannot address it. + let _ = self.channel_record_store.remove(&user_channel_id); + true + }, + } + } + + /// Clears any splice intent made obsolete by a locked funding transaction. + pub(crate) fn on_channel_ready( + &self, user_channel_id: UserChannelId, funding_txo: Option, + ) { + let record = match self.channel_record_store.get(&user_channel_id) { + Some(record) => record, + None => return, + }; + // Only clear an intent predating the locked funding transaction. An intent with a + // matching pre-splice funding outpoint was created after the lock and is still pending. + let clear = match (record.pending_splice(), funding_txo) { + (Some(intent), Some(funding_txo)) => { + intent.pre_splice_funding_txo.into_bitcoin_outpoint() != funding_txo + }, + (Some(_), None) => false, + (None, _) => true, + }; + if clear { + let _ = self.channel_record_store.remove(&user_channel_id); + } + } + + /// Clears any splice intent for a closed channel, as there is nothing left to splice. + pub(crate) fn on_channel_closed(&self, user_channel_id: UserChannelId) { + let _ = self.channel_record_store.remove(&user_channel_id); + } + + /// Persists the incremented attempt count and hands the contribution back to LDK. The count + /// is persisted first so that a crash mid-submission cannot lead to unbounded retries. + fn submit( + &self, channel_id: &ChannelId, counterparty_node_id: &PublicKey, + user_channel_id: UserChannelId, mut intent: SpliceIntent, + ) -> Result<(), Error> { + intent.attempts += 1; + let contribution = intent.contribution.clone(); + let update = ChannelRecordUpdate { user_channel_id, pending_splice: Some(Some(intent)) }; + self.channel_record_store.update(update)?; + + self.channel_manager + .funding_contributed(channel_id, counterparty_node_id, contribution, None) + .map_err(|e| { + log_error!( + self.logger, + "Failed to resubmit splice for channel {} with counterparty {}: {:?}", + channel_id, + counterparty_node_id, + e, + ); + Error::ChannelSplicingFailed + }) + } + + /// Builds a fresh contribution from the parameters of the originating API call, mirroring the + /// corresponding [`Node`] method. + /// + /// [`Node`]: crate::Node + async fn rebuild_contribution( + &self, channel_id: &ChannelId, counterparty_node_id: &PublicKey, kind: &SpliceKind, + ) -> Result { + let template = self + .channel_manager + .splice_channel(channel_id, counterparty_node_id) + .map_err(|_| Error::ChannelSplicingFailed)?; + + let est_feerate = self.fee_estimator.estimate_fee_rate(ConfirmationTarget::ChannelFunding); + let min_feerate = + template.min_rbf_feerate().map_or(est_feerate, |min_rbf| est_feerate.max(min_rbf)); + let max_feerate = FeeRate::from_sat_per_kwu(min_feerate.to_sat_per_kwu() * 3 / 2); + + match kind { + SpliceKind::In { amount_sats } => template + .splice_in( + Amount::from_sat(*amount_sats), + min_feerate, + max_feerate, + Arc::clone(&self.wallet), + ) + .await + .map_err(|_| Error::ChannelSplicingFailed), + SpliceKind::Out { outputs } => template + .splice_out(outputs.clone(), min_feerate, max_feerate) + .map_err(|_| Error::ChannelSplicingFailed), + SpliceKind::Rbf {} => template + .rbf_prior_contribution(None, max_feerate, Arc::clone(&self.wallet)) + .await + .map_err(|_| Error::ChannelSplicingFailed), + } + } + + /// Gives up on a splice intent and surfaces the failure to the user. + async fn abandon( + &self, user_channel_id: UserChannelId, channel_id: ChannelId, + counterparty_node_id: PublicKey, + ) { + log_error!( + self.logger, + "Abandoning splice for channel {} with counterparty {}", + channel_id, + counterparty_node_id, + ); + let _ = self.channel_record_store.remove(&user_channel_id); + let event = + Event::SpliceNegotiationFailed { channel_id, user_channel_id, counterparty_node_id }; + if let Err(e) = self.event_queue.add_event(event).await { + log_error!(self.logger, "Failed to push to event queue: {}", e); + } + } +} diff --git a/src/channel/store.rs b/src/channel/store.rs new file mode 100644 index 000000000..1d4cd3cab --- /dev/null +++ b/src/channel/store.rs @@ -0,0 +1,184 @@ +// This file is Copyright its original authors, visible in version control history. +// +// This file is licensed under the Apache License, Version 2.0 or the MIT license , at your option. You may not use this file except in +// accordance with one or both of these licenses. + +use bitcoin::secp256k1::PublicKey; +use bitcoin::TxOut; +use lightning::chain::transaction::OutPoint; +use lightning::ln::funding::FundingContribution; +use lightning::ln::types::ChannelId; +use lightning::{impl_writeable_tlv_based, impl_writeable_tlv_based_enum}; + +use crate::data_store::{StorableObject, StorableObjectId, StorableObjectUpdate}; +use crate::hex_utils; +use crate::types::UserChannelId; + +/// The number of times a splice intent is resubmitted before it is abandoned and the failure is +/// surfaced to the user. +pub(crate) const MAX_SPLICE_ATTEMPTS: u8 = 3; + +/// A user-initiated splice that has been handed to LDK but is not yet guaranteed to survive a +/// restart. LDK only persists a splice once its negotiation reaches `AwaitingSignatures`, so until +/// the new funding transaction locks we keep enough state to resubmit the splice ourselves. +#[derive(Clone, Debug, PartialEq, Eq)] +pub(crate) struct SpliceIntent { + /// The channel's funding outpoint when the splice was initiated. It only changes once a splice + /// locks, so a mismatch with the channel's current funding outpoint means the splice (or a + /// replacement) completed and there is nothing left to resubmit. + pub pre_splice_funding_txo: OutPoint, + /// The contribution handed to [`ChannelManager::funding_contributed`], resubmitted verbatim. + /// + /// [`ChannelManager::funding_contributed`]: lightning::ln::channelmanager::ChannelManager::funding_contributed + pub contribution: FundingContribution, + /// The parameters of the originating API call, used to rebuild a fresh contribution when the + /// stored one has become stale (e.g., its feerate is no longer sufficient). + pub kind: SpliceKind, + /// The number of times the contribution has been resubmitted to LDK after the originating API + /// call handed it off. + pub attempts: u8, +} + +impl_writeable_tlv_based!(SpliceIntent, { + (0, pre_splice_funding_txo, required), + (2, contribution, required), + (4, kind, required), + (6, attempts, required), +}); + +/// The parameters of the API call that initiated a splice. +#[derive(Clone, Debug, PartialEq, Eq)] +pub(crate) enum SpliceKind { + /// [`Node::splice_in`] with a resolved amount. + /// + /// [`Node::splice_in`]: crate::Node::splice_in + In { amount_sats: u64 }, + /// [`Node::splice_out`] to the given outputs. + /// + /// [`Node::splice_out`]: crate::Node::splice_out + Out { outputs: Vec }, + /// [`Node::bump_channel_funding_fee`] of a pending splice. + /// + /// [`Node::bump_channel_funding_fee`]: crate::Node::bump_channel_funding_fee + Rbf {}, +} + +impl_writeable_tlv_based_enum!(SpliceKind, + (0, In) => { + (0, amount_sats, required), + }, + (2, Out) => { + (0, outputs, required_vec), + }, + (4, Rbf) => {}, +); + +/// Persistent per-channel state tracked by LDK Node, keyed by [`UserChannelId`]. +#[derive(Clone, Debug, PartialEq, Eq)] +pub(crate) enum ChannelRecord { + /// State for a channel whose funding transaction exists, currently limited to an in-flight + /// splice intent. A record without a pending splice intent is removed from the store. + Funded { + user_channel_id: UserChannelId, + counterparty_node_id: PublicKey, + channel_id: ChannelId, + pending_splice: Option, + }, +} + +impl ChannelRecord { + pub(crate) fn pending_splice(&self) -> Option<&SpliceIntent> { + match self { + ChannelRecord::Funded { pending_splice, .. } => pending_splice.as_ref(), + } + } +} + +impl_writeable_tlv_based_enum!(ChannelRecord, + (0, Funded) => { + (0, user_channel_id, required), + (2, counterparty_node_id, required), + (4, channel_id, required), + (6, pending_splice, option), + }, +); + +impl StorableObjectId for UserChannelId { + fn encode_to_hex_str(&self) -> String { + hex_utils::to_string(&self.0.to_be_bytes()) + } +} + +#[derive(Clone, Debug, PartialEq, Eq)] +pub(crate) struct ChannelRecordUpdate { + pub user_channel_id: UserChannelId, + pub pending_splice: Option>, +} + +impl StorableObject for ChannelRecord { + type Id = UserChannelId; + type Update = ChannelRecordUpdate; + + fn id(&self) -> Self::Id { + match self { + ChannelRecord::Funded { user_channel_id, .. } => *user_channel_id, + } + } + + fn update(&mut self, update: Self::Update) -> bool { + let mut updated = false; + match self { + ChannelRecord::Funded { pending_splice, .. } => { + if let Some(new_pending_splice) = update.pending_splice { + if *pending_splice != new_pending_splice { + *pending_splice = new_pending_splice; + updated = true; + } + } + }, + } + updated + } + + fn to_update(&self) -> Self::Update { + match self { + ChannelRecord::Funded { user_channel_id, pending_splice, .. } => Self::Update { + user_channel_id: *user_channel_id, + pending_splice: Some(pending_splice.clone()), + }, + } + } +} + +impl StorableObjectUpdate for ChannelRecordUpdate { + fn id(&self) -> ::Id { + self.user_channel_id + } +} + +#[cfg(test)] +mod tests { + use lightning::util::ser::{Readable, Writeable}; + + use super::*; + + #[test] + fn channel_record_is_serializable() { + let user_channel_id = UserChannelId(42); + let counterparty_node_id = bitcoin::secp256k1::PublicKey::from_slice(&[2u8; 33]).unwrap(); + let channel_id = ChannelId([3u8; 32]); + let record = ChannelRecord::Funded { + user_channel_id, + counterparty_node_id, + channel_id, + pending_splice: None, + }; + + let encoded = record.encode(); + let decoded = ChannelRecord::read(&mut &encoded[..]).unwrap(); + assert_eq!(record, decoded); + assert_eq!(decoded.id(), user_channel_id); + } +} diff --git a/src/event.rs b/src/event.rs index 7d23be99a..beddf53d1 100644 --- a/src/event.rs +++ b/src/event.rs @@ -33,6 +33,7 @@ use lightning::{impl_writeable_tlv_based, impl_writeable_tlv_based_enum}; use lightning_liquidity::lsps2::utils::compute_opening_fee; use lightning_types::payment::{PaymentHash, PaymentPreimage}; +use crate::channel::SpliceRetrier; use crate::config::{may_announce_channel, Config}; use crate::connection::ConnectionManager; use crate::data_store::DataStoreUpdateResult; @@ -283,7 +284,11 @@ pub enum Event { /// The outpoint of the channel's splice funding transaction. new_funding_txo: OutPoint, }, - /// A channel splice negotiation round has failed. + /// A channel splice has failed and is no longer being pursued. + /// + /// Recoverable failures (e.g., a peer disconnecting mid-negotiation) are retried + /// automatically, including across restarts; this event is emitted only once the splice is + /// given up on. SpliceNegotiationFailed { /// The `channel_id` of the channel. channel_id: ChannelId, @@ -543,6 +548,7 @@ where static_invoice_store: Option, onion_messenger: Arc, om_mailbox: Option>, + splice_retrier: Arc>, } impl EventHandler @@ -558,7 +564,8 @@ where payment_store: Arc, peer_store: Arc>, keys_manager: Arc, static_invoice_store: Option, onion_messenger: Arc, om_mailbox: Option>, - runtime: Arc, logger: L, config: Arc, + splice_retrier: Arc>, runtime: Arc, logger: L, + config: Arc, ) -> Self { Self { event_queue, @@ -578,6 +585,7 @@ where static_invoice_store, onion_messenger, om_mailbox, + splice_retrier, } } @@ -1580,12 +1588,26 @@ where ); } + if let Err(e) = + self.wallet.handle_channel_ready(channel_id, funding_txo.map(|txo| txo.txid)) + { + log_error!( + self.logger, + "Failed to graduate funding payment on ChannelReady for channel {}: {:?}", + channel_id, + e, + ); + return Err(ReplayEvent()); + } + if let Some(liquidity_source) = self.liquidity_source.as_ref() { liquidity_source .handle_channel_ready(user_channel_id, &channel_id, &counterparty_node_id) .await; } + self.splice_retrier.on_channel_ready(UserChannelId(user_channel_id), funding_txo); + let event = Event::ChannelReady { channel_id, user_channel_id: UserChannelId(user_channel_id), @@ -1609,6 +1631,18 @@ where } => { log_info!(self.logger, "Channel {} closed due to: {}", channel_id, reason); + if let Err(e) = self.wallet.handle_channel_closed(channel_id) { + log_error!( + self.logger, + "Failed to handle ChannelClosed for channel {}: {:?}", + channel_id, + e, + ); + return Err(ReplayEvent()); + } + + self.splice_retrier.on_channel_closed(UserChannelId(user_channel_id)); + let event = Event::ChannelClosed { channel_id, user_channel_id: UserChannelId(user_channel_id), @@ -1878,15 +1912,26 @@ where channel_id, user_channel_id, counterparty_node_id, - .. + reason, + contribution, } => { log_info!( self.logger, - "Channel {} with counterparty {} splice negotiation failed", + "Channel {} with counterparty {} splice negotiation failed: {:?}", channel_id, counterparty_node_id, + reason, ); + // Only surface failures of splices that are not (or no longer) being retried. + let surface = self + .splice_retrier + .on_negotiation_failed(UserChannelId(user_channel_id), reason, contribution) + .await; + if !surface { + return Ok(()); + } + let event = Event::SpliceNegotiationFailed { channel_id, user_channel_id: UserChannelId(user_channel_id), diff --git a/src/io/mod.rs b/src/io/mod.rs index e16a99975..df974dfe1 100644 --- a/src/io/mod.rs +++ b/src/io/mod.rs @@ -84,3 +84,7 @@ pub(crate) const STATIC_INVOICE_STORE_PRIMARY_NAMESPACE: &str = "static_invoices /// The pending payment information will be persisted under this prefix. pub(crate) const PENDING_PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE: &str = "pending_payments"; pub(crate) const PENDING_PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE: &str = ""; + +/// The per-channel records will be persisted under this prefix. +pub(crate) const CHANNEL_RECORD_PERSISTENCE_PRIMARY_NAMESPACE: &str = "channel_records"; +pub(crate) const CHANNEL_RECORD_PERSISTENCE_SECONDARY_NAMESPACE: &str = ""; diff --git a/src/lib.rs b/src/lib.rs index 614be098b..985e090b8 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -83,6 +83,7 @@ mod balance; mod builder; mod chain; +mod channel; pub mod config; mod connection; mod data_store; @@ -129,6 +130,8 @@ pub use builder::BuildError; #[cfg(not(feature = "uniffi"))] pub use builder::NodeBuilder as Builder; use chain::ChainSource; +use channel::store::{ChannelRecord, SpliceIntent, SpliceKind}; +use channel::SpliceRetrier; use config::{ default_user_config, may_announce_channel, AsyncPaymentsRole, ChannelConfig, Config, LNURL_AUTH_TIMEOUT_SECS, NODE_ANN_BCAST_INTERVAL, PEER_RECONNECTION_INTERVAL, @@ -152,6 +155,7 @@ use lightning::ln::chan_utils::FUNDING_TRANSACTION_WITNESS_WEIGHT; use lightning::ln::channel_state::ChannelDetails as LdkChannelDetails; pub use lightning::ln::channel_state::ChannelShutdownState; use lightning::ln::channelmanager::PaymentId; +use lightning::ln::funding::FundingContribution; use lightning::ln::msgs::{BaseMessageHandler, SocketAddress}; use lightning::ln::peer_handler::CustomMessageHandler; use lightning::routing::gossip::NodeAlias; @@ -176,9 +180,9 @@ use peer_store::{PeerInfo, PeerStore}; use runtime::Runtime; pub use tokio; use types::{ - Broadcaster, BumpTransactionEventHandler, ChainMonitor, ChannelManager, DynStore, Graph, - HRNResolver, KeysManager, OnionMessenger, PaymentStore, PeerManager, Router, Scorer, Sweeper, - Wallet, + Broadcaster, BumpTransactionEventHandler, ChainMonitor, ChannelManager, ChannelRecordStore, + DynStore, Graph, HRNResolver, KeysManager, OnionMessenger, PaymentStore, PeerManager, Router, + Scorer, Sweeper, Wallet, }; pub use types::{ChannelDetails, CustomTlvRecord, PeerDetails, SyncAndAsyncKVStore, UserChannelId}; pub use vss_client; @@ -241,6 +245,7 @@ pub struct Node { scorer: Arc>, peer_store: Arc>>, payment_store: Arc, + channel_record_store: Arc, lnurl_auth: Arc, is_running: Arc>, node_metrics: Arc>, @@ -589,6 +594,15 @@ impl Node { None }; + let splice_retrier = Arc::new(SpliceRetrier::new( + Arc::clone(&self.channel_manager), + Arc::clone(&self.wallet), + Arc::clone(&self.fee_estimator), + Arc::clone(&self.channel_record_store), + Arc::clone(&self.event_queue), + Arc::clone(&self.logger), + )); + let event_handler = Arc::new(EventHandler::new( Arc::clone(&self.event_queue), Arc::clone(&self.wallet), @@ -604,11 +618,17 @@ impl Node { static_invoice_store, Arc::clone(&self.onion_messenger), self.om_mailbox.clone(), + Arc::clone(&splice_retrier), Arc::clone(&self.runtime), Arc::clone(&self.logger), Arc::clone(&self.config), )); + // Resubmit any persisted splice intents that LDK dropped before durably recording them. + self.runtime.spawn_background_task(async move { + splice_retrier.reconcile().await; + }); + // Setup background processing let background_persister = Arc::clone(&self.kv_store); let background_event_handler = Arc::clone(&event_handler); @@ -1515,6 +1535,46 @@ impl Node { ) } + /// Persists a splice intent so that the splice can be resubmitted if LDK drops it before + /// durably recording it (e.g., when restarting or disconnecting mid-negotiation). Must be + /// called before handing the contribution to [`ChannelManager::funding_contributed`] so that + /// a crash in between is also covered. + /// + /// [`ChannelManager::funding_contributed`]: lightning::ln::channelmanager::ChannelManager::funding_contributed + fn persist_splice_intent( + &self, user_channel_id: &UserChannelId, counterparty_node_id: PublicKey, + channel_details: &LdkChannelDetails, contribution: FundingContribution, kind: SpliceKind, + ) -> Result<(), Error> { + let pre_splice_funding_txo = channel_details.funding_txo.ok_or_else(|| { + log_error!(self.logger, "Failed to splice channel: channel not yet ready"); + Error::ChannelSplicingFailed + })?; + let record = ChannelRecord::Funded { + user_channel_id: *user_channel_id, + counterparty_node_id, + channel_id: channel_details.channel_id, + pending_splice: Some(SpliceIntent { + pre_splice_funding_txo, + contribution, + kind, + attempts: 0, + }), + }; + self.channel_record_store.insert(record).map(|_| ()) + } + + /// Removes a splice intent if it still holds the given contribution. The guard ensures an + /// intent persisted by a newer call is left alone. + fn clear_splice_intent( + &self, user_channel_id: &UserChannelId, contribution: &FundingContribution, + ) { + if let Some(record) = self.channel_record_store.get(user_channel_id) { + if record.pending_splice().map(|intent| &intent.contribution) == Some(contribution) { + let _ = self.channel_record_store.remove(user_channel_id); + } + } + } + fn splice_in_inner( &self, user_channel_id: &UserChannelId, counterparty_node_id: PublicKey, splice_amount_sats: FundingAmount, @@ -1595,7 +1655,7 @@ impl Node { if funding_template.prior_contribution().is_some() { log_error!( self.logger, - "Failed to splice channel: a prior splice contribution is pending" + "Failed to splice channel: a prior splice contribution is pending; use bump_channel_funding_fee instead" ); return Err(Error::ChannelSplicingFailed); } @@ -1613,6 +1673,15 @@ impl Node { Error::ChannelSplicingFailed })?; + self.persist_splice_intent( + user_channel_id, + counterparty_node_id, + channel_details, + contribution.clone(), + SpliceKind::In { amount_sats: splice_amount_sats }, + )?; + + let intent_contribution = contribution.clone(); self.channel_manager .funding_contributed( &channel_details.channel_id, @@ -1622,6 +1691,7 @@ impl Node { ) .map_err(|e| { log_error!(self.logger, "Failed to splice channel: {:?}", e); + self.clear_splice_intent(user_channel_id, &intent_contribution); Error::ChannelSplicingFailed }) } else { @@ -1641,6 +1711,10 @@ impl Node { /// it. Once negotiation with the counterparty is complete, the channel remains operational /// while waiting for a new funding transaction to confirm. /// + /// The splice is retried automatically, including across restarts, until it either completes + /// or fails for a reason retrying cannot address, at which point + /// [`Event::SpliceNegotiationFailed`] is emitted. + /// /// # Experimental API /// /// This API is experimental. Currently, a splice-in will be marked as an outbound payment, but @@ -1681,6 +1755,10 @@ impl Node { /// it. Once negotiation with the counterparty is complete, the channel remains operational /// while waiting for a new funding transaction to confirm. /// + /// The splice is retried automatically, including across restarts, until it either completes + /// or fails for a reason retrying cannot address, at which point + /// [`Event::SpliceNegotiationFailed`] is emitted. + /// /// # Experimental API /// /// This API is experimental. Currently, a splice-out will be marked as an inbound payment if @@ -1716,7 +1794,7 @@ impl Node { if funding_template.prior_contribution().is_some() { log_error!( self.logger, - "Failed to splice channel: a prior splice contribution is pending" + "Failed to splice channel: a prior splice contribution is pending; use bump_channel_funding_fee instead" ); return Err(Error::ChannelSplicingFailed); } @@ -1725,12 +1803,22 @@ impl Node { value: Amount::from_sat(splice_amount_sats), script_pubkey: address.script_pubkey(), }]; - let contribution = - funding_template.splice_out(outputs, min_feerate, max_feerate).map_err(|e| { + let contribution = funding_template + .splice_out(outputs.clone(), min_feerate, max_feerate) + .map_err(|e| { log_error!(self.logger, "Failed to splice channel: {}", e); Error::ChannelSplicingFailed })?; + self.persist_splice_intent( + user_channel_id, + counterparty_node_id, + channel_details, + contribution.clone(), + SpliceKind::Out { outputs }, + )?; + + let intent_contribution = contribution.clone(); self.channel_manager .funding_contributed( &channel_details.channel_id, @@ -1740,6 +1828,88 @@ impl Node { ) .map_err(|e| { log_error!(self.logger, "Failed to splice channel: {:?}", e); + self.clear_splice_intent(user_channel_id, &intent_contribution); + Error::ChannelSplicingFailed + }) + } else { + log_error!( + self.logger, + "Channel not found for user_channel_id {} and counterparty {}", + user_channel_id, + counterparty_node_id + ); + Err(Error::ChannelSplicingFailed) + } + } + + /// Replace a pending splice's funding transaction with a higher-feerate version. + /// + /// If a prior splice negotiation is pending, this bumps its feerate via RBF. The prior + /// contribution is reused when possible; otherwise, coin selection is re-run. + /// + /// The fee bump is retried automatically, including across restarts, until it either + /// completes or fails for a reason retrying cannot address, at which point + /// [`Event::SpliceNegotiationFailed`] is emitted. + /// + /// # Experimental API + /// + /// This API is experimental and may change in the future. + pub fn bump_channel_funding_fee( + &self, user_channel_id: &UserChannelId, counterparty_node_id: PublicKey, + ) -> Result<(), Error> { + let open_channels = + self.channel_manager.list_channels_with_counterparty(&counterparty_node_id); + if let Some(channel_details) = + open_channels.iter().find(|c| c.user_channel_id == user_channel_id.0) + { + let min_feerate = + self.fee_estimator.estimate_fee_rate(ConfirmationTarget::ChannelFunding); + let max_feerate = FeeRate::from_sat_per_kwu(min_feerate.to_sat_per_kwu() * 3 / 2); + + let funding_template = self + .channel_manager + .splice_channel(&channel_details.channel_id, &counterparty_node_id) + .map_err(|e| { + log_error!(self.logger, "Failed to RBF channel: {:?}", e); + Error::ChannelSplicingFailed + })?; + + if funding_template.min_rbf_feerate().is_none() { + log_error!(self.logger, "Failed to RBF channel: no pending splice to replace"); + return Err(Error::ChannelSplicingFailed); + } + + let contribution = self + .runtime + .block_on(funding_template.rbf_prior_contribution( + None, + max_feerate, + Arc::clone(&self.wallet), + )) + .map_err(|e| { + log_error!(self.logger, "Failed to RBF channel: {}", e); + Error::ChannelSplicingFailed + })?; + + self.persist_splice_intent( + user_channel_id, + counterparty_node_id, + channel_details, + contribution.clone(), + SpliceKind::Rbf {}, + )?; + + let intent_contribution = contribution.clone(); + self.channel_manager + .funding_contributed( + &channel_details.channel_id, + &counterparty_node_id, + contribution, + None, + ) + .map_err(|e| { + log_error!(self.logger, "Failed to RBF channel: {:?}", e); + self.clear_splice_intent(user_channel_id, &intent_contribution); Error::ChannelSplicingFailed }) } else { diff --git a/src/payment/pending_payment_store.rs b/src/payment/pending_payment_store.rs index eb72f89ec..16837d70c 100644 --- a/src/payment/pending_payment_store.rs +++ b/src/payment/pending_payment_store.rs @@ -6,6 +6,7 @@ // accordance with one or both of these licenses. use bitcoin::Txid; +use lightning::chain::chaininterface::FundingCandidate; use lightning::impl_writeable_tlv_based; use lightning::ln::channelmanager::PaymentId; @@ -13,6 +14,19 @@ use crate::data_store::{StorableObject, StorableObjectUpdate}; use crate::payment::store::PaymentDetailsUpdate; use crate::payment::PaymentDetails; +/// Marks an on-chain payment as belonging to an interactive-funding negotiation. The +/// last entry in `candidates` is the currently-broadcast tx; earlier entries are RBF +/// predecessors that may still confirm if reorgs intervene. +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct FundingDetails { + /// Every negotiated candidate, oldest first. + pub candidates: Vec, +} + +impl_writeable_tlv_based!(FundingDetails, { + (0, candidates, optional_vec), +}); + /// Represents a pending payment #[derive(Clone, Debug, PartialEq, Eq)] pub struct PendingPaymentDetails { @@ -20,11 +34,24 @@ pub struct PendingPaymentDetails { pub details: PaymentDetails, /// Transaction IDs that have replaced or conflict with this payment. pub conflicting_txids: Vec, + /// Set when the payment's transaction is an interactive-funding broadcast (channel + /// open or splice). The record transitions to [`PaymentStatus::Succeeded`] on + /// `ChannelReady` instead of after [`ANTI_REORG_DELAY`] confirmations. + /// + /// [`PaymentStatus::Succeeded`]: crate::payment::store::PaymentStatus::Succeeded + /// [`ANTI_REORG_DELAY`]: lightning::chain::channelmonitor::ANTI_REORG_DELAY + pub funding_details: Option, } impl PendingPaymentDetails { pub(crate) fn new(details: PaymentDetails, conflicting_txids: Vec) -> Self { - Self { details, conflicting_txids } + Self { details, conflicting_txids, funding_details: None } + } + + pub(crate) fn with_funding_details( + details: PaymentDetails, conflicting_txids: Vec, funding_details: FundingDetails, + ) -> Self { + Self { details, conflicting_txids, funding_details: Some(funding_details) } } /// Convert to finalized payment for the main payment store @@ -36,6 +63,7 @@ impl PendingPaymentDetails { impl_writeable_tlv_based!(PendingPaymentDetails, { (0, details, required), (2, conflicting_txids, optional_vec), + (4, funding_details, option), }); #[derive(Clone, Debug, PartialEq, Eq)] @@ -43,6 +71,7 @@ pub(crate) struct PendingPaymentDetailsUpdate { pub id: PaymentId, pub payment_update: Option, pub conflicting_txids: Option>, + pub funding_details: Option>, } impl StorableObject for PendingPaymentDetails { @@ -68,6 +97,13 @@ impl StorableObject for PendingPaymentDetails { } } + if let Some(new_funding_details) = update.funding_details { + if self.funding_details != new_funding_details { + self.funding_details = new_funding_details; + updated = true; + } + } + updated } @@ -89,6 +125,11 @@ impl From<&PendingPaymentDetails> for PendingPaymentDetailsUpdate { } else { Some(value.conflicting_txids.clone()) }; - Self { id: value.id(), payment_update: Some(value.details.to_update()), conflicting_txids } + Self { + id: value.id(), + payment_update: Some(value.details.to_update()), + conflicting_txids, + funding_details: Some(value.funding_details.clone()), + } } } diff --git a/src/tx_broadcaster.rs b/src/tx_broadcaster.rs index 7084135b0..1e65fb1aa 100644 --- a/src/tx_broadcaster.rs +++ b/src/tx_broadcaster.rs @@ -6,21 +6,35 @@ // accordance with one or both of these licenses. use std::ops::Deref; +use std::sync::{Mutex as StdMutex, Weak}; use bitcoin::Transaction; use lightning::chain::chaininterface::{BroadcasterInterface, TransactionType}; use tokio::sync::{mpsc, Mutex, MutexGuard}; use crate::logger::{log_error, LdkLogger}; +use crate::types::Wallet; +use crate::Error; const BCAST_PACKAGE_QUEUE_SIZE: usize = 50; +/// A package of transactions that LDK handed to the broadcaster in one +/// `broadcast_transactions` call, along with each transaction's type. Queued until the +/// background task classifies and broadcasts it. +pub(crate) type BroadcastPackage = Vec<(Transaction, TransactionType)>; + pub(crate) struct TransactionBroadcaster where L::Target: LdkLogger, { - queue_sender: mpsc::Sender>, - queue_receiver: Mutex>>, + queue_sender: mpsc::Sender, + queue_receiver: Mutex>, + /// Weak handle to the [`Wallet`] that performs classification of funding broadcasts + /// (channel opens and splices) into payment records. Remains `None` while the + /// builder is wiring the node up, during which broadcasts are forwarded to the + /// queue but no payment record is written. [`Self::set_wallet`] installs the handle + /// once the [`Wallet`] exists. + wallet: StdMutex>>, logger: L, } @@ -30,14 +44,48 @@ where { pub(crate) fn new(logger: L) -> Self { let (queue_sender, queue_receiver) = mpsc::channel(BCAST_PACKAGE_QUEUE_SIZE); - Self { queue_sender, queue_receiver: Mutex::new(queue_receiver), logger } + Self { + queue_sender, + queue_receiver: Mutex::new(queue_receiver), + wallet: StdMutex::new(None), + logger, + } + } + + /// Installs the [`Wallet`] handle used to classify funding broadcasts (channel + /// opens and splices) into payment records. Called once the builder has constructed + /// both the broadcaster and the wallet. + pub(crate) fn set_wallet(&self, wallet: Weak) { + *self.wallet.lock().expect("lock") = Some(wallet); } pub(crate) async fn get_broadcast_queue( &self, - ) -> MutexGuard<'_, mpsc::Receiver>> { + ) -> MutexGuard<'_, mpsc::Receiver> { self.queue_receiver.lock().await } + + /// Classifies a queued package into payment records and returns the package ready + /// for the chain client. Returns `Err` if any classification fails; callers must + /// not broadcast the package in that case, since a crash would leave the tx + /// on-chain without a record. + pub(crate) async fn classify_package( + &self, package: BroadcastPackage, + ) -> Result { + let wallet_opt = self.wallet.lock().expect("lock").as_ref().and_then(Weak::upgrade); + if let Some(wallet) = wallet_opt { + tokio::task::spawn_blocking(move || { + for (tx, tx_type) in &package { + wallet.classify_broadcast(tx, tx_type)?; + } + Ok::<_, Error>(package) + }) + .await + .map_err(|_| Error::PersistenceFailed)? + } else { + Ok(package) + } + } } impl BroadcasterInterface for TransactionBroadcaster @@ -45,7 +93,8 @@ where L::Target: LdkLogger, { fn broadcast_transactions(&self, txs: &[(&Transaction, TransactionType)]) { - let package = txs.iter().map(|(t, _)| (*t).clone()).collect::>(); + let package: BroadcastPackage = + txs.iter().map(|(tx, tx_type)| ((*tx).clone(), tx_type.clone())).collect(); self.queue_sender.try_send(package).unwrap_or_else(|e| { log_error!(self.logger, "Failed to broadcast transactions: {}", e); }); diff --git a/src/types.rs b/src/types.rs index 06e65fbd0..ac27fe069 100644 --- a/src/types.rs +++ b/src/types.rs @@ -38,6 +38,7 @@ use lightning_net_tokio::SocketDescriptor; use crate::chain::bitcoind::UtxoSourceClient; use crate::chain::ChainSource; +use crate::channel::store::ChannelRecord; use crate::config::ChannelConfig; use crate::data_store::DataStore; use crate::fee_estimator::OnchainFeeEstimator; @@ -392,7 +393,7 @@ pub(crate) type PaymentStore = DataStore>; /// A local, potentially user-provided, identifier of a channel. /// /// By default, this will be randomly generated for the user to ensure local uniqueness. -#[derive(Debug, Copy, Clone, PartialEq, Eq)] +#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)] pub struct UserChannelId(pub u128); impl Writeable for UserChannelId { @@ -702,3 +703,5 @@ impl From<&(u64, Vec)> for CustomTlvRecord { } pub(crate) type PendingPaymentStore = DataStore>; + +pub(crate) type ChannelRecordStore = DataStore>; diff --git a/src/wallet/mod.rs b/src/wallet/mod.rs index 13b1f384f..cd8c6ec2d 100644 --- a/src/wallet/mod.rs +++ b/src/wallet/mod.rs @@ -5,6 +5,7 @@ // http://opensource.org/licenses/MIT>, at your option. You may not use this file except in // accordance with one or both of these licenses. +use std::collections::HashMap; use std::future::Future; use std::ops::Deref; use std::str::FromStr; @@ -32,14 +33,16 @@ use bitcoin::{ WitnessProgram, WitnessVersion, }; use lightning::chain::chaininterface::{ - BroadcasterInterface, INCREMENTAL_RELAY_FEE_SAT_PER_1000_WEIGHT, + BroadcasterInterface, TransactionType, INCREMENTAL_RELAY_FEE_SAT_PER_1000_WEIGHT, }; use lightning::chain::channelmonitor::ANTI_REORG_DELAY; use lightning::chain::{BlockLocator, ClaimId, Listen}; use lightning::ln::channelmanager::PaymentId; +use lightning::ln::funding::FundingContribution; use lightning::ln::inbound_payment::ExpandedKey; use lightning::ln::msgs::UnsignedGossipMessage; use lightning::ln::script::ShutdownScript; +use lightning::ln::types::ChannelId as LnChannelId; use lightning::sign::{ ChangeDestinationSource, EntropySource, InMemorySigner, KeysManager, NodeSigner, OutputSpender, PeerStorageKey, Recipient, SignerProvider, SpendableOutputDescriptor, @@ -54,6 +57,9 @@ use persist::KVStoreWalletPersister; use crate::config::Config; use crate::fee_estimator::{ConfirmationTarget, FeeEstimator, OnchainFeeEstimator}; use crate::logger::{log_debug, log_error, log_info, log_trace, LdkLogger, Logger}; +use lightning::chain::chaininterface::{ChannelFunding, FundingCandidate, FundingPurpose}; + +use crate::payment::pending_payment_store::FundingDetails; use crate::payment::store::ConfirmationStatus; use crate::payment::{ PaymentDetails, PaymentDirection, PaymentKind, PaymentStatus, PendingPaymentDetails, @@ -250,18 +256,9 @@ impl Wallet { for event in events { match event { WalletEvent::TxConfirmed { txid, tx, block_time, .. } => { - let cur_height = locked_wallet.latest_checkpoint().height(); - let confirmation_height = block_time.block_id.height; - let payment_status = if cur_height >= confirmation_height + ANTI_REORG_DELAY - 1 - { - PaymentStatus::Succeeded - } else { - PaymentStatus::Pending - }; - let confirmation_status = ConfirmationStatus::Confirmed { block_hash: block_time.block_id.hash, - height: confirmation_height, + height: block_time.block_id.height, timestamp: block_time.confirmation_time, }; @@ -269,6 +266,23 @@ impl Wallet { .find_payment_by_txid(txid) .unwrap_or_else(|| PaymentId(txid.to_byte_array())); + if self.apply_funding_details_status_update( + payment_id, + txid, + confirmation_status, + )? { + continue; + } + + let cur_height = locked_wallet.latest_checkpoint().height(); + let confirmation_height = block_time.block_id.height; + let payment_status = if cur_height >= confirmation_height + ANTI_REORG_DELAY - 1 + { + PaymentStatus::Succeeded + } else { + PaymentStatus::Pending + }; + let payment = self.create_payment_from_tx( locked_wallet, txid, @@ -278,13 +292,12 @@ impl Wallet { confirmation_status, ); - self.payment_store.insert_or_update(payment.clone())?; - if payment_status == PaymentStatus::Pending { let pending_payment = self.create_pending_payment_from_tx(payment, Vec::new()); - - self.pending_payment_store.insert_or_update(pending_payment)?; + self.persist_pending_payment(pending_payment)?; + } else { + self.payment_store.insert_or_update(payment)?; } }, WalletEvent::ChainTipChanged { new_tip, .. } => { @@ -295,8 +308,11 @@ impl Wallet { "Non-pending payment {:?} found in pending store", p.details.id, ); + // Funding records complete on `ChannelReady`, not after + // `ANTI_REORG_DELAY` confirmations. p.details.status == PaymentStatus::Pending && matches!(p.details.kind, PaymentKind::Onchain { .. }) + && p.funding_details.is_none() }); let mut unconfirmed_outbound_txids: Vec = Vec::new(); @@ -357,6 +373,14 @@ impl Wallet { .find_payment_by_txid(txid) .unwrap_or_else(|| PaymentId(txid.to_byte_array())); + if self.apply_funding_details_status_update( + payment_id, + txid, + ConfirmationStatus::Unconfirmed, + )? { + continue; + } + let payment = self.create_payment_from_tx( locked_wallet, txid, @@ -365,10 +389,8 @@ impl Wallet { PaymentStatus::Pending, ConfirmationStatus::Unconfirmed, ); - let pending_payment = - self.create_pending_payment_from_tx(payment.clone(), Vec::new()); - self.payment_store.insert_or_update(payment)?; - self.pending_payment_store.insert_or_update(pending_payment)?; + let pending_payment = self.create_pending_payment_from_tx(payment, Vec::new()); + self.persist_pending_payment(pending_payment)?; }, WalletEvent::TxReplaced { txid, conflicts, .. } => { let Some(payment_id) = self.find_payment_by_txid(txid) else { @@ -380,6 +402,18 @@ impl Wallet { continue; }; + // Funding records (channel opens and splices) track their active candidate and + // status through `classify_*` and the Lightning lifecycle handlers. A replaced + // candidate is expected during splice RBF and must not reset the record or drop + // its funding details, so leave such records untouched here. + if self + .pending_payment_store + .get(&payment_id) + .map_or(false, |p| p.funding_details.is_some()) + { + continue; + } + // Collect all conflict txids let mut conflict_txids: Vec = conflicts.iter().map(|(_, conflict_txid)| *conflict_txid).collect(); @@ -404,6 +438,15 @@ impl Wallet { let payment_id = self .find_payment_by_txid(txid) .unwrap_or_else(|| PaymentId(txid.to_byte_array())); + + if self.apply_funding_details_status_update( + payment_id, + txid, + ConfirmationStatus::Unconfirmed, + )? { + continue; + } + let payment = self.create_payment_from_tx( locked_wallet, txid, @@ -412,10 +455,8 @@ impl Wallet { PaymentStatus::Pending, ConfirmationStatus::Unconfirmed, ); - let pending_payment = - self.create_pending_payment_from_tx(payment.clone(), Vec::new()); - self.payment_store.insert_or_update(payment)?; - self.pending_payment_store.insert_or_update(pending_payment)?; + let pending_payment = self.create_pending_payment_from_tx(payment, Vec::new()); + self.persist_pending_payment(pending_payment)?; }, _ => { continue; @@ -1082,9 +1123,12 @@ impl Wallet { let mut psbt = Psbt::from_unsigned_tx(unsigned_tx).map_err(|e| { log_error!(self.logger, "Failed to construct PSBT: {}", e); })?; + // Use list_output rather than get_utxo to include outputs spent by unconfirmed + // transactions (e.g., a prior splice being replaced via RBF). + let mut wallet_outputs: HashMap = + locked_wallet.list_output().map(|o| (o.outpoint, o)).collect(); for (i, txin) in psbt.unsigned_tx.input.iter().enumerate() { - if let Some(utxo) = locked_wallet.get_utxo(txin.previous_output) { - debug_assert!(!utxo.is_spent); + if let Some(utxo) = wallet_outputs.remove(&txin.previous_output) { psbt.inputs[i] = locked_wallet.get_psbt_input(utxo, None, true).map_err(|e| { log_error!(self.logger, "Failed to construct PSBT input: {}", e); })?; @@ -1142,6 +1186,41 @@ impl Wallet { Ok(tx) } + /// Computes the amount, fee, and direction of an on-chain payment from the + /// wallet's view of the transaction. Used by [`TransactionBroadcaster`] to + /// describe a single-funded channel-open, for which no [`FundingContribution`] + /// is available. + /// + /// [`TransactionBroadcaster`]: crate::tx_broadcaster::TransactionBroadcaster + /// [`FundingContribution`]: lightning::ln::funding::FundingContribution + pub(crate) fn onchain_payment_fields( + &self, tx: &Transaction, + ) -> (Option, Option, PaymentDirection) { + let locked_wallet = self.inner.lock().expect("lock"); + let fee = locked_wallet.calculate_fee(tx).unwrap_or(Amount::ZERO); + let (sent, received) = locked_wallet.sent_and_received(tx); + let fee_sat = fee.to_sat(); + + let (direction, amount_msat) = if sent > received { + ( + PaymentDirection::Outbound, + Some( + (sent.to_sat().saturating_sub(fee_sat).saturating_sub(received.to_sat())) + * 1000, + ), + ) + } else { + ( + PaymentDirection::Inbound, + Some( + received.to_sat().saturating_sub(sent.to_sat().saturating_sub(fee_sat)) * 1000, + ), + ) + }; + + (amount_msat, Some(fee_sat * 1000), direction) + } + fn create_payment_from_tx( &self, locked_wallet: &PersistedWallet, txid: Txid, payment_id: PaymentId, tx: &Transaction, payment_status: PaymentStatus, @@ -1198,6 +1277,223 @@ impl Wallet { PendingPaymentDetails::new(payment, conflicting_txids) } + /// Writes a [`PendingPaymentDetails`] and its inner [`PaymentDetails`] to their + /// respective stores in a fixed order. Callers that need to keep the two stores in + /// sync should always go through this. + fn persist_pending_payment(&self, pending: PendingPaymentDetails) -> Result<(), Error> { + self.payment_store.insert_or_update(pending.details.clone())?; + self.pending_payment_store.insert_or_update(pending)?; + Ok(()) + } + + /// Called on `ChannelReady` to mark a funding payment (channel open or splice) as + /// succeeded. + /// + /// If `funding_txo_txid` matches a candidate other than the currently-active one, + /// that candidate is promoted to active first and the outer [`PaymentDetails`] is + /// updated from its contribution. If no candidate matches (the confirmed funding + /// txid belongs to a broadcast this node didn't contribute to), the pending record + /// is left in place for later handling. + pub(crate) fn handle_channel_ready( + &self, channel_id: LnChannelId, funding_txo_txid: Option, + ) -> Result<(), Error> { + let funding_txo_txid = match funding_txo_txid { + Some(t) => t, + None => return Ok(()), + }; + + let mut pending = match self + .pending_payment_store + .list_filter(|p| { + p.funding_details + .as_ref() + .map(|fd| record_includes_channel(fd, channel_id)) + .unwrap_or(false) + }) + .into_iter() + .next() + { + Some(p) => p, + None => return Ok(()), + }; + let funding_details = match pending.funding_details.clone() { + Some(fd) => fd, + None => return Ok(()), + }; + + let candidate = match funding_details.candidates.iter().find(|c| c.txid == funding_txo_txid) + { + Some(c) => c.clone(), + None => { + // Confirmed `funding_txo` wasn't produced by any of our broadcasts. The + // record is left alone; some higher-level flow decides what to do. + log_debug!( + self.logger, + "ChannelReady for channel {}: confirmed funding_txo {} is not one of our candidates", + channel_id, + funding_txo_txid, + ); + return Ok(()); + }, + }; + + let old_txid = match pending.details.kind { + PaymentKind::Onchain { txid, .. } => txid, + _ => { + debug_assert!(false, "funding record must use PaymentKind::Onchain"); + return Ok(()); + }, + }; + + if old_txid != funding_txo_txid { + if !pending.conflicting_txids.contains(&old_txid) { + pending.conflicting_txids.push(old_txid); + } + pending.conflicting_txids.retain(|t| *t != funding_txo_txid); + + let aggregate = aggregate_local_stakes(&candidate); + pending.details.amount_msat = aggregate.amount_msat; + pending.details.fee_paid_msat = aggregate.fee_paid_msat; + } + + // Preserve the confirmation status already on the record (set by wallet sync if + // it's seen the tx confirm). `ChannelReady` alone doesn't carry block details. + let existing_status = match pending.details.kind { + PaymentKind::Onchain { status, .. } => status, + _ => ConfirmationStatus::Unconfirmed, + }; + pending.details.kind = + PaymentKind::Onchain { txid: funding_txo_txid, status: existing_status }; + + pending.details.status = PaymentStatus::Succeeded; + let payment_id = pending.details.id; + self.payment_store.insert_or_update(pending.details)?; + self.pending_payment_store.remove(&payment_id)?; + + Ok(()) + } + + /// Called on `ChannelClosed`. Removes any funding record (channel open or splice) + /// for `channel_id` whose candidates never reached confirmed — e.g. a funding + /// transaction that never made it on-chain. A record that does reflect a confirmed + /// transaction is left alone and will transition to `Succeeded` normally. + pub(crate) fn handle_channel_closed(&self, channel_id: LnChannelId) -> Result<(), Error> { + let pending = match self + .pending_payment_store + .list_filter(|p| { + p.funding_details + .as_ref() + .map(|fd| record_includes_channel(fd, channel_id)) + .unwrap_or(false) + }) + .into_iter() + .next() + { + Some(p) => p, + None => return Ok(()), + }; + + let is_confirmed = matches!( + pending.details.kind, + PaymentKind::Onchain { status: ConfirmationStatus::Confirmed { .. }, .. } + ); + if is_confirmed { + return Ok(()); + } + + let payment_id = pending.details.id; + self.pending_payment_store.remove(&payment_id)?; + self.payment_store.remove(&payment_id)?; + Ok(()) + } + + /// Updates a funding record's `kind` in response to a wallet-sync event, swapping + /// the active candidate when `event_txid` differs from the current one. + /// + /// Amount, fee, and direction are not recomputed from the wallet's view: they were + /// set at broadcast time from the `FundingContribution` and must persist until + /// `ChannelReady`. + /// + /// Returns `true` when a funding record was updated (so the caller skips the + /// default Onchain create/update path), `false` otherwise. + fn apply_funding_details_status_update( + &self, payment_id: PaymentId, event_txid: Txid, confirmation_status: ConfirmationStatus, + ) -> Result { + // `ChannelReady` may move the payment to the main store before wallet sync + // sees the tx confirm. In that case, update `kind` directly; recomputing from + // the wallet's view would overwrite the per-node fee set at broadcast time. + if let Some(mut existing) = self.payment_store.get(&payment_id) { + if existing.status == PaymentStatus::Succeeded + && matches!(existing.kind, PaymentKind::Onchain { .. }) + && self.pending_payment_store.get(&payment_id).is_none() + { + let needs_update = match existing.kind { + PaymentKind::Onchain { txid, status } => { + txid != event_txid || status != confirmation_status + }, + _ => false, + }; + if needs_update { + existing.kind = + PaymentKind::Onchain { txid: event_txid, status: confirmation_status }; + self.payment_store.insert_or_update(existing)?; + } + return Ok(true); + } + } + + let mut pending = match self.pending_payment_store.get(&payment_id) { + Some(p) => p, + None => return Ok(false), + }; + let funding_details = match pending.funding_details.as_ref() { + Some(fd) => fd, + None => return Ok(false), + }; + + let candidate = match funding_details.candidates.iter().find(|c| c.txid == event_txid) { + Some(c) => c.clone(), + None => { + log_debug!( + self.logger, + "Event txid {} resolved to funding_details payment {} but is not in candidates", + event_txid, + payment_id, + ); + return Ok(false); + }, + }; + + let old_txid = match pending.details.kind { + PaymentKind::Onchain { txid, .. } => txid, + _ => { + debug_assert!(false, "funding_details record must use PaymentKind::Onchain"); + return Ok(false); + }, + }; + + if old_txid != event_txid { + // A different candidate confirmed. Move the previous active txid onto + // `conflicting_txids` and re-derive amount/fee from the new candidate's + // contributions. + if !pending.conflicting_txids.contains(&old_txid) { + pending.conflicting_txids.push(old_txid); + } + pending.conflicting_txids.retain(|t| *t != event_txid); + + let aggregate = aggregate_local_stakes(&candidate); + pending.details.amount_msat = aggregate.amount_msat; + pending.details.fee_paid_msat = aggregate.fee_paid_msat; + } + + pending.details.kind = + PaymentKind::Onchain { txid: event_txid, status: confirmation_status }; + + self.persist_pending_payment(pending)?; + + Ok(true) + } + fn find_payment_by_txid(&self, target_txid: Txid) -> Option { let direct_payment_id = PaymentId(target_txid.to_byte_array()); if self.pending_payment_store.contains_key(&direct_payment_id) { @@ -1209,12 +1505,28 @@ impl Wallet { .list_filter(|p| { matches!(p.details.kind, PaymentKind::Onchain { txid, .. } if txid == target_txid) || p.conflicting_txids.contains(&target_txid) + || p.funding_details + .as_ref() + .map(|fd| fd.candidates.iter().any(|c| c.txid == target_txid)) + .unwrap_or(false) }) .first() { return Some(replaced_details.details.id); } + // Once moved to the main store, a funding payment is still matched by its + // confirmed txid so late wallet events resolve correctly. + if let Some(p) = self + .payment_store + .list_filter( + |p| matches!(p.kind, PaymentKind::Onchain { txid, .. } if txid == target_txid), + ) + .first() + { + return Some(p.id); + } + None } @@ -1227,6 +1539,23 @@ impl Wallet { Error::InvalidPaymentId })?; + // Funding transactions (channel opens and splices) are driven by LDK's funding/splice + // lifecycle, not the on-chain wallet. Replacing one via on-chain RBF would broadcast a + // transaction LDK isn't tracking (and, for splices, can't sign). Fee-bumping a pending + // splice goes through `bump_channel_funding_fee` instead. + if self + .pending_payment_store + .get(&payment_id) + .map_or(false, |p| p.funding_details.is_some()) + { + log_error!( + self.logger, + "Cannot RBF funding payment {} via bump_fee_rbf; use bump_channel_funding_fee instead", + payment_id, + ); + return Err(Error::InvalidPaymentId); + } + if let PaymentKind::Onchain { status, .. } = &payment.kind { match status { ConfirmationStatus::Confirmed { .. } => { @@ -1413,16 +1742,235 @@ impl Wallet { ConfirmationStatus::Unconfirmed, ); - let pending_payment_store = - self.create_pending_payment_from_tx(new_payment.clone(), Vec::new()); - - self.pending_payment_store.insert_or_update(pending_payment_store)?; - self.payment_store.insert_or_update(new_payment)?; + let pending_payment = self.create_pending_payment_from_tx(new_payment, Vec::new()); + self.persist_pending_payment(pending_payment)?; log_info!(self.logger, "RBF successful: replaced {} with {}", txid, new_txid); Ok(new_txid) } + + pub(crate) fn classify_broadcast( + &self, tx: &Transaction, tx_type: &TransactionType, + ) -> Result<(), Error> { + match tx_type { + TransactionType::Funding { channels } => self.classify_funding(tx, channels), + TransactionType::InteractiveFunding { candidates } => { + self.classify_interactive_funding(tx, candidates) + }, + _ => Ok(()), + } + } + + fn classify_funding( + &self, tx: &Transaction, channels: &[(PublicKey, LnChannelId)], + ) -> Result<(), Error> { + // Batch funding (one transaction funding multiple channels) isn't supported; let + // wallet sync record the payment normally so graduation still runs through + // ANTI_REORG_DELAY. + if channels.len() != 1 { + if channels.len() > 1 { + log_trace!( + self.logger, + "Skipping funding classification for batched broadcast ({} channels)", + channels.len() + ); + } + return Ok(()); + } + + let (counterparty_node_id, channel_id) = channels[0]; + let txid = tx.compute_txid(); + let (amount_msat, fee_paid_msat, direction) = self.onchain_payment_fields(tx); + + let candidate = FundingCandidate { + txid, + channels: vec![ChannelFunding { + counterparty_node_id, + channel_id, + purpose: FundingPurpose::Establishment, + contribution: None, + }], + }; + + let details = PaymentDetails::new( + PaymentId(txid.to_byte_array()), + PaymentKind::Onchain { txid, status: ConfirmationStatus::Unconfirmed }, + amount_msat, + fee_paid_msat, + direction, + PaymentStatus::Pending, + ); + + let funding_details = FundingDetails { candidates: vec![candidate] }; + + let pending = + PendingPaymentDetails::with_funding_details(details, Vec::new(), funding_details); + + self.persist_pending_payment(pending)?; + log_debug!( + self.logger, + "Recorded channel-funding broadcast {} for channel {}", + txid, + channel_id, + ); + Ok(()) + } + + fn classify_interactive_funding( + &self, tx: &Transaction, candidates: &[FundingCandidate], + ) -> Result<(), Error> { + // `InteractiveFunding` carries the full negotiated history. The currently-broadcast + // candidate is the last entry; earlier entries are RBF predecessors. + let active = match candidates.last() { + Some(c) => c, + None => return Ok(()), + }; + let first = match candidates.first() { + Some(c) => c, + None => return Ok(()), + }; + + let txid = tx.compute_txid(); + debug_assert_eq!(active.txid, txid, "broadcast tx must match the active candidate"); + + // Aggregate amount/fee/direction across this candidate's channels by summing the + // local-stake contributions. If we didn't contribute on this candidate, leave the + // record to wallet sync — there's nothing for us to track here, and any wallet- + // visible activity (e.g. a counterparty's splice-out paid to our address) is + // better surfaced as a plain on-chain receive. + let aggregate = aggregate_local_stakes(active); + let amount_msat = match aggregate.amount_msat { + Some(amt) => Some(amt), + None => { + log_trace!( + self.logger, + "Skipping interactive-funding broadcast {}: no local contribution", + txid, + ); + return Ok(()); + }, + }; + let fee_paid_msat = aggregate.fee_paid_msat; + let direction = aggregate.direction; + + // Skip broadcasts that don't move funds in or out of our on-chain wallet — e.g. + // a splice-out we initiated toward an external address. + let (wallet_amount_msat, _wallet_fee_msat, _wallet_direction) = + self.onchain_payment_fields(tx); + if wallet_amount_msat == Some(0) { + log_trace!( + self.logger, + "Skipping interactive-funding broadcast {}: no wallet-level activity", + txid, + ); + return Ok(()); + } + + // Anchor the PaymentId to the first negotiated candidate so the record stays + // stable across RBF replacements. + let payment_id = PaymentId(first.txid.to_byte_array()); + let candidate_count = candidates.len(); + let active_channel_count = active.channels.len(); + + let details = PaymentDetails::new( + payment_id, + PaymentKind::Onchain { txid, status: ConfirmationStatus::Unconfirmed }, + amount_msat, + fee_paid_msat, + direction, + PaymentStatus::Pending, + ); + + // Funding records carry their own RBF history in `candidates`; lookup by txid + // (find_payment_by_txid) already searches that, so no separate + // `conflicting_txids` Vec is needed. + let funding_details = FundingDetails { candidates: candidates.to_vec() }; + + let pending = + PendingPaymentDetails::with_funding_details(details, Vec::new(), funding_details); + + self.persist_pending_payment(pending)?; + log_debug!( + self.logger, + "Recorded interactive-funding broadcast {} ({} candidates, {} channels)", + txid, + candidate_count, + active_channel_count, + ); + Ok(()) + } +} + +/// Returns this node's share of the on-chain fee for a funding transaction (channel +/// open or splice), in millisatoshis. Sourced from the contribution's +/// [`FundingContribution::estimated_fee`], which upstream computes per-contributor. +fn our_actual_fee_msat(contribution: &FundingContribution) -> u64 { + contribution.estimated_fee().to_sat() * 1000 +} + +fn record_includes_channel(details: &FundingDetails, channel_id: LnChannelId) -> bool { + details.candidates.iter().any(|c| c.channels.iter().any(|ch| ch.channel_id == channel_id)) +} + +struct LocalStakeAggregate { + amount_msat: Option, + fee_paid_msat: Option, + direction: PaymentDirection, +} + +/// Aggregates local-stake amount/fee/direction across the channels of a single +/// [`FundingCandidate`]. Each channel's contribution (when present) is treated as +/// local-stake-only, so contributions across channels are summed without +/// double-counting. +fn aggregate_local_stakes(candidate: &FundingCandidate) -> LocalStakeAggregate { + let mut amount_outbound: u64 = 0; + let mut amount_inbound: u64 = 0; + let mut fee: u64 = 0; + let mut have_contribution = false; + for channel in &candidate.channels { + if let Some(c) = channel.contribution.as_ref() { + have_contribution = true; + fee = fee.saturating_add(our_actual_fee_msat(c)); + match contribution_direction(c) { + Some((PaymentDirection::Outbound, amt)) => { + amount_outbound = amount_outbound.saturating_add(amt); + }, + Some((PaymentDirection::Inbound, amt)) => { + amount_inbound = amount_inbound.saturating_add(amt); + }, + None => {}, + } + } + } + if !have_contribution { + return LocalStakeAggregate { + amount_msat: None, + fee_paid_msat: None, + direction: PaymentDirection::Outbound, + }; + } + let (direction, amount_msat) = if amount_outbound >= amount_inbound { + (PaymentDirection::Outbound, amount_outbound.saturating_sub(amount_inbound)) + } else { + (PaymentDirection::Inbound, amount_inbound.saturating_sub(amount_outbound)) + }; + LocalStakeAggregate { amount_msat: Some(amount_msat), fee_paid_msat: Some(fee), direction } +} + +/// Returns this contribution's direction and magnitude in msat, or `None` if it can't +/// be classified as a single inbound or outbound payment. +fn contribution_direction(contribution: &FundingContribution) -> Option<(PaymentDirection, u64)> { + let value_added = contribution.value_added(); + let outputs_total: Amount = contribution.outputs().iter().map(|o| o.value).sum(); + + if value_added > Amount::ZERO && outputs_total == Amount::ZERO { + Some((PaymentDirection::Outbound, value_added.to_sat() * 1000)) + } else if value_added == Amount::ZERO && outputs_total > Amount::ZERO { + Some((PaymentDirection::Inbound, outputs_total.to_sat() * 1000)) + } else { + None + } } impl Listen for Wallet { diff --git a/tests/integration_tests_rust.rs b/tests/integration_tests_rust.rs index 1ea6c4584..91d2dd1ba 100644 --- a/tests/integration_tests_rust.rs +++ b/tests/integration_tests_rust.rs @@ -26,7 +26,7 @@ use common::{ setup_bitcoind_and_electrsd, setup_builder, setup_node, setup_two_nodes, splice_in_with_all, wait_for_tx, TestChainSource, TestConfig, TestStoreType, TestSyncStore, }; -use electrsd::corepc_node::Node as BitcoinD; +use electrsd::corepc_node::{self, Node as BitcoinD}; use electrsd::ElectrsD; use ldk_node::config::{AsyncPaymentsRole, EsploraSyncConfig}; use ldk_node::entropy::NodeEntropy; @@ -1142,6 +1142,433 @@ async fn splice_channel() { ); } +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn rbf_splice_channel() { + // Use a custom bitcoind config with a lower incrementalrelayfee so that the +25 sat/kwu + // (0.1 sat/vB) RBF feerate bump satisfies BIP125's absolute fee increase requirement. + let bitcoind_exe = std::env::var("BITCOIND_EXE") + .ok() + .or_else(|| corepc_node::downloaded_exe_path().ok()) + .expect( + "you need to provide an env var BITCOIND_EXE or specify a bitcoind version feature", + ); + let mut bitcoind_conf = corepc_node::Conf::default(); + bitcoind_conf.network = "regtest"; + bitcoind_conf.args.push("-rest"); + bitcoind_conf.args.push("-incrementalrelayfee=0.00000100"); + let bitcoind = BitcoinD::with_conf(bitcoind_exe, &bitcoind_conf).unwrap(); + + let electrs_exe = std::env::var("ELECTRS_EXE") + .ok() + .or_else(electrsd::downloaded_exe_path) + .expect("you need to provide env var ELECTRS_EXE or specify an electrsd version feature"); + let mut electrsd_conf = electrsd::Conf::default(); + electrsd_conf.http_enabled = true; + electrsd_conf.network = "regtest"; + let electrsd = ElectrsD::with_conf(electrs_exe, &bitcoind, &electrsd_conf).unwrap(); + let chain_source = random_chain_source(&bitcoind, &electrsd); + + let (node_a, node_b) = setup_two_nodes(&chain_source, false, true, false); + + let address_a = node_a.onchain_payment().new_address().unwrap(); + let address_b = node_b.onchain_payment().new_address().unwrap(); + let premine_amount_sat = 5_000_000; + premine_and_distribute_funds( + &bitcoind.client, + &electrsd.client, + vec![address_a, address_b], + Amount::from_sat(premine_amount_sat), + ) + .await; + + node_a.sync_wallets().unwrap(); + node_b.sync_wallets().unwrap(); + + open_channel(&node_a, &node_b, 4_000_000, false, &electrsd).await; + + generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6).await; + + node_a.sync_wallets().unwrap(); + node_b.sync_wallets().unwrap(); + + let user_channel_id_a = expect_channel_ready_event!(node_a, node_b.node_id()); + let user_channel_id_b = expect_channel_ready_event!(node_b, node_a.node_id()); + + // bump_channel_funding_fee should fail when there's no pending splice + assert_eq!( + node_b.bump_channel_funding_fee(&user_channel_id_b, node_a.node_id()), + Err(NodeError::ChannelSplicingFailed), + ); + + // Initiate a splice-in to create a pending splice + node_b.splice_in(&user_channel_id_b, node_a.node_id(), 1_000_000).unwrap(); + + let original_txo = expect_splice_negotiated_event!(node_a, node_b.node_id()); + expect_splice_negotiated_event!(node_b, node_a.node_id()); + + // Sync so the original splice candidate is recorded as a canonical wallet transaction before + // the RBF below replaces it. This makes the post-RBF sync observe the original candidate being + // replaced (a `WalletEvent::TxReplaced`), which must not drop the payment's funding details. + // + // This is a best-effort regression guard rather than a deterministic one: with the + // funding-details-preservation fix in place the splice still graduates correctly, but without + // it the resulting inconsistency only surfaces intermittently (via a timing-dependent + // `debug_assert` in the chain-tip handler), so a reverted fix is caught probabilistically. + // + // TODO: Make this deterministic. If funding payments carried a durable classification in the + // main payment store (e.g. a `tx_type` on `PaymentKind::Onchain`, as in + // lightningdevkit/ldk-node#791), a dropped funding-details record would be a detectable + // contradiction on `ChannelReady` rather than a timing-dependent assert, letting this test + // fail reliably whenever the fix is reverted. + wait_for_tx(&electrsd.client, original_txo.txid).await; + node_a.sync_wallets().unwrap(); + node_b.sync_wallets().unwrap(); + + // splice_in should fail when there's a pending splice (RBF guard) + assert_eq!( + node_b.splice_in(&user_channel_id_b, node_a.node_id(), 1_000_000), + Err(NodeError::ChannelSplicingFailed), + ); + + // splice_out should fail when there's a pending splice (RBF guard) + let address = node_a.onchain_payment().new_address().unwrap(); + assert_eq!( + node_a.splice_out(&user_channel_id_a, node_b.node_id(), &address, 100_000), + Err(NodeError::ChannelSplicingFailed), + ); + + // bump_channel_funding_fee should succeed when there's a pending splice + node_b.bump_channel_funding_fee(&user_channel_id_b, node_a.node_id()).unwrap(); + + let rbf_txo = expect_splice_negotiated_event!(node_a, node_b.node_id()); + expect_splice_negotiated_event!(node_b, node_a.node_id()); + + assert_ne!(original_txo, rbf_txo, "RBF should produce a different funding txo"); + + // After RBF but before confirmation, node_b (the initiator) should have a single + // on-chain payment covering both candidates: id anchored to the first broadcast, + // `kind.txid` pointing at the latest (RBF) candidate, and the original candidate + // recorded as a replaced one on the pending record. + { + let payment_id = PaymentId(original_txo.txid.to_byte_array()); + let payment = node_b.payment(&payment_id).expect("splice payment exists"); + match payment.kind { + PaymentKind::Onchain { txid, status: ConfirmationStatus::Unconfirmed } => { + assert_eq!(txid, rbf_txo.txid); + }, + ref other => panic!("expected Onchain Unconfirmed, got {:?}", other), + } + assert_eq!(payment.status, PaymentStatus::Pending); + // Only one Onchain Pending payment for this splice attempt (not one per candidate). + let splice_payments = node_b.list_payments_with_filter(|p| { + p.direction == PaymentDirection::Outbound + && matches!(p.kind, PaymentKind::Onchain { .. }) + && p.status == PaymentStatus::Pending + }); + assert_eq!( + splice_payments.len(), + 1, + "expected exactly one pending Onchain payment for the splice, got {}: {:#?}", + splice_payments.len(), + splice_payments, + ); + } + + // Wait for the RBF transaction to replace the original in the mempool + wait_for_tx(&electrsd.client, rbf_txo.txid).await; + + // Mine blocks and confirm the RBF splice + generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6).await; + + node_a.sync_wallets().unwrap(); + node_b.sync_wallets().unwrap(); + + // Verify the RBF transaction is the one that locked, not the original + match node_a.next_event_async().await { + Event::ChannelReady { funding_txo, counterparty_node_id, .. } => { + assert_eq!(counterparty_node_id, Some(node_b.node_id())); + assert_eq!(funding_txo, Some(rbf_txo)); + node_a.event_handled().unwrap(); + }, + ref e => panic!("node_a got unexpected event: {:?}", e), + } + match node_b.next_event_async().await { + Event::ChannelReady { funding_txo, counterparty_node_id, .. } => { + assert_eq!(counterparty_node_id, Some(node_a.node_id())); + assert_eq!(funding_txo, Some(rbf_txo)); + node_b.event_handled().unwrap(); + }, + ref e => panic!("node_b got unexpected event: {:?}", e), + } + + // After `ChannelReady` we should have graduated to `Succeeded` — even though + // `ANTI_REORG_DELAY` may not have elapsed yet — and the `kind.txid` should + // reflect the winning RBF candidate, with `fee_paid_msat` matching our + // per-node `FundingContribution::estimated_fee` for that candidate. + { + let payment_id = PaymentId(original_txo.txid.to_byte_array()); + let payment = node_b.payment(&payment_id).expect("splice payment graduated"); + assert_eq!(payment.status, PaymentStatus::Succeeded); + match payment.kind { + PaymentKind::Onchain { txid, status: ConfirmationStatus::Confirmed { .. } } => { + assert_eq!(txid, rbf_txo.txid); + }, + ref other => panic!("expected Onchain Confirmed, got {:?}", other), + } + assert!( + payment.fee_paid_msat.is_some(), + "splice payment should carry a fee from its FundingContribution", + ); + } + + node_a.stop().unwrap(); + node_b.stop().unwrap(); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn splice_resumed_after_restart() { + let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); + let chain_source = random_chain_source(&bitcoind, &electrsd); + + // Set up node_a manually so it can be restarted with the same config. + let mut config_a = random_config(true); + config_a.store_type = TestStoreType::Sqlite; + let config_b = random_config(true); + let node_b = setup_node(&chain_source, config_b); + + let onchain_balance_before_sat = { + let node_a = setup_node(&chain_source, config_a.clone()); + + let address_a = node_a.onchain_payment().new_address().unwrap(); + let address_b = node_b.onchain_payment().new_address().unwrap(); + let premine_amount_sat = 5_000_000; + premine_and_distribute_funds( + &bitcoind.client, + &electrsd.client, + vec![address_a, address_b], + Amount::from_sat(premine_amount_sat), + ) + .await; + + node_a.sync_wallets().unwrap(); + node_b.sync_wallets().unwrap(); + + open_channel(&node_a, &node_b, 4_000_000, false, &electrsd).await; + generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6).await; + node_a.sync_wallets().unwrap(); + node_b.sync_wallets().unwrap(); + + let user_channel_id_a = expect_channel_ready_event!(node_a, node_b.node_id()); + expect_channel_ready_event!(node_b, node_a.node_id()); + + // Initiate a splice-out while disconnected: LDK accepts the contribution but cannot make + // progress before the restart below drops it, having neither negotiated nor persisted + // anything. Only the persisted splice intent allows resuming the splice. + node_a.disconnect(node_b.node_id()).unwrap(); + let address = node_a.onchain_payment().new_address().unwrap(); + node_a.splice_out(&user_channel_id_a, node_b.node_id(), &address, 500_000).unwrap(); + + let onchain_balance_before_sat = node_a.list_balances().total_onchain_balance_sats; + node_a.stop().unwrap(); + onchain_balance_before_sat + }; + + // On restart, the reconciler resubmits the splice, which proceeds once the peer connects. + let node_a = setup_node(&chain_source, config_a.clone()); + node_a.sync_wallets().unwrap(); + let node_b_addr = node_b.listening_addresses().unwrap().first().unwrap().clone(); + node_a.connect(node_b.node_id(), node_b_addr.clone(), false).unwrap(); + + let txo = expect_splice_negotiated_event!(node_a, node_b.node_id()); + expect_splice_negotiated_event!(node_b, node_a.node_id()); + + wait_for_tx(&electrsd.client, txo.txid).await; + generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6).await; + node_a.sync_wallets().unwrap(); + node_b.sync_wallets().unwrap(); + + expect_channel_ready_event!(node_a, node_b.node_id()); + expect_channel_ready_event!(node_b, node_a.node_id()); + + assert!( + node_a.list_balances().total_onchain_balance_sats > onchain_balance_before_sat + 400_000, + "resumed splice-out should have moved ~500k sats to the on-chain balance", + ); + + // The locked splice cleared the intent, so another restart must not resubmit it. + node_a.stop().unwrap(); + let node_a = setup_node(&chain_source, config_a); + node_a.sync_wallets().unwrap(); + node_a.connect(node_b.node_id(), node_b_addr, false).unwrap(); + tokio::time::sleep(std::time::Duration::from_secs(3)).await; + assert!(node_a.next_event().is_none(), "completed splice should not be resubmitted"); + + node_a.stop().unwrap(); + node_b.stop().unwrap(); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn splice_rbf_resumed_after_restart() { + // Use a custom bitcoind config with a lower incrementalrelayfee so that the +25 sat/kwu + // (0.1 sat/vB) RBF feerate bump satisfies BIP125's absolute fee increase requirement. + let bitcoind_exe = std::env::var("BITCOIND_EXE") + .ok() + .or_else(|| corepc_node::downloaded_exe_path().ok()) + .expect( + "you need to provide an env var BITCOIND_EXE or specify a bitcoind version feature", + ); + let mut bitcoind_conf = corepc_node::Conf::default(); + bitcoind_conf.network = "regtest"; + bitcoind_conf.args.push("-rest"); + bitcoind_conf.args.push("-incrementalrelayfee=0.00000100"); + let bitcoind = BitcoinD::with_conf(bitcoind_exe, &bitcoind_conf).unwrap(); + + let electrs_exe = std::env::var("ELECTRS_EXE") + .ok() + .or_else(electrsd::downloaded_exe_path) + .expect("you need to provide env var ELECTRS_EXE or specify an electrsd version feature"); + let mut electrsd_conf = electrsd::Conf::default(); + electrsd_conf.http_enabled = true; + electrsd_conf.network = "regtest"; + let electrsd = ElectrsD::with_conf(electrs_exe, &bitcoind, &electrsd_conf).unwrap(); + let chain_source = random_chain_source(&bitcoind, &electrsd); + + // Set up node_a manually so it can be restarted with the same config. + let mut config_a = random_config(true); + config_a.store_type = TestStoreType::Sqlite; + let config_b = random_config(true); + let node_b = setup_node(&chain_source, config_b); + + let original_txo = { + let node_a = setup_node(&chain_source, config_a.clone()); + + let address_a = node_a.onchain_payment().new_address().unwrap(); + let address_b = node_b.onchain_payment().new_address().unwrap(); + let premine_amount_sat = 5_000_000; + premine_and_distribute_funds( + &bitcoind.client, + &electrsd.client, + vec![address_a, address_b], + Amount::from_sat(premine_amount_sat), + ) + .await; + + node_a.sync_wallets().unwrap(); + node_b.sync_wallets().unwrap(); + + open_channel(&node_a, &node_b, 4_000_000, false, &electrsd).await; + generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6).await; + node_a.sync_wallets().unwrap(); + node_b.sync_wallets().unwrap(); + + let user_channel_id_a = expect_channel_ready_event!(node_a, node_b.node_id()); + expect_channel_ready_event!(node_b, node_a.node_id()); + + // Negotiate a splice but leave its transaction unconfirmed so it can be fee-bumped. + node_a.splice_in(&user_channel_id_a, node_b.node_id(), 500_000).unwrap(); + let original_txo = expect_splice_negotiated_event!(node_a, node_b.node_id()); + expect_splice_negotiated_event!(node_b, node_a.node_id()); + wait_for_tx(&electrsd.client, original_txo.txid).await; + node_a.sync_wallets().unwrap(); + node_b.sync_wallets().unwrap(); + + // Bump the fee while disconnected and restart before anything could be negotiated: only + // the persisted intent knows about the fee bump, while LDK still has the negotiated + // splice at the original feerate. + node_a.disconnect(node_b.node_id()).unwrap(); + node_a.bump_channel_funding_fee(&user_channel_id_a, node_b.node_id()).unwrap(); + node_a.stop().unwrap(); + original_txo + }; + + // On restart, the reconciler sees that the negotiated splice is still at a lower feerate + // than the persisted fee-bump intent and resubmits the bump. + let node_a = setup_node(&chain_source, config_a.clone()); + node_a.sync_wallets().unwrap(); + let node_b_addr = node_b.listening_addresses().unwrap().first().unwrap().clone(); + node_a.connect(node_b.node_id(), node_b_addr.clone(), false).unwrap(); + + let rbf_txo = expect_splice_negotiated_event!(node_a, node_b.node_id()); + expect_splice_negotiated_event!(node_b, node_a.node_id()); + assert_ne!(original_txo, rbf_txo, "resubmitted RBF should produce a different funding txo"); + + // Restarting again must not resubmit the bump: the negotiated splice now carries it. + node_a.stop().unwrap(); + let node_a = setup_node(&chain_source, config_a); + node_a.sync_wallets().unwrap(); + node_a.connect(node_b.node_id(), node_b_addr, false).unwrap(); + tokio::time::sleep(std::time::Duration::from_secs(3)).await; + assert!(node_a.next_event().is_none(), "carried-out fee bump should not be resubmitted"); + + wait_for_tx(&electrsd.client, rbf_txo.txid).await; + generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6).await; + node_a.sync_wallets().unwrap(); + node_b.sync_wallets().unwrap(); + + expect_channel_ready_event!(node_a, node_b.node_id()); + expect_channel_ready_event!(node_b, node_a.node_id()); + + node_a.stop().unwrap(); + node_b.stop().unwrap(); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn bump_fee_rbf_rejects_funding_payment() { + // A channel-funding or splice transaction is driven by LDK's funding/splice lifecycle, not the + // on-chain wallet. `bump_fee_rbf` must reject such payments — replacing the funding transaction + // via plain wallet RBF would broadcast a transaction LDK isn't tracking (and, for splices, + // can't even sign). Fee-bumping a pending splice goes through `bump_channel_funding_fee` instead. + let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); + let chain_source = random_chain_source(&bitcoind, &electrsd); + let (node_a, node_b) = setup_two_nodes(&chain_source, false, true, false); + + let address_a = node_a.onchain_payment().new_address().unwrap(); + let address_b = node_b.onchain_payment().new_address().unwrap(); + let premine_amount_sat = 5_000_000; + premine_and_distribute_funds( + &bitcoind.client, + &electrsd.client, + vec![address_a, address_b], + Amount::from_sat(premine_amount_sat), + ) + .await; + + node_a.sync_wallets().unwrap(); + node_b.sync_wallets().unwrap(); + + open_channel(&node_a, &node_b, 4_000_000, false, &electrsd).await; + + generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6).await; + + node_a.sync_wallets().unwrap(); + node_b.sync_wallets().unwrap(); + + let _user_channel_id_a = expect_channel_ready_event!(node_a, node_b.node_id()); + let user_channel_id_b = expect_channel_ready_event!(node_b, node_a.node_id()); + + // Splice-in to create a pending splice payment. + node_b.splice_in(&user_channel_id_b, node_a.node_id(), 1_000_000).unwrap(); + + let txo = expect_splice_negotiated_event!(node_a, node_b.node_id()); + expect_splice_negotiated_event!(node_b, node_a.node_id()); + + // Make node_b's wallet aware of the splice transaction so `bump_fee_rbf` reaches its funding + // guard rather than failing earlier for a transaction it can't find. + wait_for_tx(&electrsd.client, txo.txid).await; + node_b.sync_wallets().unwrap(); + + // The splice payment is an on-chain, outbound, unconfirmed record, so it passes + // `bump_fee_rbf`'s other guards; it must nonetheless be rejected as a funding payment. + let splice_payment_id = PaymentId(txo.txid.to_byte_array()); + assert_eq!( + node_b.onchain_payment().bump_fee_rbf(splice_payment_id, None), + Err(NodeError::InvalidPaymentId), + ); + + node_a.stop().unwrap(); + node_b.stop().unwrap(); +} + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn simple_bolt12_send_receive() { let (bitcoind, electrsd) = setup_bitcoind_and_electrsd();