[med-svn] [python-mne] 299/353: ENH : implement MxNE with L21 prior
Yaroslav Halchenko
debian at onerussian.com
Fri Nov 27 17:25:19 UTC 2015
This is an automated email from the git hooks/post-receive script.
yoh pushed a commit to tag 0.4
in repository python-mne.
commit 095f2f09fecab79bd98e815a29ffdf4811919de1
Author: Alexandre Gramfort <alexandre.gramfort at inria.fr>
Date: Tue Jul 17 17:34:21 2012 +0200
ENH : implement MxNE with L21 prior
---
mne/forward.py | 53 ++++++
mne/minimum_norm/inverse.py | 42 +++++
mne/mixed_norm/__init__.py | 7 +
mne/mixed_norm/inverse.py | 223 +++++++++++++++++++++++++
mne/mixed_norm/optim.py | 304 +++++++++++++++++++++++++++++++++++
mne/mixed_norm/tests/__init__.py | 0
mne/mixed_norm/tests/test_inverse.py | 48 ++++++
mne/mixed_norm/tests/test_optim.py | 36 +++++
8 files changed, 713 insertions(+)
diff --git a/mne/forward.py b/mne/forward.py
index 44b2545..d934c0a 100644
--- a/mne/forward.py
+++ b/mne/forward.py
@@ -469,6 +469,13 @@ def read_forward_solution(fname, force_fixed=False, surf_ori=False,
return fwd
+def is_fixed_orient(forward):
+ """Has forward operator fixed orientation?
+ """
+ is_fixed_ori = (forward['source_ori'] == FIFF.FIFFV_MNE_FIXED_ORI)
+ return is_fixed_ori
+
+
def write_forward_meas_info(fid, info):
"""Write measurement info stored in forward solution
@@ -503,6 +510,52 @@ def write_forward_meas_info(fid, info):
end_block(fid, FIFF.FIFFB_MNE_PARENT_MEAS_FILE)
+def compute_orient_prior(forward, loose=0.2):
+ """Compute orientation prior
+
+ Parameters
+ ----------
+ forward : dict
+ Forward operator
+ loose : float in [0, 1] or None
+ The loose orientation parameter
+
+ Returns
+ -------
+ orient_prior : array
+ Orientation priors
+ """
+ is_fixed_ori = is_fixed_orient(forward)
+ n_sources = forward['sol']['data'].shape[1]
+
+ if not forward['surf_ori'] and loose is not None:
+ raise ValueError('Forward operator is not oriented in surface '
+ 'coordinates. loose parameter should be None '
+ 'not %s.' % loose)
+
+ if loose is not None and not (0 <= loose <= 1):
+ raise ValueError('loose value should be smaller than 1 and bigger than'
+ ' 0, or None for not loose orientations.')
+
+ if is_fixed_ori and loose is not None:
+ warnings.warn('Ignoring loose parameter with forward operator with '
+ 'fixed orientation.')
+
+ if is_fixed_ori:
+ orient_prior = np.ones(n_sources, dtype=np.float)
+ else:
+ n_dip_per_pos = 1 if is_fixed_ori else 3
+ n_positions = n_sources / 3
+ n_dipoles = n_positions * n_dip_per_pos
+ orient_prior = np.ones(n_dipoles, dtype=np.float)
+
+ if loose is not None:
+ print ('Applying loose dipole orientations. Loose value of %s.'
+ % loose)
+ orient_prior[np.mod(np.arange(n_dipoles), 3) != 2] *= loose
+ return orient_prior
+
+
def compute_depth_prior(G, exp=0.8, limit=10.0):
"""Compute weighting for depth prior
"""
diff --git a/mne/minimum_norm/inverse.py b/mne/minimum_norm/inverse.py
index 8dc74c2..0f9ca8e 100644
--- a/mne/minimum_norm/inverse.py
+++ b/mne/minimum_norm/inverse.py
@@ -960,6 +960,48 @@ def _xyz2lf(Lf_xyz, normals):
###############################################################################
# Assemble the inverse operator
+def _prepare_inverse(forward, info, noise_cov, pca=False):
+ """Util function for inverse solvers
+ """
+ fwd_ch_names = [c['ch_name'] for c in forward['info']['chs']]
+ ch_names = [c['ch_name'] for c in info['chs']
+ if (c['ch_name'] not in info['bads'])
+ and (c['ch_name'] in fwd_ch_names)]
+ n_chan = len(ch_names)
+ print "Computing inverse operator with %d channels." % n_chan
+
+ #
+ # Handle noise cov
+ #
+ noise_cov = prepare_noise_cov(noise_cov, info, ch_names)
+
+ # Omit the zeroes due to projection
+ eig = noise_cov['eig']
+ nzero = (eig > 0)
+ n_nzero = sum(nzero)
+
+ if pca:
+ whitener = np.zeros((n_nzero, n_chan), dtype=np.float)
+ whitener = 1.0 / np.sqrt(eig[nzero])
+ # Rows of eigvec are the eigenvectors
+ whitener = noise_cov['eigvec'][nzero] / np.sqrt(eig[nzero])[:, None]
+ print 'Reducing data rank to %d' % n_nzero
+ else:
+ whitener = np.zeros((n_chan, n_chan), dtype=np.float)
+ whitener[nzero, nzero] = 1.0 / np.sqrt(eig[nzero])
+ # Rows of eigvec are the eigenvectors
+ whitener = np.dot(whitener, noise_cov['eigvec'])
+
+ gain = forward['sol']['data']
+
+ fwd_idx = [fwd_ch_names.index(name) for name in ch_names]
+ gain = gain[fwd_idx]
+
+ print 'Total rank is %d' % n_nzero
+
+ return ch_names, gain, noise_cov, whitener, n_nzero
+
+
def make_inverse_operator(info, forward, noise_cov, loose=0.2, depth=0.8):
"""Assemble inverse operator
diff --git a/mne/mixed_norm/__init__.py b/mne/mixed_norm/__init__.py
new file mode 100644
index 0000000..f856145
--- /dev/null
+++ b/mne/mixed_norm/__init__.py
@@ -0,0 +1,7 @@
+"""Non-Linear inverse solvers based on Mixed Norm Estimates (MxNE)"""
+
+# Author: Alexandre Gramfort <gramfort at nmr.mgh.harvard.edu>
+#
+# License: Simplified BSD
+
+from .inverse import mixed_norm
diff --git a/mne/mixed_norm/inverse.py b/mne/mixed_norm/inverse.py
new file mode 100644
index 0000000..19c49a5
--- /dev/null
+++ b/mne/mixed_norm/inverse.py
@@ -0,0 +1,223 @@
+# Author: Alexandre Gramfort <gramfort at nmr.mgh.harvard.edu>
+#
+# License: Simplified BSD
+
+from copy import deepcopy
+import numpy as np
+from scipy import linalg
+
+from ..source_estimate import SourceEstimate
+from ..minimum_norm.inverse import combine_xyz, _make_stc, _prepare_inverse
+from ..forward import compute_orient_prior, is_fixed_orient
+from ..fiff.pick import pick_channels_evoked
+from .optim import mixed_norm_solver, norm_l2inf
+
+
+def _prepare_gain(gain, forward, whitener, depth, loose, weights, weights_min):
+ print 'Whitening lead field matrix.'
+ gain = np.dot(whitener, gain)
+
+ # Handle depth prior scaling
+ source_weighting = np.sum(gain ** 2, axis=0) ** depth
+
+ # apply loose orientations
+ orient_prior = compute_orient_prior(forward, loose)
+
+ source_weighting /= orient_prior
+ source_weighting = np.sqrt(source_weighting)
+ gain /= source_weighting[None, :]
+
+ # Handle weights
+ mask = None
+ if weights is not None:
+ if isinstance(weights, SourceEstimate):
+ # weights = np.sqrt(np.sum(weights.data ** 2, axis=1))
+ weights = np.max(np.abs(weights.data), axis=1)
+ weights_max = np.max(weights)
+ if weights_min > weights_max:
+ raise ValueError('weights_min > weights_max (%s > %s)' %
+ (weights_min, weights_max))
+ weights_min = weights_min / weights_max
+ weights = weights / weights_max
+ n_dip_per_pos = 1 if is_fixed_orient(forward) else 3
+ weights = np.ravel(np.tile(weights, [n_dip_per_pos, 1]).T)
+ if len(weights) != gain.shape[1]:
+ raise ValueError('weights do not have the correct dimension '
+ ' (%d != %d)' % (len(weights), gain.shape[1]))
+ source_weighting /= weights
+ gain *= weights[None, :]
+
+ if weights_min is not None:
+ mask = (weights > weights_min)
+ gain = gain[:, mask]
+ n_sources = np.sum(mask) / n_dip_per_pos
+ print "Reducing source space to %d sources" % n_sources
+
+ return gain, source_weighting, mask
+
+
+def _make_sparse_stc(X, active_set, forward, tmin, tstep):
+ if not is_fixed_orient(forward):
+ print 'combining the current components...',
+ X = combine_xyz(X)
+
+ active_idx = np.where(active_set)[0]
+ n_dip_per_pos = 1 if is_fixed_orient(forward) else 3
+ if n_dip_per_pos > 1:
+ active_idx = np.unique(active_idx // n_dip_per_pos)
+
+ src = forward['src']
+
+ n_lh_points = len(src[0]['vertno'])
+ lh_vertno = src[0]['vertno'][active_idx[active_idx < n_lh_points]]
+ rh_vertno = src[1]['vertno'][active_idx[active_idx >= n_lh_points]
+ - n_lh_points]
+ stc = _make_stc(X, tmin, tstep, [lh_vertno, rh_vertno])
+ return stc
+
+
+def mixed_norm(evoked, forward, noise_cov, alpha, loose=0.2, depth=0.8,
+ maxit=3000, tol=1e-4, active_set_size=10, pca=True,
+ debias=True, time_pca=True, weights=None, weights_min=None,
+ return_residual=False):
+ """Mixed-norm estimate (MxNE)
+
+ Compute L1/L2 mixed-norm solution on evoked data.
+
+ Parameters
+ ----------
+ evoked : instance of Evoked or list of instance of Evoked
+ Evoked data to invert
+ forward : dict
+ Forward operator
+ noise_cov : instance of Covariance
+ Noise covariance to compute whitener
+ alpha : float
+ Regularization parameter
+ loose : float in [0, 1]
+ Value that weights the source variances of the dipole components
+ defining the tangent space of the cortical surfaces. If loose
+ is 0 or None then the solution is computed with fixed orientation.
+ maxit : int
+ Maximum number of iterations
+ tol : float
+ Tolerance parameter
+ active_set_size : int | None
+ Size of active set increment. If None, no active set strategy is used.
+ pca: bool
+ If True the rank of the data is reduced to true dimension.
+ debias: bool
+ Remove coefficient amplitude bias due to L1 penalty
+ time_pca: bool or int
+ If True the rank of the concatenated epochs is reduced to
+ its true dimension. If is 'int' the rank is limited to this value.
+ weights: None | array | SourceEstimate
+ Weight for penalty in mixed_norm. Can be None or
+ 1d array of length n_sources or a SourceEstimate e.g. obtained
+ with wMNE or dSPM or fMRI
+ weights_min: float
+ Do not consider in the estimation sources for which weights
+ is less than weights_min.
+ return_residual: bool
+ If True, the residual is returned as an Evoked instance.
+
+ Returns
+ -------
+ stc : dict
+ Source time courses
+
+ References
+ ----------
+ Gramfort A., Kowalski M. and Hamalainen, M,
+ Mixed-norm estimates for the M/EEG inverse problem using accelerated
+ gradient methods, Physics in Medicine and Biology, 2012
+ """
+ if not isinstance(evoked, list):
+ evoked = [evoked]
+
+ all_ch_names = evoked[0].ch_names
+ if not all(all_ch_names == evoked[i].ch_names
+ for i in range(1, len(evoked))):
+ raise Exception('All the datasets must have the same good channels.')
+
+ info = evoked[0].info
+ ch_names, gain, _, whitener, _ = _prepare_inverse(forward,
+ info, noise_cov, pca)
+
+ # Whiten lead field.
+ gain, source_weighting, mask = _prepare_gain(gain, forward, whitener,
+ depth, loose, weights, weights_min)
+
+ sel = [all_ch_names.index(name) for name in ch_names]
+ M = np.concatenate([e.data[sel] for e in evoked], axis=1)
+
+ # Whiten data
+ print 'Whitening data matrix.'
+ M = np.dot(whitener, M)
+
+ if time_pca:
+ U, s, Vh = linalg.svd(M, full_matrices=False)
+ if not isinstance(time_pca, bool) and isinstance(time_pca, int):
+ U = U[:, :time_pca]
+ s = s[:time_pca]
+ Vh = Vh[:time_pca]
+ M = U * s
+
+ # Scaling to make setting of alpha easy
+ n_dip_per_pos = 1 if is_fixed_orient(forward) else 3
+ alpha_max = norm_l2inf(np.dot(gain.T, M), n_dip_per_pos, copy=False)
+ alpha_max *= 0.01
+ gain /= alpha_max
+ source_weighting *= alpha_max
+
+ X, active_set, E = mixed_norm_solver(M, gain, alpha,
+ maxit=maxit, tol=tol, verbose=True,
+ active_set_size=active_set_size,
+ debias=debias,
+ n_orient=n_dip_per_pos)
+
+ if mask is not None:
+ active_set_tmp = np.zeros(len(mask), dtype=np.bool)
+ active_set_tmp[mask] = active_set
+ active_set = active_set_tmp
+ del active_set_tmp
+
+ if time_pca:
+ X = np.dot(X, Vh)
+
+ if active_set.sum() == 0:
+ raise Exception("No active dipoles found. alpha is too big.")
+
+ # Reapply weights to have correct unit
+ X /= source_weighting[active_set][:, None]
+
+ stcs = list()
+ residual = list()
+ cnt = 0
+ for e in evoked:
+ tmin = float(e.first) / e.info['sfreq']
+ tstep = 1.0 / e.info['sfreq']
+ stc = _make_sparse_stc(X[:, cnt:(cnt + len(e.times))], active_set,
+ forward, tmin, tstep)
+ stcs.append(stc)
+
+ if return_residual:
+ sel = [forward['sol']['row_names'].index(c) for c in ch_names]
+ r = deepcopy(e)
+ r = pick_channels_evoked(r, include=ch_names)
+ r.data -= np.dot(forward['sol']['data'][sel, :][:, active_set], X)
+ residual.append(r)
+
+ print '[done]'
+
+ if len(stcs) == 1:
+ out = stcs[0]
+ if return_residual:
+ residual = residual[0]
+ else:
+ out = stcs
+
+ if return_residual:
+ out = out, residual
+
+ return out
diff --git a/mne/mixed_norm/optim.py b/mne/mixed_norm/optim.py
new file mode 100644
index 0000000..efe5ac3
--- /dev/null
+++ b/mne/mixed_norm/optim.py
@@ -0,0 +1,304 @@
+# Author: Alexandre Gramfort <gramfort at nmr.mgh.harvard.edu>
+#
+# License: Simplified BSD
+
+from math import sqrt
+import numpy as np
+from scipy import linalg
+
+
+def groups_norm2(A, n_orient):
+ """compute squared L2 norms of groups inplace"""
+ n_positions = A.shape[0] // n_orient
+ return np.sum(np.power(A, 2, A).reshape(n_positions, -1), axis=1)
+
+
+def norm_l2inf(A, n_orient, copy=True):
+ """L2-inf norm"""
+ if A.size == 0:
+ return 0.0
+ if copy:
+ A = A.copy()
+ return sqrt(np.max(groups_norm2(A, n_orient)))
+
+
+def norm_l21(A, n_orient, copy=True):
+ """L21 norm"""
+ if A.size == 0:
+ return 0.0
+ if copy:
+ A = A.copy()
+ return np.sum(np.sqrt(groups_norm2(A, n_orient)))
+
+
+def prox_l21(Y, alpha, n_orient):
+ """proximity operator for l21 norm
+
+ (L2 over columns and L1 over rows => groups contain n_orient rows)
+
+ Example
+ -------
+ >>> Y = np.tile(np.array([0, 4, 3, 0, 0], dtype=np.float), (2, 1))
+ >>> Y = np.r_[Y, np.zeros_like(Y)]
+ >>> print Y
+ [[ 0. 4. 3. 0. 0.]
+ [ 0. 4. 3. 0. 0.]
+ [ 0. 0. 0. 0. 0.]
+ [ 0. 0. 0. 0. 0.]]
+ >>> Yp, active_set = prox_l21(Y, 2, 2)
+ >>> print Yp
+ [[ 0. 2.86862915 2.15147186 0. 0. ]
+ [ 0. 2.86862915 2.15147186 0. 0. ]]
+ >>> print active_set
+ [ True True False False]
+ """
+ if len(Y) == 0:
+ return np.zeros((0, Y.shape[1]), dtype=Y.dtype), \
+ np.zeros((0,), dtype=np.bool)
+ n_positions = Y.shape[0] // n_orient
+ rows_norm = np.sqrt(np.sum((np.abs(Y) ** 2).reshape(n_positions, -1),
+ axis=1))
+ shrink = np.maximum(1.0 - alpha / rows_norm, 0.0)
+ active_set = shrink > 0.0
+ if n_orient > 1:
+ active_set = np.tile(active_set[:, None], [1, n_orient]).ravel()
+ shrink = np.tile(shrink[:, None], [1, n_orient]).ravel()
+ Y = Y[active_set]
+ Y *= shrink[active_set][:, np.newaxis]
+ return Y, active_set
+
+
+def prox_l1(Y, alpha, n_orient):
+ """proximity operator for l1 norm with multiple orientation support
+
+ L2 over orientation and L1 over position (space + time)
+
+ Example
+ -------
+ >>> Y = np.tile(np.array([1, 2, 3, 2, 0], dtype=np.float), (2, 1))
+ >>> Y = np.r_[Y, np.zeros_like(Y)]
+ >>> print Y
+ [[ 1. 2. 3. 2. 0.]
+ [ 1. 2. 3. 2. 0.]
+ [ 0. 0. 0. 0. 0.]
+ [ 0. 0. 0. 0. 0.]]
+ >>> Yp, active_set = prox_l1(Y, 2, 2)
+ >>> print Yp
+ [[ 0. 0.58578644 1.58578644 0.58578644 0. ]
+ [ 0. 0.58578644 1.58578644 0.58578644 0. ]]
+ >>> print active_set
+ [ True True False False]
+ """
+ n_positions = Y.shape[0] // n_orient
+ norms = np.sqrt(np.sum((np.abs(Y) ** 2).T.reshape(-1, n_orient), axis=1))
+ shrink = np.maximum(1.0 - alpha / norms, 0.0).reshape(-1, n_positions).T
+ active_set = np.any(shrink > 0.0, axis=1)
+ shrink = shrink[active_set]
+ if n_orient > 1:
+ active_set = np.tile(active_set[:, None], [1, n_orient]).ravel()
+ Y = Y[active_set]
+ if len(Y) > 0:
+ for o in range(n_orient):
+ Y[o::n_orient] *= shrink
+ return Y, active_set
+
+
+def dgap_l21(M, G, X, active_set, alpha, n_orient):
+ """Duality gaps for the mixed norm inverse problem
+
+ Parameters
+ ----------
+ M : array of shape [n_sensors, n_times]
+ data
+ G : array of shape [n_sensors, n_active]
+ Gain matrix a.k.a. lead field
+ X : array of shape [n_active, n_times]
+ Sources
+ active_set : array of bool
+ Mask of active sources
+ alpha : float
+ Regularization parameter
+ n_orient : int
+ Number of dipoles per locations (typically 1 or 3)
+
+ Returns
+ -------
+ gap : float
+ Dual gap
+ pobj : float
+ Primal cost
+ dobj : float
+ Dual cost. gap = pobj - dobj
+ R : array of shape [n_sensors, n_times]
+ Current residual of M - G * X
+ """
+ GX = np.dot(G[:, active_set], X)
+ R = M - GX
+ penalty = norm_l21(X, n_orient, copy=True)
+ nR2 = np.sum(R ** 2)
+ pobj = 0.5 * nR2 + alpha * penalty
+ dual_norm = norm_l2inf(np.dot(G.T, R), n_orient, copy=False)
+ scaling = alpha / dual_norm
+ scaling = min(scaling, 1.0)
+ dobj = 0.5 * (scaling ** 2) * nR2 + scaling * np.sum(R * GX)
+ gap = pobj - dobj
+ return gap, pobj, dobj, R
+
+
+def _mixed_norm_solver(M, G, alpha, maxit=200, tol=1e-8, verbose=True,
+ init=None, n_orient=1):
+ """Solves L21 inverse solver"""
+ n_sensors, n_times = M.shape
+ n_sensors, n_sources = G.shape
+
+ lipschitz_constant = 1.1 * linalg.norm(G, ord=2) ** 2
+
+ if n_sources < n_sensors:
+ gram = np.dot(G.T, G)
+ GTM = np.dot(G.T, M)
+ else:
+ gram = None
+
+ if init is None:
+ X = 0.0
+ active_set = np.zeros(n_sources, dtype=np.bool)
+ R = M.copy()
+ if gram is not None:
+ R = np.dot(G.T, R)
+ else:
+ X, active_set = init
+ if gram is None:
+ R = M - np.dot(G[:, active_set], X)
+ else:
+ R = GTM - np.dot(gram[:, active_set], X)
+
+ t = 1.0
+ Y = np.zeros((n_sources, n_times)) # FISTA aux variable
+ E = [] # track cost function
+
+ active_set = np.ones(n_sources, dtype=np.bool) # HACK
+
+ for i in xrange(maxit):
+ X0, active_set_0 = X, active_set # store previous values
+ if gram is None:
+ Y += np.dot(G.T, R) / lipschitz_constant # ISTA step
+ else:
+ Y += R / lipschitz_constant # ISTA step
+ X, active_set = prox_l21(Y, alpha / lipschitz_constant, n_orient)
+
+ t0 = t
+ t = 0.5 * (1.0 + sqrt(1.0 + 4.0 * t ** 2))
+ Y[:] = 0.0
+ dt = ((t0 - 1.0) / t)
+ Y[active_set] = (1.0 + dt) * X
+ Y[active_set_0] -= dt * X0
+ Y_as = active_set_0 | active_set
+
+ if gram is None:
+ R = M - np.dot(G[:, Y_as], Y[Y_as])
+ else:
+ R = GTM - np.dot(gram[:, Y_as], Y[Y_as])
+
+ if verbose: # log cost function value
+ gap, pobj, dobj, _ = dgap_l21(M, G, X, active_set, alpha, n_orient)
+ E.append(pobj)
+ print "pobj : %s -- gap : %s" % (pobj, gap)
+ if gap < tol:
+ print 'Convergence reached ! (gap: %s < %s)' % (gap, tol)
+ break
+ return X, active_set, E
+
+
+def mixed_norm_solver(M, G, alpha, maxit=200, tol=1e-8, verbose=True,
+ active_set_size=50, debias=True, n_orient=1):
+ """Solves L21 inverse solver with active set strategy
+
+ Parameters
+ ----------
+ M : array
+ The data
+ G : array
+ The forward operator
+ maxit : int
+ The number of iterations
+ tol : float
+ Tolerance on dual gap for convergence checking
+ verbose : bool
+ Use verbose output
+ active_set_size : int
+ Size of active set increase at each iteration.
+ debias : bool
+ Debias source estimates
+ n_orient : int
+ The number of orientation (1 : fixed or 3 : free or loose).
+
+ Returns
+ -------
+ X: array
+ The source estimates
+ active_set: array
+ The mask of active sources
+ E: array
+ The cost function over the iterations
+ """
+ n_dipoles = G.shape[1]
+ n_positions = n_dipoles // n_orient
+ alpha_max = norm_l2inf(np.dot(G.T, M), n_orient, copy=False)
+ print "-- ALPHA MAX : %s" % alpha_max
+ if active_set_size is not None:
+ n_sensors, n_times = M.shape
+ idx_large_corr = np.argsort(groups_norm2(np.dot(G.T, M), n_orient))
+ active_set = np.zeros(n_positions, dtype=np.bool)
+ active_set[idx_large_corr[-active_set_size:]] = True
+ if n_orient > 1:
+ active_set = np.tile(active_set[:, None], [1, n_orient]).ravel()
+ init = None
+ for k in xrange(maxit):
+ X, as_, E = _mixed_norm_solver(M, G[:, active_set], alpha,
+ maxit=maxit, tol=tol, verbose=False,
+ init=init, n_orient=n_orient)
+ as_ = np.where(active_set)[0][as_]
+ gap, pobj, dobj, R = dgap_l21(M, G, X, as_, alpha, n_orient)
+ print 'gap = %s, pobj = %s' % (gap, pobj)
+ if gap < tol:
+ print 'Convergence reached ! (gap: %s < %s)' % (gap, tol)
+ break
+ else: # add sources
+ idx_large_corr = np.argsort(groups_norm2(np.dot(G.T, R),
+ n_orient))
+ new_active_idx = idx_large_corr[-active_set_size:]
+ if n_orient > 1:
+ new_active_idx = n_orient * new_active_idx[:, None] + \
+ np.arange(n_orient)[None, :]
+ new_active_idx = new_active_idx.ravel()
+ idx_old_active_set = as_
+ active_set_old = active_set.copy()
+ active_set[new_active_idx] = True
+ as_size = np.sum(active_set)
+ print 'active set size %s' % as_size
+ X_init = np.zeros((as_size, n_times), dtype=X.dtype)
+ idx_active_set = np.where(active_set)[0]
+ idx = np.searchsorted(idx_active_set, idx_old_active_set)
+ X_init[idx] = X
+ init = X_init, active_set[active_set == True]
+ if np.all(active_set_old == active_set):
+ print 'Convergence stopped (AS did not change) !'
+ break
+ else:
+ print 'Did NOT converge ! (gap: %s > %s)' % (gap, tol)
+
+ active_set = np.zeros_like(active_set)
+ active_set[as_] = True
+ else:
+ X, active_set, E = _mixed_norm_solver(M, G, alpha, maxit=maxit,
+ tol=tol, verbose=verbose,
+ n_orient=n_orient)
+
+ if (active_set.sum() > 0) and debias:
+ # Run ordinary least square on active set
+ GX = np.dot(G[:, active_set], X)
+ k, _, _, _ = linalg.lstsq(GX.reshape(-1, 1), M.reshape(-1, 1))
+ X *= k
+ GX *= k
+
+ return X, active_set, E
diff --git a/mne/mixed_norm/tests/__init__.py b/mne/mixed_norm/tests/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/mne/mixed_norm/tests/test_inverse.py b/mne/mixed_norm/tests/test_inverse.py
new file mode 100644
index 0000000..d5d53e7
--- /dev/null
+++ b/mne/mixed_norm/tests/test_inverse.py
@@ -0,0 +1,48 @@
+# Author: Alexandre Gramfort <gramfort at nmr.mgh.harvard.edu>
+#
+# License: Simplified BSD
+
+import os.path as op
+from numpy.testing import assert_array_almost_equal
+from nose.tools import assert_true
+
+from ...datasets import sample
+from ...label import read_label
+from ... import fiff, read_cov, read_forward_solution
+from ..inverse import mixed_norm
+
+
+examples_folder = op.join(op.dirname(__file__), '..', '..', '..', 'examples')
+data_path = sample.data_path(examples_folder)
+fname_data = op.join(data_path, 'MEG', 'sample', 'sample_audvis-ave.fif')
+fname_cov = op.join(data_path, 'MEG', 'sample', 'sample_audvis-cov.fif')
+fname_fwd = op.join(data_path, 'MEG', 'sample',
+ 'sample_audvis-meg-oct-6-fwd.fif')
+label = 'Aud-rh'
+fname_label = op.join(data_path, 'MEG', 'sample', 'labels', '%s.label' % label)
+
+evoked = fiff.Evoked(fname_data, setno=1, baseline=(None, 0))
+
+# Read noise covariance matrix
+cov = read_cov(fname_cov)
+
+# Handling average file
+setno = 0
+loose = None
+
+evoked = fiff.read_evoked(fname_data, setno=setno, baseline=(None, 0))
+evoked.crop(tmin=0.08, tmax=0.12)
+
+# Handling forward solution
+forward = read_forward_solution(fname_fwd, force_fixed=True)
+label = read_label(fname_label)
+
+
+def test_MxNE_inverse():
+ """Test MxNE inverse computation"""
+ alpha = 60 # spatial regularization parameter
+ stc = mixed_norm(evoked, forward, cov, alpha, loose=None, depth=0.9,
+ maxit=500, tol=1e-4, active_set_size=10)
+
+ assert_array_almost_equal(stc.times, evoked.times, 5)
+ assert_true(stc.vertno[1][0] in label['vertices'])
diff --git a/mne/mixed_norm/tests/test_optim.py b/mne/mixed_norm/tests/test_optim.py
new file mode 100644
index 0000000..a07fabd
--- /dev/null
+++ b/mne/mixed_norm/tests/test_optim.py
@@ -0,0 +1,36 @@
+# Author: Alexandre Gramfort <gramfort at nmr.mgh.harvard.edu>
+#
+# License: Simplified BSD
+
+import numpy as np
+from numpy.testing import assert_array_equal
+
+from ..optim import mixed_norm_solver
+
+
+def test_l21_MxNE():
+ """Test convergence of MxNE"""
+ n, p, t, alpha = 30, 40, 20, 1
+ rng = np.random.RandomState(0)
+ G = rng.randn(n, p)
+ G /= np.std(G, axis=0)[None, :]
+ X = np.zeros((p, t))
+ X[0] = 3
+ X[4] = -2
+ M = np.dot(G, X)
+ X_hat, active_set, _ = mixed_norm_solver(M,
+ G, alpha, maxit=1000, tol=1e-8, verbose=True,
+ active_set_size=None, debias=True)
+ assert_array_equal(np.where(active_set)[0], [0, 4])
+ X_hat, active_set, _ = mixed_norm_solver(M,
+ G, alpha, maxit=1000, tol=1e-8, verbose=True,
+ active_set_size=1, debias=True)
+ assert_array_equal(np.where(active_set)[0], [0, 4])
+ X_hat, active_set, _ = mixed_norm_solver(M,
+ G, alpha, maxit=1000, tol=1e-8, verbose=True,
+ active_set_size=1, debias=True, n_orient=2)
+ assert_array_equal(np.where(active_set)[0], [0, 1, 4, 5])
+ X_hat, active_set, _ = mixed_norm_solver(M,
+ G, alpha, maxit=1000, tol=1e-8, verbose=True,
+ active_set_size=1, debias=True, n_orient=5)
+ assert_array_equal(np.where(active_set)[0], [0, 1, 2, 3, 4])
--
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/debian-med/python-mne.git
More information about the debian-med-commit
mailing list