Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
Rust wrapper for [librdb](https://github.com/redis/librdb), the Redis RDB file parser.

Parses RDB dump files and delivers Redis data types (strings, lists, hashes,
sets, sorted sets, streams) through a callback trait, without loading the
entire file into memory.
sets, sorted sets, streams, arrays, and more) through a callback trait, without
loading the entire file into memory.

| Crate | Description |
|-------|-------------|
Expand All @@ -15,7 +15,7 @@ entire file into memory.

```toml
[dependencies]
librdb = "0.1"
librdb = "0.2"
```

By default, librdb is compiled from the vendored source (git submodule).
Expand Down
7 changes: 5 additions & 2 deletions librdb-sys/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
[package]
name = "librdb-sys"
version = "0.1.0+2.2.0"
version = "0.2.0+2.3.0"
links = "librdb"
repository = "https://github.com/funcpp/rust-librdb"
license = "MIT"
# Crate's own code is MIT; it vendors librdb (MIT) plus deps/redis sources under
# BSD-3-Clause (Redis core), BSD-2-Clause (lzf), and Zlib (crcspeed). sha256 is
# public-domain (no SPDX obligation). File headers are retained for attribution.
license = "MIT AND BSD-3-Clause AND BSD-2-Clause AND Zlib"
description = "Native bindings to the librdb library"
keywords = ["redis"]
categories = ["external-ffi-bindings"]
Expand Down
2 changes: 1 addition & 1 deletion librdb-sys/librdb
46 changes: 40 additions & 6 deletions librdb-sys/src/bindings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,8 @@ pub const RdbRes_RDB_ERR_MAX_RAW_LEN_EXCEEDED_FOR_KEY: RdbRes = 53;
pub const RdbRes_RDB_ERR_EXCLUSIVE_RAW_HANDLERS: RdbRes = 54;
#[doc = " error codes - reported by parser's blocks"]
pub const RdbRes_RDB_ERR_MODULE_INVALID_WHEN_OPCODE: RdbRes = 55;
#[doc = " error codes - reported by parser's blocks"]
pub const RdbRes_RDB_ERR_ARRAY_INVALID_STATE: RdbRes = 56;
#[doc = " api-ext error codes (see file: rp-ext-api.h)"]
pub const RdbRes__RDB_ERR_EXTENSION_FIRST: RdbRes = 4096;
#[doc = " user-defined error codes - reported by user-defined handlers or reader"]
Expand Down Expand Up @@ -835,7 +837,7 @@ pub struct RdbHandlersDataCallbacks {
serializedSize: usize,
) -> RdbRes,
>,
#[doc = " Stream cb - invoked in this order for each stream key. Indentation represent nesting.\n handleStreamMetadata(meta)\n handleStreamItem(id, field, value, itemsLeft) [repeated per entry/field]\n handleStreamNewCGroup(grpName, meta) [per consumer group]\n handleStreamCGroupPendingEntry(entry) [per group PEL entry]\n handleStreamNewConsumer(consName, meta) [per consumer]\n handleStreamConsumerPendingEntry(id) [per consumer PEL entry]\n handleStreamIdmpMeta(meta) [once, if IDMP enabled]\n handleStreamIdmpProducer(producer) [per producer]\n handleStreamIdmpEntry(entry) [per IID mapping]"]
#[doc = " Stream cb - invoked in this order for each stream key. Indentation represent nesting.\n handleStreamMetadata(meta)\n handleStreamItem(id, field, value, itemsLeft) [repeated per entry/field]\n handleStreamNewCGroup(grpName, meta) [per consumer group]\n handleStreamCGroupPendingEntry(entry) [per group PEL entry]\n handleStreamNewConsumer(consName, meta) [per consumer]\n handleStreamConsumerPendingEntry(id) [per consumer PEL entry]\n handleStreamNackZoneEntry(id, itemsLeft) [per NACKed entry, v14+]\n handleStreamIdmpMeta(meta) [once, if IDMP enabled]\n handleStreamIdmpProducer(producer) [per producer]\n handleStreamIdmpEntry(entry) [per IID mapping]"]
pub handleStreamMetadata: ::std::option::Option<
unsafe extern "C" fn(
p: *mut RdbParser,
Expand Down Expand Up @@ -883,6 +885,14 @@ pub struct RdbHandlersDataCallbacks {
streamId: *mut RdbStreamID,
) -> RdbRes,
>,
pub handleStreamNackZoneEntry: ::std::option::Option<
unsafe extern "C" fn(
p: *mut RdbParser,
userData: *mut ::std::os::raw::c_void,
id: *mut RdbStreamID,
itemsLeft: i64,
) -> RdbRes,
>,
pub handleStreamIdmpMeta: ::std::option::Option<
unsafe extern "C" fn(
p: *mut RdbParser,
Expand All @@ -904,11 +914,28 @@ pub struct RdbHandlersDataCallbacks {
entry: *mut RdbStreamIdmpEntry,
) -> RdbRes,
>,
#[doc = " Array cb (RDB_TYPE_ARRAY, v14+) - invoked in this order per array key:\n handleArrayMetadata(count, insertIdx) [once]\n handleArrayElement(idx, value) [repeated count times, ascending idx]\n\n insertIdx == RDB_ARRAY_INSERT_IDX_NONE means the array was saved with no\n insert cursor. Otherwise it carries the persisted insert_idx value."]
pub handleArrayMetadata: ::std::option::Option<
unsafe extern "C" fn(
p: *mut RdbParser,
userData: *mut ::std::os::raw::c_void,
count: u64,
insertIdx: u64,
) -> RdbRes,
>,
pub handleArrayElement: ::std::option::Option<
unsafe extern "C" fn(
p: *mut RdbParser,
userData: *mut ::std::os::raw::c_void,
idx: u64,
value: RdbBulk,
) -> RdbRes,
>,
}
#[allow(clippy::unnecessary_operation, clippy::identity_op)]
const _: () = {
["Size of RdbHandlersDataCallbacks"]
[::std::mem::size_of::<RdbHandlersDataCallbacks>() - 192usize];
[::std::mem::size_of::<RdbHandlersDataCallbacks>() - 216usize];
["Alignment of RdbHandlersDataCallbacks"]
[::std::mem::align_of::<RdbHandlersDataCallbacks>() - 8usize];
["Offset of field: RdbHandlersDataCallbacks::handleStartRdb"]
Expand Down Expand Up @@ -957,12 +984,18 @@ const _: () = {
RdbHandlersDataCallbacks,
handleStreamConsumerPendingEntry
) - 160usize];
["Offset of field: RdbHandlersDataCallbacks::handleStreamNackZoneEntry"]
[::std::mem::offset_of!(RdbHandlersDataCallbacks, handleStreamNackZoneEntry) - 168usize];
["Offset of field: RdbHandlersDataCallbacks::handleStreamIdmpMeta"]
[::std::mem::offset_of!(RdbHandlersDataCallbacks, handleStreamIdmpMeta) - 168usize];
[::std::mem::offset_of!(RdbHandlersDataCallbacks, handleStreamIdmpMeta) - 176usize];
["Offset of field: RdbHandlersDataCallbacks::handleStreamIdmpProducer"]
[::std::mem::offset_of!(RdbHandlersDataCallbacks, handleStreamIdmpProducer) - 176usize];
[::std::mem::offset_of!(RdbHandlersDataCallbacks, handleStreamIdmpProducer) - 184usize];
["Offset of field: RdbHandlersDataCallbacks::handleStreamIdmpEntry"]
[::std::mem::offset_of!(RdbHandlersDataCallbacks, handleStreamIdmpEntry) - 184usize];
[::std::mem::offset_of!(RdbHandlersDataCallbacks, handleStreamIdmpEntry) - 192usize];
["Offset of field: RdbHandlersDataCallbacks::handleArrayMetadata"]
[::std::mem::offset_of!(RdbHandlersDataCallbacks, handleArrayMetadata) - 200usize];
["Offset of field: RdbHandlersDataCallbacks::handleArrayElement"]
[::std::mem::offset_of!(RdbHandlersDataCallbacks, handleArrayElement) - 208usize];
};
unsafe extern "C" {
#[doc = " Parser creation and deletion"]
Expand Down Expand Up @@ -1174,7 +1207,8 @@ pub const RdbDataType_RDB_DATA_TYPE_HASH: RdbDataType = 4;
pub const RdbDataType_RDB_DATA_TYPE_MODULE: RdbDataType = 5;
pub const RdbDataType_RDB_DATA_TYPE_STREAM: RdbDataType = 6;
pub const RdbDataType_RDB_DATA_TYPE_FUNCTION: RdbDataType = 7;
pub const RdbDataType_RDB_DATA_TYPE_MAX: RdbDataType = 8;
pub const RdbDataType_RDB_DATA_TYPE_ARRAY: RdbDataType = 8;
pub const RdbDataType_RDB_DATA_TYPE_MAX: RdbDataType = 9;
#[doc = " Multiple levels registration\n Some of the more advanced configuration might require parsing different data\n types at different levels of the parser.\n\n The callbacks that are common to all levels (lookup HANDLERS_COMMON_CALLBACKS),\n if registered at different levels then all of them will be called, one by one,\n starting from level 0.\n\n As for the callbacks of RDB object types, each level has its own way to\n handle the data with distinct set of callbacks interfaces. In case of multiple\n levels registration, the application should configure for each RDB data type\n at what level it is needed to get parsed by calling `RDB_handleByLevel()`.\n Otherwise, the parser will resolve it by parsing and calling handlers that are\n registered at lowest level."]
pub type RdbDataType = ::std::os::raw::c_uint;
unsafe extern "C" {
Expand Down
8 changes: 8 additions & 0 deletions librdb-sys/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,11 @@
#![allow(clippy::pedantic)]

include!("bindings.rs");

/// `RDB_ARRAY_INSERT_IDX_NONE` — the "no insert cursor" sentinel for `RDB_TYPE_ARRAY`.
///
/// Declared by hand instead of via bindgen: the C macro is
/// `#define RDB_ARRAY_INSERT_IDX_NONE UINT64_MAX`, but bindgen mis-evaluates the
/// all-bits-set value as `i32 = -1`. The upstream type is `uint64_t`, so the var
/// is blocklisted in `update-bindings.sh` and re-declared here with correct width.
pub const RDB_ARRAY_INSERT_IDX_NONE: u64 = u64::MAX;
13 changes: 8 additions & 5 deletions librdb-sys/update-bindings.sh
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,22 @@ if ! command -v bindgen &>/dev/null; then
exit 1
fi

# Clang args (the -I include paths) must follow `--`; bindgen derives Debug by
# default, so only Default needs an explicit flag.
bindgen "$WRAPPER_H" \
-I "$LIBRDB_ROOT/api" \
-I "$LIBRDB_ROOT/src" \
-I "$LIBRDB_ROOT/deps/redis" \
--allowlist-function 'RDB_.*' \
--allowlist-function 'RDBX_.*' \
--allowlist-type 'Rdb.*' \
--allowlist-type 'Rdbx.*' \
--allowlist-var 'RDB_.*' \
--allowlist-var 'RDBX_.*' \
--with-derive-debug \
--blocklist-var 'RDB_ARRAY_INSERT_IDX_NONE' \
--with-derive-default \
-o "$OUTPUT"
-o "$OUTPUT" \
-- \
-I "$LIBRDB_ROOT/api" \
-I "$LIBRDB_ROOT/src" \
-I "$LIBRDB_ROOT/deps/redis"

rustfmt "$OUTPUT"

Expand Down
4 changes: 2 additions & 2 deletions librdb/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "librdb"
version = "0.1.1"
version = "0.2.0"
repository = "https://github.com/funcpp/rust-librdb"
readme = "../README.md"
license = "MIT"
Expand All @@ -11,7 +11,7 @@ edition = "2024"
rust-version = "1.94"

[dependencies]
librdb-sys = { version = "0.1.0", path = "../librdb-sys" }
librdb-sys = { version = "0.2.0", path = "../librdb-sys" }
thiserror = "2"

[lints]
Expand Down
20 changes: 20 additions & 0 deletions librdb/src/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,12 @@ pub trait RdbHandlers {
Ok(())
}

/// A NACK-zone (not-yet-acknowledged) entry within a consumer group
/// (`RDB_TYPE_STREAM_LISTPACKS_5`, RDB v14+).
fn handle_stream_nack_zone_entry(&mut self, _id: &StreamId, _items_left: i64) -> Result<()> {
Ok(())
}

fn handle_stream_idmp_meta(&mut self, _meta: &StreamIdmpMeta) -> Result<()> {
Ok(())
}
Expand All @@ -126,4 +132,18 @@ pub trait RdbHandlers {
fn handle_stream_idmp_entry(&mut self, _iid: &[u8], _stream_id: &StreamId) -> Result<()> {
Ok(())
}

/// Metadata for a sparse array (`RDB_TYPE_ARRAY`, RDB v14+), called once
/// before its elements.
///
/// `insert_idx` is the persisted insert cursor, or `None` if the array was
/// saved without one (librdb's `RDB_ARRAY_INSERT_IDX_NONE` sentinel).
fn handle_array_metadata(&mut self, _count: u64, _insert_idx: Option<u64>) -> Result<()> {
Ok(())
}

/// A single sparse-array element, called `count` times in ascending `idx` order.
fn handle_array_element(&mut self, _idx: u64, _value: &[u8]) -> Result<()> {
Ok(())
}
}
47 changes: 47 additions & 0 deletions librdb/src/trampoline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,19 @@ pub unsafe extern "C" fn trampoline_stream_consumer_pending_entry<H: RdbHandlers
})
}

