ezmsg.sigproc.util.buffer#
A stateful, FIFO buffer that combines a deque for fast appends with a contiguous circular buffer for efficient, advancing reads.
Classes
- class HybridBuffer(array_namespace, capacity, other_shape, dtype, update_strategy='on_demand', threshold=0, overflow_strategy='grow', max_size=1073741824, warn_once=True)[source]#
Bases:
objectA stateful, FIFO buffer that combines a deque for fast appends with a contiguous circular buffer for efficient, advancing reads.
This buffer is designed to be agnostic to the array library used (e.g., NumPy, CuPy, PyTorch) via the Python Array API standard.
- Parameters:
array_namespace (Any) – The array library (e.g., numpy, cupy) that conforms to the Array API.
capacity (int) – The current maximum number of samples to store in the circular buffer.
other_shape (tuple[int, ...]) – A tuple defining the shape of the non-sample dimensions.
dtype (Any) – The data type of the samples, belonging to the provided array_namespace.
update_strategy (Literal['immediate', 'threshold', 'on_demand']) – The strategy for synchronizing the deque to the circular buffer (flushing).
threshold (int) – The number of samples to accumulate in the deque before flushing. Ignored if update_strategy is “immediate” or “on_demand”.
overflow_strategy (Literal['grow', 'raise', 'drop', 'warn-overwrite']) – The strategy for handling overflow when the buffer is full. Options are “grow”, “raise”, “drop”, or “warn-overwrite”. If “grow” (default), the buffer will increase its capacity to accommodate new samples up to max_size. If “raise”, an error will be raised when the buffer is full. If “drop”, the overflowing samples will be ignored. If “warn-overwrite”, a warning will be logged then the overflowing samples will overwrite previously-unread samples.
max_size (int) – The maximum size of the buffer in bytes. If the buffer exceeds this size, it will raise an error.
warn_once (bool) – If True, will only warn once on overflow when using “warn-overwrite” strategy.
- __init__(array_namespace, capacity, other_shape, dtype, update_strategy='on_demand', threshold=0, overflow_strategy='grow', max_size=1073741824, warn_once=True)[source]#
- available()[source]#
The total number of unread samples available (in buffer and deque).
- Return type:
- is_empty()[source]#
Returns True if there are no unread samples in the buffer or deque.
- Return type:
- is_full()[source]#
Returns True if the buffer is full and cannot _flush_ more samples without overwriting.
- Return type:
- tell()[source]#
Returns the number of samples that have been read and are still in the buffer.
- Return type:
- write(block)[source]#
Appends a new block (an array of samples) to the internal deque.
- Parameters:
block (Array)
- read(n_samples=None)[source]#
Retrieves the oldest unread samples from the buffer with padding and advances the read head.
- Parameters:
n_samples (int | None) – The number of samples to retrieve. If None, returns all unread samples.
- Returns:
An array containing the requested samples. This may be a view or a copy. Note: The result may have more samples than the buffer.capacity as it may include samples from the deque in the output.
- Return type:
Array
- peek(n_samples=None, out=None)[source]#
Retrieves the oldest unread samples from the buffer with padding without advancing the read head.
- Parameters:
n_samples (int | None) – The number of samples to retrieve. If None, returns all unread samples.
out (Array | None) – Optionally, a destination array to store the samples. If provided, must have shape
(n_samples, *other_shape)where other_shape matches the shape of the samples in the buffer. Ifoutis provided then the data will always be copied into it, even if they are contiguous in the buffer.
- Returns:
An array containing the requested samples. This may be a view or a copy. Note: The result may have more samples than the buffer.capacity as it may include samples from the deque in the output.
- Return type:
Array
- peek_at(idx, allow_flush=False)[source]#
Retrieves a specific sample from the buffer without advancing the read head.
- Parameters:
- Returns:
An array containing the requested sample. This may be a view or a copy.
- Return type:
Array
- peek_last()[source]#
Retrieves the last sample in the buffer without advancing the read head.
- Return type:
Array