ezmsg.sigproc.concat
Concatenate two AxisArray streams along an existing or new axis.
Classes
- class Concat(*args, settings=None, **kwargs)
Bases: Unit
Concatenate two AxisArray streams along an axis.
Pairs messages by arrival order (FIFO). No time-alignment.
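The pairing rule can be pictured with a minimal sketch using plain asyncio queues. This is not the library's implementation: `pair_fifo`, `demo`, and the `(timestamp, payload)` tuples are hypothetical stand-ins for illustration. The point it shows is that the n-th message on A is always paired with the n-th message on B, whatever their timestamps.

```python
import asyncio


async def pair_fifo(queue_a, queue_b, n_pairs):
    """Pair the next item from each queue in arrival order (hypothetical helper)."""
    pairs = []
    for _ in range(n_pairs):
        a = await queue_a.get()  # oldest unpaired message from A
        b = await queue_b.get()  # oldest unpaired message from B
        pairs.append((a, b))     # timestamps are never compared
    return pairs


async def demo():
    queue_a = asyncio.Queue()
    queue_b = asyncio.Queue()
    # Stand-in messages: (timestamp, payload). Real inputs would be AxisArray.
    for msg in [(0.00, "a0"), (0.10, "a1")]:
        queue_a.put_nowait(msg)
    for msg in [(0.03, "b0"), (0.50, "b1")]:
        queue_b.put_nowait(msg)
    return await pair_fifo(queue_a, queue_b, 2)


pairs = asyncio.run(demo())
for a, b in pairs:
    print(a, "<->", b)
```

Here `a1` (t=0.10) is paired with `b1` (t=0.50) even though the timestamps are far apart, so upstream streams need to be synchronized before they reach this unit.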
- Parameters:
settings (Settings | None)
- SETTINGS
alias of ConcatSettings
- INPUT_SIGNAL_A = InputStream:unlocated[AxisArray]()
- INPUT_SIGNAL_B = InputStream:unlocated[AxisArray]()
- OUTPUT_SIGNAL = OutputStream:unlocated[AxisArray](self.num_buffers=32, self.force_tcp=None, self.allow_local=None)
- async initialize()
Runs when the Unit is instantiated.
This is called from within the same process this unit will live in. This lifecycle hook can be overridden. It can be run as an async function by adding the async keyword when overriding.
This method is where you should initialize your unit’s state and prepare for message processing.
- Return type:
None
- class ConcatProcessor(settings)
Bases: object
Concatenate paired AxisArray messages from two input queues.
Uses FIFO queue pairing (like AddProcessor). No time-alignment or buffering; inputs are assumed pre-synchronized.
- Parameters:
settings (ConcatSettings)
- __init__(settings)
- Parameters:
settings (ConcatSettings)
- property state: ConcatState
- class ConcatSettings(axis: str = 'ch', align_axis: str | None = None, relabel_axis: bool = True, label_a: str = '_a', label_b: str = '_b', assert_identical_shared_axes: bool = False, new_key: str | None = None)
Bases: Settings
- Parameters:
axis (str)
align_axis (str | None)
relabel_axis (bool)
label_a (str)
label_b (str)
assert_identical_shared_axes (bool)
new_key (str | None)
- label_a: str = '_a'
Per-side label for signal A.
Used in two distinct ways depending on whether axis is an existing or new dimension on the inputs:
Existing axis (axis is in both inputs’ .dims): label_a is a suffix appended to each entry of A’s existing coordinate-axis labels when relabel_axis is True. Defaults to "_a".
New axis (axis is not in either input’s .dims): label_a is used as the single data entry on the merged axis’s CoordinateAxis at index 0. E.g. setting axis="feature", label_a="spk", label_b="sbp" on a Concat of two (time, ch) streams produces a (time, ch, feature) output whose feature axis has data=["spk", "sbp"].
- label_b: str = '_b'
Per-side label for signal B.
See label_a. Defaults to "_b"; used as the new-axis label at index 1 in the new-axis case.
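The two roles of label_a and label_b can be sketched in plain Python. This only illustrates the labelling rule described above and is not the library's code: `merged_labels` is a hypothetical helper, the channel names are made up, and leaving labels unchanged when relabel_axis is False is an assumption.

```python
def merged_labels(labels_a, labels_b, axis_exists,
                  relabel_axis=True, label_a="_a", label_b="_b"):
    """Return the labels of the concatenated axis (hypothetical helper)."""
    if axis_exists:
        if relabel_axis:
            # Existing axis: label_a / label_b act as per-side suffixes.
            return ([f"{name}{label_a}" for name in labels_a]
                    + [f"{name}{label_b}" for name in labels_b])
        # Assumption: without relabeling, labels are concatenated unchanged.
        return list(labels_a) + list(labels_b)
    # New axis: one entry per input, label_a at index 0 and label_b at index 1.
    return [label_a, label_b]


# Existing "ch" axis on both inputs, default suffixes.
existing = merged_labels(["Fz", "Cz"], ["Fz", "Cz"], axis_exists=True)

# New "feature" axis: the per-side labels become the axis entries themselves.
new = merged_labels(["Fz", "Cz"], ["Fz", "Cz"], axis_exists=False,
                    label_a="spk", label_b="sbp")

print(existing)
print(new)
```

With the default suffixes, identical channel names on the two inputs stay distinguishable after concatenation along an existing axis.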
- assert_identical_shared_axes: bool = False
If True, raise ValueError when shared CoordinateAxis .data arrays differ.
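A rough sketch of what such a check might look like, using plain dicts in place of AxisArray axes. `check_shared_axes` is a hypothetical helper, not part of ezmsg.sigproc. Treating "shared" as "present on both inputs, other than the concatenation axis" is an assumption of this sketch.

```python
def check_shared_axes(axes_a, axes_b, concat_axis):
    """Raise ValueError if an axis present on both sides has differing labels.

    axes_a / axes_b are hypothetical dicts mapping axis name -> list of labels.
    Skipping the concatenation axis itself is an assumption of this sketch.
    """
    for name in sorted(axes_a.keys() & axes_b.keys()):
        if name == concat_axis:
            continue
        if list(axes_a[name]) != list(axes_b[name]):
            raise ValueError(f"shared axis {name!r} differs between A and B")


a = {"ch": ["Fz", "Cz"], "freq": [4, 8, 12]}
b = {"ch": ["Pz"], "freq": [4, 8, 12]}
check_shared_axes(a, b, concat_axis="ch")  # "freq" matches, so no error

b_bad = {"ch": ["Pz"], "freq": [4, 8, 16]}
try:
    check_shared_axes(a, b_bad, concat_axis="ch")
    raised = False
except ValueError:
    raised = True
```

A strict check like this surfaces mismatched inputs early, at the cost of comparing the shared axes.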
- __init__(axis='ch', align_axis=None, relabel_axis=True, label_a='_a', label_b='_b', assert_identical_shared_axes=False, new_key=None)
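- class ConcatState(queue_a: asyncio.Queue[AxisArray] = <factory>, queue_b: asyncio.Queue[AxisArray] = <factory>, merged_concat_axis: CoordinateAxis | None = None, cached_axes: dict[str, AxisBase] | None = None, a_fingerprint: tuple | None = None, b_fingerprint: tuple | None = None)
Bases: object
- Parameters:
queue_a (asyncio.Queue[AxisArray])
queue_b (asyncio.Queue[AxisArray])
merged_concat_axis (CoordinateAxis | None)
cached_axes (dict[str, AxisBase] | None)
a_fingerprint (tuple | None)
b_fingerprint (tuple | None)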
- merged_concat_axis: CoordinateAxis | None = None
- __init__(queue_a=<factory>, queue_b=<factory>, merged_concat_axis=None, cached_axes=None, a_fingerprint=None, b_fingerprint=None)