ezmsg.sigproc.merge#

Time-aligned merge of two AxisArray streams along a non-time axis.

Classes

class Merge(*args, settings=None, **kwargs)[source]#

Bases: BaseProcessorUnit[MergeSettings]

Merge two AxisArray streams by time-aligning and concatenating along a non-time axis.

Input A routes through the processor’s __acall__ (triggering hash-based reset when the stream structure changes). Input B routes through push_b which independently tracks its own structure.

Inherits INPUT_SETTINGS and on_settingscreate_processor from BaseProcessorUnit.

Parameters:

settings (Settings | None)

SETTINGS#

alias of MergeSettings

INPUT_SIGNAL_A = InputStream:unlocated[AxisArray]()#
INPUT_SIGNAL_B = InputStream:unlocated[AxisArray]()#
OUTPUT_SIGNAL = OutputStream:unlocated[AxisArray](self.num_buffers=32, self.force_tcp=False)#
create_processor()[source]#
Return type:

None

async on_a(msg)[source]#
Parameters:

msg (AxisArray)

Return type:

AsyncGenerator

async on_b(msg)[source]#
Parameters:

msg (AxisArray)

Return type:

AsyncGenerator

class MergeProcessor(*args, **kwargs)[source]#

Bases: BaseStatefulTransformer[MergeSettings, AxisArray, AxisArray | None, MergeState]

Processor that time-aligns two AxisArray streams and concatenates them.

Input A flows through the standard __call__ / _process path, getting automatic _hash_message / _reset_state handling from BaseStatefulTransformer. Input B flows through push_b(), which independently tracks its own structure.

Invalidation rules:

  • Gain mismatch (either input vs stored common gain) → full reset.

  • Concat-axis dimensionality change → per-input buffer reset + alignment and merged-axis cache invalidation.

  • Non-align/non-concat axis shape change → per-input buffer reset + alignment invalidation.

push_b(message)[source]#

Process input B: check gain, detect structural changes, buffer, try merge.

Parameters:

message (AxisArray)

Return type:

AxisArray | None

class MergeSettings(axis: 'str' = 'ch', align_axis: 'str | None' = 'time', buffer_dur: 'float' = 10.0, relabel_axis: 'bool' = True, label_a: 'str' = '_a', label_b: 'str' = '_b', new_key: 'str | None' = None)[source]#

Bases: Settings

Parameters:
  • axis (str)

  • align_axis (str | None)

  • buffer_dur (float)

  • relabel_axis (bool)

  • label_a (str)

  • label_b (str)

  • new_key (str | None)

axis: str = 'ch'#

Axis along which to concatenate the two signals.

align_axis: str | None = 'time'#

Axis used for alignment. If None, defaults to the first dimension.

buffer_dur: float = 10.0#

Buffer duration in seconds for each input stream.

relabel_axis: bool = True#

Whether to relabel coordinate axis labels to ensure uniqueness.

label_a: str = '_a'#

Suffix appended to signal A labels when relabel_axis is True.

label_b: str = '_b'#

Suffix appended to signal B labels when relabel_axis is True.

new_key: str | None = None#

Output AxisArray key. If None, uses the key from signal A.

__init__(axis='ch', align_axis='time', buffer_dur=10.0, relabel_axis=True, label_a='_a', label_b='_b', new_key=None)#
Parameters:
  • axis (str)

  • align_axis (str | None)

  • buffer_dur (float)

  • relabel_axis (bool)

  • label_a (str)

  • label_b (str)

  • new_key (str | None)

Return type:

None

class MergeState[source]#

Bases: object

gain: float | None = None#
align_axis: str | None = None#
aligned: bool = False#
merged_concat_axis: CoordinateAxis | None = None#
buf_a: HybridAxisArrayBuffer | None = None#
concat_axis_a: CoordinateAxis | None = None#
a_concat_dim: int | None = None#
a_other_dims: tuple[int, ...] | None = None#
buf_b: HybridAxisArrayBuffer | None = None#
concat_axis_b: CoordinateAxis | None = None#
b_concat_dim: int | None = None#
b_other_dims: tuple[int, ...] | None = None#