"""
Top-level routines for opening, concatenating, aligning, and splitting arrays.
Operates on :class:`DataArray` and :class:`DataCollection` objects; includes
multi-file helpers (``open_mfdataarray``, ``open_mfdatacollection``).
"""
import os
import re
import warnings
from collections import defaultdict
from concurrent.futures import as_completed
from glob import glob
from itertools import pairwise
from pathlib import Path
import numpy as np
import pandas as pd
import plotly.express as px
import xarray as xr
from loky import get_reusable_executor
from tqdm import tqdm
from ..coordinates.core import Coordinates, get_sampling_interval
from ..parallel import get_workers_count
from ..virtual import VirtualSource, VirtualStack
from .dataarray import DataArray
from .datacollection import DataCollection, DataMapping, DataSequence
[docs]
def open(
    paths,
    dim="first",
    tolerance=None,
    squeeze=None,
    engine=None,
    parallel=None,
    verbose=False,
    **kwargs,
):
    """
    Open one or several files as a data array or collection.

    Automatically dispatches to the appropriate reader based on the shape of `paths`:

    - **Single file** (plain path string): tries to open as a data collection first,
      falls back to a data array if the file does not contain a data collection.
    - **Multi-file** (wildcarded string with ``*``, ``?``, or ``[…]``, or a list of
      paths): tries to open and combine as a multi-file data collection first, falls
      back to a multi-file data array if the files are not data collections.
    - **Tree-like** (string containing ``{field}`` placeholders):
      opens a directory tree as a nested data collection using
      :func:`open_mfdatatree`.

    Parameters
    ----------
    paths : str or list of str
        The path(s) to open. Can be:

        - A plain file path (single file).
        - A shell-style wildcard string (``*``, ``?``, ``[…]``) matching multiple
          files.
        - A list of explicit file paths.
        - A tree descriptor string containing ``{field}`` (dict level) and
          ``[field]`` (list level) placeholders.
    dim : str, optional
        The dimension along which multiple files are concatenated. Ignored when
        opening a single file. Default is ``"first"``.
    tolerance : float or timedelta64, optional
        Maximum gap or overlap allowed between consecutive files to still be
        considered continuous. For time coordinates, numeric values are interpreted
        as seconds. Ignored when opening a single file. Default is zero tolerance.
    squeeze : bool or None, optional
        Whether to return a DataArray instead of a DataCollection when the result
        contains only one data array. When ``None`` (default), the behaviour depends
        on the dispatch path: ``True`` for multi-file data arrays, ``False``
        otherwise. Ignored when opening a single file.
    engine : str or callable, optional
        The file format engine to use, or a custom read callable. When ``None``
        (default), the xdas NetCDF format is assumed. Providing an engine skips the
        automatic DataCollection detection.
    parallel: bool or int, optional
        Whether to use multiprocessing to fetch file metadata. If False or 1,
        runs in single-process mode. If an integer, use that many processes.
        If True, use as many processes as available cores. If None, use the
        global xdas configuration. Default to None.
    verbose : bool, optional
        Whether to display a progress bar while reading metadata. Ignored when
        opening a single file. Default is ``False``.
    **kwargs
        Additional keyword arguments forwarded to the data array / data tree
        readers. They are not forwarded to the DataCollection readers.

    Returns
    -------
    DataArray or DataCollection
        The opened data. The exact type depends on the dispatch path and the
        ``squeeze`` setting.

    Raises
    ------
    ValueError
        If `paths` is neither a string nor a list.
    FileNotFoundError
        If no file matching `paths` can be found.

    See Also
    --------
    open_dataarray : Open a single DataArray file.
    open_datacollection : Open a single DataCollection file.
    open_mfdataarray : Open and combine multiple DataArray files.
    open_mfdatacollection : Open and combine multiple DataCollection files.
    open_mfdatatree : Open a directory tree as a nested DataCollection.

    Examples
    --------
    Open a single file (auto-detects DataCollection vs DataArray):

    >>> import xdas as xd
    >>> da = xd.open("path/to/file.nc")  # doctest: +SKIP

    Open multiple files with a wildcard:

    >>> da = xd.open("path/to/files/*.nc")  # doctest: +SKIP

    Open a list of explicit paths:

    >>> da = xd.open(["file1.nc", "file2.nc"])  # doctest: +SKIP

    Open a directory tree:

    >>> dc = xd.open("/data/{node}/[acq].nc", engine="asn")  # doctest: +SKIP

    """
    paths = _ensure_str_paths(paths)

    # Classify the request from the shape of `paths`.
    if isinstance(paths, str):
        if "{" in paths:
            method = "tree-like"
        elif any(char in paths for char in "*?["):
            method = "multi-file"
        else:
            method = "single-file"
    elif isinstance(paths, list):
        method = "multi-file"
    else:
        # ValueError (not a bare Exception) matches the documented contract and
        # the sibling multi-file openers.
        raise ValueError(
            f"`paths` must be either a string or a list, found {type(paths)}"
        )

    if method == "single-file":
        if engine is None:
            try:
                return open_datacollection(paths)
            except Exception:
                # Deliberate best-effort probe: any failure means "not a data
                # collection", so fall through to the data array reader, which
                # raises its own error if the file is unusable.
                pass
        return open_dataarray(paths, engine=engine, **kwargs)

    if method == "multi-file":
        if engine is None:
            try:
                return open_mfdatacollection(
                    paths,
                    dim,
                    tolerance,
                    squeeze=False if squeeze is None else squeeze,
                    parallel=parallel,
                    verbose=verbose,
                )
            except Exception:
                # Same best-effort probe as above, for the multi-file case.
                pass
        return open_mfdataarray(
            paths,
            dim,
            tolerance,
            squeeze=True if squeeze is None else squeeze,
            engine=engine,
            parallel=parallel,
            verbose=verbose,
            **kwargs,
        )

    # Only "tree-like" remains.
    return open_mfdatatree(
        paths,
        dim,
        tolerance,
        squeeze=False if squeeze is None else squeeze,
        engine=engine,
        parallel=parallel,
        verbose=verbose,
        **kwargs,
    )
