From 3304a9d46411fa29bcadb28ad25885d9a7ef6ad7 Mon Sep 17 00:00:00 2001 From: Jack McCluskey Date: Wed, 5 Mar 2025 15:07:04 -0500 Subject: [PATCH] Update usage of Beam Sequence type hints to pass new type checking --- temporian/beam/io/dict.py | 13 +++++++------ temporian/beam/operators/drop_index.py | 9 +++++---- 2 files changed, 12 insertions(+), 10 deletions(-) diff --git a/temporian/beam/io/dict.py b/temporian/beam/io/dict.py index d2b1810a2..13f2c3ac0 100644 --- a/temporian/beam/io/dict.py +++ b/temporian/beam/io/dict.py @@ -1,5 +1,6 @@ """Utilities to import/export Beam-Event-Set from/to dataset containers.""" +from collections.abc import Iterable from typing import Dict, Any, Tuple, Iterator, Sequence, List import numpy as np @@ -104,7 +105,7 @@ def __init__(self, features: List[FeatureSchema]): def process( self, - item: Tuple[BeamIndexKey, Sequence[StructuredRowValue]], + item: Tuple[BeamIndexKey, Iterable[StructuredRowValue]], ) -> Iterator[FeatureItemWithIdx]: index, feat_and_ts = item timestamps = np.fromiter( @@ -126,7 +127,7 @@ def process( def _merge_timestamps_no_features( - item: Tuple[BeamIndexKey, Sequence[StructuredRowValue]], + item: Tuple[BeamIndexKey, Iterable[StructuredRowValue]], ) -> FeatureItem: """Same as _MergeTimestamps, but when there are no features.""" @@ -343,7 +344,7 @@ def to_event_set( def _convert_to_dict_event_key_value( item: Tuple[ BeamIndexKey, - Sequence[FeatureItemWithIdxValue], + Iterable[FeatureItemWithIdxValue], ], schema: Schema, timestamp_key: str, @@ -352,7 +353,7 @@ def _convert_to_dict_event_key_value( # Sort the feature by feature index. feature_blocks = sorted(feature_blocks, key=lambda x: x[POS_FEATURE_IDX]) - assert len(feature_blocks) > 0 + assert feature_blocks # All the feature blocks have the same timestamps. We use the first one. common_item_dict = {} @@ -374,7 +375,7 @@ def _convert_to_dict_event_key_value( def _convert_to_dict_event_set_key_value( item: Tuple[ BeamIndexKey, - Sequence[FeatureItemWithIdxValue], + Iterable[FeatureItemWithIdxValue], ], schema: Schema, timestamp_key: str, @@ -383,7 +384,7 @@ def _convert_to_dict_event_set_key_value( # Sort the feature by feature index. feature_blocks = sorted(feature_blocks, key=lambda x: x[POS_FEATURE_IDX]) - assert len(feature_blocks) > 0 + assert feature_blocks item_dict = {} for index_schema, index_value in zip(schema.indexes, index): diff --git a/temporian/beam/operators/drop_index.py b/temporian/beam/operators/drop_index.py index 62b3c771a..efdf32853 100644 --- a/temporian/beam/operators/drop_index.py +++ b/temporian/beam/operators/drop_index.py @@ -13,7 +13,8 @@ # limitations under the License. import numpy as np -from typing import Dict, Tuple, Sequence +from collections.abc import Iterable +from typing import Dict, Tuple import apache_beam as beam from temporian.core.operators.drop_index import ( @@ -54,7 +55,7 @@ def build_new_index(item: FeatureItem) -> BeamIndexKey: return tuple((src_indexes[i] for i in final_index_idxs)) def merge_events( - group: Tuple[BeamIndexKey, Sequence[FeatureItem]], + group: Tuple[BeamIndexKey, Iterable[FeatureItem]], ) -> FeatureItem: """Merges together events in the same output index.""" new_indexes, items = group @@ -81,7 +82,7 @@ def merge_events( return new_indexes, (new_timestamps, new_features) def src_index_to_feature( - any_group: Tuple[BeamIndexKey, Sequence[FeatureItem]], + any_group: Tuple[BeamIndexKey, Iterable[FeatureItem]], final_nonindex_idx: int, ) -> FeatureItem: """Create a feature with the dropped index.""" @@ -103,7 +104,7 @@ def src_index_to_feature( return new_indexes, (new_timestamps, new_features) def feature_less_event( - any_group: Tuple[BeamIndexKey, Sequence[FeatureItem]], + any_group: Tuple[BeamIndexKey, Iterable[FeatureItem]], ) -> FeatureItem: """Create an event without feature.""" new_indexes, items = any_group