ezmsg.baseproc.util.pipeline_settings
Pipeline-settings event producer and supporting helpers.
This module provides a generic way to capture every settings change happening inside an ezmsg pipeline and emit each one as a self-contained message that can travel from unit to unit on the graph.
Three layers:
- Helpers: flatten_ez_settings / flatten_component_settings / sanitize_settings_value / sanitize_settings_column_name. These project arbitrary settings objects (ez.Settings, dict, dataclass, numpy array, enum, path, …) into either dotted-key/value pairs or NWB-compatible scalar/array values. Useful to any tabular sink, not just NWB.
- Message: PipelineSettingsEvent carries one settings change. It mirrors the meaningful subset of ezmsg.core.SettingsChangedEvent (seq, timestamp, component_address, repr_value, structured_value) plus a table_name for tabular sinks and an event_type distinguishing the startup snapshot from later updates. flatten_for_table projects to the {"data": json_str} shape an NWBPointRow-compatible sink expects.
- Producer / Unit: PipelineSettingsProducer + PipelineSettingsUnit open a GraphContext, queue one INITIAL event per in-scope component (using the current settings snapshot), and stream subsequent UPDATED events from the graph server’s settings subscription.
Module Attributes
Sentinel |
Functions
- flatten_component_settings(component_address, value)
Flatten and sanitize a component settings payload for tabular storage.
- flatten_ez_settings(settings, prefix='')
Flatten an ezmsg settings object into dotted key/value pairs.
- sanitize_settings_column_name(name)
Convert a settings field path into a tabular-safe column name.
- sanitize_settings_value(value)
Convert a settings value into a tabular-friendly scalar or array value.
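The dotted-key projection described for flatten_ez_settings can be pictured with a small standalone sketch. This is not the library's implementation: flatten_dotted, to_scalar, and the example settings classes are hypothetical names, and the leaf-conversion rules (enum to its name, path to string, anything else to repr()) are assumptions made for illustration.

```python
from dataclasses import dataclass, fields, is_dataclass
from enum import Enum
from pathlib import Path
from typing import Any


def to_scalar(value: Any) -> Any:
    """Assumed leaf rules: keep plain scalars, stringify everything else."""
    if isinstance(value, Enum):
        return value.name
    if isinstance(value, Path):
        return str(value)
    if value is None or isinstance(value, (bool, int, float, str)):
        return value
    return repr(value)


def flatten_dotted(value: Any, prefix: str = "") -> dict[str, Any]:
    """Hypothetical flattener: nested dataclasses/dicts -> dotted keys."""
    if is_dataclass(value) and not isinstance(value, type):
        items = {f.name: getattr(value, f.name) for f in fields(value)}
    elif isinstance(value, dict):
        items = {str(k): v for k, v in value.items()}
    else:
        # Leaf value: store it under the accumulated dotted prefix.
        return {prefix: to_scalar(value)}

    flat: dict[str, Any] = {}
    for key, item in items.items():
        dotted = f"{prefix}.{key}" if prefix else key
        flat.update(flatten_dotted(item, dotted))
    return flat


class Mode(Enum):
    FAST = 1


@dataclass
class FilterSettings:
    order: int
    mode: Mode


@dataclass
class ChainSettings:
    name: str
    filt: FilterSettings
    out_dir: Path


flat = flatten_dotted(ChainSettings("demo", FilterSettings(4, Mode.FAST), Path("out")))
# flat == {"name": "demo", "filt.order": 4, "filt.mode": "FAST", "out_dir": "out"}
```

The real helpers may treat arrays, column-name sanitizing, and NWB-specific types differently; the sketch only shows the dotted-key idea.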
Classes
- class PipelineSettingsEvent(seq, timestamp, component_address, event_type, repr_value, structured_value=None, source_session_id=None, table_name='settings_annotations')
Bases: object
One settings-change observation, ready to ship across the graph.
Mirrors the JSON-friendly portion of ezmsg.core.SettingsChangedEvent so the message survives any ezmsg transport without custom encoders. Sinks that want a tabular row call flatten_for_table(); sinks that need native column types can read structured_value directly and project with the helpers in this module.
- Parameters:
- timestamp: float
time.monotonic() seconds when the change was observed.
The graph server stamps settings events with time.time(); the producer rebases each event onto the local monotonic clock at emit time so downstream consumers can compare these against other monotonic timestamps in the pipeline.
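One way such a rebase could work is to measure how old the wall-clock stamp is and subtract that age from the current monotonic reading. The sketch below is an assumption about the technique, not the producer's actual code (it may simply stamp time.monotonic() at emit time); rebase_to_monotonic is a hypothetical name.

```python
import time


def rebase_to_monotonic(wall_timestamp: float) -> float:
    """Map a time.time() stamp onto the local time.monotonic() clock."""
    # Sample both clocks back-to-back so the offset between them is consistent.
    now_wall = time.time()
    now_mono = time.monotonic()
    # Age of the event in seconds, according to the wall clock.
    age = now_wall - wall_timestamp
    # The same event, expressed on the monotonic clock.
    return now_mono - age
```

The result is only as accurate as the agreement between the server's wall clock and the local one, so this kind of rebase is best suited to a producer running on the same host as the graph server.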
- event_type: PipelineSettingsEventType
INITIAL for startup-snapshot rows, UPDATED for live changes.
- repr_value: dict[str, Any] | str
Human-readable settings dump (dict for dataclass-shaped settings, repr() string fallback otherwise).
- structured_value: dict[str, Any] | None = None
Parsed structured settings dump (None if structure couldn’t be derived; falls back to repr_value).
- table_name: str = 'settings_annotations'
Name of the target table/series the consuming sink should write into.
- flatten_for_table()
Project this event into NWBPointRow-compatible columns.
Returns {"data": json_string} so the event can be appended to a pynwb.misc.AnnotationSeries (single-string-per-timestamp). The JSON payload self-describes the change with component, event_type, seq, and the structured settings snapshot.
Returns None for the INIT_FINAL_COMPONENT_ADDRESS sentinel: the event is a control marker for typed-column sinks and has no row to write in a JSON-row sink.
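The row shape described above can be sketched with a stand-in class. SketchEvent and SENTINEL_ADDRESS are hypothetical: the real sentinel value is not shown on this page, and the fallback from structured_value to repr_value in the payload is an assumption based on the attribute descriptions.

```python
import json
from dataclasses import dataclass
from typing import Any, Optional

# Hypothetical placeholder; the real INIT_FINAL_COMPONENT_ADDRESS value is not shown here.
SENTINEL_ADDRESS = "<init-final>"


@dataclass
class SketchEvent:
    """Stand-in for PipelineSettingsEvent with only the fields the row needs."""

    seq: int
    component_address: str
    event_type: str
    repr_value: Any
    structured_value: Optional[dict] = None

    def flatten_for_table(self) -> Optional[dict]:
        # Control marker: nothing to write in a JSON-row sink.
        if self.component_address == SENTINEL_ADDRESS:
            return None
        # Assumed fallback: use repr_value when no structured dump exists.
        settings = (
            self.structured_value
            if self.structured_value is not None
            else self.repr_value
        )
        payload = {
            "component": self.component_address,
            "event_type": self.event_type,
            "seq": self.seq,
            "settings": settings,
        }
        # One JSON string per timestamp, matching an annotation-style series.
        return {"data": json.dumps(payload)}


row = SketchEvent(3, "SYS/FILTER", "UPDATED", "repr", {"order": 8}).flatten_for_table()
marker = SketchEvent(4, SENTINEL_ADDRESS, "INITIAL", "").flatten_for_table()
```

A sink that stores row["data"] verbatim can later recover the change with json.loads, without needing any settings class to be importable.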
- __init__(seq, timestamp, component_address, event_type, repr_value, structured_value=None, source_session_id=None, table_name='settings_annotations')
- class PipelineSettingsEventType(*values)
Distinguishes startup snapshot rows from in-flight settings changes.
- INITIAL = 'INITIAL'
- UPDATED = 'UPDATED'
- class PipelineSettingsProducer(*args, **kwargs)
Bases: BaseStatefulProducer[PipelineSettingsProducerSettings, PipelineSettingsEvent, PipelineSettingsProducerState]
Stream the graph server’s settings events out as PipelineSettingsEvent.
On reset (which happens on first __acall__), the producer:
1. Opens a GraphContext against the running graph server.
2. Pulls the current settings snapshot and queues one INITIAL event per in-scope component.
3. Subscribes to subsequent settings events; each one becomes an UPDATED event in the queue.
_produce returns one event per call, awaiting the queue when it’s empty. If the GraphContext can’t be opened (no server running, network issue), the producer logs a warning and the queue stays empty; the unit will simply never publish anything, matching the “best-effort” semantics of the previous in-sink approach.
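The snapshot-then-stream queueing described above can be sketched with plain asyncio, leaving out GraphContext entirely. Everything here is a hypothetical stand-in: fill_queue, fake_updates, and the tuple-shaped events are illustration only, and the component addresses are made up.

```python
import asyncio


async def fill_queue(queue, snapshot, updates, scope=None):
    """Queue INITIAL events from a snapshot, then UPDATED events from a stream."""
    for address, settings in snapshot.items():
        if scope is None or address in scope:
            queue.put_nowait(("INITIAL", address, settings))
    async for address, settings in updates:
        if scope is None or address in scope:
            queue.put_nowait(("UPDATED", address, settings))


async def fake_updates():
    # Stand-in for the graph server's settings subscription.
    yield ("SYS/FILTER", {"order": 8})
    yield ("SYS/OTHER", {"gain": 2})


async def demo():
    queue = asyncio.Queue()
    snapshot = {"SYS/FILTER": {"order": 4}, "SYS/OTHER": {"gain": 1}}
    task = asyncio.create_task(
        fill_queue(queue, snapshot, fake_updates(), scope={"SYS/FILTER"})
    )
    # Like _produce: take one event per call, waiting while the queue is empty.
    events = [await queue.get() for _ in range(2)]
    await task
    return events


events = asyncio.run(demo())
```

With the scope limited to one address, only that component's INITIAL and UPDATED events reach the consumer. If the feeding task never starts (the analogue of a GraphContext that fails to open), a consumer awaiting the queue simply waits and publishes nothing.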
- class PipelineSettingsProducerSettings(target_table='settings_annotations', scope_components=None, graph_address=None)
Bases: Settings
Settings for PipelineSettingsProducer.
- Parameters:
- target_table: str = 'settings_annotations'
Value to stamp onto each emitted event’s table_name field.
- scope_components: tuple[str, ...] | None = None
If set, only emit events for these component addresses. None means watch every component the graph server reports. PipelineSettingsUnit fills this in automatically by discovering its own session’s components on initialize.
- graph_address: tuple[str, int] | None = None
Address of the GraphServer to connect to. None lets GraphContext resolve the default (EZMSG_GRAPHSERVER_ADDR env var, falling back to 127.0.0.1:25978). Set explicitly when running multiple parallel ezmsg systems, or in tests where ez.run was called with a non-default graph_address.
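The resolution order for graph_address (explicit value, then environment variable, then built-in default) can be sketched as follows. resolve_graph_address is a hypothetical helper, not part of ezmsg, and the "host:port" format assumed for EZMSG_GRAPHSERVER_ADDR is a guess for illustration; GraphContext performs the real resolution.

```python
import os
from typing import Mapping, Optional, Tuple

DEFAULT_ADDRESS = ("127.0.0.1", 25978)


def resolve_graph_address(
    explicit: Optional[Tuple[str, int]] = None,
    env: Optional[Mapping[str, str]] = None,
) -> Tuple[str, int]:
    """Pick the explicit address, else the env var, else the default."""
    if explicit is not None:
        return explicit
    env = os.environ if env is None else env
    raw = env.get("EZMSG_GRAPHSERVER_ADDR")
    if not raw:
        return DEFAULT_ADDRESS
    # Assumed "host:port" format; split on the last colon.
    host, _, port = raw.rpartition(":")
    return (host, int(port))
```

Passing the address explicitly, as a test that started ez.run on a non-default graph_address would, bypasses both the environment variable and the default.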
- class PipelineSettingsProducerState
Bases: object
State for PipelineSettingsProducer.
- class PipelineSettingsUnit(*args, settings=None, **kwargs)
Bases: BaseProducerUnit[PipelineSettingsProducerSettings, PipelineSettingsEvent, PipelineSettingsProducer]
Producer unit that emits one PipelineSettingsEvent per change.
Wire its OUTPUT_SIGNAL to any sink that consumes PipelineSettingsEvent (or, more loosely, any sink that accepts objects implementing the NWBPointRow protocol; settings events satisfy it via PipelineSettingsEvent.flatten_for_table()).
On initialize, if scope_components is unset, the unit looks up its own session in the graph snapshot and scopes the producer to that session’s components. This matches the behaviour of the original in-sink watcher: only events from components running alongside the sink are emitted, so multiple parallel pipelines don’t cross-pollute.
- Parameters:
settings (Settings | None)
- SETTINGS
alias of PipelineSettingsProducerSettings
- async initialize()
Runs when the Unit is instantiated.
This is called from within the same process this unit will live in. This lifecycle hook can be overridden, and it can run as an async function by adding the async keyword when overriding.
This method is where you should initialize your unit’s state and prepare for message processing.
- Return type:
- async shutdown()
Runs when the Unit terminates.
This is called from within the same process this unit will live in. This lifecycle hook can be overridden, and it can run as an async function by adding the async keyword when overriding.
This method is where you should clean up resources and perform any necessary shutdown procedures.
- Return type: