Source code for xdas.coordinates.sampled

"""
:class:`SampledCoordinate`: regularly-sampled coordinate.

Described by tie points and a fixed ``sampling_interval`` between them.
"""

import re

import numpy as np

from .core import (
    Coordinate,
    format_datetime,
    is_monotonic_increasing,
    parse,
    parse_tolerance,
)

CODE_TO_UNITS = {
    "h": "hours",
    "m": "minutes",
    "s": "seconds",
    "ms": "milliseconds",
    "us": "microseconds",
    "ns": "nanoseconds",
}
UNITS_TO_CODE = {v: k for k, v in CODE_TO_UNITS.items()}


[docs] class SampledCoordinate(Coordinate, name="sampled"): """ A coordinate that is sampled at regular intervals. Parameters ---------- data : dict-like The data of the coordinate. dim : str, optional The dimension name of the coordinate, by default None. dtype : str or numpy.dtype, optional The data type of the coordinate, by default None. """
[docs] def __init__(self, data=None, dim=None, dtype=None): # empty if data is None: data = {"tie_values": [], "tie_lengths": [], "sampling_interval": None} empty = True else: empty = False # parse data data, dim = parse(data, dim) if not self.__class__.isvalid(data): raise ValueError( "`data` must be dict-like and contain `tie_values`, `tie_lengths`, and " "`sampling_interval`" ) tie_values = np.asarray(data["tie_values"], dtype=dtype) tie_lengths = np.asarray(data["tie_lengths"]) sampling_interval = data["sampling_interval"] # check shapes if not tie_values.ndim == 1: raise ValueError("`tie_values` must be 1D") if not tie_lengths.ndim == 1: raise ValueError("`tie_lengths` must be 1D") if not len(tie_values) == len(tie_lengths): raise ValueError("`tie_values` and `tie_lengths` must have the same length") # check dtypes and values if not empty: # tie_values if not ( np.issubdtype(tie_values.dtype, np.number) or np.issubdtype(tie_values.dtype, np.datetime64) ): raise ValueError( "`tie_values` must have either numeric or datetime dtype" ) # tie_lengths if not np.issubdtype(tie_lengths.dtype, np.integer): raise ValueError("`tie_lengths` must be integer-like") if not np.all(tie_lengths > 0): raise ValueError("`tie_lengths` must be strictly positive integers") # sampling_interval if not np.ndim(sampling_interval) == 0: raise ValueError("`sampling_interval` must be a scalar value") sampling_interval = np.asarray(sampling_interval)[()] # ensure numpy scalar if np.issubdtype(tie_values.dtype, np.datetime64): if not np.issubdtype( np.asarray(sampling_interval).dtype, np.timedelta64 ): raise ValueError( "`sampling_interval` must be timedelta64 for datetime64 `tie_values`" ) # store data self.data = { "tie_values": tie_values, "tie_lengths": tie_lengths, "sampling_interval": sampling_interval, } self.dim = dim
@property def tie_values(self): """Start values of each regularly-sampled segment.""" return self.data["tie_values"] @property def tie_lengths(self): """Number of samples in each regularly-sampled segment.""" return self.data["tie_lengths"] @property def sampling_interval(self): """Fixed step between consecutive samples (shared across all segments).""" return self.data["sampling_interval"] @property def dtype(self): """Dtype of the tie values (and of all materialised coordinate values).""" return self.tie_values.dtype @property def tie_indices(self): """Start integer index of each segment within the full coordinate array.""" return np.concatenate(([0], np.cumsum(self.tie_lengths[:-1]))) @property def empty(self): """``True`` if no segments have been set.""" return self.tie_values.shape == (0,) @property def ndim(self): """Always 1.""" return self.tie_values.ndim @property def shape(self): """Shape tuple ``(len(self),)``.""" return (len(self),) @property def indices(self): """Full integer index array from 0 to ``len(self) - 1``.""" if self.empty: return np.array([], dtype="int") else: return np.arange(len(self)) @property def values(self): """Materialised numpy array of all coordinate values.""" if self.empty: return np.array([], dtype=self.dtype) else: return self.get_value(self.indices) @property def start(self): """Value at index 0 (first tie value).""" return self.tie_values[0] @property def end(self): """Value one step past the last sample (exclusive upper bound).""" return self.tie_values[-1] + self.sampling_interval * self.tie_lengths[-1]
[docs] @staticmethod def isvalid(data): """Return ``True`` if *data* has ``tie_values``, ``tie_lengths``, and ``sampling_interval`` keys.""" match data: case { "tie_values": _, "tie_lengths": _, "sampling_interval": _, }: return True case _: return False
def __len__(self): if self.empty: return 0 else: return sum(self.tie_lengths) def __repr__(self): if self.empty: return "empty coordinate" elif len(self) == 1: return f"{self.tie_values[0]}" else: if np.issubdtype(self.dtype, np.floating): return f"{self.start:.3f} to {self.end:.3f}" elif np.issubdtype(self.dtype, np.datetime64): start_str = format_datetime(self.start) end_str = format_datetime(self.end) return f"{start_str} to {end_str}" else: return f"{self.start} to {self.end}" def __getitem__(self, item): if isinstance(item, slice): return self.slice_index(item) else: return Coordinate( self.get_value(item), None if np.isscalar(item) else self.dim ) def __add__(self, other): return self.__class__( { "tie_values": self.tie_values + other, "tie_lengths": self.tie_lengths, "sampling_interval": self.sampling_interval, }, self.dim, ) def __sub__(self, other): return self.__class__( { "tie_values": self.tie_values - other, "tie_lengths": self.tie_lengths, "sampling_interval": self.sampling_interval, }, self.dim, ) def __array__(self, dtype=None): out = self.values if dtype is not None: out = out.__array__(dtype) return out def __array_ufunc__(self, ufunc, method, *inputs, **kwargs): raise NotImplementedError def __array_function__(self, func, types, args, kwargs): raise NotImplementedError def issampled(self): """Return ``True`` (this is a :class:`SampledCoordinate`).""" return True
[docs] def get_sampling_interval(self, cast=True): """ Return the sampling interval. Parameters ---------- cast : bool, optional If ``True`` (default), cast timedelta64 to seconds (float). """ delta = self.sampling_interval if cast and np.issubdtype(delta.dtype, np.timedelta64): delta = delta / np.timedelta64(1, "s") return delta
def is_monotonic_increasing(self): """Return ``True`` if no segment starts before the end of the previous one.""" return not self.get_split_indices("overlaps", tolerance=False).size
[docs] def equals(self, other): """Return ``True`` if *other* has identical tie values, lengths, sampling interval, dim, and dtype.""" return ( np.array_equal(self.tie_values, other.tie_values) and np.array_equal(self.tie_lengths, other.tie_lengths) and self.sampling_interval == other.sampling_interval and self.dim == other.dim and self.dtype == other.dtype )
[docs] def get_value(self, index): """Compute coordinate value(s) at integer position(s) *index* using the stored segments.""" index = self.format_index(index, bounds="raise") reference = np.searchsorted(self.tie_indices, index, side="right") - 1 return self.tie_values[reference] + ( (index - self.tie_indices[reference]) * self.sampling_interval )
[docs] def slice_index(self, index_slice): """Return a new :class:`SampledCoordinate` for the integer slice *index_slice*.""" # normalize slice start, stop, step = index_slice.indices(len(self)) if step < 0: raise NotImplementedError("negative slice step is not implemented") # align stop stop += (start - stop) % step # TODO: check for negative step # get relative start and stop within each tie q, r = np.divmod(start - self.tie_indices, step) lo = np.maximum(q, 0) * step + r q, r = np.divmod(self.tie_indices + self.tie_lengths - stop, step) hi = self.tie_lengths - np.maximum(q, 0) * step + r # filter empty segments mask = hi > lo lo = lo[mask] hi = hi[mask] # compute new tie values, tie lengths and sampling interval tie_values = self.tie_values[mask] + lo * self.sampling_interval tie_lengths = (hi - lo) // step sampling_interval = self.sampling_interval * step # build new coordinate data = { "tie_values": tie_values, "tie_lengths": tie_lengths, "sampling_interval": sampling_interval, } return self.__class__(data, self.dim)
[docs] def get_indexer(self, value, method=None): """ Return the integer index for label *value* using the segment structure. Parameters ---------- value : scalar, str (ISO datetime), or array-like Label(s) to locate. method : {None, "nearest", "ffill", "bfill"}, optional How to handle values that fall in gaps or between samples. Returns ------- int or numpy.ndarray Raises ------ KeyError If *value* falls in an overlap region or is not found (exact mode). """ if isinstance(value, str): value = np.datetime64(value) else: value = np.asarray(value) if not is_monotonic_increasing( self.tie_values ): # TODO: make it work even in this case raise ValueError("tie_values must be strictly increasing") # find preceeding tie point reference = np.searchsorted(self.tie_values, value, side="right") - 1 reference = np.maximum(reference, 0) # overlaps before = np.maximum(reference - 1, 0) end = ( self.tie_values[before] + (self.tie_lengths[before] - 1) * self.sampling_interval ) if np.any((reference > 0) & (value <= end)): raise KeyError("value is in an overlap region") # gap after = np.minimum(reference + 1, len(self.tie_values) - 1) end = ( self.tie_values[reference] + (self.tie_lengths[reference] - 1) * self.sampling_interval ) match method: case "nearest": mask = (reference < len(self.tie_values) - 1) & ( value - end >= self.tie_values[after] - value ) reference = np.where(mask, after, reference) case "bfill": mask = (reference < len(self.tie_values) - 1) & (value >= end) reference = np.where(mask, after, reference) case "ffill" | None: pass case _: raise ValueError( "method must be one of `None`, 'nearest', 'ffill', or 'bfill'" ) offset = (value - self.tie_values[reference]) / self.sampling_interval match method: # pragma: no branch case None: if np.any( (offset % 1 != 0) | (offset < 0) | (offset >= self.tie_lengths[reference]) ): raise KeyError("index not found") offset = offset.astype(int) case "nearest": offset = np.round(offset).astype(int) offset = np.clip(offset, 0, self.tie_lengths[reference] - 1) case "ffill": offset = np.floor(offset).astype(int) if np.any(offset < 0): raise KeyError("index not found") offset = np.minimum(offset, self.tie_lengths[reference] - 1) case "bfill": # pragma: no branch offset = np.ceil(offset).astype(int) if np.any(offset > self.tie_lengths[reference] - 1): raise KeyError("index not found") offset = np.maximum(offset, 0) return self.tie_indices[reference] + offset
[docs] def concat(self, other): """Append *other* :class:`SampledCoordinate` segments after this one.""" if not isinstance(other, self.__class__): raise TypeError(f"cannot concatenate {type(other)} to {self.__class__}") if not self.dim == other.dim: raise ValueError("cannot concatenate coordinate with different dimension") if self.empty: return other if other.empty: return self if not self.dtype == other.dtype: raise ValueError("cannot concatenate coordinate with different dtype") if not self.sampling_interval == other.sampling_interval: raise ValueError( "cannot concatenate coordinate with different sampling intervals" ) tie_values = np.concatenate([self.tie_values, other.tie_values]) tie_lengths = np.concatenate([self.tie_lengths, other.tie_lengths]) return self.__class__( { "tie_values": tie_values, "tie_lengths": tie_lengths, "sampling_interval": self.sampling_interval, }, self.dim, )
[docs] def decimate(self, q): """Return a new coordinate keeping every *q*-th sample (integer decimation).""" return self[::q]
[docs] def simplify(self, tolerance=None): """ Merge adjacent segments whose gap is within *tolerance* of the sampling interval. Parameters ---------- tolerance : float, timedelta, or None Maximum allowed discrepancy between the expected and actual start of the next segment. ``None`` uses zero tolerance. ``False`` returns ``self`` unchanged. """ if tolerance is False: return self # TODO: copy tolerance = parse_tolerance(tolerance, self.dtype) tie_values = [self.tie_values[0]] tie_lengths = [self.tie_lengths[0]] for value, length in zip(self.tie_values[1:], self.tie_lengths[1:]): delta = value - (tie_values[-1] + self.sampling_interval * tie_lengths[-1]) if np.abs(delta) <= tolerance: tie_lengths[-1] += length else: tie_values.append(value) tie_lengths.append(length) return self.__class__( { "tie_values": np.array(tie_values), "tie_lengths": np.array(tie_lengths), "sampling_interval": self.sampling_interval, }, self.dim, )
[docs] def get_split_indices(self, kind="discontinuities", tolerance=False): """ Return integer indices of segment boundaries (start of each segment except the first). Parameters ---------- kind : {"discontinuities", "gaps", "overlaps"}, optional Which boundary type to return. Default ``"discontinuities"``. tolerance : float, timedelta, or ``False`` Minimum magnitude of the discrepancy to report. Returns ------- numpy.ndarray """ valid_kinds = {"discontinuities", "gaps", "overlaps"} if kind not in valid_kinds: raise ValueError(f"`kind` must be one of {valid_kinds}; got {kind!r}") indices = self.tie_indices[1:] # Fast path: no filtering requested if kind == "discontinuities" and tolerance is False: return indices deltas = self.tie_values[1:] - ( self.tie_values[:-1] + self.sampling_interval * self.tie_lengths[:-1] ) if tolerance is False: zero = np.timedelta64(0) if np.issubdtype(self.dtype, np.datetime64) else 0 match kind: # pragma: no branch case "gaps": mask = deltas >= zero case "overlaps": # pragma: no branch mask = deltas < zero else: tolerance = parse_tolerance(tolerance, self.dtype) match kind: # pragma: no branch case "discontinuities": mask = np.abs(deltas) > tolerance case "gaps": mask = deltas > tolerance case "overlaps": # pragma: no branch mask = deltas < -tolerance return indices[mask]
[docs] @classmethod def from_array(cls, arr, dim=None, sampling_interval=None): """Not supported — raises :exc:`NotImplementedError`.""" raise NotImplementedError("from_array is not implemented for SampledCoordinate")
[docs] def to_dict(self): """Serialise to ``{"dim": ..., "data": {"tie_values": ..., "tie_lengths": ..., "sampling_interval": ...}, "dtype": ...}``.""" tie_values = self.data["tie_values"] tie_lengths = self.data["tie_lengths"] if np.issubdtype(tie_values.dtype, np.datetime64): tie_values = tie_values.astype(str) data = { "tie_values": tie_values.tolist(), "tie_lengths": tie_lengths.tolist(), "sampling_interval": self.sampling_interval, } return {"dim": self.dim, "data": data, "dtype": str(self.dtype)}
def to_dataset(self, dataset, attrs): """Write sampling metadata into an xarray *dataset* using CF tie-point conventions.""" mapping = f"{self.name}: {self.name}_sampling" if "coordinate_sampling" in attrs: attrs["coordinate_sampling"] += " " + mapping else: attrs["coordinate_sampling"] = mapping tie_values = ( self.tie_values.astype("M8[ns]") if np.issubdtype(self.tie_values.dtype, np.datetime64) else self.tie_values ) tie_lengths = self.tie_lengths interp_attrs = { "tie_point_mapping": f"{self.dim}: {self.name}_values {self.name}_lengths", } # timedelta if np.issubdtype(self.sampling_interval.dtype, np.timedelta64): code, count = np.datetime_data(self.sampling_interval.dtype) interp_attrs["dtype"] = "timedelta64[ns]" interp_attrs["units"] = CODE_TO_UNITS[code] sampling_interval = count * self.sampling_interval.astype(int) else: sampling_interval = self.sampling_interval dataset.update( { f"{self.name}_sampling": ((), sampling_interval, interp_attrs), f"{self.name}_values": (f"{self.name}_points", tie_values), f"{self.name}_lengths": (f"{self.name}_points", tie_lengths), } ) return dataset, attrs @classmethod def from_dataset(cls, dataset, name): """Read sampled coordinates from *dataset* using the ``coordinate_sampling`` attribute.""" coords = {} mapping = dataset[name].attrs.pop("coordinate_sampling", None) if mapping is not None: matches = re.findall(r"(\w+): (\w+)", mapping) for match in matches: name, sampling = match dim, values, lengths = re.match( r"(\w+): (\w+) (\w+)", dataset[sampling].attrs["tie_point_mapping"] ).groups() data = { "tie_values": dataset[values].values, "tie_lengths": dataset[lengths].values, "sampling_interval": dataset[sampling].values[()], } # timedelta if ( "dtype" in dataset[sampling].attrs and "units" in dataset[sampling].attrs ): data["sampling_interval"] = np.timedelta64( data["sampling_interval"], UNITS_TO_CODE[dataset[sampling].attrs.pop("units")], ).astype(dataset[sampling].attrs.pop("dtype")) coords[name] = Coordinate(data, dim) return coords
[docs] @classmethod def from_block(cls, start, size, step, dim=None, dtype=None): """Build a single-segment :class:`SampledCoordinate` starting at *start* with *size* samples and step *step*.""" data = { "tie_values": [start], "tie_lengths": [size], "sampling_interval": step, } return cls(data, dim=dim, dtype=dtype)