Source code for xdas.core.routines

"""
Top-level routines for opening, concatenating, aligning, and splitting arrays.

Operates on :class:`DataArray` and :class:`DataCollection` objects; includes
multi-file helpers (``open_mfdataarray``, ``open_mfdatacollection``).
"""

import os
import re
import warnings
from collections import defaultdict
from concurrent.futures import as_completed
from glob import glob
from itertools import pairwise
from pathlib import Path

import numpy as np
import pandas as pd
import plotly.express as px
import xarray as xr
from loky import get_reusable_executor
from tqdm import tqdm

from ..coordinates.core import Coordinates, get_sampling_interval
from ..parallel import get_workers_count
from ..virtual import VirtualSource, VirtualStack
from .dataarray import DataArray
from .datacollection import DataCollection, DataMapping, DataSequence


def open(
    paths,
    dim="first",
    tolerance=None,
    squeeze=None,
    engine=None,
    parallel=None,
    verbose=False,
    **kwargs,
):
    """
    Open one or several files as a data array or collection.

    Automatically dispatches to the appropriate reader based on the shape of
    `paths`:

    - **Single file** (plain path string): tries to open as a data collection
      first, falls back to a data array if the file does not contain a data
      collection.
    - **Multi-file** (wildcarded string with ``*``, ``?``, or ``[…]``, or a
      list of paths): tries to open and combine as a multi-file data
      collection first, falls back to a multi-file data array if the files are
      not data collections.
    - **Tree-like** (string containing ``{field}`` placeholders): opens a
      directory tree as a nested data collection using
      :func:`open_mfdatatree`.

    Parameters
    ----------
    paths : str or list of str
        The path(s) to open. Can be:

        - A plain file path (single file).
        - A shell-style wildcard string (``*``, ``?``, ``[…]``) matching
          multiple files.
        - A list of explicit file paths.
        - A tree descriptor string containing ``{field}`` (dict level) and
          ``[field]`` (list level) placeholders.
    dim : str, optional
        The dimension along which multiple files are concatenated. Ignored
        when opening a single file. Default is ``"first"``.
    tolerance : float or timedelta64, optional
        Maximum gap or overlap allowed between consecutive files to still be
        considered continuous. For time coordinates, numeric values are
        interpreted as seconds. Ignored when opening a single file. Default is
        zero tolerance.
    squeeze : bool or None, optional
        Whether to return a DataArray instead of a DataCollection when the
        result contains only one data array. When ``None`` (default), the
        behaviour depends on the dispatch path: ``True`` for multi-file data
        arrays, ``False`` otherwise. Ignored when opening a single file.
    engine : str or callable, optional
        The file format engine to use, or a custom read callable. When
        ``None`` (default), the xdas NetCDF format is assumed. Providing an
        engine skips the automatic DataCollection detection.
    parallel : bool or int, optional
        Whether to use multiprocessing to fetch file metadata. If False or 1,
        runs in single-process mode. If an integer, use that many processes.
        If True, use as many processes as available cores. If None, use the
        global xdas configuration. Default to None.
    verbose : bool, optional
        Whether to display a progress bar while reading metadata. Ignored when
        opening a single file. Default is ``False``.
    **kwargs
        Additional keyword arguments forwarded to the underlying engine read
        function. Only used when `engine` is not ``None``.

    Returns
    -------
    DataArray or DataCollection
        The opened data. The exact type depends on the dispatch path and the
        ``squeeze`` setting.

    Raises
    ------
    ValueError
        If `paths` is neither a string nor a list.
    FileNotFoundError
        If no file matching `paths` can be found.

    See Also
    --------
    open_dataarray : Open a single DataArray file.
    open_datacollection : Open a single DataCollection file.
    open_mfdataarray : Open and combine multiple DataArray files.
    open_mfdatacollection : Open and combine multiple DataCollection files.
    open_mfdatatree : Open a directory tree as a nested DataCollection.

    Examples
    --------
    Open a single file (auto-detects DataCollection vs DataArray):

    >>> import xdas as xd
    >>> da = xd.open("path/to/file.nc")  # doctest: +SKIP

    Open multiple files with a wildcard:

    >>> da = xd.open("path/to/files/*.nc")  # doctest: +SKIP

    Open a list of explicit paths:

    >>> da = xd.open(["file1.nc", "file2.nc"])  # doctest: +SKIP

    Open a directory tree:

    >>> dc = xd.open("/data/{node}/[acq].nc", engine="asn")  # doctest: +SKIP

    """
    paths = _ensure_str_paths(paths)

    # Classify the request. A "{" marks a tree descriptor and takes precedence
    # over the glob characters, because tree descriptors also contain "[...]".
    if isinstance(paths, str):
        if "{" in paths:
            method = "tree-like"
        elif "*" in paths or "?" in paths or "[" in paths:
            method = "multi-file"
        else:
            method = "single-file"
    elif isinstance(paths, list):
        method = "multi-file"
    else:
        # ValueError (a subclass of the previously raised Exception) matches
        # both the documented contract and the sibling multi-file readers.
        raise ValueError(
            f"`paths` must be either a string or a list, found {type(paths)}"
        )

    if method == "single-file":
        if engine is None:
            # Deliberate best-effort probe: any failure to read the file as a
            # data collection means "try it as a data array instead".
            try:
                return open_datacollection(paths)
            except Exception:
                pass
        return open_dataarray(paths, engine=engine, **kwargs)

    if method == "multi-file":
        if engine is None:
            # Same best-effort probe as above, for the multi-file readers.
            try:
                return open_mfdatacollection(
                    paths,
                    dim,
                    tolerance,
                    squeeze=False if squeeze is None else squeeze,
                    parallel=parallel,
                    verbose=verbose,
                )
            except Exception:
                pass
        return open_mfdataarray(
            paths,
            dim,
            tolerance,
            squeeze=True if squeeze is None else squeeze,
            engine=engine,
            parallel=parallel,
            verbose=verbose,
            **kwargs,
        )

    # Only "tree-like" remains.
    return open_mfdatatree(
        paths,
        dim,
        tolerance,
        squeeze=False if squeeze is None else squeeze,
        engine=engine,
        parallel=parallel,
        verbose=verbose,
        **kwargs,
    )
