How to implement a clock-driven producer?

Clock-driven producers generate data synchronized to clock ticks. They are useful for signal generators, simulators, and other components that need to produce timed data streams.

The BaseClockDrivenProducer base class simplifies this pattern by handling all the timing and sample counting logic internally. You only need to implement the data generation.

When to use BaseClockDrivenProducer

Use BaseClockDrivenProducer when you need to:

  • Generate synthetic signals (sine waves, noise, test patterns)

  • Simulate sensor data at a specific sample rate

  • Produce timed data streams driven by a Clock

This base class eliminates the need for the Clock → Counter → Generator pattern by folding the counter functionality into the generator itself.

Basic Structure

A clock-driven producer consists of three parts:

  1. Settings - Extends ClockDrivenSettings (which provides fs and n_time)

  2. State - Extends ClockDrivenState (which provides counter and fractional_samples)

  3. Producer - Extends BaseClockDrivenProducer and implements _reset_state and _produce

Example: Sine Wave Generator

Here’s a complete example of a sine wave generator:

import numpy as np
from ezmsg.util.messages.axisarray import AxisArray, LinearAxis

from ezmsg.baseproc import (
    BaseClockDrivenProducer,
    BaseClockDrivenUnit,
    ClockDrivenSettings,
    ClockDrivenState,
    processor_state,
)


class SinGeneratorSettings(ClockDrivenSettings):
    """
    Settings for the sine wave generator.

    Inherits from ClockDrivenSettings which provides:
    - fs: Output sampling rate in Hz
    - n_time: Samples per block (optional, derived from clock if None)
    """

    freq: float = 1.0
    """Frequency of the sine wave in Hz."""

    amp: float = 1.0
    """Amplitude of the sine wave."""

    phase: float = 0.0
    """Initial phase in radians."""


@processor_state
class SinGeneratorState(ClockDrivenState):
    """
    State for the sine wave generator.

    Inherits from ClockDrivenState which provides:
    - counter: Current sample counter (total samples produced)
    - fractional_samples: For accumulating sub-sample timing
    """

    ang_freq: float = 0.0
    """Pre-computed angular frequency (2 * pi * freq)."""


class SinGenerator(
    BaseClockDrivenProducer[SinGeneratorSettings, SinGeneratorState]
):
    """
    Generates sine wave data synchronized to clock ticks.
    """

    def _reset_state(self, time_axis: LinearAxis) -> None:
        """
        Initialize state. Called once before first production.

        Use this to pre-compute values that don't change between chunks.
        """
        self._state.ang_freq = 2 * np.pi * self.settings.freq

    def _produce(self, n_samples: int, time_axis: LinearAxis) -> AxisArray:
        """
        Generate sine wave data for this chunk.

        Args:
            n_samples: Number of samples to generate
            time_axis: LinearAxis with correct offset and gain (1/fs)

        Returns:
            AxisArray containing the sine wave data
        """
        # Calculate time values using the internal counter
        t = (np.arange(n_samples) + self._state.counter) * time_axis.gain

        # Generate sine wave
        data = self.settings.amp * np.sin(
            self._state.ang_freq * t + self.settings.phase
        )

        return AxisArray(
            data=data,
            dims=["time"],
            axes={"time": time_axis},
        )


class SinGeneratorUnit(
    BaseClockDrivenUnit[SinGeneratorSettings, SinGenerator]
):
    """
    ezmsg Unit wrapper for SinGenerator.

    Receives clock ticks on INPUT_CLOCK and outputs AxisArray on OUTPUT_SIGNAL.
    """

    SETTINGS = SinGeneratorSettings

Key Points

Settings inheritance: Your settings class should extend ClockDrivenSettings, which provides:

  • fs: The output sampling rate in Hz

  • n_time: Optional fixed chunk size. If None, chunk size is derived from the clock’s gain (fs * clock.gain)

State inheritance: Your state class should extend ClockDrivenState, which provides:

  • counter: Tracks total samples produced (use this for continuous signals)

  • fractional_samples: Accumulates sub-sample timing for accurate chunk sizes
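To see why the counter matters for continuous signals, here is a minimal sketch in plain Python (not the library's code) that indexes time from a running sample count, much as the sine example does. The helper `sine_chunk` and its parameters are hypothetical names used only for illustration. It checks that two consecutive chunks match one long chunk sample for sample.

```python
import math


def sine_chunk(counter, n_samples, fs, freq, amp=1.0, phase=0.0):
    """Hypothetical helper: one chunk of a sine wave, timed from `counter`."""
    gain = 1.0 / fs  # seconds per sample, playing the role of time_axis.gain
    ang_freq = 2 * math.pi * freq
    return [
        amp * math.sin(ang_freq * ((counter + i) * gain) + phase)
        for i in range(n_samples)
    ]


fs, freq = 1000.0, 10.0
first = sine_chunk(counter=0, n_samples=100, fs=fs, freq=freq)
second = sine_chunk(counter=100, n_samples=100, fs=fs, freq=freq)
whole = sine_chunk(counter=0, n_samples=200, fs=fs, freq=freq)

# Compare the stitched chunks against a single long chunk.
print(first + second == whole)
```

Restarting the time index at zero for every chunk would instead introduce a phase jump at each chunk boundary whenever a chunk does not span a whole number of periods.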

The _produce method: This is where you generate data. You receive:

  • n_samples: How many samples to generate this chunk

  • time_axis: A LinearAxis with the correct offset and gain (1/fs)
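As a rough illustration of how a linear time axis is read, the sketch below uses a stand-in class rather than ezmsg's LinearAxis. `FakeLinearAxis` and `sample_times` are hypothetical names, and the code assumes the usual linear-axis convention that sample i sits at offset + i * gain.

```python
from dataclasses import dataclass


@dataclass
class FakeLinearAxis:
    """Stand-in for a linear axis (not the ezmsg class)."""

    gain: float    # seconds per sample, i.e. 1 / fs
    offset: float  # timestamp of the first sample in the chunk


def sample_times(axis, n_samples):
    """Timestamp of each sample, assuming t_i = offset + i * gain."""
    return [axis.offset + i * axis.gain for i in range(n_samples)]


# fs = 4 Hz, chunk starting at t = 2 s
axis = FakeLinearAxis(gain=0.25, offset=2.0)
times = sample_times(axis, 4)
print(times)
```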

The base class automatically:

  • Computes n_samples from clock timing or settings

  • Manages the sample counter (incremented after _produce returns)

  • Handles fractional sample accumulation for non-integer chunk sizes

  • Supports both fixed n_time and variable chunk modes
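The fractional-sample bookkeeping can be sketched as follows. This is an assumed accumulator scheme written for illustration, not the base class's actual implementation, and `chunk_sizes` is a hypothetical helper. Each tick adds fs * clock_gain to an accumulator, emits the whole-sample part, and carries the remainder forward.

```python
def chunk_sizes(fs, clock_gain, n_ticks):
    """Hypothetical helper: samples emitted on each of n_ticks clock ticks."""
    fractional = 0.0  # carried-over sub-sample remainder
    sizes = []
    for _ in range(n_ticks):
        fractional += fs * clock_gain  # ideal (possibly non-integer) chunk size
        n_samples = int(fractional)    # whole samples available this tick
        fractional -= n_samples        # keep the remainder for the next tick
        sizes.append(n_samples)
    return sizes


# 100 Hz output driven by an 8 Hz clock: 12.5 samples per tick on average.
sizes = chunk_sizes(fs=100.0, clock_gain=0.125, n_ticks=4)
print(sizes, sum(sizes))
```

Under this scheme individual chunks differ by at most one sample, while the running total stays in step with fs multiplied by the elapsed clock time.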

Using Standalone (Outside ezmsg)

Clock-driven producers can be used standalone for testing or offline processing:

from ezmsg.util.messages.axisarray import AxisArray

# Create the producer
# (SinGenerator and SinGeneratorSettings are defined in the example above)
producer = SinGenerator(SinGeneratorSettings(
    fs=1000.0,      # 1000 Hz sample rate
    n_time=100,     # 100 samples per chunk
    freq=10.0,      # 10 Hz sine wave
    amp=1.0,
))

# Simulate clock ticks (LinearAxis with gain=1/dispatch_rate, offset=timestamp)
clock_tick = AxisArray.LinearAxis(gain=0.1, offset=0.0)  # 10 Hz dispatch

# Generate data
result = producer(clock_tick)
print(f"Shape: {result.data.shape}")  # (100,)
print(f"Sample rate: {1/result.axes['time'].gain} Hz")  # 1000.0 Hz

Using with ezmsg

In an ezmsg pipeline, connect a Clock to your generator’s INPUT_CLOCK:

import ezmsg.core as ez
from ezmsg.baseproc import Clock, ClockSettings


class SinPipeline(ez.Collection):
    SETTINGS = SinGeneratorSettings

    CLOCK = Clock()
    GENERATOR = SinGeneratorUnit()

    def configure(self) -> None:
        self.CLOCK.apply_settings(ClockSettings(dispatch_rate=10.0))
        self.GENERATOR.apply_settings(self.SETTINGS)

    def network(self) -> ez.NetworkDefinition:
        return (
            (self.CLOCK.OUTPUT_SIGNAL, self.GENERATOR.INPUT_CLOCK),
        )

See Also