ezmsg.sigproc.concat#

Concatenate two AxisArray streams along an existing or new axis.

Classes

class Concat(*args, settings=None, **kwargs)[source]#

Bases: Unit

Concatenate two AxisArray streams along an axis.

Pairs messages by arrival order (FIFO). No time-alignment.

Parameters:

settings (Settings | None)

SETTINGS#

alias of ConcatSettings

INPUT_SIGNAL_A = InputStream:unlocated[AxisArray]()#
INPUT_SIGNAL_B = InputStream:unlocated[AxisArray]()#
OUTPUT_SIGNAL = OutputStream:unlocated[AxisArray](self.num_buffers=32, self.force_tcp=None, self.allow_local=None)#
async initialize()[source]#

Runs when the Unit is instantiated.

This is called from within the same process this unit will live in. This lifecycle hook can be overridden. It can be run as async functions by simply adding the async keyword when overriding.

This method is where you should initialize your unit’s state and prepare for message processing.

Return type:

None

async on_a(msg)[source]#
Parameters:

msg (AxisArray)

Return type:

None

async on_b(msg)[source]#
Parameters:

msg (AxisArray)

Return type:

None

async output()[source]#
Return type:

AsyncGenerator

class ConcatProcessor(settings)[source]#

Bases: object

Concatenate paired AxisArray messages from two input queues.

Uses FIFO queue pairing (like AddProcessor). No time-alignment or buffering — inputs are assumed pre-synchronized.

Parameters:

settings (ConcatSettings)

__init__(settings)[source]#
Parameters:

settings (ConcatSettings)

property state: ConcatState#
push_a(msg)[source]#
Parameters:

msg (AxisArray)

Return type:

None

push_b(msg)[source]#
Parameters:

msg (AxisArray)

Return type:

None

class ConcatSettings(axis: 'str' = 'ch', align_axis: 'str | None' = None, relabel_axis: 'bool' = True, label_a: 'str' = '_a', label_b: 'str' = '_b', assert_identical_shared_axes: 'bool' = False, new_key: 'str | None' = None)[source]#

Bases: Settings

Parameters:
  • axis (str)

  • align_axis (str | None)

  • relabel_axis (bool)

  • label_a (str)

  • label_b (str)

  • assert_identical_shared_axes (bool)

  • new_key (str | None)

axis: str = 'ch'#

Axis along which to concatenate the two signals.

align_axis: str | None = None#

Axis along which to validate alignment between the two signals.

relabel_axis: bool = True#

Whether to relabel coordinate axis labels to ensure uniqueness.

label_a: str = '_a'#

Per-side label for signal A.

Used in two distinct ways depending on whether axis is an existing or new dimension on the inputs:

  • Existing axis (axis is in both inputs’ .dims): label_a is a suffix appended to each entry of A’s existing coordinate-axis labels when relabel_axis is True. Defaults to "_a".

  • New axis (axis is not in either input’s .dims): label_a is used as the single data entry on the merged axis’s CoordinateAxis at index 0. E.g. setting label_a="spk", label_b="sbp" on a Merge of two (time, ch) streams produces a (time, ch, feature) output whose feature axis has data=["spk", "sbp"].

label_b: str = '_b'#

Per-side label for signal B.

See label_a. Defaults to "_b"; used as the new-axis label at index 1 in the new-axis case.

assert_identical_shared_axes: bool = False#

If True, raise ValueError when shared CoordinateAxis .data arrays differ.

new_key: str | None = None#

Output AxisArray key. If None, uses the key from signal A.

__init__(axis='ch', align_axis=None, relabel_axis=True, label_a='_a', label_b='_b', assert_identical_shared_axes=False, new_key=None)#
Parameters:
  • axis (str)

  • align_axis (str | None)

  • relabel_axis (bool)

  • label_a (str)

  • label_b (str)

  • assert_identical_shared_axes (bool)

  • new_key (str | None)

Return type:

None

class ConcatState(queue_a: "'asyncio.Queue[AxisArray]'" = <factory>, queue_b: "'asyncio.Queue[AxisArray]'" = <factory>, merged_concat_axis: 'CoordinateAxis | None' = None, cached_axes: 'dict[str, AxisBase] | None'=None, a_fingerprint: 'tuple | None' = None, b_fingerprint: 'tuple | None' = None)[source]#

Bases: object

Parameters:
queue_a: Queue[AxisArray]#
queue_b: Queue[AxisArray]#
merged_concat_axis: CoordinateAxis | None = None#
cached_axes: dict[str, AxisBase] | None = None#
a_fingerprint: tuple | None = None#
b_fingerprint: tuple | None = None#
__init__(queue_a=<factory>, queue_b=<factory>, merged_concat_axis=None, cached_axes=None, a_fingerprint=None, b_fingerprint=None)#
Parameters:
Return type:

None