ezmsg.baseproc.units#
Base Unit classes for ezmsg integration.
Functions
Classes
- class BaseAdaptiveTransformerUnit(*args, settings=None, **kwargs)[source]#
Bases:
BaseProcessorUnit[SettingsType],ABC,Generic[SettingsType,MessageInType,MessageOutType,AdaptiveTransformerType]- Parameters:
settings (Settings | None)
- INPUT_SAMPLE = InputStream:unlocated[<class 'ezmsg.baseproc.util.message.SampleMessage'>]()#
- INPUT_SIGNAL = InputStream:unlocated[~MessageInType]()#
- OUTPUT_SIGNAL = OutputStream:unlocated[~MessageOutType](self.num_buffers=32, self.force_tcp=False)#
- async on_sample(msg)[source]#
- Return type:
- Parameters:
msg (SampleMessage)
- class BaseClockDrivenUnit(*args, settings=None, **kwargs)[source]#
Bases:
BaseProcessorUnit[SettingsType],ABC,Generic[SettingsType,ClockDrivenProducerType]Base class for clock-driven producer units.
These units receive clock ticks (LinearAxis) and produce AxisArray output. This simplifies the Clock → Counter → Generator pattern by combining the counter functionality into the generator.
Implement a new Unit as follows:
class SinGeneratorUnit(BaseClockDrivenUnit[ SinGeneratorSettings, # SettingsType (must extend ClockDrivenSettings) SinProducer, # ClockDrivenProducerType ]): SETTINGS = SinGeneratorSettings
Where SinGeneratorSettings extends ClockDrivenSettings and SinProducer extends BaseClockDrivenProducer.
- Parameters:
settings (Settings | None)
- INPUT_CLOCK = InputStream:unlocated[<class 'ezmsg.util.messages.axisarray.LinearAxis'>]()#
- OUTPUT_SIGNAL = OutputStream:unlocated[<class 'ezmsg.util.messages.axisarray.AxisArray'>](self.num_buffers=32, self.force_tcp=False)#
- async on_clock(clock_tick)[source]#
- Return type:
- Parameters:
clock_tick (LinearAxis)
- class BaseConsumerUnit(*args, settings=None, **kwargs)[source]#
Bases:
BaseProcessorUnit[SettingsType],ABC,Generic[SettingsType,MessageInType,ConsumerType]Base class for consumer units – i.e. units that receive messages but do not return results. Implement a new Unit as follows:
- class CustomUnit(BaseConsumerUnit[
CustomConsumerSettings, # SettingsType AxisArray, # MessageInType CustomConsumer, # ConsumerType
- ]):
SETTINGS = CustomConsumerSettings
… that’s all!
Where CustomConsumerSettings and CustomConsumer are custom implementations of: - ez.Settings for settings - BaseConsumer or BaseStatefulConsumer for the consumer implementation
- Parameters:
settings (Settings | None)
- INPUT_SIGNAL = InputStream:unlocated[~MessageInType]()#
- class BaseProcessorUnit(*args, settings=None, **kwargs)[source]#
Bases:
Unit,ABC,Generic[SettingsType]Base class for processor units – i.e. units that process messages. This is an abstract base class that provides common functionality for consumer and transformer units. You probably do not want to inherit from this class directly as you would need to define a custom implementation of create_processor. Refer instead to BaseConsumerUnit or BaseTransformerUnit.
- Parameters:
settings (Settings | None)
- INPUT_SETTINGS = InputStream:unlocated[~SettingsType]()#
- async initialize()[source]#
Runs when the
Unitis instantiated. This is called from within the same process this unit will live. This lifecycle hook can be overridden. It can be run asasyncfunctions by simply adding theasynckeyword when overriding.- Return type:
- class BaseProducerUnit(*args, settings=None, **kwargs)[source]#
Bases:
Unit,ABC,Generic[SettingsType,MessageOutType,ProducerType]Base class for producer units – i.e. units that generate messages without consuming inputs. Implement a new Unit as follows:
- class CustomUnit(BaseProducerUnit[
CustomProducerSettings, # SettingsType AxisArray, # MessageOutType CustomProducer, # ProducerType
- ]):
SETTINGS = CustomProducerSettings
… that’s all!
Where CustomProducerSettings, and CustomProducer are custom implementations of ez.Settings, and BaseProducer or BaseStatefulProducer, respectively.
- Parameters:
settings (Settings | None)
- INPUT_SETTINGS = InputStream:unlocated[~SettingsType]()#
- OUTPUT_SIGNAL = OutputStream:unlocated[~MessageOutType](self.num_buffers=32, self.force_tcp=False)#
- async initialize()[source]#
Runs when the
Unitis instantiated. This is called from within the same process this unit will live. This lifecycle hook can be overridden. It can be run asasyncfunctions by simply adding theasynckeyword when overriding.- Return type:
- async on_settings(msg)[source]#
Receive a settings message, override self.SETTINGS, and re-create the producer. Child classes that wish to have fine-grained control over whether the core producer resets on settings changes should override this method.
- class BaseTransformerUnit(*args, settings=None, **kwargs)[source]#
Bases:
BaseProcessorUnit[SettingsType],ABC,Generic[SettingsType,MessageInType,MessageOutType,TransformerType]Base class for transformer units – i.e. units that transform input messages into output messages. Implement a new Unit as follows:
- class CustomUnit(BaseTransformerUnit[
CustomTransformerSettings, # SettingsType AxisArray, # MessageInType AxisArray, # MessageOutType CustomTransformer, # TransformerType
- ]):
SETTINGS = CustomTransformerSettings
… that’s all!
Where CustomTransformerSettings and CustomTransformer are custom implementations of: - ez.Settings for settings - One of these transformer types:
BaseTransformer
BaseStatefulTransformer
CompositeProcessor
- Parameters:
settings (Settings | None)
- INPUT_SIGNAL = InputStream:unlocated[~MessageInType]()#
- OUTPUT_SIGNAL = OutputStream:unlocated[~MessageOutType](self.num_buffers=32, self.force_tcp=False)#
- class GenAxisArray(*args, settings=None, **kwargs)[source]#
Bases:
Unit- Parameters:
settings (Settings | None)
- STATE#
alias of
GenState
- INPUT_SIGNAL = InputStream:unlocated[<class 'ezmsg.util.messages.axisarray.AxisArray'>]()#
- OUTPUT_SIGNAL = OutputStream:unlocated[<class 'ezmsg.util.messages.axisarray.AxisArray'>](self.num_buffers=32, self.force_tcp=False)#
- INPUT_SETTINGS = InputStream:unlocated[<class 'ezmsg.core.settings.Settings'>]()#
- async initialize()[source]#
Runs when the
Unitis instantiated. This is called from within the same process this unit will live. This lifecycle hook can be overridden. It can be run asasyncfunctions by simply adding theasynckeyword when overriding.- Return type:
- async on_settings(msg)[source]#
Update unit settings and reset generator. Note: Not all units will require a full reset with new settings. Override this method to implement a selective reset.
- class BaseProducerUnit(*args, settings=None, **kwargs)[source]#
Bases:
Unit,ABC,Generic[SettingsType,MessageOutType,ProducerType]Base class for producer units – i.e. units that generate messages without consuming inputs. Implement a new Unit as follows:
- class CustomUnit(BaseProducerUnit[
CustomProducerSettings, # SettingsType AxisArray, # MessageOutType CustomProducer, # ProducerType
- ]):
SETTINGS = CustomProducerSettings
… that’s all!
Where CustomProducerSettings, and CustomProducer are custom implementations of ez.Settings, and BaseProducer or BaseStatefulProducer, respectively.
- Parameters:
settings (Settings | None)
- INPUT_SETTINGS = InputStream:unlocated[~SettingsType]()#
- OUTPUT_SIGNAL = OutputStream:unlocated[~MessageOutType](self.num_buffers=32, self.force_tcp=False)#
- async initialize()[source]#
Runs when the
Unitis instantiated. This is called from within the same process this unit will live. This lifecycle hook can be overridden. It can be run asasyncfunctions by simply adding theasynckeyword when overriding.- Return type:
- async on_settings(msg)[source]#
Receive a settings message, override self.SETTINGS, and re-create the producer. Child classes that wish to have fine-grained control over whether the core producer resets on settings changes should override this method.
- class BaseProcessorUnit(*args, settings=None, **kwargs)[source]#
Bases:
Unit,ABC,Generic[SettingsType]Base class for processor units – i.e. units that process messages. This is an abstract base class that provides common functionality for consumer and transformer units. You probably do not want to inherit from this class directly as you would need to define a custom implementation of create_processor. Refer instead to BaseConsumerUnit or BaseTransformerUnit.
- Parameters:
settings (Settings | None)
- INPUT_SETTINGS = InputStream:unlocated[~SettingsType]()#
- async initialize()[source]#
Runs when the
Unitis instantiated. This is called from within the same process this unit will live. This lifecycle hook can be overridden. It can be run asasyncfunctions by simply adding theasynckeyword when overriding.- Return type:
- class BaseConsumerUnit(*args, settings=None, **kwargs)[source]#
Bases:
BaseProcessorUnit[SettingsType],ABC,Generic[SettingsType,MessageInType,ConsumerType]Base class for consumer units – i.e. units that receive messages but do not return results. Implement a new Unit as follows:
- class CustomUnit(BaseConsumerUnit[
CustomConsumerSettings, # SettingsType AxisArray, # MessageInType CustomConsumer, # ConsumerType
- ]):
SETTINGS = CustomConsumerSettings
… that’s all!
Where CustomConsumerSettings and CustomConsumer are custom implementations of: - ez.Settings for settings - BaseConsumer or BaseStatefulConsumer for the consumer implementation
- Parameters:
settings (Settings | None)
- INPUT_SIGNAL = InputStream:unlocated[~MessageInType]()#
- class BaseTransformerUnit(*args, settings=None, **kwargs)[source]#
Bases:
BaseProcessorUnit[SettingsType],ABC,Generic[SettingsType,MessageInType,MessageOutType,TransformerType]Base class for transformer units – i.e. units that transform input messages into output messages. Implement a new Unit as follows:
- class CustomUnit(BaseTransformerUnit[
CustomTransformerSettings, # SettingsType AxisArray, # MessageInType AxisArray, # MessageOutType CustomTransformer, # TransformerType
- ]):
SETTINGS = CustomTransformerSettings
… that’s all!
Where CustomTransformerSettings and CustomTransformer are custom implementations of: - ez.Settings for settings - One of these transformer types:
BaseTransformer
BaseStatefulTransformer
CompositeProcessor
- Parameters:
settings (Settings | None)
- INPUT_SIGNAL = InputStream:unlocated[~MessageInType]()#
- OUTPUT_SIGNAL = OutputStream:unlocated[~MessageOutType](self.num_buffers=32, self.force_tcp=False)#
- class BaseAdaptiveTransformerUnit(*args, settings=None, **kwargs)[source]#
Bases:
BaseProcessorUnit[SettingsType],ABC,Generic[SettingsType,MessageInType,MessageOutType,AdaptiveTransformerType]- Parameters:
settings (Settings | None)
- INPUT_SAMPLE = InputStream:unlocated[<class 'ezmsg.baseproc.util.message.SampleMessage'>]()#
- INPUT_SIGNAL = InputStream:unlocated[~MessageInType]()#
- OUTPUT_SIGNAL = OutputStream:unlocated[~MessageOutType](self.num_buffers=32, self.force_tcp=False)#
- async on_sample(msg)[source]#
- Return type:
- Parameters:
msg (SampleMessage)
- class BaseClockDrivenUnit(*args, settings=None, **kwargs)[source]#
Bases:
BaseProcessorUnit[SettingsType],ABC,Generic[SettingsType,ClockDrivenProducerType]Base class for clock-driven producer units.
These units receive clock ticks (LinearAxis) and produce AxisArray output. This simplifies the Clock → Counter → Generator pattern by combining the counter functionality into the generator.
Implement a new Unit as follows:
class SinGeneratorUnit(BaseClockDrivenUnit[ SinGeneratorSettings, # SettingsType (must extend ClockDrivenSettings) SinProducer, # ClockDrivenProducerType ]): SETTINGS = SinGeneratorSettings
Where SinGeneratorSettings extends ClockDrivenSettings and SinProducer extends BaseClockDrivenProducer.
- Parameters:
settings (Settings | None)
- INPUT_CLOCK = InputStream:unlocated[<class 'ezmsg.util.messages.axisarray.LinearAxis'>]()#
- OUTPUT_SIGNAL = OutputStream:unlocated[<class 'ezmsg.util.messages.axisarray.AxisArray'>](self.num_buffers=32, self.force_tcp=False)#
- async on_clock(clock_tick)[source]#
- Return type:
- Parameters:
clock_tick (LinearAxis)
- class GenAxisArray(*args, settings=None, **kwargs)[source]#
Bases:
Unit- Parameters:
settings (Settings | None)
- STATE#
alias of
GenState
- INPUT_SIGNAL = InputStream:unlocated[<class 'ezmsg.util.messages.axisarray.AxisArray'>]()#
- OUTPUT_SIGNAL = OutputStream:unlocated[<class 'ezmsg.util.messages.axisarray.AxisArray'>](self.num_buffers=32, self.force_tcp=False)#
- INPUT_SETTINGS = InputStream:unlocated[<class 'ezmsg.core.settings.Settings'>]()#
- async initialize()[source]#
Runs when the
Unitis instantiated. This is called from within the same process this unit will live. This lifecycle hook can be overridden. It can be run asasyncfunctions by simply adding theasynckeyword when overriding.- Return type:
- async on_settings(msg)[source]#
Update unit settings and reset generator. Note: Not all units will require a full reset with new settings. Override this method to implement a selective reset.