"""Noise signal generators."""
import numpy as np
from ezmsg.baseproc import (
BaseClockDrivenProducer,
BaseClockDrivenUnit,
BaseProcessor,
ClockDrivenSettings,
ClockDrivenState,
CompositeProcessor,
processor_state,
)
from ezmsg.sigproc.butterworthfilter import (
ButterworthFilterSettings,
ButterworthFilterTransformer,
)
from ezmsg.util.messages.axisarray import AxisArray, LinearAxis, replace
[docs]
class WhiteNoiseSettings(ClockDrivenSettings):
"""Settings for white noise generators."""
n_ch: int = 1
"""Number of channels to output."""
loc: float = 0.0
"""DC offset (mean of the distribution)."""
scale: float = 1.0
"""Scale (standard deviation of the distribution)."""
[docs]
@processor_state
class WhiteNoiseState(ClockDrivenState):
"""State for WhiteNoiseProducer."""
template: AxisArray | None = None
[docs]
class WhiteNoiseProducer(BaseClockDrivenProducer[WhiteNoiseSettings, WhiteNoiseState]):
"""
Generates white noise synchronized to clock ticks.
Each clock tick produces a block of Gaussian white noise based on the
sample rate (fs) and chunk size (n_time) settings.
"""
def _reset_state(self, time_axis: LinearAxis) -> None:
"""Initialize template with channel axis."""
n_ch = self.settings.n_ch
self._state.template = AxisArray(
data=np.zeros((0, n_ch)),
dims=["time", "ch"],
axes={
"time": time_axis,
"ch": AxisArray.CoordinateAxis(
data=np.arange(n_ch),
dims=["ch"],
),
},
)
def _produce(self, n_samples: int, time_axis: LinearAxis) -> AxisArray:
"""Generate white noise for this chunk."""
# Generate random data
random_data = np.random.normal(
loc=self.settings.loc,
scale=self.settings.scale,
size=(n_samples, self.settings.n_ch),
)
# Create output using template
return replace(
self._state.template,
data=random_data,
axes={
**self._state.template.axes,
"time": time_axis,
},
)
[docs]
class WhiteNoise(BaseClockDrivenUnit[WhiteNoiseSettings, WhiteNoiseProducer]):
"""
Generates white noise synchronized to clock ticks.
Receives timing from INPUT_CLOCK (LinearAxis from Clock) and outputs
white noise AxisArray on OUTPUT_SIGNAL.
"""
SETTINGS = WhiteNoiseSettings
[docs]
class PinkNoiseSettings(WhiteNoiseSettings):
"""Settings for pink noise generator."""
cutoff: float = 300.0
"""
Lowpass cutoff frequency (Hz) for the first-order Butterworth filter
that creates the 1/f characteristic.
"""
[docs]
class PinkNoiseProducer(CompositeProcessor[PinkNoiseSettings, LinearAxis, AxisArray]):
"""
Generates pink (1/f) noise synchronized to clock ticks.
Pink noise is generated by filtering white noise with a first-order
lowpass Butterworth filter.
"""
@staticmethod
def _initialize_processors(settings: PinkNoiseSettings) -> dict[str, BaseProcessor]:
return {
"white_noise": WhiteNoiseProducer(
WhiteNoiseSettings(
fs=settings.fs,
n_time=settings.n_time,
n_ch=settings.n_ch,
loc=settings.loc,
scale=settings.scale,
)
),
"filter": ButterworthFilterTransformer(
ButterworthFilterSettings(axis="time", order=1, cutoff=settings.cutoff)
),
}
[docs]
class PinkNoise(BaseClockDrivenUnit[PinkNoiseSettings, PinkNoiseProducer]):
"""
Generates pink (1/f) noise synchronized to clock ticks.
Receives timing from INPUT_CLOCK (LinearAxis from Clock) and outputs
pink noise AxisArray on OUTPUT_SIGNAL.
"""
SETTINGS = PinkNoiseSettings