In-Built Signal Processing Modules#

Here is the API reference for the in-built signal processing modules included in the ezmsg-sigproc extension.

ezmsg.sigproc.activation#

class ActivationFunction(*values)[source]#

Bases: OptionsEnum

Activation (transformation) function.

NONE = 'none'#

None.

SIGMOID = 'sigmoid'#

scipy.special.expit

EXPIT = 'expit'#

scipy.special.expit

LOGIT = 'logit'#

scipy.special.logit

LOGEXPIT = 'log_expit'#

scipy.special.log_expit

class ActivationSettings(function: str | ezmsg.sigproc.activation.ActivationFunction = <ActivationFunction.NONE: 'none'>)[source]#

Bases: Settings

Parameters:

function (str | ActivationFunction)

__init__(function=ActivationFunction.NONE)#
Parameters:

function (str | ActivationFunction)

Return type:

None

function: str | ActivationFunction = 'none'#

An enum value from ActivationFunction or a string representing the activation function. Possible values are: SIGMOID, EXPIT, LOGIT, LOGEXPIT, “sigmoid”, “expit”, “logit”, “log_expit”. SIGMOID and EXPIT are equivalent. See scipy.special.expit for more details.

class ActivationTransformer(*args, settings=None, **kwargs)[source]#

Bases: BaseTransformer[ActivationSettings, AxisArray, AxisArray]

Parameters:

settings (SettingsType)

class Activation(*args, settings=None, **kwargs)[source]#

Bases: BaseTransformerUnit[ActivationSettings, AxisArray, AxisArray, ActivationTransformer]

Parameters:

settings (Settings | None)

SETTINGS#

alias of ActivationSettings

activation(function)[source]#

Transform the data with a simple activation function.

Parameters:

function (str | ActivationFunction) – An enum value from ActivationFunction or a string representing the activation function. Possible values are: SIGMOID, EXPIT, LOGIT, LOGEXPIT, “sigmoid”, “expit”, “logit”, “log_expit”. SIGMOID and EXPIT are equivalent. See scipy.special.expit for more details.

Return type:

ActivationTransformer

Returns: ActivationTransformer

ezmsg.sigproc.affinetransform#

class AffineTransformSettings(weights, axis=None, right_multiply=True)[source]#

Bases: Settings

Settings for AffineTransform. See affine_transform for argument details.

Parameters:
weights: ndarray | str | Path#

An array of weights or a path to a file with weights compatible with np.loadtxt.

axis: str | None = None#

The name of the axis to apply the transformation to. Defaults to the leading (0th) axis in the array.

right_multiply: bool = True#

Set False to transpose the weights before applying.

__init__(weights, axis=None, right_multiply=True)#
Parameters:
Return type:

None

class AffineTransformState[source]#

Bases: object

weights: ndarray[tuple[Any, ...], dtype[_ScalarT]] | None = None#
new_axis: AxisBase | None = None#
class AffineTransformTransformer(*args, **kwargs)[source]#

Bases: BaseStatefulTransformer[AffineTransformSettings, AxisArray, AxisArray, AffineTransformState]

class AffineTransform(*args, settings=None, **kwargs)[source]#

Bases: BaseTransformerUnit[AffineTransformSettings, AxisArray, AxisArray, AffineTransformTransformer]

Parameters:

settings (Settings | None)

SETTINGS#

alias of AffineTransformSettings

affine_transform(weights, axis=None, right_multiply=True)[source]#

Perform affine transformations on streaming data.

Parameters:
  • weights (ndarray | str | Path) – An array of weights or a path to a file with weights compatible with np.loadtxt.

  • axis (str | None) – The name of the axis to apply the transformation to. Defaults to the leading (0th) axis in the array.

  • right_multiply (bool) – Set False to transpose the weights before applying.

Returns:

AffineTransformTransformer.

Return type:

AffineTransformTransformer

zeros_for_noop(data, **ignore_kwargs)[source]#
Parameters:

data (ndarray[tuple[Any, ...], dtype[_ScalarT]])

Return type:

ndarray[tuple[Any, …], dtype[_ScalarT]]

class CommonRereferenceSettings(mode='mean', axis=None, include_current=True)[source]#

Bases: Settings

Settings for CommonRereference

Parameters:
  • mode (str)

  • axis (str | None)

  • include_current (bool)

mode: str = 'mean'#

The statistical mode to apply – either “mean” or “median”.

axis: str | None = None#

The name of the axis to apply the transformation to.

__init__(mode='mean', axis=None, include_current=True)#
Parameters:
  • mode (str)

  • axis (str | None)

  • include_current (bool)

Return type:

None

include_current: bool = True#

Set False to exclude each channel from participating in the calculation of its reference.

class CommonRereferenceTransformer(*args, settings=None, **kwargs)[source]#

Bases: BaseTransformer[CommonRereferenceSettings, AxisArray, AxisArray]

Parameters:

settings (SettingsType)

class CommonRereference(*args, settings=None, **kwargs)[source]#

Bases: BaseTransformerUnit[CommonRereferenceSettings, AxisArray, AxisArray, CommonRereferenceTransformer]

Parameters:

settings (Settings | None)

SETTINGS#

alias of CommonRereferenceSettings

common_rereference(mode='mean', axis=None, include_current=True)[source]#

Perform common average referencing (CAR) on streaming data.

Parameters:
  • mode (str) – The statistical mode to apply – either “mean” or “median”

  • axis (str | None) – The name of the axis to apply the transformation to.

  • include_current (bool) – Set False to exclude each channel from participating in the calculation of its reference.

Returns:

CommonRereferenceTransformer

Return type:

CommonRereferenceTransformer

ezmsg.sigproc.aggregate#

class AggregationFunction(*values)[source]#

Bases: OptionsEnum

Enum for aggregation functions available to be used in ranged_aggregate operation.

NONE = 'None (all)'#
MAX = 'max'#
MIN = 'min'#
MEAN = 'mean'#
MEDIAN = 'median'#
STD = 'std'#
SUM = 'sum'#
NANMAX = 'nanmax'#
NANMIN = 'nanmin'#
NANMEAN = 'nanmean'#
NANMEDIAN = 'nanmedian'#
NANSTD = 'nanstd'#
NANSUM = 'nansum'#
ARGMIN = 'argmin'#
ARGMAX = 'argmax'#
TRAPEZOID = 'trapezoid'#
class RangedAggregateSettings(axis=None, bands=None, operation=AggregationFunction.MEAN)[source]#

Bases: Settings

Settings for RangedAggregate.

Parameters:
axis: str | None = None#

The name of the axis along which to apply the bands.

bands: list[tuple[float, float]] | None = None#

[(band1_min, band1_max), (band2_min, band2_max), …] If not set then this acts as a passthrough node.

operation: AggregationFunction = 'mean'#

AggregationFunction to apply to each band.

__init__(axis=None, bands=None, operation=AggregationFunction.MEAN)#
Parameters:
Return type:

None

class RangedAggregateState[source]#

Bases: object

slices: list[tuple[Any, ...]] | None = None#
out_axis: AxisBase | None = None#
ax_vec: ndarray[tuple[Any, ...], dtype[_ScalarT]] | None = None#
class RangedAggregateTransformer(*args, **kwargs)[source]#

Bases: BaseStatefulTransformer[RangedAggregateSettings, AxisArray, AxisArray, RangedAggregateState]

class RangedAggregate(*args, settings=None, **kwargs)[source]#

Bases: BaseTransformerUnit[RangedAggregateSettings, AxisArray, AxisArray, RangedAggregateTransformer]

Parameters:

settings (Settings | None)

SETTINGS#

alias of RangedAggregateSettings

ranged_aggregate(axis=None, bands=None, operation=AggregationFunction.MEAN)[source]#

Apply an aggregation operation over one or more bands.

Parameters:
  • axis (str | None) – The name of the axis along which to apply the bands.

  • bands (list[tuple[float, float]] | None) – [(band1_min, band1_max), (band2_min, band2_max), …] If not set then this acts as a passthrough node.

  • operation (AggregationFunction) – AggregationFunction to apply to each band.

