ezmsg.sigproc.sampler#

Functions

sampler(buffer_dur, axis=None, period=None, value=None, estimate_alignment=True)[source]#

Sample data into a buffer, accept triggers, and return slices of sampled data around the trigger time.

Returns:

A generator that expects to be sent (via .send) either an AxisArray containing streaming data messages or a SampleTriggerMessage containing a trigger, and that yields the list of SampleMessage objects.

Parameters:
Return type:

SamplerTransformer

Classes

class Sampler(*args, settings=None, **kwargs)[source]#

Bases: BaseTransformerUnit[SamplerSettings, AxisArray, AxisArray, SamplerTransformer]

Parameters:

settings (Settings | None)

SETTINGS#

alias of SamplerSettings

INPUT_TRIGGER = InputStream:unlocated[<class 'ezmsg.sigproc.util.message.SampleTriggerMessage'>]()#
OUTPUT_SIGNAL = OutputStream:unlocated[<class 'ezmsg.sigproc.util.message.SampleMessage'>](self.num_buffers=32, self.force_tcp=False)#
async on_trigger(msg)[source]#
Parameters:

msg (SampleTriggerMessage)

Return type:

None

async on_signal(message)[source]#
Parameters:

message (AxisArray)

Return type:

AsyncGenerator

class SamplerSettings(buffer_dur, axis=None, period=None, value=None, estimate_alignment=True, buffer_update_strategy='immediate')[source]#

Bases: Settings

Settings for Sampler. See sampler for a description of the fields.

Parameters:
buffer_dur: float#

The duration of the buffer in seconds. The buffer must be long enough to store the oldest sample to be included in a window. For example, a trigger lagged by 0.5 seconds with a period of (-1.0, +1.5) will need a buffer of 0.5 + (1.5 - (-1.0)) = 3.0 seconds. It is best to at least double your estimate if memory allows.

axis: str | None = None#

The axis along which to sample the data. None (default) will choose the first axis in the first input. Note: (for now) the axis must exist in the message's .axes and be of type AxisArray.LinearAxis.

period: tuple[float, float] | None = None#

Optional default period (in seconds) if unspecified in SampleTriggerMessage.

value: Any = None#

Optional default value if unspecified in SampleTriggerMessage

estimate_alignment: bool = True#
If true, use message timestamp fields and the reported sampling rate to estimate sample-accurate alignment for samples.

If false, sampling will be limited to the incoming message rate (“block timing”). NOTE: For faster-than-realtime playback, incoming timestamps must reflect “realtime” operation for estimate_alignment to operate correctly.

buffer_update_strategy: Literal['immediate', 'threshold', 'on_demand'] = 'immediate'#

The buffer update strategy. See ezmsg.sigproc.util.buffer.UpdateStrategy. If you expect to push data much more frequently than triggers, then “on_demand” might be more efficient. For most other scenarios, “immediate” is best.

__init__(buffer_dur, axis=None, period=None, value=None, estimate_alignment=True, buffer_update_strategy='immediate')#
Parameters:
Return type:

None

class SamplerState[source]#

Bases: object

buffer: HybridAxisArrayBuffer | None = None#
triggers: deque[SampleTriggerMessage] | None = None#
class SamplerTransformer(*args, **kwargs)[source]#

Bases: BaseStatefulTransformer[SamplerSettings, AxisArray, AxisArray, SamplerState]

push_trigger(message)[source]#
Parameters:

message (SampleTriggerMessage)

Return type:

list[SampleMessage]

class TriggerGenerator(*args, settings=None, **kwargs)[source]#

Bases: BaseProducerUnit[TriggerGeneratorSettings, SampleTriggerMessage, TriggerProducer]

Parameters:

settings (Settings | None)

SETTINGS#

alias of TriggerGeneratorSettings

class TriggerGeneratorSettings(period: tuple[float, float], prewait: float = 0.5, publish_period: float = 5.0)[source]#

Bases: Settings

Parameters:
period: tuple[float, float]#

The period around the trigger event.

prewait: float = 0.5#

The time before the first trigger (sec)

__init__(period, prewait=0.5, publish_period=5.0)#
Parameters:
Return type:

None

publish_period: float = 5.0#

The period between triggers (sec)

class TriggerGeneratorState[source]#

Bases: object

output: int = 0#
class TriggerProducer(*args, **kwargs)[source]#

Bases: BaseStatefulProducer[TriggerGeneratorSettings, SampleTriggerMessage, TriggerGeneratorState]

