Source code for ezmsg.sigproc.messages

import warnings
import time

import numpy.typing as npt
from ezmsg.util.messages.axisarray import AxisArray


# UPCOMING: TSMessage Deprecation
# TSMessage is deprecated because it doesn't handle multiple time axes well.
# AxisArray has an incompatible API but supports a superset of functionality.
warnings.warn(
    "TimeSeriesMessage/TSMessage is deprecated.  Please use ezmsg.utils.AxisArray",
    DeprecationWarning,
    stacklevel=2,
)


[docs] def TSMessage( data: npt.NDArray, fs: float = 1.0, time_dim: int = 0, timestamp: float | None = None, ) -> AxisArray: dims = [f"dim_{i}" for i in range(data.ndim)] dims[time_dim] = "time" offset = time.time() if timestamp is None else timestamp offset_adj = data.shape[time_dim] / fs # offset corresponds to idx[0] on time_dim axis = AxisArray.TimeAxis(fs, offset=offset - offset_adj) return AxisArray(data, dims=dims, axes=dict(time=axis))