ezmsg.sigproc.resample#

Classes

class ResampleProcessor(*args, **kwargs)[source]#

Bases: BaseStatefulProcessor[ResampleSettings, AxisArray, AxisArray, ResampleState]

push_reference(message)[source]#
Parameters:

message (AxisArray)

Return type:

None

send(message)[source]#

Alias for __call__.

Parameters:

message (AxisArray)

Return type:

AxisArray

class ResampleSettings(axis: str = 'time', resample_rate: float | None = None, max_chunk_delay: float = inf, fill_value: str = 'extrapolate', buffer_duration: float = 2.0, buffer_update_strategy: Literal['immediate', 'threshold', 'on_demand'] = 'immediate')[source]#

Bases: Settings

Parameters:
  • axis (str)

  • resample_rate (float | None)

  • max_chunk_delay (float)

  • fill_value (str)

  • buffer_duration (float)

  • buffer_update_strategy (Literal['immediate', 'threshold', 'on_demand'])

axis: str = 'time'#
resample_rate: float | None = None#

target resample rate in Hz. If None, the resample rate will be determined by the reference signal.

max_chunk_delay: float = inf#

Maximum delay between outputs in seconds. If the delay exceeds this value, the transformer will extrapolate.

fill_value: str = 'extrapolate'#

Value to use for out-of-bounds samples. If ‘extrapolate’, the transformer will extrapolate. If ‘last’, the transformer will use the last sample. See scipy.interpolate.interp1d for more options.

buffer_duration: float = 2.0#
buffer_update_strategy: Literal['immediate', 'threshold', 'on_demand'] = 'immediate'#

The buffer update strategy. See ezmsg.sigproc.util.buffer.UpdateStrategy. If you expect to push data much more frequently than it is resampled, then “on_demand” might be more efficient. For most other scenarios, “immediate” is best.

__init__(axis='time', resample_rate=None, max_chunk_delay=inf, fill_value='extrapolate', buffer_duration=2.0, buffer_update_strategy='immediate')#
Parameters:
  • axis (str)

  • resample_rate (float | None)

  • max_chunk_delay (float)

  • fill_value (str)

  • buffer_duration (float)

  • buffer_update_strategy (Literal['immediate', 'threshold', 'on_demand'])

Return type:

None

class ResampleState[source]#

Bases: object

src_buffer: HybridAxisArrayBuffer | None = None#

Buffer for the incoming signal data. This is the source for training the interpolation function. Its contents are rarely empty because we usually hold back some data to allow for accurate interpolation and optionally extrapolation.

ref_axis_buffer: HybridAxisBuffer | None = None#

The buffer for the reference axis (usually a time axis). The interpolation function will be evaluated at the reference axis values. When resample_rate is None, this buffer will be filled with the axis from incoming _reference_ messages. When resample_rate is not None (i.e., prescribed float resample_rate), this buffer is filled with a synthetic axis that is generated from the incoming signal messages.

last_ref_ax_val: float | None = None#

The last value of the reference axis that was returned. This helps us to know what the _next_ returned value should be, and to avoid returning the same value. TODO: We can eliminate this variable if we maintain “by convention” that the reference axis always has 1 value at its start that we exclude from the resampling.

last_write_time: float = -inf#

Wall clock time of the last write to the signal buffer. This is used to determine if we need to extrapolate the reference axis if we have not received an update within max_chunk_delay.

class ResampleUnit(*args, settings=None, **kwargs)[source]#

Bases: BaseConsumerUnit[ResampleSettings, AxisArray, ResampleProcessor]

Parameters:

settings (Settings | None)

SETTINGS#

alias of ResampleSettings

INPUT_REFERENCE = InputStream:unlocated[<class 'ezmsg.util.messages.axisarray.AxisArray'>]()#
OUTPUT_SIGNAL = OutputStream:unlocated[<class 'ezmsg.util.messages.axisarray.AxisArray'>](self.num_buffers=32, self.force_tcp=False)#
async on_reference(message)[source]#
Parameters:

message (AxisArray)

