In-Built Signal Processing Modules#
Here is the API reference for the in-built signal processing modules included in the ezmsg-sigproc extension.
ezmsg.sigproc.activation#
- class ActivationFunction(*values)[source]#
Bases:
OptionsEnum
Activation (transformation) function.
- NONE = 'none'#
None.
- SIGMOID = 'sigmoid'#
scipy.special.expit
- EXPIT = 'expit'#
scipy.special.expit
- LOGIT = 'logit'#
scipy.special.logit
- LOGEXPIT = 'log_expit'#
scipy.special.log_expit
- class ActivationSettings(function: str | ActivationFunction = <ActivationFunction.NONE: 'none'>)[source]#
Bases:
Settings
- Parameters:
function (str | ActivationFunction)
- __init__(function=ActivationFunction.NONE)#
- Parameters:
function (str | ActivationFunction)
- Return type:
None
- function: str | ActivationFunction = 'none'#
An enum value from ActivationFunction or a string representing the activation function. Possible values are: SIGMOID, EXPIT, LOGIT, LOGEXPIT, “sigmoid”, “expit”, “logit”, “log_expit”. SIGMOID and EXPIT are equivalent. See scipy.special.expit for more details.
- class ActivationTransformer(*args, settings=None, **kwargs)[source]#
Bases:
BaseTransformer[ActivationSettings,AxisArray,AxisArray]
- Parameters:
settings (SettingsType)
- class Activation(*args, settings=None, **kwargs)[source]#
Bases:
BaseTransformerUnit[ActivationSettings,AxisArray,AxisArray,ActivationTransformer]
- Parameters:
settings (Settings | None)
- SETTINGS#
alias of
ActivationSettings
- activation(function)[source]#
Transform the data with a simple activation function.
- Parameters:
function (str | ActivationFunction) – An enum value from ActivationFunction or a string representing the activation function. Possible values are: SIGMOID, EXPIT, LOGIT, LOGEXPIT, “sigmoid”, “expit”, “logit”, “log_expit”. SIGMOID and EXPIT are equivalent. See scipy.special.expit for more details.
- Returns:
ActivationTransformer
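As a rough illustration of what each option computes, the sketch below rewrites the three SciPy functions named above in plain NumPy. The helper names (sigmoid, logit, log_sigmoid) are hypothetical and not part of ezmsg-sigproc; the module itself is documented as relying on scipy.special.

```python
import numpy as np

def sigmoid(x):
    """Logistic function, the quantity scipy.special.expit computes."""
    return 1.0 / (1.0 + np.exp(-x))

def logit(p):
    """Inverse of the logistic function, as in scipy.special.logit."""
    return np.log(p / (1.0 - p))

def log_sigmoid(x):
    """log(sigmoid(x)), as in scipy.special.log_expit, written to avoid overflow."""
    return -np.logaddexp(0.0, -x)

x = np.array([-2.0, 0.0, 2.0])
p = sigmoid(x)          # values in (0, 1)
x_back = logit(p)       # recovers x up to rounding
log_p = log_sigmoid(x)  # equals log(p) up to rounding
```

SIGMOID and EXPIT both correspond to the first helper, which is why the settings describe them as equivalent.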
ezmsg.sigproc.affinetransform#
Affine transformations via matrix multiplication: y = Ax or y = Ax + B.
For full matrix transformations where channels are mixed (off-diagonal weights),
use AffineTransformTransformer or the AffineTransform unit.
For simple per-channel scaling and offset (diagonal weights only), use
LinearTransformTransformer from ezmsg.sigproc.linear instead,
which is more efficient as it avoids matrix multiplication.
- class AffineTransformSettings(weights, axis=None, right_multiply=True)[source]#
Bases:
Settings
Settings for AffineTransform.
- weights: ndarray | str | Path#
An array of weights or a path to a file with weights compatible with np.loadtxt.
- class AffineTransformTransformer(*args, **kwargs)[source]#
Bases:
BaseStatefulTransformer[AffineTransformSettings,AxisArray,AxisArray,AffineTransformState]
Apply affine transformation via matrix multiplication: y = Ax or y = Ax + B.
Use this transformer when you need full matrix transformations that mix channels (off-diagonal weights), such as spatial filters or projections.
For simple per-channel scaling and offset where each output channel depends only on its corresponding input channel (diagonal weight matrix), use LinearTransformTransformer instead, which is more efficient.
The weights matrix can include an offset row (stacked as [A|B]) where the input is automatically augmented with a column of ones to compute y = Ax + B.
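The augmentation trick can be sketched in NumPy as follows. The array layout (samples by channels, with the offset stored as the last row of the weights) is an assumption made for illustration, not a statement about how AffineTransformTransformer lays out its data internally.

```python
import numpy as np

rng = np.random.default_rng(0)
x = rng.standard_normal((5, 3))   # 5 samples, 3 input channels

A = rng.standard_normal((3, 2))   # mixes 3 input channels into 2 outputs
b = np.array([10.0, -10.0])       # per-output offset

# Stack the offset as one extra row of the weights ...
weights = np.vstack([A, b])       # shape (4, 2)

# ... and augment the input with a column of ones, so that a single
# matrix product yields x @ A + b.
x_aug = np.hstack([x, np.ones((x.shape[0], 1))])
y = x_aug @ weights
```

Without the extra row, the same product reduces to the pure linear case y = Ax.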
- class AffineTransform(*args, settings=None, **kwargs)[source]#
Bases:
BaseTransformerUnit[AffineTransformSettings,AxisArray,AxisArray,AffineTransformTransformer]
- Parameters:
settings (Settings | None)
- SETTINGS#
alias of
AffineTransformSettings
- affine_transform(weights, axis=None, right_multiply=True)[source]#
Perform affine transformations on streaming data.
- Parameters:
weights (ndarray | str | Path) – An array of weights or a path to a file with weights compatible with np.loadtxt.
axis (str | None) – The name of the axis to apply the transformation to. Defaults to the leading (0th) axis in the array.
right_multiply (bool) – Set False to transpose the weights before applying.
- Returns:
- Return type:
- class CommonRereferenceSettings(mode='mean', axis=None, include_current=True)[source]#
Bases:
Settings
Settings for CommonRereference.
- __init__(mode='mean', axis=None, include_current=True)#
- class CommonRereferenceTransformer(*args, settings=None, **kwargs)[source]#
Bases:
BaseTransformer[CommonRereferenceSettings,AxisArray,AxisArray]
- Parameters:
settings (SettingsType)
- class CommonRereference(*args, settings=None, **kwargs)[source]#
Bases:
BaseTransformerUnit[CommonRereferenceSettings,AxisArray,AxisArray,CommonRereferenceTransformer]
- Parameters:
settings (Settings | None)
- SETTINGS#
alias of
CommonRereferenceSettings
ezmsg.sigproc.aggregate#
Aggregation operations over arrays.
Note
AggregateTransformer supports the Array API standard,
enabling use with NumPy, CuPy, PyTorch, and other compatible array libraries.
RangedAggregateTransformer currently requires NumPy arrays.
- class AggregationFunction(*values)[source]#
Bases:
OptionsEnum
Enum for aggregation functions available to be used in the ranged_aggregate operation.
- NONE = 'None (all)'#
- MAX = 'max'#
- MIN = 'min'#
- MEAN = 'mean'#
- MEDIAN = 'median'#
- STD = 'std'#
- SUM = 'sum'#
- NANMAX = 'nanmax'#
- NANMIN = 'nanmin'#
- NANMEAN = 'nanmean'#
- NANMEDIAN = 'nanmedian'#
- NANSTD = 'nanstd'#
- NANSUM = 'nansum'#
- ARGMIN = 'argmin'#
- ARGMAX = 'argmax'#
- TRAPEZOID = 'trapezoid'#
- class RangedAggregateSettings(axis=None, bands=None, operation=AggregationFunction.MEAN)[source]#
Bases:
Settings
Settings for RangedAggregate.
- Parameters:
- bands: list[tuple[float, float]] | None = None#
[(band1_min, band1_max), (band2_min, band2_max), …] If not set then this acts as a passthrough node.
- operation: AggregationFunction = 'mean'#
AggregationFunction to apply to each band.
- class RangedAggregateTransformer(*args, **kwargs)[source]#
Bases:
BaseStatefulTransformer[RangedAggregateSettings,AxisArray,AxisArray,RangedAggregateState]
- class RangedAggregate(*args, settings=None, **kwargs)[source]#
Bases:
BaseTransformerUnit[RangedAggregateSettings,AxisArray,AxisArray,RangedAggregateTransformer]
- Parameters:
settings (Settings | None)
- SETTINGS#
alias of
RangedAggregateSettings
- ranged_aggregate(axis=None, bands=None, operation=AggregationFunction.MEAN)[source]#
Apply an aggregation operation over one or more bands.
- Parameters:
axis (str | None) – The name of the axis along which to apply the bands.
bands (list[tuple[float, float]] | None) – [(band1_min, band1_max), (band2_min, band2_max), …] If not set then this acts as a passthrough node.
operation (AggregationFunction) – AggregationFunction to apply to each band.
- Returns:
- Return type:
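A minimal NumPy sketch of band-wise aggregation, assuming a 2-D array whose last axis carries numeric coordinates (frequencies here). Treating the band edges as inclusive is an assumption; the reference text does not say how edges are handled.

```python
import numpy as np

freqs = np.arange(0.0, 50.0, 1.0)          # coordinates along the axis: 0..49
data = np.tile(freqs, (4, 1))              # 4 channels; each value equals its coordinate
bands = [(8.0, 12.0), (13.0, 30.0)]

band_means = []
for lo, hi in bands:
    mask = (freqs >= lo) & (freqs <= hi)   # inclusive edges are an assumption
    band_means.append(data[:, mask].mean(axis=1))

out = np.stack(band_means, axis=1)         # the axis is kept, with one entry per band
```

The output keeps the aggregated axis but shrinks it to len(bands) entries, which is the difference from a whole-axis aggregation.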
- class AggregateSettings(axis, operation=AggregationFunction.MEAN)[source]#
Bases:
Settings
Settings for Aggregate.
- Parameters:
axis (str)
operation (AggregationFunction)
- __init__(axis, operation=AggregationFunction.MEAN)#
- Parameters:
axis (str)
operation (AggregationFunction)
- Return type:
None
- operation: AggregationFunction = 'mean'#
AggregationFunction to apply.
- class AggregateTransformer(*args, settings=None, **kwargs)[source]#
Bases:
BaseTransformer[AggregateSettings,AxisArray,AxisArray]
Transformer that aggregates an entire axis using a specified operation.
Unlike RangedAggregateTransformer, which aggregates over specific ranges/bands and preserves the axis (with one value per band), this transformer aggregates the entire axis and removes it from the output, reducing dimensionality by one.
- Parameters:
settings (SettingsType)
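Several AggregationFunction values ('mean', 'nanmax', 'std', and so on) coincide with NumPy function names, so whole-axis aggregation can be sketched with a name lookup. The aggregate helper below is hypothetical and is not how the library is known to dispatch; values such as 'None (all)' would need separate handling.

```python
import numpy as np

data = np.arange(24, dtype=float).reshape(2, 3, 4)   # e.g. (time, channel, freq)

def aggregate(arr, axis, operation="mean"):
    """Reduce one axis with the NumPy function named by `operation` (hypothetical helper)."""
    return getattr(np, operation)(arr, axis=axis)

reduced = aggregate(data, axis=2, operation="nanmax")  # shape (2, 3): the axis is gone
```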
- class AggregateUnit(*args, settings=None, **kwargs)[source]#
Bases:
BaseTransformerUnit[AggregateSettings,AxisArray,AxisArray,AggregateTransformer]
Unit that aggregates an entire axis using a specified operation.
- Parameters:
settings (Settings | None)
- SETTINGS#
alias of
AggregateSettings
ezmsg.sigproc.bandpower#
- class BandPowerSettings(spectrogram_settings=<factory>, bands=<factory>, aggregation=AggregationFunction.MEAN)[source]#
Bases:
Settings
Settings for BandPower.
- Parameters:
spectrogram_settings (SpectrogramSettings)
aggregation (AggregationFunction)
- spectrogram_settings: SpectrogramSettings#
Settings for spectrogram calculation.
- aggregation: AggregationFunction = 'mean'#
AggregationFunction to apply to each band.
- __init__(spectrogram_settings=<factory>, bands=<factory>, aggregation=AggregationFunction.MEAN)#
- Parameters:
spectrogram_settings (SpectrogramSettings)
aggregation (AggregationFunction)
- Return type:
None
- class BandPowerTransformer(*args, **kwargs)[source]#
Bases:
CompositeProcessor[BandPowerSettings,AxisArray,AxisArray]
- class BandPower(*args, settings=None, **kwargs)[source]#
Bases:
BaseTransformerUnit[BandPowerSettings,AxisArray,AxisArray,BandPowerTransformer]
- Parameters:
settings (Settings | None)
- SETTINGS#
alias of
BandPowerSettings
- bandpower(spectrogram_settings, bands=[(17, 30), (70, 170)], aggregation=AggregationFunction.MEAN)[source]#
Calculate the average spectral power in each band.
- Returns:
- Parameters:
spectrogram_settings (SpectrogramSettings)
aggregation (AggregationFunction)
- Return type:
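To make "average spectral power in each band" concrete, here is a single-window NumPy sketch. It replaces the spectrogram stage with one FFT over the whole signal, and the sampling rate and test signal are assumptions chosen for illustration.

```python
import numpy as np

fs = 1000.0                                   # sampling rate in Hz (assumed)
t = np.arange(0, 1.0, 1.0 / fs)
# Test signal: a strong 20 Hz tone plus a weaker 100 Hz tone.
x = 2.0 * np.sin(2 * np.pi * 20 * t) + 0.5 * np.sin(2 * np.pi * 100 * t)

freqs = np.fft.rfftfreq(x.size, d=1.0 / fs)
power = np.abs(np.fft.rfft(x, norm="forward")) ** 2

bands = [(17, 30), (70, 170)]                 # the defaults from the signature above
band_power = [
    power[(freqs >= lo) & (freqs <= hi)].mean() for lo, hi in bands
]
```

In a streaming pipeline the same per-band mean would be taken for every spectrogram window rather than once.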
ezmsg.sigproc.filter#
- class FilterSettings(axis: str | None = None, coef_type: str = 'ba', coefs: FilterCoefficients | None = None)[source]#
Bases:
FilterBaseSettings
- Parameters:
axis (str | None)
coef_type (str)
coefs (FilterCoefficients | None)
- coefs: FilterCoefficients | None = None#
The pre-calculated filter coefficients.
- __init__(axis=None, coef_type='ba', coefs=None)#
- Parameters:
axis (str | None)
coef_type (str)
coefs (FilterCoefficients | None)
- Return type:
None
- class FilterTransformer(*args, **kwargs)[source]#
Bases:
BaseStatefulTransformer[FilterSettings,AxisArray,AxisArray,FilterState]
Filter data using the provided coefficients.
- class Filter(*args, settings=None, **kwargs)[source]#
Bases:
BaseTransformerUnit[FilterSettings,AxisArray,AxisArray,FilterTransformer]
- Parameters:
settings (Settings | None)
- SETTINGS#
alias of
FilterSettings
- class FilterByDesignTransformer(*args, **kwargs)[source]#
Bases:
BaseStatefulTransformer[SettingsType,AxisArray,AxisArray,FilterByDesignState],ABC,Generic[SettingsType,FilterCoefsType]
Abstract base class for filter design transformers.
- class BaseFilterByDesignTransformerUnit(*args, settings=None, **kwargs)[source]#
Bases:
BaseTransformerUnit[SettingsType,AxisArray,AxisArray,FilterByDesignTransformer],Generic[SettingsType,TransformerType]
- Parameters:
settings (Settings | None)
- async on_settings(msg)[source]#
Receive a settings message, override self.SETTINGS, and re-create the processor. Child classes that wish to have fine-grained control over whether the core processor resets on settings changes should override this method.
- Parameters:
msg (SettingsType) – a settings message.
- Return type:
None
ezmsg.sigproc.butterworthfilter#
- class ButterworthFilterSettings(axis=None, coef_type='ba', order=0, cuton=None, cutoff=None, wn_hz=True)[source]#
Bases:
FilterBaseSettings
Settings for ButterworthFilter.
- Parameters:
- cuton: float | None = None#
Cuton frequency (Hz). If cutoff is not specified then this is the highpass corner. Otherwise, if this is lower than cutoff then this is the beginning of the bandpass or if this is greater than cutoff then this is the end of the bandstop.
- cutoff: float | None = None#
Cutoff frequency (Hz). If cuton is not specified then this is the lowpass corner. Otherwise, if this is greater than cuton then this is the end of the bandpass, or if this is less than cuton then this is the beginning of the bandstop.
- wn_hz: bool = True#
Set False if provided Wn are normalized from 0 to 1, where 1 is the Nyquist frequency
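The cuton/cutoff rules above can be summarized as a small decision function. filter_spec is a hypothetical helper written for this page, not the library's filter_specs, and converting Hz to the normalized scale inside it is an illustrative choice.

```python
def filter_spec(fs, cuton=None, cutoff=None, wn_hz=True):
    """Return (btype, corners) following the cuton/cutoff rules described above."""
    if cuton is None and cutoff is None:
        return None                      # nothing to design
    if cutoff is None:
        btype, corners = "highpass", (cuton,)
    elif cuton is None:
        btype, corners = "lowpass", (cutoff,)
    elif cuton < cutoff:
        btype, corners = "bandpass", (cuton, cutoff)
    elif cuton > cutoff:
        btype, corners = "bandstop", (cutoff, cuton)
    else:
        # Equal corners are not covered by the description above.
        raise ValueError("cuton and cutoff must differ")
    if wn_hz:
        # Convert Hz to the normalized scale where 1.0 is the Nyquist frequency.
        corners = tuple(f / (fs / 2.0) for f in corners)
    return btype, corners

highpass = filter_spec(1000.0, cuton=10.0)
notch = filter_spec(1000.0, cuton=0.6, cutoff=0.4, wn_hz=False)
```

Swapping the order of the two corners is what turns a bandpass into a bandstop.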
- butter_design_fun(fs, order=0, cuton=None, cutoff=None, coef_type='ba', wn_hz=True)[source]#
See ButterworthFilterSettings.filter_specs for an explanation of specifying different filter types (lowpass, highpass, bandpass, bandstop) from the parameters. You are likely to want to use this function with filter_by_design, which passes only fs to the design function (this one), so you should wrap this function in a lambda or prepare it with functools.partial.
- Parameters:
fs (float) – The sampling frequency of the data in Hz.
order (int) – Filter order.
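cuton (float | None) – Cuton frequency in Hz: the highpass corner when cutoff is not given, otherwise one edge of the bandpass or bandstop.
cutoff (float | None) – Cutoff frequency in Hz: the lowpass corner when cuton is not given, otherwise the other edge of the bandpass or bandstop.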
coef_type (str) – “ba”, “sos”, or “zpk”
wn_hz (bool) – Set False if provided Wn are normalized from 0 to 1, where 1 is the Nyquist frequency
- Returns:
The filter coefficients as a tuple of (b, a) for coef_type “ba”, or as a single ndarray for “sos”, or (z, p, k) for “zpk”.
- Return type:
tuple[ndarray[tuple[Any, …], dtype[_ScalarT]], ndarray[tuple[Any, …], dtype[_ScalarT]]] | ndarray[tuple[Any, …], dtype[_ScalarT]] | None
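The functools.partial advice can be sketched with a stand-in function that copies the butter_design_fun signature. The body of design_fun is hypothetical (it only echoes its arguments) so that the example runs without ezmsg-sigproc installed.

```python
from functools import partial

def design_fun(fs, order=0, cuton=None, cutoff=None, coef_type="ba", wn_hz=True):
    """Stand-in with the same signature as butter_design_fun (hypothetical body)."""
    return {"fs": fs, "order": order, "cuton": cuton, "cutoff": cutoff,
            "coef_type": coef_type, "wn_hz": wn_hz}

# Fix every argument except fs, which the caller supplies later.
bandpass_design = partial(design_fun, order=4, cuton=8.0, cutoff=30.0, coef_type="sos")

spec = bandpass_design(500.0)   # called with fs only, as filter_by_design would do
```

A lambda such as `lambda fs: design_fun(fs, order=4, cuton=8.0, cutoff=30.0)` achieves the same effect.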
- class ButterworthFilterTransformer(*args, **kwargs)[source]#
Bases:
FilterByDesignTransformer[ButterworthFilterSettings,tuple[ndarray[tuple[Any, …],dtype[_ScalarT]],ndarray[tuple[Any, …],dtype[_ScalarT]]] |ndarray[tuple[Any, …],dtype[_ScalarT]]]
- class ButterworthFilter(*args, settings=None, **kwargs)[source]#
Bases:
BaseFilterByDesignTransformerUnit[ButterworthFilterSettings,ButterworthFilterTransformer]
- Parameters:
settings (Settings | None)
- SETTINGS#
alias of
ButterworthFilterSettings
- butter(axis, order=0, cuton=None, cutoff=None, coef_type='ba', wn_hz=True)[source]#
Convenience generator wrapping filter_gen_by_design for Butterworth filters. Apply Butterworth filter to streaming data. Uses scipy.signal.butter to design the filter. See ButterworthFilterSettings.filter_specs for an explanation of specifying different filter types (lowpass, highpass, bandpass, bandstop) from the parameters.
- Returns:
- Parameters:
- Return type:
ezmsg.sigproc.decimate#
- class ChebyForDecimateTransformer(*args, **kwargs)[source]#
Bases:
ChebyshevFilterTransformer[tuple[ndarray[tuple[Any, …],dtype[_ScalarT]],ndarray[tuple[Any, …],dtype[_ScalarT]]] |ndarray[tuple[Any, …],dtype[_ScalarT]]]
A ChebyshevFilterTransformer with a design filter method that additionally accepts a target sampling rate. If the target rate cannot be achieved it returns None; otherwise it returns the filter coefficients.
- class ChebyForDecimate(*args, settings=None, **kwargs)[source]#
Bases:
BaseTransformerUnit[ChebyshevFilterSettings,AxisArray,AxisArray,ChebyForDecimateTransformer]
- Parameters:
settings (Settings | None)
- SETTINGS#
alias of
ChebyshevFilterSettings
- class Decimate(*args, settings=None, **kwargs)[source]#
Bases:
Collection
A Collection chaining a Filter node configured as a lowpass Chebyshev filter and a Downsample node.
- Parameters:
settings (Settings | None)
- SETTINGS#
alias of
DownsampleSettings
- INPUT_SIGNAL = InputStream:unlocated[<class 'ezmsg.util.messages.axisarray.AxisArray'>]()#
- OUTPUT_SIGNAL = OutputStream:unlocated[<class 'ezmsg.util.messages.axisarray.AxisArray'>](self.num_buffers=32, self.force_tcp=False)#
- FILTER = <ezmsg.sigproc.decimate.ChebyForDecimate object>#
- DOWNSAMPLE = <ezmsg.sigproc.downsample.Downsample object>#
ezmsg.sigproc.denormalize#
- class DenormalizeSettings(low_rate: float = 2.0, high_rate: float = 40.0, distribution: str = 'uniform')[source]#
Bases:
Settings
- class DenormalizeTransformer(*args, **kwargs)[source]#
Bases:
BaseStatefulTransformer[DenormalizeSettings,AxisArray,AxisArray,DenormalizeState]
Scales data from a normalized distribution (mean=0, std=1) to a denormalized distribution using random per-channel offsets and gains designed to keep the 99.9% CIs between 0 and 2x the offset.
This is useful for simulating realistic firing rates from normalized data.
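One way to read the description is sketched below, assuming uniformly drawn offsets and a gain derived from the two-sided 99.9% interval of a standard normal (about 3.29 standard deviations). The exact formula used by DenormalizeTransformer is not given here, so treat this as an illustration only.

```python
import numpy as np

rng = np.random.default_rng(42)
n_samples, n_channels = 1000, 4
z = rng.standard_normal((n_samples, n_channels))   # normalized input

low_rate, high_rate = 2.0, 40.0                     # defaults from DenormalizeSettings
offsets = rng.uniform(low_rate, high_rate, size=n_channels)

# For a standard normal, 99.9% of values lie within about +/-3.29 standard
# deviations, so this gain maps that interval onto (0, 2 * offset).
gains = offsets / 3.29
rates = z * gains + offsets

inside = np.mean((rates > 0) & (rates < 2 * offsets))   # fraction inside the interval
```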
- class DenormalizeUnit(*args, settings=None, **kwargs)[source]#
Bases:
BaseTransformerUnit[DenormalizeSettings,AxisArray,AxisArray,DenormalizeTransformer]
- Parameters:
settings (Settings | None)
- SETTINGS#
alias of
DenormalizeSettings
ezmsg.sigproc.downsample#
- class DownsampleSettings(axis='time', target_rate=None, factor=None)[source]#
Bases:
Settings
Settings for Downsample node.
- target_rate: float | None = None#
Desired rate after downsampling. The actual rate will be the input rate divided by an integer factor, chosen as the closest achievable rate that is the same as or higher than the target rate.
- class DownsampleTransformer(*args, **kwargs)[source]#
Bases:
BaseStatefulTransformer[DownsampleSettings,AxisArray,AxisArray,DownsampleState]
Downsampled data simply comprise every factor-th sample. This should only be used following appropriate lowpass filtering. If your pipeline does not already have lowpass filtering then consider using the Decimate collection instead.
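The relationship between target_rate and factor can be sketched as below. downsample_factor is a hypothetical helper reflecting one reading of the target_rate description; a streaming implementation would also need to carry the sample phase across chunks, which this sketch ignores.

```python
import numpy as np

def downsample_factor(fs, target_rate):
    """Largest integer factor whose output rate is still >= target_rate."""
    return max(1, int(fs // target_rate))

fs = 1000.0
factor = downsample_factor(fs, target_rate=300.0)   # 3, giving about 333.3 Hz
x = np.arange(12)
y = x[::factor]                                      # keep every factor-th sample
```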
- class Downsample(*args, settings=None, **kwargs)[source]#
Bases:
BaseTransformerUnit[DownsampleSettings,AxisArray,AxisArray,DownsampleTransformer]
- Parameters:
settings (Settings | None)
- SETTINGS#
alias of
DownsampleSettings
ezmsg.sigproc.ewmfilter#
- class EWM(*args, settings=None, **kwargs)[source]#
Bases:
Unit
Exponentially Weighted Moving Average Standardization. This is deprecated. Please use ezmsg.sigproc.scaler.AdaptiveStandardScaler instead.
References: https://stackoverflow.com/a/42926270
- Parameters:
settings (Settings | None)
- SETTINGS#
alias of
EWMSettings
- INPUT_SIGNAL = InputStream:unlocated[<class 'ezmsg.util.messages.axisarray.AxisArray'>]()#
- INPUT_BUFFER = InputStream:unlocated[<class 'ezmsg.util.messages.axisarray.AxisArray'>]()#
- OUTPUT_SIGNAL = OutputStream:unlocated[<class 'ezmsg.util.messages.axisarray.AxisArray'>](self.num_buffers=32, self.force_tcp=False)#
- async initialize()[source]#
Runs when the Unit is instantiated. This is called from within the same process in which this unit will live. This lifecycle hook can be overridden. It can be run as an async function by adding the async keyword when overriding.
- Return type:
None
- class EWMFilterSettings(history_dur: float, axis: str | None = None, zero_offset: bool = True)[source]#
Bases:
Settings
- class EWMFilter(*args, settings=None, **kwargs)[source]#
Bases:
Collection
A Collection that splits the input into a branch that leads to Window which then feeds into EWM’s INPUT_BUFFER and another branch that feeds directly into EWM’s INPUT_SIGNAL.
This is deprecated. Please use ezmsg.sigproc.scaler.AdaptiveStandardScaler instead.
- Parameters:
settings (Settings | None)
- SETTINGS#
alias of
EWMFilterSettings
- INPUT_SIGNAL = InputStream:unlocated[<class 'ezmsg.util.messages.axisarray.AxisArray'>]()#
- OUTPUT_SIGNAL = OutputStream:unlocated[<class 'ezmsg.util.messages.axisarray.AxisArray'>](self.num_buffers=32, self.force_tcp=False)#
- WINDOW = <ezmsg.sigproc.window.Window object>#
- EWM = <ezmsg.sigproc.ewmfilter.EWM object>#
ezmsg.sigproc.math#
Clips the data to be within the specified range.
Note
This module supports the Array API standard, enabling use with NumPy, CuPy, PyTorch, and other compatible array libraries.
- class ClipTransformer(*args, settings=None, **kwargs)[source]#
Bases:
BaseTransformer[ClipSettings,AxisArray,AxisArray]
- Parameters:
settings (SettingsType)
- class Clip(*args, settings=None, **kwargs)[source]#
Bases:
BaseTransformerUnit[ClipSettings,AxisArray,AxisArray,ClipTransformer]
- Parameters:
settings (Settings | None)
- SETTINGS#
alias of
ClipSettings
- clip(min=None, max=None)[source]#
Clips the data to be within the specified range.
- Parameters:
- Returns:
- Return type:
Take the difference between 2 signals or between a signal and a constant value.
Note
ConstDifferenceTransformer supports the Array API standard,
enabling use with NumPy, CuPy, PyTorch, and other compatible array libraries.
DifferenceProcessor (two-input difference) currently requires NumPy arrays.
- class ConstDifferenceSettings(value: float = 0.0, subtrahend: bool = True)[source]#
Bases:
Settings
- class ConstDifferenceTransformer(*args, settings=None, **kwargs)[source]#
Bases:
BaseTransformer[ConstDifferenceSettings,AxisArray,AxisArray]
- Parameters:
settings (SettingsType)
- class ConstDifference(*args, settings=None, **kwargs)[source]#
Bases:
BaseTransformerUnit[ConstDifferenceSettings,AxisArray,AxisArray,ConstDifferenceTransformer]
- Parameters:
settings (Settings | None)
- SETTINGS#
alias of
ConstDifferenceSettings
- const_difference(value=0.0, subtrahend=True)[source]#
result = (in_data - value) if subtrahend else (value - in_data) https://en.wikipedia.org/wiki/Template:Arithmetic_operations
- Parameters:
- Returns:
ConstDifferenceTransformer.
- class DifferenceState(queue_a=<factory>, queue_b=<factory>)[source]#
Bases:
object
State for Difference processor with two input queues.
- class DifferenceProcessor[source]#
Bases:
object
Processor that subtracts two AxisArray signals (A - B).
This processor maintains separate queues for two input streams and subtracts corresponding messages element-wise. It assumes both inputs have compatible shapes and aligned time spans.
- property state: DifferenceState#
- class Difference(*args, settings=None, **kwargs)[source]#
Bases:
Unit
Subtract two signals (A - B).
Assumes compatible/similar axes/dimensions and aligned time spans. Messages are paired by arrival order (oldest from each queue).
OUTPUT = INPUT_SIGNAL_A - INPUT_SIGNAL_B
- Parameters:
settings (Settings | None)
- INPUT_SIGNAL_A = InputStream:unlocated[<class 'ezmsg.util.messages.axisarray.AxisArray'>]()#
- INPUT_SIGNAL_B = InputStream:unlocated[<class 'ezmsg.util.messages.axisarray.AxisArray'>]()#
- OUTPUT_SIGNAL = OutputStream:unlocated[<class 'ezmsg.util.messages.axisarray.AxisArray'>](self.num_buffers=32, self.force_tcp=False)#
- async initialize()[source]#
Runs when the Unit is instantiated. This is called from within the same process in which this unit will live. This lifecycle hook can be overridden. It can be run as an async function by adding the async keyword when overriding.
- Return type:
None
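The pairing-by-arrival-order behaviour described for Difference can be sketched with two deques. Plain Python lists stand in for AxisArray messages, and the push helper is hypothetical rather than part of the DifferenceProcessor API.

```python
from collections import deque

queue_a, queue_b = deque(), deque()

def push(queue, msg):
    """Queue a message, then emit A - B for every complete oldest-first pair."""
    queue.append(msg)
    out = []
    while queue_a and queue_b:
        a, b = queue_a.popleft(), queue_b.popleft()
        out.append([ai - bi for ai, bi in zip(a, b)])
    return out

r1 = push(queue_a, [5, 6])   # no partner yet, nothing emitted
r2 = push(queue_a, [7, 8])   # still waiting on B
r3 = push(queue_b, [1, 1])   # pairs with the oldest A message
```

Because pairing ignores timestamps, the two inputs are expected to arrive with aligned time spans, as noted above.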
Compute the multiplicative inverse (1/x) of the data.
Note
This module supports the Array API standard, enabling use with NumPy, CuPy, PyTorch, and other compatible array libraries.
- class InvertTransformer(*args, settings=None, **kwargs)[source]#
Bases:
BaseTransformer[None,AxisArray,AxisArray]
- Parameters:
settings (SettingsType)
- class Invert(*args, settings=None, **kwargs)[source]#
Bases:
BaseTransformerUnit[None,AxisArray,AxisArray,InvertTransformer]
- Parameters:
settings (Settings | None)
- invert()[source]#
Take the inverse of the data.
- Returns:
InvertTransformer.
Take the logarithm of the data.
Note
This module supports the Array API standard, enabling use with NumPy, CuPy, PyTorch, and other compatible array libraries.
- class LogSettings(base: float = 10.0, clip_zero: bool = False)[source]#
Bases:
Settings
- class LogTransformer(*args, settings=None, **kwargs)[source]#
Bases:
BaseTransformer[LogSettings,AxisArray,AxisArray]
- Parameters:
settings (SettingsType)
- class Log(*args, settings=None, **kwargs)[source]#
Bases:
BaseTransformerUnit[LogSettings,AxisArray,AxisArray,LogTransformer]
- Parameters:
settings (Settings | None)
- SETTINGS#
alias of
LogSettings
- log(base=10.0, clip_zero=False)[source]#
Take the logarithm of the data. See np.log for more details.
- Parameters:
- Returns:
LogTransformer.
Scale the data by a constant factor.
Note
This module supports the Array API standard, enabling use with NumPy, CuPy, PyTorch, and other compatible array libraries.
- class ScaleTransformer(*args, settings=None, **kwargs)[source]#
Bases:
BaseTransformer[ScaleSettings,AxisArray,AxisArray]
- Parameters:
settings (SettingsType)
- class Scale(*args, settings=None, **kwargs)[source]#
Bases:
BaseTransformerUnit[ScaleSettings,AxisArray,AxisArray,ScaleTransformer]
- Parameters:
settings (Settings | None)
- SETTINGS#
alias of
ScaleSettings
- scale(scale=1.0)[source]#
Scale the data by a constant factor.
- Parameters:
scale (float) – Factor by which to scale the data magnitude.
- Returns:
ScaleTransformer
ezmsg.sigproc.sampler#
- class SamplerSettings(buffer_dur, axis=None, period=None, value=None, estimate_alignment=True, buffer_update_strategy='immediate')[source]#
Bases:
Settings
Settings for Sampler. See sampler for a description of the fields.
- Parameters:
- buffer_dur: float#
The duration of the buffer in seconds. The buffer must be long enough to store the oldest sample to be included in a window. e.g., a trigger lagged by 0.5 seconds with a period of (-1.0, +1.5) will need a buffer of 0.5 + (1.5 - -1.0) = 3.0 seconds. It is best to at least double your estimate if memory allows.
- axis: str | None = None#
The axis along which to sample the data. None (default) will choose the first axis in the first input. Note: (for now) the axis must exist in the msg .axes and be of type AxisArray.LinearAxis
- period: tuple[float, float] | None = None#
Optional default period (in seconds) if unspecified in SampleTriggerMessage.
- estimate_alignment: bool = True#
If true, use message timestamp fields and reported sampling rate to estimate sample-accurate alignment for samples. If false, sampling will be limited to the incoming message rate (“block timing”).
NOTE: For faster-than-realtime playback, incoming timestamps must reflect “realtime” operation for estimate_alignment to operate correctly.
- buffer_update_strategy: Literal['immediate', 'threshold', 'on_demand'] = 'immediate'#
The buffer update strategy. See ezmsg.sigproc.util.buffer.UpdateStrategy. If you expect to push data much more frequently than triggers, then “on_demand” might be more efficient. For most other scenarios, “immediate” is best.
- __init__(buffer_dur, axis=None, period=None, value=None, estimate_alignment=True, buffer_update_strategy='immediate')#
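The buffer_dur arithmetic from the field description can be written out as a small calculation. min_buffer_dur is a hypothetical helper that reproduces the worked example (a 0.5 s trigger lag with a period of (-1.0, +1.5)); it is not part of the Sampler API.

```python
def min_buffer_dur(trigger_lag, period):
    """Smallest buffer (seconds) that still holds the oldest sample of a window."""
    start, stop = period
    return trigger_lag + (stop - start)

required = min_buffer_dur(trigger_lag=0.5, period=(-1.0, 1.5))   # 3.0 seconds
recommended = 2 * required                                        # doubled for headroom
```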
- class SamplerState[source]#
Bases:
object
- buffer: HybridAxisArrayBuffer | None = None#
- triggers: deque[SampleTriggerMessage] | None = None#
- class SamplerTransformer(*args, **kwargs)[source]#
Bases:
BaseStatefulTransformer[SamplerSettings,AxisArray,AxisArray,SamplerState]
- push_trigger(message)[source]#
- Parameters:
message (SampleTriggerMessage)
- Return type:
- class Sampler(*args, settings=None, **kwargs)[source]#
Bases:
BaseTransformerUnit[SamplerSettings,AxisArray,AxisArray,SamplerTransformer]
- Parameters:
settings (Settings | None)
- SETTINGS#
alias of
SamplerSettings
- INPUT_TRIGGER = InputStream:unlocated[<class 'ezmsg.baseproc.util.message.SampleTriggerMessage'>]()#
- OUTPUT_SIGNAL = OutputStream:unlocated[<class 'ezmsg.baseproc.util.message.SampleMessage'>](self.num_buffers=32, self.force_tcp=False)#
- async on_trigger(msg)[source]#
- Parameters:
msg (SampleTriggerMessage)
- Return type:
None
- sampler(buffer_dur, axis=None, period=None, value=None, estimate_alignment=True)[source]#
Sample data into a buffer, accept triggers, and return slices of sampled data around the trigger time.
- Returns:
A generator that expects to be sent (via .send) either an AxisArray containing streaming data messages, or a SampleTriggerMessage containing a trigger, and yields the list of SampleMessages.
- Parameters:
- Return type:
- class TriggerGeneratorSettings(period: tuple[float, float], prewait: float = 0.5, publish_period: float = 5.0)[source]#
Bases:
Settings
- __init__(period, prewait=0.5, publish_period=5.0)#
- class TriggerProducer(*args, **kwargs)[source]#
Bases:
BaseStatefulProducer[TriggerGeneratorSettings,SampleTriggerMessage,TriggerGeneratorState]
- class TriggerGenerator(*args, settings=None, **kwargs)[source]#
Bases:
BaseProducerUnit[TriggerGeneratorSettings,SampleTriggerMessage,TriggerProducer]
- Parameters:
settings (Settings | None)
- SETTINGS#
alias of
TriggerGeneratorSettings
ezmsg.sigproc.scaler#
- scaler(time_constant=1.0, axis=None)[source]#
Apply the adaptive standard scaler from https://riverml.xyz/latest/api/preprocessing/AdaptiveStandardScaler/. This is faster than scaler_np for single-channel data.
- Parameters:
- Returns:
A primed generator object that expects to be sent an AxisArray via .send(axis_array) and yields an AxisArray with its data being a standardized, or “Z-scored”, version of the input data.
- Return type:
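A minimal single-channel sketch of adaptive standardization, assuming exponentially weighted averages of the samples and of their squares (compare the samps_ewma and vars_sq_ewma state fields below). The mapping from time_constant to the smoothing factor alpha, and the adaptive_zscore name, are assumptions rather than the library's definitions.

```python
import numpy as np

def adaptive_zscore(x, fs, time_constant=1.0):
    """Z-score each sample of a 1-D signal using exponentially weighted mean and variance."""
    alpha = 1.0 - np.exp(-1.0 / (fs * time_constant))   # assumed mapping from seconds
    mean = float(x[0])
    mean_sq = float(x[0]) ** 2
    out = np.zeros(len(x))
    for i, sample in enumerate(x):
        mean = (1 - alpha) * mean + alpha * sample
        mean_sq = (1 - alpha) * mean_sq + alpha * sample**2
        var = max(mean_sq - mean**2, 0.0)               # guard against rounding below zero
        out[i] = (sample - mean) / np.sqrt(var) if var > 0 else 0.0
    return out

rng = np.random.default_rng(0)
raw = 50.0 + 5.0 * rng.standard_normal(5000)
z = adaptive_zscore(raw, fs=100.0, time_constant=1.0)
```

The first few time constants of output are dominated by the initial estimates and are usually discarded.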
- class AdaptiveStandardScalerSettings(time_constant: float = 1.0, axis: str | None = None, accumulate: bool = True)[source]#
Bases:
EWMASettings
- class AdaptiveStandardScalerState[source]#
Bases:
object
- samps_ewma: EWMATransformer | None = None#
- vars_sq_ewma: EWMATransformer | None = None#
- class AdaptiveStandardScalerTransformer(*args, **kwargs)[source]#
Bases:
BaseStatefulTransformer[AdaptiveStandardScalerSettings,AxisArray,AxisArray,AdaptiveStandardScalerState]
- class AdaptiveStandardScaler(*args, settings=None, **kwargs)[source]#
Bases:
BaseTransformerUnit[AdaptiveStandardScalerSettings,AxisArray,AxisArray,AdaptiveStandardScalerTransformer]
- Parameters:
settings (Settings | None)
- SETTINGS#
alias of
AdaptiveStandardScalerSettings
ezmsg.sigproc.signalinjector#
- class SignalInjectorSettings(time_dim: str = 'time', frequency: float | None = None, amplitude: float = 1.0, mixing_seed: int | None = None)[source]#
Bases:
Settings
- class SignalInjectorTransformer(*args, **kwargs)[source]#
Bases:
BaseAsyncTransformer[SignalInjectorSettings,AxisArray,AxisArray,SignalInjectorState]
- class SignalInjector(*args, settings=None, **kwargs)[source]#
Bases:
BaseTransformerUnit[SignalInjectorSettings,AxisArray,AxisArray,SignalInjectorTransformer]
- Parameters:
settings (Settings | None)
- SETTINGS#
alias of
SignalInjectorSettings
- INPUT_FREQUENCY = InputStream:unlocated[float | None]()#
- INPUT_AMPLITUDE = InputStream:unlocated[<class 'float'>]()#
ezmsg.sigproc.slicer#
- parse_slice(s, axinfo=None)[source]#
Parses a string representation of a slice and returns a tuple of slice objects.
“” -> slice(None, None, None) (take all)
“:” -> slice(None, None, None)
“none” (case-insensitive) -> slice(None, None, None)
“{start}:{stop}” or “{start}:{stop}:{step}” -> slice(start, stop, step)
“5” (or any integer) -> (5,). Take only that item. Applying this to an ndarray or AxisArray will drop the dimension.
A comma-separated list of the above -> a tuple of slices | ints
A comma-separated list of values, when axinfo is provided and is a CoordinateAxis -> a tuple of ints
- Parameters:
s (str) – The string representation of the slice.
axinfo (CoordinateAxis | None) – (Optional) If provided, and of type CoordinateAxis, and s is a comma-separated list of values, then the values in s will be checked against the values in axinfo.data.
- Returns:
A tuple of slice objects and/or ints.
- Return type:
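The parsing rules listed above can be approximated in a few lines. parse_slice_sketch is a simplified, hypothetical re-implementation for illustration; it omits the axinfo lookup and any validation the real parse_slice may perform.

```python
def parse_slice_sketch(s):
    """Simplified re-implementation of the rules above (no axinfo handling)."""
    s = s.strip()
    if s == "" or s == ":" or s.lower() == "none":
        return (slice(None),)
    if "," in s:
        parts = []
        for item in s.split(","):
            parts.extend(parse_slice_sketch(item))
        return tuple(parts)
    if ":" in s:
        # Empty fields (as in "2:" or "::2") become None.
        fields = [int(f) if f.strip() else None for f in s.split(":")]
        return (slice(*fields),)
    return (int(s),)

take_all = parse_slice_sketch("None")
stepped = parse_slice_sketch("2:10:2")
mixed = parse_slice_sketch("0, 3:6")
picked = list(range(10))[stepped[0]]
```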
- class SlicerSettings(selection: str = '', axis: str | None = None)[source]#
Bases:
Settings
- selection: str = ''#
See ezmsg.sigproc.slicer.parse_slice for details.
- Type:
- class SlicerTransformer(*args, **kwargs)[source]#
Bases:
BaseStatefulTransformer[SlicerSettings,AxisArray,AxisArray,SlicerState]
- class Slicer(*args, settings=None, **kwargs)[source]#
Bases:
BaseTransformerUnit[SlicerSettings,AxisArray,AxisArray,SlicerTransformer]
- Parameters:
settings (Settings | None)
- SETTINGS#
alias of
SlicerSettings
- slicer(selection='', axis=None)[source]#
Slice along a particular axis.
- Parameters:
selection (str) – See ezmsg.sigproc.slicer.parse_slice for details.
axis (str | None) – The name of the axis to slice along. If None, the last axis is used.
- Returns:
- Return type:
ezmsg.sigproc.spectrum#
- class OptionsEnum(new_class_name, /, names, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]#
Bases:
Enum
- class WindowFunction(*values)[source]#
Bases:
OptionsEnum
Windowing function prior to calculating spectrum.
- NONE = 'None (Rectangular)'#
None.
- HAMMING = 'Hamming'#
- HANNING = 'Hanning'#
- BARTLETT = 'Bartlett'#
- BLACKMAN = 'Blackman'#
- class SpectralTransform(*values)[source]#
Bases:
OptionsEnum
Additional transformation functions to apply to the spectral result.
- RAW_COMPLEX = 'Complex FFT Output'#
- REAL = 'Real Component of FFT'#
- IMAG = 'Imaginary Component of FFT'#
- REL_POWER = 'Relative Power'#
- REL_DB = 'Log Power (Relative dB)'#
- class SpectralOutput(*values)[source]#
Bases:
OptionsEnum
The expected spectral contents.
- FULL = 'Full Spectrum'#
- POSITIVE = 'Positive Frequencies'#
- NEGATIVE = 'Negative Frequencies'#
- class SpectrumSettings(axis=None, out_axis='freq', window=WindowFunction.HAMMING, transform=SpectralTransform.REL_DB, output=SpectralOutput.POSITIVE, norm='forward', do_fftshift=True, nfft=None)[source]#
Bases:
Settings
Settings for Spectrum. See spectrum for a description of the parameters.
- Parameters:
axis (str | None)
out_axis (str | None)
window (WindowFunction)
transform (SpectralTransform)
output (SpectralOutput)
norm (str | None)
do_fftshift (bool)
nfft (int | None)
- axis: str | None = None#
The name of the axis on which to calculate the spectrum. Note: The axis must have an .axes entry of type LinearAxis, not CoordinateAxis.
- out_axis: str | None = 'freq'#
The name of the new axis. Defaults to “freq”. If None, the dimension name is left unchanged.
- window: WindowFunction = 'Hamming'#
The WindowFunction to apply to the data slice prior to calculating the spectrum.
- transform: SpectralTransform = 'Log Power (Relative dB)'#
The SpectralTransform to apply to the spectral magnitude.
- output: SpectralOutput = 'Positive Frequencies'#
The SpectralOutput format.
- norm: str | None = 'forward'#
Normalization mode. The default, “forward”, is best used when the inverse transform is not needed, for example when the goal is to get spectral power. Use “backward” (equivalent to None) to leave the spectrum unscaled, which is useful when the spectra will be manipulated and possibly inverse-transformed. See numpy.fft.fft for details.
- do_fftshift: bool = True#
Whether to apply fftshift to the output. Default is True. This value is ignored unless output is SpectralOutput.FULL.
- nfft: int | None = None#
The number of points to use for the FFT. If None, the length of the input data is used.
- __init__(axis=None, out_axis='freq', window=WindowFunction.HAMMING, transform=SpectralTransform.REL_DB, output=SpectralOutput.POSITIVE, norm='forward', do_fftshift=True, nfft=None)#
- Parameters:
axis (str | None)
out_axis (str | None)
window (WindowFunction)
transform (SpectralTransform)
output (SpectralOutput)
norm (str | None)
do_fftshift (bool)
nfft (int | None)
- Return type:
None
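The effect of norm and nfft can be seen with a naive DFT. This is a minimal sketch following the normalization convention documented for `numpy.fft.fft` (“backward” applies no scaling, “forward” scales by 1/n, “ortho” by 1/sqrt(n)). The `dft` helper is hypothetical and is not how the library computes spectra.

```python
import cmath


def dft(x, norm="backward", nfft=None):
    """Naive O(n^2) DFT following numpy.fft.fft's norm convention."""
    n = len(x) if nfft is None else nfft
    # Zero-pad (or truncate) the input to n points, as numpy.fft.fft does.
    padded = (list(x) + [0.0] * n)[:n]
    scale = {"backward": 1.0, None: 1.0, "ortho": n ** -0.5, "forward": 1.0 / n}[norm]
    return [
        scale * sum(v * cmath.exp(-2j * cmath.pi * k * i / n) for i, v in enumerate(padded))
        for k in range(n)
    ]


signal = [1.0, 1.0, 1.0, 1.0]
dc_forward = dft(signal, norm="forward")[0]            # mean of the samples
dc_backward = dft(signal, norm="backward")[0]          # plain sum of the samples
dc_padded = dft(signal, norm="forward", nfft=8)[0]     # scaled by the padded length
```

With “forward” the zero-frequency bin equals the signal mean, which is why it suits power estimates. Zero-padding through nfft adds bins but also changes the scale factor, since n is the transform length rather than the input length.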
- class SpectrumTransformer(*args, **kwargs)[source]#
Bases:
BaseStatefulTransformer[SpectrumSettings,AxisArray,AxisArray,SpectrumState]
- class Spectrum(*args, settings=None, **kwargs)[source]#
Bases:
BaseTransformerUnit[SpectrumSettings,AxisArray,AxisArray,SpectrumTransformer]
- Parameters:
settings (Settings | None)
- SETTINGS#
alias of
SpectrumSettings
- spectrum(axis=None, out_axis='freq', window=WindowFunction.HANNING, transform=SpectralTransform.REL_DB, output=SpectralOutput.POSITIVE, norm='forward', do_fftshift=True, nfft=None)[source]#
Calculate a spectrum on a data slice.
- Returns:
A SpectrumTransformer object that, when called with an AxisArray containing continuous data (via __call__), returns an AxisArray with data of spectral magnitudes or powers.
- Parameters:
axis (str | None)
out_axis (str | None)
window (WindowFunction)
transform (SpectralTransform)
output (SpectralOutput)
norm (str | None)
do_fftshift (bool)
nfft (int | None)
- Return type:
ezmsg.sigproc.spectrogram#
- class SpectrogramSettings(window_dur=None, window_shift=None, window_anchor=Anchor.BEGINNING, window=WindowFunction.HAMMING, transform=SpectralTransform.REL_DB, output=SpectralOutput.POSITIVE)[source]#
Bases:
Settings
Settings for SpectrogramTransformer.
- Parameters:
window_dur (float | None)
window_shift (float | None)
window (WindowFunction)
transform (SpectralTransform)
output (SpectralOutput)
- window: WindowFunction = 'Hamming'#
The WindowFunction to apply to the data slice prior to calculating the spectrum.
- transform: SpectralTransform = 'Log Power (Relative dB)'#
The SpectralTransform to apply to the spectral magnitude.
- __init__(window_dur=None, window_shift=None, window_anchor=Anchor.BEGINNING, window=WindowFunction.HAMMING, transform=SpectralTransform.REL_DB, output=SpectralOutput.POSITIVE)#
- Parameters:
window_dur (float | None)
window_shift (float | None)
window (WindowFunction)
transform (SpectralTransform)
output (SpectralOutput)
- Return type:
None
- output: SpectralOutput = 'Positive Frequencies'#
The SpectralOutput format.
- class SpectrogramTransformer(*args, **kwargs)[source]#
Bases:
CompositeProcessor[SpectrogramSettings,AxisArray,AxisArray]
- class Spectrogram(*args, settings=None, **kwargs)[source]#
Bases:
BaseTransformerUnit[SpectrogramSettings,AxisArray,AxisArray,SpectrogramTransformer]
- Parameters:
settings (Settings | None)
- SETTINGS#
alias of
SpectrogramSettings
- spectrogram(window_dur=None, window_shift=None, window_anchor=Anchor.BEGINNING, window=WindowFunction.HAMMING, transform=SpectralTransform.REL_DB, output=SpectralOutput.POSITIVE)[source]#
- Parameters:
window_dur (float | None)
window_shift (float | None)
window (WindowFunction)
transform (SpectralTransform)
output (SpectralOutput)
- Return type:
ezmsg.sigproc.synth#
ezmsg.sigproc.window#
- class WindowSettings(axis: str | None = None, newaxis: str | None = None, window_dur: float | None = None, window_shift: float | None = None, zero_pad_until: str = 'full', anchor: str | Anchor = <Anchor.BEGINNING: 'beginning'>)[source]#
Bases:
Settings
- Parameters:
- class WindowTransformer(*args, **kwargs)[source]#
Bases:
BaseStatefulTransformer[WindowSettings,AxisArray,AxisArray,WindowState]
Apply a sliding window along the specified axis to input streaming data. The windowing method is perhaps the most useful and versatile method in ezmsg.sigproc, but its parameterization can be difficult. Please read the argument descriptions carefully.
- __init__(*args, **kwargs)[source]#
- Parameters:
axis – The axis along which to segment windows. If None, defaults to the first dimension of the first seen AxisArray. Note: The windowed axis must be an AxisArray.LinearAxis, not an AxisArray.CoordinateAxis.
newaxis – New axis on which windows are delimited, immediately preceding the target windowed axis. The data length along newaxis may be 0 if this most recent push did not provide enough data for a new window. If window_shift is None then the newaxis length will always be 1.
window_dur – The duration of the window in seconds. If None, the function acts as a passthrough and all other parameters are ignored.
window_shift – The shift of the window in seconds. If None (default), windowing operates in “1:1 mode”, where each input yields exactly one most-recent window.
zero_pad_until –
Determines how the function initializes the buffer. Can be one of “input” (default), “full”, “shift”, or “none”. If window_shift is None then this field is ignored and “input” is always used.
“input” (default) initializes the buffer with the input, then prepends zeros up to the window size. The first input will always yield at least one output.
“shift” fills the buffer until window_shift. No outputs will be yielded until at least window_shift data has been seen.
“none” does not pad the buffer. No outputs will be yielded until at least window_dur data has been seen.
anchor – Determines the entry in axis that gets assigned 0, which references the value in newaxis. Can be of class Anchor or a string representation of an Anchor.
- Return type:
None
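The interaction between window_dur, window_shift and streaming input can be sketched with a plain buffer. The class below is a hypothetical illustration and not the library's implementation: it converts durations to sample counts using an assumed sampling rate `fs`, mimics the unpadded (“none”) behaviour described above, and assumes window_shift is no longer than window_dur.

```python
class SlidingWindowSketch:
    """Hypothetical streaming windower without zero padding."""

    def __init__(self, fs: float, window_dur: float, window_shift: float):
        # Durations in seconds become sample counts.
        self.win_len = int(round(window_dur * fs))
        self.shift_len = int(round(window_shift * fs))
        if not 0 < self.shift_len <= self.win_len:
            raise ValueError("this sketch assumes 0 < window_shift <= window_dur")
        self.buffer = []

    def push(self, chunk):
        """Add new samples and return every complete window now available."""
        self.buffer.extend(chunk)
        windows = []
        while len(self.buffer) >= self.win_len:
            windows.append(self.buffer[: self.win_len])
            # Advance by the shift; overlapping samples stay in the buffer.
            del self.buffer[: self.shift_len]
        return windows


win = SlidingWindowSketch(fs=10.0, window_dur=0.4, window_shift=0.2)
first = win.push([0, 1, 2])       # fewer than window_dur samples: no windows yet
second = win.push([3, 4, 5, 6])   # enough data for two overlapping windows
```

A push that yields no windows corresponds to the case above where the data length along newaxis is 0.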
- class Window(*args, settings=None, **kwargs)[source]#
Bases:
BaseTransformerUnit[WindowSettings,AxisArray,AxisArray,WindowTransformer]
- Parameters:
settings (Settings | None)
- SETTINGS#
alias of
WindowSettings
- INPUT_SIGNAL = InputStream:unlocated[<class 'ezmsg.util.messages.axisarray.AxisArray'>]()#
- OUTPUT_SIGNAL = OutputStream:unlocated[<class 'ezmsg.util.messages.axisarray.AxisArray'>](self.num_buffers=32, self.force_tcp=False)#