diff --git a/cpp/CLAUDE.md b/cpp/CLAUDE.md index 00157dd5a..674771759 100644 --- a/cpp/CLAUDE.md +++ b/cpp/CLAUDE.md @@ -92,6 +92,7 @@ cpp/src/ ## Code Style - **Formatter**: clang-format (Google style), configured in `.clang-format` +- After modifying C++ code, run from the repo root to format: `./mvnw spotless:apply -P with-cpp` ## Testing diff --git a/cpp/src/file/tsfile_io_reader.cc b/cpp/src/file/tsfile_io_reader.cc index e96008a47..296556c15 100644 --- a/cpp/src/file/tsfile_io_reader.cc +++ b/cpp/src/file/tsfile_io_reader.cc @@ -155,6 +155,70 @@ int TsFileIOReader::get_device_timeseries_meta_without_chunk_meta( return ret; } +int TsFileIOReader::get_device_timeseries_meta_by_offset( + int64_t start_offset, int64_t end_offset, + std::vector& timeseries_indexs, PageArena& pa) { + int ret = E_OK; + load_tsfile_meta_if_necessary(); + + std::vector, int64_t>> + meta_index_entry_list; + bool is_aligned = false; + TimeseriesIndex* time_timeseries_index = nullptr; + + ASSERT(start_offset < end_offset); + const int32_t read_size = end_offset - start_offset; + int32_t ret_read_len = 0; + char* data_buf = (char*)pa.alloc(read_size); + void* m_idx_node_buf = pa.alloc(sizeof(MetaIndexNode)); + if (IS_NULL(data_buf) || IS_NULL(m_idx_node_buf)) { + return E_OOM; + } + auto* top_node_ptr = new (m_idx_node_buf) MetaIndexNode(&pa); + auto top_node = std::shared_ptr(top_node_ptr, + MetaIndexNode::self_deleter); + if (RET_FAIL(read_file_->read(start_offset, data_buf, read_size, + ret_read_len))) { + return ret; + } + if (RET_FAIL(top_node->deserialize_from(data_buf, read_size))) { + return ret; + } + + is_aligned = is_aligned_device(top_node); + if (is_aligned) { + if (RET_FAIL(get_time_column_metadata(top_node, time_timeseries_index, + pa))) { + return ret; + } + } + + get_all_leaf(top_node, meta_index_entry_list); + + if (RET_FAIL(do_load_all_timeseries_index(meta_index_entry_list, pa, + timeseries_indexs))) { + return ret; + } + + if (is_aligned && time_timeseries_index != nullptr) { + for (size_t i = 0; i < timeseries_indexs.size(); i++) { + void* buf = pa.alloc(sizeof(AlignedTimeseriesIndex)); + if (IS_NULL(buf)) { + return E_OOM; + } + auto* aligned_ts_idx = new (buf) AlignedTimeseriesIndex; + aligned_ts_idx->time_ts_idx_ = time_timeseries_index; + aligned_ts_idx->value_ts_idx_ = + dynamic_cast(timeseries_indexs[i]); + if (aligned_ts_idx->value_ts_idx_ == nullptr) { + return E_TYPE_NOT_MATCH; + } + timeseries_indexs[i] = aligned_ts_idx; + } + } + return ret; +} + bool TsFileIOReader::filter_stasify(ITimeseriesIndex* ts_index, Filter* time_filter) { ASSERT(ts_index->get_statistic() != nullptr); diff --git a/cpp/src/file/tsfile_io_reader.h b/cpp/src/file/tsfile_io_reader.h index 2f4135e0e..85443326f 100644 --- a/cpp/src/file/tsfile_io_reader.h +++ b/cpp/src/file/tsfile_io_reader.h @@ -84,6 +84,16 @@ class TsFileIOReader { std::vector& timeseries_indexs, common::PageArena& pa); + int get_device_timeseries_meta_by_offset( + int64_t start_offset, int64_t end_offset, + std::vector& timeseries_indexs, + common::PageArena& pa); + + int load_device_index_entry( + std::shared_ptr target_name, + std::shared_ptr& device_index_entry, + int64_t& end_offset); + private: FORCE_INLINE int64_t file_size() const { return read_file_->file_size(); } @@ -91,11 +101,6 @@ class TsFileIOReader { int load_tsfile_meta_if_necessary(); - int load_device_index_entry( - std::shared_ptr target_name, - std::shared_ptr& device_index_entry, - int64_t& end_offset); - int load_measurement_index_entry( const std::string& measurement_name, std::shared_ptr top_node, diff --git a/cpp/src/reader/device_meta_iterator.cc b/cpp/src/reader/device_meta_iterator.cc index a41a29e6c..bf01b23a5 100644 --- a/cpp/src/reader/device_meta_iterator.cc +++ b/cpp/src/reader/device_meta_iterator.cc @@ -43,6 +43,16 @@ bool DeviceMetaIterator::has_next() { return true; } + if (direct_device_id_ != nullptr) { + if (direct_lookup_done_) { + return false; + } + if (load_results_direct() != common::E_OK) { + return false; + } + return !result_cache_.empty(); + } + if (load_results() != common::E_OK) { return false; } @@ -63,9 +73,6 @@ int DeviceMetaIterator::next( int DeviceMetaIterator::load_results() { int root_num = meta_index_nodes_.size(); while (!meta_index_nodes_.empty()) { - // To avoid ASan overflow. - // using `const auto&` creates a reference - // to a queue element that may become invalid. auto meta_data_index_node = meta_index_nodes_.front(); meta_index_nodes_.pop(); const auto& node_type = meta_data_index_node->node_type_; @@ -80,7 +87,6 @@ int DeviceMetaIterator::load_results() { meta_data_index_node->~MetaIndexNode(); } } - return common::E_OK; } @@ -135,4 +141,69 @@ int DeviceMetaIterator::load_internal_node(MetaIndexNode* meta_index_node) { } return ret; } + +void DeviceMetaIterator::try_setup_direct_lookup(MetaIndexNode* root_node) { + if (id_filter_ == nullptr) return; + + const auto* eq = dynamic_cast(id_filter_); + if (eq == nullptr) return; + + if (root_node->children_.empty()) return; + + auto first_device = root_node->children_[0]->get_device_id(); + if (first_device == nullptr) return; + + auto first_segments = first_device->get_segments(); + int actual_segment_count = static_cast(first_segments.size()); + + if (actual_segment_count != 2) return; + + std::string table_name = first_device->get_table_name(); + std::vector segs(actual_segment_count); + segs[0] = table_name; + for (int i = 1; i < actual_segment_count; i++) { + segs[i] = ""; + } + segs[eq->col_idx_] = eq->value_; + direct_device_id_ = std::make_shared(segs); + direct_root_node_ = root_node; +} + +int DeviceMetaIterator::load_results_direct() { + int ret = common::E_OK; + direct_lookup_done_ = true; + + if (direct_device_id_ == nullptr) { + return common::E_OK; + } + + auto device_comparable = + std::make_shared(direct_device_id_); + + std::shared_ptr device_index_entry; + int64_t end_offset = 0; + + ret = io_reader_->load_device_index_entry(device_comparable, + device_index_entry, end_offset); + + if (ret != common::E_OK || device_index_entry == nullptr) { + return common::E_OK; + } + + int64_t start_offset = device_index_entry->get_offset(); + MetaIndexNode* child_node = nullptr; + if (RET_FAIL(io_reader_->read_device_meta_index(start_offset, end_offset, + pa_, child_node, true))) { + return ret; + } + + auto device_id = device_index_entry->get_device_id(); + if (should_split_device_name) { + device_id->split_table_name(); + } + result_cache_.push(std::make_pair(device_id, child_node)); + + return common::E_OK; +} + } // namespace storage \ No newline at end of file diff --git a/cpp/src/reader/device_meta_iterator.h b/cpp/src/reader/device_meta_iterator.h index 704098b4d..da6a37dc4 100644 --- a/cpp/src/reader/device_meta_iterator.h +++ b/cpp/src/reader/device_meta_iterator.h @@ -21,6 +21,8 @@ #define READER_DEVICE_META_ITERATOR_H #include +#include +#include #include "file/tsfile_io_reader.h" #include "reader/expression.h" @@ -34,15 +36,19 @@ class DeviceMetaIterator { const Filter* id_filter) : io_reader_(io_reader), id_filter_(id_filter), - should_split_device_name(false) { + should_split_device_name(false), + direct_lookup_done_(false) { meta_index_nodes_.push(meat_index_node); pa_.init(512, common::MOD_DEVICE_META_ITER); + try_setup_direct_lookup(meat_index_node); } DeviceMetaIterator(TsFileIOReader* io_reader, const std::vector& meta_index_node_list, const Filter* id_filter) - : io_reader_(io_reader), id_filter_(id_filter) { + : io_reader_(io_reader), + id_filter_(id_filter), + direct_lookup_done_(false) { for (auto meta_index_node : meta_index_node_list) { meta_index_nodes_.push(meta_index_node); } @@ -62,6 +68,10 @@ class DeviceMetaIterator { int load_results(); int load_leaf_device(MetaIndexNode* meta_index_node); int load_internal_node(MetaIndexNode* meta_index_node); + + void try_setup_direct_lookup(MetaIndexNode* root_node); + int load_results_direct(); + TsFileIOReader* io_reader_; std::queue meta_index_nodes_; std::queue, MetaIndexNode*>> @@ -69,6 +79,10 @@ class DeviceMetaIterator { const Filter* id_filter_; common::PageArena pa_; bool should_split_device_name; + + bool direct_lookup_done_; + std::shared_ptr direct_device_id_; + MetaIndexNode* direct_root_node_ = nullptr; }; } // end namespace storage diff --git a/cpp/src/reader/tsfile_reader.cc b/cpp/src/reader/tsfile_reader.cc index cabf02b08..8d9d9b5dc 100644 --- a/cpp/src/reader/tsfile_reader.cc +++ b/cpp/src/reader/tsfile_reader.cc @@ -25,6 +25,71 @@ using namespace common; using namespace storage; +namespace { + +struct DeviceMetaEntry { + std::shared_ptr device_id; + int64_t start_offset; + int64_t end_offset; +}; + +int get_all_device_entries(std::vector& entries, + std::shared_ptr index_node, + ReadFile* read_file, PageArena& pa) { + int ret = E_OK; + if (index_node == nullptr) { + return ret; + } + if (index_node->node_type_ == LEAF_DEVICE) { + for (size_t i = 0; i < index_node->children_.size(); i++) { + DeviceMetaEntry entry; + entry.device_id = index_node->children_[i]->get_device_id(); + entry.start_offset = index_node->children_[i]->get_offset(); + entry.end_offset = (i + 1 < index_node->children_.size()) + ? index_node->children_[i + 1]->get_offset() + : index_node->end_offset_; + entries.push_back(entry); + } + } else { + for (size_t idx = 0; idx < index_node->children_.size(); idx++) { + auto meta_index_entry = index_node->children_[idx]; + int64_t start_offset = meta_index_entry->get_offset(); + int64_t end_offset = index_node->end_offset_; + if (idx + 1 < index_node->children_.size()) { + end_offset = index_node->children_[idx + 1]->get_offset(); + } + ASSERT(end_offset - start_offset > 0); + const int32_t read_size = (int32_t)(end_offset - start_offset); + int32_t ret_read_len = 0; + char* data_buf = (char*)pa.alloc(read_size); + void* m_idx_node_buf = pa.alloc(sizeof(MetaIndexNode)); + if (IS_NULL(data_buf) || IS_NULL(m_idx_node_buf)) { + return E_OOM; + } + auto* top_node_ptr = new (m_idx_node_buf) MetaIndexNode(&pa); + auto top_node = std::shared_ptr( + top_node_ptr, [](MetaIndexNode* ptr) { + if (ptr) { + ptr->~MetaIndexNode(); + } + }); + if (RET_FAIL(read_file->read(start_offset, data_buf, read_size, + ret_read_len))) { + } else if (RET_FAIL(top_node->device_deserialize_from(data_buf, + read_size))) { + } else { + ret = get_all_device_entries(entries, top_node, read_file, pa); + } + if (ret != E_OK) { + return ret; + } + } + } + return ret; +} + +} // namespace + namespace storage { TsFileReader::TsFileReader() : read_file_(nullptr), @@ -367,8 +432,6 @@ int TsFileReader::get_timeseries_metadata_impl( std::vector>& result) { int ret = E_OK; std::vector timeseries_indexs; - tsfile_reader_meta_pa_.init(512, MOD_TSFILE_READER); - // Pointers are owned by tsfile_reader_meta_pa_; shared_ptr must not delete auto noop_deleter = [](ITimeseriesIndex*) {}; if (RET_FAIL( tsfile_executor_->get_tsfile_io_reader() @@ -397,13 +460,36 @@ DeviceTimeseriesMetadataMap TsFileReader::get_timeseries_metadata( } DeviceTimeseriesMetadataMap TsFileReader::get_timeseries_metadata() { - // Collect metadata for all devices present in the file DeviceTimeseriesMetadataMap result; - auto device_ids = get_all_device_ids(); - for (const auto& device_id : device_ids) { - std::vector> list; - if (get_timeseries_metadata_impl(device_id, list) == E_OK) { - result.insert(std::make_pair(device_id, std::move(list))); + TsFileMeta* tsfile_meta = tsfile_executor_->get_tsfile_meta(); + if (tsfile_meta == nullptr) { + return result; + } + + PageArena pa; + pa.init(512, MOD_TSFILE_READER); + std::vector entries; + for (auto& table_entry : tsfile_meta->table_metadata_index_node_map_) { + if (get_all_device_entries(entries, table_entry.second, read_file_, + pa) != E_OK) { + return result; + } + } + + auto noop_deleter = [](ITimeseriesIndex*) {}; + for (auto& device_entry : entries) { + std::vector raw_ts_indexes; + if (tsfile_executor_->get_tsfile_io_reader() + ->get_device_timeseries_meta_by_offset( + device_entry.start_offset, device_entry.end_offset, + raw_ts_indexes, tsfile_reader_meta_pa_) == E_OK) { + std::vector> list; + for (auto ts_idx : raw_ts_indexes) { + list.emplace_back( + std::shared_ptr(ts_idx, noop_deleter)); + } + result.insert( + std::make_pair(device_entry.device_id, std::move(list))); } } return result; diff --git a/cpp/test/reader/table_view/tsfile_reader_table_test.cc b/cpp/test/reader/table_view/tsfile_reader_table_test.cc index 1f63573e1..e55f34c2a 100644 --- a/cpp/test/reader/table_view/tsfile_reader_table_test.cc +++ b/cpp/test/reader/table_view/tsfile_reader_table_test.cc @@ -885,4 +885,340 @@ TEST_F(TsFileTableReaderTest, AlignedNullAtBlockBoundaryNoRowLoss) { ASSERT_EQ(nullable_rows, total_rows); ASSERT_EQ(reader.close(), common::E_OK); +} + +TEST_F(TsFileTableReaderTest, GetTimeseriesMetadataTableModel) { + std::vector schemas; + std::vector categories; + schemas.emplace_back(new MeasurementSchema("device", TSDataType::STRING, + TSEncoding::PLAIN, + CompressionType::UNCOMPRESSED)); + categories.emplace_back(ColumnCategory::TAG); + schemas.emplace_back(new MeasurementSchema("value", TSDataType::INT64, + TSEncoding::PLAIN, + CompressionType::UNCOMPRESSED)); + categories.emplace_back(ColumnCategory::FIELD); + auto* table_schema = new TableSchema("meta_table", schemas, categories); + auto writer = + std::make_shared(&write_file_, table_schema); + + int num_devices = 3; + int points = 10; + int total_rows = num_devices * points; + storage::Tablet tablet(table_schema->get_table_name(), + table_schema->get_measurement_names(), + table_schema->get_data_types(), + table_schema->get_column_categories(), total_rows); + for (int d = 0; d < num_devices; d++) { + std::string dev = "dev" + std::to_string(d); + for (int t = 0; t < points; t++) { + int row = d * points + t; + tablet.add_timestamp(row, static_cast(t)); + tablet.add_value(row, "device", dev.c_str()); + tablet.add_value(row, "value", static_cast(d * 100 + t)); + } + } + ASSERT_EQ(writer->write_table(tablet), common::E_OK); + ASSERT_EQ(writer->flush(), common::E_OK); + ASSERT_EQ(writer->close(), common::E_OK); + + storage::TsFileReader reader; + ASSERT_EQ(reader.open(file_name_), common::E_OK); + + auto meta_map = reader.get_timeseries_metadata(); + ASSERT_EQ(meta_map.size(), static_cast(num_devices)); + + for (auto& entry : meta_map) { + auto& ts_list = entry.second; + ASSERT_FALSE(ts_list.empty()); + for (auto& ts_idx : ts_list) { + ASSERT_NE(ts_idx->get_statistic(), nullptr); + ASSERT_EQ(ts_idx->get_statistic()->count_, points); + } + } + + ASSERT_EQ(reader.close(), common::E_OK); + delete table_schema; +} + +TEST_F(TsFileTableReaderTest, GetTimeseriesMetadataMultiTable) { + std::vector schemas0; + std::vector cats0; + schemas0.emplace_back(new MeasurementSchema("tag", TSDataType::STRING, + TSEncoding::PLAIN, + CompressionType::UNCOMPRESSED)); + cats0.emplace_back(ColumnCategory::TAG); + schemas0.emplace_back(new MeasurementSchema("v0", TSDataType::INT64, + TSEncoding::PLAIN, + CompressionType::UNCOMPRESSED)); + cats0.emplace_back(ColumnCategory::FIELD); + auto* schema0 = new TableSchema("table_a", schemas0, cats0); + auto writer = std::make_shared(&write_file_, schema0); + + storage::Tablet tablet0( + schema0->get_table_name(), schema0->get_measurement_names(), + schema0->get_data_types(), schema0->get_column_categories(), 10); + for (int d = 0; d < 2; d++) { + std::string dev = "a_dev" + std::to_string(d); + for (int t = 0; t < 5; t++) { + int row = d * 5 + t; + tablet0.add_timestamp(row, static_cast(t)); + tablet0.add_value(row, "tag", dev.c_str()); + tablet0.add_value(row, "v0", static_cast(t)); + } + } + ASSERT_EQ(writer->write_table(tablet0), common::E_OK); + + std::vector schemas1; + std::vector cats1; + schemas1.emplace_back(new MeasurementSchema("tag", TSDataType::STRING, + TSEncoding::PLAIN, + CompressionType::UNCOMPRESSED)); + cats1.emplace_back(ColumnCategory::TAG); + schemas1.emplace_back(new MeasurementSchema("v1", TSDataType::INT64, + TSEncoding::PLAIN, + CompressionType::UNCOMPRESSED)); + cats1.emplace_back(ColumnCategory::FIELD); + auto* schema1 = new TableSchema("table_b", schemas1, cats1); + auto schema1_ptr = std::shared_ptr(schema1); + writer->register_table(schema1_ptr); + + storage::Tablet tablet1( + schema1->get_table_name(), schema1->get_measurement_names(), + schema1->get_data_types(), schema1->get_column_categories(), 24); + for (int d = 0; d < 3; d++) { + std::string dev = "b_dev" + std::to_string(d); + for (int t = 0; t < 8; t++) { + int row = d * 8 + t; + tablet1.add_timestamp(row, static_cast(t)); + tablet1.add_value(row, "tag", dev.c_str()); + tablet1.add_value(row, "v1", static_cast(t)); + } + } + ASSERT_EQ(writer->write_table(tablet1), common::E_OK); + + ASSERT_EQ(writer->flush(), common::E_OK); + ASSERT_EQ(writer->close(), common::E_OK); + + storage::TsFileReader reader; + ASSERT_EQ(reader.open(file_name_), common::E_OK); + + auto meta_map = reader.get_timeseries_metadata(); + ASSERT_EQ(meta_map.size(), 5u); + + int table_a_count = 0; + int table_b_count = 0; + for (auto& entry : meta_map) { + auto table_name = entry.first->get_table_name(); + if (table_name == "table_a") { + table_a_count++; + for (auto& ts : entry.second) { + ASSERT_EQ(ts->get_statistic()->count_, 5); + } + } else if (table_name == "table_b") { + table_b_count++; + for (auto& ts : entry.second) { + ASSERT_EQ(ts->get_statistic()->count_, 8); + } + } + } + ASSERT_EQ(table_a_count, 2); + ASSERT_EQ(table_b_count, 3); + + ASSERT_EQ(reader.close(), common::E_OK); + delete schema0; +} + +TEST_F(TsFileTableReaderTest, DirectLookupSingleTagColumn) { + std::vector schemas; + std::vector categories; + schemas.emplace_back(new MeasurementSchema("tag", TSDataType::STRING, + TSEncoding::PLAIN, + CompressionType::UNCOMPRESSED)); + categories.emplace_back(ColumnCategory::TAG); + schemas.emplace_back(new MeasurementSchema("val", TSDataType::INT64, + TSEncoding::PLAIN, + CompressionType::UNCOMPRESSED)); + categories.emplace_back(ColumnCategory::FIELD); + auto* table_schema = + new TableSchema("single_tag_table", schemas, categories); + auto writer = + std::make_shared(&write_file_, table_schema); + + int num_devices = 5; + int points = 10; + storage::Tablet tablet( + table_schema->get_table_name(), table_schema->get_measurement_names(), + table_schema->get_data_types(), table_schema->get_column_categories(), + num_devices * points); + for (int d = 0; d < num_devices; d++) { + std::string dev_name = "dev" + std::to_string(d); + for (int t = 0; t < points; t++) { + int row = d * points + t; + tablet.add_timestamp(row, static_cast(t)); + tablet.add_value(row, "tag", dev_name.c_str()); + tablet.add_value(row, "val", static_cast(d * 100 + t)); + } + } + ASSERT_EQ(writer->write_table(tablet), common::E_OK); + ASSERT_EQ(writer->flush(), common::E_OK); + ASSERT_EQ(writer->close(), common::E_OK); + + storage::TsFileReader reader; + ASSERT_EQ(reader.open(file_name_), common::E_OK); + + ResultSet* tmp_result_set = nullptr; + Filter* tag_filter = TagFilterBuilder(table_schema).eq("tag", "dev2"); + std::vector cols = {"tag", "val"}; + int ret = reader.query("single_tag_table", cols, 0, 1000000, tmp_result_set, + tag_filter); + ASSERT_EQ(ret, common::E_OK); + auto* table_result_set = (TableResultSet*)tmp_result_set; + + bool has_next = false; + int64_t row_num = 0; + while (IS_SUCC(table_result_set->next(has_next)) && has_next) { + ASSERT_EQ(table_result_set->get_value(1), row_num % points); + auto* tag_val = table_result_set->get_value(2); + std::string expected_tag = "dev2"; + ASSERT_EQ(std::string(tag_val->buf_, tag_val->len_), expected_tag); + ASSERT_EQ(table_result_set->get_value(3), + static_cast(200 + row_num)); + row_num++; + } + ASSERT_EQ(row_num, points); + + reader.destroy_query_data_set(table_result_set); + ASSERT_EQ(reader.close(), common::E_OK); + delete table_schema; + delete tag_filter; +} + +TEST_F(TsFileTableReaderTest, DirectLookupNonExistDevice) { + std::vector schemas; + std::vector categories; + schemas.emplace_back(new MeasurementSchema("tag", TSDataType::STRING, + TSEncoding::PLAIN, + CompressionType::UNCOMPRESSED)); + categories.emplace_back(ColumnCategory::TAG); + schemas.emplace_back(new MeasurementSchema("val", TSDataType::INT64, + TSEncoding::PLAIN, + CompressionType::UNCOMPRESSED)); + categories.emplace_back(ColumnCategory::FIELD); + auto* table_schema = + new TableSchema("single_tag_table", schemas, categories); + auto writer = + std::make_shared(&write_file_, table_schema); + + storage::Tablet tablet(table_schema->get_table_name(), + table_schema->get_measurement_names(), + table_schema->get_data_types(), + table_schema->get_column_categories(), 5); + for (int t = 0; t < 5; t++) { + tablet.add_timestamp(t, static_cast(t)); + tablet.add_value(t, "tag", "existing_dev"); + tablet.add_value(t, "val", static_cast(t)); + } + ASSERT_EQ(writer->write_table(tablet), common::E_OK); + ASSERT_EQ(writer->flush(), common::E_OK); + ASSERT_EQ(writer->close(), common::E_OK); + + storage::TsFileReader reader; + ASSERT_EQ(reader.open(file_name_), common::E_OK); + + ResultSet* tmp_result_set = nullptr; + Filter* tag_filter = TagFilterBuilder(table_schema).eq("tag", "non_exist"); + std::vector cols = {"tag", "val"}; + int ret = reader.query("single_tag_table", cols, 0, 1000000, tmp_result_set, + tag_filter); + ASSERT_EQ(ret, common::E_OK); + auto* table_result_set = (TableResultSet*)tmp_result_set; + + bool has_next = false; + int64_t row_num = 0; + while (IS_SUCC(table_result_set->next(has_next)) && has_next) { + row_num++; + } + ASSERT_EQ(row_num, 0); + + reader.destroy_query_data_set(table_result_set); + ASSERT_EQ(reader.close(), common::E_OK); + delete table_schema; + delete tag_filter; +} + +TEST_F(TsFileTableReaderTest, MultiTagColumnFilterOnSecondTag) { + std::vector schemas; + std::vector categories; + schemas.emplace_back(new MeasurementSchema("region", TSDataType::STRING, + TSEncoding::PLAIN, + CompressionType::UNCOMPRESSED)); + categories.emplace_back(ColumnCategory::TAG); + schemas.emplace_back(new MeasurementSchema("device", TSDataType::STRING, + TSEncoding::PLAIN, + CompressionType::UNCOMPRESSED)); + categories.emplace_back(ColumnCategory::TAG); + schemas.emplace_back(new MeasurementSchema("val", TSDataType::INT64, + TSEncoding::PLAIN, + CompressionType::UNCOMPRESSED)); + categories.emplace_back(ColumnCategory::FIELD); + auto* table_schema = + new TableSchema("multi_tag_table", schemas, categories); + auto writer = + std::make_shared(&write_file_, table_schema); + + struct DeviceData { + std::string region; + std::string device; + int start; + int count; + }; + std::vector devices = { + {"north", "dev_a", 0, 5}, + {"north", "dev_b", 5, 5}, + {"south", "dev_c", 10, 5}, + {"east", "dev_d", 15, 5}, + }; + + int total = 20; + storage::Tablet tablet(table_schema->get_table_name(), + table_schema->get_measurement_names(), + table_schema->get_data_types(), + table_schema->get_column_categories(), total); + int row = 0; + for (auto& d : devices) { + for (int t = 0; t < d.count; t++) { + tablet.add_timestamp(row, static_cast(d.start + t)); + tablet.add_value(row, "region", d.region.c_str()); + tablet.add_value(row, "device", d.device.c_str()); + tablet.add_value(row, "val", static_cast(d.start + t)); + row++; + } + } + ASSERT_EQ(writer->write_table(tablet), common::E_OK); + ASSERT_EQ(writer->flush(), common::E_OK); + ASSERT_EQ(writer->close(), common::E_OK); + + storage::TsFileReader reader; + ASSERT_EQ(reader.open(file_name_), common::E_OK); + + ResultSet* tmp_result_set = nullptr; + Filter* tag_filter = TagFilterBuilder(table_schema).eq("device", "dev_c"); + std::vector cols = {"region", "device", "val"}; + int ret = reader.query("multi_tag_table", cols, 0, 1000000, tmp_result_set, + tag_filter); + ASSERT_EQ(ret, common::E_OK); + auto* table_result_set = (TableResultSet*)tmp_result_set; + + bool has_next = false; + int64_t row_num = 0; + while (IS_SUCC(table_result_set->next(has_next)) && has_next) { + row_num++; + } + ASSERT_EQ(row_num, 5); + + reader.destroy_query_data_set(table_result_set); + ASSERT_EQ(reader.close(), common::E_OK); + delete table_schema; + delete tag_filter; } \ No newline at end of file