Source code for xdas.synthetics

"""
Synthetic DAS data generators used in doctests and test fixtures.

Includes :func:`wavelet_wavefronts` and :func:`randn_wavefronts`.
"""

import numpy as np
import scipy.signal as sp

from .core.dataarray import DataArray
from .core.routines import split


[docs] def wavelet_wavefronts( *, starttime="2023-01-01T00:00:00", resolution=(np.timedelta64(20, "ms"), 25.0), nchunk=None, ): """ Generate a synthetic DAS :class:`DataArray` with wavelet wavefronts. Parameters ---------- starttime : str, optional The starttime of the data, parsed by ``np.datetime64(starttime)``. Default is ``"2023-01-01T00:00:00"``. resolution : (timedelta64, float), optional The temporal and spatial sampling intervals. Default is ``(np.timedelta64(20, "ms"), 25.0)``. nchunk : int, optional If provided, splits the result into ``nchunk`` chunks and returns a list of DataArrays instead of a single DataArray. Examples -------- >>> import os >>> import xdas as xd >>> from xdas.synthetics import wavelet_wavefronts >>> from tempfile import TemporaryDirectory >>> with TemporaryDirectory() as dirpath: ... wavelet_wavefronts().to_netcdf(os.path.join(dirpath, "sample.nc")) ... for idx, da in enumerate(wavelet_wavefronts(nchunk=3), start=1): ... da.to_netcdf(os.path.join(dirpath, f"{idx:03}.nc")) ... da_monolithic = xd.open(os.path.join(dirpath, "sample.nc")) ... da_chunked = xd.open(os.path.join(dirpath, "00*.nc")) ... da_monolithic.equals(da_chunked) True """ # ensure reporducibility np.random.seed(42) # sampling starttime = np.datetime64(starttime).astype("datetime64[ns]") span = (np.timedelta64(6, "s"), 10000.0) # (6 s, 10 km) shape = (span[0] // resolution[0], int(span[1] // resolution[1]) + 1) t = np.arange(shape[0]) * resolution[0] / np.timedelta64(1, "s") # time values [s] s = np.arange(shape[1]) * resolution[1] # distance values [m] # physical parameters snr = 10 # signal to noise ration vp = 4000 # P-wave speed [m/s] vs = vp / 1.75 # S-wave speed [m/s] xc = 5_000 # source distance along [m] fc = 10.0 # source central frequency [Hz] t0 = 1.0 # source origin time relative to start of file [s] d = np.hypot(xc, (s - np.mean(s))) # channel distance to source [m] ttp = d / vp # P-wave travel time [s] tts = d / vs # S-wave travel time [s] t_col = t[:, np.newaxis] data = sp.gausspulse(t_col - ttp - t0, fc) / 2 # P is twice weaker data += sp.gausspulse(t_col - tts - t0, fc) data /= np.max(np.abs(data), axis=0, keepdims=True) # normalize data += np.random.randn(*shape) / snr # add noise # strain rate like response data = np.diff(data, prepend=0, axis=-1) data = np.diff(data, prepend=0, axis=0) # pack data and coordinates as DataArray or DataCollection if chunking. da = DataArray( data=data, coords={ "time": { "tie_indices": [0, shape[0] - 1], "tie_values": [starttime, starttime + resolution[0] * (shape[0] - 1)], }, "distance": { "tie_indices": [0, shape[1] - 1], "tie_values": [0.0, resolution[1] * (shape[1] - 1)], }, }, ) if nchunk is not None: return split(da, nchunk) else: return da
[docs] def randn_wavefronts(): """ Generate a large random-noise synthetic DAS :class:`DataArray`. Returns a 200 s × 100 km array (10 Hz temporal, 100 m spatial sampling) with step-onset noise bursts simulating P- and S-wave arrivals from a single source located 20 km off-axis. Returns ------- DataArray Synthetic DAS data with ``time`` and ``distance`` coordinates. """ # ensure reporducibility np.random.seed(42) # sampling resolution = (np.timedelta64(10, "ms"), 100.0) starttime = np.datetime64("2024-01-01T00:00:00").astype("datetime64[ns]") span = (np.timedelta64(200, "s"), 100000.0) # (200 s, 100 km) shape = (span[0] // resolution[0], int(span[1] // resolution[1]) + 1) t = np.arange(shape[0]) * resolution[0] / np.timedelta64(1, "s") # time values [s] s = np.arange(shape[1]) * resolution[1] # distance values [m] # physical parameters snr = 20 # signal to noise ratio vp = 4000 # P-wave speed [m/s] vs = vp / 1.75 # S-wave speed [m/s] xc = 20_000 # source distance along [m] t0 = 30.0 # source origin time relative to start of file [s] d = np.hypot(xc, (s - np.mean(s))) # channel distance to source [m] ttp = d / vp # P-wave travel time [s] tts = d / vs # S-wave travel time [s] data = np.zeros(shape) for k in range(shape[1]): data[:, k] += (t > (t0 + ttp[k])) * np.random.randn(shape[0]) / 2 data[:, k] += (t > (t0 + tts[k])) * np.random.randn(shape[0]) data /= np.max(np.abs(data), axis=0, keepdims=True) # normalize data += np.random.randn(*shape) / snr # add noise # pack data and coordinates as Database or DataCollection if chunking. da = DataArray( data=data, coords={ "time": { "tie_indices": [0, shape[0] - 1], "tie_values": [starttime, starttime + resolution[0] * (shape[0] - 1)], }, "distance": { "tie_indices": [0, shape[1] - 1], "tie_values": [0.0, resolution[1] * (shape[1] - 1)], }, }, ) return da
[docs] def dummy(shape=(1000, 100)): """ Return a minimal random :class:`DataArray` for quick testing. Parameters ---------- shape : tuple of int, optional ``(n_time, n_distance)`` shape. Defaults to ``(1000, 100)``. Returns ------- DataArray DataArray filled with Gaussian noise, sampled at 10 Hz over ``[0, 1000]`` m with ``time`` starting at 2024-01-01. """ starttime = np.datetime64("2024-01-01T00:00:00.000000000") endtime = starttime + (shape[0] - 1) * np.timedelta64(100, "ms") time = {"tie_indices": [0, shape[0] - 1], "tie_values": [starttime, endtime]} distance = {"tie_indices": [0, shape[1] - 1], "tie_values": [0.0, 1000.0]} return DataArray( data=np.random.randn(*shape), coords={ "time": time, "distance": distance, }, )