ezmsg.sigproc.util.buffer
Classes
- class HybridBuffer(array_namespace, capacity, other_shape, dtype, update_strategy='on_demand', threshold=0, overflow_strategy='grow', max_size=1073741824, warn_once=True)
Bases: object
A stateful FIFO buffer that combines a deque for fast appends with a contiguous circular buffer for efficient, advancing reads.
This buffer is designed to be agnostic to the array library used (e.g., NumPy, CuPy, PyTorch) via the Python Array API standard.
- Parameters:
array_namespace (Any) – The array library (e.g., numpy, cupy) that conforms to the Array API.
capacity (int) – The current maximum number of samples to store in the circular buffer.
other_shape (tuple[int, ...]) – A tuple defining the shape of the non-sample dimensions.
dtype (Any) – The data type of the samples, belonging to the provided array_namespace.
update_strategy (Literal['immediate', 'threshold', 'on_demand']) – The strategy for synchronizing the deque to the circular buffer (flushing).
threshold (int) – The number of samples to accumulate in the deque before flushing. Ignored if update_strategy is “immediate” or “on_demand”.
overflow_strategy (Literal['grow', 'raise', 'drop', 'warn-overwrite']) – The strategy for handling overflow when the buffer is full. Options are “grow”, “raise”, “drop”, or “warn-overwrite”. If “grow” (default), the buffer will increase its capacity to accommodate new samples up to max_size. If “raise”, an error will be raised when the buffer is full. If “drop”, the overflowing samples will be ignored. If “warn-overwrite”, a warning will be logged then the overflowing samples will overwrite previously-unread samples.
max_size (int) – The maximum size of the buffer in bytes. If the buffer exceeds this size, it will raise an error.
warn_once (bool) – If True, will only warn once on overflow when using “warn-overwrite” strategy.
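To give a feel for how capacity, other_shape, and dtype relate to max_size, the sketch below estimates the memory footprint of the circular buffer as capacity × prod(other_shape) × itemsize. That formula is an assumption about how the byte size is counted, not something this page states, and the helper name buffer_nbytes is hypothetical.

```python
import math

# The documented default for max_size (1 GiB).
DEFAULT_MAX_SIZE = 1073741824


def buffer_nbytes(capacity, other_shape, itemsize):
    """Estimated bytes for `capacity` samples (assumed formula, see lead-in)."""
    return capacity * math.prod(other_shape) * itemsize


# 30 000 samples of 64 channels stored as 8-byte floats.
nbytes = buffer_nbytes(30_000, (64,), 8)

# Largest sample count that would still fit under the default max_size
# for the same per-sample layout (64 channels x 8 bytes).
max_samples = DEFAULT_MAX_SIZE // buffer_nbytes(1, (64,), 8)
```

Under this estimate, a buffer using the "grow" strategy with 64 float64 channels could expand to roughly two million samples before reaching the default limit.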
- __init__(array_namespace, capacity, other_shape, dtype, update_strategy='on_demand', threshold=0, overflow_strategy='grow', max_size=1073741824, warn_once=True)
- available()
The total number of unread samples available (in buffer and deque).
- Return type:
int
- is_empty()
Returns True if there are no unread samples in the buffer or deque.
- Return type:
bool
- is_full()
Returns True if the buffer is full and cannot flush more samples without overwriting.
- Return type:
bool
- tell()
Returns the number of samples that have been read and are still in the buffer.
- Return type:
int
- write(block)
Appends a new block (an array of samples) to the internal deque.
- Parameters:
block (Array)
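Because write only appends to the deque, the update_strategy decides when those samples reach the circular buffer. The following is a minimal sketch of one plausible decision rule. The function name should_flush_on_write is hypothetical, and the ">=" comparison against threshold is an assumption; the library's actual logic may differ.

```python
def should_flush_on_write(update_strategy, pending_samples, threshold):
    """Decide whether a write should trigger a flush (assumed rule, see lead-in)."""
    if update_strategy == "immediate":
        return True                           # flush on every write
    if update_strategy == "threshold":
        return pending_samples >= threshold   # assumed comparison
    if update_strategy == "on_demand":
        return False                          # defer until a reader needs data
    raise ValueError(f"unknown update_strategy: {update_strategy!r}")


immediate = should_flush_on_write("immediate", 1, 0)
below = should_flush_on_write("threshold", 99, 100)
reached = should_flush_on_write("threshold", 100, 100)
deferred = should_flush_on_write("on_demand", 10_000, 0)
```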
- read(n_samples=None)
Retrieves the oldest unread samples from the buffer with padding and advances the read head.
- Parameters:
n_samples (int | None) – The number of samples to retrieve. If None, returns all unread samples.
- Returns:
An array containing the requested samples. This may be a view or a copy. Note: The result may have more samples than the buffer.capacity as it may include samples from the deque in the output.
- Return type:
Array
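The pairing of write and read can be illustrated with a toy model built on plain Python lists. This is not the library's implementation: ToyHybridBuffer and its attributes are hypothetical, it mimics only an "on_demand"-style flush with a "raise"-style overflow, and it always returns copies.

```python
from collections import deque


class ToyHybridBuffer:
    """Toy model only: a deque of pending blocks in front of a fixed-size ring."""

    def __init__(self, capacity):
        self._ring = [None] * capacity
        self._capacity = capacity
        self._write_idx = 0      # slot that receives the next flushed sample
        self._read_idx = 0       # slot holding the oldest unread sample
        self._unread = 0         # unread samples currently stored in the ring
        self._pending = deque()  # blocks appended by write(), not yet flushed

    def write(self, block):
        # Cheap append; nothing is copied into the ring yet.
        self._pending.append(list(block))

    def available(self):
        # Unread samples in the ring plus samples still waiting in the deque.
        return self._unread + sum(len(b) for b in self._pending)

    def _flush(self):
        # Move pending blocks into the ring, oldest block first.
        while self._pending:
            if self._unread + len(self._pending[0]) > self._capacity:
                raise OverflowError("ring is full")
            for sample in self._pending.popleft():
                self._ring[self._write_idx] = sample
                self._write_idx = (self._write_idx + 1) % self._capacity
                self._unread += 1

    def read(self, n_samples=None):
        self._flush()  # flush only when a reader asks for data
        n = self._unread if n_samples is None else n_samples
        if n > self._unread:
            raise ValueError("not enough unread samples")
        out = [self._ring[(self._read_idx + i) % self._capacity] for i in range(n)]
        self._read_idx = (self._read_idx + n) % self._capacity  # advance read head
        self._unread -= n
        return out


buf = ToyHybridBuffer(capacity=8)
buf.write([1, 2, 3])
buf.write([4, 5])
n_unread = buf.available()   # includes samples still sitting in the deque
first = buf.read(2)          # flushes, then returns the two oldest samples
buf.write([6, 7, 8, 9, 10])
rest = buf.read()            # spans the wrap-around at the end of the ring
```

The second read returns samples in FIFO order even though they occupy slots on both sides of the ring's end, which is the situation where a real implementation would have to copy rather than return a view.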
- peek(n_samples=None, out=None)
Retrieves the oldest unread samples from the buffer with padding without advancing the read head.
- Parameters:
n_samples (int | None) – The number of samples to retrieve. If None, returns all unread samples.
out (Array | None) – Optionally, a destination array to store the samples. If provided, must have shape (n_samples, *other_shape) where other_shape matches the shape of the samples in the buffer. If out is provided then the data will always be copied into it, even if they are contiguous in the buffer.
- Returns:
An array containing the requested samples. This may be a view or a copy. Note: The result may have more samples than the buffer.capacity as it may include samples from the deque in the output.
- Return type:
Array
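One likely reason the result "may be a view or a copy" is wrap-around: a span that is contiguous in the ring can be exposed without copying, while a span that crosses the end must be stitched together. The sketch below illustrates that idea with the standard-library array and memoryview types instead of an Array API namespace. The helper peek_span is hypothetical and is not part of HybridBuffer.

```python
from array import array

# An 8-slot ring of doubles holding the values 0.0 .. 7.0.
ring = array("d", [float(i) for i in range(8)])
ring_view = memoryview(ring)


def peek_span(read_idx, n):
    """Return n samples starting at read_idx: a view if contiguous, else a copy."""
    end = read_idx + n
    if end <= len(ring):
        return ring_view[read_idx:end]  # contiguous: zero-copy view
    # Wrapped: slicing an array copies, and "+" builds a new array.
    return ring[read_idx:] + ring[: end - len(ring)]


contiguous = peek_span(2, 3)  # view over slots 2, 3, 4
wrapped = peek_span(6, 4)     # copy of slots 6, 7, 0, 1

# Later writes into the ring show up in the view but not in the copy.
ring[2] = 99.0
ring[6] = 99.0
view_sees_change = contiguous[0] == 99.0
copy_is_stale = wrapped[0] == 6.0
```

Callers who need a stable snapshot regardless of layout can pass out, which per the description above always receives a copy.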
- peek_at(idx, allow_flush=False)
Retrieves a specific sample from the buffer without advancing the read head.
- Parameters:
- Returns:
An array containing the requested sample. This may be a view or a copy.
- Return type:
Array
- peek_last()
Retrieves the last sample in the buffer without advancing the read head.
- Return type:
Array