From 61e3b370e7af4e8f9ad6ab45279eb03309a5a1b4 Mon Sep 17 00:00:00 2001 From: Matthew Danielson Date: Sun, 2 Apr 2023 12:27:45 -0700 Subject: [PATCH] Begin introducing the Variational GMM-HMM --- lib/hmmlearn/stats.py | 82 +- .../tests/test_gmm_hmm_multisequence.py | 98 ++- lib/hmmlearn/tests/test_gmm_hmm_new.py | 1 - lib/hmmlearn/tests/test_variational_gmm.py | 283 +++++++ lib/hmmlearn/vhmm.py | 745 ++++++++++++++++-- 5 files changed, 1134 insertions(+), 75 deletions(-) create mode 100644 lib/hmmlearn/tests/test_variational_gmm.py diff --git a/lib/hmmlearn/stats.py b/lib/hmmlearn/stats.py index 7a01b8c9..95eb8385 100644 --- a/lib/hmmlearn/stats.py +++ b/lib/hmmlearn/stats.py @@ -1,5 +1,7 @@ import numpy as np -from scipy import linalg +from scipy import linalg, special +from ._utils import logdet +from .utils import fill_covars def log_multivariate_normal_density(X, means, covars, covariance_type='diag'): @@ -96,3 +98,81 @@ def _log_multivariate_normal_density_full(X, means, covars, min_covar=1.e-7): + cv_log_det)) return np.transpose(log_prob) + + +def _variational_log_multivariate_normal_density(X, means, beta, scale, dof, + covariance_type): + """ + Compute the log probability under a multivariate Gaussian distribution. + + Parameters + ---------- + X : array_like, shape (n_samples, n_features) + List of n_features-dimensional data points. Each row corresponds to a + single data point. + + means : array_like, shape (n_components, n_features) + List of n_features-dimensional mean vectors for n_components Gaussians. + Each row corresponds to a single mean vector. + + beta: array_like, shape (n_components, ) + List of n_components estimate on the scale of the variance over + the means. + + scale : array_like + List of n_components covariance parameters for each Gaussian. The shape + depends on `covariance_type`: + + * (n_components, ) if "spherical", + * (n_components, n_features) if "diag", + * (n_components, n_features, n_features) if "full", + * (n_features, n_features) if "tied". + + dof: array_like, shape (n_components, ) + List of n_components estimate on the scale of the variance over + the means. + + covariance_type : {"spherical", "diag", "full", "tied"}, optional + The type of the covariance parameters. Defaults to 'diag'. + + Returns + ------- + lpr : array_like, shape (n_samples, n_components) + Array containing the log probabilities of each data point in + X under each of the n_components multivariate Gaussian distributions. + """ + # Refer to the Gruhl/Sick paper for the notation + # In general, things are neater if we pretend the covariance is + # full / tied. Or, we could treat each case separately, and reduce + # the number of operations. That's left for the future :-) + nc, nf = means.shape + term1 = special.digamma( + .5 * (dof - np.arange(0, nf)[:, None]) + ).sum(axis=0) + if covariance_type in ("diag", "spherical"): + scale = fill_covars(scale, covariance_type, nc, nf) + + W_k = np.linalg.inv(scale) + + term1 += nf * np.log(2) + logdet(W_k) + term1 /= 2 + + # We ignore the constant that is typically excluded in the literature + term2 = 0 + term3 = nf / beta + + # (X - Means) * W_k * (X-Means)^T * dof + # shape becomes (nc, n_samples, + delta = (X - means[:, None]) + # m is the dimension of the mixture + # i is the length of the sequence X + # j, k are the n_features + if covariance_type in ("full", "diag", "spherical"): + dots = np.einsum("mij,mjk,mik,m->im", + delta, W_k, delta, dof) + else: + dots = np.einsum("mij,jk,mik,->im", + delta, W_k, delta, dof) + last_term = .5 * (dots + term3) + + return term1 - term2 - last_term diff --git a/lib/hmmlearn/tests/test_gmm_hmm_multisequence.py b/lib/hmmlearn/tests/test_gmm_hmm_multisequence.py index 9e76a42e..aeeb925f 100644 --- a/lib/hmmlearn/tests/test_gmm_hmm_multisequence.py +++ b/lib/hmmlearn/tests/test_gmm_hmm_multisequence.py @@ -4,7 +4,7 @@ from numpy.testing import assert_allclose import pytest -from hmmlearn import hmm +from hmmlearn import hmm, vhmm from hmmlearn.base import ConvergenceMonitor @@ -221,11 +221,67 @@ def make_permutations(items): return [list(p) for p in itertools.permutations(sequence_indices)] +def setup_em(covariance_type, implementation, init_params, verbose): + model = hmm.GMMHMM( + n_components=2, + n_mix=2, + n_iter=100, + covariance_type=covariance_type, + verbose=verbose, + init_params=init_params, + random_state=1234, + implementation=implementation + ) + + # don't use random parameters for testing + init = 1. / model.n_components + model.startprob_ = np.full(model.n_components, init) + model.transmat_ = \ + np.full((model.n_components, model.n_components), init) + + model.monitor_ = StrictMonitor( + model.monitor_.tol, + model.monitor_.n_iter, + model.monitor_.verbose, + ) + return model + +def setup_vi(covariance_type, implementation, init_params, verbose, lengths): + model = vhmm.VariationalGMMHMM( + n_components=2, + n_mix=2, + n_iter=100, + covariance_type=covariance_type, + verbose=verbose, + init_params=init_params, + random_state=1234, + implementation=implementation + ) + # don't use random parameters for testing + prior_init = 1. / model.n_components + model.startprob_prior_ = np.full(model.n_components, prior_init) + model.transmat_prior_= \ + np.full((model.n_components, model.n_components), prior_init) + model.startprob_posterior_ = np.full( + model.n_components, len(lengths) * prior_init) + model.transmat_prior_= \ + np.full(( + model.n_components, model.n_components), sum(lengths) * prior_init + ) + model.monitor_ = StrictMonitor( + model.monitor_.tol, + model.monitor_.n_iter, + model.monitor_.verbose, + ) + return model + +@pytest.mark.parametrize("hmm_type", + ["em", "vi"]) @pytest.mark.parametrize("covariance_type", ["diag", "spherical", "tied", "full"]) @pytest.mark.parametrize("implementation", ["scaling", "log"]) def test_gmmhmm_multi_sequence_fit_invariant_to_sequence_ordering( - covariance_type, implementation, init_params='mcw', verbose=False + hmm_type, covariance_type, implementation, init_params='mcw', verbose=False ): """ Sanity check GMM-HMM fit behaviour when run on multiple sequences @@ -253,34 +309,20 @@ def test_gmmhmm_multi_sequence_fit_invariant_to_sequence_ordering( sequences = sequence_data[p] X = np.concatenate(sequences) lengths = [len(seq) for seq in sequences] - - model = hmm.GMMHMM( - n_components=2, - n_mix=2, - n_iter=100, - covariance_type=covariance_type, - verbose=verbose, - init_params=init_params, - random_state=1234, - implementation=implementation - ) - - # don't use random parameters for testing - init = 1. / model.n_components - model.startprob_ = np.full(model.n_components, init) - model.transmat_ = \ - np.full((model.n_components, model.n_components), init) - - model.monitor_ = StrictMonitor( - model.monitor_.tol, - model.monitor_.n_iter, - model.monitor_.verbose, - ) + if hmm_type == "em": + model = setup_em(covariance_type, implementation, init_params, + verbose) + # Choice of rtol value is ad-hoc, no theoretical motivation. + rtol = 5e-3 + else: + model = setup_vi(covariance_type, implementation, init_params, + verbose, lengths) + # In General, the EM solution can use a smaller rtol, while the VI + # solution needs a bit larger + rtol = 5e-2 model.fit(X, lengths) - assert model.monitor_.converged scores.append(model.score(X, lengths)) - # Choice of rtol value is ad-hoc, no theoretical motivation. - assert_allclose(scores, np.mean(scores), rtol=5e-03) + assert_allclose(scores, np.mean(scores), rtol=rtol) diff --git a/lib/hmmlearn/tests/test_gmm_hmm_new.py b/lib/hmmlearn/tests/test_gmm_hmm_new.py index 32123f6f..7a6ad524 100644 --- a/lib/hmmlearn/tests/test_gmm_hmm_new.py +++ b/lib/hmmlearn/tests/test_gmm_hmm_new.py @@ -131,7 +131,6 @@ def test_fit(self, implementation): lengths = None h = self.new_hmm(implementation) X, _state_sequence = h.sample(n_samples) - # Mess up the parameters and see if we can re-learn them. covs0, means0, priors0, trans0, weights0 = prep_params( self.n_components, self.n_mix, self.n_features, diff --git a/lib/hmmlearn/tests/test_variational_gmm.py b/lib/hmmlearn/tests/test_variational_gmm.py new file mode 100644 index 00000000..3b45b94b --- /dev/null +++ b/lib/hmmlearn/tests/test_variational_gmm.py @@ -0,0 +1,283 @@ +import numpy as np +from numpy.testing import assert_allclose, assert_array_less +import pytest +from sklearn.utils import check_random_state + +from ..hmm import GMMHMM +from ..vhmm import VariationalGMMHMM, VariationalGaussianHMM +from .test_gmm_hmm import create_random_gmm +from . import ( + assert_log_likelihood_increasing, compare_variational_and_em_models, + normalized) + + +def sample_from_parallelepiped(low, high, n_samples, random_state): + (n_features,) = low.shape + X = np.zeros((n_samples, n_features)) + for i in range(n_features): + X[:, i] = random_state.uniform(low[i], high[i], n_samples) + return X + + +def prep_params(n_comps, n_mix, n_features, covar_type, + low, high, random_state): + # the idea is to generate ``n_comps`` bounding boxes and then + # generate ``n_mix`` mixture means in each of them + + dim_lims = np.zeros((n_comps + 1, n_features)) + + # this generates a sequence of coordinates, which are then used as + # vertices of bounding boxes for mixtures + dim_lims[1:] = np.cumsum( + random_state.uniform(low, high, (n_comps, n_features)), axis=0 + ) + + means = np.zeros((n_comps, n_mix, n_features)) + for i, (left, right) in enumerate(zip(dim_lims, dim_lims[1:])): + means[i] = sample_from_parallelepiped(left, right, n_mix, + random_state) + + startprob = np.zeros(n_comps) + startprob[0] = 1 + + transmat = normalized(random_state.uniform(size=(n_comps, n_comps)), + axis=1) + + if covar_type == "spherical": + covs = random_state.uniform(0.1, 5, size=(n_comps, n_mix)) + elif covar_type == "diag": + covs = random_state.uniform(0.1, 5, size=(n_comps, n_mix, n_features)) + elif covar_type == "tied": + covs = np.zeros((n_comps, n_features, n_features)) + for i in range(n_comps): + low = random_state.uniform(-2, 2, (n_features, n_features)) + covs[i] = low.T @ low + elif covar_type == "full": + covs = np.zeros((n_comps, n_mix, n_features, n_features)) + for i in range(n_comps): + for j in range(n_mix): + low = random_state.uniform(-2, 2, + size=(n_features, n_features)) + covs[i, j] = low.T @ low + + weights = normalized(random_state.uniform(size=(n_comps, n_mix)), axis=1) + + return covs, means, startprob, transmat, weights + + +class GaussianLikeMixin: + n_components = 3 + n_mix = 1 + n_features = 2 + low, high = 10, 15 + + def new_hmm(self, implementation): + return VariationalGMMHMM(n_components=self.n_components, + n_mix=self.n_mix, + covariance_type=self.covariance_type, + random_state=None, + implementation=implementation) + + def new_gaussian(self, implementation): + return VariationalGaussianHMM(n_components=self.n_components, + covariance_type=self.covariance_type, + random_state=None, + implementation=implementation) + + def new_hmm_to_sample(self, implementation): + prng = np.random.RandomState(14) + covars, means, startprob, transmat, weights = prep_params( + self.n_components, self.n_mix, self.n_features, + self.covariance_type, self.low, self.high, prng) + h = GMMHMM(n_components=self.n_components, n_mix=self.n_mix, + covariance_type=self.covariance_type, + random_state=prng, + implementation=implementation) + h.startprob_ = startprob + h.transmat_ = transmat + h.weights_ = weights + h.means_ = means + h.covars_ = covars + return h + + + @pytest.mark.parametrize("implementation", ["scaling", "log"]) + def test_learn(self, implementation): + n_samples = 1000 + h = self.new_hmm_to_sample(implementation) + X, states = h.sample(n_samples, random_state=32) + + vg = self.new_gaussian(implementation) + vg.fit(X) + vh = self.new_hmm(implementation) + vh.fit(X) + assert vh.score(X) == pytest.approx(vg.score(X)) + + +class TestVariationalGMMHMMWithFullCovars(GaussianLikeMixin): + covariance_type = "full" + + +class TestVariationalGMMHMMWithDiagCovars(GaussianLikeMixin): + covariance_type = "diag" + +# For a Gaussian HMM, Tied covariance means all HMM States share +# one Covariance Matrix. For a GMM HMM, Tied covariance means all +# mixture components within a state share one Covariance Matrix. +# So it does not make sense to compare them two models + +# class TestVariationalGMMHMMWithTied(GaussianLikeMixin): +# covariance_type = "tied" +# + +class TestVariationalGMMHMMWithSphericalCovars(GaussianLikeMixin): + covariance_type = "spherical" + + +class VariationalGMMHMMTestMixin: + n_components = 3 + n_mix = 2 + n_features = 2 + low, high = 10, 15 + + def new_hmm(self, implementation): + return VariationalGMMHMM(n_components=self.n_components, + n_mix=self.n_mix, + covariance_type=self.covariance_type, + random_state=None, + implementation=implementation, tol=1e-6) + + def new_hmm_to_sample(self, implementation): + prng = np.random.RandomState(44) + covars, means, startprob, transmat, weights = prep_params( + self.n_components, self.n_mix, self.n_features, + self.covariance_type, self.low, self.high, prng) + h = GMMHMM(n_components=self.n_components, n_mix=self.n_mix, + covariance_type=self.covariance_type, + random_state=prng, + implementation=implementation) + h.startprob_ = startprob + h.transmat_ = transmat + h.weights_ = weights + h.means_ = means + h.covars_ = covars + return h + + + @pytest.mark.parametrize("implementation", ["scaling", "log"]) + def test_check_bad_covariance_type(self, implementation): + h = self.new_hmm(implementation) + with pytest.raises(ValueError): + h.covariance_type = "bad_covariance_type" + h._check() + + #@pytest.mark.parametrize("implementation", ["scaling", "log"]) + #def test_check_good_covariance_type(self, implementation): + # h = self.new_hmm(implementation) + # h._check() # should not raise any errors + + def do_test_learn(self, implementation, X, lengths): + vb_hmm = self.new_hmm(implementation) + vb_hmm.fit(X, lengths) + assert not np.any(np.isnan(vb_hmm.means_posterior_)) + + em_hmm = GMMHMM( + n_components=vb_hmm.n_components, + n_mix=vb_hmm.n_mix, + implementation=implementation, + covariance_type=self.covariance_type, + ) + em_hmm.startprob_ = vb_hmm.startprob_ + em_hmm.transmat_ = vb_hmm.transmat_ + em_hmm.weights_ = vb_hmm.weights_ + em_hmm.means_ = vb_hmm.means_ + em_hmm.covars_ = vb_hmm.covars_ + compare_variational_and_em_models(vb_hmm, em_hmm, X, lengths) + + @pytest.mark.parametrize("implementation", ["scaling", "log"]) + def test_learn(self, implementation): + n_samples = 2000 + source = self.new_hmm_to_sample(implementation) + X, states = source.sample(n_samples, random_state=32) + self.do_test_learn(implementation, X, [X.shape[0]]) + + @pytest.mark.parametrize("implementation", ["scaling", "log"]) + def test_learn_multisequence(self, implementation): + n_samples = 2000 + source = self.new_hmm_to_sample(implementation) + X, states = source.sample(n_samples, random_state=32) + self.do_test_learn(implementation, X, [n_samples //4] * 4) + +# @pytest.mark.parametrize("implementation", ["scaling", "log"]) +# def test_sample(self, implementation): +# n_samples = 1000 +# h = self.new_hmm(implementation) +# X, states = h.sample(n_samples) +# assert X.shape == (n_samples, self.n_features) +# assert len(states) == n_samples +# +# @pytest.mark.parametrize("implementation", ["scaling", "log"]) +# def test_init(self, implementation): +# n_samples = 1000 +# h = self.new_hmm(implementation) +# X, _states = h.sample(n_samples) +# h._init(X, [n_samples]) +# h._check() # should not raise any errors +# +# @pytest.mark.parametrize("implementation", ["scaling", "log"]) +# def test_score_samples_and_decode(self, implementation): +# n_samples = 1000 +# h = self.new_hmm(implementation) +# X, states = h.sample(n_samples) +# +# _ll, posteriors = h.score_samples(X) +# assert_allclose(np.sum(posteriors, axis=1), np.ones(n_samples)) +# +# _viterbi_ll, decoded_states = h.decode(X) +# assert_allclose(states, decoded_states) +# + @pytest.mark.parametrize("implementation", ["scaling", "log"]) + def test_fit_sparse_data(self, implementation): + n_samples = 1000 + h = self.new_hmm_to_sample(implementation) + h.means_ *= 1000 # this will put gaussians very far apart + X, _states = h.sample(n_samples) + + m = self.new_hmm_to_sample(implementation) + # this should not raise + # "ValueError: array must not contain infs or NaNs" + h.fit(X) + + @pytest.mark.xfail + @pytest.mark.parametrize("implementation", ["scaling", "log"]) + def test_fit_zero_variance(self, implementation): + # Example from issue #2 on GitHub. + # this data has singular covariance matrix + X = np.asarray([ + [7.15000000e+02, 5.8500000e+02, 0.00000000e+00, 0.00000000e+00], + [7.15000000e+02, 5.2000000e+02, 1.04705811e+00, -6.03696289e+01], + [7.15000000e+02, 4.5500000e+02, 7.20886230e-01, -5.27055664e+01], + [7.15000000e+02, 3.9000000e+02, -4.57946777e-01, -7.80605469e+01], + [7.15000000e+02, 3.2500000e+02, -6.43127441e+00, -5.59954834e+01], + [7.15000000e+02, 2.6000000e+02, -2.90063477e+00, -7.80220947e+01], + [7.15000000e+02, 1.9500000e+02, 8.45532227e+00, -7.03294373e+01], + [7.15000000e+02, 1.3000000e+02, 4.09387207e+00, -5.83621216e+01], + [7.15000000e+02, 6.5000000e+01, -1.21667480e+00, -4.48131409e+01] + ]) + + h = self.new_hmm(implementation) + h.fit(X) + + +class TestVariationalGMMHMMWithSphericalCovars(VariationalGMMHMMTestMixin): + covariance_type = 'spherical' + +class TestVariationalGMMHMMWithDiagCovars(VariationalGMMHMMTestMixin): + covariance_type = 'diag' + +class TestVariationalGMMHMMWithFullCovars(VariationalGMMHMMTestMixin): + covariance_type = 'full' + +class TestVariationalGMMHMMWithTiedCovars(VariationalGMMHMMTestMixin): + covariance_type = 'tied' +# diff --git a/lib/hmmlearn/vhmm.py b/lib/hmmlearn/vhmm.py index c79e25ce..dc8230e3 100644 --- a/lib/hmmlearn/vhmm.py +++ b/lib/hmmlearn/vhmm.py @@ -7,10 +7,11 @@ from sklearn.utils import check_random_state from . import _kl_divergence as _kl, _utils -from ._emissions import BaseCategoricalHMM, BaseGaussianHMM +from ._emissions import BaseCategoricalHMM, BaseGaussianHMM, BaseGMMHMM from .base import VariationalBaseHMM from .hmm import COVARIANCE_TYPES -from .utils import fill_covars +from .utils import fill_covars, log_normalize, normalize +from .stats import _variational_log_multivariate_normal_density _log = logging.getLogger(__name__) @@ -121,7 +122,7 @@ def __init__(self, n_components=1, algorithm=algorithm, random_state=random_state, n_iter=n_iter, tol=tol, verbose=verbose, params=params, init_params=init_params, - implementation=implementation + implementation=implementation, ) self.emissionprob_prior = emissionprob_prior self.n_features = n_features @@ -226,13 +227,12 @@ def _compute_lower_bound(self, log_prob): class VariationalGaussianHMM(BaseGaussianHMM, VariationalBaseHMM): """ - Hidden Markov Model with Multivariate Gaussian Emissions trained + Hidden Markov Model with Gaussian Mixture Model Emissions trained using Variational Inference. References: - * https://arxiv.org/abs/1605.08618 - * https://core.ac.uk/reader/10883750 - * https://theses.gla.ac.uk/6941/7/2005McGroryPhD.pdf + * https://titan.cs.gsu.edu/~sji/papers/AL_TPAMI.pdf + * TODO: The speech processing book Attributes ---------- @@ -414,7 +414,7 @@ def __init__(self, n_components=1, covariance_type="full", algorithm=algorithm, random_state=random_state, n_iter=n_iter, tol=tol, verbose=verbose, params=params, init_params=init_params, - implementation=implementation + implementation=implementation, ) self.covariance_type = covariance_type self.means_prior = means_prior @@ -643,45 +643,15 @@ def _check(self): f"found {self.scale_posterior_.shape}") def _compute_subnorm_log_likelihood(self, X): - # Refer to the Gruhl/Sick paper for the notation - # In general, things are neater if we pretend the covariance is - # full / tied. Or, we could treat each case separately, and reduce - # the number of operations. That's left for the future :-) + return _variational_log_multivariate_normal_density( + X, + self.means_posterior_, + self.beta_posterior_, + self.scale_posterior_, + self.dof_posterior_, + self.covariance_type) nf = self.n_features - term1 = special.digamma( - .5 * (self.dof_posterior_ - np.arange(0, nf)[:, None]) - ).sum(axis=0) - - scale_posterior_ = self.scale_posterior_ - if self.covariance_type in ("diag", "spherical"): - scale_posterior_ = fill_covars(self.scale_posterior_, - self.covariance_type, self.n_components, self.n_features) - W_k = np.linalg.inv(scale_posterior_) - term1 += nf * np.log(2) + _utils.logdet(W_k) - term1 /= 2. - - # We ignore the constant that is typically excluded in the literature - # term2 = self.n_features * log(2 * M_PI) / 2 - term2 = 0 - term3 = nf / self.beta_posterior_ - - # (X - Means) * W_k * (X-Means)^T * self.dof_posterior_ - delta = (X - self.means_posterior_[:, None]) - # c is the HMM Component - # i is the length of the sequence X - # j, k are the n_features - # output shape is length * number of components - if self.covariance_type in ("full", "diag", "spherical"): - dots = np.einsum("cij,cjk,cik,c->ic", - delta, W_k, delta, self.dof_posterior_) - elif self.covariance_type == "tied": - dots = np.einsum("cij,jk,cik,->ic", - delta, W_k, delta, self.dof_posterior_) - last_term = .5 * (dots + term3) - lll = term1 - term2 - last_term - return lll - def _do_mstep(self, stats): """ Perform the M-step of VB-EM algorithm. @@ -830,3 +800,688 @@ def _needs_sufficient_statistics_for_mean(self): def _needs_sufficient_statistics_for_covars(self): return 'c' in self.params + + +class VariationalGMMHMM(BaseGMMHMM, VariationalBaseHMM): + """ + Hidden Markov Model with Multivariate Gaussian Emissions trained + using Variational Inference. + + References: + * Watanabe, Shinji, and Jen-Tzung Chien. Bayesian Speech and Language + Processing. Cambridge University Press, 2015. + * https://titan.cs.gsu.edu/~sji/papers/AL_TPAMI.pdf + + Attributes + ---------- + n_features : int + Dimensionality of the Gaussian emissions. + + monitor_ : ConvergenceMonitor + Monitor object used to check the convergence of EM. + + startprob_prior_ : array, shape (n_components, ) + Prior for the initial state occupation distribution. + + startprob_posterior_ : array, shape (n_components, ) + Posterior estimate of the state occupation distribution. + + transmat_prior_ : array, shape (n_components, n_components) + Prior for the matrix of transition probabilities between states. + + transmat_posterior_ : array, shape (n_components, n_components) + Posterior estimate of the transition probabilities between states. + + weights_prior_: array, shape (n_components, n_mix) + Mixture weights for each state. + + weights_posterior_: array, shape (n_components, n_mix) + Mixture weights for each state. + + means_prior_: array, shape (n_components, n_mix, n_features) + Prior estimates for the mean of each state. + + means_posterior_: array, shape (n_components, n_mix, n_features) + Posterior estimates for the mean of each state. + + beta_prior_: array, shape (n_components, n_mix, ) + Prior estimate on the scale of the variance over the means. + + beta_posterior_: array, shape (n_components, n_mix, ) + Posterior estimate of the scale of the variance over the means. + + covars_ : array + Covariance parameters for each state. + + The shape depends on :attr:`covariance_type`: + + * (n_components, n_mix, ) if "spherical", + * (n_components, n_mix, n_features) if "diag", + * (n_components, n_mix, n_features, n_features) if "full", + * (n_features, n_mix, n_features) if "tied". + + dof_prior_: array + The Degrees Of Freedom prior for each state's Wishart distribution. + The shape depends on :attr:`covariance_type`: + + * array, shape (n_components, n_mix, ) if "full", + * array, shape (n_mix, ) if "tied". + + dof_prior_: int / array + The Prior on the Degrees Of Freedom for each state's + Wishart distribution. + The shape depends on :attr:`covariance_type`: + + * array, shape (n_components, n_mix, ) if "full", + * array, shape (n_mix, ) if "tied". + + dof_posterior_: int / array + The Degrees Of Freedom for each state's Wishart distribution. + The shape depends on :attr:`covariance_type`: + + * array, shape (n_components, n_mix, ) if "full", + * array, shape (n_mix, ) if "tied". + + scale_prior_ : array + Prior for the Inverse scale parameter for each state's + Wishart distribution. The wishart distribution is + the conjugate prior for the covariance. + + The shape depends on :attr:`covariance_type`: + + * (n_components, n_mix, ) if "spherical", + * (n_components, n_mix, n_features) if "diag", + * (n_components, n_mix, n_features, n_features) if "full", + * (n_features, n_mix, n_features) if "tied". + + scale_posterior_ : array + Inverse scale parameter for each state's wishart distribution. + The wishart distribution is the conjugate prior for the covariance. + + The shape depends on :attr:`covariance_type`: + + * (n_components, n_mix, ) if "spherical", + * (n_components, n_mix, n_features) if "diag", + * (n_components, n_mix, n_features, n_features) if "full", + * (n_features, n_mix, n_features) if "tied". + + Examples + -------- + >>> from hmmlearn.hmm import VariationalGaussianHMM + >>> VariationalGaussianHMM(n_components=2) #doctest: +ELLIPSIS + VariationalGaussianHMM(algorithm='viterbi',... + """ + + def __init__(self, n_components=1, n_mix=1, covariance_type="full", + startprob_prior=None, transmat_prior=None, + weights_prior=None, means_prior=None, + beta_prior=None, dof_prior=None, + scale_prior=None, algorithm="viterbi", + random_state=None, n_iter=100, tol=1e-6, verbose=False, + params="stwmc", init_params="stwmc", + implementation="log"): + """ + Parameters + ---------- + n_components : int + Number of states. + + n_mix : int + Number of states in the GMM. + + covariance_type : {"spherical", "diag", "full", "tied"}, optional + The type of covariance parameters to use: + + * "spherical" --- each state uses a single variance value that + applies to all features. + * "diag" --- each state uses a diagonal covariance matrix + (default). + * "full" --- each state uses a full (i.e. unrestricted) + covariance matrix. + * "tied" --- all mixture components of each state use **the same** + full covariance matrix (note that this is not the same as for + `VariationalGaussianHMM`). + + startprob_prior : array, shape (n_components, ), optional + Parameters of the Dirichlet prior distribution for + :attr:`startprob_`. + + transmat_prior : array, shape (n_components, n_components), optional + Parameters of the Dirichlet prior distribution for each row + of the transition probabilities :attr:`transmat_`. + + weights_prior : array, shape (n_components, n_mix), optional + Parameters of the Dirichlet prior distribution for + :attr:`startprob_`. + + means_prior, beta_prior : array, shape (n_components, n_mix), optional + Mean and precision of the Normal prior distribtion for + :attr:`means_`. + + scale_prior, dof_prior : array, optional + Parameters of the prior distribution for the covariance matrix + :attr:`covars_`. + + If :attr:`covariance_type` is "spherical" or "diag" the prior is + the inverse gamma distribution, otherwise --- the inverse Wishart + distribution. + + The shape of the scale_prior array depends on + :attr:`covariance_type`: + + * (n_components, n_mix, ) if "spherical", + * (n_components, n_mix, n_features) if "diag", + * (n_components, n_mix, n_features, n_features) if "full", + * (n_features, n_mix, n_features) if "tied". + + algorithm : {"viterbi", "map"}, optional + Decoder algorithm. + + random_state: RandomState or an int seed, optional + A random number generator instance. + + n_iter : int, optional + Maximum number of iterations to perform. + + tol : float, optional + Convergence threshold. EM will stop if the gain in log-likelihood + is below this value. + + verbose : bool, optional + Whether per-iteration convergence reports are printed to + :data:`sys.stderr`. Convergence can also be diagnosed using the + :attr:`monitor_` attribute. + + params, init_params : string, optional + The parameters that get updated during (``params``) or initialized + before (``init_params``) the training. Can contain any combination + of 's' for startprob, 't' for transmat, 'm' for means, and 'c' for + covars. Defaults to all parameters. + + implementation: string, optional + Determines if the forward-backward algorithm is implemented with + logarithms ("log"), or using scaling ("scaling"). The default is + to use logarithms for backwards compatability. + """ + super().__init__( + n_components=n_components, startprob_prior=startprob_prior, + transmat_prior=transmat_prior, + algorithm=algorithm, random_state=random_state, + n_iter=n_iter, tol=tol, verbose=verbose, + params=params, init_params=init_params, + implementation=implementation + ) + self.n_mix = n_mix + self.weights_prior = weights_prior + self.covariance_type = covariance_type + self.means_prior = means_prior + self.beta_prior = beta_prior + self.dof_prior = dof_prior + self.scale_prior = scale_prior + + def _init(self, X, lengths): + """ + Initialize model parameters prior to fitting. + + Parameters + ---------- + X : array-like, shape (n_samples, n_features) + Feature matrix of individual samples. + lengths : array-like of integers, shape (n_sequences, ) + Lengths of the individual sequences in ``X``. The sum of + these should be ``n_samples``. + """ + super()._init(X, lengths) + nc = self.n_components + nf = self.n_features + nm = self.n_mix + + def compute_cv(): + return np.cov(X.T) + self.min_covar * np.eye(nf) + + # Default values for covariance prior parameters + # Kmeans will be used for initializing both the means + # and the covariances + # self._init_covar_priors() + # self._fix_priors_shape() + + X_mean = X.mean(axis=0) + main_kmeans = cluster.KMeans(n_clusters=nc, + random_state=self.random_state, + n_init=10) + cv = None # covariance matrix + labels = main_kmeans.fit_predict(X) + main_centroid = np.mean(main_kmeans.cluster_centers_, axis=0) + means = [] + cluster_counts = [] + for label in range(nc): + kmeans = cluster.KMeans(n_clusters=nm, + random_state=self.random_state, + n_init=10) + X_cluster = X[np.where(labels == label)] + if X_cluster.shape[0] >= nm: + kmeans.fit(X_cluster) + means.append(kmeans.cluster_centers_) + cluster_counts.append([X_cluster.shape[0] / nm] * nm) + else: + if cv is None: + cv = compute_cv() + m_cluster = np.random.multivariate_normal(main_centroid, + cov=cv, + size=nm) + means.append(m_cluster) + cluster_counts.append([1] * nm) + + + if (self._needs_init("w", "weights_prior_") or + self._needs_init("w", "weights_posterior_")): + if self.weights_prior is None: + self.weights_prior_ = np.full( + (nc, nm), 1. / nm + ) + else: + self.weights_prior_ = self.weights_prior + self.weights_posterior_ = self.weights_prior_ * sum(lengths) + + self.weights_ = self.weights_posterior_.copy() + normalize(self.weights_, axis=1) + + if (self._needs_init("m", "means_prior_") + or self._needs_init("m", "means_posterior_") + or self._needs_init("m", "beta_prior_") + or self._needs_init("m", "beta_posterior_")): + if self.means_prior is None: + self.means_prior_ = np.full( + (nc, nm, nf), X_mean) + else: + self.means_prior_ = self.means_prior + # Initialize to the data means + self.means_posterior_ = np.stack(means) + + # For compat with GMMHMM + self.means_ = self.means_posterior_ + + if self.beta_prior is None: + self.beta_prior_ = np.zeros((nc, nm)) + 1 + else: + self.beta_prior_ = self.beta_prior + + # Count of items in each mixture components + self.beta_posterior_ = np.stack(cluster_counts) + + if (self._needs_init("c", "dof_prior_") + or self._needs_init("c", "dof_posterior_") + or self._needs_init("c", "scale_prior_") + or self._needs_init("c", "scale_posterior_")): + if self.covariance_type in ("full", "diag", "spherical"): + if self.dof_prior is None: + self.dof_prior_ = np.full( + (nc, nm,), nf) + else: + self.dof_prior_ = self.dof_prior + self.dof_posterior_ = np.stack(cluster_counts) + + elif self.covariance_type == "tied": + if self.dof_prior is None: + self.dof_prior_ = np.full(nc, 2*nf) + else: + self.dof_prior_ = self.dof_prior + self.dof_posterior_ = np.stack(cluster_counts).sum(axis=1) + + # Covariance posterior comes from the estimate of the data + cv = np.cov(X.T) + 1E-3 * np.eye(X.shape[1]) + + if self.covariance_type == "full": + if self.scale_prior is None: + self.scale_prior_ = np.broadcast_to( + np.identity(nf) * 1e-3, + (nc, nm, nf, nf) + ) + else: + self.scale_prior_ = self.scale_prior + self.covars_ = np.zeros((nc, nm, nf, nf)) + self.covars_[:] = cv + self.scale_posterior_ = ( + self.covars_ + * np.asarray(self.dof_posterior_)[:,:, None, None]) + + elif self.covariance_type == "tied": + if self.scale_prior is None: + self.scale_prior_ = np.broadcast_to( + np.identity(nf) * 1e-3, + (nc, nf, nf) + ) + else: + self.scale_prior_ = self.scale_prior + self.covars_ = np.zeros((nc, nf, nf)) + self.covars_[:] = cv + self.scale_posterior_ = (self.covars_ + * self.dof_posterior_[:, None, None]) + + elif self.covariance_type == "diag": + if self.scale_prior is None: + self.scale_prior_ = np.full( + (nc, nm, nf), 1e-3) + else: + self.scale_prior_ = self.scale_prior + self.covars_ = np.zeros((nc, nm, nf)) + self.covars_[:] = np.diag(cv) + self.scale_posterior_ = np.einsum( + "ijk,ij->ijk",self.covars_, self.dof_posterior_) + + elif self.covariance_type == "spherical": + if self.scale_prior is None: + self.scale_prior_ = np.full((nc, nm), 1e-3) + else: + self.scale_prior_ = self.scale_prior + self.covars_ = np.zeros((nc, nm)) + self.covars_[:] = cv.mean() + self.scale_posterior_ = np.einsum( + "ij,ik->ij",self.covars_, self.dof_posterior_) + + def _get_n_fit_scalars_per_param(self): + if self.covariance_type not in COVARIANCE_TYPES: + raise ValueError( + f"{self.covariance_type} is invalid") + nc = self.n_components + nf = self.n_features + nm = self.n_mix + return { + "s": (nc - 1), + "t": nc * (nc - 1), + "w": nm, + "m": nm * (nc * nf + nc), + "c": { + "full": nm * (nc + nc * nf * (nf + 1) // 2), + "tied": nm * (1 + nf * (nf + 1) // 2), + "diag": nm * (nc + nc * nf), + "spherical": nm * (nc + nc), + + }[self.covariance_type], + } + + def _check(self): + """ + Validate model parameters prior to fitting. + + Raises + ------ + ValueError + If any of the parameters are invalid, e.g. if :attr:`startprob_` + don't sum to 1. + """ + + if self.covariance_type not in COVARIANCE_TYPES: + raise ValueError( + f"{self.covariance_type} is invalid") + if not hasattr(self, "n_features"): + self.n_features = self.means_.shape[2] + + nc = self.n_components + nf = self.n_features + nm = self.n_mix + + means_shape = (nc, nm, nf) + + self.means_prior_ = np.asarray(self.means_prior_, dtype=float) + self.means_posterior_ = np.asarray(self.means_posterior_, dtype=float) + if self.means_prior_.shape != means_shape: + raise ValueError( + "means_prior_ have shape (n_components, n_mix, n_features)") + if self.means_posterior_.shape != means_shape: + raise ValueError( + "means_posterior_ must have shape" + "(n_components, n_mix, n_features)") + + self.beta_prior_ = np.asarray(self.beta_prior_, dtype=float) + self.beta_posterior_ = np.asarray(self.beta_posterior_, dtype=float) + if self.beta_prior_.shape != (nc, nm): + raise ValueError( + "beta_prior_ have shape (n_components, n_mix)") + + if self.beta_posterior_.shape != (nc, nm,): + raise ValueError( + "beta_posterior_ must have shape (n_components, n_mix)") + + if self.covariance_type in ("full", "diag", "spherical"): + self.dof_prior_ = np.asarray(self.dof_prior_, dtype=float) + self.dof_posterior_ = np.asarray(self.dof_posterior_, dtype=float) + if self.dof_prior_.shape != (nc, nm): + raise ValueError( + "dof_prior_ have shape (n_components, n_mix)") + + if self.dof_posterior_.shape != (nc, nm): + raise ValueError( + "dof_posterior_ must have shape (n_components, n_mix)") + + elif self.covariance_type == "tied": + self.dof_prior_ = np.asarray(self.dof_prior_, dtype=float) + self.dof_posterior_ = np.asarray(self.dof_posterior_, dtype=float) + if self.dof_prior_.shape != (nc, ): + raise ValueError( + "dof_prior_ have shape (n_components, )") + + if self.dof_posterior_.shape != (nc, ): + raise ValueError( + "dof_posterior_ must have shape (n_components, )") + + self.scale_prior_ = np.asarray(self.scale_prior_, dtype=float) + self.scale_posterior_ = np.asarray(self.scale_posterior_, dtype=float) + + expected = None + if self.covariance_type == "full": + expected = (nc, nm, nf, nf) + elif self.covariance_type == "tied": + expected = (nc, nf, nf) + elif self.covariance_type == "diag": + expected = (nc, nm, nf) + elif self.covariance_type == "spherical": + expected = (nc, nm) + # Now check the W's + if self.scale_prior_.shape != expected: + raise ValueError(f"scale_prior_ must have shape {expected}, " + f"found {self.scale_prior_.shape}") + + if self.scale_posterior_.shape != expected: + raise ValueError(f"scale_posterior_ must have shape {expected}, " + f"found {self.scale_posterior_.shape}") + + def _compute_subnorm_log_likelihood(self, X): + lll = np.zeros((X.shape[0], self.n_components), dtype=float) + for comp in range(self.n_components): + subnorm = self._subnorm_for_one_component(X, comp) + lll[:, comp] = special.logsumexp(subnorm, axis=1) + return lll + + def _log_density_for_sufficient_statistics(self, X, component): + return self._subnorm_for_one_component(X, component) + + def _subnorm_for_one_component(self, X, c): + """ + + Parameters + ---------- + X: + c: int + The HMM component to compute probabilities for + """ + mixture_weights = (special.digamma(self.weights_posterior_[c]) + - special.digamma(self.weights_posterior_[c].sum())) + + normal = _variational_log_multivariate_normal_density( + X, + self.means_posterior_[c], + self.beta_posterior_[c], + self.scale_posterior_[c], + self.dof_posterior_[c], + self.covariance_type + ) + return mixture_weights + normal + + def _do_mstep(self, stats): + """ + Perform the M-step of VB-EM algorithm. + + Parameters + ---------- + stats : dict + Sufficient statistics updated from all available samples. + """ + super()._do_mstep(stats) + + nf = self.n_features + # Einsum key: + # c is number of components + # m is number of mix + # i is length of X + # j/k are n_features + if "w" in self.params: + self.weights_posterior_ = (self.weights_prior_ + + stats['post_mix_sum']) + # For compat with GMMHMM + self.weights_[:] = self.weights_posterior_ + normalize(self.weights_, axis=-1) + + if "m" in self.params: + self.beta_posterior_ = self.beta_prior_ + stats['post_mix_sum'] + self.means_posterior_ = np.einsum("cm,cmj->cmj", self.beta_prior_, + self.means_prior_) + self.means_posterior_ += stats['m_n'] + self.means_posterior_ = (self.means_posterior_ + / self.beta_posterior_[:, :, None]) + # For compat with GMMHMM + self.means_ = self.means_posterior_ + + if "c" in self.params: + if self.covariance_type == "full": + # Pages 259-260 of Bayesian Speech and Language Processing + # Update DOF + self.dof_posterior_ = self.dof_prior_ + stats['post_mix_sum'] + # Update scale + self.scale_posterior_ = ( + self.scale_prior_ + + stats['c_n'] + + np.einsum("ck,cki,ckj->ckij", + self.beta_prior_, + self.means_prior_, + self.means_prior_) + - np.einsum("ck,cki,ckj->ckij", + self.beta_posterior_, + self.means_posterior_, + self.means_posterior_)) + c_n = self.scale_posterior_ + c_d = self.dof_posterior_[:, :, None, None] + elif self.covariance_type == "tied": + # inferred from 'full' + self.dof_posterior_ = (self.dof_prior_ + + stats['post_mix_sum'].sum(axis=-1)) + self.scale_posterior_ = ( + self.scale_prior_ + + stats['c_n'] + + np.einsum("ck,cki,ckj->cij", + self.beta_prior_, + self.means_prior_, + self.means_prior_) + - np.einsum("ck,cki,ckj->cij", + self.beta_posterior_, + self.means_posterior_, + self.means_posterior_)) + c_n = self.scale_posterior_ + c_d = self.dof_posterior_[:, None, None] + elif self.covariance_type == "diag": + self.dof_posterior_ = self.dof_prior_ + stats['post_mix_sum'] + self.scale_posterior_ = (self.scale_prior_ + + stats['c_n'] + + np.einsum("ck,cki->cki", + self.beta_prior_, + self.means_prior_**2) + - np.einsum("ck,cki->cki", + self.beta_posterior_, + self.means_posterior_**2)) + c_n = self.scale_posterior_ + c_d = self.dof_posterior_[:, :, None] + elif self.covariance_type == "spherical": + # inferred from 'diag' + self.dof_posterior_ = self.dof_prior_ + stats['post_mix_sum'] + self.scale_posterior_ = (stats['c_n'] + + np.einsum("ck,cki->ck", + self.beta_prior_, + self.means_prior_**2) + - np.einsum("ck,cki->ck", + self.beta_posterior_, + self.means_posterior_**2)) / nf + self.scale_posterior_ += self.scale_prior_ + c_n = self.scale_posterior_ + c_d = self.dof_posterior_ + + # For compat with GMMHMM + self.covars_[:] = c_n / c_d + + def _compute_lower_bound(self, log_prob): + + nc = self.n_components + nm = self.n_mix + nf = self.n_features + # First, get the contribution from the state transitions + # and initial probabilities + lower_bound = super()._compute_lower_bound(log_prob) + # Then compute the contributions of the emissions + weights_lower_bound = 0 + gaussians_lower_bound = 0 + + # For ease of implementation, pretend everything is shaped like + # full covariance. + scale_posterior_ = self.scale_posterior_ + scale_prior_ = self.scale_prior_ + if self.covariance_type != "full": + scale_posterior_ = np.zeros((nc, nm, nf, nf)) + scale_prior_ = np.zeros((nc, nm, nf, nf)) + for i in range(nc): + scale_posterior_[i] = fill_covars( + self.scale_posterior_[i], self.covariance_type, nm, nf) + scale_prior_[i] = fill_covars( + self.scale_prior_[i], self.covariance_type, nm, nf) + + W_k = np.linalg.inv(scale_posterior_) + + if self.covariance_type != "tied": + dof = self.dof_posterior_ + else: + dof = np.repeat(self.dof_posterior_, nm).reshape(nc, nm) + + # Now compute KL Divergence of the weights, and all of the gaussians + for i in range(nc): + # The contribution of the mixture weights + weights_lower_bound -= _kl.kl_dirichlet( + self.weights_posterior_[i], self.weights_prior_[i]) + # The contributino of the gaussians + for j in range(nm): + precision = W_k[i, j] * dof[i, j] + # KL for the normal distributions + term1 = np.linalg.inv(self.beta_posterior_[i, j] * precision) + term2 = np.linalg.inv(self.beta_prior_[i, j] * precision) + kln = _kl.kl_multivariate_normal_distribution( + self.means_posterior_[i, j], term1, + self.means_prior_[i, j], term2, + ) + gaussians_lower_bound -= kln + # KL for the wishart distributions + klw = 0. + if self.covariance_type in ("full", "diag", "spherical"): + klw = _kl.kl_wishart_distribution( + self.dof_posterior_[i, j], scale_posterior_[i, j], + self.dof_prior_[i, j], scale_prior_[i, j]) + elif self.covariance_type == "tied": + # Just compute it for the first component + if j == 0: + klw = _kl.kl_wishart_distribution( + self.dof_posterior_[i], self.scale_posterior_[i], + self.dof_prior_[i], self.scale_prior_[i]) + gaussians_lower_bound -= klw + return lower_bound + weights_lower_bound + gaussians_lower_bound + + def _needs_sufficient_statistics_for_mean(self): + return 'm' in self.params or 'c' in self.params + + def _needs_sufficient_statistics_for_covars(self): + return 'c' in self.params