def open_mfdatacollection(
    paths, dim="first", tolerance=None, squeeze=False, verbose=False, parallel=None
):
    """
    Open a multiple file DataCollection.

    Files matching the wildcarded `paths` string will be opened and combined
    into one data collection. Each opened file must be a DataCollection. The
    data arrays nested inside the data collections are concatenated by their
    position within the data collection hierarchy using `combine_by_field`.
    For example, it can be used to combine daily data collections into one
    master data collection.

    Parameters
    ----------
    paths : str or list
        The path names given as a shell-style wildcards string or a list of
        paths.
    dim : str, optional
        The dimension along which the data arrays are concatenated. Default to
        "first".
    tolerance : float or timedelta64, optional
        During concatenation, the tolerance to consider that the end of a file
        is continuous with beginning of the following one. For time
        coordinates, numeric values are considered as seconds. Default to zero
        tolerance.
    squeeze : bool, optional
        Whether to return a DataArray instead of a DataCollection if the
        combination results in a data collection containing a unique data
        array.
    verbose : bool
        Whether to display a progress bar. Default to False.
    parallel : bool or int, optional
        Whether to use multiprocessing to fetch file metadata. If False or 1,
        runs in single-process mode. If an integer, use that many processes.
        If True, use as many processes as available cores. If None, use the
        global xdas configuration. Default to None.

    Returns
    -------
    DataCollection
        The combined data collection

    Raises
    ------
    FileNotFoundError
        If a listed path does not exist or if no file is left to open.
    ValueError
        If `paths` is neither a string nor a list.
    NotImplementedError
        If more than 100 000 files are requested at once.

    """
    paths = _ensure_str_paths(paths)
    if isinstance(paths, str):
        paths = sorted(glob(paths))
    elif isinstance(paths, list):
        for path in paths:
            if not os.path.exists(path):
                raise FileNotFoundError(f"could not find {path}")
    else:
        raise ValueError(
            f"`paths` must be either a string or a list, found {type(paths)}"
        )
    if not paths:
        raise FileNotFoundError("no file to open")
    if len(paths) > 100_000:
        raise NotImplementedError(
            "The maximum number of file that can be opened at once is for now limited "
            "to 100 000."
        )

    max_workers = get_workers_count(parallel)
    if max_workers == 1:
        iterator = tqdm(paths, desc="Fetching metadata from files") if verbose else paths
        objs = [open_datacollection(path) for path in iterator]
    else:
        executor = get_reusable_executor(max_workers)
        futures = [executor.submit(open_datacollection, path) for path in paths]
        if verbose:
            # `as_completed` only drives the progress bar; results are
            # gathered below.
            for _ in tqdm(
                as_completed(futures),
                total=len(futures),
                desc="Fetching metadata from files",
            ):
                pass
        # Gather in submission order so the parallel path hands the files to
        # `combine_by_field` in the same order as the single-process path,
        # instead of a timing-dependent completion order.
        objs = [future.result() for future in futures]

    return combine_by_field(objs, dim, tolerance, squeeze, True, verbose)
def open_mfdatatree(
    paths,
    dim="first",
    tolerance=None,
    squeeze=False,
    engine=None,
    verbose=False,
    parallel=None,
    **kwargs,
):
    """
    Open a directory tree structure as a data collection.

    The tree structure is described by a path descriptor provided as a string
    containing placeholders. Two flavours of placeholder can be provided:

    - `{field}`: this level of the tree will behave as a dict. It will use the
      directory/file names as keys.
    - `[field]`: this level of the tree will behave as a list. The
      directory/file names are not considered (as if the placeholder was
      replaced by a `*`) and files are gathered and combined as if using
      `open_mfdataarray`.

    Several dict placeholders with different names can be provided. They must
    be followed by one or more list placeholders that must share a unique
    name. The resulting data collection will be a nesting of dicts down to the
    lower level which will be a list of dataarrays.

    Parameters
    ----------
    paths : str
        The path descriptor.
    dim : str, optional
        The dimension along which the data arrays are concatenated. Default to
        "first".
    tolerance : float or timedelta64, optional
        During concatenation, the tolerance to consider that the end of a file
        is continuous with beginning of the following one. For time
        coordinates, numeric values are considered as seconds. Default to zero
        tolerance.
    squeeze : bool, optional
        Whether to return a DataArray instead of a DataCollection if the
        combination results in a data collection containing a unique data
        array.
    engine : str or callable, optional
        The type of file to open or a read function. Default to xdas netcdf
        format.
    verbose : bool
        Whether to display a progress bar. Default to False.
    parallel : bool or int, optional
        Whether to use multiprocessing to fetch file metadata. If False or 1,
        runs in single-process mode. If an integer, use that many processes.
        If True, use as many processes as available cores. If None, use the
        global xdas configuration. Default to None.
    **kwargs
        Additional keyword arguments to be passed to the read function.

    Returns
    -------
    DataCollection
        The collected data.

    Notes
    -----
    Files are first listed with a glob in which every placeholder is replaced
    by ``*``. A file found that way but rejected by the stricter descriptor
    regex (for instance when a repeated ``{field}`` takes two different values
    in the same path) is ignored.

    Examples
    --------
    >>> import xdas as xd
    >>> paths = "/data/{node}/{cable}/[acquisition]/proc/[acquisition].h5"
    >>> xd.open_mfdatatree(paths, engine="asn")  # doctest: +SKIP
    Node:
      CCN:
        Cable:
          N:
            Acquisition:
              0: <xdas.DataArray (time: ..., distance: ...)>
              1: <xdas.DataArray (time: ..., distance: ...)>
      SER:
        Cable:
          N:
            Acquisition:
              0: <xdas.DataArray (time: ..., distance: ...)>
          S:
            Acquisition:
              0: <xdas.DataArray (time: ..., distance: ...)>
              1: <xdas.DataArray (time: ..., distance: ...)>
              2: <xdas.DataArray (time: ..., distance: ...)>

    """
    paths = _ensure_str_paths(paths)

    # Placeholders in order of first appearance, without duplicates.
    placeholders = list(dict.fromkeys(re.findall(r"[\{\[].*?[\}\]]", paths)))
    fields = tuple(placeholder[1:-1] for placeholder in placeholders)

    # Glob pattern: every placeholder becomes a wildcard.
    wildcard = paths
    for placeholder in placeholders:
        wildcard = wildcard.replace(placeholder, "*")
    fnames = sorted(glob(wildcard))

    # Regex pattern: the first occurrence of a dict placeholder becomes a
    # named group, later occurrences become back-references to it; list
    # placeholders match anything.
    regex = paths.replace(".", r"\.")
    for placeholder in placeholders:
        if placeholder.startswith("{") and placeholder.endswith("}"):
            regex = regex.replace(placeholder, f"(?P<{placeholder[1:-1]}>.+)", 1)
            regex = regex.replace(placeholder, f"(?P={placeholder[1:-1]})")
        else:
            regex = regex.replace(placeholder, r".*")
    regex = re.compile(regex)

    tree = defaulttree(len(fields))
    for fname in fnames:
        found = regex.match(fname)
        if found is None:
            # The glob is looser than the regex; without this guard such a
            # file raised AttributeError on `None.group`.
            continue
        bag = tree
        for field in fields[:-1]:
            bag = bag[found.group(field)]
        bag.append(fname)

    return collect(
        tree, fields, dim, tolerance, squeeze, engine, parallel, verbose, **kwargs
    )