[docs]
def open_mfdatacollection(
    paths, dim="first", tolerance=None, squeeze=False, verbose=False, parallel=None
):
    """
    Open a multiple file DataCollection.

    Files matching the wildcarded `paths` string will be opened and combined into one
    data collection. Each opened file must be a DataCollection. The data arrays nested
    inside the data collections are concatenated by their position within the data
    collection hierarchy using `combine_by_field`.

    For example, it can be used to combine daily data collections into one master
    data collection.

    Parameters
    ----------
    paths : str or list
        The path names given as a shell-style wildcards string or a list of paths.
    dim : str, optional
        The dimension along which the data arrays are concatenated. Default to "first".
    tolerance : float or timedelta64, optional
        During concatenation, the tolerance to consider that the end of a file is
        continuous with beginning of the following one. For time coordinates, numeric
        values are considered as seconds. Default to zero tolerance.
    squeeze : bool, optional
        Whether to return a DataArray instead of a DataCollection if the combination
        results in a data collection containing a unique data array.
    verbose: bool
        Whether to display a progress bar. Default to False.
    parallel: bool or int, optional
        Whether to use multiprocessing to fetch file metadata. If False or 1,
        runs in single-process mode. If an integer, use that many processes.
        If True, use as many processes as available cores. If None, use the
        global xdas configuration. Default to None.

    Returns
    -------
    DataCollection
        The combined data collection

    Raises
    ------
    ValueError
        If `paths` is neither a string nor a list.
    FileNotFoundError
        If a listed path does not exist or if no file is found.
    NotImplementedError
        If more than 100 000 files are requested.
    """
    paths = _ensure_str_paths(paths)
    if isinstance(paths, str):
        paths = sorted(glob(paths))
    elif isinstance(paths, list):
        for path in paths:
            if not os.path.exists(path):
                raise FileNotFoundError(f"could not find {path}")
    else:
        raise ValueError(
            f"`paths` must be either a string or a list, found {type(paths)}"
        )
    if not paths:
        raise FileNotFoundError("no file to open")
    if len(paths) > 100_000:
        raise NotImplementedError(
            "The maximum number of file that can be opened at once is for now limited "
            "to 100 000."
        )
    max_workers = get_workers_count(parallel)
    if max_workers == 1:
        iterator = (
            tqdm(paths, desc="Fetching metadata from files") if verbose else paths
        )
        objs = [open_datacollection(path) for path in iterator]
    else:
        executor = get_reusable_executor(max_workers)
        future_to_index = {
            executor.submit(open_datacollection, path): index
            for index, path in enumerate(paths)
        }
        completed = as_completed(future_to_index)
        if verbose:
            completed = tqdm(
                completed,
                total=len(future_to_index),
                desc="Fetching metadata from files",
            )
        # Futures complete in arbitrary order; store each result at the index
        # of its path so the combination sees the same order as serial mode.
        objs = [None] * len(paths)
        for future in completed:
            objs[future_to_index[future]] = future.result()
    return combine_by_field(objs, dim, tolerance, squeeze, True, verbose)
[docs]
def open_mfdatatree(
    paths,
    dim="first",
    tolerance=None,
    squeeze=False,
    engine=None,
    verbose=False,
    parallel=None,
    **kwargs,
):
    """
    Open a directory tree structure as a data collection.

    The tree structure is described by a path descriptor provided as a string
    containing placeholders. Two flavours of placeholder can be provided:

    - `{field}`: this level of the tree will behave as a dict. It will use the
      directory/file names as keys.
    - `[field]`: this level of the tree will behave as a list. The directory/file
      names are not considered (as if the placeholder was replaced by a `*`) and
      files are gathered and combined as if using `open_mfdataarray`.

    Several dict placeholders with different names can be provided. They must be
    followed by one or more list placeholders that must share a unique name. The
    resulting data collection will be a nesting of dicts down to the lower level
    which will be a list of dataarrays.

    Files that match the loose wildcard derived from the descriptor but not the
    descriptor itself (e.g. a repeated ``{field}`` taking two different values)
    are ignored.

    Parameters
    ----------
    paths : str
        The path descriptor.
    dim : str, optional
        The dimension along which the data arrays are concatenated. Default to "first".
    tolerance : float or timedelta64, optional
        During concatenation, the tolerance to consider that the end of a file is
        continuous with beginning of the following one. For time coordinates, numeric
        values are considered as seconds. Default to zero tolerance.
    squeeze : bool, optional
        Whether to return a DataArray instead of a DataCollection if the combination
        results in a data collection containing a unique data array.
    engine: str or callable, optional
        The type of file to open or a read function. Default to xdas netcdf format.
    verbose: bool
        Whether to display a progress bar. Default to False.
    parallel: bool or int, optional
        Whether to use multiprocessing to fetch file metadata. If False or 1,
        runs in single-process mode. If an integer, use that many processes.
        If True, use as many processes as available cores. If None, use the
        global xdas configuration. Default to None.
    **kwargs
        Additional keyword arguments to be passed to the read function.

    Returns
    -------
    DataCollection
        The collected data.

    Examples
    --------
    >>> import xdas as xd
    >>> paths = "/data/{node}/{cable}/[acquisition]/proc/[acquisition].h5"
    >>> xd.open_mfdatatree(paths, engine="asn") # doctest: +SKIP
    Node:
      CCN:
        Cable:
          N:
            Acquisition:
              0: <xdas.DataArray (time: ..., distance: ...)>
              1: <xdas.DataArray (time: ..., distance: ...)>
      SER:
        Cable:
          N:
            Acquisition:
              0: <xdas.DataArray (time: ..., distance: ...)>
          S:
            Acquisition:
              0: <xdas.DataArray (time: ..., distance: ...)>
              1: <xdas.DataArray (time: ..., distance: ...)>
              2: <xdas.DataArray (time: ..., distance: ...)>

    """
    paths = _ensure_str_paths(paths)

    # Split the descriptor into placeholders and the literal text around them;
    # `literals` always has one more element than `placeholders`.
    placeholder_pattern = re.compile(r"[\{\[].*?[\}\]]")
    placeholders = placeholder_pattern.findall(paths)
    literals = placeholder_pattern.split(paths)

    # Level names, de-duplicated while keeping first-seen order.
    fields = tuple(
        placeholder[1:-1] for placeholder in dict.fromkeys(placeholders)
    )

    # Every placeholder acts as a `*` when listing candidate files.
    fnames = sorted(glob("*".join(literals)))

    def literal_to_regex(text):
        # Escape all regex metacharacters (not only "."), but keep the glob
        # wildcards `*` and `?` meaningful so they agree with the listing above.
        return re.escape(text).replace(r"\*", ".*").replace(r"\?", ".")

    # Build the regex used to read the dict keys back from each file name.
    parts = [literal_to_regex(literals[0])]
    captured = set()
    for placeholder, literal in zip(placeholders, literals[1:]):
        field = placeholder[1:-1]
        if placeholder.startswith("{") and placeholder.endswith("}"):
            if field in captured:
                # Repeated dict field: must equal its first occurrence.
                parts.append(f"(?P={field})")
            else:
                captured.add(field)
                parts.append(f"(?P<{field}>.+)")
        else:
            parts.append(r".*")
        parts.append(literal_to_regex(literal))
    regex = re.compile("".join(parts))

    tree = defaulttree(len(fields))
    for fname in fnames:
        match = regex.match(fname)
        if match is None:
            # Matched by the loose wildcard but not by the descriptor.
            continue
        bag = tree
        for field in fields[:-1]:
            bag = bag[match.group(field)]
        bag.append(fname)
    return collect(
        tree, fields, dim, tolerance, squeeze, engine, parallel, verbose, **kwargs
    )
def collect(
    tree,
    fields,
    dim="first",
    tolerance=None,
    squeeze=False,
    engine=None,
    parallel=None,
    verbose=False,
    **kwargs,
):
    """
    Build a nested data collection from a tree of file paths.

    The first entry of `fields` names the current level. Each value of `tree`
    is either a list of paths, which is opened with `open_mfdataarray` and
    named after the next field, or a sub-tree, which is collected recursively
    with the remaining fields.

    Parameters
    ----------
    tree : nested dict of lists
        The paths grouped in a tree hierarchy.
    fields : tuple of str
        The names of the levels of the tree hierarchy.
    dim : str, optional
        The dimension along which the data arrays are concatenated. Default to "first".
    tolerance : float or timedelta64, optional
        During concatenation, the tolerance to consider that the end of a file is
        continuous with beginning of the following one. For time coordinates, numeric
        values are considered as seconds. Default to zero tolerance.
    squeeze : bool, optional
        Whether to return a DataArray instead of a DataCollection if the combination
        results in a data collection containing a unique data array.
    engine: str or callable, optional
        The type of file to open or a read function. Default to xdas netcdf format.
    parallel: bool or int, optional
        Whether to use multiprocessing to fetch file metadata. If False or 1,
        runs in single-process mode. If an integer, use that many processes.
        If True, use as many processes as available cores. If None, use the
        global xdas configuration. Default to None.
    verbose: bool
        Whether to display a progress bar. Default to False.
    **kwargs
        Additional keyword arguments to be passed to the read function.

    Returns
    -------
    DataCollection
        The collected data.
    """
    sublevels = list(fields)
    level_name = sublevels.pop(0)
    collection = DataCollection({}, name=level_name)
    for key, value in tree.items():
        if not isinstance(value, list):
            # Intermediate dict level: recurse with the remaining level names.
            collection[key] = collect(
                value,
                sublevels,
                dim=dim,
                tolerance=tolerance,
                squeeze=squeeze,
                engine=engine,
                parallel=parallel,
                verbose=verbose,
                **kwargs,
            )
            continue
        # Leaf level: a plain list of file paths to open and combine.
        leaf = open_mfdataarray(
            value,
            dim=dim,
            tolerance=tolerance,
            squeeze=squeeze,
            engine=engine,
            parallel=parallel,
            verbose=verbose,
            **kwargs,
        )
        leaf.name = sublevels[0]
        collection[key] = leaf
    return collection
def defaulttree(depth):
    """
    Return an auto-vivifying tree of the given depth whose leaves are lists.

    A depth of 1 yields an empty list. Any other depth yields a ``defaultdict``
    whose missing keys are lazily filled with a tree of depth ``depth - 1``.
    """
    if depth == 1:
        return []

    def make_subtree():
        return defaulttree(depth - 1)

    return defaultdict(make_subtree)
