Base Processors#

Here is the API for the base processors included in the ezmsg-sigproc extension. For more detailed information on the design decisions behind these base processors, please refer to the ezmsg-sigproc explainer.

class BaseProcessor(*args, settings=None, **kwargs)[source]#

Bases: ABC, Generic[SettingsType, MessageInType, MessageOutType]

Base class for processors. You probably do not want to inherit from this class directly. Refer instead to the more specific base classes.

Note that BaseProcessor and its children are sync by default. If you need async by defualt, then override the async methods and call them from the sync methods. Look to BaseProducer for examples of calling async methods from sync methods.

Parameters:

settings (SettingsType)

classmethod get_settings_type()[source]#
Return type:

type[SettingsType]

classmethod get_message_type(dir)[source]#
Parameters:

dir (str)

Return type:

Any

__init__(*args, settings=None, **kwargs)[source]#
Parameters:

settings (SettingsType | None)

Return type:

None

settings: SettingsType#
send(message)[source]#

Alias for __call__.

Parameters:

message (Any)

Return type:

Any

async asend(message)[source]#

Alias for __acall__.

Parameters:

message (Any)

Return type:

Any

class BaseProducer(*args, settings=None, **kwargs)[source]#

Bases: ABC, Generic[SettingsType, MessageOutType]

Base class for producers – processors that generate messages without consuming inputs.

Note that BaseProducer and its children are async by default, and the sync methods simply wrap

the async methods. This is the opposite of BaseProcessor and its children which are sync by default. These classes are designed this way because it is highly likely that a producer, which (probably) does not receive inputs, will require some sort of IO which will benefit from being async.

Parameters:

settings (SettingsType | None)

classmethod get_settings_type()[source]#
Return type:

type[SettingsType]

classmethod get_message_type(dir)[source]#
Parameters:

dir (str)

Return type:

type[MessageOutType] | None

__init__(*args, settings=None, **kwargs)[source]#
Parameters:

settings (SettingsType | None)

Return type:

None

class BaseConsumer(*args, settings=None, **kwargs)[source]#

Bases: BaseProcessor[SettingsType, MessageInType, None], ABC, Generic[SettingsType, MessageInType]

Base class for consumers – processors that receive messages but don’t produce output. This base simply overrides type annotations of BaseProcessor to remove the outputs. (We don’t bother overriding send and asend because those are deprecated.)

Parameters:

settings (SettingsType)

classmethod get_message_type(dir)[source]#
Parameters:

dir (str)

Return type:

type[MessageInType] | None

__init__(*args, settings=None, **kwargs)#
Parameters:

settings (SettingsType | None)

Return type:

None

async asend(message)#

Alias for __acall__.

Parameters:

message (Any)

Return type:

Any

classmethod get_settings_type()#
Return type:

type[SettingsType]

send(message)#

Alias for __call__.

Parameters:

message (Any)

Return type:

Any

settings: SettingsType#
class BaseTransformer(*args, settings=None, **kwargs)[source]#

Bases: BaseProcessor[SettingsType, MessageInType, MessageOutType], ABC, Generic[SettingsType, MessageInType, MessageOutType]

Base class for transformers – processors which receive messages and produce output. This base simply overrides type annotations of BaseProcessor to indicate that outputs are not optional. (We don’t bother overriding send and asend because those are deprecated.)

Parameters:

settings (SettingsType)

inherited-members:

class BaseStatefulProcessor(*args, **kwargs)[source]#

Bases: BaseProcessor[SettingsType, MessageInType, MessageOutType], Stateful[StateType], ABC, Generic[SettingsType, MessageInType, MessageOutType, StateType]

Base class implementing common stateful processor functionality. You probably do not want to inherit from this class directly. Refer instead to the more specific base classes. Use BaseStatefulConsumer for operations that do not return a result, or BaseStatefulTransformer for operations that do return a result.

__init__(*args, **kwargs)[source]#
Return type:

None

stateful_op(state, message)[source]#
Parameters:
Return type:

tuple[tuple[StateType, int], Any]

async asend(message)#

Alias for __acall__.

Parameters:

message (Any)

Return type:

Any

classmethod get_message_type(dir)#
Parameters:

