ezmsg.sigproc.align
Time-align two AxisArray streams, outputting paired aligned chunks.
Classes
- class AlignAlongAxis(*args, settings=None, **kwargs)
Bases: Unit
Time-align two AxisArray streams and output paired aligned chunks.
Each input subscriber can publish to both output streams; when alignment succeeds, the paired (A, B) result is yielded to OUTPUT_SIGNAL_A and OUTPUT_SIGNAL_B respectively.
- Parameters:
settings (Settings | None)
- SETTINGS
alias of AlignAlongAxisSettings
- INPUT_SIGNAL_A = InputStream:unlocated[AxisArray]()
- INPUT_SIGNAL_B = InputStream:unlocated[AxisArray]()
- OUTPUT_SIGNAL_A = OutputStream:unlocated[AxisArray](self.num_buffers=32, self.force_tcp=None, self.allow_local=None)
- OUTPUT_SIGNAL_B = OutputStream:unlocated[AxisArray](self.num_buffers=32, self.force_tcp=None, self.allow_local=None)
- async initialize()
Runs when the Unit is instantiated.
This is called from within the same process this unit will live in. This lifecycle hook can be overridden, and it can be made an async function by adding the async keyword when overriding.
This method is where you should initialize your unit’s state and prepare for message processing.
- Return type:
None
- class AlignAlongAxisProcessor(*args, **kwargs)
Bases: BaseStatefulTransformer[AlignAlongAxisSettings, AxisArray, tuple[AxisArray, AxisArray] | None, AlignAlongAxisState]
Processor that time-aligns two AxisArray streams.
Input A flows through __call__/_process with automatic hash-based reset. Input B flows through push_b(). Returns (aligned_a, aligned_b) when alignment succeeds, else None.
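The buffer-and-pair behaviour described above can be illustrated with a minimal pure-Python sketch. This is not the ezmsg.sigproc implementation: the class name PairAligner, the push_a() method, the tol parameter, and the representation of a chunk as a list of (timestamp, value) tuples are all hypothetical, and the sketch assumes both streams are sampled on the same time grid. Only buffer_dur, push_b(), and the "(aligned_a, aligned_b) or None" return convention are taken from the entries on this page.

```python
from collections import deque


class PairAligner:
    """Hypothetical stand-in for a two-stream aligner (not the ezmsg class)."""

    def __init__(self, buffer_dur=10.0, tol=1e-9):
        self.buffer_dur = buffer_dur  # max time span (s) kept per buffer
        self.tol = tol                # assumed timestamp comparison tolerance
        self.buf_a = deque()
        self.buf_b = deque()

    def _push(self, buf, chunk):
        buf.extend(chunk)
        if not buf:
            return
        # Drop samples older than buffer_dur relative to the newest sample.
        newest = buf[-1][0]
        while newest - buf[0][0] > self.buffer_dur:
            buf.popleft()

    def push_a(self, chunk):
        self._push(self.buf_a, chunk)
        return self._try_align()

    def push_b(self, chunk):
        self._push(self.buf_b, chunk)
        return self._try_align()

    def _try_align(self):
        a, b = self.buf_a, self.buf_b
        if not a or not b:
            return None
        # Discard leading samples that precede the later-starting stream.
        start = max(a[0][0], b[0][0])
        while a and a[0][0] < start - self.tol:
            a.popleft()
        while b and b[0][0] < start - self.tol:
            b.popleft()
        # Emit as many paired samples as both buffers can supply.
        n = min(len(a), len(b))
        if n == 0:
            return None
        aligned_a = [a.popleft() for _ in range(n)]
        aligned_b = [b.popleft() for _ in range(n)]
        return aligned_a, aligned_b


aligner = PairAligner(buffer_dur=10.0)
first = aligner.push_a([(0.0, "a0"), (0.1, "a1"), (0.2, "a2")])
second = aligner.push_b([(0.1, "b1"), (0.2, "b2"), (0.3, "b3")])
```

In this sketch the first push returns None because stream B has no data yet; the second push returns the overlapping samples from both streams and leaves the unmatched tail of B buffered for a later call.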
- class AlignAlongAxisSettings(axis: str = 'time', buffer_dur: float = 10.0)
Bases: Settings
- class AlignAlongAxisState
Bases: object
- buf_a: HybridAxisArrayBuffer | None = None
- buf_b: HybridAxisArrayBuffer | None = None