[docs]
def open_mfdataarray(
    paths,
    dim="first",
    tolerance=None,
    squeeze=True,
    engine=None,
    parallel=None,
    verbose=False,
    **kwargs,
):
    """
    Open a multiple file dataset.

    Each file described by `paths` will be opened as a data array. The data arrays are
    then combined along the `dim` dimension using `combine_by_coords`. If the
    coordinates of the data arrays are not compatible, the resulting object will be
    split into a sequence of data arrays.

    Files that cannot be opened are skipped with a ``RuntimeWarning``; an error
    is raised only if none of the files could be opened.

    Parameters
    ----------
    paths : str or list
        The path names given as a shell-style wildcards string or a list of paths.
    dim : str, optional
        The dimension along which the data arrays are concatenated. Default to "first".
    tolerance : float or timedelta64, optional
        During concatenation, the tolerance to consider that the end of a file is
        continuous with beginning of the following one. For time coordinates, numeric
        values are considered as seconds. Default to zero tolerance.
    squeeze : bool, optional
        Whether to return a DataArray instead of a DataCollection if the combination
        results in a data collection containing a unique data array.
    engine: str or callable, optional
        The type of file to open or a read function. Default to xdas netcdf format.
    parallel: bool or int, optional
        Whether to use multiprocessing to fetch file metadata. If False or 1,
        runs in single-process mode. If an integer, use that many processes.
        If True, use as many processes as available cores. If None, use the
        global xdas configuration. Default to None.
    verbose: bool
        Whether to display a progress bar. Default to False.
    **kwargs
        Additional keyword arguments to be passed to the read function.

    Returns
    -------
    DataArray or DataSequence
        The data array containing all files data. If different acquisitions are found,
        a DataSequence is returned.

    Raises
    ------
    ValueError
        If `paths` is neither a string nor a list.
    FileNotFoundError
        If a listed path does not exist or if no file can be found.
    NotImplementedError
        If more than 100 000 files are requested.
    RuntimeError
        If none of the files could be opened.
    """
    paths = _ensure_str_paths(paths)
    if isinstance(paths, str):
        paths = sorted(glob(paths))
    elif isinstance(paths, list):
        for path in paths:
            if not os.path.exists(path):
                raise FileNotFoundError(f"could not find {path}")
    else:
        raise ValueError(
            f"`paths` must be either a string or a list, found {type(paths)}"
        )
    if not paths:
        raise FileNotFoundError("no file to open")
    if len(paths) > 100_000:
        raise NotImplementedError(
            "The maximum number of file that can be opened at once is for now limited "
            "to 100 000."
        )
    max_workers = get_workers_count(parallel)
    failures = []
    if (max_workers == 1) or (engine == "miniseed"):  # TODO: dirty miniseed fix
        objs = []
        iterator = (
            tqdm(paths, desc="Fetching metadata from files") if verbose else paths
        )
        for path in iterator:
            try:
                objs.append(open_dataarray(path, engine=engine, **kwargs))
            except Exception as error:
                failures.append((path, error))
                warnings.warn(f"could not open {path}: {error}", RuntimeWarning)
    else:
        executor = get_reusable_executor(max_workers)
        futures_to_indices = {
            executor.submit(open_dataarray, path, engine=engine, **kwargs): index
            for index, path in enumerate(paths)
        }
        if verbose:
            iterator = tqdm(
                as_completed(futures_to_indices),
                total=len(futures_to_indices),
                desc="Fetching metadata from files",
            )
        else:
            iterator = as_completed(futures_to_indices)
        # Futures complete in arbitrary order; key results by path index so the
        # combination sees the same order as the single-process branch.
        opened = {}
        for future in iterator:
            index = futures_to_indices[future]
            try:
                opened[index] = future.result()
            except Exception as error:
                path = paths[index]
                failures.append((path, error))
                warnings.warn(f"could not open {path}: {error}", RuntimeWarning)
        objs = [opened[index] for index in sorted(opened)]
    if not objs:  # there must be failures
        path, error = failures[0]
        raise RuntimeError(
            f"could not open any file with engine: {engine}; first failure was {path}: {error}"
        ) from error
    return combine_by_coords(objs, dim, tolerance, squeeze, None, verbose)
[docs]
def open_dataarray(fname, engine=None, vtype=None, ctype=None, **kwargs):
    """
    Open a single file as a data array.

    Parameters
    ----------
    fname : str
        The path of the dataarray.
    engine: str or callable, optional
        The type of file to open or a read function. Default to xdas netcdf format.
    vtype, ctype : optional
        Passed as keyword arguments to the engine constructor when `engine` is
        ``None`` or a string. Not used when `engine` is a callable.
    **kwargs
        Additional keyword arguments to be passed to the read function.

    Returns
    -------
    DataArray
        The opened dataarray.

    Raises
    ------
    FileNotFoundError
        If `fname` does not exist.
    ValueError
        If `engine` is neither ``None``, a string, nor a callable.
    """
    # Normalise the path and make sure there is something to read.
    fname = _ensure_str_paths(fname)
    if not os.path.exists(fname):
        raise FileNotFoundError("no file to open")

    # Custom reader: anything that is not None/str must be callable.
    if engine is not None and not isinstance(engine, str):
        if callable(engine):
            return engine(fname, **kwargs)
        raise ValueError("engine not recognized")

    # Named (or default) engine, looked up in the engine registry.
    from ..io.core import Engine

    reader = Engine[engine](vtype=vtype, ctype=ctype)
    return reader.open_dataarray(fname, **kwargs)
[docs]
def open_datacollection(fname, group=None):
    """
    Open a DataCollection from a file.

    Parameters
    ----------
    fname : str
        The path of the DataCollection.
    group : optional
        Passed through unchanged to ``DataCollection.from_netcdf``.

    Returns
    -------
    DataCollection
        The opened DataCollection.

    Raises
    ------
    FileNotFoundError
        If `fname` does not exist.
    """
    path = _ensure_str_paths(fname)
    if os.path.exists(path):
        return DataCollection.from_netcdf(path, group)
    raise FileNotFoundError("no file to open")
[docs]
def asdataarray(obj, tolerance=None):
    """
    Try to convert given object to a dataarray.

    Only supports DataArray or xr.DataArray as input.

    Parameters
    ----------
    obj : object
        The object to convert.
    tolerance : float or datetime64, optional
        Documented as the tolerance error for the interpolation representation
        of dense coordinates (numeric values being seconds for time
        coordinates). It is currently not read by this function.

    Returns
    -------
    DataArray
        The object converted to a DataArray. A DataArray input is returned as is.

    Raises
    ------
    ValueError
        If the object cannot be converted to a DataArray.
    """
    if isinstance(obj, DataArray):
        return obj
    if isinstance(obj, xr.DataArray):
        return DataArray.from_xarray(obj)
    raise ValueError("Cannot convert to dataarray.")
