In-Built Signal Processing Modules#

This page is the API reference for the built-in signal processing modules included in the ezmsg-sigproc extension.

ezmsg.sigproc.activation#

class ActivationFunction(*values)[source]#

Bases: OptionsEnum

Activation (transformation) function.

NONE = 'none'#

None.

SIGMOID = 'sigmoid'#

scipy.special.expit

EXPIT = 'expit'#

scipy.special.expit

LOGIT = 'logit'#

scipy.special.logit

LOGEXPIT = 'log_expit'#

scipy.special.log_expit

class ActivationSettings(function: str | ActivationFunction = <ActivationFunction.NONE: 'none'>)[source]#

Bases: Settings

Parameters:

function (str | ActivationFunction)

__init__(function=ActivationFunction.NONE)#
Parameters:

function (str | ActivationFunction)

Return type:

None

function: str | ActivationFunction = 'none'#

An ActivationFunction enum member or the equivalent string. Possible values are NONE, SIGMOID, EXPIT, LOGIT, and LOGEXPIT, or the strings “none”, “sigmoid”, “expit”, “logit”, and “log_expit”. SIGMOID and EXPIT are equivalent. See scipy.special.expit for more details.

class ActivationTransformer(*args, settings=None, **kwargs)[source]#

Bases: BaseTransformer[ActivationSettings, AxisArray, AxisArray]

Parameters:

settings (SettingsType)

class Activation(*args, settings=None, **kwargs)[source]#

Bases: BaseTransformerUnit[ActivationSettings, AxisArray, AxisArray, ActivationTransformer]

Parameters:

settings (Settings | None)

SETTINGS#

alias of ActivationSettings

activation(function)[source]#

Transform the data with a simple activation function.

Parameters:

function (str | ActivationFunction) – An ActivationFunction enum member or the equivalent string. Possible values are NONE, SIGMOID, EXPIT, LOGIT, and LOGEXPIT, or the strings “none”, “sigmoid”, “expit”, “logit”, and “log_expit”. SIGMOID and EXPIT are equivalent. See scipy.special.expit for more details.

Return type:

ActivationTransformer

Returns: ActivationTransformer
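
As a rough illustration of what each option computes, the sketch below re-implements the three SciPy functions named above using only the standard library. It is not the ezmsg-sigproc implementation; the helper names simply mirror the scipy.special functions, and the sample values are arbitrary.

```python
import math


def expit(x: float) -> float:
    """Logistic sigmoid, 1 / (1 + exp(-x)); what SIGMOID and EXPIT refer to."""
    return 1.0 / (1.0 + math.exp(-x))


def logit(p: float) -> float:
    """Inverse of expit, log(p / (1 - p)), defined for 0 < p < 1."""
    return math.log(p / (1.0 - p))


def log_expit(x: float) -> float:
    """log(expit(x)), arranged so that exp() is only called on values <= 0."""
    if x >= 0:
        return -math.log1p(math.exp(-x))
    return x - math.log1p(math.exp(x))


samples = [-2.0, 0.0, 2.0]
squashed = [expit(v) for v in samples]      # values in (0, 1)
recovered = [logit(p) for p in squashed]    # back to the original samples
```

Because logit inverts expit, chaining the two recovers the input up to rounding error.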

ezmsg.sigproc.affinetransform#

Affine transformations via matrix multiplication: y = Ax or y = Ax + B.

For full matrix transformations where channels are mixed (off-diagonal weights), use AffineTransformTransformer or the AffineTransform unit.

For simple per-channel scaling and offset (diagonal weights only), use LinearTransformTransformer from ezmsg.sigproc.linear instead, which is more efficient as it avoids matrix multiplication.

class AffineTransformSettings(weights, axis=None, right_multiply=True)[source]#

Bases: Settings

Settings for AffineTransform.

Parameters:
weights: ndarray | str | Path#

An array of weights or a path to a file with weights compatible with np.loadtxt.

axis: str | None = None#

The name of the axis to apply the transformation to. Defaults to the leading (0th) axis in the array.

right_multiply: bool = True#

Set False to transpose the weights before applying.

__init__(weights, axis=None, right_multiply=True)#
Parameters:
  • weights (ndarray | str | Path)

  • axis (str | None)

  • right_multiply (bool)

Return type:

None

class AffineTransformState[source]#

Bases: object

weights: ndarray[tuple[Any, ...], dtype[_ScalarT]] | None = None#
new_axis: AxisBase | None = None#
class AffineTransformTransformer(*args, **kwargs)[source]#

Bases: BaseStatefulTransformer[AffineTransformSettings, AxisArray, AxisArray, AffineTransformState]

Apply affine transformation via matrix multiplication: y = Ax or y = Ax + B.

Use this transformer when you need full matrix transformations that mix channels (off-diagonal weights), such as spatial filters or projections.

For simple per-channel scaling and offset where each output channel depends only on its corresponding input channel (diagonal weight matrix), use LinearTransformTransformer instead, which is more efficient.

The weights matrix can include an offset row (stacked as [A|B]) where the input is automatically augmented with a column of ones to compute y = Ax + B.
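
The augmentation trick described above can be sketched with plain NumPy. This is only an illustration: it assumes right-multiplication (data @ weights) with the offset stored as the last row of the weight matrix, and the exact layout expected by AffineTransformSettings should be checked against the source. All array values are made up.

```python
import numpy as np

# Three samples of two input channels (shape: time x channel).
x = np.array([[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]])

# Mixing matrix A (2 inputs -> 3 outputs) and a per-output offset b.
A = np.array([[1.0, 0.5, 0.0],
              [0.0, 0.5, 1.0]])
b = np.array([10.0, 20.0, 30.0])

# Assumed layout: the offset is stacked under A as one extra row.
W = np.vstack([A, b])                              # shape (3, 3)

# Augment the input with a column of ones so the extra row acts as "+ b".
x_aug = np.hstack([x, np.ones((x.shape[0], 1))])   # shape (3, 3)
y = x_aug @ W                                      # same as x @ A + b
```

A single matrix product therefore covers both the mixing and the offset.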

class AffineTransform(*args, settings=None, **kwargs)[source]#

Bases: BaseTransformerUnit[AffineTransformSettings, AxisArray, AxisArray, AffineTransformTransformer]

Parameters:

settings (Settings | None)

SETTINGS#

alias of AffineTransformSettings

affine_transform(weights, axis=None, right_multiply=True)[source]#

Perform affine transformations on streaming data.

Parameters:
  • weights (ndarray | str | Path) – An array of weights or a path to a file with weights compatible with np.loadtxt.

  • axis (str | None) – The name of the axis to apply the transformation to. Defaults to the leading (0th) axis in the array.

  • right_multiply (bool) – Set False to transpose the weights before applying.

Returns:

AffineTransformTransformer.

Return type:

AffineTransformTransformer

zeros_for_noop(data, **ignore_kwargs)[source]#
Parameters:

data (ndarray[tuple[Any, ...], dtype[_ScalarT]])

Return type:

ndarray[tuple[Any, …], dtype[_ScalarT]]

class CommonRereferenceSettings(mode='mean', axis=None, include_current=True)[source]#

Bases: Settings

Settings for CommonRereference

Parameters:
  • mode (str)

  • axis (str | None)

  • include_current (bool)

mode: str = 'mean'#

The statistical mode to apply – either “mean” or “median”.

axis: str | None = None#

The name of the axis to apply the transformation to.

__init__(mode='mean', axis=None, include_current=True)#
Parameters:
  • mode (str)

  • axis (str | None)

  • include_current (bool)

Return type:

None

include_current: bool = True#

Set False to exclude each channel from participating in the calculation of its reference.

class CommonRereferenceTransformer(*args, settings=None, **kwargs)[source]#

Bases: BaseTransformer[CommonRereferenceSettings, AxisArray, AxisArray]

Parameters:

settings (SettingsType)

class CommonRereference(*args, settings=None, **kwargs)[source]#

Bases: BaseTransformerUnit[CommonRereferenceSettings, AxisArray, AxisArray, CommonRereferenceTransformer]

Parameters:

settings (Settings | None)

SETTINGS#

alias of CommonRereferenceSettings

common_rereference(mode='mean', axis=None, include_current=True)[source]#

Perform common average referencing (CAR) on streaming data.

Parameters:
  • mode (str) – The statistical mode to apply – either “mean” or “median”.

  • axis (str | None) – The name of the axis to apply the transformation to.

  • include_current (bool) – Set False to exclude each channel from participating in the calculation of its reference.

Returns:

CommonRereferenceTransformer

Return type:

CommonRereferenceTransformer
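
The effect of include_current can be illustrated with a small NumPy sketch for the “mean” mode. This is a standalone illustration under the assumption that the channel axis is the last axis; it does not call ezmsg-sigproc.

```python
import numpy as np

# One time sample, four channels (shape: time x channel).
data = np.array([[1.0, 2.0, 3.0, 6.0]])
n_ch = data.shape[1]

# include_current=True: every channel shares the same reference.
ref_all = data.mean(axis=1, keepdims=True)
car_incl = data - ref_all

# include_current=False: channel i is referenced to the mean of the others,
# which equals (sum of all channels - channel i) / (n_ch - 1).
ref_others = (data.sum(axis=1, keepdims=True) - data) / (n_ch - 1)
car_excl = data - ref_others
```

For the mean, the two variants differ only by the scale factor n_ch / (n_ch - 1), so the choice matters most when the channel count is small.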

ezmsg.sigproc.aggregate#

Aggregation operations over arrays.

Note

AggregateTransformer supports the Array API standard, enabling use with NumPy, CuPy, PyTorch, and other compatible array libraries. RangedAggregateTransformer currently requires NumPy arrays.

class AggregationFunction(*values)[source]#

Bases: OptionsEnum

Enum of the aggregation functions available to the ranged_aggregate operation.

NONE = 'None (all)'#
MAX = 'max'#
MIN = 'min'#
MEAN = 'mean'#
MEDIAN = 'median'#
STD = 'std'#
SUM = 'sum'#
NANMAX = 'nanmax'#
NANMIN = 'nanmin'#
NANMEAN = 'nanmean'#
NANMEDIAN = 'nanmedian'#
NANSTD = 'nanstd'#
NANSUM = 'nansum'#
ARGMIN = 'argmin'#
ARGMAX = 'argmax'#
TRAPEZOID = 'trapezoid'#
class RangedAggregateSettings(axis=None, bands=None, operation=AggregationFunction.MEAN)[source]#

Bases: Settings

Settings for RangedAggregate.

Parameters:
axis: str | None = None#

The name of the axis along which to apply the bands.

bands: list[tuple[float, float]] | None = None#

A list of (min, max) band limits: [(band1_min, band1_max), (band2_min, band2_max), …]. If not set, this acts as a passthrough node.

operation: AggregationFunction = 'mean'#

AggregationFunction to apply to each band.

__init__(axis=None, bands=None, operation=AggregationFunction.MEAN)#
Parameters:
  • axis (str | None)

  • bands (list[tuple[float, float]] | None)

  • operation (AggregationFunction)

Return type:

None

class RangedAggregateState[source]#

Bases: object

slices: list[tuple[Any, ...]] | None = None#
out_axis: AxisBase | None = None#
ax_vec: ndarray[tuple[Any, ...], dtype[_ScalarT]] | None = None#
class RangedAggregateTransformer(*args, **kwargs)[source]#

Bases: BaseStatefulTransformer[RangedAggregateSettings, AxisArray, AxisArray, RangedAggregateState]

class RangedAggregate(*args, settings=None, **kwargs)[source]#

Bases: BaseTransformerUnit[RangedAggregateSettings, AxisArray, AxisArray, RangedAggregateTransformer]

Parameters:

settings (Settings | None)

SETTINGS#

alias of RangedAggregateSettings

ranged_aggregate(axis=None, bands=None, operation=AggregationFunction.MEAN)[source]#

Apply an aggregation operation over one or more bands.

Parameters:
  • axis (str | None) – The name of the axis along which to apply the bands.

  • bands (list[tuple[float, float]] | None) – A list of (min, max) band limits: [(band1_min, band1_max), (band2_min, band2_max), …]. If not set, this acts as a passthrough node.

  • operation (AggregationFunction) – AggregationFunction to apply to each band.

Returns:

RangedAggregateTransformer

Return type:

RangedAggregateTransformer
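
A minimal NumPy sketch of band-wise aggregation follows. It assumes a frequency axis whose coordinate values are known and treats both band edges as inclusive; the edge handling in RangedAggregateTransformer may differ. The data values are invented for the example.

```python
import numpy as np

freqs = np.arange(0.0, 50.0, 5.0)            # axis values: 0, 5, ..., 45
idx = np.arange(freqs.size, dtype=float)
data = np.stack([idx, 10.0 * idx], axis=1)   # shape (10 freqs, 2 channels)
bands = [(5.0, 15.0), (20.0, 40.0)]

rows = []
for lo, hi in bands:
    in_band = (freqs >= lo) & (freqs <= hi)  # assumption: both edges inclusive
    rows.append(data[in_band].mean(axis=0))  # MEAN over the samples in the band
banded = np.stack(rows, axis=0)              # shape (2 bands, 2 channels)
```

The frequency axis is kept but shortened to one entry per band.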

class AggregateSettings(axis, operation=AggregationFunction.MEAN)[source]#

Bases: Settings

Settings for Aggregate.

Parameters:
axis: str#

The name of the axis to aggregate over. This axis will be removed from the output.

__init__(axis, operation=AggregationFunction.MEAN)#
Parameters:
  • axis (str)

  • operation (AggregationFunction)

Return type:

None

operation: AggregationFunction = 'mean'#

AggregationFunction to apply.

class AggregateTransformer(*args, settings=None, **kwargs)[source]#

Bases: BaseTransformer[AggregateSettings, AxisArray, AxisArray]

Transformer that aggregates an entire axis using a specified operation.

Unlike RangedAggregateTransformer which aggregates over specific ranges/bands and preserves the axis (with one value per band), this transformer aggregates the entire axis and removes it from the output, reducing dimensionality by one.

Parameters:

settings (SettingsType)
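
The dimensionality change described above can be seen with a short NumPy sketch. The three-axis layout (time, freq, channel) is a hypothetical example, not something the transformer requires.

```python
import numpy as np

# Hypothetical spectrogram-like block: (time, freq, channel).
block = np.arange(2 * 4 * 3, dtype=float).reshape(2, 4, 3)
freq_axis = 1

collapsed = block.mean(axis=freq_axis)   # "freq" disappears: shape (2, 3)
peak_bin = block.argmax(axis=freq_axis)  # ARGMAX-style reduction, same shape
```

Every reduction removes the aggregated axis, so a three-dimensional input yields a two-dimensional output.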

class AggregateUnit(*args, settings=None, **kwargs)[source]#

Bases: BaseTransformerUnit[AggregateSettings, AxisArray, AxisArray, AggregateTransformer]

Unit that aggregates an entire axis using a specified operation.

Parameters:

settings (Settings | None)

SETTINGS#

alias of AggregateSettings

ezmsg.sigproc.bandpower#

class BandPowerSettings(spectrogram_settings=<factory>, bands=<factory>, aggregation=AggregationFunction.MEAN)[source]#

Bases: Settings

Settings for BandPower.

Parameters:
spectrogram_settings: SpectrogramSettings#

Settings for spectrogram calculation.

bands: list[tuple[float, float]] | None#

(min, max) tuples of band limits in Hz.

aggregation: AggregationFunction = 'mean'#

AggregationFunction to apply to each band.

__init__(spectrogram_settings=<factory>, bands=<factory>, aggregation=AggregationFunction.MEAN)#
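Parameters:
  • spectrogram_settings (SpectrogramSettings)

  • bands (list[tuple[float, float]] | None)

  • aggregation (AggregationFunction)
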
Return type:

None

class BandPowerTransformer(*args, **kwargs)[source]#

Bases: CompositeProcessor[BandPowerSettings, AxisArray, AxisArray]

class BandPower(*args, settings=None, **kwargs)[source]#

Bases: BaseTransformerUnit[BandPowerSettings, AxisArray, AxisArray, BandPowerTransformer]

Parameters:

settings (Settings | None)

SETTINGS#

alias of BandPowerSettings

bandpower(spectrogram_settings, bands=[(17, 30), (70, 170)], aggregation=AggregationFunction.MEAN)[source]#

Calculate the average spectral power in each band.

Returns:

BandPowerTransformer

Parameters:
Return type:

BandPowerTransformer
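
To make the idea concrete, the sketch below computes band power for a synthetic signal using a single-window periodogram in NumPy. It is a simplification under stated assumptions: the library works from a spectrogram configured by SpectrogramSettings, whereas this example uses one FFT over one second of data, inclusive band edges, and an unnormalized power estimate.

```python
import numpy as np

fs = 1000.0
t = np.arange(int(fs)) / fs                       # one second of samples
signal = 2.0 * np.sin(2 * np.pi * 20.0 * t) + np.sin(2 * np.pi * 100.0 * t)

spectrum = np.fft.rfft(signal)
power = np.abs(spectrum) ** 2 / signal.size       # unnormalized periodogram
freqs = np.fft.rfftfreq(signal.size, d=1.0 / fs)  # 0, 1, ..., 500 Hz

bands = [(17.0, 30.0), (70.0, 170.0)]
band_power = np.array(
    [power[(freqs >= lo) & (freqs <= hi)].mean() for lo, hi in bands]
)
```

The 20 Hz component has twice the amplitude of the 100 Hz component and falls in the narrower band, so the first band shows the larger mean power.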

ezmsg.sigproc.filter#

class FilterCoefficients(b: ndarray = <factory>, a: ndarray = <factory>)[source]#

Bases: object

Parameters:
b: ndarray#
a: ndarray#
__init__(b=<factory>, a=<factory>)#
Parameters:
  • b (ndarray)

  • a (ndarray)

Return type:

None

class FilterBaseSettings(axis: str | None = None, coef_type: str = 'ba')[source]#

Bases: Settings

Parameters:
  • axis (str | None)

  • coef_type (str)

axis: str | None = None#

The name of the axis to operate on.

coef_type: str = 'ba'#

The type of filter coefficients. One of “ba” or “sos”.

__init__(axis=None, coef_type='ba')#
Parameters:
  • axis (str | None)

  • coef_type (str)

Return type:

None

class FilterSettings(axis: str | None = None, coef_type: str = 'ba', coefs: FilterCoefficients | None = None)[source]#

Bases: FilterBaseSettings

Parameters:
coefs: FilterCoefficients | None = None#

The pre-calculated filter coefficients.

__init__(axis=None, coef_type='ba', coefs=None)#
Parameters:
  • axis (str | None)

  • coef_type (str)

  • coefs (FilterCoefficients | None)

Return type:

None

class FilterState[source]#

Bases: object

zi: ndarray[tuple[Any, ...], dtype[_ScalarT]] | None = None#
class FilterTransformer(*args, **kwargs)[source]#

Bases: BaseStatefulTransformer[FilterSettings, AxisArray, AxisArray, FilterState]

Filter data using the provided coefficients.

update_coefficients(coefs, coef_type=None)[source]#

Update filter coefficients.

If the new coefficients have the same length as the current ones, only the coefficients are updated. If the lengths differ, the filter state is also reset to handle the new filter order.

Parameters:
Return type:

None
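
The reason a filter carries state (zi in FilterState) can be shown with a small hand-written IIR filter. The sketch below is not the library's code: filter_chunk is a hypothetical helper implementing the direct form II transposed structure for “ba” coefficients, and the coefficients are arbitrary. The state vector has one entry per filter order, which is why a change in coefficient length forces a reset.

```python
import numpy as np


def filter_chunk(b, a, x, zi):
    """Filter one chunk (direct form II transposed); return output and new state.

    Assumes a[0] == 1 and len(a) == len(b) >= 2.
    """
    z = np.array(zi, dtype=float)
    y = np.empty(len(x), dtype=float)
    for n, xn in enumerate(x):
        yn = b[0] * xn + z[0]
        for i in range(len(z) - 1):
            z[i] = b[i + 1] * xn + z[i + 1] - a[i + 1] * yn
        z[-1] = b[-1] * xn - a[-1] * yn
        y[n] = yn
    return y, z


b, a = [0.2, 0.2], [1.0, -0.6]          # arbitrary first-order low-pass
x = np.arange(10, dtype=float)

whole, _ = filter_chunk(b, a, x, zi=np.zeros(1))

# Stream the same data as two messages, carrying the state across them.
first, zi = filter_chunk(b, a, x[:5], zi=np.zeros(1))
second, zi = filter_chunk(b, a, x[5:], zi)
streamed = np.concatenate([first, second])

# Dropping the state at the message boundary gives a different result.
second_reset, _ = filter_chunk(b, a, x[5:], zi=np.zeros(1))
```

Carrying the state makes chunked filtering identical to filtering the whole signal at once, while resetting it introduces a transient at the boundary.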

class Filter(*args, settings=None, **kwargs)[source]#

Bases: BaseTransformerUnit[FilterSettings, AxisArray, AxisArray, FilterTransformer]

Parameters:

settings (Settings | None)

SETTINGS#

alias of FilterSettings

filtergen(axis, coefs, coef_type)[source]#

Filter data using the provided coefficients.

Returns:

FilterTransformer.

Parameters:
Return type:

FilterTransformer

class FilterByDesignState[source]#

Bases: object

filter: FilterTransformer | None = None#
needs_redesign: bool = False#
class FilterByDesignTransformer(*args, **kwargs)[source]#

Bases: BaseStatefulTransformer[SettingsType, AxisArray, AxisArray, FilterByDesignState], ABC, Generic[SettingsType, FilterCoefsType]

Abstract base class for filter design transformers.

classmethod get_message_type(dir)[source]#
Parameters:

dir (str)

Return type:

type[AxisArray]

abstractmethod get_design_function()[source]#

Return a function that takes sampling frequency and returns filter coefficients.

Return type:

Callable[[float], FilterCoefsType | None]

update_settings(new_settings=None, **kwargs)[source]#

Update settings and mark that filter coefficients need to be recalculated.

Parameters:
  • new_settings (SettingsType | None) – Complete new settings object to replace current settings

  • **kwargs – Individual settings to update

Return type:

None

class BaseFilterByDesignTransformerUnit(*args, settings=None, **kwargs)[source]#

Bases: BaseTransformerUnit[SettingsType, AxisArray, AxisArray, FilterByDesignTransformer], Generic[SettingsType, TransformerType]

Parameters:

settings (Settings | None)

async on_settings(msg)[source]#

Receive a settings message, override self.SETTINGS, and re-create the processor. Child classes that wish to have fine-grained control over whether the core processor resets on settings changes should override this method.

Parameters:

msg (SettingsType) – a settings message.

Return type:

None

ezmsg.sigproc.butterworthfilter#

class ButterworthFilterSettings(axis=None, coef_type='ba', order=0, cuton=None, cutoff=None, wn_hz=True)[source]#

Bases: FilterBaseSettings

Settings for ButterworthFilter.

Parameters:
order: int = 0#

Filter order

cuton: float | None = None#

Cuton frequency (Hz). If cutoff is not specified then this is the highpass corner. Otherwise, if this is lower than cutoff then this is the beginning of the bandpass or if this is greater than cutoff then this is the end of the bandstop.

cutoff: float | None = None#

Cutoff frequency (Hz). If cuton is not specified then this is the lowpass corner. Otherwise, if this is greater than cuton then this is the end of the bandpass, or if this is less than cuton then this is the beginning of the bandstop.

wn_hz: bool = True#

Set to False if the provided corner frequencies (Wn) are normalized from 0 to 1, where 1 is the Nyquist frequency.

filter_specs()[source]#

Determine the filter type given the corner frequencies.

Returns:

A tuple with the first element being a string indicating the filter type (one of “lowpass”, “highpass”, “bandpass”, “bandstop”) and the second element being the corner frequency or frequencies.

Return type:

tuple[str, float | tuple[float, float]] | None
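
The rules stated for cuton and cutoff can be summarized in a few lines of Python. The function below is a hypothetical stand-in written from the descriptions on this page, not the library's filter_specs; in particular, returning None when both corners are missing and the handling of cuton == cutoff are assumptions.

```python
from typing import Optional, Tuple, Union

Spec = Optional[Tuple[str, Union[float, Tuple[float, float]]]]


def describe_filter(cuton: Optional[float], cutoff: Optional[float]) -> Spec:
    """Hypothetical stand-in that follows the cuton/cutoff rules in the text."""
    if cuton is None and cutoff is None:
        return None                      # assumption: nothing to design
    if cuton is None:
        return "lowpass", cutoff
    if cutoff is None:
        return "highpass", cuton
    if cuton <= cutoff:                  # the equal case is not covered by the text
        return "bandpass", (cuton, cutoff)
    return "bandstop", (cutoff, cuton)
```

For example, describe_filter(1.0, 40.0) describes a 1 to 40 Hz bandpass, while swapping the arguments describes a bandstop over the same range.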

__init__(axis=None, coef_type='ba', order=0, cuton=None, cutoff=None, wn_hz=True)#
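Parameters:
  • axis (str | None)

  • coef_type (str)

  • order (int)

  • cuton (float | None)

  • cutoff (float | None)

  • wn_hz (bool)
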
Return type:

None

butter_design_fun(fs, order=0, cuton=None, cutoff=None, coef_type='ba', wn_hz=True)[source]#

See ButterworthFilterSettings.filter_specs for an explanation of specifying different filter types (lowpass, highpass, bandpass, bandstop) from the parameters. You are likely to want to use this function with filter_by_design, which only passes fs to the design function (this), meaning that you should wrap this function with a lambda or prepare with functools.partial.

Parameters:
  • fs (float) – The sampling frequency of the data in Hz.

  • order (int) – Filter order.

  • cuton (float | None) – Cuton corner frequency in Hz; see ButterworthFilterSettings.cuton.

  • cutoff (float | None) – Cutoff corner frequency in Hz; see ButterworthFilterSettings.cutoff.

  • coef_type (str) – “ba”, “sos”, or “zpk”.

  • wn_hz (bool) – Set to False if the provided corner frequencies (Wn) are normalized from 0 to 1, where 1 is the Nyquist frequency.

Returns:

The filter coefficients as a tuple of (b, a) for coef_type “ba”, or as a single ndarray for “sos”, or (z, p, k) for “zpk”.

Return type:

tuple[ndarray[tuple[Any, …], dtype[_ScalarT]], ndarray[tuple[Any, …], dtype[_ScalarT]]] | ndarray[tuple[Any, …], dtype[_ScalarT]] | None
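
The wrapping pattern mentioned above (binding every argument except fs) can be sketched with functools.partial. To keep the example free of dependencies, toy_design_fun is a hypothetical placeholder that only mimics the call shape of a design function; it returns normalized corner frequencies rather than real filter coefficients.

```python
from functools import partial


def toy_design_fun(fs, cuton=None, cutoff=None, wn_hz=True):
    """Hypothetical design function: returns the corner frequencies,
    normalized so that 1.0 is the Nyquist frequency when wn_hz is True."""
    corners = [f for f in (cuton, cutoff) if f is not None]
    if wn_hz:
        corners = [f / (fs / 2.0) for f in corners]
    return corners


# Bind everything except fs; the result takes the sampling rate alone.
design_for_rate = partial(toy_design_fun, cuton=10.0, cutoff=100.0)
normalized = design_for_rate(1000.0)
```

The same binding works with a lambda; partial is used here because it keeps the bound keyword arguments inspectable.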

class ButterworthFilterTransformer(*args, **kwargs)[source]#

Bases: FilterByDesignTransformer[ButterworthFilterSettings, tuple[ndarray[tuple[Any, …], dtype[_ScalarT]], ndarray[tuple[Any, …], dtype[_ScalarT]]] | ndarray[tuple[Any, …], dtype[_ScalarT]]]

get_design_function()[source]#

Return a function that takes sampling frequency and returns filter coefficients.

Return type:

Callable[[float], tuple[ndarray[tuple[Any, …], dtype[_ScalarT]], ndarray[tuple[Any, …], dtype[_ScalarT]]] | ndarray[tuple[Any, …], dtype[_ScalarT]] | None]

class ButterworthFilter(*args, settings=None, **kwargs)[source]#

Bases: BaseFilterByDesignTransformerUnit[ButterworthFilterSettings, ButterworthFilterTransformer]

Parameters:

settings (Settings | None)

SETTINGS#

alias of ButterworthFilterSettings

butter(axis, order=0, cuton=None, cutoff=None, coef_type='ba', wn_hz=True)[source]#

Apply a Butterworth filter to streaming data. This is a convenience wrapper around filter_gen_by_design that uses scipy.signal.butter to design the filter. See ButterworthFilterSettings.filter_specs for how the parameters select the filter type (lowpass, highpass, bandpass, bandstop).

Returns:

ButterworthFilterTransformer

Parameters:
Return type:

ButterworthFilterTransformer

ezmsg.sigproc.decimate#

class ChebyForDecimateTransformer(*args, **kwargs)[source]#

Bases: ChebyshevFilterTransformer[tuple[ndarray[tuple[Any, …], dtype[_ScalarT]], ndarray[tuple[Any, …], dtype[_ScalarT]]] | ndarray[tuple[Any, …], dtype[_ScalarT]]]

A ChebyshevFilterTransformer with a design filter method that additionally accepts a target sampling rate. If the target rate cannot be achieved, it returns None; otherwise, it returns the filter coefficients.

get_design_function()[source]#

Return a function that takes sampling frequency and returns filter coefficients.

Return type:

Callable[[float], tuple[ndarray[tuple[Any, …], dtype[_ScalarT]], ndarray[tuple[Any, …], dtype[_ScalarT]]] | ndarray[tuple[Any, …], dtype[_ScalarT]] | None]

class ChebyForDecimate(*args, settings=None, **kwargs)[source]#

Bases: BaseTransformerUnit[ChebyshevFilterSettings, AxisArray, AxisArray, ChebyForDecimateTransformer]

Parameters:

settings (Settings | None)

SETTINGS#

alias of ChebyshevFilterSettings

class Decimate(*args, settings=None, **kwargs)[source]#

Bases: Collection

A Collection chaining a Filter node configured as a lowpass Chebyshev filter and a Downsample node.

Parameters:

settings (Settings | None)

SETTINGS#

alias of DownsampleSettings

INPUT_SIGNAL = InputStream:unlocated[<class 'ezmsg.util.messages.axisarray.AxisArray'>]()#
OUTPUT_SIGNAL = OutputStream:unlocated[<class 'ezmsg.util.messages.axisarray.AxisArray'>](self.num_buffers=32, self.force_tcp=False)#
FILTER = <ezmsg.sigproc.decimate.ChebyForDecimate object>#
DOWNSAMPLE = <ezmsg.sigproc.downsample.Downsample object>#
configure()[source]#

A lifecycle hook that runs when the Collection is instantiated. This is the best place to call Unit.apply_settings() on each member Unit of the Collection.

Return type:

None

network()[source]#

Override this method and have it return a NetworkDefinition that defines how the InputStream and OutputStream of member Units will be connected.

Return type:

Iterable[Tuple[Stream | str, Stream | str]]

ezmsg.sigproc.denormalize#

class DenormalizeSettings(low_rate: float = 2.0, high_rate: float = 40.0, distribution: str = 'uniform')[source]#

Bases: Settings

Parameters:
low_rate: float = 2.0#

Low end of probable rate after denormalization (Hz).

high_rate: float = 40.0#

High end of probable rate after denormalization (Hz).

distribution: str = 'uniform'#

Distribution to sample rates from. Options are ‘uniform’, ‘normal’, or ‘constant’.

__init__(low_rate=2.0, high_rate=40.0, distribution='uniform')#
Parameters:
  • low_rate (float)

  • high_rate (float)

  • distribution (str)

Return type:

None

class DenormalizeState[source]#

Bases: object

gains: ndarray[tuple[Any, ...], dtype[_ScalarT]] | None = None#
offsets: ndarray[tuple[Any, ...], dtype[_ScalarT]] | None = None#
class DenormalizeTransformer(*args, **kwargs)[source]#

Bases: BaseStatefulTransformer[DenormalizeSettings, AxisArray, AxisArray, DenormalizeState]

Scales data from a normalized distribution (mean=0, std=1) to a denormalized distribution using random per-channel offsets and gains designed to keep the 99.9% CIs between 0 and 2x the offset.

This is useful for simulating realistic firing rates from normalized data.
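
One way to read the description above is sketched below with NumPy. It is an interpretation, not the library's algorithm: it assumes the “uniform” distribution draws one offset per channel between low_rate and high_rate, and that the gain is chosen so that roughly 3.29 standard deviations (the two-sided 99.9% interval of a standard normal) span the distance from the offset down to zero.

```python
import numpy as np

rng = np.random.default_rng(0)
low_rate, high_rate = 2.0, 40.0
n_ch = 4

# Assumption: "uniform" draws one offset (mean rate) per channel.
offsets = rng.uniform(low_rate, high_rate, size=n_ch)

# For a standard normal input, 99.9% of values lie within about 3.29 std.
# Choosing gain = offset / 3.29 maps that interval onto [0, 2 * offset].
z_999 = 3.29
gains = offsets / z_999

normalized = rng.standard_normal((1000, n_ch))
rates = normalized * gains + offsets       # denormalized, per channel
```

With this choice, the lower end of the interval sits at zero and the upper end at twice the offset for every channel.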

class DenormalizeUnit(*args, settings=None, **kwargs)[source]#

Bases: BaseTransformerUnit[DenormalizeSettings, AxisArray, AxisArray, DenormalizeTransformer]

Parameters:

settings (Settings | None)

SETTINGS#

alias of DenormalizeSettings

ezmsg.sigproc.downsample#

class DownsampleSettings(axis='time', target_rate=None, factor=None)[source]#

Bases: Settings

Settings for Downsample node.

Parameters:
  • axis (str)

  • target_rate (float | None)

  • factor (int | None)

axis: str = 'time'#

The name of the axis along which to downsample.

target_rate: float | None = None#

Desired rate after downsampling. The actual rate is the input rate divided by an integer factor, chosen to be the closest achievable rate that is equal to or higher than the target rate.

factor: int | None = None#

Explicitly specify downsample factor. If specified, target_rate is ignored.
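
The relationship between target_rate and the resulting factor can be sketched as follows. pick_factor is a hypothetical helper that encodes one reading of the description of target_rate (the largest integer factor that keeps the output rate at or above the target); the library's rounding may differ in edge cases.

```python
def pick_factor(input_rate: float, target_rate: float) -> int:
    """Hypothetical helper: largest integer factor whose output rate still
    meets or exceeds target_rate (never less than 1)."""
    return max(1, int(input_rate // target_rate))


factor = pick_factor(1000.0, 300.0)   # 1000 / 3 = 333.3 Hz >= 300 Hz
actual_rate = 1000.0 / factor
```

A factor of 4 would give 250 Hz, which falls below the 300 Hz target, so 3 is chosen.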

__init__(axis='time', target_rate=None, factor=None)#
Parameters:
  • axis (str)

  • target_rate (float | None)

  • factor (int | None)

Return type:

None

class DownsampleState[source]#

Bases: object

q: int = 0#

The integer downsampling factor. It will be determined based on the target rate.

s_idx: int = 0#

Index of the next message’s first sample within the virtual rotating counter of length q (the downsampling factor).
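
The role of the two state fields can be illustrated with a short NumPy sketch. downsample_chunk is a hypothetical helper, not the transformer's code; it shows how a rotating counter keeps the sampling phase consistent when message lengths are not multiples of q.

```python
import numpy as np


def downsample_chunk(chunk, q, s_idx):
    """Keep the samples whose position in the rotating counter is 0.

    s_idx is the counter position of chunk[0]; the updated value is returned
    for use with the next chunk.
    """
    first = (-s_idx) % q
    return chunk[first::q], (s_idx + len(chunk)) % q


x = np.arange(20)
q, s_idx = 3, 0
pieces = []
for chunk in (x[:7], x[7:14], x[14:]):   # message sizes not divisible by q
    out, s_idx = downsample_chunk(chunk, q, s_idx)
    pieces.append(out)
streamed = np.concatenate(pieces)
```

The concatenated output matches slicing the whole array with a step of q, regardless of where the message boundaries fall.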

class DownsampleTransformer(*args, **kwargs)[source]#

Bases: BaseStatefulTransformer[DownsampleSettings, AxisArray, AxisArray, DownsampleState]

Downsampled data simply comprise every factor-th sample. This should only be used following appropriate lowpass filtering. If your pipeline does not already have lowpass filtering, consider using the Decimate collection instead.
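
The need for lowpass filtering before downsampling can be demonstrated with NumPy alone. In this sketch (all values chosen for illustration), a 90 Hz tone sampled at 200 Hz is reduced by a factor of 2 without filtering, and the tone reappears at a lower frequency.

```python
import numpy as np

fs = 200.0
t = np.arange(200) / fs                      # one second of samples
tone = np.sin(2 * np.pi * 90.0 * t)          # 90 Hz, below the 100 Hz Nyquist

factor = 2
decimated = tone[::factor]                   # new rate 100 Hz, Nyquist 50 Hz

spectrum = np.abs(np.fft.rfft(decimated))
freqs = np.fft.rfftfreq(decimated.size, d=factor / fs)
apparent = freqs[np.argmax(spectrum)]        # where the tone now appears
```

The tone aliases to 10 Hz (100 Hz minus 90 Hz); removing content above the new Nyquist frequency beforehand avoids this.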

class Downsample(*args, settings=None, **kwargs)[source]#

Bases: BaseTransformerUnit[DownsampleSettings, AxisArray, AxisArray, DownsampleTransformer]

Parameters:

settings (Settings | None)

SETTINGS#

alias of DownsampleSettings

downsample(axis='time', target_rate=None, factor=None)[source]#
Parameters:
  • axis (str)

  • target_rate (float | None)

  • factor (int | None)

Return type:

DownsampleTransformer

ezmsg.sigproc.ewmfilter#

class EWMSettings(axis: str | None = None, zero_offset: bool = True)[source]#

Bases: Settings

Parameters:
  • axis (str | None)

  • zero_offset (bool)

axis: str | None = None#

Name of the axis to accumulate.

zero_offset: bool = True#

If true, we assume zero DC offset for input data.

__init__(axis=None, zero_offset=True)#
Parameters:
  • axis (str | None)

  • zero_offset (bool)

Return type:

None

class EWMState[source]#

Bases: State

buffer_queue: Queue[AxisArray]#
signal_queue: Queue[AxisArray]#
class EWM(*args, settings=None, **kwargs)[source]#

Bases: Unit

Exponentially Weighted Moving Average Standardization. This is deprecated. Please use ezmsg.sigproc.scaler.AdaptiveStandardScaler instead.

Reference: https://stackoverflow.com/a/42926270

Parameters:

settings (Settings | None)

SETTINGS#

alias of EWMSettings

STATE#

alias of EWMState

INPUT_SIGNAL = InputStream:unlocated[<class 'ezmsg.util.messages.axisarray.AxisArray'>]()#
INPUT_BUFFER = InputStream:unlocated[<class 'ezmsg.util.messages.axisarray.AxisArray'>]()#
OUTPUT_SIGNAL = OutputStream:unlocated[<class 'ezmsg.util.messages.axisarray.AxisArray'>](self.num_buffers=32, self.force_tcp=False)#
async initialize()[source]#

Runs when the Unit is instantiated. This is called from within the process in which this unit will live. This lifecycle hook can be overridden. It can be run as an async function by adding the async keyword when overriding.

Return type:

None

async on_signal(message)[source]#
Parameters:

message (AxisArray)

Return type:

None

async on_buffer(message)[source]#
Parameters:

message (AxisArray)

Return type:

None

async sync_output()[source]#
Return type:

AsyncGenerator

class EWMFilterSettings(history_dur: float, axis: str | None = None, zero_offset: bool = True)[source]#

Bases: Settings

Parameters:
history_dur: float#

Previous data to accumulate for standardization.

axis: str | None = None#

Name of the axis to accumulate.

zero_offset: bool = True#

If true, we assume zero DC offset for input data.

__init__(history_dur, axis=None, zero_offset=True)#
Parameters:
  • history_dur (float)

  • axis (str | None)

  • zero_offset (bool)

Return type:

None

class EWMFilter(*args, settings=None, **kwargs)[source]#

Bases: Collection

A Collection that splits the input into two branches: one leads to Window, which then feeds into EWM’s INPUT_BUFFER, and the other feeds directly into EWM’s INPUT_SIGNAL.

This is deprecated. Please use ezmsg.sigproc.scaler.AdaptiveStandardScaler instead.

Parameters:

settings (Settings | None)

SETTINGS#

alias of EWMFilterSettings

INPUT_SIGNAL = InputStream:unlocated[<class 'ezmsg.util.messages.axisarray.AxisArray'>]()#
OUTPUT_SIGNAL = OutputStream:unlocated[<class 'ezmsg.util.messages.axisarray.AxisArray'>](self.num_buffers=32, self.force_tcp=False)#
WINDOW = <ezmsg.sigproc.window.Window object>#
EWM = <ezmsg.sigproc.ewmfilter.EWM object>#
configure()[source]#

A lifecycle hook that runs when the Collection is instantiated. This is the best place to call Unit.apply_settings() on each member Unit of the Collection.

Return type:

None

network()[source]#

Override this method and have it return a NetworkDefinition that defines how the InputStream and OutputStream of member Units will be connected.

Return type:

Iterable[Tuple[Stream | str, Stream | str]]

ezmsg.sigproc.math#

Clips the data to be within the specified range.

Note

This module supports the Array API standard, enabling use with NumPy, CuPy, PyTorch, and other compatible array libraries.

class ClipSettings(min: float | None = None, max: float | None = None)[source]#

Bases: Settings

Parameters:
min: float | None = None#

Lower clip bound. If None, no lower clipping is applied.

max: float | None = None#

Upper clip bound. If None, no upper clipping is applied.

__init__(min=None, max=None)#
Parameters:
  • min (float | None)

  • max (float | None)

Return type:

None

class ClipTransformer(*args, settings=None, **kwargs)[source]#

Bases: BaseTransformer[ClipSettings, AxisArray, AxisArray]

Parameters:

settings (SettingsType)

class Clip(*args, settings=None, **kwargs)[source]#

Bases: BaseTransformerUnit[ClipSettings, AxisArray, AxisArray, ClipTransformer]

Parameters:

settings (Settings | None)

SETTINGS#

alias of ClipSettings

clip(min=None, max=None)[source]#

Clips the data to be within the specified range.

Parameters:
  • min (float | None) – Lower clip bound. If None, no lower clipping is applied.

  • max (float | None) – Upper clip bound. If None, no upper clipping is applied.

Returns:

ClipTransformer.

Return type:

ClipTransformer
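
The behavior of the two bounds corresponds to NumPy's clip, shown here on a plain array as an illustration. The transformer itself operates on AxisArray messages, which this sketch does not construct.

```python
import numpy as np

x = np.array([-3.0, -0.5, 0.0, 0.5, 3.0])

both = np.clip(x, -1.0, 1.0)         # min=-1, max=1
upper_only = np.clip(x, None, 1.0)   # min=None: no lower clipping
```

Leaving one bound as None restricts the data on one side only.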

Take the difference between 2 signals or between a signal and a constant value.

Note

ConstDifferenceTransformer supports the Array API standard, enabling use with NumPy, CuPy, PyTorch, and other compatible array libraries. DifferenceProcessor (two-input difference) currently requires NumPy arrays.

class ConstDifferenceSettings(value: float = 0.0, subtrahend: bool = True)[source]#

Bases: Settings

Parameters:
value: float = 0.0#

The number to subtract from, or be subtracted from, the input data.

subtrahend: bool = True#

If True (default) then value is subtracted from the input data. If False, the input data is subtracted from value.

__init__(value=0.0, subtrahend=True)#
Parameters:
  • value (float)

  • subtrahend (bool)

Return type:

None

class ConstDifferenceTransformer(*args, settings=None, **kwargs)[source]#

Bases: BaseTransformer[ConstDifferenceSettings, AxisArray, AxisArray]

Parameters:

settings (SettingsType)

class ConstDifference(*args, settings=None, **kwargs)[source]#

Bases: BaseTransformerUnit[ConstDifferenceSettings, AxisArray, AxisArray, ConstDifferenceTransformer]

Parameters:

settings (Settings | None)

SETTINGS#

alias of ConstDifferenceSettings

const_difference(value=0.0, subtrahend=True)[source]#

Computes result = (in_data - value) if subtrahend else (value - in_data). For the terminology, see https://en.wikipedia.org/wiki/Template:Arithmetic_operations

Parameters:
  • value (float) – The number to subtract from, or be subtracted from, the input data.

  • subtrahend (bool) – If True (default) then value is subtracted from the input data. If False, the input data is subtracted from value.

Return type:

ConstDifferenceTransformer

Returns: ConstDifferenceTransformer.
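
The two directions of subtraction can be checked on a plain NumPy array. The function below is a stand-in that applies the expression quoted above directly; it shares the name const_difference for readability but does not return a transformer.

```python
import numpy as np


def const_difference(in_data, value=0.0, subtrahend=True):
    """Stand-in for the expression quoted above, applied to a plain array."""
    return (in_data - value) if subtrahend else (value - in_data)


x = np.array([1.0, 2.0, 5.0])
centered = const_difference(x, value=2.0)                     # x - 2
flipped = const_difference(x, value=2.0, subtrahend=False)    # 2 - x
```

The two results are negatives of each other, which is a quick way to confirm which operand plays the subtrahend.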

class DifferenceState(queue_a=<factory>, queue_b=<factory>)[source]#

Bases: object

State for Difference processor with two input queues.

Parameters:
queue_a: Queue[AxisArray]#
queue_b: Queue[AxisArray]#
__init__(queue_a=<factory>, queue_b=<factory>)#
Parameters:
Return type:

None

class DifferenceProcessor[source]#

Bases: object

Processor that subtracts two AxisArray signals (A - B).

This processor maintains separate queues for two input streams and subtracts corresponding messages element-wise. It assumes both inputs have compatible shapes and aligned time spans.

__init__()[source]#
property state: DifferenceState#
push_a(msg)[source]#

Push a message to queue A (minuend).

Parameters:

msg (AxisArray)

Return type:

None

push_b(msg)[source]#

Push a message to queue B (subtrahend).

Parameters:

msg (AxisArray)

Return type:

None
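
Pairing by arrival order can be sketched with two deques. PairwiseDifference and its pop_ready method are hypothetical, simplified stand-ins; messages here are plain lists rather than AxisArray objects, and the real processor's method for retrieving results is not shown on this page.

```python
from collections import deque


class PairwiseDifference:
    """Hypothetical, simplified stand-in: pairs messages by arrival order."""

    def __init__(self):
        self.queue_a = deque()
        self.queue_b = deque()

    def push_a(self, msg):
        self.queue_a.append(msg)

    def push_b(self, msg):
        self.queue_b.append(msg)

    def pop_ready(self):
        """Return A - B for every complete (oldest A, oldest B) pair."""
        results = []
        while self.queue_a and self.queue_b:
            a = self.queue_a.popleft()
            b = self.queue_b.popleft()
            results.append([ai - bi for ai, bi in zip(a, b)])
        return results


proc = PairwiseDifference()
proc.push_a([5.0, 7.0])
proc.push_a([1.0, 1.0])            # waits until a second B message arrives
proc.push_b([2.0, 3.0])
ready = proc.pop_ready()           # only the first pair is complete
```

The unmatched message stays queued until its counterpart arrives, so a stalled input delays output rather than causing a mismatch.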

class Difference(*args, settings=None, **kwargs)[source]#

Bases: Unit

Subtract two signals (A - B).

Assumes compatible/similar axes/dimensions and aligned time spans. Messages are paired by arrival order (oldest from each queue).

OUTPUT = INPUT_SIGNAL_A - INPUT_SIGNAL_B

Parameters:

settings (Settings | None)

INPUT_SIGNAL_A = InputStream:unlocated[<class 'ezmsg.util.messages.axisarray.AxisArray'>]()#
INPUT_SIGNAL_B = InputStream:unlocated[<class 'ezmsg.util.messages.axisarray.AxisArray'>]()#
OUTPUT_SIGNAL = OutputStream:unlocated[<class 'ezmsg.util.messages.axisarray.AxisArray'>](self.num_buffers=32, self.force_tcp=False)#
async initialize()[source]#

Runs when the Unit is instantiated. This is called from within the process in which this unit will live. This lifecycle hook can be overridden. It can be run as an async function by adding the async keyword when overriding.

Return type:

None

async on_a(msg)[source]#
Parameters:

msg (AxisArray)

Return type:

None

async on_b(msg)[source]#
Parameters:

msg (AxisArray)

Return type:

None

async output()[source]#
Return type:

AsyncGenerator

Compute the multiplicative inverse (1/x) of the data.

Note

This module supports the Array API standard, enabling use with NumPy, CuPy, PyTorch, and other compatible array libraries.

class InvertTransformer(*args, settings=None, **kwargs)[source]#

Bases: BaseTransformer[None, AxisArray, AxisArray]

Parameters:

settings (SettingsType)

class Invert(*args, settings=None, **kwargs)[source]#

Bases: BaseTransformerUnit[None, AxisArray, AxisArray, InvertTransformer]

Parameters:

settings (Settings | None)

invert()[source]#

Take the inverse of the data.

Returns: InvertTransformer.

Return type:

InvertTransformer

Take the logarithm of the data.

Note

This module supports the Array API standard, enabling use with NumPy, CuPy, PyTorch, and other compatible array libraries.

class LogSettings(base: float = 10.0, clip_zero: bool = False)[source]#

Bases: Settings

Parameters:
base: float = 10.0#

The base of the logarithm. Default is 10.

clip_zero: bool = False#

If True, clip the data to the minimum positive value of the data type before taking the log.

__init__(base=10.0, clip_zero=False)#
Parameters:
Return type:

None

class LogTransformer(*args, settings=None, **kwargs)[source]#

Bases: BaseTransformer[LogSettings, AxisArray, AxisArray]

Parameters:

settings (SettingsType)

class Log(*args, settings=None, **kwargs)[source]#

Bases: BaseTransformerUnit[LogSettings, AxisArray, AxisArray, LogTransformer]

Parameters:

settings (Settings | None)

SETTINGS#

alias of LogSettings

log(base=10.0, clip_zero=False)[source]#

Take the logarithm of the data. See np.log for more details.

Parameters:
  • base (float) – The base of the logarithm. Default is 10.

  • clip_zero (bool) – If True, clip the data to the minimum positive value of the data type before taking the log.

Return type:

LogTransformer

Returns: LogTransformer.
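As a rough illustration of how base and clip_zero interact, the sketch below uses only the standard library. It assumes float64 data and takes sys.float_info.min as the "minimum positive value of the data type"; log_with_clip is a hypothetical helper, not part of ezmsg.sigproc.

```python
import math
import sys


def log_with_clip(values, base=10.0, clip_zero=False):
    # Assumption: float64 data, whose smallest positive normal value
    # is sys.float_info.min.
    floor = sys.float_info.min
    out = []
    for v in values:
        if clip_zero:
            v = max(v, floor)  # avoids log(0), which would raise an error here
        out.append(math.log(v) / math.log(base))  # change of base
    return out


clipped = log_with_clip([0.0, 1.0, 100.0], base=10.0, clip_zero=True)
```

With clip_zero enabled, a zero input maps to a large negative but finite value instead of failing.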

Scale the data by a constant factor.

Note

This module supports the Array API standard, enabling use with NumPy, CuPy, PyTorch, and other compatible array libraries.

class ScaleSettings(scale: float = 1.0)[source]#

Bases: Settings

Parameters:

scale (float)

scale: float = 1.0#

Factor by which to scale the data magnitude.

__init__(scale=1.0)#
Parameters:

scale (float)

Return type:

None

class ScaleTransformer(*args, settings=None, **kwargs)[source]#

Bases: BaseTransformer[ScaleSettings, AxisArray, AxisArray]

Parameters:

settings (SettingsType)

class Scale(*args, settings=None, **kwargs)[source]#

Bases: BaseTransformerUnit[ScaleSettings, AxisArray, AxisArray, ScaleTransformer]

Parameters:

settings (Settings | None)

SETTINGS#

alias of ScaleSettings

scale(scale=1.0)[source]#

Scale the data by a constant factor.

Parameters:

scale (float) – Factor by which to scale the data magnitude.

Return type:

ScaleTransformer

Returns: ScaleTransformer

ezmsg.sigproc.sampler#

class SamplerSettings(buffer_dur, axis=None, period=None, value=None, estimate_alignment=True, buffer_update_strategy='immediate')[source]#

Bases: Settings

Settings for Sampler. See sampler for a description of the fields.

Parameters:
buffer_dur: float#

The duration of the buffer in seconds. The buffer must be long enough to store the oldest sample to be included in a window. e.g., a trigger lagged by 0.5 seconds with a period of (-1.0, +1.5) will need a buffer of 0.5 + (1.5 - -1.0) = 3.0 seconds. It is best to at least double your estimate if memory allows.
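The buffer-size arithmetic in this description can be written as a small helper. The function name min_buffer_dur and its arguments are hypothetical and only restate the worked example above.

```python
def min_buffer_dur(trigger_lag, period):
    """Lower bound on buffer_dur (seconds) for a trigger that arrives late.

    trigger_lag: how long after the event the trigger is received.
    period: (start, stop) offsets around the trigger, in seconds.
    """
    start, stop = period
    return trigger_lag + (stop - start)


estimate = min_buffer_dur(0.5, (-1.0, 1.5))
recommended = 2 * estimate  # "at least double your estimate if memory allows"
```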

axis: str | None = None#

The axis along which to sample the data. None (default) will choose the first axis in the first input. Note: for now, the axis must exist in the message's .axes and be of type AxisArray.LinearAxis.

period: tuple[float, float] | None = None#

Optional default period (in seconds) if unspecified in SampleTriggerMessage.

value: Any = None#

Optional default value if unspecified in SampleTriggerMessage.

estimate_alignment: bool = True#
If True, use message timestamp fields and the reported sampling rate to estimate sample-accurate alignment for samples. If False, sampling will be limited to the incoming message rate (“block timing”).

NOTE: For faster-than-realtime playback, incoming timestamps must reflect “realtime” operation for estimate_alignment to operate correctly.

buffer_update_strategy: Literal['immediate', 'threshold', 'on_demand'] = 'immediate'#

The buffer update strategy. See ezmsg.sigproc.util.buffer.UpdateStrategy. If you expect to push data much more frequently than triggers, then “on_demand” might be more efficient. For most other scenarios, “immediate” is best.

__init__(buffer_dur, axis=None, period=None, value=None, estimate_alignment=True, buffer_update_strategy='immediate')#
Parameters:
Return type:

None

class SamplerState[source]#

Bases: object

buffer: HybridAxisArrayBuffer | None = None#
triggers: deque[SampleTriggerMessage] | None = None#
class SamplerTransformer(*args, **kwargs)[source]#

Bases: BaseStatefulTransformer[SamplerSettings, AxisArray, AxisArray, SamplerState]

push_trigger(message)[source]#
Parameters:

message (SampleTriggerMessage)

Return type:

list[SampleMessage]

class Sampler(*args, settings=None, **kwargs)[source]#

Bases: BaseTransformerUnit[SamplerSettings, AxisArray, AxisArray, SamplerTransformer]

Parameters:

settings (Settings | None)

SETTINGS#

alias of SamplerSettings

INPUT_TRIGGER = InputStream:unlocated[<class 'ezmsg.baseproc.util.message.SampleTriggerMessage'>]()#
OUTPUT_SIGNAL = OutputStream:unlocated[<class 'ezmsg.baseproc.util.message.SampleMessage'>](self.num_buffers=32, self.force_tcp=False)#
async on_trigger(msg)[source]#
Parameters:

msg (SampleTriggerMessage)

Return type:

None

async on_signal(message)[source]#
Parameters:

message (AxisArray)

Return type:

AsyncGenerator

sampler(buffer_dur, axis=None, period=None, value=None, estimate_alignment=True)[source]#

Sample data into a buffer, accept triggers, and return slices of sampled data around the trigger time.

Returns:

A generator that expects to be sent (via .send) either an AxisArray containing streaming data or a SampleTriggerMessage containing a trigger, and that yields a list of SampleMessage objects.

Parameters:
Return type:

SamplerTransformer
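To make "slices of sampled data around the trigger time" concrete, here is a minimal sketch of the index arithmetic on a flat buffer. It assumes a uniformly sampled buffer whose first sample has timestamp t0; slice_around_trigger is a hypothetical helper and does not reflect how the library manages its buffer.

```python
def slice_around_trigger(buffer, t0, fs, trigger_t, period):
    """Return samples in [trigger_t + period[0], trigger_t + period[1]).

    Returns None when the requested span is not fully inside the buffer.
    """
    start = round((trigger_t + period[0] - t0) * fs)
    stop = round((trigger_t + period[1] - t0) * fs)
    if start < 0 or stop > len(buffer):
        return None
    return buffer[start:stop]


buffer = list(range(100))  # 10 s of data at 10 Hz, first sample at t = 0
epoch = slice_around_trigger(buffer, t0=0.0, fs=10.0, trigger_t=5.0, period=(-1.0, 1.5))
too_late = slice_around_trigger(buffer, t0=0.0, fs=10.0, trigger_t=9.5, period=(-1.0, 1.5))
```

The second call returns None because the window would extend past the buffered data; in a streaming setting the trigger would have to wait for more samples.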

class TriggerGeneratorSettings(period: tuple[float, float], prewait: float = 0.5, publish_period: float = 5.0)[source]#

Bases: Settings

Parameters:
period: tuple[float, float]#

The period around the trigger event.

prewait: float = 0.5#

The time before the first trigger (sec)

__init__(period, prewait=0.5, publish_period=5.0)#
Parameters:
Return type:

None

publish_period: float = 5.0#

The period between triggers (sec)
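Taken together, prewait and publish_period imply a simple nominal schedule. The sketch below assumes ideal timing with no scheduling jitter; trigger_times is a hypothetical helper, not a library function.

```python
def trigger_times(prewait, publish_period, count):
    """Nominal trigger times in seconds, measured from start-up."""
    return [prewait + k * publish_period for k in range(count)]


times = trigger_times(prewait=0.5, publish_period=5.0, count=3)
```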

class TriggerGeneratorState[source]#

Bases: object

output: int = 0#
class TriggerProducer(*args, **kwargs)[source]#

Bases: BaseStatefulProducer[TriggerGeneratorSettings, SampleTriggerMessage, TriggerGeneratorState]

class TriggerGenerator(*args, settings=None, **kwargs)[source]#

Bases: BaseProducerUnit[TriggerGeneratorSettings, SampleTriggerMessage, TriggerProducer]

Parameters:

settings (Settings | None)

SETTINGS#

alias of TriggerGeneratorSettings

ezmsg.sigproc.scaler#

scaler(time_constant=1.0, axis=None)[source]#

Apply the adaptive standard scaler (see https://riverml.xyz/latest/api/preprocessing/AdaptiveStandardScaler/). This is faster than scaler_np for single-channel data.

Parameters:
  • time_constant (float) – Decay constant tau in seconds.

  • axis (str | None) – The name of the axis to accumulate statistics over.

Returns:

A primed generator object that expects to be sent an AxisArray via .send(axis_array) and yields an AxisArray with its data being a standardized, or “Z-scored”, version of the input data.

Return type:

Generator[AxisArray, AxisArray, None]
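The idea of exponentially weighted Z-scoring can be sketched for a single channel as follows. This is an illustration under stated assumptions, not the library's code: the mapping from time constant to smoothing factor (alpha = 1 - exp(-dt / tau)), the zero output when the running variance is zero, and the names alpha_from_tau and ewma_zscore are all assumptions made for the example.

```python
import math


def alpha_from_tau(tau, dt):
    # Assumed mapping from a decay time constant (s) to a smoothing factor.
    return 1.0 - math.exp(-dt / tau)


def ewma_zscore(samples, fs, time_constant=1.0):
    """Z-score each sample against exponentially weighted running statistics."""
    alpha = alpha_from_tau(time_constant, 1.0 / fs)
    mean = None
    mean_sq = None
    out = []
    for x in samples:
        if mean is None:
            mean, mean_sq = x, x * x  # initialise from the first sample
        else:
            mean += alpha * (x - mean)
            mean_sq += alpha * (x * x - mean_sq)
        var = max(mean_sq - mean * mean, 0.0)  # guard against round-off
        std = math.sqrt(var)
        out.append((x - mean) / std if std > 0 else 0.0)
    return out


flat = ewma_zscore([5.0, 5.0, 5.0, 5.0], fs=10.0)
step = ewma_zscore([0.0, 0.0, 0.0, 10.0], fs=10.0)
```

A constant signal has zero running variance, so every output is 0.0, while the sudden step in the second signal produces a positive Z-score.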

class AdaptiveStandardScalerSettings(time_constant: float = 1.0, axis: str | None = None, accumulate: bool = True)[source]#

Bases: EWMASettings

Parameters:
__init__(time_constant=1.0, axis=None, accumulate=True)#
Parameters:
Return type:

None

class AdaptiveStandardScalerState[source]#

Bases: object

samps_ewma: EWMATransformer | None = None#
vars_sq_ewma: EWMATransformer | None = None#
alpha: float | None = None#
class AdaptiveStandardScalerTransformer(*args, **kwargs)[source]#

Bases: BaseStatefulTransformer[AdaptiveStandardScalerSettings, AxisArray, AxisArray, AdaptiveStandardScalerState]

property accumulate: bool#

Whether to accumulate statistics from incoming samples.

class AdaptiveStandardScaler(*args, settings=None, **kwargs)[source]#

Bases: BaseTransformerUnit[AdaptiveStandardScalerSettings, AxisArray, AxisArray, AdaptiveStandardScalerTransformer]

Parameters:

settings (Settings | None)

SETTINGS#

alias of AdaptiveStandardScalerSettings

async on_settings(msg)[source]#

Handle settings updates with smart reset behavior.

Only resets state if axis changes (structural change). Changes to time_constant or accumulate are applied without resetting accumulated statistics.

Parameters:

msg (AdaptiveStandardScalerSettings)

Return type:

None

scaler_np(time_constant=1.0, axis=None)[source]#
Parameters:
  • time_constant (float)

  • axis (str | None)

Return type:

AdaptiveStandardScalerTransformer

ezmsg.sigproc.signalinjector#

class SignalInjectorSettings(time_dim: str = 'time', frequency: float | None = None, amplitude: float = 1.0, mixing_seed: int | None = None)[source]#

Bases: Settings

Parameters:
  • time_dim (str)

  • frequency (float | None)

  • amplitude (float)

  • mixing_seed (int | None)

time_dim: str = 'time'#
frequency: float | None = None#
amplitude: float = 1.0#
mixing_seed: int | None = None#
__init__(time_dim='time', frequency=None, amplitude=1.0, mixing_seed=None)#
Parameters:
  • time_dim (str)

  • frequency (float | None)

  • amplitude (float)

  • mixing_seed (int | None)

Return type:

None

class SignalInjectorState[source]#

Bases: object

cur_shape: tuple[int, ...] | None = None#
cur_frequency: float | None = None#
cur_amplitude: float | None = None#
mixing: ndarray[tuple[Any, ...], dtype[_ScalarT]] | None = None#
class SignalInjectorTransformer(*args, **kwargs)[source]#

Bases: BaseAsyncTransformer[SignalInjectorSettings, AxisArray, AxisArray, SignalInjectorState]

class SignalInjector(*args, settings=None, **kwargs)[source]#

Bases: BaseTransformerUnit[SignalInjectorSettings, AxisArray, AxisArray, SignalInjectorTransformer]

Parameters:

settings (Settings | None)

SETTINGS#

alias of SignalInjectorSettings

INPUT_FREQUENCY = InputStream:unlocated[float | None]()#
INPUT_AMPLITUDE = InputStream:unlocated[<class 'float'>]()#
async on_frequency(msg)[source]#
Parameters:

msg (float | None)

Return type:

None

async on_amplitude(msg)[source]#
Parameters:

msg (float)

Return type:

None

ezmsg.sigproc.slicer#

parse_slice(s, axinfo=None)[source]#

Parses a string representation of a slice and returns a tuple of slice objects.

  • “” -> slice(None, None, None) (take all)

  • “:” -> slice(None, None, None)

  • “none” (case-insensitive) -> slice(None, None, None)

  • “{start}:{stop}” or “{start}:{stop}:{step}” -> slice(start, stop, step)

  • “5” (or any integer) -> (5,). Take only that item. Applying this to an ndarray or AxisArray will drop the dimension.

  • A comma-separated list of the above -> a tuple of slices | ints

  • A comma-separated list of values, when axinfo is provided and is a CoordinateAxis -> a tuple of ints

Parameters:
  • s (str) – The string representation of the slice.

  • axinfo (CoordinateAxis | None) – (Optional) If provided, and of type CoordinateAxis, and s is a comma-separated list of values, then the values in s will be checked against the values in axinfo.data.

Returns:

A tuple of slice objects and/or ints.

Return type:

tuple[slice | int, …]
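The string rules listed above can be mimicked with a short parser. The sketch below is a simplified re-implementation for illustration only: parse_slice_sketch is a hypothetical name, it ignores the axinfo argument, and it accepts only integers.

```python
def parse_slice_sketch(s):
    """Parse a selection string into a tuple of slice objects and/or ints."""
    s = s.strip()
    if s == "" or s == ":" or s.lower() == "none":
        return (slice(None),)  # take all
    if "," in s:
        out = []
        for part in s.split(","):
            out.extend(parse_slice_sketch(part))
        return tuple(out)
    if ":" in s:
        # Empty fields such as the stop in "3:" become None.
        fields = [int(f) if f.strip() else None for f in s.split(":")]
        return (slice(*fields),)
    return (int(s),)  # a single integer index


take_all = parse_slice_sketch("None")
strided = parse_slice_sketch("2:10:2")
single = parse_slice_sketch("5")
mixed = parse_slice_sketch("0, 2:4")
```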

class SlicerSettings(selection: str = '', axis: str | None = None)[source]#

Bases: Settings

Parameters:
  • selection (str)

  • axis (str | None)

selection: str = ''#

The selection string. See ezmsg.sigproc.slicer.parse_slice for details.

axis: str | None = None#

The name of the axis to slice along. If None, the last axis is used.

__init__(selection='', axis=None)#
Parameters:
  • selection (str)

  • axis (str | None)

Return type:

None

class SlicerState[source]#

Bases: object

slice_: slice | int | ndarray[tuple[Any, ...], dtype[_ScalarT]] | None = None#
new_axis: AxisBase | None = None#
b_change_dims: bool = False#
class SlicerTransformer(*args, **kwargs)[source]#

Bases: BaseStatefulTransformer[SlicerSettings, AxisArray, AxisArray, SlicerState]

class Slicer(*args, settings=None, **kwargs)[source]#

Bases: BaseTransformerUnit[SlicerSettings, AxisArray, AxisArray, SlicerTransformer]

Parameters:

settings (Settings | None)

SETTINGS#

alias of SlicerSettings

slicer(selection='', axis=None)[source]#

Slice along a particular axis.

Parameters:
Returns:

SlicerTransformer

Return type:

SlicerTransformer

ezmsg.sigproc.spectrum#

class OptionsEnum(new_class_name, /, names, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]#

Bases: Enum

classmethod options()[source]#
class WindowFunction(*values)[source]#

Bases: OptionsEnum

Windowing function prior to calculating spectrum.

NONE = 'None (Rectangular)'#

None.

HAMMING = 'Hamming'#

numpy.hamming

HANNING = 'Hanning'#

numpy.hanning

BARTLETT = 'Bartlett'#

numpy.bartlett

BLACKMAN = 'Blackman'#

numpy.blackman

class SpectralTransform(*values)[source]#

Bases: OptionsEnum

Additional transformation functions to apply to the spectral result.

RAW_COMPLEX = 'Complex FFT Output'#
REAL = 'Real Component of FFT'#
IMAG = 'Imaginary Component of FFT'#
REL_POWER = 'Relative Power'#
REL_DB = 'Log Power (Relative dB)'#
class SpectralOutput(*values)[source]#

Bases: OptionsEnum

The expected spectral contents.

FULL = 'Full Spectrum'#
POSITIVE = 'Positive Frequencies'#
NEGATIVE = 'Negative Frequencies'#
class SpectrumSettings(axis=None, out_axis='freq', window=WindowFunction.HAMMING, transform=SpectralTransform.REL_DB, output=SpectralOutput.POSITIVE, norm='forward', do_fftshift=True, nfft=None)[source]#

Bases: Settings

Settings for Spectrum. See spectrum for a description of the parameters.

Parameters:
axis: str | None = None#

The name of the axis on which to calculate the spectrum. Note: The axis must have an .axes entry of type LinearAxis, not CoordinateAxis.

out_axis: str | None = 'freq'#

The name of the new axis. Defaults to “freq”. If None, the dimension name is not changed.

window: WindowFunction = 'Hamming'#

The WindowFunction to apply to the data slice prior to calculating the spectrum.

transform: SpectralTransform = 'Log Power (Relative dB)'#

The SpectralTransform to apply to the spectral magnitude.

output: SpectralOutput = 'Positive Frequencies'#

The SpectralOutput format.

norm: str | None = 'forward'#

Normalization mode. Default “forward” is best used when the inverse transform is not needed, for example when the goal is to get spectral power. Use “backward” (equivalent to None) to not scale the spectrum which is useful when the spectra will be manipulated and possibly inverse-transformed. See numpy.fft.fft for details.

do_fftshift: bool = True#

Whether to apply fftshift to the output. Default is True. This value is ignored unless output is SpectralOutput.FULL.

nfft: int | None = None#

The number of points to use for the FFT. If None, the length of the input data is used.

__init__(axis=None, out_axis='freq', window=WindowFunction.HAMMING, transform=SpectralTransform.REL_DB, output=SpectralOutput.POSITIVE, norm='forward', do_fftshift=True, nfft=None)#
Parameters:
Return type:

None

class SpectrumState[source]#

Bases: object

f_sl: slice | None = None#
freq_axis: LinearAxis | None = None#
fftfun: Callable | None = None#
f_transform: Callable | None = None#
new_dims: list[str] | None = None#
window: ndarray[tuple[Any, ...], dtype[_ScalarT]] | None = None#
class SpectrumTransformer(*args, **kwargs)[source]#

Bases: BaseStatefulTransformer[SpectrumSettings, AxisArray, AxisArray, SpectrumState]

class Spectrum(*args, settings=None, **kwargs)[source]#

Bases: BaseTransformerUnit[SpectrumSettings, AxisArray, AxisArray, SpectrumTransformer]

Parameters:

settings (Settings | None)

SETTINGS#

alias of SpectrumSettings

spectrum(axis=None, out_axis='freq', window=WindowFunction.HANNING, transform=SpectralTransform.REL_DB, output=SpectralOutput.POSITIVE, norm='forward', do_fftshift=True, nfft=None)[source]#

Calculate a spectrum on a data slice.

Returns:

A SpectrumTransformer object that expects to be called with an AxisArray (via __call__) containing continuous data and returns an AxisArray with data of spectral magnitudes or powers.

Parameters:
Return type:

SpectrumTransformer
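For readers who want to see what the default combination of settings amounts to numerically, the sketch below computes a Hamming-windowed, forward-normalized spectrum over non-negative frequencies in decibels using only the standard library. It is a slow direct DFT for illustration, not the library's implementation. The dB definition (10·log10 of squared magnitude), the tiny floor used to avoid log10(0), and the name rel_db_spectrum are assumptions.

```python
import cmath
import math


def hamming(n):
    # Same formula as numpy.hamming: 0.54 - 0.46 * cos(2*pi*k / (n - 1)).
    if n == 1:
        return [1.0]
    return [0.54 - 0.46 * math.cos(2 * math.pi * k / (n - 1)) for k in range(n)]


def rel_db_spectrum(samples, fs):
    """Return (freqs, power_db) for the non-negative frequency bins."""
    n = len(samples)
    x = [s * w for s, w in zip(samples, hamming(n))]
    freqs, power_db = [], []
    for k in range(n // 2 + 1):
        acc = sum(x[t] * cmath.exp(-2j * math.pi * k * t / n) for t in range(n))
        acc /= n  # "forward" normalization: scale the forward transform by 1/n
        power = abs(acc) ** 2
        freqs.append(k * fs / n)
        power_db.append(10 * math.log10(max(power, 1e-300)))  # assumed floor
    return freqs, power_db


fs = 32.0
sig = [math.sin(2 * math.pi * 4.0 * t / fs) for t in range(32)]
freqs, power_db = rel_db_spectrum(sig, fs)
peak_freq = freqs[power_db.index(max(power_db))]
```

The test signal is a 4 Hz sine sampled at 32 Hz, so the largest bin is expected at 4 Hz even though the window spreads some energy into the neighbouring bins.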

ezmsg.sigproc.spectrogram#

class SpectrogramSettings(window_dur=None, window_shift=None, window_anchor=Anchor.BEGINNING, window=WindowFunction.HAMMING, transform=SpectralTransform.REL_DB, output=SpectralOutput.POSITIVE)[source]#

Bases: Settings

Settings for SpectrogramTransformer.

Parameters:
window_dur: float | None = None#

Window duration in seconds.

window_shift: float | None = None#

Window step in seconds. If None, window_shift == window_dur.

window_anchor: str | Anchor = 'beginning'#

See WindowTransformer.

window: WindowFunction = 'Hamming'#

The WindowFunction to apply to the data slice prior to calculating the spectrum.

transform: SpectralTransform = 'Log Power (Relative dB)'#

The SpectralTransform to apply to the spectral magnitude.

__init__(window_dur=None, window_shift=None, window_anchor=Anchor.BEGINNING, window=WindowFunction.HAMMING, transform=SpectralTransform.REL_DB, output=SpectralOutput.POSITIVE)#
Parameters:
Return type:

None

output: SpectralOutput = 'Positive Frequencies'#

The SpectralOutput format.

class SpectrogramTransformer(*args, **kwargs)[source]#

Bases: CompositeProcessor[SpectrogramSettings, AxisArray, AxisArray]

class Spectrogram(*args, settings=None, **kwargs)[source]#

Bases: BaseTransformerUnit[SpectrogramSettings, AxisArray, AxisArray, SpectrogramTransformer]

Parameters:

settings (Settings | None)

SETTINGS#

alias of SpectrogramSettings

spectrogram(window_dur=None, window_shift=None, window_anchor=Anchor.BEGINNING, window=WindowFunction.HAMMING, transform=SpectralTransform.REL_DB, output=SpectralOutput.POSITIVE)[source]#
Parameters:
Return type:

SpectrogramTransformer

ezmsg.sigproc.synth#

ezmsg.sigproc.window#

class Anchor(*values)[source]#

Bases: Enum

BEGINNING = 'beginning'#
END = 'end'#
MIDDLE = 'middle'#
class WindowSettings(axis: str | None = None, newaxis: str | None = None, window_dur: float | None = None, window_shift: float | None = None, zero_pad_until: str = 'full', anchor: str | Anchor = <Anchor.BEGINNING: 'beginning'>)[source]#

Bases: Settings

Parameters:
axis: str | None = None#
newaxis: str | None = None#
window_dur: float | None = None#
window_shift: float | None = None#
zero_pad_until: str = 'full'#
anchor: str | Anchor = 'beginning'#
__init__(axis=None, newaxis=None, window_dur=None, window_shift=None, zero_pad_until='full', anchor=Anchor.BEGINNING)#
Parameters:
Return type:

None

class WindowState[source]#

Bases: object

buffer: ndarray[tuple[Any, ...], dtype[_ScalarT]] | SparseArray | None = None#
window_samples: int | None = None#
window_shift_samples: int | None = None#
shift_deficit: int = 0#

Number of incoming samples to ignore. Only relevant when shift > window.

newaxis_warned: bool = False#
out_newaxis: LinearAxis | None = None#
out_dims: list[str] | None = None#
class WindowTransformer(*args, **kwargs)[source]#

Bases: BaseStatefulTransformer[WindowSettings, AxisArray, AxisArray, WindowState]

Apply a sliding window along the specified axis to input streaming data. The windowing method is perhaps the most useful and versatile method in ezmsg.sigproc, but its parameterization can be difficult. Please read the argument descriptions carefully.

__init__(*args, **kwargs)[source]#
Parameters:
  • axis – The axis along which to segment windows. If None, defaults to the first dimension of the first seen AxisArray. Note: The windowed axis must be an AxisArray.LinearAxis, not an AxisArray.CoordinateAxis.

  • newaxis – New axis on which windows are delimited, immediately preceding the target windowed axis. The data length along newaxis may be 0 if this most recent push did not provide enough data for a new window. If window_shift is None then the newaxis length will always be 1.

  • window_dur – The duration of the window in seconds. If None, the function acts as a passthrough and all other parameters are ignored.

  • window_shift – The shift of the window in seconds. If None (default), windowing operates in “1:1 mode”, where each input yields exactly one most-recent window.

  • zero_pad_until

    Determines how the function initializes the buffer. Can be one of “input”, “full” (the default in WindowSettings), “shift”, or “none”. If window_shift is None then this field is ignored and “input” is always used.

    • “input” initializes the buffer with the input, then prepends zeros up to the window size. The first input will always yield at least one output.

    • “shift” fills the buffer until window_shift. No outputs will be yielded until at least window_shift data has been seen.

    • “none” does not pad the buffer. No outputs will be yielded until at least window_dur data has been seen.

  • anchor – Determines the entry in axis that gets assigned 0, which references the value in newaxis. Can be of class Anchor or a string representation of an Anchor.

Return type:

None
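The relationship between window_dur, window_shift, and the number of windows produced can be sketched offline on a plain list. This is only an illustration of the arithmetic: it behaves like the “none” padding option (no output until a full window is available), it does not model streaming state, and sliding_windows is a hypothetical helper.

```python
def sliding_windows(samples, fs, window_dur, window_shift):
    """Split samples into windows of window_dur seconds, stepping by window_shift."""
    window_samples = int(round(window_dur * fs))
    shift_samples = int(round(window_shift * fs))
    if window_samples < 1 or shift_samples < 1:
        raise ValueError("window_dur and window_shift must span at least one sample")
    windows = []
    start = 0
    while start + window_samples <= len(samples):
        windows.append(samples[start:start + window_samples])
        start += shift_samples
    return windows


wins = sliding_windows(list(range(10)), fs=1.0, window_dur=4.0, window_shift=2.0)
```

With a 4-sample window and a 2-sample shift, consecutive windows overlap by half, and ten input samples yield four complete windows.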

class Window(*args, settings=None, **kwargs)[source]#

Bases: BaseTransformerUnit[WindowSettings, AxisArray, AxisArray, WindowTransformer]

Parameters:

settings (Settings | None)

SETTINGS#

alias of WindowSettings

INPUT_SIGNAL = InputStream:unlocated[<class 'ezmsg.util.messages.axisarray.AxisArray'>]()#
OUTPUT_SIGNAL = OutputStream:unlocated[<class 'ezmsg.util.messages.axisarray.AxisArray'>](self.num_buffers=32, self.force_tcp=False)#
async on_signal(message)[source]#

Override the superclass on_signal so we can opt to yield once or multiple times after dropping the win axis.

Parameters:

message (AxisArray)

Return type:

AsyncGenerator

windowing(axis=None, newaxis=None, window_dur=None, window_shift=None, zero_pad_until='full', anchor=Anchor.BEGINNING)[source]#
Parameters:
Return type:

WindowTransformer