ezmsg.sigproc.adaptive_lnc#

Adaptive line-noise cancellation (LNC).

An LMS adaptive filter that estimates and subtracts a line-frequency (e.g. 50/60 Hz mains) interferer, and optionally its harmonics, from a multichannel signal. Quadrature references (sin/cos) at the line frequency – and at each harmonic k x the fundamental, phase-locked to it – drive per-channel adaptive weights that reconstruct the line noise; the summed reconstruction is subtracted from the signal (Widrow & Stearns, Adaptive Signal Processing, LMS noise cancellation with quadrature references).

Because we observe the signal only in samples, the line shows up at a normalised frequency of f_mains / fs cycles/sample. Both the device clock (fs) and the mains frequency are imprecise and drift, so that ratio is unknown and time-varying. The transformer therefore tracks it adaptively:

  • A numerically-controlled oscillator (NCO) generates the reference at an angular frequency omega (rad/sample); harmonics are generated at k * omega, so a single oscillator serves every harmonic.

  • A per-channel LMS adapts amplitude and phase at each harmonic frequency.

  • A frequency-locked loop (FLL) nudges omega from the rotation of the fundamental’s adaptive weights, pooled across all channels for robustness – the line frequency is a single global quantity, so every channel constrains the same estimate. The pooled estimator (a power-weighted cross-product) is invariant to static per-channel phase, so it needs no assumption about inter-channel phase relationships.

Both loops are parameterised by a time constant in seconds, so their behaviour is independent of the (possibly wildly variable) chunk size:

  • adapt_time_constant is the settling time of the per-channel LMS; internally mu = 2 / (adapt_time_constant * fs).

  • freq_time_constant is the settling time of the FLL; the per-update gain is derived from the elapsed time of each update interval, beta = 1 - exp(-dt / freq_time_constant) (the same tau<->alpha mapping EWMA uses), so chunking never changes the dynamics. It should exceed adapt_time_constant so the two loops do not fight.

The frequency estimate accumulates over an internal window of one mains period before each update – not a fixed chunk size, but a measurement window that fills across however the caller chunks the stream (4, 30, 60, mixed samples). Output is emitted for every chunk immediately; only the frequency update waits for a full window, which keeps the rotation estimate clean even when chunks are only a handful of samples. As a result the loop behaves identically for a single offline buffer (it converges within that one buffer) and for a fast sequence of tiny chunks, and tracking-on output is chunk-invariant.

The sequential A/D also gives each channel a known sub-degree per-channel phase offset, but at the line frequency it is negligible and it cancels in frequency tracking, so it is not handled here – the dedicated SamplingDelayAlignmentTransformer owns that correction broadband, upstream, for every cross-channel step.

Statefulness: expensive setup happens once in _reset_state(); each chunk runs only the per-sample recursion plus one pooled frequency update. With the FLL disabled (freq_time_constant None/<=0) the transformer reduces to a fixed-frequency canceller whose output is independent of chunking.

Functions

design_lnc_sos(omega, mu, num_harmonics=1)[source]#

SOS notch cascade equivalent to the quadrature-LMS line canceller.

A fixed-frequency quadrature-LMS canceller (unit references, step mu, control=1) is exactly a linear time-invariant 2nd-order notch (Glover 1977 / Widrow). For the fundamental its input->output (y = x - nr) transfer function is:

H(z) = (z^2 - 2cos(w)z + 1) / (z^2 - (2 - mu)cos(w)z + (1 - mu))

i.e. zeros exactly on the unit circle at e^{+/-j w} (a perfect notch) and poles at radius sqrt(1 - mu). Harmonic k is the same notch at k * omega; the cascade of the per-harmonic sections approximates the parallel multi-reference LMS to a small fraction of a dB (exact for a single harmonic). Verified against the per-sample LMS to ~1e-12 (float64).

This is the building block for an Array-API / GPU (MLX sosfilt) implementation that replaces the per-sample Python recursion.

