from __future__ import absolute_import
import numpy as np
from sklearn.base import BaseEstimator, clone
from sklearn.utils import check_array, as_float_array
try:
    from joblib import Parallel, delayed
except ImportError:  # older scikit-learn versions vendored joblib
    from sklearn.externals.joblib import Parallel, delayed
from functools import partial
from .inverse_covariance import _init_coefs
from . import QuicGraphicalLasso
def _check_psd(m):
    """Check that a symmetric matrix is positive semidefinite."""
    # eigvalsh assumes a symmetric input (true for our weight matrices) and
    # avoids the spurious tiny complex parts that eigvals can return
    return np.all(np.linalg.eigvalsh(m) >= 0)
def _fully_random_weights(n_features, lam_scale, prng):
"""Generate a symmetric random matrix with zeros along the diagonal."""
weights = np.zeros((n_features, n_features))
n_off_diag = int((n_features ** 2 - n_features) / 2)
weights[np.triu_indices(n_features, k=1)] = 0.1 * lam_scale * prng.randn(
n_off_diag
) + (0.25 * lam_scale)
weights[weights < 0] = 0
weights = weights + weights.T
return weights
def _random_weights(n_features, lam, lam_perturb, prng):
"""Generate a symmetric random matrix with zeros along the diagnoal and
non-zero elements take the value {lam * lam_perturb, lam / lam_perturb}
with probability 1/2.
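
    Examples
    --------
    A quick doctest sketch (the lam and lam_perturb values are arbitrary
    placeholders):

    >>> prng = np.random.RandomState(0)
    >>> w = _random_weights(4, lam=0.5, lam_perturb=0.5, prng=prng)
    >>> w.shape
    (4, 4)
    >>> bool(np.allclose(w, w.T)) and bool(np.all(np.diag(w) == 0))
    True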
"""
weights = np.zeros((n_features, n_features))
n_off_diag = int((n_features ** 2 - n_features) / 2)
berns = prng.binomial(1, 0.5, size=n_off_diag)
vals = np.zeros(berns.shape)
vals[berns == 0] = 1. * lam * lam_perturb
vals[berns == 1] = 1. * lam / lam_perturb
weights[np.triu_indices(n_features, k=1)] = vals
weights[weights < 0] = 0
weights = weights + weights.T
return weights
def _fix_weights(weight_fun, *args):
    """Ensure that the random weight matrix is valid.

    TODO: The diagonally dominant tuning currently doesn't make sense.
    Our weight matrix has zeros along the diagonal, so multiplying by
    a diagonal matrix results in a zero matrix.
    """
    weights = weight_fun(*args)

    # TODO: fix the tuning below; checks are disabled for now, so the code
    # after this return is intentionally unreachable
    return weights

    # if positive semidefinite, then we're good as is
    if _check_psd(weights):
        return weights

    # make diagonally dominant
    off_diag_sums = np.sum(weights, axis=1)  # NOTE: assumes diag is zero
    mod_mat = np.linalg.inv(np.sqrt(np.diag(off_diag_sums)))
    # np.dot treats a third positional argument as `out`, so the original
    # three-argument call did not right-multiply; chain the products instead
    return mod_mat.dot(weights).dot(mod_mat)
def _default_bootstrap(n_samples, num_subsamples, prng):
"""Returns an array of integers (0, n_samples-1)^num_subsamples."""
return prng.permutation(n_samples)[:num_subsamples]
def _fit(
indexed_params,
penalization,
lam,
lam_perturb,
lam_scale_,
estimator,
penalty_name,
subsample,
bootstrap,
prng,
X=None,
):
"""Wrapper function outside of instance for fitting a single model average
trial.
If X is None, then we assume we are using a broadcast spark object. Else,
we expect X to get passed into this function.
"""
index = indexed_params
if isinstance(X, np.ndarray):
local_X = X
else:
local_X = X.value
n_samples, n_features = local_X.shape
prec_is_real = False
while not prec_is_real:
boot_lam = None
if penalization == "subsampling":
pass
elif penalization == "random":
boot_lam = _fix_weights(_random_weights, n_features, lam, lam_perturb, prng)
elif penalization == "fully-random":
boot_lam = _fix_weights(_fully_random_weights, n_features, lam_scale_, prng)
else:
raise NotImplementedError(
(
"Only penalization = 'subsampling', "
"'random', and 'fully-random' have "
"been implemented. Found {}.".format(penalization)
)
)
# new instance of estimator
new_estimator = clone(estimator)
if boot_lam is not None:
new_estimator.set_params(**{penalty_name: boot_lam})
# fit estimator
num_subsamples = int(subsample * n_samples)
rp = bootstrap(n_samples, num_subsamples, prng)
new_estimator.fit(local_X[rp, :])
# check that new_estimator.precision_ is real
# if not, skip this boot_lam and try again
if isinstance(new_estimator.precision_, list):
prec_real_bools = []
for prec in new_estimator.precision_:
prec_real_bools.append(np.all(np.isreal(prec)))
            # `np.array(...) is True` was an identity check and is always
            # False; use all() to test the collected values themselves
            prec_is_real = all(prec_real_bools)
elif isinstance(new_estimator.precision_, np.ndarray):
prec_is_real = np.all(np.isreal(new_estimator.precision_))
else:
raise ValueError("Estimator returned invalid precision_.")
return index, (boot_lam, rp, new_estimator)
def _cpu_map(fun, param_grid, n_jobs, verbose=True):
return Parallel(
n_jobs=n_jobs,
verbose=verbose,
backend="threading", # any sklearn backend should work here
)(delayed(fun)(params) for params in param_grid)
def _spark_map(fun, indexed_param_grid, sc, seed, X_bc):
"""We cannot pass a RandomState instance to each spark worker since it will
behave identically across partitions. Instead, we explictly handle the
partitions with a newly seeded instance.
The seed for each partition will be the "seed" (MonteCarloProfile.seed) +
"split_index" which is the partition index.
Following this trick:
https://wegetsignal.wordpress.com/2015/05/08/
generating-random-numbers-for-rdd-in-spark/
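
    Examples
    --------
    The seeding idea, shown without spark (a seed offset by the partition
    index gives distinct but reproducible streams):

    >>> import numpy as np
    >>> streams = [np.random.RandomState(2 + i).randn(3) for i in range(4)]
    >>> len({tuple(np.round(s, 6)) for s in streams})
    4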
"""
    def _wrap_random_state(split_index, partition):
        # seed a fresh RandomState per partition: reproducible, but no
        # longer identical across partitions
        prng = np.random.RandomState(seed + split_index)
        # materialize with list(); a lazy python 3 map object cannot be
        # pickled when spark collects the partition results
        yield list(map(partial(fun, prng=prng, X=X_bc), partition))
par_param_grid = sc.parallelize(indexed_param_grid)
indexed_results = par_param_grid.mapPartitionsWithIndex(
_wrap_random_state
).collect()
return [item for sublist in indexed_results for item in sublist]
class ModelAverage(BaseEstimator):
"""
Randomized model averaging meta-estimator.
See analogous sklearn.linear_model.BaseRandomizedLinearModel.
Parameters
    ----------
estimator : An inverse covariance estimator instance
After being fit, estimator.precision_ must either be a matrix with the
precision or a list of precision matrices (e.g., path mode).
n_trials : int (default=100)
Number of random subsets for which to bootstrap the data.
subsample : float in range (0, 1) (default=0.3)
Fraction of examples to subsample in each bootstrap trial.
    normalize : bool (default=True)
        Determines whether the proportion_ matrix is normalized to lie in
        the range [0, 1] or is left as raw counts.
lam : float (default=0.5)
Scalar lambda penalty used in penalization='random' mode. Will be
ignored in all other modes.
    lam_perturb : float in range (0, 1) (default=0.5)
Scalar perturbation parameter used in penalization='random'. Will be
ignored in all other modes.
init_method : one of 'corrcoef', 'cov', 'spearman', 'kendalltau',
or a custom function. See InverseCovarianceEstimator
penalization : one of 'subsampling', 'random' (default), 'fully-random'
Strategy for generating new random penalization in each trial.
subsampling: Only the observations will be subsampled, the original
penalty supplied in the estimator instance will be used.
Use this technique when the estimator does not support
matrix penalization (e.g., sklearn GraphicalLasso).
random: In addition to randomly subsampling the observations, 'random'
applies a randomly-perturbed 'lam' weight matrix. The entries
of the matrix take the value
{lam * lam_perturb, lam / lam_perturb} with probability 1/2.
            The user must supply scalar 'lam' and 'lam_perturb' parameters.
fully-random: In addition to randomly subsampling the observations,
'fully-random' generates a symmetric Gaussian matrix
appropriately scaled for the data.
For more information on 'random' penalization, see:
"Stability Selection"
        N. Meinshausen and P. Buehlmann, May 2009
"Random lasso"
S. Wang, B. Nan, S. Rosset, and J. Zhu, Apr 2011
For more information on 'fully-random', see:
"Mixed effects models for resampled network statistics improves
statistical power to find differences in multi-subject functional
connectivity"
M. Narayan and G. Allen, March 2016
    support_thresh : float in range (0, 1) (default=0.5)
        Threshold for estimating supports from proportions. This is provided
        for convenience.
penalty_name : string (default='lam')
Name of the penalty kwarg in the estimator. This parameter is
unimportant if penalization='subsampling'.
    bootstrap : callable (default=_default_bootstrap)
        A function that takes n_samples, num_subsamples, and a RandomState
        instance as inputs and returns an array of num_subsamples sample
        indices drawn from range(n_samples). By default, indices are
        uniformly subsampled without replacement.
    n_jobs : int (optional)
        Number of jobs to run in parallel (default=1).

    sc : sparkContext (optional)
        If a sparkContext object is provided, n_jobs will be ignored and the
        work will be parallelized via spark.
    seed : int (default=1)
        Seed used to initialize np.random.RandomState.
Attributes
----------
proportion_ : matrix of size (n_features, n_features)
Each entry indicates the sample probability (or count) of whether the
inverse covariance is non-zero.
support_ : matrix of size (n_features, n_features)
Support estimate via thresholding proportions by support_thresh.
    estimators_ : list of estimator instances (n_trials, )
        The fitted estimator instance from each trial.

    lams_ : list of penalization matrices (n_trials, )
        The penalization matrix chosen in each trial. Entries are None
        when penalization='subsampling'.

    subsets_ : list of subset indices (n_trials, )
        The example indices chosen in each trial.
    lam_ : float
        Average of the scalar lam_ values across all trial estimators; only
        accumulated when penalty_name='lam'.
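
    Examples
    --------
    A minimal sketch on synthetic data (the import path assumes the skggm
    package layout; exact fitted values depend on the seed and estimator):

    >>> import numpy as np
    >>> from inverse_covariance import ModelAverage
    >>> X = np.random.RandomState(0).randn(50, 10)
    >>> ma = ModelAverage(n_trials=10, penalization='random', lam=0.5)
    >>> ma = ma.fit(X)
    >>> ma.proportion_.shape
    (10, 10)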
"""
def __init__(
self,
estimator=None,
n_trials=100,
subsample=0.3,
normalize=True,
lam=0.5,
lam_perturb=0.5,
init_method="corrcoef",
penalization="random",
penalty_name="lam",
support_thresh=0.5,
bootstrap=_default_bootstrap,
n_jobs=1,
sc=None,
seed=1,
):
self.estimator = estimator
self.n_trials = n_trials
self.subsample = subsample
self.normalize = normalize
self.lam = lam
self.lam_perturb = lam_perturb
self.init_method = init_method
self.penalization = penalization
self.penalty_name = penalty_name
self.support_thresh = support_thresh
self.bootstrap = bootstrap
self.n_jobs = n_jobs
self.sc = sc
self.seed = seed
self.prng = np.random.RandomState(seed)
def fit(self, X, y=None):
"""Learn a model averaged proportion matrix for X.
Parameters
----------
X : ndarray, shape (n_samples, n_features)
Data from which to compute the proportion matrix.
"""
# default to QuicGraphicalLasso
estimator = self.estimator or QuicGraphicalLasso()
if self.penalization != "subsampling" and not hasattr(
estimator, self.penalty_name
):
raise ValueError(
(
"Must specify valid penalty for "
"estimator: {}.".format(self.penalty_name)
)
)
self.proportion_ = None
self.support_ = None
self.lam_ = None
self.lam_scale_ = None
self.estimators_ = []
self.lams_ = []
self.subsets_ = []
X = check_array(X, ensure_min_features=2, estimator=self)
X = as_float_array(X, copy=False, force_all_finite=False)
n_samples_, n_features_ = X.shape
_, self.lam_scale_ = _init_coefs(X, method=self.init_method)
fit_fun = partial(
_fit,
penalization=self.penalization,
lam=self.lam,
lam_perturb=self.lam_perturb,
lam_scale_=self.lam_scale_,
estimator=estimator,
penalty_name=self.penalty_name,
subsample=self.subsample,
bootstrap=self.bootstrap,
)
indexed_param_grid = [(nn,) for nn in range(self.n_trials)]
if self.sc is None:
results = _cpu_map(
partial(fit_fun, X=X, prng=self.prng),
indexed_param_grid,
n_jobs=self.n_jobs,
)
else:
X_bc = self.sc.broadcast(X)
results = _spark_map(fit_fun, indexed_param_grid, self.sc, self.seed, X_bc)
X_bc.unpersist()
self.estimators_ = [e for r, (l, s, e) in results]
self.subsets_ = [s for r, (l, s, e) in results]
self.lams_ = [l for r, (l, s, e) in results]
# reduce
self.lam_ = 0.0
self.proportion_ = np.zeros((n_features_, n_features_))
for new_estimator in self.estimators_:
# update proportions
if isinstance(new_estimator.precision_, list):
for prec in new_estimator.precision_:
self.proportion_[np.nonzero(prec)] += 1.
elif isinstance(new_estimator.precision_, np.ndarray):
self.proportion_[np.nonzero(new_estimator.precision_)] += 1.
else:
raise ValueError("Estimator returned invalid precision_.")
            # currently, don't estimate self.lam_ if penalty_name is different
if self.penalty_name == "lam":
self.lam_ += np.mean(new_estimator.lam_.flat)
# estimate support locations
threshold = self.support_thresh * self.n_trials
self.support_ = np.zeros(self.proportion_.shape)
self.support_[self.proportion_ > threshold] = 1.0
self.lam_ /= self.n_trials
if self.normalize:
self.proportion_ /= self.n_trials
return self
@property
def precision_(self):
"""Convenience property to make compatible with AdaptiveGraphicalLasso.
This is not a very good precision estimate.
"""
return self.support_
@property
def covariance_(self):
"""Convenience property to make compatible with AdaptiveGraphicalLasso.
This is not a very good covariance estimate.
"""
return np.linalg.inv(self.support_)
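

if __name__ == "__main__":
    # Minimal smoke-test sketch, not part of the library API. Because of the
    # relative imports above, run it as a module, e.g.
    # `python -m inverse_covariance.model_average` (module path assumed from
    # the skggm layout). The values below are arbitrary illustrations.
    prng = np.random.RandomState(1)
    X = prng.randn(60, 8)
    ma = ModelAverage(n_trials=5, penalization="random", lam=0.5)
    ma.fit(X)
    print("proportion_ shape:", ma.proportion_.shape)
    print("edges passing support threshold:", int(ma.support_.sum()))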