In-Built Signal Processing Modules#
Here is the API reference for the in-built signal processing modules included in the ezmsg-sigproc extension.
ezmsg.sigproc.activation#
- class ActivationFunction(*values)[source]#
Bases:
OptionsEnum
Activation (transformation) function.
- NONE = 'none'#
None.
- SIGMOID = 'sigmoid'#
scipy.special.expit
- EXPIT = 'expit'#
scipy.special.expit
- LOGIT = 'logit'#
scipy.special.logit
- LOGEXPIT = 'log_expit'#
scipy.special.log_expit
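The enum members above map onto SciPy special functions. As a rough sketch of what each one computes, the NumPy-only stand-ins below reproduce the textbook definitions. The helper names are illustrative and are not part of ezmsg-sigproc; the SciPy implementations handle extreme inputs more carefully.

```python
import numpy as np

def expit(x):
    """Logistic sigmoid 1 / (1 + exp(-x)); SIGMOID and EXPIT both refer to this."""
    return 1.0 / (1.0 + np.exp(-np.asarray(x, dtype=float)))

def logit(p):
    """Inverse of expit: log(p / (1 - p)) for 0 < p < 1."""
    p = np.asarray(p, dtype=float)
    return np.log(p / (1.0 - p))

def log_expit(x):
    """log(expit(x)), written as -log1p(exp(-x)) to limit round-off."""
    return -np.log1p(np.exp(-np.asarray(x, dtype=float)))

x = np.array([-2.0, 0.0, 2.0])
y = expit(x)  # values strictly between 0 and 1
```

Because logit is the inverse of expit, applying one after the other returns the input up to round-off.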
- class ActivationSettings(function: str | ezmsg.sigproc.activation.ActivationFunction = <ActivationFunction.NONE: 'none'>)[source]#
Bases:
Settings
- Parameters:
function (str | ActivationFunction)
- __init__(function=ActivationFunction.NONE)#
- Parameters:
function (str | ActivationFunction)
- Return type:
None
- function: str | ActivationFunction = 'none'#
An enum value from ActivationFunction or a string representing the activation function. Possible values are: SIGMOID, EXPIT, LOGIT, LOGEXPIT, “sigmoid”, “expit”, “logit”, “log_expit”. SIGMOID and EXPIT are equivalent. See scipy.special.expit for more details.
- class ActivationTransformer(*args, settings=None, **kwargs)[source]#
Bases:
BaseTransformer[ActivationSettings,AxisArray,AxisArray]
- Parameters:
settings (SettingsType)
- class Activation(*args, settings=None, **kwargs)[source]#
Bases:
BaseTransformerUnit[ActivationSettings,AxisArray,AxisArray,ActivationTransformer]
- Parameters:
settings (Settings | None)
- SETTINGS#
alias of
ActivationSettings
- activation(function)[source]#
Transform the data with a simple activation function.
- Parameters:
function (str | ActivationFunction) – An enum value from ActivationFunction or a string representing the activation function. Possible values are: SIGMOID, EXPIT, LOGIT, LOGEXPIT, “sigmoid”, “expit”, “logit”, “log_expit”. SIGMOID and EXPIT are equivalent. See scipy.special.expit for more details.
- Returns:
ActivationTransformer
ezmsg.sigproc.affinetransform#
- class AffineTransformSettings(weights, axis=None, right_multiply=True)[source]#
Bases:
Settings
Settings for AffineTransform. See affine_transform for argument details.
- weights: ndarray | str | Path#
An array of weights or a path to a file with weights compatible with np.loadtxt.
- class AffineTransformTransformer(*args, **kwargs)[source]#
Bases:
BaseStatefulTransformer[AffineTransformSettings,AxisArray,AxisArray,AffineTransformState]
- class AffineTransform(*args, settings=None, **kwargs)[source]#
Bases:
BaseTransformerUnit[AffineTransformSettings,AxisArray,AxisArray,AffineTransformTransformer]
- Parameters:
settings (Settings | None)
- SETTINGS#
alias of
AffineTransformSettings
- affine_transform(weights, axis=None, right_multiply=True)[source]#
Perform affine transformations on streaming data.
- Parameters:
weights (ndarray | str | Path) – An array of weights or a path to a file with weights compatible with np.loadtxt.
axis (str | None) – The name of the axis to apply the transformation to. Defaults to the leading (0th) axis in the array.
right_multiply (bool) – Set False to transpose the weights before applying.
- Returns:
- Return type:
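As a rough illustration of what weights and right_multiply could mean for one block of data, the NumPy sketch below applies a weight matrix along the last axis. The helper name apply_weights, the array shapes, and the choice of the last axis are assumptions made for the example; they are not taken from ezmsg-sigproc.

```python
import numpy as np

def apply_weights(data, weights, right_multiply=True):
    """Multiply data of shape (n_samples, n_in) by a weight matrix.

    right_multiply=True  -> data @ weights    (weights shaped n_in x n_out)
    right_multiply=False -> data @ weights.T  (weights shaped n_out x n_in)
    """
    w = np.asarray(weights, dtype=float)
    if not right_multiply:
        w = w.T  # transpose the weights before applying them
    return np.asarray(data, dtype=float) @ w

data = np.arange(6, dtype=float).reshape(3, 2)   # 3 samples, 2 channels
weights = np.array([[1.0, 0.0, 1.0],
                    [0.0, 1.0, -1.0]])           # 2 inputs -> 3 outputs
out = apply_weights(data, weights)               # shape (3, 3)
```

Passing the transposed matrix together with right_multiply=False gives the same result, which is the situation the flag is meant to cover.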
- class CommonRereferenceSettings(mode='mean', axis=None, include_current=True)[source]#
Bases:
Settings
Settings for CommonRereference.
- __init__(mode='mean', axis=None, include_current=True)#
- class CommonRereferenceTransformer(*args, settings=None, **kwargs)[source]#
Bases:
BaseTransformer[CommonRereferenceSettings,AxisArray,AxisArray]
- Parameters:
settings (SettingsType)
- class CommonRereference(*args, settings=None, **kwargs)[source]#
Bases:
BaseTransformerUnit[CommonRereferenceSettings,AxisArray,AxisArray,CommonRereferenceTransformer]
- Parameters:
settings (Settings | None)
- SETTINGS#
alias of
CommonRereferenceSettings
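The CommonRereference settings are not described further on this page. Assuming that mode='mean' subtracts the mean across channels, and that include_current=False leaves each channel out of its own reference, a NumPy sketch might look like the following. The helper name and both interpretations are assumptions, not documented behaviour.

```python
import numpy as np

def common_rereference_sketch(data, axis=-1, include_current=True):
    """Subtract a mean reference computed along `axis`."""
    data = np.asarray(data, dtype=float)
    n = data.shape[axis]
    total = data.sum(axis=axis, keepdims=True)
    if include_current:
        ref = total / n                   # plain mean over all channels
    else:
        ref = (total - data) / (n - 1)    # mean over the *other* channels
    return data - ref

x = np.array([[1.0, 2.0, 6.0]])           # one sample, three channels
with_current = common_rereference_sketch(x)
without_current = common_rereference_sketch(x, include_current=False)
```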
ezmsg.sigproc.aggregate#
- class AggregationFunction(*values)[source]#
Bases:
OptionsEnum
Enum for aggregation functions available to be used in the ranged_aggregate operation.
- NONE = 'None (all)'#
- MAX = 'max'#
- MIN = 'min'#
- MEAN = 'mean'#
- MEDIAN = 'median'#
- STD = 'std'#
- SUM = 'sum'#
- NANMAX = 'nanmax'#
- NANMIN = 'nanmin'#
- NANMEAN = 'nanmean'#
- NANMEDIAN = 'nanmedian'#
- NANSTD = 'nanstd'#
- NANSUM = 'nansum'#
- ARGMIN = 'argmin'#
- ARGMAX = 'argmax'#
- TRAPEZOID = 'trapezoid'#
- class RangedAggregateSettings(axis=None, bands=None, operation=AggregationFunction.MEAN)[source]#
Bases:
Settings
Settings for RangedAggregate.
- Parameters:
- bands: list[tuple[float, float]] | None = None#
[(band1_min, band1_max), (band2_min, band2_max), …] If not set then this acts as a passthrough node.
- operation: AggregationFunction = 'mean'#
AggregationFunction to apply to each band.
- class RangedAggregateTransformer(*args, **kwargs)[source]#
Bases:
BaseStatefulTransformer[RangedAggregateSettings,AxisArray,AxisArray,RangedAggregateState]
- class RangedAggregate(*args, settings=None, **kwargs)[source]#
Bases:
BaseTransformerUnit[RangedAggregateSettings,AxisArray,AxisArray,RangedAggregateTransformer]
- Parameters:
settings (Settings | None)
- SETTINGS#
alias of
RangedAggregateSettings
- ranged_aggregate(axis=None, bands=None, operation=AggregationFunction.MEAN)[source]#
Apply an aggregation operation over one or more bands.
- Parameters:
axis (str | None) – The name of the axis along which to apply the bands.
bands (list[tuple[float, float]] | None) – [(band1_min, band1_max), (band2_min, band2_max), …] If not set then this acts as a passthrough node.
operation (AggregationFunction) – AggregationFunction to apply to each band.
- Returns:
- Return type:
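To make the band semantics concrete, here is a minimal NumPy sketch of aggregating over bands along one axis. The function name, the use of the last axis, and the treatment of both band edges as inclusive are assumptions for illustration only.

```python
import numpy as np

def ranged_aggregate_sketch(data, coords, bands, func=np.mean):
    """Aggregate `data` over each (lo, hi) band of its last axis.

    data   : array of shape (..., n_coords)
    coords : 1-D array of axis values (for example frequencies in Hz)
    bands  : list of (lo, hi) tuples; both edges are inclusive in this sketch
    """
    data = np.asarray(data, dtype=float)
    coords = np.asarray(coords, dtype=float)
    if not bands:
        return data  # no bands: pass the data through unchanged
    out = [func(data[..., (coords >= lo) & (coords <= hi)], axis=-1)
           for lo, hi in bands]
    return np.stack(out, axis=-1)

freqs = np.arange(0.0, 50.0, 10.0)               # 0, 10, 20, 30, 40 Hz
power = np.array([[1.0, 2.0, 3.0, 4.0, 5.0]])    # one channel
band_means = ranged_aggregate_sketch(power, freqs, [(0, 10), (20, 40)])
```

Swapping func for np.max, np.nansum and so on mirrors the other AggregationFunction members.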
ezmsg.sigproc.bandpower#
- class BandPowerSettings(spectrogram_settings=<factory>, bands=<factory>, aggregation=AggregationFunction.MEAN)[source]#
Bases:
Settings
Settings for BandPower.
- Parameters:
spectrogram_settings (SpectrogramSettings)
aggregation (AggregationFunction)
- spectrogram_settings: SpectrogramSettings#
Settings for spectrogram calculation.
- aggregation: AggregationFunction = 'mean'#
AggregationFunction to apply to each band.
- __init__(spectrogram_settings=<factory>, bands=<factory>, aggregation=AggregationFunction.MEAN)#
- Parameters:
spectrogram_settings (SpectrogramSettings)
aggregation (AggregationFunction)
- Return type:
None
- class BandPowerTransformer(*args, **kwargs)[source]#
Bases:
CompositeProcessor[BandPowerSettings,AxisArray,AxisArray]
- class BandPower(*args, settings=None, **kwargs)[source]#
Bases:
BaseTransformerUnit[BandPowerSettings,AxisArray,AxisArray,BandPowerTransformer]
- Parameters:
settings (Settings | None)
- SETTINGS#
alias of
BandPowerSettings
- bandpower(spectrogram_settings, bands=[(17, 30), (70, 170)], aggregation=AggregationFunction.MEAN)[source]#
Calculate the average spectral power in each band.
- Returns:
- Parameters:
spectrogram_settings (SpectrogramSettings)
aggregation (AggregationFunction)
- Return type:
ezmsg.sigproc.filter#
- class FilterCoefficients(b: numpy.ndarray = <factory>, a: numpy.ndarray = <factory>)[source]#
Bases:
object
- class FilterSettings(axis: str | None = None, coef_type: str = 'ba', coefs: FilterCoefficients | None = None)[source]#
Bases:
FilterBaseSettings
- Parameters:
axis (str | None)
coef_type (str)
coefs (FilterCoefficients | None)
- coefs: FilterCoefficients | None = None#
The pre-calculated filter coefficients.
- __init__(axis=None, coef_type='ba', coefs=None)#
- Parameters:
axis (str | None)
coef_type (str)
coefs (FilterCoefficients | None)
- Return type:
None
- class FilterTransformer(*args, **kwargs)[source]#
Bases:
BaseStatefulTransformer[FilterSettings,AxisArray,AxisArray,FilterState]
Filter data using the provided coefficients.
- class Filter(*args, settings=None, **kwargs)[source]#
Bases:
BaseTransformerUnit[FilterSettings,AxisArray,AxisArray,FilterTransformer]
- Parameters:
settings (Settings | None)
- SETTINGS#
alias of
FilterSettings
- class FilterByDesignTransformer(*args, **kwargs)[source]#
Bases:
BaseStatefulTransformer[SettingsType,AxisArray,AxisArray,FilterByDesignState],ABC,Generic[SettingsType,FilterCoefsType]
Abstract base class for filter design transformers.
- class BaseFilterByDesignTransformerUnit(*args, settings=None, **kwargs)[source]#
Bases:
BaseTransformerUnit[SettingsType,AxisArray,AxisArray,FilterByDesignTransformer],Generic[SettingsType,TransformerType]
- Parameters:
settings (Settings | None)
- async on_settings(msg)[source]#
Receive a settings message, override self.SETTINGS, and re-create the processor. Child classes that wish to have fine-grained control over whether the core processor resets on settings changes should override this method.
- Parameters:
msg (SettingsType) – a settings message.
- Return type:
None
ezmsg.sigproc.butterworthfilter#
- class ButterworthFilterSettings(axis=None, coef_type='ba', order=0, cuton=None, cutoff=None, wn_hz=True)[source]#
Bases:
FilterBaseSettings
Settings for ButterworthFilter.
- Parameters:
- cuton: float | None = None#
Cuton frequency (Hz). If cutoff is not specified then this is the highpass corner. Otherwise, if this is lower than cutoff then this is the beginning of the bandpass or if this is greater than cutoff then this is the end of the bandstop.
- cutoff: float | None = None#
Cutoff frequency (Hz). If cuton is not specified then this is the lowpass corner. Otherwise, if this is greater than cuton then this is the end of the bandpass, or if this is less than cuton then this is the beginning of the bandstop.
- wn_hz: bool = True#
Set False if provided Wn are normalized from 0 to 1, where 1 is the Nyquist frequency
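The cuton and cutoff rules above can be summarised as a small decision function. This is only a sketch of the described logic; the helper name butter_band_spec is hypothetical, and the band-type strings are the ones accepted by scipy.signal.butter.

```python
def butter_band_spec(cuton=None, cutoff=None):
    """Return (btype, corners) following the cuton/cutoff rules described above."""
    if cuton is None and cutoff is None:
        return None                          # nothing specified
    if cutoff is None:
        return ("highpass", cuton)           # only cuton given
    if cuton is None:
        return ("lowpass", cutoff)           # only cutoff given
    if cuton <= cutoff:
        # Ties are not covered by the description; treated as bandpass here.
        return ("bandpass", (cuton, cutoff))
    return ("bandstop", (cutoff, cuton))     # cuton above cutoff

notch = butter_band_spec(cuton=62.0, cutoff=58.0)   # bandstop from 58 to 62 Hz
```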
- butter_design_fun(fs, order=0, cuton=None, cutoff=None, coef_type='ba', wn_hz=True)[source]#
See ButterworthFilterSettings.filter_specs for an explanation of specifying different filter types (lowpass, highpass, bandpass, bandstop) from the parameters. You are likely to want to use this function with filter_by_design, which passes only fs to the design function (this one), so you should wrap this function in a lambda or prepare it with functools.partial.
- Parameters:
fs (float) – The sampling frequency of the data in Hz.
order (int) – Filter order.
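cuton (float | None) – Cuton frequency in Hz: the highpass corner, or the lower edge of a bandpass (upper edge of a bandstop) when cutoff is also given.
cutoff (float | None) – Cutoff frequency in Hz: the lowpass corner, or the upper edge of a bandpass (lower edge of a bandstop) when cuton is also given.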
coef_type (str) – “ba”, “sos”, or “zpk”
wn_hz (bool) – Set False if provided Wn are normalized from 0 to 1, where 1 is the Nyquist frequency
- Returns:
The filter coefficients as a tuple of (b, a) for coef_type “ba”, or as a single ndarray for “sos”, or (z, p, k) for “zpk”.
- Return type:
tuple[ndarray[tuple[Any, …], dtype[_ScalarT]], ndarray[tuple[Any, …], dtype[_ScalarT]]] | ndarray[tuple[Any, …], dtype[_ScalarT]] | None
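The advice about functools.partial can be sketched without importing the library. Below, toy_design_fun is a hypothetical stand-in for a design function (it is not butter_design_fun and designs no filter); it only shows how every argument except fs is bound up front, and how a corner in Hz relates to a normalized Wn when wn_hz is True.

```python
from functools import partial

def toy_design_fun(fs, order=0, cutoff=None, wn_hz=True):
    """Hypothetical stand-in for a design function; returns (order, normalized Wn)."""
    # With wn_hz=True the corner is in Hz; dividing by Nyquist (fs / 2) maps it to 0..1.
    wn = cutoff / (fs / 2.0) if wn_hz else cutoff
    return order, wn

# Bind everything except fs, so a caller only has to supply the sampling rate.
design_lowpass = partial(toy_design_fun, order=4, cutoff=30.0)
spec = design_lowpass(500.0)   # later called with fs only
```

A lambda such as `lambda fs: toy_design_fun(fs, order=4, cutoff=30.0)` would serve the same purpose.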
- class ButterworthFilterTransformer(*args, **kwargs)[source]#
Bases:
FilterByDesignTransformer[ButterworthFilterSettings,tuple[ndarray[tuple[Any, …],dtype[_ScalarT]],ndarray[tuple[Any, …],dtype[_ScalarT]]] |ndarray[tuple[Any, …],dtype[_ScalarT]]]
- class ButterworthFilter(*args, settings=None, **kwargs)[source]#
Bases:
BaseFilterByDesignTransformerUnit[ButterworthFilterSettings,ButterworthFilterTransformer]
- Parameters:
settings (Settings | None)
- SETTINGS#
alias of
ButterworthFilterSettings
- butter(axis, order=0, cuton=None, cutoff=None, coef_type='ba', wn_hz=True)[source]#
Convenience generator wrapping filter_gen_by_design for Butterworth filters. Applies a Butterworth filter to streaming data. Uses scipy.signal.butter to design the filter. See ButterworthFilterSettings.filter_specs for an explanation of specifying different filter types (lowpass, highpass, bandpass, bandstop) from the parameters.
- Returns:
- Parameters:
- Return type:
ezmsg.sigproc.decimate#
- class ChebyForDecimateTransformer(*args, **kwargs)[source]#
Bases:
ChebyshevFilterTransformer[tuple[ndarray[tuple[Any, …],dtype[_ScalarT]],ndarray[tuple[Any, …],dtype[_ScalarT]]] |ndarray[tuple[Any, …],dtype[_ScalarT]]]
A ChebyshevFilterTransformer with a design filter method that additionally accepts a target sampling rate; if the target rate cannot be achieved it returns None, otherwise it returns the filter coefficients.
- class ChebyForDecimate(*args, settings=None, **kwargs)[source]#
Bases:
BaseTransformerUnit[ChebyshevFilterSettings,AxisArray,AxisArray,ChebyForDecimateTransformer]
- Parameters:
settings (Settings | None)
- SETTINGS#
alias of
ChebyshevFilterSettings
- class Decimate(*args, settings=None, **kwargs)[source]#
Bases:
Collection
A Collection chaining a Filter node configured as a lowpass Chebyshev filter and a Downsample node.
- Parameters:
settings (Settings | None)
- SETTINGS#
alias of
DownsampleSettings
- INPUT_SIGNAL = InputStream:unlocated[<class 'ezmsg.util.messages.axisarray.AxisArray'>]()#
- OUTPUT_SIGNAL = OutputStream:unlocated[<class 'ezmsg.util.messages.axisarray.AxisArray'>](self.num_buffers=32, self.force_tcp=False)#
- FILTER = <ezmsg.sigproc.decimate.ChebyForDecimate object>#
- DOWNSAMPLE = <ezmsg.sigproc.downsample.Downsample object>#
ezmsg.sigproc.downsample#
- class DownsampleSettings(axis='time', target_rate=None, factor=None)[source]#
Bases:
Settings
Settings for the Downsample node.
- target_rate: float | None = None#
Desired rate after downsampling. The actual rate will be the input rate divided by an integer factor, chosen so that the result is the closest achievable rate at or above the target rate.
- class DownsampleTransformer(*args, **kwargs)[source]#
Bases:
BaseStatefulTransformer[DownsampleSettings,AxisArray,AxisArray,DownsampleState]
Downsampled data simply comprise every factor-th sample. This should only be used following appropriate lowpass filtering. If your pipeline does not already have lowpass filtering then consider using the Decimate collection instead.
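A small worked example of the target_rate rule and of taking every factor-th sample. The helper name downsample_factor is hypothetical; the sketch reads the rule as "the largest integer factor whose output rate is still at or above the target".

```python
def downsample_factor(fs, target_rate):
    """Largest integer factor whose output rate (fs / factor) is >= target_rate."""
    return max(1, int(fs // target_rate))

fs = 1000.0
factor = downsample_factor(fs, target_rate=300.0)   # 3
actual_rate = fs / factor                            # about 333.3 Hz, not 300 Hz
samples = list(range(10))
kept = samples[::factor]                             # every factor-th sample
```

No filtering happens in this sketch, which is why the text recommends lowpass filtering first.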
- class Downsample(*args, settings=None, **kwargs)[source]#
Bases:
BaseTransformerUnit[DownsampleSettings,AxisArray,AxisArray,DownsampleTransformer]
- Parameters:
settings (Settings | None)
- SETTINGS#
alias of
DownsampleSettings
ezmsg.sigproc.ewmfilter#
- class EWMState[source]#
Bases:
State
- buffer_queue: Queue[AxisArray]#
- signal_queue: Queue[AxisArray]#
- class EWM(*args, settings=None, **kwargs)[source]#
Bases:
Unit
Exponentially Weighted Moving Average Standardization. This is deprecated. Please use ezmsg.sigproc.scaler.AdaptiveStandardScaler instead.
References: https://stackoverflow.com/a/42926270
- Parameters:
settings (Settings | None)
- SETTINGS#
alias of
EWMSettings
- INPUT_SIGNAL = InputStream:unlocated[<class 'ezmsg.util.messages.axisarray.AxisArray'>]()#
- INPUT_BUFFER = InputStream:unlocated[<class 'ezmsg.util.messages.axisarray.AxisArray'>]()#
- OUTPUT_SIGNAL = OutputStream:unlocated[<class 'ezmsg.util.messages.axisarray.AxisArray'>](self.num_buffers=32, self.force_tcp=False)#
- async initialize()[source]#
Runs when the Unit is instantiated. This is called from within the same process this unit will live in. This lifecycle hook can be overridden. It can be run as an async function by simply adding the async keyword when overriding.
- Return type:
None
- class EWMFilterSettings(history_dur: float, axis: str | None = None, zero_offset: bool = True)[source]#
Bases:
Settings
- class EWMFilter(*args, settings=None, **kwargs)[source]#
Bases:
Collection
A Collection that splits the input into a branch that leads to Window, which then feeds into EWM’s INPUT_BUFFER, and another branch that feeds directly into EWM’s INPUT_SIGNAL. This is deprecated. Please use ezmsg.sigproc.scaler.AdaptiveStandardScaler instead.
- Parameters:
settings (Settings | None)
- SETTINGS#
alias of
EWMFilterSettings
- INPUT_SIGNAL = InputStream:unlocated[<class 'ezmsg.util.messages.axisarray.AxisArray'>]()#
- OUTPUT_SIGNAL = OutputStream:unlocated[<class 'ezmsg.util.messages.axisarray.AxisArray'>](self.num_buffers=32, self.force_tcp=False)#
- WINDOW = <ezmsg.sigproc.window.Window object>#
- EWM = <ezmsg.sigproc.ewmfilter.EWM object>#
ezmsg.sigproc.math#
- class ClipTransformer(*args, settings=None, **kwargs)[source]#
Bases:
BaseTransformer[ClipSettings,AxisArray,AxisArray]
- Parameters:
settings (SettingsType)
- class Clip(*args, settings=None, **kwargs)[source]#
Bases:
BaseTransformerUnit[ClipSettings,AxisArray,AxisArray,ClipTransformer]
- Parameters:
settings (Settings | None)
- SETTINGS#
alias of
ClipSettings
- clip(a_min, a_max)[source]#
Clips the data to be within the specified range. See np.clip for more details.
- Parameters:
- Returns:
ClipTransformer.
- class ConstDifferenceSettings(value: float = 0.0, subtrahend: bool = True)[source]#
Bases:
Settings
- class ConstDifferenceTransformer(*args, settings=None, **kwargs)[source]#
Bases:
BaseTransformer[ConstDifferenceSettings,AxisArray,AxisArray]
- Parameters:
settings (SettingsType)
- class ConstDifference(*args, settings=None, **kwargs)[source]#
Bases:
BaseTransformerUnit[ConstDifferenceSettings,AxisArray,AxisArray,ConstDifferenceTransformer]
- Parameters:
settings (Settings | None)
- SETTINGS#
alias of
ConstDifferenceSettings
- const_difference(value=0.0, subtrahend=True)[source]#
result = (in_data - value) if subtrahend else (value - in_data)
See https://en.wikipedia.org/wiki/Template:Arithmetic_operations for the terminology.
- Parameters:
- Returns:
ConstDifferenceTransformer.
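The subtrahend flag decides which operand the constant is. The expression from the description can be tried directly on an array; the wrapper name below is illustrative and is not part of the library.

```python
import numpy as np

def const_difference_sketch(in_data, value=0.0, subtrahend=True):
    """Same expression as in the description, applied to a NumPy array."""
    in_data = np.asarray(in_data, dtype=float)
    return (in_data - value) if subtrahend else (value - in_data)

x = np.array([1.0, 2.0, 3.0])
a = const_difference_sketch(x, value=1.0)                     # x - 1
b = const_difference_sketch(x, value=1.0, subtrahend=False)   # 1 - x
```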
- class InvertTransformer(*args, settings=None, **kwargs)[source]#
Bases:
BaseTransformer[None,AxisArray,AxisArray]
- Parameters:
settings (SettingsType)
- class Invert(*args, settings=None, **kwargs)[source]#
Bases:
BaseTransformerUnit[None,AxisArray,AxisArray,InvertTransformer]
- Parameters:
settings (Settings | None)
- invert()[source]#
Take the inverse of the data.
- Returns:
InvertTransformer.
- class LogSettings(base: float = 10.0, clip_zero: bool = False)[source]#
Bases:
Settings
- class LogTransformer(*args, settings=None, **kwargs)[source]#
Bases:
BaseTransformer[LogSettings,AxisArray,AxisArray]
- Parameters:
settings (SettingsType)
- class Log(*args, settings=None, **kwargs)[source]#
Bases:
BaseTransformerUnit[LogSettings,AxisArray,AxisArray,LogTransformer]
- Parameters:
settings (Settings | None)
- SETTINGS#
alias of
LogSettings
- log(base=10.0, clip_zero=False)[source]#
Take the logarithm of the data. See np.log for more details.
- Parameters:
- Returns:
LogTransformer.
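The base and clip_zero parameters are not explained on this page. A plausible reading, sketched below, is a change-of-base logarithm with non-positive inputs optionally raised to the smallest positive float before the log is taken. Both the helper name and the clipping behaviour are assumptions.

```python
import numpy as np

def log_sketch(data, base=10.0, clip_zero=False):
    data = np.asarray(data, dtype=float)
    if clip_zero:
        # Assumption: values <= 0 are raised to the smallest positive normal float.
        data = np.clip(data, np.finfo(data.dtype).tiny, None)
    return np.log(data) / np.log(base)   # change of base: log_b(x) = ln(x) / ln(b)

vals = log_sketch([1.0, 10.0, 100.0])            # roughly 0, 1, 2
safe = log_sketch([0.0, 1.0], clip_zero=True)    # finite instead of -inf
```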
- class ScaleTransformer(*args, settings=None, **kwargs)[source]#
Bases:
BaseTransformer[ScaleSettings,AxisArray,AxisArray]
- Parameters:
settings (SettingsType)
- class Scale(*args, settings=None, **kwargs)[source]#
Bases:
BaseTransformerUnit[ScaleSettings,AxisArray,AxisArray,ScaleTransformer]
- Parameters:
settings (Settings | None)
- SETTINGS#
alias of
ScaleSettings
- scale(scale=1.0)[source]#
Scale the data by a constant factor.
- Parameters:
scale (float) – Factor by which to scale the data magnitude.
- Returns:
ScaleTransformer
ezmsg.sigproc.sampler#
- class SamplerSettings(buffer_dur, axis=None, period=None, value=None, estimate_alignment=True, buffer_update_strategy='immediate')[source]#
Bases:
Settings
Settings for Sampler. See sampler for a description of the fields.
- Parameters:
- buffer_dur: float#
The duration of the buffer in seconds. The buffer must be long enough to store the oldest sample to be included in a window. e.g., a trigger lagged by 0.5 seconds with a period of (-1.0, +1.5) will need a buffer of 0.5 + (1.5 - -1.0) = 3.0 seconds. It is best to at least double your estimate if memory allows.
- axis: str | None = None#
The axis along which to sample the data. None (default) will choose the first axis in the first input. Note: (for now) the axis must exist in the msg .axes and be of type AxisArray.LinearAxis
- period: tuple[float, float] | None = None#
Optional default period (in seconds) if unspecified in SampleTriggerMessage.
- estimate_alignment: bool = True#
If true, use message timestamp fields and reported sampling rate to estimate sample-accurate alignment for samples. If false, sampling will be limited to the incoming message rate (“block timing”). NOTE: For faster-than-realtime playback, incoming timestamps must reflect “realtime” operation for estimate_alignment to operate correctly.
- buffer_update_strategy: Literal['immediate', 'threshold', 'on_demand'] = 'immediate'#
The buffer update strategy. See ezmsg.sigproc.util.buffer.UpdateStrategy. If you expect to push data much more frequently than triggers, then “on_demand” might be more efficient. For most other scenarios, “immediate” is best.
- __init__(buffer_dur, axis=None, period=None, value=None, estimate_alignment=True, buffer_update_strategy='immediate')#
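The buffer_dur guidance above is simple arithmetic: trigger lag plus window length, then doubled for safety. A short sketch, with a hypothetical helper name:

```python
def min_buffer_dur(trigger_lag, period):
    """Lag plus window length: how far back the oldest needed sample can be."""
    start, stop = period
    return trigger_lag + (stop - start)

needed = min_buffer_dur(0.5, (-1.0, 1.5))   # 0.5 + (1.5 - -1.0) seconds
suggested = 2 * needed                       # doubled, as the description advises
```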
- class SamplerState[source]#
Bases:
object
- buffer: HybridAxisArrayBuffer | None = None#
- triggers: deque[SampleTriggerMessage] | None = None#
- class SamplerTransformer(*args, **kwargs)[source]#
Bases:
BaseStatefulTransformer[SamplerSettings,AxisArray,AxisArray,SamplerState]
- push_trigger(message)[source]#
- Parameters:
message (SampleTriggerMessage)
- Return type:
- class Sampler(*args, settings=None, **kwargs)[source]#
Bases:
BaseTransformerUnit[SamplerSettings,AxisArray,AxisArray,SamplerTransformer]
- Parameters:
settings (Settings | None)
- SETTINGS#
alias of
SamplerSettings
- INPUT_TRIGGER = InputStream:unlocated[<class 'ezmsg.sigproc.util.message.SampleTriggerMessage'>]()#
- OUTPUT_SIGNAL = OutputStream:unlocated[<class 'ezmsg.sigproc.util.message.SampleMessage'>](self.num_buffers=32, self.force_tcp=False)#
- async on_trigger(msg)[source]#
- Parameters:
msg (SampleTriggerMessage)
- Return type:
None
- sampler(buffer_dur, axis=None, period=None, value=None, estimate_alignment=True)[source]#
Sample data into a buffer, accept triggers, and return slices of sampled data around the trigger time.
- Returns:
A generator that expects to be sent (via .send) either an AxisArray containing streaming data messages or a SampleTriggerMessage containing a trigger, and yields the list of SampleMessages.
- Parameters:
- Return type:
- class TriggerGeneratorSettings(period: tuple[float, float], prewait: float = 0.5, publish_period: float = 5.0)[source]#
Bases:
Settings
- __init__(period, prewait=0.5, publish_period=5.0)#
- class TriggerProducer(*args, **kwargs)[source]#
Bases:
BaseStatefulProducer[TriggerGeneratorSettings,SampleTriggerMessage,TriggerGeneratorState]
- class TriggerGenerator(*args, settings=None, **kwargs)[source]#
Bases:
BaseProducerUnit[TriggerGeneratorSettings,SampleTriggerMessage,TriggerProducer]
- Parameters:
settings (Settings | None)
- SETTINGS#
alias of
TriggerGeneratorSettings
ezmsg.sigproc.scaler#
- scaler(time_constant=1.0, axis=None)[source]#
Apply the adaptive standard scaler from https://riverml.xyz/latest/api/preprocessing/AdaptiveStandardScaler/ This is faster than scaler_np for single-channel data.
- Parameters:
- Returns:
A primed generator object that expects to be sent an AxisArray via .send(axis_array) and yields an AxisArray with its data being a standardized, or “Z-scored”, version of the input data.
- Return type:
Generator[AxisArray, AxisArray, None]
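To give a feel for adaptive standardization, here is a pure-Python sketch that z-scores a single channel using exponentially weighted estimates of the mean and of the mean square. The function name, the mapping from time_constant to a smoothing factor, and the handling of zero variance are all assumptions; the library's exact update rule may differ.

```python
import math

def adaptive_standardize(samples, fs, time_constant=1.0):
    """Z-score a 1-D sequence with exponentially weighted mean and variance."""
    # Assumption: smoothing factor derived from a time constant given in seconds.
    alpha = 1.0 - math.exp(-1.0 / (fs * time_constant))
    mean = None
    mean_sq = None
    out = []
    for x in samples:
        if mean is None:
            mean, mean_sq = x, x * x              # initialise on the first sample
        else:
            mean += alpha * (x - mean)            # EWMA of the samples
            mean_sq += alpha * (x * x - mean_sq)  # EWMA of the squared samples
        var = max(mean_sq - mean * mean, 0.0)
        out.append((x - mean) / math.sqrt(var) if var > 0 else 0.0)
    return out

z = adaptive_standardize([1.0, 1.0, 1.0, 5.0], fs=1.0, time_constant=1.0)
```

A constant input yields zeros because the running variance is zero; the jump to 5.0 then shows up as a positive z-score.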
- class AdaptiveStandardScalerSettings(time_constant: float = 1.0, axis: str | None = None)[source]#
Bases:
EWMASettings
- class AdaptiveStandardScalerState[source]#
Bases:
object
- samps_ewma: EWMATransformer | None = None#
- vars_sq_ewma: EWMATransformer | None = None#
- class AdaptiveStandardScalerTransformer(*args, **kwargs)[source]#
Bases:
BaseStatefulTransformer[AdaptiveStandardScalerSettings,AxisArray,AxisArray,AdaptiveStandardScalerState]
- class AdaptiveStandardScaler(*args, settings=None, **kwargs)[source]#
Bases:
BaseTransformerUnit[AdaptiveStandardScalerSettings,AxisArray,AxisArray,AdaptiveStandardScalerTransformer]
- Parameters:
settings (Settings | None)
- SETTINGS#
alias of
AdaptiveStandardScalerSettings
ezmsg.sigproc.signalinjector#
- class SignalInjectorSettings(time_dim: str = 'time', frequency: float | None = None, amplitude: float = 1.0, mixing_seed: int | None = None)[source]#
Bases:
Settings
- class SignalInjectorTransformer(*args, **kwargs)[source]#
Bases:
BaseAsyncTransformer[SignalInjectorSettings,AxisArray,AxisArray,SignalInjectorState]
- class SignalInjector(*args, settings=None, **kwargs)[source]#
Bases:
BaseTransformerUnit[SignalInjectorSettings,AxisArray,AxisArray,SignalInjectorTransformer]
- Parameters:
settings (Settings | None)
- SETTINGS#
alias of
SignalInjectorSettings
- INPUT_FREQUENCY = InputStream:unlocated[float | None]()#
- INPUT_AMPLITUDE = InputStream:unlocated[<class 'float'>]()#
ezmsg.sigproc.slicer#
- parse_slice(s, axinfo=None)[source]#
Parses a string representation of a slice and returns a tuple of slice objects.
- “” -> slice(None, None, None) (take all)
- “:” -> slice(None, None, None)
- “none” (case-insensitive) -> slice(None, None, None)
- “{start}:{stop}” or “{start}:{stop}:{step}” -> slice(start, stop, step)
- “5” (or any integer) -> (5,). Take only that item. Applying this to an ndarray or AxisArray will drop the dimension.
- A comma-separated list of the above -> a tuple of slices | ints
- A comma-separated list of values, when axinfo is provided and is a CoordinateAxis -> a tuple of ints
- Parameters:
s (str) – The string representation of the slice.
axinfo (CoordinateAxis | None) – (Optional) If provided, and of type CoordinateAxis, and s is a comma-separated list of values, then the values in s will be checked against the values in axinfo.data.
- Returns:
A tuple of slice objects and/or ints.
- Return type:
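The parsing rules listed above can be approximated with a few lines of plain Python. This is a sketch with a hypothetical name, not the library function; it omits the axinfo / CoordinateAxis lookup and does no error handling.

```python
def parse_slice_sketch(s):
    """Parse strings such as "", ":", "none", "1:5", "::2", "5" or "0, 2:4"."""
    s = s.strip()
    if s == "" or s == ":" or s.lower() == "none":
        return (slice(None),)                 # take everything
    if "," in s:
        # Comma-separated list: parse each piece and concatenate the results.
        parts = ()
        for piece in s.split(","):
            parts += parse_slice_sketch(piece)
        return parts
    if ":" in s:
        # Empty fields become None, e.g. "::2" -> slice(None, None, 2).
        fields = [int(f) if f.strip() else None for f in s.split(":")]
        return (slice(*fields),)
    return (int(s),)                          # a single integer index

selection = parse_slice_sketch("0, 2:4")
picked = list(range(10))[parse_slice_sketch("1:5")[0]]
```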
- class SlicerSettings(selection: str = '', axis: str | None = None)[source]#
Bases:
Settings
- selection: str = ''#
See ezmsg.sigproc.slicer.parse_slice for details.
- class SlicerTransformer(*args, **kwargs)[source]#
Bases:
BaseStatefulTransformer[SlicerSettings,AxisArray,AxisArray,SlicerState]
- class Slicer(*args, settings=None, **kwargs)[source]#
Bases:
BaseTransformerUnit[SlicerSettings,AxisArray,AxisArray,SlicerTransformer]
- Parameters:
settings (Settings | None)
- SETTINGS#
alias of
SlicerSettings
- slicer(selection='', axis=None)[source]#
Slice along a particular axis.
- Parameters:
selection (str) – See ezmsg.sigproc.slicer.parse_slice for details.
axis (str | None) – The name of the axis to slice along. If None, the last axis is used.
- Returns:
- Return type:
ezmsg.sigproc.spectrum#
- class OptionsEnum(new_class_name, /, names, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]#
Bases:
Enum
- class WindowFunction(*values)[source]#
Bases:
OptionsEnum
Windowing function prior to calculating spectrum.
- NONE = 'None (Rectangular)'#
None.
- HAMMING = 'Hamming'#
- HANNING = 'Hanning'#
- BARTLETT = 'Bartlett'#
- BLACKMAN = 'Blackman'#
- class SpectralTransform(*values)[source]#
Bases:
OptionsEnum
Additional transformation functions to apply to the spectral result.
- RAW_COMPLEX = 'Complex FFT Output'#
- REAL = 'Real Component of FFT'#
- IMAG = 'Imaginary Component of FFT'#
- REL_POWER = 'Relative Power'#
- REL_DB = 'Log Power (Relative dB)'#
- class SpectralOutput(*values)[source]#
Bases:
OptionsEnum
The expected spectral contents.
- FULL = 'Full Spectrum'#
- POSITIVE = 'Positive Frequencies'#
- NEGATIVE = 'Negative Frequencies'#
- class SpectrumSettings(axis=None, out_axis='freq', window=WindowFunction.HAMMING, transform=SpectralTransform.REL_DB, output=SpectralOutput.POSITIVE, norm='forward', do_fftshift=True, nfft=None)[source]#
Bases:
Settings
Settings for Spectrum. See spectrum for a description of the parameters.
- Parameters:
axis (str | None)
out_axis (str | None)
window (WindowFunction)
transform (SpectralTransform)
output (SpectralOutput)
norm (str | None)
do_fftshift (bool)
nfft (int | None)
- axis: str | None = None#
The name of the axis on which to calculate the spectrum. Note: The axis must have an .axes entry of type LinearAxis, not CoordinateAxis.
- out_axis: str | None = 'freq'#
The name of the new axis. Defaults to “freq”. If None, the dimension name is not changed.
- window: WindowFunction = 'Hamming'#
The WindowFunction to apply to the data slice prior to calculating the spectrum.
- transform: SpectralTransform = 'Log Power (Relative dB)'#
The SpectralTransform to apply to the spectral magnitude.
- output: SpectralOutput = 'Positive Frequencies'#
The SpectralOutput format.
- norm: str | None = 'forward'#
Normalization mode. Default “forward” is best used when the inverse transform is not needed, for example when the goal is to get spectral power. Use “backward” (equivalent to None) to not scale the spectrum which is useful when the spectra will be manipulated and possibly inverse-transformed. See numpy.fft.fft for details.
- do_fftshift: bool = True#
Whether to apply fftshift to the output. Default is True. This value is ignored unless output is SpectralOutput.FULL.
- nfft: int | None = None#
The number of points to use for the FFT. If None, the length of the input data is used.
- __init__(axis=None, out_axis='freq', window=WindowFunction.HAMMING, transform=SpectralTransform.REL_DB, output=SpectralOutput.POSITIVE, norm='forward', do_fftshift=True, nfft=None)#
- Parameters:
axis (str | None)
out_axis (str | None)
window (WindowFunction)
transform (SpectralTransform)
output (SpectralOutput)
norm (str | None)
do_fftshift (bool)
nfft (int | None)
- Return type:
None
- class SpectrumTransformer(*args, **kwargs)[source]#
Bases:
BaseStatefulTransformer[SpectrumSettings,AxisArray,AxisArray,SpectrumState]
- class Spectrum(*args, settings=None, **kwargs)[source]#
Bases:
BaseTransformerUnit[SpectrumSettings,AxisArray,AxisArray,SpectrumTransformer]
- Parameters:
settings (Settings | None)
- SETTINGS#
alias of
SpectrumSettings
- spectrum(axis=None, out_axis='freq', window=WindowFunction.HANNING, transform=SpectralTransform.REL_DB, output=SpectralOutput.POSITIVE, norm='forward', do_fftshift=True, nfft=None)[source]#
Calculate a spectrum on a data slice.
- Returns:
A SpectrumTransformer object that expects an AxisArray containing continuous data, passed by calling the object (__call__), and returns an AxisArray with data of spectral magnitudes or powers.
- Parameters:
axis (str | None)
out_axis (str | None)
window (WindowFunction)
transform (SpectralTransform)
output (SpectralOutput)
norm (str | None)
do_fftshift (bool)
nfft (int | None)
- Return type:
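For orientation, the following NumPy sketch walks through the kind of computation these parameters describe on a single data slice: a Hamming window, a forward-normalized FFT restricted to positive frequencies, and a log-power transform. It uses NumPy directly instead of the ezmsg classes, and the signal, sampling rate and dB reference level are made up for the example.

```python
import numpy as np

fs = 100.0                                    # sampling rate in Hz (made up)
t = np.arange(200) / fs                       # 2 seconds of data
x = np.sin(2 * np.pi * 10.0 * t)              # 10 Hz sine

win = np.hamming(x.size)                      # counterpart of WindowFunction.HAMMING
spec = np.fft.rfft(x * win, norm="forward")   # positive frequencies, norm="forward"
freqs = np.fft.rfftfreq(x.size, d=1.0 / fs)   # values for the new "freq" axis
power = np.abs(spec) ** 2                     # power spectrum
# Log power in dB; the tiny offset avoids log10(0), and the reference is an assumption.
rel_db = 10.0 * np.log10(power + np.finfo(float).tiny)
peak_hz = freqs[np.argmax(rel_db)]            # frequency of the largest bin
```

Passing n=nfft to np.fft.rfft would zero-pad or truncate the slice, which is the role the nfft setting plays.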
ezmsg.sigproc.spectrogram#
- class SpectrogramSettings(window_dur=None, window_shift=None, window_anchor=Anchor.BEGINNING, window=WindowFunction.HAMMING, transform=SpectralTransform.REL_DB, output=SpectralOutput.POSITIVE)[source]#
Bases:
Settings
Settings for SpectrogramTransformer.
- Parameters:
window_dur (float | None)
window_shift (float | None)
window (WindowFunction)
transform (SpectralTransform)
output (SpectralOutput)
- window: WindowFunction = 'Hamming'#
The WindowFunction to apply to the data slice prior to calculating the spectrum.
- transform: SpectralTransform = 'Log Power (Relative dB)'#
The SpectralTransform to apply to the spectral magnitude.
- __init__(window_dur=None, window_shift=None, window_anchor=Anchor.BEGINNING, window=WindowFunction.HAMMING, transform=SpectralTransform.REL_DB, output=SpectralOutput.POSITIVE)#
- Parameters:
window_dur (float | None)
window_shift (float | None)
window (WindowFunction)
transform (SpectralTransform)
output (SpectralOutput)
- Return type:
None
- output: SpectralOutput = 'Positive Frequencies'#
The SpectralOutput format.
- class SpectrogramTransformer(*args, **kwargs)[source]#
Bases:
CompositeProcessor[SpectrogramSettings,AxisArray,AxisArray]
- class Spectrogram(*args, settings=None, **kwargs)[source]#
Bases:
BaseTransformerUnit[SpectrogramSettings,AxisArray,AxisArray,SpectrogramTransformer]
- Parameters:
settings (Settings | None)
- SETTINGS#
alias of
SpectrogramSettings
- spectrogram(window_dur=None, window_shift=None, window_anchor=Anchor.BEGINNING, window=WindowFunction.HAMMING, transform=SpectralTransform.REL_DB, output=SpectralOutput.POSITIVE)[source]#
- Parameters:
window_dur (float | None)
window_shift (float | None)
window (WindowFunction)
transform (SpectralTransform)
output (SpectralOutput)
- Return type:
ezmsg.sigproc.synth#
- class AddState(queue_a: 'asyncio.Queue[AxisArray]' = <factory>, queue_b: 'asyncio.Queue[AxisArray]' = <factory>)[source]#
Bases:
object
- Parameters:
queue_a (Queue[AxisArray])
queue_b (Queue[AxisArray])
- queue_a: Queue[AxisArray]#
- queue_b: Queue[AxisArray]#
- __init__(queue_a=<factory>, queue_b=<factory>)#
- Parameters:
queue_a (Queue[AxisArray])
queue_b (Queue[AxisArray])
- Return type:
None
- class Add(*args, settings=None, **kwargs)[source]#
Bases:
Unit
Add two signals together. Assumes compatible/similar axes/dimensions.
- Parameters:
settings (Settings | None)
- INPUT_SIGNAL_A = InputStream:unlocated[<class 'ezmsg.util.messages.axisarray.AxisArray'>]()#
- INPUT_SIGNAL_B = InputStream:unlocated[<class 'ezmsg.util.messages.axisarray.AxisArray'>]()#
- OUTPUT_SIGNAL = OutputStream:unlocated[<class 'ezmsg.util.messages.axisarray.AxisArray'>](self.num_buffers=32, self.force_tcp=False)#
- async initialize()[source]#
Runs when the Unit is instantiated. This is called from within the same process in which this unit will live. This lifecycle hook can be overridden. It can be run as an async function by simply adding the async keyword when overriding.
- Return type:
None
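The two-queue state shown in AddState suggests that messages from each input are buffered and then summed in pairs. The sketch below illustrates that pairing with plain asyncio queues and Python lists; it is an assumption about the behavior, not the unit's actual code, and `add_streams` is a hypothetical helper.

```python
import asyncio


async def add_streams(queue_a, queue_b, n_messages):
    """Pair one message from each queue and add them element-wise."""
    out = []
    for _ in range(n_messages):
        a = await queue_a.get()  # waits until input A has a message
        b = await queue_b.get()  # waits until input B has a message
        out.append([x + y for x, y in zip(a, b)])
    return out


async def _demo():
    queue_a, queue_b = asyncio.Queue(), asyncio.Queue()
    for i in range(3):
        queue_a.put_nowait([i, i + 1])
        queue_b.put_nowait([10, 20])
    return await add_streams(queue_a, queue_b, 3)


summed = asyncio.run(_demo())
```

Pairing by arrival order only works when both inputs deliver blocks of matching shape, which is consistent with the "compatible/similar axes/dimensions" assumption stated for Add.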
- class ClockSettings(dispatch_rate=None)[source]#
Bases:
Settings
Settings for clock generator.
- class ClockProducer(*args, **kwargs)[source]#
Bases:
BaseStatefulProducer[ClockSettings,Flag,ClockState]
Produces clock ticks at specified rate. Can be used to drive periodic operations.
- aclock(dispatch_rate)[source]#
Construct an async generator that yields events at a specified rate.
- Returns:
A ClockProducer object.
- Parameters:
dispatch_rate (float | None)
- Return type:
- clock(dispatch_rate)#
Alias for aclock expected by synchronous methods. ClockProducer can be used in sync or async.
- Parameters:
dispatch_rate (float | None)
- Return type:
- class Clock(*args, settings=None, **kwargs)[source]#
Bases:
BaseProducerUnit[ClockSettings,Flag,ClockProducer]
- Parameters:
settings (Settings | None)
- SETTINGS#
alias of
ClockSettings
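The idea of an async generator that yields events at a fixed rate can be sketched with the standard library alone. This is an illustration only: the `ticks` function is hypothetical, and it assumes that dispatch_rate is expressed in Hz and that None means "do not wait between events".

```python
import asyncio
import time


async def ticks(dispatch_rate, n_ticks):
    """Yield n_ticks events, pausing 1 / dispatch_rate seconds before each one."""
    # Assumption: a dispatch_rate of None means no delay between events.
    interval = 0.0 if dispatch_rate is None else 1.0 / dispatch_rate
    for i in range(n_ticks):
        if interval:
            await asyncio.sleep(interval)
        yield i


async def _collect():
    stamps = []
    async for _ in ticks(dispatch_rate=100.0, n_ticks=3):
        stamps.append(time.monotonic())
    return stamps


stamps = asyncio.run(_collect())
```

A fixed sleep per tick drifts over time because the work done between ticks is not subtracted; a production clock would more likely schedule each tick against an absolute target time.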
- class CounterSettings(n_time, fs, n_ch=1, dispatch_rate=None, mod=None)[source]#
Bases:
Settings
Settings for Counter. See acounter for a description of the parameters.
- class CounterProducer(*args, **kwargs)[source]#
Bases:
BaseStatefulProducer[CounterSettings,AxisArray,CounterState]
Produces incrementing integer blocks as AxisArray.
- acounter(n_time, fs, n_ch=1, dispatch_rate=None, mod=None)[source]#
Construct an asynchronous generator that yields AxisArray objects at a specified dispatch rate, with data at the specified sampling rate.
NOTE: This module uses asyncio.sleep to pace its output in realtime mode. asyncio.sleep behaves unpredictably for sub-millisecond sleep periods, so very short intervals may produce unexpected message rates (e.g. fs = 2000, n_time = 1, realtime = True may result in ~1400 msgs/sec).
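The block structure of the counter output can be sketched without any timing at all. The generator below is a hypothetical illustration, not the library's code; it assumes that mod wraps the counter values and that each block's time offset is its first sample index divided by fs.

```python
import itertools


def counter_blocks(n_time, fs, mod=None):
    """Yield (offset_seconds, samples) blocks of n_time incrementing integers."""
    start = 0
    while True:
        samples = [start + i for i in range(n_time)]
        if mod is not None:
            # Assumption: mod makes the counter roll over.
            samples = [s % mod for s in samples]
        yield start / fs, samples
        start += n_time


blocks = list(itertools.islice(counter_blocks(n_time=4, fs=100.0, mod=10), 3))
# blocks == [(0.0, [0, 1, 2, 3]), (0.04, [4, 5, 6, 7]), (0.08, [8, 9, 0, 1])]
```

In realtime use the implied message rate is fs / n_time. With fs = 2000 and n_time = 1 that is 2000 messages per second, a 0.5 ms interval, which falls in the sub-millisecond range the note warns about; n_time = 20 at the same fs gives a 10 ms interval instead.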
- class Counter(*args, settings=None, **kwargs)[source]#
Bases:
BaseProducerUnit[CounterSettings,AxisArray,CounterProducer]
Generates monotonically increasing counter. Unit for CounterProducer.
- Parameters:
settings (Settings | None)
- SETTINGS#
alias of
CounterSettings
- INPUT_CLOCK = InputStream:unlocated[<class 'ezmsg.core.message.Flag'>]()#
- async produce()[source]#
Generate counter output. This is an infinite loop, but we will likely enter the loop only once if we are self-timed, and twice if we are using an external clock.
When using an internal clock, we enter the loop and wait for the event, which should have been reset upon initialization. We immediately clear it, then go to the internal loop, which asynchronously calls __acall__ and lets the internal timer determine when to produce an output.
When using an external clock, we enter the loop and wait for the event, which should have been reset upon initialization. We immediately clear it, then hit continue to loop back around and wait for the event to be set again, potentially forever. In this case, on_clock is expected to be called to produce the output.
- Return type:
- class SinGeneratorSettings(axis='time', freq=1.0, amp=1.0, phase=0.0)[source]#
Bases:
Settings
Settings for SinGenerator. See sin for parameter descriptions.
- class SinTransformer(*args, settings=None, **kwargs)[source]#
Bases:
BaseTransformer[SinGeneratorSettings,AxisArray,AxisArray]
Transforms counter values into sinusoidal waveforms.
- Parameters:
settings (SettingsType)
- class SinGenerator(*args, settings=None, **kwargs)[source]#
Bases:
BaseTransformerUnit[SinGeneratorSettings,AxisArray,AxisArray,SinTransformer]
Unit for generating sinusoidal waveforms.
- Parameters:
settings (Settings | None)
- SETTINGS#
alias of
SinGeneratorSettings
- sin(axis='time', freq=1.0, amp=1.0, phase=0.0)[source]#
Construct a generator of sinusoidal waveforms in AxisArray objects.
- Returns:
A primed generator that expects .send(axis_array) of sample counts and yields an AxisArray of sinusoids.
- Parameters:
- Return type:
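The "primed generator" pattern described for sin, where sample counts are sent in and sinusoids come back, can be sketched in plain Python. This is an illustration under stated assumptions: `sin_gen` is hypothetical, it works on lists rather than AxisArray objects, and it takes the sampling rate as an explicit fs argument, whereas the real function presumably reads the time scale from the named axis.

```python
import math


def sin_gen(fs, freq=1.0, amp=1.0, phase=0.0):
    """Generator that maps sample counts to amp * sin(2*pi*freq*t + phase)."""
    out = None
    while True:
        counts = yield out  # receives a block of sample counts via .send()
        out = [
            amp * math.sin(2 * math.pi * freq * count / fs + phase)
            for count in counts
        ]


gen = sin_gen(fs=4.0, freq=1.0)
next(gen)  # prime the generator so it is waiting at the first yield
wave = gen.send([0, 1, 2, 3])  # one full cycle sampled at 4 Hz
```

Because the output depends only on the counts received, feeding consecutive counter blocks produces a waveform that stays phase-continuous across block boundaries.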
- class RandomGeneratorSettings(loc: float = 0.0, scale: float = 1.0)[source]#
Bases:
Settings
- loc: float = 0.0#
loc argument for numpy.random.normal
- scale: float = 1.0#
scale argument for numpy.random.normal
- class RandomTransformer(*args, settings=None, **kwargs)[source]#
Bases:
BaseTransformer[RandomGeneratorSettings,AxisArray,AxisArray]
Replaces input data with random data and returns the result.
- Parameters:
settings (SettingsType)
- __init__(*args, settings=None, **kwargs)[source]#
- Parameters:
settings (RandomGeneratorSettings | None)
- class RandomGenerator(*args, settings=None, **kwargs)[source]#
Bases:
BaseTransformerUnit[RandomGeneratorSettings,AxisArray,AxisArray,RandomTransformer]
- Parameters:
settings (Settings | None)
- SETTINGS#
alias of
RandomGeneratorSettings
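Replacing a block's data with normally distributed values of the same shape can be illustrated as follows. This sketch uses the standard library's random.gauss as a stand-in for numpy.random.normal, and the `randomize` helper is hypothetical.

```python
import random


def randomize(block, loc=0.0, scale=1.0, rng=None):
    """Return a block of the same shape filled with draws from N(loc, scale)."""
    rng = rng or random.Random()
    # The input values are ignored; only the shape of the block is preserved.
    return [[rng.gauss(loc, scale) for _ in row] for row in block]


block = [[0, 1, 2], [3, 4, 5]]  # e.g. two time samples by three channels
noisy = randomize(block, loc=0.0, scale=1.0, rng=random.Random(0))
constant = randomize(block, loc=2.5, scale=0.0)  # zero spread collapses to loc
```

Here loc shifts the mean and scale sets the standard deviation, matching the roles those arguments play in numpy.random.normal.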
- class OscillatorSettings(n_time, fs, n_ch=1, dispatch_rate=None, freq=1.0, amp=1.0, phase=0.0, sync=False)[source]#
Bases:
Settings
Settings for Oscillator
- Parameters:
- class OscillatorProducer(*args, **kwargs)[source]#
Bases:
CompositeProducer[OscillatorSettings,AxisArray]
- class BaseCounterFirstProducerUnit(*args, settings=None, **kwargs)[source]#
Bases:
BaseProducerUnit[SettingsType,MessageOutType,ProducerType], Generic[SettingsType,MessageInType,MessageOutType,ProducerType]
Base class for units whose primary processor is a composite producer with a CounterProducer as the first processor (producer) in the chain.
- Parameters:
settings (Settings | None)
- INPUT_SIGNAL = InputStream:unlocated[~MessageInType]()#
- class Oscillator(*args, settings=None, **kwargs)[source]#
Bases:
BaseCounterFirstProducerUnit[OscillatorSettings,AxisArray,AxisArray,OscillatorProducer]
Generates sinusoidal waveforms using a counter and sine transformer.
- Parameters:
settings (Settings | None)
- SETTINGS#
alias of
OscillatorSettings
- class NoiseSettings(n_time, fs, n_ch=1, dispatch_rate=None, loc=0.0, scale=1.0)[source]#
Bases:
Settings
See CounterSettings and RandomGeneratorSettings.
- Parameters:
- __init__(n_time, fs, n_ch=1, dispatch_rate=None, loc=0.0, scale=1.0)#
- WhiteNoiseSettings#
alias of
NoiseSettings
- class WhiteNoiseProducer(*args, **kwargs)[source]#
Bases:
CompositeProducer[NoiseSettings,AxisArray]
- class WhiteNoise(*args, settings=None, **kwargs)[source]#
Bases:
BaseCounterFirstProducerUnit[NoiseSettings,AxisArray,AxisArray,WhiteNoiseProducer]
Chains a Counter and RandomGenerator.
- Parameters:
settings (Settings | None)
- SETTINGS#
alias of
NoiseSettings
- PinkNoiseSettings#
alias of
NoiseSettings
- class PinkNoiseProducer(*args, **kwargs)[source]#
Bases:
CompositeProducer[NoiseSettings,AxisArray]
- class PinkNoise(*args, settings=None, **kwargs)[source]#
Bases:
BaseCounterFirstProducerUnit[NoiseSettings,AxisArray,AxisArray,PinkNoiseProducer]
Chains WhiteNoise and ButterworthFilter.
- Parameters:
settings (Settings | None)
- SETTINGS#
alias of
NoiseSettings
- class EEGSynthSettings(fs=500.0, n_time=100, alpha_freq=10.5, n_ch=8)[source]#
Bases:
Settings
See OscillatorSettings.
- __init__(fs=500.0, n_time=100, alpha_freq=10.5, n_ch=8)#
- class EEGSynth(*args, settings=None, **kwargs)[source]#
Bases:
Collection
A Collection that chains a Clock to both PinkNoise and Oscillator, then Adds the result.
Unlike the Oscillator, WhiteNoise, and PinkNoise composite processors, which have linear flows, this class has a diamond flow, with the clock branching to both PinkNoise and Oscillator, which are then combined in Add.
- Optional: Refactor as a ProducerUnit, similar to Clock, but we manually add all the other transformers.
- Parameters:
settings (Settings | None)
- SETTINGS#
alias of
EEGSynthSettings
- OUTPUT_SIGNAL = OutputStream:unlocated[<class 'ezmsg.util.messages.axisarray.AxisArray'>](self.num_buffers=32, self.force_tcp=False)#
- CLOCK = <ezmsg.sigproc.synth.Clock object>#
- NOISE = <ezmsg.sigproc.synth.PinkNoise object>#
- OSC = <ezmsg.sigproc.synth.Oscillator object>#
- ADD = <ezmsg.sigproc.synth.Add object>#
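The diamond flow can be pictured as one trigger feeding two branches whose outputs are summed. The sketch below is a rough single-channel illustration using the default fs, n_time, and alpha_freq from EEGSynthSettings. The `eeg_like_block` function is hypothetical, and it uses white Gaussian noise where the real collection uses PinkNoise.

```python
import math
import random


def eeg_like_block(start, n_time, fs, alpha_freq, rng):
    """One 'tick': build an oscillator block and a noise block, then add them."""
    counts = range(start, start + n_time)
    # Branch 1: sinusoid at the alpha frequency.
    osc = [math.sin(2 * math.pi * alpha_freq * c / fs) for c in counts]
    # Branch 2: noise (white here as a simplification, not pink).
    noise = [rng.gauss(0.0, 1.0) for _ in counts]
    # Join: element-wise sum, playing the role of Add.
    summed = [o + n for o, n in zip(osc, noise)]
    return osc, noise, summed


rng = random.Random(0)
osc, noise, summed = eeg_like_block(
    start=0, n_time=100, fs=500.0, alpha_freq=10.5, rng=rng
)
```

In the real collection the two branches run as separate units, so Add has to pair their messages as they arrive rather than receive both from a single function call.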
ezmsg.sigproc.window#
- class WindowSettings(axis: str | None = None, newaxis: str | None = None, window_dur: float | None = None, window_shift: float | None = None, zero_pad_until: str = 'full', anchor: str | ezmsg.sigproc.window.Anchor = <Anchor.BEGINNING: 'beginning'>)[source]#
Bases:
Settings
- Parameters:
- class WindowTransformer(*args, **kwargs)[source]#
Bases:
BaseStatefulTransformer[WindowSettings,AxisArray,AxisArray,WindowState]
Apply a sliding window along the specified axis to input streaming data. The windowing method is perhaps the most useful and versatile method in ezmsg.sigproc, but its parameterization can be difficult. Please read the argument descriptions carefully.
- __init__(*args, **kwargs)[source]#
- Parameters:
axis – The axis along which to segment windows. If None, defaults to the first dimension of the first seen AxisArray. Note: The windowed axis must be an AxisArray.LinearAxis, not an AxisArray.CoordinateAxis.
newaxis – New axis on which windows are delimited, immediately preceding the target windowed axis. The data length along newaxis may be 0 if this most recent push did not provide enough data for a new window. If window_shift is None then the newaxis length will always be 1.
window_dur – The duration of the window in seconds. If None, the function acts as a passthrough and all other parameters are ignored.
window_shift – The shift of the window in seconds. If None (default), windowing operates in “1:1 mode”, where each input yields exactly one most-recent window.
zero_pad_until – Determines how the function initializes the buffer. Can be one of “input”, “full”, “shift”, or “none”. This description names “input” as the default, although the WindowSettings signature above lists 'full'. If window_shift is None then this field is ignored and “input” is always used.
“input” initializes the buffer with the input, then prepends zeros up to the window size. The first input will always yield at least one output.
“shift” fills the buffer until window_shift. No outputs will be yielded until at least window_shift data has been seen.
“none” does not pad the buffer. No outputs will be yielded until at least window_dur data has been seen.
anchor – Determines the entry in axis that gets assigned 0, which references the value in newaxis. Can be of class Anchor or a string representation of an Anchor.
- Return type:
None
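The interaction between window_dur, window_shift, and streaming input is easier to see on a toy example. The generator below is a simplified sketch working on flat Python lists, not the library's implementation: `sliding_windows` is hypothetical, the shifted branch corresponds to the “none” padding behavior, and the 1:1 branch zero-pads the front of the buffer as described for “input”.

```python
def sliding_windows(chunks, fs, window_dur, window_shift=None):
    """For each pushed chunk, yield the list of complete windows it produced."""
    win_len = int(round(window_dur * fs))
    if window_shift is None:
        # 1:1 mode: every push yields exactly one most-recent window.
        buffer = [0] * win_len  # zero-padded so the first push already fills it
        for chunk in chunks:
            buffer = (buffer + list(chunk))[-win_len:]
            yield [list(buffer)]
        return
    shift_len = int(round(window_shift * fs))
    if shift_len < 1:
        raise ValueError("window_shift must cover at least one sample")
    buffer = []  # no padding: nothing is emitted until window_dur has been seen
    for chunk in chunks:
        buffer.extend(chunk)
        windows = []
        while len(buffer) >= win_len:
            windows.append(buffer[:win_len])
            buffer = buffer[shift_len:]  # advance by the shift, keep the overlap
        yield windows


chunks = [[1, 2, 3], [4, 5], [6, 7, 8, 9]]
shifted = list(sliding_windows(chunks, fs=1.0, window_dur=4.0, window_shift=2.0))
one_to_one = list(sliding_windows(chunks, fs=1.0, window_dur=4.0))
```

In the shifted case the first push yields no windows, which corresponds to a newaxis length of 0, while the third push yields two. In 1:1 mode every push yields exactly one window regardless of chunk size.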
- class Window(*args, settings=None, **kwargs)[source]#
Bases:
BaseTransformerUnit[WindowSettings,AxisArray,AxisArray,WindowTransformer]
- Parameters:
settings (Settings | None)
- SETTINGS#
alias of
WindowSettings
- INPUT_SIGNAL = InputStream:unlocated[<class 'ezmsg.util.messages.axisarray.AxisArray'>]()#
- OUTPUT_SIGNAL = OutputStream:unlocated[<class 'ezmsg.util.messages.axisarray.AxisArray'>](self.num_buffers=32, self.force_tcp=False)#