ezmsg.sigproc.merge#

Time-aligned merge of two AxisArray streams along a non-time axis.

Merge is an ez.Collection that composes AlignAlongAxis (time-alignment) with Concat (axis-aware concatenation).

Classes

class Merge(*args, settings=None, **kwargs)[source]#

Bases: Collection

Merge two AxisArray streams by time-aligning and concatenating.

Composes AlignAlongAxisConcat.

Parameters:

settings (Settings | None)

SETTINGS#

alias of MergeSettings

INPUT_SIGNAL_A = InputTopicStream:unlocated[AxisArray]()#
INPUT_SIGNAL_B = InputTopicStream:unlocated[AxisArray]()#
OUTPUT_SIGNAL = OutputTopicStream:unlocated[AxisArray]()#
ALIGN = <ezmsg.sigproc.align.AlignAlongAxis object>#
CONCAT = <ezmsg.sigproc.concat.Concat object>#
configure()[source]#

A lifecycle hook that runs when the Collection is instantiated.

This is the best place to call Unit.apply_settings() on each member Unit of the Collection. Override this method to perform collection-specific configuration of child components.

Return type:

None

network()[source]#

Override this method and have the definition return a NetworkDefinition which defines how InputStreams and OutputStreams from member Units will be connected.

The NetworkDefinition specifies the message routing between components by connecting output streams to input streams.

Returns:

Network definition specifying stream connections

Return type:

NetworkDefinition

class MergeProcessor(settings)[source]#

Bases: object

Convenience processor that composes alignment + concatenation.

Preserves the same call interface as the previous monolithic processor so that existing code using proc(msg_a) / proc.push_b(msg_b) continues to work unchanged.

Parameters:

settings (MergeSettings)

__init__(settings)[source]#
Parameters:

settings (MergeSettings)

property align_state#

Expose alignment state for introspection / tests.

property concat_state#

Expose concatenation state for introspection / tests.

push_b(msg_b)[source]#
Parameters:

msg_b (AxisArray)

Return type:

AxisArray | None

class MergeSettings(axis: 'str' = 'ch', align_axis: 'str | None' = 'time', buffer_dur: 'float' = 10.0, relabel_axis: 'bool' = True, label_a: 'str' = '_a', label_b: 'str' = '_b', assert_identical_shared_axes: 'bool' = False, new_key: 'str | None' = None)[source]#

Bases: Settings

Parameters:
  • axis (str)

  • align_axis (str | None)

  • buffer_dur (float)

  • relabel_axis (bool)

  • label_a (str)

  • label_b (str)

  • assert_identical_shared_axes (bool)

  • new_key (str | None)

axis: str = 'ch'#

Axis along which to concatenate the two signals.

align_axis: str | None = 'time'#

Axis used for alignment. If None, defaults to the first dimension.

buffer_dur: float = 10.0#

Buffer duration in seconds for each input stream.

relabel_axis: bool = True#

Whether to relabel coordinate axis labels to ensure uniqueness.

label_a: str = '_a'#

Suffix appended to signal A labels when relabel_axis is True.

label_b: str = '_b'#

Suffix appended to signal B labels when relabel_axis is True.

assert_identical_shared_axes: bool = False#

If True, raise ValueError when shared CoordinateAxis .data arrays differ.

new_key: str | None = None#

Output AxisArray key. If None, uses the key from signal A.

__init__(axis='ch', align_axis='time', buffer_dur=10.0, relabel_axis=True, label_a='_a', label_b='_b', assert_identical_shared_axes=False, new_key=None)#
Parameters:
  • axis (str)

  • align_axis (str | None)

  • buffer_dur (float)

  • relabel_axis (bool)

  • label_a (str)

  • label_b (str)

  • assert_identical_shared_axes (bool)

  • new_key (str | None)

Return type:

None