Base Processors#
Here is the API for the base processors included in the ezmsg-sigproc extension. For more detailed information on the design decisions behind these base processors, please refer to the ezmsg-sigproc explainer.
- class BaseProcessor(*args, settings=None, **kwargs)[source]#
Bases:
ABC, Generic[SettingsType, MessageInType, MessageOutType]
Base class for processors. You probably do not want to inherit from this class directly. Refer instead to the more specific base classes.
- Use BaseConsumer for operations that do not return a result, or BaseTransformer for operations that do.
- Use BaseStatefulProcessor and its children for operations that require state.
Note that BaseProcessor and its children are sync by default. If you need async by default, override the async methods and call them from the sync methods. Look to BaseProducer for examples of calling async methods from sync methods.
- Parameters:
settings (SettingsType)
- __init__(*args, settings=None, **kwargs)[source]#
- Parameters:
settings (SettingsType | None)
- Return type:
None
- settings: SettingsType#
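The sync-by-default design described for BaseProcessor can be sketched as follows. This is a minimal stand-alone illustration using only the standard library, not the ezmsg-sigproc implementation: the class `ToySyncFirstProcessor` and the method names `process` and `aprocess` are hypothetical and were chosen only for this example.

```python
import asyncio
from abc import ABC, abstractmethod
from typing import Generic, TypeVar

SettingsT = TypeVar("SettingsT")
InT = TypeVar("InT")
OutT = TypeVar("OutT")


class ToySyncFirstProcessor(ABC, Generic[SettingsT, InT, OutT]):
    """Hypothetical stand-in, NOT the ezmsg-sigproc BaseProcessor."""

    def __init__(self, settings: SettingsT) -> None:
        self.settings = settings

    @abstractmethod
    def process(self, message: InT) -> OutT:
        """Synchronous work lives here."""

    async def aprocess(self, message: InT) -> OutT:
        # The async entry point simply delegates to the sync implementation.
        return self.process(message)


class Scale(ToySyncFirstProcessor[float, float, float]):
    """Multiplies each incoming value by the gain stored in settings."""

    def process(self, message: float) -> float:
        return self.settings * message


scaler = Scale(settings=2.0)
sync_result = scaler.process(3.0)
async_result = asyncio.run(scaler.aprocess(3.0))
```

Both entry points return the same value because the async method only forwards to the sync one. A subclass that needs genuinely asynchronous work would invert this relationship, as the note above suggests.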
- class BaseProducer(*args, settings=None, **kwargs)[source]#
Bases:
ABC, Generic[SettingsType, MessageOutType]
Base class for producers: processors that generate messages without consuming inputs.
Note that BaseProducer and its children are async by default, and the sync methods simply wrap the async methods. This is the opposite of BaseProcessor and its children, which are sync by default. These classes are designed this way because a producer, which (probably) does not receive inputs, is highly likely to require some sort of IO that benefits from being async.
- Parameters:
settings (SettingsType | None)
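The async-by-default design of producers can be sketched in the same spirit. The example below is an assumption-laden toy, not the library's code: `ToyAsyncFirstProducer`, `aproduce`, and `produce` are hypothetical names, and `asyncio.run` is used here as the simplest way to drive a coroutine from sync code (the real library may use a different mechanism).

```python
import asyncio
from abc import ABC, abstractmethod


class ToyAsyncFirstProducer(ABC):
    """Hypothetical stand-in, NOT the ezmsg-sigproc BaseProducer."""

    @abstractmethod
    async def aproduce(self) -> int:
        """Asynchronous work (for example, awaiting IO) lives here."""

    def produce(self) -> int:
        # The sync entry point wraps the async implementation.
        # asyncio.run only works when no event loop is already
        # running in the current thread.
        return asyncio.run(self.aproduce())


class Counter(ToyAsyncFirstProducer):
    """Emits 1, 2, 3, ... on successive calls."""

    def __init__(self) -> None:
        self._n = 0

    async def aproduce(self) -> int:
        await asyncio.sleep(0)  # stand-in for real IO
        self._n += 1
        return self._n


counter = Counter()
first = counter.produce()
second = counter.produce()
```

Keeping the real work in the coroutine means an async caller can await it directly, while sync callers pay only the cost of the wrapper.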
- class BaseConsumer(*args, settings=None, **kwargs)[source]#
Bases:
BaseProcessor[SettingsType, MessageInType, None], ABC, Generic[SettingsType, MessageInType]
Base class for consumers: processors that receive messages but don’t produce output. This base simply overrides the type annotations of BaseProcessor to remove the outputs. (We don’t bother overriding send and asend because those are deprecated.)
- Parameters:
settings (SettingsType)
- __init__(*args, settings=None, **kwargs)#
- Parameters:
settings (SettingsType | None)
- Return type:
None
- settings: SettingsType#
- class BaseTransformer(*args, settings=None, **kwargs)[source]#
Bases:
BaseProcessor[SettingsType, MessageInType, MessageOutType], ABC, Generic[SettingsType, MessageInType, MessageOutType]
Base class for transformers: processors that receive messages and produce output. This base simply overrides the type annotations of BaseProcessor to indicate that outputs are not optional. (We don’t bother overriding send and asend because those are deprecated.)
- Parameters:
settings (SettingsType)
- class BaseStatefulProcessor(*args, **kwargs)[source]#
Bases:
BaseProcessor[SettingsType, MessageInType, MessageOutType], Stateful[StateType], ABC, Generic[SettingsType, MessageInType, MessageOutType, StateType]
Base class implementing common stateful processor functionality. You probably do not want to inherit from this class directly. Refer instead to the more specific base classes. Use BaseStatefulConsumer for operations that do not return a result, or BaseStatefulTransformer for operations that do return a result.
- property state: StateType#
- settings: SettingsType#
- class BaseStatefulProducer(*args, **kwargs)[source]#
Bases:
BaseProducer[SettingsType, MessageOutType], Stateful[StateType], ABC, Generic[SettingsType, MessageOutType, StateType]
Base class implementing common stateful producer functionality. Examples of stateful producers are things that require counters, clocks, or to cycle through a set of values.
Unlike BaseStatefulProcessor, this class does not do message hashing because there are no input messages. We still use self._hash simply to track the transition from initialization (.hash == -1) to state reset (.hash == 0).
- property state: StateType#
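The transition from initialization (hash of -1) to state reset (hash of 0) described for BaseStatefulProducer might look roughly like the sketch below. Only the `_hash` attribute and its two values come from the description above; the class `ToyStatefulProducer`, the methods `_reset_state` and `produce`, and the `reset_count` counter are hypothetical and exist only to make the behaviour observable.

```python
class ToyStatefulProducer:
    """Hypothetical stand-in, NOT the ezmsg-sigproc BaseStatefulProducer."""

    def __init__(self, values: list) -> None:
        self._values = values
        self._hash = -1       # -1: state has not been initialised yet
        self._index = 0
        self.reset_count = 0  # illustration only: counts state resets

    def _reset_state(self) -> None:
        # Build (or rebuild) whatever state the producer needs.
        self._index = 0
        self.reset_count += 1

    def produce(self) -> str:
        if self._hash == -1:
            # First call: reset state once, then record that it happened.
            self._reset_state()
            self._hash = 0
        value = self._values[self._index % len(self._values)]
        self._index += 1
        return value


cycler = ToyStatefulProducer(["a", "b"])
outputs = [cycler.produce() for _ in range(3)]
```

Because there are no input messages to hash, the flag only ever moves from -1 to 0, so the state is reset exactly once in this sketch.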
- class BaseStatefulConsumer(*args, **kwargs)[source]#
Bases:
BaseStatefulProcessor[SettingsType, MessageInType, None, StateType], ABC, Generic[SettingsType, MessageInType, StateType]
Base class for stateful message consumers that don’t produce output. This class merely overrides the type annotations of BaseStatefulProcessor.
- __init__(*args, **kwargs)#
- Return type:
None
- property state: StateType#
- settings: SettingsType#
- class BaseStatefulTransformer(*args, **kwargs)[source]#
Bases:
BaseStatefulProcessor[SettingsType, MessageInType, MessageOutType, StateType], ABC, Generic[SettingsType, MessageInType, MessageOutType, StateType]
Base class for stateful message transformers that produce output. This class merely overrides the type annotations of BaseStatefulProcessor.
- __init__(*args, **kwargs)#
- Return type:
None
- property state: StateType#
- settings: SettingsType#
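Stateful processors that consume messages rely on message hashing to decide when their state must be rebuilt. The sketch below illustrates that idea under stated assumptions: `ToyRunningSum`, `_hash_message`, `_reset_state`, and `process` are hypothetical names, and hashing on the number of channels is an invented rule for this example, not the rule any particular ezmsg-sigproc class uses.

```python
class ToyRunningSum:
    """Hypothetical stand-in, NOT the ezmsg-sigproc BaseStatefulTransformer."""

    def __init__(self) -> None:
        self._hash = -1    # -1: no message has been seen yet
        self._totals = []  # the state: one running total per channel

    def _hash_message(self, message: list) -> int:
        # Assumption: the hash captures the message's structure
        # (here, its channel count), not its values.
        return hash(len(message))

    def _reset_state(self, message: list) -> None:
        self._totals = [0.0] * len(message)

    def process(self, message: list) -> list:
        msg_hash = self._hash_message(message)
        if msg_hash != self._hash:
            # Structure changed (or first message): rebuild the state.
            self._reset_state(message)
            self._hash = msg_hash
        self._totals = [t + x for t, x in zip(self._totals, message)]
        return list(self._totals)


summer = ToyRunningSum()
out1 = summer.process([1.0, 2.0])
out2 = summer.process([1.0, 2.0])
out3 = summer.process([5.0])  # channel count changed, so state resets
```

The third call starts a fresh running sum because the incoming message no longer matches the structure the state was built for.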
- class BaseAdaptiveTransformer(*args, **kwargs)[source]#
Bases:
BaseStatefulTransformer[SettingsType, MessageInType | SampleMessage, MessageOutType | None, StateType], ABC, Generic[SettingsType, MessageInType, MessageOutType, StateType]
- abstractmethod partial_fit(message)[source]#
- Parameters:
message (SampleMessage)
- Return type:
None
- async apartial_fit(message)[source]#
Override me if you need async partial fitting.
- Parameters:
message (SampleMessage)
- Return type:
None
- __init__(*args, **kwargs)#
- Return type:
None
- property state: StateType#
- stateful_op(state, message)#
- settings: SettingsType#
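The type signature of BaseAdaptiveTransformer (input `MessageInType | SampleMessage`, output `MessageOutType | None`) suggests that training samples update the model and yield no output, while ordinary messages are transformed. A minimal sketch of that pattern follows. Everything except the method names `partial_fit` and `apartial_fit` is hypothetical: `ToySample` stands in for SampleMessage, and the dispatch in `__call__` and the default body of `apartial_fit` are assumptions, not the library's code.

```python
import asyncio
from abc import ABC, abstractmethod
from dataclasses import dataclass


@dataclass
class ToySample:
    """Hypothetical stand-in for SampleMessage: one training sample."""
    value: float


class ToyAdaptiveTransformer(ABC):
    """Hypothetical stand-in, NOT the ezmsg-sigproc BaseAdaptiveTransformer."""

    @abstractmethod
    def partial_fit(self, message: ToySample) -> None:
        """Update the model from one training sample."""

    async def apartial_fit(self, message: ToySample) -> None:
        # Assumed default: delegate to the sync version.
        self.partial_fit(message)

    @abstractmethod
    def process(self, message: float) -> float:
        """Transform one ordinary message."""

    def __call__(self, message):
        if isinstance(message, ToySample):
            self.partial_fit(message)
            return None  # training samples produce no output
        return self.process(message)


class MeanRemover(ToyAdaptiveTransformer):
    """Learns a running mean from samples and subtracts it from data."""

    def __init__(self) -> None:
        self.n = 0
        self.mean = 0.0

    def partial_fit(self, message: ToySample) -> None:
        self.n += 1
        self.mean += (message.value - self.mean) / self.n

    def process(self, message: float) -> float:
        return message - self.mean


remover = MeanRemover()
fit_result = remover(ToySample(2.0))
remover(ToySample(4.0))
asyncio.run(remover.apartial_fit(ToySample(6.0)))
centered = remover(10.0)
```

After the three samples the learned mean is 4.0, so the data value 10.0 is transformed to 6.0.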
- class BaseAsyncTransformer(*args, **kwargs)[source]#
Bases:
BaseStatefulTransformer[SettingsType, MessageInType, MessageOutType, StateType], ABC, Generic[SettingsType, MessageInType, MessageOutType, StateType]
This reverses the priority of async and sync methods from BaseStatefulTransformer. Whereas in BaseStatefulTransformer the async methods simply call the sync methods, here the sync methods call the async methods, similar to BaseStatefulProducer.
- __init__(*args, **kwargs)#
- Return type:
None
- property state: StateType#
- stateful_op(state, message)#
- settings: SettingsType#
- class CompositeProcessor(*args, **kwargs)[source]#
Bases:
BaseProcessor[SettingsType, MessageInType, MessageOutType], CompositeStateful[SettingsType, MessageOutType], ABC, Generic[SettingsType, MessageInType, MessageOutType]
A processor that chains multiple processors together in a feedforward, non-branching graph. The individual processors may be stateless or stateful. The last processor may be a consumer; all other processors must be transformers. Use CompositeProducer if you want the first processor to be a producer. Concrete subclasses must implement _initialize_processors. Optionally override _reset_state if you want adaptive state behaviour.
Example implementation:
    class CustomCompositeProcessor(CompositeProcessor[CustomSettings, AxisArray, AxisArray]):
        @staticmethod
        def _initialize_processors(settings: CustomSettings) -> dict[str, BaseProcessor]:
            ...
In the method body, **settings should be replaced with initialisation arguments for each processor.
- settings: SettingsType#
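The feedforward, non-branching chaining described for CompositeProcessor can be illustrated with a small stand-alone sketch. Only the method name `_initialize_processors` and its dict-of-named-processors return shape come from the text above; `ToyComposite`, `process`, the plain-dict settings, and the use of lambdas as processors are hypothetical simplifications.

```python
from typing import Callable, Dict


class ToyComposite:
    """Hypothetical stand-in, NOT the ezmsg-sigproc CompositeProcessor."""

    def __init__(self, settings: dict) -> None:
        self._procs = self._initialize_processors(settings)

    @staticmethod
    def _initialize_processors(settings: dict) -> Dict[str, Callable[[float], float]]:
        # Each stage is built from the composite's settings.
        # Insertion order defines the order of the chain.
        return {
            "scale": lambda x: x * settings["gain"],
            "offset": lambda x: x + settings["bias"],
        }

    def process(self, message: float) -> float:
        # Feed the output of each stage into the next, with no branching.
        for proc in self._procs.values():
            message = proc(message)
        return message


chain = ToyComposite({"gain": 2.0, "bias": 1.0})
result = chain.process(3.0)  # (3.0 * 2.0) + 1.0
```

Returning a dict keeps each stage addressable by name while Python's ordered dicts preserve the sequence in which the stages run.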
- class CompositeProducer(*args, **kwargs)[source]#
Bases:
BaseProducer[SettingsType, MessageOutType], CompositeStateful[SettingsType, MessageOutType], ABC, Generic[SettingsType, MessageOutType]
A producer that chains multiple processors (starting with a producer) together in a feedforward, non-branching graph. The individual processors may be stateless or stateful. The first processor must be a producer and the last processor may be a consumer; all other processors must be transformers.
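A chain that starts with a producer takes no input: the first stage generates a message and the remaining stages transform it. The following toy, built on the standard library only, sketches that flow. `ToyCompositeProducer`, `produce`, and the stage names are hypothetical and do not reflect the library's actual attributes.

```python
import itertools


class ToyCompositeProducer:
    """Hypothetical stand-in, NOT the ezmsg-sigproc CompositeProducer."""

    def __init__(self, start: int) -> None:
        counter = itertools.count(start)
        # First stage: a producer that needs no input.
        self._producer = lambda: next(counter)
        # Later stages: transformers applied in insertion order.
        self._transformers = {
            "square": lambda x: x * x,
            "negate": lambda x: -x,
        }

    def produce(self) -> int:
        message = self._producer()
        for proc in self._transformers.values():
            message = proc(message)
        return message


source = ToyCompositeProducer(start=2)
produced = [source.produce() for _ in range(3)]
```

Each call draws a new value from the producer (2, 3, 4), squares it, and negates it.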