Source code for ezmsg.sigproc.detrend

"""Remove linear or constant trends from data along an axis."""

import ezmsg.core as ez
import scipy.signal as sps
from ezmsg.baseproc import BaseTransformerUnit
from ezmsg.util.messages.axisarray import AxisArray, replace

from ezmsg.sigproc.ewma import EWMASettings, EWMATransformer


class DetrendTransformer(EWMATransformer):
    """
    Remove the running mean from incoming data.

    The mean is estimated with an exponentially weighted moving average
    (EWMA) and subtracted from every sample along the configured axis.
    """

    def _process(self, message):
        """
        Subtract the EWMA mean estimate from ``message.data``.

        The filter runs along ``settings.axis``, falling back to the first
        dimension of the message when no axis is configured. The filter state
        (``self._state.zi``) seeds every call, but is only advanced when
        ``settings.accumulate`` is truthy; otherwise the state stored before
        the call is left untouched.

        Returns a copy of ``message`` whose data has the mean removed.
        """
        target_axis = self.settings.axis or message.dims[0]
        target_idx = message.get_axis_idx(target_axis)

        # Single-pole IIR low-pass: y[n] = alpha * x[n] + (1 - alpha) * y[n - 1]
        alpha = self._state.alpha
        running_mean, final_zi = sps.lfilter(
            [alpha],
            [1.0, alpha - 1.0],
            message.data,
            axis=target_idx,
            zi=self._state.zi,
        )

        # Persist the final filter conditions only while accumulating, so a
        # non-accumulating transformer keeps using its stored state.
        if self.settings.accumulate:
            self._state.zi = final_zi

        return replace(message, data=message.data - running_mean)


class DetrendUnit(BaseTransformerUnit[EWMASettings, AxisArray, AxisArray, DetrendTransformer]):
    """Unit wrapper that runs a :class:`DetrendTransformer` on ``AxisArray`` messages."""

    SETTINGS = EWMASettings

    @ez.subscriber(BaseTransformerUnit.INPUT_SETTINGS)
    async def on_settings(self, msg: EWMASettings) -> None:
        """
        Apply a settings update, rebuilding the processor only when required.

        A change of ``axis`` is treated as structural and triggers creation of
        a fresh processor. Any other change (``time_constant`` or
        ``accumulate``) is handed to the existing processor so its accumulated
        state is kept.
        """
        # Capture the axis before apply_settings overwrites the unit settings.
        previous_axis = self.SETTINGS.axis
        self.apply_settings(msg)

        if msg.axis != previous_axis:
            # Different axis: start over with a brand-new processor.
            self.create_processor()
            return

        # Same axis: swap the settings on the live processor, preserving state.
        self.processor.settings = msg