ezmsg.sigproc.sampler#
Functions
- sampler(buffer_dur, axis=None, period=None, value=None, estimate_alignment=True)[source]#
Sample data into a buffer, accept triggers, and return slices of sampled data around the trigger time.
- Returns:
A generator that expects .send either an
AxisArraycontaining streaming data messages, or aSampleTriggerMessagecontaining a trigger, and yields the list ofSampleMessages.- Parameters:
- Return type:
Classes
- class Sampler(*args, settings=None, **kwargs)[source]#
Bases:
BaseTransformerUnit[SamplerSettings,AxisArray,AxisArray,SamplerTransformer]- Parameters:
settings (Settings | None)
- SETTINGS#
alias of
SamplerSettings
- INPUT_TRIGGER = InputStream:unlocated[<class 'ezmsg.sigproc.util.message.SampleTriggerMessage'>]()#
- OUTPUT_SIGNAL = OutputStream:unlocated[<class 'ezmsg.sigproc.util.message.SampleMessage'>](self.num_buffers=32, self.force_tcp=False)#
- async on_trigger(msg)[source]#
- Parameters:
msg (SampleTriggerMessage)
- Return type:
None
- class SamplerSettings(buffer_dur, axis=None, period=None, value=None, estimate_alignment=True, buffer_update_strategy='immediate')[source]#
Bases:
SettingsSettings for
Sampler. Seesamplerfor a description of the fields.- Parameters:
- buffer_dur: float#
The duration of the buffer in seconds. The buffer must be long enough to store the oldest sample to be included in a window. e.g., a trigger lagged by 0.5 seconds with a period of (-1.0, +1.5) will need a buffer of 0.5 + (1.5 - -1.0) = 3.0 seconds. It is best to at least double your estimate if memory allows.
- axis: str | None = None#
The axis along which to sample the data. None (default) will choose the first axis in the first input. Note: (for now) the axis must exist in the msg .axes and be of type AxisArray.LinearAxis
- period: tuple[float, float] | None = None#
Optional default period (in seconds) if unspecified in SampleTriggerMessage.
- estimate_alignment: bool = True#
- If true, use message timestamp fields and reported sampling rate to estimate
sample-accurate alignment for samples.
If false, sampling will be limited to incoming message rate – “Block timing” NOTE: For faster-than-realtime playback – Incoming timestamps must reflect “realtime” operation for estimate_alignment to operate correctly.
- buffer_update_strategy: Literal['immediate', 'threshold', 'on_demand'] = 'immediate'#
The buffer update strategy. See
ezmsg.sigproc.util.buffer.UpdateStrategy. If you expect to push data much more frequently than triggers, then “on_demand” might be more efficient. For most other scenarios, “immediate” is best.
- __init__(buffer_dur, axis=None, period=None, value=None, estimate_alignment=True, buffer_update_strategy='immediate')#
- class SamplerState[source]#
Bases:
object- buffer: HybridAxisArrayBuffer | None = None#
- triggers: deque[SampleTriggerMessage] | None = None#
- class SamplerTransformer(*args, **kwargs)[source]#
Bases:
BaseStatefulTransformer[SamplerSettings,AxisArray,AxisArray,SamplerState]- push_trigger(message)[source]#
- Parameters:
message (SampleTriggerMessage)
- Return type:
- class TriggerGenerator(*args, settings=None, **kwargs)[source]#
Bases:
BaseProducerUnit[TriggerGeneratorSettings,SampleTriggerMessage,TriggerProducer]- Parameters:
settings (Settings | None)
- SETTINGS#
alias of
TriggerGeneratorSettings
- class TriggerGeneratorSettings(period: tuple[float, float], prewait: float = 0.5, publish_period: float = 5.0)[source]#
Bases:
Settings- __init__(period, prewait=0.5, publish_period=5.0)#
- class TriggerProducer(*args, **kwargs)[source]#
Bases:
BaseStatefulProducer[TriggerGeneratorSettings,SampleTriggerMessage,TriggerGeneratorState]
- class SamplerSettings(buffer_dur, axis=None, period=None, value=None, estimate_alignment=True, buffer_update_strategy='immediate')[source]#
Bases:
SettingsSettings for
Sampler. Seesamplerfor a description of the fields.- Parameters:
- buffer_dur: float#
The duration of the buffer in seconds. The buffer must be long enough to store the oldest sample to be included in a window. e.g., a trigger lagged by 0.5 seconds with a period of (-1.0, +1.5) will need a buffer of 0.5 + (1.5 - -1.0) = 3.0 seconds. It is best to at least double your estimate if memory allows.
- axis: str | None = None#
The axis along which to sample the data. None (default) will choose the first axis in the first input. Note: (for now) the axis must exist in the msg .axes and be of type AxisArray.LinearAxis
- period: tuple[float, float] | None = None#
Optional default period (in seconds) if unspecified in SampleTriggerMessage.
- estimate_alignment: bool = True#
- If true, use message timestamp fields and reported sampling rate to estimate
sample-accurate alignment for samples.
If false, sampling will be limited to incoming message rate – “Block timing” NOTE: For faster-than-realtime playback – Incoming timestamps must reflect “realtime” operation for estimate_alignment to operate correctly.
- buffer_update_strategy: Literal['immediate', 'threshold', 'on_demand'] = 'immediate'#
The buffer update strategy. See
ezmsg.sigproc.util.buffer.UpdateStrategy. If you expect to push data much more frequently than triggers, then “on_demand” might be more efficient. For most other scenarios, “immediate” is best.
- __init__(buffer_dur, axis=None, period=None, value=None, estimate_alignment=True, buffer_update_strategy='immediate')#
- class SamplerState[source]#
Bases:
object- buffer: HybridAxisArrayBuffer | None = None#
- triggers: deque[SampleTriggerMessage] | None = None#
- class SamplerTransformer(*args, **kwargs)[source]#
Bases:
BaseStatefulTransformer[SamplerSettings,AxisArray,AxisArray,SamplerState]- push_trigger(message)[source]#
- Parameters:
message (SampleTriggerMessage)
- Return type:
- class Sampler(*args, settings=None, **kwargs)[source]#
Bases:
BaseTransformerUnit[SamplerSettings,AxisArray,AxisArray,SamplerTransformer]- Parameters:
settings (Settings | None)
- SETTINGS#
alias of
SamplerSettings
- INPUT_TRIGGER = InputStream:unlocated[<class 'ezmsg.sigproc.util.message.SampleTriggerMessage'>]()#
- OUTPUT_SIGNAL = OutputStream:unlocated[<class 'ezmsg.sigproc.util.message.SampleMessage'>](self.num_buffers=32, self.force_tcp=False)#
- async on_trigger(msg)[source]#
- Parameters:
msg (SampleTriggerMessage)
- Return type:
None
- sampler(buffer_dur, axis=None, period=None, value=None, estimate_alignment=True)[source]#
Sample data into a buffer, accept triggers, and return slices of sampled data around the trigger time.
- Returns:
A generator that expects .send either an
AxisArraycontaining streaming data messages, or aSampleTriggerMessagecontaining a trigger, and yields the list ofSampleMessages.- Parameters:
- Return type:
- class TriggerGeneratorSettings(period: tuple[float, float], prewait: float = 0.5, publish_period: float = 5.0)[source]#
Bases:
Settings- __init__(period, prewait=0.5, publish_period=5.0)#
- class TriggerProducer(*args, **kwargs)[source]#
Bases:
BaseStatefulProducer[TriggerGeneratorSettings,SampleTriggerMessage,TriggerGeneratorState]
- class TriggerGenerator(*args, settings=None, **kwargs)[source]#
Bases:
BaseProducerUnit[TriggerGeneratorSettings,SampleTriggerMessage,TriggerProducer]- Parameters:
settings (Settings | None)
- SETTINGS#
alias of
TriggerGeneratorSettings