ezmsg.baseproc.units#

Base Unit classes for ezmsg integration.

Functions

get_base_adaptive_transformer_type(cls)[source]#
Return type:

type

Parameters:

cls (type)

get_base_clockdriven_producer_type(cls)[source]#
Return type:

type

Parameters:

cls (type)

get_base_consumer_type(cls)[source]#
Return type:

type

Parameters:

cls (type)

get_base_producer_type(cls)[source]#
Return type:

type

Parameters:

cls (type)

get_base_transformer_type(cls)[source]#
Return type:

type

Parameters:

cls (type)

Classes

class BaseAdaptiveTransformerUnit(*args, settings=None, **kwargs)[source]#

Bases: BaseProcessorUnit[SettingsType], ABC, Generic[SettingsType, MessageInType, MessageOutType, AdaptiveTransformerType]

Parameters:

settings (Settings | None)

INPUT_SAMPLE = InputStream:unlocated[<class 'ezmsg.baseproc.util.message.SampleMessage'>]()#
INPUT_SIGNAL = InputStream:unlocated[~MessageInType]()#
OUTPUT_SIGNAL = OutputStream:unlocated[~MessageOutType](self.num_buffers=32, self.force_tcp=False)#
create_processor()[source]#

Create the adaptive transformer instance from settings.

Return type:

None

async on_signal(message)[source]#
Return type:

AsyncGenerator

Parameters:

message (MessageInType)

async on_sample(msg)[source]#
Return type:

None

Parameters:

msg (SampleMessage)

class BaseClockDrivenUnit(*args, settings=None, **kwargs)[source]#

Bases: BaseProcessorUnit[SettingsType], ABC, Generic[SettingsType, ClockDrivenProducerType]

Base class for clock-driven producer units.

These units receive clock ticks (LinearAxis) and produce AxisArray output. This simplifies the Clock → Counter → Generator pattern by combining the counter functionality into the generator.

Implement a new Unit as follows:

class SinGeneratorUnit(BaseClockDrivenUnit[
    SinGeneratorSettings,     # SettingsType (must extend ClockDrivenSettings)
    SinProducer,              # ClockDrivenProducerType
]):
    SETTINGS = SinGeneratorSettings

Where SinGeneratorSettings extends ClockDrivenSettings and SinProducer extends BaseClockDrivenProducer.

Parameters:

settings (Settings | None)

INPUT_CLOCK = InputStream:unlocated[<class 'ezmsg.util.messages.axisarray.LinearAxis'>]()#
OUTPUT_SIGNAL = OutputStream:unlocated[<class 'ezmsg.util.messages.axisarray.AxisArray'>](self.num_buffers=32, self.force_tcp=False)#
create_processor()[source]#

Create the clock-driven producer instance from settings.

Return type:

None

async on_clock(clock_tick)[source]#
Return type:

AsyncGenerator

Parameters:

clock_tick (LinearAxis)

class BaseConsumerUnit(*args, settings=None, **kwargs)[source]#

Bases: BaseProcessorUnit[SettingsType], ABC, Generic[SettingsType, MessageInType, ConsumerType]

Base class for consumer units – i.e. units that receive messages but do not return results. Implement a new Unit as follows:

class CustomUnit(BaseConsumerUnit[

CustomConsumerSettings, # SettingsType AxisArray, # MessageInType CustomConsumer, # ConsumerType

]):

SETTINGS = CustomConsumerSettings

… that’s all!

Where CustomConsumerSettings and CustomConsumer are custom implementations of: - ez.Settings for settings - BaseConsumer or BaseStatefulConsumer for the consumer implementation

Parameters:

settings (Settings | None)

INPUT_SIGNAL = InputStream:unlocated[~MessageInType]()#
create_processor()[source]#

Create the consumer instance from settings.

async on_signal(message)[source]#

Consume the message. :type message: TypeVar(MessageInType) :param message: Input message to be consumed

Parameters:

message (MessageInType)

class BaseProcessorUnit(*args, settings=None, **kwargs)[source]#

Bases: Unit, ABC, Generic[SettingsType]

Base class for processor units – i.e. units that process messages. This is an abstract base class that provides common functionality for consumer and transformer units. You probably do not want to inherit from this class directly as you would need to define a custom implementation of create_processor. Refer instead to BaseConsumerUnit or BaseTransformerUnit.

Parameters:

settings (Settings | None)

INPUT_SETTINGS = InputStream:unlocated[~SettingsType]()#
async initialize()[source]#

