Source code for ezmsg.tools.proc

import asyncio
import multiprocessing
import multiprocessing.connection
import typing

import ezmsg.core as ez

from .shmem.shmem import ShMemCircBuff, ShMemCircBuffSettings


BUF_DUR = 3.0


[docs] class EzMonitorProcess(multiprocessing.Process):
[docs] def __init__( self, settings: ShMemCircBuffSettings, topic: str, address: typing.Optional[typing.Tuple[str, int]] = None, ) -> None: super().__init__() self._settings = settings self._topic = topic self._graph_address = address
[docs] def run(self) -> None: comps = {"SHMEM": ShMemCircBuff(self._settings)} conns = ((self._topic, comps["SHMEM"].INPUT_SIGNAL),) ez.run(components=comps, connections=conns, graph_address=self._graph_address)
[docs] class EZProcManager: """ Manages the subprocess that runs an ezmsg pipeline comprising a single ShMemCircBuff unit connected to a pipeline. The unit must be parameterized with the correct shared memory name. We do not actually interact with the shared memory in this class. See .mirror.EzmsgShmMirror. """
[docs] def __init__( self, graph_ip: str, graph_port: int, buf_dur: float = BUF_DUR ) -> None: self._graph_addr: typing.Tuple[str, int] = (graph_ip, graph_port) self._buf_dur = buf_dur self._proc = None self._node_path: typing.Optional[str] = None self._remote_conn, self._conn = multiprocessing.Pipe()
@property def node_path(self) -> str: return self._node_path @property def conn(self) -> typing.Optional[multiprocessing.connection.Connection]: return self._conn
[docs] def reset(self, node_path: typing.Optional[str]) -> None: self._cleanup_subprocess() self._node_path = node_path self._init_subprocess()
[docs] def cleanup(self): self._cleanup_subprocess()
def _cleanup_subprocess(self) -> None: if self._proc is not None: self._conn.send("quit") # Close process self._proc.join() self._proc = None # TODO: Somehow closing the proc doesn't always clear the VISBUFF connections. loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) loop.run_until_complete( ez.graphserver.GraphService(address=self._graph_addr).disconnect( self._node_path, "VISBUFF/INPUT_SIGNAL" ) ) def _init_subprocess(self, axis: str = "time"): unit_settings = ShMemCircBuffSettings( shmem_name="buff_" + self._node_path, buf_dur=self._buf_dur, conn=self._remote_conn, axis=axis, ) self._proc = EzMonitorProcess( unit_settings, self._node_path, address=self._graph_addr ) self._proc.start()
# if self._rend_conn.poll(): msg = self._rend_conn.recv()