ezmsg.sigproc.align

Time-align two AxisArray streams, outputting paired aligned chunks.
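As a rough illustration of what time-aligning two chunked streams involves, the sketch below trims two chunks to their overlapping time span. It is not the library's implementation: `Chunk` and `align_pair` are hypothetical names, `Chunk` is a plain-Python stand-in for AxisArray, and both streams are assumed to share one sampling rate.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class Chunk:
    """Hypothetical stand-in for an AxisArray: samples on a regular time grid."""

    t0: float  # timestamp of the first sample, in seconds
    fs: float  # sampling rate in Hz
    data: list = field(default_factory=list)

    @property
    def t_end(self) -> float:
        """Timestamp one sample period past the last sample."""
        return self.t0 + len(self.data) / self.fs


def align_pair(a: Chunk, b: Chunk) -> tuple[Chunk, Chunk] | None:
    """Trim a and b to their shared time span; return None if they do not overlap."""
    if a.fs != b.fs:
        raise ValueError("this sketch assumes equal sampling rates")
    start = max(a.t0, b.t0)
    stop = min(a.t_end, b.t_end)
    n = round((stop - start) * a.fs)  # number of overlapping samples
    if n <= 0:
        return None
    ia = round((start - a.t0) * a.fs)  # index of the first shared sample in a
    ib = round((start - b.t0) * b.fs)  # index of the first shared sample in b
    return (
        Chunk(start, a.fs, a.data[ia:ia + n]),
        Chunk(start, b.fs, b.data[ib:ib + n]),
    )
```

When the chunks overlap, both returned chunks start at the same timestamp and hold the same number of samples, which is the "paired aligned chunks" idea in miniature.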

Classes

class AlignAlongAxis(*args, settings=None, **kwargs)

Bases: Unit

Time-align two AxisArray streams and output paired aligned chunks.

Each input subscriber (on_a and on_b) can publish to both output streams: when a message on either input completes an alignment, the paired (A, B) result is yielded, with A going to OUTPUT_SIGNAL_A and B going to OUTPUT_SIGNAL_B.

Parameters:

settings (Settings | None)

SETTINGS

alias of AlignAlongAxisSettings

INPUT_SIGNAL_A = InputStream:unlocated[AxisArray]()
INPUT_SIGNAL_B = InputStream:unlocated[AxisArray]()
OUTPUT_SIGNAL_A = OutputStream:unlocated[AxisArray](self.num_buffers=32, self.force_tcp=None, self.allow_local=None)
OUTPUT_SIGNAL_B = OutputStream:unlocated[AxisArray](self.num_buffers=32, self.force_tcp=None, self.allow_local=None)
async initialize()

Runs when the Unit is instantiated.

This is called from within the same process the unit will live in. This lifecycle hook can be overridden, and it can be made an async function by adding the async keyword when overriding.

This method is where you should initialize your unit’s state and prepare for message processing.

Return type:

None

async on_a(msg)
Parameters:

msg (AxisArray)

Return type:

AsyncGenerator

async on_b(msg)
Parameters:

msg (AxisArray)

Return type:

AsyncGenerator
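The two-subscriber pattern can be sketched with plain asyncio, without ezmsg installed. In the hypothetical `ToyAlignUnit` below, either handler may complete a pair, so either one may yield to both outputs. The (stream, message) tuples mirror the way ezmsg publishers yield their results; the "latest message from each side" pairing rule is a placeholder assumption, not the real alignment logic.

```python
import asyncio


class ToyAlignUnit:
    """Hypothetical stand-in for a unit with two inputs and two outputs."""

    OUTPUT_SIGNAL_A = "OUTPUT_SIGNAL_A"
    OUTPUT_SIGNAL_B = "OUTPUT_SIGNAL_B"

    def __init__(self):
        self._pending_a = None
        self._pending_b = None

    def _pair(self):
        """Return (a, b) once both sides have a message, else None."""
        if self._pending_a is None or self._pending_b is None:
            return None
        pair = (self._pending_a, self._pending_b)
        self._pending_a = self._pending_b = None
        return pair

    async def _emit(self):
        """Yield one message per output stream when a pair is ready."""
        pair = self._pair()
        if pair is not None:
            yield self.OUTPUT_SIGNAL_A, pair[0]
            yield self.OUTPUT_SIGNAL_B, pair[1]

    async def on_a(self, msg):
        self._pending_a = msg
        async for out in self._emit():
            yield out

    async def on_b(self, msg):
        self._pending_b = msg
        async for out in self._emit():
            yield out


async def collect(agen):
    """Drain an async generator into a list."""
    return [item async for item in agen]
```

Calling `asyncio.run(collect(unit.on_a(msg)))` yields nothing until a message has also arrived on the other input.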

class AlignAlongAxisProcessor(*args, **kwargs)

Bases: BaseStatefulTransformer[AlignAlongAxisSettings, AxisArray, tuple[AxisArray, AxisArray] | None, AlignAlongAxisState]

Processor that time-aligns two AxisArray streams.

Input A flows through __call__ / _process with automatic hash-based reset. Input B flows through push_b().

Returns (aligned_a, aligned_b) when alignment succeeds, else None.

push_b(message)

Process a message on input B: check the gain, detect shape changes, buffer the message, then try to align.

Parameters:

message (AxisArray)

Return type:

tuple[AxisArray, AxisArray] | None
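The asymmetric calling convention (input A through `__call__`, input B through `push_b()`, each returning a pair or None) can be sketched with a toy class. `ToyTwoInputAligner` is hypothetical, and its samples are simple `(tick, value)` tuples where `tick` is an integer sample index. The real processor works on AxisArray chunks and also handles resets and gain checks that this sketch leaves out.

```python
from collections import deque


class ToyTwoInputAligner:
    """Hypothetical two-input processor: A via __call__, B via push_b."""

    def __init__(self):
        self._buf_a = deque()
        self._buf_b = deque()

    def __call__(self, sample):
        """Buffer a sample from input A, then try to align."""
        self._buf_a.append(sample)
        return self._try_align()

    def push_b(self, sample):
        """Buffer a sample from input B, then try to align."""
        self._buf_b.append(sample)
        return self._try_align()

    def _try_align(self):
        a, b = self._buf_a, self._buf_b
        # Discard the older head while the two heads carry different ticks.
        while a and b and a[0][0] != b[0][0]:
            (a if a[0][0] < b[0][0] else b).popleft()
        if not a or not b:
            return None  # nothing to pair yet
        return a.popleft(), b.popleft()
```

Returning None instead of raising keeps the caller's loop simple: it forwards a result only when one exists.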

class AlignAlongAxisSettings(axis: str = 'time', buffer_dur: float = 10.0)

Bases: Settings

Parameters:

axis (str)

buffer_dur (float)

axis: str = 'time'

Axis used for alignment (typically the time axis).

buffer_dur: float = 10.0

Buffer duration in seconds for each input stream.
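To make the two settings concrete, the sketch below mirrors the documented fields and defaults in a plain dataclass and converts a buffer duration into a sample count. `ToyAlignSettings` and `buffer_capacity` are hypothetical names, and the sizing rule (duration times sampling rate, rounded up) is an assumption for illustration; the source does not state how the library sizes its buffers.

```python
import math
from dataclasses import dataclass


@dataclass(frozen=True)
class ToyAlignSettings:
    """Hypothetical mirror of the documented fields and defaults."""

    axis: str = "time"
    buffer_dur: float = 10.0


def buffer_capacity(settings: ToyAlignSettings, fs: float) -> int:
    """Samples needed to hold buffer_dur seconds at fs Hz (assumed sizing rule)."""
    if fs <= 0:
        raise ValueError("sampling rate must be positive")
    return math.ceil(settings.buffer_dur * fs)
```

Under this assumed rule, the default 10-second buffer holds 5000 samples per stream at 500 Hz.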

__init__(axis='time', buffer_dur=10.0)
Parameters:

axis (str)

buffer_dur (float)

Return type:

None

class AlignAlongAxisState

Bases: object

gain: float | None = None
align_axis: str | None = None
aligned: bool = False
buf_a: HybridAxisArrayBuffer | None = None
buf_b: HybridAxisArrayBuffer | None = None
a_shape_sig: tuple[int, ...] | None = None
b_shape_sig: tuple[int, ...] | None = None
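One plausible use of a per-stream shape signature is to detect when an incoming chunk no longer matches what has been buffered. The sketch below assumes the signature is the tuple of dimension sizes excluding the alignment axis, and that a change clears the buffer. Both are assumptions for illustration, and `shape_signature` and `ToyStreamState` are hypothetical names.

```python
def shape_signature(shape, dims, axis):
    """Sizes of every dimension except the alignment axis."""
    return tuple(n for n, d in zip(shape, dims) if d != axis)


class ToyStreamState:
    """Hypothetical per-stream state: a shape signature plus a chunk buffer."""

    def __init__(self):
        self.shape_sig = None
        self.buffer = []

    def push(self, shape, dims, axis="time"):
        """Buffer a chunk's shape; return True if the buffer was reset first."""
        sig = shape_signature(shape, dims, axis)
        reset = self.shape_sig is not None and sig != self.shape_sig
        if reset:
            self.buffer.clear()  # old chunks are incompatible with the new shape
        self.shape_sig = sig
        self.buffer.append(shape)
        return reset
```

Excluding the alignment axis means chunks of different lengths still share a signature, so only a change in the other dimensions (for example, the channel count) triggers a reset.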