From 718995e8ecddbcd60fce063a8de13d0da9fc3661 Mon Sep 17 00:00:00 2001 From: Evgeny Formanenko Date: Thu, 25 Jun 2026 18:07:52 +0300 Subject: [PATCH] Reclaim hotblocks disk: range-tombstone cleanup + startup file unlink Fixes the NET-819 full-disk incident and NET-798 (blocking startup cleanup). Two independent mechanisms: Phase 1 -- routine logical cleanup (every 10s): one range tombstone per deleted table instead of the old per-key delete loop. Snapshot-safe (no grace needed); lets compaction drop the data. This is the normal-operation reclaim path. Compaction tuning (now load-bearing): compact-on-deletion collector + 24h periodic compaction to find tombstone-heavy / stale SSTs, plus the backlog one-liners max_background_jobs=8, max_subcompactions=4 and level_compaction_dynamic_level_bytes -- the default 2 background jobs could not keep up during the incident. Physical reclaim -- startup only: reclaim_disk_space unlinks whole SST files below the min-live-TableId watermark via DeleteFilesInRange. No writes and no scratch space, so it frees disk even at 100% where compaction deadlocks. It is the "reclaim before ingest" step. Crash-orphaned dirty markers are purged at startup so they don't pin the watermark. The file unlink ignores snapshots, so it can break an in-flight pre-deletion query; it therefore runs only at startup, where there are no live readers. A runtime disk-pressure trigger (free-space threshold or write ENOSPC) is left as a documented future improvement -- it is the emergency reclaim compaction cannot deliver at a full disk. This supersedes the earlier grace-period design: per-table deletion timestamps, the wall-clock grace gate, and the runtime orphan reaper are all removed, along with their CLI flags. That eliminates a class of wall-clock data-loss bugs (clock jumps, slow clients outliving the grace) in exchange for running the unlink only at startup. Co-Authored-By: Claude Opus 4.8 (1M context) Claude-Session: https://claude.ai/code/session_01LeUumqwJBHzqtFM3x3uZXQ --- crates/hotblocks/src/cli.rs | 9 + crates/hotblocks/src/data_service.rs | 14 +- crates/hotblocks/src/main.rs | 51 +++-- crates/storage/src/db/db.rs | 67 +++++- crates/storage/src/db/table_id.rs | 29 ++- crates/storage/src/db/write/ops.rs | 188 +++++++++++++---- crates/storage/src/db/write/storage.rs | 3 + crates/storage/src/db/write/tx.rs | 2 + crates/storage/tests/cleanup_reclaim.rs | 257 ++++++++++++++++++++++++ crates/storage/tests/mock_db/mod.rs | 212 +++++++++++++++++++ 10 files changed, 775 insertions(+), 57 deletions(-) create mode 100644 crates/storage/tests/cleanup_reclaim.rs create mode 100644 crates/storage/tests/mock_db/mod.rs diff --git a/crates/hotblocks/src/cli.rs b/crates/hotblocks/src/cli.rs index 1e14ff72..9de0e685 100644 --- a/crates/hotblocks/src/cli.rs +++ b/crates/hotblocks/src/cli.rs @@ -90,6 +90,15 @@ impl CLI { .map(Arc::new) .context("failed to open rocksdb database")?; + // Crash recovery: drop DIRTY_TABLES markers left by a build that died before + // commit. Must run before any ingest -- an orphan marker pins the disk-reclaim + // watermark forever. Best-effort: a failure degrades reclaim, never blocks startup. + match db.purge_orphan_dirty_tables() { + Ok(0) => {} + Ok(n) => tracing::info!("purged {n} orphan dirty table(s) left by an interrupted build"), + Err(err) => tracing::warn!(error =? err, "failed to purge orphan dirty tables at startup") + } + let mut metrics_registry = crate::metrics::build_metrics_registry(); metrics_registry.register_collector(Box::new(DatasetMetricsCollector { db: db.clone(), diff --git a/crates/hotblocks/src/data_service.rs b/crates/hotblocks/src/data_service.rs index a95256c1..1af35725 100644 --- a/crates/hotblocks/src/data_service.rs +++ b/crates/hotblocks/src/data_service.rs @@ -7,7 +7,7 @@ use anyhow::{Context, anyhow}; use futures::{FutureExt, StreamExt, TryStreamExt}; use sqd_data_client::reqwest::ReqwestDataClient; use sqd_storage::db::DatasetId; -use tracing::{error, info}; +use tracing::{debug, error, info}; use crate::{ dataset_config::{DatasetConfig, RetentionConfig}, @@ -34,6 +34,18 @@ impl DataService { } } + // Reclaim disk now, before any controller spawns. The file unlink ignores + // snapshots, so it is safe only here -- no ingest or query snapshot exists + // yet. Write-free, so it also makes progress at a full disk. + { + let db = db.clone(); + match tokio::task::spawn_blocking(move || db.reclaim_disk_space()).await { + Ok(Ok(())) => debug!("startup disk reclaim complete"), + Ok(Err(err)) => error!(error =? err, "startup disk reclaim failed"), + Err(_) => error!("startup disk reclaim panicked") + } + } + let mut controllers = futures::stream::iter(datasets.into_iter()) .map(|(dataset_id, cfg)| { let db = db.clone(); diff --git a/crates/hotblocks/src/main.rs b/crates/hotblocks/src/main.rs index 409ffecd..82ad3d8a 100644 --- a/crates/hotblocks/src/main.rs +++ b/crates/hotblocks/src/main.rs @@ -36,6 +36,10 @@ fn main() -> anyhow::Result<()> { .block_on(async { let app = args.build_app().await?; + // NB: startup disk reclaim (file unlink) runs inside DataService::start + // (build_app above), before any controller spawns -- the only point where + // unlinking is safe, since no ingest/query snapshot exists yet. + tokio::spawn(db_cleanup_task(app.db.clone())); let api = build_api(app); @@ -89,24 +93,47 @@ async fn shutdown_signal() { } } +const CLEANUP_INTERVAL: Duration = Duration::from_secs(10); +/// Longer pause after a failed tick, so a persistent error (e.g. full disk) doesn't +/// busy-loop failing writes. +const CLEANUP_ERROR_BACKOFF: Duration = Duration::from_secs(30); + +/// Routine Phase-1 cleanup: range-tombstone deleted tables every tick so compaction +/// can reclaim their space -- in normal operation the entire runtime reclaim path. +/// +/// The physical file unlink (`Database::reclaim_disk_space`) is NOT run here: it +/// ignores snapshots and could break an in-flight query, so it runs only at startup. +/// FUTURE: also trigger it from here under disk pressure (the emergency reclaim +/// compaction can't deliver at a full disk), accepting that in-flight-query risk. #[instrument(name = "db_cleanup", skip_all)] async fn db_cleanup_task(db: DBRef) { - tokio::time::sleep(Duration::from_secs(10)).await; + tokio::time::sleep(CLEANUP_INTERVAL).await; loop { - debug!("db cleanup started"); let db = db.clone(); let result = tokio::task::spawn_blocking(move || db.cleanup()).await; - match result { - Ok(Ok(deleted)) => { - if deleted > 0 { - debug!("purged {} tables", deleted) - } else { - debug!("nothing to purge, pausing cleanup for 10 seconds"); - tokio::time::sleep(Duration::from_secs(10)).await; + + let failed = match result { + Ok(Ok(purged)) => { + if purged > 0 { + debug!("cleanup: logically purged {purged} table(s)"); } + false } - Ok(Err(err)) => error!(error =? err, "database cleanup task failed"), - Err(_) => error!("database cleanup task panicked") - } + Ok(Err(err)) => { + error!(error =? err, "database cleanup failed"); + true + } + Err(_) => { + error!("database cleanup task panicked"); + true + } + }; + + tokio::time::sleep(if failed { + CLEANUP_ERROR_BACKOFF + } else { + CLEANUP_INTERVAL + }) + .await; } } diff --git a/crates/storage/src/db/db.rs b/crates/storage/src/db/db.rs index 34e0d71c..49e29d9c 100644 --- a/crates/storage/src/db/db.rs +++ b/crates/storage/src/db/db.rs @@ -12,7 +12,7 @@ use super::{ use crate::db::{ ops::{perform_dataset_compaction, CompactionStatus}, read::datasets::list_all_datasets, - write::{ops::deleted_deleted_tables, table_builder::TableBuilder, tx::Tx}, + write::{ops as cleanup_ops, table_builder::TableBuilder, tx::Tx}, Chunk, DatasetUpdate }; @@ -36,7 +36,8 @@ pub struct DatabaseSettings { direct_io: bool, cache_index_and_filter_blocks: bool, max_log_file_size: usize, - keep_log_file_num: usize + keep_log_file_num: usize, + auto_compactions: bool } impl Default for DatabaseSettings { @@ -48,7 +49,8 @@ impl Default for DatabaseSettings { direct_io: false, cache_index_and_filter_blocks: false, max_log_file_size: 10, - keep_log_file_num: 10 + keep_log_file_num: 10, + auto_compactions: true } } } @@ -91,6 +93,13 @@ impl DatabaseSettings { self } + /// Enable/disable RocksDB background auto-compaction of the table data. + /// Defaults to `true`; off mainly for tests needing deterministic compaction. + pub fn with_auto_compactions(mut self, yes: bool) -> Self { + self.auto_compactions = yes; + self + } + fn db_options(&self) -> RocksOptions { let mut options = RocksOptions::default(); options.create_if_missing(true); @@ -99,6 +108,11 @@ impl DatabaseSettings { // Bound info log (LOG, LOG.old.*) growth options.set_max_log_file_size(self.max_log_file_size * 1024 * 1024); options.set_keep_log_file_num(self.keep_log_file_num); + // Keep compaction ahead of ingest + deletion tombstones: the default 2 jobs + // could not keep up during the NET-819 incident, and routine reclaim now relies + // entirely on compaction (the file unlink is startup-only). Load-bearing. + options.set_max_background_jobs(8); + options.set_max_subcompactions(4); if self.with_rocksdb_stats { options.enable_statistics(); } @@ -141,6 +155,18 @@ impl DatabaseSettings { let mut options = RocksOptions::default(); options.set_block_based_table_factory(&block_based_table_factory); options.set_compression_type(rocksdb::DBCompressionType::Lz4); + // Find tombstone-heavy SSTs and bound staleness so no dead file lingers. A lone + // range tombstone over a whole table has low deletion density, so the 24h periodic + // compaction is the real backstop; the collector catches denser boundary files. + // Thresholds are provisional. + options.add_compact_on_deletion_collector_factory(128 * 1024, 64 * 1024, 0.5); + options.set_periodic_compaction_seconds(24 * 60 * 60); + // Bound space amplification under leveled compaction (default since RocksDB 8.4; + // pinned because it is load-bearing here). + options.set_level_compaction_dynamic_level_bytes(true); + if !self.auto_compactions { + options.set_disable_auto_compactions(true); + } options } @@ -287,8 +313,41 @@ impl Database { Ok(()) } + /// Phase 1 -- logically purge deleted tables (snapshot-safe range tombstones). + /// Returns the number of tables logically deleted by this call. pub fn cleanup(&self) -> anyhow::Result { - deleted_deleted_tables(&self.db) + cleanup_ops::logical_cleanup(&self.db) + } + + /// Physically reclaim disk by unlinking whole SST files below the live watermark + /// (min live `TableId`). No writes, so it works even at a full disk, unlike + /// compaction. IGNORES snapshots, so it is safe only where no live reader exists -- + /// today only at STARTUP. See [`cleanup_ops::reclaim_disk_space`] for the trade-off. + pub fn reclaim_disk_space(&self) -> anyhow::Result<()> { + cleanup_ops::reclaim_disk_space(&self.db) + } + + /// Crash recovery: purge `DIRTY_TABLES` markers left by builds that died before + /// commit -- an orphan's id otherwise pins the reclaim watermark forever (see + /// [`Database::reclaim_disk_space`]). MUST run before any ingest starts (e.g. at + /// startup): it treats every dirty marker as an orphan. Returns orphans purged. + pub fn purge_orphan_dirty_tables(&self) -> anyhow::Result { + cleanup_ops::purge_orphan_dirty_tables(&self.db) + } + + /// Flush the table-data column family's memtable to SST files (e.g. before a + /// reclaim, so freshly written data is unlinkable). + pub fn flush(&self) -> anyhow::Result<()> { + self.db.flush_cf(self.db.cf_handle(CF_TABLES).unwrap())?; + Ok(()) + } + + /// Force a full compaction of the table-data column family. Unlike + /// [`Database::reclaim_disk_space`] this *writes* (unsafe at a full disk); it pushes + /// data to the bottom level and rewrites tombstone-heavy files the unlink can't reach. + pub fn compact_tables(&self) { + let cf = self.db.cf_handle(CF_TABLES).unwrap(); + self.db.compact_range_cf(cf, None::<&[u8]>, None::<&[u8]>); } pub fn get_statistics(&self) -> Option { diff --git a/crates/storage/src/db/table_id.rs b/crates/storage/src/db/table_id.rs index dfca7357..a3b6d820 100644 --- a/crates/storage/src/db/table_id.rs +++ b/crates/storage/src/db/table_id.rs @@ -20,9 +20,14 @@ impl TableId { } pub fn from_slice(bytes: &[u8]) -> Self { - Self { - uuid: Uuid::from_slice(bytes).unwrap() - } + Self::try_from_slice(bytes).expect("TableId::from_slice: not a 16-byte UUID") + } + + /// Decode a key into a `TableId`, or `None` if not exactly a 16-byte UUID. Use this + /// (not [`TableId::from_slice`]) when scanning a CF whose keys could be corrupt, so a + /// malformed key is skipped instead of panicking and wedging the scan. + pub fn try_from_slice(bytes: &[u8]) -> Option { + Uuid::from_slice(bytes).ok().map(|uuid| Self { uuid }) } } @@ -31,3 +36,21 @@ impl Display for TableId { self.uuid.fmt(f) } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn try_from_slice_rejects_non_16_byte_keys() { + // A real 16-byte id round-trips. + let id = TableId::new(); + assert_eq!(TableId::try_from_slice(id.as_ref()), Some(id)); + + // Anything that is not exactly 16 bytes decodes to None instead of + // panicking (so a corrupt/short key can be skipped, not wedge cleanup). + assert_eq!(TableId::try_from_slice(&[]), None); + assert_eq!(TableId::try_from_slice(&[0u8; 15]), None); + assert_eq!(TableId::try_from_slice(&[0u8; 17]), None); + } +} diff --git a/crates/storage/src/db/write/ops.rs b/crates/storage/src/db/write/ops.rs index 3e0ce35e..5d1af1d9 100644 --- a/crates/storage/src/db/write/ops.rs +++ b/crates/storage/src/db/write/ops.rs @@ -1,56 +1,170 @@ use crate::{ - db::db::{RocksDB, RocksWriteBatch, CF_DELETED_TABLES, CF_DIRTY_TABLES, CF_TABLES}, - kv::KvReadCursor, + db::{ + db::{RocksDB, RocksWriteBatch, CF_CHUNKS, CF_DELETED_TABLES, CF_DIRTY_TABLES, CF_TABLES}, + table_id::TableId, + Chunk + }, table::key::TableKeyFactory }; -pub fn deleted_deleted_tables(db: &RocksDB) -> anyhow::Result { - let mut deleted = 0; - let cf_deleted_tables = db.cf_handle(CF_DELETED_TABLES).unwrap(); - let mut it = db.raw_iterator_cf(cf_deleted_tables); - for_each_key(&mut it, |key| { - deleted += 1; - delete_table(db, key) - })?; - Ok(deleted) -} +/// Phase 1 -- logical, snapshot-safe purge of deleted tables. +/// +/// Replaces each table routed to `CF_DELETED_TABLES` (by [`super::tx::Tx::delete_table`]) +/// with one `CF_TABLES` range tombstone instead of millions of point deletes, then +/// drops its bookkeeping entry. Range tombstones respect snapshots, so in-flight +/// queries are unaffected and no grace period is needed; the space is freed later by +/// compaction -- the whole runtime reclaim path (the file unlink is startup-only). +/// +/// Idempotent (a crash just replays the no-op range delete). Returns tables purged. +pub(crate) fn logical_cleanup(db: &RocksDB) -> anyhow::Result { + let cf_tables = db.cf_handle(CF_TABLES).unwrap(); + let cf_deleted = db.cf_handle(CF_DELETED_TABLES).unwrap(); -fn delete_table(db: &RocksDB, table_id: &[u8]) -> anyhow::Result<()> { - let mut key1 = TableKeyFactory::new(table_id); - let mut key2 = TableKeyFactory::new(table_id); - let start = key1.start(); - let end = key2.end(); + // Collect first, then mutate: writing mid-iteration disturbs the cursor. + let mut pending: Vec = Vec::new(); + { + let mut it = db.raw_iterator_cf(cf_deleted); + it.seek_to_first(); + while it.valid() { + // Skip malformed keys instead of panicking -- a panic re-fires every tick. + if let Some(id) = TableId::try_from_slice(it.key().unwrap()) { + pending.push(id); + } + it.next(); + } + it.status()?; + } - let cf_tables = db.cf_handle(CF_TABLES).unwrap(); + if pending.is_empty() { + return Ok(0); + } + + // Range delete writes straight to the base DB (the txn batch has no range + // delete), so it runs here, not in `delete_table`'s transaction. Dropping the + // bookkeeping entry after a crash just replays a harmless no-op range delete. let mut batch = RocksWriteBatch::default(); - let mut cursor = db.raw_iterator_cf(cf_tables); + for id in &pending { + let mut start = TableKeyFactory::new(id); + let mut end = TableKeyFactory::new(id); + db.delete_range_cf(cf_tables, start.start(), end.end())?; + batch.delete_cf(cf_deleted, id); + } + db.write(batch)?; - list_keys(&mut cursor, start, end, |key| batch.delete_cf(cf_tables, key))?; + Ok(pending.len()) +} - let cf_dirty_tables = db.cf_handle(CF_DIRTY_TABLES).unwrap(); - batch.delete_cf(cf_dirty_tables, table_id); +/// Physically reclaim disk by unlinking whole `CF_TABLES` SST files entirely below +/// the live watermark (smallest live `TableId`; see [`min_live_table_id`]). The only +/// reclaim that frees space *without writing*, so it works even at a full disk where +/// compaction deadlocks. Table ids are never-reused, time-ordered UUIDv7s, so dead +/// tables form a contiguous low range; boundary and above-watermark garbage are left +/// to compaction (which Phase-1 [`logical_cleanup`] tombstones let it drop). +/// +/// SAFETY: the unlink IGNORES snapshots -- it can break an in-flight query reading a +/// just-deleted table below the watermark. So it runs only at STARTUP, before any +/// controller/query exists. FUTURE: also trigger under runtime disk pressure (the +/// emergency reclaim compaction can't do at a full disk), accepting that read risk. +pub(crate) fn reclaim_disk_space(db: &RocksDB) -> anyhow::Result<()> { + let cf_tables = db.cf_handle(CF_TABLES).unwrap(); + let watermark = min_live_table_id(db)?; - let cf_deleted_tables = db.cf_handle(CF_DELETED_TABLES).unwrap(); - batch.delete_cf(cf_deleted_tables, table_id); + let lo = [0u8; 16]; + let hi: Vec = match watermark { + Some(b) => b.as_ref().to_vec(), + // No live tables: 17x 0xFF sorts above every `id(16B) ++ suffix` key, so + // every dead file (including orphans) is unlinked. + None => vec![0xFFu8; 17] + }; + db.delete_file_in_range_cf(cf_tables, &lo[..], &hi[..])?; - db.write(batch)?; Ok(()) } -fn list_keys(cursor: &mut impl KvReadCursor, from: &[u8], to: &[u8], mut cb: impl FnMut(&[u8])) -> anyhow::Result<()> { - cursor.seek(from)?; - while cursor.is_valid() && cursor.key() < to { - cb(cursor.key()); - cursor.next()?; +/// Crash recovery -- purge orphaned `CF_DIRTY_TABLES` markers. +/// +/// A dirty marker is written when a build starts and removed when its chunk commits +/// (see [`super::tx::Tx::write_chunk`]). One still present with no build running is an +/// orphan from a build that died before commit; [`min_live_table_id`] counts it as +/// live, so a single orphan pins [`reclaim_disk_space`]'s watermark forever. We drop +/// the marker and range-tombstone its (unreferenced) `CF_TABLES` data. +/// +/// MUST run only with no build in flight (e.g. startup before ingest): it treats +/// EVERY dirty marker as an orphan, so it would tombstone a live build's data. An +/// orphan from a mid-run crash survives until the next restart's purge -- it only +/// blunts the startup reclaim, never corrupts data. Returns orphans purged. +pub(crate) fn purge_orphan_dirty_tables(db: &RocksDB) -> anyhow::Result { + let cf_tables = db.cf_handle(CF_TABLES).unwrap(); + let cf_dirty = db.cf_handle(CF_DIRTY_TABLES).unwrap(); + + // Collect first, mutate after: writing while iterating disturbs the cursor. + let mut orphans: Vec = Vec::new(); + { + let mut it = db.raw_iterator_cf(cf_dirty); + it.seek_to_first(); + while it.valid() { + // Skip a malformed key rather than panic (see logical_cleanup). + if let Some(id) = TableId::try_from_slice(it.key().unwrap()) { + orphans.push(id); + } + it.next(); + } + it.status()?; } - Ok(()) + + if orphans.is_empty() { + return Ok(0); + } + + let mut batch = RocksWriteBatch::default(); + for id in &orphans { + let mut start = TableKeyFactory::new(id); + let mut end = TableKeyFactory::new(id); + db.delete_range_cf(cf_tables, start.start(), end.end())?; + batch.delete_cf(cf_dirty, id); + } + db.write(batch)?; + + Ok(orphans.len()) } -fn for_each_key(cursor: &mut impl KvReadCursor, mut cb: impl FnMut(&[u8]) -> anyhow::Result<()>) -> anyhow::Result<()> { - cursor.seek_first()?; - while cursor.is_valid() { - cb(cursor.key())?; - cursor.next()?; +/// Smallest `TableId` still live: referenced by a committed chunk (`CF_CHUNKS`) or +/// pending in `CF_DIRTY_TABLES` (built but not yet in a chunk). `None` if none exist. +/// Taken across ALL datasets, which also absorbs inter-dataset UUIDv7 clock skew. +/// +/// A `CF_CHUNKS` value that fails to decode aborts with `Err` (not skipped): skipping +/// could omit a live table and lift the watermark over data a query needs, so the +/// caller ([`reclaim_disk_space`]) reclaims nothing this run -- the safe failure mode. +fn min_live_table_id(db: &RocksDB) -> anyhow::Result> { + let mut min: Option = None; + + { + let cf_chunks = db.cf_handle(CF_CHUNKS).unwrap(); + let mut it = db.raw_iterator_cf(cf_chunks); + it.seek_to_first(); + while it.valid() { + let chunk: Chunk = borsh::from_slice(it.value().unwrap())?; + for id in chunk.tables().values() { + min = Some(min.map_or(*id, |m| m.min(*id))); + } + it.next(); + } + it.status()?; } - Ok(()) + + { + let cf_dirty = db.cf_handle(CF_DIRTY_TABLES).unwrap(); + let mut it = db.raw_iterator_cf(cf_dirty); + it.seek_to_first(); + while it.valid() { + // Skip a malformed key rather than panic (see logical_cleanup). + if let Some(id) = TableId::try_from_slice(it.key().unwrap()) { + min = Some(min.map_or(id, |m| m.min(id))); + } + it.next(); + } + it.status()?; + } + + Ok(min) } diff --git a/crates/storage/src/db/write/storage.rs b/crates/storage/src/db/write/storage.rs index f7cd89b6..faf6ae52 100644 --- a/crates/storage/src/db/write/storage.rs +++ b/crates/storage/src/db/write/storage.rs @@ -25,6 +25,9 @@ impl<'a> TableStorage<'a> { pub fn mark_table_dirty(&mut self, table_id: TableId) { let cf_dirty = self.db.cf_handle(CF_DIRTY_TABLES).unwrap(); + // Value unused; the key marks "table built, chunk not yet committed". Removed + // on commit (`tx::write_chunk`); an orphan from a dead build is cleared by + // `ops::purge_orphan_dirty_tables` at startup. self.write_batch.put_cf(cf_dirty, table_id, []) } diff --git a/crates/storage/src/db/write/tx.rs b/crates/storage/src/db/write/tx.rs index b7ba820f..7f201087 100644 --- a/crates/storage/src/db/write/tx.rs +++ b/crates/storage/src/db/write/tx.rs @@ -126,6 +126,8 @@ impl<'a> Tx<'a> { } pub fn delete_table(&self, table_id: &TableId) -> anyhow::Result<()> { + // Value unused; the key's presence is the signal. Phase 1 (`ops::logical_cleanup`) + // range-tombstones the table and drops this entry. self.transaction .put_cf(self.cf_handle(CF_DELETED_TABLES), table_id, [])?; Ok(()) diff --git a/crates/storage/tests/cleanup_reclaim.rs b/crates/storage/tests/cleanup_reclaim.rs new file mode 100644 index 00000000..4ab513c2 --- /dev/null +++ b/crates/storage/tests/cleanup_reclaim.rs @@ -0,0 +1,257 @@ +//! Tests for table cleanup: +//! * Phase 1 ([`Database::cleanup`]) -- logical, snapshot-safe range tombstones. +//! * Physical reclaim ([`Database::reclaim_disk_space`]) -- SST-file unlink below the +//! live watermark. It ignores snapshots, so it runs only where there are no live +//! readers (startup in production); fully decoupled from Phase 1. +//! +//! Everything runs against [`MockDB`] (see `mock_db`), which hides the shared setup so +//! each test reads as domain steps (commit / delete / cleanup / reclaim / snapshot / +//! purge) asserting on observable effects (rows read, files freed). + +mod mock_db; +mod utils; + +use mock_db::{MockDB, Table}; + +/// Phase 1 must be invisible to readers that already hold a snapshot: a query +/// in flight when its chunk is deleted keeps reading every row (RocksDB MVCC -- +/// range tombstones respect older snapshots), while new snapshots see nothing. +#[test] +fn logical_delete_is_snapshot_safe() { + let mut db = MockDB::new(); + let t = db.commit_table(50); + + // Snapshot taken BEFORE the deletion -- models an in-flight query. + let reader = db.snapshot(); + + db.delete(&t); + assert_eq!(db.cleanup(), 1, "one table logically deleted"); + + // A fresh snapshot no longer sees the chunk... + assert!(!db.has_visible_chunk()); + // ...but the pre-deletion snapshot still reads every row. + assert_eq!(db.read(&reader, &t), t.rows); + + // Running Phase 1 again is a no-op (already logically deleted). + assert_eq!(db.cleanup(), 0); +} + +/// Physical reclaim unlinks dead SST files below the watermark. With no live +/// table left, the watermark is unbounded and every dead file is dropped. +#[test] +fn reclaim_unlinks_dead_sst_files() { + let mut db = MockDB::new(); + let tables: Vec = (0..3).map(|_| db.commit_table(1000)).collect(); + db.compact_to_bottom(); + let before = db.sst_size(); + assert!(before > 0, "expected flushed SST data"); + + for t in &tables { + db.delete(t); + } + assert_eq!(db.cleanup(), 3); + // Flush the Phase-1 range tombstones out of the memtable. Auto-compaction is + // off, so the flushed tombstone SST won't trigger a background compaction + // that races the assertions -- the unlink is the only thing freeing space. + db.flush(); + + db.reclaim(); + let after = db.sst_size(); + assert!( + after * 4 < before, + "expected physical reclaim: before={before} after={after}" + ); +} + +/// The watermark is the min live `TableId` over ALL datasets, so a single old +/// *live* table pins it low and dead tables with larger ids are NOT +/// file-reclaimable until that live table is gone. Verifies live data is never +/// unlinked and documents the known limitation (heterogeneous retention). +#[test] +fn live_table_pins_reclaim_watermark() { + let mut db = MockDB::new(); + // Creation order == id order, so `older` is the smaller id (the + // watermark-pinning live table) and `newer` is deleted first. + let older = db.commit_table(1000); + let newer = db.commit_table(1000); + db.compact_to_bottom(); + let before = db.sst_size(); + assert!(before > 0); + + // Delete the NEWER table; the older one stays live and pins the watermark. + db.delete(&newer); + db.cleanup(); + db.flush(); + + // Dead `newer` sits above the watermark pinned by live `older` -> not + // unlinked; its files survive and `older` stays fully readable. + db.reclaim(); + assert!( + db.sst_size() * 4 > before, + "a dead table above the watermark must NOT be unlinked" + ); + assert_eq!(db.read(&db.snapshot(), &older), older.rows); + + // Remove the older table too -> watermark lifts -> everything reclaimable. + db.delete(&older); + db.cleanup(); + db.flush(); + db.reclaim(); + let after = db.sst_size(); + assert!( + after * 4 < before, + "expected reclaim once the watermark lifts: before={before} after={after}" + ); +} + +/// Both steps are idempotent (crash-safety relies on it): re-running after +/// completion does no work and does not error. +#[test] +fn cleanup_and_reclaim_are_idempotent() { + let mut db = MockDB::new(); + let t = db.commit_table(1000); + db.delete(&t); + + assert_eq!(db.cleanup(), 1); + assert_eq!(db.cleanup(), 0); + db.flush(); + db.reclaim(); + db.reclaim(); // a second run must not panic or error + assert_eq!(db.cleanup(), 0); +} + +/// End-to-end: `delete_dataset` runs Phase 1 synchronously (now cheap), and the +/// subsequent startup reclaim (no readers) frees the disk. +#[test] +fn delete_dataset_then_reclaim_frees_space() { + let mut db = MockDB::new(); + let _t0 = db.commit_table(1000); + let _t1 = db.commit_table(1000); + db.compact_to_bottom(); + let before = db.sst_size(); + assert!(before > 0); + + db.delete_dataset(); + assert!(db.has_no_datasets()); + db.flush(); // flush Phase-1 tombstones so the unlink can drop the files + + db.reclaim(); + let after = db.sst_size(); + assert!(after * 4 < before, "expected reclaim: before={before} after={after}"); +} + +/// Documents the trade-off behind running [`Database::reclaim_disk_space`] only with +/// no live readers (startup): the unlink IGNORES snapshots, so reclaiming under a live +/// pre-deletion snapshot pulls its files out and the read fails loudly (error), not +/// silently wrong. Cache disabled so the data is genuinely gone, not served warm. +#[test] +fn reclaim_breaks_a_live_pre_deletion_reader() { + let mut db = MockDB::uncached(); + let t = db.commit_table(3000); + db.compact_to_bottom(); + + // In-flight query: snapshot taken BEFORE the deletion. + let reader = db.snapshot(); + + db.delete(&t); + assert_eq!(db.cleanup(), 1); + db.flush(); + + // Baseline: the reader still reads every row while its files are present. + assert_eq!(db.read(&reader, &t), t.rows); + + // Reclaim while that snapshot is STILL live. No live table remains, so the + // watermark is unbounded and the reader's files are unlinked from under it. + db.reclaim(); + + // The same reader can no longer read the table -- it fails, not returns wrong + // rows. Running reclaim only at startup (no live readers) is what avoids this. + assert!( + db.try_read(&reader, &t).is_err(), + "reading a table whose files were unlinked under a live snapshot must fail, not return wrong rows" + ); +} + +/// An orphaned `DIRTY_TABLES` marker -- left when a build's chunk is never +/// committed (a crash/abandon before `write_chunk` removes it) -- is counted as a +/// live table by the watermark, so it pins disk reclaim for every later table. +/// Startup recovery (`purge_orphan_dirty_tables`) drops it so the watermark lifts +/// and dead tables become reclaimable again. +#[test] +fn orphan_dirty_marker_unpinned_by_purge() { + let mut db = MockDB::new(); + // Orphan created FIRST -> smaller id -> pins the watermark low. Its chunk is + // never committed, so nothing ever removes its dirty marker. + let orphan = db.orphan_table(1000); + let live = db.commit_table(1000); + assert!( + orphan.id < live.id, + "orphan must be the smaller id to pin the watermark" + ); + + db.compact_to_bottom(); + let before = db.sst_size(); + assert!(before > 0); + + // Delete the live table and tombstone it (Phase 1). + db.delete(&live); + assert_eq!(db.cleanup(), 1); + db.flush(); + + // The orphan marker pins the watermark at the orphan's id, so the dead + // (higher-id) `live` table cannot be unlinked. + db.reclaim(); + assert!( + db.sst_size() * 4 > before, + "orphan pins the watermark, the dead table's files survive" + ); + + // Startup recovery removes the orphan marker (and tombstones its data). + assert_eq!(db.purge_orphans(), 1, "one orphan marker purged"); + db.flush(); + + // Watermark lifts -> the dead table is now reclaimable. + db.reclaim(); + let after = db.sst_size(); + assert!( + after * 4 < before, + "expected physical reclaim once the orphan no longer pins the watermark: before={before} after={after}" + ); + + // Purge is idempotent: no markers remain. + assert_eq!(db.purge_orphans(), 0); +} + +/// Phase 1 and physical reclaim are decoupled: reclaim unlinks dead files purely +/// by the watermark (id ordering), with no dependence on a preceding `cleanup()`. +/// This mirrors the startup reclaim path, which runs before any Phase 1, and it +/// must not consume the deletion bookkeeping that Phase 1 still owes work on. +#[test] +fn reclaim_is_independent_of_phase1() { + let mut db = MockDB::new(); + let t = db.commit_table(2000); + db.compact_to_bottom(); + let before = db.sst_size(); + assert!(before > 0); + + // Logical-delete but DO NOT run Phase 1 (no range tombstone issued). + db.delete(&t); + + // No live table remains -> watermark unbounded -> the dead table's files are + // unlinked anyway. Reclaim needs no preceding tombstone. + db.reclaim(); + let after = db.sst_size(); + assert!( + after * 4 < before, + "reclaim frees dead files without a preceding Phase 1: before={before} after={after}" + ); + + // Reclaim left the bookkeeping entry untouched, so a later Phase 1 still + // finds and tombstones it (covering any boundary/above-watermark remnants). + assert_eq!( + db.cleanup(), + 1, + "the deletion record survived reclaim and is handled by Phase 1" + ); + assert_eq!(db.cleanup(), 0); +} diff --git a/crates/storage/tests/mock_db/mod.rs b/crates/storage/tests/mock_db/mod.rs new file mode 100644 index 00000000..76c8d7b9 --- /dev/null +++ b/crates/storage/tests/mock_db/mod.rs @@ -0,0 +1,212 @@ +//! [`MockDB`] -- a scratch temp-dir `Database` for driving the table cleanup +//! lifecycle in tests: logical purge (Phase 1, range tombstones) + physical +//! SST-file unlink below the live watermark. +//! +//! It hides the fiddly setup (auto-compaction off, small write buffers, UUIDv7 id +//! ordering, flush/compact to the bottom level) so a test reads as domain steps -- +//! commit / delete / cleanup / reclaim / snapshot / purge -- asserting on observable +//! effects (rows read, files freed). + +use std::{sync::Arc, time::Duration}; + +use arrow::datatypes::{DataType, Schema}; +use sqd_storage::{ + db::{Chunk, Database, DatabaseSettings, DatasetId, DatasetKind, ReadSnapshot, TableId}, + table::write::{use_small_buffers, RestoreBufferSizesGuard} +}; +use tempfile::TempDir; + +use crate::utils::{make_irregular_block, make_schema, read_chunk}; + +/// Generous upper bound on the total number of rows a single test builds across +/// all its tables (block numbers double as indices into the shared column data). +const CAPACITY: usize = 20_000; + +fn two_u16_columns(n: usize) -> Vec> { + let col: Vec = (0..n).map(|i| (i % 60_000) as u16).collect(); + vec![col.clone(), col] +} + +/// A table [`MockDB`] built. Holds enough to delete it, read it back, and +/// reason about its watermark position (`id`). +pub struct Table { + pub chunk: Chunk, + pub id: TableId, + pub rows: Vec<(u32, u32)> +} + +/// A scratch `Database` wired for deterministic cleanup/reclaim tests: auto-compaction +/// off (only explicit `compact`/unlink move data) and small write buffers (so modest +/// row counts still produce real SST files). +/// +/// Invariant: tables are created with strictly increasing ids in call order (a 2 ms gap +/// per build advances the UUIDv7 timestamp), so "created first" == "smaller id". Tests +/// rely on this to place an orphan/older table below a live one without reading raw ids. +pub struct MockDB { + db: Database, + ds: DatasetId, + schema: Arc, + static_data: Vec>, + next_block: usize, + last_id: Option, + _small_buffers: RestoreBufferSizesGuard, + // Keeps the temp dir alive: flush/reclaim create new WAL/SST files, which + // fails once the directory is cleaned up. + _dir: TempDir +} + +impl MockDB { + pub fn new() -> Self { + Self::open( + DatabaseSettings::default() + .with_rocksdb_stats(true) + .with_auto_compactions(false) + ) + } + + /// Like [`MockDB::new`] but with the block cache disabled, so reads must + /// hit the SST files. Lets a test observe data loss deterministically: once + /// files are unlinked the data is genuinely gone, never served from cache. + pub fn uncached() -> Self { + Self::open( + DatabaseSettings::default() + .with_rocksdb_stats(true) + .with_auto_compactions(false) + .with_data_cache_size(0) + .with_chunk_cache_size(0) + ) + } + + fn open(settings: DatabaseSettings) -> Self { + let dir = tempfile::tempdir().unwrap(); + let db = settings.open(dir.path()).unwrap(); + let ds = DatasetId::from_str("solana"); + db.create_dataset(ds, DatasetKind::from_str("solana")).unwrap(); + Self { + db, + ds, + schema: make_schema(DataType::UInt32, DataType::UInt32, false), + static_data: two_u16_columns(CAPACITY), + next_block: 0, + last_id: None, + _small_buffers: use_small_buffers(), + _dir: dir + } + } + + /// Build (and `finish`) a table of `rows` rows over the next free block + /// range. `finish` already persists the dirty marker and the table data; + /// the caller decides whether to commit the chunk. + fn build_table(&mut self, rows: usize) -> (Chunk, Vec<(u32, u32)>, TableId) { + // Sleep so the next UUIDv7 timestamp advances, keeping creation order == id + // order (the struct invariant the watermark tests rely on). + std::thread::sleep(Duration::from_millis(2)); + + let start = self.next_block; + let end = start + rows; + assert!(end <= self.static_data[0].len(), "harness data capacity exceeded"); + self.next_block = end; + + let (chunk, rows_data) = + make_irregular_block(&self.static_data, start, end, Arc::clone(&self.schema), &self.db); + let id = chunk.tables().get("block").copied().unwrap(); + if let Some(last) = self.last_id { + assert!(id > last, "harness invariant: ids must increase with creation order"); + } + self.last_id = Some(id); + (chunk, rows_data, id) + } + + /// Build a committed table: its chunk is inserted, so `write_chunk` removes + /// its dirty marker. + pub fn commit_table(&mut self, rows: usize) -> Table { + let (chunk, rows, id) = self.build_table(rows); + self.db.insert_chunk(self.ds, &chunk).unwrap(); + Table { chunk, id, rows } + } + + /// Build a table but never commit its chunk -- `build_table`'s `finish` has + /// already persisted the dirty marker and data, so nothing ever removes the + /// marker. Exactly the orphan a crash-before-commit leaves behind. + pub fn orphan_table(&mut self, rows: usize) -> Table { + let (chunk, rows, id) = self.build_table(rows); + Table { chunk, id, rows } + } + + pub fn snapshot(&self) -> ReadSnapshot<'_> { + self.db.snapshot() + } + + pub fn read(&self, snapshot: &ReadSnapshot, table: &Table) -> Vec<(u32, u32)> { + read_chunk(snapshot, table.chunk.clone()) + } + + /// Like [`MockDB::read`] but propagates errors instead of unwrapping -- + /// used to show that an unlink under a live snapshot makes the data + /// unreadable rather than silently wrong. + pub fn try_read(&self, snapshot: &ReadSnapshot, table: &Table) -> anyhow::Result<()> { + let chunk_reader = snapshot.create_chunk_reader(table.chunk.clone()); + let table_reader = chunk_reader.get_table_reader("block")?; + table_reader.read_column(0, None)?; + table_reader.read_column(1, None)?; + Ok(()) + } + + /// Whether a fresh snapshot still sees any committed chunk in the dataset. + pub fn has_visible_chunk(&self) -> bool { + self.db.snapshot().get_first_chunk(self.ds).unwrap().is_some() + } + + pub fn delete(&self, table: &Table) { + self.db + .update_dataset(self.ds, |tx| tx.delete_chunk(&table.chunk)) + .unwrap(); + } + + /// Delete the whole dataset (runs Phase 1 synchronously), as production's + /// `delete_dataset` does. + pub fn delete_dataset(&self) { + self.db.delete_dataset(self.ds).unwrap(); + } + + /// Whether the database has no datasets left. + pub fn has_no_datasets(&self) -> bool { + self.db.get_all_datasets().unwrap().is_empty() + } + + pub fn cleanup(&self) -> usize { + self.db.cleanup().unwrap() + } + + /// Physically unlink dead SST files below the live watermark. The caller is + /// responsible for there being no live pre-deletion reader (as at startup in + /// production). Assert effects via [`MockDB::sst_size`]. + pub fn reclaim(&self) { + self.db.reclaim_disk_space().unwrap() + } + + pub fn purge_orphans(&self) -> usize { + self.db.purge_orphan_dirty_tables().unwrap() + } + + pub fn flush(&self) { + self.db.flush().unwrap(); + } + + /// Land the table data in the bottom level, so a later reclaim is a real + /// file unlink (`DeleteFilesInRange` skips L0): flush the memtable, then + /// compact. In production the dead data being reclaimed lives there too. + pub fn compact_to_bottom(&self) { + self.db.flush().unwrap(); + self.db.compact_tables(); + } + + /// Total size of all SST files in the table-data column family, in bytes. + pub fn sst_size(&self) -> u64 { + self.db + .get_property("TABLES", "rocksdb.total-sst-files-size") + .unwrap() + .and_then(|s| s.parse().ok()) + .unwrap_or(0) + } +}