Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions cpp/src/common/allocator/byte_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -543,6 +543,10 @@ class ByteStream {

// Constructs a consumer bound to the stream `bs` (stored in host_).
// Asserts that bs.head_.enable_atomic() holds, then delegates the setup of
// the consumer's initial state to reset().
Consumer(const ByteStream& bs) : host_(bs) {
ASSERT(bs.head_.enable_atomic());
reset();
}

void reset() {
cur_ = nullptr;
read_offset_within_cur_page_ = 0;
total_end_offset_ = 0;
Expand Down
9 changes: 1 addition & 8 deletions cpp/src/cwrapper/tsfile_cwrapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,7 @@ int ArrowStructToTablet(const char* table_name, const ArrowArray* in_array,
extern "C" {
#endif

static bool is_init = false;

void init_tsfile_config() {
if (!is_init) {
common::init_common();
is_init = true;
}
}
void init_tsfile_config() { storage::libtsfile_init(); }

uint8_t get_global_time_encoding() {
return common::get_global_time_encoding();
Expand Down
16 changes: 15 additions & 1 deletion cpp/src/file/read_file.cc
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,11 @@ void ReadFile::close() {
int ReadFile::open(const std::string& file_path) {
int ret = E_OK;
file_path_ = file_path;
fd_ = ::open(file_path_.c_str(), O_RDONLY);
int flags = O_RDONLY;
#ifdef _WIN32
flags |= O_BINARY;
#endif
fd_ = ::open(file_path_.c_str(), flags);
if (fd_ < 0) {
std::cerr << "open file " << file_path << " error: " << strerror(errno)
<< " (errno " << errno << ")" << std::endl;
Expand All @@ -66,8 +70,13 @@ int ReadFile::open(const std::string& file_path) {
}

int ReadFile::get_file_size(int64_t& file_size) {
#ifdef _WIN32
struct __stat64 s;
if (_fstat64(fd_, &s) < 0) {
#else
struct stat s;
if (fstat(fd_, &s) < 0) {
#endif
LOGE("fstat error, file_path=" << file_path_.c_str() << "fd=" << fd_
<< "errno" << errno);
return E_FILE_STAT_ERR;
Expand Down Expand Up @@ -114,8 +123,13 @@ int ReadFile::read(int64_t offset, char* buf, int32_t buf_size,
int ret = E_OK;
read_len = 0;
while (read_len < buf_size) {
#ifdef _WIN32
ssize_t pread_size = ::pread(fd_, buf + read_len, buf_size - read_len,
static_cast<uint64_t>(offset + read_len));
#else
ssize_t pread_size = ::pread(fd_, buf + read_len, buf_size - read_len,
static_cast<off_t>(offset + read_len));
#endif
if (pread_size < 0) {
ret = E_FILE_READ_ERR;
////log_err("tsfile reader error, file_path=%s, errno=%d",
Expand Down
6 changes: 5 additions & 1 deletion cpp/src/file/tsfile_io_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -891,7 +891,11 @@ int TsFileIOWriter::flush_stream_to_file() {
}
}

write_stream_.purge_prev_pages();
if (IS_SUCC(ret)) {
file_base_offset_ = file_->get_position();
write_stream_.reset();
write_stream_consumer_.reset();
}

return ret;
}
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/file/write_file.cc
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ int WriteFile::close() {
int WriteFile::truncate(int64_t size) {
ASSERT(fd_ > 0);
#ifdef _WIN32
if (_chsize_s(fd_, static_cast<long>(size)) != 0) {
if (_chsize_s(fd_, size) != 0) {
return E_FILE_WRITE_ERR;
}
#else
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/reader/aligned_chunk_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ int AlignedChunkReader::read_from_file_and_rewrap(
int ret = E_OK;
const int DEFAULT_READ_SIZE = 4096; // may use page_size + page_header_size
char* file_data_buf = in_stream_.get_wrapped_buf();
int offset = chunk_meta->offset_of_chunk_header_ + chunk_visit_offset;
int64_t offset = chunk_meta->offset_of_chunk_header_ + chunk_visit_offset;
int read_size =
(want_size < DEFAULT_READ_SIZE ? DEFAULT_READ_SIZE : want_size);
if (file_data_buf_size < read_size ||
Expand Down
1 change: 0 additions & 1 deletion cpp/src/writer/tsfile_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ bool g_s_is_inited = false;
}

int libtsfile_init() {
libtsfile::g_s_is_inited = false;
if (libtsfile::g_s_is_inited) {
return E_OK;
}
Expand Down
1 change: 1 addition & 0 deletions cpp/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ set(LIB_TSFILE_SDK_DIR ${PROJECT_BINARY_DIR}/lib)
message("LIB_TSFILE_SDK_DIR: ${LIB_TSFILE_SDK_DIR}")

include_directories(
${CMAKE_CURRENT_SOURCE_DIR}/reader
${LIBRARY_INCLUDE_DIR}
${THIRD_PARTY_INCLUDE}
)
Expand Down
76 changes: 76 additions & 0 deletions cpp/test/reader/large_file_test_common.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#ifndef TEST_LARGE_FILE_TEST_COMMON_H
#define TEST_LARGE_FILE_TEST_COMMON_H

#ifdef _WIN32
#include <sys/stat.h>
#include <sys/types.h>
#else
#include <sys/stat.h>
#endif

#include <cstdint>
#include <random>
#include <string>

namespace large_file_test {

constexpr int64_t kTargetFileSize =
static_cast<int64_t>(4) * 1024 * 1024 * 1024;
constexpr int64_t kMinAcceptableFileSize =
static_cast<int64_t>(3800) * 1024 * 1024;
constexpr int64_t kStartTime = 1622505600000LL;
constexpr uint32_t kTabletRows = 50000;
constexpr int64_t kFlushRows = 1000000;

/// Returns the size in bytes of the file at `path`, or -1 when the platform
/// stat call reports failure.
inline int64_t GetFileSize(const std::string& path) {
#ifdef _WIN32
    // Windows build: query through the __stat64 / _stat64 pair.
    struct __stat64 info;
    const int rc = _stat64(path.c_str(), &info);
#else
    struct stat info;
    const int rc = stat(path.c_str(), &info);
#endif
    // Both platforms share the same result mapping: non-zero rc means failure.
    return rc == 0 ? static_cast<int64_t>(info.st_size)
                   : static_cast<int64_t>(-1);
}

/// Builds a random string of `length` characters drawn uniformly from
/// [0-9a-zA-Z], seeded from std::random_device on every call.
///
/// @param length Number of characters to generate (default 10). Zero or a
///               negative value yields an empty string; previously a negative
///               value was converted to a huge unsigned size by reserve() and
///               threw std::length_error.
/// @return The generated suffix.
inline std::string RandomSuffix(int length = 10) {
    const std::string chars =
        "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ";

    std::string out;
    if (length <= 0) {
        return out;
    }

    std::random_device rd;
    std::mt19937 gen(rd());
    // Derive the upper bound from the alphabet itself so the two cannot
    // drift apart if the alphabet is ever edited.
    std::uniform_int_distribution<std::string::size_type> dis(
        0, chars.size() - 1);

    out.reserve(static_cast<std::string::size_type>(length));
    for (int i = 0; i < length; ++i) {
        out += chars[dis(gen)];
    }
    return out;
}

} // namespace large_file_test

#endif // TEST_LARGE_FILE_TEST_COMMON_H
146 changes: 146 additions & 0 deletions cpp/test/reader/table_view/large_file_table_test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#include <gtest/gtest.h>

#include <memory>
#include <vector>

#include "common/schema.h"
#include "common/tablet.h"
#include "file/write_file.h"
#include "large_file_test_common.h"
#include "reader/table_result_set.h"
#include "reader/tsfile_reader.h"
#include "writer/tsfile_table_writer.h"

using namespace common;
using namespace storage;
using namespace large_file_test;

namespace {

constexpr char kTableName[] = "large_table";
constexpr char kDeviceTag[] = "device";
constexpr char kValueField[] = "value";

// Queries kTableName for the row written at `record_index` and checks that
// the row read back matches what the test wrote: timestamp
// kStartTime + record_index * 1000 and value record_index.
//
// Returns false when the query fails or yields no result set, when advancing
// to the first row fails or finds no row, or when the values differ.
// A result set that was successfully advanced or probed is always handed back
// to reader.destroy_query_data_set() before returning.
bool VerifyTableRecord(TsFileReader& reader, int64_t record_index) {
    const int64_t expected_time = kStartTime + record_index * 1000;

    ResultSet* raw_result = nullptr;
    const int query_rc = reader.query(kTableName, {kValueField}, expected_time,
                                      expected_time + 1, raw_result);
    if (query_rc != E_OK || raw_result == nullptr) {
        return false;
    }

    auto* rows = static_cast<TableResultSet*>(raw_result);
    bool row_available = false;
    const int next_rc = rows->next(row_available);

    bool matches = false;
    if (next_rc == E_OK && row_available) {
        // Column 1 is read as the time, column 2 as the value field.
        const int64_t actual_time = rows->get_value<int64_t>(1);
        const int64_t actual_value = rows->get_value<int64_t>(2);
        matches = (actual_time == expected_time) &&
                  (actual_value == record_index);
    }

    reader.destroy_query_data_set(rows);
    return matches;
}

// Allocates the schema for kTableName: a STRING tag column (kDeviceTag)
// followed by an INT64 field column (kValueField), both PLAIN-encoded and
// uncompressed. The returned pointer is heap-allocated with `new`.
// NOTE(review): who releases the MeasurementSchema objects depends on
// TableSchema's constructor, which is not visible here - confirm.
TableSchema* CreateTableSchema() {
    MeasurementSchema* device_column =
        new MeasurementSchema(kDeviceTag, TSDataType::STRING,
                              TSEncoding::PLAIN, CompressionType::UNCOMPRESSED);
    MeasurementSchema* value_column =
        new MeasurementSchema(kValueField, TSDataType::INT64,
                              TSEncoding::PLAIN, CompressionType::UNCOMPRESSED);

    std::vector<MeasurementSchema*> columns;
    columns.push_back(device_column);
    columns.push_back(value_column);

    // Categories are positional: entry i describes columns[i].
    std::vector<ColumnCategory> categories;
    categories.push_back(ColumnCategory::TAG);
    categories.push_back(ColumnCategory::FIELD);

    return new TableSchema(kTableName, columns, categories);
}

} // namespace

// Test fixture: initialises the library before each test and gives the test
// a uniquely named output file that is removed both before and after it runs.
class LargeFileTableTest : public ::testing::Test {
   protected:
    // Initialises libtsfile, picks a random file name and deletes any
    // leftover file with that name.
    void SetUp() override {
        libtsfile_init();
        file_name_ = std::string("large_file_table_test_");
        file_name_ += RandomSuffix();
        file_name_ += ".tsfile";
        remove(file_name_.c_str());
    }

    // Deletes the test file, then tears the library down.
    void TearDown() override {
        remove(file_name_.c_str());
        libtsfile_destroy();
    }

    // Path of the file written and read by the test.
    std::string file_name_;
};

// Writes table rows until the file on disk reaches kTargetFileSize, then
// reopens the file and verifies the first, middle and last rows.
// The DISABLED_ prefix keeps GoogleTest from running this test by default.
TEST_F(LargeFileTableTest, DISABLED_LargeFile4GB_TableWriteAndRead) {
std::unique_ptr<TableSchema> table_schema(CreateTableSchema());

WriteFile write_file;
int flags = O_WRONLY | O_CREAT | O_TRUNC;
#ifdef _WIN32
// Windows only: add O_BINARY to the open flags.
flags |= O_BINARY;
#endif
ASSERT_EQ(write_file.create(file_name_, flags, 0666), E_OK);

TsFileTableWriter table_writer(&write_file, table_schema.get());
const std::vector<std::string> column_names = {kDeviceTag, kValueField};
const std::vector<TSDataType> data_types = {TSDataType::STRING,
TSDataType::INT64};

// Write phase: one tablet of kTabletRows rows per iteration, until the
// on-disk size reported by GetFileSize() reaches kTargetFileSize.
int64_t total_rows = 0;
while (GetFileSize(file_name_) < kTargetFileSize) {
Tablet tablet(column_names, data_types, kTabletRows);
tablet.set_table_name(kTableName);
for (uint32_t row = 0; row < kTabletRows; ++row) {
// record_index is the global row number; it determines both the
// timestamp (1000 apart per row) and the stored value, which is what
// VerifyTableRecord() checks later.
const int64_t record_index = total_rows + row;
tablet.add_timestamp(row, kStartTime + record_index * 1000);
tablet.add_value(row, kDeviceTag, "device0");
tablet.add_value(row, kValueField, record_index);
}
ASSERT_EQ(table_writer.write_table(tablet), E_OK);
total_rows += kTabletRows;
// Explicit flush every kFlushRows rows.
if (total_rows % kFlushRows == 0) {
ASSERT_EQ(table_writer.flush(), E_OK);
}
}

// Final flush, then close the writer before the underlying file.
ASSERT_EQ(table_writer.flush(), E_OK);
ASSERT_EQ(table_writer.close(), E_OK);
ASSERT_EQ(write_file.close(), E_OK);

// The closed file must be at least kMinAcceptableFileSize bytes.
const int64_t final_size = GetFileSize(file_name_);
ASSERT_GE(final_size, kMinAcceptableFileSize);

// Read phase: spot-check the first, middle and last rows written.
TsFileReader reader;
ASSERT_EQ(reader.open(file_name_), E_OK);

const std::vector<int64_t> check_indexes = {0, total_rows / 2,
total_rows - 1};
for (int64_t index : check_indexes) {
ASSERT_TRUE(VerifyTableRecord(reader, index)) << "index=" << index;
}

ASSERT_EQ(reader.close(), E_OK);
}
Loading
Loading