Parameters:
  • omega (float) – Fundamental angular frequency in rad/sample (the FLL-tracked value).

  • mu (float) – LMS step size, i.e. 2 / (adapt_time_constant * fs).

  • num_harmonics (int) – Number of harmonics (sections), k = 1 .. num_harmonics.

Returns:

(num_harmonics, 6) SOS array ([b0, b1, b2, a0, a1, a2] per row, a0 = 1), float64. Apply with scipy.signal.sosfilt (or the MLX sosfilt_mlx_metal) carrying zi across chunks.

Return type:

np.ndarray

Classes

class AdaptiveLNC(*args, settings=None, **kwargs)[source]#

Bases: BaseTransformerUnit[AdaptiveLNCSettings, AxisArray, AxisArray, AdaptiveLNCTransformer]

Parameters:

settings (Settings | None)

SETTINGS#

alias of AdaptiveLNCSettings

class AdaptiveLNCSettings(line_freq=60.0, num_harmonics=1, adapt_time_constant=0.1, freq_time_constant=0.5, cancel_method=CancelMethod.NOTCH, axis='time')[source]#

Bases: Settings

Settings for AdaptiveLNCTransformer.

Parameters:
line_freq: float = 60.0#

Nominal line (mains) frequency in Hz (60 in N. America, 50 in EU). Seeds the NCO; the true normalised frequency is then tracked by the FLL.

num_harmonics: int = 1#

Number of line-frequency harmonics to cancel, including the fundamental. 1 = fundamental only. Harmonic k is generated phase-locked at k x the single tracked fundamental, so one NCO/FLL serves them all; each harmonic gets its own per-channel LMS weight. e.g. 5 also cancels 120/180/240/300 Hz. Absent harmonics simply drive their weights to ~0.

adapt_time_constant: float = 0.1#

Settling time (seconds) of the amplitude/phase canceller (the LMS). Smaller = faster tracking of amplitude/phase changes and a wider, noisier notch; larger = a narrower, cleaner notch that adapts more slowly. Converted internally to the LMS step size mu = 2 / (adapt_time_constant * fs). Read live each chunk.

freq_time_constant: float | None = 0.5#

Settling time (seconds) of the frequency tracker (the FLL). None (or <= 0) freezes the frequency at the nominal line_freq – a fixed- reference LMS. Should be larger than adapt_time_constant so the two loops do not fight. Independent of chunk size (the per-update gain is derived from elapsed time). Read live each chunk.

cancel_method: CancelMethod = 'notch'#

How to remove the line; see CancelMethod. NOTCH (default) applies the SOS notch cascade – a perfect null that also removes any signal at the line frequency. SUBTRACT estimates the common-mode line (pooled global phase + per-channel amplitude) once per window and subtracts only it, preserving signal that is independent across channels at the line frequency; it assumes a 1-D channel axis. PASSTHROUGH emits the input unchanged. (Per-channel sampling-delay alignment is handled separately, upstream, by SamplingDelayAlignmentTransformer.)

axis: str = 'time'#

Name of the axis to filter along.

__init__(line_freq=60.0, num_harmonics=1, adapt_time_constant=0.1, freq_time_constant=0.5, cancel_method=CancelMethod.NOTCH, axis='time')#
Parameters:
Return type:

None

class AdaptiveLNCState[source]#

Bases: object

State for AdaptiveLNCTransformer.

omega: float = 0.0#

Current NCO angular frequency in rad/sample (tracked by the FLL).

phase: float = 0.0#

NCO phase (rad) for the first sample of the next chunk; accumulates so the demodulation reference is continuous across chunk boundaries.

block_len: int = 0#

FLL update window in samples (one mains period).

samples_in_block: int = 0#

Samples accumulated toward the next FLL update; carried across chunks so the update grid is global, not per-chunk.

zi: ndarray[tuple[Any, ...], dtype[_ScalarT]] | None = None#

SOS biquad filter state, carried across chunks. Backend-native layout: (n_harm, 2, *sample_shape) for scipy, (n_harm, *sample_shape, 2) for the MLX kernel.

sos: ndarray[tuple[Any, ...], dtype[_ScalarT]] | None = None#

