Source code for ezmsg.lsl.util

import asyncio
import threading
import time
import typing

import numpy as np
import numpy.typing as npt
import pylsl


def _sample_clock_pair(i: int) -> typing.Tuple[float, float]:
    """Read the LSL clock and the monotonic system clock back to back.

    The two clocks cannot be read at the same instant, so whichever is read
    second is always slightly "later". The read order is flipped on odd
    values of ``i`` so that this skew tends to cancel out when many pairs
    are averaged.

    Args:
        i: Sample index; its parity selects which clock is read first.

    Returns:
        ``(lsl_time, system_time)`` -- always in that order, regardless of
        which clock was actually read first.
    """
    system_first = bool(i % 2)
    if system_first:
        system_time = time.monotonic()
        lsl_time = pylsl.local_clock()
    else:
        lsl_time = pylsl.local_clock()
        system_time = time.monotonic()
    return lsl_time, system_time


def collect_timestamp_pairs(
    npairs: int = 4,
) -> typing.Tuple[np.ndarray, np.ndarray]:
    """Synchronously gather ``npairs`` (LSL, system) clock readings.

    Each reading comes from ``_sample_clock_pair``; the calling thread
    sleeps briefly after every sample so the readings are spread out in
    time rather than taken in one tight burst.

    Args:
        npairs: Number of clock pairs to collect.

    Returns:
        Two 1-D arrays of length ``npairs``: the LSL timestamps and the
        matching system (``time.monotonic``) timestamps.
    """
    samples = []
    for idx in range(npairs):
        samples.append(_sample_clock_pair(idx))
        time.sleep(0.001)  # Usually sleeps more than 1 msec.
    lsl_stamps = np.array([lsl for lsl, _ in samples])
    system_stamps = np.array([system for _, system in samples])
    return lsl_stamps, system_stamps
async def acollect_timestamp_pairs(
    npairs: int = 4,
) -> typing.Tuple[np.ndarray, np.ndarray]:
    """Asynchronously gather ``npairs`` (LSL, system) clock readings.

    Coroutine counterpart of ``collect_timestamp_pairs``: instead of
    blocking the thread between samples it yields control to the event
    loop with ``asyncio.sleep(0)`` after every sample.

    Args:
        npairs: Number of clock pairs to collect.

    Returns:
        Two 1-D arrays of length ``npairs``: the LSL timestamps and the
        matching system (``time.monotonic``) timestamps.
    """
    samples = []
    for idx in range(npairs):
        samples.append(_sample_clock_pair(idx))
        # Let other tasks run between samples.
        await asyncio.sleep(0)
    lsl_stamps = np.array([lsl for lsl, _ in samples])
    system_stamps = np.array([system for _, system in samples])
    return lsl_stamps, system_stamps