Returns:

RangedAggregateTransformer

Return type:

RangedAggregateTransformer

ezmsg.sigproc.bandpower#

class BandPowerSettings(spectrogram_settings=<factory>, bands=<factory>, aggregation=AggregationFunction.MEAN)[source]#

Bases: Settings

Settings for BandPower.

Parameters:
spectrogram_settings: SpectrogramSettings#

Settings for spectrogram calculation.

bands: list[tuple[float, float]] | None#

(min, max) tuples of band limits in Hz.

aggregation: AggregationFunction = 'mean'#

AggregationFunction to apply to each band.

__init__(spectrogram_settings=<factory>, bands=<factory>, aggregation=AggregationFunction.MEAN)#
Parameters:
Return type:

None

class BandPowerTransformer(*args, **kwargs)[source]#

Bases: CompositeProcessor[BandPowerSettings, AxisArray, AxisArray]

class BandPower(*args, settings=None, **kwargs)[source]#

Bases: BaseTransformerUnit[BandPowerSettings, AxisArray, AxisArray, BandPowerTransformer]

Parameters:

settings (Settings | None)

SETTINGS#

alias of BandPowerSettings

bandpower(spectrogram_settings, bands=[(17, 30), (70, 170)], aggregation=AggregationFunction.MEAN)[source]#

Calculate the average spectral power in each band.

Returns:

BandPowerTransformer

Parameters:
Return type:

BandPowerTransformer

ezmsg.sigproc.filter#

class FilterCoefficients(b: numpy.ndarray = <factory>, a: numpy.ndarray = <factory>)[source]#

Bases: object

Parameters:
b: ndarray#
a: ndarray#
__init__(b=<factory>, a=<factory>)#
Parameters:
Return type:

None

class FilterBaseSettings(axis: str | None = None, coef_type: str = 'ba')[source]#

Bases: Settings

Parameters:
  • axis (str | None)

  • coef_type (str)

axis: str | None = None#

The name of the axis to operate on.

coef_type: str = 'ba'#

The type of filter coefficients. One of “ba” or “sos”.

__init__(axis=None, coef_type='ba')#
Parameters:
  • axis (str | None)

  • coef_type (str)

Return type:

None

class FilterSettings(axis: str | None = None, coef_type: str = 'ba', coefs: FilterCoefficients | None = None)[source]#

Bases: FilterBaseSettings

Parameters:
coefs: FilterCoefficients | None = None#

The pre-calculated filter coefficients.

__init__(axis=None, coef_type='ba', coefs=None)#
Parameters:
Return type:

None

class FilterState[source]#

Bases: object

zi: ndarray[tuple[Any, ...], dtype[_ScalarT]] | None = None#
class FilterTransformer(*args, **kwargs)[source]#

Bases: BaseStatefulTransformer[FilterSettings, AxisArray, AxisArray, FilterState]

Filter data using the provided coefficients.

update_coefficients(coefs, coef_type=None)[source]#

Update filter coefficients.

If the new coefficients have the same length as the current ones, only the coefficients are updated. If the lengths differ, the filter state is also reset to handle the new filter order.

Parameters:
Return type:

None

class Filter(*args, settings=None, **kwargs)[source]#

Bases: BaseTransformerUnit[FilterSettings, AxisArray, AxisArray, FilterTransformer]

Parameters:

settings (Settings | None)

SETTINGS#

alias of FilterSettings

filtergen(axis, coefs, coef_type)[source]#

Filter data using the provided coefficients.

Returns:

FilterTransformer.

Parameters:
Return type:

FilterTransformer

class FilterByDesignState[source]#

Bases: object

filter: FilterTransformer | None = None#
needs_redesign: bool = False#
class FilterByDesignTransformer(*args, **kwargs)[source]#

Bases: BaseStatefulTransformer[SettingsType, AxisArray, AxisArray, FilterByDesignState], ABC, Generic[SettingsType, FilterCoefsType]

Abstract base class for filter design transformers.

classmethod get_message_type(dir)[source]#
Parameters:

dir (str)

Return type:

type[AxisArray]

abstractmethod get_design_function()[source]#

Return a function that takes sampling frequency and returns filter coefficients.

Return type:

Callable[[float], FilterCoefsType | None]

update_settings(new_settings=None, **kwargs)[source]#

Update settings and mark that filter coefficients need to be recalculated.

Parameters:
  • new_settings (SettingsType | None) – Complete new settings object to replace current settings

  • **kwargs – Individual settings to update

Return type:

None

class BaseFilterByDesignTransformerUnit(*args, settings=None, **kwargs)[source]#

Bases: BaseTransformerUnit[SettingsType, AxisArray, AxisArray, FilterByDesignTransformer], Generic[SettingsType, TransformerType]

Parameters:

settings (Settings | None)

async on_settings(msg)[source]#

Receive a settings message, override self.SETTINGS, and re-create the processor. Child classes that wish to have fine-grained control over whether the core processor resets on settings changes should override this method.

Parameters:

msg (SettingsType) – a settings message.

Return type:

None

ezmsg.sigproc.butterworthfilter#

class ButterworthFilterSettings(axis=None, coef_type='ba', order=0, cuton=None, cutoff=None, wn_hz=True)[source]#

Bases: FilterBaseSettings

Settings for ButterworthFilter.

Parameters:
order: int = 0#

Filter order

cuton: float | None = None#

Cuton frequency (Hz). If cutoff is not specified then this is the highpass corner. Otherwise, if this is lower than cutoff then this is the beginning of the bandpass or if this is greater than cutoff then this is the end of the bandstop.

cutoff: float | None = None#

Cutoff frequency (Hz). If cuton is not specified then this is the lowpass corner. Otherwise, if this is greater than cuton then this is the end of the bandpass, or if this is less than cuton then this is the beginning of the bandstop.

wn_hz: bool = True#

Set False if provided Wn are normalized from 0 to 1, where 1 is the Nyquist frequency

filter_specs()[source]#

Determine the filter type given the corner frequencies.

Returns:

A tuple with the first element being a string indicating the filter type (one of “lowpass”, “highpass”, “bandpass”, “bandstop”) and the second element being the corner frequency or frequencies.

Return type:

tuple[str, float | tuple[float, float]] | None

__init__(axis=None, coef_type='ba', order=0, cuton=None, cutoff=None, wn_hz=True)#
Parameters:
Return type:

None

butter_design_fun(fs, order=0, cuton=None, cutoff=None, coef_type='ba', wn_hz=True)[source]#

See ButterworthFilterSettings.filter_specs for an explanation of specifying different filter types (lowpass, highpass, bandpass, bandstop) from the parameters. You are likely to want to use this function with filter_by_design, which only passes fs to the design function (this), meaning that you should wrap this function with a lambda or prepare with functools.partial.

Parameters:
  • fs (float) – The sampling frequency of the data in Hz.

  • order (int) – Filter order.

  • cuton (float | None) – Corner frequency of the filter in Hz.

  • cutoff (float | None) – Corner frequency of the filter in Hz.

  • coef_type (str) – “ba”, “sos”, or “zpk”

  • wn_hz (bool) – Set False if provided Wn are normalized from 0 to 1, where 1 is the Nyquist frequency

Returns:

The filter coefficients as a tuple of (b, a) for coef_type “ba”, or as a single ndarray for “sos”, or (z, p, k) for “zpk”.

Return type:

tuple[ndarray[tuple[Any, …], dtype[_ScalarT]], ndarray[tuple[Any, …], dtype[_ScalarT]]] | ndarray[tuple[Any, …], dtype[_ScalarT]] | None

class ButterworthFilterTransformer(*args, **kwargs)[source]#

Bases: FilterByDesignTransformer[ButterworthFilterSettings, tuple[ndarray[tuple[Any, …], dtype[_ScalarT]], ndarray[tuple[Any, …], dtype[_ScalarT]]] | ndarray[tuple[Any, …], dtype[_ScalarT]]]

get_design_function()[source]#

Return a function that takes sampling frequency and returns filter coefficients.