Cached notch coefficients, already on the working backend. Rebuilt only when omega or mu changes (i.e. once per FLL window, not per chunk), so streaming avoids redundant host->device transfers.

sos_key: tuple | None = None#

(omega, mu, n_harm) the cached sos was built for.

z_acc_real: ndarray[tuple[Any, ...], dtype[_ScalarT]] | None = None#

Demodulation accumulator sum(removed * cos(phase)) over the current window (per channel); the line phasor’s real part.

z_acc_imag: ndarray[tuple[Any, ...], dtype[_ScalarT]] | None = None#

Demodulation accumulator sum(removed * sin(phase)) over the current window (per channel).

z_phasor_prev: ndarray[tuple[Any, ...], dtype[_ScalarT]] | None = None#

Pooled per-channel line phasor (numpy complex) from the previous window; the FLL measures rotation against it.

line_phasor: complex = 0j#

pooled global line phasor at the last window end.

Type:

Observable

sub_z_real: list | None = None#

Per-harmonic demod accumulators sum(x * cos(k*phase)) this window.

sub_z_imag: list | None = None#

Per-harmonic demod accumulators sum(x * sin(k*phase)) this window.

sub_amp: list | None = None#

Committed per-harmonic per-channel line amplitude (backend arrays); None until the first window completes.

sub_phase_off: list | None = None#

Committed per-harmonic global phase theta_k (scalar floats), for reconstruction.

sub_yc_smooth: list | None = None#

Per-harmonic EWMA-smoothed per-channel line phasor (numpy complex). Smoothing (time constant adapt_time_constant) keeps the line estimate from chasing transient in-band signal.

class AdaptiveLNCTransformer(*args, **kwargs)[source]#

Bases: BaseStatefulTransformer[AdaptiveLNCSettings, AxisArray, AxisArray, AdaptiveLNCState]

Quadrature-LMS line-noise canceller, implemented as its exact LTI equivalent (an SOS notch cascade) with frequency tracking.

Cancellation. A fixed-frequency quadrature-LMS canceller is exactly a 2nd-order notch (Glover 1977; see design_lnc_sos()). Each harmonic is one biquad notch at k * omega; the cascade is applied with sosfilt carrying state zi across chunks. The output is:

y_notch = sosfilt(design_lnc_sos(omega, mu, num_harmonics), x)
y = y_notch                                    # the line removed

This replaces the former per-sample LMS recursion: it is vectorised, scales with channel count, and dispatches to a GPU sosfilt (MLX/Metal) on MLX arrays – so the whole transformer is Array-API compatible.

Frequency tracking (FLL). Once per window (one mains period) the NCO frequency is nudged by the pooled rotation of the demodulated line phasor between windows:

Z_c = sum_n removed_{c,n} * exp(-j * phase_n)  # per-channel, per window
cross = sum_c Z_c * conj(Z_c_prev)             # power-weighted, pooled
omega += beta * angle(cross) / window_len      # rad/sample

Demodulating the removed line gives the same frequency-error signal the old weight-rotation detector did, without materialising adaptive weights. Static per-channel phase cancels in cross, so no inter-channel phase assumption is needed. The window fills on a global grid carried in state, so the loop is chunk-size independent: a single offline buffer converges within itself and streamed output is chunk-invariant. mu = 2/(adapt_time_constant*fs) and beta = 1 - exp(-window_dt/freq_time_constant).

NONRESET_SETTINGS_FIELDS: ClassVar[frozenset[str]] = frozenset({'adapt_time_constant', 'freq_time_constant'})#
class CancelMethod(*values)[source]#

Bases: str, Enum

How AdaptiveLNCTransformer removes the line.

NOTCH = 'notch'#

Apply the SOS notch cascade – a perfect null that also removes any signal at the line frequency.

SUBTRACT = 'subtract'#

Estimate the common-mode line (pooled global phase + per-channel amplitude) once per window and subtract only it, preserving signal that is independent across channels at the line frequency. Assumes a 1-D channel axis.

PASSTHROUGH = 'passthrough'#

Emit the input unchanged – no cancellation and no frequency tracking.