def collect(
    tree,
    fields,
    dim="first",
    tolerance=None,
    squeeze=False,
    engine=None,
    parallel=None,
    verbose=False,
    **kwargs,
):
    """
    Turn a tree of file paths into a nested data collection.

    The first entry of `fields` names the collection built at this level. Each
    value of `tree` that is a list of paths is opened with `open_mfdataarray`
    and named after the next field; any other value is collected recursively
    with the remaining fields.

    Parameters
    ----------
    tree : nested dict of lists
        The paths grouped in a tree hierarchy.
    fields : tuple of str
        The names of the levels of the tree hierarchy.
    dim : str, optional
        The dimension along which the data arrays are concatenated. Default to
        "first".
    tolerance : float or timedelta64, optional
        During concatenation, the tolerance to consider that the end of a file
        is continuous with beginning of the following one. For time
        coordinates, numeric values are considered as seconds. Default to zero
        tolerance.
    squeeze : bool, optional
        Whether to return a DataArray instead of a DataCollection if the
        combination results in a data collection containing a unique data
        array.
    engine : str or callable, optional
        The type of file to open or a read function. Default to xdas netcdf
        format.
    parallel : bool or int, optional
        Whether to use multiprocessing to fetch file metadata. If False or 1,
        runs in single-process mode. If an integer, use that many processes.
        If True, use as many processes as available cores. If None, use the
        global xdas configuration. Default to None.
    verbose : bool
        Whether to display a progress bar. Default to False.
    **kwargs
        Additional keyword arguments to be passed to the read function.

    Returns
    -------
    DataCollection
        The collected data.

    """
    remaining = list(fields)
    name = remaining.pop(0)
    # Positional options shared by `open_mfdataarray` and the recursive call.
    options = (dim, tolerance, squeeze, engine, parallel, verbose)

    collection = DataCollection({}, name=name)
    for key, branch in tree.items():
        if not isinstance(branch, list):
            collection[key] = collect(branch, remaining, *options, **kwargs)
            continue
        leaf = open_mfdataarray(branch, *options, **kwargs)
        leaf.name = remaining[0]
        collection[key] = leaf
    return collection


def defaulttree(depth):
    """Return an empty list for depth 1, else a defaultdict of subtrees one level shallower."""
    if depth != 1:
        return defaultdict(lambda: defaulttree(depth - 1))
    return []