class SamplerSettings(buffer_dur, axis=None, period=None, value=None, estimate_alignment=True, buffer_update_strategy='immediate')[source]#

Bases: Settings

Settings for Sampler. See sampler for a description of the fields.

Parameters:
buffer_dur: float#

The duration of the buffer in seconds. The buffer must be long enough to store the oldest sample to be included in a window. For example, a trigger lagged by 0.5 seconds with a period of (-1.0, +1.5) will need a buffer of 0.5 + (1.5 - (-1.0)) = 3.0 seconds. It is best to at least double your estimate if memory allows.

axis: str | None = None#

The axis along which to sample the data. None (default) will choose the first axis in the first input. Note: (for now) the axis must exist in the message's .axes and be of type AxisArray.LinearAxis.

period: tuple[float, float] | None = None#

Optional default period (in seconds) if unspecified in SampleTriggerMessage.

value: Any = None#

Optional default value if unspecified in SampleTriggerMessage

estimate_alignment: bool = True#
If true, use message timestamp fields and the reported sampling rate to estimate sample-accurate alignment for samples.

If false, sampling will be limited to the incoming message rate (“block timing”). NOTE: For faster-than-realtime playback, incoming timestamps must reflect “realtime” operation for estimate_alignment to operate correctly.

buffer_update_strategy: Literal['immediate', 'threshold', 'on_demand'] = 'immediate'#

The buffer update strategy. See ezmsg.sigproc.util.buffer.UpdateStrategy. If you expect to push data much more frequently than triggers, then “on_demand” might be more efficient. For most other scenarios, “immediate” is best.

__init__(buffer_dur, axis=None, period=None, value=None, estimate_alignment=True, buffer_update_strategy='immediate')#
Parameters:
Return type:

None

class SamplerState[source]#

Bases: object

buffer: HybridAxisArrayBuffer | None = None#
triggers: deque[SampleTriggerMessage] | None = None#
class SamplerTransformer(*args, **kwargs)[source]#

Bases: BaseStatefulTransformer[SamplerSettings, AxisArray, AxisArray, SamplerState]

push_trigger(message)[source]#
Parameters:

message (SampleTriggerMessage)

Return type:

list[SampleMessage]

class Sampler(*args, settings=None, **kwargs)[source]#

Bases: BaseTransformerUnit[SamplerSettings, AxisArray, AxisArray, SamplerTransformer]

Parameters:

settings (Settings | None)

SETTINGS#

alias of SamplerSettings

INPUT_TRIGGER = InputStream:unlocated[<class 'ezmsg.sigproc.util.message.SampleTriggerMessage'>]()#
OUTPUT_SIGNAL = OutputStream:unlocated[<class 'ezmsg.sigproc.util.message.SampleMessage'>](self.num_buffers=32, self.force_tcp=False)#
async on_trigger(msg)[source]#
Parameters:

msg (SampleTriggerMessage)

Return type:

None

async on_signal(message)[source]#
Parameters:

message (AxisArray)

Return type:

AsyncGenerator

sampler(buffer_dur, axis=None, period=None, value=None, estimate_alignment=True)[source]#

Sample data into a buffer, accept triggers, and return slices of sampled data around the trigger time.

Returns:

A generator that expects to be sent (via .send) either an AxisArray containing streaming data messages or a SampleTriggerMessage containing a trigger, and that yields the list of SampleMessage objects.

Parameters:
Return type:

SamplerTransformer

class TriggerGeneratorSettings(period: tuple[float, float], prewait: float = 0.5, publish_period: float = 5.0)[source]#

Bases: Settings

Parameters:
period: tuple[float, float]#

The period around the trigger event.

prewait: float = 0.5#

The time before the first trigger (sec)

__init__(period, prewait=0.5, publish_period=5.0)#
Parameters:
Return type:

None

publish_period: float = 5.0#

The period between triggers (sec)

class TriggerGeneratorState[source]#

Bases: object

output: int = 0#
class TriggerProducer(*args, **kwargs)[source]#

Bases: BaseStatefulProducer[TriggerGeneratorSettings, SampleTriggerMessage, TriggerGeneratorState]

class TriggerGenerator(*args, settings=None, **kwargs)[source]#

Bases: BaseProducerUnit[TriggerGeneratorSettings, SampleTriggerMessage, TriggerProducer]

Parameters:

settings (Settings | None)

SETTINGS#

alias of TriggerGeneratorSettings