Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions temporian/beam/io/dict.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Utilities to import/export Beam-Event-Set from/to dataset containers."""

from collections.abc import Iterable
from typing import Dict, Any, Tuple, Iterator, Sequence, List

import numpy as np
Expand Down Expand Up @@ -104,7 +105,7 @@ def __init__(self, features: List[FeatureSchema]):

def process(
self,
item: Tuple[BeamIndexKey, Sequence[StructuredRowValue]],
item: Tuple[BeamIndexKey, Iterable[StructuredRowValue]],
) -> Iterator[FeatureItemWithIdx]:
index, feat_and_ts = item
timestamps = np.fromiter(
Expand All @@ -126,7 +127,7 @@ def process(


def _merge_timestamps_no_features(
item: Tuple[BeamIndexKey, Sequence[StructuredRowValue]],
item: Tuple[BeamIndexKey, Iterable[StructuredRowValue]],
) -> FeatureItem:
"""Same as _MergeTimestamps, but when there are no features."""

Expand Down Expand Up @@ -343,7 +344,7 @@ def to_event_set(
def _convert_to_dict_event_key_value(
item: Tuple[
BeamIndexKey,
Sequence[FeatureItemWithIdxValue],
Iterable[FeatureItemWithIdxValue],
],
schema: Schema,
timestamp_key: str,
Expand All @@ -352,7 +353,7 @@ def _convert_to_dict_event_key_value(

# Sort the feature by feature index.
feature_blocks = sorted(feature_blocks, key=lambda x: x[POS_FEATURE_IDX])
assert len(feature_blocks) > 0
assert feature_blocks

# All the feature blocks have the same timestamps. We use the first one.
common_item_dict = {}
Expand All @@ -374,7 +375,7 @@ def _convert_to_dict_event_key_value(
def _convert_to_dict_event_set_key_value(
item: Tuple[
BeamIndexKey,
Sequence[FeatureItemWithIdxValue],
Iterable[FeatureItemWithIdxValue],
],
schema: Schema,
timestamp_key: str,
Expand All @@ -383,7 +384,7 @@ def _convert_to_dict_event_set_key_value(

# Sort the feature by feature index.
feature_blocks = sorted(feature_blocks, key=lambda x: x[POS_FEATURE_IDX])
assert len(feature_blocks) > 0
assert feature_blocks

item_dict = {}
for index_schema, index_value in zip(schema.indexes, index):
Expand Down
9 changes: 5 additions & 4 deletions temporian/beam/operators/drop_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@
# limitations under the License.

import numpy as np
from typing import Dict, Tuple, Sequence
from collections.abc import Iterable
from typing import Dict, Tuple
import apache_beam as beam

from temporian.core.operators.drop_index import (
Expand Down Expand Up @@ -54,7 +55,7 @@ def build_new_index(item: FeatureItem) -> BeamIndexKey:
return tuple((src_indexes[i] for i in final_index_idxs))

def merge_events(
group: Tuple[BeamIndexKey, Sequence[FeatureItem]],
group: Tuple[BeamIndexKey, Iterable[FeatureItem]],
) -> FeatureItem:
"""Merges together events in the same output index."""
new_indexes, items = group
Expand All @@ -81,7 +82,7 @@ def merge_events(
return new_indexes, (new_timestamps, new_features)

def src_index_to_feature(
any_group: Tuple[BeamIndexKey, Sequence[FeatureItem]],
any_group: Tuple[BeamIndexKey, Iterable[FeatureItem]],
final_nonindex_idx: int,
) -> FeatureItem:
"""Create a feature with the dropped index."""
Expand All @@ -103,7 +104,7 @@ def src_index_to_feature(
return new_indexes, (new_timestamps, new_features)

def feature_less_event(
any_group: Tuple[BeamIndexKey, Sequence[FeatureItem]],
any_group: Tuple[BeamIndexKey, Iterable[FeatureItem]],
) -> FeatureItem:
"""Create an event without feature."""
new_indexes, items = any_group
Expand Down