Base Processor Units#
Here is the API for the base processor ezmsg Units included in the ezmsg-sigproc extension. For more detailed information on the design decisions behind these base units, please refer to the ezmsg-sigproc explainer.
- class BaseProducerUnit(*args, settings=None, **kwargs)[source]#
Bases:
Unit,ABC,Generic[SettingsType,MessageOutType,ProducerType]Base class for producer units – i.e. units that generate messages without consuming inputs. Implement a new Unit as follows:
- class CustomUnit(BaseProducerUnit[
CustomProducerSettings, # SettingsType AxisArray, # MessageOutType CustomProducer, # ProducerType
- ]):
SETTINGS = CustomProducerSettings
… that’s all!
Where CustomProducerSettings, and CustomProducer are custom implementations of ez.Settings, and BaseProducer or BaseStatefulProducer, respectively.
- Parameters:
settings (Settings | None)
- INPUT_SETTINGS = InputStream:unlocated[~SettingsType]()#
- OUTPUT_SIGNAL = OutputStream:unlocated[~MessageOutType](self.num_buffers=32, self.force_tcp=False)#
- async initialize()[source]#
Runs when the
Unitis instantiated. This is called from within the same process this unit will live. This lifecycle hook can be overridden. It can be run asasyncfunctions by simply adding theasynckeyword when overriding.- Return type:
None
- async on_settings(msg)[source]#
Receive a settings message, override self.SETTINGS, and re-create the producer. Child classes that wish to have fine-grained control over whether the core producer resets on settings changes should override this method.
- Parameters:
msg (SettingsType) – a settings message.
- Return type:
None
- __init__(*args, settings=None, **kwargs)#
- Parameters:
settings (Settings | None)
- apply_settings(settings)#
Update the
Component‘sSettingsobject.- Parameters:
settings (Settings) – An instance of the class-specific
Settings.- Return type:
None
- async setup()#
This is called from within the same process this unit will live
- async shutdown()#
Runs when the
Unitterminates. This is called from within the same process this unit will live. This lifecycle hook can be overridden. It can be run asasyncfunctions by simply adding theasynckeyword when overriding.- Return type:
None
- class BaseProcessorUnit(*args, settings=None, **kwargs)[source]#
Bases:
Unit,ABC,Generic[SettingsType]Base class for processor units – i.e. units that process messages. This is an abstract base class that provides common functionality for consumer and transformer units. You probably do not want to inherit from this class directly as you would need to define a custom implementation of create_processor. Refer instead to BaseConsumerUnit or BaseTransformerUnit.
- Parameters:
settings (Settings | None)
- INPUT_SETTINGS = InputStream:unlocated[~SettingsType]()#
- async initialize()[source]#
Runs when the
Unitis instantiated. This is called from within the same process this unit will live. This lifecycle hook can be overridden. It can be run asasyncfunctions by simply adding theasynckeyword when overriding.- Return type:
None
- async on_settings(msg)[source]#
Receive a settings message, override self.SETTINGS, and re-create the processor. Child classes that wish to have fine-grained control over whether the core processor resets on settings changes should override this method.
- Parameters:
msg (SettingsType) – a settings message.
- Return type:
None
- __init__(*args, settings=None, **kwargs)#
- Parameters:
settings (Settings | None)
- apply_settings(settings)#
Update the
Component‘sSettingsobject.- Parameters:
settings (Settings) – An instance of the class-specific
Settings.- Return type:
None
- async setup()#
This is called from within the same process this unit will live
- async shutdown()#
Runs when the
Unitterminates. This is called from within the same process this unit will live. This lifecycle hook can be overridden. It can be run asasyncfunctions by simply adding theasynckeyword when overriding.- Return type:
None
- class BaseConsumerUnit(*args, settings=None, **kwargs)[source]#
Bases:
BaseProcessorUnit[SettingsType],ABC,Generic[SettingsType,MessageInType,ConsumerType]Base class for consumer units – i.e. units that receive messages but do not return results. Implement a new Unit as follows:
- class CustomUnit(BaseConsumerUnit[
CustomConsumerSettings, # SettingsType AxisArray, # MessageInType CustomConsumer, # ConsumerType
- ]):
SETTINGS = CustomConsumerSettings
… that’s all!
Where CustomConsumerSettings and CustomConsumer are custom implementations of: - ez.Settings for settings - BaseConsumer or BaseStatefulConsumer for the consumer implementation
- Parameters:
settings (Settings | None)
- INPUT_SIGNAL = InputStream:unlocated[~MessageInType]()#
- async on_signal(message)[source]#
Consume the message. :param message: Input message to be consumed
- Parameters:
message (MessageInType)
- INPUT_SETTINGS = InputStream:unlocated[~SettingsType]()#
- __init__(*args, settings=None, **kwargs)#
- Parameters:
settings (Settings | None)
- apply_settings(settings)#
Update the
Component‘sSettingsobject.- Parameters:
settings (Settings) – An instance of the class-specific
Settings.- Return type:
None
- async initialize()#
Runs when the
Unitis instantiated. This is called from within the same process this unit will live. This lifecycle hook can be overridden. It can be run asasyncfunctions by simply adding theasynckeyword when overriding.- Return type:
None
- async on_settings(msg)#
Receive a settings message, override self.SETTINGS, and re-create the processor. Child classes that wish to have fine-grained control over whether the core processor resets on settings changes should override this method.
- Parameters:
msg (SettingsType) – a settings message.
- Return type:
None
- async setup()#
This is called from within the same process this unit will live
- async shutdown()#
Runs when the
Unitterminates. This is called from within the same process this unit will live. This lifecycle hook can be overridden. It can be run asasyncfunctions by simply adding theasynckeyword when overriding.- Return type:
None
- class BaseTransformerUnit(*args, settings=None, **kwargs)[source]#
Bases:
BaseProcessorUnit[SettingsType],ABC,Generic[SettingsType,MessageInType,MessageOutType,TransformerType]Base class for transformer units – i.e. units that transform input messages into output messages. Implement a new Unit as follows:
- class CustomUnit(BaseTransformerUnit[
CustomTransformerSettings, # SettingsType AxisArray, # MessageInType AxisArray, # MessageOutType CustomTransformer, # TransformerType
- ]):
SETTINGS = CustomTransformerSettings
… that’s all!
Where CustomTransformerSettings and CustomTransformer are custom implementations of: - ez.Settings for settings - One of these transformer types:
BaseTransformer
BaseStatefulTransformer
CompositeProcessor
- Parameters:
settings (Settings | None)
- INPUT_SIGNAL = InputStream:unlocated[~MessageInType]()#
- OUTPUT_SIGNAL = OutputStream:unlocated[~MessageOutType](self.num_buffers=32, self.force_tcp=False)#
- INPUT_SETTINGS = InputStream:unlocated[~SettingsType]()#
- __init__(*args, settings=None, **kwargs)#
- Parameters:
settings (Settings | None)
- apply_settings(settings)#
Update the
Component‘sSettingsobject.- Parameters:
settings (Settings) – An instance of the class-specific
Settings.- Return type:
None
- async initialize()#
Runs when the
Unitis instantiated. This is called from within the same process this unit will live. This lifecycle hook can be overridden. It can be run asasyncfunctions by simply adding theasynckeyword when overriding.- Return type:
None
- async on_settings(msg)#
Receive a settings message, override self.SETTINGS, and re-create the processor. Child classes that wish to have fine-grained control over whether the core processor resets on settings changes should override this method.
- Parameters:
msg (SettingsType) – a settings message.
- Return type:
None
- async setup()#
This is called from within the same process this unit will live
- async shutdown()#
Runs when the
Unitterminates. This is called from within the same process this unit will live. This lifecycle hook can be overridden. It can be run asasyncfunctions by simply adding theasynckeyword when overriding.- Return type:
None
- class BaseAdaptiveTransformerUnit(*args, settings=None, **kwargs)[source]#
Bases:
BaseProcessorUnit[SettingsType],ABC,Generic[SettingsType,MessageInType,MessageOutType,AdaptiveTransformerType]- Parameters:
settings (Settings | None)
- INPUT_SAMPLE = InputStream:unlocated[<class 'ezmsg.sigproc.util.message.SampleMessage'>]()#
- INPUT_SIGNAL = InputStream:unlocated[~MessageInType]()#
- OUTPUT_SIGNAL = OutputStream:unlocated[~MessageOutType](self.num_buffers=32, self.force_tcp=False)#
- create_processor()[source]#
Create the adaptive transformer instance from settings.
- Return type:
None
- async on_sample(msg)[source]#
- Parameters:
msg (SampleMessage)
- Return type:
None
- INPUT_SETTINGS = InputStream:unlocated[~SettingsType]()#
- __init__(*args, settings=None, **kwargs)#
- Parameters:
settings (Settings | None)
- apply_settings(settings)#
Update the
Component‘sSettingsobject.- Parameters:
settings (Settings) – An instance of the class-specific
Settings.- Return type:
None
- async initialize()#
Runs when the
Unitis instantiated. This is called from within the same process this unit will live. This lifecycle hook can be overridden. It can be run asasyncfunctions by simply adding theasynckeyword when overriding.- Return type:
None
- async on_settings(msg)#
Receive a settings message, override self.SETTINGS, and re-create the processor. Child classes that wish to have fine-grained control over whether the core processor resets on settings changes should override this method.
- Parameters:
msg (SettingsType) – a settings message.
- Return type:
None
- async setup()#
This is called from within the same process this unit will live
- async shutdown()#
Runs when the
Unitterminates. This is called from within the same process this unit will live. This lifecycle hook can be overridden. It can be run asasyncfunctions by simply adding theasynckeyword when overriding.- Return type:
None