
Commit 4cd6629

Authored by TomNicholas, pre-commit-ci[bot], maxrjones, and chuckwondo
Docs on open_virtual_mfdataset and scaling (#590)
* add note on using open_virtual_mfdataset with combine=nested
* clarification
* combine by coords
* glob
* add scaling page with pre-requisites
* strategy
* threadpool executor
* dask
* lithops
* custom executor
* kerchunk references format
* memory usage
* caching
* batching
* retries
* add open_virtual_mfdataset to docs
* improve docstring
* [pre-commit.ci] auto fixes from pre-commit.com hooks; for more information, see https://pre-commit.ci
* release note
* spelling
* Correct syntax in code example
* comment about Icechunk
* add executors to API docs
* [pre-commit.ci] auto fixes from pre-commit.com hooks; for more information, see https://pre-commit.ci
* correct class name
* spelling
* add new page to nav
* correct name of class in api docs
* Max's API links suggestions

  Co-authored-by: Max Jones <[email protected]>
* reader->parser
* Chuck's grammatical suggestions

  Co-authored-by: Chuck Daniels <[email protected]>
* another cross-reference
* correct API link in release notes
* try linking to xarray API docs
* Relative link within page to caching section

  Co-authored-by: Max Jones <[email protected]>
* Use admonitions for TODOs

  Co-authored-by: Max Jones <[email protected]>
* Correct admonition syntax for note

  Co-authored-by: Max Jones <[email protected]>
* add a bunch more cross-linked references
* try removing backticks
* remove links to virtualizarr.parallel module
* missed one
* add Dockerfile
* [pre-commit.ci] auto fixes from pre-commit.com hooks; for more information, see https://pre-commit.ci
* add link to Dockerfile

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Co-authored-by: Max Jones <[email protected]>
Co-authored-by: Chuck Daniels <[email protected]>
1 parent ac01156 commit 4cd6629

9 files changed, +409 -4 lines changed

docs/api.md

Lines changed: 11 additions & 2 deletions
@@ -8,19 +8,18 @@ Users can use xarray for every step apart from reading and serializing virtual r
 ### Reading

 ::: virtualizarr.open_virtual_dataset
+::: virtualizarr.open_virtual_mfdataset

 ### Serialization

 ::: virtualizarr.accessor.VirtualiZarrDatasetAccessor
-
 ::: virtualizarr.accessor.VirtualiZarrDataTreeAccessor

 ### Information

 ::: virtualizarr.accessor.VirtualiZarrDatasetAccessor.nbytes

 ### Rewriting
----------

 ::: virtualizarr.accessor.VirtualiZarrDatasetAccessor.rename_paths

@@ -43,3 +42,13 @@ VirtualiZarr's [virtualizarr.manifests.ManifestArray][] objects support a limite
 ::: virtualizarr.manifests.array_api.stack
 ::: virtualizarr.manifests.array_api.expand_dims
 ::: virtualizarr.manifests.array_api.broadcast_to
+
+#### Parallelization
+
+Parallelizing virtual reference generation can be done using a number of parallel execution frameworks.
+Advanced users may want to call one of these executors directly.
+See the docs page on Scaling.
+
+::: virtualizarr.parallel.SerialExecutor
+::: virtualizarr.parallel.DaskDelayedExecutor
+::: virtualizarr.parallel.LithopsEagerFunctionExecutor

docs/releases.md

Lines changed: 4 additions & 0 deletions
@@ -45,6 +45,10 @@

 - Added new docs page on how to write a custom reader for bespoke file formats ([#452](https://github.com/zarr-developers/VirtualiZarr/issues/452), [#580](https://github.com/zarr-developers/VirtualiZarr/pull/580))
 By [Tom Nicholas](https://github.com/TomNicholas).
+- Added new docs page on how to scale VirtualiZarr effectively ([#590](https://github.com/zarr-developers/VirtualiZarr/issues/590)).
+By [Tom Nicholas](https://github.com/TomNicholas).
+- Documented the new [`virtualizarr.open_virtual_mfdataset`] function ([#590](https://github.com/zarr-developers/VirtualiZarr/issues/590)).
+By [Tom Nicholas](https://github.com/TomNicholas).
 - Added MUR SST virtual and zarr icechunk store generation using lithops example.
 ([#475](https://github.com/zarr-developers/VirtualiZarr/pull/475)) by [Aimee Barciauskas](https://github.com/abarciauskas-bgse).
 - Added FAQ answer about what data can be virtualized ([#430](https://github.com/zarr-developers/VirtualiZarr/issues/430), [#532](https://github.com/zarr-developers/VirtualiZarr/pull/532))

docs/scaling.md

Lines changed: 267 additions & 0 deletions
@@ -0,0 +1,267 @@
# Scaling

This page explains how to scale up your usage of VirtualiZarr to cloud-optimize large numbers of files.

## Pre-requisites

Before you attempt to use VirtualiZarr on a large number of files at once, you should check that you can successfully use the library on a small subset of your data.

In particular, you should check that:

- You can call [`open_virtual_dataset`][virtualizarr.open_virtual_dataset] on one of your files, which requires there to be a parser which can interpret that file format.
- After calling [`open_virtual_dataset`][virtualizarr.open_virtual_dataset] on a few files making up a representative subset of your data, you can concatenate them into one logical datacube without errors (see the [FAQ](faq.md#can-my-specific-data-be-virtualized) for possible reasons for errors at this stage).
- You can serialize those virtual references to some format (e.g. Kerchunk/Icechunk) and read the data back.
- The data you read back is exactly what you would have expected to get if you read the data from the original files.

If you don't do these checks now, you might find that you deploy a large amount of resources to run VirtualiZarr on many files, only to hit a problem that you could have found much earlier.
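
A minimal pre-flight check along these lines might look like the sketch below. The filenames, the `time` concatenation dimension, and the use of Kerchunk JSON are assumptions for illustration; the read-back step also assumes the kerchunk backend for xarray is installed.

```python
import virtualizarr as vz
import xarray as xr

# 1. Open a couple of representative files as virtual datasets
vds1 = vz.open_virtual_dataset("file_0001.nc")
vds2 = vz.open_virtual_dataset("file_0002.nc")

# 2. Check they combine into one logical datacube without errors
combined = xr.concat([vds1, vds2], dim="time", coords="minimal", compat="override")

# 3. Serialize the references (Kerchunk JSON here) and read the data back
combined.virtualize.to_kerchunk("refs.json", format="json")
roundtripped = xr.open_dataset("refs.json", engine="kerchunk")

# 4. Compare against the data loaded directly from the original files
expected = xr.open_mfdataset(
    ["file_0001.nc", "file_0002.nc"], combine="nested", concat_dim="time"
)
xr.testing.assert_allclose(roundtripped, expected)
```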

## Strategy

### The need for parallelization

VirtualiZarr is a tool designed for taking a large number of slow-to-access files (i.e. non-cloud-optimized data) and creating a way to make all subsequent accesses much faster (i.e. a cloud-optimized datacube).

Running [`open_virtual_dataset`][virtualizarr.open_virtual_dataset] on just one file can take a while (seconds to minutes), because for data in object storage, fetching just the metadata can be almost as time-consuming as fetching the actual data.
(For a full explanation of why, [see this article](https://earthmover.io/blog/fundamentals-what-is-cloud-optimized-scientific-data).)
In some cases it's easiest to load essentially the entire contents of the file in order to virtualize it.

Therefore we should expect that running VirtualiZarr on all our data files will take a long time - we pay this cost once up front so that our users do not have to pay it again on subsequent data accesses.

However, the [`open_virtual_dataset`][virtualizarr.open_virtual_dataset] calls for each file are completely independent, meaning that part of the computation is "embarrassingly parallelizable".

### Map-reduce

The problem of scaling VirtualiZarr is an example of a classic map-reduce problem, with two parts:

1. We first must apply the [`open_virtual_dataset`][virtualizarr.open_virtual_dataset] function over every file we want to virtualize. This is the map step, and can be parallelized.
2. Then we must take all the resultant virtual datasets (one per file), and combine them together into one final virtual dataset. This is the reduce step.

Finally, we write this single virtual dataset to some persistent format.
Since the data has already been reduced, this is a third step: the serialization step.

In our case the amount of data being reduced is fairly small - each virtual dataset is hopefully only a few kBs in memory, small enough to send over the network.
Even a million such virtual datasets together would only require a few GB of RAM in total to hold in memory at once.
This means that as long as all the virtual datasets can be sent back successfully, the reduce step can generally be performed in memory on a single small machine, such as a laptop.
This avoids the need for more complicated parallelization strategies such as a tree-reduce.

## Parallelization Approaches

There are two ways you can implement a map-reduce approach to virtualization in your code.
The first is to write it yourself, and the second is to use [`open_virtual_mfdataset`][virtualizarr.open_virtual_mfdataset].

### Manual parallelism

You are free to call [`open_virtual_dataset`][virtualizarr.open_virtual_dataset] on your various files however you like, using any method to apply the calls, including applying them in parallel.

For example you may want to parallelize using the [dask library](https://www.dask.org/), which you can do by wrapping each call using `dask.delayed` like this:

```python
import virtualizarr as vz
import dask

# Build one delayed task per file; nothing is executed yet
tasks = [dask.delayed(vz.open_virtual_dataset)(path) for path in filepaths]

# Unpacking with * makes dask.compute return a tuple of virtual datasets,
# rather than a 1-tuple containing the list of tasks
virtual_datasets = dask.compute(*tasks)
```

This returns a sequence of virtual `xr.Dataset` objects, which you can then combine:

```python
import xarray as xr

combined_vds = xr.combine_by_coords(virtual_datasets)
```

### The `parallel` kwarg to `open_virtual_mfdataset`

Alternatively, you can use [virtualizarr.open_virtual_mfdataset][]'s `parallel` keyword argument.

This argument allows you to conveniently choose from a range of pre-defined parallel execution frameworks, or even pass your own executor.

The resulting code takes only one function call to generate virtual references in parallel and combine them into one virtual dataset.

```python
combined_vds = vz.open_virtual_mfdataset(filepaths, parallel=<choice_of_executor>)
```

VirtualiZarr's [`open_virtual_mfdataset`][virtualizarr.open_virtual_mfdataset] is designed to mimic the API of xarray's [`open_mfdataset`][xarray.open_mfdataset], and so accepts all the same keyword argument options for combining.

## Executors

VirtualiZarr comes with a small selection of executors you can choose from when using [`open_virtual_mfdataset`][virtualizarr.open_virtual_mfdataset], provided under the `virtualizarr.parallel` namespace.

!!! note
    If you prefer to do manual parallelism but would like to use one of these executors, you can - just import the executor directly from the `virtualizarr.parallel` namespace and use its `.map` method.
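
As a sketch of that manual pattern (assuming these executors follow the standard [`concurrent.futures.Executor`][] construction and `.map` signature):

```python
from virtualizarr.parallel import SerialExecutor

# Use the executor's .map method directly (assumes the executor takes no
# constructor arguments, like the standard concurrent.futures executors)
executor = SerialExecutor()
virtual_datasets = list(executor.map(vz.open_virtual_dataset, filepaths))

combined_vds = xr.combine_by_coords(virtual_datasets)
```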

### Serial

The simplest executor is the [`SerialExecutor`][virtualizarr.parallel.SerialExecutor], which executes all the [`open_virtual_dataset`][virtualizarr.open_virtual_dataset] calls in serial, not in parallel.
It is the default executor.

### Threads or Processes

One way to parallelize creating virtual references from a single machine is across multiple threads or processes.
For this you can use the [`ThreadPoolExecutor`][concurrent.futures.ThreadPoolExecutor] or [`ProcessPoolExecutor`][concurrent.futures.ProcessPoolExecutor] class from the [`concurrent.futures`][] module in the Python standard library.
You simply pass the executor class directly via the `parallel` kwarg to [`open_virtual_mfdataset`][virtualizarr.open_virtual_mfdataset].

```python
from concurrent.futures import ThreadPoolExecutor

combined_vds = vz.open_virtual_mfdataset(filepaths, parallel=ThreadPoolExecutor)
```

This can work well when virtualizing files in remote object storage because it parallelizes the issuing of HTTP GET requests for each file.

### Dask Delayed

You can parallelize using `dask.delayed` automatically by passing `parallel='dask'`.
This will select the [`DaskDelayedExecutor`][virtualizarr.parallel.DaskDelayedExecutor].

```python
combined_vds = vz.open_virtual_mfdataset(filepaths, parallel='dask')
```

This uses the same approach that [`open_mfdataset`][xarray.open_mfdataset] does when `parallel=True` is passed to it.
Using `dask.delayed` allows for parallelizing with any type of dask cluster, including a managed [Coiled](http://www.coiled.io) cluster.
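
For example, the work can be directed to a distributed cluster just by creating a client before the call - a sketch, assuming the delayed tasks are computed via dask's default scheduler (which an active `distributed.Client` becomes):

```python
from dask.distributed import Client

# Connect to (or spin up) a cluster; an active client becomes dask's
# default scheduler, so the delayed tasks run on its workers
client = Client()  # or e.g. a client attached to a Coiled/remote cluster

combined_vds = vz.open_virtual_mfdataset(filepaths, parallel='dask')
```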

### Lithops

As the map step is embarrassingly parallel, it can be performed entirely using serverless functions.
This approach allows for virtualizing N files in roughly the same time it takes to virtualize 1 file (assuming you can provision N concurrent serverless functions), avoiding the need to configure, scale, and shut down a cluster.

You can parallelize VirtualiZarr serverlessly by using the [lithops](http://lithops-cloud.github.io) library.
Lithops can run on all the main cloud providers' serverless FaaS platforms.

To run on lithops you need to configure lithops for the relevant compute backend (e.g. AWS Lambda), build a runtime using Docker ([example Dockerfile](https://github.com/zarr-developers/VirtualiZarr/tree/develop/examples/oae/Dockerfile) with the required dependencies), and ensure the necessary cloud permissions to run are available.
Then you can use the [`LithopsEagerFunctionExecutor`][virtualizarr.parallel.LithopsEagerFunctionExecutor] simply via:

```python
combined_vds = vz.open_virtual_mfdataset(filepaths, parallel='lithops')
```

### Custom Executors

You can also define your own executor to run in some other way, for example on a different serverless platform such as [Modal](https://modal.com).

Your custom executor must inherit from the [`concurrent.futures.Executor`][] ABC, and must implement the `.map` method.

```python
from collections.abc import Callable, Iterable, Iterator
from concurrent.futures import Executor


class CustomExecutor(Executor):
    def map(
        self,
        fn: Callable,
        *iterables: Iterable,
    ) -> Iterator:
        # Submit fn over the iterables using your execution framework
        # and return an iterator over the results
        ...


combined_vds = vz.open_virtual_mfdataset(filepaths, parallel=CustomExecutor)
```
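
As an illustrative, hypothetical concrete implementation, the sketch below satisfies that interface by delegating to Python's built-in `map`, i.e. running everything serially - a useful starting point to verify the plumbing before swapping in a real execution backend:

```python
from collections.abc import Callable, Iterable, Iterator
from concurrent.futures import Executor


class EagerSerialExecutor(Executor):
    """Hypothetical executor that evaluates every task immediately, in order."""

    def map(self, fn: Callable, *iterables: Iterable) -> Iterator:
        # Evaluate eagerly so any exception surfaces here rather than later
        return iter([fn(*args) for args in zip(*iterables)])


combined_vds = vz.open_virtual_mfdataset(filepaths, parallel=EagerSerialExecutor)
```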

## Memory usage

For the virtualization to succeed you need to ensure that your available memory is not exceeded at any point.
There are 3 points at which this might happen:

1. While generating references
2. While combining references
3. While writing references

While generating references, each worker calling [`open_virtual_dataset`][virtualizarr.open_virtual_dataset] needs to avoid running out of memory.
This primarily depends on how the file is read - see the section on [caching](#caching-remote-files) below.

The combine step happens back on the machine on which [`open_virtual_mfdataset`][virtualizarr.open_virtual_mfdataset] was called, so while combining references that machine must have enough memory to hold all the virtual references at once.
You can find the in-memory size of the references for a single virtual dataset by calling the [`.nbytes`][virtualizarr.accessor.VirtualiZarrDatasetAccessor.nbytes] accessor method on it (not to be confused with the [`.nbytes`][xarray.Dataset.nbytes] xarray property, which returns the total size if all that data were actually loaded into memory).
Do this for one file, and multiply by the number of files you have to estimate the total memory required for this step.
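
For example, a rough estimate might look like this sketch (it assumes your files are similar enough that one file is representative, and that `.nbytes` is exposed as a property on the `.virtualize` accessor):

```python
vds = vz.open_virtual_dataset(filepaths[0])

# Size of the virtual references themselves, not of the underlying data
per_file_bytes = vds.virtualize.nbytes

estimated_combine_memory = per_file_bytes * len(filepaths)
print(f"~{estimated_combine_memory / 1e9:.2f} GB needed to hold all references")
```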

Writing the combined virtual references out requires converting them to a different references format, which may have different memory requirements.

## Scalability of references formats

After the map-reduce operation is complete, you will likely still want to persist the virtual references in some format.
Depending on the format, this step may also have scalability concerns.

### Kerchunk

The Kerchunk references specification supports 3 formats: an in-memory (nested) `dict`, JSON, and Parquet.

Both the in-memory Kerchunk `dict` and Kerchunk JSON formats are extremely inefficient ways to represent virtual references.
You may well find that a virtual dataset object that easily fits in memory suddenly uses up many times more memory or space on disk when converted to one of these formats.
Persisting large numbers of references in these formats is therefore not recommended.

The Kerchunk Parquet format is more scalable, but you may want to experiment with the `record_size` and `categorical_threshold` arguments to the virtualizarr [`.to_kerchunk`][virtualizarr.accessor.VirtualiZarrDatasetAccessor.to_kerchunk] accessor method.
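
For instance, a sketch of writing Parquet references - the argument values here are purely illustrative and should be tuned for your dataset:

```python
combined_vds.virtualize.to_kerchunk(
    "combined_refs.parquet",
    format="parquet",
    record_size=100_000,       # references per output record (illustrative value)
    categorical_threshold=10,  # controls dictionary encoding of repeated URLs (illustrative value)
)
```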

### Icechunk

[Icechunk](https://icechunk.io/) uses its own [open format](https://icechunk.io/en/latest/spec/) for persisting virtual references.

Icechunk's format stores the virtual references in dedicated binary files, and can use "manifest splitting", which together mean that it should be a scalable way to store large numbers of references.

!!! TODO
    Put numbers on this by testing at large scale once manifest splitting is actually released in Icechunk.

## Tips for success

Here are some assorted tips for successfully scaling VirtualiZarr.

### Caching remote files

When you call [`open_virtual_dataset`][virtualizarr.open_virtual_dataset] on a remote file, it needs to extract the metadata and store it in memory (the returned virtual dataset).

One way to do this is to issue HTTP range requests only for each piece of metadata.
This downloads the absolute minimum amount of data in total, but issues a lot of HTTP requests, each of which can take a long time to be returned from high-latency object storage.
This approach therefore uses the minimum amount of memory on the worker but takes more time.

!!! TODO
    Describe how this is the default with obstore

The other extreme is to download the entire file up front.
This downloads all the metadata by definition, but also all the actual data, which is likely millions of times more than you need for virtualization.
This approach usually takes a lot less time on the worker but requires the maximum amount of memory - using this approach on every file in the dataset entails downloading the entire dataset across all workers!

!!! TODO
    How to enable this by passing `cache=True` to obstore

There are various tricks one can use when fetching metadata, such as pre-fetching, minimum fetch sizes, or read-ahead caching strategies.
All of these approaches will put your memory requirements somewhere in between the two extremes described above, and they are not necessary for successful execution.

Generally, if you only have access to a limited amount of RAM you want to avoid caching so as not to run out of memory, whereas if you are able to scale out across many workers (e.g. serverlessly using lithops) your job will complete faster if you cache the files.
Caching a file onto a worker requires that the memory available on that worker is greater than the size of the file.

### Batching

You don't need to create and write virtual references for all your files in one go.

Creating virtual references for subsets of files in batches means the memory requirements for combining and serializing each batch are lower.

Batching also allows you to pick up where you left off.
This works particularly well with Icechunk, as you can durably commit each batch of references in a separate transaction.

```python
import icechunk as ic

# Open an existing Icechunk repository (construct the storage/config for
# your repo as appropriate)
repo = ic.Repository.open(<storage>)

for i, batch in enumerate(file_batches):
    session = repo.writable_session("main")

    combined_batch_vds = vz.open_virtual_mfdataset(batch)

    # Append each batch of virtual references along the concatenation dimension
    combined_batch_vds.virtualize.to_icechunk(session.store, append_dim=...)

    session.commit(f"wrote virtual references for batch {i}")
```

Notice this workflow could also be used for appending data as it becomes available, e.g. by replacing the for loop with a cron job.
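
The `file_batches` iterable used above can be built however you like; for example, with a small helper such as this (hypothetical; the batch size is arbitrary):

```python
def batched(items: list, batch_size: int):
    """Yield successive batches of at most batch_size items."""
    for start in range(0, len(items), batch_size):
        yield items[start : start + batch_size]


file_batches = list(batched(filepaths, 100))
```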

### Retries

Sometimes an [`open_virtual_dataset`][virtualizarr.open_virtual_dataset] call might fail for a transient reason, such as a failed HTTP response from a server.
In such a scenario, automatically retrying the failed call might be enough to obtain success and keep the computation proceeding.

If you are batching your computation then you could retry each loop iteration whenever any [`open_virtual_dataset`][virtualizarr.open_virtual_dataset] calls fail, but that's potentially very inefficient, because it would also retry the successful calls.

Instead, it is more efficient to use per-task retries at the executor level.
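
One way to approximate per-task retries today is to wrap [`open_virtual_dataset`][virtualizarr.open_virtual_dataset] in a small retry helper and map that wrapper yourself using the manual-parallelism approach above (a sketch; the retry count and delay are arbitrary):

```python
import time

import dask


def open_virtual_dataset_with_retries(path, max_attempts=3, base_delay=2.0):
    """Retry transient failures for a single file before giving up."""
    for attempt in range(1, max_attempts + 1):
        try:
            return vz.open_virtual_dataset(path)
        except Exception:
            if attempt == max_attempts:
                raise
            time.sleep(base_delay * attempt)  # simple linear backoff


tasks = [dask.delayed(open_virtual_dataset_with_retries)(path) for path in filepaths]
virtual_datasets = dask.compute(*tasks)
```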

!!! TODO
    We plan to add support for automatic retries to the Lithops and Dask executors (see GitHub PR #575).
