"""
FFT functions that preserve :class:`DataArray` coordinates.
Includes :func:`fft`, :func:`ifft`, :func:`rfft`, :func:`irfft`,
:func:`fftfreq`, :func:`rfftfreq`.
"""
import numpy as np
from .atoms.core import atomized
from .coordinates.core import get_sampling_interval
from .core.dataarray import DataArray
from .parallel import parallelize
[docs]
@atomized
def fft(da, n=None, dim={"last": "spectrum"}, norm=None, parallel=None):
"""
Compute the discrete Fourier Transform along a given dimension.
This function computes the one-dimensional n-point discrete Fourier Transform (DFT)
with the efficient Fast Fourier Transform (FFT) algorithm.
Parameters
----------
da: DataArray
The data array to process, can be complex.
n: int, optional
Length of transformed dimension of the output. If `n` is smaller than the
length of the input, the input is cropped. If it is larger, the input is
padded with zeros. If `n` is not given, the length of the input along the
dimension specified by `dim` is used.
dim: {str: str}, optional
A mapping indicating as a key the dimension along which to compute the FFT, and
as value the new name of the dimension. Default to {"last": "spectrum"}.
norm: {“backward”, “ortho”, “forward”}, optional
Normalization mode (see `numpy.fft`). Default is "backward". Indicates which
direction of the forward/backward pair of transforms is scaled and with what
normalization factor.
Returns
-------
DataArray:
The transformed input with an updated dimension name and values.
Notes
-----
To perform a multidimensional Fourier operations, repeat this function on the
desired dimensions.
Examples
--------
>>> import xdas as xd
>>> import xdas.fft as xfft
>>> signal = xd.DataArray([0., 1., 0., -1.], coords={"time": [0, 1, 2, 3]})
>>> xfft.fft(signal, dim={"time": "frequency"})
<xdas.DataArray (frequency: 4)>
[0.+0.j 0.+2.j 0.+0.j 0.-2.j]
Coordinates:
* frequency (frequency): [-0.5 ... 0.25]
"""
((olddim, newdim),) = dim.items()
olddim = da.dims[da.get_axis_num(olddim)]
if n is None:
n = da.sizes[olddim]
axis = da.get_axis_num(olddim)
d = get_sampling_interval(da, olddim)
f = np.fft.fftshift(np.fft.fftfreq(n, d))
def func(x):
return np.fft.fftshift(np.fft.fft(x, n, axis, norm), axis)
across = int(axis == 0)
func = parallelize(across, across, parallel)(func)
data = func(da.values)
coords = {
newdim if name == olddim else name: f if name == olddim else da.coords[name]
for name in da.coords
if (da[name].dim != olddim or name == olddim)
}
dims = tuple(newdim if dim == olddim else dim for dim in da.dims)
return DataArray(data, coords, dims, da.name, da.attrs)
[docs]
@atomized
def rfft(da, n=None, dim={"last": "spectrum"}, norm=None, parallel=None):
"""
Compute the discrete Fourier Transform for real inputs along a given dimension.
This function computes the one-dimensional n-point discrete Fourier Transform (DFT)
or real-valued inputs with the efficient Fast Fourier Transform (FFT) algorithm.
Parameters
----------
da: DataArray
The data array to process, can be complex.
n: int, optional
Length of transformed dimension of the output. If `n` is smaller than the
length of the input, the input is cropped. If it is larger, the input is
padded with zeros. If `n` is not given, the length of the input along the
dimension specified by `dim` is used.
dim: {str: str}, optional
A mapping indicating as a key the dimension along which to compute the FFT, and
as value the new name of the dimension. Default to {"last": "spectrum"}.
norm: {“backward”, “ortho”, “forward”}, optional
Normalization mode (see `numpy.fft`). Default is "backward". Indicates which
direction of the forward/backward pair of transforms is scaled and with what
normalization factor.
Returns
-------
DataArray:
The transformed input with an updated dimension name and values. The length of
the transformed dimension is (n/2)+1 if n is even or (n+1)/2 if n is odd.
Notes
-----
To perform a multidimensional Fourier operations, repeat this function on the
desired dimensions.
Examples
--------
>>> import xdas as xd
>>> import xdas.fft as xfft
>>> signal = xd.DataArray([0., 1., 0., -1.], coords={"time": [0, 1, 2, 3]})
>>> xfft.rfft(signal, dim={"time": "frequency"})
<xdas.DataArray (frequency: 3)>
[0.+0.j 0.-2.j 0.+0.j]
Coordinates:
* frequency (frequency): [0. ... 0.5]
"""
((olddim, newdim),) = dim.items()
olddim = da.dims[da.get_axis_num(olddim)]
if n is None:
n = da.sizes[olddim]
axis = da.get_axis_num(olddim)
d = get_sampling_interval(da, olddim)
across = int(axis == 0)
func = parallelize(across, across, parallel)(np.fft.rfft)
f = np.fft.rfftfreq(n, d)
data = func(da.values, n, axis, norm)
coords = {
newdim if name == olddim else name: f if name == olddim else da.coords[name]
for name in da.coords
if (da[name].dim != olddim or name == olddim)
}
dims = tuple(newdim if dim == olddim else dim for dim in da.dims)
return DataArray(data, coords, dims, da.name, da.attrs)
[docs]
@atomized
def ifft(da, n=None, dim={"last": "signal"}, norm=None, parallel=None):
"""
Compute the inverse of `fft`.
Parameters
----------
da: DataArray
The data array to process, should be complex.
n: int, optional
Length of transformed dimension of the output. If `n` is smaller than the
length of the input, the input is cropped. If it is larger, the input is
padded with zeros. If `n` is not given, the length of the input along the
dimension specified by `dim` is used.
dim: {str: str}, optional
A mapping indicating as a key the dimension along which to compute the IFFT, and
as value the new name of the dimension. Default to {"last": "signal"}.
norm: {“backward”, “ortho”, “forward”}, optional
Normalization mode (see `numpy.fft`). Default is "backward". Indicates which
direction of the forward/backward pair of transforms is scaled and with what
normalization factor.
Returns
-------
DataArray:
The transformed input with an updated dimension name and values.
Notes
-----
To perform a multidimensional inverse Fourier operations, repeat this function on
the desired dimensions.
Examples
--------
>>> import xdas as xd
>>> import xdas.fft as xfft
>>> signal = xd.DataArray([0., 1., 0., -1.], coords={"time": [0, 1, 2, 3]})
>>> spectrum = xfft.fft(signal, dim={"time": "frequency"})
>>> result = xfft.ifft(spectrum, dim={"frequency": "time"})
>>> result["time"] = signal["time"] # to match time coordinates
>>> assert np.real(result).equals(signal)
"""
((olddim, newdim),) = dim.items()
olddim = da.dims[da.get_axis_num(olddim)]
if n is None:
n = da.sizes[olddim]
axis = da.get_axis_num(olddim)
d = get_sampling_interval(da, olddim)
f = np.fft.ifftshift(np.fft.fftfreq(n, d))
def func(x):
return np.fft.ifft(np.fft.ifftshift(x, axis), n, axis, norm)
across = int(axis == 0)
func = parallelize(across, across, parallel)(func)
data = func(da.values)
coords = {
newdim if name == olddim else name: f if name == olddim else da.coords[name]
for name in da.coords
if (da[name].dim != olddim or name == olddim)
}
dims = tuple(newdim if dim == olddim else dim for dim in da.dims)
return DataArray(data, coords, dims, da.name, da.attrs)
[docs]
@atomized
def irfft(da, n=None, dim={"last": "signal"}, norm=None, parallel=None):
"""
Compute the inverse of `rfft`.
Parameters
----------
da: DataArray
The data array to process, can be complex.
n : int, optional
Length of the transformed dimension of the output. For `n` output points,
``n//2+1`` input points are necessary. If the input is longer than this, it is
cropped. If it is shorter than this, it is padded with zeros. If `n` is not
given, it is taken to be ``2*(m-1)`` where ``m`` is the length of the input
along the dimension specified by `dim`.
dim: {str: str}, optional
A mapping indicating as a key the dimension along which to compute the FFT, and
as value the new name of the dimension. Default to {"last": "time"}.
norm: {“backward”, “ortho”, “forward”}, optional
Normalization mode (see `numpy.fft`). Default is "backward". Indicates which
direction of the forward/backward pair of transforms is scaled and with what
normalization factor.
Returns
-------
DataArray:
The truncated or zero-padded input, transformed along the dimension indicated
by `dim`, or the last one if `dim` is not specified. The length of the
transformed dimension is `n`, or, if `n` is not given, ``2*(m-1)`` where ``m``
is the length of the transformed dimension of the input. To get an odd number
of output points, `n` must be specified.
Notes
-----
To perform a multidimensional Fourier operations, repeat this function on the
desired dimensions.
Examples
--------
>>> import xdas as xd
>>> import xdas.fft as xfft
>>> signal = xd.DataArray([0., 1., 0., -1.], coords={"time": [0, 1, 2, 3]})
>>> spectrum = xfft.rfft(signal, dim={"time": "frequency"})
>>> result = xfft.irfft(
... spectrum,
... n=signal.sizes["time"], # ensure correct output if n is odd
... dim={"frequency": "time"},
... )
>>> result["time"] = signal["time"] # to match time coordinates
>>> assert np.real(result).equals(signal)
"""
((olddim, newdim),) = dim.items()
olddim = da.dims[da.get_axis_num(olddim)]
if n is None:
n = (da.sizes[olddim] - 1) * 2
axis = da.get_axis_num(olddim)
d = get_sampling_interval(da, olddim)
across = int(axis == 0)
func = parallelize(across, across, parallel)(np.fft.irfft)
f = np.fft.fftshift(np.fft.fftfreq(n, d))
data = func(da.values, n, axis, norm)
coords = {
newdim if name == olddim else name: f if name == olddim else da.coords[name]
for name in da.coords
if (da[name].dim != olddim or name == olddim)
}
dims = tuple(newdim if dim == olddim else dim for dim in da.dims)
return DataArray(data, coords, dims, da.name, da.attrs)