ezmsg.baseproc.units

Base Unit classes for ezmsg integration.

Functions

get_base_adaptive_transformer_type(cls)
Return type:

type

Parameters:

cls (type)

get_base_clockdriven_producer_type(cls)
Return type:

type

Parameters:

cls (type)

get_base_consumer_type(cls)
Return type:

type

Parameters:

cls (type)

get_base_producer_type(cls)
Return type:

type

Parameters:

cls (type)

get_base_transformer_type(cls)
Return type:

type

Parameters:

cls (type)
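This page gives only the signatures of the get_base_*_type helpers. Judging by their names and their type-in, type-out signatures, they presumably look up the processor type that a unit class was parameterized with; that reading is an assumption, not something stated here. The sketch below shows the general technique for recovering a generic argument from a subclass with typing.get_args. DemoBaseUnit, DemoUnit, and resolve_generic_arg are hypothetical names and do not come from ezmsg.

```python
from typing import Generic, TypeVar, get_args, get_origin

SettingsT = TypeVar("SettingsT")
ProcessorT = TypeVar("ProcessorT")


class DemoBaseUnit(Generic[SettingsT, ProcessorT]):
    """Hypothetical stand-in for a generic base unit (not an ezmsg class)."""


class DemoSettings:
    """Hypothetical settings type."""


class DemoTransformer:
    """Hypothetical processor type."""


class DemoUnit(DemoBaseUnit[DemoSettings, DemoTransformer]):
    """Concrete unit that fills in both generic parameters."""


def resolve_generic_arg(cls: type, base: type, index: int) -> type:
    """Return the index-th type argument that cls passed to the generic base."""
    for klass in cls.__mro__:
        # __orig_bases__ keeps the subscripted bases, e.g. DemoBaseUnit[A, B].
        for orig in klass.__dict__.get("__orig_bases__", ()):
            if get_origin(orig) is base:
                return get_args(orig)[index]
    raise TypeError(f"{cls.__name__} does not parameterize {base.__name__}")


processor_type = resolve_generic_arg(DemoUnit, DemoBaseUnit, 1)
```

Reading the argument from the class definition means a subclass does not have to repeat the processor type in a separate class attribute.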

Classes

class BaseAdaptiveTransformerUnit(*args, settings=None, **kwargs)

Bases: BaseProcessorUnit[SettingsType], ABC, Generic[SettingsType, MessageInType, MessageOutType, AdaptiveTransformerType]

Parameters:

settings (Settings | None)

INPUT_SAMPLE = InputStream:unlocated[AxisArray]()
INPUT_SIGNAL = InputStream:unlocated[MessageInType]()
OUTPUT_SIGNAL = OutputStream:unlocated[MessageOutType](self.num_buffers=32, self.force_tcp=False)
OUTPUT_SAMPLE = OutputStream:unlocated[MessageOutType](self.num_buffers=32, self.force_tcp=False)
create_processor()

Create the adaptive transformer instance from settings.

Return type:

None

async on_signal(message)
Return type:

AsyncGenerator

Parameters:

message (MessageInType)

async on_sample(msg)
Return type:

AsyncGenerator

Parameters:

msg (AxisArray)

class BaseClockDrivenUnit(*args, settings=None, **kwargs)

Bases: BaseProcessorUnit[SettingsType], ABC, Generic[SettingsType, ClockDrivenProducerType]

Base class for clock-driven producer units.

These units receive clock ticks (LinearAxis) and produce AxisArray output. This simplifies the Clock → Counter → Generator pattern by combining the counter functionality into the generator.

Implement a new Unit as follows:

class SinGeneratorUnit(BaseClockDrivenUnit[
    SinGeneratorSettings,     # SettingsType (must extend ClockDrivenSettings)
    SinProducer,              # ClockDrivenProducerType
]):
    SETTINGS = SinGeneratorSettings

Where SinGeneratorSettings extends ClockDrivenSettings and SinProducer extends BaseClockDrivenProducer.
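The idea described above, a producer that receives clock ticks and keeps its own sample count, can be sketched without ezmsg as follows. This is a minimal illustration under stated assumptions: Tick is a hypothetical stand-in for a LinearAxis tick (assumed here to carry gain as the tick interval in seconds and offset as the tick time), and SinProducerSketch is not the library's SinProducer.

```python
import math
from dataclasses import dataclass


@dataclass
class Tick:
    """Hypothetical stand-in for a clock tick (not ezmsg's LinearAxis)."""

    gain: float    # assumed: seconds between consecutive ticks
    offset: float  # assumed: timestamp of this tick


class SinProducerSketch:
    """Generates one block of sine samples per tick and counts samples itself."""

    def __init__(self, fs: float, freq: float) -> None:
        self.fs = fs        # output sampling rate in Hz
        self.freq = freq    # sine frequency in Hz
        self.n_sent = 0     # the counter, folded into the generator

    def __call__(self, tick: Tick) -> list:
        # Number of samples that fit in one tick interval.
        n = round(tick.gain * self.fs)
        indices = range(self.n_sent, self.n_sent + n)
        block = [math.sin(2 * math.pi * self.freq * i / self.fs) for i in indices]
        self.n_sent += n  # keeps the phase continuous across blocks
        return block


producer = SinProducerSketch(fs=100.0, freq=1.0)
first_block = producer(Tick(gain=0.1, offset=0.0))
second_block = producer(Tick(gain=0.1, offset=0.1))
```

Because the running sample count lives inside the producer, no separate counter stage is needed between the clock and the generator.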

Parameters:

settings (Settings | None)

INPUT_CLOCK = InputStream:unlocated[LinearAxis]()
OUTPUT_SIGNAL = OutputStream:unlocated[AxisArray](self.num_buffers=32, self.force_tcp=False)
create_processor()

Create the clock-driven producer instance from settings.

Return type:

None

async on_clock(clock_tick)
Return type:

AsyncGenerator

Parameters:

clock_tick (LinearAxis)

class BaseConsumerUnit(*args, settings=None, **kwargs)

Bases: BaseProcessorUnit[SettingsType], ABC, Generic[SettingsType, MessageInType, ConsumerType]

Base class for consumer units – i.e. units that receive messages but do not return results.

Implement a new Unit as follows:

class CustomUnit(BaseConsumerUnit[
    CustomConsumerSettings,    # SettingsType
    AxisArray,                 # MessageInType
    CustomConsumer,            # ConsumerType
]):
    SETTINGS = CustomConsumerSettings

… that’s all!

Where CustomConsumerSettings and CustomConsumer are custom implementations of:

  • ez.Settings for settings

  • BaseConsumer or BaseStatefulConsumer for the consumer implementation

Parameters:

settings (Settings | None)

INPUT_SIGNAL = InputStream:unlocated[MessageInType]()
create_processor()

Create the consumer instance from settings.

async on_signal(message)

Consume the message.

Parameters:

message (MessageInType) – Input message to be consumed.
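A consumer-style handler differs from the other handlers on this page in that it produces nothing for downstream units. The stand-alone sketch below mimics that shape with plain asyncio; ListConsumerSketch and ConsumerUnitSketch are hypothetical names, and the code does not use or reproduce ezmsg itself.

```python
import asyncio


class ListConsumerSketch:
    """Hypothetical consumer: records every message it is given."""

    def __init__(self) -> None:
        self.seen = []

    def __call__(self, message) -> None:
        self.seen.append(message)


class ConsumerUnitSketch:
    """Hypothetical unit wrapper around a consumer."""

    def __init__(self) -> None:
        self.processor = None
        self.create_processor()

    def create_processor(self) -> None:
        # Build the consumer; a real unit would build it from its settings.
        self.processor = ListConsumerSketch()

    async def on_signal(self, message) -> None:
        # Plain coroutine: the message is consumed and nothing is yielded.
        self.processor(message)


unit = ConsumerUnitSketch()
result = asyncio.run(unit.on_signal({"value": 1}))
```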

class BaseProcessorUnit(*args, settings=None, **kwargs)

Bases: Unit, ABC, Generic[SettingsType]

Base class for processor units – i.e. units that process messages. This is an abstract base class that provides common functionality for consumer and transformer units. You probably do not want to inherit from this class directly as you would need to define a custom implementation of create_processor. Refer instead to BaseConsumerUnit or BaseTransformerUnit.

Parameters:

settings (Settings | None)

INPUT_SETTINGS = InputStream:unlocated[SettingsType]()
async initialize()

Runs when the Unit is instantiated.

This is called from within the same process this unit will live in. This lifecycle hook can be overridden. The override can run as an async function by adding the async keyword.

This method is where you should initialize your unit’s state and prepare for message processing.

Return type:

None

abstractmethod create_processor()
Return type:

None

async on_settings(msg)

Receive a settings message, override self.SETTINGS, and re-create the processor. Child classes that wish to have fine-grained control over whether the core processor resets on settings changes should override this method.

Parameters:

msg (TypeVar(SettingsType, bound=Settings)) – a settings message.

Return type:

None
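The behaviour described for on_settings (store the new settings, then rebuild the processor) and the suggested override for finer control can be sketched as follows. This is an illustration only: every class here is a hypothetical stand-in, the handler is written as a plain method rather than a coroutine to keep the example short, and none of it is ezmsg code.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ScaleSettings:
    """Hypothetical settings object."""

    gain: float = 1.0
    window: int = 4


class ScaleProcessor:
    """Hypothetical stateful processor: scales values and keeps a short history."""

    def __init__(self, settings: ScaleSettings) -> None:
        self.settings = settings
        self.history = []

    def __call__(self, x: float) -> float:
        self.history = (self.history + [x])[-self.settings.window:]
        return x * self.settings.gain


class UnitSketch:
    """Default policy: any settings message rebuilds the processor."""

    def __init__(self, settings: ScaleSettings) -> None:
        self.SETTINGS = settings
        self.create_processor()

    def create_processor(self) -> None:
        self.processor = ScaleProcessor(self.SETTINGS)

    def on_settings(self, msg: ScaleSettings) -> None:
        self.SETTINGS = msg
        self.create_processor()  # all accumulated state is discarded


class SelectiveUnitSketch(UnitSketch):
    """Override: rebuild only when a state-shaping setting changes."""

    def on_settings(self, msg: ScaleSettings) -> None:
        if msg.window == self.SETTINGS.window:
            # Only the gain changed: update in place and keep the history.
            self.SETTINGS = msg
            self.processor.settings = msg
        else:
            super().on_settings(msg)
```

The selective variant trades simplicity for continuity: state survives cheap parameter changes, but the subclass must know which settings invalidate that state.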

class BaseProducerUnit(*args, settings=None, **kwargs)

Bases: Unit, ABC, Generic[SettingsType, MessageOutType, ProducerType]

Base class for producer units – i.e. units that generate messages without consuming inputs.

Implement a new Unit as follows:

class CustomUnit(BaseProducerUnit[
    CustomProducerSettings,    # SettingsType
    AxisArray,                 # MessageOutType
    CustomProducer,            # ProducerType
]):
    SETTINGS = CustomProducerSettings

… that’s all!

Where CustomProducerSettings and CustomProducer are custom implementations of ez.Settings and of BaseProducer or BaseStatefulProducer, respectively.

Parameters:

settings (Settings | None)

INPUT_SETTINGS = InputStream:unlocated[SettingsType]()
OUTPUT_SIGNAL = OutputStream:unlocated[MessageOutType](self.num_buffers=32, self.force_tcp=False)
async initialize()

Runs when the Unit is instantiated.

This is called from within the same process this unit will live in. This lifecycle hook can be overridden. The override can run as an async function by adding the async keyword.

This method is where you should initialize your unit’s state and prepare for message processing.

Return type:

None

create_producer()

Create the producer instance from settings.

Return type:

None

async on_settings(msg)

Receive a settings message, override self.SETTINGS, and re-create the producer. Child classes that wish to have fine-grained control over whether the core producer resets on settings changes should override this method.

Parameters:

msg (TypeVar(SettingsType, bound=Settings)) – a settings message.

Return type:

None

async produce()
Return type:

AsyncGenerator
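Since produce() takes no input and returns an AsyncGenerator, a producer loop can be pictured roughly as below. This is a hedged sketch in plain asyncio: CounterProducerSketch, ProducerUnitSketch, and next_message are hypothetical, the string "OUTPUT_SIGNAL" stands in for a real output stream object, and the loop is bounded only so the example terminates.

```python
import asyncio
from typing import AsyncGenerator


class CounterProducerSketch:
    """Hypothetical producer: emits 1, 2, ... up to a limit, then None."""

    def __init__(self, limit: int) -> None:
        self.limit = limit
        self.count = 0

    async def next_message(self):
        if self.count >= self.limit:
            return None
        self.count += 1
        await asyncio.sleep(0)  # yield control, as a real source would while waiting
        return self.count


class ProducerUnitSketch:
    OUTPUT_SIGNAL = "OUTPUT_SIGNAL"  # placeholder for an output stream

    def __init__(self, limit: int) -> None:
        self.producer = CounterProducerSketch(limit)

    async def produce(self) -> AsyncGenerator:
        while True:
            out = await self.producer.next_message()
            if out is None:
                break  # sketch only: stop so the example finishes
            # Pair each message with the stream it should be published on.
            yield self.OUTPUT_SIGNAL, out


async def drain(unit: ProducerUnitSketch) -> list:
    return [pair async for pair in unit.produce()]


produced = asyncio.run(drain(ProducerUnitSketch(limit=3)))
```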

class BaseTransformerUnit(*args, settings=None, **kwargs)

Bases: BaseProcessorUnit[SettingsType], ABC, Generic[SettingsType, MessageInType, MessageOutType, TransformerType]

Base class for transformer units – i.e. units that transform input messages into output messages.

Implement a new Unit as follows:

class CustomUnit(BaseTransformerUnit[
    CustomTransformerSettings,    # SettingsType
    AxisArray,                    # MessageInType
    AxisArray,                    # MessageOutType
    CustomTransformer,            # TransformerType
]):
    SETTINGS = CustomTransformerSettings

… that’s all!

Where CustomTransformerSettings and CustomTransformer are custom implementations of:

  • ez.Settings for settings

  • One of these transformer types:

    • BaseTransformer

    • BaseStatefulTransformer

    • CompositeProcessor

Parameters:

settings (Settings | None)

INPUT_SIGNAL = InputStream:unlocated[MessageInType]()
OUTPUT_SIGNAL = OutputStream:unlocated[MessageOutType](self.num_buffers=32, self.force_tcp=False)
create_processor()

Create the transformer instance from settings.

async on_signal(message)
Return type:

AsyncGenerator

Parameters:

message (MessageInType)
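The transformer handler takes one input message and returns an AsyncGenerator, which suggests a shape like the one sketched below: run the transformer, then yield the result paired with the output stream. The code is a stand-alone illustration under assumptions; TransformerUnitSketch and collect are hypothetical, the string "OUTPUT_SIGNAL" stands in for a real stream object, and skipping None results is a choice made for this sketch rather than documented behaviour.

```python
import asyncio
from typing import AsyncGenerator, Callable


class TransformerUnitSketch:
    OUTPUT_SIGNAL = "OUTPUT_SIGNAL"  # placeholder for an output stream

    def __init__(self, transform: Callable) -> None:
        # A real unit would build its transformer in create_processor().
        self.processor = transform

    async def on_signal(self, message) -> AsyncGenerator:
        result = self.processor(message)
        if result is not None:
            # One output message per input message, tagged with its stream.
            yield self.OUTPUT_SIGNAL, result


async def collect(unit: TransformerUnitSketch, messages: list) -> list:
    out = []
    for message in messages:
        async for stream, result in unit.on_signal(message):
            out.append((stream, result))
    return out


doubler = TransformerUnitSketch(lambda xs: [2 * x for x in xs])
pairs = asyncio.run(collect(doubler, [[1, 2], [3]]))
```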

class GenAxisArray(*args, settings=None, **kwargs)

Bases: Unit

Deprecated: GenAxisArray is deprecated. Use BaseTransformerUnit or BaseAdaptiveTransformerUnit from ezmsg.baseproc instead.

Parameters:

settings (Settings | None)

STATE

alias of GenState

INPUT_SIGNAL = InputStream:unlocated[AxisArray]()
OUTPUT_SIGNAL = OutputStream:unlocated[AxisArray](self.num_buffers=32, self.force_tcp=False)
INPUT_SETTINGS = InputStream:unlocated[Settings]()
async initialize()

Runs when the Unit is instantiated.

This is called from within the same process this unit will live in. This lifecycle hook can be overridden. The override can run as an async function by adding the async keyword.

This method is where you should initialize your unit’s state and prepare for message processing.

Return type:

None

construct_generator()
async on_settings(msg)

Update unit settings and reset generator. Note: Not all units will require a full reset with new settings. Override this method to implement a selective reset.

Parameters:

msg (Settings) – Instance of SETTINGS object.

Return type:

None

async on_signal(message)
Return type:

AsyncGenerator

Parameters:

message (AxisArray)

class GenState

Bases: State

Deprecated: GenState is deprecated. Define a local state class or use ezmsg.baseproc processor classes instead.

gen: Generator[Any, Any, None]