[med-svn] [python-mne] 343/376: ENH : new apply_inverse_epochs and refactoring of minimum_norm.inverse + tests
Yaroslav Halchenko
debian at onerussian.com
Fri Nov 27 17:23:19 UTC 2015
This is an automated email from the git hooks/post-receive script.
yoh pushed a commit to annotated tag v0.1
in repository python-mne.
commit 906abbbb7ea38863c62aaff6c3558715a2e12076
Author: Alexandre Gramfort <alexandre.gramfort at inria.fr>
Date: Thu Aug 25 15:12:32 2011 -0400
ENH : new apply_inverse_epochs and refactoring of minimum_norm.inverse + tests
---
.../plot_compute_mne_inverse_epochs_in_label.py | 65 +++++
mne/minimum_norm/__init__.py | 2 +-
mne/minimum_norm/inverse.py | 311 ++++++++++++---------
mne/minimum_norm/tests/test_inverse.py | 61 +++-
4 files changed, 299 insertions(+), 140 deletions(-)
diff --git a/examples/inverse/plot_compute_mne_inverse_epochs_in_label.py b/examples/inverse/plot_compute_mne_inverse_epochs_in_label.py
new file mode 100644
index 0000000..3975282
--- /dev/null
+++ b/examples/inverse/plot_compute_mne_inverse_epochs_in_label.py
@@ -0,0 +1,65 @@
+"""
+==================================================
+Compute MNE-dSPM inverse solution on single epochs
+==================================================
+
+Compute dSPM inverse solution on single trial epochs restricted
+to a brain label.
+
+"""
+
+# Author: Alexandre Gramfort <gramfort at nmr.mgh.harvard.edu>
+#
+# License: BSD (3-clause)
+
+print __doc__
+
+import pylab as pl
+import mne
+from mne.datasets import sample
+from mne.fiff import Raw, pick_types
+from mne.minimum_norm import apply_inverse_epochs, read_inverse_operator
+
+
+data_path = sample.data_path('..')
+fname_inv = data_path + '/MEG/sample/sample_audvis-meg-oct-6-meg-inv.fif'
+fname_raw = data_path + '/MEG/sample/sample_audvis_filt-0-40_raw.fif'
+fname_event = data_path + '/MEG/sample/sample_audvis_filt-0-40_raw-eve.fif'
+label_name = 'Aud-lh'
+fname_label = data_path + '/MEG/sample/labels/%s.label' % label_name
+
+event_id, tmin, tmax = 1, -0.2, 0.5
+snr = 3.0
+lambda2 = 1.0 / snr ** 2
+dSPM = True
+
+# Load data
+inverse_operator = read_inverse_operator(fname_inv)
+label = mne.read_label(fname_label)
+raw = Raw(fname_raw)
+events = mne.read_events(fname_event)
+
+# Set up pick list
+include = []
+exclude = raw.info['bads'] + ['EEG 053'] # bads + 1 more
+
+# pick MEG channels
+picks = pick_types(raw.info, meg=True, eeg=False, stim=False, eog=True,
+ include=include, exclude=exclude)
+# Read epochs
+epochs = mne.Epochs(raw, events, event_id, tmin, tmax, picks=picks,
+ baseline=(None, 0), reject=dict(mag=4e-12, grad=4000e-13,
+ eog=150e-6))
+
+# Compute inverse solution and stcs for each epoch
+stcs = apply_inverse_epochs(epochs, inverse_operator, lambda2, dSPM, label,
+ pick_normal=True)
+
+data = sum(stc.data for stc in stcs) / len(stcs)
+
+###############################################################################
+# View activation time-series
+pl.plot(1e3 * stcs[0].times, data.T)
+pl.xlabel('time (ms)')
+pl.ylabel('dSPM value')
+pl.show()
diff --git a/mne/minimum_norm/__init__.py b/mne/minimum_norm/__init__.py
index 273ffa8..899ec93 100644
--- a/mne/minimum_norm/__init__.py
+++ b/mne/minimum_norm/__init__.py
@@ -1,3 +1,3 @@
from .inverse import read_inverse_operator, apply_inverse, minimum_norm, \
- apply_inverse_raw
+ apply_inverse_raw, apply_inverse_epochs
from .time_frequency import source_induced_power
diff --git a/mne/minimum_norm/inverse.py b/mne/minimum_norm/inverse.py
index 7f36d4d..6930e94 100644
--- a/mne/minimum_norm/inverse.py
+++ b/mne/minimum_norm/inverse.py
@@ -14,7 +14,7 @@ from ..fiff.tag import find_tag
from ..fiff.matrix import _read_named_matrix, _transpose_named_matrix
from ..fiff.proj import read_proj, make_projector
from ..fiff.tree import dir_tree_find
-from ..fiff.pick import pick_channels_evoked, pick_channels
+from ..fiff.pick import pick_channels
from ..cov import read_cov
from ..source_space import read_source_spaces_from_tree, find_source_space_hemi
@@ -131,8 +131,7 @@ def read_inverse_operator(fname):
#
inv['eigen_leads'] = _transpose_named_matrix(inv['eigen_leads'])
inv['eigen_fields'] = _read_named_matrix(fid, invs,
- FIFF.FIFF_MNE_INVERSE_FIELDS)
-
+ FIFF.FIFF_MNE_INVERSE_FIELDS)
print '[done]'
#
# Read the covariance matrices
@@ -271,6 +270,20 @@ def combine_xyz(vec, square=False):
return comb
+def _combine_ori(sol, inverse_operator, pick_normal):
+ if inverse_operator['source_ori'] == FIFF.FIFFV_MNE_FREE_ORI:
+ print 'combining the current components...',
+ if pick_normal:
+ is_loose = 0 < inverse_operator['orient_prior']['data'][0] < 1
+ if not is_loose:
+ raise ValueError('The pick_normal parameter is only valid '
+ 'when working with loose orientations.')
+ sol = sol[2::3] # take one every 3 sources ie. only the normal
+ else:
+ sol = combine_xyz(sol)
+ return sol
+
+
def prepare_inverse_operator(orig, nave, lambda2, dSPM):
"""Prepare an inverse operator for actually computing the inverse
@@ -346,7 +359,8 @@ def prepare_inverse_operator(orig, nave, lambda2, dSPM):
#
# No need to omit the zeroes due to projection
#
- inv['whitener'] = np.diag(1.0 / np.sqrt(inv['noise_cov']['data'].ravel()))
+ inv['whitener'] = np.diag(1.0 /
+ np.sqrt(inv['noise_cov']['data'].ravel()))
print ('\tCreated the whitener using a diagonal noise covariance '
'matrix (%d small eigenvalues discarded)' % ncomp)
@@ -389,6 +403,94 @@ def prepare_inverse_operator(orig, nave, lambda2, dSPM):
return inv
+def _assemble_kernel(inv, label, dSPM):
+ #
+ # Simple matrix multiplication followed by combination of the
+ # three current components
+ #
+ # This does all the data transformations to compute the weights for the
+ # eigenleads
+ #
+ eigen_leads = inv['eigen_leads']['data']
+ source_cov = inv['source_cov']['data'][:, None]
+ noise_norm = inv['noisenorm'][:, None]
+
+ src = inv['src']
+ lh_vertno = src[0]['vertno']
+ rh_vertno = src[1]['vertno']
+
+ if label is not None:
+ lh_vertno, rh_vertno, src_sel = _get_label_sel(label, inv)
+
+ noise_norm = noise_norm[src_sel]
+
+ if inv['source_ori'] == FIFF.FIFFV_MNE_FREE_ORI:
+ src_sel = 3 * src_sel
+ src_sel = np.c_[src_sel, src_sel + 1, src_sel + 2]
+ src_sel = src_sel.ravel()
+
+ eigen_leads = eigen_leads[src_sel]
+ source_cov = source_cov[src_sel]
+
+ trans = inv['reginv'][:, None] * reduce(np.dot,
+ [inv['eigen_fields']['data'],
+ inv['whitener'],
+ inv['proj']])
+ #
+ # Transformation into current distributions by weighting the eigenleads
+ # with the weights computed above
+ #
+ if inv['eigen_leads_weighted']:
+ #
+ # R^0.5 has been already factored in
+ #
+ print '(eigenleads already weighted)...',
+ K = np.dot(eigen_leads, trans)
+ else:
+ #
+ # R^0.5 has to be factored in
+ #
+ print '(eigenleads need to be weighted)...',
+ K = np.sqrt(source_cov) * np.dot(eigen_leads, trans)
+
+ if not dSPM:
+ noise_norm = None
+
+ return K, noise_norm, lh_vertno, rh_vertno
+
+
+def _make_stc(sol, tmin, tstep, lh_vertno, rh_vertno):
+ stc = SourceEstimate(None)
+ stc.data = sol
+ stc.tmin = tmin
+ stc.tstep = tstep
+ stc.lh_vertno = lh_vertno
+ stc.rh_vertno = rh_vertno
+ stc._init_times()
+ return stc
+
+
+def _get_label_sel(label, inv):
+ src = inv['src']
+ lh_vertno = src[0]['vertno']
+ rh_vertno = src[1]['vertno']
+
+ if label['hemi'] == 'lh':
+ vertno_sel = np.intersect1d(lh_vertno, label['vertices'])
+ src_sel = np.searchsorted(lh_vertno, vertno_sel)
+ lh_vertno = vertno_sel
+ rh_vertno = np.array([])
+ elif label['hemi'] == 'rh':
+ vertno_sel = np.intersect1d(rh_vertno, label['vertices'])
+ src_sel = np.searchsorted(rh_vertno, vertno_sel) + len(lh_vertno)
+ lh_vertno = np.array([])
+ rh_vertno = vertno_sel
+ else:
+ raise Exception("Unknown hemisphere type")
+
+ return lh_vertno, rh_vertno, src_sel
+
+
def apply_inverse(evoked, inverse_operator, lambda2, dSPM=True,
pick_normal=False):
"""Apply inverse operator to evoked data
@@ -417,7 +519,6 @@ def apply_inverse(evoked, inverse_operator, lambda2, dSPM=True,
stc: SourceEstimate
The source estimates
"""
-
#
# Set up the inverse according to the parameters
#
@@ -427,63 +528,22 @@ def apply_inverse(evoked, inverse_operator, lambda2, dSPM=True,
#
# Pick the correct channels from the data
#
- evoked = pick_channels_evoked(evoked, inv['noise_cov']['names'])
- print 'Picked %d channels from the data' % evoked.info['nchan']
- print 'Computing inverse...',
- #
- # Simple matrix multiplication followed by combination of the
- # three current components
- #
- # This does all the data transformations to compute the weights for the
- # eigenleads
- #
- trans = inv['reginv'][:, None] * reduce(np.dot,
- [inv['eigen_fields']['data'],
- inv['whitener'],
- inv['proj'],
- evoked.data])
-
- #
- # Transformation into current distributions by weighting the eigenleads
- # with the weights computed above
- #
- if inv['eigen_leads_weighted']:
- #
- # R^0.5 has been already factored in
- #
- print '(eigenleads already weighted)...',
- sol = np.dot(inv['eigen_leads']['data'], trans)
- else:
- #
- # R^0.5 has to be factored in
- #
- print '(eigenleads need to be weighted)...',
- sol = np.sqrt(inv['source_cov']['data'])[:, None] * \
- np.dot(inv['eigen_leads']['data'], trans)
+ sel = pick_channels(evoked.ch_names, include=inv['noise_cov']['names'])
+ print 'Picked %d channels from the data' % len(sel)
- if inv['source_ori'] == FIFF.FIFFV_MNE_FREE_ORI:
- print 'combining the current components...',
- if pick_normal:
- is_loose = 0 < inverse_operator['orient_prior']['data'][0] < 1
- if not is_loose:
- raise ValueError('The pick_normal parameter is only valid '
- 'when working with loose orientations.')
- sol = sol[2::3] # take one every 3 sources ie. only the normal
- else:
- sol = combine_xyz(sol)
+ print 'Computing inverse...',
+ K, noise_norm, _, _ = _assemble_kernel(inv, None, dSPM)
+ sol = np.dot(K, evoked.data[sel]) # apply imaging kernel
+ sol = _combine_ori(sol, inv, pick_normal)
- if dSPM:
+ if noise_norm is not None:
print '(dSPM)...',
- sol *= inv['noisenorm'][:, None]
+ sol *= noise_norm
+ tstep = 1.0 / evoked.info['sfreq']
+ tmin = float(evoked.first) / evoked.info['sfreq']
src = inv['src']
- stc = SourceEstimate(None)
- stc.data = sol
- stc.tmin = float(evoked.first) / evoked.info['sfreq']
- stc.tstep = 1.0 / evoked.info['sfreq']
- stc.lh_vertno = src[0]['vertno']
- stc.rh_vertno = src[1]['vertno']
- stc._init_times()
+ stc = _make_stc(sol, tmin, tstep, src[0]['vertno'], src[1]['vertno'])
print '[done]'
return stc
@@ -501,7 +561,7 @@ def apply_inverse_raw(raw, inverse_operator, lambda2, dSPM=True,
Parameters
----------
raw: Raw object
- Evoked data
+ Raw data
inverse_operator: dict
Inverse operator read with mne.read_inverse_operator
lambda2: float
@@ -529,7 +589,6 @@ def apply_inverse_raw(raw, inverse_operator, lambda2, dSPM=True,
stc: SourceEstimate
The source estimates
"""
-
#
# Set up the inverse according to the parameters
#
@@ -540,13 +599,6 @@ def apply_inverse_raw(raw, inverse_operator, lambda2, dSPM=True,
sel = pick_channels(raw.ch_names, include=inv['noise_cov']['names'])
print 'Picked %d channels from the data' % len(sel)
print 'Computing inverse...',
- #
- # Simple matrix multiplication followed by combination of the
- # three current components
- #
- # This does all the data transformations to compute the weights for the
- # eigenleads
- #
src = inv['src']
lh_vertno = src[0]['vertno']
@@ -557,81 +609,83 @@ def apply_inverse_raw(raw, inverse_operator, lambda2, dSPM=True,
if time_func is not None:
data = time_func(data)
- trans = inv['reginv'][:, None] * reduce(np.dot,
- [inv['eigen_fields']['data'],
- inv['whitener'],
- inv['proj'],
- data])
+ K, noise_norm, lh_vertno, rh_vertno = _assemble_kernel(inv, label, dSPM)
+ sol = np.dot(K, data)
+ sol = _combine_ori(sol, inv, pick_normal)
- eigen_leads = inv['eigen_leads']['data']
- source_cov = inv['source_cov']['data'][:, None]
- noise_norm = inv['noisenorm'][:, None]
+ if noise_norm is not None:
+ sol *= noise_norm
- if label is not None:
- if label['hemi'] == 'lh':
- vertno_sel = np.intersect1d(lh_vertno, label['vertices'])
- src_sel = np.searchsorted(lh_vertno, vertno_sel)
- lh_vertno = vertno_sel
- rh_vertno = np.array([])
- elif label['hemi'] == 'rh':
- vertno_sel = np.intersect1d(rh_vertno, label['vertices'])
- src_sel = np.searchsorted(rh_vertno, vertno_sel) + len(lh_vertno)
- lh_vertno = np.array([])
- rh_vertno = vertno_sel
+ tmin = float(times[0]) / raw.info['sfreq']
+ tstep = 1.0 / raw.info['sfreq']
+ stc = _make_stc(sol, tmin, tstep, lh_vertno, rh_vertno)
+ print '[done]'
- noise_norm = noise_norm[src_sel]
+ return stc
- if inv['source_ori'] == FIFF.FIFFV_MNE_FREE_ORI:
- src_sel = 3 * src_sel
- src_sel = np.c_[src_sel, src_sel + 1, src_sel + 2]
- src_sel = src_sel.ravel()
- eigen_leads = eigen_leads[src_sel]
- source_cov = source_cov[src_sel]
+def apply_inverse_epochs(epochs, inverse_operator, lambda2, dSPM=True,
+ label=None, nave=1, pick_normal=False):
+ """Apply inverse operator to Epochs
+
+ Computes a L2-norm inverse solution on each epochs and returns
+ single trial source estimates.
+
+ Parameters
+ ----------
+ epochs: Epochs object
+ Single trial epochs
+ inverse_operator: dict
+ Inverse operator read with mne.read_inverse_operator
+ lambda2: float
+ The regularization parameter
+ dSPM: bool
+ do dSPM ?
+ label: Label
+ Restricts the source estimates to a given label
+ nave: int
+ Number of averages used to regularize the solution.
+ Set to 1 on single Epoch by default.
+ pick_normal: bool
+ If True, rather than pooling the orientations by taking the norm,
+ only the radial component is kept. This is only implemented
+ when working with loose orientations.
+ Returns
+ -------
+ stc: list of SourceEstimate
+ The source estimates for all epochs
+ """
#
- # Transformation into current distributions by weighting the eigenleads
- # with the weights computed above
+ # Set up the inverse according to the parameters
#
- if inv['eigen_leads_weighted']:
- #
- # R^0.5 has been already factored in
- #
- print '(eigenleads already weighted)...',
- sol = np.dot(eigen_leads, trans)
- else:
- #
- # R^0.5 has to be factored in
- #
- print '(eigenleads need to be weighted)...',
- sol = np.sqrt(source_cov) * np.dot(eigen_leads, trans)
+ inv = prepare_inverse_operator(inverse_operator, nave, lambda2, dSPM)
+ #
+ # Pick the correct channels from the data
+ #
+ sel = pick_channels(epochs.ch_names, include=inv['noise_cov']['names'])
+ print 'Picked %d channels from the data' % len(sel)
- if inv['source_ori'] == FIFF.FIFFV_MNE_FREE_ORI:
- if pick_normal:
- print 'Picking only the normal components...',
- is_loose = 0 < inverse_operator['orient_prior']['data'][0] < 1
- if not is_loose:
- raise ValueError('The pick_normal parameter is only valid '
- 'when working with loose orientations.')
- sol = sol[2::3] # take one every 3 sources ie. only the normal
- else:
- print 'Combining the current components...',
- sol = combine_xyz(sol)
+ print 'Computing inverse...',
+ K, noise_norm, lh_vertno, rh_vertno = _assemble_kernel(inv, label, dSPM)
- if dSPM:
- print '(dSPM)...',
- sol *= noise_norm
+ stcs = list()
+ tstep = 1.0 / epochs.info['sfreq']
+ tmin = epochs.times[0]
+
+ for k, e in enumerate(epochs):
+ print "Processing epoch : %d" % (k + 1)
+ sol = np.dot(K, e[sel]) # apply imaging kernel
+ sol = _combine_ori(sol, inv, pick_normal)
+
+ if noise_norm is not None:
+ sol *= noise_norm
+
+ stcs.append(_make_stc(sol, tmin, tstep, lh_vertno, rh_vertno))
- stc = SourceEstimate(None)
- stc.data = sol
- stc.tmin = float(times[0]) / raw.info['sfreq']
- stc.tstep = 1.0 / raw.info['sfreq']
- stc.lh_vertno = lh_vertno
- stc.rh_vertno = rh_vertno
- stc._init_times()
print '[done]'
- return stc
+ return stcs
def _xyz2lf(Lf_xyz, normals):
@@ -889,7 +943,6 @@ def minimum_norm(evoked, forward, whitener, method='dspm',
print 'Combining the current components...',
sol = combine_xyz(sol)
-
src = forward['src']
stc = SourceEstimate(None)
stc.data = sol
diff --git a/mne/minimum_norm/tests/test_inverse.py b/mne/minimum_norm/tests/test_inverse.py
index 7ad3ef2..d40f10f 100644
--- a/mne/minimum_norm/tests/test_inverse.py
+++ b/mne/minimum_norm/tests/test_inverse.py
@@ -4,8 +4,12 @@ import numpy as np
# from numpy.testing import assert_array_almost_equal, assert_equal
from ...datasets import sample
+from ...label import read_label
+from ...event import read_events
+from ...epochs import Epochs
from ... import fiff, Covariance, read_forward_solution
-from ..inverse import minimum_norm, apply_inverse, read_inverse_operator
+from ..inverse import minimum_norm, apply_inverse, read_inverse_operator, \
+ apply_inverse_raw, apply_inverse_epochs
examples_folder = op.join(op.dirname(__file__), '..', '..', '..', 'examples')
@@ -18,25 +22,62 @@ fname_cov = op.join(data_path, 'MEG', 'sample',
'sample_audvis-cov.fif')
fname_fwd = op.join(data_path, 'MEG', 'sample',
'sample_audvis-meg-eeg-oct-6-fwd.fif')
+fname_raw = op.join(data_path, 'MEG', 'sample',
+ 'sample_audvis_filt-0-40_raw.fif')
+fname_event = op.join(data_path, 'MEG', 'sample',
+ 'sample_audvis_filt-0-40_raw-eve.fif')
+label = 'Aud-lh'
+fname_label = op.join(data_path, 'MEG', 'sample', 'labels', '%s.label' % label)
+inverse_operator = read_inverse_operator(fname_inv)
+label = read_label(fname_label)
+raw = fiff.Raw(fname_raw)
+snr = 3.0
+lambda2 = 1.0 / snr ** 2
+dSPM = True
-def test_apply_mne_inverse_operator():
- """Test MNE inverse computation with precomputed inverse operator
+
+def test_apply_mne_inverse():
+ """Test MNE with precomputed inverse operator on Evoked
"""
setno = 0
- snr = 3.0
- lambda2 = 1.0 / snr ** 2
- dSPM = True
-
- evoked = fiff.Evoked(fname_data, setno=setno, baseline=(None, 0))
- inverse_operator = read_inverse_operator(fname_inv)
-
+ evoked = fiff.Evoked(fname_data, setno=0, baseline=(None, 0))
stc = apply_inverse(evoked, inverse_operator, lambda2, dSPM)
assert np.all(stc.data > 0)
assert np.all(stc.data < 35)
+def test_apply_mne_inverse_raw():
+ """Test MNE with precomputed inverse operator on Raw
+ """
+ stc = apply_inverse_raw(raw, inverse_operator, lambda2, dSPM=True,
+ label=label, start=0, stop=10, nave=1,
+ pick_normal=False)
+ assert np.all(stc.data > 0)
+
+
+def test_apply_mne_inverse_epochs():
+ """Test MNE with precomputed inverse operator on Epochs
+ """
+ event_id, tmin, tmax = 1, -0.2, 0.5
+
+ picks = fiff.pick_types(raw.info, meg=True, eeg=False, stim=True,
+ ecg=True, eog=True, include=['STI 014'])
+ reject = dict(grad=4000e-13, mag=4e-12, eog=150e-6)
+ flat = dict(grad=1e-15, mag=1e-15)
+
+ events = read_events(fname_event)[:3]
+ epochs = Epochs(raw, events, event_id, tmin, tmax, picks=picks,
+ baseline=(None, 0), reject=reject, flat=flat)
+ stcs = apply_inverse_epochs(epochs, inverse_operator, lambda2, dSPM,
+ label=label)
+
+ assert len(stcs) == 1
+ assert np.all(stcs[0].data > 0)
+ assert np.all(stcs[0].data < 42)
+
+
def test_compute_minimum_norm():
"""Test MNE inverse computation starting from forward operator
"""
--
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/debian-med/python-mne.git
More information about the debian-med-commit
mailing list