[docs]
def combine_by_field(
    objs, dim="first", tolerance=None, squeeze=False, virtual=None, verbose=False
):
    """
    Combine data collections by field along a dimension.

    The data arrays nested into each data collection are first grouped by their
    hierarchical position. List-like collections are flattened, in the order of
    `objs` and then in the order of each sequence, and combined with
    `combine_by_coords`. Dict-like collections are combined key by key,
    recursively.

    Parameters
    ----------
    objs : list of DataCollection
        The data collections to combine.
    dim : str, optional
        The dimension along which concatenate. Default to "first".
    tolerance : float or timedelta64, optional
        The tolerance to consider that the end of a file is continuous with beginning of
        the following. For time coordinates, numeric values are considered as seconds.
        Zero by default.
    squeeze : bool, optional
        Whether to return a DataArray instead of a DataCollection if the combination
        results in a data collection containing a unique DataArray.
    virtual : bool, optional
        Whether to create a virtual dataset. It requires that all concatenated
        dataarrays are virtual. By default tries to create a virtual dataset if possible.
    verbose: bool
        Whether to display a progress bar. Default to False.

    Returns
    -------
    DataCollection
        The combined data collection.

    Raises
    ------
    NotImplementedError
        If `objs` mixes list-like and dict-like collections, or contains neither.
    ValueError
        If dict-like collections do not all share the same name.
    """
    sequences = [dc for dc in objs if isinstance(dc, list)]
    mappings = [dc for dc in objs if isinstance(dc, dict)]

    # Exactly one of the two kinds must be present.
    if bool(sequences) == bool(mappings):
        raise NotImplementedError("cannot combine mixed node/leave levels for now")

    if sequences:
        flattened = [da for sequence in sequences for da in sequence]
        combined = combine_by_coords(
            flattened, dim, tolerance, squeeze, virtual, verbose
        )
        combined.name = sequences[0].name
        return combined

    # Unpacking fails unless every mapping carries the same name.
    (name,) = {dc.name for dc in mappings}
    keys = sorted(set().union(*(dc.keys() for dc in mappings)))
    branches = {}
    for key in keys:
        children = [dc[key] for dc in objs if key in dc]
        branches[key] = combine_by_field(
            children, dim, tolerance, squeeze, virtual, verbose
        )
    return DataCollection(branches, name)
[docs]
def combine_by_coords(
    objs, dim="first", tolerance=None, squeeze=False, virtual=None, verbose=False
):
    """
    Combine several data arrays by coordinates.

    The list `objs` is traversed and data arrays are grouped together as long as
    they pass the compatibility checks of :class:`Bag`. When a check fails a new
    group is started. Each group is then concatenated.

    Parameters
    ----------
    objs : list of DataArray
        The data arrays to combine.
    dim : str, optional
        The dimension along which concatenate. Default to "first".
    tolerance : float or timedelta64, optional
        The tolerance to consider that the end of a file is continuous with beginning of
        the following. For time coordinates, numeric values are considered as seconds.
        Zero by default.
    squeeze : bool, optional
        Whether to return a DataArray instead of a DataCollection if the combination
        results in a data collection containing a unique DataArray.
    virtual : bool, optional
        Whether to create a virtual dataset. It requires that all concatenated
        data arrays are virtual. By default tries to create a virtual dataset if possible.
    verbose: bool
        Whether to display a progress bar. Default to False.

    Returns
    -------
    DataSequence or DataArray
        The combined data arrays.
    """
    reference = objs[0]

    # Resolve the "first"/"last" aliases against the first data array.
    if dim == "first":
        dim = reference.dims[0]
    if dim == "last":
        dim = reference.dims[-1]

    # Order the arrays by the start value of their coordinate along `dim`.
    if dim in reference.coords:

        def start_value(da):
            coord = da[dim]
            return coord.values if coord.isscalar() else coord[0].values

        objs = sorted(objs, key=start_value)

    # Fill bags greedily; an incompatible array opens a new bag.
    bags = [Bag(dim)]
    for da in objs:
        try:
            bags[-1].append(da)
        except CompatibilityError:
            bags.append(Bag(dim))
            bags[-1].append(da)

    # One concatenated data array per bag.
    pieces = [concatenate(bag, dim, tolerance, virtual, verbose) for bag in bags]
    collection = DataCollection(pieces)

    if squeeze and len(collection) == 1:
        return collection[0]
    return collection
class CompatibilityError(Exception):
    """
    Raised when a data array cannot join the current group.

    Used as a signal that the arrays must be split into a new group.
    """

    def __init__(self, message):
        # A message is mandatory: the signature takes exactly one argument.
        Exception.__init__(self, message)
