Source code for ezmsg.baseproc.clockdriven

"""Clock-driven producer base classes for generating data synchronized to clock ticks."""

import typing
from abc import abstractmethod

import ezmsg.core as ez
from ezmsg.util.messages.axisarray import AxisArray, LinearAxis

from .protocols import StateType, processor_state
from .stateful import BaseStatefulProcessor


[docs] class ClockDrivenSettings(ez.Settings): """ Base settings for clock-driven producers. Subclass this to add your own settings while inheriting fs and n_time. Example:: class SinGeneratorSettings(ClockDrivenSettings): freq: float = 1.0 amp: float = 1.0 """ fs: float """Output sampling rate in Hz.""" n_time: int | None = None """ Samples per block. - If specified: fixed chunk size (clock gain is ignored for determining chunk size) - If None: derived from clock gain (fs * clock.gain), with fractional sample tracking """
# Type variable for settings that extend ClockDrivenSettings ClockDrivenSettingsType = typing.TypeVar("ClockDrivenSettingsType", bound=ClockDrivenSettings)
[docs] @processor_state class ClockDrivenState: """ Internal state for clock-driven producers. Tracks sample counting and fractional sample accumulation. Subclasses should extend this if they need additional state. """ counter: int = 0 """Current sample counter (total samples produced).""" fractional_samples: float = 0.0 """Accumulated fractional samples for variable chunk mode."""
[docs] class BaseClockDrivenProducer( BaseStatefulProcessor[ClockDrivenSettingsType, AxisArray.LinearAxis, AxisArray, StateType], typing.Generic[ClockDrivenSettingsType, StateType], ): """ Base class for clock-driven data producers. Accepts clock ticks (LinearAxis) as input and produces AxisArray output. Handles all the timing/counter logic internally, so subclasses only need to implement the data generation logic. This eliminates the need for the Clock → Counter → Generator pattern by combining the Counter functionality into the generator base class. Subclasses must implement: - ``_reset_state(time_axis)``: Initialize any state needed for production - ``_produce(n_samples, time_axis)``: Generate the actual output data Example:: @processor_state class SinState(ClockDrivenState): ang_freq: float = 0.0 class SinProducer(BaseClockDrivenProducer[SinSettings, SinState]): def _reset_state(self, time_axis: AxisArray.TimeAxis) -> None: self._state.ang_freq = 2 * np.pi * self.settings.fs def _produce(self, n_samples: int, time_axis: AxisArray.TimeAxis) -> AxisArray: t = (np.arange(n_samples) + self._state.counter) * time_axis.gain data = np.sin(self._state.ang_freq * t) return AxisArray(data=data, dims=["time"], axes={"time": time_axis}) """ def _hash_message(self, message: AxisArray.LinearAxis) -> int: # Return constant hash - state should not reset based on clock rate changes. # The producer maintains continuity regardless of clock rate changes. return 0 def _compute_samples_and_offset(self, clock_tick: AxisArray.LinearAxis) -> tuple[int, float] | None: """ Compute number of samples and time offset from a clock tick. Returns: Tuple of (n_samples, offset) or None if no samples to produce yet. Raises: ValueError: If clock gain is 0 (AFAP mode) and n_time is not specified. """ if self.settings.n_time is not None: # Fixed chunk size mode n_samples = self.settings.n_time if clock_tick.gain == 0.0: # AFAP mode - synthetic offset based on counter offset = self._state.counter / self.settings.fs else: # Use clock's timestamp offset = clock_tick.offset else: # Variable chunk size mode - derive from clock gain if clock_tick.gain == 0.0: raise ValueError("Cannot use clock with gain=0 (AFAP) without specifying n_time") # Calculate samples including fractional accumulation samples_float = self.settings.fs * clock_tick.gain + self._state.fractional_samples n_samples = int(samples_float + 1e-9) self._state.fractional_samples = samples_float - n_samples if n_samples == 0: return None offset = clock_tick.offset return n_samples, offset @abstractmethod def _reset_state(self, time_axis: LinearAxis) -> None: """ Reset/initialize state for production. Called once before the first call to _produce, or when state needs resetting. Use this to pre-compute values, create templates, etc. Args: time_axis: TimeAxis with the output sampling rate (fs) and initial offset. """ ... @abstractmethod def _produce(self, n_samples: int, time_axis: LinearAxis) -> AxisArray: """ Generate output data for this chunk. Args: n_samples: Number of samples to generate. time_axis: TimeAxis with correct offset and gain (1/fs) for this chunk. Returns: AxisArray containing the generated data. The time axis should use the provided time_axis or one derived from it. """ ... def _process(self, clock_tick: LinearAxis) -> AxisArray | None: """ Process a clock tick and produce output. Handles all the counter/timing logic internally, then calls _produce. """ result = self._compute_samples_and_offset(clock_tick) if result is None: return None n_samples, offset = result time_axis = AxisArray.TimeAxis(fs=self.settings.fs, offset=offset) # Call subclass production method output = self._produce(n_samples, time_axis) # Update counter self._state.counter += n_samples return output