Commands¶
- Shift+Enter: Run and move to the next cell.
- Ctrl+Enter: Run the cell in place.
- Alt+Enter: Run and insert a new cell below.
- To delete a cell: Press Esc to enter command mode, then press Cmd+M, then D.
- To insert a new cell below: Press Esc, then B.
- To insert a new cell above: Press Esc, then A.
Topics¶
Part 2:
- Introduction to xarray:
  - Data exploration
  - Indexing
- Visualizing data with xarray:
  - Static plots
  - Interactive plotting
- Computation with xarray:
  - Built-in functions
  - Custom functions
- End with an exercise
Second Half:
- Scaling computations with Dask:
  - Handling out-of-memory (large) datasets
  - Parallel programming
- Accessing cloud-based data catalogs:
  - Searching Earth Engine and Planetary Computer data (exploring a STAC catalog)
  - Integrating cloud data into xarray workflows
If time allows:
- Running Dask on HPC/HTC (requires CHTC accounts):
  - Setting up Dask on UW HPC using VS Code
Geospatial Data Formats for Climate and Satellite Data¶
Key Formats¶
Common geospatial file formats used for climate and satellite data:
NetCDF (Network Common Data Form; our focus today)¶
Purpose:
- Storing and sharing multidimensional scientific array-based data with comprehensive metadata.
Key Features:
- Self-describing with rich metadata for efficient algorithm development.
- Scalable for efficient access to subsets of large datasets, even remotely.
- Appendable for data addition without structure redefinition.
- Sharable with support for multi-user access.
Common Applications:
- Gridded climate data
- Satellite images
- Earth system model outputs
CF Conventions (Climate and Forecast):
- Standardized metadata for self-description and interoperability.
- Ensures variables have associated descriptions, physical units, and spatiotemporal coordinates.
- Enables software tools to work effectively with minimal user intervention.
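As a quick illustration, here is a minimal sketch of inspecting CF metadata with xarray (it assumes the air temperature file used later in this notebook, with its variable named "air"):
import xarray as xr
# Open a NetCDF file and inspect its CF metadata
ds = xr.open_dataset("./data/air.mon.mean.nc")
print(ds.attrs)         # global attributes (title, history, Conventions, ...)
print(ds["air"].attrs)  # variable attributes (units, long_name, ...)
print(ds["air"].dims)   # spatiotemporal coordinates, e.g. ('time', 'lat', 'lon')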
HDF5 (Hierarchical Data Format version 5)¶
Purpose:
- Storing complex heterogeneous datasets.
Key Features:
- Hierarchical data organization with groups and datasets.
- Self-describing with metadata within the file.
- Multiple data type support (integers, floats, strings, complex numbers).
- Chunking and compression for efficient storage and access.
- Large file and dataset support (terabytes to petabytes).
- Parallel processing capabilities.
Common Applications:
- Many satellite products (e.g., MODIS) are distributed in HDF5.
- Earth system model outputs.
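For a flavor of the hierarchical layout, here is a minimal sketch using the h5py library (the file name example.h5 is a placeholder, not a file from this tutorial):
import h5py
# Walk the group/dataset hierarchy of an HDF5 file (example.h5 is hypothetical)
with h5py.File("example.h5", "r") as f:
    f.visit(print)  # print the name of every group and dataset
    # arr = f["some_group/some_dataset"][:]  # read one dataset into a NumPy array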
Zarr¶
Purpose:
- Efficient parallel processing and cloud storage of large datasets.
Key Features:
- Stores data in chunks across multiple files, making reading and writing large datasets faster.
- Optimized for high-performance computing (HPC), high-throughput computing (HTC), and cloud environments.
Common Applications:
- Large-scale scientific datasets
- Cloud-based data analysis
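As a sketch of how this looks with xarray (the input file and chunk size are illustrative, and writing Zarr requires the zarr package):
import xarray as xr
# Write a chunked dataset to a Zarr store; each chunk becomes its own file on disk
ds = xr.open_dataset("./data/air.mon.mean.nc")
ds.chunk({"time": 100}).to_zarr("air_temperature.zarr", mode="w")
# Reading it back gives a Dask-backed dataset with the same chunked layout
ds_zarr = xr.open_zarr("air_temperature.zarr")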
STAC (SpatioTemporal Asset Catalog)¶
Purpose:
- Simplify search and discovery of geospatial data across different providers and platforms.
- Enable interoperability between various tools and applications working with geospatial data.
- Facilitate easier cloud storage and access for large datasets.
Key Features:
- Uses JSON files to describe assets, providing information like location, time, data type, metadata, and availability.
- Flexible and extensible, allowing customization for specific data types and needs.
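To give a flavor of the format, here is a minimal, hypothetical STAC Item written as a Python dict (real items carry many more fields and extensions):
stac_item = {
    "type": "Feature",
    "stac_version": "1.0.0",
    "id": "example-scene-2021-01-01",  # hypothetical asset ID
    "bbox": [-122.0, 47.0, -121.9, 47.1],  # spatial extent
    "properties": {"datetime": "2021-01-01T00:00:00Z"},  # temporal information
    "geometry": {
        "type": "Polygon",
        "coordinates": [
            [[-122.0, 47.0], [-121.9, 47.0], [-121.9, 47.1], [-122.0, 47.1], [-122.0, 47.0]]
        ],
    },
    "assets": {
        "nir08": {"href": "https://example.com/scene-nir08.tif"}  # link to the actual data
    },
    "links": [],
}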
Large Data and Speed: The Next Challenge¶
We've covered the basics of climate data processing in Python. Now, let's tackle the real challenges:
- Out-of-memory data: What happens when datasets are too large to fit in memory?
- Speeding up computations: How can we analyze data faster, especially with large datasets?
In the next session, we'll introduce Dask, a powerful tool for handling these issues.
Introducing Dask¶
What is Dask?
Dask is an open-source library designed to provide parallelism to the existing Python stack. It integrates with Python libraries like NumPy, Pandas, Xarray, and scikit-learn to enable parallel execution across multiple cores, processors, and machines, without requiring you to learn new libraries or languages.
Scaling with Dask¶
import xarray as xr
import numpy as np
import matplotlib.pyplot as plt
import dask
import dask.array
from dask.distributed import Client
# Set the number of worker processes (adjust to your machine, e.g. via multiprocessing.cpu_count())
n_cores = 2
# Specify the number of threads per worker
threads_per_worker = 2  # adjust this based on your workload
client = Client(n_workers=n_cores, threads_per_worker=threads_per_worker)
client
Let's start with a simple example of a 2D array of numbers:
shape = (1000, 4000)
ones_np = np.ones(shape)
print("Size:", ones_np.nbytes / 1e6, "MB")
We can create an equivalent Dask array using dask.array.ones:
ones_da = dask.array.ones(shape)
ones_da
# Note: 1 MiB = 1,048,576 bytes
In this example we have not "chunked" the data yet; there is only one chunk, with the same shape as our array.
Now let's chunk the data into 1000-by-1000 blocks:
chunk_size = (1000, 1000)
ones_da = dask.array.ones(shape, chunks=chunk_size)
ones_da
Pay close attention to the chunk sizes and the memory usage.
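You can also inspect the chunking programmatically, for example:
# Inspect how the array is partitioned
print(ones_da.chunks)       # chunk sizes along each dimension
print(ones_da.chunksize)    # shape of a single chunk
print(ones_da.npartitions)  # total number of chunks (4 here)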
Dask graph¶
A Dask graph, often referred to as a Dask task graph or computation graph, represents the logical structure of a computation. In this graph, each node represents a task or operation, and the edges represent dependencies between these tasks.
Note that Dask does not execute any computations until you explicitly request them. At this stage, Dask has only built the "workflow" graph of the tasks you've asked it to perform.
You can use the dask.visualize() function (or an array's .visualize() method) to visualize the Dask graph.
# Uncomment if you get error related to graphviz when plotting
#!pip uninstall graphviz
# conda install -c conda-forge graphviz
# conda install -c conda-forge python-graphviz
# Visualize the dask graph
dask.visualize(ones_da)
Similar to NumPy, we can perform arithmetic operations on Dask arrays:
# Dask is lazy: it builds the task graph but does not execute it until we explicitly ask for it through compute
ones_mean = ones_da.mean()
ones_mean
One key distinction between Dask and NumPy lies in Dask's "lazy" evaluation approach, where computations are deferred until explicitly requested.
The provided code represents merely the computation graph, outlining the sequence of operations, rather than executing the computations themselves:
dask.visualize(ones_da.mean())
To compute the mean, we can use the .compute() method to trigger the computation and get the result as a NumPy array.
# Trigger the computation and return the result
ones_mean.compute()
Parallelize the calculation¶
Now that we know about chunking, what about parallel computing?
The following code is designed to take precisely 4 seconds when executed sequentially:
import time
def inc(x):
# Takes two seconds to compute
time.sleep(2)
return x + 1
def dec(y):
# Takes one second to compute
time.sleep(1)
return y - 1
def add(x, y):
# Takes one second to compute
time.sleep(1)
return x + y
%%time
x = inc(1)
y = dec(2)
z = add(x, y)
With dask.delayed we can make the above computation lazy, meaning we only build the computation graph without actually performing the computation.
inc = dask.delayed(inc)
dec = dask.delayed(dec)
add = dask.delayed(add)
%%time
x = inc(1)
y = dec(2)
z = add(x, y)
Note how fast the cell runs! This is because the dask.delayed calls are building up a task graph, but not actually executing it.
Let's visualize the computation graph:
# Visualize the dask graph for calculation of z
z.visualize(rankdir="LR")
What is your guess for the time it will take to compute z?
%%time
z.compute()
Dask-Xarray for Large-Scale Gridded Geospatial Data Analysis¶
You've already explored the individual strengths of Xarray and Dask: the former provides a familiar and intuitive interface for labeled arrays, and the latter unlocks parallel processing on massive datasets. Now, let's see what happens when they're combined.
Key Advantages:
- Parallel Processing: Dask distributes data and computations across multiple cores or worker processes, enabling significantly faster analysis on large datasets. Operations like aggregation, reshaping, and arithmetic leverage distributed computing power, drastically reducing analysis times.
- Lazy Evaluation: Dask-Xarray adopts a "lazy" approach, deferring actual computations until absolutely necessary. This optimizes resource utilization by focusing only on the required parts of complex workflows, further boosting efficiency.
- Reproducibility: computations defined on metadata rather than on the data itself are more transferable, especially when the metadata is consistent across datasets (e.g., STAC).
- Streaming: For datasets exceeding disk capacity, Dask-Xarray employs streaming to process data in chunks, eliminating the need to load everything at once.
- Familiar API: Xarray's intuitive API, reminiscent of NumPy and pandas, ensures a consistent experience regardless of data location: in-memory arrays or out-of-memory Dask arrays. This minimizes the learning curve and simplifies code adaptation.
- Flexibility: Dask-Xarray adapts to your hardware and software environment, whether you're working on a single workstation, multi-core machine, cluster, or cloud platform.
OK! Now we know how to use Dask to scale our computations. Let's go back to our air temperature dataset and see how we can use Dask there.
If we just open the Dataset/DataArray without chunks, it will be loaded into memory:
ds = xr.open_dataset("./data/air.mon.mean.nc")
da = xr.open_dataarray("./data/air.mon.mean.nc")
da
If the data is huge, this approach fails! So we need to use the chunks argument (remember chunking from Dask) to load the data in chunks:
# Open the air temperature DataArray with Dask enabled
da = xr.open_dataarray(
"./data/air.mon.mean.nc",
chunks={
"time": 100,
"lat": "auto",
"lon": "auto",
},
)
da
The data has been loaded as a Dask array, with the chunks argument determining how the data is partitioned into manageable chunks.
This implies that the entire dataset is not loaded into memory; instead, a computational graph is constructed.
This graph consists of tasks that are executed on-demand, following a "lazy" evaluation strategy similar to the example demonstrated at the beginning of the session.
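A quick way to confirm this is to inspect the chunk layout without triggering any computation, for example:
# No data is read here; we only inspect the metadata of the lazy array
print(da.chunks)              # chunk sizes per dimension
print(da.nbytes / 1e6, "MB")  # total size if it were fully loaded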
There are two ways to extract the data from the DataArray:
- .data, which returns a Dask array.
- .to_numpy() and .values, which call compute() under the hood and return a NumPy array.
Use .to_numpy() instead of .values, as it returns a more generalizable array (e.g., sparse arrays).
da.data
Note that the data is a Dask array, and it is lazy!
To get the NumPy array, use the .to_numpy() method:
data = da.to_numpy()
print(type(data))
print(data)
P.S. Don't worry about the NaNs; this is land temperature data, so NaNs are expected over the ocean and other masked areas.
Lazy computation¶
As expected, computation on an xarray object opened with chunks is lazy.
It means that the actual computation is deferred until it is explicitly needed.
Let's do a simple example calculation with Dask-Xarray, combining the mean and standard deviation:
mean = da.mean()
std = da.std()
mean_std = mean + std
mean_std
Note that it runs fast since the computation is lazy: no actual computation has been done yet.
Visualize the Dask graph for this calculation:
dask.visualize(mean_std, rankdir="LR")
There are various ways to perform the computation:
- .compute(): returns the result as an in-memory xarray object. Recommended for smaller datasets, where outputs are small enough to fit into memory.
- .load(): similar to .compute(), but loads the result into the existing object in place.
- .persist(): performs the computation but keeps the results in the distributed cluster's memory. Most common for out-of-memory situations, but it requires access to a cluster/cloud.
mean_std_calculated = mean_std.compute()
Now Dask executes the graph and calculates the result.
mean_std_calculated
mean_std.load()
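For completeness, a minimal sketch of .persist(): it triggers the computation but keeps the result as a Dask-backed object (on a distributed cluster, the result stays in the workers' memory):
# The result is computed, but the object remains Dask-backed
mean_std_persisted = mean_std.persist()
mean_std_persisted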
Calculate the trend but with Dask¶
da_annual = da.resample(time="Y").mean()
# apply_ufunc with input_core_dims=[["time"]] requires 'time' to be a single chunk
da_annual = da_annual.chunk({"lat": 100, "lon": 100, "time": -1})
da_annual
# Define a function that calculates the linear trend using numpy polyfit
def linear_trend(y):
# y is the variable of interest
# Check if there is any NaN in y
if np.any(np.isnan(y)):
# Return NaN as slope
return np.nan
else:
# Create an array of indices as x
x = np.arange(len(y))
# Return only the slope of the linear fit
return np.polyfit(x, y, 1)[0]
trend = xr.apply_ufunc(
    linear_trend,
    da_annual.variable,
    input_core_dims=[["time"]],  # apply the function along the time dimension
    output_core_dims=[[]],       # the output is a scalar per (lat, lon) point
    vectorize=True,              # loop over the remaining (lat, lon) dimensions
    dask="parallelized",         # let Dask run the chunks in parallel
    output_dtypes=[float],
)
trend_dataarray = xr.DataArray(
trend, dims=["lat", "lon"], coords={"lat": ds.lat, "lon": ds.lon}
)
trend_dataarray.plot()
from dask import optimize
(optimized,) = optimize(trend.data)
optimized.visualize()
When using Dask to parallelize computations, there is overhead associated with parallelization, chunking, and data movement between workers. Dask is designed to handle larger-than-memory datasets efficiently by breaking them into smaller chunks and processing them in parallel; for smaller datasets that fit comfortably into memory, however, the overhead of parallelization can outweigh its benefits, resulting in longer execution times than a non-parallelized approach.
In this specific case, with a small array (77 time steps, 360 latitudes, and 720 longitudes), the overhead introduced by Dask's parallelization may dominate the computation time.
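One way to see this is to time the Dask-parallelized trend against the same computation on the fully loaded array (a rough sketch; the exact timings depend on your machine and chunk sizes):
import time
# Dask-parallelized version
t0 = time.time()
trend.compute()
print("parallelized:", round(time.time() - t0, 2), "s")
# Eager version: the same apply_ufunc on a NumPy-backed array, with no Dask overhead
da_annual_loaded = da_annual.compute()
t0 = time.time()
xr.apply_ufunc(
    linear_trend,
    da_annual_loaded.variable,
    input_core_dims=[["time"]],
    output_core_dims=[[]],
    vectorize=True,
)
print("eager:", round(time.time() - t0, 2), "s")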
Real out-of-memory example:¶
Do not try to visualize this one! It's too big, and visualization will fail.
bigshape = (200000, 40000)
chunk_shape = (1000, 1000) # define your chunk shape
big_ones = dask.array.ones(bigshape, chunks=chunk_shape)
print("Size is:", big_ones.nbytes / 1e9, "GB! To big to fit in memory")
big_ones
Let's make a fake xarray object with fake lat and lon coordinates (in the real world, this big data could be your geospatial satellite/climate data).
big_ones_xr = xr.DataArray(
big_ones,
dims=["lat", "lon"],
coords={"lat": np.arange(bigshape[0]), "lon": np.arange(bigshape[1])},
name="big_ones",
attrs={"units": "m"},
)
big_ones_xr
Do big calculations with Dask
big_mean = big_ones.mean() + big_ones.std()
from dask.diagnostics import ProgressBar
# Show a progress bar while the computation inside the block runs
with ProgressBar():
result = big_mean.compute()
result
Practice:¶
- Try to chunk the data along the time dimension and calculate the linear trend using the apply_ufunc function. What do you observe?
- Also experiment with different lat and lon chunk sizes and examine the difference in execution time.
Close the Dask client:
client.close()
Bonus section: Microsoft Planetary Computer uses Xarray and Dask for geospatial data analysis¶
Downloading Data from Microsoft Planetary Computer¶
The Microsoft Planetary Computer is a powerful cloud platform specifically designed for researchers in climate and environmental fields. It provides unprecedented access to a vast repository of data, including:
- Climate observations
- Satellite imagery
- Model outputs
The Planetary Computer leverages the STAC API (SpatioTemporal Asset Catalog), a standardized interface that makes it easy to discover, search, and access datasets based on specific criteria like location, time period, and data type.
Here's a quick overview of the process:
1. Explore the Data Catalog: Browse the Planetary Computer's extensive collection of datasets through the user-friendly Data Catalog. Filter by parameters like spatial coverage, temporal resolution, and data type to find the resources relevant to your research.
2. Utilize the STAC API: Interact with the data programmatically using the STAC API. This protocol enables flexible querying, subsetting, and retrieval of specific data segments you need for your analysis.
3. Download or Process Data: Download the retrieved data directly to your local machine or leverage cloud-based processing environments within the Planetary Computer platform.
Load the libraries
!pip install pystac-client planetary-computer odc.stac
import pystac_client
import planetary_computer
import odc.stac
import matplotlib.pyplot as plt
from pystac.extensions.eo import EOExtension as eo
To access the data, we'll create a pystac_client.Client. The modifier argument signs the returned items so that we can download the data assets.
catalog = pystac_client.Client.open(
"https://planetarycomputer.microsoft.com/api/stac/v1",
modifier=planetary_computer.sign_inplace,
)
Define area of interest
bbox_of_interest = [-122.001, 47, -122, 47.001]
time_of_interest = "2021-01-01/2021-12-31"
# area_of_interest = {"type": "Point", "coordinates": [-122.2751, 47.5469]}
Search the catalog for Landsat Collection 2 Level-2 data for the year 2021, keeping only scenes with less than 10% cloud cover:
search = catalog.search(
collections=["landsat-c2-l2"],
# intersects=area_of_interest,
bbox=bbox_of_interest,
datetime=time_of_interest,
query={"eo:cloud_cover": {"lt": 10}},
)
items = search.item_collection()
print(f"Returned {len(items)} Items")
Inspect one item to see what it looks like
items = list(items)
items[0]
Download the data; each item is one time step:
import xarray as xr
bands_of_interest = ["nir08"]
data_list = []
for item in items:
data = odc.stac.stac_load(
[item], bands=bands_of_interest, bbox=bbox_of_interest
).isel(time=0)
data_list.append(data)
combined_data = xr.concat(data_list, dim="item")
combined_data
Do some plotting
combined_data.nir08.mean(["x", "y"]).plot()
combined_data.nir08.mean("item").plot()
data["nir08"].plot()