Source code for xdas.signal

"""
Signal processing functions for :class:`DataArray`.

Includes filtering, resampling, tapering, detrending, and spectral helpers,
all coordinate-aware and multi-threaded via :func:`~xdas.parallel.parallelize`.
"""

import numpy as np
import scipy.signal as sp

from .atoms.core import atomized
from .coordinates.core import Coordinate, get_sampling_interval
from .core.dataarray import DataArray
from .parallel import parallelize
from .spectral import stft  # noqa


[docs] @atomized def detrend(da, type="linear", dim="last", parallel=None): """ Detrend data along given dimension. Parameters ---------- da : DataArray The data to detrend. type : str Either "linear" or "constant". dim : str The dimension along which to detrend the data. parallel : bool or int, optional Number of threads to use. True uses all cores, False uses one, an int uses that many, None defers to the global xdas configuration. Default is None. Returns ------- DataArray The detrended data. Notes ----- Splits on data discontinuities along `dim`. """ axis = da.get_axis_num(dim) across = int(axis == 0) func = parallelize(across, across, parallel)(sp.detrend) data = func(da.values, axis, type) return da.copy(data=data)
[docs] @atomized def taper(da, window="hann", fftbins=False, dim="last", parallel=None): """ Apply a tapering window along the given dimension. Parameters ---------- da : DataArray The data to taper. window : str or tuple, optional The window to use, by default "hann" fftbins : bool, optional Whether to use a periodic windowing, by default False dim : str, optional Dimension along which to taper, by default "last" parallel : bool or int, optional Number of threads to use. True uses all cores, False uses one, an int uses that many, None defers to the global xdas configuration. Default is None. Returns ------- DataArray The tapered data. """ axis = da.get_axis_num(dim) w = sp.get_window(window, da.shape[axis], fftbins=fftbins) shape = [-1 if ax == axis else 1 for ax in range(da.ndim)] w = w.reshape(shape) across = int(axis == 0) func = parallelize(across, across, parallel)(np.multiply) data = func(da.values, w) return da.copy(data=data)
[docs] @atomized def filter(da, freq, btype, corners=4, zerophase=False, dim="last", parallel=None): """ SOS IIR filtering along given dimension. Parameters ---------- da: DataArray Traces to filter. freq: float or list Cuttoff frequency or band corners [Hz]. btype: {'bandpass', 'lowpass', 'highpass', 'bandstop'} The type of the filter. corners: int The order of the filter. zerophase: bool If True, apply filter once forwards and once backwards. This results in twice the filter order but zero phase shift in the resulting filtered trace. dim: str, optional The dimension along which to filter. parallel : bool or int, optional Number of threads to use. True uses all cores, False uses one, an int uses that many, None defers to the global xdas configuration. Default is None. Returns ------- DataArray Filtered traces. """ axis = da.get_axis_num(dim) across = int(axis == 0) fs = 1.0 / get_sampling_interval(da, dim) sos = sp.iirfilter(corners, freq, btype=btype, ftype="butter", output="sos", fs=fs) if zerophase: func = parallelize((None, across), across, parallel)(sp.sosfiltfilt) else: func = parallelize((None, across), across, parallel)(sp.sosfilt) data = func(sos, da.values, axis=axis) return da.copy(data=data)
[docs] @atomized def hilbert(da, N=None, dim="last", parallel=None): """ Compute the analytic signal, using the Hilbert transform. The transformation is done along the last axis by default. Parameters ---------- da : DataArray Signal data. Must be real. N: int, optional Number of Fourier components. Default: `da.sizes[dim]`. dim: str, optional The dimension along which to transform. Default: last. parallel : bool or int, optional Number of threads to use. True uses all cores, False uses one, an int uses that many, None defers to the global xdas configuration. Default is None. Returns ------- DataArray Analytic signal of `da` along dim. Notes ----- Splits on data discontinuities along `dim`. Examples -------- In this example we use the Hilbert transform to determine the analytic signal. >>> import xdas.signal as xs >>> from xdas.synthetics import wavelet_wavefronts >>> da = wavelet_wavefronts() >>> xs.hilbert(da, dim="time") <xdas.DataArray (time: 300, distance: 401)> [[ 0.0497+0.1632j -0.0635+0.0125j ... 0.1352-0.3107j -0.2832-0.0126j] [-0.1096-0.0335j 0.124 +0.0257j ... -0.0444+0.2409j 0.1378-0.2702j] ... [-0.1977+0.0545j -0.0533-0.1947j ... 0.3722+0.125j -0.0127+0.1723j] [ 0.1221-0.1808j 0.1888+0.0368j ... -0.4517+0.1581j 0.0411+0.1512j]] Coordinates: * time (time): 2023-01-01T00:00:00.000 to 2023-01-01T00:00:05.980 * distance (distance): 0.000 to 10000.000 """ axis = da.get_axis_num(dim) across = int(axis == 0) func = parallelize(across, across, parallel)(sp.hilbert) data = func(da.values, N, axis) return da.copy(data=data)
[docs] @atomized def resample(da, num, dim="last", window=None, domain="time", parallel=None): """ Resample da to num samples using Fourier method along the given dimension. The resampled signal starts at the same value as da but is sampled with a spacing of len(da) / num * (spacing of da). Because a Fourier method is used, the signal is assumed to be periodic. Parameters ---------- da: DataArray The data to be resampled. num: int The number of samples in the resampled signal. dim: str, optional The dimension along which to resample. Default is last. window: array_like, callable, string, float, or tuple, optional Specifies the window applied to the signal in the Fourier domain. See below for details. domain: string, optional A string indicating the domain of the input x: `time` Consider the input da as time-domain (Default), `freq` Consider the input da as frequency-domain. parallel : bool or int, optional Number of threads to use. True uses all cores, False uses one, an int uses that many, None defers to the global xdas configuration. Default is None. Returns ------- DataArray The resampled dataarray. Notes ----- Splits on data discontinuities along `dim`. Examples -------- A synthetic dataarray is resample from 300 to 100 samples along the time dimension. The 'hamming' window is used. >>> import xdas.signal as xs >>> from xdas.synthetics import wavelet_wavefronts >>> da = wavelet_wavefronts() >>> xs.resample(da, 100, dim='time', window='hamming', domain='time') <xdas.DataArray (time: 100, distance: 401)> [[ 0.039988 0.04855 -0.08251 ... 0.02539 -0.055219 -0.006693] [-0.032913 -0.016732 0.033743 ... 0.028534 -0.037685 0.032918] [ 0.01215 0.064107 -0.048831 ... 0.009131 0.053133 0.019843] ... [-0.036508 0.050059 0.015494 ... -0.012022 -0.064922 0.034198] [ 0.054003 -0.013902 -0.084095 ... 0.008979 0.080804 -0.063866] [-0.042741 -0.03524 0.122637 ... -0.013453 -0.075183 0.093055]] Coordinates: * time (time): 2023-01-01T00:00:00.000 to 2023-01-01T00:00:05.940 * distance (distance): 0.000 to 10000.000 """ axis = da.get_axis_num(dim) dim = da.dims[axis] across = int(axis == 0) func = parallelize(across, across, parallel)(sp.resample) data, t = func(da.values, num, da[dim].values, axis, window, domain) new_coord = {"tie_indices": [0, num - 1], "tie_values": [t[0], t[-1]]} coords = { name: new_coord if name == dim else coord for name, coord in da.coords.items() if not (coord.dim == dim and not name == dim) # don't handle non-dimensional } return DataArray(data, coords, da.dims, da.name, da.attrs)
[docs] @atomized def resample_poly( da, up, down, dim="last", window=("kaiser", 5.0), padtype="constant", cval=None, parallel=None, ): """ Resample da along the given dimension using polyphase filtering. The signal in `da` is upsampled by the factor `up`, a zero-phase low-pass FIR filter is applied, and then it is downsampled by the factor `down`. The resulting sample rate is ``up / down`` times the original sample rate. By default, values beyond the boundary of the signal are assumed to be zero during the filtering step. Parameters ---------- da : DataArray The data to be resampled. up : int The upsampling factor. down : int The downsampling factor. dim : str, optional The dimension of `da` that is resampled. Default is last. window : string, tuple, or array_like, optional Desired window to use to design the low-pass filter, or the FIR filter coefficients to employ. See below for details. padtype : string, optional `constant`, `line`, `mean`, `median`, `maximum`, `minimum` or any of the other signal extension modes supported by `scipy.signal.upfirdn`. Changes assumptions on values beyond the boundary. If `constant`, assumed to be `cval` (default zero). If `line` assumed to continue a linear trend defined by the first and last points. `mean`, `median`, `maximum` and `minimum` work as in `np.pad` and assume that the values beyond the boundary are the mean, median, maximum or minimum respectively of the array along the dimension. cval : float, optional Value to use if `padtype='constant'`. Default is zero. parallel : bool or int, optional Number of threads to use. True uses all cores, False uses one, an int uses that many, None defers to the global xdas configuration. Default is None. Notes ----- Splits on data discontinuities along `dim`. Returns ------- DataArray The resampled data. Examples -------- This example is made to resample the input dataarray in the time domain at 100 samples with an original shape of 300 in time. The choosed window is a 'hamming' window. The dataarray is synthetic data. >>> import xdas.signal as xs >>> from xdas.synthetics import wavelet_wavefronts >>> da = wavelet_wavefronts() >>> xs.resample_poly(da, 2, 5, dim='time') <xdas.DataArray (time: 120, distance: 401)> [[-0.006378 0.012767 -0.002068 ... -0.033461 0.002603 -0.027478] [ 0.008851 -0.037799 0.009595 ... 0.053291 -0.0396 0.026909] [-0.034468 0.085153 -0.038036 ... -0.015803 0.030245 0.047028] ... [ 0.02834 0.053455 -0.155873 ... 0.033726 -0.036478 -0.016146] [ 0.015454 -0.062852 0.049064 ... -0.018409 0.113782 -0.072631] [-0.026921 -0.01264 0.087272 ... 0.001695 -0.147191 0.177587]] Coordinates: * time (time): 2023-01-01T00:00:00.000 to 2023-01-01T00:00:05.950 * distance (distance): 0.000 to 10000.000 """ axis = da.get_axis_num(dim) dim = da.dims[axis] across = int(axis == 0) func = parallelize(across, across, parallel)(sp.resample_poly) data = func(da.values, up, down, axis, window, padtype, cval) start = da[dim][0].values d = da[dim][-1].values - da[dim][-2].values end = da[dim][-1].values + d new_coord = Coordinate( { "tie_indices": [0, data.shape[axis]], "tie_values": [start, end], }, dim, ) new_coord = new_coord[:-1] coords = { name: new_coord if name == dim else coord for name, coord in da.coords.items() if not (coord.dim == dim and not name == dim) # don't handle non-dimensional } return DataArray(data, coords, da.dims, da.name, da.attrs)
[docs] @atomized def lfilter(b, a, da, dim="last", zi=None, parallel=None): """ Filter data along one-dimension with an IIR or FIR filter. Filter a data sequence, `da`, using a digital filter. The filter is a direct form II transposed implementation of the standard difference equation. Parameters ---------- b : array_like The numerator coefficient vector in a 1-D sequence. a : array_like The denominator coefficient vector in a 1-D sequence. If ``a[0]`` is not 1, then both `a` and `b` are normalized by ``a[0]``. da : DataArray An N-dimensional input dataarray. dim : str, optional The dimension of the input data array along which to apply the linear filter. Default is last. zi : array_like or str, optional Initial conditions for the filter delays. If `zi` is None or ... then initial rest is assumed. parallel : bool or int, optional Number of threads to use. True uses all cores, False uses one, an int uses that many, None defers to the global xdas configuration. Default is None. Returns ------- da : DataArray The output of the digital filter. zf : array, optional If `zi` is None, this is not returned. If `zi` is given or ... then `zf` holds the final filter delay values. Notes ----- Splits on data discontinuities along `dim`. Examples -------- >>> import scipy.signal as sp >>> import xdas.signal as xs >>> from xdas.synthetics import wavelet_wavefronts >>> da = wavelet_wavefronts() >>> b, a = sp.iirfilter(4, 0.5, btype="low") >>> xs.lfilter(b, a, da, dim='time') <xdas.DataArray (time: 300, distance: 401)> [[ 0.004668 -0.005968 0.007386 ... -0.0138 0.01271 -0.026618] [ 0.008372 -0.01222 0.022552 ... -0.041387 0.046667 -0.093521] [-0.008928 0.002764 0.012621 ... -0.032496 0.039645 -0.076117] ... [ 0.012576 -0.1661 0.196026 ... 0.048191 -0.014532 -0.033122] [-0.06294 -0.092234 0.316862 ... 0.045337 -0.139729 0.094086] [-0.035233 0.036613 0.044002 ... -0.053585 -0.121344 0.241415]] Coordinates: * time (time): 2023-01-01T00:00:00.000 to 2023-01-01T00:00:05.980 * distance (distance): 0.000 to 10000.000 """ axis = da.get_axis_num(dim) across = int(axis == 0) if zi is ...: n_sections = max(len(a), len(b)) - 1 shape = tuple( n_sections if _axis == axis else _size for _axis, _size in enumerate(da.shape) ) zi = np.zeros(shape) if zi is None: func = parallelize((None, None, across), across, parallel)(sp.lfilter) data = func(b, a, da.values, axis, zi) return da.copy(data=data) else: func = parallelize( (None, None, across, None, across), (across, across), parallel )(sp.lfilter) data, zf = func(b, a, da.values, axis, zi) return da.copy(data=data), zf
[docs] @atomized def filtfilt( b, a, da, dim="last", padtype="odd", padlen=None, method="pad", irlen=None, parallel=None, ): """ Apply a digital filter forward and backward to a signal. This function applies a linear digital filter twice, once forward and once backwards. The combined filter has zero phase and a filter order twice that of the original. The function provides options for handling the edges of the signal. Parameters ---------- b : (N,) array_like The numerator coefficient vector of the filter. a : (N,) array_like The denominator coefficient vector of the filter. If ``a[0]`` is not 1, then both `a` and `b` are normalized by ``a[0]``. da : DataArray The array of data to be filtered. dim : str, optional The dimension of `da` to which the filter is applied. Default is last. padtype : str or None, optional Must be 'odd', 'even', 'constant', or None. This determines the type of extension to use for the padded signal to which the filter is applied. If `padtype` is None, no padding is used. The default is 'odd'. padlen : int or None, optional The number of elements by which to extend `da` at both ends of `dim` before applying the filter. This value must be less than ``da.sizes[dim] - 1``. ``padlen=0`` implies no padding. The default value is ``3 * max(len(a), len(b))``. method : str, optional Determines the method for handling the edges of the signal, either "pad" or "gust". When `method` is "pad", the signal is padded; the type of padding is determined by `padtype` and `padlen`, and `irlen` is ignored. When `method` is "gust", Gustafsson's method is used, and `padtype` and `padlen` are ignored. irlen : int or None, optional When `method` is "gust", `irlen` specifies the length of the impulse response of the filter. If `irlen` is None, no part of the impulse response is ignored. For a long signal, specifying `irlen` can significantly improve the performance of the filter. parallel : bool or int, optional Number of threads to use. True uses all cores, False uses one, an int uses that many, None defers to the global xdas configuration. Default is None. Notes ----- Splits on data discontinuities along `dim`. Returns ------- DataArray The filtered output with the same coordinates as `da`. Examples -------- >>> import scipy.signal as sp >>> import xdas.signal as xs >>> from xdas.synthetics import wavelet_wavefronts >>> da = wavelet_wavefronts() >>> b, a = sp.iirfilter(4, 0.5, btype="low") >>> xs.lfilter(b, a, da, dim='time') <xdas.DataArray (time: 300, distance: 401)> [[ 0.004668 -0.005968 0.007386 ... -0.0138 0.01271 -0.026618] [ 0.008372 -0.01222 0.022552 ... -0.041387 0.046667 -0.093521] [-0.008928 0.002764 0.012621 ... -0.032496 0.039645 -0.076117] ... [ 0.012576 -0.1661 0.196026 ... 0.048191 -0.014532 -0.033122] [-0.06294 -0.092234 0.316862 ... 0.045337 -0.139729 0.094086] [-0.035233 0.036613 0.044002 ... -0.053585 -0.121344 0.241415]] Coordinates: * time (time): 2023-01-01T00:00:00.000 to 2023-01-01T00:00:05.980 * distance (distance): 0.000 to 10000.000 """ axis = da.get_axis_num(dim) across = int(axis == 0) func = parallelize((None, None, across), across, parallel)(sp.filtfilt) data = func(b, a, da.values, axis, padtype, padlen, method, irlen) return da.copy(data=data)
[docs] @atomized def sosfilt(sos, da, dim="last", zi=None, parallel=None): """ Filter data along one dimension using cascaded second-order sections. Filter a data sequence, `da`, using a digital IIR filter defined by `sos`. Parameters ---------- sos : array_like Array of second-order filter coefficients, must have shape ``(n_sections, 6)``. Each row corresponds to a second-order section, with the first three columns providing the numerator coefficients and the last three providing the denominator coefficients. da : DataArray An N-dimensional input dataarray. dim : str, optional The dimension of the input dataarray along which to apply the linear filter. Default is -1. zi : array_like or str, optional Initial conditions for the cascaded filter delays. It is a (at least 2D) vector of shape ``(n_sections, ..., 2, ...)``, where ``..., 2, ...`` denotes the shape of `da`, but with ``da.sizes[dim]`` replaced by 2. If `zi` is None,... , or is not given then initial rest (i.e. all zeros) is assumed. parallel : bool or int, optional Number of threads to use. True uses all cores, False uses one, an int uses that many, None defers to the global xdas configuration. Default is None. Returns ------- y : DataArray The output of the digital filter. zi : ndarray, optional If `zi` is None, this is not returned. If `zi` is given or is ... then `zf` holds the final filter delay values. Notes ----- Splits on data discontinuities along `dim`. Examples -------- >>> import scipy.signal as sp >>> import xdas.signal as xs >>> from xdas.synthetics import wavelet_wavefronts >>> da = wavelet_wavefronts() >>> sos = sp.iirfilter(4, 0.5, btype="low", output="sos") >>> xs.sosfilt(sos, da, dim='time') <xdas.DataArray (time: 300, distance: 401)> [[ 0.004668 -0.005968 0.007386 ... -0.0138 0.01271 -0.026618] [ 0.008372 -0.01222 0.022552 ... -0.041387 0.046667 -0.093521] [-0.008928 0.002764 0.012621 ... -0.032496 0.039645 -0.076117] ... [ 0.012576 -0.1661 0.196026 ... 0.048191 -0.014532 -0.033122] [-0.06294 -0.092234 0.316862 ... 0.045337 -0.139729 0.094086] [-0.035233 0.036613 0.044002 ... -0.053585 -0.121344 0.241415]] Coordinates: * time (time): 2023-01-01T00:00:00.000 to 2023-01-01T00:00:05.980 * distance (distance): 0.000 to 10000.000 """ axis = da.get_axis_num(dim) across = int(axis == 0) if zi is ...: n_sections = sos.shape[0] shape = (n_sections,) + tuple( 2 if index == axis else element for index, element in enumerate(da.shape) ) zi = np.zeros(shape) if zi is None: func = parallelize((None, across), across, parallel)(sp.sosfilt) data = func(sos, da.values, axis, zi) return da.copy(data=data) else: func = parallelize( (None, across, None, across + 1), (across, across + 1), parallel )(sp.sosfilt) data, zf = func(sos, da.values, axis, zi) return da.copy(data=data), zf
[docs] @atomized def sosfiltfilt(sos, da, dim="last", padtype="odd", padlen=None, parallel=None): """ Apply a forward-backward digital filter using cascaded second-order sections. Parameters ---------- sos : array_like Array of second-order filter coefficients, must have shape ``(n_sections, 6)``. Each row corresponds to a second-order section, with the first three columns providing the numerator coefficients and the last three providing the denominator coefficients. da : DataArray The data to be filtered. dim : str, optional The dimension of `da` to which the filter is applied. Default is last. padtype : str or None, optional Must be 'odd', 'even', 'constant', or None. This determines the type of extension to use for the padded signal to which the filter is applied. If `padtype` is None, no padding is used. The default is 'odd'. padlen : int or None, optional The number of elements by which to extend `da` at both ends of `dim` before applying the filter. This value must be less than ``da.sizes[dim] - 1``. ``padlen=0`` implies no padding. The default value is:: 3 * (2 * len(sos) + 1 - min((sos[:, 2] == 0).sum(), (sos[:, 5] == 0).sum())) The extra subtraction at the end attempts to compensate for poles and zeros at the origin (e.g. for odd-order filters) to yield equivalent estimates of `padlen` to those of `filtfilt` for second-order section filters built with `scipy.signal` functions. parallel : bool or int, optional Number of threads to use. True uses all cores, False uses one, an int uses that many, None defers to the global xdas configuration. Default is None. Returns ------- DataArray The filtered output with the same coordinates as `da`. Notes ----- Splits on data discontinuities along `dim`. Examples -------- >>> import scipy.signal as sp >>> import xdas.signal as xs >>> from xdas.synthetics import wavelet_wavefronts >>> da = wavelet_wavefronts() >>> sos = sp.iirfilter(4, 0.5, btype="low", output="sos") >>> xs.sosfiltfilt(sos, da, dim='time') <xdas.DataArray (time: 300, distance: 401)> [[ 0.04968 -0.063651 0.078731 ... -0.146869 0.135149 -0.283111] [-0.01724 0.018588 -0.037267 ... 0.025092 -0.107095 0.127912] [-0.004291 -0.002956 -0.032369 ... 0.078337 -0.150316 0.155965] ... [-0.068308 -0.06057 0.165391 ... -0.115742 -0.005265 0.19878 ] [-0.014123 0.039773 -0.063194 ... -0.090854 -0.053728 0.206149] [ 0.122203 0.188674 -0.231442 ... 0.304675 -0.452161 0.041323]] Coordinates: * time (time): 2023-01-01T00:00:00.000 to 2023-01-01T00:00:05.980 * distance (distance): 0.000 to 10000.000 """ axis = da.get_axis_num(dim) across = int(axis == 0) func = parallelize((None, across), across, parallel)(sp.sosfiltfilt) data = func(sos, da.values, axis, padtype, padlen) return da.copy(data=data)
[docs] @atomized def decimate(da, q, n=None, ftype="iir", zero_phase=True, dim="last", parallel=None): """ Downsample the signal after applying an anti-aliasing filter. By default, an order 8 Chebyshev type I filter is used. A 30 point FIR filter with Hamming window is used if `ftype` is 'fir'. Parameters ---------- da : DataArray The signal to be downsampled, as an N-dimensional dataarray. q : int The downsampling factor. When using IIR downsampling, it is recommended to call `decimate` multiple times for downsampling factors higher than 13. n : int, optional The order of the filter (1 less than the length for 'fir'). Defaults to 8 for 'iir' and 20 times the downsampling factor for 'fir'. ftype : str {'iir', 'fir'} or ``dlti`` instance, optional If 'iir' or 'fir', specifies the type of lowpass filter. If an instance of an `dlti` object, uses that object to filter before downsampling. dim : str, optional The dimension along which to decimate. zero_phase : bool, optional Prevent phase shift by filtering with `filtfilt` instead of `lfilter` when using an IIR filter, and shifting the outputs back by the filter's group delay when using an FIR filter. The default value of ``True`` is recommended, since a phase shift is generally not desired. parallel : bool or int, optional Number of threads to use. True uses all cores, False uses one, an int uses that many, None defers to the global xdas configuration. Default is None. Returns ------- DataArray The down-sampled signal. Notes ----- Splits on data discontinuities along `dim`. """ axis = da.get_axis_num(dim) dim = da.dims[axis] # TODO: this fist last thing is a bad idea... across = int(axis == 0) func = parallelize(across, across, parallel)(sp.decimate) data = func(da.values, q, n, ftype, axis, zero_phase) coords = da.coords.copy() for name in coords: if coords[name].dim == dim: coords[name] = coords[name][::q] return DataArray(data, coords, da.dims, da.name, da.attrs)
[docs] @atomized def integrate(da, midpoints=False, dim="last", parallel=None): """ Integrate along a given dimension. Parameters ---------- da : DataArray The data to integrate. midpoints : bool, optional Whether to move the coordinates by half a step, by default False. dim : str, optional The dimension along which to integrate, by default "last". parallel : bool or int, optional Number of threads to use. True uses all cores, False uses one, an int uses that many, None defers to the global xdas configuration. Default is None. Returns ------- DataArray The integrated data. Notes ----- Splits on data discontinuities along `dim`. """ axis = da.get_axis_num(dim) d = get_sampling_interval(da, dim) def func(x): return np.cumsum(x, axis=axis) * d across = int(axis == 0) func = parallelize(across, across, parallel)(func) data = func(da.values) out = da.copy(data=data) if midpoints: out[dim] = out[dim] + d / 2 return out
[docs] @atomized def differentiate(da, midpoints=False, dim="last", parallel=None): """ Differentiate along a given dimension. Parameters ---------- da : DataArray The data to differentiate. midpoints : bool, optional Whether to move the coordinates by half a step, by default False. dim : str, optional The dimension along which to differentiate, by default "last". parallel : bool or int, optional Number of threads to use. True uses all cores, False uses one, an int uses that many, None defers to the global xdas configuration. Default is None. Returns ------- DataArray The differentiated data. Notes ----- Splits on data discontinuities along `dim`. """ axis = da.get_axis_num(dim) d = get_sampling_interval(da, dim) def func(x): return np.diff(x, axis=axis) / d across = int(axis == 0) func = parallelize(across, across, parallel)(func) data = func(da.values) out = da.isel({dim: slice(None, -1)}).copy(data=data) if midpoints: out[dim] = out[dim] + d / 2 return out
[docs] @atomized def segment_mean_removal(da, limits, window="hann", dim="last"): # TODO: parallelize """ Piecewise mean removal. Parameters ---------- da : DataArray The data that segment mean should be removed. limits : list of float The segments limits. window : str, optional The tapering windows to apply at each window, by default "hann". dim : str, optional The dimension along which to remove the segment means, by default "last". Returns ------- DataArray The data with segment means removed. """ out = da.copy() axis = da.get_axis_num(dim) for sstart, send in zip(limits[:-1], limits[1:]): key = {dim: slice(sstart, np.nextafter(send, -np.inf))} data = out.loc[key].values win = sp.get_window(window, data.shape[axis]) shape = tuple(-1 if a == axis else 1 for a in range(data.ndim)) win = np.reshape(win, shape) ref = np.sum(data * win, axis=axis) / np.sum(win) out.loc[key] = out.loc[key].values - ref # TODO: Add DataArray Arithmetics. return out
[docs] @atomized def sliding_mean_removal( da, wlen, window="hann", pad_mode="reflect", dim="last", parallel=None ): """ Sliding mean removal. Parameters ---------- da : DataArray The data that sliding mean should be removed. wlen : float Length of the sliding mean. window : str, optional Tapering window used, by default "hann" pad_mode : str, optional Padding mode used, by default "reflect" dim : str, optional The dimension along which to remove the sliding mean, by default "last" parallel : bool or int, optional Number of threads to use. True uses all cores, False uses one, an int uses that many, None defers to the global xdas configuration. Default is None. Returns ------- DataArray The data with sliding mean removed. Notes ----- Splits on data discontinuities along `dim`. """ axis = da.get_axis_num(dim) d = get_sampling_interval(da, dim) n = round(wlen / d) if n % 2 == 0: n += 1 win = sp.get_window(window, n) win /= np.sum(win) shape = tuple(-1 if a == axis else 1 for a in range(da.ndim)) win = np.reshape(win, shape) pad_width = tuple((n // 2, n // 2) if a == axis else (0, 0) for a in range(da.ndim)) def func(x): return x - sp.fftconvolve( np.pad(x, pad_width, mode=pad_mode), win, mode="valid" ) across = int(axis == 0) func = parallelize(across, across, parallel)(func) data = func(da.values) return da.copy(data=data)
[docs] @atomized def medfilt(da, kernel_dim): # TODO: parallelize """ Perform a median filter along given dimensions. Apply a median filter to the input using a local window-size given by kernel_size. The array will automatically be zero-padded. Parameters ---------- da : DataArray A dataarray to filter. kernel_dim : dict A dictionary which keys are the dimensions over which to apply a median filtering and which values are the related kernel size in that direction. All values must be odd. If not all dims are provided, missing dimensions are associated to 1, i.e. no median filtering along that direction. At least one dimension must be passed. Returns ------- DataArray The median filtered data. Examples -------- A median filter is applied to some synthetic dataarray with a median window size of 7 along the time dimension and 5 along the space dimension. >>> import xdas.signal as xs >>> from xdas.synthetics import wavelet_wavefronts >>> da = wavelet_wavefronts() >>> xs.medfilt(da, {"time": 7, "distance": 5}) <xdas.DataArray (time: 300, distance: 401)> [[ 0. 0. 0. ... 0. 0. 0. ] [ 0. 0. 0. ... 0. 0. 0. ] [ 0. 0. 0. ... 0. 0. 0. ] ... [ 0. 0. 0. ... -0.000402 0. 0. ] [ 0. 0. 0. ... 0. 0. 0. ] [ 0. 0. 0. ... 0. 0. 0. ]] Coordinates: * time (time): 2023-01-01T00:00:00.000 to 2023-01-01T00:00:05.980 * distance (distance): 0.000 to 10000.000 """ if not all(dim in da.dims for dim in kernel_dim.keys()): raise ValueError("dims provided not in dataarray") kernel_size = tuple(kernel_dim[dim] if dim in kernel_dim else 1 for dim in da.dims) data = sp.medfilt(da.values, kernel_size) return da.copy(data=data)