Runs when the Unit is instantiated. This is called from within the same process this unit will live. This lifecycle hook can be overridden. It can be run as async functions by simply adding the async keyword when overriding.

Return type:

None

abstractmethod create_processor()[source]#
Return type:

None

async on_settings(msg)[source]#

Receive a settings message, override self.SETTINGS, and re-create the processor. Child classes that wish to have fine-grained control over whether the core processor resets on settings changes should override this method.

Parameters:

msg (TypeVar(SettingsType, bound= Settings)) – a settings message.

Return type:

None

class BaseProducerUnit(*args, settings=None, **kwargs)[source]#

Bases: Unit, ABC, Generic[SettingsType, MessageOutType, ProducerType]

Base class for producer units – i.e. units that generate messages without consuming inputs. Implement a new Unit as follows:

class CustomUnit(BaseProducerUnit[

CustomProducerSettings, # SettingsType AxisArray, # MessageOutType CustomProducer, # ProducerType

]):

SETTINGS = CustomProducerSettings

… that’s all!

Where CustomProducerSettings, and CustomProducer are custom implementations of ez.Settings, and BaseProducer or BaseStatefulProducer, respectively.

Parameters:

settings (Settings | None)

INPUT_SETTINGS = InputStream:unlocated[~SettingsType]()#
OUTPUT_SIGNAL = OutputStream:unlocated[~MessageOutType](self.num_buffers=32, self.force_tcp=False)#
async initialize()[source]#

Runs when the Unit is instantiated. This is called from within the same process this unit will live. This lifecycle hook can be overridden. It can be run as async functions by simply adding the async keyword when overriding.

Return type:

None

create_producer()[source]#

Create the producer instance from settings.

Return type:

None

async on_settings(msg)[source]#

Receive a settings message, override self.SETTINGS, and re-create the producer. Child classes that wish to have fine-grained control over whether the core producer resets on settings changes should override this method.

Parameters:

msg (TypeVar(SettingsType, bound= Settings)) – a settings message.

Return type:

None

async produce()[source]#
Return type:

AsyncGenerator

class BaseTransformerUnit(*args, settings=None, **kwargs)[source]#

Bases: BaseProcessorUnit[SettingsType], ABC, Generic[SettingsType, MessageInType, MessageOutType, TransformerType]

Base class for transformer units – i.e. units that transform input messages into output messages. Implement a new Unit as follows:

class CustomUnit(BaseTransformerUnit[

CustomTransformerSettings, # SettingsType AxisArray, # MessageInType AxisArray, # MessageOutType CustomTransformer, # TransformerType

]):

SETTINGS = CustomTransformerSettings

… that’s all!

Where CustomTransformerSettings and CustomTransformer are custom implementations of: - ez.Settings for settings - One of these transformer types:

  • BaseTransformer

  • BaseStatefulTransformer

  • CompositeProcessor

Parameters:

settings (Settings | None)

INPUT_SIGNAL = InputStream:unlocated[~MessageInType]()#
OUTPUT_SIGNAL = OutputStream:unlocated[~MessageOutType](self.num_buffers=32, self.force_tcp=False)#
create_processor()[source]#

Create the transformer instance from settings.

async on_signal(message)[source]#
Return type:

AsyncGenerator

Parameters:

message (MessageInType)

class GenAxisArray(*args, settings=None, **kwargs)[source]#

Bases: Unit

Parameters:

settings (Settings | None)

STATE#

alias of GenState

INPUT_SIGNAL = InputStream:unlocated[<class 'ezmsg.util.messages.axisarray.AxisArray'>]()#
OUTPUT_SIGNAL = OutputStream:unlocated[<class 'ezmsg.util.messages.axisarray.AxisArray'>](self.num_buffers=32, self.force_tcp=False)#
INPUT_SETTINGS = InputStream:unlocated[<class 'ezmsg.core.settings.Settings'>]()#
async initialize()[source]#

Runs when the Unit is instantiated. This is called from within the same process this unit will live. This lifecycle hook can be overridden. It can be run as async functions by simply adding the async keyword when overriding.

Return type:

None

construct_generator()[source]#
async on_settings(msg)[source]#

Update unit settings and reset generator. Note: Not all units will require a full reset with new settings. Override this method to implement a selective reset.

Parameters:

msg (Settings) – Instance of SETTINGS object.

Return type:

None

async on_signal(message)[source]#
Return type:

AsyncGenerator

Parameters:

message (AxisArray)

get_base_producer_type(cls)[source]#
Return type:

type

Parameters:

cls (type)

get_base_consumer_type(cls)[source]#
Return type:

type

Parameters:

cls (type)

get_base_transformer_type(cls)[source]#
Return type:

type

Parameters:

cls (type)

get_base_adaptive_transformer_type(cls)[source]#
Return type:

type

Parameters:

cls (type)

get_base_clockdriven_producer_type(cls)[source]#
Return type:

type

Parameters:

cls (type)

class BaseProducerUnit(*args, settings=None, **kwargs)[source]#

Bases: Unit, ABC, Generic[SettingsType, MessageOutType, ProducerType]

Base class for producer units – i.e. units that generate messages without consuming inputs. Implement a new Unit as follows:

class CustomUnit(BaseProducerUnit[

CustomProducerSettings, # SettingsType AxisArray, # MessageOutType CustomProducer, # ProducerType

]):

SETTINGS = CustomProducerSettings

… that’s all!

Where CustomProducerSettings, and CustomProducer are custom implementations of ez.Settings, and BaseProducer or BaseStatefulProducer, respectively.

Parameters:

settings (Settings | None)

INPUT_SETTINGS = InputStream:unlocated[~SettingsType]()#
OUTPUT_SIGNAL = OutputStream:unlocated[~MessageOutType](self.num_buffers=32, self.force_tcp=False)#
async initialize()[source]#

Runs when the Unit is instantiated. This is called from within the same process this unit will live. This lifecycle hook can be overridden. It can be run as async functions by simply adding the async keyword when overriding.

Return type:

None

create_producer()[source]#

Create the producer instance from settings.

Return type:

None

async on_settings(msg)[source]#

Receive a settings message, override self.SETTINGS, and re-create the producer. Child classes that wish to have fine-grained control over whether the core producer resets on settings changes should override this method.

Parameters:

msg (TypeVar(SettingsType, bound= Settings)) – a settings message.

Return type:

None

async produce()[source]#
Return type:

AsyncGenerator

class BaseProcessorUnit(*args, settings=None, **kwargs)[source]#

Bases: Unit, ABC, Generic[SettingsType]

Base class for processor units – i.e. units that process messages. This is an abstract base class that provides common functionality for consumer and transformer units. You probably do not want to inherit from this class directly as you would need to define a custom implementation of create_processor. Refer instead to BaseConsumerUnit or BaseTransformerUnit.

Parameters:

settings (Settings | None)

INPUT_SETTINGS = InputStream:unlocated[~SettingsType]()#
async initialize()[source]#

Runs when the Unit is instantiated. This is called from within the same process this unit will live. This lifecycle hook can be overridden. It can be run as async functions by simply adding the async keyword when overriding.

Return type:

None

abstractmethod create_processor()[source]#
Return type:

None

async on_settings(msg)[source]#

Receive a settings message, override self.SETTINGS, and re-create the processor. Child classes that wish to have fine-grained control over whether the core processor resets on settings changes should override this method.

Parameters:

msg (TypeVar(SettingsType, bound= Settings)) – a settings message.

Return type:

None

class BaseConsumerUnit(*args, settings=None, **kwargs)[source]#

Bases: BaseProcessorUnit[SettingsType], ABC, Generic[SettingsType, MessageInType, ConsumerType]

Base class for consumer units – i.e. units that receive messages but do not return results. Implement a new Unit as follows:

class CustomUnit(BaseConsumerUnit[

CustomConsumerSettings, # SettingsType AxisArray, # MessageInType CustomConsumer, # ConsumerType

]):

SETTINGS = CustomConsumerSettings

… that’s all!

Where CustomConsumerSettings and CustomConsumer are custom implementations of: - ez.Settings for settings - BaseConsumer or BaseStatefulConsumer for the consumer implementation

Parameters:

settings (Settings | None)

INPUT_SIGNAL = InputStream:unlocated[~MessageInType]()#
create_processor()[source]#

Create the consumer instance from settings.

async on_signal(message)[source]#

Consume the message. :type message: TypeVar(MessageInType) :param message: Input message to be consumed

Parameters:

message (MessageInType)

class BaseTransformerUnit(*args, settings=None, **kwargs)[source]#

Bases: BaseProcessorUnit[SettingsType], ABC, Generic[SettingsType, MessageInType, MessageOutType, TransformerType]

