ezmsg.tools.shmem.shmem
It is possible to move data from ezmsg to non-ezmsg processes using shared memory. This module contains the ezmsg half of that communication. The non-ezmsg half is in the .shmem_mirror module. The same shmem_name must be passed to both the ShMemCircBuff and the EZShmMirror objects!
The ShMemCircBuff class is a sink node that receives AxisArray messages and writes them to a shared memory buffer.
Upon initialization, or upon receiving updated settings with a different shmem_name value, the node creates a shared memory object at shorten_shmem_name(shmem_name) to hold the metadata, which is initialized with placeholder values (e.g., srate = -1). The node also keeps a convenience handle to the metadata: self.STATE.meta_struct = ShmemArrMeta.from_buffer(shmem.buf).
Upon receiving a data message, the node checks the message's metadata against the shmem metadata. If they do not match (which is always the case for the first message), the node first updates the metadata and then (re-)creates a shared memory buffer to hold the data, located at shorten_shmem_name(f"{shmem_name}/buffer{buffer_generation}"), where buffer_generation is an integer that tracks how many times the buffer has been reset. The same integer is stored in the metadata.
The other half must monitor the metadata shared memory to see if it changes, and if it does then it must recreate the data shared memory buffer reader at the new location.
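The reader-side obligation described above can be sketched as follows. This is a minimal illustration under stated assumptions, not the EZShmMirror implementation: DemoMeta, buffer_name, DemoReader, and demo are hypothetical names, the header is a plain in-process ctypes structure rather than one mapped onto the metadata shared memory, and buffer_name stands in for shorten_shmem_name(f"{shmem_name}/buffer{buffer_generation}").

```python
import ctypes
import os
from multiprocessing.shared_memory import SharedMemory

import numpy as np


class DemoMeta(ctypes.Structure):
    """Hypothetical two-field header; the real ShmemArrMeta has more members."""

    _fields_ = [
        ("buffer_generation", ctypes.c_uint32),
        ("n_samples", ctypes.c_uint32),
    ]


def buffer_name(base: str, generation: int) -> str:
    # Assumed naming scheme, standing in for the shortened library name.
    return f"{base}_b{generation}"


class DemoReader:
    """Re-attaches to the data buffer whenever buffer_generation changes."""

    def __init__(self, meta: DemoMeta, base: str) -> None:
        self.meta = meta
        self.base = base
        self.seen_generation = None
        self.shmem = None
        self.data = None

    def poll(self) -> bool:
        """Return True if a (re-)attach happened on this call."""
        generation = self.meta.buffer_generation
        if generation == self.seen_generation:
            return False
        self.close()
        self.shmem = SharedMemory(name=buffer_name(self.base, generation))
        self.data = np.ndarray(
            (self.meta.n_samples,), dtype=np.float32, buffer=self.shmem.buf
        )
        self.seen_generation = generation
        return True

    def close(self) -> None:
        self.data = None  # release the NumPy view before closing the mapping
        if self.shmem is not None:
            self.shmem.close()
            self.shmem = None


def demo() -> list:
    base = f"ezdemo{os.getpid()}"
    meta = DemoMeta()
    reader = DemoReader(meta, base)
    events = []
    for generation, n_samples in ((1, 4), (2, 8)):
        # Writer side: create the new buffer first, then publish the metadata.
        writer_shmem = SharedMemory(
            name=buffer_name(base, generation), create=True, size=4 * n_samples
        )
        meta.n_samples = n_samples
        meta.buffer_generation = generation
        events.append((reader.poll(), reader.data.shape[0]))  # generation changed
        events.append((reader.poll(), reader.data.shape[0]))  # unchanged
        reader.close()
        writer_shmem.close()
        writer_shmem.unlink()
    return events
```

The reader compares the generation counter on every poll and only touches the operating system when it changes, so polling stays cheap between buffer resets.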
Functions
- shorten_shmem_name(long_name)
Convert a potentially long shared memory name to a shorter, fixed-length name.
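This page does not say how the short name is derived. One plausible approach, shown here purely as an assumption, is to hash the long name and keep a fixed number of hex digits, which is useful because some platforms impose a small limit on shared memory name length. The function shorten_name_sketch, its "ez_" prefix, and its default length are hypothetical and are not the library's implementation.

```python
import hashlib


def shorten_name_sketch(long_name: str, length: int = 24) -> str:
    """Hypothetical stand-in: map any string to a fixed-length, slash-free name."""
    prefix = "ez_"
    digest = hashlib.sha256(long_name.encode("utf-8")).hexdigest()
    # Same input always yields the same output, so both processes can
    # derive the identical short name independently.
    return prefix + digest[: length - len(prefix)]
```

Because the mapping is deterministic, the writer and the reader only need to agree on the long name.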
Classes
- class ShMemCircBuff(*args, settings=None, **kwargs)
Bases: Unit
- Parameters:
settings (Settings | None)
- SETTINGS
alias of ShMemCircBuffSettings
- STATE
alias of ShMemCircBuffState
- INPUT_SIGNAL = InputStream:unlocated[AxisArray]()
- INPUT_SETTINGS = InputStream:unlocated[ShMemCircBuffSettings]()
- async initialize()
Runs when the Unit is instantiated.
This is called from within the same process this unit will live in. This lifecycle hook can be overridden, and it can be run as an async function by adding the async keyword when overriding.
This method is where you should initialize your unit’s state and prepare for message processing.
- Return type:
None
- on_settings(msg)
- Parameters:
msg (ShMemCircBuffSettings)
- Return type:
None
- async shutdown()
Runs when the Unit terminates.
This is called from within the same process this unit will live in. This lifecycle hook can be overridden, and it can be run as an async function by adding the async keyword when overriding.
This method is where you should clean up resources and perform any necessary shutdown procedures.
- Return type:
None
- class ShMemCircBuffSettings(shmem_name: str | None, buf_dur: float, conn: Connection | None = None, axis: str = 'time')
Bases: Settings
- Parameters:
shmem_name (str | None)
buf_dur (float)
conn (Connection | None)
axis (str)
- conn: Connection | None = None
- __init__(shmem_name, buf_dur, conn=None, axis='time')
- Parameters:
shmem_name (str | None)
buf_dur (float)
conn (Connection | None)
axis (str)
- Return type:
None
- class ShMemCircBuffState
Bases: State
- meta_shmem: SharedMemory | None = None
- meta_struct: ShmemArrMeta | None = None
- buffer_shmem: SharedMemory | None = None
- class ShmemArrMeta
Bases: Structure
Structure containing the metadata describing the separate shmem buffer.
The SharedMemory object is expected to have allocated enough memory for this header + the memory required for the buffer described by this header, i.e.:
    meta_size = ctypes.sizeof(ShmemArrMeta)
    item_size = np.dtype(dtype).itemsize
    shmem_size = int(meta_size + np.prod(shape) * item_size)
    shmem = SharedMemory(name="…", create=True, size=shmem_size)
    # from_buffer needs the writable memoryview, not the SharedMemory object
    meta = ShmemArrMeta.from_buffer(shmem.buf)
    meta.dtype = dtype
    meta.ndim = len(shape)
    meta.shape[:meta.ndim] = shape
    circ_buff = np.ndarray(shape, dtype=dtype, buffer=shmem.buf[meta_size:])
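The header-plus-array layout described above can be exercised end to end with the standard library and NumPy. The sketch below is illustrative only: DemoArrMeta and roundtrip are hypothetical names, and the field types (a one-character NumPy type code for dtype, a fixed four-slot shape array) are assumptions, since this page lists the member names but not their C types.

```python
import ctypes
from multiprocessing.shared_memory import SharedMemory

import numpy as np


class DemoArrMeta(ctypes.Structure):
    """Assumed field types; only the member names come from the documentation."""

    _fields_ = [
        ("dtype", ctypes.c_char),        # NumPy type character, e.g. b"f"
        ("ndim", ctypes.c_uint32),
        ("shape", ctypes.c_uint32 * 4),  # room for up to 4 dimensions
        ("srate", ctypes.c_double),
    ]


def roundtrip(shape=(8, 2), dtype="float32"):
    meta_size = ctypes.sizeof(DemoArrMeta)
    item_size = np.dtype(dtype).itemsize
    shmem_size = int(meta_size + np.prod(shape) * item_size)
    shmem = SharedMemory(create=True, size=shmem_size)  # auto-generated name

    # Writer: fill in the header, then view the remainder as the array.
    meta = DemoArrMeta.from_buffer(shmem.buf)
    meta.dtype = np.dtype(dtype).char.encode("ascii")
    meta.ndim = len(shape)
    meta.shape[: meta.ndim] = shape
    meta.srate = -1.0  # placeholder until a real rate is known
    circ_buff = np.ndarray(shape, dtype=dtype, buffer=shmem.buf[meta_size:])
    circ_buff[:] = np.arange(np.prod(shape)).reshape(shape)

    # Reader: recover shape and dtype from the header alone.
    r_meta = DemoArrMeta.from_buffer(shmem.buf)
    r_shape = tuple(r_meta.shape[: r_meta.ndim])
    r_dtype = np.dtype(r_meta.dtype.decode("ascii"))
    r_view = np.ndarray(r_shape, dtype=r_dtype, buffer=shmem.buf[meta_size:])
    result = (r_shape, r_dtype.name, float(r_view.sum()))

    # ctypes and NumPy views must be dropped before the mapping can be closed.
    del meta, circ_buff, r_meta, r_view
    shmem.close()
    shmem.unlink()
    return result
```

Dropping every view before close() matters in practice: closing a SharedMemory block while a ctypes structure or NumPy array still references its buffer raises a BufferError.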
- buffer_generation
Structure/Union member
- bvalid
Structure/Union member
- dtype
Structure/Union member
- ndim
Structure/Union member
- shape
Structure/Union member
- srate
Structure/Union member
- wrap_counter
Structure/Union member
- write_index
Structure/Union member
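The members above are listed without descriptions. As a hedged illustration of how a circular buffer typically uses such fields, the sketch below assumes that write_index is the next slot to be written along the first axis and that wrap_counter is incremented each time the write position passes the end of the buffer. Both interpretations, and the function circ_write itself, are assumptions rather than documented behavior.

```python
import numpy as np


def circ_write(buf, write_index, wrap_counter, samples):
    """Write samples along axis 0 of buf, wrapping at the end.

    Returns the updated (write_index, wrap_counter).
    """
    n_buf = buf.shape[0]
    samples = samples[-n_buf:]  # keep only the newest samples that fit
    n = samples.shape[0]
    first = min(n, n_buf - write_index)  # samples that fit before the end
    buf[write_index : write_index + first] = samples[:first]
    buf[: n - first] = samples[first:]  # remainder wraps to the start
    if write_index + n >= n_buf:
        wrap_counter += 1
    return (write_index + n) % n_buf, wrap_counter
```

Under these assumptions, a reader that remembers the last (write_index, wrap_counter) pair it saw can tell both how much new data arrived and whether it was overtaken by the writer.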