diff --git a/Cargo.lock b/Cargo.lock index 6cd9b3a..b3103e1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -20,7 +20,7 @@ checksum = "5baebc0774151f905a1a2cc41989300b1e6fbb29aff0ceffa1064fdd3088d582" [[package]] name = "librdb" -version = "0.1.1" +version = "0.2.0" dependencies = [ "librdb-sys", "thiserror", @@ -28,7 +28,7 @@ dependencies = [ [[package]] name = "librdb-sys" -version = "0.1.0+2.2.0" +version = "0.2.0+2.3.0" dependencies = [ "cc", ] diff --git a/README.md b/README.md index c08deee..c2a42cb 100644 --- a/README.md +++ b/README.md @@ -3,8 +3,8 @@ Rust wrapper for [librdb](https://github.com/redis/librdb), the Redis RDB file parser. Parses RDB dump files and delivers Redis data types (strings, lists, hashes, -sets, sorted sets, streams) through a callback trait, without loading the -entire file into memory. +sets, sorted sets, streams, arrays, and more) through a callback trait, without +loading the entire file into memory. | Crate | Description | |-------|-------------| @@ -15,7 +15,7 @@ entire file into memory. ```toml [dependencies] -librdb = "0.1" +librdb = "0.2" ``` By default, librdb is compiled from the vendored source (git submodule). diff --git a/librdb-sys/Cargo.toml b/librdb-sys/Cargo.toml index 075c8e5..00cb415 100644 --- a/librdb-sys/Cargo.toml +++ b/librdb-sys/Cargo.toml @@ -1,9 +1,12 @@ [package] name = "librdb-sys" -version = "0.1.0+2.2.0" +version = "0.2.0+2.3.0" links = "librdb" repository = "https://github.com/funcpp/rust-librdb" -license = "MIT" +# Crate's own code is MIT; it vendors librdb (MIT) plus deps/redis sources under +# BSD-3-Clause (Redis core), BSD-2-Clause (lzf), and Zlib (crcspeed). sha256 is +# public-domain (no SPDX obligation). File headers are retained for attribution. +license = "MIT AND BSD-3-Clause AND BSD-2-Clause AND Zlib" description = "Native bindings to the librdb library" keywords = ["redis"] categories = ["external-ffi-bindings"] diff --git a/librdb-sys/librdb b/librdb-sys/librdb index a706596..d1ee1f0 160000 --- a/librdb-sys/librdb +++ b/librdb-sys/librdb @@ -1 +1 @@ -Subproject commit a7065962cf434da1a909aa45107dafa7e45b5057 +Subproject commit d1ee1f04582abc748477bd777e574e92345621c9 diff --git a/librdb-sys/src/bindings.rs b/librdb-sys/src/bindings.rs index 575768b..e7c7339 100644 --- a/librdb-sys/src/bindings.rs +++ b/librdb-sys/src/bindings.rs @@ -128,6 +128,8 @@ pub const RdbRes_RDB_ERR_MAX_RAW_LEN_EXCEEDED_FOR_KEY: RdbRes = 53; pub const RdbRes_RDB_ERR_EXCLUSIVE_RAW_HANDLERS: RdbRes = 54; #[doc = " error codes - reported by parser's blocks"] pub const RdbRes_RDB_ERR_MODULE_INVALID_WHEN_OPCODE: RdbRes = 55; +#[doc = " error codes - reported by parser's blocks"] +pub const RdbRes_RDB_ERR_ARRAY_INVALID_STATE: RdbRes = 56; #[doc = " api-ext error codes (see file: rp-ext-api.h)"] pub const RdbRes__RDB_ERR_EXTENSION_FIRST: RdbRes = 4096; #[doc = " user-defined error codes - reported by user-defined handlers or reader"] @@ -835,7 +837,7 @@ pub struct RdbHandlersDataCallbacks { serializedSize: usize, ) -> RdbRes, >, - #[doc = " Stream cb - invoked in this order for each stream key. Indentation represent nesting.\n handleStreamMetadata(meta)\n handleStreamItem(id, field, value, itemsLeft) [repeated per entry/field]\n handleStreamNewCGroup(grpName, meta) [per consumer group]\n handleStreamCGroupPendingEntry(entry) [per group PEL entry]\n handleStreamNewConsumer(consName, meta) [per consumer]\n handleStreamConsumerPendingEntry(id) [per consumer PEL entry]\n handleStreamIdmpMeta(meta) [once, if IDMP enabled]\n handleStreamIdmpProducer(producer) [per producer]\n handleStreamIdmpEntry(entry) [per IID mapping]"] + #[doc = " Stream cb - invoked in this order for each stream key. Indentation represent nesting.\n handleStreamMetadata(meta)\n handleStreamItem(id, field, value, itemsLeft) [repeated per entry/field]\n handleStreamNewCGroup(grpName, meta) [per consumer group]\n handleStreamCGroupPendingEntry(entry) [per group PEL entry]\n handleStreamNewConsumer(consName, meta) [per consumer]\n handleStreamConsumerPendingEntry(id) [per consumer PEL entry]\n handleStreamNackZoneEntry(id, itemsLeft) [per NACKed entry, v14+]\n handleStreamIdmpMeta(meta) [once, if IDMP enabled]\n handleStreamIdmpProducer(producer) [per producer]\n handleStreamIdmpEntry(entry) [per IID mapping]"] pub handleStreamMetadata: ::std::option::Option< unsafe extern "C" fn( p: *mut RdbParser, @@ -883,6 +885,14 @@ pub struct RdbHandlersDataCallbacks { streamId: *mut RdbStreamID, ) -> RdbRes, >, + pub handleStreamNackZoneEntry: ::std::option::Option< + unsafe extern "C" fn( + p: *mut RdbParser, + userData: *mut ::std::os::raw::c_void, + id: *mut RdbStreamID, + itemsLeft: i64, + ) -> RdbRes, + >, pub handleStreamIdmpMeta: ::std::option::Option< unsafe extern "C" fn( p: *mut RdbParser, @@ -904,11 +914,28 @@ pub struct RdbHandlersDataCallbacks { entry: *mut RdbStreamIdmpEntry, ) -> RdbRes, >, + #[doc = " Array cb (RDB_TYPE_ARRAY, v14+) - invoked in this order per array key:\n handleArrayMetadata(count, insertIdx) [once]\n handleArrayElement(idx, value) [repeated count times, ascending idx]\n\n insertIdx == RDB_ARRAY_INSERT_IDX_NONE means the array was saved with no\n insert cursor. Otherwise it carries the persisted insert_idx value."] + pub handleArrayMetadata: ::std::option::Option< + unsafe extern "C" fn( + p: *mut RdbParser, + userData: *mut ::std::os::raw::c_void, + count: u64, + insertIdx: u64, + ) -> RdbRes, + >, + pub handleArrayElement: ::std::option::Option< + unsafe extern "C" fn( + p: *mut RdbParser, + userData: *mut ::std::os::raw::c_void, + idx: u64, + value: RdbBulk, + ) -> RdbRes, + >, } #[allow(clippy::unnecessary_operation, clippy::identity_op)] const _: () = { ["Size of RdbHandlersDataCallbacks"] - [::std::mem::size_of::() - 192usize]; + [::std::mem::size_of::() - 216usize]; ["Alignment of RdbHandlersDataCallbacks"] [::std::mem::align_of::() - 8usize]; ["Offset of field: RdbHandlersDataCallbacks::handleStartRdb"] @@ -957,12 +984,18 @@ const _: () = { RdbHandlersDataCallbacks, handleStreamConsumerPendingEntry ) - 160usize]; + ["Offset of field: RdbHandlersDataCallbacks::handleStreamNackZoneEntry"] + [::std::mem::offset_of!(RdbHandlersDataCallbacks, handleStreamNackZoneEntry) - 168usize]; ["Offset of field: RdbHandlersDataCallbacks::handleStreamIdmpMeta"] - [::std::mem::offset_of!(RdbHandlersDataCallbacks, handleStreamIdmpMeta) - 168usize]; + [::std::mem::offset_of!(RdbHandlersDataCallbacks, handleStreamIdmpMeta) - 176usize]; ["Offset of field: RdbHandlersDataCallbacks::handleStreamIdmpProducer"] - [::std::mem::offset_of!(RdbHandlersDataCallbacks, handleStreamIdmpProducer) - 176usize]; + [::std::mem::offset_of!(RdbHandlersDataCallbacks, handleStreamIdmpProducer) - 184usize]; ["Offset of field: RdbHandlersDataCallbacks::handleStreamIdmpEntry"] - [::std::mem::offset_of!(RdbHandlersDataCallbacks, handleStreamIdmpEntry) - 184usize]; + [::std::mem::offset_of!(RdbHandlersDataCallbacks, handleStreamIdmpEntry) - 192usize]; + ["Offset of field: RdbHandlersDataCallbacks::handleArrayMetadata"] + [::std::mem::offset_of!(RdbHandlersDataCallbacks, handleArrayMetadata) - 200usize]; + ["Offset of field: RdbHandlersDataCallbacks::handleArrayElement"] + [::std::mem::offset_of!(RdbHandlersDataCallbacks, handleArrayElement) - 208usize]; }; unsafe extern "C" { #[doc = " Parser creation and deletion"] @@ -1174,7 +1207,8 @@ pub const RdbDataType_RDB_DATA_TYPE_HASH: RdbDataType = 4; pub const RdbDataType_RDB_DATA_TYPE_MODULE: RdbDataType = 5; pub const RdbDataType_RDB_DATA_TYPE_STREAM: RdbDataType = 6; pub const RdbDataType_RDB_DATA_TYPE_FUNCTION: RdbDataType = 7; -pub const RdbDataType_RDB_DATA_TYPE_MAX: RdbDataType = 8; +pub const RdbDataType_RDB_DATA_TYPE_ARRAY: RdbDataType = 8; +pub const RdbDataType_RDB_DATA_TYPE_MAX: RdbDataType = 9; #[doc = " Multiple levels registration\n Some of the more advanced configuration might require parsing different data\n types at different levels of the parser.\n\n The callbacks that are common to all levels (lookup HANDLERS_COMMON_CALLBACKS),\n if registered at different levels then all of them will be called, one by one,\n starting from level 0.\n\n As for the callbacks of RDB object types, each level has its own way to\n handle the data with distinct set of callbacks interfaces. In case of multiple\n levels registration, the application should configure for each RDB data type\n at what level it is needed to get parsed by calling `RDB_handleByLevel()`.\n Otherwise, the parser will resolve it by parsing and calling handlers that are\n registered at lowest level."] pub type RdbDataType = ::std::os::raw::c_uint; unsafe extern "C" { diff --git a/librdb-sys/src/lib.rs b/librdb-sys/src/lib.rs index 21321bc..0e13f1f 100644 --- a/librdb-sys/src/lib.rs +++ b/librdb-sys/src/lib.rs @@ -6,3 +6,11 @@ #![allow(clippy::pedantic)] include!("bindings.rs"); + +/// `RDB_ARRAY_INSERT_IDX_NONE` — the "no insert cursor" sentinel for `RDB_TYPE_ARRAY`. +/// +/// Declared by hand instead of via bindgen: the C macro is +/// `#define RDB_ARRAY_INSERT_IDX_NONE UINT64_MAX`, but bindgen mis-evaluates the +/// all-bits-set value as `i32 = -1`. The upstream type is `uint64_t`, so the var +/// is blocklisted in `update-bindings.sh` and re-declared here with correct width. +pub const RDB_ARRAY_INSERT_IDX_NONE: u64 = u64::MAX; diff --git a/librdb-sys/update-bindings.sh b/librdb-sys/update-bindings.sh index afd721f..fa84396 100755 --- a/librdb-sys/update-bindings.sh +++ b/librdb-sys/update-bindings.sh @@ -18,19 +18,22 @@ if ! command -v bindgen &>/dev/null; then exit 1 fi +# Clang args (the -I include paths) must follow `--`; bindgen derives Debug by +# default, so only Default needs an explicit flag. bindgen "$WRAPPER_H" \ - -I "$LIBRDB_ROOT/api" \ - -I "$LIBRDB_ROOT/src" \ - -I "$LIBRDB_ROOT/deps/redis" \ --allowlist-function 'RDB_.*' \ --allowlist-function 'RDBX_.*' \ --allowlist-type 'Rdb.*' \ --allowlist-type 'Rdbx.*' \ --allowlist-var 'RDB_.*' \ --allowlist-var 'RDBX_.*' \ - --with-derive-debug \ + --blocklist-var 'RDB_ARRAY_INSERT_IDX_NONE' \ --with-derive-default \ - -o "$OUTPUT" + -o "$OUTPUT" \ + -- \ + -I "$LIBRDB_ROOT/api" \ + -I "$LIBRDB_ROOT/src" \ + -I "$LIBRDB_ROOT/deps/redis" rustfmt "$OUTPUT" diff --git a/librdb/Cargo.toml b/librdb/Cargo.toml index 0c0d788..7c0003b 100644 --- a/librdb/Cargo.toml +++ b/librdb/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "librdb" -version = "0.1.1" +version = "0.2.0" repository = "https://github.com/funcpp/rust-librdb" readme = "../README.md" license = "MIT" @@ -11,7 +11,7 @@ edition = "2024" rust-version = "1.94" [dependencies] -librdb-sys = { version = "0.1.0", path = "../librdb-sys" } +librdb-sys = { version = "0.2.0", path = "../librdb-sys" } thiserror = "2" [lints] diff --git a/librdb/src/handlers.rs b/librdb/src/handlers.rs index e4d38c3..8d57a9b 100644 --- a/librdb/src/handlers.rs +++ b/librdb/src/handlers.rs @@ -115,6 +115,12 @@ pub trait RdbHandlers { Ok(()) } + /// A NACK-zone (not-yet-acknowledged) entry within a consumer group + /// (`RDB_TYPE_STREAM_LISTPACKS_5`, RDB v14+). + fn handle_stream_nack_zone_entry(&mut self, _id: &StreamId, _items_left: i64) -> Result<()> { + Ok(()) + } + fn handle_stream_idmp_meta(&mut self, _meta: &StreamIdmpMeta) -> Result<()> { Ok(()) } @@ -126,4 +132,18 @@ pub trait RdbHandlers { fn handle_stream_idmp_entry(&mut self, _iid: &[u8], _stream_id: &StreamId) -> Result<()> { Ok(()) } + + /// Metadata for a sparse array (`RDB_TYPE_ARRAY`, RDB v14+), called once + /// before its elements. + /// + /// `insert_idx` is the persisted insert cursor, or `None` if the array was + /// saved without one (librdb's `RDB_ARRAY_INSERT_IDX_NONE` sentinel). + fn handle_array_metadata(&mut self, _count: u64, _insert_idx: Option) -> Result<()> { + Ok(()) + } + + /// A single sparse-array element, called `count` times in ascending `idx` order. + fn handle_array_element(&mut self, _idx: u64, _value: &[u8]) -> Result<()> { + Ok(()) + } } diff --git a/librdb/src/trampoline.rs b/librdb/src/trampoline.rs index a829960..9d42f27 100644 --- a/librdb/src/trampoline.rs +++ b/librdb/src/trampoline.rs @@ -321,6 +321,19 @@ pub unsafe extern "C" fn trampoline_stream_consumer_pending_entry( + _p: *mut RdbParser, + user_data: *mut c_void, + id: *mut RdbStreamID, + items_left: i64, +) -> RdbRes { + let state = unsafe { &mut *(user_data.cast::>()) }; + guarded(state, |h| { + let stream_id = StreamId::from_raw(unsafe { &*id }); + h.handle_stream_nack_zone_entry(&stream_id, items_left) + }) +} + pub unsafe extern "C" fn trampoline_stream_idmp_meta( _p: *mut RdbParser, user_data: *mut c_void, @@ -360,6 +373,37 @@ pub unsafe extern "C" fn trampoline_stream_idmp_entry( }) } +pub unsafe extern "C" fn trampoline_array_metadata( + _p: *mut RdbParser, + user_data: *mut c_void, + count: u64, + insert_idx: u64, +) -> RdbRes { + let state = unsafe { &mut *(user_data.cast::>()) }; + guarded(state, |h| { + // librdb encodes "no insert cursor" as the RDB_ARRAY_INSERT_IDX_NONE sentinel. + let insert_idx = if insert_idx == librdb_sys::RDB_ARRAY_INSERT_IDX_NONE { + None + } else { + Some(insert_idx) + }; + h.handle_array_metadata(count, insert_idx) + }) +} + +pub unsafe extern "C" fn trampoline_array_element( + p: *mut RdbParser, + user_data: *mut c_void, + idx: u64, + value: RdbBulk, +) -> RdbRes { + let state = unsafe { &mut *(user_data.cast::>()) }; + guarded(state, |h| { + let value_slice = unsafe { bulk_to_slice(p, value) }; + h.handle_array_element(idx, value_slice) + }) +} + pub fn build_callbacks() -> librdb_sys::RdbHandlersDataCallbacks { librdb_sys::RdbHandlersDataCallbacks { handleStartRdb: Some(trampoline_start_rdb::), @@ -383,8 +427,11 @@ pub fn build_callbacks() -> librdb_sys::RdbHandlersDataCallbacks handleStreamCGroupPendingEntry: Some(trampoline_stream_cgroup_pending_entry::), handleStreamNewConsumer: Some(trampoline_stream_new_consumer::), handleStreamConsumerPendingEntry: Some(trampoline_stream_consumer_pending_entry::), + handleStreamNackZoneEntry: Some(trampoline_stream_nack_zone_entry::), handleStreamIdmpMeta: Some(trampoline_stream_idmp_meta::), handleStreamIdmpProducer: Some(trampoline_stream_idmp_producer::), handleStreamIdmpEntry: Some(trampoline_stream_idmp_entry::), + handleArrayMetadata: Some(trampoline_array_metadata::), + handleArrayElement: Some(trampoline_array_element::), } } diff --git a/librdb/src/types.rs b/librdb/src/types.rs index 788d4d2..0e99f53 100644 --- a/librdb/src/types.rs +++ b/librdb/src/types.rs @@ -16,6 +16,7 @@ impl RdbError { } #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +#[non_exhaustive] // RDB gains data types across versions (e.g. Array in v14); keep this open. pub enum DataType { String, List, @@ -25,16 +26,19 @@ pub enum DataType { Module, Stream, Function, + /// Sparse array (RDB v14+, `RDB_TYPE_ARRAY`). + Array, } impl DataType { pub(crate) fn from_raw(value: i32) -> Result { #![allow(non_upper_case_globals)] use librdb_sys::{ - RdbDataType_RDB_DATA_TYPE_FUNCTION, RdbDataType_RDB_DATA_TYPE_HASH, - RdbDataType_RDB_DATA_TYPE_LIST, RdbDataType_RDB_DATA_TYPE_MODULE, - RdbDataType_RDB_DATA_TYPE_SET, RdbDataType_RDB_DATA_TYPE_STREAM, - RdbDataType_RDB_DATA_TYPE_STRING, RdbDataType_RDB_DATA_TYPE_ZSET, + RdbDataType_RDB_DATA_TYPE_ARRAY, RdbDataType_RDB_DATA_TYPE_FUNCTION, + RdbDataType_RDB_DATA_TYPE_HASH, RdbDataType_RDB_DATA_TYPE_LIST, + RdbDataType_RDB_DATA_TYPE_MODULE, RdbDataType_RDB_DATA_TYPE_SET, + RdbDataType_RDB_DATA_TYPE_STREAM, RdbDataType_RDB_DATA_TYPE_STRING, + RdbDataType_RDB_DATA_TYPE_ZSET, }; #[allow(clippy::cast_sign_loss)] match value as u32 { @@ -46,6 +50,7 @@ impl DataType { RdbDataType_RDB_DATA_TYPE_MODULE => Ok(Self::Module), RdbDataType_RDB_DATA_TYPE_STREAM => Ok(Self::Stream), RdbDataType_RDB_DATA_TYPE_FUNCTION => Ok(Self::Function), + RdbDataType_RDB_DATA_TYPE_ARRAY => Ok(Self::Array), _ => Err(RdbError::Parser { code: 0, message: format!("unknown data type: {value}"), diff --git a/librdb/tests/level2.rs b/librdb/tests/level2.rs index fc6a8a8..fbc6608 100644 --- a/librdb/tests/level2.rs +++ b/librdb/tests/level2.rs @@ -68,6 +68,15 @@ enum Event { iid: Vec, stream_id: StreamId, }, + StreamNackZoneEntry(StreamId), + ArrayMetadata { + count: u64, + insert_idx: Option, + }, + ArrayElement { + idx: u64, + value: Vec, + }, } #[derive(Default)] @@ -230,6 +239,24 @@ impl RdbHandlers for Collector { }); Ok(()) } + + fn handle_stream_nack_zone_entry(&mut self, id: &StreamId, _items_left: i64) -> Result<()> { + self.events.push(Event::StreamNackZoneEntry(*id)); + Ok(()) + } + + fn handle_array_metadata(&mut self, count: u64, insert_idx: Option) -> Result<()> { + self.events.push(Event::ArrayMetadata { count, insert_idx }); + Ok(()) + } + + fn handle_array_element(&mut self, idx: u64, value: &[u8]) -> Result<()> { + self.events.push(Event::ArrayElement { + idx, + value: value.to_vec(), + }); + Ok(()) + } } fn parse_collect(rdb: &str) -> Vec { @@ -658,3 +685,110 @@ fn parse_fd_single_key() { assert_eq!(keys, vec![b"xxx"]); assert!(events.contains(&Event::StringValue(b"111".to_vec()))); } + +fn array_meta(events: &[Event]) -> Vec<(u64, Option)> { + events + .iter() + .filter_map(|e| match e { + Event::ArrayMetadata { count, insert_idx } => Some((*count, *insert_idx)), + _ => None, + }) + .collect() +} + +fn array_elements(events: &[Event]) -> Vec<(u64, &[u8])> { + events + .iter() + .filter_map(|e| match e { + Event::ArrayElement { idx, value } => Some((*idx, value.as_slice())), + _ => None, + }) + .collect() +} + +#[test] +fn array_basic_no_insert_idx() { + let events = parse_collect("array_v14_basic.rdb"); + + assert!(events.contains(&Event::NewKey { + key: b"arr".to_vec(), + data_type: DataType::Array, + })); + assert_eq!(array_meta(&events), vec![(100, None)]); + + let elements = array_elements(&events); + assert_eq!(elements.len(), 100); + assert_eq!(elements[0], (0, b"a0".as_slice())); + assert_eq!(elements[99], (99, b"a99".as_slice())); + assert!( + elements.windows(2).all(|w| w[0].0 < w[1].0), + "elements must arrive in ascending idx order" + ); +} + +#[test] +fn array_with_insert_idx() { + let events = parse_collect("array_v14_with_insert_idx.rdb"); + + assert_eq!(array_meta(&events), vec![(3, Some(49))]); + assert_eq!( + array_elements(&events), + vec![ + (1_000_000, b"first".as_slice()), + (1_000_100, b"mid".as_slice()), + (1_001_000, b"third".as_slice()), + ] + ); +} + +#[test] +fn array_insert_idx_boundary_is_not_sentinel() { + // insert_idx == u64::MAX - 1 is a real cursor value, not the + // RDB_ARRAY_INSERT_IDX_NONE (u64::MAX) sentinel — it must map to Some, not None. + let events = parse_collect("array_v14_insert_idx_boundary.rdb"); + assert_eq!(array_meta(&events), vec![(2, Some(u64::MAX - 1))]); +} + +#[test] +fn array_mixed_value_encodings() { + // Int- and float-encoded elements are delivered as their textual bytes. + let events = parse_collect("array_v14_mixed_types.rdb"); + let values: Vec<&[u8]> = array_elements(&events) + .into_iter() + .map(|(_, v)| v) + .collect(); + assert_eq!( + values, + vec![ + b"this_is_a_long_sds_string_value_long_enough".as_slice(), + b"42".as_slice(), + b"1.5".as_slice(), + b"abc".as_slice(), + ] + ); +} + +#[test] +fn stream_xnack_zone_entries() { + let events = parse_collect("stream_v14_xnack.rdb"); + let nacked: Vec = events + .iter() + .filter_map(|e| match e { + Event::StreamNackZoneEntry(id) => Some(*id), + _ => None, + }) + .collect(); + assert_eq!( + nacked, + vec![ + StreamId { + ms: 1_779_170_191_998, + seq: 0 + }, + StreamId { + ms: 1_779_170_192_000, + seq: 0 + }, + ] + ); +}