Processor Base Classes
The ezmsg.baseproc module contains the base classes for message processors. The base classes are designed to allow users to create custom processors with minimal errors and minimal repetition of boilerplate code.
The information below was written at the time of a major refactor to help collate the design decisions and to help with future refactoring. However, it may be outdated or incomplete. Please refer to the source code for the most accurate information.
Generic TypeVars

| Idx | Class | Description |
|---|---|---|
| 1 | | for messages passed to a consumer, processor, or transformer |
| 2 | | for messages returned by a producer, processor, or transformer |
| 3 | | bound to ez.Settings |
| 4 | | bound to ProcessorState which is simply ez.State with a |
| 5 | | bound to |
Protocols

| Idx | Class | Parent | State | __call__ | @state | partial_fit |
|---|---|---|---|---|---|---|
| 1 | | - | No | Any -> Any | - | - |
| 2 | | - | No | None -> Mo | - | - |
| 3 | | 1 | No | Mi -> None | - | - |
| 4 | | 1 | No | Mi -> Mo | - | - |
| 5 | | - | Yes | Any -> Any | Y | - |
| 6 | | - | Yes | None -> Mo | Y | - |
| 7 | | 5 | Yes | Mi -> None | Y | - |
| 8 | | 5 | Yes | Mi -> Mo | Y | - |
| 9 | | 8 | Yes | Mi -> Mo | Y | Y |
Note: __call__ and partial_fit both have asynchronous alternatives: __acall__ and apartial_fit respectively.
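To make the protocol idea concrete, here is a minimal sketch using only the standard library. The names TransformerLike, Doubler, Mi, and Mo are hypothetical stand-ins chosen for illustration; they are not the protocol or TypeVar names defined in ezmsg.baseproc. The sketch shows a protocol with an Mi -> Mo call signature plus an async alternative, and a class that satisfies it structurally, without inheriting from it.

```python
import asyncio
from typing import Protocol, TypeVar, runtime_checkable

# Hypothetical stand-ins for the input/output message TypeVars (Mi / Mo).
Mi = TypeVar("Mi", contravariant=True)
Mo = TypeVar("Mo", covariant=True)


@runtime_checkable
class TransformerLike(Protocol[Mi, Mo]):
    """Illustrative protocol: Mi -> Mo, with an async alternative."""

    def __call__(self, message: Mi) -> Mo: ...

    async def __acall__(self, message: Mi) -> Mo: ...


class Doubler:
    """Satisfies TransformerLike structurally; no inheritance is needed."""

    def __call__(self, message: float) -> float:
        return 2.0 * message

    async def __acall__(self, message: float) -> float:
        # Sync-first style: the async method defers to the sync one.
        return self(message)


doubler = Doubler()
sync_result = doubler(1.5)
async_result = asyncio.run(doubler.__acall__(1.5))
# runtime_checkable only verifies that the members exist, not their signatures.
is_transformer = isinstance(doubler, TransformerLike)
```

Because the check is structural, a static type checker would accept Doubler anywhere a TransformerLike[float, float] is expected.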
Abstract implementations (Base Classes) for standalone processors

| Idx | Class | Parent | Protocol | Features |
|---|---|---|---|---|
| 1 | | - | 1 | |
| 2 | | - | 2 | Similar to |
| 3 | | 1 | 3 | Overrides return type to None |
| 4 | | 1 | 4 | Overrides input and return types |
| 5 | | 1 | 5 | |
| 6 | | 2 | 6 | |
| 7 | | 5 | 7 | Overrides return type to None |
| 8 | | 5 | 8 | Overrides input and return types |
| 9 | | 8 | 9 | Implements |
| 10 | | 8 | 8 | |
| 11 | | 1 | 5 | Methods iterate over sequence of processors created in |
| 12 | | 2 | 6 | Similar to |
| 13 | | 5 | 8 | Clock-driven data generator. Implements |
NOTES:

- Producers do not inherit from BaseProcessor, so concrete implementations should subclass BaseProducer or BaseStatefulProducer.
- For concrete implementations of non-producer processors, inherit from the base subclasses of BaseProcessor (e.g. BaseConsumer, BaseTransformer) and from base subclasses of BaseStatefulProcessor. These two processor classes are primarily used for efficient abstract base class construction.
- For most base classes, the async methods simply call the synchronous methods where the processor logic is expected. Exceptions are BaseProducer (and its children) and BaseAsyncTransformer, which are async-first and should be strongly considered for operations that are I/O bound.
- For async-first classes, the logic is implemented in the async methods and the sync methods are thin wrappers around the async methods. The wrapper uses a helper method called run_coroutine_sync to run the async method in a synchronous context, but this adds some noticeable processing overhead.
- If you need to call your processor outside ezmsg (which uses async), and you cannot easily add an async context in your processing, then you might want to consider duplicating the processor logic in the sync methods. Note: Jupyter notebooks are async by default, so you can await async code in a notebook without any extra setup.
- CompositeProcessor and CompositeProducer are stateful, and structurally subclass the StatefulProcessor and StatefulProducer protocols, but they do not inherit from BaseStatefulProcessor and BaseStatefulProducer. They accomplish statefulness by inheriting from the mixin abstract base class CompositeStateful, which implements the state-related methods: get_state_type, state.setter, state.getter, _hash_message, _reset_state, and stateful_op (as well as methods related to the composite processor chain). However, BaseStatefulProcessor and BaseStatefulProducer implement the stateful_op method for a single processor in a way that is incompatible with what composite chains of processors require.
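The async-first pattern described in the notes can be sketched with the standard library alone. Everything here is an assumption made for illustration: run_sync is a hypothetical stand-in for a run_coroutine_sync-style helper (the real helper may work differently), and AsyncFirstTransformer is a toy class, not a subclass of any ezmsg.baseproc base class. The sketch shows the logic living in the async method, with the sync entry point acting as a thin wrapper.

```python
import asyncio
import threading
from typing import Any, Coroutine, List, TypeVar

T = TypeVar("T")


def run_sync(coro: Coroutine[Any, Any, T]) -> T:
    """Hypothetical stand-in for a run_coroutine_sync-style helper."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No event loop is running in this thread, so start one directly.
        return asyncio.run(coro)

    # A loop is already running here; run the coroutine in a helper thread
    # with its own loop. This extra hop is where the overhead comes from.
    results: List[T] = []
    errors: List[BaseException] = []

    def runner() -> None:
        try:
            results.append(asyncio.run(coro))
        except BaseException as exc:
            errors.append(exc)

    thread = threading.Thread(target=runner)
    thread.start()
    thread.join()
    if errors:
        raise errors[0]
    return results[0]


class AsyncFirstTransformer:
    """Toy async-first processor: logic in _aprocess, sync path wraps it."""

    async def _aprocess(self, message: float) -> float:
        await asyncio.sleep(0)  # placeholder for an I/O-bound await
        return message + 1.0

    async def __acall__(self, message: float) -> float:
        return await self._aprocess(message)

    def __call__(self, message: float) -> float:
        return run_sync(self.__acall__(message))


proc = AsyncFirstTransformer()
from_sync = proc(1.0)


async def main() -> float:
    # Inside an async context, await the async method directly.
    return await proc.__acall__(1.0)


from_async = asyncio.run(main())
```

Calling the sync wrapper from inside a running event loop still works in this sketch, but it blocks that loop while the helper thread runs, which is one reason to prefer awaiting the async method whenever an async context is available.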
Generic TypeVars for ezmsg Units

| Idx | Class | Description |
|---|---|---|
| 5 | | bound to |
| 6 | | bound to |
| 7 | | bound to |
| 8 | | bound to |
| 9 | | bound to |
Abstract implementations (Base Classes) for ezmsg Units using processors

| Idx | Class | Parents | Expected TypeVars |
|---|---|---|---|
| 1 | | - | - |
| 2 | | - | |
| 3 | | 1 | |
| 4 | | 1 | |
| 5 | | 1 | |
| 6 | | 1 | |
Note, it is strongly recommended to use BaseConsumerUnit, BaseTransformerUnit, BaseAdaptiveTransformerUnit, or BaseClockDrivenUnit for implementing concrete subclasses rather than BaseProcessorUnit.
Implementing a custom standalone processor

Create a new settings dataclass:

```python
class MySettings(ez.Settings):
    ...
```

Create a new state dataclass:

```python
@processor_state
class MyState:
    ...
```

Decide on your base processor class, considering the protocol, whether it should be async-first, and other factors.
flowchart TD
AMP{Multiple Processors?};
AMP -->|no| ARI{Receives Input?};
AMP -->|yes| ACB{Single Chain / Branching?}
ARI -->|no| P(Producer);
ARI -->|yes| APO{Produces Output?};
ACB -->|branching| NBC[no base class];
ACB -->|single chain| ACRI{Receives Input?};
P --> PS{Stateful?};
APO -->|no| C(Consumer);
APO -->|yes| T(Transformer);
ACRI -->|no| CompositeProducer;
ACRI -->|yes| CompositeProcessor;
PS -->|no| BaseProducer;
PS -->|yes| BaseStatefulProducer;
C --> CS{Stateful?};
T --> TS{Stateful?};
CS -->|no| BaseConsumer;
CS -->|yes| BaseStatefulConsumer;
TS -->|no| BaseTransformer;
TS -->|yes| TSA{Adaptive?};
TSA -->|no| TSAF{Async First?};
TSA -->|yes| BaseAdaptiveTransformer;
TSAF -->|no| BaseStatefulTransformer;
TSAF -->|yes| BaseAsyncTransformer;
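The flowchart above can also be read as a small decision function. The sketch below is only a transcription of the chart into plain Python; choose_base_class and its keyword arguments are hypothetical names invented for this illustration and are not part of ezmsg.baseproc. It returns the class name as a string, or None where the chart says there is no base class.

```python
from typing import Optional


def choose_base_class(
    *,
    multiple_processors: bool = False,
    single_chain: bool = True,
    receives_input: bool = True,
    produces_output: bool = True,
    stateful: bool = False,
    adaptive: bool = False,
    async_first: bool = False,
) -> Optional[str]:
    """Walk the flowchart and return the suggested base class name."""
    if multiple_processors:
        if not single_chain:
            return None  # branching graphs: no base class
        return "CompositeProcessor" if receives_input else "CompositeProducer"

    if not receives_input:  # Producer
        return "BaseStatefulProducer" if stateful else "BaseProducer"

    if not produces_output:  # Consumer
        return "BaseStatefulConsumer" if stateful else "BaseConsumer"

    # Transformer
    if not stateful:
        return "BaseTransformer"
    if adaptive:
        return "BaseAdaptiveTransformer"
    return "BaseAsyncTransformer" if async_first else "BaseStatefulTransformer"


io_bound_choice = choose_base_class(stateful=True, async_first=True)
logger_choice = choose_base_class(produces_output=False, stateful=True)
```

Note that, as in the chart, the async-first question is only asked for stateful, non-adaptive transformers.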
Implement the child class.
- The minimum implementation is _process for sync processors, _aprocess for async processors, and _produce for producers.
- For any stateful processor, implement _reset_state.
- For stateful processors that need to respond to a change in the incoming data, implement _hash_message.
- For adaptive transformers, implement partial_fit.
- For chains of processors (CompositeProcessor/CompositeProducer), implement _initialize_processors.
- For clock-driven producers (BaseClockDrivenProducer), implement _reset_state(time_axis) and _produce(n_samples, time_axis). See the clock-driven how-to guide.
- See processors in ezmsg.sigproc for signal processing examples, or ezmsg.learn for machine learning examples.
Override non-abstract methods if you need special behaviour.
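The interplay between _hash_message, _reset_state, and _process can be illustrated with a self-contained toy. This is a sketch under stated assumptions, not the library's implementation: StatefulBase is a hypothetical stand-in for a stateful base class, RunningSumState stands in for a @processor_state class, and the hash field and the reset-on-hash-change bookkeeping are assumptions about how such a base class might behave.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class RunningSumState:
    """Hypothetical state container (stand-in for a @processor_state class)."""

    hash: int = -1  # assumed field: hash of the last message layout seen
    total: List[float] = field(default_factory=list)


class StatefulBase:
    """Toy base class: resets state whenever the message hash changes."""

    def __init__(self) -> None:
        self.state = RunningSumState()

    def _hash_message(self, message: List[float]) -> int:
        return 0  # default: reset only once, on the first message

    def _reset_state(self, message: List[float]) -> None:
        raise NotImplementedError

    def _process(self, message: List[float]) -> List[float]:
        raise NotImplementedError

    def __call__(self, message: List[float]) -> List[float]:
        msg_hash = self._hash_message(message)
        if msg_hash != self.state.hash:
            self._reset_state(message)
            self.state.hash = msg_hash
        return self._process(message)


class RunningSum(StatefulBase):
    """Per-channel running sum that restarts when the channel count changes."""

    def _hash_message(self, message: List[float]) -> int:
        return len(message)

    def _reset_state(self, message: List[float]) -> None:
        self.state.total = [0.0] * len(message)

    def _process(self, message: List[float]) -> List[float]:
        self.state.total = [t + x for t, x in zip(self.state.total, message)]
        return list(self.state.total)


proc = RunningSum()
out1 = proc([1.0, 2.0])       # first message triggers a reset, then sums
out2 = proc([1.0, 2.0])       # same layout: state carries over
out3 = proc([5.0, 5.0, 5.0])  # channel count changed: state is reset
```

In this toy, the subclass only supplies the three hooks; the decision of when to reset stays in the base class, which is the division of labour the list above describes.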
Implementing a custom ezmsg Unit
Create and test a custom standalone processor as above.
Decide which base unit to implement.
Use the “Generic TypeVars for ezmsg Units” table above to determine the expected TypeVar.
Find the Expected TypeVar in the “ezmsg Units” table.
Create the derived class.
Often, all that is required is the following (e.g., for a custom transformer):
```python
import ezmsg.core as ez
from ezmsg.baseproc import BaseTransformer, BaseTransformerUnit


class CustomTransformerSettings(ez.Settings):
    scale: float = 1.0


class CustomTransformer(BaseTransformer[CustomTransformerSettings, float, float]):
    def _process(self, message: float) -> float:
        # Scale each incoming message by the configured factor.
        return message * self.settings.scale


class CustomUnit(BaseTransformerUnit[
    CustomTransformerSettings,  # SettingsType
    float,                      # MessageInType
    float,                      # MessageOutType
    CustomTransformer,          # TransformerType
]):
    SETTINGS = CustomTransformerSettings
```
Note, the type of ProcessorUnit is based on the internal processor and not the input or output of the unit. Input streams are allowed in ProducerUnits and output streams in ConsumerUnits.