Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ set(SPARROW_IPC_HEADERS
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/arrow_interface/arrow_schema/private_data.hpp
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/config/config.hpp
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/config/sparrow_ipc_version.hpp
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/compression.hpp
#${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/decompressed_buffers.hpp
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/deserialize_variable_size_binary_array.hpp
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/deserialize_fixedsizebinary_array.hpp
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/deserialize_primitive_array.hpp
Expand All @@ -122,6 +124,8 @@ set(SPARROW_IPC_SRC
${SPARROW_IPC_SOURCE_DIR}/arrow_interface/arrow_array/private_data.cpp
${SPARROW_IPC_SOURCE_DIR}/arrow_interface/arrow_schema.cpp
${SPARROW_IPC_SOURCE_DIR}/arrow_interface/arrow_schema/private_data.cpp
${SPARROW_IPC_SOURCE_DIR}/compression.cpp
#${SPARROW_IPC_SOURCE_DIR}/decompressed_buffers.cpp
${SPARROW_IPC_SOURCE_DIR}/deserialize_fixedsizebinary_array.cpp
${SPARROW_IPC_SOURCE_DIR}/deserialize_utils.cpp
${SPARROW_IPC_SOURCE_DIR}/deserialize.cpp
Expand Down Expand Up @@ -239,6 +243,8 @@ target_link_libraries(sparrow-ipc
PUBLIC
sparrow::sparrow
flatbuffers::flatbuffers
PRIVATE
lz4::lz4
)

# Ensure generated headers are available when building sparrow-ipc
Expand Down Expand Up @@ -297,6 +303,25 @@ if (TARGET flatbuffers)
endif()
endif()

if (TARGET lz4)
get_target_property(is_imported lz4 IMPORTED)
if(NOT is_imported)
# This means `lz4` was fetched using FetchContent
# We need to export `lz4` target explicitly
list(APPEND SPARROW_IPC_EXPORTED_TARGETS lz4)
endif()
endif()

if (TARGET lz4_static)
get_target_property(is_imported lz4_static IMPORTED)
if(NOT is_imported)
# `lz4_static` is needed as this is the actual library
# and `lz4` is an interface pointing to it.
# If `lz4_shared` is used instead for some reason, modify this accordingly
list(APPEND SPARROW_IPC_EXPORTED_TARGETS lz4_static)
endif()
endif()

install(TARGETS ${SPARROW_IPC_EXPORTED_TARGETS}
EXPORT ${PROJECT_NAME}-targets)

Expand Down
42 changes: 42 additions & 0 deletions cmake/Findlz4.cmake
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
# Find LZ4 library and headers

# This module defines:
# LZ4_FOUND - True if lz4 is found
# LZ4_INCLUDE_DIRS - LZ4 include directories
# LZ4_LIBRARIES - Libraries needed to use LZ4
# LZ4_VERSION - LZ4 version number
#

find_package(PkgConfig)
if(PKG_CONFIG_FOUND)
pkg_check_modules(LZ4 QUIET liblz4)
if(NOT LZ4_FOUND)
message(STATUS "Did not find 'liblz4.pc', trying 'lz4.pc'")
pkg_check_modules(LZ4 QUIET lz4)
endif()
endif()

find_path(LZ4_INCLUDE_DIR lz4.h)
# HINTS ${LZ4_INCLUDEDIR} ${LZ4_INCLUDE_DIRS})
find_library(LZ4_LIBRARY NAMES lz4 liblz4)
# HINTS ${LZ4_LIBDIR} ${LZ4_LIBRARY_DIRS})

include(FindPackageHandleStandardArgs)
find_package_handle_standard_args(lz4 DEFAULT_MSG
LZ4_LIBRARY LZ4_INCLUDE_DIR)
mark_as_advanced(LZ4_INCLUDE_DIR LZ4_LIBRARY)

set(LZ4_LIBRARIES ${LZ4_LIBRARY})
set(LZ4_INCLUDE_DIRS ${LZ4_INCLUDE_DIR})

if(LZ4_FOUND AND NOT TARGET lz4::lz4)
add_library(lz4::lz4 UNKNOWN IMPORTED)
set_target_properties(lz4::lz4 PROPERTIES
IMPORTED_LOCATION "${LZ4_LIBRARIES}"
INTERFACE_INCLUDE_DIRECTORIES "${LZ4_INCLUDE_DIRS}")
if (NOT TARGET LZ4::LZ4 AND TARGET lz4::lz4)
add_library(LZ4::LZ4 ALIAS lz4::lz4)
endif ()
endif()

#TODO add version?
49 changes: 43 additions & 6 deletions cmake/external_dependencies.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ endif()

function(find_package_or_fetch)
set(options)
set(oneValueArgs CONAN_PKG_NAME PACKAGE_NAME GIT_REPOSITORY TAG)
set(multiValueArgs)
set(oneValueArgs CONAN_PKG_NAME PACKAGE_NAME GIT_REPOSITORY TAG SOURCE_SUBDIR)
set(multiValueArgs CMAKE_ARGS)
cmake_parse_arguments(PARSE_ARGV 0 arg
"${options}" "${oneValueArgs}" "${multiValueArgs}"
)
Expand All @@ -29,14 +29,25 @@ function(find_package_or_fetch)
if(FETCH_DEPENDENCIES_WITH_CMAKE STREQUAL "ON" OR FETCH_DEPENDENCIES_WITH_CMAKE STREQUAL "MISSING")
if(NOT ${actual_pkg_name}_FOUND)
message(STATUS "📦 Fetching ${arg_PACKAGE_NAME}")
FetchContent_Declare(
# Apply CMAKE_ARGS before fetching
foreach(cmake_arg ${arg_CMAKE_ARGS})
string(REGEX MATCH "^([^=]+)=(.*)$" _ ${cmake_arg})
if(CMAKE_MATCH_1)
set(${CMAKE_MATCH_1} ${CMAKE_MATCH_2} CACHE BOOL "" FORCE)
endif()
endforeach()
set(fetch_args
${arg_PACKAGE_NAME}
GIT_SHALLOW TRUE
GIT_REPOSITORY ${arg_GIT_REPOSITORY}
GIT_TAG ${arg_TAG}
GIT_PROGRESS TRUE
SYSTEM
EXCLUDE_FROM_ALL)
if(arg_SOURCE_SUBDIR)
list(APPEND fetch_args SOURCE_SUBDIR ${arg_SOURCE_SUBDIR})
endif()
FetchContent_Declare(${fetch_args})
FetchContent_MakeAvailable(${arg_PACKAGE_NAME})
message(STATUS "\t✅ Fetched ${arg_PACKAGE_NAME}")
else()
Expand Down Expand Up @@ -79,6 +90,25 @@ if(NOT TARGET flatbuffers::flatbuffers)
endif()
unset(FLATBUFFERS_BUILD_TESTS CACHE)

# Fetching lz4
# Disable bundled mode to allow shared libraries if needed
# lz4 is built as static by default if bundled
# set(LZ4_BUNDLED_MODE OFF CACHE BOOL "" FORCE)
# set(BUILD_SHARED_LIBS ON CACHE BOOL "" FORCE)
find_package_or_fetch(
PACKAGE_NAME lz4
GIT_REPOSITORY https://github.com/lz4/lz4.git
TAG v1.10.0
SOURCE_SUBDIR build/cmake
CMAKE_ARGS
"LZ4_BUILD_CLI=OFF"
"LZ4_BUILD_LEGACY_LZ4C=OFF"
)

if(NOT TARGET lz4::lz4)
add_library(lz4::lz4 ALIAS lz4)
endif()

if(SPARROW_IPC_BUILD_TESTS)
find_package_or_fetch(
PACKAGE_NAME doctest
Expand Down Expand Up @@ -109,10 +139,18 @@ if(SPARROW_IPC_BUILD_TESTS)
)
message(STATUS "\t✅ Fetched arrow-testing")

# Iterate over all the files in the arrow-testing-data source directiory. When it's a gz, extract in place.
file(GLOB_RECURSE arrow_testing_data_targz_files CONFIGURE_DEPENDS
# Fetch all the files in the 1.0.0-littleendian directory
file(GLOB_RECURSE arrow_testing_data_targz_files_littleendian CONFIGURE_DEPENDS
"${arrow-testing_SOURCE_DIR}/data/arrow-ipc-stream/integration/1.0.0-littleendian/*.json.gz"
)
# Fetch all the files in the 2.0.0-compression directory
file(GLOB_RECURSE arrow_testing_data_targz_files_compression CONFIGURE_DEPENDS
"${arrow-testing_SOURCE_DIR}/data/arrow-ipc-stream/integration/2.0.0-compression/*.json.gz"
)

# Combine lists of files
list(APPEND arrow_testing_data_targz_files ${arrow_testing_data_targz_files_littleendian} ${arrow_testing_data_targz_files_compression})
# Iterate over all the files in the arrow-testing-data source directory. When it's a gz, extract in place.
foreach(file_path IN LISTS arrow_testing_data_targz_files)
cmake_path(GET file_path PARENT_PATH parent_dir)
cmake_path(GET file_path STEM filename)
Expand All @@ -128,5 +166,4 @@ if(SPARROW_IPC_BUILD_TESTS)
endif()
endif()
endforeach()

endif()
2 changes: 2 additions & 0 deletions conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ def configure(self):
def requirements(self):
self.requires("sparrow/1.0.0")
self.requires(f"flatbuffers/{self._flatbuffers_version}")
self.requires("lz4/1.9.4")
#self.requires("zstd/1.5.5")
if self.options.get_safe("build_tests"):
self.test_requires("doctest/2.4.12")

Expand Down
2 changes: 2 additions & 0 deletions environment-dev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@ dependencies:
- cxx-compiler
# Libraries dependencies
- flatbuffers
- lz4
- nlohmann_json
- sparrow-devel >=1.1.2
# Testing dependencies
- doctest
# Documentation dependencies
- doxygen
Expand Down
78 changes: 65 additions & 13 deletions include/sparrow_ipc/arrow_interface/arrow_array.hpp
Original file line number Diff line number Diff line change
@@ -1,34 +1,86 @@

#pragma once

#include <vector>
#include <utility>

#include <sparrow/c_interface.hpp>
#include <sparrow/utils/contracts.hpp>

#include "sparrow_ipc/config/config.hpp"
#include "sparrow_ipc/arrow_interface/arrow_array/private_data.hpp"

namespace sparrow_ipc
{
[[nodiscard]] SPARROW_IPC_API ArrowArray make_non_owning_arrow_array(
SPARROW_IPC_API void release_arrow_array_children_and_dictionary(ArrowArray* array);

template <ArrowPrivateData T>
void arrow_array_release(ArrowArray* array)
{
SPARROW_ASSERT_TRUE(array != nullptr)
SPARROW_ASSERT_TRUE(array->release == std::addressof(arrow_array_release<T>))

SPARROW_ASSERT_TRUE(array->private_data != nullptr);

delete static_cast<T*>(array->private_data);
array->private_data = nullptr;
array->buffers = nullptr; // The buffers were deleted with the private data

release_arrow_array_children_and_dictionary(array);
array->release = nullptr;
}

template <ArrowPrivateData T, typename Arg>
void fill_arrow_array(
ArrowArray& array,
int64_t length,
int64_t null_count,
int64_t offset,
std::vector<std::uint8_t*>&& buffers,
size_t children_count,
ArrowArray** children,
ArrowArray* dictionary
);
ArrowArray* dictionary,
Arg&& private_data_arg
)
{
SPARROW_ASSERT_TRUE(length >= 0);
SPARROW_ASSERT_TRUE(null_count >= -1);
SPARROW_ASSERT_TRUE(offset >= 0);

SPARROW_IPC_API void release_non_owning_arrow_array(ArrowArray* array);
array.length = length;
array.null_count = null_count;
array.offset = offset;
array.n_children = static_cast<int64_t>(children_count);
array.children = children;
array.dictionary = dictionary;

SPARROW_IPC_API void fill_non_owning_arrow_array(
ArrowArray& array,
auto private_data = new T(std::forward<Arg>(private_data_arg));
array.private_data = private_data;
array.n_buffers = private_data->n_buffers();
array.buffers = private_data->buffers_ptrs();

array.release = &arrow_array_release<T>;
}

template <ArrowPrivateData T, typename Arg>
[[nodiscard]] ArrowArray make_arrow_array(
int64_t length,
int64_t null_count,
int64_t offset,
std::vector<std::uint8_t*>&& buffers,
size_t children_count,
ArrowArray** children,
ArrowArray* dictionary
);
}
ArrowArray* dictionary,
Arg&& private_data_arg
)
{
ArrowArray array{};
fill_arrow_array<T>(
array,
length,
null_count,
offset,
children_count,
children,
dictionary,
std::forward<Arg>(private_data_arg)
);
return array;
}
}
53 changes: 49 additions & 4 deletions include/sparrow_ipc/arrow_interface/arrow_array/private_data.hpp
Original file line number Diff line number Diff line change
@@ -1,25 +1,70 @@
#pragma once

#include <concepts>
#include <cstdint>
#include <vector>

#include "sparrow_ipc/config/config.hpp"

namespace sparrow_ipc
{
template <typename T>
concept ArrowPrivateData = requires(T& t)
{
{ t.buffers_ptrs() } -> std::same_as<const void**>;
{ t.n_buffers() } -> std::convertible_to<std::size_t>;
};

class owning_arrow_array_private_data
{
public:

explicit owning_arrow_array_private_data(std::vector<std::vector<std::uint8_t>>&& buffers)
: m_buffers(std::move(buffers))
{
m_buffer_pointers.reserve(m_buffers.size());
for (const auto& buffer : m_buffers)
{
m_buffer_pointers.push_back(buffer.data());
}
}

[[nodiscard]] SPARROW_IPC_API const void** buffers_ptrs() noexcept
{
return m_buffer_pointers.data();
}

[[nodiscard]] SPARROW_IPC_API std::size_t n_buffers() const noexcept
{
return m_buffers.size();
}

private:

std::vector<std::vector<std::uint8_t>> m_buffers;
std::vector<const void*> m_buffer_pointers;
};

class non_owning_arrow_array_private_data
{
public:

explicit constexpr non_owning_arrow_array_private_data(std::vector<std::uint8_t*>&& buffers_pointers)
: m_buffers_pointers(std::move(buffers_pointers))
: m_buffer_pointers(std::move(buffers_pointers))
{
}

[[nodiscard]] SPARROW_IPC_API const void** buffers_ptrs() noexcept
{
return const_cast<const void**>(reinterpret_cast<void**>(m_buffer_pointers.data()));
}

[[nodiscard]] SPARROW_IPC_API const void** buffers_ptrs() noexcept;
[[nodiscard]] SPARROW_IPC_API std::size_t n_buffers() const noexcept
{
return m_buffer_pointers.size();
}

private:

std::vector<std::uint8_t*> m_buffers_pointers;
std::vector<std::uint8_t*> m_buffer_pointers;
};
}
Loading
Loading