Source code for xdas.processing.core

"""
Core processing infrastructure for chunked pipeline execution.

Includes :class:`DataArrayLoader`, :class:`DataArrayWriter`,
:class:`DataFrameWriter`, :class:`StreamWriter`, :class:`ZMQPublisher`,
:class:`ZMQSubscriber`, :class:`RealTimeLoader`, and :func:`process`.
"""

import os
from concurrent.futures import ThreadPoolExecutor
from glob import glob
from pathlib import Path
from queue import Queue
from tempfile import TemporaryDirectory

import numpy as np
import obspy
import pandas as pd
import zmq
from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer

from ..core.dataarray import DataArray
from ..core.routines import concat, open_dataarray
from .monitor import Monitor


[docs] def process(atom, data_loader, data_writer): """ Execute a chunked processing pipeline. Parameters ---------- atom : callable The atomic operation to execute on each chunk of data. data_loader : DataArrayLoader The data loader object that provides the chunks of data. data_writer : DataArrayWriter The data writer object that writes the processed data. Returns ------- result : object The result of the processing pipeline. Notes ----- This function executes a chunked processing pipeline by ingesting the data from the `data_loader` and flushing the processed data through the `data_writer`. It iterates over the chunks of data provided by the `data_loader`, applies the `atom` function to each chunk, and writes the processed data using the `data_writer`. The progress of the processing is monitored using a `Monitor` object. """ if hasattr(atom, "reset"): atom.reset() if hasattr(data_loader, "nbytes"): total = data_loader.nbytes else: total = None monitor = Monitor(total=total) monitor.tic("read") for chunk in data_loader: monitor.tic("proc") result = atom(chunk, chunk_dim=data_loader.chunk_dim) monitor.tic("write") data_writer.write(result) monitor.toc(chunk.nbytes) monitor.tic("read") monitor.close() return data_writer.result()
[docs] class DataArrayLoader: """ A class to handle data chunked data ingestion. To optimize I/O latencies, chunks are loaded before they are used asynchronously in a buffer as soon as the iterator is created. Parameters ---------- da : ``DataArray`` The (virtual) DataArray that contains the data to be chunked chunks : dict The sizes of the chunks along each dimension. Needs to be of the form: ``{"dim": int}``. The key correspond with the dimension (usually "time"), and the value is an integer indicating the size of the chunk (in samples) along that dimension. max_buffers : int, default=1 The maximum number of chunks to load into memory at the same time. max_workers : int, default=1 The maximum number of thread used to load the chunks. Examples -------- >>> import xdas as xd >>> from xdas.processing import DataArrayLoader >>> da = xd.open_dataarray(...) # doctest: +SKIP Create chunks along the time dimension >>> chunks = {"time": 1000} >>> dl = DataArrayLoader(da, chunks) # doctest: +SKIP Iterate over the chunks >>> for chunk in dl: ... process(chunk) # doctest: +SKIP """
[docs] def __init__(self, da, chunks, max_buffers=1, max_workers=1): if not isinstance(da, DataArray): raise TypeError(f"`da` must by a DataArray object, not a {type(da)}") if not (isinstance(chunks, dict) and len(chunks) == 1): raise TypeError( "`chunks` must be a dict that maps a unique " "dimension to a unique size: {'dim': int}" ) ((chunk_dim, chunk_size),) = chunks.items() chunk_dim = str(chunk_dim) chunk_size = int(chunk_size) if chunk_dim not in da.dims: raise ValueError( f"chunking dimension {chunk_dim} not found in `da` dimensions {da.dims}" ) if chunk_size > da.sizes[chunk_dim]: raise ValueError( f"chunking size {chunk_size} is greater than `da` " f"size {da.sizes[chunk_dim]} along dim {chunk_dim}" ) self.da = da self.chunk_dim = chunk_dim self.chunk_size = chunk_size self.max_buffers = max_buffers self.max_workers = max_workers
def __len__(self): div, mod = divmod(self.da.sizes[self.chunk_dim], self.chunk_size) return div if mod == 0 else div + 1 def _get_chunk(self, idx): start = idx * self.chunk_size end = (idx + 1) * self.chunk_size query = { dim: slice(start, end) if dim == self.chunk_dim else slice(None) for dim in self.da.dims } return self.da[query].load() def __iter__(self): with ThreadPoolExecutor(self.max_workers) as executor: it = iter(range(len(self))) futures = [] try: for _ in range(self.max_buffers): futures.append(executor.submit(self._get_chunk, next(it))) except StopIteration: pass while futures: future = futures.pop(0) result = future.result() try: futures.append(executor.submit(self._get_chunk, next(it))) except StopIteration: pass yield result @property def nbytes(self): """Total bytes of the underlying :class:`DataArray`.""" return self.da.nbytes
[docs] class RealTimeLoader(Observer): """ Real-time :class:`DataArray` loader that watches a directory for new files. Parameters ---------- path : str or Path Directory to watch. engine : str, optional Engine used to open arriving files. Defaults to ``"netcdf"``. """
[docs] def __init__(self, path, engine="netcdf"): super().__init__() self.path = str(path) if isinstance(path, Path) else path self.queue = Queue() self.handler = Handler(self.queue, engine) self.schedule(self.handler, self.path, recursive=True) self.start()
def __iter__(self): return self def __next__(self): chunk = self.queue.get() if chunk is None: raise StopIteration else: return chunk
class Handler(FileSystemEventHandler): """Watchdog event handler that loads closed files into a queue.""" def __init__(self, queue, engine): self.engine = engine self.queue = queue def on_closed(self, event): """Load the newly-closed file and place it in the queue.""" da = open_dataarray(event.src_path, engine=self.engine) self.queue.put(da.load())
[docs] class DataArrayWriter: """ A class to handle chunked data egress. Parameters ---------- dirpath : str or Path The directory to store the output of a processing pipeline. The directory needs to exist and be empty. encoding : dict The encoding to use when dumping the DataArrays to bytes. max_buffers : int, default=1 The maximum number of chunks to load into memory at the same time. max_workers : int, default=1 The maximum number of thread used to load the chunks. create_dirs : bool, optional Whether to create parent directories if they do not exist. Default is False. Examples -------- >>> import xdas as xd >>> import xdas.processing as xp >>> expected = xd.DataArray(np.random.rand(1000, 100), dims=("time", "distance")) >>> dw = DataArrayWriter("some_path") # doctest: +SKIP >>> for chunk in chunks: ... dw.submit(chunk) # doctest: +SKIP >>> result = dw.result # doctest: +SKIP >>> assert result.equals(expected) # doctest: +SKIP """
[docs] def __init__( self, dirpath, encoding=None, max_buffers=1, max_workers=1, create_dirs=False ): dirpath = str(dirpath) if isinstance(dirpath, Path) else dirpath if create_dirs: os.makedirs(dirpath, exist_ok=True) if not os.path.exists(dirpath): raise OSError(f"no directory {dirpath}") self.dirpath = dirpath self.encoding = encoding self.max_buffers = max_buffers self.max_workers = max_workers self._executor = ThreadPoolExecutor(self.max_workers) self._futures = [] self._results = [] self._count = 0
[docs] def submit(self, chunk): """ Asynchronously write *chunk* to disk and register the path for later concat. Parameters ---------- chunk : DataArray Processed data chunk to persist. """ if not isinstance(chunk, DataArray): raise TypeError(f"`chunk` must by a DataArray object, not a {type(chunk)}") if not len(self._futures) < self.max_buffers: future = self._futures.pop(0) result = future.result() self._results.append(result) self._futures.append(self._executor.submit(self._write, chunk, self._count)) self._count += 1
[docs] def write(self, chunk): """Alias for :meth:`submit`.""" return self.submit(chunk)
def _write(self, chunk, count): path = os.path.join(self.dirpath, f"{count:09d}") chunk.to_netcdf(path, encoding=self.encoding) return open_dataarray(path)
[docs] def shutdown(self): """Shut down the internal thread pool.""" self._executor.shutdown()
[docs] def result(self): """Flush all pending writes and return the concatenated :class:`DataArray`.""" while self._futures: future = self._futures.pop(0) result = future.result() self._results.append(result) self.shutdown() return concat(self._results)
[docs] class DataFrameWriter: """ A class for writing pandas DataFrames to a CSV file asynchronously. Parameters ---------- path : str The path to the csv file. parse_dates : bool, int, optional Whether to parse dates when reopening the csv file at the end of the process create_dirs : bool, optional Whether to create parent directories if they do not exist. Default is False. Examples -------- >>> import pandas as pd >>> import xdas.processing as xp >>> dw = xp.DataFrameWriter("output.csv") >>> for df in dfs: ... dw.submit(dfs). # doctest: +SKIP >>> result = dw.result() # doctest: +SKIP >>> expected = pd.concat(dfs, ignore_index=True) # doctest: +SKIP >>> assert result.equals(expected) # doctest: +SKIP """
[docs] def __init__(self, path, parse_dates=None, create_dirs=False): dirpath = os.path.dirname(path) if create_dirs: if dirpath: os.makedirs(dirpath, exist_ok=True) if dirpath and not os.path.exists(dirpath): raise OSError(f"no directory {dirpath}") self.path = str(path) if isinstance(path, Path) else path self.parse_dates = parse_dates self._executor = ThreadPoolExecutor(1) self._future = None
def submit(self, df): """ Asynchronously append *df* to the CSV file. Parameters ---------- df : pandas.DataFrame DataFrame chunk to write. """ if not isinstance(df, pd.DataFrame): raise TypeError(f"`df` must by a DataFrame object, not a {type(df)}") if self._future is not None: self._future.result() self._future = self._executor.submit(self._write, df) def write(self, df): """Alias for :meth:`submit`.""" return self.submit(df) def _write(self, df): if df is not None: # pragma: no branch if not os.path.exists(self.path): df.to_csv(self.path, mode="w", header=True, index=False) else: df.to_csv(self.path, mode="a", header=False, index=False) def shutdown(self): """Shut down the internal thread pool.""" self._executor.shutdown() def result(self): """Flush pending writes and return the full CSV as a :class:`pandas.DataFrame`.""" self._future.result() self.shutdown() try: return pd.read_csv(self.path, parse_dates=self.parse_dates) except pd.errors.EmptyDataError: return pd.DataFrame()
[docs] class StreamWriter: """ A class for writing obspy Streams to miniseed files asynchronously. Parameters ---------- path : str The path of the miniseed file or the folder name where the miniseed files will be written. dataquality : str Data quality of the waveforms. kw_merge : dict Keyword arguments for merging the Streams, following the arguments of the obspy.core.stream.Stream.merge function. kw_write : dict Keyword arguments for writing the Streams, following the arguments of the obspy.core.stream.Stream.write function. output_format : str The output format of the miniseed files. Can be "flat" or "SDS". If "flat", the miniseed files will be written in a single file. If "SDS", the miniseed files will be written in the SDS file structure. For more information about SDS see: https://www.seiscomp.de/seiscomp3/doc/applications/slarchive/SDS.html Examples -------- >>> import obspy >>> import numpy as np >>> import xdas as xd >>> import xdas.processing as xp Generate some DataArray: >>> data = np.random.randint( ... low=-1000, high=1000, size=(1000, 10), dtype=np.int32 ... ) >>> starttime = np.datetime64("2023-01-01T00:00:00") >>> endtime = starttime + np.timedelta64(10, "ms") * (data.shape[0] - 1) >>> distance = 5.0 * np.arange(data.shape[1]) >>> da = xd.DataArray( ... data=data, ... coords={ ... "time": { ... "tie_indices": [0, data.shape[0] - 1], ... "tie_values": [starttime, endtime], ... }, ... "distance": distance, ... }, ... ) StreamWriter works great with the `DataArray.to_stream` method that can be used as an atom like this: >>> atom = lambda da, **kwargs: da.to_stream( ... network="NT", ... station="ST{:03}", ... channel="HN1", ... location="00", ... dim={"distance": "time"}, ... ) >>> data_loader = xp.DataArrayLoader(da, chunks={"time": 100}) This is how a StreamWriter can be used to write the data to a miniseed file: >>> kw_merge = {"method": 1} >>> kw_write = {"reclen": 4096} >>> data_writer = xp.StreamWriter( ... "some_directory", "M", kw_merge, kw_write, output_format="SDS" ... ) >>> result = xp.process(atom, data_loader, data_writer) The data will be written to the SDS file structure in the specified directory. >>> st = obspy.read("some_directory/2023/NT/*/HN1.D/NT.*.00.HN1.D.2023.001") Clean up: >>> import shutil >>> shutil.rmtree("some_directory") """
[docs] def __init__( self, path, dataquality, kw_merge=None, kw_write=None, output_format="SDS" ): path = str(path) if isinstance(path, Path) else path if output_format == "SDS": os.makedirs(path, exist_ok=True) self.dirpath = path self.fname = None elif output_format == "flat": head, tail = os.path.split(path) if not os.path.exists(head): raise OSError(f"The directory {head} does not exist.") self.dirpath = head self.fname = tail else: raise ValueError( "output_format must be either 'SDS' or 'flat'. " f"Got {output_format} instead." ) self.dataquality = dataquality self.kw_merge = kw_merge if kw_merge is not None else {} self.kw_write = kw_write if kw_write is not None else {} self.output_format = output_format self._executor = ThreadPoolExecutor(1) self._future = None
def _to_SDS(self, st): for tr in st: new_st = obspy.Stream() new_st += tr new_st = new_st[0].split() for new_tr in new_st: if isinstance(new_tr.data, np.ma.masked_array): # pragma: no cover new_tr.data = new_tr.data.filled() new_tr.stats.mseed["dataquality"] = self.dataquality year = new_st[0].stats.starttime.year network = new_st[0].stats.network station = new_st[0].stats.station channel = new_st[0].stats.channel location = new_st[0].stats.location julday = new_st[0].stats.starttime.julday dirpath = os.path.join( self.dirpath, str(year), network, station, channel + ".D" ) os.makedirs(dirpath, exist_ok=True) fname = f"{network}.{station}.{location}.{channel}.D.{year}.{julday:03d}" sds_path = os.path.join(dirpath, fname) new_st.write(sds_path, format="MSEED", **self.kw_write) def _to_flat(self, st): new_st = obspy.Stream() for tr in st: tmp_st = obspy.Stream() tmp_st += tr tmp_st = tmp_st[0].split() for new_tr in tmp_st: if isinstance(new_tr.data, np.ma.masked_array): # pragma: no cover new_tr.data = new_tr.data.filled() new_st += new_tr new_st.write(os.path.join(self.dirpath, self.fname), **self.kw_write) def submit(self, st): """ Asynchronously write *st* to a temporary MiniSEED file. Parameters ---------- st : obspy.Stream Stream chunk to persist. """ if not isinstance(st, obspy.Stream): raise TypeError(f"`st` must by a DataFrame object, not a {type(st)}") if self._future is not None: self._future.result() self._future = self._executor.submit(self._write, st) def write(self, st): """Alias for :meth:`submit`.""" return self.submit(st) def _write(self, st): st.write(f"{self.dirpath}/{st[0].stats.starttime}_tmp.mseed", **self.kw_write) def shutdown(self): """Shut down the internal thread pool.""" self._executor.shutdown() def result(self): """Merge all temporary MiniSEED files and write the final output.""" self._future.result() self.shutdown() pattern = f"{self.dirpath}/*_tmp.mseed" out = obspy.read(pattern) out = out.merge(**self.kw_merge) if self.output_format == "flat": self._to_flat(out) elif self.output_format == "SDS": # pragma: no branch self._to_SDS(out) files_to_remove = glob(pattern) for file in files_to_remove: os.remove(file) return out
[docs] class ZMQPublisher: """ A class for publishing DataArray chunks over ZeroMQ. Parameters ---------- address : str The address to bind the publisher to. encoding : dict The encoding to use when dumping the DataArrays to bytes. Examples -------- >>> import xdas as xd >>> from xdas.processing import ZMQPublisher, ZMQSubscriber First we generate some data and split it into packets >>> packets = xd.split(xd.synthetics.dummy(), 10) We initialize the publisher at a given address >>> address = f"tcp://localhost:{xd.io.get_free_port()}" >>> publisher = ZMQPublisher(address) We can then publish the packets >>> for da in packets: ... publisher.submit(da) To reduce the size of the packets, we can also specify an encoding >>> import hdf5plugin >>> address = f"tcp://localhost:{xd.io.get_free_port()}" >>> encoding = {"chunks": (10, 10), **hdf5plugin.Zfp(accuracy=1e-6)} >>> publisher = ZMQPublisher(address, encoding) >>> for da in packets: ... publisher.submit(da) """
[docs] def __init__(self, address, encoding=None): self.address = address self.encoding = encoding self._context = zmq.Context() self._socket = self._context.socket(zmq.PUB) self._socket.bind(self.address)
[docs] def submit(self, da): """ Send a DataArray over ZeroMQ. Parameters ---------- da : DataArray The DataArray to be sent. """ self._socket.send(tobytes(da, self.encoding))
[docs] def write(self, da): """Alias for :meth:`submit`.""" self.submit(da)
[docs] def result(self): """Return ``None`` — ZMQPublisher has no aggregated result.""" return None
[docs] class ZMQSubscriber: """ A class for subscribing to DataArray chunks over ZeroMQ. Parameters ---------- address : str The address to connect the subscriber to. Methods ------- submit(da) Send a DataArray over ZeroMQ. Examples -------- >>> import threading >>> import xdas as xd >>> from xdas.processing import ZMQSubscriber First we generate some data and split it into packets >>> da = xd.synthetics.dummy() >>> packets = xd.split(da, 10) We then publish the packets asynchronously >>> address = f"tcp://localhost:{xd.io.get_free_port()}" >>> publisher = ZMQPublisher(address) >>> def publish(): ... for packet in packets: ... publisher.submit(packet) >>> threading.Thread(target=publish).start() Now let's receive the packets >>> subscriber = ZMQSubscriber(address) >>> packets = [] >>> for n, da in enumerate(subscriber, start=1): ... packets.append(da) ... if n == 10: ... break >>> da = xd.concat(packets) >>> assert da.equals(da) """
[docs] def __init__(self, address): self.address = address self._context = zmq.Context() self._socket = self._context.socket(zmq.SUB) self._socket.connect(address) self._socket.setsockopt_string(zmq.SUBSCRIBE, "")
def __iter__(self): return self def __next__(self): message = self._socket.recv() return frombuffer(message)
def tobytes(da, encoding=None): """ Serialise *da* to raw NetCDF4 bytes via a temporary file. Parameters ---------- da : DataArray DataArray to serialise. encoding : dict, optional HDF5/NetCDF4 encoding options forwarded to :meth:`DataArray.to_netcdf`. Returns ------- bytes """ with TemporaryDirectory() as tmpdir: path = os.path.join(tmpdir, "tmp.nc") da.to_netcdf(path, virtual=False, encoding=encoding) with open(path, "rb") as file: return file.read() def frombuffer(da): """ Deserialise raw NetCDF4 *da* bytes into a loaded :class:`DataArray`. Parameters ---------- da : bytes Raw bytes as produced by :func:`tobytes`. Returns ------- DataArray """ with TemporaryDirectory() as tmpdir: path = os.path.join(tmpdir, "tmp.nc") with open(path, "wb") as file: file.write(da) return open_dataarray(path).load()