async gen_resampled()[source]#
class ResampleSettings(axis: str = 'time', resample_rate: float | None = None, max_chunk_delay: float = inf, fill_value: str = 'extrapolate', buffer_duration: float = 2.0, buffer_update_strategy: Literal['immediate', 'threshold', 'on_demand'] = 'immediate')[source]#

Bases: Settings

Parameters:
  • axis (str)

  • resample_rate (float | None)

  • max_chunk_delay (float)

  • fill_value (str)

  • buffer_duration (float)

  • buffer_update_strategy (Literal['immediate', 'threshold', 'on_demand'])

axis: str = 'time'#
resample_rate: float | None = None#

target resample rate in Hz. If None, the resample rate will be determined by the reference signal.

max_chunk_delay: float = inf#

Maximum delay between outputs in seconds. If the delay exceeds this value, the transformer will extrapolate.

fill_value: str = 'extrapolate'#

Value to use for out-of-bounds samples. If ‘extrapolate’, the transformer will extrapolate. If ‘last’, the transformer will use the last sample. See scipy.interpolate.interp1d for more options.

buffer_duration: float = 2.0#
buffer_update_strategy: Literal['immediate', 'threshold', 'on_demand'] = 'immediate'#

The buffer update strategy. See ezmsg.sigproc.util.buffer.UpdateStrategy. If you expect to push data much more frequently than it is resampled, then “on_demand” might be more efficient. For most other scenarios, “immediate” is best.

__init__(axis='time', resample_rate=None, max_chunk_delay=inf, fill_value='extrapolate', buffer_duration=2.0, buffer_update_strategy='immediate')#
Parameters:
  • axis (str)

  • resample_rate (float | None)

  • max_chunk_delay (float)

  • fill_value (str)

  • buffer_duration (float)

  • buffer_update_strategy (Literal['immediate', 'threshold', 'on_demand'])

Return type:

None

class ResampleState[source]#

Bases: object

src_buffer: HybridAxisArrayBuffer | None = None#

Buffer for the incoming signal data. This is the source for training the interpolation function. Its contents are rarely empty because we usually hold back some data to allow for accurate interpolation and optionally extrapolation.

ref_axis_buffer: HybridAxisBuffer | None = None#

The buffer for the reference axis (usually a time axis). The interpolation function will be evaluated at the reference axis values. When resample_rate is None, this buffer will be filled with the axis from incoming _reference_ messages. When resample_rate is not None (i.e., prescribed float resample_rate), this buffer is filled with a synthetic axis that is generated from the incoming signal messages.

last_ref_ax_val: float | None = None#

The last value of the reference axis that was returned. This helps us to know what the _next_ returned value should be, and to avoid returning the same value. TODO: We can eliminate this variable if we maintain “by convention” that the reference axis always has 1 value at its start that we exclude from the resampling.

last_write_time: float = -inf#

Wall clock time of the last write to the signal buffer. This is used to determine if we need to extrapolate the reference axis if we have not received an update within max_chunk_delay.

class ResampleProcessor(*args, **kwargs)[source]#

Bases: BaseStatefulProcessor[ResampleSettings, AxisArray, AxisArray, ResampleState]

push_reference(message)[source]#
Parameters:

message (AxisArray)

Return type:

None

send(message)[source]#

Alias for __call__.

Parameters:

message (AxisArray)

Return type:

AxisArray

class ResampleUnit(*args, settings=None, **kwargs)[source]#

Bases: BaseConsumerUnit[ResampleSettings, AxisArray, ResampleProcessor]

Parameters:

settings (Settings | None)

SETTINGS#

alias of ResampleSettings

INPUT_REFERENCE = InputStream:unlocated[<class 'ezmsg.util.messages.axisarray.AxisArray'>]()#
OUTPUT_SIGNAL = OutputStream:unlocated[<class 'ezmsg.util.messages.axisarray.AxisArray'>](self.num_buffers=32, self.force_tcp=False)#
async on_reference(message)[source]#
Parameters:

message (AxisArray)

async gen_resampled()[source]#