"""
Phase-pick utilities for :class:`DataArray` objects.
Includes tapered selection and cross-correlation based picking of onset times.
"""
import numpy as np
from numba import njit, prange
from scipy.fft import next_fast_len
import xdas as xd
[docs]
def tapered_selection(da, start, end, window=None, size=None, dim="last"):
"""
Select and taper a DataArray based on `start` and `end` values.
Coordinates with NaN or NaT `start` or `end` values are ignored. If no `size` is
provided, the length of the resulting data is determined by the next fast length
(for FFT) of the maximum distance between the start and end values. The tapering
window is split in half and applied to the start and end of the selected data. The
window size must be smaller than the smallest selected data window.
Parameters
----------
da : DataArray
Input data array to select and taper. Must be 2D and have `dim` as one of its
dimensions.
start : array-like
Start values along the other dimension than `dim` (must be 1D and have the
same size) NaN or NaT values indicate coordinates to be ignored.
end : array-like
End values along the other dimension than `dim` (must be 1D and have the
same size) NaN or NaT values indicate coordinates to be ignored.
size : int, optional
Size of the output data along `dim`. If None, it is determined by the next
fast length of the maximum selected window.
dim : str, optional
Dimension along which to perform the selection and tapering. Default is 'last'.
window : array-like, optional
Tapering window to apply to the selected data.
Returns
-------
DataArray
A DataArray containing the selected and tapered data with sizes {other_dim: N,
`dim`: `size`}, where N is the number of valid start/end pairs. The `dim`
dimension becomes the last dimension and its coordinates run from 0 to
d * (size - 1), where d is the sampling interval along `dim`.
"""
# transpose so `dim` is last
da = da.transpose(..., dim)
# convert to numpy
data = np.asarray(da)
start = np.asarray(start)
end = np.asarray(end)
window = np.asarray(window if window is not None else [])
# check shapes
if not data.shape[:-1] == start.shape == end.shape:
raise ValueError("shape mismatch between `da`, `start`, and `end`")
# select valid start/end
mask = np.isfinite(start) & np.isfinite(end)
selection = np.nonzero(mask)[0]
# check selection
if selection.size == 0:
raise ValueError("No valid start/end pairs found")
# get selection indices
startindex = da[dim].get_indexer(start[selection], method="bfill")
endindex = da[dim].get_indexer(end[selection], method="ffill")
stopindex = endindex + 1
# determine output size
if size is None:
size = next_fast_len(max(stopindex - startindex))
# check window size
if min(stopindex - startindex) < window.size:
raise ValueError("some selected windows are smaller than the window size")
# make window even-sized (central value should be 1 so can be skipped)
if window.size % 2 != 0:
half_size = window.size // 2
window = np.concatenate((window[:half_size], window[-half_size:]))
# perform tapered selection
data = _tapered_selection(
data,
selection,
startindex,
stopindex,
size,
window,
)
# update output coords
coords = {}
for name in da.coords:
if da[name].dim == dim:
if name == dim:
coords[name] = {
"tie_indices": [0, size - 1],
"tie_values": [0.0, (size - 1) * xd.get_sampling_interval(da, dim)],
}
else:
pass # skip non-dimensional coords for `dim`
elif da[name].dim is not None:
coords[name] = da[name][selection]
else:
coords[name] = da[name]
# return output DataArray
return xd.DataArray(data, coords=coords, dims=da.dims)
@njit(parallel=True, cache=True)
def _tapered_selection(data, sel, start, stop, size, window): # pragma: no cover
out = np.zeros((sel.size, size), dtype=data.dtype)
w = window.size // 2
for i in prange(sel.size):
j = 0
n = stop[i] - start[i]
p = sel[i]
q = start[i]
k = 0
while j < w:
out[i, j] = data[p, q] * window[k]
j += 1
q += 1
k += 1
while j < n - w:
out[i, j] = data[p, q]
j += 1
q += 1
while j < n:
out[i, j] = data[p, q] * window[k]
j += 1
q += 1
k += 1
return out