ezmsg.tools.shmem.shmem#

It is possible to move data from ezmsg to non-ezmsg processes using shared memory. This module contains the ezmsg half of that communication. The non-ezmsg half is in the .shmem_mirror module. The same shmem_name must be passed to both the ShMemCircBuff and the EZShmMirror objects!

The ShMemCircBuff class is a sink node that receives AxisArray messages and writes them to a shared memory buffer.

Upon initialization, or upon receiving updated settings with a different shmem_name value, the node creates a shared memory object located at {shorten_shmem_name(shmem_name)} to hold the metadata initialized with placeholder values (e.g., srate = -1). Additionally, the node has a convenience handle to the metadata via self.STATE.meta_struct = ShmemArrMeta.from_buffer(shmem.buf).

Upon receiving a data message, its metadata is checked, and if it does not match the shmem metadata

(which will always be true for the first message) then the node first updates the metadata, then it (re-)creates a shared memory buffer to hold the data, located at shorten_shmem_name(f”{shmem_name}/buffer{buffer_generation}”), where buffer_generation is an integer that tracks how many times the buffer has been reset. This corresponds to the same integer stored in the metadata.

The other half must monitor the metadata shared memory to see if it changes, and if it does then it must recreate the data shared memory buffer reader at the new location.

Functions

shorten_shmem_name(long_name)[source]#

Convert a potentially long shared memory name to a shorter, fixed-length name.

Parameters:

long_name (str) – The original, potentially long shared memory name

Returns:

A shortened, deterministic name suitable for shared memory

Return type:

str

to_bytes(data)[source]#
Parameters:

data (Any)

Return type:

bytes

Classes

class ShMemCircBuff(*args, settings=None, **kwargs)[source]#

Bases: Unit

Parameters:

settings (Settings | None)

SETTINGS#

alias of ShMemCircBuffSettings

STATE#

alias of ShMemCircBuffState

INPUT_SIGNAL = InputStream:unlocated[<class 'ezmsg.util.messages.axisarray.AxisArray'>]()#
INPUT_SETTINGS = InputStream:unlocated[<class 'ezmsg.tools.shmem.shmem.ShMemCircBuffSettings'>]()#
async initialize()[source]#

Runs when the Unit is instantiated. This is called from within the same process this unit will live. This lifecycle hook can be overridden. It can be run as async functions by simply adding the async keyword when overriding.

Return type:

None

on_settings(msg)[source]#
Parameters:

msg (ShMemCircBuffSettings)

Return type:

None

async shutdown()[source]#

Runs when the Unit terminates. This is called from within the same process this unit will live. This lifecycle hook can be overridden. It can be run as async functions by simply adding the async keyword when overriding.

Return type:

None

async check_continue()[source]#
async on_message(msg)[source]#
Parameters:

msg (AxisArray)

class ShMemCircBuffSettings(shmem_name: str | None, buf_dur: float, conn: Connection | None = None, axis: str = 'time')[source]#

Bases: Settings

Parameters:
shmem_name: str | None#
buf_dur: float#
conn: Connection | None = None#
axis: str = 'time'#
__init__(shmem_name, buf_dur, conn=None, axis='time')#
Parameters:
Return type:

None

class ShMemCircBuffState[source]#

Bases: State

meta_shmem: SharedMemory | None = None#
meta_struct: ShmemArrMeta | None = None#
buffer_shmem: SharedMemory | None = None#
buffer_arr: ndarray[tuple[int, ...], dtype[_ScalarType_co]] | None = None#
meta_hash: int = -1#
class ShmemArrMeta[source]#

Bases: Structure

Structure containing the metadata describing the separate shmem buffer.