dir (str)

Return type:

Any

classmethod get_settings_type()#
Return type:

type[SettingsType]

classmethod get_state_type()#
Return type:

type[StateType]

send(message)#

Alias for __call__.

Parameters:

message (Any)

Return type:

Any

property state: StateType#
settings: SettingsType#
class BaseStatefulProducer(*args, **kwargs)[source]#

Bases: BaseProducer[SettingsType, MessageOutType], Stateful[StateType], ABC, Generic[SettingsType, MessageOutType, StateType]

Base class implementing common stateful producer functionality.

Examples of stateful producers are things that require counters, clocks, or to cycle through a set of values.

Unlike BaseStatefulProcessor, this class does not message hashing because there

are no input messages. We still use self._hash to simply track the transition from initialization (.hash == -1) to state reset (.hash == 0).

__init__(*args, **kwargs)[source]#
Return type:

None

stateful_op(state)[source]#
Parameters:

state (tuple[StateType, int] | None)

Return type:

tuple[tuple[StateType, int], MessageOutType]

classmethod get_message_type(dir)#
Parameters:

dir (str)

Return type:

type[MessageOutType] | None

classmethod get_settings_type()#
Return type:

type[SettingsType]

classmethod get_state_type()#
Return type:

type[StateType]

property state: StateType#
class BaseStatefulConsumer(*args, **kwargs)[source]#

Bases: BaseStatefulProcessor[SettingsType, MessageInType, None, StateType], ABC, Generic[SettingsType, MessageInType, StateType]

Base class for stateful message consumers that don’t produce output. This class merely overrides the type annotations of BaseStatefulProcessor.

classmethod get_message_type(dir)[source]#
Parameters:

dir (str)

Return type:

type[MessageInType] | None

stateful_op(state, message)[source]#
Parameters:
  • state (tuple[StateType, int] | None)

  • message (MessageInType)

Return type:

tuple[tuple[StateType, int], None]

__init__(*args, **kwargs)#
Return type:

None

async asend(message)#

Alias for __acall__.

Parameters:

message (Any)

Return type:

Any

classmethod get_settings_type()#
Return type:

type[SettingsType]

classmethod get_state_type()#
Return type:

type[StateType]

send(message)#

Alias for __call__.

Parameters:

message (Any)

Return type:

Any

property state: StateType#
settings: SettingsType#
class BaseStatefulTransformer(*args, **kwargs)[source]#

Bases: BaseStatefulProcessor[SettingsType, MessageInType, MessageOutType, StateType], ABC, Generic[SettingsType, MessageInType, MessageOutType, StateType]

Base class for stateful message transformers that produce output. This class merely overrides the type annotations of BaseStatefulProcessor.

stateful_op(state, message)[source]#
Parameters:
  • state (tuple[StateType, int] | None)

  • message (MessageInType)

Return type:

tuple[tuple[StateType, int], MessageOutType]

__init__(*args, **kwargs)#
Return type:

None

async asend(message)#

Alias for __acall__.

Parameters:

message (Any)

Return type:

Any

classmethod get_message_type(dir)#
Parameters:

dir (str)

Return type:

Any

classmethod get_settings_type()#
Return type:

type[SettingsType]

classmethod get_state_type()#
Return type:

type[StateType]

send(message)#

Alias for __call__.

Parameters:

message (Any)

Return type:

Any

property state: StateType#
settings: SettingsType#
class BaseAdaptiveTransformer(*args, **kwargs)[source]#

Bases: BaseStatefulTransformer[SettingsType, MessageInType | SampleMessage, MessageOutType | None, StateType], ABC, Generic[SettingsType, MessageInType, MessageOutType, StateType]

abstractmethod partial_fit(message)[source]#
Parameters:

message (SampleMessage)

Return type:

None

async apartial_fit(message)[source]#

Override me if you need async partial fitting.

Parameters:

message (SampleMessage)

Return type:

None

__init__(*args, **kwargs)#
Return type:

None

async asend(message)#

Alias for __acall__.

Parameters:

message (Any)

Return type:

Any

classmethod get_message_type(dir)#
Parameters:

dir (str)

Return type:

Any

classmethod get_settings_type()#
Return type:

type[SettingsType]

classmethod get_state_type()#
Return type:

type[StateType]

send(message)#

Alias for __call__.

Parameters:

message (Any)

Return type:

Any

property state: StateType#
stateful_op(state, message)#
Parameters:
  • state (tuple[StateType, int] | None)

  • message (MessageInType)

Return type:

tuple[tuple[StateType, int], MessageOutType]

settings: SettingsType#
class BaseAsyncTransformer(*args, **kwargs)[source]#

Bases: BaseStatefulTransformer[SettingsType, MessageInType, MessageOutType, StateType], ABC, Generic[SettingsType, MessageInType, MessageOutType, StateType]

This reverses the priority of async and sync methods from BaseStatefulTransformer. Whereas in BaseStatefulTransformer, the async methods simply called the sync methods, here the sync methods call the async methods, more similar to BaseStatefulProducer.

__init__(*args, **kwargs)#
Return type:

None

async asend(message)#

Alias for __acall__.

Parameters:

message (Any)

Return type:

Any

classmethod get_message_type(dir)#
Parameters:

dir (str)

Return type:

Any

classmethod get_settings_type()#
Return type:

type[SettingsType]

classmethod get_state_type()#
Return type:

type[StateType]

send(message)#

Alias for __call__.

Parameters:

message (Any)

Return type:

Any

property state: StateType#
stateful_op(state, message)#
Parameters:
  • state (tuple[StateType, int] | None)

  • message (MessageInType)

Return type:

tuple[tuple[StateType, int], MessageOutType]

settings: SettingsType#
class CompositeProcessor(*args, **kwargs)[source]#

Bases: BaseProcessor[SettingsType, MessageInType, MessageOutType], CompositeStateful[SettingsType, MessageOutType], ABC, Generic[SettingsType, MessageInType, MessageOutType]

A processor that chains multiple processor together in a feedforward non-branching graph. The individual processors may be stateless or stateful. The last processor may be a consumer, otherwise processors must be transformers. Use CompositeProducer if you want the first processor to be a producer. Concrete subclasses must implement _initialize_processors. Optionally override _reset_state if you want adaptive state behaviour. Example implementation:

class CustomCompositeProcessor(CompositeProcessor[CustomSettings, AxisArray, AxisArray]):

@staticmethod def _initialize_processors(settings: CustomSettings) -> dict[str, BaseProcessor]:

return {

“stateful_transformer”: CustomStatefulProducer(**settings), “transformer”: CustomTransformer(**settings),

}

Where **settings should be replaced with initialisation arguments for each processor.

__init__(*args, **kwargs)[source]#
Return type:

None

stateful_op(state, message)[source]#
Parameters:
Return type:

tuple[dict[str, tuple[Any, int]], MessageOutType | None]

async asend(message)#

Alias for __acall__.

Parameters:

message (Any)

Return type:

Any

classmethod get_message_type(dir)#
Parameters:

dir (str)

Return type:

Any

classmethod get_settings_type()#
Return type:

type[SettingsType]

classmethod get_state_type()#
Return type:

type[StateType]

send(message)#

Alias for __call__.

Parameters:

message (Any)

Return type:

Any

property state: dict[str, Any]#
settings: SettingsType#
class CompositeProducer(*args, **kwargs)[source]#

Bases: BaseProducer[SettingsType, MessageOutType], CompositeStateful[SettingsType, MessageOutType], ABC, Generic[SettingsType, MessageOutType]

A producer that chains multiple processors (starting with a producer) together in a feedforward non-branching graph. The individual processors may be stateless or stateful. The first processor must be a producer, the last processor may be a consumer, otherwise processors must be transformers.

__init__(*args, **kwargs)[source]#
Return type:

None

stateful_op(state)[source]#
Parameters:

state (dict[str, tuple[Any, int]] | None)

Return type:

tuple[dict[str, tuple[Any, int]], MessageOutType | None]

classmethod get_message_type(dir)#
Parameters:

dir (str)

Return type:

type[MessageOutType] | None

classmethod get_settings_type()#
Return type:

type[SettingsType]

classmethod get_state_type()#
Return type:

type[StateType]

property state: dict[str, Any]#