Utility Units and Classes#
Utility ezmsg Units and associated classes for ezmsg pipelines. Currently includes the following Units:
DebugLog: An ezmsg Unit for logging message debug information. Convenient way to print message contents without modifying other units.
TerminateOnTimeout: An ezmsg Unit that terminates the pipeline after a specified timeout period.
TerminateOnTotal: An ezmsg Unit that terminates the pipeline after a specified total number of messages have been processed.
MessageGate: An ezmsg Unit that blocks messages based on a condition: open, open after N messages, closed, or closed after N messages.
MessageLogger: An ezmsg Unit that logs all messages passing through it to a file.
MessageQueue: An ezmsg Unit that is placed between two other Units to induce backpressure.
MessageCollector: An ezmsg Unit that collects messages into a local list.
MessageReplay: An ezmsg Unit that streams messages from MessageLogger-created files.
and utility classes for encoding/decoding messages:
MessageDecoder: Utility class that decodes ezmsg messages from their JSON representation.
MessageEncoder: Utility class that encodes ezmsg messages to their JSON representation.
DebugLog#
- class ezmsg.util.debuglog.DebugLog(*args, settings=None, **kwargs)[source]#
Bases: Unit
Logs messages that pass through.
- OUTPUT = OutputStream:unlocated[typing.Any](self.num_buffers=32, self.force_tcp=False)[source]#
Send messages back out to continue through the graph.
- SETTINGS[source]#
alias of DebugLogSettings
- class ezmsg.util.debuglog.DebugLogSettings(name='DEBUG', max_length=400)[source]#
Bases: Settings
Settings class associated with DebugLog.
- Parameters:
name (str) – Useful name for the logger. The name is included in the log string so that, if multiple DebugLogs are used in one pipeline, their messages can be differentiated.
max_length (int | None) – Maximum number of characters of the message that will be printed. If the message is longer, the log message will be truncated.
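The effect of name and max_length can be sketched in plain Python. This is only an illustration of the documented behavior, not the DebugLog implementation: the helper name format_debug_line, the "name: text" layout, and the "..." truncation marker are all assumptions made for this example.

```python
from typing import Any, Optional


def format_debug_line(name: str, msg: Any, max_length: Optional[int] = 400) -> str:
    """Build a log line tagged with the logger name (hypothetical helper)."""
    text = str(msg)
    if max_length is not None and len(text) > max_length:
        # Keep only the first max_length characters and mark the cut.
        # The "..." marker is an assumption of this sketch.
        text = text[:max_length] + "..."
    return f"{name}: {text}"


short_line = format_debug_line("DEBUG", {"a": 1})
long_line = format_debug_line("RAW", "x" * 1000, max_length=10)
```

Giving each DebugLog a distinct name is what lets the two lines above be told apart when they end up in the same log.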
Terminate#
- class ezmsg.util.terminate.TerminateOnTimeout(*args, settings=None, **kwargs)[source]#
Bases: Unit
End a pipeline execution when a certain amount of time has passed without receiving a message.
- SETTINGS[source]#
alias of TerminateOnTimeoutSettings
- STATE[source]#
alias of TerminateOnTimeoutState
- class ezmsg.util.terminate.TerminateOnTimeoutSettings(time=2.0, poll_rate=4.0)[source]#
Bases: Settings
Settings for TerminateOnTimeout Unit.
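The idle-timeout idea can be modeled without ezmsg. The sketch below assumes that time is the allowed idle period in seconds and that poll_rate is the number of checks per second; the settings above do not document these meanings, so treat both as assumptions. IdleWatch and run_watch are hypothetical names, and the clock and sleep functions are injectable so the logic can be exercised without waiting.

```python
import time
from typing import Callable


class IdleWatch:
    """Tracks the time since the last message (hypothetical helper)."""

    def __init__(self, timeout: float = 2.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.timeout = timeout
        self.clock = clock
        self.last_seen = clock()

    def on_message(self) -> None:
        # Any incoming message resets the idle timer.
        self.last_seen = self.clock()

    def expired(self) -> bool:
        # True once more than `timeout` seconds have passed without a message.
        return self.clock() - self.last_seen > self.timeout


def run_watch(watch: IdleWatch, poll_rate: float = 4.0,
              sleep: Callable[[float], None] = time.sleep) -> None:
    """Check `poll_rate` times per second and return once the watch expires."""
    while not watch.expired():
        sleep(1.0 / poll_rate)
```

In a real pipeline the return from run_watch would be the point at which termination is requested; how TerminateOnTimeout does that is not shown here.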
- class ezmsg.util.terminate.TerminateOnTotal(*args, settings=None, **kwargs)[source]#
Bases: Unit
End a pipeline execution once a certain number of messages have been received.
- INPUT_TOTAL = InputStream:unlocated[<class 'int'>]()[source]#
Change the total number of messages to terminate after. If this number has already been reached, termination will occur immediately.
- SETTINGS[source]#
alias of TerminateOnTotalSettings
- STATE[source]#
alias of TerminateOnTotalState
- async initialize()[source]#
Runs when the Unit is instantiated.
This is called from within the same process this unit will live in. This lifecycle hook can be overridden. It can be run as an async function by simply adding the async keyword when overriding.
This method is where you should initialize your unit’s state and prepare for message processing.
- class ezmsg.util.terminate.TerminateOnTotalSettings(total=None)[source]#
Bases: Settings
Settings for TerminateOnTotal Unit.
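The interaction between the message count and INPUT_TOTAL can be sketched as a small counter. This is a plain-Python model, not the unit itself: TotalCounter is a hypothetical name, and treating total=None as "no target yet, never terminate" is an assumption.

```python
from typing import Optional


class TotalCounter:
    """Counts messages and reports when the target total is reached."""

    def __init__(self, total: Optional[int] = None) -> None:
        self.total = total
        self.count = 0

    def _done(self) -> bool:
        # Assumption: with no total configured, never request termination.
        return self.total is not None and self.count >= self.total

    def on_message(self) -> bool:
        """Count one message; return True when the pipeline should stop."""
        self.count += 1
        return self._done()

    def on_total(self, total: int) -> bool:
        """Change the target; return True if it has already been reached."""
        self.total = total
        return self._done()
```

The on_total path mirrors the documented INPUT_TOTAL behavior: if the new total is at or below the number of messages already seen, termination is requested immediately.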
MessageReplay#
- class ezmsg.util.messagereplay.FileReplayMessage(filename=None, rate=None)[source]#
Bases: object
Add a file to the queue.
- class ezmsg.util.messagereplay.MessageCollector(*args, settings=None, **kwargs)[source]#
Bases: Unit
Collects Messages into a local list.
- OUTPUT_MESSAGE = OutputStream:unlocated[typing.Any](self.num_buffers=32, self.force_tcp=False)[source]#
Messages will pass straight through after being recorded and be published here.
- STATE[source]#
alias of MessageCollectorState
- class ezmsg.util.messagereplay.MessageReplay(*args, settings=None, **kwargs)[source]#
Bases: Unit
Stream messages from files created by MessageLogger. Stores a queue of files to stream and streams from them in order.
- INPUT_FILE = InputStream:unlocated[<class 'ezmsg.util.messagereplay.FileReplayMessage'>]()[source]#
Add a new file to the queue.
- INPUT_PAUSED = InputStream:unlocated[<class 'bool'>]()[source]#
Send True to pause the stream, False to restart the stream.
- INPUT_STOP = InputStream:unlocated[<class 'bool'>]()[source]#
Stop the stream. Send True to also clear the queue. Send False to reset to the beginning of the current file.
- OUTPUT_MESSAGE = OutputStream:unlocated[typing.Any](self.num_buffers=32, self.force_tcp=False)[source]#
The output on which the messages from the files will be streamed.
- OUTPUT_REPLAY_STATUS = OutputStream:unlocated[<class 'ezmsg.util.messagereplay.ReplayStatusMessage'>](self.num_buffers=32, self.force_tcp=False)[source]#
Publishes status messages.
- OUTPUT_TOTAL = OutputStream:unlocated[<class 'int'>](self.num_buffers=32, self.force_tcp=False)[source]#
Publishes an integer total of messages which have been published on OUTPUT_MESSAGE from a single file. Resets when a file completes.
- SETTINGS[source]#
alias of MessageReplaySettings
- STATE[source]#
alias of MessageReplayState
- async initialize()[source]#
Runs when the Unit is instantiated.
This is called from within the same process this unit will live in. This lifecycle hook can be overridden. It can be run as an async function by simply adding the async keyword when overriding.
This method is where you should initialize your unit’s state and prepare for message processing.
- class ezmsg.util.messagereplay.MessageReplaySettings(filename=None, rate=None, progress=False)[source]#
Bases: Settings, FileReplayMessage
Settings for MessageReplay Unit.
- Parameters:
progress (bool) – If True, use tqdm to indicate progress through the file. tqdm must be installed.
MessageGate#
- class ezmsg.util.messagegate.GateMessage(open)[source]#
Bases: object
Send this message to INPUT_GATE to open or close the gate.
- Parameters:
open (bool) – True to open the gate (allow messages), False to close it (discard messages)
- class ezmsg.util.messagegate.MessageGate(*args, settings=None, **kwargs)[source]#
Bases: Unit
Blocks Messages from continuing through the system. Can be set as open, closed, open after n messages, or closed after n messages.
- INPUT = InputStream:unlocated[typing.Any]()[source]#
Messages which will flow through or be discarded, depending on gate status.
- INPUT_GATE = InputStream:unlocated[<class 'ezmsg.util.messagegate.GateMessage'>]()[source]#
Stop or start message flow. If GateMessage.open == True, messages will flow through. If GateMessage.open == False, messages will be discarded.
- OUTPUT = OutputStream:unlocated[typing.Any](self.num_buffers=32, self.force_tcp=False)[source]#
Publishes messages which flow through.
- SETTINGS[source]#
alias of MessageGateSettings
- STATE[source]#
alias of MessageGateState
- async initialize()[source]#
Runs when the Unit is instantiated.
This is called from within the same process this unit will live in. This lifecycle hook can be overridden. It can be run as an async function by simply adding the async keyword when overriding.
This method is where you should initialize your unit’s state and prepare for message processing.
- class ezmsg.util.messagegate.MessageGateSettings(start_open=False, default_open=False, default_after=None)[source]#
Bases: Settings
Settings for MessageGate unit.
- Parameters:
start_open (bool) – Sets the gate’s initial state to allow messages to flow through or be discarded. True will allow messages to flow through initially, False will discard messages initially.
default_open (bool) – Sets the gate’s behavior after the default_after number of messages have flowed through. True will allow messages to flow through, False will discard messages.
default_after (int | None) – Sets the number of messages after which the default_open state will be applied.
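How the three settings combine into "open after N" and "closed after N" behavior can be sketched with a small state machine. This is a model for illustration only: the class name Gate is hypothetical, and the sketch assumes that every received message counts toward default_after, whether it was passed or discarded, which the parameter descriptions above do not spell out.

```python
from typing import Optional


class Gate:
    """Plain-Python model of the gate settings (not the ezmsg unit)."""

    def __init__(self, start_open: bool = False, default_open: bool = False,
                 default_after: Optional[int] = None) -> None:
        self.is_open = start_open
        self.default_open = default_open
        self.default_after = default_after
        self.seen = 0

    def set_open(self, open_: bool) -> None:
        """Stands in for sending GateMessage(open=...) to INPUT_GATE."""
        self.is_open = open_

    def allow(self) -> bool:
        """Return True if the current message passes, then update the count."""
        passed = self.is_open
        self.seen += 1
        # Assumption: all received messages count toward default_after.
        if self.default_after is not None and self.seen == self.default_after:
            self.is_open = self.default_open
        return passed


closed_after_3 = Gate(start_open=True, default_open=False, default_after=3)
open_after_2 = Gate(start_open=False, default_open=True, default_after=2)
closed_results = [closed_after_3.allow() for _ in range(5)]
open_results = [open_after_2.allow() for _ in range(4)]
```

With default_after left as None, the gate simply stays in its start_open state until a gate message changes it.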
MessageLogger#
- class ezmsg.util.messagelogger.MessageLogger(*args, settings=None, **kwargs)[source]#
Bases: Unit
Logs all messages it receives to a file. File path can be set in SETTINGS or set dynamically by passing a pathlib.Path to INPUT_START.
- INPUT_MESSAGE = InputStream:unlocated[typing.Any]()[source]#
Pass a piece of data to log it to every open file which the MessageLogger is using.
- INPUT_START = InputStream:unlocated[<class 'pathlib.Path'>]()[source]#
Pass a pathlib.Path to begin logging messages to that path. If the file path already exists, the existing file will be truncated to 0 length. If the file is already open, nothing will happen.
- INPUT_STOP = InputStream:unlocated[<class 'pathlib.Path'>]()[source]#
Pass a pathlib.Path to stop logging messages to that path.
- OUTPUT_MESSAGE = OutputStream:unlocated[typing.Any](self.num_buffers=32, self.force_tcp=False)[source]#
Messages which are sent to INPUT_MESSAGE will pass through and be published on OUTPUT_MESSAGE.
- OUTPUT_START = OutputStream:unlocated[<class 'pathlib.Path'>](self.num_buffers=32, self.force_tcp=False)[source]#
If a file passed to INPUT_START is successfully opened, its path will be published to OUTPUT_START, otherwise None.
- OUTPUT_STOP = OutputStream:unlocated[<class 'pathlib.Path'>](self.num_buffers=32, self.force_tcp=False)[source]#
If a file passed to INPUT_STOP is successfully closed, its path will be published to OUTPUT_STOP, otherwise None.
- SETTINGS[source]#
alias of MessageLoggerSettings
- STATE[source]#
alias of MessageLoggerState
- close_file(filepath)[source]#
Close a file that was being used for message logging.
- Parameters:
filepath (Path) – Path to the file to close
- Returns:
File path if file successfully closed, otherwise None
- Return type:
Path | None
- async initialize()[source]#
Note that files defined at startup are not published to outputs.
- class ezmsg.util.messagelogger.MessageLoggerSettings(output=None, write_period=0.0)[source]#
Bases: Settings
Settings for MessageLogger Unit.
- Parameters:
output (Path | None) – pathlib.Path for a file where the messages will be logged. If the file path already exists, the existing file will be truncated to 0 length.
write_period (float) – Period in seconds for performing a write to disk. If <=0, write incoming messages to disk immediately, applying backpressure as needed. If >0, messages are buffered and periodically written to disk in a separate task.
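The two write modes selected by write_period, and the truncation of an existing file, can be illustrated with a stand-alone sketch. LineLogger is a hypothetical class, and the one-JSON-document-per-line file layout is an assumption of this example rather than a statement about the MessageLogger file format. The periodic task is also left out: in buffered mode the caller invokes flush() by hand.

```python
import json
import tempfile
from pathlib import Path
from typing import Any, List


class LineLogger:
    """Hypothetical logger illustrating immediate and buffered writes."""

    def __init__(self, path: Path, write_period: float = 0.0) -> None:
        self.write_period = write_period
        self.buffer: List[str] = []
        # Mode "w" truncates an existing file to zero length.
        self.file = open(path, "w")

    def log(self, msg: Any) -> None:
        line = json.dumps(msg)
        if self.write_period <= 0:
            # Immediate mode: write and flush before returning.
            self.file.write(line + "\n")
            self.file.flush()
        else:
            # Buffered mode: a periodic task would call flush() later.
            self.buffer.append(line)

    def flush(self) -> None:
        if self.buffer:
            self.file.write("\n".join(self.buffer) + "\n")
            self.buffer.clear()
        self.file.flush()

    def close(self) -> None:
        self.flush()
        self.file.close()


with tempfile.TemporaryDirectory() as tmp:
    path = Path(tmp) / "log.txt"
    path.write_text("stale\n")          # pre-existing content is discarded
    logger = LineLogger(path, write_period=0.5)
    logger.log({"n": 1})
    before_flush = path.read_text()     # read while the message is still buffered
    logger.flush()
    after_flush = path.read_text()
    logger.close()
```

The trade-off is the usual one: immediate writes slow the publisher down to disk speed, while buffered writes keep the pipeline moving at the cost of holding recent messages in memory until the next flush.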
MessageQueue#
- class ezmsg.util.messagequeue.MessageQueue(*args, settings=None, **kwargs)[source]#
Bases: Unit
Place between two other Units to induce backpressure.
- OUTPUT = OutputStream:unlocated[typing.Any](self.num_buffers=32, self.force_tcp=False)[source]#
Subscribe to pull messages out of the queue.
- SETTINGS[source]#
alias of MessageQueueSettings
- STATE[source]#
alias of MessageQueueState
- async initialize()[source]#
Runs when the Unit is instantiated.
This is called from within the same process this unit will live in. This lifecycle hook can be overridden. It can be run as an async function by simply adding the async keyword when overriding.
This method is where you should initialize your unit’s state and prepare for message processing.
- class ezmsg.util.messagequeue.MessageQueueSettings(maxsize=0, leaky=False, log_above_n=None, output_hz=None)[source]#
Bases: Settings
Settings for MessageQueue class.
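Backpressure from a bounded queue can be demonstrated with asyncio.Queue alone. The sketch below is not the MessageQueue implementation; it only shows the general mechanism that maxsize suggests: a producer that awaits put() is suspended while the queue is full. The leaky_put helper is hypothetical, and its policy of dropping the incoming item when the queue is full is an assumption, since the meaning of leaky is not documented above.

```python
import asyncio
from typing import List


async def backpressure_demo() -> List[str]:
    events: List[str] = []
    queue: asyncio.Queue = asyncio.Queue(maxsize=2)

    async def producer() -> None:
        for i in range(4):
            await queue.put(i)          # suspends while the queue is full
            events.append(f"put {i}")

    async def consumer() -> None:
        for _ in range(4):
            await asyncio.sleep(0.01)   # a deliberately slow subscriber
            item = await queue.get()
            events.append(f"got {item}")

    await asyncio.gather(producer(), consumer())
    return events


def leaky_put(queue: asyncio.Queue, item: int) -> bool:
    """Assumed leaky policy: drop the incoming item when the queue is full."""
    try:
        queue.put_nowait(item)
        return True
    except asyncio.QueueFull:
        return False


async def leaky_demo() -> List[bool]:
    queue: asyncio.Queue = asyncio.Queue(maxsize=2)
    return [leaky_put(queue, i) for i in range(3)]


events = asyncio.run(backpressure_demo())
accepted = asyncio.run(leaky_demo())
```

In the first demo the third put cannot complete until the consumer has taken an item, which is the backpressure effect. In the second, the third item is rejected instead of making the producer wait.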
MessageCodec#
Message encoding and decoding utilities for ezmsg logging.
This module provides JSON serialization support for complex objects including:
Dataclass objects with type preservation
NumPy arrays with efficient binary encoding
Arbitrary objects via pickle fallback
The MessageEncoder and MessageDecoder classes handle automatic conversion between Python objects and JSON representations suitable for file logging.
- class ezmsg.util.messagecodec.MessageDecoder(*args, **kwargs)[source]#
Bases: JSONDecoder
JSON decoder for ezmsg messages.
Automatically reconstructs dataclasses, numpy arrays, and pickled objects from their JSON representations using the _object_hook function.
- class ezmsg.util.messagecodec.MessageEncoder(*, skipkeys=False, ensure_ascii=True, check_circular=True, allow_nan=True, sort_keys=False, indent=None, separators=None, default=None)[source]#
Bases: JSONEncoder
JSON encoder for ezmsg messages with support for dataclasses, numpy arrays, and arbitrary objects.
This encoder extends the standard JSON encoder to handle:
Dataclass objects (serialized as dictionaries with type information)
NumPy arrays (serialized as base64-encoded data with metadata)
Other objects via pickle (as fallback)
- default(o)[source]#
Implement this method in a subclass such that it returns a serializable object for o, or calls the base implementation (to raise a TypeError).
For example, to support arbitrary iterators, you could implement default like this:
    def default(self, o):
        try:
            iterable = iter(o)
        except TypeError:
            pass
        else:
            return list(iterable)
        # Let the base class default method raise the TypeError
        return super().default(o)
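The general pattern described for MessageEncoder, a default() override that tags dataclasses with their type and falls back to pickle for everything else, can be sketched with the standard library only. This is not the ezmsg codec: SketchEncoder, sketch_object_hook, the "_type" and "_pickle" keys, and the "fields" layout are all invented for this example, NumPy handling is left out, and dataclass reconstruction on decode is omitted.

```python
import base64
import dataclasses
import json
import pickle
from typing import Any, Dict

TYPE_KEY = "_type"      # hypothetical marker keys, chosen for this sketch
PICKLE_KEY = "_pickle"


class SketchEncoder(json.JSONEncoder):
    def default(self, o: Any) -> Any:
        if dataclasses.is_dataclass(o) and not isinstance(o, type):
            cls = type(o)
            # Shallow field dict; nested values are encoded recursively.
            fields = {f.name: getattr(o, f.name) for f in dataclasses.fields(o)}
            return {TYPE_KEY: f"{cls.__module__}:{cls.__qualname__}", "fields": fields}
        # Fallback: pickle anything else and embed it as base64 text.
        return {PICKLE_KEY: base64.b64encode(pickle.dumps(o)).decode("ascii")}


def sketch_object_hook(d: Dict[str, Any]) -> Any:
    if PICKLE_KEY in d:
        return pickle.loads(base64.b64decode(d[PICKLE_KEY]))
    return d  # dataclass reconstruction is omitted in this sketch


@dataclasses.dataclass
class Sample:
    label: str
    tags: set


encoded = json.dumps(Sample("a", {1, 2}), cls=SketchEncoder)
decoded = json.loads(encoded, object_hook=sketch_object_hook)
```

Because the fallback relies on pickle, decoding a file means unpickling its contents, so files from untrusted sources should not be decoded this way.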
- class ezmsg.util.messagecodec.StampedMessage(msg: Any, timestamp: float | None)[source]#
Bases: NamedTuple
A message with an associated timestamp.
- Parameters:
msg (Any)
timestamp (float | None)
- ezmsg.util.messagecodec.import_type(typestr)[source]#
Import a type from a string representation.
- Parameters:
typestr (str) – String representation in format ‘module:qualname’
- Returns:
The imported type
- Raises:
ImportError – If typestr does not resolve to a valid type
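Resolving a 'module:qualname' string can be sketched with importlib. The function below, import_type_sketch, is a hypothetical stand-in written for this example; it follows the documented contract (return the type, raise ImportError otherwise) but is not the ezmsg source, and the exact error messages are made up.

```python
import importlib


def import_type_sketch(typestr: str) -> type:
    """Resolve 'module:qualname' to a type, raising ImportError otherwise."""
    module_name, sep, qualname = typestr.partition(":")
    if not sep or not module_name or not qualname:
        raise ImportError(f"expected 'module:qualname', got {typestr!r}")
    obj = importlib.import_module(module_name)  # may itself raise ImportError
    try:
        for part in qualname.split("."):
            obj = getattr(obj, part)            # walk nested qualnames
    except AttributeError as exc:
        raise ImportError(f"{typestr!r} not found") from exc
    if not isinstance(obj, type):
        raise ImportError(f"{typestr!r} does not resolve to a type")
    return obj


ordered = import_type_sketch("collections:OrderedDict")
```

Splitting on the colon keeps the module path and the qualified name unambiguous, which matters for nested classes whose qualname itself contains dots.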