bigframes/core/array_value.py (13 changes: 7 additions & 6 deletions)
@@ -23,7 +23,7 @@
 import pandas
 import pyarrow as pa
 
-from bigframes.core import agg_expressions
+from bigframes.core import agg_expressions, bq_data
 import bigframes.core.expression as ex
 import bigframes.core.guid
 import bigframes.core.identifiers as ids
@@ -63,7 +63,7 @@ def from_pyarrow(cls, arrow_table: pa.Table, session: Session):
     def from_managed(cls, source: local_data.ManagedArrowTable, session: Session):
         scan_list = nodes.ScanList(
             tuple(
-                nodes.ScanItem(ids.ColumnId(item.column), item.dtype, item.column)
+                nodes.ScanItem(ids.ColumnId(item.column), item.column)
                 for item in source.schema.items
             )
         )
@@ -100,7 +100,7 @@ def from_table(
         if offsets_col and primary_key:
             raise ValueError("must set at most one of 'offests', 'primary_key'")
         # define data source only for needed columns, this makes row-hashing cheaper
-        table_def = nodes.GbqTable.from_table(table, columns=schema.names)
+        table_def = bq_data.GbqTable.from_table(table, columns=schema.names)
 
         # create ordering from info
         ordering = None
@@ -114,12 +114,13 @@
         # Scan all columns by default, we define this list as it can be pruned while preserving source_def
         scan_list = nodes.ScanList(
             tuple(
-                nodes.ScanItem(ids.ColumnId(item.column), item.dtype, item.column)
+                nodes.ScanItem(ids.ColumnId(item.column), item.column)
                 for item in schema.items
             )
         )
-        source_def = nodes.BigqueryDataSource(
+        source_def = bq_data.BigqueryDataSource(
             table=table_def,
+            schema=schema,
             at_time=at_time,
             sql_predicate=predicate,
             ordering=ordering,
@@ -130,7 +131,7 @@
     @classmethod
     def from_bq_data_source(
         cls,
-        source: nodes.BigqueryDataSource,
+        source: bq_data.BigqueryDataSource,
         scan_list: nodes.ScanList,
         session: Session,
     ):
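
For review convenience, the new construction path in array_value.py, stitched together from the hunks above, looks roughly like the sketch below. It only rearranges lines already shown in this diff; the surrounding names (table, schema, at_time, predicate, ordering, session) are whatever from_table has in scope, and the bq_data import form is assumed from the changed import line.

```python
# Sketch only: mirrors the changed from_table()/from_bq_data_source() code above.
# (nodes and ids refer to the existing imports in array_value.py.)
from bigframes.core import bq_data  # GbqTable and BigqueryDataSource are referenced via bq_data now

table_def = bq_data.GbqTable.from_table(table, columns=schema.names)
source_def = bq_data.BigqueryDataSource(
    table=table_def,
    schema=schema,  # the schema now travels with the data source
    at_time=at_time,
    sql_predicate=predicate,
    ordering=ordering,
)
# ScanItem drops its dtype argument; dtypes are resolved from the source schema instead.
scan_list = nodes.ScanList(
    tuple(
        nodes.ScanItem(ids.ColumnId(item.column), item.column)
        for item in schema.items
    )
)
array_value = ArrayValue.from_bq_data_source(source_def, scan_list, session)
```
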
bigframes/core/blocks.py (75 changes: 41 additions & 34 deletions)
@@ -37,7 +37,6 @@
     Optional,
     Sequence,
     Tuple,
-    TYPE_CHECKING,
     Union,
 )
 import warnings
@@ -70,9 +69,6 @@
 from bigframes.session import dry_runs, execution_spec
 from bigframes.session import executor as executors
 
-if TYPE_CHECKING:
-    from bigframes.session.executor import ExecuteResult
-
 # Type constraint for wherever column labels are used
 Label = typing.Hashable
 
@@ -98,7 +94,6 @@
 LevelsType = typing.Union[LevelType, typing.Sequence[LevelType]]
 
 
-@dataclasses.dataclass
 class PandasBatches(Iterator[pd.DataFrame]):
     """Interface for mutable objects with state represented by a block value object."""
 
@@ -271,10 +266,14 @@ def shape(self) -> typing.Tuple[int, int]:
         except Exception:
             pass
 
-        row_count = self.session._executor.execute(
-            self.expr.row_count(),
-            execution_spec.ExecutionSpec(promise_under_10gb=True, ordered=False),
-        ).to_py_scalar()
+        row_count = (
+            self.session._executor.execute(
+                self.expr.row_count(),
+                execution_spec.ExecutionSpec(promise_under_10gb=True, ordered=False),
+            )
+            .batches()
+            .to_py_scalar()
+        )
         return (row_count, len(self.value_columns))
 
     @property
@@ -584,7 +583,7 @@ def to_arrow(
                 ordered=ordered,
             ),
         )
-        pa_table = execute_result.to_arrow_table()
+        pa_table = execute_result.batches().to_arrow_table()
 
         pa_index_labels = []
         for index_level, index_label in enumerate(self._index_labels):
@@ -636,17 +635,13 @@ def to_pandas(
             max_download_size, sampling_method, random_state
         )
 
-        ex_result = self._materialize_local(
+        return self._materialize_local(
             materialize_options=MaterializationOptions(
                 downsampling=sampling,
                 allow_large_results=allow_large_results,
                 ordered=ordered,
            )
         )
-        df = ex_result.to_pandas()
-        df = self._copy_index_to_pandas(df)
-        df.set_axis(self.column_labels, axis=1, copy=False)
-        return df, ex_result.query_job
 
     def _get_sampling_option(
         self,
@@ -683,7 +678,7 @@ def try_peek(
                 self.expr,
                 execution_spec.ExecutionSpec(promise_under_10gb=under_10gb, peek=n),
             )
-            df = result.to_pandas()
+            df = result.batches().to_pandas()
             return self._copy_index_to_pandas(df)
         else:
             return None
@@ -704,13 +699,14 @@ def to_pandas_batches(
             if (allow_large_results is not None)
             else not bigframes.options._allow_large_results
         )
-        execute_result = self.session._executor.execute(
+        execution_result = self.session._executor.execute(
             self.expr,
             execution_spec.ExecutionSpec(
                 promise_under_10gb=under_10gb,
                 ordered=True,
             ),
         )
+        result_batches = execution_result.batches()
 
         # To reduce the number of edge cases to consider when working with the
         # results of this, always return at least one DataFrame. See:
@@ -724,19 +720,21 @@
         dfs = map(
             lambda a: a[0],
             itertools.zip_longest(
-                execute_result.to_pandas_batches(page_size, max_results),
+                result_batches.to_pandas_batches(page_size, max_results),
                 [0],
                 fillvalue=empty_val,
             ),
         )
         dfs = iter(map(self._copy_index_to_pandas, dfs))
 
-        total_rows = execute_result.total_rows
+        total_rows = result_batches.approx_total_rows
         if (total_rows is not None) and (max_results is not None):
             total_rows = min(total_rows, max_results)
 
         return PandasBatches(
-            dfs, total_rows, total_bytes_processed=execute_result.total_bytes_processed
+            dfs,
+            total_rows,
+            total_bytes_processed=execution_result.total_bytes_processed,
         )
 
     def _copy_index_to_pandas(self, df: pd.DataFrame) -> pd.DataFrame:
@@ -754,7 +752,7 @@ def _copy_index_to_pandas(self, df: pd.DataFrame) -> pd.DataFrame:
 
     def _materialize_local(
         self, materialize_options: MaterializationOptions = MaterializationOptions()
-    ) -> ExecuteResult:
+    ) -> tuple[pd.DataFrame, Optional[bigquery.QueryJob]]:
         """Run query and download results as a pandas DataFrame. Return the total number of results as well."""
         # TODO(swast): Allow for dry run and timeout.
         under_10gb = (
@@ -769,9 +767,11 @@
                 ordered=materialize_options.ordered,
             ),
         )
+        result_batches = execute_result.batches()
+
         sample_config = materialize_options.downsampling
-        if execute_result.total_bytes is not None:
-            table_mb = execute_result.total_bytes / _BYTES_TO_MEGABYTES
+        if result_batches.approx_total_bytes is not None:
+            table_mb = result_batches.approx_total_bytes / _BYTES_TO_MEGABYTES
             max_download_size = sample_config.max_download_size
             fraction = (
                 max_download_size / table_mb
@@ -792,7 +792,7 @@
 
         # TODO: Maybe materialize before downsampling
         # Some downsampling methods
-        if fraction < 1 and (execute_result.total_rows is not None):
+        if fraction < 1 and (result_batches.approx_total_rows is not None):
             if not sample_config.enable_downsampling:
                 raise RuntimeError(
                     f"The data size ({table_mb:.2f} MB) exceeds the maximum download limit of "
@@ -811,7 +811,7 @@
                     "the downloading limit."
                 )
                 warnings.warn(msg, category=UserWarning)
-            total_rows = execute_result.total_rows
+            total_rows = result_batches.approx_total_rows
             # Remove downsampling config from subsequent invocations, as otherwise could result in many
             # iterations if downsampling undershoots
             return self._downsample(
@@ -823,7 +823,10 @@
                 MaterializationOptions(ordered=materialize_options.ordered)
             )
         else:
-            return execute_result
+            df = result_batches.to_pandas()
+            df = self._copy_index_to_pandas(df)
+            df.set_axis(self.column_labels, axis=1, copy=False)
+            return df, execute_result.query_job
 
     def _downsample(
         self, total_rows: int, sampling_method: str, fraction: float, random_state
@@ -1662,15 +1665,19 @@ def retrieve_repr_request_results(
                 ordered=True,
             ),
         )
-        row_count = self.session._executor.execute(
-            self.expr.row_count(),
-            execution_spec.ExecutionSpec(
-                promise_under_10gb=True,
-                ordered=False,
-            ),
-        ).to_py_scalar()
+        row_count = (
+            self.session._executor.execute(
+                self.expr.row_count(),
+                execution_spec.ExecutionSpec(
+                    promise_under_10gb=True,
+                    ordered=False,
+                ),
+            )
+            .batches()
+            .to_py_scalar()
+        )
 
-        head_df = head_result.to_pandas()
+        head_df = head_result.batches().to_pandas()
         return self._copy_index_to_pandas(head_df), row_count, head_result.query_job
 
     def promote_offsets(self, label: Label = None) -> typing.Tuple[Block, str]:
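
Taken together, the blocks.py changes converge on a single consumption pattern: the executor's execute() result is no longer read directly, row download goes through its batches() handle, and the size estimates are renamed with an approx_ prefix. A rough sketch of that flow, assembled only from attribute and method names visible in the hunks above (session and expr stand in for a live Session and a Block expression; this is read off the diff, not a documented public API):

```python
from bigframes.session import execution_spec

execute_result = session._executor.execute(
    expr,
    execution_spec.ExecutionSpec(promise_under_10gb=True, ordered=True),
)
result_batches = execute_result.batches()  # materialization handle

df = result_batches.to_pandas()                       # full local DataFrame
# or: pa_table = result_batches.to_arrow_table()
# or: df_iter = result_batches.to_pandas_batches(page_size, max_results)
# or: scalar = result_batches.to_py_scalar()          # e.g. for row_count() expressions

approx_rows = result_batches.approx_total_rows        # was execute_result.total_rows
approx_bytes = result_batches.approx_total_bytes      # was execute_result.total_bytes
job = execute_result.query_job                        # job metadata stays on the execute result
bytes_processed = execute_result.total_bytes_processed
```
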