def open_mfdataarray(
    paths,
    dim="first",
    tolerance=None,
    squeeze=True,
    engine=None,
    parallel=None,
    verbose=False,
    **kwargs,
):
    """
    Open a multiple file dataset.

    Each file described by `paths` will be opened as a data array. The data
    arrays are then combined along the `dim` dimension using
    `combine_by_coords`. If the coordinates of the data arrays are not
    compatible, the resulting object will be split into a sequence of data
    arrays.

    Files that cannot be opened are reported with a ``RuntimeWarning`` and
    skipped; an error is raised only when no file at all could be opened.

    Parameters
    ----------
    paths : str or list
        The path names given as a shell-style wildcards string or a list of
        paths.
    dim : str, optional
        The dimension along which the data arrays are concatenated. Default to
        "first".
    tolerance : float or timedelta64, optional
        During concatenation, the tolerance to consider that the end of a file
        is continuous with beginning of the following one. For time
        coordinates, numeric values are considered as seconds. Default to zero
        tolerance.
    squeeze : bool, optional
        Whether to return a DataArray instead of a DataCollection if the
        combination results in a data collection containing a unique data
        array.
    engine : str or callable, optional
        The type of file to open or a read function. Default to xdas netcdf
        format.
    parallel : bool or int, optional
        Whether to use multiprocessing to fetch file metadata. If False or 1,
        runs in single-process mode. If an integer, use that many processes.
        If True, use as many processes as available cores. If None, use the
        global xdas configuration. Default to None.
    verbose : bool
        Whether to display a progress bar. Default to False.
    **kwargs
        Additional keyword arguments to be passed to the read function.

    Returns
    -------
    DataArray or DataSequence
        The data array containing all files data. If different acquisitions
        are found, a DataSequence is returned.

    Raises
    ------
    FileNotFoundError
        If a listed path does not exist or if no file is left to open.
    ValueError
        If `paths` is neither a string nor a list.
    NotImplementedError
        If more than 100 000 files are requested at once.
    RuntimeError
        If none of the files could be opened.

    """
    paths = _ensure_str_paths(paths)
    if isinstance(paths, str):
        paths = sorted(glob(paths))
    elif isinstance(paths, list):
        for path in paths:
            if not os.path.exists(path):
                raise FileNotFoundError(f"could not find {path}")
    else:
        raise ValueError(
            f"`paths` must be either a string or a list, found {type(paths)}"
        )
    if not paths:
        raise FileNotFoundError("no file to open")
    if len(paths) > 100_000:
        raise NotImplementedError(
            "The maximum number of file that can be opened at once is for now limited "
            "to 100 000."
        )

    max_workers = get_workers_count(parallel)
    objs = []
    failures = []
    if (max_workers == 1) or (engine == "miniseed"):  # TODO: dirty miniseed fix
        iterator = (
            tqdm(paths, desc="Fetching metadata from files") if verbose else paths
        )
        for path in iterator:
            try:
                objs.append(open_dataarray(path, engine=engine, **kwargs))
            except Exception as error:
                failures.append((path, error))
                warnings.warn(f"could not open {path}: {error}", RuntimeWarning)
    else:
        executor = get_reusable_executor(max_workers)
        futures_to_paths = {
            executor.submit(open_dataarray, path, engine=engine, **kwargs): path
            for path in paths
        }
        if verbose:
            # `as_completed` only drives the progress bar; results are
            # gathered below.
            for _ in tqdm(
                as_completed(futures_to_paths),
                total=len(futures_to_paths),
                desc="Fetching metadata from files",
            ):
                pass
        # Dicts keep insertion order, so this walks the futures in submission
        # order. The parallel path then hands `combine_by_coords` the same
        # file order as the single-process path, whatever the worker timing.
        for future, path in futures_to_paths.items():
            try:
                obj = future.result()
            except Exception as error:
                failures.append((path, error))
                warnings.warn(f"could not open {path}: {error}", RuntimeWarning)
            else:
                objs.append(obj)

    if not objs:  # every path failed, so `failures` cannot be empty
        path, error = failures[0]
        raise RuntimeError(
            f"could not open any file with engine: {engine}; first failure was {path}: {error}"
        ) from error

    return combine_by_coords(objs, dim, tolerance, squeeze, None, verbose)
def open_dataarray(fname, engine=None, vtype=None, ctype=None, **kwargs):
    """
    Open a single file as a data array.

    Parameters
    ----------
    fname : str
        The path of the dataarray.
    engine : str or callable, optional
        The type of file to open or a read function. Default to xdas netcdf
        format. A ``None`` or string value is looked up in the ``Engine``
        registry of ``xdas.io.core``; a callable is invoked directly as
        ``engine(fname, **kwargs)``.
    vtype, ctype : optional
        Passed as keyword arguments to the registry engine constructor. Not
        used when `engine` is a callable.
    **kwargs
        Additional keyword arguments to be passed to the read function.

    Returns
    -------
    DataArray
        The opened dataarray.

    Raises
    ------
    FileNotFoundError
        If `fname` does not exist.
    ValueError
        If the engine is not recognized.

    """
    fname = _ensure_str_paths(fname)
    if not os.path.exists(fname):
        raise FileNotFoundError("no file to open")

    from_registry = engine is None or isinstance(engine, str)
    if not from_registry:
        if callable(engine):
            return engine(fname, **kwargs)
        raise ValueError("engine not recognized")

    # Imported at call time, as in the original implementation.
    from ..io.core import Engine

    reader = Engine[engine](vtype=vtype, ctype=ctype)
    return reader.open_dataarray(fname, **kwargs)
def open_datacollection(fname, group=None):
    """
    Open a DataCollection from a file.

    Parameters
    ----------
    fname : str
        The path of the DataCollection.
    group : optional
        Passed unchanged as the second argument of
        ``DataCollection.from_netcdf``.

    Returns
    -------
    DataCollection
        The opened DataCollection.

    Raises
    ------
    FileNotFoundError
        If `fname` does not exist.

    """
    path = _ensure_str_paths(fname)
    if os.path.exists(path):
        return DataCollection.from_netcdf(path, group)
    raise FileNotFoundError("no file to open")
def asdataarray(obj, tolerance=None):
    """
    Try to convert given object to a dataarray.

    Only supports DataArray or xr.DataArray as input.

    Parameters
    ----------
    obj : object
        The object to convert.
    tolerance : float or datetime64, optional
        For dense coordinates, tolerance error for interpolation
        representation. For time coordinates, numeric values are considered as
        seconds. Zero by default.
        NOTE(review): the body below never reads this argument.

    Returns
    -------
    DataArray
        The object converted to a DataArray. A DataArray input is returned as
        is; an xarray input goes through ``DataArray.from_xarray``.

    Raises
    ------
    ValueError
        If the object cannot be converted to a DataArray.

    """
    if isinstance(obj, DataArray):
        return obj
    if not isinstance(obj, xr.DataArray):
        raise ValueError("Cannot convert to dataarray.")
    return DataArray.from_xarray(obj)
