ezmsg.baseproc.protocols#

Protocol definitions and type variables for ezmsg processors.

Classes

class AdaptiveTransformer(*args, **kwargs)[source]#

Bases: StatefulTransformer, Protocol

partial_fit(message)[source]#

Update transformer state using labeled training data.

This method should update the internal state/parameters of the transformer based on the provided labeled samples, without performing any transformation.

Return type:

None

Parameters:

message (SampleMessage)

async apartial_fit(message)[source]#
Return type:

None

Parameters:

message (SampleMessage)

class Consumer(*args, **kwargs)[source]#

Bases: Processor[SettingsType, MessageInType, None], Protocol

Protocol for consumers that receive messages but do not return a result.

class Processor(*args, **kwargs)[source]#

Bases: Protocol[SettingsType, MessageInType, MessageOutType]

Protocol for processors. You probably will not implement this protocol directly. Refer instead to the less ambiguous Consumer and Transformer protocols, and the base classes in this module which implement them.

Note: In Python 3.12+, we can invoke __acall__ directly using await obj(message),

but to support earlier versions we need to use await obj.__acall__(message).

__init__(*args, **kwargs)#
class Producer(*args, **kwargs)[source]#

Bases: Protocol[SettingsType, MessageOutType]

Protocol for producers that generate messages.

__init__(*args, **kwargs)#
class StatefulConsumer(*args, **kwargs)[source]#

Bases: StatefulProcessor[SettingsType, MessageInType, None, StateType], Protocol

Protocol specifically for processors that consume messages without producing output.

stateful_op(state, message)[source]#
Return type:

tuple[tuple[TypeVar(StateType), int], None]

Parameters:
  • state (tuple[StateType, int])

  • message (MessageInType)

class StatefulProcessor(*args, **kwargs)[source]#

Bases: Protocol[SettingsType, MessageInType, MessageOutType, StateType]

Base protocol for _stateful_ message processors. You probably will not implement this protocol directly. Refer instead to the less ambiguous StatefulConsumer and StatefulTransformer protocols.

property state: StateType#
stateful_op(state, message)[source]#
Return type:

tuple[Any, Any]

Parameters:
__init__(*args, **kwargs)#
class StatefulProducer(*args, **kwargs)[source]#

Bases: Protocol[SettingsType, MessageOutType, StateType]

Protocol for producers that generate messages without consuming inputs.

property state: StateType#
stateful_op(state)[source]#
Return type:

tuple[Any, Any]

Parameters:

state (Any)

__init__(*args, **kwargs)#
class StatefulTransformer(*args, **kwargs)[source]#

Bases: StatefulProcessor[SettingsType, MessageInType, MessageOutType, StateType], Protocol

Protocol specifically for processors that transform messages.

stateful_op(state, message)[source]#
Return type:

tuple[tuple[TypeVar(StateType), int], TypeVar(MessageOutType)]

Parameters:
  • state (tuple[StateType, int])

  • message (MessageInType)

class Transformer(*args, **kwargs)[source]#

Bases: Processor[SettingsType, MessageInType, MessageOutType], Protocol

Protocol for transformers that receive messages and return a result of the same class.

class Processor(*args, **kwargs)[source]#

Bases: Protocol[SettingsType, MessageInType, MessageOutType]

Protocol for processors. You probably will not implement this protocol directly. Refer instead to the less ambiguous Consumer and Transformer protocols, and the base classes in this module which implement them.

Note: In Python 3.12+, we can invoke __acall__ directly using await obj(message),

but to support earlier versions we need to use await obj.__acall__(message).

__init__(*args, **kwargs)#
class Producer(*args, **kwargs)[source]#

Bases: Protocol[SettingsType, MessageOutType]

Protocol for producers that generate messages.

__init__(*args, **kwargs)#
class Consumer(*args, **kwargs)[source]#

Bases: Processor[SettingsType, MessageInType, None], Protocol

Protocol for consumers that receive messages but do not return a result.

class Transformer(*args, **kwargs)[source]#

Bases: Processor[SettingsType, MessageInType, MessageOutType], Protocol

Protocol for transformers that receive messages and return a result of the same class.

class StatefulProcessor(*args, **kwargs)[source]#

Bases: Protocol[SettingsType, MessageInType, MessageOutType, StateType]

Base protocol for _stateful_ message processors. You probably will not implement this protocol directly. Refer instead to the less ambiguous StatefulConsumer and StatefulTransformer protocols.

property state: StateType#
stateful_op(state, message)[source]#
Return type:

tuple[Any, Any]

Parameters:
__init__(*args, **kwargs)#
class StatefulProducer(*args, **kwargs)[source]#

Bases: Protocol[SettingsType, MessageOutType, StateType]

Protocol for producers that generate messages without consuming inputs.

property state: StateType#
stateful_op(state)[source]#
Return type:

tuple[Any, Any]

Parameters:

state (Any)

__init__(*args, **kwargs)#
class StatefulConsumer(*args, **kwargs)[source]#

Bases: StatefulProcessor[SettingsType, MessageInType, None, StateType], Protocol

Protocol specifically for processors that consume messages without producing output.

stateful_op(state, message)[source]#
Return type:

tuple[tuple[TypeVar(StateType), int], None]

Parameters:
  • state (tuple[StateType, int])

  • message (MessageInType)

class StatefulTransformer(*args, **kwargs)[source]#

Bases: StatefulProcessor[SettingsType, MessageInType, MessageOutType, StateType], Protocol

Protocol specifically for processors that transform messages.

stateful_op(state, message)[source]#
Return type:

tuple[tuple[TypeVar(StateType), int], TypeVar(MessageOutType)]

Parameters:
  • state (tuple[StateType, int])

  • message (MessageInType)

class AdaptiveTransformer(*args, **kwargs)[source]#

Bases: StatefulTransformer, Protocol

partial_fit(message)[source]#

Update transformer state using labeled training data.

This method should update the internal state/parameters of the transformer based on the provided labeled samples, without performing any transformation.

Return type:

None

Parameters:

message (SampleMessage)

async apartial_fit(message)[source]#
Return type:

None

Parameters:

message (SampleMessage)