diff --git a/src/builder.rs b/src/builder.rs index c88c867cc..1f1af76d9 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -57,6 +57,7 @@ use crate::event::EventQueue; use crate::fee_estimator::OnchainFeeEstimator; use crate::gossip::GossipSource; use crate::io::sqlite_store::SqliteStore; +use crate::io::tier_store::TierStore; use crate::io::utils::{ open_or_migrate_fs_store, read_all_objects, read_event_queue, read_external_pathfinding_scores_from_cache, read_network_graph, read_node_metrics, @@ -154,6 +155,12 @@ impl std::fmt::Debug for LogWriterConfig { } } +#[derive(Default, Debug)] +struct TierStoreConfig { + ephemeral_storage_dir_path: Option, + backup_storage_dir_path: Option, +} + /// An error encountered during building a [`Node`]. /// /// [`Node`]: crate::Node @@ -200,6 +207,7 @@ pub enum BuildError { AsyncPaymentsConfigMismatch, /// An attempt to setup a DNS Resolver failed. DNSResolverSetupFailed, + } impl fmt::Display for BuildError { @@ -237,6 +245,7 @@ impl fmt::Display for BuildError { Self::DNSResolverSetupFailed => { write!(f, "An attempt to setup a DNS resolver has failed.") }, + } } } @@ -289,6 +298,7 @@ pub struct NodeBuilder { liquidity_source_config: Option, log_writer_config: Option, async_payments_role: Option, + tier_store_config: Option, runtime_handle: Option, pathfinding_scores_sync_config: Option, recovery_mode: bool, @@ -307,6 +317,7 @@ impl NodeBuilder { let gossip_source_config = None; let liquidity_source_config = None; let log_writer_config = None; + let tier_store_config = None; let runtime_handle = None; let pathfinding_scores_sync_config = None; let recovery_mode = false; @@ -316,6 +327,7 @@ impl NodeBuilder { gossip_source_config, liquidity_source_config, log_writer_config, + tier_store_config, runtime_handle, async_payments_role: None, pathfinding_scores_sync_config, @@ -625,6 +637,39 @@ impl NodeBuilder { self } + /// Configures a local SQLite backup store for disaster recovery. + /// + /// When building with tiered storage, a SQLite store will be created at the + /// given directory path using [`SQLITE_BACKUP_DB_FILE_NAME`] as its database + /// file name. It receives a second durable copy of data written to the + /// primary store. + /// + /// Writes and removals for primary-backed data only succeed once both the + /// primary and backup SQLite stores complete successfully. + /// + /// If not set, durable data will be stored only in the primary store. + /// + /// [`SQLITE_BACKUP_DB_FILE_NAME`]: crate::io::sqlite_store::SQLITE_BACKUP_DB_FILE_NAME + pub fn set_backup_storage_dir_path(&mut self, backup_storage_dir_path: String) -> &mut Self { + let tier_store_config = self.tier_store_config.get_or_insert(TierStoreConfig::default()); + tier_store_config.backup_storage_dir_path = Some(backup_storage_dir_path.into()); + self + } + + /// Configures the ephemeral storage directory path for non-critical, frequently-accessed data. + /// + /// When set, a local SQLite store is created at this path for ephemeral data like + /// the network graph and scorer. Data stored here can be rebuilt if lost. + /// + /// If not set, non-critical data will be stored in the primary store. + pub fn set_ephemeral_storage_dir_path( + &mut self, ephemeral_storage_dir_path: String, + ) -> &mut Self { + let tier_store_config = self.tier_store_config.get_or_insert(TierStoreConfig::default()); + tier_store_config.ephemeral_storage_dir_path = Some(ephemeral_storage_dir_path.into()); + self + } + /// Builds a [`Node`] instance with a [`SqliteStore`] backend and according to the options /// previously configured. pub fn build(&self, node_entropy: NodeEntropy) -> Result { @@ -826,11 +871,18 @@ impl NodeBuilder { } /// Builds a [`Node`] instance according to the options previously configured. + /// + /// The provided `kv_store` will be used as the primary storage backend. Optionally, + /// an ephemeral store for frequently-accessed non-critical data (e.g., network graph, scorer) + /// and a local SQLite backup store for disaster recovery can be configured via + /// [`set_ephemeral_storage_dir_path`] and [`set_backup_storage_dir_path`]. + /// + /// [`set_ephemeral_storage_dir_path`]: Self::set_ephemeral_storage_dir_path + /// [`set_backup_storage_dir_path`]: Self::set_backup_storage_dir_path pub fn build_with_store( &self, node_entropy: NodeEntropy, kv_store: S, ) -> Result { let logger = setup_logger(&self.log_writer_config, &self.config)?; - self.build_with_store_and_logger(node_entropy, kv_store, logger) } @@ -855,6 +907,39 @@ impl NodeBuilder { fn build_with_store_runtime_and_logger( &self, node_entropy: NodeEntropy, kv_store: S, runtime: Arc, logger: Arc, ) -> Result { + let ts_config = self.tier_store_config.as_ref(); + let primary_store = Arc::new(DynStoreWrapper(kv_store)); + let mut tier_store = TierStore::new(primary_store, Arc::clone(&logger)); + if let Some(config) = ts_config { + if let Some(ephemeral_storage_dir_path) = config.ephemeral_storage_dir_path.as_ref() { + let ephemeral_store = SqliteStore::new( + ephemeral_storage_dir_path.clone(), + Some(io::sqlite_store::SQLITE_EPHEMERAL_DB_FILE_NAME.to_string()), + Some(io::sqlite_store::KV_TABLE_NAME.to_string()), + ) + .map_err(|e| { + log_error!(logger, "Failed to setup ephemeral SQLite store: {}", e); + BuildError::KVStoreSetupFailed + })?; + let ephemeral_store: Arc = Arc::new(DynStoreWrapper(ephemeral_store)); + tier_store.set_ephemeral_store(ephemeral_store); + } + + if let Some(backup_storage_dir_path) = config.backup_storage_dir_path.as_ref() { + let backup_store = SqliteStore::new( + backup_storage_dir_path.clone(), + Some(io::sqlite_store::SQLITE_BACKUP_DB_FILE_NAME.to_string()), + Some(io::sqlite_store::KV_TABLE_NAME.to_string()), + ) + .map_err(|e| { + log_error!(logger, "Failed to setup backup SQLite store: {}", e); + BuildError::KVStoreSetupFailed + })?; + let backup_store: Arc = Arc::new(DynStoreWrapper(backup_store)); + tier_store.set_backup_store(backup_store); + } + } + let seed_bytes = node_entropy.to_seed_bytes(); let config = Arc::new(self.config.clone()); @@ -869,7 +954,7 @@ impl NodeBuilder { seed_bytes, runtime, logger, - Arc::new(DynStoreWrapper(kv_store)), + Arc::new(DynStoreWrapper(tier_store)), ) } } diff --git a/src/io/mod.rs b/src/io/mod.rs index e16a99975..c7dfe7af0 100644 --- a/src/io/mod.rs +++ b/src/io/mod.rs @@ -12,6 +12,7 @@ pub mod postgres_store; pub mod sqlite_store; #[cfg(test)] pub(crate) mod test_utils; +pub(crate) mod tier_store; pub(crate) mod utils; pub mod vss_store; diff --git a/src/io/sqlite_store/mod.rs b/src/io/sqlite_store/mod.rs index 076aeef9b..ddafc3748 100644 --- a/src/io/sqlite_store/mod.rs +++ b/src/io/sqlite_store/mod.rs @@ -24,6 +24,10 @@ mod migrations; /// LDK Node's database file name. pub const SQLITE_DB_FILE_NAME: &str = "ldk_node_data.sqlite"; +/// LDK Node's backup database file name. +pub const SQLITE_BACKUP_DB_FILE_NAME: &str = "ldk_node_data_backup.sqlite"; +/// LDK Node's ephemeral database file name. +pub const SQLITE_EPHEMERAL_DB_FILE_NAME: &str = "ldk_node_data_ephemeral.sqlite"; /// LDK Node's table in which we store all data. pub const KV_TABLE_NAME: &str = "ldk_node_data"; diff --git a/src/io/tier_store.rs b/src/io/tier_store.rs new file mode 100644 index 000000000..0f0f9b804 --- /dev/null +++ b/src/io/tier_store.rs @@ -0,0 +1,704 @@ +// This file is Copyright its original authors, visible in version control history. +// +// This file is licensed under the Apache License, Version 2.0 or the MIT license , at your option. You may not use this file except in +// accordance with one or both of these licenses. + +use crate::io::utils::check_namespace_key_validity; +use crate::logger::{LdkLogger, Logger}; +use crate::types::DynStore; + +use lightning::util::persist::{ + KVStore, NETWORK_GRAPH_PERSISTENCE_KEY, NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, SCORER_PERSISTENCE_KEY, + SCORER_PERSISTENCE_PRIMARY_NAMESPACE, +}; +use lightning::{io, log_error}; + +use std::collections::HashMap; +use std::future::Future; +use std::sync::{Arc, Mutex}; +use tokio::sync::Mutex as TokioMutex; + +/// A 3-tiered [`KVStore`] implementation that routes data across +/// storage backends that may be local or remote: +/// - a primary store for durable, authoritative persistence, +/// - an optional backup store that maintains an additional durable copy of +/// primary-backed data, and +/// - an optional ephemeral store for non-critical, rebuildable cached data. +/// +/// When a backup store is configured, writes and removals for primary-backed data +/// are issued to the primary and backup stores concurrently and only succeed once +/// both stores complete successfully. +/// +/// Reads and lists do not consult the backup store during normal operation. +/// Ephemeral data is read from and written to the ephemeral store when configured. +/// +/// Note that dual-store writes and removals are not atomic across the primary and +/// backup stores. If one store succeeds and the other fails, the operation +/// returns an error even though one store may already reflect the change. +pub(crate) struct TierStore { + inner: Arc, +} + +impl TierStore { + pub fn new(primary_store: Arc, logger: Arc) -> Self { + let inner = Arc::new(TierStoreInner::new(primary_store, Arc::clone(&logger))); + + Self { inner } + } + + /// Configures a backup store for primary-backed data. + /// + /// Once set, writes and removals targeting the primary tier succeed only if both + /// the primary and backup stores succeed. The two operations are issued + /// concurrently, and any failure is returned to the caller. + /// + /// Note: dual-store writes/removals are not atomic. An error may be returned + /// after the primary store has already been updated if the backup store fails. + /// + /// The backup store is not consulted for normal reads or lists. + pub fn set_backup_store(&mut self, backup: Arc) { + debug_assert_eq!(Arc::strong_count(&self.inner), 1); + + let inner = Arc::get_mut(&mut self.inner).expect( + "TierStore should not be shared during configuration. No other references should exist", + ); + + inner.backup_store = Some(backup); + } + + /// Configures the ephemeral store for non-critical, rebuildable data. + /// + /// When configured, selected cache-like data is routed to this store instead of + /// the primary store. + pub fn set_ephemeral_store(&mut self, ephemeral: Arc) { + debug_assert_eq!(Arc::strong_count(&self.inner), 1); + + let inner = Arc::get_mut(&mut self.inner).expect( + "TierStore should not be shared during configuration. No other references should exist", + ); + + inner.ephemeral_store = Some(ephemeral); + } +} + +impl KVStore for TierStore { + fn read( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, + ) -> impl Future, io::Error>> + 'static + Send { + let inner = Arc::clone(&self.inner); + + let primary_namespace = primary_namespace.to_string(); + let secondary_namespace = secondary_namespace.to_string(); + let key = key.to_string(); + + async move { inner.read_internal(primary_namespace, secondary_namespace, key).await } + } + + fn write( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, + ) -> impl Future> + 'static + Send { + let inner = Arc::clone(&self.inner); + + let primary_namespace = primary_namespace.to_string(); + let secondary_namespace = secondary_namespace.to_string(); + let key = key.to_string(); + + async move { inner.write_internal(primary_namespace, secondary_namespace, key, buf).await } + } + + fn remove( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool, + ) -> impl Future> + 'static + Send { + let inner = Arc::clone(&self.inner); + + let primary_namespace = primary_namespace.to_string(); + let secondary_namespace = secondary_namespace.to_string(); + let key = key.to_string(); + + async move { inner.remove_internal(primary_namespace, secondary_namespace, key, lazy).await } + } + + fn list( + &self, primary_namespace: &str, secondary_namespace: &str, + ) -> impl Future, io::Error>> + 'static + Send { + let inner = Arc::clone(&self.inner); + + let primary_namespace = primary_namespace.to_string(); + let secondary_namespace = secondary_namespace.to_string(); + + async move { inner.list_internal(primary_namespace, secondary_namespace).await } + } +} + +struct TierStoreInner { + /// The authoritative store for durable data. + primary_store: Arc, + /// The store used for non-critical, rebuildable cached data. + ephemeral_store: Option>, + /// An optional second durable store for primary-backed data. + backup_store: Option>, + /// Per-key locks for serializing primary+backup operations. + locks: Mutex>>>, + logger: Arc, +} + +impl TierStoreInner { + /// Creates a tier store with the primary data store. + pub fn new(primary_store: Arc, logger: Arc) -> Self { + Self { + primary_store, + ephemeral_store: None, + backup_store: None, + locks: Mutex::new(HashMap::new()), + logger, + } + } + + fn get_key_lock(&self, locking_key: String) -> Arc> { + let mut locks = self.locks.lock().expect("lock"); + Arc::clone(locks.entry(locking_key).or_default()) + } + + fn clean_locks(&self, lock_ref: &Arc>, locking_key: String) { + let mut locks = self.locks.lock().expect("lock"); + let strong_count = Arc::strong_count(lock_ref); + debug_assert!(strong_count >= 2, "Unexpected TierStore lock strong count"); + if strong_count == 2 { + locks.remove(&locking_key); + } + } + + fn build_locking_key( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, + ) -> String { + if primary_namespace.is_empty() { + key.to_owned() + } else { + format!("{}#{}#{}", primary_namespace, secondary_namespace, key) + } + } + + /// Reads from the primary data store. + async fn read_primary( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, + ) -> io::Result> { + match KVStore::read( + self.primary_store.as_ref(), + primary_namespace, + secondary_namespace, + key, + ) + .await + { + Ok(data) => Ok(data), + Err(e) => { + log_error!( + self.logger, + "Failed to read from primary store for key {}/{}/{}: {}.", + primary_namespace, + secondary_namespace, + key, + e + ); + Err(e) + }, + } + } + + /// Lists keys from the primary data store. + async fn list_primary( + &self, primary_namespace: &str, secondary_namespace: &str, + ) -> io::Result> { + match KVStore::list(self.primary_store.as_ref(), primary_namespace, secondary_namespace) + .await + { + Ok(keys) => Ok(keys), + Err(e) => { + log_error!( + self.logger, + "Failed to list from primary store for namespace {}/{}: {}.", + primary_namespace, + secondary_namespace, + e + ); + Err(e) + }, + } + } + + async fn write_primary_backup_async( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, + ) -> io::Result<()> { + let primary_fut = KVStore::write( + self.primary_store.as_ref(), + primary_namespace, + secondary_namespace, + key, + buf.clone(), + ); + + if let Some(backup_store) = self.backup_store.as_ref() { + let backup_fut = KVStore::write( + backup_store.as_ref(), + primary_namespace, + secondary_namespace, + key, + buf, + ); + + let (primary_res, backup_res) = tokio::join!(primary_fut, backup_fut); + + self.handle_primary_backup_results( + "write", + primary_namespace, + secondary_namespace, + key, + primary_res, + backup_res, + ) + } else { + primary_fut.await + } + } + + async fn remove_primary_backup_async( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool, + ) -> io::Result<()> { + let primary_fut = KVStore::remove( + self.primary_store.as_ref(), + primary_namespace, + secondary_namespace, + key, + lazy, + ); + + if let Some(backup_store) = self.backup_store.as_ref() { + let backup_fut = KVStore::remove( + backup_store.as_ref(), + primary_namespace, + secondary_namespace, + key, + lazy, + ); + + let (primary_res, backup_res) = tokio::join!(primary_fut, backup_fut); + + self.handle_primary_backup_results( + "removal", + primary_namespace, + secondary_namespace, + key, + primary_res, + backup_res, + ) + } else { + primary_fut.await + } + } + + async fn read_internal( + &self, primary_namespace: String, secondary_namespace: String, key: String, + ) -> io::Result> { + check_namespace_key_validity( + primary_namespace.as_str(), + secondary_namespace.as_str(), + Some(key.as_str()), + "read", + )?; + + if is_ephemeral_cached_key(&primary_namespace, &secondary_namespace, &key) { + if let Some(eph_store) = self.ephemeral_store.as_ref() { + // We don't retry ephemeral-store reads here. Local failures are treated as + // terminal for this access path rather than falling back to another store. + return KVStore::read( + eph_store.as_ref(), + &primary_namespace, + &secondary_namespace, + &key, + ) + .await; + } + } + + self.read_primary(&primary_namespace, &secondary_namespace, &key).await + } + + async fn write_internal( + &self, primary_namespace: String, secondary_namespace: String, key: String, buf: Vec, + ) -> io::Result<()> { + check_namespace_key_validity( + primary_namespace.as_str(), + secondary_namespace.as_str(), + Some(key.as_str()), + "write", + )?; + + if is_ephemeral_cached_key(&primary_namespace, &secondary_namespace, &key) { + if let Some(eph_store) = self.ephemeral_store.as_ref() { + return KVStore::write( + eph_store.as_ref(), + primary_namespace.as_str(), + secondary_namespace.as_str(), + key.as_str(), + buf, + ) + .await; + } + } + + let locking_key = self.build_locking_key( + primary_namespace.as_str(), + secondary_namespace.as_str(), + key.as_str(), + ); + let key_lock = self.get_key_lock(locking_key.clone()); + + let res = { + let _guard = key_lock.lock().await; + self.write_primary_backup_async( + primary_namespace.as_str(), + secondary_namespace.as_str(), + key.as_str(), + buf, + ) + .await + }; + + self.clean_locks(&key_lock, locking_key); + res + } + + async fn remove_internal( + &self, primary_namespace: String, secondary_namespace: String, key: String, lazy: bool, + ) -> io::Result<()> { + check_namespace_key_validity( + primary_namespace.as_str(), + secondary_namespace.as_str(), + Some(key.as_str()), + "remove", + )?; + + if is_ephemeral_cached_key(&primary_namespace, &secondary_namespace, &key) { + if let Some(eph_store) = self.ephemeral_store.as_ref() { + return KVStore::remove( + eph_store.as_ref(), + primary_namespace.as_str(), + secondary_namespace.as_str(), + key.as_str(), + lazy, + ) + .await; + } + } + + let locking_key = self.build_locking_key( + primary_namespace.as_str(), + secondary_namespace.as_str(), + key.as_str(), + ); + let key_lock = self.get_key_lock(locking_key.clone()); + + let res = { + let _guard = key_lock.lock().await; + self.remove_primary_backup_async( + primary_namespace.as_str(), + secondary_namespace.as_str(), + key.as_str(), + lazy, + ) + .await + }; + + self.clean_locks(&key_lock, locking_key); + res + } + + async fn list_internal( + &self, primary_namespace: String, secondary_namespace: String, + ) -> io::Result> { + check_namespace_key_validity( + primary_namespace.as_str(), + secondary_namespace.as_str(), + None, + "list", + )?; + + match (primary_namespace.as_str(), secondary_namespace.as_str()) { + ( + NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, + ) + | (SCORER_PERSISTENCE_PRIMARY_NAMESPACE, _) => { + if let Some(eph_store) = self.ephemeral_store.as_ref() { + // We don't retry ephemeral-store lists here. Local failures are treated as + // terminal for this access path rather than falling back to another store. + KVStore::list(eph_store.as_ref(), &primary_namespace, &secondary_namespace) + .await + } else { + self.list_primary(&primary_namespace, &secondary_namespace).await + } + }, + _ => self.list_primary(&primary_namespace, &secondary_namespace).await, + } + } + + fn handle_primary_backup_results( + &self, op: &str, primary_namespace: &str, secondary_namespace: &str, key: &str, + primary_res: io::Result<()>, backup_res: io::Result<()>, + ) -> io::Result<()> { + match (primary_res, backup_res) { + (Ok(()), Ok(())) => Ok(()), + (Err(primary_err), Ok(())) => Err(primary_err), + (Ok(()), Err(backup_err)) => Err(backup_err), + (Err(primary_err), Err(backup_err)) => { + log_error!( + self.logger, + "Primary and backup {}s both failed for key {}/{}/{}: primary={}, backup={}", + op, + primary_namespace, + secondary_namespace, + key, + primary_err, + backup_err + ); + Err(primary_err) + }, + } + } +} + +fn is_ephemeral_cached_key(pn: &str, sn: &str, key: &str) -> bool { + matches!( + (pn, sn, key), + (NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, _, NETWORK_GRAPH_PERSISTENCE_KEY) + | (SCORER_PERSISTENCE_PRIMARY_NAMESPACE, _, SCORER_PERSISTENCE_KEY) + ) +} + +#[cfg(test)] +mod tests { + use std::panic::RefUnwindSafe; + use std::path::PathBuf; + use std::sync::Arc; + + use lightning::util::logger::Level; + use lightning::util::persist::{ + CHANNEL_MANAGER_PERSISTENCE_KEY, CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + }; + use lightning_persister::fs_store::v1::FilesystemStore; + + use crate::io::test_utils::{do_read_write_remove_list_persist, random_storage_path}; + use crate::io::tier_store::TierStore; + use crate::logger::Logger; + use crate::types::DynStore; + use crate::types::DynStoreWrapper; + + use super::*; + + impl RefUnwindSafe for TierStore {} + + struct CleanupDir(PathBuf); + impl Drop for CleanupDir { + fn drop(&mut self) { + let _ = std::fs::remove_dir_all(&self.0); + } + } + + fn setup_tier_store(primary_store: Arc, logger: Arc) -> TierStore { + TierStore::new(primary_store, logger) + } + + #[tokio::test] + async fn write_read_list_remove() { + let base_dir = random_storage_path(); + let log_path = base_dir.join("tier_store_test.log").to_string_lossy().into_owned(); + let logger = Arc::new(Logger::new_fs_writer(log_path, Level::Trace).unwrap()); + + let _cleanup = CleanupDir(base_dir.clone()); + + let primary_store: Arc = + Arc::new(DynStoreWrapper(FilesystemStore::new(base_dir.join("primary")))); + let tier = setup_tier_store(primary_store, logger); + + do_read_write_remove_list_persist(&tier).await; + } + + #[tokio::test] + async fn ephemeral_routing() { + let base_dir = random_storage_path(); + let log_path = base_dir.join("tier_store_test.log").to_string_lossy().into_owned(); + let logger = Arc::new(Logger::new_fs_writer(log_path, Level::Trace).unwrap()); + + let _cleanup = CleanupDir(base_dir.clone()); + + let primary_store: Arc = + Arc::new(DynStoreWrapper(FilesystemStore::new(base_dir.join("primary")))); + let mut tier = setup_tier_store(Arc::clone(&primary_store), logger); + + let ephemeral_store: Arc = + Arc::new(DynStoreWrapper(FilesystemStore::new(base_dir.join("ephemeral")))); + tier.set_ephemeral_store(Arc::clone(&ephemeral_store)); + + let data = vec![42u8; 32]; + + tier.write( + NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_KEY, + data.clone(), + ) + .await + .unwrap(); + + tier.write( + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_KEY, + data.clone(), + ) + .await + .unwrap(); + + let primary_read_ng = primary_store + .read( + NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_KEY, + ) + .await; + let ephemeral_read_ng = ephemeral_store + .read( + NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_KEY, + ) + .await; + + let primary_read_cm = primary_store + .read( + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_KEY, + ) + .await; + let ephemeral_read_cm = ephemeral_store + .read( + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_KEY, + ) + .await; + + assert!(primary_read_ng.is_err()); + assert_eq!(ephemeral_read_ng.unwrap(), data); + + assert!(ephemeral_read_cm.is_err()); + assert_eq!(primary_read_cm.unwrap(), data); + } + + #[tokio::test] + async fn backup_write_is_part_of_success_path() { + let base_dir = random_storage_path(); + let log_path = base_dir.join("tier_store_test.log").to_string_lossy().into_owned(); + let logger = Arc::new(Logger::new_fs_writer(log_path, Level::Trace).unwrap()); + + let _cleanup = CleanupDir(base_dir.clone()); + + let primary_store: Arc = + Arc::new(DynStoreWrapper(FilesystemStore::new(base_dir.join("primary")))); + let mut tier = setup_tier_store(Arc::clone(&primary_store), logger); + + let backup_store: Arc = + Arc::new(DynStoreWrapper(FilesystemStore::new(base_dir.join("backup")))); + tier.set_backup_store(Arc::clone(&backup_store)); + + let data = vec![42u8; 32]; + + tier.write( + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_KEY, + data.clone(), + ) + .await + .unwrap(); + + let primary_read = primary_store + .read( + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_KEY, + ) + .await; + let backup_read = backup_store + .read( + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_KEY, + ) + .await; + + assert_eq!(primary_read.unwrap(), data); + assert_eq!(backup_read.unwrap(), data); + } + + #[tokio::test] + async fn backup_remove_is_part_of_success_path() { + let base_dir = random_storage_path(); + let log_path = base_dir.join("tier_store_test.log").to_string_lossy().into_owned(); + let logger = Arc::new(Logger::new_fs_writer(log_path, Level::Trace).unwrap()); + + let _cleanup = CleanupDir(base_dir.clone()); + + let primary_store: Arc = + Arc::new(DynStoreWrapper(FilesystemStore::new(base_dir.join("primary")))); + let mut tier = setup_tier_store(Arc::clone(&primary_store), logger); + + let backup_store: Arc = + Arc::new(DynStoreWrapper(FilesystemStore::new(base_dir.join("backup")))); + tier.set_backup_store(Arc::clone(&backup_store)); + + let data = vec![42u8; 32]; + let key = CHANNEL_MANAGER_PERSISTENCE_KEY; + + tier.write( + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + key, + data, + ) + .await + .unwrap(); + + tier.remove( + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + key, + true, + ) + .await + .unwrap(); + + let primary_read = primary_store + .read( + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + key, + ) + .await; + let backup_read = backup_store + .read( + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + key, + ) + .await; + + assert!(primary_read.is_err()); + assert!(backup_read.is_err()); + } +} diff --git a/src/types.rs b/src/types.rs index 64209430b..1f3f84751 100644 --- a/src/types.rs +++ b/src/types.rs @@ -46,7 +46,7 @@ use crate::message_handler::NodeCustomMessageHandler; use crate::payment::{PaymentDetails, PendingPaymentDetails}; use crate::runtime::RuntimeSpawner; -pub(crate) trait DynStoreTrait: Send + Sync { +pub trait DynStoreTrait: Send + Sync { fn read_async( &self, primary_namespace: &str, secondary_namespace: &str, key: &str, ) -> Pin, bitcoin::io::Error>> + Send + 'static>>; @@ -87,7 +87,7 @@ impl<'a> KVStore for dyn DynStoreTrait + 'a { } } -pub(crate) type DynStore = dyn DynStoreTrait; +pub type DynStore = dyn DynStoreTrait; // Newtype wrapper that implements `KVStore` for `Arc`. This is needed because `KVStore` // methods return `impl Future`, which is not object-safe. `DynStoreTrait` works around this by diff --git a/tests/common/mod.rs b/tests/common/mod.rs index adeb327bf..4998d6a99 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -532,6 +532,7 @@ pub(crate) fn setup_two_nodes_with_store( pub(crate) fn setup_node(chain_source: &TestChainSource, config: TestConfig) -> TestNode { setup_builder!(builder, config.node_config); + match chain_source { TestChainSource::Esplora(electrsd) => { let esplora_url = format!("http://{}", electrsd.esplora_url.as_ref().unwrap()); @@ -601,10 +602,6 @@ pub(crate) fn setup_node(chain_source: &TestChainSource, config: TestConfig) -> }, }; - if config.recovery_mode { - builder.set_wallet_recovery_mode(); - } - node.start().unwrap(); assert!(node.status().is_running); assert!(node.status().latest_fee_rate_cache_update_timestamp.is_some()); diff --git a/tests/integration_tests_rust.rs b/tests/integration_tests_rust.rs index 309d5bf4d..8e30a0ab5 100644 --- a/tests/integration_tests_rust.rs +++ b/tests/integration_tests_rust.rs @@ -30,6 +30,8 @@ use electrsd::corepc_node::Node as BitcoinD; use electrsd::ElectrsD; use ldk_node::config::{AsyncPaymentsRole, EsploraSyncConfig}; use ldk_node::entropy::NodeEntropy; +#[cfg(not(feature = "uniffi"))] +use ldk_node::io::sqlite_store::SqliteStore; use ldk_node::liquidity::LSPS2ServiceConfig; use ldk_node::payment::{ ConfirmationStatus, PaymentDetails, PaymentDirection, PaymentKind, PaymentStatus, @@ -39,6 +41,8 @@ use ldk_node::{Builder, Event, NodeError}; use lightning::ln::channelmanager::PaymentId; use lightning::routing::gossip::{NodeAlias, NodeId}; use lightning::routing::router::RouteParametersConfig; +#[cfg(not(feature = "uniffi"))] +use lightning::util::persist::KVStore; use lightning_invoice::{Bolt11InvoiceDescription, Description}; use lightning_types::payment::{PaymentHash, PaymentPreimage}; use log::LevelFilter; @@ -3033,3 +3037,74 @@ async fn splice_in_with_all_balance() { node_a.stop().unwrap(); node_b.stop().unwrap(); } + +#[cfg(not(feature = "uniffi"))] +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn builder_configures_sqlite_backup_store() { + let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); + let chain_source = random_chain_source(&bitcoind, &electrsd); + + let mut config_a = random_config(true); + config_a.store_type = TestStoreType::Sqlite; + let primary_dir = config_a.node_config.storage_dir_path.clone(); + let backup_dir = common::random_storage_path(); + + // Build node_a with backup storage configured + setup_builder!(builder_a, config_a.node_config.clone()); + builder_a.set_chain_source_esplora( + format!("http://{}", electrsd.esplora_url.as_ref().unwrap()), + None, + ); + builder_a.set_filesystem_logger(None, None); + builder_a.set_backup_storage_dir_path(backup_dir.to_str().unwrap().to_owned()); + + let node_a = builder_a.build(config_a.node_entropy.into()).unwrap(); + node_a.start().unwrap(); + assert!(node_a.status().is_running); + assert!(node_a.status().latest_fee_rate_cache_update_timestamp.is_some()); + + let config_b = random_config(true); + let node_b = setup_node(&chain_source, config_b); + + do_channel_full_cycle( + node_a, + node_b, + &bitcoind.client, + &electrsd.client, + false, + true, + true, + false, + ) + .await; + + let primary_store = SqliteStore::new( + primary_dir.into(), + Some(ldk_node::io::sqlite_store::SQLITE_DB_FILE_NAME.to_string()), + Some(ldk_node::io::sqlite_store::KV_TABLE_NAME.to_string()), + ) + .unwrap(); + + let backup_store = SqliteStore::new( + backup_dir, + Some(ldk_node::io::sqlite_store::SQLITE_BACKUP_DB_FILE_NAME.to_string()), + Some(ldk_node::io::sqlite_store::KV_TABLE_NAME.to_string()), + ) + .unwrap(); + + for (pn, sn, key) in [ + ("bdk_wallet", "", "descriptor"), + ("bdk_wallet", "", "change_descriptor"), + ("bdk_wallet", "", "network"), + ("", "", "node_metrics"), + ("", "", "events"), + ("", "", "peers"), + ] { + let primary = primary_store.read(pn, sn, key).await.unwrap(); + let backup = backup_store.read(pn, sn, key).await.unwrap(); + + assert_eq!(backup, primary, "backup mismatch for {pn}/{sn}/{key}"); + } +} + +