Utility Units and Classes#

Utility ezmsg Units and associated classes for ezmsg pipelines. Currently includes the following Units:

  • DebugLog: An ezmsg Unit for logging message debug information. It is a convenient way to print message contents without modifying other units.

  • TerminateOnTimeout: An ezmsg Unit that terminates the pipeline when no message has been received for a specified timeout period.

  • TerminateOnTotal: An ezmsg Unit that terminates the pipeline after a specified total number of messages have been processed.

  • MessageGate: An ezmsg Unit that blocks messages based on a condition: open, open after N messages, closed, or closed after N messages.

  • MessageLogger: An ezmsg Unit that logs all messages passing through it to a file.

  • MessageQueue: An ezmsg Unit that is placed between two other Units to induce backpressure.

  • MessageCollector: An ezmsg Unit that collects messages into a local list.

  • MessageReplay: An ezmsg Unit that streams messages from files created by MessageLogger.

and utility classes for encoding/decoding messages:

  • MessageDecoder: Utility class that decodes ezmsg messages from their JSON representation.

  • MessageEncoder: Utility class that encodes ezmsg messages to their JSON representation.

DebugLog#

class ezmsg.util.debuglog.DebugLog(*args, settings=None, **kwargs)[source]#

Bases: Unit

Logs messages that pass through.

INPUT = InputStream:unlocated[typing.Any]()[source]#

Send messages to log here.

OUTPUT = OutputStream:unlocated[typing.Any](self.num_buffers=32, self.force_tcp=False)[source]#

Send messages back out to continue through the graph.

SETTINGS[source]#

alias of DebugLogSettings

class ezmsg.util.debuglog.DebugLogSettings(name='DEBUG', max_length=400)[source]#

Bases: Settings

Settings class associated with DebugLog

Parameters:
  • name (str) – Useful name for the logger. The name is included in the logstring so that if multiple DebugLogs are used in one pipeline, their messages can be differentiated.

  • max_length (int | None) – Sets a maximum number of chars which will be printed from the message. If the message is longer, the log message will be truncated.
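
The effect of name and max_length on a log line can be sketched with the standard library alone. The helper format_debug and the "name: text" layout below are hypothetical, chosen only for illustration; the string that DebugLog actually emits may look different, and treating max_length=None as "no truncation" is an assumption.

```python
from typing import Any, Optional


def format_debug(name: str, msg: Any, max_length: Optional[int] = 400) -> str:
    """Build a log string tagged with `name`, truncating long messages.

    Hypothetical helper: it mirrors the documented settings, not DebugLog's code.
    """
    text = str(msg)
    if max_length is not None and len(text) > max_length:
        # Keep only the first max_length characters of the message.
        text = text[:max_length]
    return f"{name}: {text}"


short_line = format_debug("DEBUG", "hello")
long_line = format_debug("RAW", "x" * 1000, max_length=10)
unlimited = format_debug("ALL", "y" * 50, max_length=None)
```

Giving each logger a distinct name, as in "DEBUG" and "RAW" above, is what lets lines from several loggers in one pipeline be told apart.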

Terminate#

class ezmsg.util.terminate.TerminateOnTimeout(*args, settings=None, **kwargs)[source]#

Bases: Unit

End a pipeline execution when a certain amount of time has passed without receiving a message.

INPUT = InputStream:unlocated[typing.Any]()[source]#

Send messages here.

SETTINGS[source]#

alias of TerminateOnTimeoutSettings

STATE[source]#

alias of TerminateOnTimeoutState

class ezmsg.util.terminate.TerminateOnTimeoutSettings(time=2.0, poll_rate=4.0)[source]#

Bases: Settings

Settings for TerminateOnTimeout Unit.

Parameters:
  • time (float) – Terminate if no message has been received in this time (sec)

  • poll_rate (float) – Rate (Hz) at which the timeout condition is polled.
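
One way to picture how time and poll_rate interact is a polling loop that wakes poll_rate times per second and checks how long the input has been idle. This is a minimal sketch under that assumption, using only asyncio; wait_for_timeout is a hypothetical name and is not taken from the TerminateOnTimeout source.

```python
import asyncio
import time


async def wait_for_timeout(last_msg_time: float, timeout: float, poll_rate: float) -> float:
    """Poll at `poll_rate` Hz until `timeout` seconds have passed since
    `last_msg_time` (a time.monotonic() value). Returns the idle time.

    In this demo no further message arrives, so last_msg_time never changes.
    """
    while True:
        idle = time.monotonic() - last_msg_time
        if idle >= timeout:
            return idle  # a real unit would end the pipeline here
        await asyncio.sleep(1.0 / poll_rate)


idle = asyncio.run(wait_for_timeout(time.monotonic(), timeout=0.05, poll_rate=100.0))
```

Under this reading, a higher poll_rate detects the timeout sooner after it elapses, at the cost of more frequent wake-ups.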

class ezmsg.util.terminate.TerminateOnTimeoutState[source]#

Bases: State

class ezmsg.util.terminate.TerminateOnTotal(*args, settings=None, **kwargs)[source]#

Bases: Unit

End a pipeline execution once a certain number of messages have been received.

INPUT_MESSAGE = InputStream:unlocated[typing.Any]()[source]#

Send messages here.

INPUT_TOTAL = InputStream:unlocated[<class 'int'>]()[source]#

Change the total number of messages to terminate after. If this number has already been reached, termination will occur immediately.

SETTINGS[source]#

alias of TerminateOnTotalSettings

STATE[source]#

alias of TerminateOnTotalState

async initialize()[source]#

Runs when the Unit is instantiated.

This is called from within the same process this unit will live in. This lifecycle hook can be overridden. It can be run as an async function by simply adding the async keyword when overriding.

This method is where you should initialize your unit’s state and prepare for message processing.

Return type:

None

maybe_terminate()[source]#

Check if termination conditions are met and raise NormalTermination if so.

class ezmsg.util.terminate.TerminateOnTotalSettings(total=None)[source]#

Bases: Settings

Settings for TerminateOnTotal Unit.

Parameters:

total (int | None) – The total number of messages to terminate after.
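
The counting behaviour described for INPUT_MESSAGE, INPUT_TOTAL, and maybe_terminate can be sketched without ezmsg. TotalCounter and NormalTerminationSketch are hypothetical stand-ins, and treating total=None as "never terminate" is an assumption rather than documented behaviour.

```python
from typing import Optional


class NormalTerminationSketch(Exception):
    """Stand-in for the NormalTermination exception named in the docs."""


class TotalCounter:
    """Counts messages and signals termination once `total` is reached."""

    def __init__(self, total: Optional[int] = None) -> None:
        self.total = total
        self.count = 0

    def maybe_terminate(self) -> None:
        # Assumption: with no total configured, the counter never terminates.
        if self.total is not None and self.count >= self.total:
            raise NormalTerminationSketch

    def on_message(self) -> None:
        self.count += 1
        self.maybe_terminate()

    def on_total(self, total: int) -> None:
        # A new total that has already been reached terminates immediately.
        self.total = total
        self.maybe_terminate()


counter = TotalCounter()
for _ in range(3):
    counter.on_message()  # no total set yet, so nothing happens

try:
    counter.on_total(2)  # 3 messages already seen, 2 requested
    terminated = False
except NormalTerminationSketch:
    terminated = True
```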

class ezmsg.util.terminate.TerminateOnTotalState[source]#

Bases: State

MessageReplay#

class ezmsg.util.messagereplay.FileReplayMessage(filename=None, rate=None)[source]#

Bases: object

Add a file to the queue.

Parameters:
  • filename (Path | None) – The path of the file to replay.

  • rate (float | None) – Rate in Hz at which the messages will be published. 0 = real time (if the file contains timestamps). If not specified, messages are published as fast as possible.
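
The three cases for rate can be summarised as a delay computed before each published message. The function replay_delay below is a hypothetical illustration of that rule, not the code MessageReplay runs; falling back to zero delay when timestamps are missing is an assumption.

```python
from typing import Optional


def replay_delay(rate: Optional[float], prev_ts: Optional[float], ts: Optional[float]) -> float:
    """Seconds to wait before publishing the next message."""
    if rate is None:
        return 0.0  # not specified: publish as fast as possible
    if rate == 0:
        # Real time: reproduce the recorded gap between consecutive messages.
        if prev_ts is None or ts is None:
            return 0.0  # assumption: no timestamps means no delay
        return max(0.0, ts - prev_ts)
    return 1.0 / rate  # fixed rate in Hz


fast = replay_delay(None, 1.0, 2.0)
realtime = replay_delay(0, 1.0, 1.5)
fixed = replay_delay(4.0, None, None)
```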

class ezmsg.util.messagereplay.MessageCollector(*args, settings=None, **kwargs)[source]#

Bases: Unit

Collects Messages into a local list.

INPUT_MESSAGE = InputStream:unlocated[typing.Any]()[source]#

Send messages here to be collected.

OUTPUT_MESSAGE = OutputStream:unlocated[typing.Any](self.num_buffers=32, self.force_tcp=False)[source]#

Messages will pass straight through after being recorded and be published here.

STATE[source]#

alias of MessageCollectorState

property messages: list[Any][source]#

Access the list of messages.

Returns:

A list of messages which have been collected.

class ezmsg.util.messagereplay.MessageCollectorState[source]#

Bases: State

class ezmsg.util.messagereplay.MessageReplay(*args, settings=None, **kwargs)[source]#

Bases: Unit

Stream messages from files created by MessageLogger. Stores a queue of files to stream and streams from them in order.

INPUT_FILE = InputStream:unlocated[<class 'ezmsg.util.messagereplay.FileReplayMessage'>]()[source]#

Add a new file to the queue.

INPUT_PAUSED = InputStream:unlocated[<class 'bool'>]()[source]#

Send True to pause the stream, False to restart the stream.

INPUT_STOP = InputStream:unlocated[<class 'bool'>]()[source]#

Stop the stream. Send True to also clear the queue. Send False to reset to the beginning of the current file.

OUTPUT_MESSAGE = OutputStream:unlocated[typing.Any](self.num_buffers=32, self.force_tcp=False)[source]#

The output on which the messages from the files will be streamed.

OUTPUT_REPLAY_STATUS = OutputStream:unlocated[<class 'ezmsg.util.messagereplay.ReplayStatusMessage'>](self.num_buffers=32, self.force_tcp=False)[source]#

Publishes status messages.

OUTPUT_TOTAL = OutputStream:unlocated[<class 'int'>](self.num_buffers=32, self.force_tcp=False)[source]#

Publishes an integer total of messages which have been published on OUTPUT_MESSAGE from a single file. Resets when a file completes.

SETTINGS[source]#

alias of MessageReplaySettings

STATE[source]#

alias of MessageReplayState

async initialize()[source]#

Runs when the Unit is instantiated.

This is called from within the same process this unit will live in. This lifecycle hook can be overridden. It can be run as an async function by simply adding the async keyword when overriding.

This method is where you should initialize your unit’s state and prepare for message processing.

Return type:

None

class ezmsg.util.messagereplay.MessageReplaySettings(filename=None, rate=None, progress=False)[source]#

Bases: Settings, FileReplayMessage

Settings for MessageReplay Unit.

Parameters:

progress (bool) – If True, tqdm is used to indicate progress through the file. tqdm must be installed.

class ezmsg.util.messagereplay.MessageReplayState[source]#

Bases: State

class ezmsg.util.messagereplay.ReplayStatusMessage(filename, idx, total, done=False)[source]#

Bases: object

Message which gives the status of a file replay.

Parameters:
  • filename (Path) – The name of the file currently being replayed.

  • idx (int) – The line number of the message that was just published.

  • total (int) – Number of messages in the file.

  • done (bool) – Whether the file has finished replaying.

MessageGate#

class ezmsg.util.messagegate.GateMessage(open)[source]#

Bases: object

Send this message to INPUT_GATE to open or close the gate.

Parameters:

open (bool) – True to open the gate (allow messages), False to close it (discard messages)

class ezmsg.util.messagegate.MessageGate(*args, settings=None, **kwargs)[source]#

Bases: Unit

Blocks Messages from continuing through the system. Can be set as open, closed, open after n messages, or closed after n messages.

INPUT = InputStream:unlocated[typing.Any]()[source]#

Messages which will flow through or be discarded, depending on gate status.

INPUT_GATE = InputStream:unlocated[<class 'ezmsg.util.messagegate.GateMessage'>]()[source]#

Stop or start message flow. If GateMessage.open == True, messages will flow through. If GateMessage.open == False, messages will be discarded.

OUTPUT = OutputStream:unlocated[typing.Any](self.num_buffers=32, self.force_tcp=False)[source]#

Publishes messages which flow through.

SETTINGS[source]#

alias of MessageGateSettings

STATE[source]#

alias of MessageGateState

async initialize()[source]#

Runs when the Unit is instantiated.

This is called from within the same process this unit will live in. This lifecycle hook can be overridden. It can be run as an async function by simply adding the async keyword when overriding.

This method is where you should initialize your unit’s state and prepare for message processing.

Return type:

None

set_gate(set_open)[source]#

Set the gate open/closed state and reset message counter.

Parameters:

set_open (bool) – True to open gate, False to close it

Return type:

None

class ezmsg.util.messagegate.MessageGateSettings(start_open=False, default_open=False, default_after=None)[source]#

Bases: Settings

Settings for MessageGate unit.

Parameters:
  • start_open (bool) – sets the gate’s initial state to allow messages to flow through or be discarded. True will allow messages to flow through initially, False will discard messages initially.

  • default_open (bool) – sets the gate’s behavior after the default_after number of messages have flowed through. True will allow messages to flow through, False will discard messages.

  • default_after (int | None) – sets the number of messages after which the default_open state will be applied.
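
How start_open, default_open, and default_after combine can be sketched as a small state machine. GateSketch is a hypothetical class written for this illustration. It assumes that every received message is counted whether or not it passes, and that reaching default_after applies default_open and resets the counter, as set_gate is documented to do.

```python
from typing import Any, List, Optional


class GateSketch:
    """Standalone model of the documented gate settings (not the ezmsg Unit)."""

    def __init__(self, start_open: bool = False, default_open: bool = False,
                 default_after: Optional[int] = None) -> None:
        self.default_open = default_open
        self.default_after = default_after
        self.set_gate(start_open)

    def set_gate(self, set_open: bool) -> None:
        # Setting the gate also resets the message counter.
        self.gate_open = set_open
        self.msgs = 0

    def on_message(self, msg: Any) -> List[Any]:
        out = [msg] if self.gate_open else []  # pass through or discard
        self.msgs += 1
        if self.default_after is not None and self.msgs >= self.default_after:
            self.set_gate(self.default_open)
        return out


# Open for the first two messages, then closed.
gate = GateSketch(start_open=True, default_open=False, default_after=2)
passed = [m for i in range(5) for m in gate.on_message(i)]

# Reopening the gate (the role GateMessage plays on INPUT_GATE).
gate.set_gate(True)
reopened = gate.on_message("again")
```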

class ezmsg.util.messagegate.MessageGateState[source]#

Bases: State

MessageLogger#

class ezmsg.util.messagelogger.MessageLogger(*args, settings=None, **kwargs)[source]#

Bases: Unit

Logs all messages it receives to a file. File path can be set in SETTINGS or set dynamically by passing a pathlib.Path to INPUT_START.

INPUT_MESSAGE = InputStream:unlocated[typing.Any]()[source]#

Pass a piece of data to log it to every open file which the MessageLogger is using.

INPUT_START = InputStream:unlocated[<class 'pathlib.Path'>]()[source]#

Pass a pathlib.Path to begin logging messages to that path. If the file path already exists, the existing file will be truncated to 0 length. If the file is already open, nothing will happen.

INPUT_STOP = InputStream:unlocated[<class 'pathlib.Path'>]()[source]#

Pass a pathlib.Path to stop logging messages to that path.

OUTPUT_MESSAGE = OutputStream:unlocated[typing.Any](self.num_buffers=32, self.force_tcp=False)[source]#

Messages which are sent to INPUT_MESSAGE will pass through and be published on OUTPUT_MESSAGE.

OUTPUT_START = OutputStream:unlocated[<class 'pathlib.Path'>](self.num_buffers=32, self.force_tcp=False)[source]#

If a file passed to INPUT_START is successfully opened, its path will be published to OUTPUT_START, otherwise None.

OUTPUT_STOP = OutputStream:unlocated[<class 'pathlib.Path'>](self.num_buffers=32, self.force_tcp=False)[source]#

If a file passed to INPUT_STOP is successfully closed, its path will be published to OUTPUT_STOP, otherwise None.

SETTINGS[source]#

alias of MessageLoggerSettings

STATE[source]#

alias of MessageLoggerState

close_file(filepath)[source]#

Close a file that was being used for message logging.

Parameters:

filepath (Path) – Path to the file to close

Returns:

File path if file successfully closed, otherwise None

Return type:

Path | None

async initialize()[source]#

Note that files defined at startup are not published to outputs.

Return type:

None

open_file(filepath)[source]#

Open a file for message logging.

Parameters:

filepath (Path) – Path to the file to open

Returns:

File path if file successfully opened, otherwise None

Return type:

Path | None

async shutdown()[source]#

Note that files that are closed at shutdown don’t publish messages.

Return type:

None

class ezmsg.util.messagelogger.MessageLoggerSettings(output=None, write_period=0.0)[source]#

Bases: Settings

Settings for MessageLogger Unit.

Parameters:
  • output (Path | None) – pathlib.Path for a file where the messages will be logged. If the file path already exists, the existing file will be truncated to 0 length.

  • write_period (float) – Period in seconds for performing a write to disk. If <=0, write incoming messages to disk immediately, applying backpressure as needed. If >0, messages are buffered and periodically written to disk in a separate task.

class ezmsg.util.messagelogger.MessageLoggerState[source]#

Bases: State

ezmsg.util.messagelogger.log_object(obj)[source]#

Convert an object to a JSON string with timestamp for logging.

Parameters:

obj (Any) – Object to convert to log string

Returns:

JSON string containing timestamp and object

Return type:

str
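
The idea of a timestamped JSON log string can be sketched with the standard json module. The function name log_object_sketch and the keys "ts" and "obj" are assumptions made for this example; the real log_object also has to handle non-JSON types, which this sketch does not attempt.

```python
import json
import time
from typing import Any


def log_object_sketch(obj: Any) -> str:
    """Serialize `obj` together with the current time as one JSON string.

    The keys "ts" and "obj" are assumed for illustration only.
    """
    return json.dumps({"ts": time.time(), "obj": obj})


line = log_object_sketch({"channel": 3, "value": 0.5})
entry = json.loads(line)
```

Because json.dumps emits no newlines by default, each such string can be written as a single line of a log file.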

MessageQueue#

class ezmsg.util.messagequeue.MessageQueue(*args, settings=None, **kwargs)[source]#

Bases: Unit

Place between two other Units to induce backpressure.

INPUT = InputStream:unlocated[typing.Any]()[source]#

Send messages to queue here.

OUTPUT = OutputStream:unlocated[typing.Any](self.num_buffers=32, self.force_tcp=False)[source]#

Subscribe to pull messages out of the queue.

SETTINGS[source]#

alias of MessageQueueSettings

STATE[source]#

alias of MessageQueueState

async initialize()[source]#

Runs when the Unit is instantiated.

This is called from within the same process this unit will live in. This lifecycle hook can be overridden. It can be run as an async function by simply adding the async keyword when overriding.

This method is where you should initialize your unit’s state and prepare for message processing.

class ezmsg.util.messagequeue.MessageQueueSettings(maxsize=0, leaky=False, log_above_n=None, output_hz=None)[source]#

Bases: Settings

Settings for MessageQueue class.

Parameters:
  • maxsize (int) – The maximum number of items which the queue will hold.

  • leaky (bool) – Whether the queue will drop new messages when it reaches its maxsize, or whether it will wait for space to open for them.
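
The difference between a leaky and a non-leaky queue can be shown with asyncio.Queue from the standard library. The helper enqueue is hypothetical and only models the two documented behaviours: dropping a new message when the queue is full, or waiting for space.

```python
import asyncio
from typing import Any, List


async def enqueue(queue: asyncio.Queue, msg: Any, leaky: bool) -> bool:
    """Put `msg` on the queue; return False if it was dropped."""
    if leaky:
        try:
            queue.put_nowait(msg)
        except asyncio.QueueFull:
            return False  # queue is full: the new message is dropped
        return True
    await queue.put(msg)  # waits until space opens (backpressure)
    return True


async def demo() -> List[bool]:
    queue: asyncio.Queue = asyncio.Queue(maxsize=2)
    # Nothing consumes from the queue, so it fills after two messages.
    return [await enqueue(queue, i, leaky=True) for i in range(4)]


accepted = asyncio.run(demo())
```

With leaky=False the same demo would wait forever on the third message, which is the backpressure a non-leaky queue applies to its upstream Unit.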

class ezmsg.util.messagequeue.MessageQueueState[source]#

Bases: State

MessageCodec#

Message encoding and decoding utilities for ezmsg logging.

This module provides JSON serialization support for complex objects including:

  • Dataclass objects with type preservation

  • NumPy arrays with efficient binary encoding

  • Arbitrary objects via pickle fallback

The MessageEncoder and MessageDecoder classes handle automatic conversion between Python objects and JSON representations suitable for file logging.

class ezmsg.util.messagecodec.LogStart[source]#

Bases: object

class ezmsg.util.messagecodec.MessageDecoder(*args, **kwargs)[source]#

Bases: JSONDecoder

JSON decoder for ezmsg messages.

Automatically reconstructs dataclasses, numpy arrays, and pickled objects from their JSON representations using the _object_hook function.

class ezmsg.util.messagecodec.MessageEncoder(*, skipkeys=False, ensure_ascii=True, check_circular=True, allow_nan=True, sort_keys=False, indent=None, separators=None, default=None)[source]#

Bases: JSONEncoder

JSON encoder for ezmsg messages with support for dataclasses, numpy arrays, and arbitrary objects.

This encoder extends the standard JSON encoder to handle:

  • Dataclass objects (serialized as dictionaries with type information)

  • NumPy arrays (serialized as base64-encoded data with metadata)

  • Other objects via pickle (as fallback)
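
As a rough illustration of the first case, a JSONEncoder subclass can serialize a dataclass as a dictionary that carries type information. SketchEncoder, the Sample dataclass, and the "_type" and "fields" keys are all hypothetical; the layout MessageEncoder actually writes is not shown in this reference and may differ.

```python
import dataclasses
import json
from typing import Any


class SketchEncoder(json.JSONEncoder):
    """Encodes dataclass instances as dicts tagged with their type."""

    def default(self, o: Any) -> Any:
        if dataclasses.is_dataclass(o) and not isinstance(o, type):
            cls = type(o)
            return {
                # Hypothetical key names, in 'module:qualname' form.
                "_type": f"{cls.__module__}:{cls.__qualname__}",
                "fields": {f.name: getattr(o, f.name) for f in dataclasses.fields(o)},
            }
        # Anything else is left to the base class, which raises TypeError.
        return super().default(o)


@dataclasses.dataclass
class Sample:
    channel: int
    value: float


encoded = json.dumps(Sample(channel=1, value=2.5), cls=SketchEncoder)
decoded = json.loads(encoded)
```

Storing the type string next to the field values is what would allow a decoder to rebuild the original dataclass instead of a plain dictionary.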

default(o)[source]#

Implement this method in a subclass such that it returns a serializable object for o, or calls the base implementation (to raise a TypeError).

For example, to support arbitrary iterators, you could implement default like this:

def default(self, o):
    try:
        iterable = iter(o)
    except TypeError:
        pass
    else:
        return list(iterable)
    # Let the base class default method raise the TypeError
    return super().default(o)

class ezmsg.util.messagecodec.StampedMessage(msg: Any, timestamp: float | None)[source]#

Bases: NamedTuple

A message with an associated timestamp.

Parameters:
  • msg (Any) – The message object

  • timestamp (float | None) – Optional timestamp for the message

msg: Any[source]#

Alias for field number 0

timestamp: float | None[source]#

Alias for field number 1

ezmsg.util.messagecodec.import_type(typestr)[source]#

Import a type from a string representation.

Parameters:

typestr (str) – String representation in format ‘module:qualname’

Returns:

The imported type

Return type:

type

Raises:

ImportError – If typestr does not resolve to a valid type

ezmsg.util.messagecodec.message_log(fname, return_object=True)[source]#

Generator function to read messages from a log file created by MessageLogger.

Parameters:
  • fname (Path) – Path to the log file

  • return_object (bool) – If True, yield only the message objects; if False, yield complete log entries

Returns:

Generator yielding messages or log entries

Return type:

Generator[Any, None, None]
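
The effect of return_object can be sketched with a generator over a file of JSON lines. This example assumes one JSON document per line with the hypothetical keys "ts" and "obj"; read_log_sketch is not the library function and does not decode dataclasses or NumPy arrays.

```python
import json
import tempfile
from pathlib import Path
from typing import Any, Generator


def read_log_sketch(fname: Path, return_object: bool = True) -> Generator[Any, None, None]:
    """Yield message objects, or whole log entries if return_object is False."""
    with open(fname, "r") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue  # skip blank lines
            entry = json.loads(line)
            yield entry["obj"] if return_object else entry


with tempfile.TemporaryDirectory() as tmp:
    path = Path(tmp) / "log.txt"
    path.write_text('{"ts": 1.0, "obj": "a"}\n{"ts": 2.0, "obj": "b"}\n')
    objects = list(read_log_sketch(path))
    entries = list(read_log_sketch(path, return_object=False))
```

Because the function is a generator, a large log can be iterated without loading the whole file into memory.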

ezmsg.util.messagecodec.type_str(obj)[source]#

Get a string representation of an object’s type for serialization.

Parameters:

obj (Any) – Object to get type string for

Returns:

String representation in format ‘module:qualname’

Return type:

str
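
The 'module:qualname' format used by type_str and import_type can be reproduced with importlib. The functions type_str_sketch and import_type_sketch below are hypothetical re-implementations written for illustration, assuming the string is split on its first colon; they are not copied from ezmsg.

```python
import importlib
from fractions import Fraction
from typing import Any


def type_str_sketch(obj: Any) -> str:
    """Return 'module:qualname' for the type of `obj`."""
    t = type(obj)
    return f"{t.__module__}:{t.__qualname__}"


def import_type_sketch(typestr: str) -> type:
    """Resolve a 'module:qualname' string back to the type it names."""
    module_name, _, qualname = typestr.partition(":")
    target: Any = importlib.import_module(module_name)
    for part in qualname.split("."):  # walk nested classes, if any
        target = getattr(target, part)
    if not isinstance(target, type):
        raise ImportError(f"{typestr} does not resolve to a valid type")
    return target


typestr = type_str_sketch(Fraction(1, 2))
resolved = import_type_sketch(typestr)
```

The two functions are inverses of each other for importable types, which is what a codec needs in order to write a type name into a log and recover the class later.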