ezmsg.tools.shmem.shmem#
It is possible to move data from ezmsg to non-ezmsg processes using shared memory. This module contains the ezmsg half of that communication. The non-ezmsg half is in the .shmem_mirror module. The same shmem_name must be passed to both the ShMemCircBuff and the EZShmMirror objects!
The ShMemCircBuff class is a sink node that receives AxisArray messages and writes them to a shared memory buffer.
Upon initialization, or upon receiving updated settings with a different shmem_name value, the node creates a shared memory object located at {shorten_shmem_name(shmem_name)} to hold the metadata initialized with placeholder values (e.g., srate = -1). Additionally, the node has a convenience handle to the metadata via self.STATE.meta_struct = ShmemArrMeta.from_buffer(shmem.buf).
- Upon receiving a data message, its metadata is checked, and if it does not match the shmem metadata
(which will always be true for the first message) then the node first updates the metadata, then it (re-)creates a shared memory buffer to hold the data, located at shorten_shmem_name(f”{shmem_name}/buffer{buffer_generation}”), where buffer_generation is an integer that tracks how many times the buffer has been reset. This corresponds to the same integer stored in the metadata.
The other half must monitor the metadata shared memory to see if it changes, and if it does then it must recreate the data shared memory buffer reader at the new location.
Functions
- shorten_shmem_name(long_name)[source]#
Convert a potentially long shared memory name to a shorter, fixed-length name.
Classes
- class ShMemCircBuff(*args, settings=None, **kwargs)[source]#
Bases:
Unit- Parameters:
settings (Settings | None)
- SETTINGS#
alias of
ShMemCircBuffSettings
- STATE#
alias of
ShMemCircBuffState
- INPUT_SIGNAL = InputStream:unlocated[<class 'ezmsg.util.messages.axisarray.AxisArray'>]()#
- INPUT_SETTINGS = InputStream:unlocated[<class 'ezmsg.tools.shmem.shmem.ShMemCircBuffSettings'>]()#
- async initialize()[source]#
Runs when the
Unitis instantiated. This is called from within the same process this unit will live. This lifecycle hook can be overridden. It can be run asasyncfunctions by simply adding theasynckeyword when overriding.- Return type:
None
- on_settings(msg)[source]#
- Parameters:
msg (ShMemCircBuffSettings)
- Return type:
None
- class ShMemCircBuffSettings(shmem_name: str | None, buf_dur: float, conn: Connection | None = None, axis: str = 'time')[source]#
Bases:
Settings- Parameters:
shmem_name (str | None)
buf_dur (float)
conn (Connection | None)
axis (str)
- conn: Connection | None = None#
- __init__(shmem_name, buf_dur, conn=None, axis='time')#
- Parameters:
shmem_name (str | None)
buf_dur (float)
conn (Connection | None)
axis (str)
- Return type:
None
- class ShMemCircBuffState[source]#
Bases:
State- meta_shmem: SharedMemory | None = None#
- meta_struct: ShmemArrMeta | None = None#
- buffer_shmem: SharedMemory | None = None#
- class ShmemArrMeta[source]#
Bases:
StructureStructure containing the metadata describing the separate shmem buffer.
The SharedMemory object is expected to have allocated enough memory for this header + the memory required for the buffer described by this header. i.e., meta_size = ctypes.sizeof(ShmemArrMeta) item_size = np.dtype(dtype).itemsize shmem_size = int(meta_size + np.prod(shape) * item_size) shmem = SharedMemory(name=”…”, create=True, size=shmem_size) meta = ShmemArrMeta.from_buffer(shmem) meta.dtype = dtype meta.ndim = len(shape) meta.shape[:meta.ndim] = shape circ_buff = np.ndarray(shape, dtype=dtype, buffer=shmem.buf[meta_size:])
- buffer_generation#
Structure/Union member
- bvalid#
Structure/Union member
- dtype#
Structure/Union member
- ndim#
Structure/Union member
- shape#
Structure/Union member
- srate#
Structure/Union member
- wrap_counter#
Structure/Union member
- write_index#
Structure/Union member
- shorten_shmem_name(long_name)[source]#
Convert a potentially long shared memory name to a shorter, fixed-length name.
- class ShmemArrMeta[source]#
Bases:
StructureStructure containing the metadata describing the separate shmem buffer.
The SharedMemory object is expected to have allocated enough memory for this header + the memory required for the buffer described by this header. i.e., meta_size = ctypes.sizeof(ShmemArrMeta) item_size = np.dtype(dtype).itemsize shmem_size = int(meta_size + np.prod(shape) * item_size) shmem = SharedMemory(name=”…”, create=True, size=shmem_size) meta = ShmemArrMeta.from_buffer(shmem) meta.dtype = dtype meta.ndim = len(shape) meta.shape[:meta.ndim] = shape circ_buff = np.ndarray(shape, dtype=dtype, buffer=shmem.buf[meta_size:])
- buffer_generation#
Structure/Union member
- bvalid#
Structure/Union member
- dtype#
Structure/Union member
- ndim#
Structure/Union member
- shape#
Structure/Union member
- srate#
Structure/Union member
- wrap_counter#
Structure/Union member
- write_index#
Structure/Union member
- class ShMemCircBuffSettings(shmem_name: str | None, buf_dur: float, conn: Connection | None = None, axis: str = 'time')[source]#
Bases:
Settings- Parameters:
shmem_name (str | None)
buf_dur (float)
conn (Connection | None)
axis (str)
- conn: Connection | None = None#
- __init__(shmem_name, buf_dur, conn=None, axis='time')#
- Parameters:
shmem_name (str | None)
buf_dur (float)
conn (Connection | None)
axis (str)
- Return type:
None
- class ShMemCircBuffState[source]#
Bases:
State- meta_shmem: SharedMemory | None = None#
- meta_struct: ShmemArrMeta | None = None#
- buffer_shmem: SharedMemory | None = None#
- class ShMemCircBuff(*args, settings=None, **kwargs)[source]#
Bases:
Unit- Parameters:
settings (Settings | None)
- SETTINGS#
alias of
ShMemCircBuffSettings
- STATE#
alias of
ShMemCircBuffState
- INPUT_SIGNAL = InputStream:unlocated[<class 'ezmsg.util.messages.axisarray.AxisArray'>]()#
- INPUT_SETTINGS = InputStream:unlocated[<class 'ezmsg.tools.shmem.shmem.ShMemCircBuffSettings'>]()#
- async initialize()[source]#
Runs when the
Unitis instantiated. This is called from within the same process this unit will live. This lifecycle hook can be overridden. It can be run asasyncfunctions by simply adding theasynckeyword when overriding.- Return type:
None
- on_settings(msg)[source]#
- Parameters:
msg (ShMemCircBuffSettings)
- Return type:
None