Source code for ezmsg.baseproc.counter

"""Counter generator for sample counting and timing."""

import numpy as np
from ezmsg.util.messages.axisarray import AxisArray, LinearAxis, replace

from .clockdriven import (
    BaseClockDrivenProducer,
    ClockDrivenSettings,
    ClockDrivenState,
)
from .protocols import processor_state
from .units import BaseClockDrivenUnit


[docs] class CounterSettings(ClockDrivenSettings): """Settings for :obj:`Counter` and :obj:`CounterTransformer`.""" mod: int | None = None """If set, counter values rollover at this modulus."""
[docs] @processor_state class CounterTransformerState(ClockDrivenState): """State for :obj:`CounterTransformer`.""" template: AxisArray | None = None
[docs] class CounterTransformer(BaseClockDrivenProducer[CounterSettings, CounterTransformerState]): """ Transforms clock ticks (LinearAxis) into AxisArray counter values. Each clock tick produces a block of counter values. The block size is either fixed (n_time setting) or derived from the clock's gain (fs * gain). """ def _reset_state(self, time_axis: LinearAxis) -> None: """Reset state - initialize template for counter output.""" self._state.template = AxisArray( data=np.array([], dtype=int), dims=["time"], axes={"time": time_axis}, key="counter", ) def _produce(self, n_samples: int, time_axis: LinearAxis) -> AxisArray: """Generate counter values for this chunk.""" # Generate counter data (using pre-increment counter value) block_samp = np.arange(self._state.counter, self._state.counter + n_samples) if self.settings.mod is not None: block_samp = block_samp % self.settings.mod return replace( self._state.template, data=block_samp, axes={"time": time_axis}, )
[docs] class Counter(BaseClockDrivenUnit[CounterSettings, CounterTransformer]): """ Transforms clock ticks into monotonically increasing counter values as AxisArray. Receives timing from INPUT_CLOCK (LinearAxis from Clock) and outputs AxisArray. """ SETTINGS = CounterSettings