Return type:

Callable[[float], tuple[ndarray[tuple[Any, …], dtype[_ScalarT]], ndarray[tuple[Any, …], dtype[_ScalarT]]] | ndarray[tuple[Any, …], dtype[_ScalarT]] | None]

class ButterworthFilter(*args, settings=None, **kwargs)[source]#

Bases: BaseFilterByDesignTransformerUnit[ButterworthFilterSettings, ButterworthFilterTransformer]

Parameters:

settings (Settings | None)

SETTINGS#

alias of ButterworthFilterSettings

butter(axis, order=0, cuton=None, cutoff=None, coef_type='ba', wn_hz=True)[source]#

Convenience generator wrapping filter_gen_by_design for Butterworth filters. Apply Butterworth filter to streaming data. Uses scipy.signal.butter to design the filter. See ButterworthFilterSettings.filter_specs for an explanation of specifying different filter types (lowpass, highpass, bandpass, bandstop) from the parameters.

Returns:

ButterworthFilterTransformer

Parameters:
Return type:

ButterworthFilterTransformer

ezmsg.sigproc.decimate#

class ChebyForDecimateTransformer(*args, **kwargs)[source]#

Bases: ChebyshevFilterTransformer[tuple[ndarray[tuple[Any, …], dtype[_ScalarT]], ndarray[tuple[Any, …], dtype[_ScalarT]]] | ndarray[tuple[Any, …], dtype[_ScalarT]]]

A ChebyshevFilterTransformer with a design filter method that additionally accepts a target sampling rate. If the target rate cannot be achieved it returns None; otherwise it returns the filter coefficients.

get_design_function()[source]#

Return a function that takes sampling frequency and returns filter coefficients.

Return type:

Callable[[float], tuple[ndarray[tuple[Any, …], dtype[_ScalarT]], ndarray[tuple[Any, …], dtype[_ScalarT]]] | ndarray[tuple[Any, …], dtype[_ScalarT]] | None]

class ChebyForDecimate(*args, settings=None, **kwargs)[source]#

Bases: BaseTransformerUnit[ChebyshevFilterSettings, AxisArray, AxisArray, ChebyForDecimateTransformer]

Parameters:

settings (Settings | None)

SETTINGS#

alias of ChebyshevFilterSettings

class Decimate(*args, settings=None, **kwargs)[source]#

Bases: Collection

A Collection chaining a Filter node configured as a lowpass Chebyshev filter and a Downsample node.

Parameters:

settings (Settings | None)

SETTINGS#

alias of DownsampleSettings

INPUT_SIGNAL = InputStream:unlocated[<class 'ezmsg.util.messages.axisarray.AxisArray'>]()#
OUTPUT_SIGNAL = OutputStream:unlocated[<class 'ezmsg.util.messages.axisarray.AxisArray'>](self.num_buffers=32, self.force_tcp=False)#
FILTER = <ezmsg.sigproc.decimate.ChebyForDecimate object>#
DOWNSAMPLE = <ezmsg.sigproc.downsample.Downsample object>#
configure()[source]#

A lifecycle hook that runs when the Collection is instantiated. This is the best place to call Unit.apply_settings() on each member Unit of the Collection.

Return type:

None

network()[source]#

Override this method and have the definition return a NetworkDefinition which defines how InputStream and OutputStream from member Units will be connected.

Return type:

Iterable[Tuple[Stream | str, Stream | str]]

ezmsg.sigproc.downsample#

class DownsampleSettings(axis='time', target_rate=None, factor=None)[source]#

Bases: Settings

Settings for Downsample node.

Parameters:
  • axis (str)

  • target_rate (float | None)

  • factor (int | None)

axis: str = 'time'#

The name of the axis along which to downsample.

target_rate: float | None = None#

Desired rate after downsampling. The actual rate will be the nearest integer factor of the input rate that is the same or higher than the target rate.

factor: int | None = None#

Explicitly specify downsample factor. If specified, target_rate is ignored.

__init__(axis='time', target_rate=None, factor=None)#
Parameters:
  • axis (str)

  • target_rate (float | None)

  • factor (int | None)

Return type:

None

class DownsampleState[source]#

Bases: object

q: int = 0#

The integer downsampling factor. It will be determined based on the target rate.

s_idx: int = 0#

Index of the next msg’s first sample into the virtual rotating ds_factor counter.

class DownsampleTransformer(*args, **kwargs)[source]#

Bases: BaseStatefulTransformer[DownsampleSettings, AxisArray, AxisArray, DownsampleState]

Downsampled data simply comprise every factor-th sample. This should only be used following appropriate lowpass filtering. If your pipeline does not already have lowpass filtering then consider using the Decimate collection instead.

class Downsample(*args, settings=None, **kwargs)[source]#

Bases: BaseTransformerUnit[DownsampleSettings, AxisArray, AxisArray, DownsampleTransformer]

Parameters:

settings (Settings | None)

SETTINGS#

alias of DownsampleSettings

downsample(axis='time', target_rate=None, factor=None)[source]#
Parameters:
  • axis (str)

  • target_rate (float | None)

  • factor (int | None)

Return type:

DownsampleTransformer

ezmsg.sigproc.ewmfilter#

class EWMSettings(axis: str | None = None, zero_offset: bool = True)[source]#

Bases: Settings

Parameters:
  • axis (str | None)

  • zero_offset (bool)

axis: str | None = None#

Name of the axis to accumulate.

zero_offset: bool = True#

If true, we assume zero DC offset for input data.

__init__(axis=None, zero_offset=True)#
Parameters:
  • axis (str | None)

  • zero_offset (bool)

Return type:

None

class EWMState[source]#

Bases: State

buffer_queue: Queue[AxisArray]#
signal_queue: Queue[AxisArray]#
class EWM(*args, settings=None, **kwargs)[source]#

Bases: Unit

Exponentially Weighted Moving Average Standardization. This is deprecated. Please use ezmsg.sigproc.scaler.AdaptiveStandardScaler instead.

References: https://stackoverflow.com/a/42926270

Parameters:

settings (Settings | None)

SETTINGS#

alias of EWMSettings

STATE#

alias of EWMState

INPUT_SIGNAL = InputStream:unlocated[<class 'ezmsg.util.messages.axisarray.AxisArray'>]()#
INPUT_BUFFER = InputStream:unlocated[<class 'ezmsg.util.messages.axisarray.AxisArray'>]()#
OUTPUT_SIGNAL = OutputStream:unlocated[<class 'ezmsg.util.messages.axisarray.AxisArray'>](self.num_buffers=32, self.force_tcp=False)#
async initialize()[source]#

Runs when the Unit is instantiated. This is called from within the same process this unit will live. This lifecycle hook can be overridden. It can be run as async functions by simply adding the async keyword when overriding.

Return type:

None

async on_signal(message)[source]#
Parameters:

message (AxisArray)

Return type:

None

async on_buffer(message)[source]#
Parameters:

message (AxisArray)

Return type:

None

async sync_output()[source]#
Return type:

AsyncGenerator

class EWMFilterSettings(history_dur: float, axis: str | None = None, zero_offset: bool = True)[source]#

Bases: Settings

Parameters:
history_dur: float#

Previous data to accumulate for standardization.

axis: str | None = None#

Name of the axis to accumulate.

zero_offset: bool = True#

If true, we assume zero DC offset for input data.

__init__(history_dur, axis=None, zero_offset=True)#
Parameters:
Return type:

None

class EWMFilter(*args, settings=None, **kwargs)[source]#

Bases: Collection

A Collection that splits the input into two branches: one that leads to Window, which then feeds into EWM’s INPUT_BUFFER, and another that feeds directly into EWM’s INPUT_SIGNAL.

This is deprecated. Please use ezmsg.sigproc.scaler.AdaptiveStandardScaler instead.

Parameters:

settings (Settings | None)

SETTINGS#

alias of EWMFilterSettings

