ezmsg.sigproc.base#

Functions

get_base_adaptive_transformer_type(cls)[source]#
Parameters:

cls (type)

Return type:

type

get_base_consumer_type(cls)[source]#
Parameters:

cls (type)

Return type:

type

get_base_producer_type(cls)[source]#
Parameters:

cls (type)

Return type:

type

get_base_transformer_type(cls)[source]#
Parameters:

cls (type)

Return type:

type

Classes

class AdaptiveTransformer(*args, **kwargs)[source]#

Bases: StatefulTransformer, Protocol

partial_fit(message)[source]#

Update transformer state using labeled training data.

This method should update the internal state/parameters of the transformer based on the provided labeled samples, without performing any transformation.

Parameters:

message (SampleMessage)

Return type:

None
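As a rough illustration of the partial_fit contract described above, the sketch below uses stand-in types rather than ezmsg's own. LabeledSample and MeanCenterer are hypothetical names; only the split between fitting (state update, no output) and transforming is meant to mirror the text.

```python
from dataclasses import dataclass


@dataclass
class LabeledSample:
    """Hypothetical stand-in for a labeled training message."""
    value: float
    label: str


class MeanCenterer:
    """Toy adaptive transformer: subtracts a running mean learned via partial_fit."""

    def __init__(self) -> None:
        self.count = 0
        self.mean = 0.0

    def partial_fit(self, message: LabeledSample) -> None:
        # Update internal parameters only; nothing is transformed or returned.
        self.count += 1
        self.mean += (message.value - self.mean) / self.count

    def __call__(self, value: float) -> float:
        # Transformation reads the current state but does not modify it.
        return value - self.mean


centerer = MeanCenterer()
for v in (2.0, 4.0, 6.0):
    centerer.partial_fit(LabeledSample(value=v, label="rest"))
centered = centerer(5.0)
```

Keeping the two paths separate means training samples can arrive on their own stream without producing output messages.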

async apartial_fit(message)[source]#
Parameters:

message (SampleMessage)

Return type:

None

class BaseAdaptiveTransformer(*args, **kwargs)[source]#

Bases: BaseStatefulTransformer[SettingsType, MessageInType | SampleMessage, MessageOutType | None, StateType], ABC, Generic[SettingsType, MessageInType, MessageOutType, StateType]

abstractmethod partial_fit(message)[source]#
Parameters:

message (SampleMessage)

Return type:

None

async apartial_fit(message)[source]#

Override me if you need async partial fitting.

Parameters:

message (SampleMessage)

Return type:

None

class BaseAdaptiveTransformerUnit(*args, settings=None, **kwargs)[source]#

Bases: BaseProcessorUnit[SettingsType], ABC, Generic[SettingsType, MessageInType, MessageOutType, AdaptiveTransformerType]

Parameters:

settings (Settings | None)

INPUT_SAMPLE = InputStream:unlocated[<class 'ezmsg.sigproc.util.message.SampleMessage'>]()#
INPUT_SIGNAL = InputStream:unlocated[~MessageInType]()#
OUTPUT_SIGNAL = OutputStream:unlocated[~MessageOutType](self.num_buffers=32, self.force_tcp=False)#
create_processor()[source]#

Create the adaptive transformer instance from settings.

Return type:

None

async on_signal(message)[source]#
Parameters:

message (MessageInType)

Return type:

AsyncGenerator

async on_sample(msg)[source]#
Parameters:

msg (SampleMessage)

Return type:

None

class BaseAsyncTransformer(*args, **kwargs)[source]#

Bases: BaseStatefulTransformer[SettingsType, MessageInType, MessageOutType, StateType], ABC, Generic[SettingsType, MessageInType, MessageOutType, StateType]

This reverses the priority of async and sync methods from BaseStatefulTransformer. Whereas in BaseStatefulTransformer the async methods simply call the sync methods, here the sync methods call the async methods, similar to BaseStatefulProducer.

class BaseConsumer(*args, settings=None, **kwargs)[source]#

Bases: BaseProcessor[SettingsType, MessageInType, None], ABC, Generic[SettingsType, MessageInType]

Base class for consumers – processors that receive messages but don’t produce output. This base simply overrides type annotations of BaseProcessor to remove the outputs. (We don’t bother overriding send and asend because those are deprecated.)

Parameters:

settings (SettingsType)

classmethod get_message_type(dir)[source]#
Parameters:

dir (str)

Return type:

type[MessageInType] | None

class BaseConsumerUnit(*args, settings=None, **kwargs)[source]#

Bases: BaseProcessorUnit[SettingsType], ABC, Generic[SettingsType, MessageInType, ConsumerType]

Base class for consumer units – i.e. units that receive messages but do not return results. Implement a new Unit as follows:

    class CustomUnit(BaseConsumerUnit[
        CustomConsumerSettings,  # SettingsType
        AxisArray,               # MessageInType
        CustomConsumer,          # ConsumerType
    ]):
        SETTINGS = CustomConsumerSettings

… that’s all!

Where CustomConsumerSettings and CustomConsumer are custom implementations of:

  • ez.Settings for settings

  • BaseConsumer or BaseStatefulConsumer for the consumer implementation

Parameters:

settings (Settings | None)

INPUT_SIGNAL = InputStream:unlocated[~MessageInType]()#
create_processor()[source]#

Create the consumer instance from settings.

async on_signal(message)[source]#

Consume the message.

Parameters:

message (MessageInType) – Input message to be consumed.

class BaseProcessor(*args, settings=None, **kwargs)[source]#

Bases: ABC, Generic[SettingsType, MessageInType, MessageOutType]

Base class for processors. You probably do not want to inherit from this class directly. Refer instead to the more specific base classes.

Note that BaseProcessor and its children are sync by default. If you need async by default, then override the async methods and call them from the sync methods. Look to BaseProducer for examples of calling async methods from sync methods.

Parameters:

settings (SettingsType)

classmethod get_settings_type()[source]#
Return type:

type[SettingsType]

classmethod get_message_type(dir)[source]#
Parameters:

dir (str)

Return type:

Any

__init__(*args, settings=None, **kwargs)[source]#
Parameters:

settings (SettingsType | None)

Return type:

None

settings: SettingsType#
send(message)[source]#

Alias for __call__.

Parameters:

message (Any)

Return type:

Any

async asend(message)[source]#

Alias for __acall__.

Parameters:

message (Any)

Return type:

Any

class BaseProcessorUnit(*args, settings=None, **kwargs)[source]#

Bases: Unit, ABC, Generic[SettingsType]

Base class for processor units – i.e. units that process messages. This is an abstract base class that provides common functionality for consumer and transformer units. You probably do not want to inherit from this class directly as you would need to define a custom implementation of create_processor. Refer instead to BaseConsumerUnit or BaseTransformerUnit.

Parameters:

settings (Settings | None)

INPUT_SETTINGS = InputStream:unlocated[~SettingsType]()#
async initialize()[source]#

Runs when the Unit is instantiated. This is called from within the same process in which this unit will live. This lifecycle hook can be overridden. It can be run as an async function by simply adding the async keyword when overriding.

Return type:

None

abstractmethod create_processor()[source]#
Return type:

None

async on_settings(msg)[source]#

Receive a settings message, override self.SETTINGS, and re-create the processor. Child classes that wish to have fine-grained control over whether the core processor resets on settings changes should override this method.

Parameters:

msg (SettingsType) – a settings message.

Return type:

None
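The settings-driven reset described for on_settings can be pictured with the sketch below. It is self-contained and does not use ezmsg: GainSettings, GainProcessor, and ToyUnit are hypothetical stand-ins, and the only behavior borrowed from the text is "override SETTINGS, then re-create the processor".

```python
import asyncio
from dataclasses import dataclass


@dataclass(frozen=True)
class GainSettings:
    """Hypothetical settings object."""
    gain: float = 1.0


class GainProcessor:
    """Hypothetical processor that also counts how many messages it has seen."""

    def __init__(self, settings: GainSettings) -> None:
        self.settings = settings
        self.n_seen = 0

    def __call__(self, value: float) -> float:
        self.n_seen += 1
        return value * self.settings.gain


class ToyUnit:
    """Stand-in for a unit that owns a processor built from its settings."""

    def __init__(self, settings: GainSettings) -> None:
        self.SETTINGS = settings
        self.create_processor()

    def create_processor(self) -> None:
        self.processor = GainProcessor(self.SETTINGS)

    async def on_settings(self, msg: GainSettings) -> None:
        # Replace the settings, then rebuild the processor from scratch.
        self.SETTINGS = msg
        self.create_processor()


unit = ToyUnit(GainSettings(gain=2.0))
before = unit.processor(3.0)
asyncio.run(unit.on_settings(GainSettings(gain=0.5)))
after = unit.processor(3.0)
```

Because the processor is rebuilt, any accumulated state (here, n_seen) is lost on every settings message, which is why a subclass might override on_settings to reset selectively.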

class BaseProducer(*args, settings=None, **kwargs)[source]#

Bases: ABC, Generic[SettingsType, MessageOutType]

Base class for producers – processors that generate messages without consuming inputs.

Note that BaseProducer and its children are async by default, and the sync methods simply wrap the async methods. This is the opposite of BaseProcessor and its children which are sync by default. These classes are designed this way because it is highly likely that a producer, which (probably) does not receive inputs, will require some sort of IO which will benefit from being async.
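The two conventions contrasted above can be sketched side by side. The classes below are hypothetical and do not inherit from anything in ezmsg; in particular, using asyncio.run to bridge from sync to async is an assumption made for the sake of a runnable example, not a statement about how the library does it.

```python
import asyncio


class SyncFirst:
    """Sync by default: the async entry point simply calls the sync one."""

    def __call__(self, message: int) -> int:
        return message * 2

    async def __acall__(self, message: int) -> int:
        return self(message)


class AsyncFirst:
    """Async by default: the sync entry point wraps the async one."""

    async def __acall__(self) -> int:
        await asyncio.sleep(0)  # placeholder for real IO
        return 42

    def __call__(self) -> int:
        # Assumes no event loop is already running in this thread.
        return asyncio.run(self.__acall__())


sync_result = SyncFirst()(3)
async_result = asyncio.run(SyncFirst().__acall__(3))
produced = AsyncFirst()()
```

In both cases the real work lives in one method and the other is a thin wrapper, so subclasses only need to override the primary one.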

Parameters:

settings (SettingsType | None)

classmethod get_settings_type()[source]#
Return type:

type[SettingsType]

classmethod get_message_type(dir)[source]#
Parameters:

dir (str)

Return type:

type[MessageOutType] | None

__init__(*args, settings=None, **kwargs)[source]#
Parameters:

settings (SettingsType | None)

Return type:

None

class BaseProducerUnit(*args, settings=None, **kwargs)[source]#

Bases: Unit, ABC, Generic[SettingsType, MessageOutType, ProducerType]

Base class for producer units – i.e. units that generate messages without consuming inputs. Implement a new Unit as follows:

    class CustomUnit(BaseProducerUnit[
        CustomProducerSettings,  # SettingsType
        AxisArray,               # MessageOutType
        CustomProducer,          # ProducerType
    ]):
        SETTINGS = CustomProducerSettings

… that’s all!

Where CustomProducerSettings and CustomProducer are custom implementations of ez.Settings and of BaseProducer or BaseStatefulProducer, respectively.

Parameters:

settings (Settings | None)

INPUT_SETTINGS = InputStream:unlocated[~SettingsType]()#
OUTPUT_SIGNAL = OutputStream:unlocated[~MessageOutType](self.num_buffers=32, self.force_tcp=False)#
async initialize()[source]#

Runs when the Unit is instantiated. This is called from within the same process in which this unit will live. This lifecycle hook can be overridden. It can be run as an async function by simply adding the async keyword when overriding.

Return type:

None

create_producer()[source]#

Create the producer instance from settings.

Return type:

None

async on_settings(msg)[source]#

Receive a settings message, override self.SETTINGS, and re-create the producer. Child classes that wish to have fine-grained control over whether the core producer resets on settings changes should override this method.

Parameters:

msg (SettingsType) – a settings message.

Return type:

None

async produce()[source]#
Return type:

AsyncGenerator

class BaseStatefulConsumer(*args, **kwargs)[source]#

Bases: BaseStatefulProcessor[SettingsType, MessageInType, None, StateType], ABC, Generic[SettingsType, MessageInType, StateType]

Base class for stateful message consumers that don’t produce output. This class merely overrides the type annotations of BaseStatefulProcessor.

classmethod get_message_type(dir)[source]#
Parameters:

dir (str)

Return type:

type[MessageInType] | None

stateful_op(state, message)[source]#
Parameters:
  • state (tuple[StateType, int] | None)

  • message (MessageInType)

Return type:

tuple[tuple[StateType, int], None]

class BaseStatefulProcessor(*args, **kwargs)[source]#

Bases: BaseProcessor[SettingsType, MessageInType, MessageOutType], Stateful[StateType], ABC, Generic[SettingsType, MessageInType, MessageOutType, StateType]

Base class implementing common stateful processor functionality. You probably do not want to inherit from this class directly. Refer instead to the more specific base classes. Use BaseStatefulConsumer for operations that do not return a result, or BaseStatefulTransformer for operations that do return a result.

__init__(*args, **kwargs)[source]#
Return type:

None

stateful_op(state, message)[source]#
Return type:

tuple[tuple[StateType, int], Any]

class BaseStatefulProducer(*args, **kwargs)[source]#

Bases: BaseProducer[SettingsType, MessageOutType], Stateful[StateType], ABC, Generic[SettingsType, MessageOutType, StateType]

Base class implementing common stateful producer functionality.

Examples of stateful producers are things that require counters, clocks, or to cycle through a set of values.

Unlike BaseStatefulProcessor, this class does not do message hashing because there are no input messages. We still use self._hash to track the transition from initialization (.hash == -1) to state reset (.hash == 0).

__init__(*args, **kwargs)[source]#
Return type:

None

stateful_op(state)[source]#
Parameters:

state (tuple[StateType, int] | None)

Return type:

tuple[tuple[StateType, int], MessageOutType]

class BaseStatefulTransformer(*args, **kwargs)[source]#

Bases: BaseStatefulProcessor[SettingsType, MessageInType, MessageOutType, StateType], ABC, Generic[SettingsType, MessageInType, MessageOutType, StateType]

Base class for stateful message transformers that produce output. This class merely overrides the type annotations of BaseStatefulProcessor.

stateful_op(state, message)[source]#
Parameters:
  • state (tuple[StateType, int] | None)

  • message (MessageInType)

Return type:

tuple[tuple[StateType, int], MessageOutType]
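The stateful_op signature above takes the state in and hands the updated state back alongside the output. The sketch below is a plain function with the same shape, not an ezmsg subclass; cumsum_op is a hypothetical name, and the meaning of the integer slot (assumed here to be a hash marker, filled with a placeholder 0) is not documented on this page.

```python
from typing import Optional


def cumsum_op(
    state: Optional[tuple[float, int]], message: float
) -> tuple[tuple[float, int], float]:
    """Toy stateful op: running sum carried in an explicit (state, hash) pair."""
    # The int slot is assumed to be a hash marker; 0 is only a placeholder.
    total, hash_value = state if state is not None else (0.0, 0)
    total += message
    return (total, hash_value), total


state = None
outputs = []
for sample in (1.0, 2.0, 3.5):
    state, out = cumsum_op(state, sample)
    outputs.append(out)
```

Passing None as the initial state lets the operation build its own starting state on the first call.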

class BaseTransformer(*args, settings=None, **kwargs)[source]#

Bases: BaseProcessor[SettingsType, MessageInType, MessageOutType], ABC, Generic[SettingsType, MessageInType, MessageOutType]

Base class for transformers – processors which receive messages and produce output. This base simply overrides type annotations of BaseProcessor to indicate that outputs are not optional. (We don’t bother overriding send and asend because those are deprecated.)

Parameters:

settings (SettingsType)

class BaseTransformerUnit(*args, settings=None, **kwargs)[source]#

Bases: BaseProcessorUnit[SettingsType], ABC, Generic[SettingsType, MessageInType, MessageOutType, TransformerType]

Base class for transformer units – i.e. units that transform input messages into output messages. Implement a new Unit as follows:

    class CustomUnit(BaseTransformerUnit[
        CustomTransformerSettings,  # SettingsType
        AxisArray,                  # MessageInType
        AxisArray,                  # MessageOutType
        CustomTransformer,          # TransformerType
    ]):
        SETTINGS = CustomTransformerSettings

… that’s all!

Where CustomTransformerSettings and CustomTransformer are custom implementations of:

  • ez.Settings for settings

  • One of these transformer types: BaseTransformer, BaseStatefulTransformer, or CompositeProcessor

Parameters:

settings (Settings | None)

INPUT_SIGNAL = InputStream:unlocated[~MessageInType]()#
OUTPUT_SIGNAL = OutputStream:unlocated[~MessageOutType](self.num_buffers=32, self.force_tcp=False)#
create_processor()[source]#

Create the transformer instance from settings.

async on_signal(message)[source]#
Parameters:

message (MessageInType)

Return type:

AsyncGenerator

class CompositeProcessor(*args, **kwargs)[source]#

Bases: BaseProcessor[SettingsType, MessageInType, MessageOutType], CompositeStateful[SettingsType, MessageOutType], ABC, Generic[SettingsType, MessageInType, MessageOutType]

A processor that chains multiple processors together in a feedforward non-branching graph. The individual processors may be stateless or stateful. The last processor may be a consumer, otherwise processors must be transformers. Use CompositeProducer if you want the first processor to be a producer. Concrete subclasses must implement _initialize_processors. Optionally override _reset_state if you want adaptive state behaviour. Example implementation:

    class CustomCompositeProcessor(CompositeProcessor[CustomSettings, AxisArray, AxisArray]):
        @staticmethod
        def _initialize_processors(settings: CustomSettings) -> dict[str, BaseProcessor]:
            return {
                "stateful_transformer": CustomStatefulTransformer(**settings),
                "transformer": CustomTransformer(**settings),
            }

Where **settings should be replaced with initialisation arguments for each processor.
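To make the feedforward chaining concrete, here is a small self-contained sketch that does not depend on ezmsg. ToyComposite is a hypothetical class, its stages are plain callables rather than processors, and applying them in dict insertion order is an assumption about how such a chain could be walked.

```python
from typing import Callable


class ToyComposite:
    """Applies named stages one after another, each feeding the next."""

    def __init__(self, offset: float, scale: float) -> None:
        self._procs = self._initialize_processors(offset, scale)

    @staticmethod
    def _initialize_processors(
        offset: float, scale: float
    ) -> dict[str, Callable[[float], float]]:
        # Order matters: dicts preserve insertion order in Python 3.7+.
        return {
            "shift": lambda x: x + offset,
            "scale": lambda x: x * scale,
        }

    def __call__(self, message: float) -> float:
        for proc in self._procs.values():
            message = proc(message)
        return message


chain = ToyComposite(offset=1.0, scale=3.0)
result = chain(2.0)
```

Naming each stage in the dict also gives a natural key for per-stage state, which matches the dict[str, ...] state type shown for stateful_op below.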

__init__(*args, **kwargs)[source]#
Return type:

None

stateful_op(state, message)[source]#
Return type:

tuple[dict[str, tuple[Any, int]], MessageOutType | None]

class CompositeProducer(*args, **kwargs)[source]#

Bases: BaseProducer[SettingsType, MessageOutType], CompositeStateful[SettingsType, MessageOutType], ABC, Generic[SettingsType, MessageOutType]

A producer that chains multiple processors (starting with a producer) together in a feedforward non-branching graph. The individual processors may be stateless or stateful. The first processor must be a producer, the last processor may be a consumer, otherwise processors must be transformers.

__init__(*args, **kwargs)[source]#
Return type:

None

stateful_op(state)[source]#
Parameters:

state (dict[str, tuple[Any, int]] | None)

Return type:

tuple[dict[str, tuple[Any, int]], MessageOutType | None]

class CompositeStateful[source]#

Bases: Stateful[dict[str, Any]], ABC, Generic[SettingsType, MessageOutType]

Mixin class for composite processor/producer chains. DO NOT use this class directly. Used to enforce statefulness of the composite processor/producer chain and provide initialization and validation methods.

property state: dict[str, Any]#
abstractmethod stateful_op(state, *args, **kwargs)[source]#
Return type:

tuple[dict[str, tuple[Any, int]], MessageOutType | None]

class Consumer(*args, **kwargs)[source]#

Bases: Processor[SettingsType, MessageInType, None], Protocol

Protocol for consumers that receive messages but do not return a result.

class GenAxisArray(*args, settings=None, **kwargs)[source]#

Bases: Unit

Parameters:

settings (Settings | None)

STATE#

alias of GenState

INPUT_SIGNAL = InputStream:unlocated[<class 'ezmsg.util.messages.axisarray.AxisArray'>]()#
OUTPUT_SIGNAL = OutputStream:unlocated[<class 'ezmsg.util.messages.axisarray.AxisArray'>](self.num_buffers=32, self.force_tcp=False)#
INPUT_SETTINGS = InputStream:unlocated[<class 'ezmsg.core.settings.Settings'>]()#
async initialize()[source]#

Runs when the Unit is instantiated. This is called from within the same process in which this unit will live. This lifecycle hook can be overridden. It can be run as an async function by simply adding the async keyword when overriding.

Return type:

None

construct_generator()[source]#
async on_settings(msg)[source]#

Update unit settings and reset generator. Note: Not all units will require a full reset with new settings. Override this method to implement a selective reset.

Parameters:

msg (Settings) – Instance of SETTINGS object.

Return type:

None

async on_signal(message)[source]#
Parameters:

message (AxisArray)

Return type:

AsyncGenerator

class Processor(*args, **kwargs)[source]#

Bases: Protocol[SettingsType, MessageInType, MessageOutType]

Protocol for processors. You probably will not implement this protocol directly. Refer instead to the less ambiguous Consumer and Transformer protocols, and the base classes in this module which implement them.

Note: In Python 3.12+, we can invoke __acall__ directly using await obj(message), but to support earlier versions we need to use await obj.__acall__(message).
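Since Processor is a Protocol, conformance is structural rather than by inheritance. The sketch below shows that idea with a hypothetical ToyProcessor protocol and Doubler class (neither is part of ezmsg), and calls __acall__ explicitly in the portable form the note mentions.

```python
import asyncio
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class ToyProcessor(Protocol):
    """Hypothetical protocol with a sync and an async entry point."""

    def __call__(self, message: Any) -> Any: ...

    async def __acall__(self, message: Any) -> Any: ...


class Doubler:
    """Satisfies ToyProcessor without inheriting from it."""

    def __call__(self, message: int) -> int:
        return message * 2

    async def __acall__(self, message: int) -> int:
        return self(message)


doubler = Doubler()
conforms = isinstance(doubler, ToyProcessor)
awaited = asyncio.run(doubler.__acall__(21))
```

Note that a runtime isinstance check against a Protocol only verifies that the methods exist, not that their signatures match.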

__init__(*args, **kwargs)#
class Producer(*args, **kwargs)[source]#

Bases: Protocol[SettingsType, MessageOutType]

Protocol for producers that generate messages.

__init__(*args, **kwargs)#
class Stateful[source]#

Bases: ABC, Generic[StateType]

Mixin class for stateful processors. DO NOT use this class directly. Used to enforce that the processor/producer has a state attribute and stateful_op method.

classmethod get_state_type()[source]#
Return type:

type[StateType]

property state: StateType#
abstractmethod stateful_op(*args, **kwargs)[source]#
Return type:

tuple

class StatefulConsumer(*args, **kwargs)[source]#

Bases: StatefulProcessor[SettingsType, MessageInType, None, StateType], Protocol

Protocol specifically for processors that consume messages without producing output.

stateful_op(state, message)[source]#
Parameters:
  • state (tuple[StateType, int])

  • message (MessageInType)

Return type:

tuple[tuple[StateType, int], None]

class StatefulProcessor(*args, **kwargs)[source]#

Bases: Protocol[SettingsType, MessageInType, MessageOutType, StateType]

Base protocol for stateful message processors. You probably will not implement this protocol directly. Refer instead to the less ambiguous StatefulConsumer and StatefulTransformer protocols.

property state: StateType#
stateful_op(state, message)[source]#
Return type:

tuple[Any, Any]

__init__(*args, **kwargs)#
class StatefulProducer(*args, **kwargs)[source]#

Bases: Protocol[SettingsType, MessageOutType, StateType]

Protocol for producers that generate messages without consuming inputs.

property state: StateType#
stateful_op(state)[source]#
Parameters:

state (Any)

Return type:

tuple[Any, Any]

__init__(*args, **kwargs)#
class StatefulTransformer(*args, **kwargs)[source]#

Bases: StatefulProcessor[SettingsType, MessageInType, MessageOutType, StateType], Protocol

Protocol specifically for processors that transform messages.

stateful_op(state, message)[source]#
Parameters:
  • state (tuple[StateType, int])

  • message (MessageInType)

Return type:

tuple[tuple[StateType, int], MessageOutType]

class Transformer(*args, **kwargs)[source]#

Bases: Processor[SettingsType, MessageInType, MessageOutType], Protocol

Protocol for transformers that receive messages and return a result of the same class.

class Processor(*args, **kwargs)[source]#

Bases: Protocol[SettingsType, MessageInType, MessageOutType]

Protocol for processors. You probably will not implement this protocol directly. Refer instead to the less ambiguous Consumer and Transformer protocols, and the base classes in this module which implement them.

Note: In Python 3.12+, we can invoke __acall__ directly using await obj(message),

but to support earlier versions we need to use await obj.__acall__(message).

__init__(*args, **kwargs)#
class Producer(*args, **kwargs)[source]#

Bases: Protocol[SettingsType, MessageOutType]

Protocol for producers that generate messages.

__init__(*args, **kwargs)#
class Consumer(*args, **kwargs)[source]#

Bases: Processor[SettingsType, MessageInType, None], Protocol

Protocol for consumers that receive messages but do not return a result.

class Transformer(*args, **kwargs)[source]#

Bases: Processor[SettingsType, MessageInType, MessageOutType], Protocol

Protocol for transformers that receive messages and return a result of the same class.

class StatefulProcessor(*args, **kwargs)[source]#

Bases: Protocol[SettingsType, MessageInType, MessageOutType, StateType]

Base protocol for _stateful_ message processors. You probably will not implement this protocol directly. Refer instead to the less ambiguous StatefulConsumer and StatefulTransformer protocols.

property state: StateType#
stateful_op(state, message)[source]#
Parameters:
Return type:

tuple[Any, Any]

__init__(*args, **kwargs)#
class StatefulProducer(*args, **kwargs)[source]#

Bases: Protocol[SettingsType, MessageOutType, StateType]

Protocol for producers that generate messages without consuming inputs.

property state: StateType#
stateful_op(state)[source]#
Parameters:

state (Any)

Return type:

tuple[Any, Any]

__init__(*args, **kwargs)#
class StatefulConsumer(*args, **kwargs)[source]#

Bases: StatefulProcessor[SettingsType, MessageInType, None, StateType], Protocol

Protocol specifically for processors that consume messages without producing output.

stateful_op(state, message)[source]#
Parameters:
  • state (tuple[StateType, int])

  • message (MessageInType)

Return type:

tuple[tuple[StateType, int], None]

class StatefulTransformer(*args, **kwargs)[source]#

Bases: StatefulProcessor[SettingsType, MessageInType, MessageOutType, StateType], Protocol

Protocol specifically for processors that transform messages.

stateful_op(state, message)[source]#
Parameters:
  • state (tuple[StateType, int])

  • message (MessageInType)

Return type:

tuple[tuple[StateType, int], MessageOutType]

class AdaptiveTransformer(*args, **kwargs)[source]#

Bases: StatefulTransformer, Protocol

partial_fit(message)[source]#

Update transformer state using labeled training data.

This method should update the internal state/parameters of the transformer based on the provided labeled samples, without performing any transformation.

Parameters:

message (SampleMessage)

Return type:

None

async apartial_fit(message)[source]#
Parameters:

message (SampleMessage)

Return type:

None

class BaseProcessor(*args, settings=None, **kwargs)[source]#

Bases: ABC, Generic[SettingsType, MessageInType, MessageOutType]

Base class for processors. You probably do not want to inherit from this class directly. Refer instead to the more specific base classes.

Note that BaseProcessor and its children are sync by default. If you need async by defualt, then override the async methods and call them from the sync methods. Look to BaseProducer for examples of calling async methods from sync methods.

Parameters:

settings (SettingsType)

classmethod get_settings_type()[source]#
Return type:

type[SettingsType]

classmethod get_message_type(dir)[source]#
Parameters:

dir (str)

Return type:

Any

__init__(*args, settings=None, **kwargs)[source]#
Parameters:

settings (SettingsType | None)

Return type:

None

settings: SettingsType#
send(message)[source]#

Alias for __call__.

Parameters:

message (Any)

Return type:

Any

async asend(message)[source]#

Alias for __acall__.

Parameters:

message (Any)

Return type:

Any

class BaseProducer(*args, settings=None, **kwargs)[source]#

Bases: ABC, Generic[SettingsType, MessageOutType]

Base class for producers – processors that generate messages without consuming inputs.

Note that BaseProducer and its children are async by default, and the sync methods simply wrap

the async methods. This is the opposite of BaseProcessor and its children which are sync by default. These classes are designed this way because it is highly likely that a producer, which (probably) does not receive inputs, will require some sort of IO which will benefit from being async.

Parameters:

settings (SettingsType | None)

classmethod get_settings_type()[source]#
Return type:

type[SettingsType]

classmethod get_message_type(dir)[source]#
Parameters:

dir (str)

Return type:

type[MessageOutType] | None

__init__(*args, settings=None, **kwargs)[source]#
Parameters:

settings (SettingsType | None)

Return type:

None

class BaseConsumer(*args, settings=None, **kwargs)[source]#

Bases: BaseProcessor[SettingsType, MessageInType, None], ABC, Generic[SettingsType, MessageInType]

Base class for consumers – processors that receive messages but don’t produce output. This base simply overrides type annotations of BaseProcessor to remove the outputs. (We don’t bother overriding send and asend because those are deprecated.)

Parameters:

settings (SettingsType)

classmethod get_message_type(dir)[source]#
Parameters:

dir (str)

Return type:

type[MessageInType] | None

class BaseTransformer(*args, settings=None, **kwargs)[source]#

Bases: BaseProcessor[SettingsType, MessageInType, MessageOutType], ABC, Generic[SettingsType, MessageInType, MessageOutType]

Base class for transformers – processors which receive messages and produce output. This base simply overrides type annotations of BaseProcessor to indicate that outputs are not optional. (We don’t bother overriding send and asend because those are deprecated.)

Parameters:

settings (SettingsType)

class Stateful[source]#

Bases: ABC, Generic[StateType]

Mixin class for stateful processors. DO NOT use this class directly. Used to enforce that the processor/producer has a state attribute and stateful_op method.

classmethod get_state_type()[source]#
Return type:

type[StateType]

property state: StateType#
abstractmethod stateful_op(*args, **kwargs)[source]#
Parameters:
Return type:

tuple

class BaseStatefulProcessor(*args, **kwargs)[source]#

Bases: BaseProcessor[SettingsType, MessageInType, MessageOutType], Stateful[StateType], ABC, Generic[SettingsType, MessageInType, MessageOutType, StateType]

Base class implementing common stateful processor functionality. You probably do not want to inherit from this class directly. Refer instead to the more specific base classes. Use BaseStatefulConsumer for operations that do not return a result, or BaseStatefulTransformer for operations that do return a result.

__init__(*args, **kwargs)[source]#
Return type:

None

stateful_op(state, message)[source]#
Parameters:
Return type:

tuple[tuple[StateType, int], Any]

class BaseStatefulProducer(*args, **kwargs)[source]#

Bases: BaseProducer[SettingsType, MessageOutType], Stateful[StateType], ABC, Generic[SettingsType, MessageOutType, StateType]

Base class implementing common stateful producer functionality.

Examples of stateful producers are things that require counters, clocks, or to cycle through a set of values.

Unlike BaseStatefulProcessor, this class does not message hashing because there

are no input messages. We still use self._hash to simply track the transition from initialization (.hash == -1) to state reset (.hash == 0).

__init__(*args, **kwargs)[source]#
Return type:

None

stateful_op(state)[source]#
Parameters:

state (tuple[StateType, int] | None)

Return type:

tuple[tuple[StateType, int], MessageOutType]

class BaseStatefulConsumer(*args, **kwargs)[source]#

Bases: BaseStatefulProcessor[SettingsType, MessageInType, None, StateType], ABC, Generic[SettingsType, MessageInType, StateType]

Base class for stateful message consumers that don’t produce output. This class merely overrides the type annotations of BaseStatefulProcessor.

classmethod get_message_type(dir)[source]#
Parameters:

dir (str)

Return type:

type[MessageInType] | None

stateful_op(state, message)[source]#
Parameters:
  • state (tuple[StateType, int] | None)

  • message (MessageInType)

Return type:

tuple[tuple[StateType, int], None]

class BaseStatefulTransformer(*args, **kwargs)[source]#

Bases: BaseStatefulProcessor[SettingsType, MessageInType, MessageOutType, StateType], ABC, Generic[SettingsType, MessageInType, MessageOutType, StateType]

Base class for stateful message transformers that produce output. This class merely overrides the type annotations of BaseStatefulProcessor.

stateful_op(state, message)[source]#
Parameters:
  • state (tuple[StateType, int] | None)

  • message (MessageInType)

Return type:

tuple[tuple[StateType, int], MessageOutType]

class BaseAdaptiveTransformer(*args, **kwargs)[source]#

Bases: BaseStatefulTransformer[SettingsType, MessageInType | SampleMessage, MessageOutType | None, StateType], ABC, Generic[SettingsType, MessageInType, MessageOutType, StateType]

abstractmethod partial_fit(message)[source]#
Parameters:

message (SampleMessage)

Return type:

None

async apartial_fit(message)[source]#

Override me if you need async partial fitting.

Parameters:

message (SampleMessage)

Return type:

None

class BaseAsyncTransformer(*args, **kwargs)[source]#

Bases: BaseStatefulTransformer[SettingsType, MessageInType, MessageOutType, StateType], ABC, Generic[SettingsType, MessageInType, MessageOutType, StateType]

This reverses the priority of async and sync methods from BaseStatefulTransformer. Whereas in BaseStatefulTransformer, the async methods simply called the sync methods, here the sync methods call the async methods, more similar to BaseStatefulProducer.

class CompositeStateful[source]#

Bases: Stateful[dict[str, Any]], ABC, Generic[SettingsType, MessageOutType]

Mixin class for composite processor/producer chains. DO NOT use this class directly. Used to enforce statefulness of the composite processor/producer chain and provide initialization and validation methods.

property state: dict[str, Any]#
abstractmethod stateful_op(state, *args, **kwargs)[source]#
Parameters:
Return type:

tuple[dict[str, tuple[Any, int]], MessageOutType | None]

class CompositeProcessor(*args, **kwargs)[source]#

Bases: BaseProcessor[SettingsType, MessageInType, MessageOutType], CompositeStateful[SettingsType, MessageOutType], ABC, Generic[SettingsType, MessageInType, MessageOutType]

A processor that chains multiple processor together in a feedforward non-branching graph. The individual processors may be stateless or stateful. The last processor may be a consumer, otherwise processors must be transformers. Use CompositeProducer if you want the first processor to be a producer. Concrete subclasses must implement _initialize_processors. Optionally override _reset_state if you want adaptive state behaviour. Example implementation:

class CustomCompositeProcessor(CompositeProcessor[CustomSettings, AxisArray, AxisArray]):

@staticmethod def _initialize_processors(settings: CustomSettings) -> dict[str, BaseProcessor]:

return {

“stateful_transformer”: CustomStatefulProducer(**settings), “transformer”: CustomTransformer(**settings),

}

Where **settings should be replaced with initialisation arguments for each processor.

__init__(*args, **kwargs)[source]#
Return type:

None

stateful_op(state, message)[source]#
Parameters:
Return type:

tuple[dict[str, tuple[Any, int]], MessageOutType | None]

class CompositeProducer(*args, **kwargs)[source]#

Bases: BaseProducer[SettingsType, MessageOutType], CompositeStateful[SettingsType, MessageOutType], ABC, Generic[SettingsType, MessageOutType]

A producer that chains multiple processors (starting with a producer) together in a feedforward non-branching graph. The individual processors may be stateless or stateful. The first processor must be a producer, the last processor may be a consumer, otherwise processors must be transformers.

__init__(*args, **kwargs)[source]#
Return type:

None

stateful_op(state)[source]#
Parameters:

state (dict[str, tuple[Any, int]] | None)

Return type:

tuple[dict[str, tuple[Any, int]], MessageOutType | None]
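The signature above takes state in and hands updated state back alongside the output. The following sketch shows that calling pattern in isolation. It is not ezmsg code: counter_stateful_op is a hypothetical function, and the integer in each (value, int) pair is treated as an opaque tag because its meaning is not documented in this section.

```python
from typing import Any, Optional

# Same shape as the documented state type: name -> (value, int).
State = dict[str, tuple[Any, int]]


def counter_stateful_op(state: Optional[State]) -> tuple[State, Optional[int]]:
    """Hypothetical producer step: take state in, return new state and an output."""
    if state is None:
        # No prior state: start counting from zero with an opaque tag of 0.
        state = {"counter": (0, 0)}
    count, tag = state["counter"]
    # Build a new state dict instead of mutating the caller's copy.
    new_state = {"counter": (count + 1, tag)}
    return new_state, count


state, out0 = counter_stateful_op(None)
state, out1 = counter_stateful_op(state)
state, out2 = counter_stateful_op(state)
```

Keeping the state outside the function makes each step easy to test and to resume from a saved state.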

class BaseProducerUnit(*args, settings=None, **kwargs)[source]#

Bases: Unit, ABC, Generic[SettingsType, MessageOutType, ProducerType]

Base class for producer units – i.e. units that generate messages without consuming inputs. Implement a new Unit as follows:

    class CustomUnit(BaseProducerUnit[
        CustomProducerSettings,  # SettingsType
        AxisArray,               # MessageOutType
        CustomProducer,          # ProducerType
    ]):
        SETTINGS = CustomProducerSettings

… that’s all!

Where CustomProducerSettings and CustomProducer are custom implementations of ez.Settings and of BaseProducer or BaseStatefulProducer, respectively.
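A unit declared this way supplies its producer class only as a generic type argument, so the base class has to recover that argument at runtime. The sketch below shows one plausible mechanism using the standard typing helpers. It is an illustration under assumptions, not ezmsg's implementation: SketchProducerUnit, MySettings, MyProducer, and MyUnit are all hypothetical names.

```python
from typing import Generic, TypeVar, get_args, get_origin

SettingsT = TypeVar("SettingsT")
ProducerT = TypeVar("ProducerT")


class SketchProducerUnit(Generic[SettingsT, ProducerT]):
    """Hypothetical stand-in for a generic unit base class."""

    @classmethod
    def producer_type(cls) -> type:
        # __orig_bases__ keeps the parameterised base, e.g.
        # SketchProducerUnit[MySettings, MyProducer].
        for base in getattr(cls, "__orig_bases__", ()):
            if get_origin(base) is SketchProducerUnit:
                return get_args(base)[1]
        raise TypeError(f"{cls.__name__} does not parameterise SketchProducerUnit")

    def create_producer(self, settings: SettingsT) -> None:
        # Instantiate whichever producer class the subclass declared.
        self.producer = self.producer_type()(settings)


class MySettings:
    def __init__(self, start: int = 0) -> None:
        self.start = start


class MyProducer:
    def __init__(self, settings: MySettings) -> None:
        self.value = settings.start


class MyUnit(SketchProducerUnit[MySettings, MyProducer]):
    pass


unit = MyUnit()
unit.create_producer(MySettings(start=5))
```

With this approach the subclass body can stay empty, since the type arguments carry everything the base class needs.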

Parameters:

settings (Settings | None)

INPUT_SETTINGS = InputStream:unlocated[~SettingsType]()#
OUTPUT_SIGNAL = OutputStream:unlocated[~MessageOutType](self.num_buffers=32, self.force_tcp=False)#
async initialize()[source]#

Runs when the Unit is instantiated. It is called from within the process in which this unit will live. This lifecycle hook can be overridden, and an override can be made asynchronous by adding the async keyword.

Return type:

None

create_producer()[source]#

Create the producer instance from settings.

Return type:

None

async on_settings(msg)[source]#

Receive a settings message, override self.SETTINGS, and re-create the producer. Child classes that wish to have fine-grained control over whether the core producer resets on settings changes should override this method.

Parameters:

msg (SettingsType) – a settings message.

Return type:

None
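The difference between the default behaviour (re-create the producer on every settings message) and a fine-grained override can be sketched without ezmsg. Everything here is hypothetical: GainSettings, GainProducer, SketchUnit, and SelectiveUnit are illustrative names, and the sketch stores settings on a plain attribute instead of using the library's SETTINGS mechanism.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class GainSettings:
    gain: float = 1.0


class GainProducer:
    """Hypothetical producer: emits gain * n, where n counts the calls so far."""

    def __init__(self, settings: GainSettings) -> None:
        self.gain = settings.gain
        self.n = 0

    def __call__(self) -> float:
        self.n += 1
        return self.gain * self.n


class SketchUnit:
    def __init__(self, settings: GainSettings) -> None:
        self.settings = settings
        self.create_producer()

    def create_producer(self) -> None:
        self.producer = GainProducer(self.settings)

    async def on_settings(self, msg: GainSettings) -> None:
        # Default pattern: store the new settings and rebuild from scratch.
        self.settings = msg
        self.create_producer()


class SelectiveUnit(SketchUnit):
    async def on_settings(self, msg: GainSettings) -> None:
        # Fine-grained override: update the gain but keep the call counter.
        self.settings = msg
        self.producer.gain = msg.gain


async def demo() -> tuple[float, float]:
    full = SketchUnit(GainSettings(1.0))
    keep = SelectiveUnit(GainSettings(1.0))
    full.producer()
    keep.producer()
    await full.on_settings(GainSettings(10.0))
    await keep.on_settings(GainSettings(10.0))
    return full.producer(), keep.producer()


full_out, keep_out = asyncio.run(demo())
```

After the settings change, the rebuilt producer restarts its counter while the selectively updated one carries on from where it was.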

async produce()[source]#
Return type:

AsyncGenerator

class BaseProcessorUnit(*args, settings=None, **kwargs)[source]#

Bases: Unit, ABC, Generic[SettingsType]

Base class for processor units – i.e. units that process messages. This is an abstract base class that provides common functionality for consumer and transformer units. You probably do not want to inherit from this class directly as you would need to define a custom implementation of create_processor. Refer instead to BaseConsumerUnit or BaseTransformerUnit.

Parameters:

settings (Settings | None)

INPUT_SETTINGS = InputStream:unlocated[~SettingsType]()#
async initialize()[source]#

Runs when the Unit is instantiated. It is called from within the process in which this unit will live. This lifecycle hook can be overridden, and an override can be made asynchronous by adding the async keyword.

Return type:

None

abstractmethod create_processor()[source]#
Return type:

None

async on_settings(msg)[source]#

Receive a settings message, override self.SETTINGS, and re-create the processor. Child classes that wish to have fine-grained control over whether the core processor resets on settings changes should override this method.

Parameters:

msg (SettingsType) – a settings message.

Return type:

None

class BaseConsumerUnit(*args, settings=None, **kwargs)[source]#

Bases: BaseProcessorUnit[SettingsType], ABC, Generic[SettingsType, MessageInType, ConsumerType]

Base class for consumer units – i.e. units that receive messages but do not return results. Implement a new Unit as follows:

    class CustomUnit(BaseConsumerUnit[
        CustomConsumerSettings,  # SettingsType
        AxisArray,               # MessageInType
        CustomConsumer,          # ConsumerType
    ]):
        SETTINGS = CustomConsumerSettings

… that’s all!

Where CustomConsumerSettings and CustomConsumer are custom implementations of:

  • ez.Settings for settings

  • BaseConsumer or BaseStatefulConsumer for the consumer implementation

Parameters:

settings (Settings | None)

INPUT_SIGNAL = InputStream:unlocated[~MessageInType]()#
create_processor()[source]#

Create the consumer instance from settings.

async on_signal(message)[source]#

Consume the message.

Parameters:

message (MessageInType) – Input message to be consumed.

class BaseTransformerUnit(*args, settings=None, **kwargs)[source]#

Bases: BaseProcessorUnit[SettingsType], ABC, Generic[SettingsType, MessageInType, MessageOutType, TransformerType]

Base class for transformer units – i.e. units that transform input messages into output messages. Implement a new Unit as follows:

    class CustomUnit(BaseTransformerUnit[
        CustomTransformerSettings,  # SettingsType
        AxisArray,                  # MessageInType
        AxisArray,                  # MessageOutType
        CustomTransformer,          # TransformerType
    ]):
        SETTINGS = CustomTransformerSettings

… that’s all!

Where CustomTransformerSettings and CustomTransformer are custom implementations of:

  • ez.Settings for settings

  • One of these transformer types:

      • BaseTransformer

      • BaseStatefulTransformer

      • CompositeProcessor

Parameters:

settings (Settings | None)

INPUT_SIGNAL = InputStream:unlocated[~MessageInType]()#
OUTPUT_SIGNAL = OutputStream:unlocated[~MessageOutType](self.num_buffers=32, self.force_tcp=False)#
create_processor()[source]#

Create the transformer instance from settings.

async on_signal(message)[source]#
Parameters:

message (MessageInType)

Return type:

AsyncGenerator

class BaseAdaptiveTransformerUnit(*args, settings=None, **kwargs)[source]#

Bases: BaseProcessorUnit[SettingsType], ABC, Generic[SettingsType, MessageInType, MessageOutType, AdaptiveTransformerType]

Parameters:

settings (Settings | None)

INPUT_SAMPLE = InputStream:unlocated[<class 'ezmsg.sigproc.util.message.SampleMessage'>]()#
INPUT_SIGNAL = InputStream:unlocated[~MessageInType]()#
OUTPUT_SIGNAL = OutputStream:unlocated[~MessageOutType](self.num_buffers=32, self.force_tcp=False)#
create_processor()[source]#

Create the adaptive transformer instance from settings.

Return type:

None

async on_signal(message)[source]#
Parameters:

message (MessageInType)

Return type:

AsyncGenerator

async on_sample(msg)[source]#
Parameters:

msg (SampleMessage)

Return type:

None
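An adaptive transformer unit has two inputs: signal messages to transform and sample messages to learn from. The sketch below illustrates that split with a hypothetical RunningMeanCenterer class. It is a simplification under stated assumptions: plain floats stand in for both SampleMessage and the signal message type, and labels are omitted.

```python
class RunningMeanCenterer:
    """Hypothetical adaptive transformer: subtracts a mean learned from samples."""

    def __init__(self) -> None:
        self.mean = 0.0
        self.count = 0

    def partial_fit(self, sample: float) -> None:
        # Incremental mean update; updates state only and returns nothing.
        self.count += 1
        self.mean += (sample - self.mean) / self.count

    def __call__(self, message: float) -> float:
        # Transform path: uses the current state but does not change it.
        return message - self.mean


centerer = RunningMeanCenterer()
before = centerer(10.0)          # nothing learned yet, mean is 0.0
for sample in (2.0, 4.0, 6.0):   # would arrive via the sample input
    centerer.partial_fit(sample)
after = centerer(10.0)           # learned mean is now 4.0
```

In a unit built on this pattern, a handler such as on_sample would presumably forward to partial_fit and on_signal to the transform call.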

class GenAxisArray(*args, settings=None, **kwargs)[source]#

Bases: Unit

Parameters:

settings (Settings | None)

STATE#

alias of GenState

INPUT_SIGNAL = InputStream:unlocated[<class 'ezmsg.util.messages.axisarray.AxisArray'>]()#
OUTPUT_SIGNAL = OutputStream:unlocated[<class 'ezmsg.util.messages.axisarray.AxisArray'>](self.num_buffers=32, self.force_tcp=False)#
INPUT_SETTINGS = InputStream:unlocated[<class 'ezmsg.core.settings.Settings'>]()#
async initialize()[source]#

Runs when the Unit is instantiated. It is called from within the process in which this unit will live. This lifecycle hook can be overridden, and an override can be made asynchronous by adding the async keyword.

Return type:

None

construct_generator()[source]#
async on_settings(msg)[source]#

Update unit settings and reset generator. Note: Not all units will require a full reset with new settings. Override this method to implement a selective reset.

Parameters:

msg (Settings) – Instance of SETTINGS object.

Return type:

None

async on_signal(message)[source]#
Parameters:

message (AxisArray)

Return type:

AsyncGenerator