Skip to content
Open
1 change: 1 addition & 0 deletions cpp/CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ cpp/src/
## Code Style

- **Formatter**: clang-format (Google style), configured in `.clang-format`
- After modifying C++ code, run from the repo root to format: `./mvnw spotless:apply -P with-cpp`

## Testing

Expand Down
64 changes: 64 additions & 0 deletions cpp/src/file/tsfile_io_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,70 @@ int TsFileIOReader::get_device_timeseries_meta_without_chunk_meta(
return ret;
}

int TsFileIOReader::get_device_timeseries_meta_by_offset(
int64_t start_offset, int64_t end_offset,
std::vector<ITimeseriesIndex*>& timeseries_indexs, PageArena& pa) {
int ret = E_OK;
load_tsfile_meta_if_necessary();

std::vector<std::pair<std::shared_ptr<IMetaIndexEntry>, int64_t>>
meta_index_entry_list;
bool is_aligned = false;
TimeseriesIndex* time_timeseries_index = nullptr;

ASSERT(start_offset < end_offset);
const int32_t read_size = end_offset - start_offset;
Comment thread
JackieTien97 marked this conversation as resolved.
int32_t ret_read_len = 0;
char* data_buf = (char*)pa.alloc(read_size);
void* m_idx_node_buf = pa.alloc(sizeof(MetaIndexNode));
Comment thread
JackieTien97 marked this conversation as resolved.
Comment thread
JackieTien97 marked this conversation as resolved.
if (IS_NULL(data_buf) || IS_NULL(m_idx_node_buf)) {
return E_OOM;
}
auto* top_node_ptr = new (m_idx_node_buf) MetaIndexNode(&pa);
auto top_node = std::shared_ptr<MetaIndexNode>(top_node_ptr,
MetaIndexNode::self_deleter);
if (RET_FAIL(read_file_->read(start_offset, data_buf, read_size,
ret_read_len))) {
return ret;
}
Comment thread
JackieTien97 marked this conversation as resolved.
if (RET_FAIL(top_node->deserialize_from(data_buf, read_size))) {
return ret;
}

is_aligned = is_aligned_device(top_node);
if (is_aligned) {
if (RET_FAIL(get_time_column_metadata(top_node, time_timeseries_index,
pa))) {
return ret;
}
}

get_all_leaf(top_node, meta_index_entry_list);

if (RET_FAIL(do_load_all_timeseries_index(meta_index_entry_list, pa,
timeseries_indexs))) {
return ret;
Comment thread
JackieTien97 marked this conversation as resolved.
}

if (is_aligned && time_timeseries_index != nullptr) {
for (size_t i = 0; i < timeseries_indexs.size(); i++) {
void* buf = pa.alloc(sizeof(AlignedTimeseriesIndex));
if (IS_NULL(buf)) {
return E_OOM;
}
auto* aligned_ts_idx = new (buf) AlignedTimeseriesIndex;
aligned_ts_idx->time_ts_idx_ = time_timeseries_index;
aligned_ts_idx->value_ts_idx_ =
dynamic_cast<TimeseriesIndex*>(timeseries_indexs[i]);
if (aligned_ts_idx->value_ts_idx_ == nullptr) {
return E_TYPE_NOT_MATCH;
}
timeseries_indexs[i] = aligned_ts_idx;
}
}
return ret;
}

