ezmsg.baseproc#

ezmsg-baseproc: Base processor classes for ezmsg.

This package provides the foundational processor architecture for building signal processing pipelines in ezmsg.

class Processor(*args, **kwargs)[source]#

Bases: Protocol[SettingsType, MessageInType, MessageOutType]

Protocol for processors. You probably will not implement this protocol directly. Refer instead to the less ambiguous Consumer and Transformer protocols, and the base classes in this module which implement them.

Note: In Python 3.12+, we can invoke __acall__ directly using await obj(message), but to support earlier versions we need to use await obj.__acall__(message).

__init__(*args, **kwargs)#
class Producer(*args, **kwargs)[source]#

Bases: Protocol[SettingsType, MessageOutType]

Protocol for producers that generate messages.

__init__(*args, **kwargs)#
class Consumer(*args, **kwargs)[source]#

Bases: Processor[SettingsType, MessageInType, None], Protocol

Protocol for consumers that receive messages but do not return a result.

class Transformer(*args, **kwargs)[source]#

Bases: Processor[SettingsType, MessageInType, MessageOutType], Protocol

Protocol for transformers that receive messages and return a result of the same class.

class StatefulProcessor(*args, **kwargs)[source]#

Bases: Protocol[SettingsType, MessageInType, MessageOutType, StateType]

Base protocol for _stateful_ message processors. You probably will not implement this protocol directly. Refer instead to the less ambiguous StatefulConsumer and StatefulTransformer protocols.

__init__(*args, **kwargs)#
property state: StateType#
stateful_op(state, message)[source]#
Return type:

tuple[Any, Any]

Parameters:
class StatefulProducer(*args, **kwargs)[source]#

Bases: Protocol[SettingsType, MessageOutType, StateType]

Protocol for producers that generate messages without consuming inputs.

__init__(*args, **kwargs)#
property state: StateType#
stateful_op(state)[source]#
Return type:

tuple[Any, Any]

Parameters:

state (Any)

class StatefulConsumer(*args, **kwargs)[source]#

Bases: StatefulProcessor[SettingsType, MessageInType, None, StateType], Protocol

Protocol specifically for processors that consume messages without producing output.

stateful_op(state, message)[source]#
Return type:

tuple[tuple[TypeVar(StateType), int], None]

Parameters:
  • state (tuple[StateType, int])

  • message (MessageInType)

class StatefulTransformer(*args, **kwargs)[source]#

Bases: StatefulProcessor[SettingsType, MessageInType, MessageOutType, StateType], Protocol

Protocol specifically for processors that transform messages.

stateful_op(state, message)[source]#
Return type:

tuple[tuple[TypeVar(StateType), int], TypeVar(MessageOutType)]

Parameters:
  • state (tuple[StateType, int])

  • message (MessageInType)

class AdaptiveTransformer(*args, **kwargs)[source]#

Bases: StatefulTransformer, Protocol

async apartial_fit(message)[source]#
Return type:

None

Parameters:

message (SampleMessage)

partial_fit(message)[source]#

Update transformer state using labeled training data.

This method should update the internal state/parameters of the transformer based on the provided labeled samples, without performing any transformation.

Return type:

None

Parameters:

message (SampleMessage)

processor_state(cls=None, /, *, init=False, repr=True, eq=True, order=False, unsafe_hash=True, frozen=False, match_args=True, kw_only=False, slots=False, weakref_slot=False)#

Add dunder methods based on the fields defined in the class.

Examines PEP 526 __annotations__ to determine fields.

If init is true, an __init__() method is added to the class. If repr is true, a __repr__() method is added. If order is true, rich comparison dunder methods are added. If unsafe_hash is true, a __hash__() method is added. If frozen is true, fields may not be assigned to after instance creation. If match_args is true, the __match_args__ tuple is added. If kw_only is true, then by default all fields are keyword-only. If slots is true, a new class with a __slots__ attribute is returned.
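The signature above matches that of the standard-library dataclasses.dataclass except for two defaults: init=False and unsafe_hash=True. As a rough sketch of what those two flags imply, the stdlib decorator is used below as a stand-in. This assumes processor_state behaves like dataclass called with the same arguments, which this page does not confirm; CounterState is a hypothetical class.

```python
from dataclasses import dataclass, fields


# Stand-in for processor_state (assumption): the stdlib decorator with the
# two non-default flags shown in the signature above.
@dataclass(init=False, unsafe_hash=True)
class CounterState:  # hypothetical state class
    count: int = 0
    label: str = "idle"


# init=False: no __init__ is generated, so the class takes no constructor
# arguments and fields fall back to their class-level defaults.
state = CounterState()

# frozen=False: fields remain assignable after creation.
state.count += 3

# unsafe_hash=True: a __hash__ based on the field values is generated.
state_hash = hash(state)

field_names = [f.name for f in fields(state)]
```

Fields declared without a default are not set on the instance at all under init=False, so they must be assigned before first use.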

class BaseProcessor(*args, settings=None, **kwargs)[source]#

Bases: ABC, Generic[SettingsType, MessageInType, MessageOutType]

Base class for processors. You probably do not want to inherit from this class directly. Refer instead to the more specific base classes.

Note that BaseProcessor and its children are sync by default. If you need async by default, then override the async methods and call them from the sync methods. Look to BaseProducer for examples of calling async methods from sync methods.

Parameters:

settings (SettingsType)

__init__(*args, settings=None, **kwargs)[source]#
Parameters:

settings (SettingsType | None)

Return type:

None

async asend(message)[source]#

Alias for __acall__.

Return type:

Any

Parameters:

message (Any)

classmethod get_message_type(dir)[source]#
Return type:

Any

Parameters:

dir (str)

classmethod get_settings_type()[source]#
Return type:

type[TypeVar(SettingsType, bound= Settings)]

send(message)[source]#

Alias for __call__.

Return type:

Any

Parameters:

message (Any)

settings: TypeVar(SettingsType, bound= Settings)#
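The sync-by-default convention described for BaseProcessor, together with the send and asend aliases, can be sketched with a plain class. This is an illustration only and does not reproduce the library's implementation; Doubler and its _process method are hypothetical names.

```python
import asyncio


class Doubler:
    """Toy processor: the sync path holds the logic, the async path delegates."""

    def _process(self, message: int) -> int:  # hypothetical hook name
        return message * 2

    def __call__(self, message: int) -> int:
        return self._process(message)

    async def __acall__(self, message: int) -> int:
        # Sync by default: the async entry point simply calls the sync one.
        return self.__call__(message)

    def send(self, message: int) -> int:
        # Alias for __call__.
        return self(message)

    async def asend(self, message: int) -> int:
        # Alias for __acall__.
        return await self.__acall__(message)


proc = Doubler()
sync_result = proc.send(21)
async_result = asyncio.run(proc.asend(21))
```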
class BaseProducer(*args, settings=None, **kwargs)[source]#

Bases: ABC, Generic[SettingsType, MessageOutType]

Base class for producers – processors that generate messages without consuming inputs.

Note that BaseProducer and its children are async by default, and the sync methods simply wrap the async methods. This is the opposite of BaseProcessor and its children which are sync by default. These classes are designed this way because it is highly likely that a producer, which (probably) does not receive inputs, will require some sort of IO which will benefit from being async.

Parameters:

settings (SettingsType | None)

__init__(*args, settings=None, **kwargs)[source]#
Parameters:

settings (SettingsType | None)

Return type:

None

classmethod get_message_type(dir)[source]#
Return type:

type[TypeVar(MessageOutType)] | None

Parameters:

dir (str)

classmethod get_settings_type()[source]#
Return type:

type[TypeVar(SettingsType, bound= Settings)]
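The async-by-default convention described for BaseProducer is the mirror image of the BaseProcessor one: the logic lives in the async path and the sync path wraps it. A minimal sketch, assuming no event loop is already running in the calling thread; TickProducer and _produce are hypothetical names, and asyncio.run is used here in place of whatever helper the library actually calls.

```python
import asyncio


class TickProducer:
    """Toy producer: the async path holds the logic, the sync path wraps it."""

    def __init__(self) -> None:
        self._n = 0

    async def _produce(self) -> int:  # hypothetical hook name
        await asyncio.sleep(0)  # stands in for real I/O
        self._n += 1
        return self._n

    async def __acall__(self) -> int:
        return await self._produce()

    def __call__(self) -> int:
        # Sync wrapper around the async method. asyncio.run raises
        # RuntimeError if a loop is already running in this thread.
        return asyncio.run(self.__acall__())


producer = TickProducer()
first = producer()
second = producer()
```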

class BaseConsumer(*args, settings=None, **kwargs)[source]#

Bases: BaseProcessor[SettingsType, MessageInType, None], ABC, Generic[SettingsType, MessageInType]

Base class for consumers – processors that receive messages but don’t produce output. This base simply overrides type annotations of BaseProcessor to remove the outputs. (We don’t bother overriding send and asend because those are deprecated.)

Parameters:

settings (SettingsType)

classmethod get_message_type(dir)[source]#
Return type:

type[TypeVar(MessageInType)] | None

Parameters:

dir (str)

class BaseTransformer(*args, settings=None, **kwargs)[source]#

Bases: BaseProcessor[SettingsType, MessageInType, MessageOutType], ABC, Generic[SettingsType, MessageInType, MessageOutType]

Base class for transformers – processors which receive messages and produce output. This base simply overrides type annotations of BaseProcessor to indicate that outputs are not optional. (We don’t bother overriding send and asend because those are deprecated.)

Parameters:

settings (SettingsType)

class Stateful[source]#

Bases: ABC, Generic[StateType]

Mixin class for stateful processors. DO NOT use this class directly. Used to enforce that the processor/producer has a state attribute and stateful_op method.

classmethod get_state_type()[source]#
Return type:

type[TypeVar(StateType)]

property state: StateType#
abstractmethod stateful_op(*args, **kwargs)[source]#
Return type:

tuple

Parameters:
class BaseStatefulProcessor(*args, **kwargs)[source]#

Bases: BaseProcessor[SettingsType, MessageInType, MessageOutType], Stateful[StateType], ABC, Generic[SettingsType, MessageInType, MessageOutType, StateType]

Base class implementing common stateful processor functionality. You probably do not want to inherit from this class directly. Refer instead to the more specific base classes. Use BaseStatefulConsumer for operations that do not return a result, or BaseStatefulTransformer for operations that do return a result.

__init__(*args, **kwargs)[source]#
Return type:

None

stateful_op(state, message)[source]#
Return type:

tuple[tuple[TypeVar(StateType), int], Any]

Parameters:
class BaseStatefulProducer(*args, **kwargs)[source]#

Bases: BaseProducer[SettingsType, MessageOutType], Stateful[StateType], ABC, Generic[SettingsType, MessageOutType, StateType]

Base class implementing common stateful producer functionality.

Examples of stateful producers are things that require counters, clocks, or to cycle through a set of values.

Unlike BaseStatefulProcessor, this class does not use message hashing because there are no input messages. We still use self._hash to track the transition from initialization (.hash == -1) to state reset (.hash == 0).

__init__(*args, **kwargs)[source]#
Return type:

None

stateful_op(state)[source]#
Return type:

tuple[tuple[TypeVar(StateType), int], TypeVar(MessageOutType)]

Parameters:

state (tuple[StateType, int] | None)

class BaseStatefulConsumer(*args, **kwargs)[source]#

Bases: BaseStatefulProcessor[SettingsType, MessageInType, None, StateType], ABC, Generic[SettingsType, MessageInType, StateType]

Base class for stateful message consumers that don’t produce output. This class merely overrides the type annotations of BaseStatefulProcessor.

classmethod get_message_type(dir)[source]#
Return type:

type[TypeVar(MessageInType)] | None

Parameters:

dir (str)

stateful_op(state, message)[source]#
Return type:

tuple[tuple[TypeVar(StateType), int], None]

Parameters:
  • state (tuple[StateType, int] | None)

  • message (MessageInType)

class BaseStatefulTransformer(*args, **kwargs)[source]#

Bases: BaseStatefulProcessor[SettingsType, MessageInType, MessageOutType, StateType], ABC, Generic[SettingsType, MessageInType, MessageOutType, StateType]

Base class for stateful message transformers that produce output. This class merely overrides the type annotations of BaseStatefulProcessor.

stateful_op(state, message)[source]#
Return type:

tuple[tuple[TypeVar(StateType), int], TypeVar(MessageOutType)]

Parameters:
  • state (tuple[StateType, int] | None)

  • message (MessageInType)
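The stateful_op signatures in this section all pass an explicit (state, int) tuple in and get an updated tuple back alongside the result. The sketch below shows that calling pattern with a standalone function. It assumes the int is the hash value mentioned in the BaseStatefulProducer notes and that a state of None means "start fresh"; neither detail is confirmed here, and SumState is a hypothetical state class.

```python
from dataclasses import dataclass
from typing import Optional, Tuple


@dataclass
class SumState:  # hypothetical state class
    total: float = 0.0


def stateful_op(
    state: Optional[Tuple[SumState, int]], message: float
) -> Tuple[Tuple[SumState, int], float]:
    """Functional form: state goes in, (new state, result) comes out."""
    if state is None:
        # Assumption: no prior state means initialise with a fresh hash of 0.
        current, state_hash = SumState(), 0
    else:
        current, state_hash = state
    new_state = SumState(total=current.total + message)
    return (new_state, state_hash), new_state.total


carried = None
outputs = []
for x in [1.0, 2.0, 3.5]:
    carried, out = stateful_op(carried, x)
    outputs.append(out)
```

Because the caller holds the state between calls, the same operation can be replayed or checkpointed without touching the processor object.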

class BaseAdaptiveTransformer(*args, **kwargs)[source]#

Bases: BaseStatefulTransformer[SettingsType, MessageInType | SampleMessage, MessageOutType | None, StateType], ABC, Generic[SettingsType, MessageInType, MessageOutType, StateType]

async apartial_fit(message)[source]#

Override me if you need async partial fitting.

Return type:

None

Parameters:

message (SampleMessage)

abstractmethod partial_fit(message)[source]#
Return type:

None

Parameters:

message (SampleMessage)

class BaseAsyncTransformer(*args, **kwargs)[source]#

Bases: BaseStatefulTransformer[SettingsType, MessageInType, MessageOutType, StateType], ABC, Generic[SettingsType, MessageInType, MessageOutType, StateType]

This reverses the priority of async and sync methods from BaseStatefulTransformer. Whereas in BaseStatefulTransformer the async methods simply call the sync methods, here the sync methods call the async methods, similar to BaseStatefulProducer.

class BaseClockDrivenProducer(*args, **kwargs)[source]#

Bases: BaseStatefulProcessor[ClockDrivenSettingsType, LinearAxis, AxisArray, StateType], Generic[ClockDrivenSettingsType, StateType]

Base class for clock-driven data producers.

Accepts clock ticks (LinearAxis) as input and produces AxisArray output. Handles all the timing/counter logic internally, so subclasses only need to implement the data generation logic.

This eliminates the need for the Clock → Counter → Generator pattern by combining the Counter functionality into the generator base class.

Subclasses must implement:
  • _reset_state(time_axis): Initialize any state needed for production

  • _produce(n_samples, time_axis): Generate the actual output data

Example:

@processor_state
class SinState(ClockDrivenState):
    ang_freq: float = 0.0

class SinProducer(BaseClockDrivenProducer[SinSettings, SinState]):
    def _reset_state(self, time_axis: AxisArray.TimeAxis) -> None:
        self._state.ang_freq = 2 * np.pi * self.settings.fs

    def _produce(self, n_samples: int, time_axis: AxisArray.TimeAxis) -> AxisArray:
        t = (np.arange(n_samples) + self._state.counter) * time_axis.gain
        data = np.sin(self._state.ang_freq * t)
        return AxisArray(data=data, dims=["time"], axes={"time": time_axis})
class ClockDrivenSettings(fs, n_time=None)[source]#

Bases: Settings

Base settings for clock-driven producers.

Subclass this to add your own settings while inheriting fs and n_time.

Example:

class SinGeneratorSettings(ClockDrivenSettings):
    freq: float = 1.0
    amp: float = 1.0
Parameters:
  • fs (float)

  • n_time (int | None)

__init__(fs, n_time=None)#
Parameters:
  • fs (float)

  • n_time (int | None)

Return type:

None

n_time: int | None = None#

Samples per block.
  • If specified: fixed chunk size (clock gain is ignored for determining chunk size)

  • If None: derived from clock gain (fs * clock.gain), with fractional sample tracking

fs: float#

Output sampling rate in Hz.

class ClockDrivenState[source]#

Bases: object

Internal state for clock-driven producers.

Tracks sample counting and fractional sample accumulation. Subclasses should extend this if they need additional state.

counter: int = 0#

Current sample counter (total samples produced).

fractional_samples: float = 0.0#

Accumulated fractional samples for variable chunk mode.
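The interaction between n_time, counter, and fractional_samples can be illustrated with a little arithmetic. This is a sketch of one plausible bookkeeping scheme, not the library's code: samples_for_tick is a hypothetical helper, and carrying the remainder forward by truncation is an assumption.

```python
from typing import Optional, Tuple


def samples_for_tick(
    fs: float, gain: float, fractional: float, n_time: Optional[int] = None
) -> Tuple[int, float]:
    """Return (samples to emit this tick, fractional remainder to carry)."""
    if n_time is not None:
        # Fixed chunk mode: the clock gain does not affect the block size.
        return n_time, fractional
    # Variable chunk mode: fs * gain samples are due per tick, plus whatever
    # fraction was left over from earlier ticks.
    exact = fs * gain + fractional
    n_samples = int(exact)
    return n_samples, exact - n_samples


counter, fractional = 0, 0.0
blocks = []
for _ in range(4):
    # 100 Hz output with 0.125 s per tick means 12.5 samples are due per tick.
    n, fractional = samples_for_tick(fs=100.0, gain=0.125, fractional=fractional)
    blocks.append(n)
    counter += n
```

Over the four ticks (0.5 s in total) the block sizes alternate between 12 and 13, so the running counter stays aligned with fs without drifting.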

class CompositeStateful[source]#

Bases: Stateful[dict[str, Any]], ABC, Generic[SettingsType, MessageOutType]

Mixin class for composite processor/producer chains. DO NOT use this class directly. Used to enforce statefulness of the composite processor/producer chain and provide initialization and validation methods.

property state: dict[str, Any]#
abstractmethod stateful_op(state, *args, **kwargs)[source]#
Return type:

tuple[dict[str, tuple[Any, int]], Optional[TypeVar(MessageOutType)]]

Parameters:
class CompositeProcessor(*args, **kwargs)[source]#

Bases: BaseProcessor[SettingsType, MessageInType, MessageOutType], CompositeStateful[SettingsType, MessageOutType], ABC, Generic[SettingsType, MessageInType, MessageOutType]

A processor that chains multiple processors together in a feedforward non-branching graph. The individual processors may be stateless or stateful. The last processor may be a consumer; all other processors must be transformers. Use CompositeProducer if you want the first processor to be a producer. Concrete subclasses must implement _initialize_processors. Optionally override _reset_state if you want adaptive state behaviour.

Example implementation:

class CustomCompositeProcessor(CompositeProcessor[CustomSettings, AxisArray, AxisArray]):
    @staticmethod
    def _initialize_processors(settings: CustomSettings) -> dict[str, BaseProcessor]:
        return {
            "stateful_transformer": CustomStatefulTransformer(**settings),
            "transformer": CustomTransformer(**settings),
        }

Where **settings should be replaced with initialisation arguments for each processor.
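The feedforward, non-branching chaining described for CompositeProcessor amounts to feeding each processor's output into the next one, in dictionary order. A minimal sketch with plain callables standing in for processors; run_chain is a hypothetical helper and no state handling is modelled.

```python
from typing import Any, Callable, Dict


def run_chain(processors: Dict[str, Callable[[Any], Any]], message: Any) -> Any:
    """Pass a message through each processor in insertion order."""
    result = message
    for _name, proc in processors.items():
        result = proc(result)
    return result


# Dicts preserve insertion order, so the mapping also defines the chain order.
chain = {
    "scale": lambda x: x * 10,
    "offset": lambda x: x + 1,
}
chained = run_chain(chain, 4)
```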

__init__(*args, **kwargs)[source]#
Return type:

None

stateful_op(state, message)[source]#
Return type:

tuple[dict[str, tuple[Any, int]], Optional[TypeVar(MessageOutType)]]

Parameters:
class CompositeProducer(*args, **kwargs)[source]#

Bases: BaseProducer[SettingsType, MessageOutType], CompositeStateful[SettingsType, MessageOutType], ABC, Generic[SettingsType, MessageOutType]

A producer that chains multiple processors (starting with a producer) together in a feedforward non-branching graph. The individual processors may be stateless or stateful. The first processor must be a producer, the last processor may be a consumer, otherwise processors must be transformers.

__init__(*args, **kwargs)[source]#
Return type:

None

stateful_op(state)[source]#
Return type:

tuple[dict[str, tuple[Any, int]], Optional[TypeVar(MessageOutType)]]

Parameters:

state (dict[str, tuple[Any, int]] | None)

class BaseProducerUnit(*args, settings=None, **kwargs)[source]#

Bases: Unit, ABC, Generic[SettingsType, MessageOutType, ProducerType]

Base class for producer units – i.e. units that generate messages without consuming inputs. Implement a new Unit as follows:

class CustomUnit(BaseProducerUnit[
    CustomProducerSettings,    # SettingsType
    AxisArray,                 # MessageOutType
    CustomProducer,            # ProducerType
]):
    SETTINGS = CustomProducerSettings

… that’s all!

Where CustomProducerSettings and CustomProducer are custom implementations of ez.Settings and BaseProducer or BaseStatefulProducer, respectively.

Parameters:

settings (Settings | None)

INPUT_SETTINGS = InputStream:unlocated[~SettingsType]()#
OUTPUT_SIGNAL = OutputStream:unlocated[~MessageOutType](self.num_buffers=32, self.force_tcp=False)#
create_producer()[source]#

Create the producer instance from settings.

Return type:

None

async initialize()[source]#

Runs when the Unit is instantiated. This is called from within the same process this unit will live. This lifecycle hook can be overridden. It can be run as async functions by simply adding the async keyword when overriding.

Return type:

None

async on_settings(msg)[source]#

Receive a settings message, override self.SETTINGS, and re-create the producer. Child classes that wish to have fine-grained control over whether the core producer resets on settings changes should override this method.

Parameters:

msg (TypeVar(SettingsType, bound= Settings)) – a settings message.

Return type:

None

async produce()[source]#
Return type:

AsyncGenerator

class BaseProcessorUnit(*args, settings=None, **kwargs)[source]#

Bases: Unit, ABC, Generic[SettingsType]

Base class for processor units – i.e. units that process messages. This is an abstract base class that provides common functionality for consumer and transformer units. You probably do not want to inherit from this class directly as you would need to define a custom implementation of create_processor. Refer instead to BaseConsumerUnit or BaseTransformerUnit.

Parameters:

settings (Settings | None)

INPUT_SETTINGS = InputStream:unlocated[~SettingsType]()#
abstractmethod create_processor()[source]#
Return type:

None

async initialize()[source]#

Runs when the Unit is instantiated. This is called from within the same process this unit will live. This lifecycle hook can be overridden. It can be run as async functions by simply adding the async keyword when overriding.

Return type:

None

async on_settings(msg)[source]#

Receive a settings message, override self.SETTINGS, and re-create the processor. Child classes that wish to have fine-grained control over whether the core processor resets on settings changes should override this method.

Parameters:

msg (TypeVar(SettingsType, bound= Settings)) – a settings message.

Return type:

None

class BaseConsumerUnit(*args, settings=None, **kwargs)[source]#

Bases: BaseProcessorUnit[SettingsType], ABC, Generic[SettingsType, MessageInType, ConsumerType]

Base class for consumer units – i.e. units that receive messages but do not return results. Implement a new Unit as follows:

class CustomUnit(BaseConsumerUnit[
    CustomConsumerSettings,    # SettingsType
    AxisArray,                 # MessageInType
    CustomConsumer,            # ConsumerType
]):
    SETTINGS = CustomConsumerSettings

… that’s all!

Where CustomConsumerSettings and CustomConsumer are custom implementations of:
  • ez.Settings for settings

  • BaseConsumer or BaseStatefulConsumer for the consumer implementation

Parameters:

settings (Settings | None)

INPUT_SIGNAL = InputStream:unlocated[~MessageInType]()#
create_processor()[source]#

Create the consumer instance from settings.

async on_signal(message)[source]#

Consume the message.

Parameters:

message (MessageInType) – Input message to be consumed

class BaseTransformerUnit(*args, settings=None, **kwargs)[source]#

Bases: BaseProcessorUnit[SettingsType], ABC, Generic[SettingsType, MessageInType, MessageOutType, TransformerType]

Base class for transformer units – i.e. units that transform input messages into output messages. Implement a new Unit as follows:

class CustomUnit(BaseTransformerUnit[
    CustomTransformerSettings,    # SettingsType
    AxisArray,                    # MessageInType
    AxisArray,                    # MessageOutType
    CustomTransformer,            # TransformerType
]):
    SETTINGS = CustomTransformerSettings

… that’s all!

Where CustomTransformerSettings and CustomTransformer are custom implementations of:
  • ez.Settings for settings

  • One of these transformer types: BaseTransformer, BaseStatefulTransformer, or CompositeProcessor

Parameters:

settings (Settings | None)

INPUT_SIGNAL = InputStream:unlocated[~MessageInType]()#
OUTPUT_SIGNAL = OutputStream:unlocated[~MessageOutType](self.num_buffers=32, self.force_tcp=False)#
create_processor()[source]#

Create the transformer instance from settings.

async on_signal(message)[source]#
Return type:

AsyncGenerator

Parameters:

message (MessageInType)

class BaseAdaptiveTransformerUnit(*args, settings=None, **kwargs)[source]#

Bases: BaseProcessorUnit[SettingsType], ABC, Generic[SettingsType, MessageInType, MessageOutType, AdaptiveTransformerType]

Parameters:

settings (Settings | None)

INPUT_SAMPLE = InputStream:unlocated[<class 'ezmsg.baseproc.util.message.SampleMessage'>]()#
INPUT_SIGNAL = InputStream:unlocated[~MessageInType]()#
OUTPUT_SIGNAL = OutputStream:unlocated[~MessageOutType](self.num_buffers=32, self.force_tcp=False)#
create_processor()[source]#

Create the adaptive transformer instance from settings.

Return type:

None

async on_sample(msg)[source]#
Return type:

None

Parameters:

msg (SampleMessage)

async on_signal(message)[source]#
Return type:

AsyncGenerator

Parameters:

message (MessageInType)

class BaseClockDrivenUnit(*args, settings=None, **kwargs)[source]#

Bases: BaseProcessorUnit[SettingsType], ABC, Generic[SettingsType, ClockDrivenProducerType]

Base class for clock-driven producer units.

These units receive clock ticks (LinearAxis) and produce AxisArray output. This simplifies the Clock → Counter → Generator pattern by combining the counter functionality into the generator.

Implement a new Unit as follows:

class SinGeneratorUnit(BaseClockDrivenUnit[
    SinGeneratorSettings,     # SettingsType (must extend ClockDrivenSettings)
    SinProducer,              # ClockDrivenProducerType
]):
    SETTINGS = SinGeneratorSettings

Where SinGeneratorSettings extends ClockDrivenSettings and SinProducer extends BaseClockDrivenProducer.

Parameters:

settings (Settings | None)

INPUT_CLOCK = InputStream:unlocated[<class 'ezmsg.util.messages.axisarray.LinearAxis'>]()#
OUTPUT_SIGNAL = OutputStream:unlocated[<class 'ezmsg.util.messages.axisarray.AxisArray'>](self.num_buffers=32, self.force_tcp=False)#
create_processor()[source]#

Create the clock-driven producer instance from settings.

Return type:

None

async on_clock(clock_tick)[source]#
Return type:

AsyncGenerator

Parameters:

clock_tick (LinearAxis)

class GenAxisArray(*args, settings=None, **kwargs)[source]#

Bases: Unit

Parameters:

settings (Settings | None)

INPUT_SETTINGS = InputStream:unlocated[<class 'ezmsg.core.settings.Settings'>]()#
INPUT_SIGNAL = InputStream:unlocated[<class 'ezmsg.util.messages.axisarray.AxisArray'>]()#
OUTPUT_SIGNAL = OutputStream:unlocated[<class 'ezmsg.util.messages.axisarray.AxisArray'>](self.num_buffers=32, self.force_tcp=False)#
STATE#

alias of GenState

construct_generator()[source]#
async initialize()[source]#

Runs when the Unit is instantiated. This is called from within the same process this unit will live. This lifecycle hook can be overridden. It can be run as async functions by simply adding the async keyword when overriding.

Return type:

None

async on_settings(msg)[source]#

Update unit settings and reset generator. Note: Not all units will require a full reset with new settings. Override this method to implement a selective reset.

Parameters:

msg (Settings) – Instance of SETTINGS object.

Return type:

None

async on_signal(message)[source]#
Return type:

AsyncGenerator

Parameters:

message (AxisArray)

get_base_producer_type(cls)[source]#
Return type:

type

Parameters:

cls (type)

get_base_consumer_type(cls)[source]#
Return type:

type

Parameters:

cls (type)

get_base_transformer_type(cls)[source]#
Return type:

type

Parameters:

cls (type)

get_base_adaptive_transformer_type(cls)[source]#
Return type:

type

Parameters:

cls (type)

get_base_clockdriven_producer_type(cls)[source]#
Return type:

type

Parameters:

cls (type)

class SampleMessage(trigger, sample)[source]#

Bases: object

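Parameters:
  • trigger (SampleTriggerMessage)

  • sample (AxisArray)

__init__(trigger, sample)#
Parameters:
  • trigger (SampleTriggerMessage)

  • sample (AxisArray)

Return type: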

None

trigger: SampleTriggerMessage#

The time, window, and value (if any) associated with the trigger.

sample: AxisArray#

The data sampled around the trigger.

class SampleTriggerMessage(timestamp=<factory>, period=None, value=None)[source]#

Bases: object

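Parameters:
  • timestamp (float)

  • period (tuple[float, float] | None)

  • value (Any)

__init__(timestamp=<factory>, period=None, value=None)#
Parameters:
  • timestamp (float)

  • period (tuple[float, float] | None)

  • value (Any)

Return type: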

None

period: tuple[float, float] | None = None#

The period around the timestamp, in seconds

value: Any = None#

A value or ‘label’ associated with the trigger.

timestamp: float#

Time of the trigger, in seconds. The Clock depends on the input but defaults to time.time

is_sample_message(message)[source]#

Check if the message is a SampleMessage.

Return type:

TypeGuard[SampleMessage]

Parameters:

message (Any)

profile_method(trace_oldest=True)[source]#

Decorator to profile a method by logging its execution time and other details.

Parameters:

trace_oldest (bool) – If True, trace the oldest sample time; otherwise, trace the newest.

Returns:

The decorated function with profiling.

Return type:

Callable

profile_subpub(trace_oldest=True)[source]#

Decorator to profile a subscriber-publisher method in an ezmsg Unit by logging its execution time and other details.

Parameters:

trace_oldest (bool) – If True, trace the oldest sample time; otherwise, trace the newest.

Returns:

The decorated async task with profiling.

Return type:

Callable

exception CoroutineExecutionError[source]#

Bases: Exception

Custom exception for coroutine execution failures

class SyncToAsyncGeneratorWrapper(gen)[source]#

Bases: object

A wrapper for synchronous generators to be used in an async context.

__init__(gen)[source]#
async aclose()[source]#
async asend(value)[source]#
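The idea behind wrapping a synchronous generator for use in an async context can be sketched in a few lines. This is an illustration under stated assumptions, not the library's implementation: SyncGenAsAsync is a hypothetical class, the caller is assumed to prime the generator, and translating StopIteration into StopAsyncIteration is a design choice made here.

```python
import asyncio


class SyncGenAsAsync:
    """Expose asend/aclose on top of a plain (already primed) generator."""

    def __init__(self, gen):
        self._gen = gen

    async def asend(self, value):
        try:
            return self._gen.send(value)
        except StopIteration as exc:
            # StopIteration must not escape a coroutine; translate it.
            raise StopAsyncIteration from exc

    async def aclose(self):
        self._gen.close()


def running_sum():
    total = 0
    while True:
        value = yield total
        total += value


async def demo():
    gen = running_sum()
    next(gen)  # prime the generator so it is waiting at the first yield
    wrapped = SyncGenAsAsync(gen)
    results = [await wrapped.asend(v) for v in (2, 3, 5)]
    await wrapped.aclose()
    return results


wrapped_results = asyncio.run(demo())
```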
run_coroutine_sync(coroutine, timeout=30)[source]#

Executes an asyncio coroutine synchronously, with enhanced error handling.

Parameters:
  • coroutine (Coroutine[Any, Any, TypeVar(T)]) – The asyncio coroutine to execute

  • timeout (float) – Maximum time in seconds to wait for coroutine completion (default: 30)

Return type:

TypeVar(T)

Returns:

The result of the coroutine execution

Raises:
check_message_type_compatibility(type1, type2)[source]#

Check if two types are compatible for message passing. Returns True if:
  • Both are None/NoneType

  • Either is typing.Any

  • type1 is a subclass of type2, which includes

  • type1 and type2 are concrete types and type1 is a subclass of type2

  • type1 is None/NoneType and type2 is typing.Optional, or

  • type1 is subtype of the non-None inner type of type2 if type2 is Optional

  • type1 is a Union/Optional type and all inner types are compatible with type2

Parameters:
Returns:

True if the types are compatible, False otherwise

Return type:

bool
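The rules listed for check_message_type_compatibility can be approximated with the standard typing introspection helpers. The function below is a simplified re-implementation written for illustration; types_compatible and _union_members are hypothetical names, and edge cases may be handled differently by the library.

```python
import types
import typing
from typing import Any, Optional, Union

# types.UnionType (the type of "int | None") only exists on Python 3.10+.
_UNION_TYPE = getattr(types, "UnionType", None)


def _union_members(tp):
    """Return the member types of a Union/Optional, or None for anything else."""
    origin = typing.get_origin(tp)
    if origin is Union or (_UNION_TYPE is not None and origin is _UNION_TYPE):
        return typing.get_args(tp)
    return None


def types_compatible(type1, type2) -> bool:
    none_type = type(None)
    # Treat None and NoneType as the same thing.
    type1 = none_type if type1 is None else type1
    type2 = none_type if type2 is None else type2
    if type1 is Any or type2 is Any:
        return True
    members1 = _union_members(type1)
    if members1 is not None:
        # A Union/Optional source needs every member to be accepted.
        return all(types_compatible(m, type2) for m in members1)
    members2 = _union_members(type2)
    if members2 is not None:
        # A Union/Optional target accepts anything one of its members accepts.
        return any(types_compatible(type1, m) for m in members2)
    try:
        return issubclass(type1, type2)
    except TypeError:
        # Parameterised generics such as list[int] are not classes.
        return type1 == type2


ok_subclass = types_compatible(bool, int)
ok_optional = types_compatible(None, Optional[int])
bad_union = types_compatible(Union[int, str], int)
```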

resolve_typevar(cls, target_typevar)[source]#

Resolve the concrete type bound to a TypeVar in a class hierarchy. This function traverses the method resolution order (MRO) of the class and checks the original bases of each class in the MRO for the TypeVar. If the TypeVar is found, it returns the concrete type bound to it. If the TypeVar is not found, it raises a TypeError.

If the resolved type is itself a TypeVar, this function recursively resolves it until a concrete type is found.

Parameters:
  • cls (type) – The class to inspect.

  • target_typevar (TypeVar) – The TypeVar to resolve.

Returns:

The concrete type bound to the TypeVar.

Return type:

type
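The MRO traversal described for resolve_typevar can be sketched with typing.get_origin and typing.get_args. This is a simplified illustration of the described algorithm rather than the library's code; resolve_typevar_sketch and the example classes are hypothetical.

```python
import typing
from typing import Generic, TypeVar

T = TypeVar("T")
U = TypeVar("U")


class Base(Generic[T]):
    pass


class IntBase(Base[int]):
    pass


class Child(IntBase):
    pass


class Mid(Base[U], Generic[U]):
    pass


class Leaf(Mid[str]):
    pass


def resolve_typevar_sketch(cls: type, target: TypeVar):
    """Walk the MRO and inspect each class's original (subscripted) bases."""
    for klass in cls.__mro__:
        for base in getattr(klass, "__orig_bases__", ()):
            origin = typing.get_origin(base)
            if origin is None:
                continue
            params = getattr(origin, "__parameters__", ())
            if target in params:
                resolved = typing.get_args(base)[params.index(target)]
                if isinstance(resolved, TypeVar):
                    # Bound to another TypeVar: keep resolving from the top.
                    return resolve_typevar_sketch(cls, resolved)
                return resolved
    raise TypeError(f"Could not resolve {target} for {cls.__name__}")


direct = resolve_typevar_sketch(Child, T)
via_typevar = resolve_typevar_sketch(Leaf, T)
```

In the second call T is first found bound to U on Mid, and U is then resolved to str from Leaf's own bases.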

class Clock(*args, settings=None, **kwargs)[source]#

Bases: BaseProducerUnit[ClockSettings, LinearAxis, ClockProducer]

Clock unit that produces ticks at a specified rate.

Output is an AxisArray.LinearAxis with:
  • gain: 1/dispatch_rate (seconds per tick)

  • offset: Wall clock timestamp

Parameters:

settings (Settings | None)

SETTINGS#

alias of ClockSettings

class ClockProducer(*args, **kwargs)[source]#

Bases: BaseStatefulProducer[ClockSettings, LinearAxis, ClockState]

Produces clock ticks at a specified rate.

Each tick outputs an AxisArray.LinearAxis containing:
  • gain: 1/dispatch_rate (seconds per tick), or 0.0 if dispatch_rate is infinite

  • offset: Wall clock timestamp (time.monotonic)

This output type allows downstream components (like Counter) to know both the timing of the tick and the nominal dispatch rate.

class ClockSettings(dispatch_rate=inf)[source]#

Bases: Settings

Settings for ClockProducer.

Parameters:

dispatch_rate (float)

__init__(dispatch_rate=inf)#
Parameters:

dispatch_rate (float)

Return type:

None

dispatch_rate: float = inf#

Dispatch rate in Hz.
  • Finite value (e.g., 100.0): Dispatch 100 times per second

  • math.inf: Dispatch as fast as possible (no sleep)

class ClockState[source]#

Bases: object

State for ClockProducer.

n_dispatch: int = 0#

Number of dispatches since reset.

t_0: float#

Start time (monotonic).
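The relationship between dispatch_rate, the tick gain, and the n_dispatch and t_0 state fields can be sketched with asyncio. This is an assumption-laden illustration: tick_gain and run_clock are hypothetical helpers, and scheduling each tick against t_0 + n_dispatch / dispatch_rate is one plausible strategy, not necessarily the one ClockProducer uses.

```python
import asyncio
import math
import time
from typing import List, Tuple


def tick_gain(dispatch_rate: float) -> float:
    """Seconds per tick, or 0.0 when dispatching as fast as possible."""
    return 0.0 if math.isinf(dispatch_rate) else 1.0 / dispatch_rate


async def run_clock(dispatch_rate: float, n_ticks: int) -> List[Tuple[float, float]]:
    """Collect (gain, offset) pairs for a fixed number of ticks."""
    t_0 = time.monotonic()
    ticks = []
    for n_dispatch in range(n_ticks):
        if not math.isinf(dispatch_rate):
            # Sleep until the nominal time of this tick, measured from t_0,
            # so that per-tick delays do not accumulate.
            delay = t_0 + n_dispatch / dispatch_rate - time.monotonic()
            if delay > 0:
                await asyncio.sleep(delay)
        ticks.append((tick_gain(dispatch_rate), time.monotonic()))
    return ticks


ticks = asyncio.run(run_clock(dispatch_rate=200.0, n_ticks=3))
```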

class Counter(*args, settings=None, **kwargs)[source]#

Bases: BaseClockDrivenUnit[CounterSettings, CounterTransformer]

Transforms clock ticks into monotonically increasing counter values as AxisArray.

Receives timing from INPUT_CLOCK (LinearAxis from Clock) and outputs AxisArray.

Parameters:

settings (Settings | None)

SETTINGS#

alias of CounterSettings

class CounterSettings(fs, n_time=None, mod=None)[source]#

Bases: ClockDrivenSettings

Settings for Counter and CounterTransformer.

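Parameters:
  • fs (float)

  • n_time (int | None)

  • mod (int | None)

__init__(fs, n_time=None, mod=None)#
Parameters:
  • fs (float)

  • n_time (int | None)

  • mod (int | None)

Return type: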

None

mod: int | None = None#

If set, counter values rollover at this modulus.

class CounterTransformer(*args, **kwargs)[source]#

Bases: BaseClockDrivenProducer[CounterSettings, CounterTransformerState]

Transforms clock ticks (LinearAxis) into AxisArray counter values.

Each clock tick produces a block of counter values. The block size is either fixed (n_time setting) or derived from the clock’s gain (fs * gain).
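The effect of the mod setting on a block of counter values can be shown with plain integers. This sketch ignores the AxisArray packaging and the block-size logic; counter_block is a hypothetical helper, with counter standing for the running sample count held in the state.

```python
from typing import List, Optional


def counter_block(counter: int, n_samples: int, mod: Optional[int] = None) -> List[int]:
    """Counter values for one block, starting at the running sample count."""
    values = range(counter, counter + n_samples)
    if mod is None:
        return list(values)
    # With a modulus set, values roll over instead of growing without bound.
    return [v % mod for v in values]


plain_block = counter_block(counter=0, n_samples=4)
rolled_block = counter_block(counter=6, n_samples=4, mod=8)
```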

class CounterTransformerState[source]#

Bases: ClockDrivenState

State for CounterTransformer.

template: AxisArray | None = None#

Modules

clock – Clock generator for timing control.

clockdriven – Clock-driven producer base classes for generating data synchronized to clock ticks.

composite – Composite processor classes for building pipelines.

counter – Counter generator for sample counting and timing.

processor – Base processor classes for ezmsg (non-stateful).

protocols – Protocol definitions and type variables for ezmsg processors.

stateful – Stateful processor base classes for ezmsg.

units – Base Unit classes for ezmsg integration.

util