INPUT_SIGNAL = InputStream:unlocated[<class 'ezmsg.util.messages.axisarray.AxisArray'>]()#
OUTPUT_SIGNAL = OutputStream:unlocated[<class 'ezmsg.util.messages.axisarray.AxisArray'>](self.num_buffers=32, self.force_tcp=False)#
WINDOW = <ezmsg.sigproc.window.Window object>#
EWM = <ezmsg.sigproc.ewmfilter.EWM object>#
configure()[source]#

A lifecycle hook that runs when the Collection is instantiated. This is the best place to call Unit.apply_settings() on each member Unit of the Collection.

Return type:

None

network()[source]#

Override this method and have the definition return a NetworkDefinition which defines how InputStream and OutputStream from member Units will be connected.

Return type:

Iterable[Tuple[Stream | str, Stream | str]]

ezmsg.sigproc.math#

class ClipSettings(a_min: float, a_max: float)[source]#

Bases: Settings

Parameters:
a_min: float#

Lower clip bound.

a_max: float#

Upper clip bound.

__init__(a_min, a_max)#
Parameters:
Return type:

None

class ClipTransformer(*args, settings=None, **kwargs)[source]#

Bases: BaseTransformer[ClipSettings, AxisArray, AxisArray]

Parameters:

settings (SettingsType)

class Clip(*args, settings=None, **kwargs)[source]#

Bases: BaseTransformerUnit[ClipSettings, AxisArray, AxisArray, ClipTransformer]

Parameters:

settings (Settings | None)

SETTINGS#

alias of ClipSettings

clip(a_min, a_max)[source]#

Clips the data to be within the specified range. See np.clip for more details.

Parameters:
  • a_min (float) – Lower clip bound

  • a_max (float) – Upper clip bound

Return type:

ClipTransformer

Returns: ClipTransformer.

class ConstDifferenceSettings(value: float = 0.0, subtrahend: bool = True)[source]#

Bases: Settings

Parameters:
value: float = 0.0#

number to subtract or be subtracted from the input data

subtrahend: bool = True#

If True (default) then value is subtracted from the input data. If False, the input data is subtracted from value.

__init__(value=0.0, subtrahend=True)#
Parameters:
Return type:

None

class ConstDifferenceTransformer(*args, settings=None, **kwargs)[source]#

Bases: BaseTransformer[ConstDifferenceSettings, AxisArray, AxisArray]

Parameters:

settings (SettingsType)

class ConstDifference(*args, settings=None, **kwargs)[source]#

Bases: BaseTransformerUnit[ConstDifferenceSettings, AxisArray, AxisArray, ConstDifferenceTransformer]

Parameters:

settings (Settings | None)

SETTINGS#

alias of ConstDifferenceSettings

const_difference(value=0.0, subtrahend=True)[source]#

result = (in_data - value) if subtrahend else (value - in_data). For terminology, see https://en.wikipedia.org/wiki/Template:Arithmetic_operations

Parameters:
  • value (float) – number to subtract or be subtracted from the input data

  • subtrahend (bool) – If True (default) then value is subtracted from the input data. If False, the input data is subtracted from value.

Return type:

ConstDifferenceTransformer

Returns: ConstDifferenceTransformer.

class InvertTransformer(*args, settings=None, **kwargs)[source]#

Bases: BaseTransformer[None, AxisArray, AxisArray]

Parameters:

settings (SettingsType)

class Invert(*args, settings=None, **kwargs)[source]#

Bases: BaseTransformerUnit[None, AxisArray, AxisArray, InvertTransformer]

Parameters:

settings (Settings | None)

invert()[source]#

Take the inverse of the data.

Returns: InvertTransformer.

Return type:

InvertTransformer

class LogSettings(base: float = 10.0, clip_zero: bool = False)[source]#

Bases: Settings

Parameters:
base: float = 10.0#

The base of the logarithm. Default is 10.

clip_zero: bool = False#

If True, clip the data to the minimum positive value of the data type before taking the log.

__init__(base=10.0, clip_zero=False)#
Parameters:
Return type:

None

class LogTransformer(*args, settings=None, **kwargs)[source]#

Bases: BaseTransformer[LogSettings, AxisArray, AxisArray]

Parameters:

settings (SettingsType)

class Log(*args, settings=None, **kwargs)[source]#

Bases: BaseTransformerUnit[LogSettings, AxisArray, AxisArray, LogTransformer]

Parameters:

settings (Settings | None)

SETTINGS#

alias of LogSettings

log(base=10.0, clip_zero=False)[source]#

Take the logarithm of the data. See np.log for more details.

Parameters:
  • base (float) – The base of the logarithm. Default is 10.

  • clip_zero (bool) – If True, clip the data to the minimum positive value of the data type before taking the log.

Return type:

LogTransformer

Returns: LogTransformer.

class ScaleSettings(scale: float = 1.0)[source]#

Bases: Settings

Parameters:

scale (float)

scale: float = 1.0#

Factor by which to scale the data magnitude.

__init__(scale=1.0)#
Parameters:

scale (float)

Return type:

None

class ScaleTransformer(*args, settings=None, **kwargs)[source]#

Bases: BaseTransformer[ScaleSettings, AxisArray, AxisArray]

Parameters:

settings (SettingsType)

class Scale(*args, settings=None, **kwargs)[source]#

Bases: BaseTransformerUnit[ScaleSettings, AxisArray, AxisArray, ScaleTransformer]

Parameters:

settings (Settings | None)

SETTINGS#

alias of ScaleSettings

scale(scale=1.0)[source]#

Scale the data by a constant factor.

Parameters:

scale (float) – Factor by which to scale the data magnitude.

Return type:

ScaleTransformer

Returns: ScaleTransformer

ezmsg.sigproc.sampler#

class SamplerSettings(buffer_dur, axis=None, period=None, value=None, estimate_alignment=True, buffer_update_strategy='immediate')[source]#

Bases: Settings

Settings for Sampler. See sampler for a description of the fields.

Parameters:
buffer_dur: float#

The duration of the buffer in seconds. The buffer must be long enough to store the oldest sample to be included in a window. e.g., a trigger lagged by 0.5 seconds with a period of (-1.0, +1.5) will need a buffer of 0.5 + (1.5 - -1.0) = 3.0 seconds. It is best to at least double your estimate if memory allows.

axis: str | None = None#

The axis along which to sample the data. None (default) will choose the first axis in the first input. Note: (for now) the axis must exist in the msg .axes and be of type AxisArray.LinearAxis

period: tuple[float, float] | None = None#

Optional default period (in seconds) if unspecified in SampleTriggerMessage.

value: Any = None#

Optional default value if unspecified in SampleTriggerMessage

estimate_alignment: bool = True#
If true, use message timestamp fields and reported sampling rate to estimate sample-accurate alignment for samples.

If false, sampling will be limited to the incoming message rate (“block timing”). NOTE: For faster-than-realtime playback, incoming timestamps must reflect “realtime” operation for estimate_alignment to operate correctly.

buffer_update_strategy: Literal['immediate', 'threshold', 'on_demand'] = 'immediate'#

The buffer update strategy. See ezmsg.sigproc.util.buffer.UpdateStrategy. If you expect to push data much more frequently than triggers, then “on_demand” might be more efficient. For most other scenarios, “immediate” is best.

__init__(buffer_dur, axis=None, period=None, value=None, estimate_alignment=True, buffer_update_strategy='immediate')#
Parameters:
Return type:

None

class SamplerState[source]#

Bases: object

buffer: HybridAxisArrayBuffer | None = None#
triggers: deque[SampleTriggerMessage] | None = None#
class SamplerTransformer(*args, **kwargs)[source]#

Bases: BaseStatefulTransformer[SamplerSettings, AxisArray, AxisArray, SamplerState]

push_trigger(message)[source]#
Parameters:

message (SampleTriggerMessage)

Return type:

list[SampleMessage]

class Sampler(*args, settings=None, **kwargs)[source]#

Bases: BaseTransformerUnit[SamplerSettings, AxisArray, AxisArray, SamplerTransformer]

Parameters:

settings (Settings | None)

SETTINGS#

alias of SamplerSettings