pub unsafe extern "C" fn trampoline_stream_nack_zone_entry<H: RdbHandlers>(
_p: *mut RdbParser,
user_data: *mut c_void,
id: *mut RdbStreamID,
items_left: i64,
) -> RdbRes {
let state = unsafe { &mut *(user_data.cast::<HandlerState<H>>()) };
guarded(state, |h| {
let stream_id = StreamId::from_raw(unsafe { &*id });
h.handle_stream_nack_zone_entry(&stream_id, items_left)
})
}

pub unsafe extern "C" fn trampoline_stream_idmp_meta<H: RdbHandlers>(
_p: *mut RdbParser,
user_data: *mut c_void,
Expand Down Expand Up @@ -360,6 +373,37 @@ pub unsafe extern "C" fn trampoline_stream_idmp_entry<H: RdbHandlers>(
})
}

pub unsafe extern "C" fn trampoline_array_metadata<H: RdbHandlers>(
_p: *mut RdbParser,
user_data: *mut c_void,
count: u64,
insert_idx: u64,
) -> RdbRes {
let state = unsafe { &mut *(user_data.cast::<HandlerState<H>>()) };
guarded(state, |h| {
// librdb encodes "no insert cursor" as the RDB_ARRAY_INSERT_IDX_NONE sentinel.
let insert_idx = if insert_idx == librdb_sys::RDB_ARRAY_INSERT_IDX_NONE {
None
} else {
Some(insert_idx)
};
h.handle_array_metadata(count, insert_idx)
})
}

