ezmsg.sigproc.concat#

Concatenate two AxisArray streams along an existing or new axis.

Classes

class Concat(*args, settings=None, **kwargs)[source]#

Bases: Unit

Concatenate two AxisArray streams along an axis.

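Messages are paired by arrival order (FIFO): the first message received on one input is concatenated with the first message received on the other, and so on. No time alignment is performed.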
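The arrival-order pairing can be pictured with a minimal sketch. The `FifoPairer` class and its internals are hypothetical and are not part of ezmsg; plain tuples stand in for AxisArray messages. The sketch only illustrates that pairing ignores timestamps.

```python
from collections import deque


class FifoPairer:
    """Hypothetical stand-in that pairs items from two streams by arrival order."""

    def __init__(self):
        self._a = deque()
        self._b = deque()

    def push_a(self, msg):
        self._a.append(msg)
        return self._drain()

    def push_b(self, msg):
        self._b.append(msg)
        return self._drain()

    def _drain(self):
        # Emit a pair whenever both queues hold at least one message.
        pairs = []
        while self._a and self._b:
            pairs.append((self._a.popleft(), self._b.popleft()))
        return pairs


pairer = FifoPairer()
out = []
out += pairer.push_a(("a", 0.0))  # (source, timestamp)
out += pairer.push_a(("a", 0.1))
out += pairer.push_b(("b", 5.0))  # timestamps differ, yet it pairs with the first A
out += pairer.push_b(("b", 5.1))
```

Because no time alignment takes place, a dropped message on either input would shift every later pairing by one in a scheme like this.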

Parameters:

settings (Settings | None)

SETTINGS#

alias of ConcatSettings

INPUT_SIGNAL_A = InputStream:unlocated[AxisArray]()#
INPUT_SIGNAL_B = InputStream:unlocated[AxisArray]()#
OUTPUT_SIGNAL = OutputStream:unlocated[AxisArray](self.num_buffers=32, self.force_tcp=None, self.allow_local=None)#
async initialize()[source]#

Runs when the Unit is instantiated.

This is called from within the same process in which this unit will live. This lifecycle hook can be overridden, and the override can be made asynchronous by adding the async keyword.

This method is where you should initialize your unit’s state and prepare for message processing.

Return type:

None

async on_a(msg)[source]#
Parameters:

msg (AxisArray)

Return type:

None

async on_b(msg)[source]#
Parameters:

msg (AxisArray)

Return type:

None

async output()[source]#
Return type:

AsyncGenerator
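One way two input handlers and an async generator might cooperate is sketched below with `asyncio.Queue`. This is an assumption about the general pattern, not the Unit's actual source; `paired_output`, `demo`, and `n_pairs` are hypothetical names, and strings stand in for AxisArray messages.

```python
import asyncio


async def paired_output(queue_a, queue_b, n_pairs):
    """Hypothetical async generator: wait for one item from each queue, then yield the pair."""
    for _ in range(n_pairs):
        a = await queue_a.get()
        b = await queue_b.get()
        yield a, b


async def demo():
    queue_a, queue_b = asyncio.Queue(), asyncio.Queue()
    # Handlers such as on_a / on_b would enqueue messages as they arrive.
    for item in ("a0", "a1"):
        queue_a.put_nowait(item)
    for item in ("b0", "b1"):
        queue_b.put_nowait(item)
    return [pair async for pair in paired_output(queue_a, queue_b, n_pairs=2)]


pairs = asyncio.run(demo())
```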

class ConcatProcessor(settings)[source]#

Bases: object

Concatenate paired AxisArray messages from two input queues.

Uses FIFO queue pairing (like AddProcessor). No time alignment or buffering is performed; inputs are assumed to be pre-synchronized.

Parameters:

settings (ConcatSettings)

__init__(settings)[source]#
Parameters:

settings (ConcatSettings)

property state: ConcatState#
push_a(msg)[source]#
Parameters:

msg (AxisArray)

Return type:

None

push_b(msg)[source]#
Parameters:

msg (AxisArray)

Return type:

None

class ConcatSettings(axis: str = 'ch', align_axis: str | None = None, relabel_axis: bool = True, label_a: str = '_a', label_b: str = '_b', assert_identical_shared_axes: bool = False, new_key: str | None = None)[source]#

Bases: Settings

Parameters:
  • axis (str)

  • align_axis (str | None)

  • relabel_axis (bool)

  • label_a (str)

  • label_b (str)

  • assert_identical_shared_axes (bool)

  • new_key (str | None)

axis: str = 'ch'#

Axis along which to concatenate the two signals.
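To illustrate what concatenating along a named axis means, here is a small sketch that uses nested lists instead of AxisArray. The `concat_2d` helper and the `dims` tuple are hypothetical; the real class operates on array data and is not shown here.

```python
def concat_2d(a, b, dims, axis):
    """Concatenate two 2-D nested lists along a named, existing axis.

    Hypothetical helper: `dims` names the two dimensions, e.g. ("time", "ch").
    """
    if axis == dims[0]:
        # Append rows: the row lengths must match.
        if a and b and len(a[0]) != len(b[0]):
            raise ValueError("row lengths differ")
        return a + b
    if axis == dims[1]:
        # Extend each row: the number of rows must match.
        if len(a) != len(b):
            raise ValueError("row counts differ")
        return [row_a + row_b for row_a, row_b in zip(a, b)]
    raise ValueError(f"unknown axis {axis!r}")


sig_a = [[1, 2], [3, 4]]  # 2 time samples x 2 channels
sig_b = [[10], [20]]      # 2 time samples x 1 channel
merged = concat_2d(sig_a, sig_b, dims=("time", "ch"), axis="ch")
```

With the default axis of 'ch', the result in this sketch keeps the two time samples and grows from two channels to three.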

align_axis: str | None = None#

Axis along which to validate alignment between the two signals.

relabel_axis: bool = True#

Whether to relabel coordinate axis labels to ensure uniqueness.

label_a: str = '_a'#

Suffix appended to signal A labels when relabel_axis is True.

label_b: str = '_b'#

Suffix appended to signal B labels when relabel_axis is True.
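The effect of the suffix settings can be sketched as follows. The `merge_labels` helper is hypothetical, and it suffixes every label on both sides; whether the library suffixes all labels or only colliding ones is not stated here, so treat that as an assumption.

```python
def merge_labels(labels_a, labels_b, relabel=True, label_a="_a", label_b="_b"):
    """Hypothetical helper: join two label lists, optionally suffixing each side."""
    if relabel:
        labels_a = [name + label_a for name in labels_a]
        labels_b = [name + label_b for name in labels_b]
    return labels_a + labels_b


merged = merge_labels(["Fz", "Cz"], ["Fz", "Pz"])
unchanged = merge_labels(["Fz", "Cz"], ["Fz", "Pz"], relabel=False)
```

Without relabeling, the label "Fz" appears twice in the merged list, which is the ambiguity the suffixes are meant to remove.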

assert_identical_shared_axes: bool = False#

If True, raise ValueError when shared CoordinateAxis .data arrays differ.
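A check of this kind might look like the sketch below. The `check_shared_axes` helper, its `strict` flag, and the dict-of-lists representation are hypothetical; skipping the concatenation axis is also an assumption, made because that axis is expected to differ between inputs.

```python
def check_shared_axes(axes_a, axes_b, concat_axis, strict=False):
    """Hypothetical check: compare label data on axes present in both inputs.

    `axes_a` / `axes_b` map axis name -> list of labels.
    """
    mismatched = [
        name
        for name in axes_a
        if name != concat_axis and name in axes_b and axes_a[name] != axes_b[name]
    ]
    if strict and mismatched:
        raise ValueError(f"shared axes differ: {mismatched}")
    return mismatched


axes_a = {"ch": ["Fz", "Cz"], "freq": [1, 2, 3]}
axes_b = {"ch": ["Pz"], "freq": [1, 2, 4]}
mismatched = check_shared_axes(axes_a, axes_b, concat_axis="ch")
```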

new_key: str | None = None#

Output AxisArray key. If None, uses the key from signal A.
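The key fallback rule can be sketched with a stand-in dataclass that mirrors the documented fields and defaults. `SketchConcatSettings` and `resolve_key` are hypothetical names used only for illustration; they are not the library's classes or functions.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class SketchConcatSettings:
    """Stand-in mirroring the documented ConcatSettings fields and defaults."""

    axis: str = "ch"
    align_axis: Optional[str] = None
    relabel_axis: bool = True
    label_a: str = "_a"
    label_b: str = "_b"
    assert_identical_shared_axes: bool = False
    new_key: Optional[str] = None


def resolve_key(settings, key_a):
    # Documented rule: use new_key when set, otherwise the key from signal A.
    return settings.new_key if settings.new_key is not None else key_a


default_key = resolve_key(SketchConcatSettings(), key_a="eeg")
custom_key = resolve_key(SketchConcatSettings(new_key="eeg+emg"), key_a="eeg")
```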

__init__(axis='ch', align_axis=None, relabel_axis=True, label_a='_a', label_b='_b', assert_identical_shared_axes=False, new_key=None)#
Parameters:
  • axis (str)

  • align_axis (str | None)

  • relabel_axis (bool)

  • label_a (str)

  • label_b (str)

  • assert_identical_shared_axes (bool)

  • new_key (str | None)

Return type:

None

class ConcatState(queue_a: asyncio.Queue[AxisArray] = <factory>, queue_b: asyncio.Queue[AxisArray] = <factory>, merged_concat_axis: CoordinateAxis | None = None, cached_axes: dict[str, AxisBase] | None = None, a_fingerprint: tuple | None = None, b_fingerprint: tuple | None = None)[source]#

Bases: object

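Parameters:
  • queue_a (Queue[AxisArray])

  • queue_b (Queue[AxisArray])

  • merged_concat_axis (CoordinateAxis | None)

  • cached_axes (dict[str, AxisBase] | None)

  • a_fingerprint (tuple | None)

  • b_fingerprint (tuple | None)
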
queue_a: Queue[AxisArray]#
queue_b: Queue[AxisArray]#
merged_concat_axis: CoordinateAxis | None = None#
cached_axes: dict[str, AxisBase] | None = None#
a_fingerprint: tuple | None = None#
b_fingerprint: tuple | None = None#
__init__(queue_a=<factory>, queue_b=<factory>, merged_concat_axis=None, cached_axes=None, a_fingerprint=None, b_fingerprint=None)#
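Parameters:
  • queue_a (Queue[AxisArray])

  • queue_b (Queue[AxisArray])

  • merged_concat_axis (CoordinateAxis | None)

  • cached_axes (dict[str, AxisBase] | None)

  • a_fingerprint (tuple | None)

  • b_fingerprint (tuple | None)
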
Return type:

None
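The `<factory>` defaults in the ConcatState signature are how Python dataclasses display fields declared with `default_factory`, which suggests each state instance gets its own pair of queues. The sketch below shows that behavior with a hypothetical `SketchState` stand-in; the field types are simplified and nothing here is taken from the library source.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class SketchState:
    """Stand-in with the same field layout as the documented ConcatState."""

    queue_a: asyncio.Queue = field(default_factory=asyncio.Queue)
    queue_b: asyncio.Queue = field(default_factory=asyncio.Queue)
    merged_concat_axis: Optional[object] = None
    cached_axes: Optional[dict] = None
    a_fingerprint: Optional[tuple] = None
    b_fingerprint: Optional[tuple] = None


# Each instance builds fresh queues instead of sharing one mutable default.
first, second = SketchState(), SketchState()
```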