ezmsg.sigproc.util.axisarray_buffer#

Classes

class HybridAxisArrayBuffer(duration, axis='time', **kwargs)[source]#

Bases: object

A buffer that intelligently handles ezmsg.util.messages.AxisArray objects.

This buffer defers its own initialization until the first message arrives, allowing it to automatically configure its size, shape, dtype, and array backend (e.g., NumPy, CuPy) based on the message content and a desired buffer duration.

Parameters:
  • duration (float) – The desired duration of the buffer in seconds.

  • axis (str) – The name of the axis to buffer along.

  • **kwargs – Additional keyword arguments to pass to the underlying HybridBuffer (e.g., update_strategy, threshold, overflow_strategy, max_size).
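
The deferred-initialization idea can be illustrated with a small stand-alone sketch. `LazyBuffer` below is hypothetical and is not part of ezmsg. It assumes that capacity is derived as `duration / gain` rounded up and that overflow drops the oldest samples; the documentation above confirms neither detail.

```python
import math


class LazyBuffer:
    """Hypothetical toy buffer that sizes itself on the first write."""

    def __init__(self, duration: float):
        self.duration = duration
        self.capacity = None  # unknown until the first message arrives
        self._data = []

    def write(self, samples: list, gain: float) -> None:
        if self.capacity is None:
            # Assumption: capacity = number of samples spanning `duration` seconds.
            self.capacity = math.ceil(self.duration / gain)
        self._data.extend(samples)
        # Assumption: on overflow, keep only the newest `capacity` samples.
        self._data = self._data[-self.capacity:]


buf = LazyBuffer(duration=2.0)
buf.write(list(range(10)), gain=0.25)  # first write fixes the capacity at 8
```

Nothing is allocated in the constructor; the first `write` call supplies the information (here only the sample spacing) needed to size the buffer.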

__init__(duration, axis='time', **kwargs)[source]#
Parameters:
  • duration (float)

  • axis (str)

available()[source]#

The total number of unread samples currently available in the buffer.

Return type:

int

is_empty()[source]#
Return type:

bool

is_full()[source]#
Return type:

bool

property axis_first_value: float | None#

The axis-value (timestamp, typically) of the first sample in the buffer.

property axis_final_value: float | None#

The axis-value (timestamp, typically) of the last sample in the buffer.

write(msg)[source]#

Adds an AxisArray message to the buffer, initializing on the first call.

Parameters:

msg (AxisArray)

Return type:

None

peek(n_samples=None)[source]#

Retrieves the oldest unread data as a new AxisArray without advancing the read head.

Parameters:

n_samples (int | None)

Return type:

AxisArray | None

peek_axis(n_samples=None)[source]#

Retrieves the axis data without advancing the read head.

Parameters:

n_samples (int | None)

Return type:

LinearAxis | CoordinateAxis | None

seek(n_samples)[source]#

Advances the read pointer by n_samples.

Parameters:

n_samples (int)

Return type:

int

read(n_samples=None)[source]#

Retrieves the oldest unread data as a new AxisArray and advances the read head.

Parameters:

n_samples (int | None)

Return type:

AxisArray | None
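
The relationship between peek, seek, and read can be sketched with a toy FIFO. `ReadHead` is a hypothetical stand-in that works on plain lists rather than AxisArray messages. The clamping in `seek` and the meaning of its return value (number of samples actually skipped) are assumptions.

```python
class ReadHead:
    """Hypothetical toy FIFO with a read pointer; not the ezmsg implementation."""

    def __init__(self, samples):
        self._samples = list(samples)
        self._pos = 0  # index of the oldest unread sample

    def available(self) -> int:
        return len(self._samples) - self._pos

    def peek(self, n_samples=None):
        # Return unread samples without moving the read pointer.
        n = self.available() if n_samples is None else n_samples
        return self._samples[self._pos:self._pos + n]

    def seek(self, n_samples: int) -> int:
        # Assumption: clamp to what is available and report the samples skipped.
        n = min(n_samples, self.available())
        self._pos += n
        return n

    def read(self, n_samples=None):
        # read == peek followed by seek over the samples just returned.
        out = self.peek(n_samples)
        self.seek(len(out))
        return out


fifo = ReadHead([10, 11, 12, 13, 14])
first = fifo.peek(2)    # oldest two samples; read pointer unchanged
again = fifo.read(2)    # same two samples; read pointer advances by 2
skipped = fifo.seek(1)  # skips one sample without returning it
rest = fifo.read()      # everything still unread
```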

prune(n_samples)[source]#

Discards all but the last n_samples from the buffer.

Parameters:

n_samples (int)

Return type:

int
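
As a rough illustration of pruning, the helper below keeps only the newest `n_samples` items of a plain list. The function is hypothetical, and treating the integer return value as the number of discarded samples is an assumption, since the entry above does not say what the returned int means.

```python
def prune(samples: list, n_samples: int) -> int:
    """Keep only the last `n_samples` items in place; return how many were dropped."""
    n_drop = max(len(samples) - n_samples, 0)
    del samples[:n_drop]
    return n_drop


data = [0, 1, 2, 3, 4, 5]
dropped = prune(data, 2)  # data now holds only the two newest samples
```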

property axis_gain: float | None#

The gain of the target axis, which is the step between consecutive samples along that axis. For a time axis this is the sampling period (i.e., 1 / fs, where fs is the sampling rate).
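
A short worked example of the arithmetic, using illustrative values: on a regularly sampled axis the gain is the reciprocal of the sampling rate, and the axis value of sample `i` is `offset + i * gain`. The names `fs`, `offset`, and `axis_value` are chosen here for illustration only.

```python
fs = 500.0       # sampling rate in Hz (illustrative value)
gain = 1.0 / fs  # step between samples, in seconds
offset = 10.0    # axis value of sample 0 (illustrative value)


def axis_value(index: int) -> float:
    """Axis value of the sample at `index` on a linear axis."""
    return offset + index * gain


t0 = axis_value(0)      # the offset itself
t250 = axis_value(250)  # half a second after the offset
```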

axis_searchsorted(values, side='left')[source]#

Find the indices into which the given values would be inserted into the target axis data to maintain order.

Parameters:
  • values

  • side (str)

Return type:

int | Array
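
The insertion-index semantics described above can be sketched with the standard-library `bisect` module. This is an analogy on a plain sorted list, not the buffer's own implementation; it assumes `side` behaves as in the usual searchsorted convention ('left' inserts before equal values, 'right' after them).

```python
from bisect import bisect_left, bisect_right

timestamps = [0.0, 0.5, 1.0, 1.5, 2.0]  # sorted axis values (illustrative)


def searchsorted(sorted_values, value, side="left"):
    """Insertion index that keeps `sorted_values` ordered."""
    if side == "left":
        return bisect_left(sorted_values, value)
    return bisect_right(sorted_values, value)


i_left = searchsorted(timestamps, 1.0, side="left")    # lands before the equal element
i_right = searchsorted(timestamps, 1.0, side="right")  # lands after the equal element
i_mid = searchsorted(timestamps, 0.75)                 # lands between 0.5 and 1.0
```

A typical use is converting a timestamp into a sample count, for example to decide how many samples to read or seek past.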

class HybridAxisBuffer(duration, **kwargs)[source]#

Bases: object

A buffer that intelligently handles the axis objects (LinearAxis and CoordinateAxis) of ezmsg.util.messages.AxisArray messages.

LinearAxis is maintained internally by tracking its offset, gain, and the number of samples that have passed through. CoordinateAxis has its data values maintained in a HybridBuffer.

Parameters:
  • duration (float) – The desired duration of the buffer in seconds. This is non-limiting when managing a LinearAxis.

  • **kwargs – Additional keyword arguments to pass to the underlying HybridBuffer (e.g., update_strategy, threshold, overflow_strategy, max_size).
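
The LinearAxis case can be sketched without storing any axis values: an offset, a gain, and two counters are enough. `LinearAxisTracker` is a hypothetical illustration, and its attribute names are assumptions rather than the class's actual internals.

```python
class LinearAxisTracker:
    """Hypothetical tracker for a linear axis: offset, gain, and sample counts."""

    def __init__(self, offset: float, gain: float):
        self.offset = offset  # axis value of the very first sample written
        self.gain = gain      # step between consecutive samples
        self.n_written = 0    # samples that have entered the buffer
        self.n_read = 0       # samples that have been consumed

    def write(self, n_samples: int) -> None:
        self.n_written += n_samples

    def seek(self, n_samples: int) -> int:
        n = min(n_samples, self.n_written - self.n_read)
        self.n_read += n
        return n

    @property
    def first_value(self):
        if self.n_written == self.n_read:
            return None  # nothing unread
        return self.offset + self.n_read * self.gain

    @property
    def final_value(self):
        if self.n_written == self.n_read:
            return None  # nothing unread
        return self.offset + (self.n_written - 1) * self.gain


tracker = LinearAxisTracker(offset=0.0, gain=0.5)
tracker.write(4)               # unread axis values: 0.0, 0.5, 1.0, 1.5
before = (tracker.first_value, tracker.final_value)
tracker.seek(2)                # consume the two oldest samples
after = (tracker.first_value, tracker.final_value)
```

Because every axis value is computable from the counters, the memory cost is constant, which is consistent with the note that the duration is non-limiting for a LinearAxis.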

__init__(duration, **kwargs)[source]#
Parameters:

duration (float)

property capacity: int#

The maximum number of samples that can be stored in the buffer.

available()[source]#
Return type:

int

is_empty()[source]#
Return type:

bool

is_full()[source]#
Return type:

bool

write(axis, n_samples)[source]#
Parameters:
  • axis (LinearAxis | CoordinateAxis)

  • n_samples (int)

Return type:

None

peek(n_samples=None)[source]#
Parameters:

n_samples (int | None)

Return type:

LinearAxis | CoordinateAxis

seek(n_samples)[source]#
Parameters:

n_samples (int)

Return type:

int

prune(n_samples)[source]#

Discards all but the last n_samples from the buffer.

Parameters:

n_samples (int)

Return type:

int

property final_value: float | None#

The axis-value (timestamp, typically) of the last sample in the buffer. This does not advance the read head.

property first_value: float | None#

The axis-value (timestamp, typically) of the first sample in the buffer. This does not advance the read head.

property gain: float | None#

searchsorted(values, side='left')[source]#
Parameters:
  • values

  • side (str)

Return type:

int | Array