pub unsafe extern "C" fn trampoline_array_element<H: RdbHandlers>(
p: *mut RdbParser,
user_data: *mut c_void,
idx: u64,
value: RdbBulk,
) -> RdbRes {
let state = unsafe { &mut *(user_data.cast::<HandlerState<H>>()) };
guarded(state, |h| {
let value_slice = unsafe { bulk_to_slice(p, value) };
h.handle_array_element(idx, value_slice)
})
}

pub fn build_callbacks<H: RdbHandlers>() -> librdb_sys::RdbHandlersDataCallbacks {
librdb_sys::RdbHandlersDataCallbacks {
handleStartRdb: Some(trampoline_start_rdb::<H>),
Expand All @@ -383,8 +427,11 @@ pub fn build_callbacks<H: RdbHandlers>() -> librdb_sys::RdbHandlersDataCallbacks
handleStreamCGroupPendingEntry: Some(trampoline_stream_cgroup_pending_entry::<H>),
handleStreamNewConsumer: Some(trampoline_stream_new_consumer::<H>),
handleStreamConsumerPendingEntry: Some(trampoline_stream_consumer_pending_entry::<H>),
handleStreamNackZoneEntry: Some(trampoline_stream_nack_zone_entry::<H>),
handleStreamIdmpMeta: Some(trampoline_stream_idmp_meta::<H>),
handleStreamIdmpProducer: Some(trampoline_stream_idmp_producer::<H>),
handleStreamIdmpEntry: Some(trampoline_stream_idmp_entry::<H>),
handleArrayMetadata: Some(trampoline_array_metadata::<H>),
handleArrayElement: Some(trampoline_array_element::<H>),
}
}
13 changes: 9 additions & 4 deletions librdb/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ impl RdbError {
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[non_exhaustive] // RDB gains data types across versions (e.g. Array in v14); keep this open.
pub enum DataType {
String,
List,
Expand All @@ -25,16 +26,19 @@ pub enum DataType {
Module,
Stream,
Function,
/// Sparse array (RDB v14+, `RDB_TYPE_ARRAY`).
Array,
}

impl DataType {
pub(crate) fn from_raw(value: i32) -> Result<Self> {
#![allow(non_upper_case_globals)]
use librdb_sys::{
RdbDataType_RDB_DATA_TYPE_FUNCTION, RdbDataType_RDB_DATA_TYPE_HASH,
RdbDataType_RDB_DATA_TYPE_LIST, RdbDataType_RDB_DATA_TYPE_MODULE,
RdbDataType_RDB_DATA_TYPE_SET, RdbDataType_RDB_DATA_TYPE_STREAM,
RdbDataType_RDB_DATA_TYPE_STRING, RdbDataType_RDB_DATA_TYPE_ZSET,
RdbDataType_RDB_DATA_TYPE_ARRAY, RdbDataType_RDB_DATA_TYPE_FUNCTION,
RdbDataType_RDB_DATA_TYPE_HASH, RdbDataType_RDB_DATA_TYPE_LIST,
RdbDataType_RDB_DATA_TYPE_MODULE, RdbDataType_RDB_DATA_TYPE_SET,
RdbDataType_RDB_DATA_TYPE_STREAM, RdbDataType_RDB_DATA_TYPE_STRING,
RdbDataType_RDB_DATA_TYPE_ZSET,
};
#[allow(clippy::cast_sign_loss)]
match value as u32 {
Expand All @@ -46,6 +50,7 @@ impl DataType {
RdbDataType_RDB_DATA_TYPE_MODULE => Ok(Self::Module),
RdbDataType_RDB_DATA_TYPE_STREAM => Ok(Self::Stream),
RdbDataType_RDB_DATA_TYPE_FUNCTION => Ok(Self::Function),
RdbDataType_RDB_DATA_TYPE_ARRAY => Ok(Self::Array),
_ => Err(RdbError::Parser {
code: 0,
message: format!("unknown data type: {value}"),
Expand Down
Loading
Loading