Load Input Data in Parallel with Dask and UXarray#

Overview#

This usage example demonstrates how to load unstructured input data with UXarray and Dask. It also showcases loading in parallel and chunking, along with their respective performance.

import uxarray as ux
from dask.distributed import Client, LocalCluster
import xarray as xr
import warnings

warnings.filterwarnings("ignore")

Data#

Data loaded in this notebook is simulated output from the Department of Energy (DOE) Energy Exascale Earth System Model (E3SM) version 2. The case is set up as an atmosphere-only (AMIP) simulation with present-day control forcing (F2010) at a 1-degree horizontal resolution (ne30pg2), with sea surface temperatures and sea ice set to the E3SMv2 defaults. The case is run for 6 years.

Chunking#

Chunks are small pieces of the array of interest; Dask divides the array into chunks small enough to fit in memory.

UXarray inherits this chunking feature from Dask, so the chunks of the data can be specified when loading.

Loading Data with Chunking#

The following example demonstrates loading one monthly output file from E3SM. By supplying the chunks argument, the loaded data is split as specified in the given dictionary. Typically, a data array or dataset is chunked when the data is loaded.

In the following example, the data is split along the atmospheric vertical-level dimension lev, as specified in the dictionary {"lev": 4}.

data_file_monthonly = "/glade/campaign/cisl/vast/uxarray/data/e3sm_keeling/ENSO_ctl_1std/unstructured/20231220.F2010.ENSO_ctl.lagreg.ne30pg2_EC30to60E2r2.keeling.eam.h0.0006-12.nc"
grid_file = (
    "/glade/campaign/cisl/vast/uxarray/data/e3sm_keeling/E3SM_grid/ne30pg2_grd.nc"
)
uxds_e3sm_mon = ux.open_dataset(grid_file, data_file_monthonly, chunks={"lev": 4})

Now look at one of the data arrays in the loaded dataset.

By accessing one of the variables, Q (specific humidity), we can look at the data array dimensions. The full data array has 1 point in time, 72 vertical levels, and a total of 21600 faces in the simulation grid, corresponding to the single monthly output we loaded and the info shown below: time: 1, lev: 72, n_face: 21600.

The chunk size is also shown in the second line, where each chunk contains 4 vertical levels instead of 72 (see chunksize=(1, 4, 21600)), confirming we have successfully chunked the data.

uxds_e3sm_mon.Q
<xarray.UxDataArray 'Q' (time: 1, lev: 72, n_face: 21600)>
dask.array<open_dataset-Q, shape=(1, 72, 21600), dtype=float32, chunksize=(1, 4, 21600), chunktype=numpy.ndarray>
Coordinates:
  * lev      (lev) float64 0.1238 0.1828 0.2699 0.3986 ... 986.2 993.8 998.5
  * time     (time) object 0007-01-01 00:00:00
Dimensions without coordinates: n_face
Attributes:
    mdims:         1
    units:         kg/kg
    long_name:     Specific humidity
    cell_methods:  time: mean
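
The chunk layout can also be inspected programmatically; a minimal sketch using the chunksizes property, which UxDataArray inherits from xarray:

# Mapping from each dimension name to the tuple of chunk sizes along it;
# here lev is split into 18 chunks of 4 levels each.
uxds_e3sm_mon.Q.chunksizes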

UXarray also supports the same feature when loading multiple files at once with open_mfdataset, using the same chunks argument as shown above.
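
For example, a minimal sketch that applies the same per-level chunking while opening the full set of monthly files (the glob pattern is the same one used later in this notebook):

# Open all monthly files at once and chunk along the vertical dimension,
# 4 levels per chunk, just as in the single-file example above.
data_files = "/glade/campaign/cisl/vast/uxarray/data/e3sm_keeling/ENSO_ctl_1std/unstructured/*.nc"
uxds_e3sm_mf_chunked = ux.open_mfdataset(grid_file, data_files, chunks={"lev": 4})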

Chunk size is important because it can significantly affect performance, depending on the algorithm and usage. There are multiple possible configurations for chunking, such as splitting by a uniform dimension size or a specific chunk shape. Chunking can be done not only with a specified configuration but also with Dask's automatic chunking feature.

Special values when specifying chunk size include: -1 for no chunking (the full array in a single chunk), None for no change to the original chunking (when rechunking), and auto for automatic chunking. Dask's default target chunk size is 128 MiB, which can be verified by calling dask.config.get('array.chunk-size').

More details on the possible configurations and guidelines on deciding chunk size can be found on Dask’s Page about Array Chunks.
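
A short sketch of these special values, reusing the monthly file opened above (the result variable names here are illustrative):

import dask

# Dask's target chunk size for automatic chunking (defaults to "128MiB").
print(dask.config.get("array.chunk-size"))

# "auto" lets Dask choose the chunk size along lev to match the target above.
uxds_auto = ux.open_dataset(grid_file, data_file_monthonly, chunks={"lev": "auto"})

# -1 keeps every variable as a single, unsplit chunk.
uxds_whole = ux.open_dataset(grid_file, data_file_monthonly, chunks=-1)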

Loading Grid with Chunking#

UXarray also supports chunking of unstructured grid files. Both chunking a single array and chunking the full grid with Dask are supported in UXarray.

In the following, the grid is first loaded and then chunked by calling .chunk(). This method is equivalent to chunking the data by defining the chunk size with the chunks argument when calling open_dataset().

Chunk a Single Array#

uxgrid = uxds_e3sm_mon.uxgrid
uxgrid.node_lon = uxgrid.node_lon.chunk()
uxgrid.node_lon.data
              Array                      Chunk
Bytes         169.74 kiB                 169.74 kiB
Shape         (21727,)                   (21727,)
Dask graph    1 chunks in 1 graph layer
Data type     float64 numpy.ndarray
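
The same pattern accepts an explicit chunk size; a sketch that splits the node longitudes into chunks of 10,000 nodes, assuming the n_node dimension name from UXarray's grid conventions:

# Rechunk the node longitudes into chunks of at most 10,000 nodes each.
uxgrid.node_lon = uxgrid.node_lon.chunk({"n_node": 10000})
uxgrid.node_lon.data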

Loading Data with the parallel argument#

Similar to Xarray, UXarray supports loading data in parallel. The performance gain may not be significant for the dataset chosen for this notebook, and the Dask client configuration should be customized for the data at hand. Loading data in parallel with Dask is helpful when the dataset of interest does not fit in memory and/or execution is to be distributed independently over several CPU cores or machines.

Loading 6-year monthly data in Serial#

We first demonstrate loading 72 monthly files in serial and directly into memory. This case does not take advantage of the lazy loading or parallel input/output provided by Dask.

%%time
# Regular Load
data_files = "/glade/campaign/cisl/vast/uxarray/data/e3sm_keeling/ENSO_ctl_1std/unstructured/*.nc"
uxds_e3sm_basic_load = ux.open_mfdataset(grid_file, data_files, parallel=False)
CPU times: user 19.4 s, sys: 321 ms, total: 19.7 s
Wall time: 25.4 s

By default, Dask treats each file as a single chunk. To modify chunk sizes, it is recommended to do so in the call to open_mfdataset; loading the data first and then rechunking results in inefficient use of memory, as sketched below.
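
A brief sketch contrasting the two approaches, with the second being the one to avoid:

# Recommended: request the desired chunking while opening the files.
uxds_chunked_on_open = ux.open_mfdataset(grid_file, data_files, chunks={"lev": 4})

# Discouraged: open first (one whole-file chunk per file) and rechunk afterward;
# the whole-file chunks are read before being split, which wastes memory.
uxds_rechunked = ux.open_mfdataset(grid_file, data_files).chunk({"lev": 4})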

Loading 6-year monthly data in Parallel#

The following code demonstrates setting up a local cluster with 128 workers (n_workers) and 2 threads per worker (threads_per_worker). Using a local cluster allows multi-process computation on your local machine (e.g., a laptop) and provides a diagnostic dashboard for monitoring performance.

More details on cluster configuration can be read here.

warnings.filterwarnings("ignore")
cluster = LocalCluster(n_workers=128, threads_per_worker=2)
client = Client(cluster)
client

Client: Client-2c1e1c47-8680-11ef-82b2-0040a687f6f7
Connection method: Cluster object
Cluster type: distributed.LocalCluster
Dashboard: https://jupyterhub.hpc.ucar.edu/stable/user/rtam/proxy/8787/status

The only difference between loading in serial and in parallel is setting the argument parallel to True or False. Setting parallel=True speeds up the loading step by using dask.delayed, which defers execution.

%%time
uxds_e3sm_parallel_load = ux.open_mfdataset(grid_file, data_files, parallel=True)
CPU times: user 13 s, sys: 1.68 s, total: 14.7 s
Wall time: 13.8 s
uxds_e3sm_parallel_load
<xarray.UxDataset>
Dimensions:              (time: 72, n_face: 21600, lev: 72, ilev: 73,
                          cosp_prs: 7, nbnd: 2, cosp_tau: 7, cosp_scol: 10,
                          cosp_ht: 40, cosp_sr: 15, cosp_sza: 5,
                          cosp_htmisr: 16, cosp_tau_modis: 7, cosp_reffice: 6,
                          cosp_reffliq: 6)
Coordinates: (12/13)
  * lev                  (lev) float64 0.1238 0.1828 0.2699 ... 993.8 998.5
  * ilev                 (ilev) float64 0.1 0.1477 0.218 ... 990.5 997.0 1e+03
  * cosp_prs             (cosp_prs) float64 9e+04 7.4e+04 ... 2.45e+04 9e+03
  * cosp_tau             (cosp_tau) float64 0.15 0.8 2.45 6.5 16.2 41.5 100.0
  * cosp_scol            (cosp_scol) int32 1 2 3 4 5 6 7 8 9 10
  * cosp_ht              (cosp_ht) float64 1.896e+04 1.848e+04 ... 720.0 240.0
    ...                   ...
  * cosp_sza             (cosp_sza) float64 0.0 20.0 40.0 60.0 80.0
  * cosp_htmisr          (cosp_htmisr) float64 0.0 250.0 ... 1.6e+04 1.8e+04
  * cosp_tau_modis       (cosp_tau_modis) float64 0.15 0.8 2.45 ... 41.5 100.0
  * cosp_reffice         (cosp_reffice) float64 5e-06 1.5e-05 ... 5e-05 7.5e-05
  * cosp_reffliq         (cosp_reffliq) float64 4e-06 9e-06 ... 1.75e-05 2.5e-05
  * time                 (time) object 0001-02-01 00:00:00 ... 0007-01-01 00:...
Dimensions without coordinates: n_face, nbnd
Data variables: (12/471)
    lat                  (time, n_face) float64 dask.array<chunksize=(1, 21600), meta=np.ndarray>
    lon                  (time, n_face) float64 dask.array<chunksize=(1, 21600), meta=np.ndarray>
    area                 (time, n_face) float64 dask.array<chunksize=(1, 21600), meta=np.ndarray>
    hyam                 (time, lev) float64 dask.array<chunksize=(1, 72), meta=np.ndarray>
    hybm                 (time, lev) float64 dask.array<chunksize=(1, 72), meta=np.ndarray>
    P0                   (time) float64 1e+05 1e+05 1e+05 ... 1e+05 1e+05 1e+05
    ...                   ...
    soa_c1DDF            (time, n_face) float32 dask.array<chunksize=(1, 21600), meta=np.ndarray>
    soa_c1SFWET          (time, n_face) float32 dask.array<chunksize=(1, 21600), meta=np.ndarray>
    soa_c2DDF            (time, n_face) float32 dask.array<chunksize=(1, 21600), meta=np.ndarray>
    soa_c2SFWET          (time, n_face) float32 dask.array<chunksize=(1, 21600), meta=np.ndarray>
    soa_c3DDF            (time, n_face) float32 dask.array<chunksize=(1, 21600), meta=np.ndarray>
    soa_c3SFWET          (time, n_face) float32 dask.array<chunksize=(1, 21600), meta=np.ndarray>