Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,8 @@ impl Node {
self.config.network
);

self.runtime.allow_cancellable_background_task_spawns();

// Start up any runtime-dependant chain sources (e.g. Electrum)
self.chain_source.start(Arc::clone(&self.runtime)).map_err(|e| {
log_error!(self.logger, "Failed to start chain syncing: {}", e);
Expand Down
91 changes: 85 additions & 6 deletions src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,22 @@ use crate::logger::{log_debug, log_error, log_trace, LdkLogger, Logger};
pub(crate) struct Runtime {
mode: RuntimeMode,
background_tasks: Mutex<JoinSet<()>>,
cancellable_background_tasks: Mutex<JoinSet<()>>,
cancellable_background_tasks: Mutex<CancellableBackgroundTasks>,
background_processor_task: Mutex<Option<JoinHandle<()>>>,
logger: Arc<Logger>,
}

struct CancellableBackgroundTasks {
tasks: JoinSet<()>,
accepting_tasks: bool,
}

impl CancellableBackgroundTasks {
fn new() -> Self {
Self { tasks: JoinSet::new(), accepting_tasks: true }
}
}

impl Runtime {
pub fn new(logger: Arc<Logger>) -> Result<Self, std::io::Error> {
let mode = match tokio::runtime::Handle::try_current() {
Expand Down Expand Up @@ -55,7 +66,7 @@ impl Runtime {
},
};
let background_tasks = Mutex::new(JoinSet::new());
let cancellable_background_tasks = Mutex::new(JoinSet::new());
let cancellable_background_tasks = Mutex::new(CancellableBackgroundTasks::new());
let background_processor_task = Mutex::new(None);

Ok(Self {
Expand All @@ -70,7 +81,7 @@ impl Runtime {
pub fn with_handle(handle: tokio::runtime::Handle, logger: Arc<Logger>) -> Self {
let mode = RuntimeMode::Handle(handle);
let background_tasks = Mutex::new(JoinSet::new());
let cancellable_background_tasks = Mutex::new(JoinSet::new());
let cancellable_background_tasks = Mutex::new(CancellableBackgroundTasks::new());
let background_processor_task = Mutex::new(None);

Self {
Expand Down Expand Up @@ -100,11 +111,22 @@ impl Runtime {
{
let mut cancellable_background_tasks =
self.cancellable_background_tasks.lock().expect("lock");
if !cancellable_background_tasks.accepting_tasks {
log_trace!(
self.logger,
"Ignoring cancellable background task spawned during shutdown."
);
return;
}
let runtime_handle = self.handle();
// Since it seems to make a difference to `tokio` (see
// https://docs.rs/tokio/latest/tokio/time/fn.timeout.html#panics) we make sure the futures
// are always put in an `async` / `.await` closure.
cancellable_background_tasks.spawn_on(async { future.await }, runtime_handle);
cancellable_background_tasks.tasks.spawn_on(async { future.await }, runtime_handle);
}

pub fn allow_cancellable_background_task_spawns(&self) {
self.cancellable_background_tasks.lock().expect("lock").accepting_tasks = true;
}

pub fn spawn_background_processor_task<F>(&self, future: F)
Expand Down Expand Up @@ -142,8 +164,12 @@ impl Runtime {
}

pub fn abort_cancellable_background_tasks(&self) {
let mut tasks =
core::mem::take(&mut *self.cancellable_background_tasks.lock().expect("lock"));
let mut tasks = {
let mut cancellable_background_tasks =
self.cancellable_background_tasks.lock().expect("lock");
cancellable_background_tasks.accepting_tasks = false;
core::mem::take(&mut cancellable_background_tasks.tasks)
};
debug_assert!(tasks.len() > 0, "Expected some cancellable background_tasks");
tasks.abort_all();
self.block_on(async { while let Some(_) = tasks.join_next().await {} })
Expand Down Expand Up @@ -352,3 +378,56 @@ impl FutureSpawner for RuntimeSpawner {
output
}
}

#[cfg(test)]
mod tests {
use super::*;

use tokio::sync::oneshot;

fn test_runtime() -> Runtime {
Runtime::new(Arc::new(Logger::new_log_facade())).unwrap()
}

#[test]
fn late_cancellable_spawns_are_not_polled_after_abort() {
let runtime = test_runtime();
let (started_sender, started_receiver) = oneshot::channel();
runtime.spawn_cancellable_background_task(async move {
let _ = started_sender.send(());
std::future::pending::<()>().await;
});
runtime.block_on(async {
started_receiver.await.expect("initial task should start");
});

runtime.abort_cancellable_background_tasks();

let (late_spawn_sender, late_spawn_receiver) = oneshot::channel();
runtime.spawn_cancellable_background_task(async move {
let _ = late_spawn_sender.send(());
});
let late_spawn_was_polled = runtime.block_on(async {
match tokio::time::timeout(Duration::from_secs(1), late_spawn_receiver).await {
Ok(Ok(())) => true,
Ok(Err(_)) | Err(_) => false,
}
});

assert!(
!late_spawn_was_polled,
"cancellable task spawned after shutdown started should not be polled"
);

runtime.allow_cancellable_background_task_spawns();

let (restarted_sender, restarted_receiver) = oneshot::channel();
runtime.spawn_cancellable_background_task(async move {
let _ = restarted_sender.send(());
});
runtime.block_on(async {
restarted_receiver.await.expect("spawn should be allowed after restart");
});
runtime.abort_cancellable_background_tasks();
}
}