Source code for ezmsg.tools.shmem.shmem_mirror

"""
It is possible to move data from ezmsg to non-ezmsg processes using shared memory. This module contains the non-ezmsg
half of that communication. The ezmsg half is found in .shmem.
The same `shmem_name` must be passed to both the ShMemCircBuff and the EZShmMirror objects!
"""

import copy
import time
import typing
from multiprocessing.shared_memory import SharedMemory

import numpy as np
import numpy.typing as npt

from .shmem import ShmemArrMeta, ShMemCircBuffState, shorten_shmem_name

CONNECT_RETRY_INTERVAL = 0.5


[docs] class EZShmMirror: """ An object that has a local (in-client-process) representation of the shared memory from another process' .shmem.ShMemCircBuff Unit. There are 2 pieces of shared memory: the metadata and the data buffer. The ezmsg node is responsible for creating both pieces. Here we only connect to them. We cannot know if the shared memory exists before we try to connect to it, so we must try the connection -- sometimes repeatedly while handling connection errors. """
[docs] def __init__(self, shmem_name: typing.Optional[str] = None): self._mirror_state: ShMemCircBuffState = ShMemCircBuffState() self._shmem_name: typing.Optional[str] = None self._change_callback: typing.Optional[typing.Callable] = None self._last_meta: typing.Optional[ShmemArrMeta] = None self._read_index = 0 # Used by auto_view self._last_connect_try = -np.inf # If shmem_name is None then this will simply not connect to anything. self.connect(shmem_name)
def __del__(self): self.disconnect()
[docs] def disconnect(self): self._cleanup_buffer() self._cleanup_meta() self._shmem_name = None
@property def meta(self) -> typing.Optional[ShmemArrMeta]: if self._mirror_state.meta_struct is None: return None return copy.deepcopy(self._mirror_state.meta_struct) @property def buffer(self) -> typing.Optional[npt.NDArray]: return self._mirror_state.buffer_arr @property def write_index(self) -> typing.Optional[int]: return self._mirror_state.meta_struct.write_index @property def connected(self) -> bool: return self.buffer is not None def _cleanup_meta(self): if self._mirror_state.meta_shmem is not None: del self._mirror_state.meta_struct self._mirror_state.meta_struct = None if self._mirror_state.meta_shmem is not None: # Note: Uncommenting the following does not eliminate the resource_tracker warnings. try: self._mirror_state.meta_shmem.close() except Exception as e: print(f"Error closing meta: {e}") del self._mirror_state.meta_shmem self._mirror_state.meta_shmem = None self._read_index = 0 def _cleanup_buffer(self): if self._mirror_state.buffer_arr is not None: del self._mirror_state.buffer_arr self._mirror_state.buffer_arr = None if self._mirror_state.buffer_shmem is not None: # Note: Uncommenting the following does not eliminate the resource_tracker warnings. try: self._mirror_state.buffer_shmem.close() except Exception as e: print(f"Error closing buffer: {e}") del self._mirror_state.buffer_shmem self._mirror_state.buffer_shmem = None self._read_index = 0
[docs] def register_change_callback(self, callback: typing.Callable) -> None: self._change_callback = callback
[docs] def unregister_change_callback(self) -> None: self._change_callback = None
def _connect_meta(self): # Attempt to connect to the meta shmem try: short_name = shorten_shmem_name(self._shmem_name) self._mirror_state.meta_shmem = SharedMemory(short_name, create=False) self._mirror_state.meta_struct = ShmemArrMeta.from_buffer( self._mirror_state.meta_shmem.buf ) except FileNotFoundError: self._mirror_state.meta_struct = None self._mirror_state.meta_shmem = None def _reset_buffer(self) -> bool: if self._mirror_state.buffer_shmem is not None: # We might enter here if input data changed shape or dtype, # meaning we are reconnecting to the same _name_ but different layout. self._cleanup_buffer() if ( self._mirror_state.meta_struct is None or not self._mirror_state.meta_struct.bvalid ): # Cannot connect to buffer without valid meta. return False try: buff_name = ( self._shmem_name + "/buffer" + str(self._mirror_state.meta_struct.buffer_generation) ) short_name = shorten_shmem_name(buff_name) self._mirror_state.buffer_shmem = SharedMemory(short_name, create=False) self._mirror_state.buffer_arr = np.ndarray( self._mirror_state.meta_struct.shape[ : self._mirror_state.meta_struct.ndim ], dtype=np.dtype(self._mirror_state.meta_struct.dtype), buffer=self._mirror_state.buffer_shmem.buf[:], ) self._last_meta = self.meta # Copy if self._change_callback is not None: self._change_callback() return True except FileNotFoundError: self._mirror_state.buffer_arr = None self._mirror_state.buffer_shmem = None except TypeError: # buffer is too small for requested array self._mirror_state.buffer_arr = None self._mirror_state.buffer_shmem = None print("DEBUG!") return False
[docs] def connect(self, name: str) -> None: if self._shmem_name is None or self._shmem_name != name: # Clear connection self._cleanup_buffer() self._cleanup_meta() self._shmem_name = name if self._shmem_name is None: # Provided name was None. Do not connect. return if (time.time() - self._last_connect_try) <= CONNECT_RETRY_INTERVAL: # Delay retrying the connection to avoid spamming the system. return if self._mirror_state.meta_struct is None: # The only way we can enter this `connect` method and not enter this logical block # is if the provided `name` was the same as the last name. self._connect_meta() self._last_connect_try = time.time()
[docs] def auto_view( self, n: typing.Optional[int] = None ) -> typing.Tuple[npt.NDArray, bool]: if self._mirror_state.meta_struct is None: self.connect(self._shmem_name) if ( self._mirror_state.meta_struct is None or not self._mirror_state.meta_struct.bvalid ): # Still not connected # or we are connected but the buffer data is invalid. return np.array([[]]), False b_connected = True # Determine if we need to reset the buffer if ( self._last_meta is None or self._mirror_state.meta_struct.buffer_generation != self._last_meta.buffer_generation or self._mirror_state.buffer_arr is None ): b_connected = self._reset_buffer() if not b_connected: # We STILL aren't connected. return np.array([[]]), False # -- From here, we should know we have a good connection to a valid buffer -- # wrapped_since_last_read = ( self._mirror_state.meta_struct.wrap_counter - self._last_meta.wrap_counter ) b_overflow = wrapped_since_last_read > 1 or ( wrapped_since_last_read == 1 and self._mirror_state.meta_struct.write_index >= self._read_index ) if b_overflow: # In case of overflow, start reading from the oldest available data self._read_index = ( self._mirror_state.meta_struct.write_index + 1 ) % self._mirror_state.meta_struct.shape[0] self._last_meta.wrap_counter = self._mirror_state.meta_struct.wrap_counter # Calculate how many samples are available n_available = 0 if self._mirror_state.buffer_arr is not None: if self._mirror_state.meta_struct.write_index >= self._read_index: n_available = ( self._mirror_state.meta_struct.write_index - self._read_index ) else: n_available = ( self._mirror_state.meta_struct.shape[0] - self._read_index + self._mirror_state.meta_struct.write_index ) if n_available == 0 or (n is not None and n_available < n): # Not enough samples available. # Return a null-slice of the buffer. This provides correct dimensions. return self._mirror_state.buffer_arr[:0], b_overflow # We have enough samples. if n is None: n = n_available if (self._read_index + n) <= self._mirror_state.meta_struct.shape[0]: # Return a contiguous chunk t_slice = np.s_[max(0, self._read_index) : self._read_index + n] result = self._mirror_state.buffer_arr[t_slice, :] else: # Split read into two chunks n_after_wrap = n - ( self._mirror_state.meta_struct.shape[0] - self._read_index ) result = np.concatenate( ( self._mirror_state.buffer_arr[self._read_index :], self._mirror_state.buffer_arr[:n_after_wrap], ), axis=0, ) self._read_index = ( self._read_index + n ) % self._mirror_state.meta_struct.shape[0] self._last_meta.wrap_counter = self._mirror_state.meta_struct.wrap_counter return result, b_overflow