Skip to content

Commit 9a2f9c8

Browse files
committed
Add TQDM to pandas_to_eland()
1 parent f241ae9 commit 9a2f9c8

File tree

6 files changed

+373
-22
lines changed

6 files changed

+373
-22
lines changed

eland/etl.py

Lines changed: 48 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import pandas as pd # type: ignore
2323
from elasticsearch import Elasticsearch
2424
from elasticsearch.helpers import parallel_bulk
25+
from tqdm.notebook import tqdm # type: ignore
2526

2627
from eland import DataFrame
2728
from eland.common import DEFAULT_CHUNK_SIZE, PANDAS_VERSION, ensure_es_client
@@ -46,6 +47,7 @@ def pandas_to_eland(
4647
thread_count: int = 4,
4748
chunksize: Optional[int] = None,
4849
use_pandas_index_for_es_ids: bool = True,
50+
show_progressbar: Optional[bool] = None,
4951
) -> DataFrame:
5052
"""
5153
Append a pandas DataFrame to an Elasticsearch index.
@@ -79,6 +81,10 @@ def pandas_to_eland(
7981
use_pandas_index_for_es_ids: bool, default 'True'
8082
* True: pandas.DataFrame.index fields will be used to populate Elasticsearch '_id' fields.
8183
* False: Ignore pandas.DataFrame.index when indexing into Elasticsearch
84+
show_progressbar: Optional[bool], default 'None'
85+
* True : show a progress bar only if we detect Jupyter Notebook (for now)
86+
* False : don't show a progress bar
87+
* None : show a progress bar only if we detect Jupyter Notebook
8288
8389
Returns
8490
-------
@@ -184,35 +190,62 @@ def pandas_to_eland(
184190
else:
185191
es_client.indices.create(index=es_dest_index, body=mapping)
186192

193+
if show_progressbar is None or show_progressbar is True:
194+
# Detect jupyter notebook
195+
try:
196+
from IPython import get_ipython # type: ignore
197+
198+
ip = get_ipython()
199+
if hasattr(ip, "kernel"):
200+
show_progressbar = True
201+
except ImportError:
202+
show_progressbar = False
203+
187204
def action_generator(
188205
pd_df: pd.DataFrame,
189206
es_dropna: bool,
190207
use_pandas_index_for_es_ids: bool,
191208
es_dest_index: str,
209+
show_progressbar: Optional[bool],
192210
) -> Generator[Dict[str, Any], None, None]:
193-
for row in pd_df.iterrows():
194-
if es_dropna:
195-
values = row[1].dropna().to_dict()
196-
else:
197-
values = row[1].to_dict()
198-
199-
if use_pandas_index_for_es_ids:
200-
# Use index as _id
201-
id = row[0]
202-
203-
action = {"_index": es_dest_index, "_source": values, "_id": str(id)}
204-
else:
205-
action = {"_index": es_dest_index, "_source": values}
206211

207-
yield action
212+
with tqdm(
213+
total=pd_df.shape[0],
214+
disable=not show_progressbar,
215+
desc="Progress",
216+
) as progress_bar:
217+
for row in pd_df.iterrows():
218+
if es_dropna:
219+
values = row[1].dropna().to_dict()
220+
else:
221+
values = row[1].to_dict()
222+
223+
if use_pandas_index_for_es_ids:
224+
# Use index as _id
225+
id = row[0]
226+
227+
action = {
228+
"_index": es_dest_index,
229+
"_source": values,
230+
"_id": str(id),
231+
}
232+
else:
233+
action = {"_index": es_dest_index, "_source": values}
234+
235+
progress_bar.update(1)
236+
yield action
208237

209238
# parallel_bulk is lazy generator so use deque to consume them immediately
210239
# maxlen = 0 because don't need results of parallel_bulk
211240
deque(
212241
parallel_bulk(
213242
client=es_client,
214243
actions=action_generator(
215-
pd_df, es_dropna, use_pandas_index_for_es_ids, es_dest_index
244+
pd_df=pd_df,
245+
es_dropna=es_dropna,
246+
use_pandas_index_for_es_ids=use_pandas_index_for_es_ids,
247+
es_dest_index=es_dest_index,
248+
show_progressbar=show_progressbar,
216249
),
217250
thread_count=thread_count,
218251
chunk_size=int(chunksize / thread_count),

requirements-dev.txt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,3 +11,7 @@ nox
1111
lightgbm
1212
pytest-cov
1313
mypy
14+
tqdm
15+
jupyter
16+
notebook
17+
ipywidgets

requirements.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
11
elasticsearch>=7.7
22
pandas>=1
33
matplotlib
4+
tqdm

setup.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@
7575
"pandas>=1.2,<1.4",
7676
"matplotlib",
7777
"numpy",
78+
"tqdm",
7879
],
7980
python_requires=">=3.7",
8081
package_data={"eland": ["py.typed"]},

tests/etl/test_pandas_to_eland.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,14 +70,16 @@ def test_es_if_exists_fail(self):
7070
"to 'append' or 'replace' data."
7171
)
7272

73-
def test_es_if_exists_replace(self):
73+
@pytest.mark.parametrize("show_progressbar", [True, False, None])
74+
def test_es_if_exists_replace(self, show_progressbar):
7475
# Assert that 'replace' allows for creation
7576
df1 = pandas_to_eland(
7677
pd_df2,
7778
es_client=ES_TEST_CLIENT,
7879
es_dest_index="test-index",
7980
es_if_exists="replace",
8081
es_refresh=True,
82+
show_progressbar=show_progressbar,
8183
).to_pandas()
8284
assert_frame_equal(pd_df2, df1)
8385

0 commit comments

Comments
 (0)