ezmsg.sigproc.merge
Time-aligned merge of two AxisArray streams along a non-time axis.
Merge is an ez.Collection that composes
AlignAlongAxis (time-alignment) with
Concat (axis-aware concatenation).
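The two stages can be pictured with a small, library-free sketch. This is only a conceptual illustration: the function names `align` and `merge`, the list-based data layout, and the exact-timestamp matching rule are all assumptions made for the example, not the behavior of AlignAlongAxis or Concat.

```python
# Conceptual sketch only: plain Python stand-ins, not the ezmsg AxisArray API.

def align(times_a, times_b, tol=1e-9):
    """Return index pairs (i, j) where times_a[i] and times_b[j] coincide.

    Assumes both timestamp lists are sorted in ascending order.
    """
    pairs = []
    i = j = 0
    while i < len(times_a) and j < len(times_b):
        dt = times_a[i] - times_b[j]
        if abs(dt) <= tol:
            pairs.append((i, j))
            i += 1
            j += 1
        elif dt < 0:
            i += 1  # this A sample is earlier than every remaining B sample
        else:
            j += 1  # this B sample is earlier than every remaining A sample
    return pairs


def merge(times_a, data_a, times_b, data_b):
    """Concatenate per-sample channel lists for the time-aligned samples."""
    pairs = align(times_a, times_b)
    times = [times_a[i] for i, _ in pairs]
    data = [data_a[i] + data_b[j] for i, j in pairs]
    return times, data


# Stream A: 2 channels, samples at t = 0.0 .. 0.3
times_a = [0.0, 0.1, 0.2, 0.3]
data_a = [[1, 2], [3, 4], [5, 6], [7, 8]]
# Stream B: 1 channel, samples at t = 0.1 .. 0.4
times_b = [0.1, 0.2, 0.3, 0.4]
data_b = [[10], [20], [30], [40]]

merged_times, merged_data = merge(times_a, data_a, times_b, data_b)
```

Only the overlapping samples survive, and each merged sample carries the channels of A followed by the channels of B.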
Classes
- class Merge(*args, settings=None, **kwargs)
Bases: Collection
Merge two AxisArray streams by time-aligning and concatenating.
Composes AlignAlongAxis → Concat.
- Parameters:
settings (Settings | None)
- SETTINGS
alias of MergeSettings
- INPUT_SIGNAL_A = InputTopicStream:unlocated[AxisArray]()
- INPUT_SIGNAL_B = InputTopicStream:unlocated[AxisArray]()
- OUTPUT_SIGNAL = OutputTopicStream:unlocated[AxisArray]()
- ALIGN = <ezmsg.sigproc.align.AlignAlongAxis object>
- CONCAT = <ezmsg.sigproc.concat.Concat object>
- configure()
A lifecycle hook that runs when the Collection is instantiated.
This is the best place to call Unit.apply_settings() on each member Unit of the Collection. Override this method to perform collection-specific configuration of child components.
- Return type:
None
- network()
Override this method to return a NetworkDefinition that defines how InputStreams and OutputStreams from member Units are connected.
The NetworkDefinition specifies the message routing between components by connecting output streams to input streams.
- Returns:
Network definition specifying stream connections
- Return type:
NetworkDefinition
- class MergeProcessor(settings)
Bases: object
Convenience processor that composes alignment + concatenation.
Preserves the same call interface as the previous monolithic processor so that existing code using proc(msg_a) / proc.push_b(msg_b) continues to work unchanged.
- Parameters:
settings (MergeSettings)
- __init__(settings)
- Parameters:
settings (MergeSettings)
- property align_state
Expose alignment state for introspection / tests.
- property concat_state
Expose concatenation state for introspection / tests.
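The proc(msg_a) / proc.push_b(msg_b) call shape described above can be illustrated with a toy stand-in. Everything here is hypothetical: `ToyMergeProcessor`, its buffers, the `(time, channels)` tuple format, and the choice to return merged samples from the A-side call are assumptions for the sketch, not the MergeProcessor implementation or its return type.

```python
from collections import deque


class ToyMergeProcessor:
    """Hypothetical stand-in mimicking the proc(msg_a) / proc.push_b(msg_b) call shape."""

    def __init__(self):
        self._buf_a = deque()  # pending (time, channels) samples from stream A
        self._buf_b = deque()  # pending (time, channels) samples from stream B

    def push_b(self, msg_b):
        """Buffer samples from stream B; produces no output on its own."""
        self._buf_b.extend(msg_b)

    def __call__(self, msg_a):
        """Buffer samples from stream A and return whatever can be merged now.

        Assumes both streams deliver samples in ascending time order.
        """
        self._buf_a.extend(msg_a)
        out = []
        while self._buf_a and self._buf_b:
            t_a, ch_a = self._buf_a[0]
            t_b, ch_b = self._buf_b[0]
            if t_a == t_b:
                out.append((t_a, ch_a + ch_b))
                self._buf_a.popleft()
                self._buf_b.popleft()
            elif t_a < t_b:
                self._buf_a.popleft()  # B has already moved past this timestamp
            else:
                self._buf_b.popleft()  # A has already moved past this timestamp
        return out


proc = ToyMergeProcessor()
proc.push_b([(0.1, [10]), (0.2, [20])])
first = proc([(0.0, [1, 2]), (0.1, [3, 4])])
second = proc([(0.2, [5, 6]), (0.3, [7, 8])])
```

In this sketch the sample at t = 0.3 stays buffered after the second call because no matching B sample has arrived yet.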
- class MergeSettings(axis: str = 'ch', align_axis: str | None = 'time', buffer_dur: float = 10.0, relabel_axis: bool = True, label_a: str = '_a', label_b: str = '_b', assert_identical_shared_axes: bool = False, new_key: str | None = None)
Bases: Settings
- Parameters:
assert_identical_shared_axes (bool) – If True, raise ValueError when shared CoordinateAxis .data arrays differ.
- __init__(axis='ch', align_axis='time', buffer_dur=10.0, relabel_axis=True, label_a='_a', label_b='_b', assert_identical_shared_axes=False, new_key=None)
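As a rough illustration of the documented defaults and of the assert_identical_shared_axes check, the sketch below mirrors the signature in a plain dataclass. `ToyMergeSettings` and `check_shared_axes` are hypothetical names, the dict-of-lists axis representation is a simplification, and skipping the merge axis during comparison is an assumption; the real class derives from Settings and operates on AxisArray axes.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class ToyMergeSettings:
    """Illustrative mirror of the documented defaults; not the real Settings class."""
    axis: str = "ch"
    align_axis: Optional[str] = "time"
    buffer_dur: float = 10.0
    relabel_axis: bool = True
    label_a: str = "_a"
    label_b: str = "_b"
    assert_identical_shared_axes: bool = False
    new_key: Optional[str] = None


def check_shared_axes(axes_a, axes_b, settings):
    """Raise ValueError if an axis present in both inputs carries different data."""
    if not settings.assert_identical_shared_axes:
        return
    for name in axes_a.keys() & axes_b.keys():
        if name == settings.axis:
            continue  # assumption: the merge axis is expected to differ
        if list(axes_a[name]) != list(axes_b[name]):
            raise ValueError(f"Shared axis {name!r} differs between inputs")


strict = ToyMergeSettings(assert_identical_shared_axes=True)
axes_a = {"ch": ["Fz", "Cz"], "freq": [1.0, 2.0]}
axes_b = {"ch": ["Pz"], "freq": [1.0, 3.0]}
try:
    check_shared_axes(axes_a, axes_b, strict)
    raised = False
except ValueError:
    raised = True
```

With the default settings the same call returns without raising, since the check is opt-in.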