Your First ezmsg Pipeline#
If this is your first time using ezmsg, you’re in the right place. This notebook will walk you through the basics of creating a very simple ezmsg system. You can copy the code into your own Python script or Jupyter notebook environment using the helpful copy button in the top right corner of each code block.
ezmsg is ideal for creating modular processing pipelines whose steps can be arranged as a directed acyclic graph. In this notebook, we will walk through a very simple graph which generates a count of numbers, adds 1 to each number, and prints to standard output.

In ezmsg syntax, this graph would look like this:

We will write an ezmsg Unit for each discrete step of our pipeline, and then later show you how to optionally connect them together in a Collection.
First, ensure you have ezmsg installed. Please consult Installation and Configuration for installation instructions.
Note
If you are using a Jupyter notebook, you can install ezmsg directly from the notebook using the following command:
!pip install ezmsg
Next, ensure we have all the necessary imports:
import ezmsg.core as ez
from dataclasses import dataclass
from collections.abc import AsyncGenerator
Building a basic ezmsg Unit#
A very simple ezmsg Unit will be made up of the following components:
settings provided in the form of a settings class (optional, at most one)
state provided in the form of a state class (optional, at most one)
input / output streams (required, at least one input or output)
method that does the processing you desire for the Unit (required, at least one method)
As one can see from above there is a lot of flexibility in which components are used and in what quantity. In particular, one can have multiple streams and processing methods (but not settings or state) adding more functionality to each Unit.
Our very basic counting Unit will simply produce integers starting at 0. We don’t need to worry about its state but we would like to specify at what point it stops so we’ll create a settings class that contains how many numbers to generate.
class CountSettings(ez.Settings):
iterations: int
Note
We inherited from ez.Settings which is a frozen dataclass. This means that the settings are immutable after they are set, which is good practice for configuration data.
Next, we need to create a message type that the Unit will produce. This message will be sent to the next Unit in the pipeline. In this case, we will create a simple dataclass that contains an integer value. (In such a simple case, we could simply avoid creating a new class and use a simple integer, but this is a good practice for more complex messages.)
@dataclass
class CountMessage:
value: int
Now that we have the ingredients, we will create the Unit that will generate the count. Every Unit represents a node in the directed acyclic graph and should contain inputs and/or outputs and at least one function which subscribes to the inputs or publishes to the outputs.
For Count, we create an OutputStream and a publishing function which will perform the number calculation and yield CountMessages to the OutputStream.
class Count(ez.Unit):
# Settings must be provided in this way: `SETTINGS = YourSettingsClass`
# Do not instantiate the settings class here
SETTINGS = CountSettings
OUTPUT_COUNT = ez.OutputStream(CountMessage)
@ez.publisher(OUTPUT_COUNT)
async def count(self) -> AsyncGenerator:
count = 0
while count < self.SETTINGS.iterations:
yield self.OUTPUT_COUNT, CountMessage(value=count)
count = count + 1
raise ez.NormalTermination
First, notice that we inherit from ez.Unit, which is the base class for all ezmsg Units.
Next, notice that we provide the settings class as a class attribute called SETTINGS. This is how ezmsg knows what settings type to use when instantiating the Unit. Instantiation is done at the system level as we will see in the section Creating a pipeline.
Note
The SETTINGS = CountSettings format is important. It should always be in the format SETTINGS = YourSettingsClass with the only change being to replace YourSettingsClass with the name of the class.
Then, we define an output stream called OUTPUT_COUNT which will send messages of type CountMessage. OUTPUT_COUNT is the name you provide to the outgoing stream, and CountMessage is the type of messages that will be sent on this stream. As an output stream, it will be used to connect the Unit to other Units in a System.
Note
CountMessage literally contains an integer, so in this very simple case we could have simply used OUTPUT_COUNT = ez.OutputStream(int). But this is good practice for more complex messages.
The method count() is where the actual number generation occurs. From the code, we can see that it generates integer values starting at 0 for the variable count and stops once it reaches the value of iterations within the Unit’s SETTINGS attribute.
It yields the tuple (self.OUTPUT_COUNT, CountMessage(value=count)), which gives the output stream OUTPUT_COUNT a CountMessage with the value of count. But in order for it to actually publish this value to other units we decorate the method with @ez.publisher(OUTPUT_COUNT). Finally, when the Unit completes its task, we raise ez.NormalTermination, which indicates to ezmsg that this Unit is done processing.
Note
Important final note on the count() method. It is preceded by the async keyword. This indicates that the method is a coroutine function, and can be run asynchronously. This means that if it isn’t ready to yield a value it relinquishes the control to another process. The benefit is that it does not block other processes from executing allowing more parallelised execution. This is a key feature of ezmsg and recommended when working with I/O bound tasks or long-running computations.
Building a Unit with multiple streams#
The next Unit in the chain should accept a CountMessage from the first Unit, add 1 to its value, and yield a new CountMessage. To do this, we create a new Unit which contains a function which both subscribes and publishes. Analogously to when we decorated the count() method with @ez.publisher(OUTPUT_COUNT), we will decorate this function with both @ez.subscriber(<name-of-input-stream>) to indicate that it will receive messages from the input stream, and @ez.publisher(<name-of-output-stream>) to indicate that it will send messages to the output stream.
class AddOne(ez.Unit):
INPUT_COUNT = ez.InputStream(CountMessage)
OUTPUT_PLUS_ONE = ez.OutputStream(CountMessage)
@ez.subscriber(INPUT_COUNT)
@ez.publisher(OUTPUT_PLUS_ONE)
async def on_message(self, message) -> AsyncGenerator:
yield self.OUTPUT_PLUS_ONE, CountMessage(value=message.value + 1)
The subscribing function will be called anytime the Unit receives a message to the InputStream INPUT_COUNT that the function subscribes to. It will then publish the result of adding one to the OutputStream OUTPUT_PLUS_ONE.
We will connect this Unit to Count later on, when we create the full pipeline.
Another Unit - print the result#
Finally, the last unit should print the value of any messages it receives. It will only have an input stream and no output stream, as it will not send any messages to other Units.
class PrintValue(ez.Unit):
INPUT = ez.InputStream(CountMessage)
@ez.subscriber(INPUT)
async def on_message(self, message) -> None:
print(message.value)
Combining multiple Units into a Collection#
We can optionally combine Units into a single node called a Collection.
First, define the member Units. Then use configure() to apply settings to the component Units at initialisation. Finally use network() to provide an iterable of pairs that define how the Units are connected. Remember, connections are directed (one-way), so the pairs are in the form (from-Unit, to-Unit). As an example, in the first connection pair provided below, the first element is the output stream of COUNT, and the second element is the input stream of ADD_ONE.
class CountCollection(ez.Collection):
# Define member units
COUNT = Count()
ADD_ONE = AddOne()
PRINT = PrintValue()
# Use the configure function to apply settings to member Units
def configure(self) -> None:
self.COUNT.apply_settings(CountSettings(iterations=20))
# Use the network function to connect inputs and outputs of Units
def network(self) -> ez.NetworkDefinition:
return (
(self.COUNT.OUTPUT_COUNT, self.ADD_ONE.INPUT_COUNT),
(self.ADD_ONE.OUTPUT_PLUS_ONE, self.PRINT.INPUT)
)
Note
Within the configure() method, we applied settings to the Count Unit by using an in-built method called apply_settings(). This method takes an instance of the settings class and applies it to the Unit. In this case, we set the number of iterations to 20, meaning that the Count Unit will generate numbers from 0 to 19.
Note
The configure() method is optional. If you do not need to apply settings to the member Units, you can omit it. The network() method is also optional, but it is required if you want to connect the Units together in a specific way. If you do not provide a network() method, the Units will not be connected and will not communicate with each other.
Creating a pipeline#
We combined the Units above into a Collection, but this doesn’t always make sense, so let’s step back and see how to create a pipeline using just the Units we created above. Notice, that setting up the connections is very similar to how we defined network() in the Collection above.
components = {
"COUNT": Count(settings=CountSettings(iterations=10)),
"ADD_ONE": AddOne(),
"PRINT": PrintValue()
}
connections = (
(components["COUNT"].OUTPUT_COUNT, components["ADD_ONE"].INPUT_COUNT),
(components["ADD_ONE"].OUTPUT_PLUS_ONE, components["PRINT"].INPUT)
)
ez.run(components = components, connections = connections)
Note
We instantiated the Count Unit with settings directly in the components dictionary. Another way is to use apply_settings():
"COUNT": Count().apply_settings(iterations=10)
Note
The ez.run() function is the entry point to running an ezmsg system. It takes a dictionary of components (Units or Collections) and an iterable of connections (pairs of input/output streams) and runs the system. More on this in the next tutorial section.
Quick Aside - Using the Collection to Create a Pipeline#
If using a Collection, the above code is no different. Units and Collections are both seen by ezmsg as an object called a Component, so we could have simply done:
components = {"COUNT_COLLECTION": CountCollection()}
connections = ()
ez.run(components = components, connections = connections)
Warning
DO NOT have two different ez.run() calls in the same script or notebook. Only one ez.run() call is allowed per execution context. We have given you two examples of how to build a pipeline from Units, so when running the code, comment out one of the previous two code blocks (components, connections, and ez.run() calls).
How to run the pipeline?#
All the code is there now! If you want to run the pipeline, we just need to run the script. If you’re using a Jupyter notebook, you can run the code cells directly. If you’re using a Python script, save the code in a file (e.g., pipeline.py) and run it from the command line:
python3 pipeline.py
There is more to running the pipeline than this little tutorial has shown you so far, so click Next below to check out the next section on running ezmsg for more details on how to run your ezmsg pipelines and systems.