def combine_by_field(
    objs, dim="first", tolerance=None, squeeze=False, virtual=None, verbose=False
):
    """
    Combine data collections by field along a dimension.

    The data arrays nested into each data collection are first grouped by
    their hierarchical position. Data sequences are appended to each other so
    that each group is a list of data arrays ordered first by the order of the
    `objs` data collections, and second by the order of the data array within
    its data sequence (if part of any sequence). Each group is eventually
    combined using `combine_by_coords`.

    Parameters
    ----------
    objs : list of DataCollection
        The data collections to combine.
    dim : str, optional
        The dimension along which concatenate. Default to "first".
    tolerance : float or timedelta64, optional
        The tolerance to consider that the end of a file is continuous with
        beginning of the following. For time coordinates, numeric values are
        considered as seconds. Zero by default.
    squeeze : bool, optional
        Whether to return a DataArray instead of a DataCollection if the
        combination results in a data collection containing a unique
        DataArray.
    virtual : bool, optional
        Whether to create a virtual dataset. It requires that all concatenated
        dataarrays are virtual. By default tries to create a virtual dataset
        if possible.
    verbose : bool
        Whether to display a progress bar. Default to False.

    Returns
    -------
    DataCollection
        The combined data collection.

    Raises
    ------
    NotImplementedError
        If `objs` mixes list-like and dict-like levels, or contains neither.

    """
    sequences = [item for item in objs if isinstance(item, list)]
    mappings = [item for item in objs if isinstance(item, dict)]

    # Exactly one of the two kinds must be present at a given level.
    if bool(sequences) == bool(mappings):
        raise NotImplementedError("cannot combine mixed node/leave levels for now")

    if sequences:
        # Leaf level: flatten every sequence, combine, keep the first name.
        arrays = [da for sequence in sequences for da in sequence]
        combined = combine_by_coords(arrays, dim, tolerance, squeeze, virtual, verbose)
        combined.name = sequences[0].name
        return combined

    # Node level: all mappings must share one name (the unpacking enforces it).
    (name,) = {mapping.name for mapping in mappings}
    keys = sorted(set().union(*(set(mapping.keys()) for mapping in mappings)))
    children = {}
    for key in keys:
        group = [dc[key] for dc in objs if key in dc]
        children[key] = combine_by_field(
            group, dim, tolerance, squeeze, virtual, verbose
        )
    return DataCollection(children, name)
def combine_by_coords(
    objs, dim="first", tolerance=None, squeeze=False, virtual=None, verbose=False
):
    """
    Combine several data arrays by coordinates.

    The list `objs` is traversed and data arrays are grouped together as long
    as they share compatible coordinates. If a change is detected a new group
    is created. Shape compatibility implies same sampling interval along the
    combination dimension, exact equality along other dimensions and same
    dtype. Each group is then concatenated.

    Parameters
    ----------
    objs : list of DataArray
        The data arrays to combine.
    dim : str, optional
        The dimension along which concatenate. Default to "first".
    tolerance : float or timedelta64, optional
        The tolerance to consider that the end of a file is continuous with
        beginning of the following. For time coordinates, numeric values are
        considered as seconds. Zero by default.
    squeeze : bool, optional
        Whether to return a DataArray instead of a DataCollection if the
        combination results in a data collection containing a unique
        DataArray.
    virtual : bool, optional
        Whether to create a virtual dataset. It requires that all concatenated
        data arrays are virtual. By default tries to create a virtual dataset
        if possible.
    verbose : bool
        Whether to display a progress bar. Default to False.

    Returns
    -------
    DataSequence or DataArray
        The combined data arrays.

    """
    # Resolve the "first"/"last" aliases against the first data array. This
    # must be an if/elif chain: with two independent tests, a first dimension
    # literally named "last" was resolved a second time to the last dimension.
    if dim == "first":
        dim = objs[0].dims[0]
    elif dim == "last":
        dim = objs[0].dims[-1]

    # Sort by the starting value along `dim` when that coordinate exists.
    if dim in objs[0].coords:
        objs = sorted(
            objs,
            key=lambda da: da[dim].values if da[dim].isscalar() else da[dim][0].values,
        )

    # Group consecutive compatible arrays; an incompatibility opens a new bag.
    bags = []
    bag = Bag(dim)
    for da in objs:
        try:
            bag.append(da)
        except CompatibilityError:
            bags.append(bag)
            bag = Bag(dim)
            bag.append(da)
    bags.append(bag)

    # Concatenate each bag.
    collection = DataCollection(
        [concatenate(bag, dim, tolerance, virtual, verbose) for bag in bags]
    )

    # Squeeze if possible.
    if squeeze and len(collection) == 1:
        return collection[0]
    return collection
class CompatibilityError(Exception):
    """Signal that a data array does not fit the current bag and a split is required."""

    def __init__(self, message):
        super().__init__(message)


