# Source: ezmsg.learn.model.cca
import numpy as np
from scipy import linalg
class IncrementalCCA:
def __init__(
    self,
    n_components=2,
    base_smoothing=0.95,
    min_smoothing=0.5,
    max_smoothing=0.99,
    adaptation_rate=0.1,
):
    """Configure an incremental CCA estimator with adaptive smoothing.

    Parameters
    ----------
    n_components : int
        Number of canonical components to compute.
    base_smoothing : float
        Baseline exponential smoothing factor; adapted at runtime.
    min_smoothing : float
        Lower bound allowed for the adapted smoothing factor.
    max_smoothing : float
        Upper bound allowed for the adapted smoothing factor.
    adaptation_rate : float
        Rate (between 0 and 1) at which the smoothing factor moves
        toward its target.
    """
    # Static configuration.
    self.n_components = n_components
    self.base_smoothing = base_smoothing
    self.min_smoothing = min_smoothing
    self.max_smoothing = max_smoothing
    self.adaptation_rate = adaptation_rate
    # Mutable state: smoothing starts at the baseline; correlation
    # matrices are allocated lazily on the first partial_fit call.
    self.current_smoothing = base_smoothing
    self.initialized = False
def initialize(self, d1, d2):
    """Allocate zeroed correlation state for inputs of width d1 and d2."""
    self.d1, self.d2 = d1, d2
    # Running estimates of the auto- and cross-correlation structure.
    self.C11 = np.zeros((d1, d1))
    self.C22 = np.zeros((d2, d2))
    self.C12 = np.zeros((d1, d2))
    self.initialized = True
def _compute_change_magnitude(self, C11_new, C22_new, C12_new):
    """Return the mean size-normalized Frobenius distance between the
    stored correlation matrices and the freshly estimated ones."""
    pairs = (
        (C11_new, self.C11, self.d1 * self.d1),
        (C22_new, self.C22, self.d2 * self.d2),
        (C12_new, self.C12, self.d1 * self.d2),
    )
    # Frobenius norm of each difference, normalized by matrix size.
    diffs = [np.linalg.norm(new - old) / size for new, old, size in pairs]
    return (diffs[0] + diffs[1] + diffs[2]) / 3
def _adapt_smoothing(self, change_magnitude):
    """Move the smoothing factor toward a target that shrinks as the
    correlation structure changes more quickly."""
    # Large changes pull the target below the baseline so stale
    # statistics are forgotten faster; clip to the allowed band.
    target = np.clip(
        self.base_smoothing * (1.0 - change_magnitude),
        self.min_smoothing,
        self.max_smoothing,
    )
    # Low-pass filter the adjustment itself to avoid jitter.
    rate = self.adaptation_rate
    self.current_smoothing = (1 - rate) * self.current_smoothing + rate * target
def partial_fit(self, X1, X2, update_projections=True):
    """Update the correlation estimates with one batch of paired samples.

    Assumes ``X1`` and ``X2`` are already centered and scaled, and that
    their rows are paired observations.

    Parameters
    ----------
    X1 : ndarray of shape (n_samples, d1)
    X2 : ndarray of shape (n_samples, d2)
    update_projections : bool
        When True, recompute the canonical projections after the update.

    Raises
    ------
    ValueError
        If X1 and X2 do not contain the same number of samples.
    """
    if X1.shape[0] != X2.shape[0]:
        raise ValueError(
            f"X1 and X2 must have the same number of samples; "
            f"got {X1.shape[0]} and {X2.shape[0]}"
        )
    if not self.initialized:
        self.initialize(X1.shape[1], X2.shape[1])
    # Batch estimates of the correlation structure.
    n = X1.shape[0]
    C11_new = X1.T @ X1 / n
    C22_new = X2.T @ X2 / n
    C12_new = X1.T @ X2 / n
    if self.C11.any():
        # Adapt the smoothing factor to the detected drift, then blend
        # the batch estimates into the running ones.
        change_magnitude = self._compute_change_magnitude(C11_new, C22_new, C12_new)
        self._adapt_smoothing(change_magnitude)
        alpha = self.current_smoothing
        self.C11 = alpha * self.C11 + (1 - alpha) * C11_new
        self.C22 = alpha * self.C22 + (1 - alpha) * C22_new
        self.C12 = alpha * self.C12 + (1 - alpha) * C12_new
    else:
        # First update: adopt the batch estimates directly. Blending
        # against the zero-initialized prior (as the old code did) would
        # bias the early estimates toward zero by a factor of (1 - alpha).
        self.C11 = C11_new
        self.C22 = C22_new
        self.C12 = C12_new
    if update_projections:
        self._update_projections()
def _update_projections(self):
"""Update canonical vectors and correlations"""
eps = 1e-8
C11_reg = self.C11 + eps * np.eye(self.d1)
C22_reg = self.C22 + eps * np.eye(self.d2)
K = (
linalg.inv(linalg.sqrtm(C11_reg))