ezmsg.sigproc.base#
Functions
Classes
- class AdaptiveTransformer(*args, **kwargs)[source]#
Bases:
StatefulTransformer, Protocol
- partial_fit(message)[source]#
Update transformer state using labeled training data.
This method should update the internal state/parameters of the transformer based on the provided labeled samples, without performing any transformation.
- Parameters:
message (SampleMessage)
- Return type:
None
- async apartial_fit(message)[source]#
- Parameters:
message (SampleMessage)
- Return type:
None
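As a rough illustration of the partial_fit contract described above (update state from labeled samples, perform no transformation), here is a minimal standalone sketch. It does not import ezmsg: LabeledSample is a hypothetical stand-in for SampleMessage, RunningMeanCenterer is an invented toy class, and having apartial_fit defer to the sync method is an assumption.

```python
from dataclasses import dataclass


@dataclass
class LabeledSample:
    """Hypothetical stand-in for SampleMessage: a value plus its label."""

    value: float
    label: str


class RunningMeanCenterer:
    """Toy adaptive transformer: subtracts a mean learned via partial_fit."""

    def __init__(self) -> None:
        self.mean = 0.0
        self.n_seen = 0

    def partial_fit(self, message: LabeledSample) -> None:
        # Update internal state only; nothing is transformed or returned.
        self.n_seen += 1
        self.mean += (message.value - self.mean) / self.n_seen

    async def apartial_fit(self, message: LabeledSample) -> None:
        # Assumed behaviour: the async variant defers to the sync one.
        self.partial_fit(message)

    def __call__(self, value: float) -> float:
        # Transform with the current state; the state is not modified here.
        return value - self.mean


centerer = RunningMeanCenterer()
for v in (2.0, 4.0, 6.0):
    centerer.partial_fit(LabeledSample(value=v, label="train"))
centered = centerer(5.0)
```

Keeping fitting and transforming in separate methods means training samples never produce output messages.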
- class BaseAdaptiveTransformer(*args, **kwargs)[source]#
Bases:
BaseStatefulTransformer[SettingsType, MessageInType | SampleMessage, MessageOutType | None, StateType], ABC, Generic[SettingsType, MessageInType, MessageOutType, StateType]
- abstractmethod partial_fit(message)[source]#
- Parameters:
message (SampleMessage)
- Return type:
None
- async apartial_fit(message)[source]#
Override me if you need async partial fitting.
- Parameters:
message (SampleMessage)
- Return type:
None
- class BaseAdaptiveTransformerUnit(*args, settings=None, **kwargs)[source]#
Bases:
BaseProcessorUnit[SettingsType], ABC, Generic[SettingsType, MessageInType, MessageOutType, AdaptiveTransformerType]
- Parameters:
settings (Settings | None)
- INPUT_SAMPLE = InputStream:unlocated[<class 'ezmsg.sigproc.util.message.SampleMessage'>]()#
- INPUT_SIGNAL = InputStream:unlocated[~MessageInType]()#
- OUTPUT_SIGNAL = OutputStream:unlocated[~MessageOutType](self.num_buffers=32, self.force_tcp=False)#
- create_processor()[source]#
Create the adaptive transformer instance from settings.
- Return type:
None
- async on_sample(msg)[source]#
- Parameters:
msg (SampleMessage)
- Return type:
None
- class BaseAsyncTransformer(*args, **kwargs)[source]#
Bases:
BaseStatefulTransformer[SettingsType, MessageInType, MessageOutType, StateType], ABC, Generic[SettingsType, MessageInType, MessageOutType, StateType]
This reverses the priority of async and sync methods from BaseStatefulTransformer. Whereas in BaseStatefulTransformer the async methods simply call the sync methods, here the sync methods call the async methods, which is more similar to BaseStatefulProducer.
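The difference between the two priorities can be sketched with two toy classes. This is a standalone illustration, not the library's implementation: both class names are invented, and using asyncio.run to drive the coroutine from the sync method is an assumption about one possible way to do it.

```python
import asyncio


class SyncFirstDoubler:
    """Sync is primary; the async method simply calls the sync one."""

    def __call__(self, x: int) -> int:
        return 2 * x

    async def __acall__(self, x: int) -> int:
        return self(x)


class AsyncFirstDoubler:
    """Async is primary; the sync method runs the coroutine to completion."""

    async def __acall__(self, x: int) -> int:
        await asyncio.sleep(0)  # placeholder for real awaitable work
        return 2 * x

    def __call__(self, x: int) -> int:
        # Assumes no event loop is already running in this thread.
        return asyncio.run(self.__acall__(x))


sync_result = SyncFirstDoubler()(21)
awaited_result = asyncio.run(SyncFirstDoubler().__acall__(21))
async_first_result = AsyncFirstDoubler()(21)
```

Both classes expose the same pair of entry points, so callers do not need to know which one holds the real logic.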
- class BaseConsumer(*args, settings=None, **kwargs)[source]#
Bases:
BaseProcessor[SettingsType, MessageInType, None], ABC, Generic[SettingsType, MessageInType]
Base class for consumers – processors that receive messages but don’t produce output. This base simply overrides type annotations of BaseProcessor to remove the outputs. (We don’t bother overriding send and asend because those are deprecated.)
- Parameters:
settings (SettingsType)
- class BaseConsumerUnit(*args, settings=None, **kwargs)[source]#
Bases:
BaseProcessorUnit[SettingsType], ABC, Generic[SettingsType, MessageInType, ConsumerType]
Base class for consumer units – i.e. units that receive messages but do not return results. Implement a new Unit as follows:
    class CustomUnit(BaseConsumerUnit[
        CustomConsumerSettings,  # SettingsType
        AxisArray,  # MessageInType
        CustomConsumer,  # ConsumerType
    ]):
        SETTINGS = CustomConsumerSettings
… that’s all!
Where CustomConsumerSettings is a custom implementation of ez.Settings, and CustomConsumer is a custom implementation of BaseConsumer or BaseStatefulConsumer.
- Parameters:
settings (Settings | None)
- INPUT_SIGNAL = InputStream:unlocated[~MessageInType]()#
- class BaseProcessor(*args, settings=None, **kwargs)[source]#
Bases:
ABC, Generic[SettingsType, MessageInType, MessageOutType]
Base class for processors. You probably do not want to inherit from this class directly. Refer instead to the more specific base classes.
Use BaseConsumer for operations that do not return a result, or BaseTransformer for operations that do.
Use BaseStatefulProcessor and its children for operations that require state.
Note that BaseProcessor and its children are sync by default. If you need async by default, then override the async methods and call them from the sync methods. Look to BaseProducer for examples of calling async methods from sync methods.
- Parameters:
settings (SettingsType)
- __init__(*args, settings=None, **kwargs)[source]#
- Parameters:
settings (SettingsType | None)
- Return type:
None
- settings: SettingsType#
- class BaseProcessorUnit(*args, settings=None, **kwargs)[source]#
Bases:
Unit, ABC, Generic[SettingsType]
Base class for processor units – i.e. units that process messages. This is an abstract base class that provides common functionality for consumer and transformer units. You probably do not want to inherit from this class directly, as you would need to define a custom implementation of create_processor. Refer instead to BaseConsumerUnit or BaseTransformerUnit.
- Parameters:
settings (Settings | None)
- INPUT_SETTINGS = InputStream:unlocated[~SettingsType]()#
- async initialize()[source]#
Runs when the Unit is instantiated. This is called from within the same process in which this unit will live. This lifecycle hook can be overridden. It can be run as an async function by adding the async keyword when overriding.
- Return type:
None
- async on_settings(msg)[source]#
Receive a settings message, override self.SETTINGS, and re-create the processor. Child classes that wish to have fine-grained control over whether the core processor resets on settings changes should override this method.
- Parameters:
msg (SettingsType) – a settings message.
- Return type:
None
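The on_settings behaviour described above (store the new settings, then re-create the processor) can be sketched without ezmsg as follows. ToyUnit, GainSettings and GainProcessor are hypothetical names, and ToyUnit is a plain class rather than a real Unit; only the SETTINGS, create_processor and on_settings names come from this page.

```python
import asyncio
from dataclasses import dataclass


@dataclass(frozen=True)
class GainSettings:
    """Hypothetical settings object for the sketch."""

    gain: float = 1.0


class GainProcessor:
    """Hypothetical processor that multiplies its input by a gain."""

    def __init__(self, settings: GainSettings) -> None:
        self.settings = settings

    def __call__(self, x: float) -> float:
        return x * self.settings.gain


class ToyUnit:
    """Plain-Python stand-in for a processor unit (not an ezmsg Unit)."""

    def __init__(self, settings: GainSettings) -> None:
        self.SETTINGS = settings
        self.create_processor()

    def create_processor(self) -> None:
        # Build a fresh processor from whatever settings are current.
        self.processor = GainProcessor(self.SETTINGS)

    async def on_settings(self, msg: GainSettings) -> None:
        # Replace the settings, then rebuild the processor from scratch.
        self.SETTINGS = msg
        self.create_processor()


unit = ToyUnit(GainSettings(gain=2.0))
first_processor = unit.processor
before = unit.processor(10.0)
asyncio.run(unit.on_settings(GainSettings(gain=0.5)))
after = unit.processor(10.0)
```

Because the processor is rebuilt on every settings message, any state it held is discarded; a subclass wanting to keep state across minor settings changes would override on_settings, as the description above suggests.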
- class BaseProducer(*args, settings=None, **kwargs)[source]#
Bases:
ABC, Generic[SettingsType, MessageOutType]
Base class for producers – processors that generate messages without consuming inputs.
Note that BaseProducer and its children are async by default, and the sync methods simply wrap the async methods. This is the opposite of BaseProcessor and its children, which are sync by default. These classes are designed this way because it is highly likely that a producer, which (probably) does not receive inputs, will require some sort of IO which will benefit from being async.
- Parameters:
settings (SettingsType | None)
- class BaseProducerUnit(*args, settings=None, **kwargs)[source]#
Bases:
Unit, ABC, Generic[SettingsType, MessageOutType, ProducerType]
Base class for producer units – i.e. units that generate messages without consuming inputs. Implement a new Unit as follows:
    class CustomUnit(BaseProducerUnit[
        CustomProducerSettings,  # SettingsType
        AxisArray,  # MessageOutType
        CustomProducer,  # ProducerType
    ]):
        SETTINGS = CustomProducerSettings
… that’s all!
Where CustomProducerSettings is a custom implementation of ez.Settings, and CustomProducer is a custom implementation of BaseProducer or BaseStatefulProducer.
- Parameters:
settings (Settings | None)
- INPUT_SETTINGS = InputStream:unlocated[~SettingsType]()#
- OUTPUT_SIGNAL = OutputStream:unlocated[~MessageOutType](self.num_buffers=32, self.force_tcp=False)#
- async initialize()[source]#
Runs when the Unit is instantiated. This is called from within the same process in which this unit will live. This lifecycle hook can be overridden. It can be run as an async function by adding the async keyword when overriding.
- Return type:
None
- async on_settings(msg)[source]#
Receive a settings message, override self.SETTINGS, and re-create the producer. Child classes that wish to have fine-grained control over whether the core producer resets on settings changes should override this method.
- Parameters:
msg (SettingsType) – a settings message.
- Return type:
None
- class BaseStatefulConsumer(*args, **kwargs)[source]#
Bases:
BaseStatefulProcessor[SettingsType, MessageInType, None, StateType], ABC, Generic[SettingsType, MessageInType, StateType]
Base class for stateful message consumers that don’t produce output. This class merely overrides the type annotations of BaseStatefulProcessor.
- class BaseStatefulProcessor(*args, **kwargs)[source]#
Bases:
BaseProcessor[SettingsType, MessageInType, MessageOutType], Stateful[StateType], ABC, Generic[SettingsType, MessageInType, MessageOutType, StateType]
Base class implementing common stateful processor functionality. You probably do not want to inherit from this class directly. Refer instead to the more specific base classes. Use BaseStatefulConsumer for operations that do not return a result, or BaseStatefulTransformer for operations that do return a result.
- class BaseStatefulProducer(*args, **kwargs)[source]#
Bases:
BaseProducer[SettingsType, MessageOutType], Stateful[StateType], ABC, Generic[SettingsType, MessageOutType, StateType]
Base class implementing common stateful producer functionality.
Examples of stateful producers are things that require counters, clocks, or to cycle through a set of values.
Unlike BaseStatefulProcessor, this class does not do message hashing because there are no input messages. We still use self._hash to simply track the transition from initialization (.hash == -1) to state reset (.hash == 0).
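A counter is the simplest case of the stateful producers mentioned above. The following standalone sketch mimics the described hash transition from -1 to 0 on first use; CounterProducer and its internals are invented for illustration, and the sync wrapper via asyncio.run is an assumption rather than the library's mechanism.

```python
import asyncio


class CounterProducer:
    """Toy stateful producer: yields 0, 1, 2, ... on successive calls."""

    def __init__(self) -> None:
        self._hash = -1  # -1 marks "state not yet initialised"
        self.count = 0

    def _reset_state(self) -> None:
        self.count = 0

    async def __acall__(self) -> int:
        if self._hash == -1:
            # First call: reset the state and record the transition.
            self._reset_state()
            self._hash = 0
        value = self.count
        self.count += 1
        return value

    def __call__(self) -> int:
        # Sync wrapper around the async method (assumes no running loop).
        return asyncio.run(self.__acall__())


producer = CounterProducer()
hash_before = producer._hash
outputs = [producer() for _ in range(3)]
hash_after = producer._hash
```

With no input messages there is nothing to hash, so the flag only needs two values: not yet reset, and reset.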
- class BaseStatefulTransformer(*args, **kwargs)[source]#
Bases:
BaseStatefulProcessor[SettingsType, MessageInType, MessageOutType, StateType], ABC, Generic[SettingsType, MessageInType, MessageOutType, StateType]
Base class for stateful message transformers that produce output. This class merely overrides the type annotations of BaseStatefulProcessor.
- class BaseTransformer(*args, settings=None, **kwargs)[source]#
Bases:
BaseProcessor[SettingsType, MessageInType, MessageOutType], ABC, Generic[SettingsType, MessageInType, MessageOutType]
Base class for transformers – processors which receive messages and produce output. This base simply overrides type annotations of BaseProcessor to indicate that outputs are not optional. (We don’t bother overriding send and asend because those are deprecated.)
- Parameters:
settings (SettingsType)
- class BaseTransformerUnit(*args, settings=None, **kwargs)[source]#
Bases:
BaseProcessorUnit[SettingsType], ABC, Generic[SettingsType, MessageInType, MessageOutType, TransformerType]
Base class for transformer units – i.e. units that transform input messages into output messages. Implement a new Unit as follows:
    class CustomUnit(BaseTransformerUnit[
        CustomTransformerSettings,  # SettingsType
        AxisArray,  # MessageInType
        AxisArray,  # MessageOutType
        CustomTransformer,  # TransformerType
    ]):
        SETTINGS = CustomTransformerSettings
… that’s all!
Where CustomTransformerSettings is a custom implementation of ez.Settings, and CustomTransformer is a custom implementation of one of these transformer types: BaseTransformer, BaseStatefulTransformer, or CompositeProcessor.
- Parameters:
settings (Settings | None)
- INPUT_SIGNAL = InputStream:unlocated[~MessageInType]()#
- OUTPUT_SIGNAL = OutputStream:unlocated[~MessageOutType](self.num_buffers=32, self.force_tcp=False)#
- class CompositeProcessor(*args, **kwargs)[source]#
Bases:
BaseProcessor[SettingsType, MessageInType, MessageOutType], CompositeStateful[SettingsType, MessageOutType], ABC, Generic[SettingsType, MessageInType, MessageOutType]
A processor that chains multiple processors together in a feedforward non-branching graph. The individual processors may be stateless or stateful. The last processor may be a consumer; otherwise, processors must be transformers. Use CompositeProducer if you want the first processor to be a producer. Concrete subclasses must implement _initialize_processors. Optionally override _reset_state if you want adaptive state behaviour. Example implementation:
    class CustomCompositeProcessor(CompositeProcessor[CustomSettings, AxisArray, AxisArray]):
        @staticmethod
        def _initialize_processors(settings: CustomSettings) -> dict[str, BaseProcessor]:
            ...
Where **settings should be replaced with initialisation arguments for each processor.
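The chaining idea can be sketched in plain Python as below. This is an illustrative toy, not the library's implementation: Scale, Offset and ToyCompositeProcessor are hypothetical, settings are passed as a plain dict, and running the processors in dict insertion order is an assumption about how such a feedforward chain could be evaluated.

```python
from typing import Any, Callable

# In this sketch a "processor" is any callable from float to float.
Processor = Callable[[float], float]


class Scale:
    def __init__(self, factor: float) -> None:
        self.factor = factor

    def __call__(self, x: float) -> float:
        return x * self.factor


class Offset:
    def __init__(self, offset: float) -> None:
        self.offset = offset

    def __call__(self, x: float) -> float:
        return x + self.offset


class ToyCompositeProcessor:
    """Runs named processors one after another, with no branching."""

    def __init__(self, **settings: Any) -> None:
        self._procs = self._initialize_processors(settings)

    @staticmethod
    def _initialize_processors(settings: dict[str, Any]) -> dict[str, Processor]:
        # Each processor receives only the arguments it needs.
        # dict preserves insertion order, which defines the chain order here.
        return {
            "scale": Scale(factor=settings["factor"]),
            "offset": Offset(offset=settings["offset"]),
        }

    def __call__(self, message: float) -> float:
        # Feed each processor's output into the next one.
        for proc in self._procs.values():
            message = proc(message)
        return message


chain = ToyCompositeProcessor(factor=3.0, offset=1.0)
result = chain(2.0)
```

Naming each stage in the dict makes it possible to look up an individual processor later, for example to inspect its state.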
- class CompositeProducer(*args, **kwargs)[source]#
Bases:
BaseProducer[SettingsType, MessageOutType], CompositeStateful[SettingsType, MessageOutType], ABC, Generic[SettingsType, MessageOutType]
A producer that chains multiple processors (starting with a producer) together in a feedforward non-branching graph. The individual processors may be stateless or stateful. The first processor must be a producer and the last processor may be a consumer; otherwise, processors must be transformers.
- class CompositeStateful[source]#
Bases:
Stateful[dict[str, Any]], ABC, Generic[SettingsType, MessageOutType]
Mixin class for composite processor/producer chains. DO NOT use this class directly. Used to enforce statefulness of the composite processor/producer chain and provide initialization and validation methods.
- class Consumer(*args, **kwargs)[source]#
Bases:
Processor[SettingsType, MessageInType, None], Protocol
Protocol for consumers that receive messages but do not return a result.
- class GenAxisArray(*args, settings=None, **kwargs)[source]#
Bases:
Unit
- Parameters:
settings (Settings | None)
- STATE#
alias of GenState
- INPUT_SIGNAL = InputStream:unlocated[<class 'ezmsg.util.messages.axisarray.AxisArray'>]()#
- OUTPUT_SIGNAL = OutputStream:unlocated[<class 'ezmsg.util.messages.axisarray.AxisArray'>](self.num_buffers=32, self.force_tcp=False)#
- INPUT_SETTINGS = InputStream:unlocated[<class 'ezmsg.core.settings.Settings'>]()#
- async initialize()[source]#
Runs when the Unit is instantiated. This is called from within the same process in which this unit will live. This lifecycle hook can be overridden. It can be run as an async function by adding the async keyword when overriding.
- Return type:
None
- async on_settings(msg)[source]#
Update unit settings and reset generator. Note: Not all units will require a full reset with new settings. Override this method to implement a selective reset.
- Parameters:
msg (Settings) – Instance of SETTINGS object.
- Return type:
None
- class Processor(*args, **kwargs)[source]#
Bases:
Protocol[SettingsType, MessageInType, MessageOutType]
Protocol for processors. You probably will not implement this protocol directly. Refer instead to the less ambiguous Consumer and Transformer protocols, and the base classes in this module which implement them.
Note: In Python 3.12+, we can invoke __acall__ directly using await obj(message), but to support earlier versions we need to use await obj.__acall__(message).
- __init__(*args, **kwargs)#
- class Producer(*args, **kwargs)[source]#
Bases:
Protocol[SettingsType, MessageOutType]
Protocol for producers that generate messages.
- __init__(*args, **kwargs)#
- class Stateful[source]#
Bases:
ABC, Generic[StateType]
Mixin class for stateful processors. DO NOT use this class directly. Used to enforce that the processor/producer has a state attribute and stateful_op method.
- property state: StateType#
- class StatefulConsumer(*args, **kwargs)[source]#
Bases:
StatefulProcessor[SettingsType, MessageInType, None, StateType], Protocol
Protocol specifically for processors that consume messages without producing output.
- class StatefulProcessor(*args, **kwargs)[source]#
Bases:
Protocol[SettingsType, MessageInType, MessageOutType, StateType]
Base protocol for stateful message processors. You probably will not implement this protocol directly. Refer instead to the less ambiguous StatefulConsumer and StatefulTransformer protocols.
- property state: StateType#
- __init__(*args, **kwargs)#
- class StatefulProducer(*args, **kwargs)[source]#
Bases:
Protocol[SettingsType, MessageOutType, StateType]
Protocol for producers that generate messages without consuming inputs.
- property state: StateType#
- __init__(*args, **kwargs)#
- class StatefulTransformer(*args, **kwargs)[source]#
Bases:
StatefulProcessor[SettingsType, MessageInType, MessageOutType, StateType], Protocol
Protocol specifically for processors that transform messages.
- class Transformer(*args, **kwargs)[source]#
Bases:
Processor[SettingsType, MessageInType, MessageOutType], Protocol
Protocol for transformers that receive messages and return a result of the same class.
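Since the classes listed above without a Base prefix are typing protocols, an object can satisfy them structurally without inheriting from them. The sketch below shows that idea with the standard typing.Protocol machinery only; ToyTransformer, Upper and apply are hypothetical names, and the real protocols in this module may require more members than a single __call__.

```python
from typing import Protocol, TypeVar, runtime_checkable

In = TypeVar("In", contravariant=True)
Out = TypeVar("Out", covariant=True)


@runtime_checkable
class ToyTransformer(Protocol[In, Out]):
    """Hypothetical protocol: anything callable with one message."""

    def __call__(self, message: In) -> Out: ...


class Upper:
    """Satisfies ToyTransformer structurally; no inheritance needed."""

    def __call__(self, message: str) -> str:
        return message.upper()


def apply(transformer: ToyTransformer[str, str], message: str) -> str:
    # A static type checker accepts any object with a matching __call__.
    return transformer(message)


conforms = isinstance(Upper(), ToyTransformer)
rejected = isinstance(3, ToyTransformer)
result = apply(Upper(), "abc")
```

The runtime isinstance check only confirms that the method exists; signature compatibility is verified by a static type checker, not at runtime.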