The SharedMemory object is expected to have allocated enough memory for this header + the memory required for the buffer described by this header. i.e., meta_size = ctypes.sizeof(ShmemArrMeta) item_size = np.dtype(dtype).itemsize shmem_size = int(meta_size + np.prod(shape) * item_size) shmem = SharedMemory(name=”…”, create=True, size=shmem_size) meta = ShmemArrMeta.from_buffer(shmem) meta.dtype = dtype meta.ndim = len(shape) meta.shape[:meta.ndim] = shape circ_buff = np.ndarray(shape, dtype=dtype, buffer=shmem.buf[meta_size:])

property key: str#
buffer_generation#

Structure/Union member

bvalid#

Structure/Union member

dtype#

Structure/Union member

ndim#

Structure/Union member

shape#

Structure/Union member

srate#

Structure/Union member

wrap_counter#

Structure/Union member

write_index#

Structure/Union member

to_bytes(data)[source]#
Parameters:

data (Any)

Return type:

bytes

shorten_shmem_name(long_name)[source]#

Convert a potentially long shared memory name to a shorter, fixed-length name.

Parameters:

long_name (str) – The original, potentially long shared memory name

Returns:

A shortened, deterministic name suitable for shared memory

Return type:

str

class ShmemArrMeta[source]#

Bases: Structure

Structure containing the metadata describing the separate shmem buffer.

The SharedMemory object is expected to have allocated enough memory for this header + the memory required for the buffer described by this header. i.e., meta_size = ctypes.sizeof(ShmemArrMeta) item_size = np.dtype(dtype).itemsize shmem_size = int(meta_size + np.prod(shape) * item_size) shmem = SharedMemory(name=”…”, create=True, size=shmem_size) meta = ShmemArrMeta.from_buffer(shmem) meta.dtype = dtype meta.ndim = len(shape) meta.shape[:meta.ndim] = shape circ_buff = np.ndarray(shape, dtype=dtype, buffer=shmem.buf[meta_size:])

property key: str#
buffer_generation#

Structure/Union member

bvalid#

Structure/Union member

dtype#

Structure/Union member

ndim#

Structure/Union member

shape#

Structure/Union member

srate#

Structure/Union member

wrap_counter#

Structure/Union member

write_index#

Structure/Union member

class ShMemCircBuffSettings(shmem_name: str | None, buf_dur: float, conn: Connection | None = None, axis: str = 'time')[source]#

Bases: Settings

Parameters:
shmem_name: str | None#
buf_dur: float#
conn: Connection | None = None#
axis: str = 'time'#
__init__(shmem_name, buf_dur, conn=None, axis='time')#
Parameters:
Return type:

None

class ShMemCircBuffState[source]#

Bases: State

meta_shmem: SharedMemory | None = None#
meta_struct: ShmemArrMeta | None = None#
buffer_shmem: SharedMemory | None = None#
buffer_arr: ndarray[tuple[int, ...], dtype[_ScalarType_co]] | None = None#
meta_hash: int = -1#
class ShMemCircBuff(*args, settings=None, **kwargs)[source]#

Bases: Unit

Parameters:

settings (Settings | None)

SETTINGS#

alias of ShMemCircBuffSettings

STATE#

alias of ShMemCircBuffState

INPUT_SIGNAL = InputStream:unlocated[<class 'ezmsg.util.messages.axisarray.AxisArray'>]()#
INPUT_SETTINGS = InputStream:unlocated[<class 'ezmsg.tools.shmem.shmem.ShMemCircBuffSettings'>]()#
async initialize()[source]#

Runs when the Unit is instantiated. This is called from within the same process this unit will live. This lifecycle hook can be overridden. It can be run as async functions by simply adding the async keyword when overriding.

Return type:

None

on_settings(msg)[source]#
Parameters:

msg (ShMemCircBuffSettings)

Return type:

None

async shutdown()[source]#

Runs when the Unit terminates. This is called from within the same process this unit will live. This lifecycle hook can be overridden. It can be run as async functions by simply adding the async keyword when overriding.

Return type:

None

async check_continue()[source]#
async on_message(msg)[source]#
Parameters:

msg (AxisArray)