Base class for transformer units – i.e. units that transform input messages into output messages. Implement a new Unit as follows:

class CustomUnit(BaseTransformerUnit[

CustomTransformerSettings, # SettingsType AxisArray, # MessageInType AxisArray, # MessageOutType CustomTransformer, # TransformerType

]):

SETTINGS = CustomTransformerSettings

… that’s all!

Where CustomTransformerSettings and CustomTransformer are custom implementations of: - ez.Settings for settings - One of these transformer types:

  • BaseTransformer

  • BaseStatefulTransformer

  • CompositeProcessor

Parameters:

settings (Settings | None)

INPUT_SIGNAL = InputStream:unlocated[~MessageInType]()#
OUTPUT_SIGNAL = OutputStream:unlocated[~MessageOutType](self.num_buffers=32, self.force_tcp=False)#
create_processor()[source]#

Create the transformer instance from settings.

async on_signal(message)[source]#
Return type:

AsyncGenerator

Parameters:

message (MessageInType)

class BaseAdaptiveTransformerUnit(*args, settings=None, **kwargs)[source]#

Bases: BaseProcessorUnit[SettingsType], ABC, Generic[SettingsType, MessageInType, MessageOutType, AdaptiveTransformerType]

Parameters:

settings (Settings | None)

INPUT_SAMPLE = InputStream:unlocated[<class 'ezmsg.baseproc.util.message.SampleMessage'>]()#
INPUT_SIGNAL = InputStream:unlocated[~MessageInType]()#
OUTPUT_SIGNAL = OutputStream:unlocated[~MessageOutType](self.num_buffers=32, self.force_tcp=False)#
create_processor()[source]#

Create the adaptive transformer instance from settings.

Return type:

None

async on_signal(message)[source]#
Return type:

AsyncGenerator

Parameters:

message (MessageInType)

async on_sample(msg)[source]#
Return type:

None

Parameters:

msg (SampleMessage)

class BaseClockDrivenUnit(*args, settings=None, **kwargs)[source]#

Bases: BaseProcessorUnit[SettingsType], ABC, Generic[SettingsType, ClockDrivenProducerType]

Base class for clock-driven producer units.

These units receive clock ticks (LinearAxis) and produce AxisArray output. This simplifies the Clock → Counter → Generator pattern by combining the counter functionality into the generator.

Implement a new Unit as follows:

class SinGeneratorUnit(BaseClockDrivenUnit[
    SinGeneratorSettings,     # SettingsType (must extend ClockDrivenSettings)
    SinProducer,              # ClockDrivenProducerType
]):
    SETTINGS = SinGeneratorSettings

Where SinGeneratorSettings extends ClockDrivenSettings and SinProducer extends BaseClockDrivenProducer.

Parameters:

settings (Settings | None)

INPUT_CLOCK = InputStream:unlocated[<class 'ezmsg.util.messages.axisarray.LinearAxis'>]()#
OUTPUT_SIGNAL = OutputStream:unlocated[<class 'ezmsg.util.messages.axisarray.AxisArray'>](self.num_buffers=32, self.force_tcp=False)#
create_processor()[source]#

Create the clock-driven producer instance from settings.

Return type:

None

async on_clock(clock_tick)[source]#
Return type:

AsyncGenerator

Parameters:

clock_tick (LinearAxis)

class GenAxisArray(*args, settings=None, **kwargs)[source]#

Bases: Unit

Parameters:

settings (Settings | None)

STATE#

alias of GenState

INPUT_SIGNAL = InputStream:unlocated[<class 'ezmsg.util.messages.axisarray.AxisArray'>]()#
OUTPUT_SIGNAL = OutputStream:unlocated[<class 'ezmsg.util.messages.axisarray.AxisArray'>](self.num_buffers=32, self.force_tcp=False)#
INPUT_SETTINGS = InputStream:unlocated[<class 'ezmsg.core.settings.Settings'>]()#
async initialize()[source]#

Runs when the Unit is instantiated. This is called from within the same process this unit will live. This lifecycle hook can be overridden. It can be run as async functions by simply adding the async keyword when overriding.

Return type:

None

construct_generator()[source]#
async on_settings(msg)[source]#

Update unit settings and reset generator. Note: Not all units will require a full reset with new settings. Override this method to implement a selective reset.

Parameters:

msg (Settings) – Instance of SETTINGS object.

Return type:

None

async on_signal(message)[source]#
Return type:

AsyncGenerator

Parameters:

message (AxisArray)