Feature/spark monte carlo #68

Merged
9 commits merged on Dec 9, 2016
Major refactor to monte_carlo_profile that enables parallelizing a lot more and packs data in such a way that it can be used with a spark context.
jasonlaska committed Dec 4, 2016
commit b3077673e47385ac6bd36ce7c4d7af73a757452b
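The "packing" the commit message refers to: each unit of work becomes a self-contained, indexed tuple, so the same map can be backed by joblib (as in this commit) or, in principle, a Spark RDD. A rough, illustrative sketch of the work-item layout, inferred from the diff below (toy values; the Spark usage itself is not part of this commit):

# Illustrative only: work-item shapes inferred from the diff below.
alphas = [0.15, 0.4]          # toy alpha grid
grid = [2, 4]                 # toy n_samples grid (multiples of n_features)

# Stage 1 (model selection): one indexed item per (alpha, grid_point) pair.
param_grid = [(a, g) for a in alphas for g in grid]
indexed_param_grid = list(enumerate(param_grid))
# -> [(0, (0.15, 2)), (1, (0.15, 4)), (2, (0.4, 2)), (3, (0.4, 4))]

# Stage 2 (Monte Carlo trials): each stage-1 result is repeated once per
# trial, again as an indexed, pickle-friendly tuple, so any map backend
# (joblib threads here, potentially a Spark map) can process it.
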
2 changes: 1 addition & 1 deletion examples/profiling_example.py
@@ -34,7 +34,7 @@

mc = MonteCarloProfile(n_features=50, n_trials=10, graph=LatticeGraph(),
n_samples_grid=10, alpha_grid=5, metrics=metrics,
verbose=True)
verbose=True, n_jobs=2)
mc.fit()

###############################################################################
184 changes: 115 additions & 69 deletions inverse_covariance/profiling/monte_carlo_profile.py
@@ -1,6 +1,7 @@
from __future__ import absolute_import

import numpy as np
from functools import partial
from sklearn.base import clone
from sklearn.datasets import make_sparse_spd_matrix
from sklearn.externals.joblib import Parallel, delayed
@@ -26,17 +27,48 @@ def _sample_mvn(n_samples, cov, prng):
return prng.multivariate_normal(np.zeros(n_features), cov, size=n_samples)


def _mc_trial(estimator, metrics, n_samples, cov, prec, prng):
'''Draw a new multivariate normal sample via cov and prng and use cloned
estimator to estimate inverse covariance. Compute estimate performance.
Returns dict of performance.
def _ms_fit(indexed_params, estimator, n_features, graph, prng):
# unpack params
index, (alpha, grid_point) = indexed_params

# draw a new fixed graph for alpha
cov, prec, adj = graph.create(n_features, alpha)

Used as a helper function for Parallel/delayed.
'''
# model selection (once per n_samples grid point)
n_samples = int(grid_point * n_features)
X = _sample_mvn(n_samples, cov, prng)
ms_estimator = clone(estimator)
ms_estimator.fit(X)

return index, (cov, prec, adj), ms_estimator.lam_, n_samples


def _mc_fit(indexed_params, estimator, metrics, prng):
# unpack params
index, (nn, (cov, prec, adj), lam, n_samples) = indexed_params

# compute mc trial
X = _sample_mvn(n_samples, cov, prng)
new_estimator = clone(estimator)
new_estimator.fit(X)
return {k: f(prec, new_estimator.precision_) for k,f in metrics.items()}
mc_estimator = clone(estimator)
mc_estimator.fit(X)
mc_estimator.set_params(lam=lam) # **{'lam': lam}
results = {k: f(prec, mc_estimator.precision_) for k,f in metrics.items()}

return index, results


def _cpu_map(fun, param_grid, n_jobs):
return Parallel(
n_jobs=n_jobs,
verbose=False,
backend='threading',
#max_nbytes=None,
#batch_size=1,
)(
delayed(fun)(
params
)
for params in param_grid)
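
Because every work item is an indexed, picklable tuple, `_cpu_map` could in principle be swapped for a Spark-backed map without touching the fit logic. A minimal sketch of such a hypothetical counterpart (not part of this commit; assumes a live SparkContext `sc` and that `fun` and its bound arguments are picklable):

# Hypothetical Spark counterpart to _cpu_map (not part of this commit).
def _spark_map(fun, param_grid, sc, n_partitions=None):
    # Ship the indexed work items to the cluster and apply fun to each;
    # the caller can still sort the returned (index, result) pairs, as fit() does.
    rdd = sc.parallelize(list(param_grid), n_partitions)
    return rdd.map(fun).collect()
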


class MonteCarloProfile(object):
@@ -141,7 +173,9 @@ def __init__(self, n_features=50, n_trials=100, ms_estimator=None,
self.grid_ = self.n_samples_grid

if isinstance(self.alpha_grid, int):
self.alphas_ = np.logspace(np.log(0.15),np.log10(0.4), self.alpha_grid)
self.alphas_ = np.logspace(
np.log(0.15), np.log10(0.4), self.alpha_grid
)
else:
self.alphas_ = self.alpha_grid

@@ -156,66 +190,78 @@ def fit(self, X=None, y=None):
self.precision_nnz_ = []
self.results_ = {k: np.zeros((n_alphas, n_grids)) for k in self.metrics}

for aidx, alpha in enumerate(self.alphas_):
if self.verbose:
print 'alpha {} ({}/{})'.format(
alpha,
aidx,
n_alphas,
)

# draw a new fixed graph for alpha
cov, prec, adj = self.graph.create(self.n_features, alpha)

# track nnz of graph precision
precision_nnz = np.count_nonzero(prec.flat)
self.precision_nnz_.append(precision_nnz)
if self.verbose:
print ' Graph has {} nonzero entries'.format(precision_nnz)

for sidx, grid_point in enumerate(self.grid_):
n_samples = int(grid_point * self.n_features)

# model selection (once per n_samples grid point)
X = _sample_mvn(n_samples, cov, self.prng)
ms_estimator = clone(self.ms_estimator)
ms_estimator.fit(X)

if self.verbose:
display_lam = ms_estimator.lam_
if isinstance(display_lam, np.ndarray):
display_lam = np.linalg.norm(display_lam)
print ' ({}/{}), n_samples = {}, selected lambda = {}'.format(
sidx,
n_grids,
n_samples,
display_lam)

# patch trial estimator with this lambda
self.mc_estimator.set_params(**{'lam': ms_estimator.lam_})

# run estimator on n_trials instances
trials = Parallel(
n_jobs=self.n_jobs,
verbose=False,
backend='threading',
#max_nbytes=None,
#batch_size=1,
)(
delayed(_mc_trial)(
self.mc_estimator, self.metrics, n_samples, cov, prec,
self.prng
)
for nn in range(self.n_trials))

for key in self.metrics:
results_by_key = np.array([t[key] for t in trials])
self.results_[key][aidx, sidx] =\
1. * np.sum(results_by_key) / self.n_trials

if self.verbose:
# build an indexed set (or generator) of grid points
param_grid = [(a, g) for a in self.alphas_ for g in self.grid_]
indexed_param_grid = list(zip(range(len(param_grid)), param_grid))

ms_fit = partial(_ms_fit,
estimator=self.ms_estimator,
n_features=self.n_features,
graph=self.graph,
prng=self.prng)

if self.verbose:
print 'Getting parameters via model selection...'

ms_results = _cpu_map(ms_fit, indexed_param_grid, self.n_jobs)

# ensure results are ordered
ms_results = sorted(ms_results, key=lambda r: r[0])

# track nnz of graph precision
self.precision_nnz_ = [
np.count_nonzero(graph[1].flat) for _, graph, _, _ in ms_results
]

# build param grid for mc trials
# the following results in a grid where nn indexes each trial of each
# param grid point:
# (0, graph_0, lam_0, n_samples_0), (1, graph_0, lam_0, n_samples_0), ...
trial_param_grid = [
(nn, graph, lam, n_samples)
for _, graph, lam, n_samples in ms_results
for nn in range(self.n_trials)
]
indexed_trial_param_grid = list(
zip(range(len(trial_param_grid)), trial_param_grid)
)

mc_fit = partial(_mc_fit,
estimator=self.mc_estimator,
metrics=self.metrics,
prng=self.prng)

if self.verbose:
print 'Fitting MC trials...'

mc_results = _cpu_map(mc_fit, indexed_trial_param_grid, self.n_jobs)

# ensure results are ordered correctly
mc_results = sorted(mc_results, key=lambda r: r[0])

# strip mc_results
mc_results = [r for _, r in mc_results]

# reduce
param_grid_matrix_index = [
(a, g)
for a in range(len(self.alphas_))
for g in range(len(self.grid_))
]
for param_index in range(len(param_grid)):
trial_start = param_index * self.n_trials
trials = mc_results[trial_start: trial_start + self.n_trials]
aidx, gidx = param_grid_matrix_index[param_index]
for key in self.metrics:
results_by_key = np.array([t[key] for t in trials])
self.results_[key][aidx, gidx] =\
1. * np.sum(results_by_key) / self.n_trials

if self.verbose:
for key in self.metrics:
print 'Results for {}: {}'.format(key, self.results_[key][aidx, :])
print 'Results for {}: {}'.format(
key, self.results_[key][aidx, :]
)

self.is_fitted = True
return self
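
For reference, the reduce step above relies on the row-major layout of the flattened parameter grid: the trials for cell (aidx, gidx) occupy one contiguous slice of mc_results. A small standalone illustration of that index arithmetic (toy sizes, not part of the diff):

# Toy illustration of the reduce-step indexing (not part of the diff).
n_alphas, n_grids, n_trials = 2, 3, 4
param_grid_matrix_index = [
    (a, g) for a in range(n_alphas) for g in range(n_grids)
]
for param_index in range(n_alphas * n_grids):
    # row-major flattening: param_index == aidx * n_grids + gidx
    assert param_grid_matrix_index[param_index] == divmod(param_index, n_grids)
    trial_start = param_index * n_trials
    # mc_results[trial_start:trial_start + n_trials] holds the n_trials metric
    # dicts averaged into results_[key][aidx, gidx]
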
@@ -60,7 +60,7 @@ def test_integration_monte_carlo_profile(self, params_in):
assert np.sum(mc.results_[key].flat) > 0
assert mc.results_[key].shape == (len(mc.alphas_), len(mc.grid_))

assert len(mc.precision_nnz_) == len(mc.alphas_)
assert len(mc.precision_nnz_) == len(mc.alphas_) * len(mc.grid_)
assert mc.precision_nnz_[0] == params_in['n_features'] # for eye

if isinstance(mc.n_samples_grid, int):
@@ -92,7 +92,7 @@ def test_integration_monte_carlo_profile_default(self, params_in):
assert np.sum(mc.results_[key].flat) > 0
assert mc.results_[key].shape == (len(mc.alphas_), len(mc.grid_))

assert len(mc.precision_nnz_) == len(mc.alphas_)
assert len(mc.precision_nnz_) == len(mc.alphas_) * len(mc.grid_)

if isinstance(mc.n_samples_grid, int):
assert len(mc.grid_) == mc.n_samples_grid