bool TsFileIOReader::filter_stasify(ITimeseriesIndex* ts_index,
Filter* time_filter) {
ASSERT(ts_index->get_statistic() != nullptr);
Expand Down
15 changes: 10 additions & 5 deletions cpp/src/file/tsfile_io_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,18 +84,23 @@ class TsFileIOReader {
std::vector<ITimeseriesIndex*>& timeseries_indexs,
common::PageArena& pa);

int get_device_timeseries_meta_by_offset(
int64_t start_offset, int64_t end_offset,
std::vector<ITimeseriesIndex*>& timeseries_indexs,
common::PageArena& pa);

int load_device_index_entry(
std::shared_ptr<IComparable> target_name,
std::shared_ptr<IMetaIndexEntry>& device_index_entry,
int64_t& end_offset);

private:
FORCE_INLINE int64_t file_size() const { return read_file_->file_size(); }

int load_tsfile_meta();

int load_tsfile_meta_if_necessary();

int load_device_index_entry(
std::shared_ptr<IComparable> target_name,
std::shared_ptr<IMetaIndexEntry>& device_index_entry,
int64_t& end_offset);

int load_measurement_index_entry(
const std::string& measurement_name,
std::shared_ptr<MetaIndexNode> top_node,
Expand Down
79 changes: 75 additions & 4 deletions cpp/src/reader/device_meta_iterator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,16 @@ bool DeviceMetaIterator::has_next() {
return true;
}

if (direct_device_id_ != nullptr) {
if (direct_lookup_done_) {
return false;
}
if (load_results_direct() != common::E_OK) {
return false;
}
return !result_cache_.empty();
}

if (load_results() != common::E_OK) {
return false;
}
Expand All @@ -63,9 +73,6 @@ int DeviceMetaIterator::next(
int DeviceMetaIterator::load_results() {
int root_num = meta_index_nodes_.size();
while (!meta_index_nodes_.empty()) {
// To avoid ASan overflow.
// using `const auto&` creates a reference
// to a queue element that may become invalid.
auto meta_data_index_node = meta_index_nodes_.front();
meta_index_nodes_.pop();
const auto& node_type = meta_data_index_node->node_type_;
Expand All @@ -80,7 +87,6 @@ int DeviceMetaIterator::load_results() {
meta_data_index_node->~MetaIndexNode();
}
}

return common::E_OK;
}

Expand Down Expand Up @@ -135,4 +141,69 @@ int DeviceMetaIterator::load_internal_node(MetaIndexNode* meta_index_node) {
}
return ret;
}

void DeviceMetaIterator::try_setup_direct_lookup(MetaIndexNode* root_node) {
if (id_filter_ == nullptr) return;

const auto* eq = dynamic_cast<const TagEq*>(id_filter_);
if (eq == nullptr) return;

if (root_node->children_.empty()) return;

auto first_device = root_node->children_[0]->get_device_id();
if (first_device == nullptr) return;

auto first_segments = first_device->get_segments();
int actual_segment_count = static_cast<int>(first_segments.size());

if (actual_segment_count != 2) return;

std::string table_name = first_device->get_table_name();
std::vector<std::string> segs(actual_segment_count);
segs[0] = table_name;
for (int i = 1; i < actual_segment_count; i++) {
segs[i] = "";
}
segs[eq->col_idx_] = eq->value_;
direct_device_id_ = std::make_shared<StringArrayDeviceID>(segs);
direct_root_node_ = root_node;
}

int DeviceMetaIterator::load_results_direct() {
int ret = common::E_OK;
direct_lookup_done_ = true;

if (direct_device_id_ == nullptr) {
return common::E_OK;
}

auto device_comparable =
std::make_shared<DeviceIDComparable>(direct_device_id_);

std::shared_ptr<IMetaIndexEntry> device_index_entry;
int64_t end_offset = 0;

ret = io_reader_->load_device_index_entry(device_comparable,
device_index_entry, end_offset);

if (ret != common::E_OK || device_index_entry == nullptr) {
return common::E_OK;
}

int64_t start_offset = device_index_entry->get_offset();
MetaIndexNode* child_node = nullptr;
if (RET_FAIL(io_reader_->read_device_meta_index(start_offset, end_offset,
pa_, child_node, true))) {
return ret;
}

auto device_id = device_index_entry->get_device_id();
if (should_split_device_name) {
device_id->split_table_name();
}
result_cache_.push(std::make_pair(device_id, child_node));

return common::E_OK;
}

} // namespace storage
18 changes: 16 additions & 2 deletions cpp/src/reader/device_meta_iterator.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
#define READER_DEVICE_META_ITERATOR_H

#include <queue>
#include <string>
#include <vector>

#include "file/tsfile_io_reader.h"
#include "reader/expression.h"
Expand All @@ -34,15 +36,19 @@ class DeviceMetaIterator {
const Filter* id_filter)
: io_reader_(io_reader),
id_filter_(id_filter),
should_split_device_name(false) {
should_split_device_name(false),
direct_lookup_done_(false) {
meta_index_nodes_.push(meat_index_node);
pa_.init(512, common::MOD_DEVICE_META_ITER);
try_setup_direct_lookup(meat_index_node);
}

DeviceMetaIterator(TsFileIOReader* io_reader,
const std::vector<MetaIndexNode*>& meta_index_node_list,
const Filter* id_filter)
: io_reader_(io_reader), id_filter_(id_filter) {
: io_reader_(io_reader),
id_filter_(id_filter),
direct_lookup_done_(false) {
for (auto meta_index_node : meta_index_node_list) {
meta_index_nodes_.push(meta_index_node);
}
Expand All @@ -62,13 +68,21 @@ class DeviceMetaIterator {
int load_results();
int load_leaf_device(MetaIndexNode* meta_index_node);
int load_internal_node(MetaIndexNode* meta_index_node);

void try_setup_direct_lookup(MetaIndexNode* root_node);
int load_results_direct();

TsFileIOReader* io_reader_;
std::queue<MetaIndexNode*> meta_index_nodes_;
std::queue<std::pair<std::shared_ptr<IDeviceID>, MetaIndexNode*>>
result_cache_;
const Filter* id_filter_;
common::PageArena pa_;
bool should_split_device_name;

bool direct_lookup_done_;
std::shared_ptr<IDeviceID> direct_device_id_;
MetaIndexNode* direct_root_node_ = nullptr;
};

} // end namespace storage
Expand Down
102 changes: 94 additions & 8 deletions cpp/src/reader/tsfile_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,71 @@
using namespace common;
using namespace storage;

namespace {

struct DeviceMetaEntry {
std::shared_ptr<IDeviceID> device_id;
int64_t start_offset;
int64_t end_offset;
};

int get_all_device_entries(std::vector<DeviceMetaEntry>& entries,
std::shared_ptr<MetaIndexNode> index_node,
ReadFile* read_file, PageArena& pa) {
Comment thread
JackieTien97 marked this conversation as resolved.
int ret = E_OK;
if (index_node == nullptr) {
return ret;
}
if (index_node->node_type_ == LEAF_DEVICE) {
for (size_t i = 0; i < index_node->children_.size(); i++) {
DeviceMetaEntry entry;
entry.device_id = index_node->children_[i]->get_device_id();
entry.start_offset = index_node->children_[i]->get_offset();
entry.end_offset = (i + 1 < index_node->children_.size())
? index_node->children_[i + 1]->get_offset()
: index_node->end_offset_;
entries.push_back(entry);
}
} else {
for (size_t idx = 0; idx < index_node->children_.size(); idx++) {
auto meta_index_entry = index_node->children_[idx];
int64_t start_offset = meta_index_entry->get_offset();
int64_t end_offset = index_node->end_offset_;
if (idx + 1 < index_node->children_.size()) {
end_offset = index_node->children_[idx + 1]->get_offset();
}
ASSERT(end_offset - start_offset > 0);
const int32_t read_size = (int32_t)(end_offset - start_offset);
int32_t ret_read_len = 0;
char* data_buf = (char*)pa.alloc(read_size);
void* m_idx_node_buf = pa.alloc(sizeof(MetaIndexNode));
Comment thread
JackieTien97 marked this conversation as resolved.
if (IS_NULL(data_buf) || IS_NULL(m_idx_node_buf)) {
Comment thread
JackieTien97 marked this conversation as resolved.
return E_OOM;
}
auto* top_node_ptr = new (m_idx_node_buf) MetaIndexNode(&pa);
auto top_node = std::shared_ptr<MetaIndexNode>(
top_node_ptr, [](MetaIndexNode* ptr) {
if (ptr) {
ptr->~MetaIndexNode();
}
});
if (RET_FAIL(read_file->read(start_offset, data_buf, read_size,
ret_read_len))) {
} else if (RET_FAIL(top_node->device_deserialize_from(data_buf,
read_size))) {
} else {
ret = get_all_device_entries(entries, top_node, read_file, pa);
Comment thread
JackieTien97 marked this conversation as resolved.
}
Comment thread
JackieTien97 marked this conversation as resolved.
if (ret != E_OK) {
return ret;
}
}
}
return ret;
}

} // namespace

namespace storage {
TsFileReader::TsFileReader()
: read_file_(nullptr),
Expand Down Expand Up @@ -367,8 +432,6 @@ int TsFileReader::get_timeseries_metadata_impl(
std::vector<std::shared_ptr<ITimeseriesIndex>>& result) {
int ret = E_OK;
std::vector<ITimeseriesIndex*> timeseries_indexs;
tsfile_reader_meta_pa_.init(512, MOD_TSFILE_READER);
// Pointers are owned by tsfile_reader_meta_pa_; shared_ptr must not delete
auto noop_deleter = [](ITimeseriesIndex*) {};
if (RET_FAIL(
tsfile_executor_->get_tsfile_io_reader()
Expand Down Expand Up @@ -397,13 +460,36 @@ DeviceTimeseriesMetadataMap TsFileReader::get_timeseries_metadata(
}

DeviceTimeseriesMetadataMap TsFileReader::get_timeseries_metadata() {
// Collect metadata for all devices present in the file
DeviceTimeseriesMetadataMap result;
auto device_ids = get_all_device_ids();
for (const auto& device_id : device_ids) {
std::vector<std::shared_ptr<ITimeseriesIndex>> list;
if (get_timeseries_metadata_impl(device_id, list) == E_OK) {
result.insert(std::make_pair(device_id, std::move(list)));
TsFileMeta* tsfile_meta = tsfile_executor_->get_tsfile_meta();
if (tsfile_meta == nullptr) {
return result;
}

PageArena pa;
pa.init(512, MOD_TSFILE_READER);
std::vector<DeviceMetaEntry> entries;
for (auto& table_entry : tsfile_meta->table_metadata_index_node_map_) {
if (get_all_device_entries(entries, table_entry.second, read_file_,
pa) != E_OK) {
return result;
}
}

auto noop_deleter = [](ITimeseriesIndex*) {};
for (auto& device_entry : entries) {
std::vector<ITimeseriesIndex*> raw_ts_indexes;
if (tsfile_executor_->get_tsfile_io_reader()
->get_device_timeseries_meta_by_offset(
device_entry.start_offset, device_entry.end_offset,
raw_ts_indexes, tsfile_reader_meta_pa_) == E_OK) {
std::vector<std::shared_ptr<ITimeseriesIndex>> list;
for (auto ts_idx : raw_ts_indexes) {
list.emplace_back(
std::shared_ptr<ITimeseriesIndex>(ts_idx, noop_deleter));
}
result.insert(
std::make_pair(device_entry.device_id, std::move(list)));
}
}
return result;
Expand Down
Loading
Loading