[med-svn] [python-mne] 161/376: FIX : bug fix in reading of SSP vectors ENH : implementing minimum norm estimate WIP (todo: loose case + tests !)
Yaroslav Halchenko
debian at onerussian.com
Fri Nov 27 17:22:26 UTC 2015
This is an automated email from the git hooks/post-receive script.
yoh pushed a commit to annotated tag v0.1
in repository python-mne.
commit 1454b8f1ca88d8a5365267ed290c0afe041a32ac
Author: Alexandre Gramfort <alexandre.gramfort at inria.fr>
Date: Sun Mar 27 22:29:19 2011 -0400
FIX : bug fix in reading of SSP vectors
ENH : implementing minimum norm estimate WIP (todo: loose case + tests !)
---
examples/plot_minimum_norm_estimate.py | 61 ++++++++
examples/plot_whiten_forward_solution.py | 41 -----
examples/plot_whitened_evoked_data.py | 45 +++++-
mne/__init__.py | 2 +-
mne/cov.py | 257 +++++++++++++++++--------------
mne/fiff/__init__.py | 2 +-
mne/fiff/evoked.py | 3 +
mne/fiff/pick.py | 2 +-
mne/fiff/proj.py | 114 ++++++++------
mne/inverse.py | 235 ++++++++++++++++++++++++++++
mne/tests/test_cov.py | 7 +-
11 files changed, 546 insertions(+), 223 deletions(-)
diff --git a/examples/plot_minimum_norm_estimate.py b/examples/plot_minimum_norm_estimate.py
new file mode 100644
index 0000000..79aef87
--- /dev/null
+++ b/examples/plot_minimum_norm_estimate.py
@@ -0,0 +1,61 @@
+"""
+================================================
+Compute MNE-dSPM inverse solution on evoked data
+================================================
+
+Compute dSPM inverse solution on MNE evoked dataset
+and store the solution in stc files for visualisation.
+
+"""
+
+# Author: Alexandre Gramfort <gramfort at nmr.mgh.harvard.edu>
+#
+# License: BSD (3-clause)
+
+print __doc__
+
+import numpy as np
+import pylab as pl
+import mne
+from mne.datasets import sample
+from mne.fiff import Evoked
+
+data_path = sample.data_path('.')
+fname_fwd = data_path + '/MEG/sample/sample_audvis-meg-eeg-oct-6-fwd.fif'
+fname_cov = data_path + '/MEG/sample/sample_audvis-cov.fif'
+fname_evoked = data_path + '/MEG/sample/sample_audvis-ave.fif'
+
+setno = 0
+snr = 3.0
+lambda2 = 1.0 / snr**2
+dSPM = True
+
+# Load data
+evoked = Evoked(fname_evoked, setno=setno, baseline=(None, 0))
+forward = mne.read_forward_solution(fname_fwd)
+noise_cov = mne.Covariance()
+noise_cov.load(fname_cov)
+
+# Compute inverse solution
+stc, K, W = mne.minimum_norm(evoked, forward, noise_cov, orientation='fixed',
+ method='dspm', snr=3, loose=0.2, pca=True)
+
+# Save result in stc files
+lh_vertices = stc['inv']['src'][0]['vertno']
+rh_vertices = stc['inv']['src'][1]['vertno']
+lh_data = stc['sol'][:len(lh_vertices)]
+rh_data = stc['sol'][-len(rh_vertices):]
+
+mne.write_stc('mne_dSPM_inverse-lh.stc', tmin=stc['tmin'], tstep=stc['tstep'],
+ vertices=lh_vertices, data=lh_data)
+mne.write_stc('mne_dSPM_inverse-rh.stc', tmin=stc['tmin'], tstep=stc['tstep'],
+ vertices=rh_vertices, data=rh_data)
+
+###############################################################################
+# View activation time-series
+times = stc['tmin'] + stc['tstep'] * np.arange(stc['sol'].shape[1])
+pl.close('all')
+pl.plot(1e3*times, stc['sol'][::100,:].T)
+pl.xlabel('time (ms)')
+pl.ylabel('dSPM value')
+pl.show()
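
For reference, a sketch that is not part of the commit: the two stc files written above can be read back with mne.read_stc to check the round trip. This assumes read_stc returns a dict with 'tmin', 'tstep', 'vertices' and 'data' keys, as in mne/stc.py at this point in the history.

    import mne
    lh = mne.read_stc('mne_dSPM_inverse-lh.stc')
    print lh['data'].shape       # (n_lh_vertices, n_times)
    print lh['tmin'], lh['tstep']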
diff --git a/examples/plot_whiten_forward_solution.py b/examples/plot_whiten_forward_solution.py
deleted file mode 100644
index 828240e..0000000
--- a/examples/plot_whiten_forward_solution.py
+++ /dev/null
@@ -1,41 +0,0 @@
-"""
-========================================================
-Whiten a forward operator with a noise covariance matrix
-========================================================
-"""
-# Author: Alexandre Gramfort <gramfort at nmr.mgh.harvard.edu>
-#
-# License: BSD (3-clause)
-
-print __doc__
-
-import mne
-from mne import fiff
-from mne.datasets import sample
-
-data_path = sample.data_path('.')
-fwd_fname = data_path + '/MEG/sample/sample_audvis-meg-eeg-oct-6-fwd.fif'
-ave_fname = data_path + '/MEG/sample/sample_audvis-ave.fif'
-cov_fname = data_path + '/MEG/sample/sample_audvis-cov.fif'
-
-# Reading
-ave = fiff.read_evoked(ave_fname, setno=0, baseline=(None, 0))
-fwd = mne.read_forward_solution(fwd_fname)
-
-cov = mne.Covariance()
-cov.load(cov_fname)
-
-ave_whiten, fwd_whiten, W = cov.whiten_evoked_and_forward(ave, fwd, eps=0.2)
-
-leadfield = fwd_whiten['sol']['data']
-
-print "Leadfield size : %d x %d" % leadfield.shape
-
-###############################################################################
-# Show result
-import pylab as pl
-pl.matshow(leadfield[:306,:500])
-pl.xlabel('sources')
-pl.ylabel('sensors')
-pl.title('Lead field matrix')
-pl.show()
diff --git a/examples/plot_whitened_evoked_data.py b/examples/plot_whitened_evoked_data.py
index 6d105bd..ab7a47e 100644
--- a/examples/plot_whitened_evoked_data.py
+++ b/examples/plot_whitened_evoked_data.py
@@ -10,24 +10,53 @@ Whiten evoked data using a noise covariance matrix
print __doc__
+import numpy as np
import mne
from mne import fiff
-from mne.viz import plot_evoked
from mne.datasets import sample
data_path = sample.data_path('.')
-fname = data_path + '/MEG/sample/sample_audvis-ave.fif'
+raw_fname = data_path + '/MEG/sample/sample_audvis_filt-0-40_raw.fif'
+# raw_fname = data_path + '/MEG/sample/sample_audvis_raw.fif'
cov_fname = data_path + '/MEG/sample/sample_audvis-cov.fif'
-# Reading
-evoked = fiff.read_evoked(fname, setno=0, baseline=(None, 0))
+###############################################################################
+# Set epochs parameters
+event_id = 1
+tmin = -0.2
+tmax = 0.5
+
+###############################################################################
+# Create evoked data
+
+# Setup for reading the raw data
+raw = fiff.Raw(raw_fname)
+events = mne.find_events(raw)
+
+# pick EEG channels - bad channels (modify to your needs)
+exclude = raw.info['bads'] + ['EEG 053'] # bads + 1 more
+picks = fiff.pick_types(raw.info, meg=False, eeg=True, stim=False,
+ exclude=exclude)
+epochs = mne.Epochs(raw, events, event_id,
+ tmin, tmax, picks=picks, baseline=(None, 0))
+evoked = epochs.average() # average epochs and get an Evoked dataset.
+
cov = mne.Covariance()
cov.load(cov_fname)
-evoked_whiten, W = cov.whiten_evoked(evoked)
+# Whiten data
+W, ch_names = cov.whitener(evoked.info, pca=False) # get whitening matrix
+sel = mne.fiff.pick_channels(evoked.ch_names, include=ch_names) # channel indices
+whitened_data = np.dot(W, evoked.data[sel]) # apply whitening
###############################################################################
# Show result
-picks = fiff.pick_types(evoked_whiten.info, meg=True, eeg=True,
- exclude=evoked_whiten.info['bads']) # Pick channels to view
-plot_evoked(evoked_whiten, picks, unit=False) # plot without SI unit of data
+times = 1e3 * epochs.times # in ms
+import pylab as pl
+pl.clf()
+pl.plot(times, whitened_data.T)
+pl.xlim([times[0], times[-1]])
+pl.xlabel('time (ms)')
+pl.ylabel('data (NA)')
+pl.title('Whitened EEG data')
+pl.show()
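
Continuing with the cov, W and ch_names variables from the example above, a rough check of the whitener (a sketch only; it assumes cov.data is ordered like cov.ch_names and ignores the small diagonal regularization added inside whitener): applying W to the matching covariance submatrix should give something close to the identity.

    import numpy as np
    idx = [cov.ch_names.index(name) for name in ch_names]
    C = cov.data[np.ix_(idx, idx)]
    C_white = np.dot(W, np.dot(C, W.T))
    print np.abs(C_white - np.eye(len(C_white))).max()  # small, up to the regularization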
diff --git a/mne/__init__.py b/mne/__init__.py
index a7ecf59..c78f117 100644
--- a/mne/__init__.py
+++ b/mne/__init__.py
@@ -6,7 +6,7 @@ from .forward import read_forward_solution
from .stc import read_stc, write_stc
from .bem_surfaces import read_bem_surfaces
from .source_space import read_source_spaces
-from .inverse import read_inverse_operator, compute_inverse
+from .inverse import read_inverse_operator, compute_inverse, minimum_norm
from .epochs import Epochs
from .label import label_time_courses, read_label
import fiff
diff --git a/mne/cov.py b/mne/cov.py
index 7c9ca10..41683ae 100644
--- a/mne/cov.py
+++ b/mne/cov.py
@@ -16,11 +16,36 @@ from .fiff.channels import _read_bad_channels
from .fiff.write import start_block, end_block, write_int, write_name_list, \
write_double, write_float_matrix, start_file, end_file
-from .fiff.proj import write_proj
+from .fiff.proj import write_proj, make_projector
from .fiff import fiff_open
from .fiff.pick import pick_types, pick_channels_forward
+def rank(A, tol=1e-8):
+ s = linalg.svd(A, compute_uv=0)
+ return np.sum(np.where(s > s[0]*tol, 1, 0))
+
+
+def _get_whitener(A, rnk, pca, ch_type):
+ # whitening operator
+ D, V = linalg.eigh(A, overwrite_a=True)
+ I = np.argsort(D)[::-1]
+ D = D[I]
+ V = V[:, I]
+ D = 1.0 / D
+ if not pca: # No PCA case.
+ print 'Not doing PCA for %s.' % ch_type
+ W = np.sqrt(D)[:, None] * V.T
+ else: # Rey's approach. MNE has been changed to implement this.
+ print 'Setting small %s eigenvalues to zero.' % ch_type
+ D[rnk:] = 0.0
+ W = np.sqrt(D)[:, None] * V.T
+ # This line will reduce the actual number of variables in data
+ # and leadfield to the true rank.
+ W = W[:rnk]
+ return W
+
+
class Covariance(object):
"""Noise covariance matrix"""
@@ -46,11 +71,125 @@ class Covariance(object):
self._cov = cov
self.data = cov['data']
+ self.ch_names = cov['names']
def save(self, fname):
"""save covariance matrix in a FIF file"""
write_cov_file(fname, self._cov)
+ def whitener(self, info, mag_reg=0.1, grad_reg=0.1, eeg_reg=0.1, pca=True):
+ """Compute whitener based on a list of channels
+
+ Parameters
+ ----------
+ info : dict
+ Measurement info of data to apply the whitener.
+ Defines data channels and which are the bad channels
+ to be ignored.
+ mag_reg : float
+ Regularization of the magnetometers.
+ Recommended between 0.05 and 0.2
+ grad_reg : float
+ Regularization of the gradiometers.
+ Recommended between 0.05 and 0.2
+ eeg_reg : float
+ Regularization of the EEG channels.
+ Recommended between 0.05 and 0.2
+ pca : bool
+ If True, whitening is restricted to the space of
+ the data. It makes sense when data have a low rank
+ due to SSP or maxfilter.
+
+ Returns
+ -------
+ W : array
+ Whitening matrix
+ ch_names : list of strings
+ List of channel names on which to apply the whitener.
+ It corresponds to the columns of W.
+ """
+ bads = info['bads']
+ C_idx = [k for k, name in enumerate(self.ch_names)
+ if name in info['ch_names'] and name not in bads]
+ ch_names = [self.ch_names[k] for k in C_idx]
+ C_noise = self.data[np.ix_(C_idx, C_idx)] # take covariance submatrix
+
+ # # Create the projection operator
+ # proj, ncomp, _ = make_projector(info['projs'], ch_names)
+ # if ncomp > 0:
+ # print '\tCreated an SSP operator (subspace dimension = %d)' % ncomp
+ # C_noise = np.dot(proj, np.dot(C_noise, proj.T))
+
+ # Regularize Noise Covariance Matrix.
+ variances = np.diag(C_noise)
+ ind_meg = pick_types(info, meg=True, eeg=False, exclude=bads)
+ names_meg = [info['ch_names'][k] for k in ind_meg]
+ C_ind_meg = [ch_names.index(name) for name in names_meg]
+
+ ind_grad = pick_types(info, meg='grad', eeg=False, exclude=bads)
+ names_grad = [info['ch_names'][k] for k in ind_grad]
+ C_ind_grad = [ch_names.index(name) for name in names_grad]
+
+ ind_mag = pick_types(info, meg='mag', eeg=False, exclude=bads)
+ names_mag = [info['ch_names'][k] for k in ind_mag]
+ C_ind_mag = [ch_names.index(name) for name in names_mag]
+
+ ind_eeg = pick_types(info, meg=False, eeg=True, exclude=bads)
+ names_eeg = [info['ch_names'][k] for k in ind_eeg]
+ C_ind_eeg = [ch_names.index(name) for name in names_eeg]
+
+ has_meg = len(ind_meg) > 0
+ has_eeg = len(ind_eeg) > 0
+
+ if self.kind == 'diagonal':
+ C_noise = np.diag(variances)
+ rnkC_noise = len(variances)
+ print 'Rank of noise covariance is %d' % rnkC_noise
+ else:
+ # estimate noise covariance matrix rank
+ # Loop on all the required data types (MEG MAG, MEG GRAD, EEG)
+
+ if has_meg: # Separate rank of MEG
+ rank_meg = rank(C_noise[C_ind_meg][:, C_ind_meg])
+ print 'Rank of MEG part of noise covariance is %d' % rank_meg
+ if has_eeg: # Separate rank of EEG
+ rank_eeg = rank(C_noise[C_ind_eeg][:, C_ind_eeg])
+ print 'Rank of EEG part of noise covariance is %d' % rank_eeg
+
+ for ind, reg in zip([C_ind_grad, C_ind_mag, C_ind_eeg],
+ [grad_reg, mag_reg, eeg_reg]):
+ if len(ind) > 0:
+ # add constant on diagonal
+ C_noise[ind, ind] += reg * np.mean(variances[ind])
+
+ if has_meg and has_eeg: # Sets cross terms to zero
+ C_noise[np.ix_(C_ind_meg, C_ind_eeg)] = 0.0
+ C_noise[np.ix_(C_ind_eeg, C_ind_meg)] = 0.0
+
+ # whitening operator
+ if has_meg:
+ W_meg = _get_whitener(C_noise[C_ind_meg][:, C_ind_meg], rank_meg,
+ pca, 'MEG')
+
+ if has_eeg:
+ W_eeg = _get_whitener(C_noise[C_ind_eeg][:, C_ind_eeg], rank_eeg,
+ pca, 'EEG')
+
+ if has_meg and not has_eeg: # Only MEG case.
+ W = W_meg
+ elif has_eeg and not has_meg: # Only EEG case.
+ W = W_eeg
+ elif has_eeg and has_meg: # Bimodal MEG and EEG case.
+ # Whitening of MEG and EEG separately, which assumes zero
+ # covariance between MEG and EEG (i.e., a block diagonal noise
+ # covariance). This was recommended by Matti as EEG does not
+ # measure all the signals from the same environmental noise sources
+ # as MEG.
+ W = np.r_[np.c_[W_meg, np.zeros((W_meg.shape[0], W_eeg.shape[1]))],
+ np.c_[np.zeros((W_eeg.shape[0], W_meg.shape[1])), W_eeg]]
+
+ return W, names_meg + names_eeg
+
def estimate_from_raw(self, raw, picks=None, quantum_sec=10):
"""Estimate noise covariance matrix from a raw FIF file
"""
@@ -82,122 +221,6 @@ class Covariance(object):
self.data = cov / n_samples # XXX : check
print '[done]'
- def _regularize(self, data, variances, ch_names, eps):
- """Operates inplace in data
- """
- if len(ch_names) > 0:
- ind = [self._cov['names'].index(name) for name in ch_names]
- reg = eps * np.mean(variances[ind])
- for ii in ind:
- data[ind,ind] += reg
-
- def whiten_evoked(self, evoked, eps=0.2):
- """Whiten an evoked data file
-
- The whitening matrix is estimated and then multiplied to data.
- It makes the additive white noise assumption of MNE
- realistic.
-
- Parameters
- ----------
- evoked : Evoked object
- A evoked data set
- eps : float
- The regularization factor used.
-
- Returns
- -------
- evoked_whiten : Evoked object
- Evoked data set after whitening.
- W : array of shape [n_channels, n_channels]
- The whitening matrix
- """
-
- data = self.data.copy() # will be the regularized covariance
- variances = np.diag(data)
-
- # Add (eps x identity matrix) to magnetometers only.
- # This is based on the mean magnetometer variance like MNE C-code does it.
- mag_ind = pick_types(evoked.info, meg='mag', eeg=False, stim=False)
- mag_names = [evoked.info['chs'][k]['ch_name'] for k in mag_ind]
- self._regularize(data, variances, mag_names, eps)
-
- # Add (eps x identity matrix) to gradiometers only.
- grad_ind = pick_types(evoked.info, meg='grad', eeg=False, stim=False)
- grad_names = [evoked.info['chs'][k]['ch_name'] for k in grad_ind]
- self._regularize(data, variances, grad_names, eps)
-
- # Add (eps x identity matrix) to eeg only.
- eeg_ind = pick_types(evoked.info, meg=False, eeg=True, stim=False)
- eeg_names = [evoked.info['chs'][k]['ch_name'] for k in eeg_ind]
- self._regularize(data, variances, eeg_names, eps)
-
- d, V = linalg.eigh(data) # Compute eigen value decomposition.
-
- # Compute the unique square root inverse, which is a whitening matrix.
- # This matrix can be multiplied with data and leadfield matrix to get
- # whitened inverse solutions.
- d = 1.0 / np.sqrt(d)
- W = d[:,None] * V.T
-
- # Get all channel indices
- n_channels = len(evoked.info['chs'])
- ave_ch_names = [evoked.info['chs'][k]['ch_name']
- for k in range(n_channels)]
- ind = [ave_ch_names.index(name) for name in self._cov['names']]
-
- evoked_whiten = copy.copy(evoked)
- evoked_whiten.data[ind] = np.dot(W, evoked.data[ind])
-
- return evoked_whiten, W
-
- def whiten_evoked_and_forward(self, evoked, fwd, eps=0.2):
- """Whiten an evoked data set and a forward solution
-
- The whitening matrix is estimated and then multiplied to
- forward solution a.k.a. the leadfield matrix.
- It makes the additive white noise assumption of MNE
- realistic.
-
- Parameters
- ----------
- evoked : Evoked object
- A evoked data set
- fwd : forward data
- A forward solution read with mne.read_forward
- eps : float
- The regularization factor used.
-
- Returns
- -------
- ave : Evoked object
- The whitened evoked data set
- fwd : dict
- The whitened forward solution.
- W : array of shape [n_channels, n_channels]
- The whitening matrix
- """
- # handle evoked
- evoked_whiten, W = self.whiten_evoked(evoked, eps=eps)
-
- evoked_ch_names = [ch['ch_name'] for ch in evoked_whiten.info['chs']]
-
- # handle forward (keep channels in covariance matrix)
- fwd_whiten = copy.copy(fwd)
- ind = [fwd_whiten['sol']['row_names'].index(name)
- for name in self._cov['names']]
- fwd_whiten['sol']['data'][ind] = np.dot(W,
- fwd_whiten['sol']['data'][ind])
- fwd_whiten['sol']['row_names'] = [fwd_whiten['sol']['row_names'][k]
- for k in ind]
- fwd_whiten['chs'] = [fwd_whiten['chs'][k] for k in ind]
-
- # keep in forward the channels in the evoked dataset
- fwd_whiten = pick_channels_forward(fwd, include=evoked_ch_names,
- exclude=evoked.info['bads'])
-
- return evoked_whiten, fwd_whiten, W
-
def __repr__(self):
s = "kind : %s" % self.kind
s += ", size : %s x %s" % self.data.shape
diff --git a/mne/fiff/__init__.py b/mne/fiff/__init__.py
index 79c481e..60aef75 100644
--- a/mne/fiff/__init__.py
+++ b/mne/fiff/__init__.py
@@ -10,6 +10,6 @@ from .open import fiff_open
from .evoked import Evoked, read_evoked, write_evoked
from .raw import Raw, read_raw_segment, read_raw_segment_times, \
start_writing_raw, write_raw_buffer, finish_writing_raw
-from .pick import pick_types
+from .pick import pick_types, pick_channels
from .compensator import get_current_comp
diff --git a/mne/fiff/evoked.py b/mne/fiff/evoked.py
index c85692b..9d2871e 100644
--- a/mne/fiff/evoked.py
+++ b/mne/fiff/evoked.py
@@ -396,6 +396,9 @@ class Evoked(object):
s += ", n_channels x n_times : %s x %s" % self.data.shape
return "Evoked (%s)" % s
+ @property
+ def ch_names(self):
+ return self.info['ch_names']
def read_evoked(fname, setno=0, baseline=None):
"""Read an evoked dataset
diff --git a/mne/fiff/pick.py b/mne/fiff/pick.py
index c7df02e..97edd61 100644
--- a/mne/fiff/pick.py
+++ b/mne/fiff/pick.py
@@ -36,7 +36,7 @@ def channel_type(info, idx):
return 'stim'
-def pick_channels(ch_names, include, exclude):
+def pick_channels(ch_names, include, exclude=[]):
"""Pick channels by names
Returns the indices of the good channels in ch_names.
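
With exclude now defaulting to an empty list, pick_channels can be called with include only. A minimal sketch (the channel names are made up):

    from mne.fiff import pick_channels
    print pick_channels(['EEG 001', 'EEG 002', 'EEG 003'], include=['EEG 002'])
    # selects index 1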
diff --git a/mne/fiff/proj.py b/mne/fiff/proj.py
index b3b7685..5483346 100644
--- a/mne/fiff/proj.py
+++ b/mne/fiff/proj.py
@@ -35,7 +35,7 @@ def read_proj(fid, node):
# Locate the projection data
nodes = dir_tree_find(node, FIFF.FIFFB_PROJ)
if len(nodes) == 0:
- return projdata
+ return projdata
tag = find_tag(fid, nodes[0], FIFF.FIFF_NCHAN)
if tag is not None:
@@ -64,19 +64,19 @@ def read_proj(fid, node):
tag = find_tag(fid, item, FIFF.FIFF_PROJ_ITEM_CH_NAME_LIST)
if tag is not None:
- namelist = tag.data;
+ namelist = tag.data
else:
raise ValueError, 'Projection item channel list missing'
- tag = find_tag(fid, item,FIFF.FIFF_PROJ_ITEM_KIND);
+ tag = find_tag(fid, item, FIFF.FIFF_PROJ_ITEM_KIND)
if tag is not None:
- kind = tag.data;
+ kind = tag.data
else:
raise ValueError, 'Projection item kind missing'
tag = find_tag(fid, item, FIFF.FIFF_PROJ_ITEM_NVEC)
if tag is not None:
- nvec = tag.data
+ nvec = int(tag.data)
else:
raise ValueError, 'Number of projection vectors not specified'
@@ -86,20 +86,21 @@ def read_proj(fid, node):
else:
raise ValueError, 'Projection item channel list missing'
- tag = find_tag(fid, item, FIFF.FIFF_PROJ_ITEM_VECTORS);
+ tag = find_tag(fid, item, FIFF.FIFF_PROJ_ITEM_VECTORS)
if tag is not None:
- data = tag.data;
+ data = tag.data
else:
raise ValueError, 'Projection item data missing'
- tag = find_tag(fid, item, FIFF.FIFF_MNE_PROJ_ITEM_ACTIVE);
+ tag = find_tag(fid, item, FIFF.FIFF_MNE_PROJ_ITEM_ACTIVE)
if tag is not None:
- active = tag.data;
+ active = True
else:
- active = False;
+ active = False
if data.shape[1] != len(names):
- raise ValueError, 'Number of channel names does not match the size of data matrix'
+ raise ValueError, ('Number of channel names does not match the '
+ 'size of data matrix')
# Use exactly the same fields in data as in a named matrix
one = Bunch(kind=kind, active=active, desc=desc,
@@ -128,6 +129,7 @@ def read_proj(fid, node):
from .write import write_int, write_float, write_string, write_name_list, \
write_float_matrix, end_block, start_block
+
def write_proj(fid, projs):
"""Write a projection operator to a file.
@@ -156,7 +158,7 @@ def write_proj(fid, projs):
proj['data']['col_names'])
write_float_matrix(fid, FIFF.FIFF_PROJ_ITEM_VECTORS,
proj['data']['data'])
- end_block(fid,FIFF.FIFFB_PROJ_ITEM)
+ end_block(fid, FIFF.FIFFB_PROJ_ITEM)
end_block(fid, FIFF.FIFFB_PROJ)
@@ -164,32 +166,37 @@ def write_proj(fid, projs):
# Utils
def make_projector(projs, ch_names, bads=[]):
- """
- %
- % [proj,nproj,U] = mne_make_projector(projs,ch_names,bads)
- %
- % proj - The projection operator to apply to the data
- % nproj - How many items in the projector
- % U - The orthogonal basis of the projection vectors (optional)
- %
- % Make an SSP operator
- %
- % projs - A set of projection vectors
- % ch_names - A cell array of channel names
- % bads - Bad channels to exclude
- %
+ """Create an SSP operator from SSP projection vectors
+
+ Parameters
+ ----------
+ projs : list
+ List of projection vectors
+ ch_names : list of strings
+ List of channels to include in the projection matrix
+ bads : list of strings
+ Some bad channels to exclude
+
+ Returns
+ -------
+ proj : array of shape [n_channels, n_channels]
+ The projection operator to apply to the data
+ nproj : int
+ How many items in the projector
+ U : array
+ The orthogonal basis of the projection vectors (optional)
"""
nchan = len(ch_names)
- if len(ch_names) == 0:
+ if nchan == 0:
raise ValueError, 'No channel names specified'
- proj = np.eye(nchan, nchan)
- nproj = 0;
- U = [];
+ proj = np.eye(nchan, nchan)
+ nproj = 0
+ U = []
# Check trivial cases first
if projs is None:
- return proj, nproj, U
+ return proj, nproj, U
nactive = 0
nvec = 0
@@ -225,11 +232,11 @@ def make_projector(projs, ch_names, bads=[]):
# If there is something to pick, pick it
if len(sel) > 0:
for v in range(one['data']['nrow']):
- vecs[sel, nvec+v] = one['data']['data'][v,vecsel].T
+ vecs[sel, nvec+v] = one['data']['data'][v, vecsel].T
- # Rescale for more straightforward detection of small singular values
+ # Rescale for better detection of small singular values
for v in range(one['data']['nrow']):
- onesize = sqrt(np.sum(vecs[:,nvec+v] * vecs[:, nvec + v]))
+ onesize = sqrt(np.sum(vecs[:, nvec + v] * vecs[:, nvec + v]))
if onesize > 0:
vecs[:, nvec+v] /= onesize
nonzero += 1
@@ -240,27 +247,36 @@ def make_projector(projs, ch_names, bads=[]):
if nonzero == 0:
return proj, nproj, U
- # Reorthogonalize the vectors
+ # Reorthogonalize the vectors
U, S, V = linalg.svd(vecs[:,:nvec], full_matrices=False)
- # Throw away the linearly dependent guys
- nvec = np.sum((S / S[0]) < 1e-2)
- U = U[:,:nvec]
- # Here is the celebrated result
- proj -= np.dot(U, U.T)
- nproj = nvec
+ # Throw away the linearly dependent guys
+ nproj = np.sum((S / S[0]) > 1e-2)
+ U = U[:,:nproj]
+
+ # Here is the celebrated result
+ proj -= np.dot(U, U.T)
return proj, nproj, U
def make_projector_info(info):
+ """Make an SSP operator using the measurement info
+
+ Calls make_projector on good channels.
+
+ Parameters
+ ----------
+ info : dict
+ Measurement info
+
+ Returns
+ -------
+ proj : array of shape [n_channels, n_channels]
+ The projection operator to apply to the data
+ nproj : int
+ How many items in the projector
"""
- %
- % [proj,nproj] = mne_make_projector_info(info)
- %
- % Make an SSP operator using the meas info
- %
- """
- proj, nproj, _ = make_projector(info['projs'], info['ch_names'], info['bads'])
+ proj, nproj, _ = make_projector(info['projs'], info['ch_names'],
+ info['bads'])
return proj, nproj
-
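
For context on make_projector, a standalone numpy sketch (not code from the module): after orthonormalizing the projection vectors into U, the operator P = I - U U' removes any component of a signal lying in span(U).

    import numpy as np
    from scipy import linalg

    rng = np.random.RandomState(0)
    vecs = rng.randn(10, 2)              # 10 channels, 2 projection vectors
    U, _, _ = linalg.svd(vecs, full_matrices=False)
    P = np.eye(10) - np.dot(U, U.T)
    x = rng.randn(10)
    print np.allclose(np.dot(U.T, np.dot(P, x)), 0)  # projected components are gone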
diff --git a/mne/inverse.py b/mne/inverse.py
index 532fd51..a6681d6 100644
--- a/mne/inverse.py
+++ b/mne/inverse.py
@@ -1,10 +1,12 @@
# Authors: Alexandre Gramfort <gramfort at nmr.mgh.harvard.edu>
# Matti Hamalainen <msh at nmr.mgh.harvard.edu>
+# Rey Ramirez
#
# License: BSD (3-clause)
from math import sqrt
import numpy as np
+from scipy import linalg
from .fiff.constants import FIFF
from .fiff.open import fiff_open
@@ -485,3 +487,236 @@ def compute_inverse(evoked, inverse_operator, lambda2, dSPM=True):
print '[done]'
return res
+
+
+def minimum_norm(evoked, forward, cov, picks=None, method='dspm',
+ orientation='fixed', snr=3, loose=0.2, depth=True,
+ weightexp=0.5, weightlimit=10, magreg=0.1,
+ gradreg=0.1, eegreg=0.1, fMRI=None, fMRIthresh=None,
+ fMRIoff=0.1, pca=True):
+ """Minimum norm estimate (MNE)
+
+ Compute MNE, dSPM and sLORETA on evoked data starting from
+ a forward operator.
+
+ Parameters
+ ----------
+ evoked : Evoked
+ Evoked data to invert
+ forward : dict
+ Forward operator
+ cov : Covariance
+ Noise covariance matrix
+ picks : array-like
+ List of indices of channels to include
+ method : 'wmne' | 'dspm' | 'sloreta'
+ The method to use
+ orientation : 'fixed' | 'free' | 'loose'
+ Type of orientation constraint (default: 'fixed').
+ snr : float
+ Signal-to-noise ratio, defined as in MNE (default: 3).
+ loose : float in [0, 1]
+ Value that weights the source variances of the dipole components
+ defining the tangent space of the cortical surfaces.
+ depth : bool
+ Flag to do depth weighting (default: True).
+ weightexp : float
+ Order of the depth weighting {0 = none, 1 = full normalization} (default: 0.5).
+ weightlimit : float
+ Maximal amount of depth weighting (default: 10).
+ magreg : float
+ Amount of regularization of the magnetometer noise covariance matrix
+ gradreg : float
+ Amount of regularization of the gradiometer noise covariance matrix.
+ eegreg : float
+ Amount of regularization of the EEG noise covariance matrix.
+ fMRI : array of shape [n_sources]
+ Vector of fMRI values at the source points.
+ fMRIthresh : float
+ fMRI threshold. The source variances of source points with fMRI smaller
+ than fMRIthresh will be multiplied by fMRIoff.
+ fMRIoff : float
+ Weight assigned to non-active source points according to fMRI and fMRIthresh.
+
+ Returns
+ -------
+ stc : dict
+ Source time courses
+ """
+
+ # %% ===== CHECK FOR INVALID VALUES =====
+ # if OPTIONS.diagnoise
+ # OPTIONS.pca=0; % Rey added this. If OPTIONS.diagnoise is 1, then OPTIONS.pca=0; 3/23/11
+ # display('wMNE> If using diagonal noise covariance, PCA option should be off. Setting PCA option off.')
+ # end
+
+ assert method in ['wmne', 'dspm', 'sloreta']
+ assert orientation in ['fixed', 'free', 'loose']
+
+ if not 0 <= loose <= 1:
+ raise ValueError('loose value should be between 0 and 1')
+ if not 0 <= weightexp <= 1:
+ raise ValueError('weightexp should be a scalar between 0 and 1')
+ if not 0 <= gradreg <= 1:
+ raise ValueError('gradreg should be a scalar between 0 and 1')
+ if not 0 <= magreg <= 1:
+ raise ValueError('magreg should be a scalar between 0 and 1')
+ if not 0 <= eegreg <= 1:
+ raise ValueError('eegreg should be a scalar between 0 and 1')
+
+ # Set regularization parameter based on SNR
+ lambda2 = 1.0 / snr**2
+
+ normals = []
+ for s in forward['src']:
+ normals.append(s['nn'][s['inuse'] != 0])
+ normals = np.concatenate(normals)
+
+ W, ch_names = cov.whitener(evoked.info, magreg, gradreg, eegreg, pca)
+
+ gain = forward['sol']['data']
+ fwd_ch_names = [forward['chs'][k]['ch_name'] for k in range(gain.shape[0])]
+ fwd_idx = [fwd_ch_names.index(name) for name in ch_names]
+ gain = gain[fwd_idx]
+
+ print "Computing inverse solution with %d channels." % len(ch_names)
+
+ rank_noise = len(W)
+ print 'Total rank is %d' % rank_noise
+
+ # processing lead field matrices, weights, areas & orientations
+ # Initializing.
+ n_positions = gain.shape[1] / 3
+
+ if orientation == 'fixed':
+ n_dip_per_pos = 1
+ elif orientation in ['free', 'loose']:
+ n_dip_per_pos = 3
+
+ n_dipoles = n_positions * n_dip_per_pos
+
+ w = np.ones(n_dipoles)
+
+ # compute power
+ if depth:
+ w = np.sum(gain**2, axis=0)
+ w = w.reshape(-1, 3).sum(axis=1)
+ w = w[:,None] * np.ones((1, n_dip_per_pos))
+ w = w.ravel()
+
+ if orientation == 'fixed':
+ print 'Applying fixed dipole orientations.'
+ gain = gain * _block_diag(normals.ravel()[None,:], 3).T
+ elif orientation == 'free':
+ print 'Using free dipole orientations. No constraints.'
+ elif orientation == 'loose':
+ print 'Transforming lead field matrix to cortical coordinate system.'
+ 1/0  # XXX loose orientation case not implemented yet
+ # gain, Q_Cortex = bst_xyz2lf(gain, normals) # XXX
+ # # Getting indices for tangential dipoles.
+ # itangentialtmp = start:endd;
+ # itangentialtmp(1:3:end) = [];
+ # itangential = [itangential itangentialtmp]; %#ok<AGROW>
+
+ # Whiten lead field.
+ print 'Whitening lead field matrix.'
+ gain = np.dot(W, gain)
+
+ # Computing reciprocal of power.
+ w = 1.0 / w
+
+ # apply areas
+ # if ~isempty(areas)
+ # display('wMNE> Applying areas to compute current source density.')
+ # areas = areas.^2;
+ # w = w .* areas;
+ # end
+ # clear areas
+
+ # apply depth weighthing
+ if depth:
+ # apply weight limit
+ # Applying weight limit.
+ print 'Applying weight limit.'
+ weightlimit2 = weightlimit**2
+ # limit = min(w(w>min(w) * weightlimit2)); % This is the Matti way.
+ # we do the Rey way (robust to possible weight discontinuity).
+ limit = np.min(w) * weightlimit2
+ w[w > limit] = limit
+
+ # apply weight exponent
+ # Applying weight exponent.
+ print 'Applying weight exponent.'
+ w = w ** weightexp
+
+ # apply loose orientations
+ if orientation == 'loose':
+ print 'Applying loose dipole orientations. Loose value of %s.' % loose
+ w[itangential] *= loose
+
+ # Apply fMRI Priors
+ if fMRI is not None:
+ print 'Applying fMRI priors.'
+ w[fMRI < fMRIthresh] *= fMRIoff
+
+ # Adjusting Source Covariance matrix to make trace of L*C_J*L' equal
+ # to number of sensors.
+ print 'Adjusting source covariance matrix.'
+ source_std = np.sqrt(w) # sqrt(C_J)
+ trclcl = linalg.norm(gain * source_std[None,:], ord='fro')
+ source_std *= sqrt(rank_noise) / trclcl # correct C_J
+ gain *= source_std[None,:]
+
+ # Compute SVD.
+ print 'Computing SVD of whitened and weighted lead field matrix.'
+ U, s, Vh = linalg.svd(gain, full_matrices=False)
+ ss = s / (s**2 + lambda2)
+
+ # Compute whitened MNE operator.
+ Kernel = source_std[:,None] * np.dot(Vh.T, ss[:,None] * U.T)
+
+ # Compute dSPM operator.
+ if method == 'dspm':
+ print 'Computing dSPM inverse operator.'
+ dspm_diag = np.sum(Kernel**2, axis=1)
+ if n_dip_per_pos == 1:
+ dspm_diag = np.sqrt(dspm_diag)
+ elif n_dip_per_pos in [2, 3]:
+ dspm_diag = dspm_diag.reshape(-1, n_dip_per_pos)
+ dspm_diag = np.sqrt(np.sum(dspm_diag, axis=1))
+ dspm_diag = (np.ones((1, n_dip_per_pos)) * dspm_diag[:,None]).ravel()
+
+ Kernel /= dspm_diag[:,None]
+
+ # whitened sLORETA imaging kernel
+ elif method == 'sloreta':
+ print 'Computing sLORETA inverse operator.'
+ if n_dip_per_pos == 1:
+ sloreta_diag = np.sqrt(np.sum(Kernel * gain.T, axis=1))
+ Kernel /= sloreta_diag[:,None]
+ elif n_dip_per_pos in [2, 3]:
+ for k in range(n_positions):
+ start = k*n_dip_per_pos
+ stop = (k+1)*n_dip_per_pos
+ R = np.dot(Kernel[start:stop,:], gain[:,start:stop])
+ SIR = linalg.matfuncs.sqrtm(R, linalg.pinv(R))
+ Kernel[start:stop] = np.dot(SIR, Kernel[start:stop])
+
+ # Multiply inverse operator by whitening matrix, so no need to whiten data
+ Kernel = np.dot(Kernel, W)
+ sel = [evoked.ch_names.index(name) for name in ch_names]
+ sol = np.dot(Kernel, evoked.data[sel])
+
+ stc = dict()
+ stc['inv'] = dict()
+ stc['inv']['src'] = forward['src']
+ # stc['vertices'] = np.concatenate(forward['src'][0]['vertno'],
+ # forward['src'][1]['vertno'])
+ stc['sol'] = sol
+ stc['tmin'] = float(evoked.first) / evoked.info['sfreq']
+ stc['tstep'] = 1.0 / evoked.info['sfreq']
+ print '[done]'
+
+ return stc, Kernel, W
+
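
Numerical sanity check of the SVD form used in minimum_norm (a toy sketch with a random gain matrix, not sample data): with lambda2 = 1 / snr**2, the kernel V * diag(s / (s**2 + lambda2)) * U' equals the regularized inverse G' * (G G' + lambda2 * I)^-1 of the whitened, weighted gain G.

    import numpy as np
    from scipy import linalg

    rng = np.random.RandomState(42)
    G = rng.randn(20, 50)                 # toy whitened gain: 20 sensors, 50 sources
    lambda2 = 1.0 / 3.0 ** 2              # snr = 3, as in the example above
    U, s, Vh = linalg.svd(G, full_matrices=False)
    K_svd = np.dot(Vh.T, (s / (s ** 2 + lambda2))[:, None] * U.T)
    K_direct = np.dot(G.T, linalg.inv(np.dot(G, G.T) + lambda2 * np.eye(20)))
    print np.allclose(K_svd, K_direct)    # True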
diff --git a/mne/tests/test_cov.py b/mne/tests/test_cov.py
index b25ab50..08cf415 100644
--- a/mne/tests/test_cov.py
+++ b/mne/tests/test_cov.py
@@ -53,19 +53,16 @@ def test_whitening_cov():
"""Whitening of evoked data and leadfields
"""
data_path = sample.data_path('.')
- fwd_fname = op.join(data_path, 'MEG', 'sample',
- 'sample_audvis-meg-eeg-oct-6-fwd.fif')
ave_fname = op.join(data_path, 'MEG', 'sample',
'sample_audvis-ave.fif')
cov_fname = op.join(data_path, 'MEG', 'sample',
'sample_audvis-cov.fif')
# Reading
- ave = read_evoked(ave_fname, setno=0, baseline=(None, 0))
- fwd = mne.read_forward_solution(fwd_fname)
+ evoked = read_evoked(ave_fname, setno=0, baseline=(None, 0))
cov = mne.Covariance()
cov.load(cov_fname)
+ cov.whitener(evoked.info)
- ave_whiten, fwd_whiten, W = cov.whiten_evoked_and_forward(ave, fwd)
# XXX : test something
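
One possible direction for the XXX above (a sketch only, not added to the test, continuing with the cov and evoked objects loaded there): assert at least the whitener's shapes, since a full whitening check depends on the default regularization and the MEG/EEG cross terms.

    W, ch_names = cov.whitener(evoked.info)
    assert W.shape[1] == len(ch_names)
    assert W.shape[0] <= W.shape[1]   # rows reduced to the noise rank when pca=True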
--
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/debian-med/python-mne.git