Source code for xdas.virtual

"""
Virtual (lazy) array types for deferred HDF5/NetCDF4 access.

Includes :class:`VirtualArray` base, :class:`VirtualSource` for a single
dataset slice, and :class:`VirtualStack` for concatenating sources along
an axis.
"""

import os
from copy import copy, deepcopy
from tempfile import TemporaryDirectory

import h5py
import numpy as np


class VirtualArray:
    """
    Abstract base class for lazy array objects backed by HDF5/NetCDF4 files.

    Subclasses must implement :meth:`shape`, :meth:`dtype`, :meth:`__getitem__`,
    :meth:`__array__`, and :meth:`to_dataset`.
    """

    def __repr__(self):
        return f"{self.__class__.__name__}: {_to_human(self.nbytes)} ({self.dtype})"

    def __getitem__(self, key):
        NotImplemented

    def __array__(self, dtype=None):
        NotImplemented

    @property
    def shape(self):
        """Tuple of array dimensions (abstract — must be overridden)."""
        NotImplemented

    @property
    def dtype(self):
        """NumPy dtype of the array elements (abstract — must be overridden)."""
        NotImplemented

[docs] def to_dataset(self, file_or_group, name): """Write this virtual array as an HDF5 dataset (abstract — must be overridden).""" NotImplemented
@property def ndim(self): """Number of dimensions.""" return len(self.shape) @property def size(self): """Total number of elements.""" if self.shape: return np.prod(self.shape) else: return 0 @property def empty(self): """``True`` if the array contains no elements.""" return self.size == 0 @property def nbytes(self): """Total number of bytes occupied by the array elements.""" if self.shape: return self.size * self.dtype.itemsize else: return 0 def create_variable(self, file, name, dims=None, dtype=None): """ Write this virtual array into *file* and register it as a named variable. Parameters ---------- file : netCDF4-like file handle Open writable file. name : str Variable name to create inside *file*. dims : sequence of str, optional Dimension names for the variable. dtype : dtype-like, optional Override data type for the variable. Returns ------- variable The newly created file variable. """ self.to_dataset(file._h5group, name) variable = file._variable_cls(file, name, dims) file._variables[name] = variable variable._attach_dim_scales() variable._attach_coords() variable._ensure_dim_id() return variable
[docs] class VirtualStack(VirtualArray): """ Lazy concatenation of multiple :class:`VirtualSource` objects along one axis. Parameters ---------- sources : list of VirtualSource, optional Initial list of sources to stack. All sources must share the same dtype and shape on every axis other than *axis*. axis : int, optional Concatenation axis. Defaults to ``0``. """
[docs] def __init__(self, sources=[], axis=0): self._sources = list() self._axis = axis self._shape = () self.extend(sources)
def __getitem__(self, key): if not isinstance(key, tuple): indexers = [ key, ] else: indexers = list(key) if self._axis in range(len(indexers)): div_points = np.cumsum( [0] + [source.shape[self._axis] for source in self._sources] ) indexer = indexers[self._axis] if isinstance(indexer, int): idx = indexer if indexer >= 0 else indexer + self.shape[self._axis] if idx < 0 or idx >= self.shape[self._axis]: raise IndexError( f"index {idx} is out of bounds for axis {self._axis} " f"with size {self.shape[self._axis]}" ) isrc = np.searchsorted(div_points, idx, side="right") - 1 irel = idx - div_points[isrc] indexers[self._axis] = irel sources = [self._sources[isrc][tuple(indexers)]] if isinstance(indexer, slice): start, stop, step = indexer.indices(self.shape[self._axis]) if not step == 1: raise NotImplementedError( "cannot make stepped slicing along stacked dimension." ) isrc_start, isrc_stop = ( np.searchsorted(div_points[:-1], [start, stop], side="right") - 1 ) irel_start, irel_stop = ( np.array([start, stop]) - div_points[[isrc_start, isrc_stop]] ) sources = [] if isrc_start == isrc_stop: indexers[self._axis] = slice(irel_start, irel_stop) sources = [self._sources[isrc_start][tuple(indexers)]] else: sources = [] indexers[self._axis] = slice(irel_start, None) sources.append(self._sources[isrc_start][tuple(indexers)]) indexers[self._axis] = slice(None) sources.extend( [ source[tuple(indexers)] for source in self._sources[isrc_start + 1 : isrc_stop] ] ) indexers[self._axis] = slice(None, irel_stop) sources.append(self._sources[isrc_stop][tuple(indexers)]) else: sources = [source[tuple(indexers)] for source in self._sources] return VirtualStack(sources, self._axis) def __array__(self, dtype=None): if not self._sources: raise ValueError("no sources in stack") return self._to_layout().__array__(dtype) @property def sources(self): """List of :class:`VirtualSource` objects in the stack.""" return self._sources @property def axis(self): """Concatenation axis index.""" return self._axis @property def shape(self): """Shape of the concatenated virtual array.""" return tuple( ( sum(source.shape[self._axis] for source in self._sources) if axis == self._axis else size ) for axis, size in enumerate(self._shape) ) @property def dtype(self): """NumPy dtype shared by all sources in the stack.""" if not hasattr(self, "_dtype"): raise AttributeError("empty stack has no dtype") return self._dtype def append(self, source): """ Append *source* to the stack. Parameters ---------- source : VirtualSource Source to add. Must have the same dtype and off-axis shape as existing sources. """ if not self._sources: self._initialize(source) self._check(source) self._sources.append(source) def extend(self, sources): """ Extend the stack with an iterable of sources. Parameters ---------- sources : list of VirtualSource Sources to append, validated one-by-one via :meth:`append`. """ if not isinstance(sources, list): raise TypeError("`sources` must be a list") for source in sources: self.append(source) def to_dataset(self, file_or_group, name): """Write the stacked virtual array as an HDF5 virtual dataset.""" self._to_layout().to_dataset(file_or_group, name) def _initialize(self, source): self._shape = tuple( None if axis == self._axis else size for axis, size in enumerate(source.shape) ) self._dtype = source.dtype def _check(self, source): if not isinstance(source, VirtualSource): raise TypeError("only `VirtualSource` object can be provided") if not (source.dtype == self.dtype): raise ValueError("all sources must share the same dtype") if not all( True if size_self is None else size_self == size_other for size_self, size_other in zip(self._shape, source.shape) ): raise ValueError( "all sources must share the same shape " "except along the concatenation axis" ) def _to_layout(self): layout = VirtualLayout(self.shape, self.dtype) ndim = self.ndim index = 0 for source in self._sources: slc = tuple( ( slice(index, index := index + source.shape[self._axis]) if axis == self._axis else slice(None) ) for axis in range(ndim) ) layout[slc] = source return layout
[docs] class VirtualLayout(VirtualArray): """ A lazy array layout pointing toward multiple netCDF4/HDF5 files. Instantiate this class with the final shape of the virtual array. Then Fill it up by assigning to slices of it VirtualSource objects. Once the data assignement is completed, the layout can be virually written into a `h5py.File` or `h5py.Group` object using the `to_dataset` method. The VirtualLayout can be sliced and the selected data accessed without writting the layout to disk. To that end, use `numpy.asarray` or the `__array__` special method. Note that for now, sliced VirtualLayout cannot be written to disk. Parameters ---------- shape: tuple of int The shape of the layout. dtype: str of dtype The dtype of the layout. maxshape: tuple of int or None, optional The layout is resizable up to this shape. Use None for axes you want to be unlimited. filename: str, optional The name of the destination file, if known in advance. Mappings from data in the same file will be stored with filename '.', allowing the file to be renamed later. Attributes ---------- shape: tuple of int or The shape of the layout. dtype: dtype The dtype of the layout. ndim: int The number of dimensions of the layout. nbytes: int The number of bytes virtually linked into the layout. Methods ------- to_dataset(file_or_group, name) Writes virtually the layout into the specified HDF5 file of group with the given name. """
[docs] def __init__(self, shape, dtype, maxshape=None, filename=None): self._layout = h5py.VirtualLayout(shape, dtype, maxshape, filename) self._sel = Selection(self._layout.shape)
def __array__(self, dtype=None): with TemporaryDirectory() as tmpdirname: fname = os.path.join(tmpdirname, "vds.h5") with h5py.File(fname, "w") as file: dataset = self.to_dataset(file, "__values__") with h5py.File(fname, "r") as file: dataset = file["__values__"] out = np.asarray(dataset[self._sel.get_indexer()]) if dtype is not None: out = out.astype(dtype) return out def __getitem__(self, key): self = copy(self) self._sel = self._sel.__getitem__(key) return self def __setitem__(self, key, value): if not self._sel._whole: raise NotImplementedError( "cannot link VirtualSources to a sliced VirtualLayout" ) if isinstance(value, VirtualSource): value = value.vsource self._layout.__setitem__(key, value) @property def shape(self): """Shape of the layout after any lazy selections.""" return self._sel.shape @property def dtype(self): """NumPy dtype of the layout.""" return self._layout.dtype
[docs] def to_dataset(self, file_or_group, name): """Write the layout as an HDF5 virtual dataset in *file_or_group*.""" if np.issubdtype(self.dtype, np.integer): fillvalue = np.iinfo(self.dtype).min elif np.issubdtype(self.dtype, np.floating): fillvalue = np.nan elif np.issubdtype(self.dtype, np.complexfloating): fillvalue = np.nan + 1j * np.nan else: fillvalue = None return file_or_group.create_virtual_dataset( name, self._layout, fillvalue=fillvalue )
[docs] class VirtualSource(VirtualArray): """ A lazy array object pointing toward a netCDF4/HDF5 file. At creation the array corresponds to an entire file dataset. It can then be sliced to indicate which regions should be used. Sliced VirtualSource eventually can be assigned to a VirtualLayout to Best practive is to pass it a `h5py.Dataset` obtain destructuring a `h5py.File`. Otherwise the exact filename, dataset name, shape and dtype must be passed. The data can be accessed using `numpy.asarray` or the `__array__` special method. Parameters ---------- path_or_dataset: str or h5py.Dataset The path to a file, or an h5py dataset. If a dataset is given, no other parameters are allowed, as the relevant values are taken from the dataset instead. name: str, optional The name of the source dataset within the file. shape: tuple of int, optional A tuple giving the shape of the dataset. dtype: dtype or str, optional Numpy dtype or string. maxshape: tuple or int or None, optional The source dataset is resizable up to this shape. Use `None` for axes you want to be unlimited. Attributes ---------- vsource: h5py.VirtualSource The underlying sliced virtual source shape: tuple of int or The shape of the source. dtype: dtype The dtype of the source. ndim: int The number of dimensions of the source. nbytes: int The number of bytes virtually linked into the source. Methods ------- to_dataset(file_or_group, name) Puts the source into a layout and writes it virtually into the specified HDF5 file of group with the given name. Examples -------- >>> import os >>> from tempfile import TemporaryDirectory >>> import h5py >>> import numpy as np >>> from xdas.virtual import VirtualSource >>> with TemporaryDirectory() as tmpdir: # doctest:+ELLIPSIS ... shape = (2, 3, 5) ... data = np.arange(np.prod(shape)).reshape(*shape) ... with h5py.File(os.path.join(tmpdir, "source.h5"), "w") as file: ... file.create_dataset("data", data.shape, data.dtype, data) ... source = VirtualSource(file["data"]) # we both write and get source here ... source = source[1:-1] # the source can be sliced ... result = np.asarray(source) ... assert np.array_equal(result, data[1:-1]) <...> """
[docs] def __init__( self, path_or_dataset, name=None, shape=None, dtype=None, maxshape=None ): self._vsource = h5py.VirtualSource( path_or_dataset, name, shape, dtype, maxshape ) self._sel = Selection(self._vsource.shape)
def __getitem__(self, key): self = copy(self) self._sel = self._sel.__getitem__(key) return self def __array__(self, dtype=None): with h5py.File(self.vsource.path) as file: dataset = file[self.vsource.name] return np.asarray(dataset[self._sel.get_indexer()], dtype=dtype) # We used to create an temporary file: # return self._to_layout().__array__(dtype) @property def vsource(self): """Underlying :class:`h5py.VirtualSource` with the current selection applied.""" return self._vsource.__getitem__(self._sel.get_indexer()) @property def shape(self): """Shape of the selected region of the source dataset.""" return self.vsource.shape @property def dtype(self): """NumPy dtype of the source dataset.""" return self.vsource.dtype
[docs] def to_dataset(self, file_or_group, name): """Write this source as an HDF5 virtual dataset in *file_or_group*.""" self._to_layout().to_dataset(file_or_group, name)
def _to_layout(self): layout = VirtualLayout(self.shape, self.dtype) layout[...] = self return layout
[docs] class Selection: """ Used to perform lazy selection. It is usefull when dealing with lazy array to avoid loading unneccessary data. It must be initialized with the shape of the underlying array. It allows to track the succesive slice or single element selections made along the different dimensions of the array. Once the overall selection must be aaplied, the `get_indexer` method can be called to retreive a tuple of slice and/or int that can be applied to the array. Parameters ---------- shape: tuple of int The shape of the array to slice lazily. Attributes ---------- shape: tuple of int The shape of the array after selection is applied. ndim: int The dimensionality of the array after selection is applied. Methods ------- get_indexer() Retrun a tuple of slice or int that can be applied to the array to effectively perform the selection Examples -------- >>> import numpy as np >>> from xdas.virtual import Selection `Selection` is meant to be used on lazy arrays but here we show an example on a in-memory array for simplicity: >>> arr = np.arange(2 * 3 * 5).reshape(2, 3, 5) Successive indexing can be computed on the flight: >>> expected = arr[0][:, 1:-1][::2] Or it can be delayed up to the point where the user is done with indexing: >>> sel = Selection(arr.shape) >>> sel = sel[0][:, 1:-1][::2] # Successive selections >>> slc = sel.get_indexer() >>> assert np.array_equal(arr[slc], expected) The shape of the the resulting array can be known in advance: >>> assert sel.shape == expected.shape """
[docs] def __init__(self, shape): self._shape = shape self._selectors = Selectors([SliceSelector(size) for size in shape]) self._whole = True
def __getitem__(self, key): sel = deepcopy(self) sel._whole = False if not isinstance(key, tuple): key = (key,) dim = 0 for k in key: while isinstance(sel._selectors[dim], SingleSelector): dim += 1 sel._selectors[dim] = sel._selectors[dim][k] dim += 1 return sel @property def shape(self): """Shape of the array after all selections are applied.""" return tuple( len(selector) for selector in self._selectors if not isinstance(selector, SingleSelector) ) @property def ndim(self): """Number of dimensions remaining after selections.""" return len(self.shape)
[docs] def get_indexer(self): """Return a tuple of slices/ints that materialises the accumulated selections.""" return tuple(selector.get_indexer() for selector in self._selectors)
class Selectors(list): """Selector list that raises informative error on out of bound indexing.""" def __getitem__(self, key): if key >= len(self): raise IndexError( f"too many indices for selection: selection is {len(self)}-dimensional" ) return super().__getitem__(key) class SingleSelector: """ A lazy single element selector. Parameters ---------- index: int The index to select. """ def __init__(self, index): self._index = index def get_indexer(self): """Return the stored integer index.""" return self._index class SliceSelector: """ A lazy slice selection. The actual selection is stored as a `range` object that is transformed to a `slice` object when calling `get_indexer`. Parameters ---------- size: int The size of the axis of the data to slice lazily. """ def __init__(self, size): self._range = range(size) def __getitem__(self, key): try: item = self._range[key] except IndexError: raise IndexError( f"index {key} is out of bounds for axis with size {len(self)}" ) if isinstance(item, int): return SingleSelector(item) else: sel = copy(self) sel._range = item return sel def __len__(self): return len(self._range) def get_indexer(self): """Return a :class:`slice` that represents the stored range selection.""" if len(self) == 0: return slice(0) elif self._range.stop < 0: return slice(self._range.start, None, self._range.step) else: return slice(self._range.start, self._range.stop, self._range.step) def _to_human(size): """Convert raw byte numbers into a human readable ones with units.""" unit = {0: "B", 1: "KB", 2: "MB", 3: "GB", 4: "TB"} n = 0 while size > 1024: size /= 1024 n += 1 return f"{size:.1f}{unit[n]}"