Components - Units and Collections#
An ezmsg pipeline is created from a few basic components. ezmsg provides a framework for you to define your own graphs using its building blocks. The nodes of the graph that defines a pipeline all inherit from the base class Component. The two types of Component are Unit and Collection.
Note
It is convention to import ezmsg.core as ez and then use this shorthand in your code. e.g.,
class MyUnit(ez.Unit):
...
Component#
- class ezmsg.core.Component(*args, settings=None, **kwargs)[source]#
Metaclass which
UnitandCollectioninherit from.The Component class provides the foundation for all components in the ezmsg framework, including Units and Collections. It manages settings, state, streams, and provides the basic infrastructure for message-passing components.
- Parameters:
settings (Settings | None) – Optional settings object for component configuration
Note
When creating ezmsg nodes, inherit directly from
UnitorCollection.
Unit#
The basic nodes of an ezmsg pipeline graph are Units.
- class ezmsg.core.Unit(*args, settings=None, **kwargs)[source]#
Bases:
ComponentRepresents a single step in the graph.
Units can subscribe, publish, and have tasks. Units are the fundamental building blocks of ezmsg applications that perform actual computation and message processing. To create a Unit, inherit from the Unit class.
- Parameters:
settings (Settings | None) – Optional settings object for unit configuration
- property address: str[source]#
Get the full address of this object.
The address is constructed by joining the location path and name with forward slashes, similar to a filesystem path.
- Returns:
The full address string
- Return type:
- apply_settings(settings)[source]#
Update the Component’s Settings object.
This method applies configuration settings to the component. Settings must be applied before the component can be properly initialized and used.
- property components: dict[str, Component][source]#
Get the dictionary of child components for this component.
- async initialize()[source]#
Runs when the Unit is instantiated.
This is called from within the same process this unit will live in. This lifecycle hook can be overridden. It can be run as async functions by simply adding the async keyword when overriding.
This method is where you should initialize your unit’s state and prepare for message processing.
- Return type:
- property location: list[str][source]#
Get the location path of this addressable object.
- Returns:
List of path components representing the object’s location
- Return type:
- Raises:
AssertionError – If location has not been set
- property main: Callable[[...], None] | None[source]#
Get the main function for this component.
- Returns:
The main callable function, or None if not set
- Return type:
Callable[…, None] | None
- property name: str[source]#
Get the name of this addressable object.
- Returns:
The object’s name
- Return type:
- Raises:
AssertionError – If name has not been set
- async shutdown()[source]#
Runs when the Unit terminates.
This is called from within the same process this unit will live in. This lifecycle hook can be overridden. It can be run as async functions by simply adding the async keyword when overriding.
This method is where you should clean up resources and perform any necessary shutdown procedures.
- Return type:
Collection#
A Collection is a special type of Component that contains other Components (Units and/or other Collections).
- class ezmsg.core.Collection(*args, settings=None, **kwargs)[source]#
Bases:
ComponentConnects
Unitstogether by defining a graph which connects OutputStreams to InputStreams.Collections are composite components that contain and coordinate multiple Units, defining how they communicate through stream connections.
- Parameters:
settings (Settings | None) – Optional settings object for collection configuration
- configure()[source]#
A lifecycle hook that runs when the Collection is instantiated.
This is the best place to call
Unit.apply_settings()on each member Unit of the Collection. Override this method to perform collection-specific configuration of child components.- Return type:
- network()[source]#
Override this method and have the definition return a NetworkDefinition which defines how InputStreams and OutputStreams from member Units will be connected.
The NetworkDefinition specifies the message routing between components by connecting output streams to input streams.
- Returns:
Network definition specifying stream connections
- Return type:
NetworkDefinition
- process_components()[source]#
Override this method and have the definition return a tuple which contains Units and Collections which should run in their own processes.
This method allows you to specify which components should be isolated in separate processes for performance or isolation requirements.
- Returns:
Collection of components that should run in separate processes
- Return type:
Component Interaction#
Two fundamental attributes of a Component are its Settings and State, both of which are optional but initialised by the ezmsg backend during Unit initialisation (if present).
- class ezmsg.core.Settings[source]#
To pass parameters into a
Component, inherit fromSettings.class YourSettings(Settings): setting1: int setting2: float
To use, declare the
Settingsobject for aComponentas a member variable called (all-caps!)SETTINGS. ezmsg will monitor the variable calledSETTINGSin the background, so it is important to name it correctly.class YourUnit(Unit): SETTINGS = YourSettings
A
Unitcan accept aSettingsobject as a parameter on instantiation.class YourCollection(Collection): YOUR_UNIT = YourUnit( YourSettings( setting1: int, setting2: float ) )
Note
Settingsuses type hints to define member variables, but does not enforce type checking.
- class ezmsg.core.State[source]#
States are mutable dataclasses that are instantiated by the Unit in its home process.
To track a mutable state for a
Component, inherit fromState.class YourState(State): state1: int state2: float
To use, declare the
Stateobject for aComponentas a member variable called (all-caps!)STATE. ezmsg will monitor the variable calledSTATEin the background, so it is important to name it correctly.Member functions can then access and mutate
STATEas needed during function execution. It is recommended to initialize state values inside theinitialize()orconfigure()lifecycle hooks if defaults are not defined.class YourUnit(Unit): STATE = YourState async def initialize(self): this.STATE.state1 = 0 this.STATE.state2 = 0.0
Note
Stateuses type hints to define member variables, but does not enforce type checking.
Stream#
Facilitates a flow of Messages into or out of a Component.
- class ezmsg.core.InputStream(msg_type)[source]#
Can be added to any Component as a member variable. Methods may subscribe to it.
InputStream represents a channel that receives messages from other components. Units can subscribe to InputStreams to process incoming messages.
- Parameters:
msg_type (Any) – The type of messages this input stream will receive
- class ezmsg.core.OutputStream(msg_type, host=None, port=None, num_buffers=32, buf_size=65536, force_tcp=False)[source]#
Can be added to any Component as a member variable. Methods may publish to it.
OutputStream represents a channel that sends messages to other components. Units can publish to OutputStreams to send messages through the system.
- Parameters:
msg_type (Any) – The type of messages this output stream will send
host (str | None) – Optional host address for network publishing
port (int | None) – Optional port number for network publishing
num_buffers (int) – Number of message buffers to allocate (default: 32)
buf_size (int) – Size of each message buffer in bytes
force_tcp (bool) – Whether to force TCP transport instead of shared memory
Custom Exceptions#
These are custom exceptions defined in ezmsg.
- class ezmsg.core.Complete[source]#
A type of Exception raised by Unit methods, which signals to ezmsg that the function can be shut down gracefully.
If all functions in all Units raise Complete, the entire pipeline will terminate execution. This exception is used to signal normal completion of processing tasks.
Note
This exception indicates successful completion, not an error condition.
- class ezmsg.core.NormalTermination[source]#
A type of Exception which signals to ezmsg that the pipeline can be shut down gracefully.
This exception is used to indicate that the system should terminate normally, typically when all processing is complete or when a graceful shutdown is requested.
Note
This exception indicates normal termination, not an error condition.