class ClockSync:
    """Singleton that tracks the offset between the system and LSL clocks.

    The offset is defined as ``system - lsl`` where "system" is
    ``time.monotonic()`` and "lsl" is ``pylsl.local_clock()``. It is seeded
    from an initial burst of samples and then refined as an exponential
    moving average, either by a background thread (``start`` / ``stop``),
    by an asyncio task (``astart`` / ``astop``), or on demand
    (``run_once`` / ``arun_once``).

    Every ``ClockSync(...)`` call returns the same instance; constructor
    arguments are only honoured the first time the instance is initialised.
    """

    _instance = None
    # Guards singleton creation and every read/write of ``_offset``.
    _lock = threading.Lock()
    # Serialises first-time initialisation. Kept separate from ``_lock``
    # (which is not re-entrant) so initialisation never contends with the
    # offset accessors.
    _init_lock = threading.Lock()

    def __new__(cls, *args, **kwargs):
        """Return the shared instance, creating it on first use."""
        with cls._lock:
            if cls._instance is None:
                cls._instance = super().__new__(cls)
            return cls._instance

    def __init__(
        self,
        alpha: float = 0.1,
        min_interval: float = 0.1,
        run_thread: bool = True,
    ):
        """Initialise the singleton (first call only).

        Args:
            alpha: Weight given to each new offset estimate in the
                exponential moving average (0 keeps the old value,
                1 replaces it).
            min_interval: Seconds between background updates, and the
                minimum spacing enforced by ``run_once`` / ``arun_once``
                when ``force`` is false.
            run_thread: Start the background update thread once the initial
                offset has been measured.
        """
        with self._init_lock:
            if getattr(self, "_initialized", False):
                return
            self._alpha = alpha
            self._interval = min_interval
            # Far in the past so the first non-forced update is never skipped.
            self._last_time = time.monotonic() - 1e9
            self._running = False
            self._thread: typing.Optional[threading.Thread] = None
            self._stop_event: typing.Optional[threading.Event] = None
            self._task: typing.Optional[asyncio.Task] = None
            # Do first burst so we have a real offset even before the thread starts.
            xs, ys = collect_timestamp_pairs(100)
            self._offset: float = float(np.mean(ys - xs))
            # Set last: a concurrent or retried construction must never see
            # a half-initialised instance (e.g. one without ``_offset``).
            self._initialized = True
            if run_thread:
                self.start()

    def _update_offset(self, xs: np.ndarray, ys: np.ndarray) -> None:
        """Blend a new burst of (lsl, system) samples into the running offset."""
        diffs = np.asarray(ys) - np.asarray(xs)
        if diffs.size == 0:
            # The mean of an empty burst is NaN, which would poison the
            # moving average for good; ignore it instead.
            return
        sample = float(np.mean(diffs))
        # Same lock as the readers, so the read-modify-write is atomic with
        # respect to ``offset`` / ``lsl2system`` / ``system2lsl``.
        with self._lock:
            self._offset = (1 - self._alpha) * self._offset + self._alpha * sample
        self._last_time = time.monotonic()

    def _should_update(self, force: bool = False) -> bool:
        """Return True if forced or if ``min_interval`` has elapsed since the last update."""
        return force or (time.monotonic() - self._last_time) > self._interval

    def run_once(self, n: int = 4, force: bool = False) -> None:
        """Synchronously refine the offset from ``n`` fresh clock pairs.

        Does nothing when called again within ``min_interval`` of the last
        update, unless ``force`` is true.
        """
        if self._should_update(force):
            self._update_offset(*collect_timestamp_pairs(n))

    async def arun_once(self, n: int = 4, force: bool = False) -> None:
        """Coroutine version of ``run_once``."""
        if self._should_update(force):
            self._update_offset(*await acollect_timestamp_pairs(n))

    def _run(self, stop_event: threading.Event) -> None:
        """Thread body: update every ``min_interval`` seconds until stopped."""
        # ``wait`` doubles as an interruptible sleep: it returns False on
        # timeout (time for another update) and True once ``stop`` sets it.
        while not stop_event.wait(self._interval):
            self.run_once(4, True)

    async def _arun(self):
        """Task body: update every ``min_interval`` seconds until cancelled."""
        try:
            while True:
                await asyncio.sleep(self._interval)
                await self.arun_once(4, True)
        except asyncio.CancelledError:
            pass

    def start(self):
        """Start the background update thread if it is not already running."""
        if self._thread is not None and self._thread.is_alive():
            return
        self._running = True
        # Each thread gets its own stop event, so a thread that is still
        # winding down after ``stop`` cannot be revived or confused with
        # the new one.
        stop_event = threading.Event()
        self._stop_event = stop_event
        worker = threading.Thread(target=self._run, args=(stop_event,))
        worker.daemon = True
        self._thread = worker
        worker.start()

    async def astart(self):
        """Start the asyncio update task if it is not already running."""
        if self._task is None or self._task.done():
            self._task = asyncio.create_task(self._arun())

    async def astop(self):
        """Cancel the asyncio update task and wait for it to finish."""
        if self._task is not None:
            self._task.cancel()
            try:
                await self._task
            except asyncio.CancelledError:
                pass
            self._task = None

    def stop(self):
        """Stop the background thread and cancel the asyncio task, if any.

        The thread is woken immediately rather than left to finish its
        sleep, and the reference to it is dropped so that a subsequent
        ``start`` always launches a fresh thread. The task is cancelled but
        not awaited; use ``astop`` to wait for it.
        """
        self._running = False
        if self._stop_event is not None:
            self._stop_event.set()
            self._stop_event = None
        self._thread = None
        if self._task is not None:
            self._task.cancel()
            self._task = None

    @property
    def offset(self) -> float:
        """Current smoothed offset, ``system - lsl``, in seconds."""
        with self._lock:
            return self._offset

    @typing.overload
    def lsl2system(self, lsl_timestamp: float) -> float: ...

    @typing.overload
    def lsl2system(self, lsl_timestamp: npt.NDArray[float]) -> npt.NDArray[float]: ...

    def lsl2system(self, lsl_timestamp):
        """Convert LSL timestamp(s) to the system (``time.monotonic``) clock."""
        # offset = system - lsl --> system = lsl + offset
        with self._lock:
            return lsl_timestamp + self._offset

    @typing.overload
    def system2lsl(self, system_timestamp: float) -> float: ...

    @typing.overload
    def system2lsl(self, system_timestamp: npt.NDArray[float]) -> npt.NDArray[float]: ...

    def system2lsl(self, system_timestamp):
        """Convert system (``time.monotonic``) timestamp(s) to the LSL clock."""
        # offset = system - lsl --> lsl = system - offset
        with self._lock:
            return system_timestamp - self._offset