class Bag:
    """
    Accumulator of :class:`DataArray` objects meant to be concatenated along *dim*.

    The first appended object becomes the reference. Every later object is
    compared to it (dims, shape, coords, sampling interval, dtype) and
    :exc:`CompatibilityError` is raised on the first mismatch so that the
    caller can start a new bag.
    """

    def __init__(self, dim):
        self.objs = []
        self.dim = dim

    def __iter__(self):
        return iter(self.objs)

    def _subshape(self, da):
        """Return the sizes of *da* along every dimension except the bag's one."""
        return tuple(size for name, size in da.sizes.items() if name != self.dim)

    def _subcoords(self, da):
        """Return the coordinates of *da* with the bag's dimension dropped."""
        if self.dim in self.dims:
            return da.coords.drop_dims(self.dim)
        return da.coords.drop_coords(self.dim)

    def initialize(self, da):
        """Make *da* the reference element: store its dims, shape, coords, sampling interval and dtype."""
        self.objs = [da]
        self.dims = da.dims
        self.subshape = self._subshape(da)
        self.subcoords = self._subcoords(da)
        # No sampling interval to compare when *dim* carries no coordinate.
        self.delta = (
            get_sampling_interval(da, self.dim) if self.dim in da.coords else None
        )
        self.dtype = da.dtype

    def append(self, da):
        """Store *da*; the first call initialises the bag, later calls run every check first."""
        if not self.objs:
            self.initialize(da)
            return
        checks = (
            self.check_dims,
            self.check_shape,
            self.check_coords,
            self.check_sampling_interval,
            self.check_dtype,
        )
        for check in checks:
            check(da)
        self.objs.append(da)

    def check_dims(self, da):
        """Raise :exc:`CompatibilityError` unless *da* has the reference dimensions."""
        if self.dims == da.dims:
            return
        raise CompatibilityError("dimensions are not compatible")

    def check_shape(self, da):
        """Raise :exc:`CompatibilityError` unless *da* has the reference shape outside *dim*."""
        if self.subshape == self._subshape(da):
            return
        raise CompatibilityError("shapes are not compatible")

    def check_dtype(self, da):
        """Raise :exc:`CompatibilityError` unless *da* has the reference dtype."""
        if self.dtype == da.dtype:
            return
        raise CompatibilityError("data types are not compatible")

    def check_coords(self, da):
        """Raise :exc:`CompatibilityError` unless *da* has the reference coordinates outside *dim*."""
        if self.subcoords.equals(self._subcoords(da)):
            return
        raise CompatibilityError("coordinates are not compatible")

    def check_sampling_interval(self, da):
        """Raise :exc:`CompatibilityError` if *da* has a different sampling interval along *dim*."""
        if self.delta is None:
            return
        delta = get_sampling_interval(da, self.dim)
        if not np.isclose(delta, self.delta):
            raise CompatibilityError("sampling intervals are not compatible")
def concat(objs, dim="first", tolerance=None, virtual=None, verbose=None):
    """
    Concatenate data arrays along a given dimension.

    Empty data arrays are discarded first. If nothing is left, the first input
    is returned as is (or an empty DataArray when `objs` itself is empty).

    Parameters
    ----------
    objs : list of DataArray
        List of data arrays to concatenate.
    dim : str
        The dimension along which concatenate. A name that is not a dimension
        of the first data array is added as a new leading dimension.
    tolerance : float or timedelta64, optional
        The tolerance to consider that the end of a file is continuous with
        beginning of the following. For time coordinates, numeric values are
        considered as seconds. Zero by default.
    virtual : bool, optional
        Whether to create a virtual dataset. It requires that all concatenated
        data arrays are virtual. By default tries to create a virtual dataset
        if possible.
    verbose : bool
        Whether to display a progress bar.

    Returns
    -------
    DataArray
        The concatenated dataarray. Coordinates along axes other than *dim*,
        the name and the attributes are taken from the first element; no
        compatibility check is performed on ``objs[1:]``.

    """
    objs = list(objs)
    kept = [da for da in objs if not da.empty]
    if not kept:
        return objs[0] if objs else DataArray([])
    objs = kept

    if virtual is None:
        virtual = all(isinstance(da.data, (VirtualSource, VirtualStack)) for da in objs)

    template = objs[0]
    if dim in template.dims + ("first", "last"):
        axis = template.get_axis_num(dim)
        dim = template.dims[axis]  # real name, never "first" or "last"
        dims = template.dims
    else:
        # Unknown dimension: stack along a new leading axis.
        axis = 0
        dims = (dim, *template.dims)
        objs = [da.expand_dims(dim) for da in objs]
        template = objs[0]

    # TODO: check that objs[1:] have the same non-concat coords as objs[0]
    coords = template.coords.drop_dims(dim)
    name = template.name
    attrs = template.attrs

    if dim in template.coords:
        coord, order = concat_coords(
            [obj[dim] for obj in objs],
            sort=True,
            return_order=True,
            tolerance=tolerance,
        )
        objs = [objs[idx] for idx in order]
        coords[dim] = coord

    # TODO : remove tqdm?
    iterator = tqdm(objs, desc="Linking dataarray") if verbose else objs

    # Flatten existing virtual stacks so that the pieces form a single level.
    pieces = []
    for da in iterator:
        if isinstance(da.data, VirtualStack):
            pieces.extend(da.data.sources)
        else:
            pieces.append(da.data)

    data = VirtualStack(pieces, axis) if virtual else np.concatenate(pieces, axis)
    return DataArray(data, coords, dims, name, attrs)


concatenate = concat  # TODO: deprecate it
def concat_coords(objs, *, sort=False, return_order=False, tolerance=False):
    """
    Concatenate coordinate objects.

    Parameters
    ----------
    objs : sequence
        Sequence of coordinate-like objects to concatenate.
    sort : bool, optional
        If True, sort `objs` by the start value before concatenation.
    return_order : bool, optional
        If True, return `(coord, order)` where `order` is the list of indices
        used to sort the input objects.
    tolerance : float or timedelta64, optional
        The tolerance to consider that the end of a coordinate object is
        continuous with beginning of the following. For time coordinates,
        numeric values are considered as seconds. No simplification by default
        (``False``); ``None`` requests a simplification but tolerates
        coordinates that do not implement it.

    Returns
    -------
    coord
        The concatenated coordinate object.
    order : list of int, optional
        The sort order for `objs` when `return_order` is True.

    Raises
    ------
    TypeError
        If a non-``None`` `tolerance` is given for coordinates whose
        ``simplify`` raises ``NotImplementedError``.

    """
    # Permutation applied to the inputs (identity unless sorting is requested).
    order = list(range(len(objs)))
    if sort:
        order.sort(key=lambda position: objs[position][0].values)
    ordered = [objs[position] for position in order]

    # Chain the coordinates one after the other.
    merged = ordered[0]
    for following in ordered[1:]:
        merged = merged.concat(following)

    # Optional simplification.
    if tolerance is not False:
        try:
            merged = merged.simplify(tolerance)
        except NotImplementedError:
            # TODO: Default to False and remove this condition here?
            if tolerance is not None:
                raise TypeError(
                    "`tolerance` can only be used with coordinates "
                    "that implements `simplify`"
                )

    return (merged, order) if return_order else merged
