Processing larger-than-memory data
Chunked processing: basic concepts
Given the sheer size of DAS data, it is often impossible to process an entire data set directly in memory. Chunk-based processing is therefore a necessity, and it requires an additional layer of computational logistics. A naive approach would be to load a chunk of data, apply a Sequential pipeline to it (see Composing a processing sequence), and write the result to disk before moving on to the next chunk. Assuming that disk I/O is the limiting factor, this leaves the CPU mostly idle while it waits for new data to be read and for processed data to be written.
To maximise the pipeline throughput, xdas applies a staggered protocol of reading, processing, and writing data in parallel, as illustrated in the figure below:
With this approach, the execution time per chunk is determined by the slowest of the three steps (reading, processing, writing) rather than by their sum, a concept known as latency hiding. If, for example, reading and writing a chunk of data each take 2 seconds, and processing takes 1 second, then the execution time per chunk approaches 2 seconds instead of 5.
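The arithmetic behind latency hiding can be checked with a small scheduling model. This is a minimal sketch and not how xdas is implemented: the function names are hypothetical, each stage is assumed to handle one chunk at a time, and buffering between stages is assumed to be unlimited.

```python
def sequential_time(n_chunks, t_read, t_process, t_write):
    """Total time when each chunk is read, processed, and written in turn."""
    return n_chunks * (t_read + t_process + t_write)


def pipelined_time(n_chunks, t_read, t_process, t_write):
    """Total time when the three stages overlap across consecutive chunks.

    Assumption: a stage starts on a chunk as soon as the previous stage has
    delivered it and the stage itself has finished the previous chunk.
    """
    read_done = process_done = write_done = 0.0
    for _ in range(n_chunks):
        read_done = read_done + t_read
        process_done = max(read_done, process_done) + t_process
        write_done = max(process_done, write_done) + t_write
    return write_done


n = 100
print(sequential_time(n, 2, 1, 2) / n)  # 5.0 seconds per chunk
print(pipelined_time(n, 2, 1, 2) / n)   # 2.03 seconds per chunk
```

The pipelined cost per chunk tends towards the duration of the slowest stage. The small excess over 2 seconds is the time needed to fill the pipeline with the first chunk, and it becomes negligible as the number of chunks grows.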
A second feature of xdas is that it automatically handles state updates and transfer. Many types of filters (e.g. recursive filters and STA/LTA algorithms) rely on some kind of memory of previously seen data, known as the state of the filter. The state of each filter needs to be preserved and transferred from one chunk to the next. Moreover, if the computation pipeline gets interrupted and needs to be restarted, the states need to be properly initialised for a seamless continuation. xdas offers optimised filters that handle state updates internally.
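To see why the state matters, consider a one-pole recursive low-pass filter written in plain Python. This is an illustrative sketch only: `one_pole_lowpass` is a hypothetical helper, not part of xdas, and the signal and chunk size are arbitrary. Carrying the last output from one chunk to the next reproduces the monolithic result exactly, whereas resetting the state at each chunk boundary does not.

```python
def one_pole_lowpass(chunk, alpha, state=0.0):
    """Apply y[n] = alpha * x[n] + (1 - alpha) * y[n-1] to one chunk.

    `state` is the last output of the previous chunk (0.0 for a fresh start).
    Returns the filtered chunk and the updated state.
    """
    y = state
    out = []
    for x in chunk:
        y = alpha * x + (1.0 - alpha) * y
        out.append(y)
    return out, y


signal = [float(i % 7) for i in range(20)]
alpha = 0.3
chunk_size = 6

# Reference: filter the whole signal in one go.
monolithic, _ = one_pole_lowpass(signal, alpha)

# Chunked with state transfer: the state is passed from chunk to chunk.
state = 0.0
chunked = []
for start in range(0, len(signal), chunk_size):
    out, state = one_pole_lowpass(signal[start:start + chunk_size], alpha, state)
    chunked.extend(out)

# Chunked without state transfer: the filter restarts at every boundary.
naive = []
for start in range(0, len(signal), chunk_size):
    out, _ = one_pole_lowpass(signal[start:start + chunk_size], alpha)
    naive.extend(out)

print(chunked == monolithic)  # True
print(naive == monolithic)    # False
```

Saving `state` alongside the output is also what would allow an interrupted run to resume without a discontinuity at the restart point.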
Example
The following example shows how to apply a simple processing pipeline to a large dataset. First, build and validate the pipeline on a small in-memory subset:
import numpy as np
import xdas as xd
import xdas.signal as xs
from xdas.atoms import Sequential, Partial, LFilter
from xdas.processing import process, DataArrayLoader, DataArrayWriter
from scipy.signal import iirfilter
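# Small synthetic data set that fits in memory
da = xd.synthetics.wavelet_wavefronts()
# 4th-order high-pass IIR filter (Butterworth by default),
# with the cutoff at 0.1 times the Nyquist frequency
b, a = iirfilter(4, 0.1, btype="high")
# Pipeline: decimate along distance, filter along time, then square
atom = Sequential(
    [
        Partial(xs.decimate, 2, ftype="fir", dim="distance"),
        LFilter(b, a, dim="time"),
        Partial(np.square),
    ]
)
# Reference result, computed on the full array in one go
monolithic = atom(da)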
Then apply the same pipeline chunk-by-chunk using process().
The DataArrayLoader splits the input into fixed-size chunks
along a given dimension, while DataArrayWriter collects and
writes each processed chunk to a directory on disk:
import os
os.makedirs("output", exist_ok=True)
dl = DataArrayLoader(da, chunks={"time": 100})
dw = DataArrayWriter("output")
chunked = process(atom, dl, dw)
assert chunked.equals(monolithic)
The result is identical to that of the monolithic run, but the chunked approach scales to
datasets that do not fit in memory. The loader and writer can be swapped for other variants,
for example ZMQPublisher to stream results over a network (see
Streaming data).