Source code for xdas.trigger

"""
Threshold-based triggering atom :class:`Trigger`.

Detects phase arrivals in :class:`DataArray` objects using an on/off
mechanism.
"""

import numpy as np
import pandas as pd
from numba import njit

from .atoms.core import Atom, State, atomized
from .coordinates.core import Coordinate


[docs] class Trigger(Atom): """ Find picks in a data array along a given axis based on a given threshold. The pick findings use a triggering mechanism where triggers are turned on and off based on the threshold crossings. The trigger off threshold is half of the trigger on threshold. Picks are determined by finding the maximum value on each triggered region. Parameters ---------- thresh : float The threshold value for picking. dim : str, optional The dimension along which to find picks. Defaults to "last". Notes ----- For more details see the documentation of the `initialize` and `call` methods. Examples -------- >>> import numpy as np >>> import xdas as xd >>> from xdas.atoms import Trigger Use case: >>> cft = xd.DataArray( ... data=[[0.0, 0.1, 0.9, 0.8, 0.2, 0.1, 0.6, 0.7, 0.3, 0.2]], ... coords={ ... "space": [0.0], ... "time": {"tie_indices": [0, 9], "tie_values": [0.0, 9.0]}, ... }, ... ) Chunked processing using atomic processing: >>> atom = Trigger(thresh=0.5, dim="time") >>> chunks = xd.split(cft, 3, dim="time") >>> result = [] >>> for chunk in chunks: ... picks = atom(chunk, chunk_dim="time") ... result.append(picks) >>> result = pd.concat(result, ignore_index=True) >>> result space time value 0 0.0 2.0 0.9 1 0.0 7.0 0.7 """
[docs] def __init__(self, thresh, dim="last"): super().__init__() # parameters self.thresh_on = float(thresh) self.thresh_off = float(thresh) / 2.0 self.dim = str(dim) # states self.axis = State(...) self.shape = State(...) self.status = State(...) self.index = State(...) self.value = State(...) self.offset = State(...) self.coord = State(...)
def initialize(self, cft, **flags): """ Initialize the trigger with the following states. - "axis": An integer indicating the axis number of the dimension along which to find picks. - "shape": A tuple indicating the unravel shape of the lanes along wigh the the picks will be found. - "status": A boolean array indicating the trigger status for each lane. - "index": An integer array indicating the index of the last triggered value for each lane. - "value": A float array indicating the value of the last triggered value for each lane. - "offset": An integer indicating the offset of the chunk. - "coord": An InterpCoordinate containing coordinate information along 'dim' up to the last processed chunk. Parameters ---------- cft : DataArray The characteristic function where picks must be found. **flags Optional flags. """ self.axis = State(cft.get_axis_num(self.dim)) self.shape = State(cft.shape[: self.axis] + cft.shape[self.axis + 1 :]) self.status = State(np.zeros(self.shape, dtype=bool)) self.index = State(np.zeros(self.shape, dtype=int)) self.value = State(np.zeros(self.shape, dtype=float)) self.offset = State(0) self.coord = State(Coordinate({"tie_indices": [], "tie_values": []}, self.dim)) def call(self, cft, **flags): """ Call the trigger. Parameters ---------- cft : DataArray The characteristic function where picks must be found. **flags Optional flags. Returns ------- picks: DataFrame A DataFrame containing the pick coordinates and their corresponding values. Notes ----- In the trigger does not turn off at the end of the array, the last pick will not be found. This can be fixed by appending a zero to the end of the array. """ data = np.asarray(cft.values, dtype=float) values, coords = self._call_numeric(data) self.coord = _concat([self.coord, cft.coords[self.dim]]) picks = {} for axis, dim in enumerate(cft.dims): if dim == self.dim: picks[dim] = self.coord[coords[axis]].values else: picks[dim] = cft.coords[dim][coords[axis]].values picks["value"] = values return pd.DataFrame(picks) def _call_numeric(self, data): """ Find picks in a N-dimensional array along a given axis based on a given threshold. The pick findings use a triggering mechanism where triggers are turned on and off based on the threshold crossings. The trigger off threshold is half of the trigger on threshold. Picks are determined by finding the maximum value on each triggered region. Parameters ---------- data : DataArray The characteristic function where picks must be found. Returns ------- coords : tuple of 1d ndarray A tuple containing the coordinates of the picks. values : 1d ndarray The values of the picks. Notes ----- If the trigger does not turn off at the end of the array, the last pick will \ not be found. This can be fixed by appending a zero to the end of the array. """ data = np.moveaxis(data, self.axis, -1) length = data.shape[-1] # ravel additional axes into a unique lanes axis data = np.reshape(data, (-1, data.shape[-1])) status_view = np.reshape(self.status, (-1,)) index_view = np.reshape(self.index, (-1,)) value_view = np.reshape(self.value, (-1,)) lanes, indices, values = _trigger( data, self.thresh_on, self.thresh_off, status_view, index_view, value_view, self.offset, ) self.offset += length # unravel lanes indices if self.shape: coords = np.unravel_index(lanes, self.shape) else: coords = () # insert found indices into the original axis position coords = coords[: self.axis] + (indices,) + coords[self.axis :] return values, coords
@atomized def find_picks(cft, thresh, dim="last", state_dict=None): # TODO: state_dict => state """ Find picks in a data array along a given axis based on a given threshold. The pick findings use a triggering mechanism where triggers are turned on and off based on the threshold crossings. The trigger off threshold is half of the trigger on threshold. Picks are determined by finding the maximum value on each triggered region. Parameters ---------- cft : DataArray The characteristic function where picks must be found. thresh : float The threshold value for picking. dim : str, optional The dimension along which to find picks. Defaults to "last". state_dict : dict, optional The state dictionary. If not provided or if '...' is passed, a new buffer will be initialized. The state dictionary should have the following keys: - "buffer": A dictionary containing the previous buffer of the trigger. - "offset": An integer indicating the offset of the chunk. - "coord": An InterpCoordinate containing coordinate information along 'dim' up to the last processed chunk. Defaults to None. Returns ------- picks: DataFrame A DataFrame containing the pick coordinates and their corresponding values. state_dict: dict, optional The updated state dictionary containing the current buffer of the trigger. Can be used as the `state_dict` argument in the subsequent call to ensure continuity. If not provided (or ...), the state dictionary will not be returned. Notes ----- In the trigger does not turn off at the end of the array, the last pick will not be found. This can be fixed by appending a zero to the end of the array. Examples -------- >>> import numpy as np >>> import xdas as xd >>> from xdas.trigger import find_picks Use case: >>> cft = xd.DataArray( ... data=[[0.0, 0.1, 0.9, 0.8, 0.2, 0.1, 0.6, 0.7, 0.3, 0.2]], ... coords={ ... "space": [0.0], ... "time": {"tie_indices": [0, 9], "tie_values": [0.0, 9.0]}, ... }, ... ) Monolithic processing: >>> find_picks(cft, thresh=0.5, dim="time") space time value 0 0.0 2.0 0.9 1 0.0 7.0 0.7 Chunked processing using passing state: >>> chunks = xd.split(cft, 3, dim="time") >>> state_dict = ... >>> result = [] >>> for chunk in chunks: ... picks, state_dict = find_picks( ... chunk, thresh=0.5, dim="time", state_dict=state_dict ... ) ... result.append(picks) >>> result = pd.concat(result, ignore_index=True) >>> result space time value 0 0.0 2.0 0.9 1 0.0 7.0 0.7 Chunked processing using atomic processing: >>> atom = find_picks(..., thresh=0.5, dim="time", state_dict=...) >>> result = [] >>> for chunk in chunks: ... picks = atom(chunk, chunk_dim="time") ... result.append(picks) >>> result = pd.concat(result, ignore_index=True) >>> result space time value 0 0.0 2.0 0.9 1 0.0 7.0 0.7 """ axis = cft.get_axis_num(dim) data = cft.values return_state = state_dict is not None # initialize state if not provided if state_dict is None: state_dict = {"buffer": None, "offset": None, "coord": None} elif state_dict is ...: state_dict = { "buffer": ..., "offset": ..., "coord": Coordinate({"tie_indices": [], "tie_values": []}, dim), } # find pick indices and update state if return_state: ( indices, values, state_dict["buffer"], state_dict["offset"], ) = _find_picks_numeric( data, thresh, axis, buffer=state_dict["buffer"], offset=state_dict["offset"], ) state_dict["coord"] = _concat([state_dict["coord"], cft.coords[dim]]) else: indices, values = _find_picks_numeric(data, thresh, axis) state_dict["coord"] = cft.coords[dim] # get picks coordinates from indices and pack it into a dataframe picks = {} for a, d in enumerate(cft.dims): if d == dim: picks[d] = state_dict["coord"][indices[a]].values else: picks[d] = cft.coords[d][indices[a]].values picks["value"] = values picks = pd.DataFrame(picks) # return state if requested if return_state: return picks, state_dict else: return picks def _concat(list_of_coord): # TODO: make it a public function/method """ Concatenates a list of interpolated coordinates. Parameters ---------- list_of_coord : list A list of InterpCoordinate objects to be concatenated. Returns ------- InterpCoordinate The concatenated interpolated coordinate. Examples -------- >>> import xdas as xd >>> coord1 = xd.Coordinate( ... {"tie_indices": [0, 2], "tie_values": [10, 30]}, ... dim="dim", ... ) >>> coord2 = xd.Coordinate( ... {"tie_indices": [0, 3], "tie_values": [40, 70]}, ... dim="dim", ... ) >>> concatenated = _concat([coord1, coord2]) >>> concatenated.tie_indices array([0, 6]) >>> concatenated.tie_values array([10, 70]) >>> concatenated.dim 'dim' """ tie_indices = [] tie_values = [] idx = 0 dim = list_of_coord[0].dim for coord in list_of_coord: if not coord.isinterp(): raise ValueError("Only interpolated coordinates can be concatenated.") if not coord.dim == dim: raise ValueError("All coordinates must have the same dimension.") tie_indices.extend(idx + coord.tie_indices) tie_values.extend(coord.tie_values) idx += len(coord) coord = Coordinate({"tie_indices": tie_indices, "tie_values": tie_values}, dim) return coord.simplify() def _find_picks_numeric(cft, thresh, axis=-1, buffer=None, offset=None): """ Find picks in a N-dimensional array along a given axis based on a given threshold. The pick findings use a triggering mechanism where triggers are turned on and off based on the threshold crossings. The trigger off threshold is half of the trigger on threshold. Picks are determined by finding the maximum value on each triggered region. This function support chunked processing through the buffer and offset arguments. Parameters ---------- cft : ndarray The N-dimensional characteristic function where picks must be found. thresh : float The threshold value above which picks are looked for. axis : int, optional The axis along which to find picks. Defaults to -1. buffer : Ellipsis or dict, optional The buffer dictionary containing the previous buffer of the trigger. If not provided or if '...' is passed, a new buffer will be initialized. The buffer dictionary should have the following keys: - "status": A boolean array indicating the trigger status for each lane. - "index": An integer array indicating the index of the last triggered value for each lane. - "value": A float array indicating the value of the last triggered value for each lane. Each array must have the same shape as the input array without the axis along which the picks are being found. Defaults to None. offset : Ellipsis or int, optional The relative position of the processed chunk within the whole array. Used to get absolute indices when performing chunked computations. If not provided the indices will be given relative the the actual given array, meaning that picks that occurred before the provided chunk will have negative indices. Defaults to None. Returns ------- coords : tuple of 1d ndarray A tuple containing the coordinates of the picks. values : 1d ndarray The values of the picks. buffer : dict, optional The updated buffer dictionary containing the current buffer of the trigger. Can be used as the `buffer` argument in the subsequent call to ensure continuity. offset : int, optional The updated offset value. Can be used as the `offset` argument in the subsequent call to ensure correct absolute indices. Notes ----- If the trigger does not turn off at the end of the array, the last pick will not be found. This can be fixed by appending a zero to the end of the array. Examples -------- >>> import numpy as np >>> import xdas as xd >>> from xdas.trigger import _find_picks_numeric >>> cft = np.array([[0., 0.1, 0.9, 0.8, 0.2, 0.1, 0.6, 0.7, 0.3, 0.2]]) >>> _find_picks_numeric(cft, thresh=0.5, axis=-1) ((array([0, 0]), array([2, 7])), array([0.9, 0.7])) """ cft = np.asarray(cft, dtype=float) thresh = float(thresh) axis = cft.ndim + int(axis) if axis < 0 else int(axis) return_buffer = buffer is not None return_offset = offset is not None # move axis to last cft = np.moveaxis(cft, axis, -1) shape = cft.shape[:-1] length = cft.shape[-1] # if not provided initialize buffer variables, copy if provided if buffer is None or buffer is ...: buffer = { "status": np.zeros(shape, dtype=bool), "index": np.zeros(shape, dtype=int), "value": np.zeros(shape, dtype=float), } else: buffer = { "status": np.copy(buffer["status"]).astype(bool), "index": np.copy(buffer["index"]).astype(int), "value": np.copy(buffer["value"]).astype(float), } # if not provided initialize offset if offset is None or offset is ...: offset = 0 else: offset = int(offset) # group additional axes into a unique lanes axis cft = np.reshape(cft, (-1, cft.shape[-1])) buffer = {key: np.reshape(val, (-1,)) for key, val in buffer.items()} # find picks lanes, indices, values = _trigger( cft, thresh, thresh / 2.0, buffer["status"], buffer["index"], buffer["value"], offset, ) # handle offsetting if return_offset: offset += length else: buffer["index"] -= length # unravel lanes indices if shape: coords = np.unravel_index(lanes, shape) else: coords = () # insert found indices into the original axis position coords = coords[:axis] + (indices,) + coords[axis:] # reshape buffer back to original shape buffer = {key: np.reshape(val, shape) for key, val in buffer.items()} # return outputs out = (coords, values) if return_buffer: out = out + (buffer,) if return_offset: out = out + (offset,) return out @njit( "Tuple((i8[:], i8[:], f8[:]))(f8[:, :], f8, f8, b1[:], i8[:], f8[:], i8)", cache=True, ) def _trigger( # pragma: no cover cft, thresh_on, thresh_off, buffer_status, buffer_index, buffer_value, offset ): """ Perform trigger detection on the input data. Parameters ---------- cft : ndarray 2D array of shape (n, m) representing the input data. Each row is a lane. Each column is the signal onto perform trigger detection. thresh_on : float Threshold value for turning on the trigger. thresh_off : float Threshold value for turning off the trigger. buffer_status : ndarray Boolean buffer of shape (n,) holding the trigger status for each lane. buffer_index : ndarray Integer buffer of shape (n,) holding the index of the last found pick for each lane. buffer_value : ndarray Float buffer of shape (n,) holding the value of the last found pick for each lane. offset : int The offset to add to the found indices. Returns ------- tuple of ndarray A tuple containing three arrays of shape (k,) where k is the number of picks found. The arrays are: - lanes : lanes indices (along first axis) of the picks. - indices : signal indices (along last axis) of the picks. - values : values of the picks. """ lanes = [] indices = [] values = [] for (lane, index), value in np.ndenumerate(cft): index += offset if buffer_status[lane]: if value > buffer_value[lane]: buffer_index[lane] = index buffer_value[lane] = value if value < thresh_off: buffer_status[lane] = False lanes.append(lane) indices.append(buffer_index[lane]) values.append(buffer_value[lane]) else: if value > thresh_on: buffer_status[lane] = True buffer_index[lane] = index buffer_value[lane] = value return np.array(lanes), np.array(indices), np.array(values)