Source code for ezmsg.lsl.util
import threading
import time
import typing
import numpy as np
import numpy.typing as npt
import pylsl
[docs]
def collect_timestamp_pairs(
npairs: int = 4,
) -> typing.Tuple[np.ndarray, np.ndarray]:
xs = []
ys = []
for _ in range(npairs):
if _ % 2:
y, x = time.time(), pylsl.local_clock()
else:
x, y = pylsl.local_clock(), time.time()
xs.append(x)
ys.append(y)
time.sleep(0.001)
return np.array(xs), np.array(ys)
[docs]
class ClockSync:
_instance = None
_lock = threading.Lock()
def __new__(cls, *args, **kwargs):
with cls._lock:
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance
[docs]
def __init__(self, alpha: float = 0.1, min_interval: float = 0.1, run_thread: bool = True):
if not hasattr(self, "_initialized"):
self._alpha = alpha
self._interval = min_interval
self._initialized = True
self._last_time = time.time() - 1e9
self._running = False
self._thread: typing.Optional[threading.Thread] = None
# Do first burst so we have a real offset even before the thread starts.
xs, ys = collect_timestamp_pairs(100)
self._offset: float = np.mean(ys - xs)
if run_thread:
self.start()
[docs]
def run_once(self, n: int = 4, force: bool = False):
if force or (time.time() - self._last_time) > self._interval:
xs, ys = collect_timestamp_pairs(n)
offset = np.mean(ys - xs)
self._offset = (1 - self._alpha) * self._offset + self._alpha * offset
self._last_time = time.time()
def _run(self):
while self._running:
time.sleep(self._interval)
self.run_once(4, True)
[docs]
def start(self):
self._running = True
self._thread = threading.Thread(target=self._run)
self._thread.daemon = True
self._thread.start()
[docs]
def stop(self):
self._running = False
@property
def offset(self) -> float:
with self._lock:
return self._offset
@typing.overload
def lsl2system(self, lsl_timestamp: float) -> float: ...
@typing.overload
def lsl2system(self, lsl_timestamp: npt.NDArray[float]) -> npt.NDArray[float]: ...
[docs]
def lsl2system(self, lsl_timestamp):
# offset = system - lsl --> system = lsl + offset
with self._lock:
return lsl_timestamp + self._offset
@typing.overload
def system2lsl(self, system_timestamp: float) -> float: ...
@typing.overload
def system2lsl(self, system_timestamp: npt.NDArray[float]) -> npt.NDArray[float]: ...
[docs]
def system2lsl(self, system_timestamp):
# offset = system - lsl --> lsl = system - offset
with self._lock:
return system_timestamp - self._offset