From 35dabfe8dab08b645761947eb889c3e49b553237 Mon Sep 17 00:00:00 2001 From: Hind Montassif Date: Mon, 6 Oct 2025 15:12:25 +0200 Subject: [PATCH 01/12] First attempt for compression in serialization (not tested and to be cleaned up) --- CMakeLists.txt | 10 +++ cmake/Findlz4.cmake | 94 +++++++++++++++++++++ cmake/external_dependencies.cmake | 26 +++++- conanfile.py | 2 + environment-dev.yml | 4 +- include/sparrow_ipc/compression.hpp | 23 +++++ include/sparrow_ipc/serialize.hpp | 4 +- include/sparrow_ipc/serialize_utils.hpp | 31 +++---- src/compression.cpp | 98 ++++++++++++++++++++++ src/deserialize.cpp | 3 +- src/serialize_utils.cpp | 94 +++++++++++++++++++-- tests/test_de_serialization_with_files.cpp | 63 +++++++++++++- tests/test_serialize_utils.cpp | 11 +-- 13 files changed, 421 insertions(+), 42 deletions(-) create mode 100644 cmake/Findlz4.cmake create mode 100644 include/sparrow_ipc/compression.hpp create mode 100644 src/compression.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 86b80e3..ca5cba2 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -18,6 +18,8 @@ set(SPARROW_IPC_COMPILE_DEFINITIONS "" CACHE STRING "List of public compile defi set(SPARROW_IPC_INCLUDE_DIR ${CMAKE_CURRENT_SOURCE_DIR}/include) set(SPARROW_IPC_SOURCE_DIR ${CMAKE_CURRENT_SOURCE_DIR}/src) +# set(ENV{PKG_CONFIG_PATH} "$ENV{CONDA_PREFIX}/lib/pkgconfig:$ENV{PKG_CONFIG_PATH}") + # Linter options # ============= OPTION(ACTIVATE_LINTER "Create targets to run clang-format" OFF) @@ -102,6 +104,7 @@ set(SPARROW_IPC_HEADERS ${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/arrow_interface/arrow_schema/private_data.hpp ${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/config/config.hpp ${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/config/sparrow_ipc_version.hpp + ${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/compression.hpp ${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/deserialize_variable_size_binary_array.hpp ${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/deserialize_fixedsizebinary_array.hpp ${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/deserialize_primitive_array.hpp @@ -122,6 +125,7 @@ set(SPARROW_IPC_SRC ${SPARROW_IPC_SOURCE_DIR}/arrow_interface/arrow_array/private_data.cpp ${SPARROW_IPC_SOURCE_DIR}/arrow_interface/arrow_schema.cpp ${SPARROW_IPC_SOURCE_DIR}/arrow_interface/arrow_schema/private_data.cpp + ${SPARROW_IPC_SOURCE_DIR}/compression.cpp ${SPARROW_IPC_SOURCE_DIR}/deserialize_fixedsizebinary_array.cpp ${SPARROW_IPC_SOURCE_DIR}/deserialize_utils.cpp ${SPARROW_IPC_SOURCE_DIR}/deserialize.cpp @@ -239,8 +243,14 @@ target_link_libraries(sparrow-ipc PUBLIC sparrow::sparrow flatbuffers::flatbuffers + PRIVATE + lz4::lz4 ) +target_include_directories(sparrow-ipc PRIVATE ${LZ4_INCLUDE_DIRS}) +target_link_libraries(sparrow-ipc PRIVATE ${LZ4_LIBRARIES}) + + # Ensure generated headers are available when building sparrow-ipc add_dependencies(sparrow-ipc generate_flatbuffers_headers) diff --git a/cmake/Findlz4.cmake b/cmake/Findlz4.cmake new file mode 100644 index 0000000..e3d3e3b --- /dev/null +++ b/cmake/Findlz4.cmake @@ -0,0 +1,94 @@ +# Find LZ4 library and headers + +# This module defines: +# LZ4_FOUND - True if lz4 is found +# LZ4_INCLUDE_DIRS - LZ4 include directories +# LZ4_LIBRARIES - Libraries needed to use LZ4 +# LZ4_VERSION - LZ4 version number +# + +find_package(PkgConfig) +if(PKG_CONFIG_FOUND) + pkg_check_modules(LZ4 QUIET liblz4) + if(NOT LZ4_FOUND) + message(STATUS "Did not find 'liblz4.pc', trying 'lz4.pc'") + pkg_check_modules(LZ4 QUIET lz4) + endif() +endif() + +find_path(LZ4_INCLUDE_DIR lz4.h) +# HINTS ${LZ4_INCLUDEDIR} ${LZ4_INCLUDE_DIRS}) +find_library(LZ4_LIBRARY NAMES lz4 liblz4) +# HINTS ${LZ4_LIBDIR} ${LZ4_LIBRARY_DIRS}) + +include(FindPackageHandleStandardArgs) +find_package_handle_standard_args(lz4 DEFAULT_MSG + LZ4_LIBRARY LZ4_INCLUDE_DIR) +mark_as_advanced(LZ4_INCLUDE_DIR LZ4_LIBRARY) + +set(LZ4_LIBRARIES ${LZ4_LIBRARY}) +set(LZ4_INCLUDE_DIRS ${LZ4_INCLUDE_DIR}) + +if(LZ4_FOUND AND NOT TARGET lz4::lz4) + add_library(lz4::lz4 UNKNOWN IMPORTED) + set_target_properties(lz4::lz4 PROPERTIES + IMPORTED_LOCATION "${LZ4_LIBRARIES}" + INTERFACE_INCLUDE_DIRECTORIES "${LZ4_INCLUDE_DIRS}") + if (NOT TARGET LZ4::LZ4 AND TARGET lz4::lz4) + add_library(LZ4::LZ4 ALIAS lz4::lz4) + endif () +endif() + +#TODO add version? +########### + +# find_path(LZ4_INCLUDE_DIR +# NAMES lz4.h +# DOC "lz4 include directory") +# mark_as_advanced(LZ4_INCLUDE_DIR) +# find_library(LZ4_LIBRARY +# NAMES lz4 liblz4 +# DOC "lz4 library") +# mark_as_advanced(LZ4_LIBRARY) +# +# if (LZ4_INCLUDE_DIR) +# file(STRINGS "${LZ4_INCLUDE_DIR}/lz4.h" _lz4_version_lines +# REGEX "#define[ \t]+LZ4_VERSION_(MAJOR|MINOR|RELEASE)") +# string(REGEX REPLACE ".*LZ4_VERSION_MAJOR *\([0-9]*\).*" "\\1" _lz4_version_major "${_lz4_version_lines}") +# string(REGEX REPLACE ".*LZ4_VERSION_MINOR *\([0-9]*\).*" "\\1" _lz4_version_minor "${_lz4_version_lines}") +# string(REGEX REPLACE ".*LZ4_VERSION_RELEASE *\([0-9]*\).*" "\\1" _lz4_version_release "${_lz4_version_lines}") +# set(LZ4_VERSION "${_lz4_version_major}.${_lz4_version_minor}.${_lz4_version_release}") +# unset(_lz4_version_major) +# unset(_lz4_version_minor) +# unset(_lz4_version_release) +# unset(_lz4_version_lines) +# endif () +# +# include(FindPackageHandleStandardArgs) +# find_package_handle_standard_args(LZ4 +# REQUIRED_VARS LZ4_LIBRARY LZ4_INCLUDE_DIR +# VERSION_VAR LZ4_VERSION) +# +# if (LZ4_FOUND) +# # `lz4_FOUND` needs to be defined because: +# # - Other dependencies also resolve LZ4 using `find_package(lz4 ...)` +# # - CMake's syntax is case-sensitive +# # +# # See: +# # - https://github.com/facebook/rocksdb/blob/0836a2b26dfbbe30c15e8cebf47771917d55e760/cmake/RocksDBConfig.cmake.in#L36 +# # - https://github.com/facebook/rocksdb/blob/0836a2b26dfbbe30c15e8cebf47771917d55e760/cmake/modules/Findlz4.cmake#L17 +# # - https://github.com/man-group/ArcticDB/pull/961 +# set(lz4_FOUND TRUE) +# set(LZ4_INCLUDE_DIRS "${LZ4_INCLUDE_DIR}") +# set(LZ4_LIBRARIES "${LZ4_LIBRARY}") +# +# if (NOT TARGET LZ4::LZ4) +# add_library(LZ4::LZ4 UNKNOWN IMPORTED) +# set_target_properties(LZ4::LZ4 PROPERTIES +# IMPORTED_LOCATION "${LZ4_LIBRARY}" +# INTERFACE_INCLUDE_DIRECTORIES "${LZ4_INCLUDE_DIR}") +# if (NOT TARGET lz4::lz4 AND TARGET LZ4::LZ4) +# add_library(lz4::lz4 ALIAS LZ4::LZ4) +# endif () +# endif () +# endif () diff --git a/cmake/external_dependencies.cmake b/cmake/external_dependencies.cmake index 0276425..b10f777 100644 --- a/cmake/external_dependencies.cmake +++ b/cmake/external_dependencies.cmake @@ -79,6 +79,19 @@ if(NOT TARGET flatbuffers::flatbuffers) endif() unset(FLATBUFFERS_BUILD_TESTS CACHE) +############################### +#TODO need to add fetch for zstd +# find_package_or_fetch( +# PACKAGE_NAME lz4 +# GIT_REPOSITORY https://github.com/lz4/lz4.git +# TAG v1.10.0 +# ) +find_package(lz4 REQUIRED) +# if(NOT TARGET lz4::lz4) +# add_library(lz4::lz4 ALIAS lz4) +# endif() + +############################### if(SPARROW_IPC_BUILD_TESTS) find_package_or_fetch( PACKAGE_NAME doctest @@ -109,10 +122,18 @@ if(SPARROW_IPC_BUILD_TESTS) ) message(STATUS "\t✅ Fetched arrow-testing") - # Iterate over all the files in the arrow-testing-data source directiory. When it's a gz, extract in place. - file(GLOB_RECURSE arrow_testing_data_targz_files CONFIGURE_DEPENDS + # Fetch all the files in the 1.0.0-littleendian directory + file(GLOB_RECURSE arrow_testing_data_targz_files_littleendian CONFIGURE_DEPENDS "${arrow-testing_SOURCE_DIR}/data/arrow-ipc-stream/integration/1.0.0-littleendian/*.json.gz" ) + # Fetch all the files in the 2.0.0-compression directory + file(GLOB_RECURSE arrow_testing_data_targz_files_compression CONFIGURE_DEPENDS + "${arrow-testing_SOURCE_DIR}/data/arrow-ipc-stream/integration/2.0.0-compression/*.json.gz" + ) + + # Combine lists of files + list(APPEND arrow_testing_data_targz_files ${arrow_testing_data_targz_files_littleendian} ${arrow_testing_data_targz_files_compression}) + # Iterate over all the files in the arrow-testing-data source directory. When it's a gz, extract in place. foreach(file_path IN LISTS arrow_testing_data_targz_files) cmake_path(GET file_path PARENT_PATH parent_dir) cmake_path(GET file_path STEM filename) @@ -128,5 +149,4 @@ if(SPARROW_IPC_BUILD_TESTS) endif() endif() endforeach() - endif() diff --git a/conanfile.py b/conanfile.py index 59916f8..1783a32 100644 --- a/conanfile.py +++ b/conanfile.py @@ -45,6 +45,8 @@ def configure(self): def requirements(self): self.requires("sparrow/1.0.0") self.requires(f"flatbuffers/{self._flatbuffers_version}") + self.requires("lz4/1.9.4") + self.requires("zstd/1.5.5") if self.options.get_safe("build_tests"): self.test_requires("doctest/2.4.12") diff --git a/environment-dev.yml b/environment-dev.yml index 7a3f086..a659628 100644 --- a/environment-dev.yml +++ b/environment-dev.yml @@ -8,8 +8,10 @@ dependencies: - cxx-compiler # Libraries dependencies - flatbuffers - - nlohmann_json - sparrow-devel >=1.1.2 + - nlohmann_json + - lz4 + #- zstd - doctest # Documentation dependencies - doxygen diff --git a/include/sparrow_ipc/compression.hpp b/include/sparrow_ipc/compression.hpp new file mode 100644 index 0000000..9bf51e7 --- /dev/null +++ b/include/sparrow_ipc/compression.hpp @@ -0,0 +1,23 @@ +#pragma once + +#include +#include +#include + +#include "Message_generated.h" + +namespace sparrow_ipc +{ +// enum class CompressionType // TODO class ? enum cf. mamba? +// { +// NONE, +// LZ4, +// ZSTD +// }; + +// CompressionType to_compression_type(org::apache::arrow::flatbuf::CompressionType compression_type); + + std::vector compress(org::apache::arrow::flatbuf::CompressionType compression_type, std::span data); +// std::vector decompress(CompressionType type, std::span data); + +} diff --git a/include/sparrow_ipc/serialize.hpp b/include/sparrow_ipc/serialize.hpp index 1ab8003..d3a7d16 100644 --- a/include/sparrow_ipc/serialize.hpp +++ b/include/sparrow_ipc/serialize.hpp @@ -38,7 +38,7 @@ namespace sparrow_ipc */ template requires std::same_as, sparrow::record_batch> - std::vector serialize(const R& record_batches) + std::vector serialize(const R& record_batches, std::optional compression) { if (record_batches.empty()) { @@ -51,7 +51,7 @@ namespace sparrow_ipc ); } std::vector serialized_schema = serialize_schema_message(record_batches[0]); - std::vector serialized_record_batches = serialize_record_batches_without_schema_message(record_batches); + std::vector serialized_record_batches = serialize_record_batches_without_schema_message(record_batches, compression); serialized_schema.insert( serialized_schema.end(), std::make_move_iterator(serialized_record_batches.begin()), diff --git a/include/sparrow_ipc/serialize_utils.hpp b/include/sparrow_ipc/serialize_utils.hpp index 9ead8ea..b0804cc 100644 --- a/include/sparrow_ipc/serialize_utils.hpp +++ b/include/sparrow_ipc/serialize_utils.hpp @@ -10,6 +10,7 @@ #include "sparrow_ipc/config/config.hpp" #include "sparrow_ipc/magic_values.hpp" #include "sparrow_ipc/utils.hpp" +#include "compression.hpp" namespace sparrow_ipc { @@ -41,35 +42,21 @@ namespace sparrow_ipc * consists of a metadata section followed by a body section containing the actual data. * * @param record_batch The sparrow record batch to be serialized + * TODO add parameter compression here and every place it was added to * @return std::vector A byte vector containing the complete serialized record batch * in Arrow IPC format, ready for transmission or storage */ [[nodiscard]] SPARROW_IPC_API std::vector - serialize_record_batch(const sparrow::record_batch& record_batch); + serialize_record_batch(const sparrow::record_batch& record_batch, std::optional compression); template requires std::same_as, sparrow::record_batch> - /** - * @brief Serializes a collection of record batches into a single byte vector. - * - * This function takes a range or container of record batches and serializes each one - * individually, then concatenates all the serialized data into a single output vector. - * The serialization is performed by calling serialize_record_batch() for each record batch - * in the input collection. - * - * @tparam R The type of the record batch container/range (must be iterable) - * @param record_batches A collection of record batches to be serialized - * @return std::vector A byte vector containing the serialized data of all record batches - * - * @note The function uses move iterators to efficiently transfer the serialized data - * from individual record batches to the output vector. - */ - [[nodiscard]] std::vector serialize_record_batches_without_schema_message(const R& record_batches) + [[nodiscard]] std::vector serialize_record_batches_without_schema_message(const R& record_batches, std::optional compression) { std::vector output; for (const auto& record_batch : record_batches) { - const auto rb_serialized = serialize_record_batch(record_batch); + const auto rb_serialized = serialize_record_batch(record_batch, compression); output.insert( output.end(), std::make_move_iterator(rb_serialized.begin()), @@ -215,6 +202,8 @@ namespace sparrow_ipc std::vector& nodes ); + void fill_body_and_get_buffers_compressed(const sparrow::arrow_proxy& arrow_proxy, std::vector& body, std::vector& flatbuf_buffers, int64_t& offset, org::apache::arrow::flatbuf::CompressionType compression_type); + /** * @brief Creates a vector of Apache Arrow FieldNode objects from a record batch. * @@ -345,7 +334,9 @@ namespace sparrow_ipc [[nodiscard]] SPARROW_IPC_API flatbuffers::FlatBufferBuilder get_record_batch_message_builder( const sparrow::record_batch& record_batch, const std::vector& nodes, - const std::vector& buffers + const std::vector& buffers, + int64_t body_size, + std::optional compression ); /** @@ -366,7 +357,7 @@ namespace sparrow_ipc * includes both metadata and data portions of the record batch */ [[nodiscard]] SPARROW_IPC_API std::vector - serialize_record_batch(const sparrow::record_batch& record_batch); + serialize_record_batch(const sparrow::record_batch& record_batch, std::optional compression); /** * @brief Adds padding bytes to a buffer to ensure 8-byte alignment. diff --git a/src/compression.cpp b/src/compression.cpp new file mode 100644 index 0000000..41fdb89 --- /dev/null +++ b/src/compression.cpp @@ -0,0 +1,98 @@ +#include +#include +#include +#include + +#include "sparrow_ipc/compression.hpp" + +namespace sparrow_ipc +{ + // TODO not sure we need this unless if we need it to hide flatbuffers dependency +// CompressionType to_compression_type(org::apache::arrow::flatbuf::CompressionType compression_type) +// { +// switch (compression_type) +// { +// case org::apache::arrow::flatbuf::CompressionType::LZ4_FRAME: +// return CompressionType::LZ4; +// // case org::apache::arrow::flatbuf::CompressionType::ZSTD: +// // // TODO: Add ZSTD support +// // break; +// default: +// return CompressionType::NONE; +// } +// } + + std::vector compress(org::apache::arrow::flatbuf::CompressionType compression_type, std::span data) + { + if (data.empty()) + { + return {}; + } + switch (compression_type) + { + case org::apache::arrow::flatbuf::CompressionType::LZ4_FRAME: + { + int64_t uncompressed_size = data.size(); + const size_t max_compressed_size = LZ4F_compressFrameBound(uncompressed_size, nullptr); + std::vector compressed_data(max_compressed_size); + const size_t compressed_size = LZ4F_compressFrame(compressed_data.data(), max_compressed_size, data.data(), uncompressed_size, nullptr); + if (LZ4F_isError(compressed_size)) + { + throw std::runtime_error("Failed to compress data with LZ4 frame format"); + } + compressed_data.resize(compressed_size); + return compressed_data; + } +// case CompressionType::NONE: + default: + return {data.begin(), data.end()}; + } + } + +// std::vector decompress(CompressionType type, std::span data) +// { +// switch (type) +// { +// case CompressionType::LZ4: +// { +// if (data.empty()) +// { +// return {}; +// } +// if (data.size() < sizeof(int64_t)) +// { +// throw std::runtime_error("Invalid LZ4 compressed data: missing uncompressed size"); +// } +// const int64_t uncompressed_size = *reinterpret_cast(data.data()); +// if (uncompressed_size == -1) +// { +// return {data.begin() + sizeof(uncompressed_size), data.end()}; +// } +// +// std::vector decompressed_data(uncompressed_size); +// LZ4F_dctx *dctx; +// if (LZ4F_isError(LZ4F_createDecompressionContext(&dctx, LZ4F_VERSION))) +// { +// throw std::runtime_error("Failed to create LZ4 decompression context"); +// } +// +// size_t decompressed_size = uncompressed_size; +// size_t src_size = data.size() - sizeof(uncompressed_size); +// const size_t result = LZ4F_decompress(dctx, decompressed_data.data(), &decompressed_size, data.data() + sizeof(uncompressed_size), &src_size, nullptr); +// +// LZ4F_freeDecompressionContext(dctx); +// +// if (LZ4F_isError(result) || decompressed_size != (size_t)uncompressed_size) +// { +// throw std::runtime_error("Failed to decompress data with LZ4 frame format"); +// } +// +// return decompressed_data; +// } +// case CompressionType::NONE: +// default: +// return {data.begin(), data.end()}; +// } +// } + +} diff --git a/src/deserialize.cpp b/src/deserialize.cpp index 0d13072..1776673 100644 --- a/src/deserialize.cpp +++ b/src/deserialize.cpp @@ -49,7 +49,6 @@ namespace sparrow_ipc const std::vector>>& field_metadata ) { - const size_t length = static_cast(record_batch.length()); size_t buffer_index = 0; std::vector arrays; @@ -270,4 +269,4 @@ namespace sparrow_ipc } while (true); return record_batches; } -} \ No newline at end of file +} diff --git a/src/serialize_utils.cpp b/src/serialize_utils.cpp index ac1e026..aa275ee 100644 --- a/src/serialize_utils.cpp +++ b/src/serialize_utils.cpp @@ -1,4 +1,5 @@ #include +#include #include "sparrow_ipc/magic_values.hpp" #include "sparrow_ipc/serialize.hpp" @@ -181,6 +182,41 @@ namespace sparrow_ipc return buffers; } + // TODO to be factorized later after testing using arrow testing files + void fill_body_and_get_buffers_compressed(const sparrow::arrow_proxy& arrow_proxy, std::vector& body, std::vector& flatbuf_buffers, int64_t& offset, org::apache::arrow::flatbuf::CompressionType compression_type) + { + const auto& buffers = arrow_proxy.buffers(); + for (const auto& buffer : buffers) + { + if (buffer.size() > 0) + { + auto compressed_buffer = compress(compression_type, {buffer.data(), buffer.size()}); + int64_t uncompressed_size = buffer.size(); + body.insert(body.end(), reinterpret_cast(&uncompressed_size), reinterpret_cast(&uncompressed_size) + sizeof(uncompressed_size)); + body.insert(body.end(), compressed_buffer.begin(), compressed_buffer.end()); + add_padding(body); +// const auto current_buffer_content_size = sizeof(uncompressed_size) + compressed_buffer.size(); +// const auto padding = utils::align_to_8(current_buffer_content_size) - current_buffer_content_size; +// if (padding > 0) +// { +// body.insert(body.end(), padding, 0); +// } + +// flatbuf_buffers.emplace_back(offset, current_buffer_content_size); // ? + padding); + flatbuf_buffers.emplace_back(offset, sizeof(uncompressed_size) + compressed_buffer.size()); + offset = body.size(); + } + else + { + flatbuf_buffers.emplace_back(offset, 0); + } + } + for (const auto& child : arrow_proxy.children()) + { + fill_body_and_get_buffers_compressed(child, body, flatbuf_buffers, offset, compression_type); + } + } + void fill_body(const sparrow::arrow_proxy& arrow_proxy, std::vector& body) { for (const auto& buffer : arrow_proxy.buffers()) @@ -236,23 +272,31 @@ namespace sparrow_ipc flatbuffers::FlatBufferBuilder get_record_batch_message_builder( const sparrow::record_batch& record_batch, const std::vector& nodes, - const std::vector& buffers + const std::vector& buffers, + int64_t body_size, + std::optional compression ) { flatbuffers::FlatBufferBuilder record_batch_builder; auto nodes_offset = record_batch_builder.CreateVectorOfStructs(nodes); auto buffers_offset = record_batch_builder.CreateVectorOfStructs(buffers); + flatbuffers::Offset compression_offset = 0; + if (compression) + { + // TODO check BodyCompressionMethod::BUFFER, when use other values? + compression_offset = org::apache::arrow::flatbuf::CreateBodyCompression(record_batch_builder, *compression, org::apache::arrow::flatbuf::BodyCompressionMethod::BUFFER); + } const auto record_batch_offset = org::apache::arrow::flatbuf::CreateRecordBatch( record_batch_builder, static_cast(record_batch.nb_rows()), nodes_offset, buffers_offset, - 0, // TODO: Compression + compression_offset, 0 // TODO :variadic buffer Counts ); - const int64_t body_size = calculate_body_size(record_batch); +// const int64_t body_size = calculate_body_size(record_batch); const auto record_batch_message_offset = org::apache::arrow::flatbuf::CreateMessage( record_batch_builder, org::apache::arrow::flatbuf::MetadataVersion::V5, @@ -265,14 +309,48 @@ namespace sparrow_ipc return record_batch_builder; } - std::vector serialize_record_batch(const sparrow::record_batch& record_batch) + std::vector serialize_record_batch(const sparrow::record_batch& record_batch, std::optional compression)//const CompressionType compression) { std::vector nodes = create_fieldnodes(record_batch); - std::vector flatbuf_buffers = get_buffers(record_batch); + + ///////////////////////////////////////////////////////////////// + + std::vector body; + std::vector flatbuf_buffers; + int64_t body_size = 0; + + if (compression) + { + int64_t offset = 0; + for (const auto& column : record_batch.columns()) + { + const auto& arrow_proxy = sparrow::detail::array_access::get_arrow_proxy(column); + fill_body_and_get_buffers_compressed(arrow_proxy, body, flatbuf_buffers, offset, *compression); + } + body_size = body.size(); + } + else + { + body = generate_body(record_batch); + flatbuf_buffers = get_buffers(record_batch); + body_size = calculate_body_size(record_batch); + } +// TODO use here when everything works and added/using conversion enum fct +// std::optional compression_opt = std::nullopt; +// if (compression == CompressionType::LZ4) +// { +// compression_opt = org::apache::arrow::flatbuf::CompressionType::LZ4_FRAME; +// } + + //////////////////////////////////////////////////////////////// + +// std::vector flatbuf_buffers = get_buffers(record_batch); flatbuffers::FlatBufferBuilder record_batch_builder = get_record_batch_message_builder( record_batch, nodes, - flatbuf_buffers + flatbuf_buffers, + body_size, + compression ); std::vector output; output.insert(output.end(), continuation.begin(), continuation.end()); @@ -288,7 +366,7 @@ namespace sparrow_ipc record_batch_builder.GetBufferPointer() + record_batch_len ); add_padding(output); - std::vector body = generate_body(record_batch); +// std::vector body = generate_body(record_batch); output.insert(output.end(), std::make_move_iterator(body.begin()), std::make_move_iterator(body.end())); return output; } @@ -302,4 +380,4 @@ namespace sparrow_ipc ); } -} \ No newline at end of file +} diff --git a/tests/test_de_serialization_with_files.cpp b/tests/test_de_serialization_with_files.cpp index 8fe825b..9b5f1bf 100644 --- a/tests/test_de_serialization_with_files.cpp +++ b/tests/test_de_serialization_with_files.cpp @@ -3,6 +3,7 @@ #include #include #include +#include #include @@ -21,6 +22,9 @@ const std::filesystem::path arrow_testing_data_dir = ARROW_TESTING_DATA_DIR; const std::filesystem::path tests_resources_files_path = arrow_testing_data_dir / "data" / "arrow-ipc-stream" / "integration" / "1.0.0-littleendian"; +const std::filesystem::path tests_resources_files_path_with_compression = arrow_testing_data_dir / "data" / "arrow-ipc-stream" + / "integration" / "2.0.0-compression"; + const std::vector files_paths_to_test = { tests_resources_files_path / "generated_primitive", tests_resources_files_path / "generated_primitive_large_offsets", @@ -28,6 +32,14 @@ const std::vector files_paths_to_test = { // tests_resources_files_path / "generated_primitive_no_batches" }; +const std::vector files_paths_to_test_with_compression = { + tests_resources_files_path_with_compression / "generated_lz4" +// tests_resources_files_path_with_compression/ "generated_uncompressible_lz4" +// tests_resources_files_path_with_compression / "generated_zstd" +// tests_resources_files_path_with_compression/ "generated_uncompressible_zstd" +}; + + size_t get_number_of_batches(const std::filesystem::path& json_path) { std::ifstream json_file(json_path); @@ -162,7 +174,7 @@ TEST_SUITE("Integration tests") std::span(stream_data) ); - const auto serialized_data = sparrow_ipc::serialize(record_batches_from_json); + const auto serialized_data = sparrow_ipc::serialize(record_batches_from_json, std::nullopt); const auto deserialized_serialized_data = sparrow_ipc::deserialize_stream( std::span(serialized_data) ); @@ -170,4 +182,53 @@ TEST_SUITE("Integration tests") } } } + + TEST_CASE("Serialization with LZ4 compression") + { + for (const auto& file_path : files_paths_to_test_with_compression) + { + std::filesystem::path json_path = file_path; + json_path.replace_extension(".json"); + const std::string test_name = "Testing LZ4 compression with " + file_path.filename().string(); + SUBCASE(test_name.c_str()) + { + // Load the JSON file + auto json_data = load_json_file(json_path); + CHECK(json_data != nullptr); + + const size_t num_batches = get_number_of_batches(json_path); + std::vector record_batches_from_json; + for (size_t batch_idx = 0; batch_idx < num_batches; ++batch_idx) + { + INFO("Processing batch " << batch_idx << " of " << num_batches); + record_batches_from_json.emplace_back( + sparrow::json_reader::build_record_batch_from_json(json_data, batch_idx) + ); + } + + // Load stream file + std::filesystem::path stream_file_path = file_path; + stream_file_path.replace_extension(".stream"); + std::ifstream stream_file(stream_file_path, std::ios::in | std::ios::binary); + REQUIRE(stream_file.is_open()); + const std::vector stream_data( + (std::istreambuf_iterator(stream_file)), + (std::istreambuf_iterator()) + ); + stream_file.close(); + + // Process the stream file + const auto record_batches_from_stream = sparrow_ipc::deserialize_stream( + std::span(stream_data) + ); + + const auto serialized_data = sparrow_ipc::serialize(record_batches_from_json, std::nullopt); +// const auto deserialized_serialized_data = sparrow_ipc::deserialize_stream( +// std::span(serialized_data) +// ); +// compare_record_batches(record_batches_from_stream, deserialized_serialized_data); + } + + } + } } diff --git a/tests/test_serialize_utils.cpp b/tests/test_serialize_utils.cpp index 2997843..237c161 100644 --- a/tests/test_serialize_utils.cpp +++ b/tests/test_serialize_utils.cpp @@ -292,7 +292,8 @@ namespace sparrow_ipc auto record_batch = create_test_record_batch(); auto nodes = create_fieldnodes(record_batch); auto buffers = get_buffers(record_batch); - auto builder = get_record_batch_message_builder(record_batch, nodes, buffers); + auto body_size = calculate_body_size(record_batch); + auto builder = get_record_batch_message_builder(record_batch, nodes, buffers, body_size, std::nullopt); CHECK_GT(builder.GetSize(), 0); CHECK_NE(builder.GetBufferPointer(), nullptr); } @@ -303,7 +304,7 @@ namespace sparrow_ipc SUBCASE("Valid record batch") { auto record_batch = create_test_record_batch(); - auto serialized = serialize_record_batch(record_batch); + auto serialized = serialize_record_batch(record_batch, std::nullopt); CHECK_GT(serialized.size(), 0); // Check that it starts with continuation bytes @@ -335,7 +336,7 @@ namespace sparrow_ipc SUBCASE("Empty record batch") { auto empty_batch = sp::record_batch({}); - auto serialized = serialize_record_batch(empty_batch); + auto serialized = serialize_record_batch(empty_batch, std::nullopt); CHECK_GT(serialized.size(), 0); CHECK_GE(serialized.size(), continuation.size()); } @@ -348,7 +349,7 @@ namespace sparrow_ipc auto record_batch = create_test_record_batch(); auto schema_serialized = serialize_schema_message(record_batch); - auto record_batch_serialized = serialize_record_batch(record_batch); + auto record_batch_serialized = serialize_record_batch(record_batch, std::nullopt); CHECK_GT(schema_serialized.size(), 0); CHECK_GT(record_batch_serialized.size(), 0); @@ -361,4 +362,4 @@ namespace sparrow_ipc CHECK_EQ(schema_serialized.size() % 8, 0); } } -} \ No newline at end of file +} From 22224d031b9e56e409adf855ab0d53e540764c65 Mon Sep 17 00:00:00 2001 From: Hind Montassif Date: Mon, 6 Oct 2025 16:11:50 +0200 Subject: [PATCH 02/12] Clean up and fetch --- cmake/Findlz4.cmake | 52 --------------------- cmake/external_dependencies.cmake | 14 +++--- include/sparrow_ipc/compression.hpp | 7 ++- include/sparrow_ipc/serialize.hpp | 1 + include/sparrow_ipc/serialize_utils.hpp | 7 ++- src/compression.cpp | 60 +++---------------------- src/serialize_utils.cpp | 29 ++---------- 7 files changed, 23 insertions(+), 147 deletions(-) diff --git a/cmake/Findlz4.cmake b/cmake/Findlz4.cmake index e3d3e3b..2b9e9c0 100644 --- a/cmake/Findlz4.cmake +++ b/cmake/Findlz4.cmake @@ -40,55 +40,3 @@ if(LZ4_FOUND AND NOT TARGET lz4::lz4) endif() #TODO add version? -########### - -# find_path(LZ4_INCLUDE_DIR -# NAMES lz4.h -# DOC "lz4 include directory") -# mark_as_advanced(LZ4_INCLUDE_DIR) -# find_library(LZ4_LIBRARY -# NAMES lz4 liblz4 -# DOC "lz4 library") -# mark_as_advanced(LZ4_LIBRARY) -# -# if (LZ4_INCLUDE_DIR) -# file(STRINGS "${LZ4_INCLUDE_DIR}/lz4.h" _lz4_version_lines -# REGEX "#define[ \t]+LZ4_VERSION_(MAJOR|MINOR|RELEASE)") -# string(REGEX REPLACE ".*LZ4_VERSION_MAJOR *\([0-9]*\).*" "\\1" _lz4_version_major "${_lz4_version_lines}") -# string(REGEX REPLACE ".*LZ4_VERSION_MINOR *\([0-9]*\).*" "\\1" _lz4_version_minor "${_lz4_version_lines}") -# string(REGEX REPLACE ".*LZ4_VERSION_RELEASE *\([0-9]*\).*" "\\1" _lz4_version_release "${_lz4_version_lines}") -# set(LZ4_VERSION "${_lz4_version_major}.${_lz4_version_minor}.${_lz4_version_release}") -# unset(_lz4_version_major) -# unset(_lz4_version_minor) -# unset(_lz4_version_release) -# unset(_lz4_version_lines) -# endif () -# -# include(FindPackageHandleStandardArgs) -# find_package_handle_standard_args(LZ4 -# REQUIRED_VARS LZ4_LIBRARY LZ4_INCLUDE_DIR -# VERSION_VAR LZ4_VERSION) -# -# if (LZ4_FOUND) -# # `lz4_FOUND` needs to be defined because: -# # - Other dependencies also resolve LZ4 using `find_package(lz4 ...)` -# # - CMake's syntax is case-sensitive -# # -# # See: -# # - https://github.com/facebook/rocksdb/blob/0836a2b26dfbbe30c15e8cebf47771917d55e760/cmake/RocksDBConfig.cmake.in#L36 -# # - https://github.com/facebook/rocksdb/blob/0836a2b26dfbbe30c15e8cebf47771917d55e760/cmake/modules/Findlz4.cmake#L17 -# # - https://github.com/man-group/ArcticDB/pull/961 -# set(lz4_FOUND TRUE) -# set(LZ4_INCLUDE_DIRS "${LZ4_INCLUDE_DIR}") -# set(LZ4_LIBRARIES "${LZ4_LIBRARY}") -# -# if (NOT TARGET LZ4::LZ4) -# add_library(LZ4::LZ4 UNKNOWN IMPORTED) -# set_target_properties(LZ4::LZ4 PROPERTIES -# IMPORTED_LOCATION "${LZ4_LIBRARY}" -# INTERFACE_INCLUDE_DIRECTORIES "${LZ4_INCLUDE_DIR}") -# if (NOT TARGET lz4::lz4 AND TARGET LZ4::LZ4) -# add_library(lz4::lz4 ALIAS LZ4::LZ4) -# endif () -# endif () -# endif () diff --git a/cmake/external_dependencies.cmake b/cmake/external_dependencies.cmake index b10f777..ac5bc66 100644 --- a/cmake/external_dependencies.cmake +++ b/cmake/external_dependencies.cmake @@ -79,19 +79,15 @@ if(NOT TARGET flatbuffers::flatbuffers) endif() unset(FLATBUFFERS_BUILD_TESTS CACHE) -############################### -#TODO need to add fetch for zstd -# find_package_or_fetch( -# PACKAGE_NAME lz4 -# GIT_REPOSITORY https://github.com/lz4/lz4.git -# TAG v1.10.0 -# ) -find_package(lz4 REQUIRED) +find_package_or_fetch( + PACKAGE_NAME lz4 + GIT_REPOSITORY https://github.com/lz4/lz4.git + TAG v1.10.0 +) # if(NOT TARGET lz4::lz4) # add_library(lz4::lz4 ALIAS lz4) # endif() -############################### if(SPARROW_IPC_BUILD_TESTS) find_package_or_fetch( PACKAGE_NAME doctest diff --git a/include/sparrow_ipc/compression.hpp b/include/sparrow_ipc/compression.hpp index 9bf51e7..14403a7 100644 --- a/include/sparrow_ipc/compression.hpp +++ b/include/sparrow_ipc/compression.hpp @@ -8,7 +8,8 @@ namespace sparrow_ipc { -// enum class CompressionType // TODO class ? enum cf. mamba? +// TODO use these later if needed for wrapping purposes (flatbuffers/lz4) +// enum class CompressionType // { // NONE, // LZ4, @@ -17,7 +18,5 @@ namespace sparrow_ipc // CompressionType to_compression_type(org::apache::arrow::flatbuf::CompressionType compression_type); - std::vector compress(org::apache::arrow::flatbuf::CompressionType compression_type, std::span data); -// std::vector decompress(CompressionType type, std::span data); - + std::vector compress(org::apache::arrow::flatbuf::CompressionType compression_type, std::span data); } diff --git a/include/sparrow_ipc/serialize.hpp b/include/sparrow_ipc/serialize.hpp index d3a7d16..edc1752 100644 --- a/include/sparrow_ipc/serialize.hpp +++ b/include/sparrow_ipc/serialize.hpp @@ -26,6 +26,7 @@ namespace sparrow_ipc * @tparam R Container type that holds record batches (must support empty(), operator[], begin(), end()) * @param record_batches Collection of record batches to serialize. All batches must have identical * schemas. + * @param compression The compression type to use when serializing * * @return std::vector Binary serialized data containing schema, record batches, and * end-of-stream marker. Returns empty vector if input collection is empty. diff --git a/include/sparrow_ipc/serialize_utils.hpp b/include/sparrow_ipc/serialize_utils.hpp index b0804cc..e555de0 100644 --- a/include/sparrow_ipc/serialize_utils.hpp +++ b/include/sparrow_ipc/serialize_utils.hpp @@ -42,7 +42,7 @@ namespace sparrow_ipc * consists of a metadata section followed by a body section containing the actual data. * * @param record_batch The sparrow record batch to be serialized - * TODO add parameter compression here and every place it was added to + * @param compression The compression type to use when serializing * @return std::vector A byte vector containing the complete serialized record batch * in Arrow IPC format, ready for transmission or storage */ @@ -322,6 +322,8 @@ namespace sparrow_ipc * @param nodes Vector of field nodes describing the structure and null counts of columns * @param buffers Vector of buffer descriptors containing offset and length information * for the data buffers + * @param body_size The body size + * @param compression The compression type to use when serializing * * @return A FlatBufferBuilder containing the complete serialized message ready for * transmission or storage. The builder is finished and ready to be accessed @@ -335,7 +337,7 @@ namespace sparrow_ipc const sparrow::record_batch& record_batch, const std::vector& nodes, const std::vector& buffers, - int64_t body_size, + const int64_t body_size, std::optional compression ); @@ -350,6 +352,7 @@ namespace sparrow_ipc * - The record batch body containing the actual data buffers * * @param record_batch The sparrow record batch to serialize + * @param compression The compression type to use when serializing * @return std::vector A byte vector containing the serialized record batch * in Arrow IPC format, ready for transmission or storage * diff --git a/src/compression.cpp b/src/compression.cpp index 41fdb89..964d304 100644 --- a/src/compression.cpp +++ b/src/compression.cpp @@ -1,13 +1,11 @@ -#include -#include #include -#include + +#include #include "sparrow_ipc/compression.hpp" namespace sparrow_ipc { - // TODO not sure we need this unless if we need it to hide flatbuffers dependency // CompressionType to_compression_type(org::apache::arrow::flatbuf::CompressionType compression_type) // { // switch (compression_type) @@ -22,7 +20,7 @@ namespace sparrow_ipc // } // } - std::vector compress(org::apache::arrow::flatbuf::CompressionType compression_type, std::span data) + std::vector compress(org::apache::arrow::flatbuf::CompressionType compression_type, std::span data) { if (data.empty()) { @@ -32,9 +30,9 @@ namespace sparrow_ipc { case org::apache::arrow::flatbuf::CompressionType::LZ4_FRAME: { - int64_t uncompressed_size = data.size(); + std::int64_t uncompressed_size = data.size(); const size_t max_compressed_size = LZ4F_compressFrameBound(uncompressed_size, nullptr); - std::vector compressed_data(max_compressed_size); + std::vector compressed_data(max_compressed_size); const size_t compressed_size = LZ4F_compressFrame(compressed_data.data(), max_compressed_size, data.data(), uncompressed_size, nullptr); if (LZ4F_isError(compressed_size)) { @@ -43,56 +41,8 @@ namespace sparrow_ipc compressed_data.resize(compressed_size); return compressed_data; } -// case CompressionType::NONE: default: return {data.begin(), data.end()}; } } - -// std::vector decompress(CompressionType type, std::span data) -// { -// switch (type) -// { -// case CompressionType::LZ4: -// { -// if (data.empty()) -// { -// return {}; -// } -// if (data.size() < sizeof(int64_t)) -// { -// throw std::runtime_error("Invalid LZ4 compressed data: missing uncompressed size"); -// } -// const int64_t uncompressed_size = *reinterpret_cast(data.data()); -// if (uncompressed_size == -1) -// { -// return {data.begin() + sizeof(uncompressed_size), data.end()}; -// } -// -// std::vector decompressed_data(uncompressed_size); -// LZ4F_dctx *dctx; -// if (LZ4F_isError(LZ4F_createDecompressionContext(&dctx, LZ4F_VERSION))) -// { -// throw std::runtime_error("Failed to create LZ4 decompression context"); -// } -// -// size_t decompressed_size = uncompressed_size; -// size_t src_size = data.size() - sizeof(uncompressed_size); -// const size_t result = LZ4F_decompress(dctx, decompressed_data.data(), &decompressed_size, data.data() + sizeof(uncompressed_size), &src_size, nullptr); -// -// LZ4F_freeDecompressionContext(dctx); -// -// if (LZ4F_isError(result) || decompressed_size != (size_t)uncompressed_size) -// { -// throw std::runtime_error("Failed to decompress data with LZ4 frame format"); -// } -// -// return decompressed_data; -// } -// case CompressionType::NONE: -// default: -// return {data.begin(), data.end()}; -// } -// } - } diff --git a/src/serialize_utils.cpp b/src/serialize_utils.cpp index aa275ee..2295b82 100644 --- a/src/serialize_utils.cpp +++ b/src/serialize_utils.cpp @@ -195,14 +195,7 @@ namespace sparrow_ipc body.insert(body.end(), reinterpret_cast(&uncompressed_size), reinterpret_cast(&uncompressed_size) + sizeof(uncompressed_size)); body.insert(body.end(), compressed_buffer.begin(), compressed_buffer.end()); add_padding(body); -// const auto current_buffer_content_size = sizeof(uncompressed_size) + compressed_buffer.size(); -// const auto padding = utils::align_to_8(current_buffer_content_size) - current_buffer_content_size; -// if (padding > 0) -// { -// body.insert(body.end(), padding, 0); -// } -// flatbuf_buffers.emplace_back(offset, current_buffer_content_size); // ? + padding); flatbuf_buffers.emplace_back(offset, sizeof(uncompressed_size) + compressed_buffer.size()); offset = body.size(); } @@ -273,7 +266,7 @@ namespace sparrow_ipc const sparrow::record_batch& record_batch, const std::vector& nodes, const std::vector& buffers, - int64_t body_size, + const int64_t body_size, std::optional compression ) { @@ -284,8 +277,7 @@ namespace sparrow_ipc flatbuffers::Offset compression_offset = 0; if (compression) { - // TODO check BodyCompressionMethod::BUFFER, when use other values? - compression_offset = org::apache::arrow::flatbuf::CreateBodyCompression(record_batch_builder, *compression, org::apache::arrow::flatbuf::BodyCompressionMethod::BUFFER); + compression_offset = org::apache::arrow::flatbuf::CreateBodyCompression(record_batch_builder, compression.value(), org::apache::arrow::flatbuf::BodyCompressionMethod::BUFFER); } const auto record_batch_offset = org::apache::arrow::flatbuf::CreateRecordBatch( record_batch_builder, @@ -296,7 +288,6 @@ namespace sparrow_ipc 0 // TODO :variadic buffer Counts ); -// const int64_t body_size = calculate_body_size(record_batch); const auto record_batch_message_offset = org::apache::arrow::flatbuf::CreateMessage( record_batch_builder, org::apache::arrow::flatbuf::MetadataVersion::V5, @@ -309,12 +300,10 @@ namespace sparrow_ipc return record_batch_builder; } - std::vector serialize_record_batch(const sparrow::record_batch& record_batch, std::optional compression)//const CompressionType compression) + std::vector serialize_record_batch(const sparrow::record_batch& record_batch, std::optional compression) { std::vector nodes = create_fieldnodes(record_batch); - ///////////////////////////////////////////////////////////////// - std::vector body; std::vector flatbuf_buffers; int64_t body_size = 0; @@ -325,7 +314,7 @@ namespace sparrow_ipc for (const auto& column : record_batch.columns()) { const auto& arrow_proxy = sparrow::detail::array_access::get_arrow_proxy(column); - fill_body_and_get_buffers_compressed(arrow_proxy, body, flatbuf_buffers, offset, *compression); + fill_body_and_get_buffers_compressed(arrow_proxy, body, flatbuf_buffers, offset, compression.value()); } body_size = body.size(); } @@ -335,16 +324,7 @@ namespace sparrow_ipc flatbuf_buffers = get_buffers(record_batch); body_size = calculate_body_size(record_batch); } -// TODO use here when everything works and added/using conversion enum fct -// std::optional compression_opt = std::nullopt; -// if (compression == CompressionType::LZ4) -// { -// compression_opt = org::apache::arrow::flatbuf::CompressionType::LZ4_FRAME; -// } - - //////////////////////////////////////////////////////////////// -// std::vector flatbuf_buffers = get_buffers(record_batch); flatbuffers::FlatBufferBuilder record_batch_builder = get_record_batch_message_builder( record_batch, nodes, @@ -366,7 +346,6 @@ namespace sparrow_ipc record_batch_builder.GetBufferPointer() + record_batch_len ); add_padding(output); -// std::vector body = generate_body(record_batch); output.insert(output.end(), std::make_move_iterator(body.begin()), std::make_move_iterator(body.end())); return output; } From c63fd968b78b45bbaf10b0c437edac4530b74301 Mon Sep 17 00:00:00 2001 From: Hind Montassif Date: Mon, 6 Oct 2025 17:38:00 +0200 Subject: [PATCH 03/12] More cleanup --- CMakeLists.txt | 3 --- conanfile.py | 2 +- environment-dev.yml | 6 +++--- include/sparrow_ipc/serialize_utils.hpp | 18 +++++++++++++++++- src/serialize_utils.cpp | 1 - tests/test_de_serialization_with_files.cpp | 1 - 6 files changed, 21 insertions(+), 10 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index ca5cba2..e542038 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -18,8 +18,6 @@ set(SPARROW_IPC_COMPILE_DEFINITIONS "" CACHE STRING "List of public compile defi set(SPARROW_IPC_INCLUDE_DIR ${CMAKE_CURRENT_SOURCE_DIR}/include) set(SPARROW_IPC_SOURCE_DIR ${CMAKE_CURRENT_SOURCE_DIR}/src) -# set(ENV{PKG_CONFIG_PATH} "$ENV{CONDA_PREFIX}/lib/pkgconfig:$ENV{PKG_CONFIG_PATH}") - # Linter options # ============= OPTION(ACTIVATE_LINTER "Create targets to run clang-format" OFF) @@ -250,7 +248,6 @@ target_link_libraries(sparrow-ipc target_include_directories(sparrow-ipc PRIVATE ${LZ4_INCLUDE_DIRS}) target_link_libraries(sparrow-ipc PRIVATE ${LZ4_LIBRARIES}) - # Ensure generated headers are available when building sparrow-ipc add_dependencies(sparrow-ipc generate_flatbuffers_headers) diff --git a/conanfile.py b/conanfile.py index 1783a32..e2f251a 100644 --- a/conanfile.py +++ b/conanfile.py @@ -46,7 +46,7 @@ def requirements(self): self.requires("sparrow/1.0.0") self.requires(f"flatbuffers/{self._flatbuffers_version}") self.requires("lz4/1.9.4") - self.requires("zstd/1.5.5") + #self.requires("zstd/1.5.5") if self.options.get_safe("build_tests"): self.test_requires("doctest/2.4.12") diff --git a/environment-dev.yml b/environment-dev.yml index a659628..b05e4fd 100644 --- a/environment-dev.yml +++ b/environment-dev.yml @@ -8,10 +8,10 @@ dependencies: - cxx-compiler # Libraries dependencies - flatbuffers - - sparrow-devel >=1.1.2 - - nlohmann_json - lz4 - #- zstd + - nlohmann_json + - sparrow-devel >=1.1.2 + # Testing dependencies - doctest # Documentation dependencies - doxygen diff --git a/include/sparrow_ipc/serialize_utils.hpp b/include/sparrow_ipc/serialize_utils.hpp index e555de0..80dbccd 100644 --- a/include/sparrow_ipc/serialize_utils.hpp +++ b/include/sparrow_ipc/serialize_utils.hpp @@ -8,9 +8,9 @@ #include "Message_generated.h" #include "sparrow_ipc/config/config.hpp" +#include "sparrow_ipc/compression.hpp" #include "sparrow_ipc/magic_values.hpp" #include "sparrow_ipc/utils.hpp" -#include "compression.hpp" namespace sparrow_ipc { @@ -51,6 +51,22 @@ namespace sparrow_ipc template requires std::same_as, sparrow::record_batch> + /** + * @brief Serializes a collection of record batches into a single byte vector. + * + * This function takes a range or container of record batches and serializes each one + * individually, then concatenates all the serialized data into a single output vector. + * The serialization is performed by calling serialize_record_batch() for each record batch + * in the input collection. + * + * @tparam R The type of the record batch container/range (must be iterable) + * @param record_batches A collection of record batches to be serialized + * @param compression The compression type to use when serializing + * @return std::vector A byte vector containing the serialized data of all record batches + * + * @note The function uses move iterators to efficiently transfer the serialized data + * from individual record batches to the output vector. + */ [[nodiscard]] std::vector serialize_record_batches_without_schema_message(const R& record_batches, std::optional compression) { std::vector output; diff --git a/src/serialize_utils.cpp b/src/serialize_utils.cpp index 2295b82..2dbf9af 100644 --- a/src/serialize_utils.cpp +++ b/src/serialize_utils.cpp @@ -1,5 +1,4 @@ #include -#include #include "sparrow_ipc/magic_values.hpp" #include "sparrow_ipc/serialize.hpp" diff --git a/tests/test_de_serialization_with_files.cpp b/tests/test_de_serialization_with_files.cpp index 9b5f1bf..ff8ae17 100644 --- a/tests/test_de_serialization_with_files.cpp +++ b/tests/test_de_serialization_with_files.cpp @@ -3,7 +3,6 @@ #include #include #include -#include #include From f14295cc5abcc808ba09153baf6f97c4bced699a Mon Sep 17 00:00:00 2001 From: Hind Montassif Date: Tue, 7 Oct 2025 10:17:31 +0200 Subject: [PATCH 04/12] Fix fetching lz4 --- CMakeLists.txt | 3 --- cmake/external_dependencies.cmake | 33 +++++++++++++++++++++++++------ 2 files changed, 27 insertions(+), 9 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index e542038..ef5dc52 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -245,9 +245,6 @@ target_link_libraries(sparrow-ipc lz4::lz4 ) -target_include_directories(sparrow-ipc PRIVATE ${LZ4_INCLUDE_DIRS}) -target_link_libraries(sparrow-ipc PRIVATE ${LZ4_LIBRARIES}) - # Ensure generated headers are available when building sparrow-ipc add_dependencies(sparrow-ipc generate_flatbuffers_headers) diff --git a/cmake/external_dependencies.cmake b/cmake/external_dependencies.cmake index ac5bc66..2bc20fd 100644 --- a/cmake/external_dependencies.cmake +++ b/cmake/external_dependencies.cmake @@ -11,8 +11,8 @@ endif() function(find_package_or_fetch) set(options) - set(oneValueArgs CONAN_PKG_NAME PACKAGE_NAME GIT_REPOSITORY TAG) - set(multiValueArgs) + set(oneValueArgs CONAN_PKG_NAME PACKAGE_NAME GIT_REPOSITORY TAG SOURCE_SUBDIR) + set(multiValueArgs CMAKE_ARGS) cmake_parse_arguments(PARSE_ARGV 0 arg "${options}" "${oneValueArgs}" "${multiValueArgs}" ) @@ -29,7 +29,14 @@ function(find_package_or_fetch) if(FETCH_DEPENDENCIES_WITH_CMAKE STREQUAL "ON" OR FETCH_DEPENDENCIES_WITH_CMAKE STREQUAL "MISSING") if(NOT ${actual_pkg_name}_FOUND) message(STATUS "📦 Fetching ${arg_PACKAGE_NAME}") - FetchContent_Declare( + # Apply CMAKE_ARGS before fetching + foreach(cmake_arg ${arg_CMAKE_ARGS}) + string(REGEX MATCH "^([^=]+)=(.*)$" _ ${cmake_arg}) + if(CMAKE_MATCH_1) + set(${CMAKE_MATCH_1} ${CMAKE_MATCH_2} CACHE BOOL "" FORCE) + endif() + endforeach() + set(fetch_args ${arg_PACKAGE_NAME} GIT_SHALLOW TRUE GIT_REPOSITORY ${arg_GIT_REPOSITORY} @@ -37,6 +44,10 @@ function(find_package_or_fetch) GIT_PROGRESS TRUE SYSTEM EXCLUDE_FROM_ALL) + if(arg_SOURCE_SUBDIR) + list(APPEND fetch_args SOURCE_SUBDIR ${arg_SOURCE_SUBDIR}) + endif() + FetchContent_Declare(${fetch_args}) FetchContent_MakeAvailable(${arg_PACKAGE_NAME}) message(STATUS "\t✅ Fetched ${arg_PACKAGE_NAME}") else() @@ -79,14 +90,24 @@ if(NOT TARGET flatbuffers::flatbuffers) endif() unset(FLATBUFFERS_BUILD_TESTS CACHE) +# Fetching lz4 +# Disable bundled mode to allow shared libraries if needed +# lz4 is built as static by default if bundled +# set(LZ4_BUNDLED_MODE OFF CACHE BOOL "" FORCE) +# set(BUILD_SHARED_LIBS ON CACHE BOOL "" FORCE) find_package_or_fetch( PACKAGE_NAME lz4 GIT_REPOSITORY https://github.com/lz4/lz4.git TAG v1.10.0 + SOURCE_SUBDIR build/cmake + CMAKE_ARGS + "LZ4_BUILD_CLI=OFF" + "LZ4_BUILD_LEGACY_LZ4C=OFF" ) -# if(NOT TARGET lz4::lz4) -# add_library(lz4::lz4 ALIAS lz4) -# endif() + +if(NOT TARGET lz4::lz4) + add_library(lz4::lz4 ALIAS lz4) +endif() if(SPARROW_IPC_BUILD_TESTS) find_package_or_fetch( From ed87642aa762bf38237320d600ec78e8fa0de8d1 Mon Sep 17 00:00:00 2001 From: Hind Montassif Date: Wed, 8 Oct 2025 13:53:23 +0200 Subject: [PATCH 05/12] Add lz4 targets to be exported --- CMakeLists.txt | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/CMakeLists.txt b/CMakeLists.txt index ef5dc52..49e6719 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -301,6 +301,25 @@ if (TARGET flatbuffers) endif() endif() +if (TARGET lz4) + get_target_property(is_imported lz4 IMPORTED) + if(NOT is_imported) + # This means `lz4` was fetched using FetchContent + # We need to export `lz4` target explicitly + list(APPEND SPARROW_IPC_EXPORTED_TARGETS lz4) + endif() +endif() + +if (TARGET lz4_static) + get_target_property(is_imported lz4_static IMPORTED) + if(NOT is_imported) + # `lz4_static` is needed as this is the actual library + # and `lz4` is an interface pointing to it. + # If `lz4_shared` is used instead for some reason, modify this accordingly + list(APPEND SPARROW_IPC_EXPORTED_TARGETS lz4_static) + endif() +endif() + install(TARGETS ${SPARROW_IPC_EXPORTED_TARGETS} EXPORT ${PROJECT_NAME}-targets) From 413dffe981c04a927433b514da5ea7463c9550f7 Mon Sep 17 00:00:00 2001 From: Hind Montassif Date: Fri, 10 Oct 2025 11:50:21 +0200 Subject: [PATCH 06/12] Add lz4 compression in deserialization --- include/sparrow_ipc/compression.hpp | 1 + .../deserialize_primitive_array.hpp | 81 ++++++++++++-- ...deserialize_variable_size_binary_array.hpp | 103 +++++++++++++++--- src/compression.cpp | 40 +++++++ tests/test_de_serialization_with_files.cpp | 24 ++-- 5 files changed, 209 insertions(+), 40 deletions(-) diff --git a/include/sparrow_ipc/compression.hpp b/include/sparrow_ipc/compression.hpp index 14403a7..b21a8b1 100644 --- a/include/sparrow_ipc/compression.hpp +++ b/include/sparrow_ipc/compression.hpp @@ -19,4 +19,5 @@ namespace sparrow_ipc // CompressionType to_compression_type(org::apache::arrow::flatbuf::CompressionType compression_type); std::vector compress(org::apache::arrow::flatbuf::CompressionType compression_type, std::span data); + std::vector decompress(org::apache::arrow::flatbuf::CompressionType compression_type, std::span data); } diff --git a/include/sparrow_ipc/deserialize_primitive_array.hpp b/include/sparrow_ipc/deserialize_primitive_array.hpp index a1c5dad..13ab4b9 100644 --- a/include/sparrow_ipc/deserialize_primitive_array.hpp +++ b/include/sparrow_ipc/deserialize_primitive_array.hpp @@ -9,10 +9,30 @@ #include "Message_generated.h" #include "sparrow_ipc/arrow_interface/arrow_array.hpp" #include "sparrow_ipc/arrow_interface/arrow_schema.hpp" +#include "sparrow_ipc/compression.hpp" #include "sparrow_ipc/deserialize_utils.hpp" namespace sparrow_ipc { + namespace + { + struct DecompressedBuffers + { + std::vector validity_buffer; + std::vector data_buffer; + }; + + void release_decompressed_buffers(ArrowArray* array) + { + if (array->private_data) + { + delete static_cast(array->private_data); + array->private_data = nullptr; + } + array->release = nullptr; + } + } + template [[nodiscard]] sparrow::primitive_array deserialize_non_owning_primitive_array( const org::apache::arrow::flatbuf::RecordBatch& record_batch, @@ -22,6 +42,46 @@ namespace sparrow_ipc size_t& buffer_index ) { + const auto compression = record_batch.compression(); + DecompressedBuffers* decompressed_buffers_owner = nullptr; + + // Validity buffer + const auto validity_buffer_metadata = record_batch.buffers()->Get(buffer_index++); + auto validity_buffer_span = body.subspan(validity_buffer_metadata->offset(), validity_buffer_metadata->length()); + if (compression) + { + if (!decompressed_buffers_owner) + { + decompressed_buffers_owner = new DecompressedBuffers(); + } + decompressed_buffers_owner->validity_buffer = decompress(compression->codec(), validity_buffer_span); + validity_buffer_span = decompressed_buffers_owner->validity_buffer; + } + auto bitmap_ptr = const_cast(validity_buffer_span.data()); + const sparrow::dynamic_bitset_view bitmap_view{ + bitmap_ptr, + static_cast(record_batch.length())}; + auto null_count = bitmap_view.null_count(); + if (validity_buffer_metadata->length() == 0) + { + bitmap_ptr = nullptr; + null_count = 0; + } + + // Data buffer + const auto primitive_buffer_metadata = record_batch.buffers()->Get(buffer_index++); + auto data_buffer_span = body.subspan(primitive_buffer_metadata->offset(), primitive_buffer_metadata->length()); + if (compression) + { + if (!decompressed_buffers_owner) + { + decompressed_buffers_owner = new DecompressedBuffers(); + } + decompressed_buffers_owner->data_buffer = decompress(compression->codec(), data_buffer_span); + data_buffer_span = decompressed_buffers_owner->data_buffer; + } + auto primitives_ptr = const_cast(data_buffer_span.data()); + const std::string_view format = data_type_to_format( sparrow::detail::get_data_type_from_array>::get() ); @@ -34,17 +94,7 @@ namespace sparrow_ipc nullptr, nullptr ); - const auto [bitmap_ptr, null_count] = utils::get_bitmap_pointer_and_null_count( - record_batch, - body, - buffer_index++ - ); - const auto primitive_buffer_metadata = record_batch.buffers()->Get(buffer_index++); - if (body.size() < (primitive_buffer_metadata->offset() + primitive_buffer_metadata->length())) - { - throw std::runtime_error("Primitive buffer exceeds body size"); - } - auto primitives_ptr = const_cast(body.data() + primitive_buffer_metadata->offset()); + std::vector buffers = {bitmap_ptr, primitives_ptr}; ArrowArray array = make_non_owning_arrow_array( record_batch.length(), @@ -55,7 +105,14 @@ namespace sparrow_ipc nullptr, nullptr ); + + if (decompressed_buffers_owner) + { + array.private_data = decompressed_buffers_owner; + array.release = release_decompressed_buffers; + } + sparrow::arrow_proxy ap{std::move(array), std::move(schema)}; return sparrow::primitive_array{std::move(ap)}; } -} \ No newline at end of file +} diff --git a/include/sparrow_ipc/deserialize_variable_size_binary_array.hpp b/include/sparrow_ipc/deserialize_variable_size_binary_array.hpp index f6a5729..36c99f9 100644 --- a/include/sparrow_ipc/deserialize_variable_size_binary_array.hpp +++ b/include/sparrow_ipc/deserialize_variable_size_binary_array.hpp @@ -8,10 +8,32 @@ #include "Message_generated.h" #include "sparrow_ipc/arrow_interface/arrow_array.hpp" #include "sparrow_ipc/arrow_interface/arrow_schema.hpp" +#include "sparrow_ipc/compression.hpp" #include "sparrow_ipc/deserialize_utils.hpp" namespace sparrow_ipc { + // TODO after handling deserialize_primitive_array, do the same here and then in other data types + namespace + { + struct DecompressedBinaryBuffers + { + std::vector validity_buffer; + std::vector offset_buffer; + std::vector data_buffer; + }; + + void release_decompressed_binary_buffers(ArrowArray* array) + { + if (array->private_data) + { + delete static_cast(array->private_data); + array->private_data = nullptr; + } + array->release = nullptr; + } + } + template [[nodiscard]] T deserialize_non_owning_variable_size_binary( const org::apache::arrow::flatbuf::RecordBatch& record_batch, @@ -21,6 +43,61 @@ namespace sparrow_ipc size_t& buffer_index ) { + const auto compression = record_batch.compression(); + DecompressedBinaryBuffers* decompressed_buffers_owner = nullptr; + + // Validity buffer + const auto validity_buffer_metadata = record_batch.buffers()->Get(buffer_index++); + auto validity_buffer_span = body.subspan(validity_buffer_metadata->offset(), validity_buffer_metadata->length()); + if (compression && validity_buffer_metadata->length() > 0) + { + if (!decompressed_buffers_owner) + { + decompressed_buffers_owner = new DecompressedBinaryBuffers(); + } + decompressed_buffers_owner->validity_buffer = decompress(compression->codec(), validity_buffer_span); + validity_buffer_span = decompressed_buffers_owner->validity_buffer; + } + + uint8_t* bitmap_ptr = nullptr; + int64_t null_count = 0; + if (validity_buffer_metadata->length() > 0) + { + bitmap_ptr = const_cast(validity_buffer_span.data()); + const sparrow::dynamic_bitset_view bitmap_view{ + bitmap_ptr, + static_cast(record_batch.length())}; + null_count = bitmap_view.null_count(); + } + + // Offset buffer + const auto offset_metadata = record_batch.buffers()->Get(buffer_index++); + auto offset_buffer_span = body.subspan(offset_metadata->offset(), offset_metadata->length()); + if (compression) + { + if (!decompressed_buffers_owner) + { + decompressed_buffers_owner = new DecompressedBinaryBuffers(); + } + decompressed_buffers_owner->offset_buffer = decompress(compression->codec(), offset_buffer_span); + offset_buffer_span = decompressed_buffers_owner->offset_buffer; + } + auto offset_ptr = const_cast(offset_buffer_span.data()); + + // Data buffer + const auto buffer_metadata = record_batch.buffers()->Get(buffer_index++); + auto data_buffer_span = body.subspan(buffer_metadata->offset(), buffer_metadata->length()); + if (compression) + { + if (!decompressed_buffers_owner) + { + decompressed_buffers_owner = new DecompressedBinaryBuffers(); + } + decompressed_buffers_owner->data_buffer = decompress(compression->codec(), data_buffer_span); + data_buffer_span = decompressed_buffers_owner->data_buffer; + } + auto buffer_ptr = const_cast(data_buffer_span.data()); + const std::string_view format = data_type_to_format(sparrow::detail::get_data_type_from_array::get()); ArrowSchema schema = make_non_owning_arrow_schema( format, @@ -31,24 +108,7 @@ namespace sparrow_ipc nullptr, nullptr ); - const auto [bitmap_ptr, null_count] = utils::get_bitmap_pointer_and_null_count( - record_batch, - body, - buffer_index++ - ); - const auto offset_metadata = record_batch.buffers()->Get(buffer_index++); - if ((offset_metadata->offset() + offset_metadata->length()) > body.size()) - { - throw std::runtime_error("Offset buffer exceeds body size"); - } - auto offset_ptr = const_cast(body.data() + offset_metadata->offset()); - const auto buffer_metadata = record_batch.buffers()->Get(buffer_index++); - if ((buffer_metadata->offset() + buffer_metadata->length()) > body.size()) - { - throw std::runtime_error("Data buffer exceeds body size"); - } - auto buffer_ptr = const_cast(body.data() + buffer_metadata->offset()); std::vector buffers = {bitmap_ptr, offset_ptr, buffer_ptr}; ArrowArray array = make_non_owning_arrow_array( record_batch.length(), @@ -59,7 +119,14 @@ namespace sparrow_ipc nullptr, nullptr ); + + if (decompressed_buffers_owner) + { + array.private_data = decompressed_buffers_owner; + array.release = release_decompressed_binary_buffers; + } + sparrow::arrow_proxy ap{std::move(array), std::move(schema)}; return T{std::move(ap)}; } -} \ No newline at end of file +} diff --git a/src/compression.cpp b/src/compression.cpp index 964d304..59d6d56 100644 --- a/src/compression.cpp +++ b/src/compression.cpp @@ -45,4 +45,44 @@ namespace sparrow_ipc return {data.begin(), data.end()}; } } + + std::vector decompress(org::apache::arrow::flatbuf::CompressionType compression_type, std::span data) + { + if (data.empty()) + { + return {}; + } + switch (compression_type) + { + case org::apache::arrow::flatbuf::CompressionType::LZ4_FRAME: + { + if (data.size() < 8) + { + throw std::runtime_error("Invalid compressed data: missing decompressed size"); + } + const std::int64_t decompressed_size = *reinterpret_cast(data.data()); + const auto compressed_data = data.subspan(8); + + if (decompressed_size == -1) + { + return {compressed_data.begin(), compressed_data.end()}; + } + + std::vector decompressed_data(decompressed_size); + LZ4F_dctx* dctx = nullptr; + LZ4F_createDecompressionContext(&dctx, LZ4F_VERSION); + size_t compressed_size_in_out = compressed_data.size(); + size_t decompressed_size_in_out = decompressed_size; + size_t result = LZ4F_decompress(dctx, decompressed_data.data(), &decompressed_size_in_out, compressed_data.data(), &compressed_size_in_out, nullptr); + if (LZ4F_isError(result)) + { + throw std::runtime_error("Failed to decompress data with LZ4 frame format"); + } + LZ4F_freeDecompressionContext(dctx); + return decompressed_data; + } + default: + return {data.begin(), data.end()}; + } + } } diff --git a/tests/test_de_serialization_with_files.cpp b/tests/test_de_serialization_with_files.cpp index ff8ae17..6459581 100644 --- a/tests/test_de_serialization_with_files.cpp +++ b/tests/test_de_serialization_with_files.cpp @@ -32,8 +32,8 @@ const std::vector files_paths_to_test = { }; const std::vector files_paths_to_test_with_compression = { - tests_resources_files_path_with_compression / "generated_lz4" -// tests_resources_files_path_with_compression/ "generated_uncompressible_lz4" + tests_resources_files_path_with_compression / "generated_lz4", + tests_resources_files_path_with_compression/ "generated_uncompressible_lz4" // tests_resources_files_path_with_compression / "generated_zstd" // tests_resources_files_path_with_compression/ "generated_uncompressible_zstd" }; @@ -66,21 +66,27 @@ void compare_record_batches( ) { REQUIRE_EQ(record_batches_1.size(), record_batches_2.size()); +// std::cout << "record_batches1 size: " << record_batches_1.size() << " record_batches2 size: " << record_batches_2.size() << std::endl; for (size_t i = 0; i < record_batches_1.size(); ++i) { for (size_t y = 0; y < record_batches_1[i].nb_columns(); y++) { +// std::cout << "record_batches1 nb cols: " << record_batches_1[i].nb_columns() << " record_batches2 nb cols: " << record_batches_2[i].nb_columns() << std::endl; const auto& column_1 = record_batches_1[i].get_column(y); const auto& column_2 = record_batches_2[i].get_column(y); REQUIRE_EQ(column_1.size(), column_2.size()); +// std::cout << "column_1.size(): " << column_1.size() << " column_2.size(): " << column_2.size() << std::endl; for (size_t z = 0; z < column_1.size(); z++) { const auto col_name = column_1.name().value_or("NA"); INFO("Comparing batch " << i << ", column " << y << " named :" << col_name << " , row " << z); +// std::cout << "Comparing batch " << i << ", column " << y << " named :" << col_name << " , row " << z << std::endl; REQUIRE_EQ(column_1.data_type(), column_2.data_type()); CHECK_EQ(column_1.name(), column_2.name()); +// std::cout << "column_1.name() :" << column_1.name() << " and " << column_2.name() << std::endl; const auto& column_1_value = column_1[z]; const auto& column_2_value = column_2[z]; +// std::cout << "column_1_value :" << column_1_value << " and " << column_2_value << std::endl; CHECK_EQ(column_1_value, column_2_value); } } @@ -182,7 +188,7 @@ TEST_SUITE("Integration tests") } } - TEST_CASE("Serialization with LZ4 compression") + TEST_CASE("Compare record_batch serialization with stream file using LZ4 compression") { for (const auto& file_path : files_paths_to_test_with_compression) { @@ -220,14 +226,12 @@ TEST_SUITE("Integration tests") const auto record_batches_from_stream = sparrow_ipc::deserialize_stream( std::span(stream_data) ); - - const auto serialized_data = sparrow_ipc::serialize(record_batches_from_json, std::nullopt); -// const auto deserialized_serialized_data = sparrow_ipc::deserialize_stream( -// std::span(serialized_data) -// ); -// compare_record_batches(record_batches_from_stream, deserialized_serialized_data); + const auto serialized_data = sparrow_ipc::serialize(record_batches_from_json, org::apache::arrow::flatbuf::CompressionType::LZ4_FRAME); + const auto deserialized_serialized_data = sparrow_ipc::deserialize_stream( + std::span(serialized_data) + ); + compare_record_batches(record_batches_from_stream, deserialized_serialized_data); } - } } } From a1639007c4bbd5f3337d9151f47dad72760e91ba Mon Sep 17 00:00:00 2001 From: Hind Montassif Date: Mon, 13 Oct 2025 11:46:07 +0200 Subject: [PATCH 07/12] Add owning_arrow_array_private_data --- CMakeLists.txt | 2 + .../arrow_interface/arrow_array.hpp | 11 ++- .../arrow_array/private_data.hpp | 30 ++++++++ .../deserialize_primitive_array.hpp | 76 ++++++++----------- src/arrow_interface/arrow_array.cpp | 58 ++++++++++++++ 5 files changed, 130 insertions(+), 47 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 49e6719..166bb09 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -103,6 +103,7 @@ set(SPARROW_IPC_HEADERS ${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/config/config.hpp ${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/config/sparrow_ipc_version.hpp ${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/compression.hpp + #${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/decompressed_buffers.hpp ${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/deserialize_variable_size_binary_array.hpp ${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/deserialize_fixedsizebinary_array.hpp ${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/deserialize_primitive_array.hpp @@ -124,6 +125,7 @@ set(SPARROW_IPC_SRC ${SPARROW_IPC_SOURCE_DIR}/arrow_interface/arrow_schema.cpp ${SPARROW_IPC_SOURCE_DIR}/arrow_interface/arrow_schema/private_data.cpp ${SPARROW_IPC_SOURCE_DIR}/compression.cpp + #${SPARROW_IPC_SOURCE_DIR}/decompressed_buffers.cpp ${SPARROW_IPC_SOURCE_DIR}/deserialize_fixedsizebinary_array.cpp ${SPARROW_IPC_SOURCE_DIR}/deserialize_utils.cpp ${SPARROW_IPC_SOURCE_DIR}/deserialize.cpp diff --git a/include/sparrow_ipc/arrow_interface/arrow_array.hpp b/include/sparrow_ipc/arrow_interface/arrow_array.hpp index 2f1f72d..9348c44 100644 --- a/include/sparrow_ipc/arrow_interface/arrow_array.hpp +++ b/include/sparrow_ipc/arrow_interface/arrow_array.hpp @@ -1,4 +1,3 @@ - #pragma once #include @@ -9,6 +8,16 @@ namespace sparrow_ipc { + [[nodiscard]] SPARROW_IPC_API ArrowArray make_owning_arrow_array( + int64_t length, + int64_t null_count, + int64_t offset, + std::vector>&& buffers, + size_t children_count, + ArrowArray** children, + ArrowArray* dictionary + ); + [[nodiscard]] SPARROW_IPC_API ArrowArray make_non_owning_arrow_array( int64_t length, int64_t null_count, diff --git a/include/sparrow_ipc/arrow_interface/arrow_array/private_data.hpp b/include/sparrow_ipc/arrow_interface/arrow_array/private_data.hpp index 90e633f..265d9e2 100644 --- a/include/sparrow_ipc/arrow_interface/arrow_array/private_data.hpp +++ b/include/sparrow_ipc/arrow_interface/arrow_array/private_data.hpp @@ -7,6 +7,36 @@ namespace sparrow_ipc { + class owning_arrow_array_private_data + { + public: + + explicit owning_arrow_array_private_data(std::vector>&& buffers) + : m_buffers(std::move(buffers)) + { + m_buffer_pointers.reserve(m_buffers.size()); + for (const auto& buffer : m_buffers) + { + m_buffer_pointers.push_back(buffer.data()); + } + } + + const void** buffers_ptrs() noexcept + { + return m_buffer_pointers.data(); + } + + std::size_t n_buffers() const noexcept + { + return m_buffers.size(); + } + + private: + + std::vector> m_buffers; + std::vector m_buffer_pointers; + }; + class non_owning_arrow_array_private_data { public: diff --git a/include/sparrow_ipc/deserialize_primitive_array.hpp b/include/sparrow_ipc/deserialize_primitive_array.hpp index 13ab4b9..6578565 100644 --- a/include/sparrow_ipc/deserialize_primitive_array.hpp +++ b/include/sparrow_ipc/deserialize_primitive_array.hpp @@ -14,25 +14,6 @@ namespace sparrow_ipc { - namespace - { - struct DecompressedBuffers - { - std::vector validity_buffer; - std::vector data_buffer; - }; - - void release_decompressed_buffers(ArrowArray* array) - { - if (array->private_data) - { - delete static_cast(array->private_data); - array->private_data = nullptr; - } - array->release = nullptr; - } - } - template [[nodiscard]] sparrow::primitive_array deserialize_non_owning_primitive_array( const org::apache::arrow::flatbuf::RecordBatch& record_batch, @@ -43,19 +24,15 @@ namespace sparrow_ipc ) { const auto compression = record_batch.compression(); - DecompressedBuffers* decompressed_buffers_owner = nullptr; // Validity buffer const auto validity_buffer_metadata = record_batch.buffers()->Get(buffer_index++); auto validity_buffer_span = body.subspan(validity_buffer_metadata->offset(), validity_buffer_metadata->length()); + std::vector> decompressed_buffers; if (compression) { - if (!decompressed_buffers_owner) - { - decompressed_buffers_owner = new DecompressedBuffers(); - } - decompressed_buffers_owner->validity_buffer = decompress(compression->codec(), validity_buffer_span); - validity_buffer_span = decompressed_buffers_owner->validity_buffer; + decompressed_buffers.emplace_back(decompress(compression->codec(), validity_buffer_span)); + validity_buffer_span = decompressed_buffers.back(); } auto bitmap_ptr = const_cast(validity_buffer_span.data()); const sparrow::dynamic_bitset_view bitmap_view{ @@ -73,12 +50,8 @@ namespace sparrow_ipc auto data_buffer_span = body.subspan(primitive_buffer_metadata->offset(), primitive_buffer_metadata->length()); if (compression) { - if (!decompressed_buffers_owner) - { - decompressed_buffers_owner = new DecompressedBuffers(); - } - decompressed_buffers_owner->data_buffer = decompress(compression->codec(), data_buffer_span); - data_buffer_span = decompressed_buffers_owner->data_buffer; + decompressed_buffers.emplace_back(decompress(compression->codec(), data_buffer_span)); + data_buffer_span = decompressed_buffers.back(); } auto primitives_ptr = const_cast(data_buffer_span.data()); @@ -95,24 +68,35 @@ namespace sparrow_ipc nullptr ); - std::vector buffers = {bitmap_ptr, primitives_ptr}; - ArrowArray array = make_non_owning_arrow_array( - record_batch.length(), - null_count, - 0, - std::move(buffers), - 0, - nullptr, - nullptr - ); - - if (decompressed_buffers_owner) + ArrowArray array; + if (compression) + { + array = make_owning_arrow_array( + record_batch.length(), + null_count, + 0, + std::move(decompressed_buffers), + 0, + nullptr, + nullptr + ); + } + else { - array.private_data = decompressed_buffers_owner; - array.release = release_decompressed_buffers; + std::vector buffers = {bitmap_ptr, primitives_ptr}; + array = make_non_owning_arrow_array( + record_batch.length(), + null_count, + 0, + std::move(buffers), + 0, + nullptr, + nullptr + ); } sparrow::arrow_proxy ap{std::move(array), std::move(schema)}; return sparrow::primitive_array{std::move(ap)}; } } + diff --git a/src/arrow_interface/arrow_array.cpp b/src/arrow_interface/arrow_array.cpp index ed0a0f2..7afe070 100644 --- a/src/arrow_interface/arrow_array.cpp +++ b/src/arrow_interface/arrow_array.cpp @@ -10,6 +10,64 @@ namespace sparrow_ipc { + void release_owning_arrow_array(ArrowArray* array) + { + SPARROW_ASSERT_FALSE(array == nullptr) + SPARROW_ASSERT_TRUE(array->release == std::addressof(release_owning_arrow_array)) + + if (array->private_data) + { + delete static_cast(array->private_data); + array->private_data = nullptr; + } + + for (int64_t i = 0; i < array->n_children; ++i) + { + if (array->children[i] && array->children[i]->release) + { + array->children[i]->release(array->children[i]); + } + } + delete[] array->children; + array->children = nullptr; + + if (array->dictionary && array->dictionary->release) + { + array->dictionary->release(array->dictionary); + } + delete array->dictionary; + array->dictionary = nullptr; + + array->release = nullptr; + } + + ArrowArray make_owning_arrow_array( + int64_t length, + int64_t null_count, + int64_t offset, + std::vector>&& buffers, + size_t children_count, + ArrowArray** children, + ArrowArray* dictionary + ) + { + ArrowArray array{}; + array.length = length; + array.null_count = null_count; + array.offset = offset; + + auto private_data = new owning_arrow_array_private_data(std::move(buffers)); + array.private_data = private_data; + array.n_buffers = private_data->n_buffers(); + array.buffers = private_data->buffers_ptrs(); + + array.n_children = static_cast(children_count); + array.children = children; + array.dictionary = dictionary; + array.release = release_owning_arrow_array; + return array; + } + void release_non_owning_arrow_array(ArrowArray* array) { SPARROW_ASSERT_FALSE(array == nullptr) From 6eb449e3824be71e7484a1d8db230405b45bce0d Mon Sep 17 00:00:00 2001 From: Hind Montassif Date: Tue, 14 Oct 2025 10:07:07 +0200 Subject: [PATCH 08/12] Factorize --- .../arrow_interface/arrow_array.hpp | 85 ++++++++--- .../arrow_array/private_data.hpp | 27 +++- .../deserialize_primitive_array.hpp | 66 ++++----- include/sparrow_ipc/deserialize_utils.hpp | 29 +++- ...deserialize_variable_size_binary_array.hpp | 126 +++++----------- src/arrow_interface/arrow_array.cpp | 137 +++--------------- .../arrow_array/private_data.cpp | 9 -- src/deserialize_fixedsizebinary_array.cpp | 7 +- src/deserialize_utils.cpp | 21 +++ 9 files changed, 226 insertions(+), 281 deletions(-) diff --git a/include/sparrow_ipc/arrow_interface/arrow_array.hpp b/include/sparrow_ipc/arrow_interface/arrow_array.hpp index 9348c44..726ecb9 100644 --- a/include/sparrow_ipc/arrow_interface/arrow_array.hpp +++ b/include/sparrow_ipc/arrow_interface/arrow_array.hpp @@ -1,43 +1,86 @@ #pragma once -#include +#include #include +#include #include "sparrow_ipc/config/config.hpp" +#include "sparrow_ipc/arrow_interface/arrow_array/private_data.hpp" namespace sparrow_ipc { - [[nodiscard]] SPARROW_IPC_API ArrowArray make_owning_arrow_array( - int64_t length, - int64_t null_count, - int64_t offset, - std::vector>&& buffers, - size_t children_count, - ArrowArray** children, - ArrowArray* dictionary - ); + SPARROW_IPC_API void release_arrow_array_children_and_dictionary(ArrowArray* array); + + template + void arrow_array_release(ArrowArray* array) + { + SPARROW_ASSERT_TRUE(array != nullptr) + SPARROW_ASSERT_TRUE(array->release == std::addressof(arrow_array_release)) + + SPARROW_ASSERT_TRUE(array->private_data != nullptr); - [[nodiscard]] SPARROW_IPC_API ArrowArray make_non_owning_arrow_array( + delete static_cast(array->private_data); + array->private_data = nullptr; + array->buffers = nullptr; // The buffers were deleted with the private data + + release_arrow_array_children_and_dictionary(array); + array->release = nullptr; + } + + template + SPARROW_IPC_API void fill_arrow_array( + ArrowArray& array, int64_t length, int64_t null_count, int64_t offset, - std::vector&& buffers, size_t children_count, ArrowArray** children, - ArrowArray* dictionary - ); + ArrowArray* dictionary, + Arg&& private_data_arg + ) + { + SPARROW_ASSERT_TRUE(length >= 0); + SPARROW_ASSERT_TRUE(null_count >= -1); + SPARROW_ASSERT_TRUE(offset >= 0); - SPARROW_IPC_API void release_non_owning_arrow_array(ArrowArray* array); + array.length = length; + array.null_count = null_count; + array.offset = offset; + array.n_children = static_cast(children_count); + array.children = children; + array.dictionary = dictionary; - SPARROW_IPC_API void fill_non_owning_arrow_array( - ArrowArray& array, + auto private_data = new T(std::forward(private_data_arg)); + array.private_data = private_data; + array.n_buffers = private_data->n_buffers(); + array.buffers = private_data->buffers_ptrs(); + + array.release = &arrow_array_release; + } + + template + [[nodiscard]] SPARROW_IPC_API ArrowArray make_arrow_array( int64_t length, int64_t null_count, int64_t offset, - std::vector&& buffers, size_t children_count, ArrowArray** children, - ArrowArray* dictionary - ); -} \ No newline at end of file + ArrowArray* dictionary, + Arg&& private_data_arg + ) + { + ArrowArray array{}; + fill_arrow_array( + array, + length, + null_count, + offset, + children_count, + children, + dictionary, + std::forward(private_data_arg) + ); + return array; + } +} diff --git a/include/sparrow_ipc/arrow_interface/arrow_array/private_data.hpp b/include/sparrow_ipc/arrow_interface/arrow_array/private_data.hpp index 265d9e2..2a9386f 100644 --- a/include/sparrow_ipc/arrow_interface/arrow_array/private_data.hpp +++ b/include/sparrow_ipc/arrow_interface/arrow_array/private_data.hpp @@ -1,5 +1,5 @@ #pragma once - +#include #include #include @@ -7,6 +7,13 @@ namespace sparrow_ipc { + template + concept ArrowPrivateData = requires(T& t) + { + { t.buffers_ptrs() } -> std::same_as; + { t.n_buffers() } -> std::convertible_to; + }; + class owning_arrow_array_private_data { public: @@ -21,12 +28,12 @@ namespace sparrow_ipc } } - const void** buffers_ptrs() noexcept + [[nodiscard]] SPARROW_IPC_API const void** buffers_ptrs() noexcept { return m_buffer_pointers.data(); } - std::size_t n_buffers() const noexcept + [[nodiscard]] SPARROW_IPC_API std::size_t n_buffers() const noexcept { return m_buffers.size(); } @@ -42,14 +49,22 @@ namespace sparrow_ipc public: explicit constexpr non_owning_arrow_array_private_data(std::vector&& buffers_pointers) - : m_buffers_pointers(std::move(buffers_pointers)) + : m_buffer_pointers(std::move(buffers_pointers)) { } - [[nodiscard]] SPARROW_IPC_API const void** buffers_ptrs() noexcept; + [[nodiscard]] SPARROW_IPC_API const void** buffers_ptrs() noexcept + { + return const_cast(reinterpret_cast(m_buffer_pointers.data())); + } + + [[nodiscard]] SPARROW_IPC_API std::size_t n_buffers() const noexcept + { + return m_buffer_pointers.size(); + } private: - std::vector m_buffers_pointers; + std::vector m_buffer_pointers; }; } diff --git a/include/sparrow_ipc/deserialize_primitive_array.hpp b/include/sparrow_ipc/deserialize_primitive_array.hpp index 6578565..1808789 100644 --- a/include/sparrow_ipc/deserialize_primitive_array.hpp +++ b/include/sparrow_ipc/deserialize_primitive_array.hpp @@ -9,7 +9,6 @@ #include "Message_generated.h" #include "sparrow_ipc/arrow_interface/arrow_array.hpp" #include "sparrow_ipc/arrow_interface/arrow_schema.hpp" -#include "sparrow_ipc/compression.hpp" #include "sparrow_ipc/deserialize_utils.hpp" namespace sparrow_ipc @@ -23,38 +22,6 @@ namespace sparrow_ipc size_t& buffer_index ) { - const auto compression = record_batch.compression(); - - // Validity buffer - const auto validity_buffer_metadata = record_batch.buffers()->Get(buffer_index++); - auto validity_buffer_span = body.subspan(validity_buffer_metadata->offset(), validity_buffer_metadata->length()); - std::vector> decompressed_buffers; - if (compression) - { - decompressed_buffers.emplace_back(decompress(compression->codec(), validity_buffer_span)); - validity_buffer_span = decompressed_buffers.back(); - } - auto bitmap_ptr = const_cast(validity_buffer_span.data()); - const sparrow::dynamic_bitset_view bitmap_view{ - bitmap_ptr, - static_cast(record_batch.length())}; - auto null_count = bitmap_view.null_count(); - if (validity_buffer_metadata->length() == 0) - { - bitmap_ptr = nullptr; - null_count = 0; - } - - // Data buffer - const auto primitive_buffer_metadata = record_batch.buffers()->Get(buffer_index++); - auto data_buffer_span = body.subspan(primitive_buffer_metadata->offset(), primitive_buffer_metadata->length()); - if (compression) - { - decompressed_buffers.emplace_back(decompress(compression->codec(), data_buffer_span)); - data_buffer_span = decompressed_buffers.back(); - } - auto primitives_ptr = const_cast(data_buffer_span.data()); - const std::string_view format = data_type_to_format( sparrow::detail::get_data_type_from_array>::get() ); @@ -68,30 +35,50 @@ namespace sparrow_ipc nullptr ); + const auto compression = record_batch.compression(); + std::vector> decompressed_buffers; + + auto validity_buffer_span = utils::get_and_decompress_buffer(record_batch, body, buffer_index, compression, decompressed_buffers); + + uint8_t* bitmap_ptr = nullptr; + int64_t null_count = 0; + + if (validity_buffer_span.size() > 0) + { + bitmap_ptr = const_cast(validity_buffer_span.data()); + const sparrow::dynamic_bitset_view bitmap_view{ + bitmap_ptr, + static_cast(record_batch.length())}; + null_count = bitmap_view.null_count(); + } + + auto data_buffer_span = utils::get_and_decompress_buffer(record_batch, body, buffer_index, compression, decompressed_buffers); + ArrowArray array; if (compression) { - array = make_owning_arrow_array( + array = make_arrow_array( record_batch.length(), null_count, 0, - std::move(decompressed_buffers), 0, nullptr, - nullptr + nullptr, + std::move(decompressed_buffers) ); } else { + auto primitives_ptr = const_cast(data_buffer_span.data()); std::vector buffers = {bitmap_ptr, primitives_ptr}; - array = make_non_owning_arrow_array( + array = make_arrow_array( record_batch.length(), null_count, 0, - std::move(buffers), 0, nullptr, - nullptr + nullptr, + std::move(buffers) ); } @@ -99,4 +86,3 @@ namespace sparrow_ipc return sparrow::primitive_array{std::move(ap)}; } } - diff --git a/include/sparrow_ipc/deserialize_utils.hpp b/include/sparrow_ipc/deserialize_utils.hpp index fc1ca05..cbc52b3 100644 --- a/include/sparrow_ipc/deserialize_utils.hpp +++ b/include/sparrow_ipc/deserialize_utils.hpp @@ -2,12 +2,12 @@ #include #include +#include #include #include #include "Message_generated.h" -#include "Schema_generated.h" namespace sparrow_ipc::utils { @@ -33,4 +33,29 @@ namespace sparrow_ipc::utils std::span body, size_t index ); -} \ No newline at end of file + + /** + * @brief Extracts a buffer from a RecordBatch and decompresses it if necessary. + * + * This function retrieves a buffer span from the specified index, increments the index, + * and applies decompression if specified. If the buffer is decompressed, the new + * data is stored in `decompressed_storage` and the returned span will point to this new data. + * + * @param record_batch The Arrow RecordBatch containing buffer metadata. + * @param body The raw buffer data as a byte span. + * @param buffer_index The index of the buffer to retrieve. This value is incremented by the function. + * @param compression The compression algorithm to use. If nullptr, no decompression is performed. + * @param decompressed_storage A vector that will be used to store the data of any decompressed buffers. + * + * @return A span viewing the resulting buffer data. This will be a view of the original + * `body` if no decompression occurs, or a view of the newly added buffer in + * `decompressed_storage` if decompression occurs. + */ + [[nodiscard]] std::span get_and_decompress_buffer( + const org::apache::arrow::flatbuf::RecordBatch& record_batch, + std::span body, + size_t& buffer_index, + const org::apache::arrow::flatbuf::BodyCompression* compression, + std::vector>& decompressed_storage + ); +} diff --git a/include/sparrow_ipc/deserialize_variable_size_binary_array.hpp b/include/sparrow_ipc/deserialize_variable_size_binary_array.hpp index 36c99f9..9895b97 100644 --- a/include/sparrow_ipc/deserialize_variable_size_binary_array.hpp +++ b/include/sparrow_ipc/deserialize_variable_size_binary_array.hpp @@ -8,32 +8,10 @@ #include "Message_generated.h" #include "sparrow_ipc/arrow_interface/arrow_array.hpp" #include "sparrow_ipc/arrow_interface/arrow_schema.hpp" -#include "sparrow_ipc/compression.hpp" #include "sparrow_ipc/deserialize_utils.hpp" namespace sparrow_ipc { - // TODO after handling deserialize_primitive_array, do the same here and then in other data types - namespace - { - struct DecompressedBinaryBuffers - { - std::vector validity_buffer; - std::vector offset_buffer; - std::vector data_buffer; - }; - - void release_decompressed_binary_buffers(ArrowArray* array) - { - if (array->private_data) - { - delete static_cast(array->private_data); - array->private_data = nullptr; - } - array->release = nullptr; - } - } - template [[nodiscard]] T deserialize_non_owning_variable_size_binary( const org::apache::arrow::flatbuf::RecordBatch& record_batch, @@ -43,25 +21,25 @@ namespace sparrow_ipc size_t& buffer_index ) { - const auto compression = record_batch.compression(); - DecompressedBinaryBuffers* decompressed_buffers_owner = nullptr; + const std::string_view format = data_type_to_format(sparrow::detail::get_data_type_from_array::get()); + ArrowSchema schema = make_non_owning_arrow_schema( + format, + name.data(), + metadata, + std::nullopt, + 0, + nullptr, + nullptr + ); - // Validity buffer - const auto validity_buffer_metadata = record_batch.buffers()->Get(buffer_index++); - auto validity_buffer_span = body.subspan(validity_buffer_metadata->offset(), validity_buffer_metadata->length()); - if (compression && validity_buffer_metadata->length() > 0) - { - if (!decompressed_buffers_owner) - { - decompressed_buffers_owner = new DecompressedBinaryBuffers(); - } - decompressed_buffers_owner->validity_buffer = decompress(compression->codec(), validity_buffer_span); - validity_buffer_span = decompressed_buffers_owner->validity_buffer; - } + const auto compression = record_batch.compression(); + std::vector> decompressed_buffers; + // TODO add another fct here + auto validity_buffer_span = utils::get_and_decompress_buffer(record_batch, body, buffer_index, compression, decompressed_buffers); uint8_t* bitmap_ptr = nullptr; int64_t null_count = 0; - if (validity_buffer_metadata->length() > 0) + if (validity_buffer_span.size() > 0) { bitmap_ptr = const_cast(validity_buffer_span.data()); const sparrow::dynamic_bitset_view bitmap_view{ @@ -70,60 +48,36 @@ namespace sparrow_ipc null_count = bitmap_view.null_count(); } - // Offset buffer - const auto offset_metadata = record_batch.buffers()->Get(buffer_index++); - auto offset_buffer_span = body.subspan(offset_metadata->offset(), offset_metadata->length()); - if (compression) - { - if (!decompressed_buffers_owner) - { - decompressed_buffers_owner = new DecompressedBinaryBuffers(); - } - decompressed_buffers_owner->offset_buffer = decompress(compression->codec(), offset_buffer_span); - offset_buffer_span = decompressed_buffers_owner->offset_buffer; - } - auto offset_ptr = const_cast(offset_buffer_span.data()); + auto offset_buffer_span = utils::get_and_decompress_buffer(record_batch, body, buffer_index, compression, decompressed_buffers); + auto data_buffer_span = utils::get_and_decompress_buffer(record_batch, body, buffer_index, compression, decompressed_buffers); - // Data buffer - const auto buffer_metadata = record_batch.buffers()->Get(buffer_index++); - auto data_buffer_span = body.subspan(buffer_metadata->offset(), buffer_metadata->length()); + ArrowArray array; if (compression) { - if (!decompressed_buffers_owner) - { - decompressed_buffers_owner = new DecompressedBinaryBuffers(); - } - decompressed_buffers_owner->data_buffer = decompress(compression->codec(), data_buffer_span); - data_buffer_span = decompressed_buffers_owner->data_buffer; + array = make_arrow_array( + record_batch.length(), + null_count, + 0, + 0, + nullptr, + nullptr, + std::move(decompressed_buffers) + ); } - auto buffer_ptr = const_cast(data_buffer_span.data()); - - const std::string_view format = data_type_to_format(sparrow::detail::get_data_type_from_array::get()); - ArrowSchema schema = make_non_owning_arrow_schema( - format, - name.data(), - metadata, - std::nullopt, - 0, - nullptr, - nullptr - ); - - std::vector buffers = {bitmap_ptr, offset_ptr, buffer_ptr}; - ArrowArray array = make_non_owning_arrow_array( - record_batch.length(), - null_count, - 0, - std::move(buffers), - 0, - nullptr, - nullptr - ); - - if (decompressed_buffers_owner) + else { - array.private_data = decompressed_buffers_owner; - array.release = release_decompressed_binary_buffers; + auto offset_ptr = const_cast(offset_buffer_span.data()); + auto buffer_ptr = const_cast(data_buffer_span.data()); + std::vector buffers = {bitmap_ptr, offset_ptr, buffer_ptr}; + array = make_arrow_array( + record_batch.length(), + null_count, + 0, + 0, + nullptr, + nullptr, + std::move(buffers) + ); } sparrow::arrow_proxy ap{std::move(array), std::move(schema)}; diff --git a/src/arrow_interface/arrow_array.cpp b/src/arrow_interface/arrow_array.cpp index 7afe070..a01006b 100644 --- a/src/arrow_interface/arrow_array.cpp +++ b/src/arrow_interface/arrow_array.cpp @@ -1,131 +1,40 @@ #include "sparrow_ipc/arrow_interface/arrow_array.hpp" -#include - #include -#include - -#include "sparrow_ipc/arrow_interface/arrow_array/private_data.hpp" -#include "sparrow_ipc/arrow_interface/arrow_array_schema_common_release.hpp" namespace sparrow_ipc { - void release_owning_arrow_array(ArrowArray* array) + void release_arrow_array_children_and_dictionary(ArrowArray* array) { - SPARROW_ASSERT_FALSE(array == nullptr) - SPARROW_ASSERT_TRUE(array->release == std::addressof(release_owning_arrow_array)) - - if (array->private_data) - { - delete static_cast(array->private_data); - array->private_data = nullptr; - } + SPARROW_ASSERT_TRUE(array != nullptr) - for (int64_t i = 0; i < array->n_children; ++i) + if (array->children) { - if (array->children[i] && array->children[i]->release) + for (int64_t i = 0; i < array->n_children; ++i) { - array->children[i]->release(array->children[i]); + ArrowArray* child = array->children[i]; + if (child) + { + if (child->release) + { + child->release(child); + } + delete child; + child = nullptr; + } } + delete[] array->children; + array->children = nullptr; } - delete[] array->children; - array->children = nullptr; - if (array->dictionary && array->dictionary->release) + if (array->dictionary) { - array->dictionary->release(array->dictionary); + if (array->dictionary->release) + { + array->dictionary->release(array->dictionary); + } + delete array->dictionary; + array->dictionary = nullptr; } - delete array->dictionary; - array->dictionary = nullptr; - - array->release = nullptr; - } - - ArrowArray make_owning_arrow_array( - int64_t length, - int64_t null_count, - int64_t offset, - std::vector>&& buffers, - size_t children_count, - ArrowArray** children, - ArrowArray* dictionary - ) - { - ArrowArray array{}; - array.length = length; - array.null_count = null_count; - array.offset = offset; - - auto private_data = new owning_arrow_array_private_data(std::move(buffers)); - array.private_data = private_data; - array.n_buffers = private_data->n_buffers(); - array.buffers = private_data->buffers_ptrs(); - - array.n_children = static_cast(children_count); - array.children = children; - array.dictionary = dictionary; - array.release = release_owning_arrow_array; - return array; - } - - void release_non_owning_arrow_array(ArrowArray* array) - { - SPARROW_ASSERT_FALSE(array == nullptr) - SPARROW_ASSERT_TRUE(array->release == std::addressof(release_non_owning_arrow_array)) - - release_common_non_owning_arrow(*array); - array->buffers = nullptr; // The buffers were deleted with the private data - } - - void fill_non_owning_arrow_array( - ArrowArray& array, - int64_t length, - int64_t null_count, - int64_t offset, - std::vector&& buffers, - size_t children_count, - ArrowArray** children, - ArrowArray* dictionary - ) - { - SPARROW_ASSERT_TRUE(length >= 0); - SPARROW_ASSERT_TRUE(null_count >= -1); - SPARROW_ASSERT_TRUE(offset >= 0); - - array.length = length; - array.null_count = null_count; - array.offset = offset; - array.n_buffers = static_cast(buffers.size()); - array.private_data = new non_owning_arrow_array_private_data(std::move(buffers)); - const auto private_data = static_cast(array.private_data); - array.buffers = private_data->buffers_ptrs(); - array.n_children = static_cast(children_count); - array.children = children; - array.dictionary = dictionary; - array.release = release_non_owning_arrow_array; - } - - ArrowArray make_non_owning_arrow_array( - int64_t length, - int64_t null_count, - int64_t offset, - std::vector&& buffers, - size_t children_count, - ArrowArray** children, - ArrowArray* dictionary - ) - { - ArrowArray array{}; - fill_non_owning_arrow_array( - array, - length, - null_count, - offset, - std::move(buffers), - children_count, - children, - dictionary - ); - return array; } } diff --git a/src/arrow_interface/arrow_array/private_data.cpp b/src/arrow_interface/arrow_array/private_data.cpp index b133c8e..e69de29 100644 --- a/src/arrow_interface/arrow_array/private_data.cpp +++ b/src/arrow_interface/arrow_array/private_data.cpp @@ -1,9 +0,0 @@ -#include "sparrow_ipc/arrow_interface/arrow_array/private_data.hpp" - -namespace sparrow_ipc -{ - const void** non_owning_arrow_array_private_data::buffers_ptrs() noexcept - { - return const_cast(reinterpret_cast(m_buffers_pointers.data())); - } -} \ No newline at end of file diff --git a/src/deserialize_fixedsizebinary_array.cpp b/src/deserialize_fixedsizebinary_array.cpp index 63ea213..427f600 100644 --- a/src/deserialize_fixedsizebinary_array.cpp +++ b/src/deserialize_fixedsizebinary_array.cpp @@ -2,6 +2,7 @@ namespace sparrow_ipc { + // TODO add compression here and tests (not available for this type in apache arrow integration tests files) sparrow::fixed_width_binary_array deserialize_non_owning_fixedwidthbinary( const org::apache::arrow::flatbuf::RecordBatch& record_batch, std::span body, @@ -33,14 +34,14 @@ namespace sparrow_ipc } auto buffer_ptr = const_cast(body.data() + buffer_metadata->offset()); std::vector buffers = {bitmap_ptr, buffer_ptr}; - ArrowArray array = make_non_owning_arrow_array( + ArrowArray array = make_arrow_array( record_batch.length(), null_count, 0, - std::move(buffers), 0, nullptr, - nullptr + nullptr, + std::move(buffers) ); sparrow::arrow_proxy ap{std::move(array), std::move(schema)}; return sparrow::fixed_width_binary_array{std::move(ap)}; diff --git a/src/deserialize_utils.cpp b/src/deserialize_utils.cpp index d89be6c..d42ddca 100644 --- a/src/deserialize_utils.cpp +++ b/src/deserialize_utils.cpp @@ -1,5 +1,7 @@ #include "sparrow_ipc/deserialize_utils.hpp" +#include "sparrow_ipc/compression.hpp" + namespace sparrow_ipc::utils { std::pair get_bitmap_pointer_and_null_count( @@ -24,4 +26,23 @@ namespace sparrow_ipc::utils }; return {ptr, bitmap_view.null_count()}; } + + std::span get_and_decompress_buffer( + const org::apache::arrow::flatbuf::RecordBatch& record_batch, + std::span body, + size_t& buffer_index, + const org::apache::arrow::flatbuf::BodyCompression* compression, + std::vector>& decompressed_storage + ) + { + const auto buffer_metadata = record_batch.buffers()->Get(buffer_index++); + auto buffer_span = body.subspan(buffer_metadata->offset(), buffer_metadata->length()); + + if (compression) + { + decompressed_storage.emplace_back(decompress(compression->codec(), buffer_span)); + buffer_span = decompressed_storage.back(); + } + return buffer_span; + } } \ No newline at end of file From 5045456659ef7a0954b03939b44bbbde65401617 Mon Sep 17 00:00:00 2001 From: Hind Montassif Date: Tue, 14 Oct 2025 10:19:05 +0200 Subject: [PATCH 09/12] Remove macro from template --- include/sparrow_ipc/arrow_interface/arrow_array.hpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/include/sparrow_ipc/arrow_interface/arrow_array.hpp b/include/sparrow_ipc/arrow_interface/arrow_array.hpp index 726ecb9..4faecf4 100644 --- a/include/sparrow_ipc/arrow_interface/arrow_array.hpp +++ b/include/sparrow_ipc/arrow_interface/arrow_array.hpp @@ -29,7 +29,7 @@ namespace sparrow_ipc } template - SPARROW_IPC_API void fill_arrow_array( + void fill_arrow_array( ArrowArray& array, int64_t length, int64_t null_count, @@ -60,7 +60,7 @@ namespace sparrow_ipc } template - [[nodiscard]] SPARROW_IPC_API ArrowArray make_arrow_array( + [[nodiscard]] ArrowArray make_arrow_array( int64_t length, int64_t null_count, int64_t offset, From 4c3d4559deb59565c09da665ce8d83dab6240f2a Mon Sep 17 00:00:00 2001 From: Hind Montassif Date: Tue, 14 Oct 2025 11:46:15 +0200 Subject: [PATCH 10/12] Add fct and remove cout --- .../deserialize_primitive_array.hpp | 12 +---------- include/sparrow_ipc/deserialize_utils.hpp | 21 +++++++++++++++++++ ...deserialize_variable_size_binary_array.hpp | 13 ++---------- src/deserialize_utils.cpp | 19 ++++++++++++++++- tests/test_de_serialization_with_files.cpp | 6 ------ 5 files changed, 42 insertions(+), 29 deletions(-) diff --git a/include/sparrow_ipc/deserialize_primitive_array.hpp b/include/sparrow_ipc/deserialize_primitive_array.hpp index 1808789..76f7212 100644 --- a/include/sparrow_ipc/deserialize_primitive_array.hpp +++ b/include/sparrow_ipc/deserialize_primitive_array.hpp @@ -40,17 +40,7 @@ namespace sparrow_ipc auto validity_buffer_span = utils::get_and_decompress_buffer(record_batch, body, buffer_index, compression, decompressed_buffers); - uint8_t* bitmap_ptr = nullptr; - int64_t null_count = 0; - - if (validity_buffer_span.size() > 0) - { - bitmap_ptr = const_cast(validity_buffer_span.data()); - const sparrow::dynamic_bitset_view bitmap_view{ - bitmap_ptr, - static_cast(record_batch.length())}; - null_count = bitmap_view.null_count(); - } + const auto [bitmap_ptr, null_count] = utils::get_bitmap_pointer_and_null_count(validity_buffer_span, record_batch.length()); auto data_buffer_span = utils::get_and_decompress_buffer(record_batch, body, buffer_index, compression, decompressed_buffers); diff --git a/include/sparrow_ipc/deserialize_utils.hpp b/include/sparrow_ipc/deserialize_utils.hpp index cbc52b3..36f93ad 100644 --- a/include/sparrow_ipc/deserialize_utils.hpp +++ b/include/sparrow_ipc/deserialize_utils.hpp @@ -11,6 +11,26 @@ namespace sparrow_ipc::utils { + /** + * @brief Extracts bitmap pointer and null count from a validity buffer span. + * + * This function calculates the number of null values represented by the bitmap. + * + * @param validity_buffer_span The validity buffer as a byte span. + * @param length The Arrow RecordBatch length (number of values in the array). + * + * @return A pair containing: + * - First: Pointer to the bitmap data (nullptr if buffer is empty) + * - Second: Count of null values in the bitmap (0 if buffer is empty) + * + * @note If the bitmap buffer is empty, returns {nullptr, 0} + * @note The returned pointer is a non-const cast of the original const data + */ + [[nodiscard]] std::pair get_bitmap_pointer_and_null_count( + std::span validity_buffer_span, + const int64_t length + ); + /** * @brief Extracts bitmap pointer and null count from a RecordBatch buffer. * @@ -28,6 +48,7 @@ namespace sparrow_ipc::utils * @note If the bitmap buffer has zero length, returns {nullptr, 0} * @note The returned pointer is a non-const cast of the original const data */ + // TODO to be removed when not used anymore (after adding compression to deserialize_fixedsizebinary_array) [[nodiscard]] std::pair get_bitmap_pointer_and_null_count( const org::apache::arrow::flatbuf::RecordBatch& record_batch, std::span body, diff --git a/include/sparrow_ipc/deserialize_variable_size_binary_array.hpp b/include/sparrow_ipc/deserialize_variable_size_binary_array.hpp index 9895b97..623776d 100644 --- a/include/sparrow_ipc/deserialize_variable_size_binary_array.hpp +++ b/include/sparrow_ipc/deserialize_variable_size_binary_array.hpp @@ -35,18 +35,9 @@ namespace sparrow_ipc const auto compression = record_batch.compression(); std::vector> decompressed_buffers; - // TODO add another fct here auto validity_buffer_span = utils::get_and_decompress_buffer(record_batch, body, buffer_index, compression, decompressed_buffers); - uint8_t* bitmap_ptr = nullptr; - int64_t null_count = 0; - if (validity_buffer_span.size() > 0) - { - bitmap_ptr = const_cast(validity_buffer_span.data()); - const sparrow::dynamic_bitset_view bitmap_view{ - bitmap_ptr, - static_cast(record_batch.length())}; - null_count = bitmap_view.null_count(); - } + + const auto [bitmap_ptr, null_count] = utils::get_bitmap_pointer_and_null_count(validity_buffer_span, record_batch.length()); auto offset_buffer_span = utils::get_and_decompress_buffer(record_batch, body, buffer_index, compression, decompressed_buffers); auto data_buffer_span = utils::get_and_decompress_buffer(record_batch, body, buffer_index, compression, decompressed_buffers); diff --git a/src/deserialize_utils.cpp b/src/deserialize_utils.cpp index d42ddca..1d01029 100644 --- a/src/deserialize_utils.cpp +++ b/src/deserialize_utils.cpp @@ -4,6 +4,23 @@ namespace sparrow_ipc::utils { + std::pair get_bitmap_pointer_and_null_count( + std::span validity_buffer_span, + const int64_t length + ) + { + if (validity_buffer_span.empty()) + { + return {nullptr, 0}; + } + auto ptr = const_cast(validity_buffer_span.data()); + const sparrow::dynamic_bitset_view bitmap_view{ + ptr, + static_cast(length) + }; + return {ptr, bitmap_view.null_count()}; + } + std::pair get_bitmap_pointer_and_null_count( const org::apache::arrow::flatbuf::RecordBatch& record_batch, std::span body, @@ -45,4 +62,4 @@ namespace sparrow_ipc::utils } return buffer_span; } -} \ No newline at end of file +} diff --git a/tests/test_de_serialization_with_files.cpp b/tests/test_de_serialization_with_files.cpp index 6459581..13a504f 100644 --- a/tests/test_de_serialization_with_files.cpp +++ b/tests/test_de_serialization_with_files.cpp @@ -66,27 +66,21 @@ void compare_record_batches( ) { REQUIRE_EQ(record_batches_1.size(), record_batches_2.size()); -// std::cout << "record_batches1 size: " << record_batches_1.size() << " record_batches2 size: " << record_batches_2.size() << std::endl; for (size_t i = 0; i < record_batches_1.size(); ++i) { for (size_t y = 0; y < record_batches_1[i].nb_columns(); y++) { -// std::cout << "record_batches1 nb cols: " << record_batches_1[i].nb_columns() << " record_batches2 nb cols: " << record_batches_2[i].nb_columns() << std::endl; const auto& column_1 = record_batches_1[i].get_column(y); const auto& column_2 = record_batches_2[i].get_column(y); REQUIRE_EQ(column_1.size(), column_2.size()); -// std::cout << "column_1.size(): " << column_1.size() << " column_2.size(): " << column_2.size() << std::endl; for (size_t z = 0; z < column_1.size(); z++) { const auto col_name = column_1.name().value_or("NA"); INFO("Comparing batch " << i << ", column " << y << " named :" << col_name << " , row " << z); -// std::cout << "Comparing batch " << i << ", column " << y << " named :" << col_name << " , row " << z << std::endl; REQUIRE_EQ(column_1.data_type(), column_2.data_type()); CHECK_EQ(column_1.name(), column_2.name()); -// std::cout << "column_1.name() :" << column_1.name() << " and " << column_2.name() << std::endl; const auto& column_1_value = column_1[z]; const auto& column_2_value = column_2[z]; -// std::cout << "column_1_value :" << column_1_value << " and " << column_2_value << std::endl; CHECK_EQ(column_1_value, column_2_value); } } From 6e9c8cabd5d2b6956cb028dbf28837fa54ba7e9c Mon Sep 17 00:00:00 2001 From: Hind Montassif Date: Tue, 14 Oct 2025 12:16:14 +0200 Subject: [PATCH 11/12] Move implementation details to cpp and add check boundaries --- CMakeLists.txt | 2 -- .../arrow_array/private_data.hpp | 34 +++---------------- .../arrow_array/private_data.cpp | 34 +++++++++++++++++++ src/deserialize_utils.cpp | 4 +++ 4 files changed, 43 insertions(+), 31 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 166bb09..49e6719 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -103,7 +103,6 @@ set(SPARROW_IPC_HEADERS ${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/config/config.hpp ${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/config/sparrow_ipc_version.hpp ${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/compression.hpp - #${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/decompressed_buffers.hpp ${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/deserialize_variable_size_binary_array.hpp ${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/deserialize_fixedsizebinary_array.hpp ${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/deserialize_primitive_array.hpp @@ -125,7 +124,6 @@ set(SPARROW_IPC_SRC ${SPARROW_IPC_SOURCE_DIR}/arrow_interface/arrow_schema.cpp ${SPARROW_IPC_SOURCE_DIR}/arrow_interface/arrow_schema/private_data.cpp ${SPARROW_IPC_SOURCE_DIR}/compression.cpp - #${SPARROW_IPC_SOURCE_DIR}/decompressed_buffers.cpp ${SPARROW_IPC_SOURCE_DIR}/deserialize_fixedsizebinary_array.cpp ${SPARROW_IPC_SOURCE_DIR}/deserialize_utils.cpp ${SPARROW_IPC_SOURCE_DIR}/deserialize.cpp diff --git a/include/sparrow_ipc/arrow_interface/arrow_array/private_data.hpp b/include/sparrow_ipc/arrow_interface/arrow_array/private_data.hpp index 2a9386f..5ad6c90 100644 --- a/include/sparrow_ipc/arrow_interface/arrow_array/private_data.hpp +++ b/include/sparrow_ipc/arrow_interface/arrow_array/private_data.hpp @@ -18,28 +18,12 @@ namespace sparrow_ipc { public: - explicit owning_arrow_array_private_data(std::vector>&& buffers) - : m_buffers(std::move(buffers)) - { - m_buffer_pointers.reserve(m_buffers.size()); - for (const auto& buffer : m_buffers) - { - m_buffer_pointers.push_back(buffer.data()); - } - } + explicit owning_arrow_array_private_data(std::vector>&& buffers); - [[nodiscard]] SPARROW_IPC_API const void** buffers_ptrs() noexcept - { - return m_buffer_pointers.data(); - } - - [[nodiscard]] SPARROW_IPC_API std::size_t n_buffers() const noexcept - { - return m_buffers.size(); - } + [[nodiscard]] SPARROW_IPC_API const void** buffers_ptrs() noexcept; + [[nodiscard]] SPARROW_IPC_API std::size_t n_buffers() const noexcept; private: - std::vector> m_buffers; std::vector m_buffer_pointers; }; @@ -53,18 +37,10 @@ namespace sparrow_ipc { } - [[nodiscard]] SPARROW_IPC_API const void** buffers_ptrs() noexcept - { - return const_cast(reinterpret_cast(m_buffer_pointers.data())); - } - - [[nodiscard]] SPARROW_IPC_API std::size_t n_buffers() const noexcept - { - return m_buffer_pointers.size(); - } + [[nodiscard]] SPARROW_IPC_API const void** buffers_ptrs() noexcept; + [[nodiscard]] SPARROW_IPC_API std::size_t n_buffers() const noexcept; private: - std::vector m_buffer_pointers; }; } diff --git a/src/arrow_interface/arrow_array/private_data.cpp b/src/arrow_interface/arrow_array/private_data.cpp index e69de29..9c3738b 100644 --- a/src/arrow_interface/arrow_array/private_data.cpp +++ b/src/arrow_interface/arrow_array/private_data.cpp @@ -0,0 +1,34 @@ +#include "sparrow_ipc/arrow_interface/arrow_array/private_data.hpp" + +namespace sparrow_ipc +{ + owning_arrow_array_private_data::owning_arrow_array_private_data(std::vector>&& buffers) + : m_buffers(std::move(buffers)) + { + m_buffer_pointers.reserve(m_buffers.size()); + for (const auto& buffer : m_buffers) + { + m_buffer_pointers.push_back(buffer.data()); + } + } + + const void** owning_arrow_array_private_data::buffers_ptrs() noexcept + { + return m_buffer_pointers.data(); + } + + std::size_t owning_arrow_array_private_data::n_buffers() const noexcept + { + return m_buffers.size(); + } + + const void** non_owning_arrow_array_private_data::buffers_ptrs() noexcept + { + return const_cast(reinterpret_cast(m_buffer_pointers.data())); + } + + std::size_t non_owning_arrow_array_private_data::n_buffers() const noexcept + { + return m_buffer_pointers.size(); + } +} diff --git a/src/deserialize_utils.cpp b/src/deserialize_utils.cpp index 1d01029..3eba4cc 100644 --- a/src/deserialize_utils.cpp +++ b/src/deserialize_utils.cpp @@ -53,6 +53,10 @@ namespace sparrow_ipc::utils ) { const auto buffer_metadata = record_batch.buffers()->Get(buffer_index++); + if (body.size() < (buffer_metadata->offset() + buffer_metadata->length())) + { + throw std::runtime_error("Buffer metadata exceeds body size"); + } auto buffer_span = body.subspan(buffer_metadata->offset(), buffer_metadata->length()); if (compression) From 43760d7b65e0c9a724c8a16b915cab3df032b3c4 Mon Sep 17 00:00:00 2001 From: Hind Montassif Date: Wed, 15 Oct 2025 10:42:31 +0200 Subject: [PATCH 12/12] Factorize more --- include/sparrow_ipc/compression.hpp | 4 +- include/sparrow_ipc/serialize_utils.hpp | 18 ++++++- src/compression.cpp | 6 +-- src/serialize_utils.cpp | 62 ++++++++++++++++--------- 4 files changed, 61 insertions(+), 29 deletions(-) diff --git a/include/sparrow_ipc/compression.hpp b/include/sparrow_ipc/compression.hpp index b21a8b1..47d0cc3 100644 --- a/include/sparrow_ipc/compression.hpp +++ b/include/sparrow_ipc/compression.hpp @@ -18,6 +18,6 @@ namespace sparrow_ipc // CompressionType to_compression_type(org::apache::arrow::flatbuf::CompressionType compression_type); - std::vector compress(org::apache::arrow::flatbuf::CompressionType compression_type, std::span data); - std::vector decompress(org::apache::arrow::flatbuf::CompressionType compression_type, std::span data); + std::vector compress(const org::apache::arrow::flatbuf::CompressionType compression_type, std::span data); + std::vector decompress(const org::apache::arrow::flatbuf::CompressionType compression_type, std::span data); } diff --git a/include/sparrow_ipc/serialize_utils.hpp b/include/sparrow_ipc/serialize_utils.hpp index 80dbccd..b947832 100644 --- a/include/sparrow_ipc/serialize_utils.hpp +++ b/include/sparrow_ipc/serialize_utils.hpp @@ -218,7 +218,23 @@ namespace sparrow_ipc std::vector& nodes ); - void fill_body_and_get_buffers_compressed(const sparrow::arrow_proxy& arrow_proxy, std::vector& body, std::vector& flatbuf_buffers, int64_t& offset, org::apache::arrow::flatbuf::CompressionType compression_type); + /** + * @brief Generates the compressed message body and buffer metadata for a record batch. + * + * This function traverses the record batch, compresses each buffer using the specified + * compression algorithm, and constructs the message body. For each compressed buffer, + * it is prefixed by its 8-byte uncompressed size. Padding is added after each + * compressed buffer to ensure 8-byte alignment. + * + * @param record_batch The record batch to serialize. + * @param compression_type The compression algorithm to use (e.g., LZ4_FRAME, ZSTD). + * @return A std::pair containing: + * - first: A vector of bytes representing the complete compressed message body. + * - second: A vector of FlatBuffer Buffer objects describing the offset and + * size of each buffer within the compressed body. + */ + [[nodiscard]] SPARROW_IPC_API std::pair, std::vector> + generate_compressed_body_and_buffers(const sparrow::record_batch& record_batch, const org::apache::arrow::flatbuf::CompressionType compression_type); /** * @brief Creates a vector of Apache Arrow FieldNode objects from a record batch. diff --git a/src/compression.cpp b/src/compression.cpp index 59d6d56..5170903 100644 --- a/src/compression.cpp +++ b/src/compression.cpp @@ -20,7 +20,7 @@ namespace sparrow_ipc // } // } - std::vector compress(org::apache::arrow::flatbuf::CompressionType compression_type, std::span data) + std::vector compress(const org::apache::arrow::flatbuf::CompressionType compression_type, std::span data) { if (data.empty()) { @@ -30,7 +30,7 @@ namespace sparrow_ipc { case org::apache::arrow::flatbuf::CompressionType::LZ4_FRAME: { - std::int64_t uncompressed_size = data.size(); + const std::int64_t uncompressed_size = data.size(); const size_t max_compressed_size = LZ4F_compressFrameBound(uncompressed_size, nullptr); std::vector compressed_data(max_compressed_size); const size_t compressed_size = LZ4F_compressFrame(compressed_data.data(), max_compressed_size, data.data(), uncompressed_size, nullptr); @@ -46,7 +46,7 @@ namespace sparrow_ipc } } - std::vector decompress(org::apache::arrow::flatbuf::CompressionType compression_type, std::span data) + std::vector decompress(const org::apache::arrow::flatbuf::CompressionType compression_type, std::span data) { if (data.empty()) { diff --git a/src/serialize_utils.cpp b/src/serialize_utils.cpp index 2dbf9af..d590021 100644 --- a/src/serialize_utils.cpp +++ b/src/serialize_utils.cpp @@ -1,4 +1,5 @@ #include +#include #include "sparrow_ipc/magic_values.hpp" #include "sparrow_ipc/serialize.hpp" @@ -181,32 +182,47 @@ namespace sparrow_ipc return buffers; } - // TODO to be factorized later after testing using arrow testing files - void fill_body_and_get_buffers_compressed(const sparrow::arrow_proxy& arrow_proxy, std::vector& body, std::vector& flatbuf_buffers, int64_t& offset, org::apache::arrow::flatbuf::CompressionType compression_type) + namespace { - const auto& buffers = arrow_proxy.buffers(); - for (const auto& buffer : buffers) + void fill_compressed_body_and_buffers_recursive(const sparrow::arrow_proxy& arrow_proxy, std::vector& body, std::vector& flatbuf_buffers, int64_t& offset, const org::apache::arrow::flatbuf::CompressionType compression_type) { - if (buffer.size() > 0) + for (const auto& buffer : arrow_proxy.buffers()) { - auto compressed_buffer = compress(compression_type, {buffer.data(), buffer.size()}); - int64_t uncompressed_size = buffer.size(); - body.insert(body.end(), reinterpret_cast(&uncompressed_size), reinterpret_cast(&uncompressed_size) + sizeof(uncompressed_size)); - body.insert(body.end(), compressed_buffer.begin(), compressed_buffer.end()); - add_padding(body); + if (buffer.size() > 0) + { + auto compressed_buffer = compress(compression_type, {buffer.data(), buffer.size()}); + int64_t uncompressed_size = buffer.size(); + body.insert(body.end(), reinterpret_cast(&uncompressed_size), reinterpret_cast(&uncompressed_size) + sizeof(uncompressed_size)); + body.insert(body.end(), compressed_buffer.begin(), compressed_buffer.end()); + add_padding(body); - flatbuf_buffers.emplace_back(offset, sizeof(uncompressed_size) + compressed_buffer.size()); - offset = body.size(); + flatbuf_buffers.emplace_back(offset, sizeof(uncompressed_size) + compressed_buffer.size()); + offset = body.size(); + } + else + { + flatbuf_buffers.emplace_back(offset, 0); + } } - else + for (const auto& child : arrow_proxy.children()) { - flatbuf_buffers.emplace_back(offset, 0); + fill_compressed_body_and_buffers_recursive(child, body, flatbuf_buffers, offset, compression_type); } } - for (const auto& child : arrow_proxy.children()) + } // namespace + + std::pair, std::vector> + generate_compressed_body_and_buffers(const sparrow::record_batch& record_batch, const org::apache::arrow::flatbuf::CompressionType compression_type) + { + std::vector body; + std::vector flatbuf_buffers; + int64_t offset = 0; + for (const auto& column : record_batch.columns()) { - fill_body_and_get_buffers_compressed(child, body, flatbuf_buffers, offset, compression_type); + const auto& arrow_proxy = sparrow::detail::array_access::get_arrow_proxy(column); + fill_compressed_body_and_buffers_recursive(arrow_proxy, body, flatbuf_buffers, offset, compression_type); } + return {std::move(body), std::move(flatbuf_buffers)}; } void fill_body(const sparrow::arrow_proxy& arrow_proxy, std::vector& body) @@ -309,12 +325,7 @@ namespace sparrow_ipc if (compression) { - int64_t offset = 0; - for (const auto& column : record_batch.columns()) - { - const auto& arrow_proxy = sparrow::detail::array_access::get_arrow_proxy(column); - fill_body_and_get_buffers_compressed(arrow_proxy, body, flatbuf_buffers, offset, compression.value()); - } + std::tie(body, flatbuf_buffers) = generate_compressed_body_and_buffers(record_batch, compression.value()); body_size = body.size(); } else @@ -331,9 +342,14 @@ namespace sparrow_ipc body_size, compression ); + const flatbuffers::uoffset_t record_batch_len = record_batch_builder.GetSize(); + const size_t metadata_size = continuation.size() + sizeof(record_batch_len) + record_batch_len; + const size_t padded_metadata_size = utils::align_to_8(metadata_size); + std::vector output; + output.reserve(padded_metadata_size + body.size()); + output.insert(output.end(), continuation.begin(), continuation.end()); - const flatbuffers::uoffset_t record_batch_len = record_batch_builder.GetSize(); output.insert( output.end(), reinterpret_cast(&record_batch_len),