ezmsg.baseproc.composite#

Composite processor classes for building pipelines.

Classes

class CompositeProcessor(*args, **kwargs)[source]#

Bases: BaseProcessor[SettingsType, MessageInType, MessageOutType], CompositeStateful[SettingsType, MessageOutType], ABC, Generic[SettingsType, MessageInType, MessageOutType]

A processor that chains multiple processor together in a feedforward non-branching graph. The individual processors may be stateless or stateful. The last processor may be a consumer, otherwise processors must be transformers. Use CompositeProducer if you want the first processor to be a producer. Concrete subclasses must implement _initialize_processors. Optionally override _reset_state if you want adaptive state behaviour. Example implementation:

class CustomCompositeProcessor(CompositeProcessor[CustomSettings, AxisArray, AxisArray]):

@staticmethod def _initialize_processors(settings: CustomSettings) -> dict[str, BaseProcessor]:

return {

“stateful_transformer”: CustomStatefulProducer(**settings), “transformer”: CustomTransformer(**settings),

}

Where **settings should be replaced with initialisation arguments for each processor.

__init__(*args, **kwargs)[source]#
Return type:

None

stateful_op(state, message)[source]#
Return type:

tuple[dict[str, tuple[Any, int]], Optional[TypeVar(MessageOutType)]]

Parameters:
class CompositeProducer(*args, **kwargs)[source]#

Bases: BaseProducer[SettingsType, MessageOutType], CompositeStateful[SettingsType, MessageOutType], ABC, Generic[SettingsType, MessageOutType]

A producer that chains multiple processors (starting with a producer) together in a feedforward non-branching graph. The individual processors may be stateless or stateful. The first processor must be a producer, the last processor may be a consumer, otherwise processors must be transformers.

__init__(*args, **kwargs)[source]#
Return type:

None

stateful_op(state)[source]#
Return type:

tuple[dict[str, tuple[Any, int]], Optional[TypeVar(MessageOutType)]]

Parameters:

state (dict[str, tuple[Any, int]] | None)

class CompositeStateful[source]#

Bases: Stateful[dict[str, Any]], ABC, Generic[SettingsType, MessageOutType]

Mixin class for composite processor/producer chains. DO NOT use this class directly. Used to enforce statefulness of the composite processor/producer chain and provide initialization and validation methods.

property state: dict[str, Any]#
abstractmethod stateful_op(state, *args, **kwargs)[source]#
Return type:

tuple[dict[str, tuple[Any, int]], Optional[TypeVar(MessageOutType)]]

Parameters:
class CompositeStateful[source]#

Bases: Stateful[dict[str, Any]], ABC, Generic[SettingsType, MessageOutType]

Mixin class for composite processor/producer chains. DO NOT use this class directly. Used to enforce statefulness of the composite processor/producer chain and provide initialization and validation methods.

property state: dict[str, Any]#
abstractmethod stateful_op(state, *args, **kwargs)[source]#
Return type:

tuple[dict[str, tuple[Any, int]], Optional[TypeVar(MessageOutType)]]

Parameters:
class CompositeProcessor(*args, **kwargs)[source]#

Bases: BaseProcessor[SettingsType, MessageInType, MessageOutType], CompositeStateful[SettingsType, MessageOutType], ABC, Generic[SettingsType, MessageInType, MessageOutType]

A processor that chains multiple processor together in a feedforward non-branching graph. The individual processors may be stateless or stateful. The last processor may be a consumer, otherwise processors must be transformers. Use CompositeProducer if you want the first processor to be a producer. Concrete subclasses must implement _initialize_processors. Optionally override _reset_state if you want adaptive state behaviour. Example implementation:

class CustomCompositeProcessor(CompositeProcessor[CustomSettings, AxisArray, AxisArray]):

@staticmethod def _initialize_processors(settings: CustomSettings) -> dict[str, BaseProcessor]:

return {

“stateful_transformer”: CustomStatefulProducer(**settings), “transformer”: CustomTransformer(**settings),

}

Where **settings should be replaced with initialisation arguments for each processor.

__init__(*args, **kwargs)[source]#
Return type:

None

stateful_op(state, message)[source]#
Return type:

tuple[dict[str, tuple[Any, int]], Optional[TypeVar(MessageOutType)]]

Parameters:
class CompositeProducer(*args, **kwargs)[source]#

Bases: BaseProducer[SettingsType, MessageOutType], CompositeStateful[SettingsType, MessageOutType], ABC, Generic[SettingsType, MessageOutType]

A producer that chains multiple processors (starting with a producer) together in a feedforward non-branching graph. The individual processors may be stateless or stateful. The first processor must be a producer, the last processor may be a consumer, otherwise processors must be transformers.

__init__(*args, **kwargs)[source]#
Return type:

None

stateful_op(state)[source]#
Return type:

tuple[dict[str, tuple[Any, int]], Optional[TypeVar(MessageOutType)]]

Parameters:

state (dict[str, tuple[Any, int]] | None)