Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(whitener): add more flexibility #191

Merged
merged 3 commits into from
Aug 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
243 changes: 209 additions & 34 deletions tests/preprocessing/test_whitener.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import math

import numpy as np
import pytest
import xarray as xr

Expand Down Expand Up @@ -32,27 +33,43 @@
]


def generate_well_conditioned_data(lazy=False):
t = np.linspace(0, 50, 200)
std = 0.1
X = np.sin(t)[:, None] + np.random.normal(0, std, size=(200, 3))
X[:, 1] = X[:, 1] ** 3
X[:, 2] = abs(X[:, 2]) ** (0.5)
X = xr.DataArray(
X,
dims=["sample", "feature"],
coords={"sample": np.arange(200), "feature": np.arange(3)},
name="X",
)
X = X - X.mean("sample")
if lazy:
X = X.chunk({"sample": 5, "feature": -1})
return X


# TESTS
# =============================================================================
@pytest.mark.parametrize(
"synthetic_dataarray",
VALID_TEST_DATA,
indirect=["synthetic_dataarray"],
"lazy",
[False, True],
)
def test_fit(synthetic_dataarray):
data = synthetic_dataarray.rename({"sample0": "sample", "feature0": "feature"})
def test_fit(lazy):
data = generate_well_conditioned_data(lazy)

whitener = Whitener(n_modes=2)
whitener.fit(data)


@pytest.mark.parametrize(
"synthetic_dataarray",
VALID_TEST_DATA,
indirect=["synthetic_dataarray"],
"lazy",
[False, True],
)
def test_transform(synthetic_dataarray):
data = synthetic_dataarray.rename({"sample0": "sample", "feature0": "feature"})
def test_transform(lazy):
data = generate_well_conditioned_data(lazy)

whitener = Whitener(n_modes=2)
whitener.fit(data)
Expand All @@ -73,12 +90,11 @@ def test_transform(synthetic_dataarray):


@pytest.mark.parametrize(
"synthetic_dataarray",
VALID_TEST_DATA,
indirect=["synthetic_dataarray"],
"lazy",
[False, True],
)
def test_fit_transform(synthetic_dataarray):
data = synthetic_dataarray.rename({"sample0": "sample", "feature0": "feature"})
def test_fit_transform(lazy):
data = generate_well_conditioned_data(lazy)

whitener = Whitener(n_modes=2)

Expand All @@ -98,12 +114,11 @@ def test_fit_transform(synthetic_dataarray):


@pytest.mark.parametrize(
"synthetic_dataarray",
VALID_TEST_DATA,
indirect=["synthetic_dataarray"],
"lazy",
[False, True],
)
def test_invserse_transform_data(synthetic_dataarray):
data = synthetic_dataarray.rename({"sample0": "sample", "feature0": "feature"})
def test_invserse_transform_data(lazy):
data = generate_well_conditioned_data(lazy)

whitener = Whitener(n_modes=2)
whitener.fit(data)
Expand All @@ -123,32 +138,49 @@ def test_invserse_transform_data(synthetic_dataarray):


@pytest.mark.parametrize(
"alpha",
[0.0, 0.5, 1.0, 1.5],
"alpha,use_pca",
[
(0.0, False),
(0.5, False),
(1.0, False),
(1.5, False),
(0.0, True),
(0.5, True),
(1.0, True),
(1.5, True),
],
)
def test_transform_alpha(alpha):
data = generate_synthetic_dataarray(1, 1, "index", "no_nan", "no_dask")
data = data.rename({"sample0": "sample", "feature0": "feature"})
def test_transform_alpha(alpha, use_pca):
data = data = generate_well_conditioned_data()

whitener = Whitener(n_modes=2, alpha=alpha)
whitener = Whitener(alpha=alpha, use_pca=use_pca)
data_whitened = whitener.fit_transform(data)

norm = (data_whitened**2).sum("sample")
nc = data.shape[0] - 1
norm = (data_whitened**2).sum("sample") / nc
ones = norm / norm
# Check that for alpha=0 full whitening is performed
if math.isclose(alpha, 0.0, abs_tol=1e-6):
xr.testing.assert_allclose(norm, ones, atol=1e-6)


@pytest.mark.parametrize(
"alpha",
[0.0, 0.5, 1.0, 1.5],
"alpha,use_pca",
[
(0.0, False),
(0.5, False),
(1.0, False),
(1.5, False),
(0.0, True),
(0.5, True),
(1.0, True),
(1.5, True),
],
)
def test_invserse_transform_alpha(alpha):
data = generate_synthetic_dataarray(1, 1, "index", "no_nan", "no_dask")
data = data.rename({"sample0": "sample", "feature0": "feature"})

whitener = Whitener(n_modes=6, alpha=alpha)
def test_inverse_transform_alpha(alpha, use_pca):
# Use data with more samples than features and high signal-to-noise ratio
data = generate_well_conditioned_data()
whitener = Whitener(alpha=alpha, use_pca=use_pca)
data_whitened = whitener.fit_transform(data)
data_unwhitened = whitener.inverse_transform_data(data_whitened)