INPUT_TRIGGER = InputStream:unlocated[<class 'ezmsg.sigproc.util.message.SampleTriggerMessage'>]()#
OUTPUT_SIGNAL = OutputStream:unlocated[<class 'ezmsg.sigproc.util.message.SampleMessage'>](self.num_buffers=32, self.force_tcp=False)#
async on_trigger(msg)[source]#
Parameters:

msg (SampleTriggerMessage)

Return type:

None

async on_signal(message)[source]#
Parameters:

message (AxisArray)

Return type:

AsyncGenerator

sampler(buffer_dur, axis=None, period=None, value=None, estimate_alignment=True)[source]#

Sample data into a buffer, accept triggers, and return slices of sampled data around the trigger time.

Returns:

A generator that expects to be sent (via .send) either an AxisArray containing streaming data messages or a SampleTriggerMessage containing a trigger, and yields the list of SampleMessages.

Parameters:
Return type:

SamplerTransformer

class TriggerGeneratorSettings(period: tuple[float, float], prewait: float = 0.5, publish_period: float = 5.0)[source]#

Bases: Settings

Parameters:
period: tuple[float, float]#

The period around the trigger event.

prewait: float = 0.5#

The time before the first trigger (sec)

__init__(period, prewait=0.5, publish_period=5.0)#
Parameters:
Return type:

None

publish_period: float = 5.0#

The period between triggers (sec)

class TriggerGeneratorState[source]#

Bases: object

output: int = 0#
class TriggerProducer(*args, **kwargs)[source]#

Bases: BaseStatefulProducer[TriggerGeneratorSettings, SampleTriggerMessage, TriggerGeneratorState]

class TriggerGenerator(*args, settings=None, **kwargs)[source]#

Bases: BaseProducerUnit[TriggerGeneratorSettings, SampleTriggerMessage, TriggerProducer]

Parameters:

settings (Settings | None)

SETTINGS#

alias of TriggerGeneratorSettings

ezmsg.sigproc.scaler#

scaler(time_constant=1.0, axis=None)[source]#

Apply the adaptive standard scaler from https://riverml.xyz/latest/api/preprocessing/AdaptiveStandardScaler/. This is faster than scaler_np for single-channel data.

Parameters:
  • time_constant (float) – Decay constant tau in seconds.

  • axis (str | None) – The name of the axis to accumulate statistics over.

Returns:

A primed generator object that expects to be sent an AxisArray via .send(axis_array) and yields an AxisArray with its data being a standardized, or “Z-scored”, version of the input data.

Return type:

Generator[AxisArray, AxisArray, None]

class AdaptiveStandardScalerSettings(time_constant: float = 1.0, axis: str | None = None)[source]#

Bases: EWMASettings

Parameters:
  • time_constant (float)

  • axis (str | None)

__init__(time_constant=1.0, axis=None)#
Parameters:
  • time_constant (float)

  • axis (str | None)

Return type:

None

class AdaptiveStandardScalerState[source]#

Bases: object

samps_ewma: EWMATransformer | None = None#
vars_sq_ewma: EWMATransformer | None = None#
alpha: float | None = None#
class AdaptiveStandardScalerTransformer(*args, **kwargs)[source]#

Bases: BaseStatefulTransformer[AdaptiveStandardScalerSettings, AxisArray, AxisArray, AdaptiveStandardScalerState]

class AdaptiveStandardScaler(*args, settings=None, **kwargs)[source]#

Bases: BaseTransformerUnit[AdaptiveStandardScalerSettings, AxisArray, AxisArray, AdaptiveStandardScalerTransformer]

Parameters:

settings (Settings | None)

SETTINGS#

alias of AdaptiveStandardScalerSettings

scaler_np(time_constant=1.0, axis=None)[source]#
Parameters:
  • time_constant (float)

  • axis (str | None)

Return type:

AdaptiveStandardScalerTransformer

ezmsg.sigproc.signalinjector#

class SignalInjectorSettings(time_dim: str = 'time', frequency: float | None = None, amplitude: float = 1.0, mixing_seed: int | None = None)[source]#

Bases: Settings

Parameters:
  • time_dim (str)

  • frequency (float | None)

  • amplitude (float)

  • mixing_seed (int | None)

time_dim: str = 'time'#
frequency: float | None = None#
amplitude: float = 1.0#
mixing_seed: int | None = None#
__init__(time_dim='time', frequency=None, amplitude=1.0, mixing_seed=None)#
Parameters:
  • time_dim (str)

  • frequency (float | None)

  • amplitude (float)

  • mixing_seed (int | None)

Return type:

None

class SignalInjectorState[source]#

Bases: object

cur_shape: tuple[int, ...] | None = None#
cur_frequency: float | None = None#
cur_amplitude: float | None = None#
mixing: ndarray[tuple[Any, ...], dtype[_ScalarT]] | None = None#
class SignalInjectorTransformer(*args, **kwargs)[source]#

Bases: BaseAsyncTransformer[SignalInjectorSettings, AxisArray, AxisArray, SignalInjectorState]

class SignalInjector(*args, settings=None, **kwargs)[source]#

Bases: BaseTransformerUnit[SignalInjectorSettings, AxisArray, AxisArray, SignalInjectorTransformer]

Parameters:

settings (Settings | None)

SETTINGS#

alias of SignalInjectorSettings

INPUT_FREQUENCY = InputStream:unlocated[float | None]()#
INPUT_AMPLITUDE = InputStream:unlocated[<class 'float'>]()#
async on_frequency(msg)[source]#
Parameters:

msg (float | None)

Return type:

None

async on_amplitude(msg)[source]#
Parameters:

msg (float)

Return type:

None

ezmsg.sigproc.slicer#

parse_slice(s, axinfo=None)[source]#

Parses a string representation of a slice and returns a tuple of slice objects.

  • “” -> slice(None, None, None) (take all)

  • “:” -> slice(None, None, None)

  • “none” (case-insensitive) -> slice(None, None, None)

  • “{start}:{stop}” or “{start}:{stop}:{step}” -> slice(start, stop, step)

  • “5” (or any integer) -> (5,). Take only that item.

    Applying this to an ndarray or AxisArray will drop the dimension.

  • A comma-separated list of the above -> a tuple of slices | ints

  • A comma-separated list of values and axinfo is provided and is a CoordinateAxis -> a tuple of ints

Parameters:
  • s (str) – The string representation of the slice.

  • axinfo (CoordinateAxis | None) – (Optional) If provided, and of type CoordinateAxis, and s is a comma-separated list of values, then the values in s will be checked against the values in axinfo.data.

Returns:

A tuple of slice objects and/or ints.

Return type:

tuple[slice | int, …]

class SlicerSettings(selection: str = '', axis: str | None = None)[source]#

Bases: Settings

Parameters:
  • selection (str)

  • axis (str | None)

selection: str = ''#

The selection string. See ezmsg.sigproc.slicer.parse_slice for details.

axis: str | None = None#

The name of the axis to slice along. If None, the last axis is used.

__init__(selection='', axis=None)#
Parameters:
  • selection (str)

  • axis (str | None)

Return type:

None

class SlicerState[source]#

Bases: object

slice_: slice | int | ndarray[tuple[Any, ...], dtype[_ScalarT]] | None = None#
new_axis: AxisBase | None = None#
b_change_dims: bool = False#
class SlicerTransformer(*args, **kwargs)[source]#

Bases: BaseStatefulTransformer[SlicerSettings, AxisArray, AxisArray, SlicerState]

class Slicer(*args, settings=None, **kwargs)[source]#

Bases: BaseTransformerUnit[SlicerSettings, AxisArray, AxisArray, SlicerTransformer]

Parameters:

settings (Settings | None)

SETTINGS#

alias of SlicerSettings

slicer(selection='', axis=None)[source]#

Slice along a particular axis.

Parameters:
Returns:

SlicerTransformer

Return type:

SlicerTransformer

ezmsg.sigproc.spectrum#

class OptionsEnum(new_class_name, /, names, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]#

Bases: Enum

classmethod options()[source]#
class WindowFunction(*values)[source]#