class Bag:
    """
    Accumulator of :class:`DataArray` objects meant to be concatenated along *dim*.

    The first appended object defines the reference dims, non-concat shape,
    non-concat coordinates, sampling interval and dtype. Every later object is
    checked against that reference; a mismatch raises :exc:`CompatibilityError`
    so that the caller can start a new bag.
    """

    def __init__(self, dim):
        self.dim = dim
        self.objs = []

    def __iter__(self):
        return iter(self.objs)

    def _subshape(self, da):
        """Return the sizes of *da* along every dimension except the bag's one."""
        return tuple(size for name, size in da.sizes.items() if name != self.dim)

    def _subcoords(self, da):
        """Return the coordinates of *da* with the bag's dimension removed."""
        # The choice depends on the bag's reference dims, not on those of *da*.
        if self.dim in self.dims:
            return da.coords.drop_dims(self.dim)
        return da.coords.drop_coords(self.dim)

    def initialize(self, da):
        """Make *da* the sole element and record it as the reference."""
        self.objs = [da]
        self.dims = da.dims
        self.subshape = self._subshape(da)
        self.subcoords = self._subcoords(da)
        # No sampling interval can be recorded without a coordinate along dim.
        self.delta = (
            get_sampling_interval(da, self.dim) if self.dim in da.coords else None
        )
        self.dtype = da.dtype

    def append(self, da):
        """Add *da* to the bag, initialising on first use and checking afterwards."""
        if not self.objs:
            self.initialize(da)
            return
        checks = (
            self.check_dims,
            self.check_shape,
            self.check_coords,
            self.check_sampling_interval,
            self.check_dtype,
        )
        for check in checks:
            check(da)
        self.objs.append(da)

    def check_dims(self, da):
        """Raise :exc:`CompatibilityError` if the dims of *da* differ from the reference."""
        if self.dims != da.dims:
            raise CompatibilityError("dimensions are not compatible")

    def check_shape(self, da):
        """Raise :exc:`CompatibilityError` if the non-concat shape of *da* differs."""
        if self.subshape != self._subshape(da):
            raise CompatibilityError("shapes are not compatible")

    def check_dtype(self, da):
        """Raise :exc:`CompatibilityError` if the dtype of *da* differs."""
        if self.dtype != da.dtype:
            raise CompatibilityError("data types are not compatible")

    def check_coords(self, da):
        """Raise :exc:`CompatibilityError` if the non-concat coordinates of *da* differ."""
        if not self.subcoords.equals(self._subcoords(da)):
            raise CompatibilityError("coordinates are not compatible")

    def check_sampling_interval(self, da):
        """Raise :exc:`CompatibilityError` if the sampling interval of *da* differs."""
        if self.delta is None:
            return
        if not np.isclose(get_sampling_interval(da, self.dim), self.delta):
            raise CompatibilityError("sampling intervals are not compatible")
[docs]
def concat(objs, dim="first", tolerance=None, virtual=None, verbose=None):
    """
    Concatenate data arrays along a given dimension.

    Empty data arrays are dropped first. If nothing remains, the first input is
    returned (or an empty ``DataArray`` when `objs` itself is empty). If `dim`
    is not an existing dimension (nor "first"/"last"), every array is expanded
    with it and stacked along a new leading axis.

    Parameters
    ----------
    objs : list of DataArray
        List of data arrays to concatenate.
    dim : str
        The dimension along which concatenate.
    tolerance : float or timedelta64, optional
        The tolerance to consider that the end of a file is continuous with beginning of
        the following. For time coordinates, numeric values are considered as seconds.
        Zero by default.
    virtual : bool, optional
        Whether to create a virtual dataset. It requires that all concatenated
        data arrays are virtual. By default tries to create a virtual dataset if possible.
    verbose: bool
        Whether to display a progress bar.

    Returns
    -------
    DataArray
        The concatenated dataarray. Coordinates along axes other than *dim* are
        taken from the first element; no compatibility check is performed on ``objs[1:]``.
    """
    candidates = list(objs)
    arrays = [da for da in candidates if not da.empty]
    if not arrays:
        return candidates[0] if candidates else DataArray([])

    # Default: go virtual only if every input is backed by virtual data.
    if virtual is None:
        virtual = all(
            isinstance(da.data, (VirtualSource, VirtualStack)) for da in arrays
        )

    head = arrays[0]
    if dim in head.dims + ("first", "last"):
        axis = head.get_axis_num(dim)
        dim = head.dims[axis]  # ensure not "first" or "last"
        dims = head.dims
    else:
        # New dimension: prepend it to every array.
        axis = 0
        dims = (dim, *head.dims)
        arrays = [da.expand_dims(dim) for da in arrays]
        head = arrays[0]

    # TODO: check that objs[1:] have the same non-concat coords as objs[0]
    coords = head.coords.drop_dims(dim)
    name = head.name
    attrs = head.attrs

    if dim in head.coords:
        coord, order = concat_coords(
            [da[dim] for da in arrays],
            sort=True,
            return_order=True,
            tolerance=tolerance,
        )
        # Reorder the arrays to follow the sorted coordinate.
        arrays = [arrays[idx] for idx in order]
        coords[dim] = coord

    if verbose:
        iterator = tqdm(arrays, desc="Linking dataarray")  # TODO : remove tqdm?
    else:
        iterator = arrays

    # Flatten nested virtual stacks so the result holds their sources directly.
    data = []
    for da in iterator:
        if isinstance(da.data, VirtualStack):
            data.extend(da.data.sources)
        else:
            data.append(da.data)

    data = VirtualStack(data, axis) if virtual else np.concatenate(data, axis)
    return DataArray(data, coords, dims, name, attrs)


concatenate = concat  # alias of `concat`; TODO: deprecate it
[docs]
def concat_coords(objs, *, sort=False, return_order=False, tolerance=False):
    """
    Concatenate coordinate objects.

    Parameters
    ----------
    objs : sequence
        Sequence of coordinate-like objects to concatenate.
    sort : bool, optional
        If True, sort `objs` by the start value before concatenation.
    return_order : bool, optional
        If True, return `(coord, order)` where `order` is the list of
        indices used to sort the input objects.
    tolerance : float or timedelta64, optional
        The tolerance to consider that the end of a coordinate object is continuous
        with beginning of the following. For time coordinates, numeric values are
        considered as seconds. With the default ``False`` no simplification is
        attempted. With ``None`` simplification is attempted and silently skipped
        if the coordinate does not implement it.

    Returns
    -------
    coord
        The concatenated coordinate object.
    order : list of int, optional
        The sort order for `objs` when `return_order` is True.

    Raises
    ------
    TypeError
        If a non-None `tolerance` is given and the coordinate does not
        implement ``simplify``.
    """
    # Work out the concatenation order (identity unless sorting is requested).
    order = list(range(len(objs)))
    if sort:
        order.sort(key=lambda position: objs[position][0].values)
    ordered = [objs[position] for position in order]

    # Chain the coordinates one after the other.
    merged = ordered[0]
    for following in ordered[1:]:
        merged = merged.concat(following)

    # Optional simplification step.
    if tolerance is not False:
        try:
            merged = merged.simplify(tolerance)
        except NotImplementedError:
            # TODO: Default to False and remove this condition here?
            if tolerance is not None:
                raise TypeError(
                    "`tolerance` can only be used with coordinates "
                    "that implements `simplify`"
                )

    return (merged, order) if return_order else merged
