ezmsg.sigproc.ewmfilter#

Exponentially weighted moving average filter for streaming normalization.

Classes

class EWM(*args, settings=None, **kwargs)[source]#

Bases: Unit

Exponentially weighted moving average (EWM) standardization. This class is deprecated; use ezmsg.sigproc.scaler.AdaptiveStandardScaler instead.

References: https://stackoverflow.com/a/42926270
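To illustrate the general idea of exponentially weighted standardization, here is a minimal pure-Python sketch. It is not the implementation used by this Unit: the function name `ewm_standardize`, the smoothing factor `alpha`, and the stabilizer `eps` are assumptions made for illustration, and only the `zero_offset` flag mirrors a documented setting.

```python
import math


def ewm_standardize(samples, alpha=0.1, zero_offset=True, eps=1e-12):
    """Standardize scalars using exponentially weighted running statistics.

    Hypothetical helper for illustration; not part of ezmsg.sigproc.
    """
    mean = 0.0      # running EW mean (held at 0 when zero_offset is True)
    mean_sq = 0.0   # running EW mean of squared samples
    out = []
    for i, x in enumerate(samples):
        if i == 0:
            # Seed the statistics from the first sample.
            mean_sq = x * x
            mean = 0.0 if zero_offset else x
        else:
            # Standard EW update: new = old + alpha * (observation - old).
            mean_sq += alpha * (x * x - mean_sq)
            if not zero_offset:
                mean += alpha * (x - mean)
        # Variance from the two running moments, clamped against round-off.
        var = max(mean_sq - mean * mean, 0.0)
        out.append((x - mean) / math.sqrt(var + eps))
    return out
```

With `zero_offset=True` the mean is assumed to be zero, so each sample is divided only by the running root-mean-square estimate.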

Parameters:

settings (Settings | None)

SETTINGS#

alias of EWMSettings

STATE#

alias of EWMState

INPUT_SIGNAL = InputStream:unlocated[AxisArray]()#
INPUT_BUFFER = InputStream:unlocated[AxisArray]()#
OUTPUT_SIGNAL = OutputStream:unlocated[AxisArray](self.num_buffers=32, self.force_tcp=False)#
async initialize()[source]#

Runs when the Unit is instantiated.

This is called from within the same process this unit will live in. This lifecycle hook can be overridden, and it can be made asynchronous by adding the async keyword when overriding.

This method is where you should initialize your unit’s state and prepare for message processing.

Return type:

None

async on_signal(message)[source]#
Parameters:

message (AxisArray)

Return type:

None

async on_buffer(message)[source]#
Parameters:

message (AxisArray)

Return type:

None

async sync_output()[source]#
Return type:

AsyncGenerator

class EWMFilter(*args, settings=None, **kwargs)[source]#

Bases: Collection

A Collection that splits the input into two branches: one leads to Window, which then feeds into EWM's INPUT_BUFFER, and the other feeds directly into EWM's INPUT_SIGNAL.

This is deprecated. Please use ezmsg.sigproc.scaler.AdaptiveStandardScaler instead.
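The two branches described above can be pictured as a list of (source, destination) pairs. The sketch below uses plain strings as hypothetical stand-ins for stream objects, so it runs without ezmsg. The Window stream names and the final connection back to the Collection's OUTPUT_SIGNAL are assumptions, not taken from this page.

```python
# Hypothetical stand-in for the wiring: names are strings, not ezmsg streams.
connections = [
    # Branch 1: input -> Window -> EWM buffer input.
    ("EWMFilter.INPUT_SIGNAL", "WINDOW.INPUT_SIGNAL"),   # Window stream name assumed
    ("WINDOW.OUTPUT_SIGNAL", "EWM.INPUT_BUFFER"),        # Window stream name assumed
    # Branch 2: input -> EWM signal input directly.
    ("EWMFilter.INPUT_SIGNAL", "EWM.INPUT_SIGNAL"),
    # Assumed: the EWM output is exposed as the Collection's output.
    ("EWM.OUTPUT_SIGNAL", "EWMFilter.OUTPUT_SIGNAL"),
]


def downstream(source):
    """Return every destination directly connected to the given source."""
    return [dst for src, dst in connections if src == source]
```

Calling `downstream("EWMFilter.INPUT_SIGNAL")` lists both branch entry points, which is the fan-out the description refers to.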

Parameters:

settings (Settings | None)

SETTINGS#

alias of EWMFilterSettings

INPUT_SIGNAL = InputStream:unlocated[AxisArray]()#
OUTPUT_SIGNAL = OutputStream:unlocated[AxisArray](self.num_buffers=32, self.force_tcp=False)#
WINDOW = <ezmsg.sigproc.window.Window object>#
EWM = <ezmsg.sigproc.ewmfilter.EWM object>#
configure()[source]#

A lifecycle hook that runs when the Collection is instantiated.

This is the best place to call Unit.apply_settings() on each member Unit of the Collection. Override this method to perform collection-specific configuration of child components.

Return type:

None

network()[source]#

Override this method to return a NetworkDefinition that defines how InputStreams and OutputStreams from member Units are connected.

The NetworkDefinition specifies the message routing between components by connecting output streams to input streams.

Returns:

Network definition specifying stream connections

Return type:

NetworkDefinition

class EWMFilterSettings(history_dur: float, axis: str | None = None, zero_offset: bool = True)[source]#

Bases: Settings

Parameters:
  • history_dur (float)

  • axis (str | None)

  • zero_offset (bool)

history_dur: float#

Duration of previous data to accumulate for standardization.

axis: str | None = None#

Name of the axis to accumulate.

zero_offset: bool = True#

If true, we assume zero DC offset for input data.
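As a rough sketch of how these three fields might be used together, the stand-in dataclass below mirrors the documented fields and converts a history duration into a sample count and a smoothing factor. Everything here is hypothetical: `FilterSettingsSketch` is not the ezmsg class, the assumption that `history_dur` is in seconds is not stated on this page, and the `2 / (n + 1)` span convention is one common choice rather than a confirmed detail of this module.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class FilterSettingsSketch:
    """Hypothetical stand-in mirroring the documented fields."""
    history_dur: float
    axis: Optional[str] = None
    zero_offset: bool = True


def history_samples(settings, fs):
    """Samples spanned by history_dur at rate fs (Hz), assuming seconds."""
    return max(1, int(round(settings.history_dur * fs)))


def span_to_alpha(n_samples):
    """Smoothing factor for a span of n_samples (assumed 2 / (n + 1) convention)."""
    return 2.0 / (n_samples + 1.0)
```

For example, under these assumptions a 2-second history at 500 Hz spans 1000 samples, and `span_to_alpha(1000)` gives the corresponding smoothing factor.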

__init__(history_dur, axis=None, zero_offset=True)#
Parameters:
  • history_dur (float)

  • axis (str | None)

  • zero_offset (bool)

Return type:

None

class EWMSettings(axis: str | None = None, zero_offset: bool = True)[source]#

Bases: Settings

Parameters:
  • axis (str | None)

  • zero_offset (bool)

axis: str | None = None#

Name of the axis to accumulate.

zero_offset: bool = True#

If true, we assume zero DC offset for input data.

__init__(axis=None, zero_offset=True)#
Parameters:
  • axis (str | None)

  • zero_offset (bool)

Return type:

None

class EWMState[source]#

Bases: State

buffer_queue: Queue[AxisArray]#
signal_queue: Queue[AxisArray]#
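The two queues suggest that signal and buffer messages are collected separately and then consumed together. The following sketch shows one way such pairing could work with `asyncio.Queue`; it is an assumption about the pattern, not the Unit's actual code, and `pair_messages` and `demo` are hypothetical names using strings in place of AxisArray messages.

```python
import asyncio


async def pair_messages(signal_queue, buffer_queue, n_pairs):
    """Take one item from each queue per iteration and pair them in order."""
    pairs = []
    for _ in range(n_pairs):
        signal = await signal_queue.get()   # waits until a signal message arrives
        buffer = await buffer_queue.get()   # waits until the matching buffer arrives
        pairs.append((signal, buffer))
    return pairs


async def demo():
    signal_queue = asyncio.Queue()
    buffer_queue = asyncio.Queue()
    # Messages may arrive on each input independently.
    for i in range(3):
        signal_queue.put_nowait(f"signal-{i}")
    for i in range(3):
        buffer_queue.put_nowait(f"buffer-{i}")
    return await pair_messages(signal_queue, buffer_queue, 3)
```

Running `asyncio.run(demo())` returns the messages paired in arrival order, regardless of which queue was filled first.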