"""
Tools for importing and converting netCDF files generated from simulations run
using PySD.
"""
import itertools
import warnings
from csv import QUOTE_NONE
from pathlib import Path
from typing import Union, Optional

import xarray as xr
import pandas as pd


class NCFile:
"""
    Helper class to extract data from netCDF files.

    Parameters
    ----------
    filename: str or pathlib.Path
        Path to the netCDF file to process.
parallel: bool (optional)
When True, the Dataset is opened using chunks=-1 (see xarray
documentation for details) and DataArrays are processed in parallel
using dask delayed. Dask is not included as a requirement for pysd,
hence it must be installed separately. Setting parallel=True is
highly recommended when the Dataset contains large multidimensional
DataArrays.
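
    Examples
    --------
    A minimal usage sketch; "results.nc" stands for a hypothetical
    netCDF file produced by a PySD simulation run:

    >>> from pysd.tools.ncfiles import NCFile  # doctest: +SKIP
    >>> nc = NCFile("results.nc")  # doctest: +SKIP
    >>> df = nc.to_df()  # doctest: +SKIP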
"""
valid_export_file_types = [".csv", ".tab"]

    def __init__(self,
filename: Union[str, Path],
parallel: Optional[bool] = False) -> None:
self.ncfile = NCFile._validate_nc_path(filename)
self.parallel = parallel
self.ds = self.open_nc()

    def to_text_file(self,
outfile: Optional[Union[str, Path]] = "result.tab",
subset: Optional[list] = None,
time_in_row: Optional[bool] = False,
) -> pd.DataFrame:
"""
        Convert netCDF file contents into a comma separated or tab
        delimited file.

        Parameters
        ----------
        outfile: str or pathlib.Path (optional)
            Path to the output file. Must have either ".csv" or ".tab"
            suffix. Default is "result.tab".
        subset: list (optional)
            List of variables to export from the netCDF.
        time_in_row: bool (optional)
            Whether time increases along rows. Default is False.

        Returns
        -------
        df: pandas.DataFrame
            DataFrame with all columns specified in subset.
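
        Examples
        --------
        A minimal sketch, assuming a hypothetical "results.nc" produced
        by a PySD simulation run ("stock" is a made-up variable name):

        >>> NCFile("results.nc").to_text_file(  # doctest: +SKIP
        ...     "results.csv", subset=["stock"], time_in_row=True)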
"""
df = self.to_df(subset=subset)
NCFile.df_to_text_file(df, outfile, time_in_row)
return df

    def to_df(self,
subset: Optional[list] = None,
) -> pd.DataFrame:
"""
Wrapper to ds_to_df static method. Convert xarray.Dataset into a
pandas DataFrame.
Parameters
----------
subset: list (optional)
List of variables to export from the Dataset.
Returns
-------
df: pandas.DataFrame
Dataframe with all colums specified in subset.
"""
return NCFile.ds_to_df(self.ds, subset, self.parallel)

    def open_nc(self) -> xr.Dataset:
"""
        Loads the netCDF file into an xarray Dataset. It is essentially
        a wrapper around xr.open_dataset that simplifies the interface
        for the pysd use case (loading simulation results).

        Returns
        -------
        xarray.Dataset
"""
if self.parallel:
return xr.open_dataset(self.ncfile, engine="netcdf4", chunks=-1)
return xr.open_dataset(self.ncfile, engine="netcdf4")

    @staticmethod
def ds_to_df(ds: xr.Dataset,
subset: Optional[list] = None,
parallel: Optional[bool] = False,
index_dim: Optional[str] = "time"
) -> pd.DataFrame:
"""
        Convert xarray.Dataset into a pandas DataFrame.

        Parameters
        ----------
        ds: xarray.Dataset
            Dataset object.
        subset: list (optional)
            List of variables to export from the Dataset.
        parallel: bool (optional)
            When True, DataArrays are processed in parallel using dask
            delayed. Setting parallel=True is highly recommended when
            DataArrays are large and multidimensional.
        index_dim: str (optional)
            Name of the dimension to use as the index of the resulting
            DataFrame (usually "time").

        Returns
        -------
        df: pandas.DataFrame
            DataFrame with all columns specified in subset.
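
        Examples
        --------
        A minimal sketch on a synthetic Dataset (the variable name is
        made up for illustration):

        >>> import xarray as xr
        >>> ds = xr.Dataset(
        ...     {"stock": ("time", [1.0, 2.0, 3.0])},
        ...     coords={"time": [0.0, 1.0, 2.0]})
        >>> NCFile.ds_to_df(ds)  # doctest: +SKIP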
"""
subset = NCFile._validate_ds_subset(ds, subset)
if parallel:
processing_func = NCFile.da_to_dict_delayed
else:
processing_func = NCFile.da_to_dict
savedict = {}
for name in subset:
print(f"\nProcessing variable {name}.")
da = ds[name]
dims = da.dims
if not dims or dims == (index_dim,):
savedict.update({name: da.values.tolist()})
else:
savedict.update(processing_func(da, index_dim))
return NCFile.dict_to_df(savedict)

    @staticmethod
    def df_to_text_file(df: pd.DataFrame, outfile: Union[str, Path],
time_in_row: Optional[bool] = False
) -> None:
"""
        Store pandas DataFrame into a csv or tab file.

        Parameters
        ----------
        df: pandas.DataFrame
            DataFrame to save as csv or tab file.
        outfile: str or pathlib.Path
            Path of the output file.
        time_in_row: bool (optional)
            If True, time increases along rows; otherwise it increases
            along columns. Default is False.

        Returns
        -------
        None
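
        Examples
        --------
        A minimal sketch; "results.tab" is an arbitrary output path:

        >>> import pandas as pd
        >>> df = pd.DataFrame({"stock": [1.0, 2.0]}, index=[0.0, 1.0])
        >>> NCFile.df_to_text_file(df, "results.tab")  # doctest: +SKIP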
"""
outfile = Path(outfile)
out_fmt = outfile.suffix
        if out_fmt not in NCFile.valid_export_file_types:
            raise TypeError(f"Invalid output file format {out_fmt}\n"
                            "Supported formats are csv and tab.")
        outfile.parent.mkdir(parents=True, exist_ok=True)
        if not isinstance(time_in_row, bool):
            raise ValueError("time_in_row argument takes boolean values.")
# process output file path
if outfile.suffix == ".csv":
sep = ","
df.columns = [col.replace(",", ";") for col in df.columns]
else:
sep = "\t"
if time_in_row:
df = df.transpose()
        # QUOTE_NONE used to print the csv/tab files as Vensim does with
        # special characters, e.g.: "my-var"[Dimension]
df.to_csv(outfile, sep=sep, index_label="Time", quoting=QUOTE_NONE)
print(f"Data saved in '{outfile}'")

    @staticmethod
def da_to_dict(da: xr.DataArray, index_dim: str) -> dict:
"""
        Splits a DataArray into a dictionary, with keys equal to the
        name of the variable plus all combinations of the cartesian
        product of coordinates within brackets, and values equal to the
        data corresponding to those coordinates along the index_dim
        dimension.

        Parameters
        ----------
        da: xarray.DataArray
            DataArray to convert into a dictionary.
        index_dim: str
            The coordinates of this dimension will not be fixed during
            indexing of the DataArray (i.e. the indexed data will be a
            scalar or an array along this dimension).
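
        Examples
        --------
        A minimal sketch on a synthetic two-dimensional DataArray:

        >>> import xarray as xr
        >>> da = xr.DataArray(
        ...     [[1.0, 2.0], [3.0, 4.0]],
        ...     coords={"time": [0.0, 1.0], "dim": ["A", "B"]},
        ...     name="var")
        >>> NCFile.da_to_dict(da, "time")  # doctest: +SKIP
        {'var[A]': array([1., 3.]), 'var[B]': array([2., 4.])}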
"""
dims, coords = NCFile._get_da_dims_coords(da, index_dim)
indexes = []
# TODO: try to achieve the same as itertools.product with
# xr.DataArray.stack
for coords_prod in itertools.product(*coords):
indexes.append(
NCFile._index_da_by_coord_labels(da, dims, coords_prod)
)
return dict(indexes)

    @staticmethod
def da_to_dict_delayed(da: xr.DataArray, index_dim: str) -> dict:
"""
        Same as da_to_dict, but using dask delayed and compute.
        This function runs much faster when da is a dask array
        (chunked). Note that dask is not a pysd requirement, so it must
        be installed separately.

        Parameters
        ----------
        da: xarray.DataArray
            DataArray to convert into a dictionary.
        index_dim: str
            The coordinates of this dimension will not be fixed during
            indexing (the indexed data will be an array along this
            dimension).
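
        Examples
        --------
        A minimal sketch, assuming dask is installed and "results.nc"
        is a hypothetical chunked simulation output with a made-up
        "stock" variable:

        >>> import xarray as xr
        >>> da = xr.open_dataset(
        ...     "results.nc", chunks=-1)["stock"]  # doctest: +SKIP
        >>> NCFile.da_to_dict_delayed(da, "time")  # doctest: +SKIP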
"""
        try:
            from dask import delayed, compute
            from dask.diagnostics import ProgressBar
        except ImportError as err:
            raise ModuleNotFoundError(
                "dask is required to process DataArrays in parallel.\n"
                "Please install it separately.") from err
dims, coords = NCFile._get_da_dims_coords(da, index_dim)
# loading data into memory for faster indexing
da.load()
indexes = []
# TODO: try to achieve the same as itertools.product with
# xr.DataArray.stack
for coords_prod in itertools.product(*coords):
x = delayed(
NCFile._index_da_by_coord_labels
)(da, dims, coords_prod)
indexes.append(x)
with ProgressBar():
res = compute(*indexes)
return dict(res)

    @staticmethod
def dict_to_df(d: dict) -> pd.DataFrame:
"""
        Convert a dict to a pandas DataFrame indexed by time.

        Parameters
        ----------
        d: dict
            Dictionary to convert to pandas DataFrame. It must include
            a "time" key.
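
        Examples
        --------
        A minimal sketch with made-up values:

        >>> NCFile.dict_to_df(
        ...     {"time": [0.0, 1.0], "stock": [1.0, 2.0]})  # doctest: +SKIP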
"""
if "time" not in d:
raise KeyError("Missing time key.")
return pd.DataFrame(d).set_index('time')

    @staticmethod
def _validate_nc_path(nc_path: Union[str, Path]) -> Path:
"""
        Checks the validity of the nc_path passed by the user. We run
        these checks because xarray exceptions are not very explicit.
"""
if not isinstance(nc_path, (str, Path)):
raise TypeError(f"Invalid file path type: {type(nc_path)}.\n"
"Please provide string or pathlib Path")
nc_path = Path(nc_path)
if not nc_path.is_file():
raise FileNotFoundError(f"{nc_path} could not be found.")
        if nc_path.suffix != ".nc":
            raise ValueError("Input file must have .nc extension.")
return nc_path

    @staticmethod
def _validate_ds_subset(ds: xr.Dataset, subset: list) -> list:
"""
        If subset=None, it returns a list with all variable names in
        the ds. Variable names in subset that are present in ds are
        returned; a warning is issued for each missing name.

        Parameters
        ----------
        ds: xarray.Dataset
            Dataset from which variables are selected.
        subset: list
            Subset of variable names in the xarray Dataset.
"""
# use all variable names
if not subset:
            new_subset = list(ds.data_vars)
else:
if not isinstance(subset, list) or \
not all(map(lambda x: isinstance(x, str), subset)):
raise TypeError("Subset argument must be a list of strings.")
new_subset = []
for name in subset:
if name in ds.data_vars.keys():
new_subset.append(name)
else:
warnings.warn(f"{name} not in Dataset.")
if not new_subset:
raise ValueError("None of the elements of the subset are "
"present in the Dataset.")
# adding time in the final subset
if "time" not in new_subset:
new_subset.append("time")
return new_subset

    @staticmethod
def _index_da_by_coord_labels(da: xr.DataArray, dims: list,
coords: tuple) -> tuple:
"""
        Generates a variable name by combining the actual name of the
        variable with the coordinate names between brackets, separated
        by commas, and indexes the DataArray by the coordinate names
        given in the coords argument.

        Parameters
        ----------
        da: xarray.DataArray
            DataArray to be indexed.
        dims: list
            Dimensions along which the DataArray will be indexed.
        coords: tuple
            Coordinate names for each of the dimensions in the dims
            list.

        Returns
        -------
        A tuple consisting of the string
        var_name[dim_1_coord_j, ..., dim_n_coord_k] as the first
        element, and the indexed data as the second element.
"""
name = da.name
idx = dict(zip(dims, coords))
        subs = "[" + ",".join(map(str, coords)) + "]"
return name + subs, da.loc[idx].values

    @staticmethod
def _get_da_dims_coords(da: xr.DataArray, exclude_dim: str) -> tuple:
"""
        Returns the dimension names and coordinate labels in two
        separate lists. The dimension whose name matches exclude_dim is
        omitted from the returned dims and coords.

        Parameters
        ----------
        da: xarray.DataArray
            DataArray from which dimensions and coordinates are taken.
        exclude_dim: str
            Name of the dimension to exclude.

        Returns
-------
dims: list
List containing the names of the DataArray dimensions.
coords: list
List of lists of coordinates for each dimension.
"""
dims, coords = [], []
for dim in da.dims:
if dim != exclude_dim:
dims.append(dim)
coords.append(da.coords[dim].values)
return dims, coords