Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions crates/hotblocks/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,15 @@ impl CLI {
.map(Arc::new)
.context("failed to open rocksdb database")?;

// Crash recovery: drop DIRTY_TABLES markers left by a build that died before
// commit. Must run before any ingest -- an orphan marker pins the disk-reclaim
// watermark forever. Best-effort: a failure degrades reclaim, never blocks startup.
match db.purge_orphan_dirty_tables() {
Ok(0) => {}
Ok(n) => tracing::info!("purged {n} orphan dirty table(s) left by an interrupted build"),
Err(err) => tracing::warn!(error =? err, "failed to purge orphan dirty tables at startup")
}

let mut metrics_registry = crate::metrics::build_metrics_registry();
metrics_registry.register_collector(Box::new(DatasetMetricsCollector {
db: db.clone(),
Expand Down
14 changes: 13 additions & 1 deletion crates/hotblocks/src/data_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use anyhow::{Context, anyhow};
use futures::{FutureExt, StreamExt, TryStreamExt};
use sqd_data_client::reqwest::ReqwestDataClient;
use sqd_storage::db::DatasetId;
use tracing::{error, info};
use tracing::{debug, error, info};

use crate::{
dataset_config::{DatasetConfig, RetentionConfig},
Expand All @@ -34,6 +34,18 @@ impl DataService {
}
}

// Reclaim disk now, before any controller spawns. The file unlink ignores
// snapshots, so it is safe only here -- no ingest or query snapshot exists
// yet. Write-free, so it also makes progress at a full disk.
{
let db = db.clone();
match tokio::task::spawn_blocking(move || db.reclaim_disk_space()).await {
Ok(Ok(())) => debug!("startup disk reclaim complete"),
Ok(Err(err)) => error!(error =? err, "startup disk reclaim failed"),
Err(_) => error!("startup disk reclaim panicked")
}
}

let mut controllers = futures::stream::iter(datasets.into_iter())
.map(|(dataset_id, cfg)| {
let db = db.clone();
Expand Down
51 changes: 39 additions & 12 deletions crates/hotblocks/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ fn main() -> anyhow::Result<()> {
.block_on(async {
let app = args.build_app().await?;

// NB: startup disk reclaim (file unlink) runs inside DataService::start
// (build_app above), before any controller spawns -- the only point where
// unlinking is safe, since no ingest/query snapshot exists yet.

tokio::spawn(db_cleanup_task(app.db.clone()));

let api = build_api(app);
Expand Down Expand Up @@ -89,24 +93,47 @@ async fn shutdown_signal() {
}
}

const CLEANUP_INTERVAL: Duration = Duration::from_secs(10);
/// Longer pause after a failed tick, so a persistent error (e.g. full disk) doesn't
/// busy-loop failing writes.
const CLEANUP_ERROR_BACKOFF: Duration = Duration::from_secs(30);

/// Routine Phase-1 cleanup: range-tombstone deleted tables every tick so compaction
/// can reclaim their space -- in normal operation the entire runtime reclaim path.
///
/// The physical file unlink (`Database::reclaim_disk_space`) is NOT run here: it
/// ignores snapshots and could break an in-flight query, so it runs only at startup.
/// FUTURE: also trigger it from here under disk pressure (the emergency reclaim
/// compaction can't deliver at a full disk), accepting that in-flight-query risk.
#[instrument(name = "db_cleanup", skip_all)]
async fn db_cleanup_task(db: DBRef) {
tokio::time::sleep(Duration::from_secs(10)).await;
tokio::time::sleep(CLEANUP_INTERVAL).await;
loop {
debug!("db cleanup started");
let db = db.clone();
let result = tokio::task::spawn_blocking(move || db.cleanup()).await;
match result {
Ok(Ok(deleted)) => {
if deleted > 0 {
debug!("purged {} tables", deleted)
} else {
debug!("nothing to purge, pausing cleanup for 10 seconds");
tokio::time::sleep(Duration::from_secs(10)).await;

let failed = match result {
Ok(Ok(purged)) => {
if purged > 0 {
debug!("cleanup: logically purged {purged} table(s)");
}
false
}
Ok(Err(err)) => error!(error =? err, "database cleanup task failed"),
Err(_) => error!("database cleanup task panicked")
}
Ok(Err(err)) => {
error!(error =? err, "database cleanup failed");
true
}
Err(_) => {
error!("database cleanup task panicked");
true
}
};

tokio::time::sleep(if failed {
CLEANUP_ERROR_BACKOFF
} else {
CLEANUP_INTERVAL
})
.await;
}
}
67 changes: 63 additions & 4 deletions crates/storage/src/db/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use super::{
use crate::db::{
ops::{perform_dataset_compaction, CompactionStatus},
read::datasets::list_all_datasets,
write::{ops::deleted_deleted_tables, table_builder::TableBuilder, tx::Tx},
write::{ops as cleanup_ops, table_builder::TableBuilder, tx::Tx},
Chunk, DatasetUpdate
};

Expand All @@ -36,7 +36,8 @@ pub struct DatabaseSettings {
direct_io: bool,
cache_index_and_filter_blocks: bool,
max_log_file_size: usize,
keep_log_file_num: usize
keep_log_file_num: usize,
auto_compactions: bool
}

impl Default for DatabaseSettings {
Expand All @@ -48,7 +49,8 @@ impl Default for DatabaseSettings {
direct_io: false,
cache_index_and_filter_blocks: false,
max_log_file_size: 10,
keep_log_file_num: 10
keep_log_file_num: 10,
auto_compactions: true
}
}
}
Expand Down Expand Up @@ -91,6 +93,13 @@ impl DatabaseSettings {
self
}

/// Enable/disable RocksDB background auto-compaction of the table data.
/// Defaults to `true`; off mainly for tests needing deterministic compaction.
pub fn with_auto_compactions(mut self, yes: bool) -> Self {
self.auto_compactions = yes;
self
}

fn db_options(&self) -> RocksOptions {
let mut options = RocksOptions::default();
options.create_if_missing(true);
Expand All @@ -99,6 +108,11 @@ impl DatabaseSettings {
// Bound info log (LOG, LOG.old.*) growth
options.set_max_log_file_size(self.max_log_file_size * 1024 * 1024);
options.set_keep_log_file_num(self.keep_log_file_num);
// Keep compaction ahead of ingest + deletion tombstones: the default 2 jobs
// could not keep up during the NET-819 incident, and routine reclaim now relies
// entirely on compaction (the file unlink is startup-only). Load-bearing.
options.set_max_background_jobs(8);
options.set_max_subcompactions(4);
if self.with_rocksdb_stats {
options.enable_statistics();
}
Expand Down Expand Up @@ -141,6 +155,18 @@ impl DatabaseSettings {
let mut options = RocksOptions::default();
options.set_block_based_table_factory(&block_based_table_factory);
options.set_compression_type(rocksdb::DBCompressionType::Lz4);
// Find tombstone-heavy SSTs and bound staleness so no dead file lingers. A lone
// range tombstone over a whole table has low deletion density, so the 24h periodic
// compaction is the real backstop; the collector catches denser boundary files.
// Thresholds are provisional.
options.add_compact_on_deletion_collector_factory(128 * 1024, 64 * 1024, 0.5);
options.set_periodic_compaction_seconds(24 * 60 * 60);
// Bound space amplification under leveled compaction (default since RocksDB 8.4;
// pinned because it is load-bearing here).
options.set_level_compaction_dynamic_level_bytes(true);
if !self.auto_compactions {
options.set_disable_auto_compactions(true);
}
options
}

Expand Down Expand Up @@ -287,8 +313,41 @@ impl Database {
Ok(())
}

/// Phase 1 -- logically purge deleted tables (snapshot-safe range tombstones).
/// Returns the number of tables logically deleted by this call.
pub fn cleanup(&self) -> anyhow::Result<usize> {
deleted_deleted_tables(&self.db)
cleanup_ops::logical_cleanup(&self.db)
}

/// Physically reclaim disk by unlinking whole SST files below the live watermark
/// (min live `TableId`). No writes, so it works even at a full disk, unlike
/// compaction. IGNORES snapshots, so it is safe only where no live reader exists --
/// today only at STARTUP. See [`cleanup_ops::reclaim_disk_space`] for the trade-off.
pub fn reclaim_disk_space(&self) -> anyhow::Result<()> {
cleanup_ops::reclaim_disk_space(&self.db)
}

/// Crash recovery: purge `DIRTY_TABLES` markers left by builds that died before
/// commit -- an orphan's id otherwise pins the reclaim watermark forever (see
/// [`Database::reclaim_disk_space`]). MUST run before any ingest starts (e.g. at
/// startup): it treats every dirty marker as an orphan. Returns orphans purged.
pub fn purge_orphan_dirty_tables(&self) -> anyhow::Result<usize> {
cleanup_ops::purge_orphan_dirty_tables(&self.db)
}

/// Flush the table-data column family's memtable to SST files (e.g. before a
/// reclaim, so freshly written data is unlinkable).
pub fn flush(&self) -> anyhow::Result<()> {
self.db.flush_cf(self.db.cf_handle(CF_TABLES).unwrap())?;
Ok(())
}

/// Force a full compaction of the table-data column family. Unlike
/// [`Database::reclaim_disk_space`] this *writes* (unsafe at a full disk); it pushes
/// data to the bottom level and rewrites tombstone-heavy files the unlink can't reach.
pub fn compact_tables(&self) {
let cf = self.db.cf_handle(CF_TABLES).unwrap();
self.db.compact_range_cf(cf, None::<&[u8]>, None::<&[u8]>);
}

pub fn get_statistics(&self) -> Option<String> {
Expand Down
29 changes: 26 additions & 3 deletions crates/storage/src/db/table_id.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,14 @@ impl TableId {
}

pub fn from_slice(bytes: &[u8]) -> Self {
Self {
uuid: Uuid::from_slice(bytes).unwrap()
}
Self::try_from_slice(bytes).expect("TableId::from_slice: not a 16-byte UUID")
}

/// Decode a key into a `TableId`, or `None` if not exactly a 16-byte UUID. Use this
/// (not [`TableId::from_slice`]) when scanning a CF whose keys could be corrupt, so a
/// malformed key is skipped instead of panicking and wedging the scan.
pub fn try_from_slice(bytes: &[u8]) -> Option<Self> {
Uuid::from_slice(bytes).ok().map(|uuid| Self { uuid })
}
}

Expand All @@ -31,3 +36,21 @@ impl Display for TableId {
self.uuid.fmt(f)
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn try_from_slice_rejects_non_16_byte_keys() {
// A real 16-byte id round-trips.
let id = TableId::new();
assert_eq!(TableId::try_from_slice(id.as_ref()), Some(id));

// Anything that is not exactly 16 bytes decodes to None instead of
// panicking (so a corrupt/short key can be skipped, not wedge cleanup).
assert_eq!(TableId::try_from_slice(&[]), None);
assert_eq!(TableId::try_from_slice(&[0u8; 15]), None);
assert_eq!(TableId::try_from_slice(&[0u8; 17]), None);
}
}
Loading
Loading