Source code for ezmsg.blackrock.util

import threading
import typing

import numpy.typing as npt


[docs] class ClockSync: _instance = None _lock = threading.Lock() def __new__(cls, *args, **kwargs): with cls._lock: if cls._instance is None: cls._instance = super().__new__(cls) return cls._instance
[docs] def __init__(self, alpha: float = 0.1, sysfreq: float = 1.0): if not hasattr(self, "_initialized"): self._alpha = alpha self._initialized = True self._offset: typing.Optional[float] = None self._last_pair: typing.Optional[typing.Tuple[int, float]] = None self._sysfreq = sysfreq
[docs] def add_pair(self, nsp_time, sys_time): # The protocol monitor packet arrived in the system 560 usec after # it left the Gemini device. We can get the Gemini clock in our # system time by subtracting this number: sys_time = sys_time - 0.00056 offset = sys_time - nsp_time / self.sysfreq if self.offset is None or ( self._last_pair is not None and (nsp_time < self._last_pair[0] or sys_time < self._last_pair[1]) ): self._offset = offset self._offset = (1 - self._alpha) * self._offset + self._alpha * offset self._last_pair = (nsp_time, sys_time)
@property def offset(self) -> float: with self._lock: return self._offset @property def sysfreq(self) -> float: with self._lock: return self._sysfreq @typing.overload def nsp2system(self, nsp_timestamp: int) -> float: ... @typing.overload def nsp2system(self, nsp_timestamp: npt.NDArray[int]) -> npt.NDArray[float]: ...
[docs] def nsp2system(self, nsp_timestamp): # offset = system - nsp / sysfreq --> system = nsp / sysfreq + offset with self._lock: return nsp_timestamp / self._sysfreq + self._offset
@typing.overload def system2nsp(self, system_timestamp: float) -> int: ... @typing.overload def system2nsp(self, system_timestamp: npt.NDArray[int]) -> npt.NDArray[float]: ...
[docs] def system2nsp(self, system_timestamp): # offset = system - nsp / sysfreq --> nsp = (system - offset) * sysfreq with self._lock: return int((system_timestamp - self._offset) * self._sysfreq)