Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lonboard/_layer.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
from lonboard._geoarrow.ops.coord_layout import make_geometry_interleaved
from lonboard._geoarrow.parse_wkb import parse_serialized_table
from lonboard._geoarrow.row_index import add_positional_row_index
from lonboard._serialization import infer_rows_per_chunk
from lonboard._serialization.table.util import infer_rows_per_chunk
from lonboard._utils import auto_downcast as _auto_downcast
from lonboard._utils import get_geometry_column_index, remove_extension_kwargs
from lonboard.traits import (
Expand Down
205 changes: 0 additions & 205 deletions lonboard/_serialization.py

This file was deleted.

58 changes: 58 additions & 0 deletions lonboard/_serialization/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
from __future__ import annotations

from typing import TYPE_CHECKING

from lonboard import config
from lonboard._serialization.table import (
ArrowSerialization,
IPCSerialization,
ParquetSerialization,
)

if TYPE_CHECKING:
from arro3.core import ChunkedArray, Table

from lonboard._layer import BaseArrowLayer
from lonboard.experimental._layer import TripsLayer


def _choose_serialization() -> ArrowSerialization:
    """Pick the serialization backend to use for this call.

    NOTE: the decision is made inside each `serialize_*` function (per call)
    rather than stored on layer instances, so that flipping
    `config.USE_PARQUET` at runtime takes effect immediately.
    """
    return ParquetSerialization() if config.USE_PARQUET else IPCSerialization()


def serialize_accessor(
    data: str | float | list | tuple | bytes | ChunkedArray,
    obj: BaseArrowLayer,
) -> str | int | float | list | tuple | bytes | list[bytes]:
    """Serialize an Arrow Array or Column from a widget."""
    serializer = _choose_serialization()
    return serializer.serialize_accessor(data, obj)


def serialize_timestamps(
    timestamps: ChunkedArray,
    obj: TripsLayer,
) -> list[bytes]:
    """Serialize timestamps for TripsLayer."""
    serializer = _choose_serialization()
    return serializer.serialize_timestamps(timestamps, obj)


def serialize_table(
    table: Table,
    obj: BaseArrowLayer,
) -> list[bytes]:
    """Serialize an Arrow Table from a widget."""
    serializer = _choose_serialization()
    return serializer.serialize_table(table, obj)


# NOTE(review): the {"to_json": callable} shape matches traitlets' custom trait
# serialization hooks — presumably passed as the `serialization` metadata on
# widget traits; verify against the trait declarations in lonboard.traits.
ACCESSOR_SERIALIZATION = {"to_json": serialize_accessor}
TIMESTAMP_SERIALIZATION = {"to_json": serialize_timestamps}
TABLE_SERIALIZATION = {"to_json": serialize_table}
60 changes: 60 additions & 0 deletions lonboard/_serialization/table/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
from __future__ import annotations

from typing import TYPE_CHECKING

from lonboard._serialization.table.arro3 import (
Arro3IPCSerialization,
Arro3ParquetSerialization,
)
from lonboard._serialization.table.base import ArrowSerialization
from lonboard._serialization.table.pyarrow import (
PyArrowIPCSerialization,
PyArrowParquetSerialization,
)

if TYPE_CHECKING:
from arro3.core import RecordBatch


class ParquetSerialization(ArrowSerialization):
    """Serialize Arrow Tables and Arrays to Parquet.

    Uses `pyarrow` if installed, otherwise falls back to `arro3.io`.
    """

    # Concrete backend selected at construction time.
    _impl: ArrowSerialization

    def __init__(self) -> None:
        try:
            # Availability probe only: import the parquet submodule
            # specifically, so a pyarrow build without parquet support also
            # falls back to arro3.
            import pyarrow.parquet  # noqa: F401
        except ImportError:
            self._impl = Arro3ParquetSerialization()
        else:
            self._impl = PyArrowParquetSerialization()

        super().__init__()

    def _serialize_arrow_batch(self, record_batch: RecordBatch) -> bytes:
        # Delegate to the chosen backend; the private access is intentional.
        # (Fixed malformed directive: `# noqa SLF001` without the colon is a
        # blanket suppression, not a targeted one.)
        return self._impl._serialize_arrow_batch(record_batch)  # noqa: SLF001


class IPCSerialization(ArrowSerialization):
    """Serialize Arrow Tables and Arrays to Arrow IPC.

    Uses `pyarrow` if installed, otherwise falls back to `arro3.io`.
    """

    # Concrete backend selected at construction time.
    _impl: ArrowSerialization

    def __init__(self) -> None:
        try:
            # Availability probe only; the module object itself is unused, so
            # don't bind an alias (the previous `import pyarrow as pa` left
            # `pa` unused).
            import pyarrow  # noqa: F401
        except ImportError:
            self._impl = Arro3IPCSerialization()
        else:
            self._impl = PyArrowIPCSerialization()

        super().__init__()

    def _serialize_arrow_batch(self, record_batch: RecordBatch) -> bytes:
        # Delegate to the chosen backend; the private access is intentional.
        return self._impl._serialize_arrow_batch(record_batch)  # noqa: SLF001
49 changes: 49 additions & 0 deletions lonboard/_serialization/table/arro3.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
from __future__ import annotations

from io import BytesIO
from typing import TYPE_CHECKING

from .base import ArrowSerialization
from .config import DEFAULT_PARQUET_COMPRESSION, DEFAULT_PARQUET_COMPRESSION_LEVEL

if TYPE_CHECKING:
from arro3.core import RecordBatch


class Arro3ParquetSerialization(ArrowSerialization):
    """Serialize Arrow Tables and Arrays to Parquet using arro3."""

    # NOTE: the previous no-op `__init__` (which only called
    # `super().__init__()`) was removed — Python inherits it automatically.

    def _serialize_arrow_batch(self, record_batch: RecordBatch) -> bytes:
        """Write one RecordBatch to an in-memory Parquet file and return its bytes."""
        from arro3.io import write_parquet

        # arro3 takes the compression level embedded in the codec string,
        # e.g. "zstd(7)".
        compression_string = (
            f"{DEFAULT_PARQUET_COMPRESSION}({DEFAULT_PARQUET_COMPRESSION_LEVEL})"
        )
        bio = BytesIO()
        write_parquet(
            record_batch,
            bio,
            compression=compression_string,
            # One row group per batch: the whole batch is written as a single
            # row group.
            max_row_group_size=record_batch.num_rows,
        )

        return bio.getvalue()


class Arro3IPCSerialization(ArrowSerialization):
    """Serialize Arrow Tables and Arrays to Arrow IPC using arro3."""

    # NOTE: the previous no-op `__init__` (which only called
    # `super().__init__()`) was removed — Python inherits it automatically.

    def _serialize_arrow_batch(self, record_batch: RecordBatch) -> bytes:
        """Write a single RecordBatch to an Arrow IPC stream in memory and return the bytes."""
        from arro3.io import write_ipc_stream

        bio = BytesIO()
        # IPC payload is sent uncompressed.
        write_ipc_stream(record_batch, bio, compression=None)

        return bio.getvalue()
Loading
Loading