[docs]
def split(da, indices_or_sections="discontinuities", dim="first", tolerance=None):
"""
Split a data array along a dimension.
Splitting can either be performed at each discontinuity , at a given set of indices
(given as a list of int) or in order to get a given number of equal sized chunks
(if a single int is provided).
Parameters
----------
da : DataArray
The data array to split
indices_or_sections : str, int or list of int, default="discontinuities"
Describe how the splitting must be done:
- If `indices_or_section` is an integer N, the array will be divided into N
almost equal (can differ by one element if the `dim` size is not a multiple of
N).
- If `indices_or_section` is a 1-D array of sorted integers, the entries
indicate where the array is split along `dim`. For example, `[2, 3]` would, for
`dim="first"`, result in [da[:2], da[2:3], da[3:]].
- If `indices_or_section` is one of "discontinuities", "gaps" or "overlaps",
splitting will occurs at the indices given by `Coordinate.get_split_indices`.
dim : str, optional
The dimension along which to split, by default "first"
tolerance : float or timedelta64, optional
Passed to `Coordinate.get_split_indices` if `indices_or_section` is
"discontinuities", "gaps" or "overlaps" to determine what can be considered as
a discontiuity. For time coordinates, numeric values are considered as seconds.
Zero tolerance by default.
Returns
-------
list of DataArray
The splitted data array.
"""
if isinstance(indices_or_sections, str):
indices_or_sections = da[dim].get_split_indices(indices_or_sections, tolerance)
else:
if tolerance:
raise ValueError(
"`tolerance` cannot be used when `indices_or_sections` "
"is an integer or a list of indices"
)
if isinstance(indices_or_sections, int):
nsamples = da.sizes[dim]
nchunk = indices_or_sections
if nchunk <= 0:
raise ValueError("`n` must be larger than 0")
if nchunk >= nsamples:
raise ValueError("`n` must be smaller than the number of samples")
chunk_size, extras = divmod(nsamples, nchunk)
chunks = extras * [chunk_size + 1] + (nchunk - extras) * [chunk_size]
div_points = np.cumsum([0] + chunks, dtype=np.int64)
else:
div_points = np.concatenate([[0], indices_or_sections, [da.sizes[dim]]])
return DataCollection(
[da.isel({dim: slice(start, stop)}) for start, stop in pairwise(div_points)]
)
[docs]
def align(*objs):
    """
    Return the given data arrays with identical dimensions in identical order.

    A common set of coordinates is first computed with :func:`broadcast_coords`;
    each input is then passed through :func:`broadcast_to`, which expands the
    dimensions it lacks and transposes it to the common `dims`. The order of the
    resulting `dims` is the order in which dimensions are first encountered while
    iterating through the `dims` of each object. For each dimension, the data
    arrays must either share the same coordinate or have none, and their sizes
    along that dimension must be equal or one.

    The arrays of the aligned objects are suitable as input to mathematical
    operators, as their shapes are compatible in terms of broadcasting.

    Parameters
    ----------
    *objs : DataArray
        Data arrays to align.

    Returns
    -------
    aligned : tuple of DataArray
        The aligned data arrays, in the same order as the inputs.

    Examples
    --------
    >>> import xdas as xd
    >>> import numpy as np

    >>> da1 = xd.DataArray(np.arange(2), {"x": [0, 1]})
    >>> da2 = xd.DataArray(np.arange(3), {"y": [2, 3, 4]})
    >>> da1, da2 = xd.align(da1, da2)
    >>> da1
    <xdas.DataArray (x: 2, y: 1)>
    [[0]
     [1]]
    Coordinates:
      * x (x): [0 1]
    Dimensions without coordinates: y
    >>> da2
    <xdas.DataArray (x: 1, y: 3)>
    [[0 1 2]]
    Coordinates:
      * y (y): [2 ... 4]
    Dimensions without coordinates: x

    """
    # The shared target must be known before any single object can be reshaped.
    target = broadcast_coords(*objs)
    aligned = []
    for obj in objs:
        aligned.append(broadcast_to(obj, target))
    return tuple(aligned)
