diff --git a/src/lib.rs b/src/lib.rs index 7465dfabf..4cd7ec4bb 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -274,6 +274,8 @@ impl Node { self.config.network ); + self.runtime.allow_cancellable_background_task_spawns(); + // Start up any runtime-dependant chain sources (e.g. Electrum) self.chain_source.start(Arc::clone(&self.runtime)).map_err(|e| { log_error!(self.logger, "Failed to start chain syncing: {}", e); diff --git a/src/runtime.rs b/src/runtime.rs index 9673d0eb7..3f82d704e 100644 --- a/src/runtime.rs +++ b/src/runtime.rs @@ -22,11 +22,22 @@ use crate::logger::{log_debug, log_error, log_trace, LdkLogger, Logger}; pub(crate) struct Runtime { mode: RuntimeMode, background_tasks: Mutex>, - cancellable_background_tasks: Mutex>, + cancellable_background_tasks: Mutex, background_processor_task: Mutex>>, logger: Arc, } +struct CancellableBackgroundTasks { + tasks: JoinSet<()>, + accepting_tasks: bool, +} + +impl CancellableBackgroundTasks { + fn new() -> Self { + Self { tasks: JoinSet::new(), accepting_tasks: true } + } +} + impl Runtime { pub fn new(logger: Arc) -> Result { let mode = match tokio::runtime::Handle::try_current() { @@ -55,7 +66,7 @@ impl Runtime { }, }; let background_tasks = Mutex::new(JoinSet::new()); - let cancellable_background_tasks = Mutex::new(JoinSet::new()); + let cancellable_background_tasks = Mutex::new(CancellableBackgroundTasks::new()); let background_processor_task = Mutex::new(None); Ok(Self { @@ -70,7 +81,7 @@ impl Runtime { pub fn with_handle(handle: tokio::runtime::Handle, logger: Arc) -> Self { let mode = RuntimeMode::Handle(handle); let background_tasks = Mutex::new(JoinSet::new()); - let cancellable_background_tasks = Mutex::new(JoinSet::new()); + let cancellable_background_tasks = Mutex::new(CancellableBackgroundTasks::new()); let background_processor_task = Mutex::new(None); Self { @@ -100,11 +111,22 @@ impl Runtime { { let mut cancellable_background_tasks = self.cancellable_background_tasks.lock().expect("lock"); + if !cancellable_background_tasks.accepting_tasks { + log_trace!( + self.logger, + "Ignoring cancellable background task spawned during shutdown." + ); + return; + } let runtime_handle = self.handle(); // Since it seems to make a difference to `tokio` (see // https://docs.rs/tokio/latest/tokio/time/fn.timeout.html#panics) we make sure the futures // are always put in an `async` / `.await` closure. - cancellable_background_tasks.spawn_on(async { future.await }, runtime_handle); + cancellable_background_tasks.tasks.spawn_on(async { future.await }, runtime_handle); + } + + pub fn allow_cancellable_background_task_spawns(&self) { + self.cancellable_background_tasks.lock().expect("lock").accepting_tasks = true; } pub fn spawn_background_processor_task(&self, future: F) @@ -142,8 +164,12 @@ impl Runtime { } pub fn abort_cancellable_background_tasks(&self) { - let mut tasks = - core::mem::take(&mut *self.cancellable_background_tasks.lock().expect("lock")); + let mut tasks = { + let mut cancellable_background_tasks = + self.cancellable_background_tasks.lock().expect("lock"); + cancellable_background_tasks.accepting_tasks = false; + core::mem::take(&mut cancellable_background_tasks.tasks) + }; debug_assert!(tasks.len() > 0, "Expected some cancellable background_tasks"); tasks.abort_all(); self.block_on(async { while let Some(_) = tasks.join_next().await {} }) @@ -352,3 +378,56 @@ impl FutureSpawner for RuntimeSpawner { output } } + +#[cfg(test)] +mod tests { + use super::*; + + use tokio::sync::oneshot; + + fn test_runtime() -> Runtime { + Runtime::new(Arc::new(Logger::new_log_facade())).unwrap() + } + + #[test] + fn late_cancellable_spawns_are_not_polled_after_abort() { + let runtime = test_runtime(); + let (started_sender, started_receiver) = oneshot::channel(); + runtime.spawn_cancellable_background_task(async move { + let _ = started_sender.send(()); + std::future::pending::<()>().await; + }); + runtime.block_on(async { + started_receiver.await.expect("initial task should start"); + }); + + runtime.abort_cancellable_background_tasks(); + + let (late_spawn_sender, late_spawn_receiver) = oneshot::channel(); + runtime.spawn_cancellable_background_task(async move { + let _ = late_spawn_sender.send(()); + }); + let late_spawn_was_polled = runtime.block_on(async { + match tokio::time::timeout(Duration::from_secs(1), late_spawn_receiver).await { + Ok(Ok(())) => true, + Ok(Err(_)) | Err(_) => false, + } + }); + + assert!( + !late_spawn_was_polled, + "cancellable task spawned after shutdown started should not be polled" + ); + + runtime.allow_cancellable_background_task_spawns(); + + let (restarted_sender, restarted_receiver) = oneshot::channel(); + runtime.spawn_cancellable_background_task(async move { + let _ = restarted_sender.send(()); + }); + runtime.block_on(async { + restarted_receiver.await.expect("spawn should be allowed after restart"); + }); + runtime.abort_cancellable_background_tasks(); + } +}