C++: optimize batch read/write paths and aligned table null handling#823
ColinLeeo wants to merge 3 commits into
Codecov Report
❌ Patch coverage is
@@ Coverage Diff @@
## develop #823 +/- ##
===========================================
- Coverage 61.60% 60.39% -1.22%
===========================================
Files 734 735 +1
Lines 45949 48568 +2619
Branches 6895 7747 +852
===========================================
+ Hits 28306 29331 +1025
- Misses 16631 17834 +1203
- Partials 1012 1403 +391
Pull request overview
This PR consolidates a large optimization branch into develop: it adds batch decode/write APIs across the C++ Decoder/Encoder hierarchy, multi-value aligned read paths with optional parallel decode, columnar tablet write helpers, SIMD fast paths, and a set of correctness fixes for aligned/table null handling (null TAG segments, null FIELD writes, all-null value pages, sparse aligned columns, repeated logical devices, ValuePageWriter::reset state). It also trims the C wrapper API (drops unused metadata export/tag-filter symbols, then re-adds tag-filter helpers in a different section) and removes several regression tests for behaviors it claims to fix.
Changes:
- Add batch decode/encode/write paths through Decoder/Encoder/page/chunk writers and a MultiAlignedTimeseriesIndex plus single-device aligned fast-path reader.
- Several aligned table fixes (null TAG/FIELD, all-null pages, single-device tablet flag, ValuePageWriter::reset, double-free of first-page buffers via release_cur_page_data).
- Build/infra: SIMD option, optional BUILD_EXAMPLES, mem-stat counters widened to 64-bit, new BlockingQueue, removal of several existing regression tests, license-header punctuation churn in multiple CMake files.
Reviewed changes
Copilot reviewed 118 out of 119 changed files in this pull request and generated 12 comments.
| File | Description |
|---|---|
| cpp/src/reader/filter/{and,or}_filter.h, filter.h, time_operator.h | Adds satisfy_batch_time (uses fixed 129-element stack buffer — flagged). |
| cpp/src/encoding/{plain,decoder,encoder,plain_decoder,dictionary_encoder}.h | Batch encode/decode API + dictionary index assignment change (flagged). |
| cpp/src/writer/{value_,time_,}{chunk,page}_writer.{h,cc} | Batch write paths, first-page ownership transfer, larger page buffers. |
| cpp/src/writer/tsfile_table_writer.{h,cc}, tsfile_writer.h | Memoized lowercasing, idempotent close, optional parallel write pool. |
| cpp/src/reader/* | Aligned multi-value batch path, bloom-filter contains, table result-set lifecycle. |
| cpp/src/file/tsfile_io_writer.{h,cc}, restorable_tsfile_io_writer.cc | Recovery cleanup simplified; conditional sync_on_close_ (flagged); chunk-group index for O(1) lookup. |
| cpp/src/file/tsfile_io_reader.h | Device-node cache + multi-SSI alloc. |
| cpp/src/common/allocator/byte_stream.h, alloc_base.h, mem_alloc.cc | Page-mask bitwise modulo + power-of-2 rounding for wrapped buffers (flagged), 64-bit stat counters. |
| cpp/src/common/{tablet,schema,path,global,thread_pool}.* | Single-device flag, string-column uint32_t offsets, Path inlined, config knobs reshuffled. |
| cpp/src/common/container/{bit_map,blocking_queue,byte_buffer}.* | New BlockingQueue, BitMap::may_have_set_bits, bounds asserts. |
| cpp/src/compress/{snappy,lz4,uncompressed}_compressor.* | Safer after_compress ownership handling; Uncompressed now copies. |
| cpp/src/cwrapper/{tsfile_cwrapper.h,arrow_c.cc} | Tag-filter API moved, sliced-Arrow handling reverted (loses prior bug-fix paths). |
| cpp/test/** | Deletes several regression tests (deep path, missing measurement, aligned NULL boundary, dictionary RLE run counts, Arrow slice-with-offset, etc.) and adds new batch/page-boundary tests. |
| python/tsfile/dataset/reader.py + tests | Switches row reads to read_arrow_batch(). |
| cpp/{CMakeLists.txt,examples/**,src/CMakeLists.txt,src/common/CMakeLists.txt,test/CMakeLists.txt} | Build flags, SIMD option, Arrow/Parquet-dependent examples, license-header punctuation regressions. |
| int satisfy_batch_time(const int64_t* times, int count, bool* mask) { | ||
| bool mask_right[129]; | ||
| left_->satisfy_batch_time(times, count, mask); | ||
| right_->satisfy_batch_time(times, count, mask_right); | ||
| int pass = 0; | ||
| for (int i = 0; i < count; ++i) { | ||
| mask[i] = mask[i] || mask_right[i]; | ||
| if (mask[i]) ++pass; | ||
| } | ||
| return pass; | ||
| } |
| int satisfy_batch_time(const int64_t* times, int count, bool* mask) { | ||
| bool mask_right[129]; | ||
| left_->satisfy_batch_time(times, count, mask); | ||
| right_->satisfy_batch_time(times, count, mask_right); | ||
| int pass = 0; | ||
| for (int i = 0; i < count; ++i) { | ||
| mask[i] = mask[i] && mask_right[i]; | ||
| if (mask[i]) ++pass; | ||
| } | ||
| return pass; | ||
| } |
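The fixed 129-element mask_right buffer in the two snippets above overflows whenever count exceeds 129. One possible fix, sketched below, keeps the stack buffer but walks the batch in fixed-size chunks so any count is safe. BatchFilter, TimeGt, TimeLt, OrBatchFilter and kMaskChunk are hypothetical names for illustration, not the PR's classes.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>

// Hypothetical stand-in for the filter interface; not the PR's actual class.
struct BatchFilter {
    virtual ~BatchFilter() = default;
    // Writes one pass/fail flag per timestamp; returns how many passed.
    virtual int satisfy_batch_time(const int64_t* times, int count,
                                   bool* mask) = 0;
};

// Illustrative leaf filter: passes timestamps strictly greater than bound.
struct TimeGt : BatchFilter {
    int64_t bound;
    explicit TimeGt(int64_t b) : bound(b) {}
    int satisfy_batch_time(const int64_t* times, int count,
                           bool* mask) override {
        int pass = 0;
        for (int i = 0; i < count; ++i) {
            mask[i] = times[i] > bound;
            if (mask[i]) ++pass;
        }
        return pass;
    }
};

// Illustrative leaf filter: passes timestamps strictly less than bound.
struct TimeLt : BatchFilter {
    int64_t bound;
    explicit TimeLt(int64_t b) : bound(b) {}
    int satisfy_batch_time(const int64_t* times, int count,
                           bool* mask) override {
        int pass = 0;
        for (int i = 0; i < count; ++i) {
            mask[i] = times[i] < bound;
            if (mask[i]) ++pass;
        }
        return pass;
    }
};

constexpr int kMaskChunk = 128;  // assumed chunk size; any positive value works

struct OrBatchFilter : BatchFilter {
    BatchFilter* left_;
    BatchFilter* right_;
    OrBatchFilter(BatchFilter* l, BatchFilter* r) : left_(l), right_(r) {}

    int satisfy_batch_time(const int64_t* times, int count,
                           bool* mask) override {
        bool mask_right[kMaskChunk];  // never indexed past kMaskChunk - 1
        int pass = 0;
        for (int off = 0; off < count; off += kMaskChunk) {
            const int n = std::min(kMaskChunk, count - off);
            left_->satisfy_batch_time(times + off, n, mask + off);
            right_->satisfy_batch_time(times + off, n, mask_right);
            for (int i = 0; i < n; ++i) {
                mask[off + i] = mask[off + i] || mask_right[i];
                if (mask[off + i]) ++pass;
            }
        }
        return pass;
    }
};
```

The AND variant would differ only in combining the two masks with && instead of ||.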
| index_entry_.push_back(value); | ||
| map_size_ = map_size_ + value.length(); | ||
| entry_index_[value] = static_cast<int>(index_entry_.size()) - 1; | ||
| entry_index_[value] = entry_index_.size(); |
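The form entry_index_[value] = entry_index_.size() depends on whether size() is read before or after operator[] inserts the new key. C++17 sequences the right operand first; earlier standards leave the order unspecified. A sketch that avoids the question altogether computes the index once, before either container is touched. The Dictionary struct and intern() are hypothetical, loosely modelled on the member names in the snippet.

```cpp
#include <cassert>
#include <string>
#include <unordered_map>
#include <vector>

// Hypothetical miniature of a dictionary encoder's bookkeeping.
struct Dictionary {
    std::vector<std::string> index_entry_;              // index -> value
    std::unordered_map<std::string, int> entry_index_;  // value -> index

    // Returns the stable index for value, assigning the next one if new.
    int intern(const std::string& value) {
        auto it = entry_index_.find(value);
        if (it != entry_index_.end()) return it->second;
        // Read the index before any insertion so the result does not
        // depend on operand evaluation order.
        const int idx = static_cast<int>(index_entry_.size());
        index_entry_.push_back(value);
        entry_index_.emplace(value, idx);
        return idx;
    }
};
```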
| // page_mask_ is used as a bitmask and only works correctly for | ||
| // power-of-2 page sizes. Round up to the next power-of-2 so that | ||
| // (read_pos_ & page_mask_) gives the correct within-page offset and | ||
| // the page-crossing check doesn't misfire on arbitrary buffer sizes. | ||
| uint32_t ps = 1; | ||
| while (ps < (uint32_t)buf_len) ps <<= 1; | ||
| page_size_ = ps; | ||
| page_mask_ = ps - 1; | ||
| head_.store(&wrapped_page_); |
| } else if (RET_FAIL(write_file_footer())) { | ||
| std::cout << "writer file footer error, ret = " << ret << std::endl; | ||
| } else if (RET_FAIL(sync_file())) { | ||
| } else if (g_config_value_.sync_on_close_ && RET_FAIL(sync_file())) { |
| @@ -1,5 +1,5 @@ | |||
| #[[ | |||
| Licensed to the Apache Software Foundation (ASF) under one | |||
| Licensed to the Apache Software Foundation(ASF) under one | |||
|
|
||
| Unless required by applicable law or agreed to in writing, | ||
| software distributed under the License is distributed on an | ||
| software distributed under the LICENSE is distributed on an |
| file = write_file_new("test/", &error_no); | ||
| ASSERT_TRUE(error_no == RET_FILRET_OPEN_ERR || | ||
| error_no == RET_ALREADY_EXIST); | ||
| ASSERT_EQ(RET_FILRET_OPEN_ERR, error_no); |
| @@ -1320,7 +1112,7 @@ TEST_F(TreeQueryByRowTest, DISABLED_QueryByRowFasterThanManualNext) { | |||
| write_test_file(devices, measurements, num_rows); | |||
|
|
|||
| const int num_iters = 5; | |||
| const double tolerance = 0.2; | |||
| const double tolerance = 0.05; | |||
| uint32_t cur_points = value_page_writer_.get_point_numer(); | ||
| uint32_t page_remaining = | ||
| common::g_config_value_.page_writer_max_point_num_ - cur_points; | ||
| if (page_remaining == 0) { | ||
| if (RET_FAIL(seal_cur_page(false))) { | ||
| return ret; | ||
| } | ||
| page_remaining = | ||
| common::g_config_value_.page_writer_max_point_num_; | ||
| } |
| @@ -881,21 +837,26 @@ int ArrowStructToTablet(const char* table_name, const ArrowArray* in_array, | |||
| case common::INT64: | |||
| case common::FLOAT: | |||
| case common::DOUBLE: { | |||
| size_t elem_size = | |||
| (dtype == common::INT64 || dtype == common::DOUBLE) ? 8 : 4; | |||
| const void* data = | |||
| static_cast<const char*>(col_arr->buffers[1]) + | |||
| off * elem_size; | |||
| uint8_t* null_bm = InvertArrowBitmap( | |||
| validity, off, static_cast<uint32_t>(n_rows)); | |||
| if (validity != nullptr && null_bm == nullptr) { | |||
| delete tablet; | |||
| return common::E_OOM; | |||
| // Invert Arrow bitmap (1=valid) to TsFile bitmap (1=null) | |||
| virtual common::String get_measurement_name() const { | ||
| return value_ts_idx_->get_measurement_name(); | ||
| } | ||
| virtual common::TSDataType get_data_type() const { | ||
| return value_ts_idx_ == nullptr ? common::INVALID_DATATYPE | ||
| : value_ts_idx_->get_data_type(); | ||
| return time_ts_idx_->get_data_type(); | ||
| } | ||
| virtual bool is_aligned() const { return true; } | ||
| virtual Statistic* get_statistic() const { | ||
| return value_ts_idx_->get_statistic(); |
| // timeseries measurenemnt chunk meta info | ||
| // map <device_name, <measurement_name, vector<chunk_meta>>> | ||
| std::map<std::shared_ptr<IDeviceID>, | ||
| std::map<common::String, std::vector<ChunkMeta*>>, | ||
| IDeviceIDComparator> | ||
| std::map<common::String, std::vector<ChunkMeta*>>> | ||
| tsm_chunk_meta_info_; |
| if (!tmp.empty()) { | ||
| auto& merged = | ||
| tsm_chunk_meta_info_[chunk_group_meta_iter_.get()->device_id_]; | ||
| for (auto& m_entry : tmp) { | ||
| auto& vec = merged[m_entry.first]; | ||
| vec.insert(vec.end(), m_entry.second.begin(), | ||
| m_entry.second.end()); | ||
| } | ||
| tsm_chunk_meta_info_[chunk_group_meta_iter_.get()->device_id_] = | ||
| tmp; | ||
| } |
| void after_uncompress(char* uncompressed_buf) { | ||
| if (uncompressed_buf != nullptr) { | ||
| common::mem_free(uncompressed_buf_); | ||
| uncompressed_buf_ = nullptr; | ||
| } |
| if (!names_lowered_) { | ||
| tablet.set_table_name(to_lower(tablet.get_table_name())); | ||
| for (size_t i = 0; i < tablet.get_column_count(); i++) { | ||
| tablet.set_column_name(i, to_lower(tablet.get_column_name(i))); | ||
| } |
| g_config_value_.float_encoding_type_ = PLAIN; | ||
| g_config_value_.double_encoding_type_ = PLAIN; | ||
| g_config_value_.string_encoding_type_ = PLAIN; | ||
| // Default compression type is LZ4 | ||
| #ifdef ENABLE_LZ4 | ||
| g_config_value_.default_compression_type_ = LZ4; | ||
| g_config_value_.default_compression_type_ = SNAPPY; | ||
| #else | ||
| g_config_value_.default_compression_type_ = UNCOMPRESSED; | ||
| #endif | ||
| unsigned int hw_cores = std::thread::hardware_concurrency(); | ||
| if (hw_cores == 0) hw_cores = 1; // fallback if detection fails | ||
| g_config_value_.parallel_write_enabled_ = (hw_cores > 1); | ||
| g_config_value_.write_thread_count_ = | ||
| static_cast<int32_t>(std::min(hw_cores, 64u)); | ||
| // Enforce aligned page size limits strictly by default. | ||
| g_config_value_.strict_page_size_ = true; | ||
| g_config_value_.parallel_read_enabled_ = true; | ||
| g_config_value_.parallel_write_enabled_ = true; | ||
| g_config_value_.read_thread_count_ = 4; | ||
| g_config_value_.write_thread_count_ = 6; |
| void TsFileIOWriter::destroy() { | ||
| if (destroyed_) { | ||
| return; | ||
| } | ||
| // Recovery attaches a prefix of ChunkGroupMeta; device_id and chunk stats | ||
| // in that snapshot live in reader/recovery memory. After open, new chunks | ||
| // may be pushed into the same ChunkGroupMeta (same device); only those | ||
| // appended ChunkMeta need statistic_->destroy() (see | ||
| // recovery_chunk_meta_prefix_). | ||
| for (auto iter = chunk_group_meta_list_.begin(); | ||
| iter != chunk_group_meta_list_.end(); iter++) { | ||
| ChunkGroupMeta* cgm = iter.get(); | ||
| auto prefix_it = recovery_chunk_meta_prefix_.find(cgm); | ||
| const bool is_recovery_cgm = | ||
| chunk_group_meta_from_recovery_ && cgm != nullptr && | ||
| prefix_it != recovery_chunk_meta_prefix_.end(); | ||
| uint32_t recovered_cm_count = is_recovery_cgm ? prefix_it->second : 0; | ||
|
|
||
| if (!is_recovery_cgm) { | ||
| if (cgm != nullptr && cgm->device_id_) { | ||
| cgm->device_id_.reset(); | ||
| // When meta came from RestorableTsFileIOWriter recovery, entries live in | ||
| // an arena there; do not release device_id_/statistic_ here. | ||
| if (!chunk_group_meta_from_recovery_) { | ||
| for (auto iter = chunk_group_meta_list_.begin(); | ||
| iter != chunk_group_meta_list_.end(); iter++) { | ||
| if (iter.get() && iter.get()->device_id_) { | ||
| iter.get()->device_id_.reset(); | ||
| } | ||
| } | ||
|
|
||
| if (cgm == nullptr) { | ||
| continue; | ||
| } | ||
| uint32_t cm_idx = 0; | ||
| for (auto chunk_meta = cgm->chunk_meta_list_.begin(); | ||
| chunk_meta != cgm->chunk_meta_list_.end(); | ||
| chunk_meta++, cm_idx++) { | ||
| if (chunk_meta.get() == nullptr || | ||
| chunk_meta.get()->statistic_ == nullptr) { | ||
| continue; | ||
| } | ||
| if (is_recovery_cgm && cm_idx < recovered_cm_count) { | ||
| continue; | ||
| if (iter.get()) { | ||
| for (auto chunk_meta = iter.get()->chunk_meta_list_.begin(); | ||
| chunk_meta != iter.get()->chunk_meta_list_.end(); | ||
| chunk_meta++) { | ||
| if (chunk_meta.get()) { | ||
| chunk_meta.get()->statistic_->destroy(); | ||
| } | ||
| } | ||
| } | ||
| chunk_meta.get()->statistic_->destroy(); | ||
| } | ||
| } |
| #include "global.h" | ||
|
|
||
| #ifndef _WIN32 | ||
| #include <execinfo.h> | ||
| #endif |
| // Pushdown is faster than full query + manual next: queryByRow(offset, limit) | ||
| // skips at device/SSI/Chunk level; old query then manual next decodes every | ||
| // row. Timing tolerance 20% to allow measurement noise. | ||
| // row. Timing tolerance 5% to allow measurement noise. |
| void shallow_clone_from(ByteStream& other) { | ||
| this->page_size_ = other.page_size_; | ||
| this->page_mask_ = other.page_mask_; | ||
| this->mid_ = other.mid_; | ||
| this->head_.store(other.head_.load()); | ||
| this->tail_.store(other.tail_.load()); | ||
| this->total_size_.store(other.total_size_.load()); | ||
| } |
| @@ -0,0 +1,46 @@ | |||
| /* | |||
| * Licensed to the Apache Software Foundation (ASF) under one | |||
| * or more contributor license agreements. See the NOTICE file | |||
| @@ -26,9 +26,6 @@ | |||
|
|
|||
| namespace common { | |||
| case STRING: { | ||
| auto* sc = static_cast<StringColumn*>(common::mem_alloc( | ||
| sizeof(StringColumn), common::MOD_TABLET)); | ||
| if (sc == nullptr) return E_OOM; |
| ++it) { | ||
| const StringColumn& sc = *value_matrix_[*it].string_col; | ||
| const int32_t* off = sc.offsets; | ||
| for (auto col_idx : id_column_indexes_) { |
| // Without the try/catch, a task that throws would: | ||
| // (1) skip the active_-- below → wait_all() blocks forever | ||
| // because active_ never drops to zero, and | ||
| // (2) propagate the exception out of the std::thread function | ||
| // → std::terminate() takes down the whole process. | ||
| // Swallowing the exception is unfortunate but it matches the | ||
| // contract of the public submit(std::function<void()>) overload | ||
| // which has no way to surface the failure back to the caller. | ||
| // submit<F>() callers receive their error via the std::future | ||
| // wrapper installed by std::packaged_task — that path never | ||
| // reaches here, so this catch only fires for fire-and-forget | ||
| // tasks where the alternative is termination. | ||
| try { | ||
| task(); | ||
| } catch (...) { | ||
| // Intentionally suppressed; see comment above. | ||
| } | ||
| { | ||
| std::lock_guard<std::mutex> lk(mu_); |
| if (RET_FAIL(decoder.read_int(current_buffer_[i], input))) { | ||
| return ret; | ||
| } | ||
| int ret = decoder.read_int(current_buffer_[i], input); |
| // ── Batch overrides ────────────────────────────────────────────────────── | ||
| // | ||
| // INT32: PLAIN encoding uses varint (variable stride). Override to avoid | ||
| // virtual dispatch per element; actual decode is still per-value. | ||
| int read_batch_int32(int32_t* out, int capacity, int& actual, | ||
| common::ByteStream& in) override { | ||
| actual = 0; | ||
| while (actual < capacity && in.has_remaining()) { | ||
| int ret = common::SerializationUtil::read_var_int(out[actual], in); | ||
| if (ret != common::E_OK) return ret; |
Simplify the code using macros.
Batch decode/encode APIs (PLAIN / TS2DIFF / Gorilla) with single-pass TsBlock decode, AVX2/NEON SIMD paths, a single process-wide worker pool for chunk-level parallel read and column-parallel write, and batched NEON statistics. On-disk format unchanged; interoperable with Java/Python.
| // Round n up to the next power of two (>=1). Used to normalize ByteStream | ||
| // page sizes so that `& page_mask_` is equivalent to `% page_size_`. | ||
| // Values above the largest power-of-two that fits in uint32_t are clamped to | ||
| // 0x80000000 — the previous `while (ps < n) ps <<= 1` would shift past 2^31 | ||
| // and overflow to 0, looping forever. | ||
| FORCE_INLINE uint32_t round_up_pow2(uint32_t n) { | ||
| if (n <= 1) return 1; | ||
| if (n > 0x80000000u) return 0x80000000u; | ||
| uint32_t v = n - 1; | ||
| v |= v >> 1; | ||
| v |= v >> 2; | ||
| v |= v >> 4; | ||
| v |= v >> 8; | ||
| v |= v >> 16; | ||
| return v + 1; | ||
| } |
Is this faster than storing a pre-calculated array and using the number of leading zeros of n as the index?
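For comparison, a leading-zero-count variant needs no lookup table at all. The sketch below puts it next to a bit-smearing version equivalent to the helper quoted above. It assumes a 32-bit unsigned int and the GCC/Clang builtin __builtin_clz; both function names are made up for this example, and whether either form is measurably faster would have to be benchmarked. On C++20, std::bit_ceil from <bit> covers the same rounding for inputs up to 2^31.

```cpp
#include <cassert>
#include <cstdint>

// Bit-smearing version, equivalent to the helper in the diff.
inline uint32_t round_up_pow2_smear(uint32_t n) {
    if (n <= 1) return 1;
    if (n > 0x80000000u) return 0x80000000u;  // clamp instead of overflowing
    uint32_t v = n - 1;
    v |= v >> 1;
    v |= v >> 2;
    v |= v >> 4;
    v |= v >> 8;
    v |= v >> 16;
    return v + 1;
}

// Leading-zero-count version. __builtin_clz is undefined for 0, but the
// early return guarantees n - 1 >= 1 here. Assumes GCC or Clang.
inline uint32_t round_up_pow2_clz(uint32_t n) {
    if (n <= 1) return 1;
    if (n > 0x80000000u) return 0x80000000u;
    // Position of the highest set bit of (n - 1), plus one, is the exponent.
    return 1u << (32 - __builtin_clz(n - 1));
}
```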
| g_config_value_.float_encoding_type_ = GORILLA; | ||
| g_config_value_.double_encoding_type_ = GORILLA; | ||
| g_config_value_.float_encoding_type_ = PLAIN; | ||
| g_config_value_.double_encoding_type_ = PLAIN; | ||
| g_config_value_.string_encoding_type_ = PLAIN; |
| // Pick the strongest compressor that was actually compiled in. Gating on | ||
| // ENABLE_LZ4 while setting SNAPPY (the original code) would request a | ||
| // compressor that the factory can't produce when the build disables | ||
| // Snappy, returning nullptr at write time. | ||
| #ifdef ENABLE_SNAPPY | ||
| g_config_value_.default_compression_type_ = SNAPPY; | ||
| #elif defined(ENABLE_LZ4) | ||
| g_config_value_.default_compression_type_ = LZ4; | ||
| #else |
Why is SNAPPY stronger than LZ4?
| #ifdef ENABLE_ANTLR4 | ||
| std::vector<std::string> nodes = | ||
| PathNodesGenerator::invokeParser(path_sc); | ||
| #else | ||
| std::vector<std::string> nodes = | ||
| IDeviceID::split_string(path_sc, '.'); | ||
| #endif | ||
| if (nodes.size() > 1) { | ||
| // Join nodes, then parse like write path / Java Path | ||
| // (route through the interpretive string ctor instead of | ||
| // the literal per-segment vector ctor, so a stored | ||
| // "root.sg.d1" device matches a query path | ||
| // "root.sg.d1.s1"). | ||
| std::string device_joined; | ||
| for (size_t i = 0; i + 1 < nodes.size(); ++i) { | ||
| if (i > 0) { | ||
| device_joined += PATH_SEPARATOR_CHAR; | ||
| } | ||
| device_joined += nodes[i]; | ||
| } | ||
| device_id_ = | ||
| std::make_shared<StringArrayDeviceID>(device_joined); | ||
| measurement_ = nodes[nodes.size() - 1]; | ||
| full_path_ = | ||
| device_id_->get_device_name() + "." + measurement_; | ||
| } else { | ||
| full_path_ = path_sc; | ||
| device_id_ = std::make_shared<StringArrayDeviceID>(); | ||
| measurement_ = path_sc; | ||
| } | ||
| } |
Is this join necessary?
Why is device_id empty in the else branch?
| for (uint32_t i = start; i < count; i++) { | ||
| if (timestamps[i] < start_time_) start_time_ = timestamps[i]; | ||
| if (timestamps[i] > end_time_) end_time_ = timestamps[i]; | ||
| if (values[i] < min_value_) min_value_ = values[i]; | ||
| if (values[i] > max_value_) max_value_ = values[i]; | ||
| sum_value_ += (int64_t)values[i]; | ||
| } | ||
| last_value_ = values[count - 1]; | ||
| count_ += (count - start); | ||
| } |
If the timestamps are sorted, there is no point in updating start_time_ and end_time_ on every iteration.
If they are not sorted, first_value and last_value should be associated with the timestamps that become start_time_ and end_time_.
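A minimal sketch of the sorted case, assuming each batch arrives with ascending timestamps: the time bounds are read from the two ends of the range only, and first/last values are tied to those timestamps. Int32Stats and update_sorted_batch are hypothetical names; the field names follow the snippet above, and the [start, end) convention mirrors its loop.

```cpp
#include <cassert>
#include <cstdint>
#include <limits>

// Hypothetical statistics holder; not the PR's Statistic class.
struct Int32Stats {
    int64_t start_time_ = std::numeric_limits<int64_t>::max();
    int64_t end_time_ = std::numeric_limits<int64_t>::min();
    int32_t min_value_ = std::numeric_limits<int32_t>::max();
    int32_t max_value_ = std::numeric_limits<int32_t>::min();
    int32_t first_value_ = 0;
    int32_t last_value_ = 0;
    int64_t sum_value_ = 0;
    uint64_t count_ = 0;

    // Assumes timestamps[start..end) are sorted ascending.
    void update_sorted_batch(const int64_t* timestamps, const int32_t* values,
                             uint32_t start, uint32_t end) {
        if (start >= end) return;
        // Time bounds come from the two ends of the sorted range.
        if (count_ == 0 || timestamps[start] < start_time_) {
            start_time_ = timestamps[start];
            first_value_ = values[start];
        }
        if (count_ == 0 || timestamps[end - 1] >= end_time_) {
            end_time_ = timestamps[end - 1];
            last_value_ = values[end - 1];
        }
        // Only the value statistics still need a pass over every element.
        for (uint32_t i = start; i < end; ++i) {
            if (values[i] < min_value_) min_value_ = values[i];
            if (values[i] > max_value_) max_value_ = values[i];
            sum_value_ += static_cast<int64_t>(values[i]);
        }
        count_ += end - start;
    }
};
```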
| } else { | ||
| int start_idx = 0; |
This method is too long; consider extracting some sub-methods.
| auto sentinel = std::make_shared<StringArrayDeviceID>("last_device_id"); | ||
| result.emplace_back(std::move(sentinel), 0); |
Is it possible to make the sentinel a static member?
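One way this could look, as a sketch: a function-local static shared_ptr, which C++11 initializes once in a thread-safe way, so each call copies the pointer instead of allocating a new ID. DeviceID here is a hypothetical stand-in for StringArrayDeviceID, and last_device_sentinel() is a made-up accessor name.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <utility>

// Hypothetical stand-in for StringArrayDeviceID.
struct DeviceID {
    std::string name;
    explicit DeviceID(std::string n) : name(std::move(n)) {}
};

// Returns the shared sentinel; constructed on first use only.
inline const std::shared_ptr<DeviceID>& last_device_sentinel() {
    static const std::shared_ptr<DeviceID> sentinel =
        std::make_shared<DeviceID>("last_device_id");
    return sentinel;
}
```

A caller would then write result.emplace_back(last_device_sentinel(), 0), which copies the shared_ptr rather than moving a fresh one.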
| for (uint32_t r = start_idx; r < end_idx; r++) { | ||
| if (col_notnull_bitmap.test(r)) { | ||
| has_null = true; | ||
| break; | ||
| } |
Is it possible to add test_range_any and test_range_all for Bitmap?
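A sketch of what such range helpers might look like, checking whole bytes at a time once the range is byte-aligned. SimpleBitMap is a hypothetical class, and the LSB-first bit order within each byte is an assumption that may not match the project's BitMap layout.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical bitmap; bit i lives in byte i / 8 at position i % 8.
class SimpleBitMap {
   public:
    explicit SimpleBitMap(uint32_t nbits) : bits_((nbits + 7) / 8, 0) {}

    void set(uint32_t i) { bits_[i >> 3] |= static_cast<uint8_t>(1u << (i & 7)); }
    bool test(uint32_t i) const { return ((bits_[i >> 3] >> (i & 7)) & 1u) != 0; }

    // True if at least one bit in [begin, end) is set.
    bool test_range_any(uint32_t begin, uint32_t end) const {
        uint32_t i = begin;
        for (; i < end && (i & 7) != 0; ++i) {  // leading partial byte
            if (test(i)) return true;
        }
        for (; i + 8 <= end; i += 8) {  // whole bytes
            if (bits_[i >> 3] != 0) return true;
        }
        for (; i < end; ++i) {  // trailing partial byte
            if (test(i)) return true;
        }
        return false;
    }

    // True if every bit in [begin, end) is set (vacuously true when empty).
    bool test_range_all(uint32_t begin, uint32_t end) const {
        uint32_t i = begin;
        for (; i < end && (i & 7) != 0; ++i) {
            if (!test(i)) return false;
        }
        for (; i + 8 <= end; i += 8) {
            if (bits_[i >> 3] != 0xFF) return false;
        }
        for (; i < end; ++i) {
            if (!test(i)) return false;
        }
        return true;
    }

   private:
    std::vector<uint8_t> bits_;
};
```

With helpers like these, the loop in the snippet above would reduce to a single test_range_any(start_idx, end_idx) call.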
| if (IS_SUCC(ret)) { | ||
| save_first_page_data(value_page_writer_); | ||
| value_page_writer_.clear_page_data(); | ||
| // value_page_writer_.destroy_page_data(); |
| common::ThreadPool pool(0); | ||
| EXPECT_GE(pool.num_threads(), static_cast<size_t>(1)); |
Or, maybe we can define that any submitted task will be executed in the submitter thread synchronously when size == 0?
Summary
This PR optimizes the C++ TsFile read/write paths for batch and columnar workloads, and fixes several aligned table null-handling issues uncovered while validating the optimized path.
It consolidates the batch decode/write work from the long-lived optimization branch into a reviewable change for develop.
Supersedes #749, #754, and #774.
Main Changes
- [...] Decoder hierarchy and implement batch paths for PLAIN, TS2DIFF, and Gorilla.
- [...] ChunkReader and AlignedChunkReader, including shared timestamp decoding for aligned multi-value reads.
- [...] ByteStream, compressor, and page/chunk writer internals used by the optimized paths.
Correctness Fixes
- [...] ValuePageWriter::reset() so row count and null bitmap state are reset together.
Compatibility Notes
- cpp/third_party/ is left at develop state so existing platform compatibility fixes are preserved.
Verification
- cmake --build cpp/target/build --target TsFile_Test -j1
- ctest --test-dir cpp/target/build/test --output-on-failure -R '^TsFileTableReaderTest\.TestNullInTable4$'
- ctest --test-dir cpp/target/build/test --output-on-failure -j4
- cd cpp && mvn spotless:check
- cd cpp && mvn apache-rat:check
Current full C++ test result:
- 496/496 tests pass.