diff --git a/lonboard/_layer.py b/lonboard/_layer.py index cc3453c2..32114914 100644 --- a/lonboard/_layer.py +++ b/lonboard/_layer.py @@ -31,7 +31,7 @@ from lonboard._geoarrow.ops.coord_layout import make_geometry_interleaved from lonboard._geoarrow.parse_wkb import parse_serialized_table from lonboard._geoarrow.row_index import add_positional_row_index -from lonboard._serialization import infer_rows_per_chunk +from lonboard._serialization.table.util import infer_rows_per_chunk from lonboard._utils import auto_downcast as _auto_downcast from lonboard._utils import get_geometry_column_index, remove_extension_kwargs from lonboard.traits import ( diff --git a/lonboard/_serialization.py b/lonboard/_serialization.py deleted file mode 100644 index 04a3fd10..00000000 --- a/lonboard/_serialization.py +++ /dev/null @@ -1,205 +0,0 @@ -from __future__ import annotations - -import math -from concurrent.futures import ThreadPoolExecutor -from io import BytesIO -from typing import TYPE_CHECKING, Any, overload - -import arro3.compute as ac -from arro3.core import ( - Array, - ChunkedArray, - DataType, - RecordBatch, - Scalar, - Table, - list_array, - list_flatten, - list_offsets, -) -from traitlets import TraitError - -from lonboard._utils import timestamp_start_offset - -if TYPE_CHECKING: - from lonboard._layer import BaseArrowLayer - from lonboard.experimental._layer import TripsLayer - from lonboard.models import ViewState - - -DEFAULT_PARQUET_COMPRESSION = "ZSTD" -DEFAULT_PARQUET_COMPRESSION_LEVEL = 7 -DEFAULT_PARQUET_CHUNK_SIZE = 2**16 -# Target chunk size for Arrow (uncompressed) per Parquet chunk -DEFAULT_ARROW_CHUNK_BYTES_SIZE = 5 * 1024 * 1024 # 5MB - -# Maximum number of separate chunks/row groups to allow splitting an input layer into -# Deck.gl can pick from a maximum of 256 layers, and a user could have many layers, so -# we don't want to use too many layers per data file. -DEFAULT_MAX_NUM_CHUNKS = 32 - - -def write_parquet_batch(record_batch: RecordBatch) -> bytes: - """Write a RecordBatch to a Parquet file. - - We still use pyarrow.parquet.ParquetWriter if pyarrow is installed because pyarrow - has better encoding defaults. So Parquet files written by pyarrow are smaller by - default than files written by arro3.io.write_parquet. - """ - # Occasionally it's possible for there to be empty batches in the - # pyarrow table. This will error when writing to parquet. We want to - # give a more informative error. 
- if record_batch.num_rows == 0: - raise ValueError("Batch with 0 rows.") - - try: - import pyarrow as pa - import pyarrow.parquet as pq - - bio = BytesIO() - with pq.ParquetWriter( - bio, - schema=pa.schema(record_batch.schema), - compression=DEFAULT_PARQUET_COMPRESSION, - compression_level=DEFAULT_PARQUET_COMPRESSION_LEVEL, - ) as writer: - writer.write_batch( - pa.record_batch(record_batch), - row_group_size=record_batch.num_rows, - ) - - return bio.getvalue() - - except ImportError: - from arro3.io import write_parquet - - compression_string = ( - f"{DEFAULT_PARQUET_COMPRESSION}({DEFAULT_PARQUET_COMPRESSION_LEVEL})" - ) - bio = BytesIO() - write_parquet( - record_batch, - bio, - compression=compression_string, - max_row_group_size=record_batch.num_rows, - ) - - return bio.getvalue() - - -def serialize_table_to_parquet(table: Table, *, max_chunksize: int) -> list[bytes]: - assert max_chunksize > 0 - - with ThreadPoolExecutor() as executor: - return list( - executor.map( - write_parquet_batch, - table.rechunk(max_chunksize=max_chunksize).to_batches(), - ), - ) - - -def serialize_pyarrow_column( - data: Array | ChunkedArray, - *, - max_chunksize: int, -) -> list[bytes]: - """Serialize a pyarrow column to a Parquet file with one column.""" - pyarrow_table = Table.from_pydict({"value": data}) - return serialize_table_to_parquet(pyarrow_table, max_chunksize=max_chunksize) - - -@overload -def serialize_accessor( - data: ChunkedArray, - obj: BaseArrowLayer, -) -> list[bytes]: ... -@overload -def serialize_accessor( - data: str | float | list | tuple | bytes, - obj: BaseArrowLayer, -) -> str | int | float | list | tuple | bytes: ... -def serialize_accessor( - data: str | float | list | tuple | bytes | ChunkedArray, - obj: BaseArrowLayer, -): - if data is None: - return None - - # We assume data has already been validated to the right type for this accessor - # Allow any json-serializable type through - if isinstance(data, (str, int, float, list, tuple, bytes)): - return data - - assert isinstance(data, ChunkedArray) - validate_accessor_length_matches_table(data, obj.table) - return serialize_pyarrow_column(data, max_chunksize=obj._rows_per_chunk) # noqa: SLF001 - - -def serialize_table(data: Table, obj: BaseArrowLayer) -> list[bytes]: - assert isinstance(data, Table), "expected Arrow table" - return serialize_table_to_parquet(data, max_chunksize=obj._rows_per_chunk) # noqa: SLF001 - - -def infer_rows_per_chunk(table: Table) -> int: - # At least one chunk - num_chunks = max(round(table.nbytes / DEFAULT_ARROW_CHUNK_BYTES_SIZE), 1) - - # Clamp to the maximum number of chunks - num_chunks = min(num_chunks, DEFAULT_MAX_NUM_CHUNKS) - - return math.ceil(table.num_rows / num_chunks) - - -def validate_accessor_length_matches_table( - accessor: Array | ChunkedArray, - table: Table, -) -> None: - if len(accessor) != len(table): - raise TraitError("accessor must have same length as table") - - -def serialize_view_state(data: ViewState | None, obj: Any) -> None | dict[str, Any]: # noqa: ARG001 - if data is None: - return None - - return data._asdict() - - -def serialize_timestamp_accessor( - timestamps: ChunkedArray, - obj: TripsLayer, -) -> list[bytes]: - """Subtract off min timestamp to fit into f32 integer range. - - Then cast to float32. - """ - # Note: this has some overlap with `timestamp_max_physical_value` in utils. 
- # Cast to int64 type - timestamps = timestamps.cast(DataType.list(DataType.int64())) - - start_offset_adjustment = Scalar( - timestamp_start_offset(timestamps), - type=DataType.int64(), - ) - - list_offsets_iter = list_offsets(timestamps) - inner_values_iter = list_flatten(timestamps) - - offsetted_chunks = [] - for offsets, inner_values in zip( - list_offsets_iter, - inner_values_iter, - strict=True, - ): - offsetted_values = ac.add(inner_values, start_offset_adjustment) - f32_values = offsetted_values.cast(DataType.int64()).cast(DataType.float32()) - offsetted_chunks.append(list_array(offsets, f32_values)) - - f32_timestamps_col = ChunkedArray(offsetted_chunks) - return serialize_accessor(f32_timestamps_col, obj) - - -ACCESSOR_SERIALIZATION = {"to_json": serialize_accessor} -TIMESTAMP_ACCESSOR_SERIALIZATION = {"to_json": serialize_timestamp_accessor} -TABLE_SERIALIZATION = {"to_json": serialize_table} diff --git a/lonboard/_serialization/__init__.py b/lonboard/_serialization/__init__.py new file mode 100644 index 00000000..3ebce342 --- /dev/null +++ b/lonboard/_serialization/__init__.py @@ -0,0 +1,58 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING + +from lonboard import config +from lonboard._serialization.table import ( + ArrowSerialization, + IPCSerialization, + ParquetSerialization, +) + +if TYPE_CHECKING: + from arro3.core import ChunkedArray, Table + + from lonboard._layer import BaseArrowLayer + from lonboard.experimental._layer import TripsLayer + + +def _choose_serialization() -> ArrowSerialization: + """Handle choice of serialization method. + + NOTE: we handle this choice **inside of `serialize_` functions** so that the choice + can be changed at runtime. We don't want a specific serialization class to be + attached to layer instances, because then it wouldn't update if the config changes. 
+ """ + if config.USE_PARQUET: + return ParquetSerialization() + + return IPCSerialization() + + +def serialize_accessor( + data: str | float | list | tuple | bytes | ChunkedArray, + obj: BaseArrowLayer, +) -> str | int | float | list | tuple | bytes | list[bytes]: + """Serialize an Arrow Array or Column from a widget.""" + return _choose_serialization().serialize_accessor(data, obj) + + +def serialize_timestamps( + timestamps: ChunkedArray, + obj: TripsLayer, +) -> list[bytes]: + """Serialize timestamps for TripsLayer.""" + return _choose_serialization().serialize_timestamps(timestamps, obj) + + +def serialize_table( + table: Table, + obj: BaseArrowLayer, +) -> list[bytes]: + """Serialize an Arrow Table from a widget.""" + return _choose_serialization().serialize_table(table, obj) + + +ACCESSOR_SERIALIZATION = {"to_json": serialize_accessor} +TIMESTAMP_SERIALIZATION = {"to_json": serialize_timestamps} +TABLE_SERIALIZATION = {"to_json": serialize_table} diff --git a/lonboard/_serialization/table/__init__.py b/lonboard/_serialization/table/__init__.py new file mode 100644 index 00000000..7998e493 --- /dev/null +++ b/lonboard/_serialization/table/__init__.py @@ -0,0 +1,60 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING + +from lonboard._serialization.table.arro3 import ( + Arro3IPCSerialization, + Arro3ParquetSerialization, +) +from lonboard._serialization.table.base import ArrowSerialization +from lonboard._serialization.table.pyarrow import ( + PyArrowIPCSerialization, + PyArrowParquetSerialization, +) + +if TYPE_CHECKING: + from arro3.core import RecordBatch + + +class ParquetSerialization(ArrowSerialization): + """Serialize Arrow Tables and Arrays to Parquet. + + Uses `pyarrow` if installed, otherwise falls back to `arro3.io`. + """ + + _impl: ArrowSerialization + + def __init__(self) -> None: + try: + import pyarrow.parquet + except ImportError: + self._impl = Arro3ParquetSerialization() + else: + self._impl = PyArrowParquetSerialization() + + super().__init__() + + def _serialize_arrow_batch(self, record_batch: RecordBatch) -> bytes: + return self._impl._serialize_arrow_batch(record_batch) # noqa SLF001 + + +class IPCSerialization(ArrowSerialization): + """Serialize Arrow Tables and Arrays to Arrow IPC. + + Uses `pyarrow` if installed, otherwise falls back to `arro3.io`. 
+ """ + + _impl: ArrowSerialization + + def __init__(self) -> None: + try: + import pyarrow as pa + except ImportError: + self._impl = Arro3IPCSerialization() + else: + self._impl = PyArrowIPCSerialization() + + super().__init__() + + def _serialize_arrow_batch(self, record_batch: RecordBatch) -> bytes: + return self._impl._serialize_arrow_batch(record_batch) # noqa SLF001 diff --git a/lonboard/_serialization/table/arro3.py b/lonboard/_serialization/table/arro3.py new file mode 100644 index 00000000..d0ac37ff --- /dev/null +++ b/lonboard/_serialization/table/arro3.py @@ -0,0 +1,49 @@ +from __future__ import annotations + +from io import BytesIO +from typing import TYPE_CHECKING + +from .base import ArrowSerialization +from .config import DEFAULT_PARQUET_COMPRESSION, DEFAULT_PARQUET_COMPRESSION_LEVEL + +if TYPE_CHECKING: + from arro3.core import RecordBatch + + +class Arro3ParquetSerialization(ArrowSerialization): + """Serialize Arrow Tables and Arrays to Parquet using arro3.""" + + def __init__(self) -> None: + super().__init__() + + def _serialize_arrow_batch(self, record_batch: RecordBatch) -> bytes: + from arro3.io import write_parquet + + compression_string = ( + f"{DEFAULT_PARQUET_COMPRESSION}({DEFAULT_PARQUET_COMPRESSION_LEVEL})" + ) + bio = BytesIO() + write_parquet( + record_batch, + bio, + compression=compression_string, + max_row_group_size=record_batch.num_rows, + ) + + return bio.getvalue() + + +class Arro3IPCSerialization(ArrowSerialization): + """Serialize Arrow Tables and Arrays to Arrow IPC using arro3.""" + + def __init__(self) -> None: + super().__init__() + + def _serialize_arrow_batch(self, record_batch: RecordBatch) -> bytes: + """Write a single RecordBatch to an Arrow IPC stream in memory and return the bytes.""" + from arro3.io import write_ipc_stream + + bio = BytesIO() + write_ipc_stream(record_batch, bio, compression=None) + + return bio.getvalue() diff --git a/lonboard/_serialization/table/base.py b/lonboard/_serialization/table/base.py new file mode 100644 index 00000000..17e005ce --- /dev/null +++ b/lonboard/_serialization/table/base.py @@ -0,0 +1,148 @@ +from __future__ import annotations + +from abc import ABC, abstractmethod +from concurrent.futures import ThreadPoolExecutor +from typing import TYPE_CHECKING, overload + +import arro3.compute as ac +from arro3.core import ( + Array, + ChunkedArray, + DataType, + RecordBatch, + Scalar, + Table, + list_array, + list_flatten, + list_offsets, +) +from traitlets import TraitError + +from lonboard._utils import timestamp_start_offset + +if TYPE_CHECKING: + from arro3.core import Array, RecordBatch + + from lonboard._layer import BaseArrowLayer + from lonboard.experimental._layer import TripsLayer + + +class ArrowSerialization(ABC): + """Base class for serializing Arrow Tables and Arrays. + + Ipywidgets does not easily support streaming of data, and the transport can choke on + large single buffers. Therefore, we split a table into multiple RecordBatches and + serialize them individually. Then we send a list of buffers to the frontend. 
+ """ + + @abstractmethod + def _serialize_arrow_batch(self, record_batch: RecordBatch) -> bytes: + """Serialize one Arrow RecordBatch to a buffer.""" + + def _serialize_arrow_table( + self, + table: Table, + *, + max_chunksize: int, + ) -> list[bytes]: + assert max_chunksize > 0 + + batches = table.rechunk(max_chunksize=max_chunksize).to_batches() + if any(batch.num_rows == 0 for batch in batches): + raise ValueError("Batch with 0 rows.") + + with ThreadPoolExecutor() as executor: + return list(executor.map(self._serialize_arrow_batch, batches)) + + def _serialize_arrow_column( + self, + array: Array | ChunkedArray, + *, + max_chunksize: int, + ) -> list[bytes]: + """Serialize an Arrow Array or Column as a table with one column named "value".""" + pyarrow_table = Table.from_pydict({"value": array}) + return self._serialize_arrow_table(pyarrow_table, max_chunksize=max_chunksize) + + def serialize_table( + self, + table: Table, + obj: BaseArrowLayer, + ) -> list[bytes]: + """Serialize an Arrow Table from a widget.""" + assert isinstance(table, Table), "expected Arrow table" + return self._serialize_arrow_table(table, max_chunksize=obj._rows_per_chunk) # noqa: SLF001 + + @overload + def serialize_accessor( + self, + data: ChunkedArray, + obj: BaseArrowLayer, + ) -> list[bytes]: ... + @overload + def serialize_accessor( + self, + data: str | float | list | tuple | bytes, + obj: BaseArrowLayer, + ) -> str | int | float | list | tuple | bytes: ... + def serialize_accessor( + self, + data: str | float | list | tuple | bytes | ChunkedArray, + obj: BaseArrowLayer, + ): + """Serialize an Arrow Array or Column from a widget.""" + if data is None: + return None + + # We assume data has already been validated to the right type for this accessor + # Allow any json-serializable type through + if isinstance(data, (str, int, float, list, tuple, bytes)): + return data + + assert isinstance(data, ChunkedArray) + validate_accessor_length_matches_table(data, obj.table) + return self._serialize_arrow_column(data, max_chunksize=obj._rows_per_chunk) # noqa: SLF001 + + def serialize_timestamps( + self, + timestamps: ChunkedArray, + obj: TripsLayer, + ) -> list[bytes]: + """Serialize timestamps for TripsLayer. + + Subtract off min timestamp to fit into f32 integer range. Then cast to float32. + """ + # Note: this has some overlap with `timestamp_max_physical_value` in utils. 
+ # Cast to int64 type + timestamps = timestamps.cast(DataType.list(DataType.int64())) + + start_offset_adjustment = Scalar( + timestamp_start_offset(timestamps), + type=DataType.int64(), + ) + + list_offsets_iter = list_offsets(timestamps) + inner_values_iter = list_flatten(timestamps) + + offsetted_chunks = [] + for offsets, inner_values in zip( + list_offsets_iter, + inner_values_iter, + strict=True, + ): + offsetted_values = ac.add(inner_values, start_offset_adjustment) + f32_values = offsetted_values.cast(DataType.int64()).cast( + DataType.float32(), + ) + offsetted_chunks.append(list_array(offsets, f32_values)) + + f32_timestamps_col = ChunkedArray(offsetted_chunks) + return self.serialize_accessor(f32_timestamps_col, obj) + + +def validate_accessor_length_matches_table( + accessor: Array | ChunkedArray, + table: Table, +) -> None: + if len(accessor) != len(table): + raise TraitError("accessor must have same length as table") diff --git a/lonboard/_serialization/table/config.py b/lonboard/_serialization/table/config.py new file mode 100644 index 00000000..b2ecf9e5 --- /dev/null +++ b/lonboard/_serialization/table/config.py @@ -0,0 +1,2 @@ +DEFAULT_PARQUET_COMPRESSION = "ZSTD" +DEFAULT_PARQUET_COMPRESSION_LEVEL = 7 diff --git a/lonboard/_serialization/table/pyarrow.py b/lonboard/_serialization/table/pyarrow.py new file mode 100644 index 00000000..587b8d19 --- /dev/null +++ b/lonboard/_serialization/table/pyarrow.py @@ -0,0 +1,62 @@ +from __future__ import annotations + +from io import BytesIO +from typing import TYPE_CHECKING + +from .base import ArrowSerialization +from .config import DEFAULT_PARQUET_COMPRESSION, DEFAULT_PARQUET_COMPRESSION_LEVEL + +if TYPE_CHECKING: + from arro3.core import RecordBatch + + +class PyArrowParquetSerialization(ArrowSerialization): + """Serialize Arrow Tables and Arrays to Parquet using pyarrow.""" + + def __init__(self) -> None: + # Validate that pyarrow is installed + import pyarrow.parquet # noqa: F401 + + super().__init__() + + def _serialize_arrow_batch(self, record_batch: RecordBatch) -> bytes: + import pyarrow as pa + import pyarrow.parquet as pq + + bio = BytesIO() + with pq.ParquetWriter( + bio, + schema=pa.schema(record_batch.schema), + compression=DEFAULT_PARQUET_COMPRESSION, + compression_level=DEFAULT_PARQUET_COMPRESSION_LEVEL, + ) as writer: + writer.write_batch( + pa.record_batch(record_batch), + row_group_size=record_batch.num_rows, + ) + + return bio.getvalue() + + +class PyArrowIPCSerialization(ArrowSerialization): + """Serialize Arrow Tables and Arrays to Arrow IPC using pyarrow.""" + + def __init__(self) -> None: + # Validate that pyarrow is installed + import pyarrow as pa # noqa: F401 + + super().__init__() + + def _serialize_arrow_batch(self, record_batch: RecordBatch) -> bytes: + """Write a single RecordBatch to an Arrow IPC stream in memory and return the bytes.""" + import pyarrow as pa + + bio = BytesIO() + with pa.ipc.new_stream( + bio, + schema=pa.schema(record_batch.schema), + options=pa.ipc.IpcWriteOptions(compression=None), + ) as writer: + writer.write_batch(pa.record_batch(record_batch)) + + return bio.getvalue() diff --git a/lonboard/_serialization/table/util.py b/lonboard/_serialization/table/util.py new file mode 100644 index 00000000..e8266439 --- /dev/null +++ b/lonboard/_serialization/table/util.py @@ -0,0 +1,25 @@ +from __future__ import annotations + +import math +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from arro3.core import Table + +# Target chunk size for Arrow (uncompressed) per Parquet chunk 
+DEFAULT_ARROW_CHUNK_BYTES_SIZE = 5 * 1024 * 1024 # 5MB + +# Maximum number of separate chunks/row groups to allow splitting an input layer into +# Deck.gl can pick from a maximum of 256 layers, and a user could have many layers, so +# we don't want to use too many layers per data file. +DEFAULT_MAX_NUM_CHUNKS = 32 + + +def infer_rows_per_chunk(table: Table) -> int: + # At least one chunk + num_chunks = max(round(table.nbytes / DEFAULT_ARROW_CHUNK_BYTES_SIZE), 1) + + # Clamp to the maximum number of chunks + num_chunks = min(num_chunks, DEFAULT_MAX_NUM_CHUNKS) + + return math.ceil(table.num_rows / num_chunks) diff --git a/lonboard/_serialization/view_state.py b/lonboard/_serialization/view_state.py new file mode 100644 index 00000000..7e1193b0 --- /dev/null +++ b/lonboard/_serialization/view_state.py @@ -0,0 +1,14 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING, Any + +if TYPE_CHECKING: + from lonboard.models import ViewState + + +def serialize_view_state(data: ViewState | None, obj: Any) -> None | dict[str, Any]: # noqa: ARG001 + """Serialize ViewState for the frontend.""" + if data is None: + return None + + return data._asdict() diff --git a/lonboard/config.py b/lonboard/config.py new file mode 100644 index 00000000..5f539353 --- /dev/null +++ b/lonboard/config.py @@ -0,0 +1,3 @@ +# TODO: real config system + +USE_PARQUET: bool = False diff --git a/lonboard/experimental/traits.py b/lonboard/experimental/traits.py index 526340e1..f97dc085 100644 --- a/lonboard/experimental/traits.py +++ b/lonboard/experimental/traits.py @@ -19,7 +19,7 @@ ) from lonboard._constants import MAX_INTEGER_FLOAT32, MIN_INTEGER_FLOAT32 -from lonboard._serialization import TIMESTAMP_ACCESSOR_SERIALIZATION +from lonboard._serialization import TIMESTAMP_SERIALIZATION from lonboard._utils import get_geometry_column_index from lonboard.traits import FixedErrorTraitType @@ -56,7 +56,7 @@ def __init__( **kwargs: Any, ) -> None: super().__init__(*args, **kwargs) - self.tag(sync=True, **TIMESTAMP_ACCESSOR_SERIALIZATION) + self.tag(sync=True, **TIMESTAMP_SERIALIZATION) def _reduce_precision( self, diff --git a/lonboard/traits.py b/lonboard/traits.py index a0a61862..8d6303af 100644 --- a/lonboard/traits.py +++ b/lonboard/traits.py @@ -31,11 +31,8 @@ from lonboard._environment import DEFAULT_HEIGHT from lonboard._geoarrow.box_to_polygon import parse_box_encoded_table from lonboard._geoarrow.ops.coord_layout import convert_struct_column_to_interleaved -from lonboard._serialization import ( - ACCESSOR_SERIALIZATION, - TABLE_SERIALIZATION, - serialize_view_state, -) +from lonboard._serialization import ACCESSOR_SERIALIZATION, TABLE_SERIALIZATION +from lonboard._serialization.view_state import serialize_view_state from lonboard._utils import get_geometry_column_index from lonboard._vendor.matplotlib.colors import _to_rgba_no_colorcycle from lonboard.models import ViewState diff --git a/package-lock.json b/package-lock.json index 268c6264..033b6094 100644 --- a/package-lock.json +++ b/package-lock.json @@ -21,6 +21,7 @@ "framer-motion": "^12.23.19", "lodash.debounce": "^4.0.8", "lodash.throttle": "^4.1.1", + "lz4js": "^0.2.0", "maplibre-gl": "^5.9.0", "memoize-one": "^6.0.0", "parquet-wasm": "0.7.1", @@ -39,6 +40,7 @@ "@types/lodash": "^4.17.13", "@types/lodash.debounce": "^4.0.9", "@types/lodash.throttle": "^4.1.9", + "@types/lz4js": "^0.2.1", "@types/react": "^19.1.1", "@types/uuid": "^10.0.0", "autoprefixer": "^10.4.20", @@ -7557,6 +7559,13 @@ "@types/lodash": "*" } }, + "node_modules/@types/lz4js": 
{ + "version": "0.2.1", + "resolved": "https://registry.npmjs.org/@types/lz4js/-/lz4js-0.2.1.tgz", + "integrity": "sha512-aAnbA4uKPNqZqu0XK1QAwKP0Wskb4Oa7ZFqxW5CMIyGgqYQKFgBxTfK3m3KODXoOLv5t14VregzgrEak13uGQA==", + "dev": true, + "license": "MIT" + }, "node_modules/@types/node": { "version": "24.7.2", "resolved": "https://registry.npmjs.org/@types/node/-/node-24.7.2.tgz", @@ -11752,8 +11761,7 @@ "version": "0.2.0", "resolved": "https://registry.npmjs.org/lz4js/-/lz4js-0.2.0.tgz", "integrity": "sha512-gY2Ia9Lm7Ep8qMiuGRhvUq0Q7qUereeldZPP1PMEJxPtEWHJLqw9pgX68oHajBH0nzJK4MaZEA/YNV3jT8u8Bg==", - "license": "ISC", - "optional": true + "license": "ISC" }, "node_modules/lzo-wasm": { "version": "0.0.4", @@ -15144,7 +15152,6 @@ "resolved": "https://registry.npmjs.org/tailwindcss/-/tailwindcss-3.4.18.tgz", "integrity": "sha512-6A2rnmW5xZMdw11LYjhcI5846rt9pbLSabY5XPxo+XWdxwZaFEn47Go4NzFiHu9sNNmr/kXivP1vStfvMaK1GQ==", "license": "MIT", - "peer": true, "dependencies": { "@alloc/quick-lru": "^5.2.0", "arg": "^5.0.2", @@ -15767,6 +15774,7 @@ "integrity": "sha512-j3lYzGC3P+B5Yfy/pfKNgVEg4+UtcIJcVRt2cDjIOmhLourAqPqf8P7acgxeiSgUB7E3p2P8/3gNIgDLpwzs4g==", "dev": true, "license": "MIT", + "peer": true, "dependencies": { "esbuild": "^0.21.3", "postcss": "^8.4.43", diff --git a/package.json b/package.json index 6f38c717..ca5fd6bb 100644 --- a/package.json +++ b/package.json @@ -19,6 +19,7 @@ "framer-motion": "^12.23.19", "lodash.debounce": "^4.0.8", "lodash.throttle": "^4.1.1", + "lz4js": "^0.2.0", "maplibre-gl": "^5.9.0", "memoize-one": "^6.0.0", "parquet-wasm": "0.7.1", @@ -38,6 +39,7 @@ "@types/lodash": "^4.17.13", "@types/lodash.debounce": "^4.0.9", "@types/lodash.throttle": "^4.1.9", + "@types/lz4js": "^0.2.1", "@types/react": "^19.1.1", "@types/uuid": "^10.0.0", "autoprefixer": "^10.4.20", diff --git a/src/accessor.ts b/src/accessor.ts index 402291f9..5ebe8f52 100644 --- a/src/accessor.ts +++ b/src/accessor.ts @@ -1,61 +1,13 @@ import * as arrow from "apache-arrow"; -import { useState, useEffect } from "react"; -import { parseParquetBuffers } from "./parquet.js"; +import { deserializeArrowTable } from "./serialization/index.js"; type AccessorRaw = DataView[] | unknown; -export function useTableBufferState( - wasmReady: boolean, - dataRaw: DataView[], -): [arrow.Table | null] { - const [dataTable, setDataTable] = useState(null); - // Only parse the parquet buffer when the data itself or wasmReady has changed - useEffect(() => { - const callback = () => { - if (wasmReady && dataRaw && dataRaw.length > 0) { - console.log( - `table byte lengths: ${dataRaw.map( - (dataView) => dataView.byteLength, - )}`, - ); - - setDataTable(parseParquetBuffers(dataRaw)); - } - }; - callback(); - }, [wasmReady, dataRaw]); - - return [dataTable]; -} - -export function useAccessorState( - wasmReady: boolean, - accessorRaw: AccessorRaw, -): [arrow.Vector | null] { - const [accessorValue, setAccessorValue] = useState(null); - - // Only parse the parquet buffer when the data itself or wasmReady has changed - useEffect(() => { - const callback = () => { - setAccessorValue( - accessorRaw instanceof Array && accessorRaw?.[0] instanceof DataView - ? wasmReady && accessorRaw?.[0].byteLength > 0 - ? parseParquetBuffers(accessorRaw).getChildAt(0) - : null - : (accessorRaw as arrow.Vector | null), - ); - }; - callback(); - }, [wasmReady, accessorRaw]); - - return [accessorValue]; -} - export function parseAccessor(accessorRaw: AccessorRaw): arrow.Vector | null { return accessorRaw instanceof Array && accessorRaw?.[0] instanceof DataView ? 
accessorRaw?.[0].byteLength > 0 - ? parseParquetBuffers(accessorRaw).getChildAt(0) + ? deserializeArrowTable(accessorRaw).getChildAt(0) : null : (accessorRaw as arrow.Vector | null); } diff --git a/src/index.tsx b/src/index.tsx index 3fa03e6d..42be11dd 100644 --- a/src/index.tsx +++ b/src/index.tsx @@ -15,7 +15,6 @@ import { type BaseLayerModel, initializeChildModels, } from "./model/index.js"; -import { initParquetWasm } from "./parquet.js"; import DeckFirstRenderer from "./renderers/deck-first.js"; import OverlayRenderer from "./renderers/overlay.js"; import { MapRendererProps } from "./renderers/types.js"; @@ -31,8 +30,6 @@ import * as selectors from "./xstate/selectors"; import "maplibre-gl/dist/maplibre-gl.css"; import "./globals.css"; -await initParquetWasm(); - const DEFAULT_INITIAL_VIEW_STATE = { latitude: 10, longitude: 0, diff --git a/src/model/base.ts b/src/model/base.ts index 0d9faa08..f2dd2161 100644 --- a/src/model/base.ts +++ b/src/model/base.ts @@ -16,11 +16,6 @@ export abstract class BaseModel { this.callbacks = new Map(); this.callbacks.set("change", updateStateCallback); } - - async loadSubModels() { - return; - } - /** * Initialize an attribute that does not need any transformation from its * serialized representation to its deck.gl representation. diff --git a/src/model/extension.ts b/src/model/extension.ts index 7ae75f7b..ede75be1 100644 --- a/src/model/extension.ts +++ b/src/model/extension.ts @@ -258,6 +258,5 @@ export async function initializeExtension( throw new Error(`no known model for extension type ${extensionType}`); } - await extensionModel.loadSubModels(); return extensionModel; } diff --git a/src/model/layer.ts b/src/model/layer.ts index d7110362..3aeb3cef 100644 --- a/src/model/layer.ts +++ b/src/model/layer.ts @@ -27,8 +27,8 @@ import type { import type { WidgetModel } from "@jupyter-widgets/base"; import * as arrow from "apache-arrow"; -import { parseParquetBuffers } from "../parquet.js"; import { BaseLayerModel } from "./base-layer.js"; +import { deserializeArrowTable } from "../serialization/index.js"; import { isDefined } from "../util.js"; import { PointVector, @@ -64,13 +64,13 @@ export abstract class BaseArrowLayerModel extends BaseLayerModel { * @param {string} pythonName Name of attribute on Python model (usually snake-cased) */ initTable(pythonName: string) { - this.table = parseParquetBuffers(this.model.get(pythonName)); + this.table = deserializeArrowTable(this.model.get(pythonName)); // Remove all existing change callbacks for this attribute this.model.off(`change:${pythonName}`); const callback = () => { - this.table = parseParquetBuffers(this.model.get(pythonName)); + this.table = deserializeArrowTable(this.model.get(pythonName)); }; this.model.on(`change:${pythonName}`, callback); diff --git a/src/serialization/index.ts b/src/serialization/index.ts new file mode 100644 index 00000000..d96b2229 --- /dev/null +++ b/src/serialization/index.ts @@ -0,0 +1,34 @@ +import * as arrow from "apache-arrow"; + +import { parseArrowIPC } from "./ipc"; +import { isParquetBuffer, parseParquet } from "./parquet"; + +/** + * Parse a list of buffers containing either Parquet or Arrow IPC chunks into an + * Arrow JS table + * + * Each buffer in the list is expected to be a fully self-contained Parquet file + * or Arrow IPC file/stream that can parse on its own and consists of one arrow + * Record Batch + */ +export function deserializeArrowTable(dataViews: DataView[]): arrow.Table { + const batches: arrow.RecordBatch[] = []; + for (const chunkBuffer of 
dataViews) { + let table: arrow.Table; + + if (isParquetBuffer(chunkBuffer)) { + table = parseParquet(chunkBuffer); + } else { + // Assume Arrow IPC + table = parseArrowIPC(chunkBuffer); + } + + if (table.batches.length !== 1) { + console.warn(`Expected one batch in table, got ${table.batches.length}`); + } + + batches.push(...table.batches); + } + + return new arrow.Table(batches); +} diff --git a/src/serialization/ipc.ts b/src/serialization/ipc.ts new file mode 100644 index 00000000..10617f75 --- /dev/null +++ b/src/serialization/ipc.ts @@ -0,0 +1,28 @@ +import * as arrow from "apache-arrow"; +import * as lz4 from "lz4js"; + +const lz4Codec: arrow.Codec = { + encode(data: Uint8Array): Uint8Array { + return lz4.compress(data); + }, + decode(data: Uint8Array): Uint8Array { + return lz4.decompress(data); + }, +}; + +let LZ4_CODEC_SET: boolean = false; + +/** + * Parse an Arrow IPC buffer to an Arrow JS table + */ +export function parseArrowIPC(dataView: DataView): arrow.Table { + if (!LZ4_CODEC_SET) { + arrow.compressionRegistry.set(arrow.CompressionType.LZ4_FRAME, lz4Codec); + LZ4_CODEC_SET = true; + } + + console.time("readArrowIPC"); + const arrowTable = arrow.tableFromIPC(dataView); + console.timeEnd("readArrowIPC"); + return arrowTable; +} diff --git a/src/parquet.ts b/src/serialization/parquet.ts similarity index 55% rename from src/parquet.ts rename to src/serialization/parquet.ts index beda1ade..33a1e83a 100644 --- a/src/parquet.ts +++ b/src/serialization/parquet.ts @@ -1,5 +1,6 @@ import * as arrow from "apache-arrow"; import _initParquetWasm, { readParquet } from "parquet-wasm"; +import {} from "parquet-wasm"; // NOTE: this version must be synced exactly with the parquet-wasm version in // use. @@ -7,23 +8,45 @@ const PARQUET_WASM_VERSION = "0.7.1"; const PARQUET_WASM_CDN_URL = `https://cdn.jsdelivr.net/npm/parquet-wasm@${PARQUET_WASM_VERSION}/esm/parquet_wasm_bg.wasm`; let WASM_READY: boolean = false; +// We initiate the fetch immediately (but don't await it) so that it can be +// downloaded in the background on app start +const PARQUET_WASM_FETCH = fetch(PARQUET_WASM_CDN_URL); + +const PARQUET_MAGIC = new TextEncoder().encode("PAR1"); + +/** Initialize the parquet-wasm WASM buffer */ export async function initParquetWasm() { if (WASM_READY) { return; } - await _initParquetWasm(PARQUET_WASM_CDN_URL); + const wasm_buffer = await PARQUET_WASM_FETCH; + await _initParquetWasm(wasm_buffer); WASM_READY = true; + return; +} + +// For now, simplest to just ensure this is called at least once on module load +await initParquetWasm(); + +export function isParquetBuffer(dataView: DataView): boolean { + if (dataView.byteLength < PARQUET_MAGIC.length) { + return false; + } + + for (let i = 0; i < PARQUET_MAGIC.length; i++) { + if (dataView.getUint8(i) !== PARQUET_MAGIC[i]) { + return false; + } + } + + return true; } /** * Parse a Parquet buffer to an Arrow JS table */ export function parseParquet(dataView: DataView): arrow.Table { - if (!WASM_READY) { - throw new Error("wasm not ready"); - } - console.time("readParquet"); // TODO: use arrow-js-ffi for more memory-efficient wasm --> js transfer? 
@@ -36,24 +59,3 @@ export function parseParquet(dataView: DataView): arrow.Table { return arrowTable; } - -/** - * Parse a list of buffers containing Parquet chunks into an Arrow JS table - * - * Each buffer in the list is expected to be a fully self-contained Parquet file - * that can parse on its own and consists of one arrow Record Batch - * - * @var {[type]} - */ -export function parseParquetBuffers(dataViews: DataView[]): arrow.Table { - const batches: arrow.RecordBatch[] = []; - for (const chunkBuffer of dataViews) { - const table = parseParquet(chunkBuffer); - if (table.batches.length !== 1) { - console.warn("Expected one batch"); - } - batches.push(...table.batches); - } - - return new arrow.Table(batches); -}
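A note on the dispatch added in `lonboard/_serialization/__init__.py`: because `_choose_serialization()` is resolved inside each `serialize_*` call rather than caching a serializer on the layer, flipping the flag takes effect even for widgets that already exist. A minimal sketch of that pattern, with stand-in classes so it runs without lonboard installed (the real toggle is `lonboard.config.USE_PARQUET`, default `False`):

```python
# Minimal sketch of the call-time dispatch in lonboard/_serialization/__init__.py.
# The two classes below are stand-ins for ParquetSerialization / IPCSerialization
# so the snippet runs on its own.

class _IPCSerialization:
    name = "arrow-ipc"

class _ParquetSerialization:
    name = "parquet"

class config:  # stand-in for the new lonboard/config.py module
    USE_PARQUET: bool = False

def _choose_serialization():
    # Resolved on every serialize_* call, so changing the flag at runtime
    # immediately affects layers that were created earlier.
    return _ParquetSerialization() if config.USE_PARQUET else _IPCSerialization()

print(_choose_serialization().name)  # arrow-ipc (the new default path)
config.USE_PARQUET = True            # real code: lonboard.config.USE_PARQUET = True
print(_choose_serialization().name)  # parquet (the pre-refactor behaviour)
```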
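The chunking heuristic that moved to `lonboard/_serialization/table/util.py` targets roughly 5 MB of uncompressed Arrow data per batch and caps a layer at 32 batches. A worked example of the arithmetic, rewritten over plain integers so it runs without `arro3` (the table sizes below are illustrative):

```python
import math

DEFAULT_ARROW_CHUNK_BYTES_SIZE = 5 * 1024 * 1024  # 5MB target per batch
DEFAULT_MAX_NUM_CHUNKS = 32

def infer_rows_per_chunk(nbytes: int, num_rows: int) -> int:
    # Same arithmetic as lonboard._serialization.table.util.infer_rows_per_chunk,
    # taking plain ints instead of an arro3 Table so it runs standalone.
    num_chunks = max(round(nbytes / DEFAULT_ARROW_CHUNK_BYTES_SIZE), 1)  # at least one
    num_chunks = min(num_chunks, DEFAULT_MAX_NUM_CHUNKS)  # clamp to the cap
    return math.ceil(num_rows / num_chunks)

# A ~40 MB table with 1,000,000 rows is split into 8 batches of 125,000 rows.
print(infer_rows_per_chunk(40 * 1024 * 1024, 1_000_000))    # 125000
# A ~1 GB table hits the 32-batch cap instead of producing ~200 batches.
print(infer_rows_per_chunk(1024 * 1024 * 1024, 1_000_000))  # 31250
```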
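Both IPC writers in the diff (`PyArrowIPCSerialization` and `Arro3IPCSerialization`) emit each RecordBatch as its own uncompressed IPC stream, which `arrow.tableFromIPC` on the frontend reads back as a one-batch table; the `lz4js` codec registration is presumably there so a compressed stream could still be decoded if one ever arrives. A sketch of what one such buffer looks like, assuming pyarrow is installed (the arro3 fallback produces an equivalent stream):

```python
from io import BytesIO

import pyarrow as pa

batch = pa.RecordBatch.from_pydict({"value": [1.0, 2.0, 3.0]})

# What PyArrowIPCSerialization._serialize_arrow_batch produces for one batch:
# a self-contained, uncompressed IPC stream holding exactly one RecordBatch.
bio = BytesIO()
with pa.ipc.new_stream(
    bio,
    schema=batch.schema,
    options=pa.ipc.IpcWriteOptions(compression=None),
) as writer:
    writer.write_batch(batch)
buf = bio.getvalue()

# The frontend calls arrow.tableFromIPC on each buffer; the equivalent round
# trip in Python yields a one-batch table with the single "value" column used
# for accessor serialization.
table = pa.ipc.open_stream(buf).read_all()
assert table.num_rows == 3
assert table.column_names == ["value"]
```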
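On `serialize_timestamps`: the column minimum is subtracted before the float32 cast because float32 only represents integers exactly up to 2^24, while raw epoch timestamps are many orders of magnitude larger. A small numeric illustration using numpy (the epoch values are made up):

```python
import numpy as np

# Raw epoch milliseconds exceed float32's exact-integer range (2**24), so
# consecutive timestamps collapse to the same float32 value...
t0, t1 = 1_700_000_000_000, 1_700_000_000_001  # one millisecond apart
print(np.float32(t0) == np.float32(t1))  # True -- the difference is lost

# ...but after subtracting the column minimum (what timestamp_start_offset
# supplies), the offsets are small integers that float32 represents exactly.
print(np.float32(t0 - t0), np.float32(t1 - t0))  # 0.0 1.0
```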
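On the frontend, `deserializeArrowTable` decides per buffer whether to call the Parquet or the IPC reader by sniffing for the 4-byte `PAR1` magic; anything without it is assumed to be Arrow IPC. The same check, sketched in Python for reference (the TypeScript version lives in `src/serialization/parquet.ts`):

```python
PARQUET_MAGIC = b"PAR1"

def is_parquet_buffer(buf: bytes) -> bool:
    # Mirrors isParquetBuffer() in src/serialization/parquet.ts: only the
    # leading magic is checked; anything else falls through to the IPC reader.
    return buf[: len(PARQUET_MAGIC)] == PARQUET_MAGIC

print(is_parquet_buffer(b"PAR1\x15\x04\x15\x08"))              # True  -> readParquet
# An Arrow IPC stream typically begins with a continuation marker, not PAR1.
print(is_parquet_buffer(b"\xff\xff\xff\xff\x98\x00\x00\x00"))  # False -> tableFromIPC
```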