ezmsg.baseproc.units#
Base Unit classes for ezmsg integration.
Functions
Classes
- class BaseAdaptiveTransformerUnit(*args, settings=None, **kwargs)[source]#
Bases:
BaseProcessorUnit[SettingsType],ABC,Generic[SettingsType,MessageInType,MessageOutType,AdaptiveTransformerType]- Parameters:
settings (Settings | None)
- INPUT_SAMPLE = InputStream:unlocated[AxisArray]()#
- INPUT_SIGNAL = InputStream:unlocated[MessageInType]()#
- OUTPUT_SIGNAL = OutputStream:unlocated[MessageOutType](self.num_buffers=32, self.force_tcp=None, self.allow_local=None)#
- OUTPUT_SAMPLE = OutputStream:unlocated[MessageOutType](self.num_buffers=32, self.force_tcp=None, self.allow_local=None)#
- class BaseClockDrivenUnit(*args, settings=None, **kwargs)[source]#
Bases:
BaseProcessorUnit[SettingsType],ABC,Generic[SettingsType,ClockDrivenProducerType]Base class for clock-driven producer units.
These units receive clock ticks (LinearAxis) and produce AxisArray output. This simplifies the Clock → Counter → Generator pattern by combining the counter functionality into the generator.
Implement a new Unit as follows:
class SinGeneratorUnit(BaseClockDrivenUnit[ SinGeneratorSettings, # SettingsType (must extend ClockDrivenSettings) SinProducer, # ClockDrivenProducerType ]): SETTINGS = SinGeneratorSettings
Where SinGeneratorSettings extends ClockDrivenSettings and SinProducer extends BaseClockDrivenProducer.
- Parameters:
settings (Settings | None)
- INPUT_CLOCK = InputStream:unlocated[LinearAxis]()#
- OUTPUT_SIGNAL = OutputStream:unlocated[AxisArray](self.num_buffers=32, self.force_tcp=None, self.allow_local=None)#
- async on_clock(clock_tick)[source]#
- Return type:
- Parameters:
clock_tick (LinearAxis)
- class BaseConsumerUnit(*args, settings=None, **kwargs)[source]#
Bases:
BaseProcessorUnit[SettingsType],ABC,Generic[SettingsType,MessageInType,ConsumerType]Base class for consumer units – i.e. units that receive messages but do not return results. Implement a new Unit as follows:
- class CustomUnit(BaseConsumerUnit[
CustomConsumerSettings, # SettingsType AxisArray, # MessageInType CustomConsumer, # ConsumerType
- ]):
SETTINGS = CustomConsumerSettings
… that’s all!
Where CustomConsumerSettings and CustomConsumer are custom implementations of: - ez.Settings for settings - BaseConsumer or BaseStatefulConsumer for the consumer implementation
- Parameters:
settings (Settings | None)
- INPUT_SIGNAL = InputStream:unlocated[MessageInType]()#
- class BaseProcessorUnit(*args, settings=None, **kwargs)[source]#
Bases:
Unit,ABC,Generic[SettingsType]Base class for processor units – i.e. units that process messages. This is an abstract base class that provides common functionality for consumer and transformer units. You probably do not want to inherit from this class directly as you would need to define a custom implementation of create_processor. Refer instead to BaseConsumerUnit or BaseTransformerUnit.
- Parameters:
settings (Settings | None)
- INPUT_SETTINGS = InputStream:unlocated[SettingsType]()#
- async initialize()[source]#
Runs when the Unit is instantiated.
This is called from within the same process this unit will live in. This lifecycle hook can be overridden. It can be run as async functions by simply adding the async keyword when overriding.
This method is where you should initialize your unit’s state and prepare for message processing.
- Return type:
- async on_settings(msg)[source]#
Receive a settings message, override self.SETTINGS, and delegate the update to the underlying processor. The processor decides — via
BaseProcessor.update_settings()and itsNONRESET_SETTINGS_FIELDSclass variable — whether the change requires a state reset. Override this method (orupdate_settingson the processor) for finer control.
- class BaseProducerUnit(*args, settings=None, **kwargs)[source]#
Bases:
Unit,ABC,Generic[SettingsType,MessageOutType,ProducerType]Base class for producer units – i.e. units that generate messages without consuming inputs. Implement a new Unit as follows:
- class CustomUnit(BaseProducerUnit[
CustomProducerSettings, # SettingsType AxisArray, # MessageOutType CustomProducer, # ProducerType
- ]):
SETTINGS = CustomProducerSettings
… that’s all!
Where CustomProducerSettings, and CustomProducer are custom implementations of ez.Settings, and BaseProducer or BaseStatefulProducer, respectively.
- Parameters:
settings (Settings | None)
- INPUT_SETTINGS = InputStream:unlocated[SettingsType]()#
- OUTPUT_SIGNAL = OutputStream:unlocated[MessageOutType](self.num_buffers=32, self.force_tcp=None, self.allow_local=None)#
- async initialize()[source]#
Runs when the Unit is instantiated.
This is called from within the same process this unit will live in. This lifecycle hook can be overridden. It can be run as async functions by simply adding the async keyword when overriding.
This method is where you should initialize your unit’s state and prepare for message processing.
- Return type:
- create_producer()[source]#
Create the producer instance from settings.
Closes the previous producer first (if any), so resources held by it — sockets, file handles, hardware sessions — are released deterministically rather than being left to garbage collection.
- Return type:
- async on_settings(msg)[source]#
Receive a settings message, override self.SETTINGS, and delegate the update to the underlying producer. The producer decides — via
BaseProducer.update_settings()and itsNONRESET_SETTINGS_FIELDSclass variable — whether the change requires a state reset. Override this method (orupdate_settingson the producer) for finer control.
- class BaseTransformerUnit(*args, settings=None, **kwargs)[source]#
Bases:
BaseProcessorUnit[SettingsType],ABC,Generic[SettingsType,MessageInType,MessageOutType,TransformerType]Base class for transformer units – i.e. units that transform input messages into output messages.
Implement a new Unit as follows:
class CustomUnit(BaseTransformerUnit[ CustomTransformerSettings, # SettingsType AxisArray, # MessageInType AxisArray, # MessageOutType CustomTransformer, # TransformerType ]): SETTINGS = CustomTransformerSettings
… that’s all!
Where
CustomTransformerSettingsandCustomTransformerare custom implementations of:ez.Settingsfor settingsOne of these transformer types:
BaseTransformerBaseStatefulTransformerCompositeProcessor
- Parameters:
settings (Settings | None)
- INPUT_SIGNAL = InputStream:unlocated[MessageInType]()#
- OUTPUT_SIGNAL = OutputStream:unlocated[MessageOutType](self.num_buffers=32, self.force_tcp=None, self.allow_local=None)#
- class GenAxisArray(*args, settings=None, **kwargs)[source]#
Bases:
UnitDeprecated since version ``GenAxisArray``: is deprecated. Use
BaseTransformerUnitorBaseAdaptiveTransformerUnitfromezmsg.baseprocinstead.- Parameters:
settings (Settings | None)
- INPUT_SIGNAL = InputStream:unlocated[AxisArray]()#
- OUTPUT_SIGNAL = OutputStream:unlocated[AxisArray](self.num_buffers=32, self.force_tcp=None, self.allow_local=None)#
- INPUT_SETTINGS = InputStream:unlocated[Settings]()#
- async initialize()[source]#
Runs when the Unit is instantiated.
This is called from within the same process this unit will live in. This lifecycle hook can be overridden. It can be run as async functions by simply adding the async keyword when overriding.
This method is where you should initialize your unit’s state and prepare for message processing.
- Return type:
- async on_settings(msg)[source]#
Update unit settings and reset generator. Note: Not all units will require a full reset with new settings. Override this method to implement a selective reset.