ezmsg.sigproc.util.buffer

Classes

class HybridBuffer(array_namespace, capacity, other_shape, dtype, update_strategy='on_demand', threshold=0, overflow_strategy='grow', max_size=1073741824, warn_once=True)

Bases: object

A stateful, FIFO buffer that combines a deque for fast appends with a contiguous circular buffer for efficient, advancing reads.

This buffer is designed to be agnostic to the array library used (e.g., NumPy, CuPy, PyTorch) via the Python Array API standard.
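To make the two-tier design concrete, here is a toy model in plain Python. It is a minimal sketch under stated assumptions, not the ezmsg implementation: `ToyHybridBuffer` is a hypothetical name, samples are plain Python numbers instead of Array API arrays, and only the "raise" overflow behavior is modeled. Writes only append a block to a deque; samples are copied into the circular storage when a flush happens (here, on demand before a read).

```python
from collections import deque


class ToyHybridBuffer:
    """Hypothetical, simplified model of the deque + circular-buffer design.

    Samples are plain Python numbers and blocks are lists; this is not the
    ezmsg implementation and only mimics overflow_strategy='raise'.
    """

    def __init__(self, capacity: int) -> None:
        self.capacity = capacity
        self.ring = [0.0] * capacity  # contiguous circular storage
        self.pending = deque()        # written blocks not yet flushed
        self.head = 0                 # ring index of the oldest unread sample
        self.n_unread = 0             # unread samples currently in the ring

    def write(self, block: list) -> None:
        # Fast path: just append the block; nothing is copied into the ring.
        self.pending.append(block)

    def flush(self) -> None:
        # Copy every pending sample into the ring after the unread data.
        while self.pending:
            for sample in self.pending.popleft():
                if self.n_unread == self.capacity:
                    raise OverflowError("ring is full")
                tail = (self.head + self.n_unread) % self.capacity
                self.ring[tail] = sample
                self.n_unread += 1

    def read(self, n: int) -> list:
        # Flush on demand, then return up to n samples and advance the head.
        self.flush()
        n = min(n, self.n_unread)
        out = [self.ring[(self.head + i) % self.capacity] for i in range(n)]
        self.head = (self.head + n) % self.capacity
        self.n_unread -= n
        return out
```

The point of the split is that appends stay cheap (no copying at write time), while reads see samples laid out in one preallocated block of memory.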

Parameters:
  • array_namespace (Any) – The array library (e.g., numpy, cupy) that conforms to the Array API.

  • capacity (int) – The maximum number of samples the circular buffer can store initially; it may increase if overflow_strategy is “grow”.

  • other_shape (tuple[int, ...]) – A tuple defining the shape of the non-sample dimensions.

  • dtype (Any) – The data type of the samples, belonging to the provided array_namespace.

  • update_strategy (Literal['immediate', 'threshold', 'on_demand']) – The strategy for synchronizing the deque to the circular buffer (flushing).

  • threshold (int) – The number of samples to accumulate in the deque before flushing. Ignored if update_strategy is “immediate” or “on_demand”.

  • overflow_strategy (Literal['grow', 'raise', 'drop', 'warn-overwrite']) – The strategy for handling overflow when the buffer is full. Options are “grow”, “raise”, “drop”, or “warn-overwrite”. If “grow” (default), the buffer increases its capacity to accommodate new samples, up to max_size. If “raise”, an error is raised when the buffer is full. If “drop”, the overflowing samples are discarded. If “warn-overwrite”, a warning is logged and the overflowing samples then overwrite previously unread samples.

  • max_size (int) – The maximum size of the buffer in bytes. An error is raised if the buffer would have to grow beyond this size.

  • warn_once (bool) – If True, only the first overflow emits a warning when using the “warn-overwrite” strategy.
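The following sketch spells out what each overflow_strategy option implies when a flush brings in more samples than there is free space, and how max_size (in bytes) translates into a sample count. It is illustrative only: `plan_flush` is a hypothetical helper, the exception types are placeholders, and "grow" is assumed to resize to exactly the required capacity, which may not match the real growth policy.

```python
import math


def plan_flush(strategy, capacity, n_unread, n_incoming, bytes_per_sample,
               max_size=1073741824):
    """Hypothetical helper: outcome of flushing n_incoming samples.

    Returns (new_capacity, n_new_kept, n_unread_lost). Assumes "grow"
    resizes to exactly the required capacity; the real growth policy
    is not documented here.
    """
    free = capacity - n_unread
    if n_incoming <= free:
        return capacity, n_incoming, 0  # no overflow
    if strategy == "grow":
        needed = n_unread + n_incoming
        if needed * bytes_per_sample > max_size:
            raise MemoryError("growing would exceed max_size bytes")
        return needed, n_incoming, 0
    if strategy == "raise":
        raise OverflowError("buffer is full")
    if strategy == "drop":
        return capacity, free, 0  # samples beyond the free space are ignored
    if strategy == "warn-overwrite":
        # The real buffer logs a warning here; the newest samples replace
        # the oldest unread ones.
        return capacity, min(n_incoming, capacity), min(n_unread, n_incoming - free)
    raise ValueError(f"unknown overflow strategy: {strategy!r}")


# Bytes per sample = dtype itemsize * product of other_shape.
# For float64 (8 bytes) and other_shape=(32,):
bytes_per_sample = 8 * math.prod((32,))        # 256 bytes
max_samples = 1073741824 // bytes_per_sample   # 4194304 samples
```

With the default max_size of 1073741824 bytes (1 GiB), a float64 buffer with 32 channels can therefore hold about four million samples before “grow” fails.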

__init__(array_namespace, capacity, other_shape, dtype, update_strategy='on_demand', threshold=0, overflow_strategy='grow', max_size=1073741824, warn_once=True)
Parameters:
  • array_namespace (Any)

  • capacity (int)

  • other_shape (tuple[int, ...])

  • dtype (Any)

  • update_strategy (Literal['immediate', 'threshold', 'on_demand'])

  • threshold (int)

  • overflow_strategy (Literal['grow', 'raise', 'drop', 'warn-overwrite'])

  • max_size (int)

  • warn_once (bool)

property capacity: int

The maximum number of samples that can be stored in the buffer.

available()

The total number of unread samples available (in buffer and deque).

Return type:

int

is_empty()

Returns True if there are no unread samples in the buffer or deque.

Return type:

bool

is_full()

Returns True if the buffer is full and cannot flush more samples without overwriting.

Return type:

bool

tell()

Returns the number of samples that have been read and are still in the buffer.

Return type:

int
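The counters behind tell(), available() and is_empty() can be summarized with three numbers. This is a hedged sketch: `ToyReadState` and its field names are hypothetical, and it only restates the definitions above (tell() counts samples already read but still stored; available() counts unread samples in both the circular buffer and the deque).

```python
from dataclasses import dataclass


@dataclass
class ToyReadState:
    """Hypothetical counters behind tell(), available() and is_empty()."""

    n_read_in_ring: int    # already read but still stored (room to seek back)
    n_unread_in_ring: int  # flushed to the circular buffer, not yet read
    n_in_deque: int        # written but not yet flushed

    def tell(self) -> int:
        return self.n_read_in_ring

    def available(self) -> int:
        # Unread samples live in both places.
        return self.n_unread_in_ring + self.n_in_deque

    def is_empty(self) -> bool:
        return self.available() == 0
```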

write(block)

Appends a new block (an array of samples) to the internal deque.

Parameters:

block (Array)

read(n_samples=None)

Retrieves the oldest unread samples from the buffer with padding and advances the read head.

Parameters:

n_samples (int | None) – The number of samples to retrieve. If None, returns all unread samples.

Returns:

An array containing the requested samples. This may be a view or a copy. Note: the result may contain more samples than buffer.capacity, because it can include samples that are still in the deque.

Return type:

Array
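Why the result "may be a view or a copy" follows from the circular layout: if the requested samples are contiguous in memory they can be returned without copying, but if they wrap past the end of the storage the two pieces must be joined. The sketch below shows this with the standard-library `array` and `memoryview` types; `ring_read` is a hypothetical function, handles only samples already in the circular buffer, and is not the library's code.

```python
from array import array


def ring_read(ring: array, head: int, n: int):
    """Hypothetical sketch: read n samples starting at ring index `head`.

    Returns (data, new_head). Contiguous samples come back as a zero-copy
    memoryview; samples that wrap past the end are concatenated into a new
    array (a copy). Samples still in the deque are ignored here.
    """
    capacity = len(ring)
    if not 0 <= n <= capacity:
        raise ValueError("n must be between 0 and the capacity")
    end = head + n
    if end <= capacity:
        data = memoryview(ring)[head:end]             # view into the ring
    else:
        data = ring[head:] + ring[: end - capacity]   # wrapped: copy
    return data, end % capacity
```

Because a view aliases the storage, later writes that overwrite those slots would also change a previously returned view.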

peek(n_samples=None, out=None)

Retrieves the oldest unread samples from the buffer with padding without advancing the read head.

Parameters:
  • n_samples (int | None) – The number of samples to retrieve. If None, returns all unread samples.

  • out (Array | None) – Optional destination array for the samples. If provided, it must have shape (n_samples, *other_shape), where other_shape matches the shape of the samples in the buffer. When out is provided, the data are always copied into it, even if they are contiguous in the buffer.

Returns:

An array containing the requested samples. This may be a view or a copy. Note: the result may contain more samples than buffer.capacity, because it can include samples that are still in the deque.

Return type:

Array
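The out parameter trades the possibility of a zero-copy view for a guaranteed copy into caller-owned memory. Below is a minimal sketch of those semantics for one-dimensional samples (other_shape of ()), using the standard-library `array` type; `ring_peek` is a hypothetical name and the shape check is reduced to a length check.

```python
from array import array


def ring_peek(ring: array, head: int, n: int, out=None) -> array:
    """Hypothetical sketch of peek semantics on a circular array.

    The read head is not advanced (nothing is returned for it). If `out`
    is supplied it must hold exactly n samples, and the data are always
    copied into it, even when they are contiguous in the ring.
    """
    if out is None:
        out = array(ring.typecode, [0] * n)
    elif len(out) != n:
        raise ValueError("out must hold exactly n samples")
    capacity = len(ring)
    for i in range(n):
        out[i] = ring[(head + i) % capacity]
    return out
```

Reusing one preallocated out array across calls avoids allocating a new result on every peek.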

peek_at(idx, allow_flush=False)

Retrieves a specific sample from the buffer without advancing the read head.

Parameters:
  • idx (int) – The index of the sample to retrieve, relative to the read head.

  • allow_flush (bool) – If True, the deque may be flushed to the circular buffer when the requested sample is not yet in the buffer. If False and the sample is still in the deque, it is retrieved directly from the deque, which is slow.

Returns:

An array containing the requested sample. This may be a view or a copy.

Return type:

Array
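An index relative to the read head either lands in the circular buffer, where it maps to a slot with modular arithmetic, or in the deque, where the blocks must be walked one by one. This hypothetical `locate` helper (not part of the library, and limited to non-negative indices) illustrates why the deque path is the slow one and why allow_flush can help.

```python
def locate(idx, head, capacity, n_unread_in_ring, deque_block_lengths):
    """Hypothetical sketch: map an index relative to the read head to storage.

    Returns ("ring", slot) for flushed samples, or ("deque", block, offset)
    for samples still waiting in the deque. The deque branch walks the
    blocks one by one, which is why that path is comparatively slow.
    """
    if idx < 0:
        raise IndexError("this sketch only handles non-negative indices")
    if idx < n_unread_in_ring:
        return ("ring", (head + idx) % capacity)  # O(1) arithmetic
    offset = idx - n_unread_in_ring
    for block, length in enumerate(deque_block_lengths):  # O(blocks) walk
        if offset < length:
            return ("deque", block, offset)
        offset -= length
    raise IndexError("idx is beyond the available samples")
```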

peek_last()

Retrieves the last sample in the buffer without advancing the read head.

Return type:

Array

seek(n_samples)

Moves the read head by n_samples.

Parameters:
  • n_samples (int) – The number of samples to seek. Seeks forward if positive or backward if negative.

Returns:

The number of samples actually skipped.

Return type:

int
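The return value matters because a seek cannot move past the data that exists. A plausible reading, sketched below, is that forward seeks are bounded by the unread samples (available()) and backward seeks by the already-read samples still stored (tell()). `clamp_seek` is hypothetical, and returning a signed count for backward seeks is an assumption of this sketch, not documented behavior.

```python
def clamp_seek(n_samples, n_read_in_buffer, n_available):
    """Hypothetical sketch: how far a seek can actually move the read head.

    Forward seeks are limited to the unread samples (available()); backward
    seeks are limited to already-read samples still stored (tell()). The
    signed return convention for backward seeks is assumed.
    """
    if n_samples >= 0:
        return min(n_samples, n_available)
    return -min(-n_samples, n_read_in_buffer)
```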

flush()

Transfers all data from the deque to the circular buffer. Note: depending on the overflow strategy, this may overwrite data, which invalidates previously obtained state values.
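Besides explicit calls to flush(), the update_strategy parameter decides when the deque is flushed automatically. The sketch below encodes one reading of the three documented options; `flush_after_write` is hypothetical, the `>=` comparison for “threshold” is an assumption, and “on_demand” is assumed to defer flushing to reads or explicit flush() calls.

```python
def flush_after_write(update_strategy, n_in_deque, threshold=0):
    """Hypothetical sketch: should a write trigger an automatic flush?

    'immediate' flushes on every write; 'threshold' flushes once the deque
    holds at least `threshold` samples (assumed >=); 'on_demand' never
    flushes on write and is assumed to wait for a read or flush() call.
    """
    if update_strategy == "immediate":
        return True
    if update_strategy == "threshold":
        return n_in_deque >= threshold
    if update_strategy == "on_demand":
        return False
    raise ValueError(f"unknown update strategy: {update_strategy!r}")
```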
