diff --git a/cpp/src/common/allocator/byte_stream.h b/cpp/src/common/allocator/byte_stream.h index 36db0e8d9..435a1f6fd 100644 --- a/cpp/src/common/allocator/byte_stream.h +++ b/cpp/src/common/allocator/byte_stream.h @@ -543,6 +543,10 @@ class ByteStream { Consumer(const ByteStream& bs) : host_(bs) { ASSERT(bs.head_.enable_atomic()); + reset(); + } + + void reset() { cur_ = nullptr; read_offset_within_cur_page_ = 0; total_end_offset_ = 0; diff --git a/cpp/src/cwrapper/tsfile_cwrapper.cc b/cpp/src/cwrapper/tsfile_cwrapper.cc index 07b363aeb..0934981f9 100644 --- a/cpp/src/cwrapper/tsfile_cwrapper.cc +++ b/cpp/src/cwrapper/tsfile_cwrapper.cc @@ -56,14 +56,7 @@ int ArrowStructToTablet(const char* table_name, const ArrowArray* in_array, extern "C" { #endif -static bool is_init = false; - -void init_tsfile_config() { - if (!is_init) { - common::init_common(); - is_init = true; - } -} +void init_tsfile_config() { storage::libtsfile_init(); } uint8_t get_global_time_encoding() { return common::get_global_time_encoding(); diff --git a/cpp/src/file/read_file.cc b/cpp/src/file/read_file.cc index 8aab78ca6..d9902ddb9 100644 --- a/cpp/src/file/read_file.cc +++ b/cpp/src/file/read_file.cc @@ -49,7 +49,11 @@ void ReadFile::close() { int ReadFile::open(const std::string& file_path) { int ret = E_OK; file_path_ = file_path; - fd_ = ::open(file_path_.c_str(), O_RDONLY); + int flags = O_RDONLY; +#ifdef _WIN32 + flags |= O_BINARY; +#endif + fd_ = ::open(file_path_.c_str(), flags); if (fd_ < 0) { std::cerr << "open file " << file_path << " error: " << strerror(errno) << " (errno " << errno << ")" << std::endl; @@ -66,8 +70,13 @@ int ReadFile::open(const std::string& file_path) { } int ReadFile::get_file_size(int64_t& file_size) { +#ifdef _WIN32 + struct __stat64 s; + if (_fstat64(fd_, &s) < 0) { +#else struct stat s; if (fstat(fd_, &s) < 0) { +#endif LOGE("fstat error, file_path=" << file_path_.c_str() << "fd=" << fd_ << "errno" << errno); return E_FILE_STAT_ERR; @@ -114,8 +123,13 @@ int ReadFile::read(int64_t offset, char* buf, int32_t buf_size, int ret = E_OK; read_len = 0; while (read_len < buf_size) { +#ifdef _WIN32 + ssize_t pread_size = ::pread(fd_, buf + read_len, buf_size - read_len, + static_cast(offset + read_len)); +#else ssize_t pread_size = ::pread(fd_, buf + read_len, buf_size - read_len, static_cast(offset + read_len)); +#endif if (pread_size < 0) { ret = E_FILE_READ_ERR; ////log_err("tsfile reader error, file_path=%s, errno=%d", diff --git a/cpp/src/file/tsfile_io_writer.cc b/cpp/src/file/tsfile_io_writer.cc index 21086da61..42d99feda 100644 --- a/cpp/src/file/tsfile_io_writer.cc +++ b/cpp/src/file/tsfile_io_writer.cc @@ -891,7 +891,11 @@ int TsFileIOWriter::flush_stream_to_file() { } } - write_stream_.purge_prev_pages(); + if (IS_SUCC(ret)) { + file_base_offset_ = file_->get_position(); + write_stream_.reset(); + write_stream_consumer_.reset(); + } return ret; } diff --git a/cpp/src/file/write_file.cc b/cpp/src/file/write_file.cc index b6fbd6e44..227520b71 100644 --- a/cpp/src/file/write_file.cc +++ b/cpp/src/file/write_file.cc @@ -131,7 +131,7 @@ int WriteFile::close() { int WriteFile::truncate(int64_t size) { ASSERT(fd_ > 0); #ifdef _WIN32 - if (_chsize_s(fd_, static_cast(size)) != 0) { + if (_chsize_s(fd_, size) != 0) { return E_FILE_WRITE_ERR; } #else diff --git a/cpp/src/reader/aligned_chunk_reader.cc b/cpp/src/reader/aligned_chunk_reader.cc index d79bc7811..49c469547 100644 --- a/cpp/src/reader/aligned_chunk_reader.cc +++ b/cpp/src/reader/aligned_chunk_reader.cc @@ -319,7 +319,7 @@ int AlignedChunkReader::read_from_file_and_rewrap( int ret = E_OK; const int DEFAULT_READ_SIZE = 4096; // may use page_size + page_header_size char* file_data_buf = in_stream_.get_wrapped_buf(); - int offset = chunk_meta->offset_of_chunk_header_ + chunk_visit_offset; + int64_t offset = chunk_meta->offset_of_chunk_header_ + chunk_visit_offset; int read_size = (want_size < DEFAULT_READ_SIZE ? DEFAULT_READ_SIZE : want_size); if (file_data_buf_size < read_size || diff --git a/cpp/src/writer/tsfile_writer.cc b/cpp/src/writer/tsfile_writer.cc index 3170a3160..bc3398d98 100644 --- a/cpp/src/writer/tsfile_writer.cc +++ b/cpp/src/writer/tsfile_writer.cc @@ -44,7 +44,6 @@ bool g_s_is_inited = false; } int libtsfile_init() { - libtsfile::g_s_is_inited = false; if (libtsfile::g_s_is_inited) { return E_OK; } diff --git a/cpp/test/CMakeLists.txt b/cpp/test/CMakeLists.txt index 4d325635f..513cbd5ca 100644 --- a/cpp/test/CMakeLists.txt +++ b/cpp/test/CMakeLists.txt @@ -150,6 +150,7 @@ set(LIB_TSFILE_SDK_DIR ${PROJECT_BINARY_DIR}/lib) message("LIB_TSFILE_SDK_DIR: ${LIB_TSFILE_SDK_DIR}") include_directories( + ${CMAKE_CURRENT_SOURCE_DIR}/reader ${LIBRARY_INCLUDE_DIR} ${THIRD_PARTY_INCLUDE} ) diff --git a/cpp/test/reader/large_file_test_common.h b/cpp/test/reader/large_file_test_common.h new file mode 100644 index 000000000..364634188 --- /dev/null +++ b/cpp/test/reader/large_file_test_common.h @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#ifndef TEST_LARGE_FILE_TEST_COMMON_H +#define TEST_LARGE_FILE_TEST_COMMON_H + +#ifdef _WIN32 +#include +#include +#else +#include +#endif + +#include +#include +#include + +namespace large_file_test { + +constexpr int64_t kTargetFileSize = + static_cast(4) * 1024 * 1024 * 1024; +constexpr int64_t kMinAcceptableFileSize = + static_cast(3800) * 1024 * 1024; +constexpr int64_t kStartTime = 1622505600000LL; +constexpr uint32_t kTabletRows = 50000; +constexpr int64_t kFlushRows = 1000000; + +inline int64_t GetFileSize(const std::string& path) { +#ifdef _WIN32 + struct __stat64 s; + if (_stat64(path.c_str(), &s) != 0) { + return -1; + } + return static_cast(s.st_size); +#else + struct stat s; + if (stat(path.c_str(), &s) != 0) { + return -1; + } + return static_cast(s.st_size); +#endif +} + +inline std::string RandomSuffix(int length = 10) { + std::random_device rd; + std::mt19937 gen(rd()); + std::uniform_int_distribution<> dis(0, 61); + const std::string chars = + "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"; + std::string out; + out.reserve(length); + for (int i = 0; i < length; ++i) { + out += chars[dis(gen)]; + } + return out; +} + +} // namespace large_file_test + +#endif // TEST_LARGE_FILE_TEST_COMMON_H diff --git a/cpp/test/reader/table_view/large_file_table_test.cc b/cpp/test/reader/table_view/large_file_table_test.cc new file mode 100644 index 000000000..19945179e --- /dev/null +++ b/cpp/test/reader/table_view/large_file_table_test.cc @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include + +#include +#include + +#include "common/schema.h" +#include "common/tablet.h" +#include "file/write_file.h" +#include "large_file_test_common.h" +#include "reader/table_result_set.h" +#include "reader/tsfile_reader.h" +#include "writer/tsfile_table_writer.h" + +using namespace common; +using namespace storage; +using namespace large_file_test; + +namespace { + +constexpr char kTableName[] = "large_table"; +constexpr char kDeviceTag[] = "device"; +constexpr char kValueField[] = "value"; + +bool VerifyTableRecord(TsFileReader& reader, int64_t record_index) { + const int64_t timestamp = kStartTime + record_index * 1000; + ResultSet* tmp_result_set = nullptr; + int ret = reader.query(kTableName, {kValueField}, timestamp, timestamp + 1, + tmp_result_set); + if (ret != E_OK || tmp_result_set == nullptr) { + return false; + } + + auto* table_result_set = static_cast(tmp_result_set); + bool has_next = false; + ret = table_result_set->next(has_next); + if (ret != E_OK || !has_next) { + reader.destroy_query_data_set(table_result_set); + return false; + } + + const int64_t read_time = table_result_set->get_value(1); + const int64_t read_value = table_result_set->get_value(2); + reader.destroy_query_data_set(table_result_set); + return read_time == timestamp && read_value == record_index; +} + +TableSchema* CreateTableSchema() { + std::vector measurement_schemas; + measurement_schemas.push_back( + new MeasurementSchema(kDeviceTag, TSDataType::STRING, TSEncoding::PLAIN, + CompressionType::UNCOMPRESSED)); + measurement_schemas.push_back( + new MeasurementSchema(kValueField, TSDataType::INT64, TSEncoding::PLAIN, + CompressionType::UNCOMPRESSED)); + std::vector column_categories = {ColumnCategory::TAG, + ColumnCategory::FIELD}; + return new TableSchema(kTableName, measurement_schemas, column_categories); +} + +} // namespace + +class LargeFileTableTest : public ::testing::Test { + protected: + void SetUp() override { + libtsfile_init(); + file_name_ = "large_file_table_test_" + RandomSuffix() + ".tsfile"; + remove(file_name_.c_str()); + } + + void TearDown() override { + remove(file_name_.c_str()); + libtsfile_destroy(); + } + + std::string file_name_; +}; + +TEST_F(LargeFileTableTest, DISABLED_LargeFile4GB_TableWriteAndRead) { + std::unique_ptr table_schema(CreateTableSchema()); + + WriteFile write_file; + int flags = O_WRONLY | O_CREAT | O_TRUNC; +#ifdef _WIN32 + flags |= O_BINARY; +#endif + ASSERT_EQ(write_file.create(file_name_, flags, 0666), E_OK); + + TsFileTableWriter table_writer(&write_file, table_schema.get()); + const std::vector column_names = {kDeviceTag, kValueField}; + const std::vector data_types = {TSDataType::STRING, + TSDataType::INT64}; + + int64_t total_rows = 0; + while (GetFileSize(file_name_) < kTargetFileSize) { + Tablet tablet(column_names, data_types, kTabletRows); + tablet.set_table_name(kTableName); + for (uint32_t row = 0; row < kTabletRows; ++row) { + const int64_t record_index = total_rows + row; + tablet.add_timestamp(row, kStartTime + record_index * 1000); + tablet.add_value(row, kDeviceTag, "device0"); + tablet.add_value(row, kValueField, record_index); + } + ASSERT_EQ(table_writer.write_table(tablet), E_OK); + total_rows += kTabletRows; + if (total_rows % kFlushRows == 0) { + ASSERT_EQ(table_writer.flush(), E_OK); + } + } + + ASSERT_EQ(table_writer.flush(), E_OK); + ASSERT_EQ(table_writer.close(), E_OK); + ASSERT_EQ(write_file.close(), E_OK); + + const int64_t final_size = GetFileSize(file_name_); + ASSERT_GE(final_size, kMinAcceptableFileSize); + + TsFileReader reader; + ASSERT_EQ(reader.open(file_name_), E_OK); + + const std::vector check_indexes = {0, total_rows / 2, + total_rows - 1}; + for (int64_t index : check_indexes) { + ASSERT_TRUE(VerifyTableRecord(reader, index)) << "index=" << index; + } + + ASSERT_EQ(reader.close(), E_OK); +} diff --git a/cpp/test/reader/tree_view/large_file_tree_test.cc b/cpp/test/reader/tree_view/large_file_tree_test.cc new file mode 100644 index 000000000..8a05b957c --- /dev/null +++ b/cpp/test/reader/tree_view/large_file_tree_test.cc @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include + +#include +#include + +#include "common/schema.h" +#include "common/tablet.h" +#include "large_file_test_common.h" +#include "reader/qds_without_timegenerator.h" +#include "reader/tsfile_reader.h" +#include "writer/tsfile_writer.h" + +using namespace common; +using namespace storage; +using namespace large_file_test; + +namespace { + +bool VerifyTreeRecord(TsFileReader& reader, int64_t record_index) { + std::vector select_list = {"device1.temperature"}; + const int64_t timestamp = kStartTime + record_index * 1000; + ResultSet* tmp_qds = nullptr; + int ret = reader.query(select_list, timestamp, timestamp + 1, tmp_qds); + if (ret != E_OK || tmp_qds == nullptr) { + return false; + } + + auto* qds = static_cast(tmp_qds); + bool has_next = false; + ret = qds->next(has_next); + if (ret != E_OK || !has_next) { + reader.destroy_query_data_set(qds); + return false; + } + + const int64_t read_time = qds->get_value(1); + const int64_t read_value = qds->get_value(2); + reader.destroy_query_data_set(qds); + return read_time == timestamp && read_value == record_index; +} + +} // namespace + +class LargeFileTreeTest : public ::testing::Test { + protected: + void SetUp() override { + libtsfile_init(); + file_name_ = "large_file_tree_test_" + RandomSuffix() + ".tsfile"; + remove(file_name_.c_str()); + } + + void TearDown() override { + remove(file_name_.c_str()); + libtsfile_destroy(); + } + + std::string file_name_; +}; + +TEST_F(LargeFileTreeTest, DISABLED_LargeFile4GB_TreeWriteAndRead) { + TsFileWriter writer; + int flags = O_WRONLY | O_CREAT | O_TRUNC; +#ifdef _WIN32 + flags |= O_BINARY; +#endif + ASSERT_EQ(writer.open(file_name_, flags, 0666), E_OK); + writer.register_timeseries( + "device1", + MeasurementSchema("temperature", TSDataType::INT64, TSEncoding::PLAIN, + CompressionType::UNCOMPRESSED)); + + std::vector schema = { + MeasurementSchema("temperature", TSDataType::INT64, TSEncoding::PLAIN, + CompressionType::UNCOMPRESSED)}; + auto schema_ptr = std::make_shared>(schema); + + int64_t total_rows = 0; + while (GetFileSize(file_name_) < kTargetFileSize) { + Tablet tablet("device1", schema_ptr, kTabletRows); + for (uint32_t row = 0; row < kTabletRows; ++row) { + const int64_t record_index = total_rows + row; + tablet.add_timestamp(row, kStartTime + record_index * 1000); + tablet.add_value(row, 0, record_index); + } + ASSERT_EQ(writer.write_tablet(tablet), E_OK); + total_rows += kTabletRows; + if (total_rows % kFlushRows == 0) { + ASSERT_EQ(writer.flush(), E_OK); + } + } + + ASSERT_EQ(writer.flush(), E_OK); + ASSERT_EQ(writer.close(), E_OK); + + const int64_t final_size = GetFileSize(file_name_); + ASSERT_GE(final_size, kMinAcceptableFileSize); + + TsFileReader reader; + ASSERT_EQ(reader.open(file_name_), E_OK); + + const std::vector check_indexes = {0, total_rows / 2, + total_rows - 1}; + for (int64_t index : check_indexes) { + ASSERT_TRUE(VerifyTreeRecord(reader, index)) << "index=" << index; + } + + ASSERT_EQ(reader.close(), E_OK); +} diff --git a/cpp/test/reader/tsfile_reader_test.cc b/cpp/test/reader/tsfile_reader_test.cc index 45261cf45..08cda6e31 100644 --- a/cpp/test/reader/tsfile_reader_test.cc +++ b/cpp/test/reader/tsfile_reader_test.cc @@ -395,65 +395,3 @@ TEST_F(TsFileReaderTest, GetTimeseriesMetadataTableModelTypeAndDeviceFilter) { reader.close(); } - -static const int64_t kLargeFileNumRecords = 300000000; -static const int64_t kLargeFileFlushBatch = 100000; - -TEST_F(TsFileReaderTest, - DISABLED_LargeFileNoEncodingNoCompression_WriteAndRead) { - std::string device_path = "device1"; - std::string measurement_name = "temperature"; - common::TSDataType data_type = common::TSDataType::INT64; - common::TSEncoding encoding = common::TSEncoding::PLAIN; - common::CompressionType compression_type = - common::CompressionType::UNCOMPRESSED; - - tsfile_writer_->register_timeseries( - device_path, storage::MeasurementSchema(measurement_name, data_type, - encoding, compression_type)); - - const int64_t start_time = 1622505600000LL; - for (int64_t i = 0; i < kLargeFileNumRecords; ++i) { - TsRecord record(start_time + i * 1000, device_path); - record.add_point(measurement_name, static_cast(i)); - ASSERT_EQ(tsfile_writer_->write_record(record), E_OK); - if ((i + 1) % kLargeFileFlushBatch == 0) { - ASSERT_EQ(tsfile_writer_->flush(), E_OK); - } - } - ASSERT_EQ(tsfile_writer_->flush(), E_OK); - ASSERT_EQ(tsfile_writer_->close(), E_OK); - - std::vector select_list = {"device1.temperature"}; - const int64_t end_time = start_time + (kLargeFileNumRecords - 1) * 1000 + 1; - - storage::TsFileReader reader; - int ret = reader.open(file_name_); - ASSERT_EQ(ret, common::E_OK); - - storage::ResultSet* tmp_qds = nullptr; - ret = reader.query(select_list, start_time, end_time, tmp_qds); - ASSERT_EQ(ret, common::E_OK); - ASSERT_NE(tmp_qds, nullptr); - - auto* qds = static_cast(tmp_qds); - std::shared_ptr meta = qds->get_metadata(); - ASSERT_NE(meta, nullptr); - ASSERT_EQ(meta->get_column_type(1), INT64); - ASSERT_EQ(meta->get_column_type(2), INT64); - - int64_t row_count = 0; - bool has_next = false; - - while (true) { - ret = qds->next(has_next); - ASSERT_EQ(ret, common::E_OK); - if (!has_next) break; - row_count++; - } - - ASSERT_EQ(row_count, kLargeFileNumRecords); - - reader.destroy_query_data_set(qds); - reader.close(); -} diff --git a/cpp/test/writer/tsfile_writer_test.cc b/cpp/test/writer/tsfile_writer_test.cc index 3c6d15165..139761380 100644 --- a/cpp/test/writer/tsfile_writer_test.cc +++ b/cpp/test/writer/tsfile_writer_test.cc @@ -262,24 +262,51 @@ TEST_F(TsFileWriterTest, RegisterTimeSeries) { ASSERT_EQ(tsfile_writer_->close(), E_OK); } -TEST_F(TsFileWriterTest, WriteMultipleRecords) { +TEST_F(TsFileWriterTest, MultiFlushWriteAndRead) { std::string device_path = "device1"; std::string measurement_name = "temperature"; - common::TSDataType data_type = common::TSDataType::INT32; - common::TSEncoding encoding = common::TSEncoding::PLAIN; - common::CompressionType compression_type = - common::CompressionType::UNCOMPRESSED; tsfile_writer_->register_timeseries( - device_path, storage::MeasurementSchema(measurement_name, data_type, - encoding, compression_type)); - - for (int i = 0; i < 50000; ++i) { - TsRecord record(1622505600000 + i * 1000, device_path); - record.add_point(measurement_name, (int32_t)i); + device_path, + storage::MeasurementSchema(measurement_name, common::TSDataType::INT64, + common::TSEncoding::PLAIN, + common::CompressionType::UNCOMPRESSED)); + + const int64_t start_time = 1622505600000LL; + const int row_count = 200000; + for (int i = 0; i < row_count; ++i) { + TsRecord record(start_time + i * 1000, device_path); + record.add_point(measurement_name, static_cast(i)); ASSERT_EQ(tsfile_writer_->write_record(record), E_OK); - ASSERT_EQ(tsfile_writer_->flush(), E_OK); + if ((i + 1) % 10000 == 0) { + ASSERT_EQ(tsfile_writer_->flush(), E_OK); + } } + ASSERT_EQ(tsfile_writer_->flush(), E_OK); ASSERT_EQ(tsfile_writer_->close(), E_OK); + + storage::TsFileReader reader; + ASSERT_EQ(reader.open(file_name_), E_OK); + std::vector select_list = {"device1.temperature"}; + storage::ResultSet* tmp_qds = nullptr; + ASSERT_EQ(reader.query(select_list, start_time, + start_time + (row_count - 1) * 1000 + 1, tmp_qds), + E_OK); + auto* qds = static_cast(tmp_qds); + + int64_t read_rows = 0; + bool has_next = false; + while (true) { + ASSERT_EQ(qds->next(has_next), E_OK); + if (!has_next) { + break; + } + ASSERT_EQ(qds->get_value(1), start_time + read_rows * 1000); + ASSERT_EQ(qds->get_value(2), read_rows); + ++read_rows; + } + ASSERT_EQ(read_rows, row_count); + reader.destroy_query_data_set(qds); + ASSERT_EQ(reader.close(), E_OK); } #if defined(ENABLE_ZLIB) && defined(ENABLE_SNAPPY) && defined(ENABLE_LZ4) && \