[docs] def split(da, indices_or_sections="discontinuities", dim="first", tolerance=None): """ Split a data array along a dimension. Splitting can either be performed at each discontinuity , at a given set of indices (given as a list of int) or in order to get a given number of equal sized chunks (if a single int is provided). Parameters ---------- da : DataArray The data array to split indices_or_sections : str, int or list of int, default="discontinuities" Describe how the splitting must be done: - If `indices_or_section` is an integer N, the array will be divided into N almost equal (can differ by one element if the `dim` size is not a multiple of N). - If `indices_or_section` is a 1-D array of sorted integers, the entries indicate where the array is split along `dim`. For example, `[2, 3]` would, for `dim="first"`, result in [da[:2], da[2:3], da[3:]]. - If `indices_or_section` is one of "discontinuities", "gaps" or "overlaps", splitting will occurs at the indices given by `Coordinate.get_split_indices`. dim : str, optional The dimension along which to split, by default "first" tolerance : float or timedelta64, optional Passed to `Coordinate.get_split_indices` if `indices_or_section` is "discontinuities", "gaps" or "overlaps" to determine what can be considered as a discontiuity. For time coordinates, numeric values are considered as seconds. Zero tolerance by default. Returns ------- list of DataArray The splitted data array. 
""" if isinstance(indices_or_sections, str): indices_or_sections = da[dim].get_split_indices(indices_or_sections, tolerance) else: if tolerance: raise ValueError( "`tolerance` cannot be used when `indices_or_sections` " "is an integer or a list of indices" ) if isinstance(indices_or_sections, int): nsamples = da.sizes[dim] nchunk = indices_or_sections if nchunk <= 0: raise ValueError("`n` must be larger than 0") if nchunk >= nsamples: raise ValueError("`n` must be smaller than the number of samples") chunk_size, extras = divmod(nsamples, nchunk) chunks = extras * [chunk_size + 1] + (nchunk - extras) * [chunk_size] div_points = np.cumsum([0] + chunks, dtype=np.int64) else: div_points = np.concatenate([[0], indices_or_sections, [da.sizes[dim]]]) return DataCollection( [da.isel({dim: slice(start, stop)}) for start, stop in pairwise(div_points)] )
def align(*objs):
    """
    Given any number of data arrays, returns new objects with aligned dimensions.

    New objects will all share the same dimensions with the same order. This
    is done by expanding missing dimensions and transposing to the same
    `dims`. The order of the resulting `dims` is given by the order in which
    dimensions are first encountered while iterating through each object's
    `dims`. For each dimension, the data arrays must either share the same
    coordinate or not have any.

    Arrays from the aligned objects are suitable as input to mathematical
    operators, as their shapes are compatible in terms of broadcasting.

    Parameters
    ----------
    *objs : DataArray
        Data arrays to align.

    Returns
    -------
    aligned : tuple of DataArray
        Tuple of data arrays with aligned coordinates.

    Examples
    --------
    >>> import xdas as xd
    >>> import numpy as np

    >>> da1 = xd.DataArray(np.arange(2), {"x": [0, 1]})
    >>> da2 = xd.DataArray(np.arange(3), {"y": [2, 3, 4]})
    >>> da1, da2 = xd.align(da1, da2)
    >>> da1
    <xdas.DataArray (x: 2, y: 1)>
    [[0]
     [1]]
    Coordinates:
      * x (x): [0 1]
    Dimensions without coordinates: y
    >>> da2
    <xdas.DataArray (x: 1, y: 3)>
    [[0 1 2]]
    Coordinates:
      * y (y): [2 ... 4]
    Dimensions without coordinates: x

    """
    # One common set of coordinates, then each input is broadcast onto it.
    target = broadcast_coords(*objs)
    aligned = []
    for obj in objs:
        aligned.append(broadcast_to(obj, target))
    return tuple(aligned)
def broadcast_coords(*objs):
    """
    Merge the dimensions and coordinates of several objects.

    Parameters
    ----------
    *objs :
        Objects exposing `sizes` and `coords` mappings.

    Returns
    -------
    Coordinates
        A new Coordinates object holding every non-scalar coordinate found,
        with dimensions ordered by first appearance.

    Raises
    ------
    ValueError
        If two objects have incompatible sizes along a dimension (different
        and neither equal to one), or if a coordinate of the same name differs
        between objects.

    Examples
    --------
    >>> import xdas as xd
    >>> import numpy as np

    >>> da1 = xd.DataArray(np.arange(2), {"x": [0, 1]})
    >>> da2 = xd.DataArray(np.arange(3), {"y": [2, 3, 4]})
    >>> xd.broadcast_coords(da1, da2)
    Coordinates:
      * x (x): [0 1]
      * y (y): [2 ... 4]

    """
    merged_sizes = {}
    merged_coords = {}
    for obj in objs:
        for dim, size in obj.sizes.items():
            known = merged_sizes.setdefault(dim, size)
            if known == 1:
                # A length-one dimension stretches to whatever size comes next.
                merged_sizes[dim] = size
            elif size != 1 and size != known:
                raise ValueError(
                    f"data arrays to align have incompatible sizes along {dim}"
                )
        for name, coord in obj.coords.items():
            if coord.isscalar():
                # Scalar coordinates do not take part in the broadcast.
                continue
            if name not in merged_coords:
                merged_coords[name] = coord
            elif not coord.equals(merged_coords[name]):
                raise ValueError(
                    f"coordinate {name} differs from one data array to another"
                )
    return Coordinates(merged_coords, tuple(merged_sizes))