def broadcast_coords(*objs):
    """
    Merge the dimensions and coordinates of several objects into one Coordinates.

    Dimensions are collected in the order in which they are first encountered.
    Along a shared dimension, sizes must match or be equal to one. Non-scalar
    coordinates sharing a name must be equal across objects; scalar coordinates
    are ignored.

    Parameters
    ----------
    *objs : objects exposing `sizes` and `coords`
        The objects (typically data arrays) whose coordinates are merged.

    Returns
    -------
    Coordinates
        A new Coordinates object holding the merged coordinates, with `dims`
        being the union of the dimensions of all the objects.

    Raises
    ------
    ValueError
        If the objects have incompatible sizes along any dimension or if a
        coordinate differs from one object to another.

    Examples
    --------
    >>> import xdas as xd
    >>> import numpy as np

    >>> da1 = xd.DataArray(np.arange(2), {"x": [0, 1]})
    >>> da2 = xd.DataArray(np.arange(3), {"y": [2, 3, 4]})
    >>> xd.broadcast_coords(da1, da2)
    Coordinates:
      * x (x): [0 1]
      * y (y): [2 ... 4]

    """
    merged_sizes = {}
    merged_coords = {}
    for obj in objs:
        for dim, size in obj.sizes.items():
            if dim not in merged_sizes:
                merged_sizes[dim] = size
                continue
            # A length-one dimension adopts the size of whatever it meets.
            if merged_sizes[dim] == 1:
                merged_sizes[dim] = size
            if size != 1 and size != merged_sizes[dim]:
                raise ValueError(
                    f"data arrays to align have incompatible sizes along {dim}"
                )
        for name, coord in obj.coords.items():
            if coord.isscalar():
                continue
            if name not in merged_coords:
                merged_coords[name] = coord
            elif not coord.equals(merged_coords[name]):
                raise ValueError(
                    f"coordinate {name} differs from one data array to another"
                )
    # Dict keys keep insertion order, i.e. the order of first encounter.
    return Coordinates(merged_coords, tuple(merged_sizes))
def broadcast_to(obj, coords):
    """
    Expand and transpose an object so that its dimensions match those of `coords`.

    Parameters
    ----------
    obj : DataArray or array-like
        The object to broadcast.
    coords : Coordinates
        The coordinates whose `dims` define the target dimensions and their order.

    Returns
    -------
    DataArray
        The object with every dimension of `coords.dims`, in that order.

    Notes
    -----
    - If the input object is not a DataArray, it is converted with `np.asarray`
      and wrapped into a DataArray whose dimensions are the trailing
      `coords.dims` (as many as the array has axes). Coordinates whose dimension
      is among those trailing dimensions are attached to it.
    - Dimensions of `coords.dims` absent from the object are added with
      `expand_dims`.
    - The result is finally transposed to the order given by `coords.dims`.

    """
    target_dims = coords.dims
    if not isinstance(obj, DataArray):
        values = np.asarray(obj)
        # Match the array axes against the last dims of the target.
        trailing = target_dims[len(target_dims) - values.ndim :]
        attached = {}
        for name, coord in coords.items():
            if coord.dim in trailing:
                attached[name] = (coord.dim, coord)
        obj = DataArray(values, attached, trailing)
    for dim in target_dims:
        if dim not in obj.dims:
            obj = obj.expand_dims(dim)
    return obj.transpose(*target_dims)
[docs]
def plot_availability(obj, dim="first", **kwargs):
    """
    Plot the availability of a given dimension as a timeline chart.

    The chart is built from the discontinuities and availabilities reported by
    the coordinate of the specified dimension. Each row of the chart spans from a
    start value to an end value and is colored by its type: "data", "gap" or
    "overlap". If a data collection is provided, the availability of each data
    array of the collection is shown. Data arrays belonging to the same data
    sequence share one timeline, whereas the entries of a data mapping are drawn
    on separate timelines.

    This function only works on interpolated coordinates.

    Parameters
    ----------
    obj : DataArray or DataCollection
        The data array or data collection whose availability is plotted.
    dim : str, optional
        The name of the dimension to plot, by default "first".
    **kwargs
        Additional keyword arguments passed to the `px.timeline` function.

    Returns
    -------
    fig : plotly.graph_objects.Figure
        The timeline chart.

    Notes
    -----
    This function uses the `px.timeline` function from the `plotly.express` library.

    """
    palette = {"data": "#00CC96", "gap": "#636EFA", "overlap": "#EF553B"}
    timeline = _get_timeline_dataframe(obj, dim, "")
    fig = px.timeline(
        timeline,
        x_start="start_value",
        x_end="end_value",
        y="name",
        color="type",
        category_orders={"type": ["data", "gap", "overlap"]},
        color_discrete_map=palette,
        pattern_shape_map={"data": "", "gap": "/", "overlap": "\\"},
        **kwargs,
    )
    # Give each bar an outline of the same color as its fill.
    for trace in fig.data:
        trace["marker"]["line_color"] = palette[trace["legendgroup"]]
    fig.update_yaxes(title_text="")
    return fig
def _get_timeline_dataframe(obj, dim="first", name=None):
    """
    Build the table of timeline rows consumed by `plot_availability`.

    Parameters
    ----------
    obj : DataArray, DataSequence or DataMapping
        The object to describe. Collections are traversed recursively.
    dim : str, optional
        The dimension whose coordinate is inspected, by default "first".
    name : str, optional
        The label written in the "name" column. Elements of a data sequence
        inherit it unchanged; entries of a data mapping get their key appended
        after a dot (or the key alone when `name` is empty or None).

    Returns
    -------
    pandas.DataFrame
        The concatenation of the availabilities and discontinuities of every
        data array found, with a "name" column added.

    Raises
    ------
    TypeError
        If `obj` is none of the supported types.

    """
    if isinstance(obj, DataArray):
        discontinuity_rows = obj[dim].get_discontinuities()
        availability_rows = obj[dim].get_availabilities()
        frame = pd.concat([availability_rows, discontinuity_rows])
        frame["name"] = name if name is not None else ""
        return frame
    if isinstance(obj, DataSequence):
        # All elements of a sequence share the same label, hence the same row.
        return pd.concat([_get_timeline_dataframe(item, dim, name) for item in obj])
    if isinstance(obj, DataMapping):
        frames = []
        for key, item in obj.items():
            label = f"{name}.{key}" if name else key
            frames.append(_get_timeline_dataframe(item, dim, label))
        return pd.concat(frames)
    raise TypeError(
        f"`obj` must be a DataArray of a DataCollection, found {type(obj)}"
    )
def _ensure_str_paths(paths):
if isinstance(paths, Path):
paths = str(paths)
if isinstance(paths, list):
paths = [str(path) if isinstance(path, Path) else path for path in paths]
return paths