SlickStreamBuffer

C++20 | License: MIT | Header-only | Lock-free

SlickStreamBuffer is a header-only C++ library that provides a lock-free, single-producer multi-consumer (SPMC) byte stream buffer built on a ring buffer. It is designed as a drop-in replacement for boost::beast::flat_buffer: network bytes received by boost::asio / boost::beast are written directly into the ring, and publishing a complete message to consumer threads — or other processes via shared memory — requires zero copies.

How it works

The producer side exposes the familiar dynamic-buffer interface (prepare / commit / consume / data / size), with one twist:

  • prepare(n) returns a contiguous writable region — asio writes received bytes there
  • commit(n) moves bytes into the readable area — the app parses them in place
  • consume(n) does not discard bytes: it publishes them to consumers as one discrete message record

Each consumer owns an independent monotonic cursor and reads whole messages zero-copy as (pointer, length) pairs — the broadcast pattern of SlickQueue, applied to a byte stream.

 network ──asio──▶ prepare/commit ──▶ [ data ring ] ──consume(n)──▶ record {offset, len}
                                                                        │
                                              consumer A (own cursor) ◀─┤  zero-copy reads
                                              consumer B (own cursor) ◀─┤  (threads or
                                              process C (shared memory)◀┘   processes)
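
To make the prepare / commit / consume distinction concrete, here is a toy, single-threaded model of the producer-side bookkeeping. It is only a sketch: ToyStream and its members are hypothetical names, and it deliberately leaves out ring wrap-around, atomics and shared memory, so it should not be read as the library's implementation.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <utility>
#include <vector>

// Toy model only: no wrap-around, no atomics, no shared memory.
class ToyStream {
public:
    explicit ToyStream(std::size_t capacity) : bytes_(capacity) {}

    // Writable region directly after the committed bytes.
    std::pair<std::uint8_t*, std::size_t> prepare(std::size_t n) {
        assert(committed_ + n <= bytes_.size());
        return {bytes_.data() + committed_, n};
    }

    // Prepared bytes become readable (parsable) but are not yet published.
    void commit(std::size_t n) { committed_ += n; }

    // The first n readable bytes become one record; nothing is erased.
    void consume(std::size_t n) {
        assert(published_ + n <= committed_);
        records_.push_back({published_, n});
        published_ += n;
    }

    // Readable bytes that have not been published yet.
    std::size_t size() const { return committed_ - published_; }
    std::size_t record_count() const { return records_.size(); }

    // Consumer view: record i as (pointer, length) into the same storage.
    std::pair<const std::uint8_t*, std::size_t> record(std::size_t i) const {
        return {bytes_.data() + records_[i].first, records_[i].second};
    }

private:
    std::vector<std::uint8_t> bytes_;
    std::vector<std::pair<std::size_t, std::size_t>> records_;  // {offset, length}
    std::size_t committed_ = 0;  // end of readable bytes
    std::size_t published_ = 0;  // end of published bytes
};
```

Writing "abcdefg", committing 7 bytes and then calling consume(3) and consume(4) leaves two records, "abc" and "defg", that point into the same storage the bytes were received into.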

Features

  • Lock-free single-producer / multi-consumer broadcast
  • Zero-copy fan-out of received network data to threads and processes
  • Boost.Asio DynamicBuffer adapter (slick::dynamic_stream_buffer) usable with boost::beast / boost::asio read operations
  • Header-only; the core has no Boost dependency
  • Shared memory support for inter-process communication
  • Cross-platform — Windows, Linux, macOS
  • Modern C++20

Requirements

  • C++20 compatible compiler
  • slick-shm (fetched automatically when not installed)
  • Boost.Asio — only if you use the dynamic_stream_buffer adapter header

Installation

Header-only. Add the include directory to your include path:

#include <slick/stream_buffer.h>          // core SPMC byte queue (no Boost)
#include <slick/dynamic_stream_buffer.h>  // asio DynamicBuffer adapter (requires Boost.Asio)

Using CMake FetchContent

include(FetchContent)

set(BUILD_SLICK_STREAM_BUFFER_TESTS OFF CACHE BOOL "" FORCE)
FetchContent_Declare(
    slick-stream-buffer
    GIT_REPOSITORY https://github.com/SlickQuant/slick-stream-buffer.git
    GIT_TAG v1.0.0
)
FetchContent_MakeAvailable(slick-stream-buffer)

target_link_libraries(your_target PRIVATE slick::stream_buffer)

Usage

Producer: receive with boost::asio, publish on message boundaries

#include <slick/dynamic_stream_buffer.h>

// 64 MiB data ring, 65,536 message records; named -> shared memory, nullptr -> local
slick::SlickStreamBuffer stream(1ull << 26, 1u << 16, "market_data");
slick::dynamic_stream_buffer buffer(stream);   // cheap copyable handle

// `socket` is a connected boost::asio socket; find_complete_package() is your own
// framing function and returns 0 when no complete package is buffered yet.
for (;;) {
    std::size_t n = socket.read_some(buffer.prepare(64 * 1024));
    buffer.commit(n);

    // parse the readable area; publish every complete package
    while (std::size_t package_size = find_complete_package(buffer.data())) {
        buffer.consume(package_size);   // publishes one record - no copy
    }
}

The adapter satisfies asio's DynamicBuffer_v1 requirements, so it also works with composed operations such as boost::asio::read(socket, buffer, ...), boost::beast::http::read(...) and websocket::stream::read(...).
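
find_complete_package in the producer loop is application code, not part of the library. As an illustration, a possible version for a hypothetical wire format (a 4-byte big-endian payload length followed by the payload) could look like the sketch below. It takes a raw pointer and size as an assumption; adapting it to whatever buffer.data() returns in your setup is left to the application.

```cpp
#include <cstddef>
#include <cstdint>

// Hypothetical framing: [4-byte big-endian payload length][payload].
// Returns the size of the first complete package (header + payload),
// or 0 if more bytes are needed.
inline std::size_t find_complete_package(const std::uint8_t* data, std::size_t size) {
    constexpr std::size_t kHeader = 4;
    if (size < kHeader) return 0;  // length prefix not complete yet

    const std::uint32_t payload = (std::uint32_t{data[0]} << 24) |
                                  (std::uint32_t{data[1]} << 16) |
                                  (std::uint32_t{data[2]} << 8) |
                                  std::uint32_t{data[3]};

    const std::size_t total = kHeader + static_cast<std::size_t>(payload);
    return size >= total ? total : 0;  // 0 -> wait for the rest of the payload
}
```

Because the header is included in the returned size, an empty payload still yields 4, so the publishing loop always makes progress.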

Consumers: independent cursors, zero-copy reads

// same process (use one of the two alternatives):
slick::SlickStreamBuffer& stream = buffer.stream_buffer();
// another process (opens the shared memory created by the producer):
// slick::SlickStreamBuffer stream("market_data");

uint64_t cursor = stream.initial_reading_index();   // or 0 to replay history
for (;;) {
    auto [data, length] = stream.read(cursor);
    if (data == nullptr) continue;          // nothing new yet
    handle_package(data, length);           // points directly into the ring
}
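
The loop above spins on a single cursor. If a consumer has other work to do between polls, one option is to drain everything that is currently available and then return. The helper below is a sketch of that idea: drain and FakeStream are hypothetical names, and FakeStream exists only so the example compiles without the library. It assumes, as the loop above implies, that read(cursor) advances the cursor on success and returns (nullptr, 0) when nothing new is published.

```cpp
#include <cstddef>
#include <cstdint>
#include <utility>
#include <vector>

// Reads every message currently available at `cursor` and passes it to `handle`.
// Returns the number of messages handled.
template <typename Stream, typename Handler>
std::size_t drain(Stream& stream, std::uint64_t& cursor, Handler&& handle) {
    std::size_t handled = 0;
    for (;;) {
        auto msg = stream.read(cursor);
        if (msg.first == nullptr) break;  // nothing new yet
        handle(msg.first, msg.second);    // pointer and length of one message
        ++handled;
    }
    return handled;
}

// Stand-in with the same read() shape, used only to exercise drain().
struct FakeStream {
    std::vector<std::vector<std::uint8_t>> messages;

    std::pair<const std::uint8_t*, std::uint32_t> read(std::uint64_t& cursor) const {
        if (cursor >= messages.size()) return {nullptr, 0};
        const auto& m = messages[static_cast<std::size_t>(cursor++)];
        return {m.data(), static_cast<std::uint32_t>(m.size())};
    }
};
```

With a real stream, the call would be drain(stream, cursor, handle_package), followed by whatever backoff or other work suits the consumer.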

Core API without Boost

#include <slick/stream_buffer.h>
#include <cstring>                          // std::memcpy

slick::SlickStreamBuffer buf(1024, 16);     // capacity bytes, record count (both pow2)

auto [ptr, sz] = buf.prepare(5);
std::memcpy(ptr, "hello", 5);
buf.commit(5);
buf.consume(5);                              // publish "hello" as one record

uint64_t cursor = 0;
auto [data, length] = buf.read(cursor);      // -> "hello", 5
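
Since read() hands back a (pointer, length) pair, text payloads can be inspected without copying by wrapping the pair in a std::string_view. The helper below is a small sketch; as_text is a hypothetical name, not a library function.

```cpp
#include <cstdint>
#include <string_view>
#include <utility>

// Views a (pointer, length) message as text without copying it.
// The view is valid only as long as the bytes it points to are
// (see "Pointer invalidation" under Important Constraints).
inline std::string_view as_text(std::pair<const std::uint8_t*, std::uint32_t> msg) {
    if (msg.first == nullptr) return std::string_view();  // no message
    return std::string_view(reinterpret_cast<const char*>(msg.first), msg.second);
}
```

For the example above, as_text(buf.read(cursor)) would compare equal to "hello".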

API Overview

Constructors

SlickStreamBuffer(uint64_t capacity, uint32_t control_size);                       // local memory
SlickStreamBuffer(uint64_t capacity, uint32_t control_size, const char* shm_name); // shm creator
SlickStreamBuffer(const char* shm_name);                                           // shm opener

capacity is the data ring size in bytes; control_size is the number of message records the control ring holds. Both must be powers of 2. Size control_size to the number of messages (not bytes) a slow consumer may lag behind.
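
One way to pick the two sizes is to estimate peak traffic and the longest lag you want to tolerate, then round both results up to powers of two. The sketch below shows that arithmetic. The sizing rule, RingGeometry and size_rings are assumptions made for illustration, not guidance from the library. In C++20, std::bit_ceil from <bit> can replace the hand-written next_pow2.

```cpp
#include <cassert>
#include <cstdint>

// True if x is a power of two (0 is not).
inline bool is_pow2(std::uint64_t x) { return x != 0 && (x & (x - 1)) == 0; }

// Smallest power of two >= x.
inline std::uint64_t next_pow2(std::uint64_t x) {
    assert(x <= (std::uint64_t{1} << 63));  // larger values have no 64-bit result
    std::uint64_t p = 1;
    while (p < x) p <<= 1;
    return p;
}

struct RingGeometry {
    std::uint64_t capacity;      // data ring size in bytes
    std::uint32_t control_size;  // number of message records
};

// Hypothetical rule: hold `lag_seconds` of peak traffic in both rings.
inline RingGeometry size_rings(std::uint64_t peak_bytes_per_sec,
                               std::uint64_t peak_msgs_per_sec,
                               std::uint64_t lag_seconds) {
    const std::uint64_t records = next_pow2(peak_msgs_per_sec * lag_seconds);
    assert(records <= (std::uint64_t{1} << 31));  // must fit in a uint32_t
    return {next_pow2(peak_bytes_per_sec * lag_seconds),
            static_cast<std::uint32_t>(records)};
}
```

For example, 10 MB/s and 50,000 messages/s with a 2-second tolerance rounds up to a 32 MiB data ring (2^25 bytes) and 131,072 records (2^17).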

Producer methods (single thread only)

  • std::pair<uint8_t*, size_t> prepare(size_t n) — contiguous writable region; throws std::length_error if size() + n > capacity()
  • void commit(size_t n) — make n prepared bytes readable
  • published_record consume(size_t n) — publish the first n readable bytes as one message record; returns the record exactly as consumers will see it ({sequence, data, length}, evaluates to false if nothing was published)
  • const uint8_t* data() / size_t size() — the readable (committed, unconsumed) region

Consumer methods

  • std::pair<const uint8_t*, uint32_t> read(uint64_t& cursor) — next message, or (nullptr, 0)
  • std::pair<const uint8_t*, uint32_t> read_last() — most recently published message
  • uint64_t initial_reading_index() — cursor for late joiners (skip history)
  • uint64_t loss_count() — messages skipped due to overwrite (debug-only unless enabled)

Important Constraints

Single producer. All producer methods must be called from one thread. Consumers are lock-free and independent.

Lossy semantics. The producer never blocks. If it laps a slow consumer (by more than control_size messages or capacity bytes), the consumer skips ahead and the loss is counted. Size the rings so this cannot happen in normal operation. To verify that, define SLICK_STREAM_BUFFER_ENABLE_LOSS_DETECTION=1 (the default in Debug builds) and check loss_count().
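
The two lap conditions can be expressed as simple arithmetic on counters that only ever grow. The functions below are a sketch of that reasoning under simplifying assumptions (power-of-two ring, byte offsets that never skip). The names are hypothetical and the library's internal checks may differ.

```cpp
#include <cassert>
#include <cstdint>

// Physical position of a monotonic offset inside a power-of-two ring.
inline std::uint64_t ring_index(std::uint64_t offset, std::uint64_t capacity) {
    assert(capacity != 0 && (capacity & (capacity - 1)) == 0);
    return offset & (capacity - 1);
}

// A record starting at `record_offset` is intact while the producer has
// written at most `capacity` bytes past its start. `write_offset` is the
// total number of bytes the producer has written so far.
inline bool record_intact(std::uint64_t record_offset,
                          std::uint64_t write_offset,
                          std::uint64_t capacity) {
    assert(write_offset >= record_offset);
    return write_offset - record_offset <= capacity;
}

// Messages a consumer at `cursor` has lost once `published` records exist
// in a control ring that holds `control_size` records.
inline std::uint64_t lost_messages(std::uint64_t cursor,
                                   std::uint64_t published,
                                   std::uint64_t control_size) {
    const std::uint64_t oldest =
        published > control_size ? published - control_size : 0;
    return cursor < oldest ? oldest - cursor : 0;
}
```

With control_size 16 and 20 records published, a consumer still at cursor 0 has lost 4 messages and would resume at record 4.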

Pointer invalidation. prepare() may relocate the readable region to keep it contiguous when the ring wraps; pointers previously returned by data()/prepare() are invalidated — the same rule as flat_buffer reallocation. Message pointers returned by read() stay valid until the producer laps that part of the ring.

Record granularity. Every consume(n) call produces exactly one consumer-visible record. If a protocol layer consumes incrementally (e.g. the beast HTTP parser), records correspond to those increments; call consume() yourself on package boundaries when you need strict framing.

Message size. Each record must be smaller than 4 GiB, because read() reports the length as a uint32_t.

Architecture

The memory layout has three parts: a 64-byte header (cursors, geometry and the shared-memory init handshake), a control ring of 32-byte records {seq, offset, length}, and the byte data ring. Records are published with a release store on seq, which consumers read with an acquire load; monotonic 64-bit offsets make wrap-around and lap detection unambiguous. The layout is identical in local memory and shared memory, and shared-memory creation uses an atomic init-state handshake so creator/opener races are safe.
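
The release/acquire handshake on seq can be sketched with std::atomic as follows. This is an illustration of the publication pattern only: Record, ControlRing and ReadStatus are hypothetical names, the field layout is an assumption, and the sketch does not handle a record being overwritten while a consumer is still reading it.

```cpp
#include <array>
#include <atomic>
#include <cstddef>
#include <cstdint>

enum class ReadStatus { Empty, Ok, Lapped };

// Illustrative control record; the library's actual 32-byte layout may differ.
struct alignas(32) Record {
    std::atomic<std::uint64_t> seq{0};     // 0 = unused, otherwise record index + 1
    std::atomic<std::uint64_t> offset{0};  // monotonic byte offset into the data ring
    std::atomic<std::uint32_t> length{0};
};

template <std::size_t N>
class ControlRing {
    static_assert(N != 0 && (N & (N - 1)) == 0, "N must be a power of two");

public:
    // Producer (single thread): write the fields, then publish with a release store.
    void publish(std::uint64_t offset, std::uint32_t length) {
        Record& r = records_[next_ & (N - 1)];
        r.offset.store(offset, std::memory_order_relaxed);
        r.length.store(length, std::memory_order_relaxed);
        r.seq.store(next_ + 1, std::memory_order_release);
        ++next_;
    }

    // Consumer: the acquire load pairs with the release store above, so a
    // matching seq means the offset and length written before it are visible.
    ReadStatus try_read(std::uint64_t& cursor, std::uint64_t& offset,
                        std::uint32_t& length) const {
        const Record& r = records_[cursor & (N - 1)];
        const std::uint64_t seq = r.seq.load(std::memory_order_acquire);
        if (seq < cursor + 1) return ReadStatus::Empty;   // not published yet
        if (seq > cursor + 1) return ReadStatus::Lapped;  // slot reused by a newer record
        offset = r.offset.load(std::memory_order_relaxed);
        length = r.length.load(std::memory_order_relaxed);
        ++cursor;
        return ReadStatus::Ok;
    }

private:
    std::array<Record, N> records_{};
    std::uint64_t next_ = 0;  // producer-only: index of the next record
};
```

Storing index + 1 in seq lets a zero-initialized slot mean "never published", and because the value only grows, a consumer can tell "not yet written" (seq too small) from "already overwritten" (seq too large) with a single comparison.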

Building and Testing

cmake -S . -B build
cmake --build build --config Debug
ctest --test-dir build -C Debug --output-on-failure

The Boost.Asio adapter tests build only when Boost is found (e.g. configure with a vcpkg toolchain file); they are skipped gracefully otherwise.

License

SlickStreamBuffer is released under the MIT License.

Made with ⚡ by SlickQuant
