Source code for ezmsg.baseproc.clock

"""Clock generator for timing control."""

import asyncio
import math
import time
from dataclasses import field

import ezmsg.core as ez
from ezmsg.util.messages.axisarray import AxisArray

from .protocols import processor_state
from .stateful import BaseStatefulProducer
from .units import BaseProducerUnit


[docs] class ClockSettings(ez.Settings): """Settings for :obj:`ClockProducer`.""" dispatch_rate: float = math.inf """ Dispatch rate in Hz. - Finite value (e.g., 100.0): Dispatch 100 times per second - math.inf: Dispatch as fast as possible (no sleep) """
[docs] @processor_state class ClockState: """State for :obj:`ClockProducer`.""" t_0: float = field(default_factory=time.monotonic) """Start time (monotonic).""" n_dispatch: int = 0 """Number of dispatches since reset."""
[docs] class ClockProducer(BaseStatefulProducer[ClockSettings, AxisArray.LinearAxis, ClockState]): """ Produces clock ticks at a specified rate. Each tick outputs a :obj:`AxisArray.LinearAxis` containing: - ``gain``: 1/dispatch_rate (seconds per tick), or 0.0 if dispatch_rate is infinite - ``offset``: Wall clock timestamp (time.monotonic) This output type allows downstream components (like Counter) to know both the timing of the tick and the nominal dispatch rate. """ def _reset_state(self) -> None: """Reset internal state.""" self._state.t_0 = time.monotonic() self._state.n_dispatch = 0 def _make_output(self, timestamp: float) -> AxisArray.LinearAxis: """Create LinearAxis output with gain and offset.""" if math.isinf(self.settings.dispatch_rate): gain = 0.0 else: gain = 1.0 / self.settings.dispatch_rate return AxisArray.LinearAxis(gain=gain, offset=timestamp) def __call__(self) -> AxisArray.LinearAxis: """Synchronous clock production.""" if self._hash == -1: self._reset_state() self._hash = 0 now = time.monotonic() if math.isfinite(self.settings.dispatch_rate): target_time = self.state.t_0 + (self.state.n_dispatch + 1) / self.settings.dispatch_rate if target_time > now: time.sleep(target_time - now) else: target_time = now self.state.n_dispatch += 1 return self._make_output(target_time) async def _produce(self) -> AxisArray.LinearAxis: """Generate next clock tick.""" now = time.monotonic() if math.isfinite(self.settings.dispatch_rate): target_time = self.state.t_0 + (self.state.n_dispatch + 1) / self.settings.dispatch_rate if target_time > now: await asyncio.sleep(target_time - now) else: target_time = now self.state.n_dispatch += 1 return self._make_output(target_time)
[docs] class Clock( BaseProducerUnit[ ClockSettings, AxisArray.LinearAxis, ClockProducer, ] ): """ Clock unit that produces ticks at a specified rate. Output is a :obj:`AxisArray.LinearAxis` with: - ``gain``: 1/dispatch_rate (seconds per tick) - ``offset``: Wall clock timestamp """ SETTINGS = ClockSettings