ezmsg.baseproc#
ezmsg-baseproc: Base processor classes for ezmsg.
This package provides the foundational processor architecture for building signal processing pipelines in ezmsg.
- class Processor(*args, **kwargs)[source]#
Bases:
Protocol[SettingsType, MessageInType, MessageOutType]
Protocol for processors. You probably will not implement this protocol directly. Refer instead to the less ambiguous Consumer and Transformer protocols, and the base classes in this module which implement them.
- Note: In Python 3.12+, we can invoke __acall__ directly using await obj(message),
but to support earlier versions we need to use await obj.__acall__(message).
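The portable calling convention from the note can be illustrated with a small stand-alone sketch. `ProcessorLike` and `Doubler` are hypothetical stand-ins written for this example (settings are omitted), and the assumption that a processor exposes both `__call__` and `__acall__` is inferred from the note rather than taken from the library source.

```python
import asyncio
from typing import Protocol, TypeVar

MessageInType = TypeVar("MessageInType", contravariant=True)
MessageOutType = TypeVar("MessageOutType", covariant=True)


class ProcessorLike(Protocol[MessageInType, MessageOutType]):
    """Hypothetical stand-in for the Processor protocol (settings omitted)."""

    def __call__(self, message: MessageInType) -> MessageOutType: ...

    async def __acall__(self, message: MessageInType) -> MessageOutType: ...


class Doubler:
    """Toy processor; it satisfies ProcessorLike structurally, without inheritance."""

    def __call__(self, message: float) -> float:
        return 2 * message

    async def __acall__(self, message: float) -> float:
        # The async entry point simply delegates to the sync one.
        return self(message)


async def main() -> float:
    proc: ProcessorLike[float, float] = Doubler()
    # Explicit form that works on every supported Python version.
    return await proc.__acall__(21.0)


result = asyncio.run(main())
```

Calling `__acall__` by name keeps the call site independent of interpreter version, at the cost of a slightly less tidy spelling.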
- __init__(*args, **kwargs)#
- class Producer(*args, **kwargs)[source]#
Bases:
Protocol[SettingsType, MessageOutType]
Protocol for producers that generate messages.
- __init__(*args, **kwargs)#
- class Consumer(*args, **kwargs)[source]#
Bases:
Processor[SettingsType, MessageInType, None], Protocol
Protocol for consumers that receive messages but do not return a result.
- class Transformer(*args, **kwargs)[source]#
Bases:
Processor[SettingsType, MessageInType, MessageOutType], Protocol
Protocol for transformers that receive messages and return a result of the same class.
- class StatefulProcessor(*args, **kwargs)[source]#
Bases:
Protocol[SettingsType, MessageInType, MessageOutType, StateType]
Base protocol for _stateful_ message processors. You probably will not implement this protocol directly. Refer instead to the less ambiguous StatefulConsumer and StatefulTransformer protocols.
- __init__(*args, **kwargs)#
- property state: StateType#
- class StatefulProducer(*args, **kwargs)[source]#
Bases:
Protocol[SettingsType, MessageOutType, StateType]
Protocol for producers that generate messages without consuming inputs.
- __init__(*args, **kwargs)#
- property state: StateType#
- class StatefulConsumer(*args, **kwargs)[source]#
Bases:
StatefulProcessor[SettingsType, MessageInType, None, StateType], Protocol
Protocol specifically for processors that consume messages without producing output.
- class StatefulTransformer(*args, **kwargs)[source]#
Bases:
StatefulProcessor[SettingsType, MessageInType, MessageOutType, StateType], Protocol
Protocol specifically for processors that transform messages.
- class AdaptiveTransformer(*args, **kwargs)[source]#
Bases:
StatefulTransformer, Protocol
- async apartial_fit(message)[source]#
- Return type:
- Parameters:
message (SampleMessage)
- partial_fit(message)[source]#
Update transformer state using labeled training data.
This method should update the internal state/parameters of the transformer based on the provided labeled samples, without performing any transformation.
- Return type:
- Parameters:
message (SampleMessage)
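The split between fitting and transforming described for `partial_fit` can be sketched without any ezmsg dependency. Everything below is hypothetical: `LabeledSample` stands in for a labeled `SampleMessage`, and `NearestCentroid` is a toy model chosen only to show that `partial_fit` mutates state and returns nothing, while the call path performs the transformation.

```python
import asyncio
from dataclasses import dataclass, field


@dataclass
class LabeledSample:
    """Hypothetical stand-in for a labeled SampleMessage."""
    label: int
    values: list[float]


@dataclass
class CentroidState:
    sums: dict[int, float] = field(default_factory=dict)
    counts: dict[int, int] = field(default_factory=dict)


class NearestCentroid:
    """Toy adaptive transformer: classifies a scalar by the nearest class mean."""

    def __init__(self) -> None:
        self.state = CentroidState()

    def partial_fit(self, message: LabeledSample) -> None:
        # Update parameters only; nothing is transformed or returned.
        s = self.state
        s.sums[message.label] = s.sums.get(message.label, 0.0) + sum(message.values)
        s.counts[message.label] = s.counts.get(message.label, 0) + len(message.values)

    async def apartial_fit(self, message: LabeledSample) -> None:
        # Async variant simply wraps the sync update in this sketch.
        self.partial_fit(message)

    def __call__(self, value: float) -> int:
        # Transformation path: uses the fitted state, never modifies it.
        means = {k: self.state.sums[k] / self.state.counts[k] for k in self.state.sums}
        return min(means, key=lambda k: abs(value - means[k]))


clf = NearestCentroid()
clf.partial_fit(LabeledSample(label=0, values=[0.0, 1.0]))            # mean 0.5
asyncio.run(clf.apartial_fit(LabeledSample(label=1, values=[9.0, 11.0])))  # mean 10.0
predictions = [clf(2.0), clf(8.0)]
```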
- processor_state(cls=None, /, *, init=False, repr=True, eq=True, order=False, unsafe_hash=True, frozen=False, match_args=True, kw_only=False, slots=False, weakref_slot=False)#
Add dunder methods based on the fields defined in the class.
Examines PEP 526 __annotations__ to determine fields.
If init is true, an __init__() method is added to the class. If repr is true, a __repr__() method is added. If order is true, rich comparison dunder methods are added. If unsafe_hash is true, a __hash__() method is added. If frozen is true, fields may not be assigned to after instance creation. If match_args is true, the __match_args__ tuple is added. If kw_only is true, then by default all fields are keyword-only. If slots is true, a new class with a __slots__ attribute is returned.
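The signature and description of `processor_state` mirror `dataclasses.dataclass`, with `init=False` and `unsafe_hash=True` as defaults. Assuming it behaves like a dataclass decorator with those defaults (an inference from the signature, not confirmed here), the effect can be previewed with the standard library. `CounterState` is a hypothetical state class.

```python
from dataclasses import dataclass, fields


@dataclass(init=False, unsafe_hash=True)
class CounterState:
    """Hypothetical state container; fields are taken from the annotations."""
    count: int = 0
    label: str = "idle"


state = CounterState()   # init=False: no generated __init__, so no arguments
state.count += 1         # frozen=False: fields stay assignable
field_names = [f.name for f in fields(CounterState)]
state_hash = hash(state)  # unsafe_hash=True: a __hash__ is generated from the fields
```

Because no `__init__` is generated, each field needs a class-level default (or must be assigned later, for example in a reset method) before it is read.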
- class BaseProcessor(*args, settings=None, **kwargs)[source]#
Bases:
ABC, Generic[SettingsType, MessageInType, MessageOutType]
Base class for processors. You probably do not want to inherit from this class directly. Refer instead to the more specific base classes.
Use BaseConsumer or BaseTransformer for operations that do not or do return a result, respectively.
Use BaseStatefulProcessor and its children for operations that require state.
Note that BaseProcessor and its children are sync by default. If you need async by default, then override the async methods and call them from the sync methods. Look to BaseProducer for examples of calling async methods from sync methods.
- Parameters:
settings (SettingsType)
- class BaseProducer(*args, settings=None, **kwargs)[source]#
Bases:
ABC, Generic[SettingsType, MessageOutType]
Base class for producers: processors that generate messages without consuming inputs.
- Note that BaseProducer and its children are async by default, and the sync methods simply wrap the async methods. This is the opposite of BaseProcessor and its children, which are sync by default. These classes are designed this way because it is highly likely that a producer, which (probably) does not receive inputs, will require some sort of IO which will benefit from being async.
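The "async by default, sync wraps async" arrangement can be sketched as follows. `TickProducer` is a hypothetical class, and using `asyncio.run` for the sync wrapper is a simplification chosen for this example; it only works when no event loop is already running in the calling thread.

```python
import asyncio


class TickProducer:
    """Toy producer: the async path holds the logic, the sync path wraps it."""

    def __init__(self) -> None:
        self._n = 0

    async def _produce(self) -> int:
        await asyncio.sleep(0)  # stands in for real IO (socket, file, device)
        self._n += 1
        return self._n

    async def __acall__(self) -> int:
        return await self._produce()

    def __call__(self) -> int:
        # Sync wrapper around the async implementation.
        return asyncio.run(self.__acall__())


producer = TickProducer()
first = producer()                           # sync call
second = asyncio.run(producer.__acall__())   # async call
```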
- Parameters:
settings (SettingsType | None)
- __init__(*args, settings=None, **kwargs)[source]#
- Parameters:
settings (SettingsType | None)
- Return type:
None
- class BaseConsumer(*args, settings=None, **kwargs)[source]#
Bases:
BaseProcessor[SettingsType, MessageInType, None], ABC, Generic[SettingsType, MessageInType]
Base class for consumers: processors that receive messages but don’t produce output. This base simply overrides type annotations of BaseProcessor to remove the outputs. (We don’t bother overriding send and asend because those are deprecated.)
- Parameters:
settings (SettingsType)
- class BaseTransformer(*args, settings=None, **kwargs)[source]#
Bases:
BaseProcessor[SettingsType, MessageInType, MessageOutType], ABC, Generic[SettingsType, MessageInType, MessageOutType]
Base class for transformers: processors which receive messages and produce output. This base simply overrides type annotations of BaseProcessor to indicate that outputs are not optional. (We don’t bother overriding send and asend because those are deprecated.)
- Parameters:
settings (SettingsType)
- class Stateful[source]#
Bases:
ABC, Generic[StateType]
Mixin class for stateful processors. DO NOT use this class directly. Used to enforce that the processor/producer has a state attribute and stateful_op method.
- property state: StateType#
- class BaseStatefulProcessor(*args, **kwargs)[source]#
Bases:
BaseProcessor[SettingsType, MessageInType, MessageOutType], Stateful[StateType], ABC, Generic[SettingsType, MessageInType, MessageOutType, StateType]
Base class implementing common stateful processor functionality. You probably do not want to inherit from this class directly. Refer instead to the more specific base classes. Use BaseStatefulConsumer for operations that do not return a result, or BaseStatefulTransformer for operations that do return a result.
- class BaseStatefulProducer(*args, **kwargs)[source]#
Bases:
BaseProducer[SettingsType, MessageOutType], Stateful[StateType], ABC, Generic[SettingsType, MessageOutType, StateType]
Base class implementing common stateful producer functionality.
Examples of stateful producers are things that require counters, clocks, or to cycle through a set of values.
- Unlike BaseStatefulProcessor, this class does not use message hashing because there are no input messages. We still use self._hash simply to track the transition from initialization (.hash == -1) to state reset (.hash == 0).
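The -1 to 0 transition can be pictured with a small stand-alone sketch. `CycleProducer` is hypothetical and only mimics the described bookkeeping; how the real base class triggers the reset is not shown in this page and is assumed here to happen on the first call.

```python
import itertools


class CycleProducer:
    """Toy stateful producer that cycles through a fixed set of values."""

    def __init__(self, values: list[str]) -> None:
        self.values = values
        self._hash = -1      # -1: state has not been initialized yet
        self._cycle = None

    def _reset_state(self) -> None:
        self._cycle = itertools.cycle(self.values)

    def __call__(self) -> str:
        if self._hash == -1:
            # First call: build the state exactly once, then mark it as reset.
            self._reset_state()
            self._hash = 0
        return next(self._cycle)


p = CycleProducer(["a", "b"])
hash_before = p._hash
outputs = [p() for _ in range(3)]
hash_after = p._hash
```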
- class BaseStatefulConsumer(*args, **kwargs)[source]#
Bases:
BaseStatefulProcessor[SettingsType, MessageInType, None, StateType], ABC, Generic[SettingsType, MessageInType, StateType]
Base class for stateful message consumers that don’t produce output. This class merely overrides the type annotations of BaseStatefulProcessor.
- class BaseStatefulTransformer(*args, **kwargs)[source]#
Bases:
BaseStatefulProcessor[SettingsType, MessageInType, MessageOutType, StateType], ABC, Generic[SettingsType, MessageInType, MessageOutType, StateType]
Base class for stateful message transformers that produce output. This class merely overrides the type annotations of BaseStatefulProcessor.
- class BaseAdaptiveTransformer(*args, **kwargs)[source]#
Bases:
BaseStatefulTransformer[SettingsType, MessageInType | SampleMessage, MessageOutType | None, StateType], ABC, Generic[SettingsType, MessageInType, MessageOutType, StateType]
- async apartial_fit(message)[source]#
Override me if you need async partial fitting.
- Return type:
- Parameters:
message (SampleMessage)
- abstractmethod partial_fit(message)[source]#
- Return type:
- Parameters:
message (SampleMessage)
- class BaseAsyncTransformer(*args, **kwargs)[source]#
Bases:
BaseStatefulTransformer[SettingsType, MessageInType, MessageOutType, StateType], ABC, Generic[SettingsType, MessageInType, MessageOutType, StateType]
This reverses the priority of async and sync methods from BaseStatefulTransformer. Whereas in BaseStatefulTransformer the async methods simply call the sync methods, here the sync methods call the async methods, similar to BaseStatefulProducer.
- class BaseClockDrivenProducer(*args, **kwargs)[source]#
Bases:
BaseStatefulProcessor[ClockDrivenSettingsType, LinearAxis, AxisArray, StateType], Generic[ClockDrivenSettingsType, StateType]
Base class for clock-driven data producers.
Accepts clock ticks (LinearAxis) as input and produces AxisArray output. Handles all the timing/counter logic internally, so subclasses only need to implement the data generation logic.
This eliminates the need for the Clock → Counter → Generator pattern by combining the Counter functionality into the generator base class.
- Subclasses must implement:
_reset_state(time_axis): Initialize any state needed for production
_produce(n_samples, time_axis): Generate the actual output data
Example:
    @processor_state
    class SinState(ClockDrivenState):
        ang_freq: float = 0.0

    class SinProducer(BaseClockDrivenProducer[SinSettings, SinState]):
        def _reset_state(self, time_axis: AxisArray.TimeAxis) -> None:
            self._state.ang_freq = 2 * np.pi * self.settings.fs

        def _produce(self, n_samples: int, time_axis: AxisArray.TimeAxis) -> AxisArray:
            t = (np.arange(n_samples) + self._state.counter) * time_axis.gain
            data = np.sin(self._state.ang_freq * t)
            return AxisArray(data=data, dims=["time"], axes={"time": time_axis})
- class ClockDrivenSettings(fs, n_time=None)[source]#
Bases:
Settings
Base settings for clock-driven producers.
Subclass this to add your own settings while inheriting fs and n_time.
Example:
    class SinGeneratorSettings(ClockDrivenSettings):
        freq: float = 1.0
        amp: float = 1.0
- class ClockDrivenState[source]#
Bases:
object
Internal state for clock-driven producers.
Tracks sample counting and fractional sample accumulation. Subclasses should extend this if they need additional state.
- class CompositeStateful[source]#
Bases:
Stateful[dict[str, Any]], ABC, Generic[SettingsType, MessageOutType]
Mixin class for composite processor/producer chains. DO NOT use this class directly. Used to enforce statefulness of the composite processor/producer chain and provide initialization and validation methods.
- class CompositeProcessor(*args, **kwargs)[source]#
Bases:
BaseProcessor[SettingsType, MessageInType, MessageOutType], CompositeStateful[SettingsType, MessageOutType], ABC, Generic[SettingsType, MessageInType, MessageOutType]
A processor that chains multiple processors together in a feedforward non-branching graph. The individual processors may be stateless or stateful. The last processor may be a consumer; otherwise processors must be transformers. Use CompositeProducer if you want the first processor to be a producer. Concrete subclasses must implement _initialize_processors. Optionally override _reset_state if you want adaptive state behaviour. Example implementation:
    class CustomCompositeProcessor(CompositeProcessor[CustomSettings, AxisArray, AxisArray]):
        @staticmethod
        def _initialize_processors(settings: CustomSettings) -> dict[str, BaseProcessor]:
Where **settings should be replaced with initialisation arguments for each processor.
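The feedforward, non-branching chaining can be pictured with plain callables. This is a sketch only: `initialize_processors` and `Chain` are hypothetical names, and the assumption that dictionary insertion order defines the processing order is made for the example rather than taken from the library.

```python
from typing import Callable

Step = Callable[[float], float]


def initialize_processors(scale: float, offset: float) -> dict[str, Step]:
    """Hypothetical analogue of _initialize_processors: name -> processor."""
    return {
        "scale": lambda x: x * scale,
        "offset": lambda x: x + offset,
    }


class Chain:
    """Feeds each processor's output into the next one, in insertion order."""

    def __init__(self, processors: dict[str, Step]) -> None:
        self._processors = processors

    def __call__(self, message: float) -> float:
        for proc in self._processors.values():
            message = proc(message)
        return message


chain = Chain(initialize_processors(scale=2.0, offset=1.0))
result = chain(3.0)  # (3.0 * 2.0) + 1.0
```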
- class CompositeProducer(*args, **kwargs)[source]#
Bases:
BaseProducer[SettingsType, MessageOutType], CompositeStateful[SettingsType, MessageOutType], ABC, Generic[SettingsType, MessageOutType]
A producer that chains multiple processors (starting with a producer) together in a feedforward non-branching graph. The individual processors may be stateless or stateful. The first processor must be a producer; the last processor may be a consumer; otherwise processors must be transformers.
- class BaseProducerUnit(*args, settings=None, **kwargs)[source]#
Bases:
Unit, ABC, Generic[SettingsType, MessageOutType, ProducerType]
Base class for producer units, i.e. units that generate messages without consuming inputs. Implement a new Unit as follows:
    class CustomUnit(BaseProducerUnit[
        CustomProducerSettings,  # SettingsType
        AxisArray,               # MessageOutType
        CustomProducer,          # ProducerType
    ]):
        SETTINGS = CustomProducerSettings
… that’s all!
Where CustomProducerSettings and CustomProducer are custom implementations of ez.Settings, and BaseProducer or BaseStatefulProducer, respectively.
- Parameters:
settings (Settings | None)
- INPUT_SETTINGS = InputStream:unlocated[~SettingsType]()#
- OUTPUT_SIGNAL = OutputStream:unlocated[~MessageOutType](self.num_buffers=32, self.force_tcp=False)#
- async initialize()[source]#
Runs when the Unit is instantiated. This is called from within the same process this unit will live in. This lifecycle hook can be overridden. It can be run as an async function by simply adding the async keyword when overriding.
- Return type:
- async on_settings(msg)[source]#
Receive a settings message, override self.SETTINGS, and re-create the producer. Child classes that wish to have fine-grained control over whether the core producer resets on settings changes should override this method.
- class BaseProcessorUnit(*args, settings=None, **kwargs)[source]#
Bases:
Unit, ABC, Generic[SettingsType]
Base class for processor units, i.e. units that process messages. This is an abstract base class that provides common functionality for consumer and transformer units. You probably do not want to inherit from this class directly as you would need to define a custom implementation of create_processor. Refer instead to BaseConsumerUnit or BaseTransformerUnit.
- Parameters:
settings (Settings | None)
- INPUT_SETTINGS = InputStream:unlocated[~SettingsType]()#
- async initialize()[source]#
Runs when the Unit is instantiated. This is called from within the same process this unit will live in. This lifecycle hook can be overridden. It can be run as an async function by simply adding the async keyword when overriding.
- Return type:
- class BaseConsumerUnit(*args, settings=None, **kwargs)[source]#
Bases:
BaseProcessorUnit[SettingsType], ABC, Generic[SettingsType, MessageInType, ConsumerType]
Base class for consumer units, i.e. units that receive messages but do not return results. Implement a new Unit as follows:
    class CustomUnit(BaseConsumerUnit[
        CustomConsumerSettings,  # SettingsType
        AxisArray,               # MessageInType
        CustomConsumer,          # ConsumerType
    ]):
        SETTINGS = CustomConsumerSettings
… that’s all!
Where CustomConsumerSettings and CustomConsumer are custom implementations of:
- ez.Settings for settings
- BaseConsumer or BaseStatefulConsumer for the consumer implementation
- Parameters:
settings (Settings | None)
- INPUT_SIGNAL = InputStream:unlocated[~MessageInType]()#
- class BaseTransformerUnit(*args, settings=None, **kwargs)[source]#
Bases:
BaseProcessorUnit[SettingsType], ABC, Generic[SettingsType, MessageInType, MessageOutType, TransformerType]
Base class for transformer units, i.e. units that transform input messages into output messages. Implement a new Unit as follows:
    class CustomUnit(BaseTransformerUnit[
        CustomTransformerSettings,  # SettingsType
        AxisArray,                  # MessageInType
        AxisArray,                  # MessageOutType
        CustomTransformer,          # TransformerType
    ]):
        SETTINGS = CustomTransformerSettings
… that’s all!
Where CustomTransformerSettings and CustomTransformer are custom implementations of:
- ez.Settings for settings
- One of these transformer types:
    - BaseTransformer
    - BaseStatefulTransformer
    - CompositeProcessor
- Parameters:
settings (Settings | None)
- INPUT_SIGNAL = InputStream:unlocated[~MessageInType]()#
- OUTPUT_SIGNAL = OutputStream:unlocated[~MessageOutType](self.num_buffers=32, self.force_tcp=False)#
- class BaseAdaptiveTransformerUnit(*args, settings=None, **kwargs)[source]#
Bases:
BaseProcessorUnit[SettingsType], ABC, Generic[SettingsType, MessageInType, MessageOutType, AdaptiveTransformerType]
- Parameters:
settings (Settings | None)
- INPUT_SAMPLE = InputStream:unlocated[<class 'ezmsg.baseproc.util.message.SampleMessage'>]()#
- INPUT_SIGNAL = InputStream:unlocated[~MessageInType]()#
- OUTPUT_SIGNAL = OutputStream:unlocated[~MessageOutType](self.num_buffers=32, self.force_tcp=False)#
- async on_sample(msg)[source]#
- Return type:
- Parameters:
msg (SampleMessage)
- class BaseClockDrivenUnit(*args, settings=None, **kwargs)[source]#
Bases:
BaseProcessorUnit[SettingsType], ABC, Generic[SettingsType, ClockDrivenProducerType]
Base class for clock-driven producer units.
These units receive clock ticks (LinearAxis) and produce AxisArray output. This simplifies the Clock → Counter → Generator pattern by combining the counter functionality into the generator.
Implement a new Unit as follows:
    class SinGeneratorUnit(BaseClockDrivenUnit[
        SinGeneratorSettings,  # SettingsType (must extend ClockDrivenSettings)
        SinProducer,           # ClockDrivenProducerType
    ]):
        SETTINGS = SinGeneratorSettings
Where SinGeneratorSettings extends ClockDrivenSettings and SinProducer extends BaseClockDrivenProducer.
- Parameters:
settings (Settings | None)
- INPUT_CLOCK = InputStream:unlocated[<class 'ezmsg.util.messages.axisarray.LinearAxis'>]()#
- OUTPUT_SIGNAL = OutputStream:unlocated[<class 'ezmsg.util.messages.axisarray.AxisArray'>](self.num_buffers=32, self.force_tcp=False)#
- async on_clock(clock_tick)[source]#
- Return type:
- Parameters:
clock_tick (LinearAxis)
- class GenAxisArray(*args, settings=None, **kwargs)[source]#
Bases:
Unit
- Parameters:
settings (Settings | None)
- INPUT_SETTINGS = InputStream:unlocated[<class 'ezmsg.core.settings.Settings'>]()#
- INPUT_SIGNAL = InputStream:unlocated[<class 'ezmsg.util.messages.axisarray.AxisArray'>]()#
- OUTPUT_SIGNAL = OutputStream:unlocated[<class 'ezmsg.util.messages.axisarray.AxisArray'>](self.num_buffers=32, self.force_tcp=False)#
- STATE#
alias of
GenState
- async initialize()[source]#
Runs when the Unit is instantiated. This is called from within the same process this unit will live in. This lifecycle hook can be overridden. It can be run as an async function by simply adding the async keyword when overriding.
- Return type:
- async on_settings(msg)[source]#
Update unit settings and reset generator. Note: Not all units will require a full reset with new settings. Override this method to implement a selective reset.
- class SampleMessage(trigger, sample)[source]#
Bases:
object
- Parameters:
trigger (SampleTriggerMessage)
sample (AxisArray)
- __init__(trigger, sample)#
- Parameters:
trigger (SampleTriggerMessage)
sample (AxisArray)
- Return type:
None
- trigger: SampleTriggerMessage#
The time, window, and value (if any) associated with the trigger.
- class SampleTriggerMessage(timestamp=<factory>, period=None, value=None)[source]#
Bases:
object
- __init__(timestamp=<factory>, period=None, value=None)#
- is_sample_message(message)[source]#
Check if the message is a SampleMessage.
- Return type:
TypeGuard[SampleMessage]
- Parameters:
message (Any)
- profile_method(trace_oldest=True)[source]#
Decorator to profile a method by logging its execution time and other details.
- Parameters:
trace_oldest (bool) – If True, trace the oldest sample time; otherwise, trace the newest.
- Returns:
The decorated function with profiling.
- Return type:
Callable
- profile_subpub(trace_oldest=True)[source]#
Decorator to profile a subscriber-publisher method in an ezmsg Unit by logging its execution time and other details.
- Parameters:
trace_oldest (bool) – If True, trace the oldest sample time; otherwise, trace the newest.
- Returns:
The decorated async task with profiling.
- Return type:
Callable
- exception CoroutineExecutionError[source]#
Bases:
Exception
Custom exception for coroutine execution failures.
- class SyncToAsyncGeneratorWrapper(gen)[source]#
Bases:
object
A wrapper for synchronous generators to be used in an async context.
- run_coroutine_sync(coroutine, timeout=30)[source]#
Executes an asyncio coroutine synchronously, with enhanced error handling.
- Parameters:
- Return type:
TypeVar(T)
- Returns:
The result of the coroutine execution
- Raises:
CoroutineExecutionError – If execution fails due to threading or event loop issues
TimeoutError – If execution exceeds the timeout period
Exception – Any exception raised by the coroutine
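One common way to run a coroutine from synchronous code, with a timeout, is sketched below. This is not the library's implementation: `run_sync` is a hypothetical helper, the worker-thread fallback for the "loop already running" case is an assumption, and the `CoroutineExecutionError` wrapping is not modelled.

```python
import asyncio
import concurrent.futures
from typing import Any, Coroutine, TypeVar

T = TypeVar("T")


def run_sync(coroutine: Coroutine[Any, Any, T], timeout: float = 30.0) -> T:
    """Run `coroutine` to completion from sync code (illustrative sketch)."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop in this thread: run directly, enforcing the timeout.
        return asyncio.run(asyncio.wait_for(coroutine, timeout))
    # A loop is already running here, so asyncio.run() would fail in this
    # thread. Run the coroutine on a fresh loop in a worker thread instead.
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
        return pool.submit(asyncio.run, coroutine).result(timeout=timeout)


async def add(a: int, b: int) -> int:
    await asyncio.sleep(0)
    return a + b


async def caller_inside_loop() -> int:
    # Called while an event loop is running: exercises the thread fallback.
    return run_sync(add(2, 3))


plain_result = run_sync(add(1, 2))
nested_result = asyncio.run(caller_inside_loop())
```

Note that the thread fallback blocks the calling event loop until the result arrives, so it suits short operations rather than long-running ones.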
- check_message_type_compatibility(type1, type2)[source]#
Check if two types are compatible for message passing. Returns True if:
- Both are None/NoneType
- Either is typing.Any
- type1 is a subclass of type2, which includes:
    - type1 and type2 are concrete types and type1 is a subclass of type2
    - type1 is None/NoneType and type2 is typing.Optional, or
    - type1 is a subtype of the non-None inner type of type2 if type2 is Optional
- type1 is a Union/Optional type and all inner types are compatible with type2
- resolve_typevar(cls, target_typevar)[source]#
Resolve the concrete type bound to a TypeVar in a class hierarchy. This function traverses the method resolution order (MRO) of the class and checks the original bases of each class in the MRO for the TypeVar. If the TypeVar is found, it returns the concrete type bound to it. If the TypeVar is not found, it raises a TypeError.
If the resolved type is itself a TypeVar, this function recursively resolves it until a concrete type is found.
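The traversal described above can be sketched with `__mro__`, `__orig_bases__`, and `typing.get_origin`. `resolve` is a hypothetical reimplementation for illustration; the real function may handle more cases (for example bounds or defaults), which are ignored here.

```python
from typing import Any, Generic, TypeVar, get_args, get_origin

T = TypeVar("T")
U = TypeVar("U")


def resolve(cls: type, target: TypeVar) -> Any:
    """Find the concrete type bound to `target` somewhere in cls's hierarchy."""
    for klass in cls.__mro__:
        for base in getattr(klass, "__orig_bases__", ()):
            origin = get_origin(base)
            if origin is None or origin is Generic:
                continue  # plain base class, or the Generic[...] declaration itself
            params = getattr(origin, "__parameters__", ())
            if target not in params:
                continue
            arg = get_args(base)[params.index(target)]
            if isinstance(arg, TypeVar):
                if arg is target:
                    continue  # passed through unchanged; keep searching
                # Bound to another TypeVar: resolve that one from the top.
                return resolve(cls, arg)
            return arg
    raise TypeError(f"Could not resolve {target} for {cls.__name__}")


class Base(Generic[T]): ...
class Mid(Base[U]): ...      # re-parameterizes T as U
class Leaf(Mid[str]): ...    # binds U (and therefore T) to str


resolved_u = resolve(Leaf, U)
resolved_t = resolve(Leaf, T)
```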
- class Clock(*args, settings=None, **kwargs)[source]#
Bases:
BaseProducerUnit[ClockSettings, LinearAxis, ClockProducer]
Clock unit that produces ticks at a specified rate.
Output is an AxisArray.LinearAxis with:
- gain: 1/dispatch_rate (seconds per tick)
- offset: Wall clock timestamp
- Parameters:
settings (Settings | None)
- SETTINGS#
alias of
ClockSettings
- class ClockProducer(*args, **kwargs)[source]#
Bases:
BaseStatefulProducer[ClockSettings, LinearAxis, ClockState]
Produces clock ticks at a specified rate.
Each tick outputs an AxisArray.LinearAxis containing:
- gain: 1/dispatch_rate (seconds per tick), or 0.0 if dispatch_rate is infinite
- offset: Wall clock timestamp (time.monotonic)
This output type allows downstream components (like Counter) to know both the timing of the tick and the nominal dispatch rate.
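The gain and offset rules for a tick can be written down in a few lines. `TickSketch` and `make_tick` are hypothetical stand-ins: `TickSketch` models only the two fields mentioned above, not the real `AxisArray.LinearAxis`.

```python
import math
import time
from dataclasses import dataclass


@dataclass
class TickSketch:
    """Stand-in for AxisArray.LinearAxis; only gain and offset are modelled."""
    gain: float
    offset: float


def make_tick(dispatch_rate: float) -> TickSketch:
    # An infinite rate means "as fast as possible", reported as gain 0.0.
    gain = 0.0 if math.isinf(dispatch_rate) else 1.0 / dispatch_rate
    return TickSketch(gain=gain, offset=time.monotonic())


slow_tick = make_tick(4.0)       # 4 ticks per second -> 0.25 s per tick
fast_tick = make_tick(math.inf)  # unthrottled clock
```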
- class ClockSettings(dispatch_rate=inf)[source]#
Bases:
Settings
Settings for ClockProducer.
- Parameters:
dispatch_rate (float)
- class ClockState[source]#
Bases:
object
State for ClockProducer.
- class Counter(*args, settings=None, **kwargs)[source]#
Bases:
BaseClockDrivenUnit[CounterSettings, CounterTransformer]
Transforms clock ticks into monotonically increasing counter values as AxisArray.
Receives timing from INPUT_CLOCK (LinearAxis from Clock) and outputs AxisArray.
- Parameters:
settings (Settings | None)
- SETTINGS#
alias of
CounterSettings
- class CounterSettings(fs, n_time=None, mod=None)[source]#
Bases:
ClockDrivenSettings
Settings for Counter and CounterTransformer.
- __init__(fs, n_time=None, mod=None)#
- class CounterTransformer(*args, **kwargs)[source]#
Bases:
BaseClockDrivenProducer[CounterSettings, CounterTransformerState]
Transforms clock ticks (LinearAxis) into AxisArray counter values.
Each clock tick produces a block of counter values. The block size is either fixed (n_time setting) or derived from the clock’s gain (fs * gain).
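The block-size rule, combined with the fractional sample accumulation mentioned for ClockDrivenState, might look like the sketch below. `block_size` is a hypothetical helper, and carrying the fractional remainder into the next tick is an assumption about how the accumulation works, not a description of the actual code.

```python
from typing import Optional


def block_size(
    fs: float,
    gain: float,
    n_time: Optional[int] = None,
    carry: float = 0.0,
) -> tuple[int, float]:
    """Return (samples to emit for this tick, fractional remainder to carry)."""
    if n_time is not None:
        # Fixed block size: the clock's gain is ignored.
        return n_time, carry
    total = fs * gain + carry  # ideal, possibly fractional, sample count
    n_samples = int(total)     # emit only whole samples
    return n_samples, total - n_samples


# fs = 10 Hz with a tick every 0.25 s gives 2.5 samples per tick.
first, carry = block_size(10.0, 0.25)
second, carry_after = block_size(10.0, 0.25, carry=carry)
fixed, _ = block_size(10.0, 0.25, n_time=8)
```

Carrying the remainder keeps the long-run sample count equal to `fs` times elapsed time even when `fs * gain` is not an integer.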
- class CounterTransformerState[source]#
Bases:
ClockDrivenState
State for CounterTransformer.
Modules
- Clock generator for timing control.
- Clock-driven producer base classes for generating data synchronized to clock ticks.
- Composite processor classes for building pipelines.
- Counter generator for sample counting and timing.
- Base processor classes for ezmsg (non-stateful).
- Protocol definitions and type variables for ezmsg processors.
- Stateful processor base classes for ezmsg.
- Base Unit classes for ezmsg integration.