ezmsg.baseproc.stateful#

Stateful processor base classes for ezmsg.

Classes

class BaseAdaptiveTransformer(*args, **kwargs)[source]#

Bases: BaseStatefulTransformer[SettingsType, MessageInType | SampleMessage, MessageOutType | None, StateType], ABC, Generic[SettingsType, MessageInType, MessageOutType, StateType]

abstractmethod partial_fit(message)[source]#
Return type:

None

Parameters:

message (SampleMessage)

async apartial_fit(message)[source]#

Override me if you need async partial fitting.

Return type:

None

Parameters:

message (SampleMessage)
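
The adaptive pattern can be illustrated with a minimal, library-independent sketch. Everything here is hypothetical: `ToySample`, `ToyAdaptive`, and `RunningOffset` are stand-ins, not ezmsg classes. The default `apartial_fit` delegating to `partial_fit` is an assumption suggested by the "override me" note, and the dispatch in `__call__` is inferred from the `MessageInType | SampleMessage` and `MessageOutType | None` unions in the base class signature.

```python
import asyncio
from abc import ABC, abstractmethod
from dataclasses import dataclass


@dataclass
class ToySample:
    """Hypothetical stand-in for SampleMessage: an input plus its target."""
    value: float
    label: float


class ToyAdaptive(ABC):
    """Hypothetical sketch of the adaptive pattern; not the ezmsg class."""

    @abstractmethod
    def partial_fit(self, message: ToySample) -> None:
        ...

    async def apartial_fit(self, message: ToySample) -> None:
        # Assumed default: fall back to the sync method unless overridden.
        self.partial_fit(message)


class RunningOffset(ToyAdaptive):
    """Learns the mean (label - value) offset and adds it to plain inputs."""

    def __init__(self) -> None:
        self.n = 0
        self.offset = 0.0

    def partial_fit(self, message: ToySample) -> None:
        # Incremental mean update of the observed offsets.
        self.n += 1
        self.offset += ((message.label - message.value) - self.offset) / self.n

    def __call__(self, message):
        # Assumed dispatch: training samples update the model and yield None;
        # ordinary inputs are transformed with the current model.
        if isinstance(message, ToySample):
            self.partial_fit(message)
            return None
        return message + self.offset


model = RunningOffset()
first = model(ToySample(value=1.0, label=3.0))  # fit through the sync path
asyncio.run(model.apartial_fit(ToySample(value=2.0, label=6.0)))  # async path
prediction = model(1.0)  # transformed with the learned offset
```

The point of the split is that a subclass only has to supply `partial_fit`; the async variant exists for cases where fitting itself needs to await something.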

class BaseAsyncTransformer(*args, **kwargs)[source]#

Bases: BaseStatefulTransformer[SettingsType, MessageInType, MessageOutType, StateType], ABC, Generic[SettingsType, MessageInType, MessageOutType, StateType]

This class reverses the priority of the async and sync methods relative to BaseStatefulTransformer. In BaseStatefulTransformer, the async methods simply call the sync methods; here, the sync methods call the async methods, which is closer to how BaseStatefulProducer works.
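
The difference in priority can be sketched with two toy classes. This is an illustration only: `SyncFirst`, `AsyncFirst`, `process`, and `aprocess` are hypothetical names, and `asyncio.run` is used here as the simplest way to drive a coroutine from sync code, which may differ from what the library does internally.

```python
import asyncio


class SyncFirst:
    """The sync method holds the logic; the async method just calls it."""

    def process(self, x: int) -> int:
        return x + 1

    async def aprocess(self, x: int) -> int:
        return self.process(x)


class AsyncFirst:
    """The async method holds the logic; the sync method drives it."""

    async def aprocess(self, x: int) -> int:
        await asyncio.sleep(0)  # placeholder for real awaitable work
        return x + 1

    def process(self, x: int) -> int:
        # Assumption: no event loop is already running in this thread,
        # since asyncio.run raises RuntimeError otherwise.
        return asyncio.run(self.aprocess(x))
```

In the async-first arrangement, subclasses write their logic once as a coroutine and still get a usable sync entry point.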

class BaseStatefulConsumer(*args, **kwargs)[source]#

Bases: BaseStatefulProcessor[SettingsType, MessageInType, None, StateType], ABC, Generic[SettingsType, MessageInType, StateType]

Base class for stateful message consumers that don’t produce output. This class merely overrides the type annotations of BaseStatefulProcessor.

classmethod get_message_type(dir)[source]#
Return type:

type[TypeVar(MessageInType)] | None

Parameters:

dir (str)

stateful_op(state, message)[source]#
Return type:

tuple[tuple[TypeVar(StateType), int], None]

Parameters:
  • state (tuple[StateType, int] | None)

  • message (MessageInType)
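
The shape of a consumer's `stateful_op` can be shown with a standalone function. This is a sketch of the documented signature only: a `(state, hash)` tuple (or `None`) goes in with a message, and an updated `(state, hash)` tuple comes out alongside `None` as the result. The dict-based state and the fixed hash of `0` are assumptions made for illustration.

```python
from typing import Optional, Tuple


def toy_consumer_op(
    state: Optional[Tuple[dict, int]], message: str
) -> Tuple[Tuple[dict, int], None]:
    """Hypothetical consumer step: record the message, return no output."""
    if state is None:
        # No prior state: start fresh (hash 0 is an arbitrary choice here).
        current, hash_ = {"count": 0, "last": None}, 0
    else:
        current, hash_ = state
    updated = {"count": current["count"] + 1, "last": message}
    return (updated, hash_), None


# Thread the state through successive calls.
state, out = toy_consumer_op(None, "a")
state, out = toy_consumer_op(state, "b")
```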

class BaseStatefulProcessor(*args, **kwargs)[source]#

Bases: BaseProcessor[SettingsType, MessageInType, MessageOutType], Stateful[StateType], ABC, Generic[SettingsType, MessageInType, MessageOutType, StateType]

Base class implementing common stateful processor functionality. You probably do not want to inherit from this class directly. Refer instead to the more specific base classes. Use BaseStatefulConsumer for operations that do not return a result, or BaseStatefulTransformer for operations that do return a result.

__init__(*args, **kwargs)[source]#
Return type:

None

stateful_op(state, message)[source]#
Return type:

tuple[tuple[TypeVar(StateType), int], Any]
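
The integer paired with the state in `stateful_op` is a hash. One plausible use, sketched below under stated assumptions, is to rebuild the state whenever the hash of an incoming message differs from the stored one. `ToyStatefulProcessor` and its method names are hypothetical and only mimic the pattern; the hashing rule (message length) is an arbitrary choice for the example.

```python
class ToyStatefulProcessor:
    """Hypothetical sketch: reset state when the message hash changes."""

    def __init__(self) -> None:
        self._hash = -1  # sentinel: state not yet initialized
        self.state = None
        self.resets = 0

    def _hash_message(self, message: list) -> int:
        # Assumption: the state layout depends only on the message length.
        return len(message)

    def _reset_state(self, message: list) -> None:
        self.state = [0.0] * len(message)
        self.resets += 1

    def __call__(self, message: list) -> list:
        msg_hash = self._hash_message(message)
        if msg_hash != self._hash:
            # Incompatible (or first) message: rebuild the state.
            self._reset_state(message)
            self._hash = msg_hash
        # Accumulate a per-element running sum.
        self.state = [s + m for s, m in zip(self.state, message)]
        return list(self.state)

    def stateful_op(self, state, message):
        # Install externally supplied (state, hash), process, hand both back.
        if state is not None:
            self.state, self._hash = state
        result = self(message)
        return (self.state, self._hash), result
```

Passing the hash along with the state lets a caller restore both together, so a restored processor does not reset on the next compatible message.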

class BaseStatefulProducer(*args, **kwargs)[source]#

Bases: BaseProducer[SettingsType, MessageOutType], Stateful[StateType], ABC, Generic[SettingsType, MessageOutType, StateType]

Base class implementing common stateful producer functionality.

Examples of stateful producers include those that need counters or clocks, or that cycle through a set of values.

Unlike BaseStatefulProcessor, this class does not do message hashing because there are no input messages. We still use self._hash simply to track the transition from initialization (.hash == -1) to state reset (.hash == 0).

__init__(*args, **kwargs)[source]#
Return type:

None

stateful_op(state)[source]#
Return type:

tuple[tuple[TypeVar(StateType), int], TypeVar(MessageOutType)]

Parameters:

state (tuple[StateType, int] | None)
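
The initialization-to-reset transition described for producers can be sketched with a toy counter. `ToyCounterProducer` and its attribute names other than `_hash` are hypothetical; the sketch only assumes that the first call sees `_hash == -1`, resets the state, and sets `_hash` to `0`.

```python
class ToyCounterProducer:
    """Hypothetical producer sketch: emits consecutive integers."""

    def __init__(self, start: int = 0) -> None:
        self.start = start
        self._hash = -1  # -1 marks "initialized, state not yet reset"
        self.count = None

    def _reset_state(self) -> None:
        self.count = self.start

    def __call__(self) -> int:
        if self._hash == -1:
            # First call: build the state once, then mark it as reset.
            self._reset_state()
            self._hash = 0
        value = self.count
        self.count += 1
        return value

    def stateful_op(self, state):
        # No message argument: a producer has no input to process.
        if state is not None:
            self.count, self._hash = state
        result = self()
        return (self.count, self._hash), result
```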

class BaseStatefulTransformer(*args, **kwargs)[source]#

Bases: BaseStatefulProcessor[SettingsType, MessageInType, MessageOutType, StateType], ABC, Generic[SettingsType, MessageInType, MessageOutType, StateType]

Base class for stateful message transformers that produce output. This class merely overrides the type annotations of BaseStatefulProcessor.

stateful_op(state, message)[source]#
Return type:

tuple[tuple[TypeVar(StateType), int], TypeVar(MessageOutType)]

Parameters:
  • state (tuple[StateType, int] | None)

  • message (MessageInType)
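
One way to read the `stateful_op` signature is as a means of carrying state across instances: the `(state, hash)` tuple returned by one call can be fed to another object, which then continues the stream. The sketch below is a hypothetical illustration (`ToyDiff` is not an ezmsg class) of a transformer that emits the difference between consecutive inputs.

```python
class ToyDiff:
    """Hypothetical transformer sketch: output = input - previous input."""

    def __init__(self) -> None:
        self._hash = -1  # state not yet initialized
        self.prev = 0.0

    def __call__(self, message: float) -> float:
        if self._hash == -1:
            # The first message seeds the state, so its difference is zero.
            self.prev = message
            self._hash = 0
        out = message - self.prev
        self.prev = message
        return out

    def stateful_op(self, state, message):
        # Install the supplied (state, hash) pair before processing.
        if state is not None:
            self.prev, self._hash = state
        result = self(message)
        return (self.prev, self._hash), result


a = ToyDiff()
state, y1 = a.stateful_op(None, 1.0)   # first sample seeds the state

b = ToyDiff()                          # a fresh instance...
state, y2 = b.stateful_op(state, 4.0)  # ...continues from a's state
```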

class Stateful[source]#

Bases: ABC, Generic[StateType]

Mixin class for stateful processors. DO NOT use this class directly. It is used to enforce that the processor or producer has a state attribute and a stateful_op method.

classmethod get_state_type()[source]#
Return type:

type[TypeVar(StateType)]

property state: StateType#

abstractmethod stateful_op(*args, **kwargs)[source]#
Return type:

tuple
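
How a mixin like this can enforce its contract is sketched below with the standard `abc` and `typing` modules. `ToyStateful`, `Counter`, and `Incomplete` are hypothetical names, and the way `get_state_type` recovers the type argument here (by inspecting `__orig_bases__`) is an assumption about one possible approach, not a description of the library's code.

```python
from abc import ABC, abstractmethod
from typing import Generic, TypeVar, get_args

StateT = TypeVar("StateT")


class ToyStateful(ABC, Generic[StateT]):
    """Hypothetical mixin sketch: requires a state and a stateful_op."""

    @classmethod
    def get_state_type(cls) -> type:
        # Look for a parameterized ToyStateful[...] among the declared bases.
        for base in getattr(cls, "__orig_bases__", ()):
            if getattr(base, "__origin__", None) is ToyStateful:
                return get_args(base)[0]
        raise TypeError(f"{cls.__name__} does not parameterize ToyStateful")

    @property
    def state(self) -> StateT:
        return self._state

    @abstractmethod
    def stateful_op(self, *args, **kwargs) -> tuple:
        ...


class Counter(ToyStateful[int]):
    """Concrete subclass: implements stateful_op, so it can be created."""

    def __init__(self) -> None:
        self._state = 0

    def stateful_op(self, state=None) -> tuple:
        if state is not None:
            self._state = state
        self._state += 1
        return self._state, self._state


class Incomplete(ToyStateful[int]):
    """Omits stateful_op, so instantiation raises TypeError."""
```

Because `stateful_op` is abstract, the error surfaces at instantiation time rather than on the first call.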