Bases: OptionsEnum

Windowing function prior to calculating spectrum.

NONE = 'None (Rectangular)'#

None.

HAMMING = 'Hamming'#

numpy.hamming

HANNING = 'Hanning'#

numpy.hanning

BARTLETT = 'Bartlett'#

numpy.bartlett

BLACKMAN = 'Blackman'#

numpy.blackman

class SpectralTransform(*values)[source]#

Bases: OptionsEnum

Additional transformation functions to apply to the spectral result.

RAW_COMPLEX = 'Complex FFT Output'#
REAL = 'Real Component of FFT'#
IMAG = 'Imaginary Component of FFT'#
REL_POWER = 'Relative Power'#
REL_DB = 'Log Power (Relative dB)'#
class SpectralOutput(*values)[source]#

Bases: OptionsEnum

The expected spectral contents.

FULL = 'Full Spectrum'#
POSITIVE = 'Positive Frequencies'#
NEGATIVE = 'Negative Frequencies'#
class SpectrumSettings(axis=None, out_axis='freq', window=WindowFunction.HAMMING, transform=SpectralTransform.REL_DB, output=SpectralOutput.POSITIVE, norm='forward', do_fftshift=True, nfft=None)[source]#

Bases: Settings

Settings for Spectrum. See spectrum for a description of the parameters.

Parameters:
axis: str | None = None#

The name of the axis on which to calculate the spectrum. Note: The axis must have an .axes entry of type LinearAxis, not CoordinateAxis.

out_axis: str | None = 'freq'#

The name of the new axis. Defaults to “freq”. If None, the dimension name is not changed.

window: WindowFunction = 'Hamming'#

The WindowFunction to apply to the data slice prior to calculating the spectrum.

transform: SpectralTransform = 'Log Power (Relative dB)'#

The SpectralTransform to apply to the spectral magnitude.

output: SpectralOutput = 'Positive Frequencies'#

The SpectralOutput format.

norm: str | None = 'forward'#

Normalization mode. Default “forward” is best used when the inverse transform is not needed, for example when the goal is to get spectral power. Use “backward” (equivalent to None) to not scale the spectrum which is useful when the spectra will be manipulated and possibly inverse-transformed. See numpy.fft.fft for details.

do_fftshift: bool = True#

Whether to apply fftshift to the output. Default is True. This value is ignored unless output is SpectralOutput.FULL.

nfft: int | None = None#

The number of points to use for the FFT. If None, the length of the input data is used.

__init__(axis=None, out_axis='freq', window=WindowFunction.HAMMING, transform=SpectralTransform.REL_DB, output=SpectralOutput.POSITIVE, norm='forward', do_fftshift=True, nfft=None)#
Parameters:
Return type:

None

class SpectrumState[source]#

Bases: object

f_sl: slice | None = None#
freq_axis: LinearAxis | None = None#
fftfun: Callable | None = None#
f_transform: Callable | None = None#
new_dims: list[str] | None = None#
window: ndarray[tuple[Any, ...], dtype[_ScalarT]] | None = None#
class SpectrumTransformer(*args, **kwargs)[source]#

Bases: BaseStatefulTransformer[SpectrumSettings, AxisArray, AxisArray, SpectrumState]

class Spectrum(*args, settings=None, **kwargs)[source]#

Bases: BaseTransformerUnit[SpectrumSettings, AxisArray, AxisArray, SpectrumTransformer]

Parameters:

settings (Settings | None)

SETTINGS#

alias of SpectrumSettings

spectrum(axis=None, out_axis='freq', window=WindowFunction.HANNING, transform=SpectralTransform.REL_DB, output=SpectralOutput.POSITIVE, norm='forward', do_fftshift=True, nfft=None)[source]#

Calculate a spectrum on a data slice.

Returns:

A SpectrumTransformer object that, when called with an AxisArray of continuous data (via __call__), returns an AxisArray with data of spectral magnitudes or powers.

Parameters:
Return type:

SpectrumTransformer

ezmsg.sigproc.spectrogram#

class SpectrogramSettings(window_dur=None, window_shift=None, window_anchor=Anchor.BEGINNING, window=WindowFunction.HAMMING, transform=SpectralTransform.REL_DB, output=SpectralOutput.POSITIVE)[source]#

Bases: Settings

Settings for SpectrogramTransformer.

Parameters:
window_dur: float | None = None#

window duration in seconds.

window_shift: float | None = None#

Window step in seconds. If None, window_shift == window_dur.

window_anchor: str | Anchor = 'beginning'#

See WindowTransformer.

window: WindowFunction = 'Hamming'#

The WindowFunction to apply to the data slice prior to calculating the spectrum.

transform: SpectralTransform = 'Log Power (Relative dB)'#

The SpectralTransform to apply to the spectral magnitude.

__init__(window_dur=None, window_shift=None, window_anchor=Anchor.BEGINNING, window=WindowFunction.HAMMING, transform=SpectralTransform.REL_DB, output=SpectralOutput.POSITIVE)#
Parameters:
Return type:

None

output: SpectralOutput = 'Positive Frequencies'#

The SpectralOutput format.

class SpectrogramTransformer(*args, **kwargs)[source]#

Bases: CompositeProcessor[SpectrogramSettings, AxisArray, AxisArray]

class Spectrogram(*args, settings=None, **kwargs)[source]#

Bases: BaseTransformerUnit[SpectrogramSettings, AxisArray, AxisArray, SpectrogramTransformer]

Parameters:

settings (Settings | None)

SETTINGS#

alias of SpectrogramSettings

spectrogram(window_dur=None, window_shift=None, window_anchor=Anchor.BEGINNING, window=WindowFunction.HAMMING, transform=SpectralTransform.REL_DB, output=SpectralOutput.POSITIVE)[source]#
Parameters:
Return type:

SpectrogramTransformer

ezmsg.sigproc.synth#

class AddState(queue_a: 'asyncio.Queue[AxisArray]' = <factory>, queue_b: 'asyncio.Queue[AxisArray]' = <factory>)[source]#

Bases: object

Parameters:
  • queue_a (Queue[AxisArray])

  • queue_b (Queue[AxisArray])

queue_a: Queue[AxisArray]#
queue_b: Queue[AxisArray]#
__init__(queue_a=<factory>, queue_b=<factory>)#
Parameters:
  • queue_a (Queue[AxisArray])

  • queue_b (Queue[AxisArray])

Return type:

None

class AddProcessor[source]#

Bases: object

__init__()[source]#
property state: AddState#
push_a(msg)[source]#
Parameters:

msg (AxisArray)

Return type:

None

push_b(msg)[source]#
Parameters:

msg (AxisArray)

Return type:

None

class Add(*args, settings=None, **kwargs)[source]#

Bases: Unit

Add two signals together. Assumes compatible/similar axes/dimensions.

Parameters:

settings (Settings | None)

INPUT_SIGNAL_A = InputStream:unlocated[<class 'ezmsg.util.messages.axisarray.AxisArray'>]()#
INPUT_SIGNAL_B = InputStream:unlocated[<class 'ezmsg.util.messages.axisarray.AxisArray'>]()#
OUTPUT_SIGNAL = OutputStream:unlocated[<class 'ezmsg.util.messages.axisarray.AxisArray'>](self.num_buffers=32, self.force_tcp=False)#
async initialize()[source]#

Runs when the Unit is instantiated. This is called from within the same process this unit will live. This lifecycle hook can be overridden. It can be run as async functions by simply adding the async keyword when overriding.

Return type:

None

async on_a(msg)[source]#
Parameters:

msg (AxisArray)

Return type:

None

async on_b(msg)[source]#
Parameters:

msg (AxisArray)

Return type:

None

async output()[source]#
Return type:

AsyncGenerator

class ClockSettings(dispatch_rate=None)[source]#

Bases: Settings

Settings for clock generator.

Parameters:

dispatch_rate (float | str | None)

dispatch_rate: float | str | None = None#

Dispatch rate in Hz, ‘realtime’, or None for external clock

__init__(dispatch_rate=None)#
Parameters:

dispatch_rate (float | str | None)

Return type:

None

class ClockState[source]#

Bases: object

State for clock generator.

t_0: float#
n_dispatch: int = 0#
class ClockProducer(*args, **kwargs)[source]#

Bases: BaseStatefulProducer[ClockSettings, Flag, ClockState]

Produces clock ticks at specified rate. Can be used to drive periodic operations.

aclock(dispatch_rate)[source]#

Construct an async generator that yields events at a specified rate.

Returns:

A ClockProducer object.

Parameters:

dispatch_rate (float | None)

Return type:

ClockProducer

clock(dispatch_rate)#

Alias for aclock expected by synchronous methods. ClockProducer can be used in sync or async.

Parameters:

dispatch_rate (float | None)

Return type:

ClockProducer

class Clock(*args, settings=None, **kwargs)[source]#

Bases: BaseProducerUnit[ClockSettings, Flag, ClockProducer]

Parameters:

settings (Settings | None)

SETTINGS#

alias of ClockSettings

async produce()[source]#
Return type:

AsyncGenerator

class CounterSettings(n_time, fs, n_ch=1, dispatch_rate=None, mod=None)[source]#

Bases: Settings

Settings for Counter. See acounter for a description of the parameters.

Parameters:
n_time: int#

Number of samples to output per block.

fs: float#

Sampling rate of signal output in Hz

n_ch: int = 1#

Number of channels to synthesize

dispatch_rate: float | str | None = None#

Message dispatch rate (Hz), ‘realtime’, ‘ext_clock’, or None (as fast as possible). Note: if dispatch_rate is a float, then time offsets will be synthetic and the system will run faster or slower than wall-clock time.

mod: int | None = None#

If set to an integer, counter will rollover

__init__(n_time, fs, n_ch=1, dispatch_rate=None, mod=None)#
Parameters:
Return type:

None

class CounterState[source]#

Bases: object

State for counter generator.

counter_start: int = 0#

next sample’s first value

n_sent: int = 0#

number of samples sent

clock_zero: float | None = None#

time of first sample

timer_type: str = 'unspecified'#

“realtime” | “ext_clock” | “manual” | “unspecified”

new_generator: Event | None = None#

Event to signal the counter has been reset.

class CounterProducer(*args, **kwargs)[source]#

Bases: BaseStatefulProducer[CounterSettings, AxisArray, CounterState]

Produces incrementing integer blocks as AxisArray.

classmethod get_message_type(dir)[source]#
Parameters:

dir (str)

Return type:

type[AxisArray] | None

__init__(*args, **kwargs)[source]#
acounter(n_time, fs, n_ch=1, dispatch_rate=None, mod=None)[source]#

Construct an asynchronous generator to generate AxisArray objects at a specified rate and with the specified sampling rate.

NOTE: This module uses asyncio.sleep to delay appropriately in realtime mode. This method of sleeping/yielding execution priority has quirky behavior with sub-millisecond sleep periods which may result in unexpected behavior (e.g. fs = 2000, n_time = 1, realtime = True – may result in ~1400 msgs/sec)

Returns:

An asynchronous generator.

Parameters:
Return type:

CounterProducer

class Counter(*args, settings=None, **kwargs)[source]#

Bases: BaseProducerUnit[CounterSettings, AxisArray, CounterProducer]

Generates monotonically increasing counter. Unit for CounterProducer.

Parameters:

settings (Settings | None)

SETTINGS#

alias of CounterSettings

INPUT_CLOCK = InputStream:unlocated[<class 'ezmsg.core.message.Flag'>]()#
async on_clock(_)[source]#
Parameters:

_ (Flag)

async produce()[source]#

Generate counter output. This is an infinite loop, but we will likely only enter the loop once if we are self-timed, and twice if we are using an external clock.

When using an internal clock, we enter the loop and wait for the event, which should have been reset upon initialization; we then immediately clear it and go to the internal loop, which will async call __acall__ to let the internal timer determine when to produce an output.

When using an external clock, we enter the loop and wait for the event, which should have been reset upon initialization; we then immediately clear it and hit continue to loop back around and wait for the event to be set again – potentially forever. In this case, it is expected that on_clock will be called to produce the output.

Return type:

AsyncGenerator

class SinGeneratorSettings(axis='time', freq=1.0, amp=1.0, phase=0.0)[source]#

Bases: Settings

Settings for SinGenerator. See sin for parameter descriptions.

Parameters:
axis: str | None = 'time'#

The name of the axis over which the sinusoid passes. Note: The axis must exist in the msg.axes and be of type AxisArray.LinearAxis.

freq: float = 1.0#

The frequency of the sinusoid, in Hz.

amp: float = 1.0#

The amplitude of the sinusoid.

phase: float = 0.0#

The initial phase of the sinusoid, in radians.

__init__(axis='time', freq=1.0, amp=1.0, phase=0.0)#
Parameters:
Return type:

None

class SinTransformer(*args, settings=None, **kwargs)[source]#

Bases: BaseTransformer[SinGeneratorSettings, AxisArray, AxisArray]

Transforms counter values into sinusoidal waveforms.

Parameters:

settings (SettingsType)

class SinGenerator(*args, settings=None, **kwargs)[source]#

Bases: BaseTransformerUnit[SinGeneratorSettings, AxisArray, AxisArray, SinTransformer]

Unit for generating sinusoidal waveforms.

Parameters:

settings (Settings | None)

SETTINGS#

alias of SinGeneratorSettings

sin(axis='time', freq=1.0, amp=1.0, phase=0.0)[source]#

Construct a generator of sinusoidal waveforms in AxisArray objects.

Returns:

A primed generator that expects .send(axis_array) of sample counts and yields an AxisArray of sinusoids.

Parameters:
Return type:

SinTransformer

class RandomGeneratorSettings(loc: float = 0.0, scale: float = 1.0)[source]#

Bases: Settings

Parameters:
loc: float = 0.0#

loc argument for numpy.random.normal

scale: float = 1.0#

scale argument for numpy.random.normal

__init__(loc=0.0, scale=1.0)#
Parameters:
Return type:

None

class RandomTransformer(*args, settings=None, **kwargs)[source]#

Bases: BaseTransformer[RandomGeneratorSettings, AxisArray, AxisArray]

Replaces input data with random data and returns the result.

Parameters:

settings (SettingsType)

__init__(*args, settings=None, **kwargs)[source]#
Parameters:

settings (RandomGeneratorSettings | None)

class RandomGenerator(*args, settings=None, **kwargs)[source]#

Bases: BaseTransformerUnit[RandomGeneratorSettings, AxisArray, AxisArray, RandomTransformer]

Parameters:

settings (Settings | None)

SETTINGS#

alias of RandomGeneratorSettings

class OscillatorSettings(n_time, fs, n_ch=1, dispatch_rate=None, freq=1.0, amp=1.0, phase=0.0, sync=False)[source]#

Bases: Settings

Settings for Oscillator

Parameters:
n_time: int#

Number of samples to output per block.

fs: float#

Sampling rate of signal output in Hz

n_ch: int = 1#

Number of channels to output per block

dispatch_rate: float | str | None = None#

(Hz) | ‘realtime’ | ‘ext_clock’

freq: float = 1.0#

Oscillation frequency in Hz

amp: float = 1.0#

Amplitude

phase: float = 0.0#

Phase offset (in radians)

sync: bool = False#

Adjust freq to sync with sampling rate

__init__(n_time, fs, n_ch=1, dispatch_rate=None, freq=1.0, amp=1.0, phase=0.0, sync=False)#
Parameters:
Return type:

None

class OscillatorProducer(*args, **kwargs)[source]#

Bases: CompositeProducer[OscillatorSettings, AxisArray]

class BaseCounterFirstProducerUnit(*args, settings=None, **kwargs)[source]#

Bases: BaseProducerUnit[SettingsType, MessageOutType, ProducerType], Generic[SettingsType, MessageInType, MessageOutType, ProducerType]

Base class for units whose primary processor is a composite producer with a CounterProducer as the first processor (producer) in the chain.