Expand All @@ -162,3 +194,146 @@ def test_invalid_alpha():
err_msg = "`alpha` must be greater than or equal to 0"
with pytest.raises(ValueError, match=err_msg):
Whitener(n_modes=2, alpha=-1.0)


def test_raise_warning_ill_conditioned():
# Synthetic dataset has 6 samples and 7 features
data = generate_synthetic_dataarray(1, 1, "index", "no_nan", "no_dask")
data = data.rename({"sample0": "sample", "feature0": "feature"})

whitener = Whitener()
warn_msg = "The number of samples (.*) is smaller than the number of features (.*), leading to an ill-conditioned problem.*"
with pytest.warns(match=warn_msg):
_ = whitener.fit_transform(data)


@pytest.mark.parametrize(
"n_modes",
[1, 2, 3],
)
def test_transform_pca_n_modes(n_modes):
data = generate_well_conditioned_data()

whitener = Whitener(use_pca=True, n_modes=n_modes)
transformed = whitener.fit_transform(data)

# PCA-Whitener reduces dimensionality
assert transformed.shape[1] == n_modes


def test_whitener_identity_transformation():
data = generate_well_conditioned_data()

whitener = Whitener(alpha=1.0, use_pca=False)
transformed = whitener.fit_transform(data)
reconstructed = whitener.inverse_transform_data(transformed)

# No transformation takes place
xr.testing.assert_identical(data, transformed)
xr.testing.assert_identical(data, reconstructed)


@pytest.mark.parametrize(
"alpha,use_pca",
[
(0.0, True),
(0.5, True),
(1.0, True),
(1.5, True),
(0.0, False),
(0.5, False),
(1.0, False),
(1.5, False),
],
)
def test_unwhiten_cross_covariance_matrix(alpha, use_pca):
def unwhiten_cov_mat(Cw, S1_inv, S2_inv):
n_dims_s1 = len(S1_inv.shape)
n_dims_s2 = len(S2_inv.shape)

match n_dims_s1:
case 0:
C_unwhitened = Cw
case 1:
C_unwhitened = S1_inv[:, None] * Cw
case 2:
C_unwhitened = S1_inv @ Cw
case _:
raise ValueError("Invalid number of dimensions for S1_inv")

match n_dims_s2:
case 0:
pass
case 1:
C_unwhitened = C_unwhitened * S2_inv[None, :]
case 2:
C_unwhitened = C_unwhitened @ S2_inv
case _:
raise ValueError("Invalid number of dimensions for S2_inv")

return C_unwhitened

"""Test that we can uncover the original total amount of covariance between two datasets after whitening."""
data1 = generate_well_conditioned_data()
data2 = generate_well_conditioned_data() ** 2

whitener1 = Whitener(alpha=0.5, use_pca=True, n_modes="all")
whitener2 = Whitener(alpha=alpha, use_pca=use_pca, n_modes="all")

transformed1 = whitener1.fit_transform(data1)
transformed2 = whitener2.fit_transform(data2)

S1_inv = whitener1.get_Tinv(unwhiten_only=True).values
S2_inv = whitener2.get_Tinv(unwhiten_only=True).values

C = data1.values.T @ data2.values
Cw = transformed1.values.T @ transformed2.values

C_rec = unwhiten_cov_mat(Cw, S1_inv, S2_inv)

total_covariance = np.linalg.norm(C)
total_covariance_rec = np.linalg.norm(C_rec)
np.testing.assert_almost_equal(total_covariance, total_covariance_rec)


@pytest.mark.parametrize(
"use_pca",
[True, False],
)
def test_standardize_and_decorrelate(use_pca):
X = generate_well_conditioned_data()
n_features = X.shape[1]

whitener = Whitener(alpha=0.0, use_pca=use_pca, n_modes="all")
transformed = whitener.fit_transform(X)

# Check that transformed data is standardized
assert np.allclose(transformed.mean("sample").values, np.zeros(n_features))
assert np.allclose(transformed.std("sample").values, np.ones(n_features), rtol=1e-2)

# Check that transformed data is decorrelated
C = np.corrcoef(transformed.values, rowvar=False)
target = np.identity(n_features)
np.testing.assert_allclose(C, target, atol=1e-6)


@pytest.mark.parametrize(
"alpha,use_pca",
[
(0.0, True),
(0.5, True),
(1.0, True),
(1.5, True),
(0.0, False),
(0.5, False),
(1.0, False),
(1.5, False),
],
)
def test_transform_keep_coordinates(alpha, use_pca):
X = generate_well_conditioned_data()

whitener = Whitener(alpha=alpha, use_pca=use_pca, n_modes="all")
transformed = whitener.fit_transform(X)

assert len(transformed.coords) == len(X.coords)
Loading
Loading