ezmsg.sigproc.concat#
Concatenate two AxisArray streams along an existing or new axis.
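As a rough illustration of the two modes mentioned above, the following stdlib-only sketch uses nested lists in place of AxisArray data. The helper names `concat_existing_axis` and `concat_new_axis` are hypothetical, and reading "new axis" as stacking along a new leading axis is an assumption, not something this page confirms.

```python
# Plain-Python sketch: each "array" is a list of time samples, and each
# sample is a list of channel values (shape: time x ch). These helpers are
# illustrative only and are not part of ezmsg.sigproc.

def concat_existing_axis(a, b):
    """Join the channel lists sample by sample, so the 'ch' axis grows."""
    if len(a) != len(b):
        raise ValueError("inputs must have the same number of time samples")
    return [row_a + row_b for row_a, row_b in zip(a, b)]


def concat_new_axis(a, b):
    """Stack both inputs along a new leading axis of length 2 (assumed meaning)."""
    return [a, b]


a = [[1, 2], [3, 4]]   # 2 samples x 2 channels
b = [[5], [6]]         # 2 samples x 1 channel

joined = concat_existing_axis(a, b)   # 2 samples x 3 channels
stacked = concat_new_axis(a, a)       # 2 x (2 samples x 2 channels)
```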
Classes
- class Concat(*args, settings=None, **kwargs)[source]#
Bases:
Unit
Concatenate two AxisArray streams along an axis.
Messages are paired by arrival order (FIFO); no time alignment is performed.
- Parameters:
settings (Settings | None)
- SETTINGS#
alias of
ConcatSettings
- INPUT_SIGNAL_A = InputStream:unlocated[AxisArray]()#
- INPUT_SIGNAL_B = InputStream:unlocated[AxisArray]()#
- OUTPUT_SIGNAL = OutputStream:unlocated[AxisArray](self.num_buffers=32, self.force_tcp=None, self.allow_local=None)#
- async initialize()[source]#
Runs when the Unit is instantiated.
This is called from within the same process this unit will live in. This lifecycle hook can be overridden, and the override can be made an async function by adding the async keyword.
This method is where you should initialize your unit’s state and prepare for message processing.
- Return type:
None
- class ConcatProcessor(settings)[source]#
Bases:
object
Concatenate paired AxisArray messages from two input queues.
Uses FIFO queue pairing (like AddProcessor). No time-alignment or buffering; inputs are assumed pre-synchronized.
- Parameters:
settings (ConcatSettings)
- __init__(settings)[source]#
- Parameters:
settings (ConcatSettings)
- property state: ConcatState#
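The FIFO pairing described for ConcatProcessor can be sketched with two `asyncio.Queue` objects, matching the queue types shown in ConcatState. This is a minimal sketch, not the library's implementation: `pair_fifo` is a hypothetical helper, and plain strings stand in for AxisArray messages.

```python
import asyncio


def pair_fifo(queue_a, queue_b):
    """Pop one message from each queue for as long as both are non-empty.

    Messages are matched purely by arrival order; timestamps are never
    inspected, so the inputs must already be synchronized upstream.
    """
    pairs = []
    while not queue_a.empty() and not queue_b.empty():
        pairs.append((queue_a.get_nowait(), queue_b.get_nowait()))
    return pairs


async def demo():
    queue_a, queue_b = asyncio.Queue(), asyncio.Queue()
    for msg in ("a0", "a1", "a2"):
        queue_a.put_nowait(msg)
    for msg in ("b0", "b1"):
        queue_b.put_nowait(msg)
    # "a2" has no partner yet, so it stays queued until a "b2" arrives.
    return pair_fifo(queue_a, queue_b), queue_a.qsize()


pairs, leftover = asyncio.run(demo())
```

In this sketch an unmatched message simply waits in its queue, which is why a stream that drops or duplicates messages would shift every later pairing.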
- class ConcatSettings(axis: 'str' = 'ch', align_axis: 'str | None' = None, relabel_axis: 'bool' = True, label_a: 'str' = '_a', label_b: 'str' = '_b', assert_identical_shared_axes: 'bool' = False, new_key: 'str | None' = None)[source]#
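Bases:
Settings
- Parameters:
axis (str)
align_axis (str | None)
relabel_axis (bool)
label_a (str)
label_b (str)
assert_identical_shared_axes (bool): If True, raise ValueError when shared CoordinateAxis .data arrays differ.
new_key (str | None)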
- __init__(axis='ch', align_axis=None, relabel_axis=True, label_a='_a', label_b='_b', assert_identical_shared_axes=False, new_key=None)#
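The check that `assert_identical_shared_axes` enables can be pictured as follows. This is a sketch under stated assumptions: `check_shared_axes` is a hypothetical helper, dictionaries of label lists stand in for CoordinateAxis `.data` arrays, and skipping the concatenation axis itself is an assumption rather than documented behavior.

```python
def check_shared_axes(axes_a, axes_b, concat_axis="ch"):
    """Raise ValueError if an axis present in both inputs has different labels.

    axes_a and axes_b map an axis name to a list of labels (a stand-in for
    CoordinateAxis .data). The concatenation axis is skipped here because
    its labels are expected to differ between the inputs (an assumption).
    """
    for name in axes_a.keys() & axes_b.keys():
        if name == concat_axis:
            continue
        if list(axes_a[name]) != list(axes_b[name]):
            raise ValueError(f"shared axis {name!r} differs between inputs")


axes_a = {"ch": ["C3", "C4"], "freq": [8, 12]}
axes_b = {"ch": ["O1"], "freq": [8, 12]}
check_shared_axes(axes_a, axes_b)  # identical "freq" labels, so no error

try:
    check_shared_axes(axes_a, {"ch": ["O1"], "freq": [8, 13]})
    mismatch_detected = False
except ValueError:
    mismatch_detected = True
```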
- class ConcatState(queue_a: "'asyncio.Queue[AxisArray]'" = <factory>, queue_b: "'asyncio.Queue[AxisArray]'" = <factory>, merged_concat_axis: 'CoordinateAxis | None' = None, cached_axes: 'dict[str, AxisBase] | None' = None, a_fingerprint: 'tuple | None' = None, b_fingerprint: 'tuple | None' = None)[source]#
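Bases:
object
- Parameters:
queue_a (asyncio.Queue[AxisArray])
queue_b (asyncio.Queue[AxisArray])
merged_concat_axis (CoordinateAxis | None)
cached_axes (dict[str, AxisBase] | None)
a_fingerprint (tuple | None)
b_fingerprint (tuple | None)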
- merged_concat_axis: CoordinateAxis | None = None#
- __init__(queue_a=<factory>, queue_b=<factory>, merged_concat_axis=None, cached_axes=None, a_fingerprint=None, b_fingerprint=None)#