ezmsg.baseproc.util.pipeline_settings

Pipeline-settings event producer and supporting helpers.

This module provides a generic way to capture every settings change happening inside an ezmsg pipeline and emit each one as a self-contained message that can travel from unit to unit on the graph.

Three layers:

  1. Helpers — flatten_ez_settings / flatten_component_settings / sanitize_settings_value / sanitize_settings_column_name. These project arbitrary settings objects (ez.Settings, dict, dataclass, numpy array, enum, path, …) into either dotted-key/value pairs or NWB-compatible scalar/array values. Useful to any tabular sink, not just NWB.

  2. Message — PipelineSettingsEvent carries one settings change. It mirrors the meaningful subset of ezmsg.core.SettingsChangedEvent (seq, timestamp, component_address, repr_value, structured_value) plus a table_name for tabular sinks and an event_type distinguishing the startup snapshot from later updates. flatten_for_table projects to the {"data": json_str} shape an NWBPointRow-compatible sink expects.

  3. Producer / Unit — PipelineSettingsProducer + PipelineSettingsUnit open a GraphContext, queue one INITIAL event per in-scope component (using the current settings snapshot), and stream subsequent UPDATED events from the graph server’s settings subscription.

Module Attributes

INIT_FINAL_COMPONENT_ADDRESS

Sentinel component_address value attached to the final PipelineSettingsEvent of the startup snapshot.

Functions

flatten_component_settings(component_address, value)

Flatten and sanitize a component settings payload for tabular storage.

Return type:

dict[str, Any]

Parameters:
  • component_address (str)

  • value (Any)

flatten_ez_settings(settings, prefix='')

Flatten an ezmsg settings object into dotted key/value pairs.

Return type:

dict[str, Any]

Parameters:
  • settings

  • prefix (str)

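To make "dotted key/value pairs" concrete, here is a stdlib-only sketch of the flattening idea. It is not the library's code: flatten_settings_sketch, FilterSettings and DemoSettings are hypothetical, and ez.Settings objects are assumed to behave like ordinary dataclasses for the purpose of the walk.

```python
import dataclasses
from typing import Any, Dict


def flatten_settings_sketch(settings: Any, prefix: str = "") -> Dict[str, Any]:
    """Hypothetical stand-in for flatten_ez_settings: nested dataclass/dict -> dotted keys."""
    if isinstance(settings, dict):
        items = list(settings.items())
    elif dataclasses.is_dataclass(settings) and not isinstance(settings, type):
        # Dataclass instances (assumed to include ez.Settings) are walked field by field.
        items = [(f.name, getattr(settings, f.name)) for f in dataclasses.fields(settings)]
    else:
        # Leaf value: store it under the dotted path accumulated so far.
        return {prefix: settings}

    flat: Dict[str, Any] = {}
    for key, value in items:
        dotted = f"{prefix}.{key}" if prefix else str(key)
        flat.update(flatten_settings_sketch(value, dotted))
    return flat


@dataclasses.dataclass(frozen=True)
class FilterSettings:
    order: int = 4
    cutoff: float = 30.0


@dataclasses.dataclass(frozen=True)
class DemoSettings:
    name: str = "demo"
    filt: FilterSettings = dataclasses.field(default_factory=FilterSettings)
    extra: dict = dataclasses.field(default_factory=lambda: {"gain": 2})


flat = flatten_settings_sketch(DemoSettings())
print(flat)  # {'name': 'demo', 'filt.order': 4, 'filt.cutoff': 30.0, 'extra.gain': 2}
```

Recursing on every value keeps the rule uniform: containers contribute path segments, and everything else becomes a leaf column.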
sanitize_settings_column_name(name)

Convert a settings field path into a tabular-safe column name.

Return type:

str

Parameters:

name (str)
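The exact character rules of sanitize_settings_column_name are not spelled out here. One plausible rule, assumed purely for illustration in the hypothetical sanitize_column_name_sketch below, maps every character outside [A-Za-z0-9_] to an underscore:

```python
import re


def sanitize_column_name_sketch(name: str) -> str:
    """Hypothetical rule: map a dotted settings path onto an identifier-like column name."""
    # Collapse every run of characters outside [A-Za-z0-9_] (dots, slashes, spaces) into "_".
    cleaned = re.sub(r"[^0-9A-Za-z_]+", "_", name).strip("_")
    # Keep the result identifier-like: a leading digit gets an underscore prefix.
    if cleaned[:1].isdigit():
        cleaned = "_" + cleaned
    # Never return an empty column name.
    return cleaned or "_"


print(sanitize_column_name_sketch("SYSTEM/FILTER.cutoff"))  # SYSTEM_FILTER_cutoff
```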

sanitize_settings_value(value)

Convert a settings value into a tabular-friendly scalar or array value.

Return type:

Any

Parameters:

value (Any)
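A sketch of the kind of value projection the module overview describes (enums, paths and nested containers turned into scalar or array cells) follows. The policy is an assumption, not the library's: sanitize_value_sketch is hypothetical, the empty-string choice for None is invented, and numpy arrays are left out so the example stays stdlib-only.

```python
import enum
import json
from pathlib import PurePath, PurePosixPath
from typing import Any


def sanitize_value_sketch(value: Any) -> Any:
    """Hypothetical policy for turning a settings value into a table-friendly cell."""
    if value is None:
        return ""  # assumption: missing values become empty strings
    if isinstance(value, enum.Enum):
        return sanitize_value_sketch(value.value)  # store the enum's underlying value
    if isinstance(value, PurePath):
        return str(value)  # paths become plain strings
    if isinstance(value, (bool, int, float, str)):
        return value  # native scalars pass through unchanged
    if isinstance(value, (list, tuple)) and all(isinstance(v, (bool, int, float)) for v in value):
        return list(value)  # numeric sequences stay array-like
    if isinstance(value, dict):
        return json.dumps(value, sort_keys=True, default=repr)  # nested mappings -> JSON text
    return repr(value)  # anything else falls back to its repr


class Mode(enum.Enum):
    FAST = "fast"


print(sanitize_value_sketch(Mode.FAST))  # fast
print(sanitize_value_sketch(PurePosixPath("/data/run1.nwb")))  # /data/run1.nwb
```

The enum check runs before the scalar check so that str- or int-mixin enums are stored by value rather than passed through as enum members.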

Classes

class PipelineSettingsEvent(seq, timestamp, component_address, event_type, repr_value, structured_value=None, source_session_id=None, table_name='settings_annotations')

Bases: object

One settings-change observation, ready to ship across the graph.

Mirrors the JSON-friendly portion of ezmsg.core.SettingsChangedEvent so the message survives any ezmsg transport without custom encoders. Sinks that want a tabular row call flatten_for_table(); sinks that need native column types can read structured_value directly and project with the helpers in this module.

Parameters:
seq: int

Monotonic sequence number from the graph server.

timestamp: float

time.monotonic() seconds when the change was observed.

The graph server stamps settings events with time.time(); when emitting each event, the producer rebases its timestamp onto the local monotonic clock so downstream consumers can compare it directly with other monotonic timestamps in the pipeline.
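The precise rebasing formula is not given here. A common approach, assumed in the hypothetical rebase_to_monotonic below, keeps the event's age on the wall clock and subtracts that age from the current monotonic reading:

```python
import time
from typing import Optional


def rebase_to_monotonic(
    wall_ts: float,
    now_wall: Optional[float] = None,
    now_mono: Optional[float] = None,
) -> float:
    """Map a time.time() stamp onto the time.monotonic() clock (hypothetical helper)."""
    # Sample both clocks back to back unless the caller supplies them.
    if now_wall is None:
        now_wall = time.time()
    if now_mono is None:
        now_mono = time.monotonic()
    # Keep the event's age (now_wall - wall_ts) and subtract it from the monotonic "now".
    return now_mono - (now_wall - wall_ts)


# An event stamped 2.5 s ago on the wall clock lands 2.5 s before monotonic "now".
print(rebase_to_monotonic(1000.0, now_wall=1002.5, now_mono=50.0))  # 47.5
```

Only the difference between the two clocks matters, so any wall-clock adjustment between the event and the rebase shows up as error in the result; rebasing promptly at emit time keeps that window small.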

component_address: str

Address of the component whose settings changed.

event_type: PipelineSettingsEventType

INITIAL for startup-snapshot rows, UPDATED for live changes.

repr_value: dict[str, Any] | str

Human-readable settings dump (dict for dataclass-shaped settings, repr() string fallback otherwise).

structured_value: dict[str, Any] | None = None

Parsed, structured settings dump. None if no structure could be derived; use repr_value in that case.

source_session_id: str | None = None
table_name: str = 'settings_annotations'

Name of the target table/series the consuming sink should write into.

flatten_for_table()

Project this event into NWBPointRow-compatible columns.

Returns {"data": json_string} so the event can be appended to a pynwb.misc.AnnotationSeries (single-string-per-timestamp). The JSON payload self-describes the change with component, event_type, seq, and the structured settings snapshot.

Returns None for the INIT_FINAL_COMPONENT_ADDRESS sentinel — the event is a control marker for typed-column sinks and has no row to write in a JSON-row sink.

Return type:

Optional[dict[str, Any]]
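The docstring fixes the output shape ({"data": json_string}) and the sentinel rule, but not the JSON key names. The stdlib-only sketch below therefore assumes the keys component, event_type, seq and settings, uses a hypothetical SENTINEL_ADDRESS string in place of the real INIT_FINAL_COMPONENT_ADDRESS value, and falls back to repr_value when structured_value is None. EventSketch is a stand-in, not PipelineSettingsEvent itself.

```python
import dataclasses
import enum
import json
from typing import Any, Dict, Optional

# Hypothetical stand-in; the real INIT_FINAL_COMPONENT_ADDRESS value is not shown here.
SENTINEL_ADDRESS = "__init_final__"


class EventType(str, enum.Enum):
    INITIAL = "INITIAL"
    UPDATED = "UPDATED"


@dataclasses.dataclass
class EventSketch:
    seq: int
    timestamp: float
    component_address: str
    event_type: EventType
    repr_value: Any
    structured_value: Optional[dict] = None

    def flatten_for_table(self) -> Optional[Dict[str, Any]]:
        if self.component_address == SENTINEL_ADDRESS:
            return None  # control marker: no row for a JSON-row sink
        payload = {
            "component": self.component_address,
            "event_type": self.event_type.value,
            "seq": self.seq,
            # Prefer the structured dump; fall back to the repr form.
            "settings": self.structured_value if self.structured_value is not None else self.repr_value,
        }
        return {"data": json.dumps(payload, default=repr)}


row = EventSketch(3, 12.5, "SYSTEM/FILTER", EventType.UPDATED, "FilterSettings(order=4)", {"order": 4}).flatten_for_table()
print(row)
```

Returning None for the sentinel lets a JSON-row sink skip the control marker with a single check instead of special-casing addresses itself.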

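__init__(seq, timestamp, component_address, event_type, repr_value, structured_value=None, source_session_id=None, table_name='settings_annotations')

Parameters:
  • seq (int)

  • timestamp (float)

  • component_address (str)

  • event_type (PipelineSettingsEventType)

  • repr_value (dict[str, Any] | str)

  • structured_value (dict[str, Any] | None)

  • source_session_id (str | None)

  • table_name (str)

Return type: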

None

class PipelineSettingsEventType(*values)

Bases: str, Enum

Distinguishes startup snapshot rows from in-flight settings changes.

INITIAL = 'INITIAL'
UPDATED = 'UPDATED'
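Because the enum mixes in str, its members compare equal to their string values and serialize through the standard json module without a custom encoder, which fits the JSON-friendly goal stated for PipelineSettingsEvent. The illustration below uses a hypothetical stand-in class with the same two members:

```python
import enum
import json


class EventTypeSketch(str, enum.Enum):
    """Stand-in mirroring PipelineSettingsEventType's documented members."""

    INITIAL = "INITIAL"
    UPDATED = "UPDATED"


# Members are str instances, so the stdlib json encoder handles them directly.
encoded = json.dumps({"event_type": EventTypeSketch.UPDATED})
print(encoded)  # {"event_type": "UPDATED"}

# A consumer can turn the decoded string back into the enum member by value.
decoded = EventTypeSketch(json.loads(encoded)["event_type"])
print(decoded is EventTypeSketch.UPDATED)  # True
```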

class PipelineSettingsProducer(*args, **kwargs)

Bases: BaseStatefulProducer[PipelineSettingsProducerSettings, PipelineSettingsEvent, PipelineSettingsProducerState]

Stream the graph server’s settings events out as PipelineSettingsEvent.

On reset (which happens on first __acall__), the producer:

  1. Opens a GraphContext against the running graph server.

  2. Pulls the current settings snapshot and queues one INITIAL event per in-scope component.

  3. Subscribes to subsequent settings events; each one becomes an UPDATED event in the queue.

_produce returns one event per call, awaiting the queue when it’s empty. If the GraphContext can’t be opened (no server running, network issue), the producer logs a warning and the queue stays empty — the unit will simply never publish anything, matching the “best-effort” semantics of the previous in-sink approach.

class PipelineSettingsProducerSettings(target_table='settings_annotations', scope_components=None, graph_address=None)

Bases: Settings

Settings for PipelineSettingsProducer.

Parameters:
target_table: str = 'settings_annotations'

Value to stamp onto each emitted event’s table_name field.

scope_components: tuple[str, ...] | None = None

If set, only emit events for these component addresses. None means watch every component the graph server reports. PipelineSettingsUnit fills this in automatically by discovering its own session’s components on initialize.

graph_address: tuple[str, int] | None = None

Address of the GraphServer to connect to. None lets GraphContext resolve the default (EZMSG_GRAPHSERVER_ADDR env var, falling back to 127.0.0.1:25978). Set explicitly when running multiple parallel ezmsg systems, or in tests where ez.run was called with a non-default graph_address.
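The documented precedence (explicit graph_address, then EZMSG_GRAPHSERVER_ADDR, then 127.0.0.1:25978) can be mirrored in a few lines. This hypothetical resolve_graph_address_sketch is not GraphContext's own logic; in particular, the "host:port" format of the environment variable is an assumption.

```python
import os
from typing import Mapping, Optional, Tuple

DEFAULT_GRAPH_ADDRESS: Tuple[str, int] = ("127.0.0.1", 25978)


def resolve_graph_address_sketch(
    explicit: Optional[Tuple[str, int]] = None,
    env: Optional[Mapping[str, str]] = None,
) -> Tuple[str, int]:
    """Hypothetical mirror of the documented precedence: explicit > env var > default."""
    if explicit is not None:
        return explicit
    env = os.environ if env is None else env
    raw = env.get("EZMSG_GRAPHSERVER_ADDR")
    if not raw:
        return DEFAULT_GRAPH_ADDRESS
    # Assumption: the variable holds "host:port".
    host, _, port = raw.rpartition(":")
    return host, int(port)


print(resolve_graph_address_sketch(env={"EZMSG_GRAPHSERVER_ADDR": "10.0.0.5:26000"}))
# ('10.0.0.5', 26000)
```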

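__init__(target_table='settings_annotations', scope_components=None, graph_address=None)

Parameters:
  • target_table (str)

  • scope_components (tuple[str, ...] | None)

  • graph_address (tuple[str, int] | None)

Return type: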

None

class PipelineSettingsProducerState

Bases: object

State for PipelineSettingsProducer.

ctx: GraphContext | None = None
queue: Queue | None = None
watch_task: Task | None = None
last_seq: int = 0
scope: frozenset[str] | None = None
initialized: bool = False

class PipelineSettingsUnit(*args, settings=None, **kwargs)

Bases: BaseProducerUnit[PipelineSettingsProducerSettings, PipelineSettingsEvent, PipelineSettingsProducer]

Producer unit that emits one PipelineSettingsEvent per change.

Wire its OUTPUT_SIGNAL to any sink that consumes PipelineSettingsEvent (or, more loosely, any sink that accepts objects implementing the NWBPointRow protocol — settings events satisfy it via PipelineSettingsEvent.flatten_for_table()).

On initialize, if scope_components is unset, the unit looks up its own session in the graph snapshot and scopes the producer to that session’s components. This matches the behaviour of the original in-sink watcher: only events from components running alongside the sink are emitted, so multiple parallel pipelines don’t cross-pollute.
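Session scoping reduces to "collect this session's component addresses once, then drop any event whose address is outside that set". The sketch below assumes a hypothetical snapshot shape (session id mapped to component addresses); the real graph snapshot structure is not described here. Holding the result as a frozenset matches the scope field of PipelineSettingsProducerState.

```python
from typing import FrozenSet, Iterable, Mapping, Optional, Tuple


def scope_for_session(snapshot: Mapping[str, Iterable[str]], session_id: str) -> Tuple[str, ...]:
    """Hypothetical: pick the component addresses that belong to one session."""
    return tuple(sorted(snapshot.get(session_id, ())))


def in_scope(address: str, scope: Optional[FrozenSet[str]]) -> bool:
    # None means "watch every component", mirroring scope_components=None.
    return scope is None or address in scope


# Hypothetical snapshot shape: session id -> component addresses in that session.
snapshot = {
    "session-a": ["A/SOURCE", "A/FILTER", "A/SETTINGS"],
    "session-b": ["B/SOURCE"],
}
scope = frozenset(scope_for_session(snapshot, "session-a"))
incoming = ["A/FILTER", "B/SOURCE", "A/SOURCE"]
print([addr for addr in incoming if in_scope(addr, scope)])  # ['A/FILTER', 'A/SOURCE']
```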

Parameters:

settings (Settings | None)

SETTINGS

alias of PipelineSettingsProducerSettings

async initialize()

Runs when the Unit is instantiated.

This is called from within the same process this unit will live in. This lifecycle hook can be overridden, and it can be made asynchronous by adding the async keyword when overriding.

This method is where you should initialize your unit’s state and prepare for message processing.

Return type:

None

async shutdown()

Runs when the Unit terminates.

This is called from within the same process this unit will live in. This lifecycle hook can be overridden, and it can be made asynchronous by adding the async keyword when overriding.

This method is where you should clean up resources and perform any necessary shutdown procedures.

Return type:

None