def broadcast_to(obj, coords):
    """
    Broadcast an object so that its dimensions match the given coordinates.

    Parameters
    ----------
    obj : DataArray or array-like
        The object to broadcast.
    coords : Coordinates
        The coordinates specifying the dimensions to match.

    Returns
    -------
    DataArray
        The broadcasted object.

    Notes
    -----
    - An input that is not a DataArray is converted with `np.asarray` and
      wrapped into a DataArray whose dimensions are the trailing dimensions of
      `coords` (as many as the array has axes), together with the coordinates
      attached to those dimensions.
    - Dimensions of `coords` that are missing from the object are added with
      `expand_dims`.
    - The output is transposed to follow the dimension order of `coords`.

    """
    target_dims = coords.dims
    if not isinstance(obj, DataArray):
        values = np.asarray(obj)
        # `len - ndim` (rather than `-ndim`) keeps the 0-d case an empty slice.
        trailing = target_dims[len(target_dims) - values.ndim :]
        attached = {}
        for name, coord in coords.items():
            if coord.dim in trailing:
                attached[name] = (coord.dim, coord)
        obj = DataArray(values, attached, trailing)
    for dim in target_dims:
        if dim in obj.dims:
            continue
        obj = obj.expand_dims(dim)
    return obj.transpose(*target_dims)
def plot_availability(obj, dim="first", **kwargs):
    """
    Plot the availability of a given dimension in a timeline chart.

    The chart is built from the availabilities and discontinuities found along
    the requested dimension. Each entry is drawn between its start and end
    values and is coloured according to its type: data, gap or overlap.

    When a data collection is given, the availability of every data array it
    contains is shown. Data arrays belonging to the same data sequence share a
    timeline, whereas data arrays stored in data mappings get separate
    timelines.

    This function only works on interpolated coordinates.

    Parameters
    ----------
    obj : DataArray or DataCollection
        The object containing the dimension to plot.
    dim : str
        The name of the dimension to plot.
    **kwargs
        Additional keyword arguments passed to the `px.timeline` function.

    Returns
    -------
    fig : plotly.graph_objects.Figure
        The timeline figure.

    Notes
    -----
    This function uses the `px.timeline` function from the `plotly.express`
    library.

    """
    palette = {"data": "#00CC96", "gap": "#636EFA", "overlap": "#EF553B"}
    figure = px.timeline(
        _get_timeline_dataframe(obj, dim, ""),
        x_start="start_value",
        x_end="end_value",
        y="name",
        color="type",
        category_orders={"type": ["data", "gap", "overlap"]},
        color_discrete_map=palette,
        pattern_shape_map={"data": "", "gap": "/", "overlap": "\\"},
        **kwargs,
    )
    # Outline every trace with the fill colour of its legend group.
    for trace in figure.data:
        trace["marker"]["line_color"] = palette[trace["legendgroup"]]
    figure.update_yaxes(title_text="")
    return figure
def _get_timeline_dataframe(obj, dim="first", name=None):
    """
    Gather the availabilities and discontinuities of `obj` along `dim`.

    Parameters
    ----------
    obj : DataArray, DataSequence or DataMapping
        The object to inspect. Collections are traversed recursively.
    dim : str, optional
        The dimension whose coordinate is inspected. Default is ``"first"``.
    name : str, optional
        Label written in the ``"name"`` column. Elements of a data sequence
        all inherit `name`; values of a data mapping get their key appended
        (``"{name}.{key}"``, or just the key when `name` is empty).

    Returns
    -------
    pandas.DataFrame
        The concatenated availabilities and discontinuities, with an added
        ``"name"`` column.

    Raises
    ------
    TypeError
        If `obj` is neither a DataArray nor a DataCollection.
    """
    if isinstance(obj, DataArray):
        discontinuities = obj[dim].get_discontinuities()
        availabilities = obj[dim].get_availabilities()
        dataframe = pd.concat([availabilities, discontinuities])
        dataframe["name"] = "" if name is None else name
        return dataframe
    if isinstance(obj, DataSequence):
        # Every element of a sequence shares the same timeline label.
        return pd.concat([_get_timeline_dataframe(val, dim, name) for val in obj])
    if isinstance(obj, DataMapping):
        # Each key of a mapping gets its own timeline label.
        return pd.concat(
            [
                _get_timeline_dataframe(val, dim, f"{name}.{key}" if name else key)
                for key, val in obj.items()
            ]
        )
    raise TypeError(
        f"`obj` must be a DataArray or a DataCollection, found {type(obj)}"
    )


def _ensure_str_paths(paths):
    """
    Convert path-like objects to plain strings.

    Parameters
    ----------
    paths : object
        A single path, a list of paths, or anything else.

    Returns
    -------
    object
        - A single `os.PathLike` object (e.g. `pathlib.Path`) is returned as a
          `str`.
        - A list is returned as a new list in which every `os.PathLike` element
          is replaced by its `str` form; other elements are kept as-is.
        - Any other input is returned unchanged.
    """
    if isinstance(paths, os.PathLike):
        # `os.fsdecode` yields a `str` even if `__fspath__` returns bytes.
        return os.fsdecode(paths)
    if isinstance(paths, list):
        return [
            os.fsdecode(path) if isinstance(path, os.PathLike) else path
            for path in paths
        ]
    return paths