ezmsg-baseproc#
Base processor classes and protocols for building message-processing components in ezmsg.
Installation#
pip install ezmsg-baseproc
Or install the latest development version:
pip install git+https://github.com/ezmsg-org/ezmsg-baseproc@dev
Overview#
ezmsg-baseproc provides abstract base classes for creating message processors that can be used both standalone and within ezmsg pipelines. The package offers a consistent pattern for building:
Protocols - Type definitions for processors, transformers, consumers, and producers
Processors - Transform input messages to output messages
Producers - Generate output messages without requiring input
Consumers - Accept input messages without producing output
Transformers - A specific type of processor with typed input/output
Stateful variants - Processors that maintain state across invocations
Adaptive transformers - Transformers that can be trained via
partial_fitComposite processors - Chain multiple processors together efficiently
All base classes support both synchronous and asynchronous operation, making them suitable for offline analysis and real-time streaming applications.
Usage#
Creating a Simple Transformer#
from dataclasses import dataclass
from ezmsg.baseproc import BaseTransformer
from ezmsg.util.messages.axisarray import AxisArray, replace
@dataclass
class MySettings:
scale: float = 1.0
class MyTransformer(BaseTransformer[MySettings, AxisArray, AxisArray]):
def _process(self, message: AxisArray) -> AxisArray:
return replace(message, data=message.data * self.settings.scale)
Creating a Stateful Transformer#
from ezmsg.baseproc import BaseStatefulTransformer, processor_state
@processor_state
class MyState:
count: int = 0
hash: int = -1
class MyStatefulTransformer(BaseStatefulTransformer[MySettings, AxisArray, AxisArray, MyState]):
def _reset_state(self, message: AxisArray) -> None:
self._state.count = 0
def _process(self, message: AxisArray) -> AxisArray:
self._state.count += 1
return message
Creating an ezmsg Unit#
from ezmsg.baseproc import BaseTransformerUnit
class MyUnit(BaseTransformerUnit[MySettings, AxisArray, AxisArray, MyTransformer]):
SETTINGS = MySettings
# That's all - the base class handles everything else!
Development#
We use uv for development.
Install
uvif not already installedClone and cd into the repository
Run
uv syncto create a.venvand install dependenciesRun
uv run pytest teststo run tests(Optional) Install pre-commit hooks:
uv run pre-commit install
License#
MIT License - see LICENSE for details.
Documentation#
Contents:
- Processor Base Classes
- Implementing a custom standalone processor
- Implementing a custom ezmsg Unit
- Processor HOW TOs
- How to write a processor?
- How to implement a stateful processor?
- How to use processors standalone (outside ezmsg)?
- How to implement an adaptive transformer?
- How to efficiently chain multiple processors?
- How to turn a processor into an ezmsg Unit?
- How to implement a clock-driven producer?
- How to checkpoint and restore processor state?
- API Reference