{repr(event)}
"
+
def _get_next_batch(self) -> bool:
"""
Gets the next batch of data from the generator and appends to cache.
@@ -180,7 +186,9 @@ def _cached_data(self) -> pd.DataFrame:
def _reset_batches_for_new_page_size(self):
"""Reset the batch iterator when page size changes."""
- self._batches = self._dataframe.to_pandas_batches(page_size=self.page_size)
+ self._batches = self._dataframe._to_pandas_batches_colab(
+ page_size=self.page_size, callback=self._update_progress
+ )
self._cached_batches = []
self._batch_iter = None
self._all_data_loaded = False
diff --git a/bigframes/display/table_widget.js b/bigframes/display/table_widget.js
index 6b4d99ff28..4356c79faa 100644
--- a/bigframes/display/table_widget.js
+++ b/bigframes/display/table_widget.js
@@ -19,11 +19,13 @@ const ModelProperty = {
PAGE_SIZE: "page_size",
ROW_COUNT: "row_count",
TABLE_HTML: "table_html",
+ PROGRESS_HTML: "progress_html",
};
const Event = {
CHANGE: "change",
CHANGE_TABLE_HTML: `change:${ModelProperty.TABLE_HTML}`,
+ CHANGE_PROGRESS_HTML: `change:${ModelProperty.PROGRESS_HTML}`,
CLICK: "click",
};
@@ -39,6 +41,7 @@ function render({ model, el }) {
el.classList.add("bigframes-widget");
// Structure
+ const progressContainer = document.createElement("div");
const tableContainer = document.createElement("div");
const footer = document.createElement("div");
@@ -57,6 +60,7 @@ function render({ model, el }) {
const pageSizeSelect = document.createElement("select");
// Add CSS classes
+ progressContainer.classList.add("progress-container");
tableContainer.classList.add("table-container");
footer.classList.add("footer");
paginationContainer.classList.add("pagination");
@@ -119,6 +123,13 @@ function render({ model, el }) {
}
}
+ /** Updates the HTML in the progress container. */
+ function handleProgressHTMLChange() {
+ // Note: Using innerHTML is safe here because the content is generated
+ // by a trusted backend (formatting_helpers).
+ progressContainer.innerHTML = model.get(ModelProperty.PROGRESS_HTML);
+ }
+
/** Updates the HTML in the table container and refreshes button states. */
function handleTableHTMLChange() {
// Note: Using innerHTML is safe here because the content is generated
@@ -137,6 +148,7 @@ function render({ model, el }) {
}
});
model.on(Event.CHANGE_TABLE_HTML, handleTableHTMLChange);
+ model.on(Event.CHANGE_PROGRESS_HTML, handleProgressHTMLChange);
// Assemble the DOM
paginationContainer.appendChild(prevPage);
@@ -150,6 +162,7 @@ function render({ model, el }) {
footer.appendChild(paginationContainer);
footer.appendChild(pageSizeContainer);
+ el.appendChild(progressContainer);
el.appendChild(tableContainer);
el.appendChild(footer);
diff --git a/bigframes/formatting_helpers.py b/bigframes/formatting_helpers.py
index 48afb4fdbd..2c2318dfed 100644
--- a/bigframes/formatting_helpers.py
+++ b/bigframes/formatting_helpers.py
@@ -13,15 +13,17 @@
# limitations under the License.
"""Shared helper functions for formatting jobs related info."""
-# TODO(orrbradford): cleanup up typings and documenttion in this file
+
+from __future__ import annotations
import datetime
import random
-from typing import Any, Optional, Type, Union
+from typing import Any, Callable, Optional, Type, Union
import bigframes_vendored.constants as constants
import google.api_core.exceptions as api_core_exceptions
import google.cloud.bigquery as bigquery
+import google.cloud.bigquery._job_helpers
import humanize
import IPython
import IPython.display as display
@@ -40,6 +42,45 @@
}
+def create_progress_bar_callback(
+ *,
+ progress_bar: Optional[str] = None,
+ callback: Callable = lambda _: None,
+) -> Callable:
+ if progress_bar == "auto":
+ progress_bar = "notebook" if in_ipython() else "terminal"
+
+ if progress_bar == "notebook":
+ loading_bar = display.HTML("")
+ display_id = str(random.random())
+ display.display(loading_bar, display_id=display_id)
+
+ def outer_callback(event):
+ callback(event)
+ display.update_display(
+ display.HTML(get_query_job_loading_html(event)),
+ display_id=display_id,
+ )
+
+ elif progress_bar == "terminal":
+ previous_bar_text = ""
+
+ def outer_callback(event):
+ nonlocal previous_bar_text
+
+ callback(event)
+
+ bar_text = get_query_job_loading_string(event)
+ if bar_text != previous_bar_text:
+ print(bar_text)
+ previous_bar_text = bar_text
+
+ else:
+ outer_callback = callback
+
+ return outer_callback
+
+
def add_feedback_link(
exception: Union[
api_core_exceptions.RetryError, api_core_exceptions.GoogleAPICallError
@@ -123,7 +164,7 @@ def wait_for_query_job(
query_job: bigquery.QueryJob,
max_results: Optional[int] = None,
page_size: Optional[int] = None,
- progress_bar: Optional[str] = None,
+ callback: Callable = lambda _: None,
) -> bigquery.table.RowIterator:
"""Return query results. Displays a progress bar while the query is running
Args:
@@ -138,46 +179,57 @@ def wait_for_query_job(
Returns:
A row iterator over the query results.
"""
- if progress_bar == "auto":
- progress_bar = "notebook" if in_ipython() else "terminal"
-
try:
- if progress_bar == "notebook":
- display_id = str(random.random())
- loading_bar = display.HTML(get_query_job_loading_html(query_job))
- display.display(loading_bar, display_id=display_id)
- query_result = query_job.result(
- max_results=max_results, page_size=page_size
+ callback(
+ # DONOTSUBMIT: we should create our own events.
+ google.cloud.bigquery._job_helpers.QueryReceivedEvent(
+ billing_project=query_job.project,
+ location=query_job.location,
+ job_id=query_job.job_id,
+ statement_type=query_job.statement_type,
+ state=query_job.state,
+ query_plan=query_job.query_plan,
+ created=query_job.created,
+ started=query_job.started,
+ ended=query_job.ended,
)
- query_job.reload()
- display.update_display(
- display.HTML(get_query_job_loading_html(query_job)),
- display_id=display_id,
- )
- elif progress_bar == "terminal":
- initial_loading_bar = get_query_job_loading_string(query_job)
- print(initial_loading_bar)
- query_result = query_job.result(
- max_results=max_results, page_size=page_size
- )
- query_job.reload()
- if initial_loading_bar != get_query_job_loading_string(query_job):
- print(get_query_job_loading_string(query_job))
- else:
- # No progress bar.
- query_result = query_job.result(
- max_results=max_results, page_size=page_size
+ )
+ # TODO(tswast): Add a timeout so that progress bars can make updates as
+ # the query stats come in.
+ # TODO(tswast): Listen for cancellation on the callback (or maybe
+ # callbacks should just raise KeyboardInterrupt like IPython does?).
+ query_results = query_job.result(
+ page_size=page_size,
+ max_results=max_results,
+ )
+ callback(
+ # DONOTSUBMIT: we should create our own events.
+ google.cloud.bigquery._job_helpers.QueryFinishedEvent(
+ billing_project=query_job.project,
+ location=query_results.location,
+ query_id=query_results.query_id,
+ job_id=query_results.job_id,
+ total_rows=query_results.total_rows,
+ total_bytes_processed=query_results.total_bytes_processed,
+ slot_millis=query_results.slot_millis,
+ destination=query_job.destination,
+ created=query_job.created,
+ started=query_job.started,
+ ended=query_job.ended,
)
- query_job.reload()
- return query_result
+ )
+ return query_results
except api_core_exceptions.RetryError as exc:
+ # TODO: turn this into a callback event, too.
add_feedback_link(exc)
raise
except api_core_exceptions.GoogleAPICallError as exc:
+ # TODO: turn this into a callback event, too.
add_feedback_link(exc)
raise
except KeyboardInterrupt:
query_job.cancel()
+ # TODO: turn this into a callback event, too.
print(
f"Requested cancellation for {query_job.job_type.capitalize()}"
f" job {query_job.job_id} in location {query_job.location}..."
diff --git a/bigframes/pandas/io/api.py b/bigframes/pandas/io/api.py
index 483bc5e530..d2e4e6658f 100644
--- a/bigframes/pandas/io/api.py
+++ b/bigframes/pandas/io/api.py
@@ -273,7 +273,8 @@ def _try_read_gbq_colab_sessionless_dry_run(
with _default_location_lock:
if not config.options.bigquery._session_started:
return _run_read_gbq_colab_sessionless_dry_run(
- query, pyformat_args=pyformat_args
+ query,
+ pyformat_args=pyformat_args,
)
# Explicitly return None to indicate that we didn't run the dry run query.
@@ -305,6 +306,7 @@ def _read_gbq_colab(
*,
pyformat_args: Optional[Dict[str, Any]] = None,
dry_run: bool = False,
+ callback: Callable = lambda _: None,
) -> bigframes.dataframe.DataFrame | pandas.Series:
"""A Colab-specific version of read_gbq.
@@ -319,6 +321,8 @@ def _read_gbq_colab(
dry_run (bool):
If True, estimates the query results size without returning data.
The return will be a pandas Series with query metadata.
+ callback (Callable):
+ A callback function used by bigframes to report query progress.
Returns:
Union[bigframes.dataframe.DataFrame, pandas.Series]:
@@ -364,6 +368,7 @@ def _read_gbq_colab(
query_or_table,
pyformat_args=pyformat_args,
dry_run=dry_run,
+ callback=callback,
)
diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py
index df67e64e9e..42526a0800 100644
--- a/bigframes/session/__init__.py
+++ b/bigframes/session/__init__.py
@@ -503,6 +503,7 @@ def _read_gbq_colab(
*,
pyformat_args: Optional[Dict[str, Any]] = None,
dry_run: bool = False,
+ callback: Callable = lambda _: None,
) -> Union[dataframe.DataFrame, pandas.Series]:
"""A version of read_gbq that has the necessary default values for use in colab integrations.
@@ -519,6 +520,11 @@ def _read_gbq_colab(
instead. Note: unlike read_gbq / read_gbq_query, even if set to
None, this function always assumes {var} refers to a variable
that is supposed to be supplied in this dictionary.
+ dry_run (bool):
+ If True, estimates the query results size without returning data.
+ The return will be a pandas Series with query metadata.
+ callback (Callable):
+ A callback function used by bigframes to report query progress.
"""
if pyformat_args is None:
pyformat_args = {}
@@ -538,6 +544,7 @@ def _read_gbq_colab(
force_total_order=False,
dry_run=typing.cast(Union[Literal[False], Literal[True]], dry_run),
allow_large_results=allow_large_results,
+ callback=callback,
)
@overload
diff --git a/bigframes/session/_io/bigquery/__init__.py b/bigframes/session/_io/bigquery/__init__.py
index 83f63e8b9a..64e968c0ed 100644
--- a/bigframes/session/_io/bigquery/__init__.py
+++ b/bigframes/session/_io/bigquery/__init__.py
@@ -22,7 +22,17 @@
import textwrap
import types
import typing
-from typing import Dict, Iterable, Literal, Mapping, Optional, overload, Tuple, Union
+from typing import (
+ Callable,
+ Dict,
+ Iterable,
+ Literal,
+ Mapping,
+ Optional,
+ overload,
+ Tuple,
+ Union,
+)
import bigframes_vendored.google_cloud_bigquery.retry as third_party_gcb_retry
import bigframes_vendored.pandas.io.gbq as third_party_pandas_gbq
@@ -268,6 +278,38 @@ def start_query_with_client(
...
+@overload
+def start_query_with_client(
+ bq_client: bigquery.Client,
+ sql: str,
+ *,
+ job_config: bigquery.QueryJobConfig,
+ location: Optional[str],
+ project: Optional[str],
+ timeout: Optional[float],
+ metrics: Optional[bigframes.session.metrics.ExecutionMetrics],
+ query_with_job: Literal[False],
+ callback: Callable = ...,
+) -> Tuple[bigquery.table.RowIterator, Optional[bigquery.QueryJob]]:
+ ...
+
+
+@overload
+def start_query_with_client(
+ bq_client: bigquery.Client,
+ sql: str,
+ *,
+ job_config: bigquery.QueryJobConfig,
+ location: Optional[str],
+ project: Optional[str],
+ timeout: Optional[float],
+ metrics: Optional[bigframes.session.metrics.ExecutionMetrics],
+ query_with_job: Literal[True],
+ callback: Callable = ...,
+) -> Tuple[bigquery.table.RowIterator, bigquery.QueryJob]:
+ ...
+
+
@overload
def start_query_with_client(
bq_client: bigquery.Client,
@@ -315,23 +357,32 @@ def start_query_with_client(
# https://github.com/googleapis/python-bigquery/pull/2256 merged, likely
# version 3.36.0 or later.
job_retry: google.api_core.retry.Retry = third_party_gcb_retry.DEFAULT_JOB_RETRY,
+ callback: Callable = lambda _: None,
) -> Tuple[bigquery.table.RowIterator, Optional[bigquery.QueryJob]]:
"""
Starts query job and waits for results.
"""
+ opts = bigframes.options.display
+ progress_callback = formatting_helpers.create_progress_bar_callback(
+ progress_bar=opts.progress_bar,
+ callback=callback,
+ )
+
try:
# Note: Ensure no additional labels are added to job_config after this
# point, as `add_and_trim_labels` ensures the label count does not
# exceed MAX_LABELS_COUNT.
add_and_trim_labels(job_config)
if not query_with_job:
- results_iterator = bq_client.query_and_wait(
+ # DONOTSUBMIT: we should create our own events for callback.
+ results_iterator = bq_client._query_and_wait_bigframes(
sql,
job_config=job_config,
location=location,
project=project,
api_timeout=timeout,
job_retry=job_retry,
+ callback=progress_callback,
)
if metrics is not None:
metrics.count_job_stats(row_iterator=results_iterator)
@@ -350,11 +401,11 @@ def start_query_with_client(
ex.message += CHECK_DRIVE_PERMISSIONS
raise
- opts = bigframes.options.display
if opts.progress_bar is not None and not query_job.configuration.dry_run:
results_iterator = formatting_helpers.wait_for_query_job(
query_job,
progress_bar=opts.progress_bar,
+ callback=callback,
)
else:
results_iterator = query_job.result()
diff --git a/bigframes/session/bq_caching_executor.py b/bigframes/session/bq_caching_executor.py
index b428cd646c..60de87e70e 100644
--- a/bigframes/session/bq_caching_executor.py
+++ b/bigframes/session/bq_caching_executor.py
@@ -16,7 +16,7 @@
import math
import threading
-from typing import Literal, Mapping, Optional, Sequence, Tuple
+from typing import Callable, Literal, Mapping, Optional, Sequence, Tuple
import warnings
import weakref
@@ -186,13 +186,18 @@ def execute(
self,
array_value: bigframes.core.ArrayValue,
execution_spec: ex_spec.ExecutionSpec,
+ *,
+ callback: Callable = lambda _: None,
) -> executor.ExecuteResult:
# TODO: Support export jobs in combination with semi executors
if execution_spec.destination_spec is None:
plan = self.prepare_plan(array_value.node, target="simplify")
for exec in self._semi_executors:
maybe_result = exec.execute(
- plan, ordered=execution_spec.ordered, peek=execution_spec.peek
+ plan,
+ ordered=execution_spec.ordered,
+ peek=execution_spec.peek,
+ callback=callback,
)
if maybe_result:
return maybe_result
@@ -203,7 +208,9 @@ def execute(
"Ordering and peeking not supported for gbq export"
)
# separate path for export_gbq, as it has all sorts of annoying logic, such as possibly running as dml
- return self._export_gbq(array_value, execution_spec.destination_spec)
+ return self._export_gbq(
+ array_value, execution_spec.destination_spec, callback=callback
+ )
result = self._execute_plan_gbq(
array_value.node,
@@ -213,6 +220,7 @@ def execute(
if isinstance(execution_spec.destination_spec, ex_spec.CacheSpec)
else None,
must_create_table=not execution_spec.promise_under_10gb,
+ callback=callback,
)
# post steps: export
if isinstance(execution_spec.destination_spec, ex_spec.GcsOutputSpec):
@@ -221,7 +229,10 @@ def execute(
return result
def _export_result_gcs(
- self, result: executor.ExecuteResult, gcs_export_spec: ex_spec.GcsOutputSpec
+ self,
+ result: executor.ExecuteResult,
+ gcs_export_spec: ex_spec.GcsOutputSpec,
+ callback: Callable = lambda _: None,
):
query_job = result.query_job
assert query_job is not None
@@ -242,6 +253,7 @@ def _export_result_gcs(
location=None,
timeout=None,
query_with_job=True,
+ callback=callback,
)
def _maybe_find_existing_table(
@@ -266,7 +278,10 @@ def _maybe_find_existing_table(
return None
def _export_gbq(
- self, array_value: bigframes.core.ArrayValue, spec: ex_spec.TableOutputSpec
+ self,
+ array_value: bigframes.core.ArrayValue,
+ spec: ex_spec.TableOutputSpec,
+ callback: Callable = lambda _: None,
) -> executor.ExecuteResult:
"""
Export the ArrayValue to an existing BigQuery table.
@@ -309,6 +324,7 @@ def _export_gbq(
row_iter, query_job = self._run_execute_query(
sql=sql,
job_config=job_config,
+ callback=callback,
)
has_timedelta_col = any(
@@ -372,6 +388,7 @@ def _run_execute_query(
sql: str,
job_config: Optional[bq_job.QueryJobConfig] = None,
query_with_job: bool = True,
+ callback: Callable = lambda _: None,
) -> Tuple[bq_table.RowIterator, Optional[bigquery.QueryJob]]:
"""
Starts BigQuery query job and waits for results.
@@ -397,6 +414,7 @@ def _run_execute_query(
location=None,
timeout=None,
query_with_job=True,
+ callback=callback,
)
else:
return bq_io.start_query_with_client(
@@ -408,6 +426,7 @@ def _run_execute_query(
location=None,
timeout=None,
query_with_job=False,
+ callback=callback,
)
except google.api_core.exceptions.BadRequest as e:
@@ -587,6 +606,7 @@ def _execute_plan_gbq(
peek: Optional[int] = None,
cache_spec: Optional[ex_spec.CacheSpec] = None,
must_create_table: bool = True,
+ callback: Callable = lambda _: None,
) -> executor.ExecuteResult:
"""Just execute whatever plan as is, without further caching or decomposition."""
# TODO(swast): plumb through the api_name of the user-facing api that
@@ -637,6 +657,7 @@ def _execute_plan_gbq(
sql=compiled.sql,
job_config=job_config,
query_with_job=(destination_table is not None),
+ callback=callback,
)
table_info: Optional[bigquery.Table] = None
diff --git a/bigframes/session/direct_gbq_execution.py b/bigframes/session/direct_gbq_execution.py
index ff91747a62..f09c31f92b 100644
--- a/bigframes/session/direct_gbq_execution.py
+++ b/bigframes/session/direct_gbq_execution.py
@@ -13,7 +13,7 @@
# limitations under the License.
from __future__ import annotations
-from typing import Literal, Optional, Tuple
+from typing import Callable, Literal, Optional, Tuple
from google.cloud import bigquery
import google.cloud.bigquery.job as bq_job
@@ -45,6 +45,8 @@ def execute(
plan: nodes.BigFrameNode,
ordered: bool,
peek: Optional[int] = None,
+ *,
+ callback: Callable = lambda _: None,
) -> executor.ExecuteResult:
"""Just execute whatever plan as is, without further caching or decomposition."""
# TODO(swast): plumb through the api_name of the user-facing api that
@@ -69,6 +71,7 @@ def _run_execute_query(
self,
sql: str,
job_config: Optional[bq_job.QueryJobConfig] = None,
+ callback: Callable = lambda _: None,
) -> Tuple[bq_table.RowIterator, Optional[bigquery.QueryJob]]:
"""
Starts BigQuery query job and waits for results.
@@ -82,4 +85,5 @@ def _run_execute_query(
timeout=None,
metrics=None,
query_with_job=False,
+ callback=callback,
)
diff --git a/bigframes/session/executor.py b/bigframes/session/executor.py
index 748b10647a..3ac580e773 100644
--- a/bigframes/session/executor.py
+++ b/bigframes/session/executor.py
@@ -18,7 +18,7 @@
import dataclasses
import functools
import itertools
-from typing import Iterator, Literal, Optional, Union
+from typing import Callable, Iterator, Literal, Optional, Union
from google.cloud import bigquery
import pandas as pd
@@ -153,6 +153,8 @@ def execute(
self,
array_value: bigframes.core.ArrayValue,
execution_spec: ex_spec.ExecutionSpec,
+ *,
+ callback: Callable = lambda _: None,
) -> ExecuteResult:
"""
Execute the ArrayValue.
diff --git a/bigframes/session/loader.py b/bigframes/session/loader.py
index 49b1195235..d0c7ea346c 100644
--- a/bigframes/session/loader.py
+++ b/bigframes/session/loader.py
@@ -22,6 +22,7 @@
import os
import typing
from typing import (
+ Callable,
cast,
Dict,
Generator,
@@ -899,6 +900,7 @@ def read_gbq_query( # type: ignore[overload-overlap]
dry_run: Literal[False] = ...,
force_total_order: Optional[bool] = ...,
allow_large_results: bool,
+ callback: Callable = ...,
) -> dataframe.DataFrame:
...
@@ -916,6 +918,7 @@ def read_gbq_query(
dry_run: Literal[True] = ...,
force_total_order: Optional[bool] = ...,
allow_large_results: bool,
+ callback: Callable = ...,
) -> pandas.Series:
...
@@ -932,6 +935,7 @@ def read_gbq_query(
dry_run: bool = False,
force_total_order: Optional[bool] = None,
allow_large_results: bool,
+ callback: Callable = lambda _: None,
) -> dataframe.DataFrame | pandas.Series:
configuration = _transform_read_gbq_configuration(configuration)
@@ -1016,6 +1020,7 @@ def read_gbq_query(
rows = self._start_query_with_job_optional(
query,
job_config=job_config,
+ callback=callback,
)
# If there is a query job, fetch it so that we can get the
@@ -1163,6 +1168,7 @@ def _start_query_with_job_optional(
*,
job_config: Optional[google.cloud.bigquery.QueryJobConfig] = None,
timeout: Optional[float] = None,
+ callback: Callable = lambda _: None,
) -> google.cloud.bigquery.table.RowIterator:
"""
Starts BigQuery query with job optional and waits for results.
@@ -1179,6 +1185,7 @@ def _start_query_with_job_optional(
project=None,
metrics=None,
query_with_job=False,
+ callback=callback,
)
return rows
@@ -1188,6 +1195,7 @@ def _start_query_with_job(
*,
job_config: Optional[google.cloud.bigquery.QueryJobConfig] = None,
timeout: Optional[float] = None,
+ callback: Callable = lambda _: None,
) -> bigquery.QueryJob:
"""
Starts BigQuery query job and waits for results.
@@ -1204,6 +1212,7 @@ def _start_query_with_job(
project=None,
metrics=None,
query_with_job=True,
+ callback=callback,
)
return query_job
diff --git a/bigframes/session/local_scan_executor.py b/bigframes/session/local_scan_executor.py
index 65f088e8a1..d7618f1c7e 100644
--- a/bigframes/session/local_scan_executor.py
+++ b/bigframes/session/local_scan_executor.py
@@ -13,7 +13,7 @@
# limitations under the License.
from __future__ import annotations
-from typing import Optional
+from typing import Callable, Optional
from bigframes.core import bigframe_node, rewrite
from bigframes.session import executor, semi_executor
@@ -29,6 +29,8 @@ def execute(
plan: bigframe_node.BigFrameNode,
ordered: bool,
peek: Optional[int] = None,
+ *,
+ callback: Callable = lambda _: None,
) -> Optional[executor.ExecuteResult]:
reduced_result = rewrite.try_reduce_to_local_scan(plan)
if not reduced_result:
diff --git a/bigframes/session/polars_executor.py b/bigframes/session/polars_executor.py
index d8df558fe4..d483a01d64 100644
--- a/bigframes/session/polars_executor.py
+++ b/bigframes/session/polars_executor.py
@@ -14,7 +14,7 @@
from __future__ import annotations
import itertools
-from typing import Optional, TYPE_CHECKING
+from typing import Callable, Optional, TYPE_CHECKING
import pyarrow as pa
@@ -133,6 +133,8 @@ def execute(
plan: bigframe_node.BigFrameNode,
ordered: bool,
peek: Optional[int] = None,
+ *,
+ callback: Callable = lambda _: None,
) -> Optional[executor.ExecuteResult]:
if not self._can_execute(plan):
return None
diff --git a/bigframes/session/read_api_execution.py b/bigframes/session/read_api_execution.py
index 037fde011f..0405a909dc 100644
--- a/bigframes/session/read_api_execution.py
+++ b/bigframes/session/read_api_execution.py
@@ -13,7 +13,7 @@
# limitations under the License.
from __future__ import annotations
-from typing import Any, Iterator, Optional
+from typing import Any, Callable, Iterator, Optional
from google.cloud import bigquery_storage_v1
import pyarrow as pa
@@ -38,6 +38,8 @@ def execute(
plan: bigframe_node.BigFrameNode,
ordered: bool,
peek: Optional[int] = None,
+ *,
+ callback: Callable = lambda _: None,
) -> Optional[executor.ExecuteResult]:
adapt_result = self._try_adapt_plan(plan, ordered)
if not adapt_result:
diff --git a/bigframes/session/semi_executor.py b/bigframes/session/semi_executor.py
index c41d7c96d3..126f41f637 100644
--- a/bigframes/session/semi_executor.py
+++ b/bigframes/session/semi_executor.py
@@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import abc
-from typing import Optional
+from typing import Callable, Optional
from bigframes.core import bigframe_node
from bigframes.session import executor
@@ -29,5 +29,7 @@ def execute(
plan: bigframe_node.BigFrameNode,
ordered: bool,
peek: Optional[int] = None,
+ *,
+ callback: Callable = lambda _: None,
) -> Optional[executor.ExecuteResult]:
raise NotImplementedError("execute not implemented for this executor")
diff --git a/bigframes/testing/compiler_session.py b/bigframes/testing/compiler_session.py
index 289b2600fd..77b218ef29 100644
--- a/bigframes/testing/compiler_session.py
+++ b/bigframes/testing/compiler_session.py
@@ -13,7 +13,7 @@
# limitations under the License.
import dataclasses
-import typing
+from typing import Callable, Optional
import bigframes.core
import bigframes.core.compile.sqlglot as sqlglot
@@ -29,7 +29,7 @@ class SQLCompilerExecutor(bigframes.session.executor.Executor):
def to_sql(
self,
array_value: bigframes.core.ArrayValue,
- offset_column: typing.Optional[str] = None,
+ offset_column: Optional[str] = None,
ordered: bool = True,
enable_cache: bool = False,
) -> str:
@@ -46,5 +46,7 @@ def execute(
self,
array_value,
execution_spec,
+ *,
+ callback: Callable = lambda _: None,
):
raise NotImplementedError("SQLCompilerExecutor.execute not implemented")
diff --git a/bigframes/testing/polars_session.py b/bigframes/testing/polars_session.py
index 29eae20b7a..cd41881c33 100644
--- a/bigframes/testing/polars_session.py
+++ b/bigframes/testing/polars_session.py
@@ -13,7 +13,7 @@
# limitations under the License.
import dataclasses
-from typing import Union
+from typing import Callable, Union
import weakref
import pandas
@@ -37,6 +37,8 @@ def execute(
self,
array_value: bigframes.core.ArrayValue,
execution_spec: bigframes.session.execution_spec.ExecutionSpec,
+ *,
+ callback: Callable = lambda _: None,
):
"""
Execute the ArrayValue, storing the result to a temporary session-owned table.
diff --git a/notebooks/dataframes/anywidget_mode.ipynb b/notebooks/dataframes/anywidget_mode.ipynb
index 617329ba65..e0622e383c 100644
--- a/notebooks/dataframes/anywidget_mode.ipynb
+++ b/notebooks/dataframes/anywidget_mode.ipynb
@@ -56,7 +56,8 @@
"outputs": [],
"source": [
"bpd.options.bigquery.ordering_mode = \"partial\"\n",
- "bpd.options.display.repr_mode = \"anywidget\""
+ "bpd.options.display.repr_mode = \"anywidget\"\n",
+ "bpd.options.display.progress_bar = \"notebook\""
]
},
{
@@ -75,9 +76,7 @@
"outputs": [
{
"data": {
- "text/html": [
- "Query job a643d120-4af9-44fc-ba3c-ed461cf1092b is DONE. 0 Bytes processed. Open Job"
- ],
+ "text/html": [],
"text/plain": [
"\n", - " | title | \n", - "views | \n", - "
---|---|---|
21911 | \n", - "1414560 | \n", - "|
27669 | \n", - "Google_Chrome | \n", - "962482 | \n", - "
28394 | \n", - "Google_Earth | \n", - "383566 | \n", - "
29184 | \n", - "Google_Maps | \n", - "205089 | \n", - "
27251 | \n", - "Google_Android | \n", - "99450 | \n", - "
33900 | \n", - "Google_search | \n", - "97665 | \n", - "
31825 | \n", - "Google_chrome | \n", - "78399 | \n", - "
30204 | \n", - "Google_Street_View | \n", - "71580 | \n", - "
40798 | \n", - "Image:Google_Chrome.png | \n", - "60746 | \n", - "
35222 | \n", - "Googleplex | \n", - "53848 | \n", - "
10 rows × 2 columns
\n", - "