Parameters:

settings (Settings | None)

INPUT_SIGNAL = InputStream:unlocated[~MessageInType]()#
create_producer()[source]#

Create the producer instance from settings.

async on_signal(_)[source]#
Parameters:

_ (Flag)

async produce()[source]#
Return type:

AsyncGenerator

class Oscillator(*args, settings=None, **kwargs)[source]#

Bases: BaseCounterFirstProducerUnit[OscillatorSettings, AxisArray, AxisArray, OscillatorProducer]

Generates sinusoidal waveforms using a counter and sine transformer.

Parameters:

settings (Settings | None)

SETTINGS#

alias of OscillatorSettings

class NoiseSettings(n_time, fs, n_ch=1, dispatch_rate=None, loc=0.0, scale=1.0)[source]#

Bases: Settings

See CounterSettings and RandomGeneratorSettings.

Parameters:
n_time: int#
fs: float#
n_ch: int = 1#
__init__(n_time, fs, n_ch=1, dispatch_rate=None, loc=0.0, scale=1.0)#
Parameters:
Return type:

None

dispatch_rate: float | str | None = None#

(Hz), ‘realtime’, or ‘ext_clock’

loc: float = 0.0#
scale: float = 1.0#
WhiteNoiseSettings#

alias of NoiseSettings

class WhiteNoiseProducer(*args, **kwargs)[source]#

Bases: CompositeProducer[NoiseSettings, AxisArray]

class WhiteNoise(*args, settings=None, **kwargs)[source]#

Bases: BaseCounterFirstProducerUnit[NoiseSettings, AxisArray, AxisArray, WhiteNoiseProducer]

Chains a Counter and RandomGenerator.

Parameters:

settings (Settings | None)

SETTINGS#

alias of NoiseSettings

PinkNoiseSettings#

alias of NoiseSettings

class PinkNoiseProducer(*args, **kwargs)[source]#

Bases: CompositeProducer[NoiseSettings, AxisArray]

class PinkNoise(*args, settings=None, **kwargs)[source]#

Bases: BaseCounterFirstProducerUnit[NoiseSettings, AxisArray, AxisArray, PinkNoiseProducer]

Chains WhiteNoise and ButterworthFilter.

Parameters:

settings (Settings | None)

SETTINGS#

alias of NoiseSettings

class EEGSynthSettings(fs=500.0, n_time=100, alpha_freq=10.5, n_ch=8)[source]#

Bases: Settings

See OscillatorSettings.

Parameters:
__init__(fs=500.0, n_time=100, alpha_freq=10.5, n_ch=8)#
Parameters:
Return type:

None

fs: float = 500.0#
n_time: int = 100#
alpha_freq: float = 10.5#
n_ch: int = 8#
class EEGSynth(*args, settings=None, **kwargs)[source]#

Bases: Collection

A Collection that chains a Clock to both PinkNoise and Oscillator, then combines the results with Add.

Unlike the Oscillator, WhiteNoise, and PinkNoise composite processors which have linear flows, this class has a diamond flow, with clock branching to both PinkNoise and Oscillator, which then are combined in Add.

Optional: Refactor as a ProducerUnit, similar to Clock, but we manually add all the other transformers.

Parameters:

settings (Settings | None)

SETTINGS#

alias of EEGSynthSettings

OUTPUT_SIGNAL = OutputStream:unlocated[<class 'ezmsg.util.messages.axisarray.AxisArray'>](self.num_buffers=32, self.force_tcp=False)#
CLOCK = <ezmsg.sigproc.synth.Clock object>#
NOISE = <ezmsg.sigproc.synth.PinkNoise object>#
OSC = <ezmsg.sigproc.synth.Oscillator object>#
ADD = <ezmsg.sigproc.synth.Add object>#
configure()[source]#

A lifecycle hook that runs when the Collection is instantiated. This is the best place to call Unit.apply_settings() on each member Unit of the Collection.

Return type:

None

network()[source]#

Override this method and have the definition return a NetworkDefinition which defines how InputStream and OutputStream from member Units will be connected.

Return type:

Iterable[Tuple[Stream | str, Stream | str]]

ezmsg.sigproc.window#

class Anchor(*values)[source]#

Bases: Enum

BEGINNING = 'beginning'#
END = 'end'#
MIDDLE = 'middle'#
class WindowSettings(axis: str | None = None, newaxis: str | None = None, window_dur: float | None = None, window_shift: float | None = None, zero_pad_until: str = 'full', anchor: str | ezmsg.sigproc.window.Anchor = <Anchor.BEGINNING: 'beginning'>)[source]#

Bases: Settings

Parameters:
axis: str | None = None#
newaxis: str | None = None#
window_dur: float | None = None#
window_shift: float | None = None#
zero_pad_until: str = 'full'#
anchor: str | Anchor = 'beginning'#
__init__(axis=None, newaxis=None, window_dur=None, window_shift=None, zero_pad_until='full', anchor=Anchor.BEGINNING)#
Parameters:
Return type:

None

class WindowState[source]#

Bases: object

buffer: ndarray[tuple[Any, ...], dtype[_ScalarT]] | SparseArray | None = None#
window_samples: int | None = None#
window_shift_samples: int | None = None#
shift_deficit: int = 0#

Number of incoming samples to ignore. Only relevant when shift > window.

newaxis_warned: bool = False#
out_newaxis: LinearAxis | None = None#
out_dims: list[str] | None = None#
class WindowTransformer(*args, **kwargs)[source]#

Bases: BaseStatefulTransformer[WindowSettings, AxisArray, AxisArray, WindowState]

Apply a sliding window along the specified axis to input streaming data. The windowing method is perhaps the most useful and versatile method in ezmsg.sigproc, but its parameterization can be difficult. Please read the argument descriptions carefully.

__init__(*args, **kwargs)[source]#
Parameters:
  • axis – The axis along which to segment windows. If None, defaults to the first dimension of the first seen AxisArray. Note: The windowed axis must be an AxisArray.LinearAxis, not an AxisArray.CoordinateAxis.

  • newaxis – New axis on which windows are delimited, immediately preceding the target windowed axis. The data length along newaxis may be 0 if this most recent push did not provide enough data for a new window. If window_shift is None then the newaxis length will always be 1.

  • window_dur – The duration of the window in seconds. If None, the function acts as a passthrough and all other parameters are ignored.

  • window_shift – The shift of the window in seconds. If None (default), windowing operates in “1:1 mode”, where each input yields exactly one most-recent window.

  • zero_pad_until

    Determines how the function initializes the buffer. Can be one of “input”, “full” (default), “shift”, or “none”. If window_shift is None then this field is ignored and “input” is always used.

    • “input” initializes the buffer with the input then prepends with zeros to the window size. The first input will always yield at least one output.

    • “shift” fills the buffer until window_shift. No outputs will be yielded until at least window_shift data has been seen.

    • “none” does not pad the buffer. No outputs will be yielded until at least window_dur data has been seen.

  • anchor – Determines the entry in axis that gets assigned 0, which references the value in newaxis. Can be of class Anchor or a string representation of an Anchor.

Return type:

None

class Window(*args, settings=None, **kwargs)[source]#

Bases: BaseTransformerUnit[WindowSettings, AxisArray, AxisArray, WindowTransformer]

Parameters:

settings (Settings | None)

SETTINGS#

alias of WindowSettings

INPUT_SIGNAL = InputStream:unlocated[<class 'ezmsg.util.messages.axisarray.AxisArray'>]()#
OUTPUT_SIGNAL = OutputStream:unlocated[<class 'ezmsg.util.messages.axisarray.AxisArray'>](self.num_buffers=32, self.force_tcp=False)#
async on_signal(message)[source]#

Override superclass on_signal so we can opt to yield once or multiple times after dropping the win axis.

Parameters:

message (AxisArray)

Return type:

AsyncGenerator

windowing(axis=None, newaxis=None, window_dur=None, window_shift=None, zero_pad_until='full', anchor=Anchor.BEGINNING)[source]#
Parameters:
Return type:

WindowTransformer