ezmsg.baseproc.processor#

Base processor classes for ezmsg (non-stateful).

Classes

class BaseConsumer(*args, settings=None, **kwargs)[source]#

Bases: BaseProcessor[SettingsType, MessageInType, None], ABC, Generic[SettingsType, MessageInType]

Base class for consumers – processors that receive messages but don’t produce output. This base simply overrides type annotations of BaseProcessor to remove the outputs. (We don’t bother overriding send and asend because those are deprecated.)

Parameters:

settings (SettingsType)

classmethod get_message_type(dir)[source]#
Return type:

type[TypeVar(MessageInType)] | None

Parameters:

dir (str)

class BaseProcessor(*args, settings=None, **kwargs)[source]#

Bases: ABC, Generic[SettingsType, MessageInType, MessageOutType]

Base class for processors. You probably do not want to inherit from this class directly. Refer instead to the more specific base classes.

  • Use BaseTransformer or BaseConsumer for operations that do or do not return a result, respectively.

  • Use BaseStatefulProcessor and its children for operations that require state.

Note that BaseProcessor and its children are sync by default. If you need async by default, then override the async methods and call them from the sync methods. Look to BaseProducer for examples of calling async methods from sync methods.
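The sync-by-default arrangement can be illustrated with a minimal stand-alone sketch. ToyScaler is a hypothetical class that does not inherit from BaseProcessor; it only mirrors the method names documented on this page (__call__, __acall__, send, asend). How the real base class wires these together internally is an assumption here.

```python
import asyncio


class ToyScaler:
    """Hypothetical stand-in for a sync-by-default processor."""

    def __init__(self, gain: float) -> None:
        self.gain = gain

    def __call__(self, message: float) -> float:
        # The real work happens in the sync path.
        return message * self.gain

    async def __acall__(self, message: float) -> float:
        # Assumed default: the async path simply delegates to the sync path.
        return self(message)

    def send(self, message: float) -> float:
        # Alias for __call__, as documented for send.
        return self(message)

    async def asend(self, message: float) -> float:
        # Alias for __acall__, as documented for asend.
        return await self.__acall__(message)


scaler = ToyScaler(gain=3.0)
sync_result = scaler.send(2.0)
async_result = asyncio.run(scaler.asend(2.0))
```

Both paths give the same result here because the async method adds nothing of its own. A processor that needs async by default would invert this, putting the work in the async method and calling it from the sync one.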

Parameters:

settings (SettingsType)

NONRESET_SETTINGS_FIELDS: ClassVar[frozenset[str]] = frozenset({})#
classmethod get_settings_type()[source]#
Return type:

type[TypeVar(SettingsType, bound= Settings)]

classmethod get_message_type(dir)[source]#
Return type:

Any

Parameters:

dir (str)

__init__(*args, settings=None, **kwargs)[source]#
Parameters:

settings (SettingsType | None)

Return type:

None

settings: SettingsType#
update_settings(new_settings)[source]#

Apply new settings to this processor, requesting a state reset if needed.

Diffs new_settings against the current self.settings. If every changed field is listed in NONRESET_SETTINGS_FIELDS, self.settings is simply rebound and the processor keeps running. Otherwise, _request_reset() is called so the next message triggers a fresh _reset_state (for stateful subclasses). Override for finer-grained control than the class-level allow-list provides.

Return type:

None

Parameters:

new_settings (SettingsType)
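The diffing behaviour described for update_settings can be sketched without the library. ToySettings, ToyProcessor, and the reset_requested flag are hypothetical names for illustration only. The sketch assumes settings are dataclass-like and that self.settings is rebound on both paths. It is not the library's implementation.

```python
from dataclasses import dataclass, fields
from typing import ClassVar


@dataclass(frozen=True)
class ToySettings:
    gain: float = 1.0
    window: int = 8


class ToyProcessor:
    # Fields that may change without forcing a reset (class-level allow-list).
    NONRESET_SETTINGS_FIELDS: ClassVar[frozenset[str]] = frozenset({"gain"})

    def __init__(self, settings: ToySettings) -> None:
        self.settings = settings
        self.reset_requested = False  # hypothetical flag for this sketch

    def _request_reset(self) -> None:
        self.reset_requested = True

    def update_settings(self, new_settings: ToySettings) -> None:
        # Collect the names of fields whose values differ.
        changed = {
            f.name
            for f in fields(new_settings)
            if getattr(self.settings, f.name) != getattr(new_settings, f.name)
        }
        self.settings = new_settings
        # Request a reset only if some changed field is outside the allow-list.
        if not changed <= self.NONRESET_SETTINGS_FIELDS:
            self._request_reset()


proc = ToyProcessor(ToySettings())
proc.update_settings(ToySettings(gain=2.0))
gain_only_reset = proc.reset_requested
proc.update_settings(ToySettings(gain=2.0, window=16))
window_reset = proc.reset_requested
```

In this sketch, changing only gain leaves reset_requested false, while changing window sets it. A subclass wanting finer-grained control would override update_settings rather than extend the allow-list.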

send(message)[source]#

Alias for __call__.

Return type:

Any

Parameters:

message (Any)

async asend(message)[source]#

Alias for __acall__.

Return type:

Any

Parameters:

message (Any)

close()[source]#

Release any resources held by this processor.

No-op by default; override in subclasses that hold sockets, file handles, hardware sessions, or other external resources. The Unit base classes call this on the previous instance whenever a settings change triggers a recreate.

Return type:

None
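A minimal sketch of overriding close, assuming a subclass that owns one external resource. ToyBase, ToyWriter, and the recreate helper are hypothetical; an in-memory buffer stands in for a socket or file handle, and recreate only imitates the documented behaviour of closing the previous instance on a recreate.

```python
import io


class ToyBase:
    def close(self) -> None:
        """No-op by default, as documented for close."""


class ToyWriter(ToyBase):
    def __init__(self) -> None:
        # Stand-in for a socket, file handle, or hardware session.
        self._buffer = io.StringIO()

    def close(self) -> None:
        # Guard so that calling close twice is harmless.
        if not self._buffer.closed:
            self._buffer.close()


def recreate(previous: ToyBase) -> ToyBase:
    # Hypothetical helper: release the old instance, then build a new one.
    previous.close()
    return type(previous)()


old = ToyWriter()
new = recreate(old)
old.close()  # second call is a no-op thanks to the guard
```

Making close idempotent is a design choice of this sketch, not a documented requirement. It avoids errors if both the caller and a recreate path release the same instance.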

class BaseProducer(*args, settings=None, **kwargs)[source]#

Bases: ABC, Generic[SettingsType, MessageOutType]

Base class for producers – processors that generate messages without consuming inputs.

Note that BaseProducer and its children are async by default, and the sync methods simply wrap the async methods. This is the opposite of BaseProcessor and its children which are sync by default. These classes are designed this way because it is highly likely that a producer, which (probably) does not receive inputs, will require some sort of IO which will benefit from being async.
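The async-by-default arrangement can be sketched as follows. ToyProducer is a hypothetical stand-alone class, not a BaseProducer subclass. Using asyncio.run for the sync wrapper is an assumption of this sketch; the library may bridge sync and async differently.

```python
import asyncio


class ToyProducer:
    """Hypothetical producer whose real work lives in the async method."""

    def __init__(self) -> None:
        self._count = 0

    async def __acall__(self) -> int:
        # Stand-in for real IO such as reading from a device or socket.
        await asyncio.sleep(0)
        self._count += 1
        return self._count

    def __call__(self) -> int:
        # Sync wrapper around the async method. Assumes no event loop is
        # already running in this thread.
        return asyncio.run(self.__acall__())


producer = ToyProducer()
first = producer()
second = producer()
```

Calling the sync wrapper from inside a running event loop would raise a RuntimeError with asyncio.run, so in async code the async method should be awaited directly.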

Parameters:

settings (SettingsType)

NONRESET_SETTINGS_FIELDS: ClassVar[frozenset[str]] = frozenset({})#
classmethod get_settings_type()[source]#
Return type:

type[TypeVar(SettingsType, bound= Settings)]

classmethod get_message_type(dir)[source]#
Return type:

type[TypeVar(MessageOutType)] | None

Parameters:

dir (str)

__init__(*args, settings=None, **kwargs)[source]#
Parameters:

settings (SettingsType | None)

Return type:

None

settings: SettingsType#
update_settings(new_settings)[source]#

Apply new settings to this producer, requesting a state reset if needed.

See BaseProcessor.update_settings(). The reset path matters most for stateful producers, where it reopens hardware or re-derives cached values in _reset_state.

Return type:

None

Parameters:

new_settings (SettingsType)

close()[source]#

Release any resources held by this producer.

No-op by default; override in subclasses that hold sockets, file handles, hardware sessions, or other external resources. The Unit base classes call this on the previous instance whenever a settings change triggers a recreate.

Return type:

None

class BaseTransformer(*args, settings=None, **kwargs)[source]#

Bases: BaseProcessor[SettingsType, MessageInType, MessageOutType], ABC, Generic[SettingsType, MessageInType, MessageOutType]

Base class for transformers – processors which receive messages and produce output. This base simply overrides type annotations of BaseProcessor to indicate that outputs are not optional. (We don’t bother overriding send and asend because those are deprecated.)

Parameters:

settings (SettingsType)