Components - Units and Collections#

An ezmsg pipeline is created from a few basic components. ezmsg provides a framework for you to define your own graphs using its building blocks. The nodes of the graph that defines a pipeline all inherit from the base class Component. The two types of Component are Unit and Collection.

Note

By convention, ezmsg.core is imported as ez and this shorthand is used throughout your code, e.g.:

import ezmsg.core as ez

class MyUnit(ez.Unit):
   ...

Component#

class ezmsg.core.Component(*args, settings=None, **kwargs)[source]#

Base class from which Unit and Collection inherit.

The Component class provides the foundation for all components in the ezmsg framework, including Units and Collections. It manages settings, state, streams, and provides the basic infrastructure for message-passing components.

Parameters:

settings (Settings | None) – Optional settings object for component configuration

Note

When creating ezmsg nodes, inherit directly from Unit or Collection.

Unit#

The basic nodes of an ezmsg pipeline graph are Units.

class ezmsg.core.Unit(*args, settings=None, **kwargs)[source]#

Bases: Component

Represents a single step in the graph.

Units can subscribe, publish, and have tasks. Units are the fundamental building blocks of ezmsg applications that perform actual computation and message processing. To create a Unit, inherit from the Unit class.

Parameters:

settings (Settings | None) – Optional settings object for unit configuration

property address: str[source]#

Get the full address of this object.

The address is constructed by joining the location path and name with forward slashes, similar to a filesystem path.

Returns:

The full address string

Return type:

str
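The joining rule described for address can be illustrated in plain Python. This is a minimal sketch of the described behaviour, not ezmsg's implementation; the helper build_address and the example names are hypothetical.

```python
def build_address(location: list[str], name: str) -> str:
    """Join a location path and a name with forward slashes."""
    return "/".join([*location, name])


# Hypothetical unit named "COUNTER" nested inside a collection named "SYSTEM".
print(build_address(["SYSTEM"], "COUNTER"))  # prints SYSTEM/COUNTER
```

An object with an empty location path would simply have its name as its address under this rule.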

apply_settings(settings)[source]#

Update the Component’s Settings object.

This method applies configuration settings to the component. Settings must be applied before the component can be properly initialized and used.

Parameters:

settings (Settings) – An instance of the class-specific Settings

Return type:

None

property components: dict[str, Component][source]#

Get the dictionary of child components for this component.

Returns:

Dictionary mapping component names to their Component objects

Return type:

dict[str, Component]

async initialize()[source]#

Runs when the Unit is instantiated.

This is called from within the same process this unit will live in. This lifecycle hook can be overridden; to run it as an async function, add the async keyword when overriding.

This method is where you should initialize your unit’s state and prepare for message processing.

Return type:

None

property location: list[str][source]#

Get the location path of this addressable object.

Returns:

List of path components representing the object’s location

Return type:

list[str]

Raises:

AssertionError – If location has not been set

property main: Callable[[...], None] | None[source]#

Get the main function for this component.

Returns:

The main callable function, or None if not set

Return type:

Callable[…, None] | None

property name: str[source]#

Get the name of this addressable object.

Returns:

The object’s name

Return type:

str

Raises:

AssertionError – If name has not been set

async setup()[source]#

This is called from within the same process this unit will live in.

Return type:

None

async shutdown()[source]#

Runs when the Unit terminates.

This is called from within the same process this unit will live in. This lifecycle hook can be overridden; to run it as an async function, add the async keyword when overriding.

This method is where you should clean up resources and perform any necessary shutdown procedures.

Return type:

None
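The relationship between the initialize() and shutdown() hooks can be pictured with a small asyncio sketch. It assumes only what is described above (initialize runs first, shutdown runs at termination); LifecycleSketch and run_once are hypothetical stand-ins and do not use or reproduce the ezmsg backend.

```python
import asyncio


class LifecycleSketch:
    """Stand-in for a unit; it does not inherit from ezmsg's Unit."""

    def __init__(self) -> None:
        self.events: list[str] = []

    async def initialize(self) -> None:
        # Prepare state before any processing happens.
        self.events.append("initialize")

    async def work(self) -> None:
        # Placeholder for the unit's message processing.
        self.events.append("work")

    async def shutdown(self) -> None:
        # Release resources when the unit terminates.
        self.events.append("shutdown")


async def run_once(unit: LifecycleSketch) -> list[str]:
    """Hypothetical runner: initialize, do the work, always shut down."""
    await unit.initialize()
    try:
        await unit.work()
    finally:
        # Cleanup runs even if work() raises.
        await unit.shutdown()
    return unit.events


events = asyncio.run(run_once(LifecycleSketch()))
print(events)
```

The try/finally is a design choice of this sketch: it guarantees the cleanup hook runs whether the work finishes normally or fails.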

property streams: dict[str, Stream][source]#

Get the dictionary of streams for this component.

Returns:

Dictionary mapping stream names to their Stream objects

Return type:

dict[str, Stream]

property tasks: dict[str, Callable][source]#

Get the dictionary of tasks for this component.

Returns:

Dictionary mapping task names to their callable functions

Return type:

dict[str, Callable]

property threads: dict[str, Callable][source]#

Get the dictionary of thread functions for this component.

Returns:

Dictionary mapping thread names to their callable functions

Return type:

dict[str, Callable]

Collection#

A Collection is a special type of Component that contains other Components (Units and/or other Collections).

ezmsg.core.NetworkDefinition[source]#

alias of Iterable[tuple[Stream | str, Stream | str]]

class ezmsg.core.Collection(*args, settings=None, **kwargs)[source]#

Bases: Component

Connects Units together by defining a graph which connects OutputStreams to InputStreams.

Collections are composite components that contain and coordinate multiple Units, defining how they communicate through stream connections.

Parameters:

settings (Settings | None) – Optional settings object for collection configuration

configure()[source]#

A lifecycle hook that runs when the Collection is instantiated.

This is the best place to call Unit.apply_settings() on each member Unit of the Collection. Override this method to perform collection-specific configuration of child components.

Return type:

None

network()[source]#

Override this method to return a NetworkDefinition that defines how InputStreams and OutputStreams from member Units will be connected.

The NetworkDefinition specifies the message routing between components by connecting output streams to input streams.

Returns:

Network definition specifying stream connections

Return type:

NetworkDefinition
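Because a NetworkDefinition is an iterable of two-element tuples, its routing can be explored with ordinary Python. The sketch below uses only address strings and assumes each pair reads (output stream, input stream); the alias AddressPairs, the helper subscribers_of, and all addresses are hypothetical.

```python
from typing import Iterable

# Simplified alias: the real NetworkDefinition also accepts Stream objects.
AddressPairs = Iterable[tuple[str, str]]


def subscribers_of(network: AddressPairs, output: str) -> list[str]:
    """Return every input address that the given output address feeds."""
    return [dst for src, dst in network if src == output]


# Hypothetical addresses; each pair reads (output stream, input stream).
network = (
    ("SYSTEM/COUNTER/OUTPUT", "SYSTEM/PRINTER/INPUT"),
    ("SYSTEM/COUNTER/OUTPUT", "SYSTEM/LOGGER/INPUT"),
)

print(subscribers_of(network, "SYSTEM/COUNTER/OUTPUT"))
```

Listing the same output in more than one pair, as here, expresses a fan-out from one publisher to several subscribers.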

process_components()[source]#

Override this method to return a tuple containing the Units and Collections that should run in their own processes.

This method lets you specify which components should run in separate processes, for example to improve performance or to isolate them from the rest of the pipeline.

Returns:

Collection of components that should run in separate processes

Return type:

Collection[Component]

Component Interaction#

Two fundamental attributes of a Component are its Settings and State. Both are optional; if present, the ezmsg backend initialises them during Unit initialisation.

class ezmsg.core.Settings[source]#

To pass parameters into a Component, inherit from Settings.

class YourSettings(Settings):
   setting1: int
   setting2: float

To use, declare the Settings object for a Component as a member variable called (all-caps!) SETTINGS. ezmsg will monitor the variable called SETTINGS in the background, so it is important to name it correctly.

class YourUnit(Unit):

   SETTINGS = YourSettings

A Unit can accept a Settings object as a parameter on instantiation.

class YourCollection(Collection):

   YOUR_UNIT = YourUnit(
      YourSettings(
         setting1=1,
         setting2=2.0
      )
   )

Note

Settings uses type hints to define member variables, but does not enforce type checking.
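The lack of runtime type checking can be demonstrated with a standard-library dataclass. This sketch assumes Settings behaves like an ordinary dataclass in this respect; SettingsSketch is a hypothetical stand-in and does not inherit from ezmsg's Settings.

```python
from dataclasses import dataclass


@dataclass
class SettingsSketch:
    setting1: int
    setting2: float


# The annotation says int, but nothing rejects a string at runtime.
wrong = SettingsSketch(setting1="not an int", setting2=2.0)
print(type(wrong.setting1).__name__)  # prints str
```

If a value must have a particular type, validate it yourself or rely on a static type checker.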

class ezmsg.core.State[source]#

States are mutable dataclasses that are instantiated by the Unit in its home process.

To track a mutable state for a Component, inherit from State.

class YourState(State):
   state1: int
   state2: float

To use, declare the State object for a Component as a member variable called (all-caps!) STATE. ezmsg will monitor the variable called STATE in the background, so it is important to name it correctly.

Member functions can then access and mutate STATE as needed during function execution. It is recommended to initialize state values inside the initialize() or configure() lifecycle hooks if defaults are not defined.

class YourUnit(Unit):

   STATE = YourState

   async def initialize(self):
      self.STATE.state1 = 0
      self.STATE.state2 = 0.0

Note

State uses type hints to define member variables, but does not enforce type checking.

Stream#

Facilitates a flow of Messages into or out of a Component.

class ezmsg.core.InputStream(msg_type)[source]#

Can be added to any Component as a member variable. Methods may subscribe to it.

InputStream represents a channel that receives messages from other components. Units can subscribe to InputStreams to process incoming messages.

Parameters:

msg_type (Any) – The type of messages this input stream will receive

class ezmsg.core.OutputStream(msg_type, host=None, port=None, num_buffers=32, buf_size=65536, force_tcp=False)[source]#

Can be added to any Component as a member variable. Methods may publish to it.

OutputStream represents a channel that sends messages to other components. Units can publish to OutputStreams to send messages through the system.

Parameters:
  • msg_type (Any) – The type of messages this output stream will send

  • host (str | None) – Optional host address for network publishing

  • port (int | None) – Optional port number for network publishing

  • num_buffers (int) – Number of message buffers to allocate (default: 32)

  • buf_size (int) – Size of each message buffer in bytes

  • force_tcp (bool) – Whether to force TCP transport instead of shared memory
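Taken at face value, the defaults in the signature give a rough idea of how much buffer space one OutputStream requests. The arithmetic below assumes every buffer is allocated up front at its full size and ignores any bookkeeping overhead; it is an estimate, not a statement about ezmsg internals.

```python
num_buffers = 32   # default from the OutputStream signature
buf_size = 65536   # default from the OutputStream signature, in bytes

total_bytes = num_buffers * buf_size
print(total_bytes)                  # 2097152
print(total_bytes / (1024 * 1024))  # 2.0 (MiB)
```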

Custom Exceptions#

These are custom exceptions defined in ezmsg.

class ezmsg.core.Complete[source]#

A type of Exception raised by Unit methods, which signals to ezmsg that the function can be shut down gracefully.

If all functions in all Units raise Complete, the entire pipeline will terminate execution. This exception is used to signal normal completion of processing tasks.

Note

This exception indicates successful completion, not an error condition.
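The idea of using an exception as a completion signal rather than an error can be sketched with asyncio alone. CompleteSketch, count_to, and run_all are hypothetical names; the runner below is a simplified illustration and not how ezmsg schedules or terminates tasks.

```python
import asyncio


class CompleteSketch(Exception):
    """Hypothetical stand-in for a completion signal; not ezmsg's class."""


async def count_to(limit: int, out: list[int]) -> None:
    for i in range(limit):
        out.append(i)
    # Nothing left to do: signal completion rather than reporting an error.
    raise CompleteSketch


async def run_all(limits: list[int]) -> list[int]:
    """Run every task and finish once all have signalled completion."""
    out: list[int] = []
    results = await asyncio.gather(
        *(count_to(n, out) for n in limits), return_exceptions=True
    )
    # Anything other than the completion signal is a genuine failure.
    for result in results:
        if not isinstance(result, CompleteSketch):
            raise RuntimeError(f"unexpected result: {result!r}")
    return out


print(asyncio.run(run_all([2, 3])))
```

The runner only returns once every task has raised the signal, which mirrors the statement above that the pipeline terminates when all functions in all Units raise Complete.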

class ezmsg.core.NormalTermination[source]#

A type of Exception which signals to ezmsg that the pipeline can be shut down gracefully.

This exception is used to indicate that the system should terminate normally, typically when all processing is complete or when a graceful shutdown is requested.

Note

This exception indicates normal termination, not an error condition.