[med-svn] [python-mne] 118/376: new class Evoked to store an evoked dataset
Yaroslav Halchenko
debian at onerussian.com
Fri Nov 27 17:22:19 UTC 2015
This is an automated email from the git hooks/post-receive script.
yoh pushed a commit to annotated tag v0.1
in repository python-mne.
commit affa3df9c0110cb3b1ed34f247dc0c202d615361
Author: Alexandre Gramfort <alexandre.gramfort at inria.fr>
Date: Tue Mar 8 17:47:32 2011 -0500
new class Evoked to store an evoked dataset
---
examples/plot_read_evoked.py | 15 +-
mne/fiff/__init__.py | 2 +-
mne/fiff/evoked.py | 718 ++++++++++++++++++++++--------------------
mne/fiff/pick.py | 10 +-
mne/fiff/tests/test_evoked.py | 25 +-
mne/inverse.py | 16 +-
6 files changed, 415 insertions(+), 371 deletions(-)
diff --git a/examples/plot_read_evoked.py b/examples/plot_read_evoked.py
index 3a58b33..7227732 100644
--- a/examples/plot_read_evoked.py
+++ b/examples/plot_read_evoked.py
@@ -17,29 +17,30 @@ data_path = sample.data_path('.')
fname = data_path + '/MEG/sample/sample_audvis-ave.fif'
# Reading
-data = fiff.read_evoked(fname, setno=0, baseline=(None, 0))
+evoked = fiff.Evoked(fname, setno=0, baseline=(None, 0))
+times = 1e3*evoked.times # times in ms
+data = evoked.data
+
+evoked.save('test-ave.fif')
###############################################################################
# Show result
import pylab as pl
pl.clf()
pl.subplot(3, 1, 1)
-pl.plot(1000*data['evoked']['times'],
- 1e13*data['evoked']['epochs'][0:306:3,:].T)
+pl.plot(times, 1e13*data[0:306:3,:].T)
pl.ylim([-200, 200])
pl.title('Planar Gradiometers 1')
pl.xlabel('time (ms)')
pl.ylabel('MEG data (fT/cm)')
pl.subplot(3, 1, 2)
-pl.plot(1000*data['evoked']['times'],
- 1e13*data['evoked']['epochs'][1:306:3,:].T)
+pl.plot(times, 1e13*data[1:306:3,:].T)
pl.ylim([-200, 200])
pl.title('Planar Gradiometers 2')
pl.xlabel('time (ms)')
pl.ylabel('MEG data (fT/cm)')
pl.subplot(3, 1, 3)
-pl.plot(1000*data['evoked']['times'],
- 1e15*data['evoked']['epochs'][2:306:3,:].T)
+pl.plot(times, 1e15*data[2:306:3,:].T)
pl.ylim([-600, 600])
pl.title('Magnetometers')
pl.xlabel('time (ms)')
diff --git a/mne/fiff/__init__.py b/mne/fiff/__init__.py
index 08096c5..79c481e 100644
--- a/mne/fiff/__init__.py
+++ b/mne/fiff/__init__.py
@@ -7,7 +7,7 @@ __version__ = '0.1.git'
from .constants import FIFF
from .open import fiff_open
-from .evoked import read_evoked, write_evoked
+from .evoked import Evoked, read_evoked, write_evoked
from .raw import Raw, read_raw_segment, read_raw_segment_times, \
start_writing_raw, write_raw_buffer, finish_writing_raw
from .pick import pick_types
diff --git a/mne/fiff/evoked.py b/mne/fiff/evoked.py
index dcfc879..abb6f2b 100644
--- a/mne/fiff/evoked.py
+++ b/mne/fiff/evoked.py
@@ -11,6 +11,385 @@ from .tag import read_tag
from .tree import dir_tree_find
from .meas_info import read_meas_info
+from .tree import copy_tree
+from .write import start_file, start_block, end_file, end_block, write_id, \
+ write_float, write_int, write_coord_trans, write_ch_info, \
+ write_dig_point, write_name_list, write_string, \
+ write_float_matrix
+from .proj import write_proj
+from .ctf import write_ctf_comp
+
+
+class Evoked(object):
+ """Evoked data
+
+ Attributes
+ ----------
+ fname :
+
+ nave : int
+ Number of averaged epochs
+
+ aspect_kind :
+ aspect_kind
+
+ first : int
+ First time sample
+
+ last : int
+ Last time sample
+
+ comment : string
+ Comment on dataset. Can be the condition.
+
+ times : array
+ Array of time instants in seconds
+
+ data : 2D array of shape [n_channels x n_times]
+ Evoked response.
+ """
+ def __init__(self, fname, setno=0, baseline=None):
+ """
+ Parameters
+ ----------
+ fname : string
+ Name of evoked/average FIF file
+
+ setno : int
+ Dataset ID number
+ """
+ self.fname = fname
+ self.fname = setno
+
+ if setno < 0:
+ raise ValueError, 'Data set selector must be positive'
+
+ print 'Reading %s ...' % fname
+ fid, tree, _ = fiff_open(fname)
+
+ # Read the measurement info
+ info, meas = read_meas_info(fid, tree)
+ info['filename'] = fname
+
+ # Locate the data of interest
+ processed = dir_tree_find(meas, FIFF.FIFFB_PROCESSED_DATA)
+ if len(processed) == 0:
+ fid.close()
+ raise ValueError, 'Could not find processed data'
+
+ evoked_node = dir_tree_find(meas, FIFF.FIFFB_EVOKED)
+ if len(evoked_node) == 0:
+ fid.close(fid)
+ raise ValueError, 'Could not find evoked data'
+
+ # Identify the aspects
+ naspect = 0
+ is_smsh = None
+ sets = []
+ for k in range(len(evoked_node)):
+ aspects_k = dir_tree_find(evoked_node[k], FIFF.FIFFB_ASPECT)
+ set_k = dict(aspects=aspects_k,
+ naspect=len(aspects_k))
+ sets.append(set_k)
+
+ if sets[k]['naspect'] > 0:
+ if is_smsh is None:
+ is_smsh = np.zeros((1, sets[k]['naspect']))
+ else:
+ is_smsh = np.r_[is_smsh, np.zeros((1, sets[k]['naspect']))]
+ naspect += sets[k]['naspect']
+
+ saspects = dir_tree_find(evoked_node[k], FIFF.FIFFB_SMSH_ASPECT)
+ nsaspects = len(saspects)
+ if nsaspects > 0:
+ sets[k]['naspect'] += nsaspects
+ sets[k]['naspect'] = [sets[k]['naspect'], saspects] # XXX : potential bug
+ is_smsh = np.r_[is_smsh, np.ones(1, sets[k]['naspect'])]
+ naspect += nsaspects
+
+ print '\t%d evoked data sets containing a total of %d data aspects' \
+ ' in %s' % (len(evoked_node), naspect, fname)
+
+ if setno >= naspect or setno < 0:
+ fid.close()
+ raise ValueError, 'Data set selector out of range'
+
+ # Next locate the evoked data set
+ #
+ p = 0
+ goon = True
+ for k in range(len(evoked_node)):
+ for a in range(sets[k]['naspect']):
+ if p == setno:
+ my_evoked = evoked_node[k]
+ my_aspect = sets[k]['aspects'][a]
+ goon = False
+ break
+ p += 1
+ if not goon:
+ break
+ else:
+ # The desired data should have been found but better to check
+ fid.close(fid)
+ raise ValueError, 'Desired data set not found'
+
+ # Now find the data in the evoked block
+ nchan = 0
+ sfreq = -1
+ q = 0
+ chs = []
+ comment = None
+ for k in range(my_evoked.nent):
+ kind = my_evoked.directory[k].kind
+ pos = my_evoked.directory[k].pos
+ if kind == FIFF.FIFF_COMMENT:
+ tag = read_tag(fid, pos)
+ comment = tag.data
+ elif kind == FIFF.FIFF_FIRST_SAMPLE:
+ tag = read_tag(fid, pos)
+ first = tag.data
+ elif kind == FIFF.FIFF_LAST_SAMPLE:
+ tag = read_tag(fid, pos)
+ last = tag.data
+ elif kind == FIFF.FIFF_NCHAN:
+ tag = read_tag(fid, pos)
+ nchan = tag.data
+ elif kind == FIFF.FIFF_SFREQ:
+ tag = read_tag(fid, pos)
+ sfreq = tag.data
+ elif kind ==FIFF.FIFF_CH_INFO:
+ tag = read_tag(fid, pos)
+ chs.append(tag.data)
+ q += 1
+
+ if comment is None:
+ comment = 'No comment'
+
+ # Local channel information?
+ if nchan > 0:
+ if chs is None:
+ fid.close()
+ raise ValueError, 'Local channel information was not found ' \
+ 'when it was expected.'
+
+ if len(chs) != nchan:
+ fid.close()
+ raise ValueError, 'Number of channels and number of channel ' \
+ 'definitions are different'
+
+ info.chs = chs
+ info.nchan = nchan
+ print '\tFound channel information in evoked data. nchan = %d' % nchan
+ if sfreq > 0:
+ info['sfreq'] = sfreq
+
+ nsamp = last - first + 1
+ print '\tFound the data of interest:'
+ print '\t\tt = %10.2f ... %10.2f ms (%s)' % (
+ 1000*first/info['sfreq'], 1000*last/info['sfreq'], comment)
+ if info['comps'] is not None:
+ print '\t\t%d CTF compensation matrices available' % len(info['comps'])
+
+ # Read the data in the aspect block
+ nave = 1
+ epoch = []
+ for k in range(my_aspect.nent):
+ kind = my_aspect.directory[k].kind
+ pos = my_aspect.directory[k].pos
+ if kind == FIFF.FIFF_COMMENT:
+ tag = read_tag(fid, pos)
+ comment = tag.data
+ elif kind == FIFF.FIFF_ASPECT_KIND:
+ tag = read_tag(fid, pos)
+ aspect_kind = tag.data
+ elif kind == FIFF.FIFF_NAVE:
+ tag = read_tag(fid, pos)
+ nave = tag.data
+ elif kind == FIFF.FIFF_EPOCH:
+ tag = read_tag(fid, pos)
+ epoch.append(tag)
+
+ print '\t\tnave = %d aspect type = %d' % (nave, aspect_kind)
+
+ nepoch = len(epoch)
+ if nepoch != 1 and nepoch != info.nchan:
+ fid.close()
+ raise ValueError, 'Number of epoch tags is unreasonable '\
+ '(nepoch = %d nchan = %d)' % (nepoch, info.nchan)
+
+ if nepoch == 1:
+ # Only one epoch
+ all_data = epoch[0].data
+ # May need a transpose if the number of channels is one
+ if all_data.shape[1] == 1 and info.nchan == 1:
+ all_data = all_data.T
+
+ else:
+ # Put the old style epochs together
+ all_data = epoch[0].data.T
+ for k in range(1, nepoch):
+ all_data = np.r_[all_data, epoch[k].data.T]
+
+ if all_data.shape[1] != nsamp:
+ fid.close()
+ raise ValueError, 'Incorrect number of samples (%d instead of %d)' % (
+ all_data.shape[1], nsamp)
+
+ # Calibrate
+ cals = np.array([info['chs'][k].cal for k in range(info['nchan'])])
+ all_data = np.dot(np.diag(cals.ravel()), all_data)
+
+ times = np.arange(first, last+1, dtype=np.float) / info['sfreq']
+
+ # Run baseline correction
+ if baseline is not None:
+ print "Applying baseline correction ..."
+ bmin = baseline[0]
+ bmax = baseline[1]
+ if bmin is None:
+ imin = 0
+ else:
+ imin = int(np.where(times >= bmin)[0][0])
+ if bmax is None:
+ imax = len(times)
+ else:
+ imax = int(np.where(times <= bmax)[0][-1]) + 1
+ all_data -= np.mean(all_data[:, imin:imax], axis=1)[:,None]
+ else:
+ print "No baseline correction applied..."
+
+ # Put it all together
+ self.info = info
+ self.nave = nave
+ self.aspect_kind = aspect_kind
+ self.first = first
+ self.last = last
+ self.comment = comment
+ self.times = times
+ self.data = all_data
+
+ fid.close()
+
+ def save(self, fname):
+ """Save dataset to file.
+
+ Parameters
+ ----------
+ fname : string
+ Name of the file where to save the data.
+ """
+
+ # Create the file and save the essentials
+ fid = start_file(fname)
+ start_block(fid, FIFF.FIFFB_MEAS)
+ write_id(fid, FIFF.FIFF_BLOCK_ID)
+ if self.info['meas_id'] is not None:
+ write_id(fid, FIFF.FIFF_PARENT_BLOCK_ID, self.info['meas_id'])
+
+ # Measurement info
+ start_block(fid, FIFF.FIFFB_MEAS_INFO)
+
+ # Blocks from the original
+ blocks = [FIFF.FIFFB_SUBJECT, FIFF.FIFFB_HPI_MEAS, FIFF.FIFFB_HPI_RESULT,
+ FIFF.FIFFB_ISOTRAK, FIFF.FIFFB_PROCESSING_HISTORY]
+
+ have_hpi_result = False
+ have_isotrak = False
+ if len(blocks) > 0 and 'filename' in self.info and \
+ self.info['filename'] is not None:
+ fid2, tree, _ = fiff_open(self.info['filename'])
+ for block in blocks:
+ nodes = dir_tree_find(tree, block)
+ copy_tree(fid2, tree['id'], nodes, fid)
+ if block == FIFF.FIFFB_HPI_RESULT and len(nodes) > 0:
+ have_hpi_result = True
+ if block == FIFF.FIFFB_ISOTRAK and len(nodes) > 0:
+ have_isotrak = True
+ fid2.close()
+
+ # General
+ write_float(fid, FIFF.FIFF_SFREQ, self.info['sfreq'])
+ write_float(fid, FIFF.FIFF_HIGHPASS, self.info['highpass'])
+ write_float(fid, FIFF.FIFF_LOWPASS, self.info['lowpass'])
+ write_int(fid, FIFF.FIFF_NCHAN, self.info['nchan'])
+ if self.info['meas_date'] is not None:
+ write_int(fid, FIFF.FIFF_MEAS_DATE, self.info['meas_date'])
+
+ # Coordinate transformations if the HPI result block was not there
+ if not have_hpi_result:
+ if self.info['dev_head_t'] is not None:
+ write_coord_trans(fid, self.info['dev_head_t'])
+
+ if self.info['ctf_head_t'] is not None:
+ write_coord_trans(fid, self.info['ctf_head_t'])
+
+ # Channel information
+ for k in range(self.info['nchan']):
+ # Scan numbers may have been messed up
+ self.info['chs'][k]['scanno'] = k
+ write_ch_info(fid, self.info['chs'][k])
+
+ # Polhemus data
+ if self.info['dig'] is not None and not have_isotrak:
+ start_block(fid, FIFF.FIFFB_ISOTRAK)
+ for d in self.info['dig']:
+ write_dig_point(fid, d)
+
+ end_block(fid, FIFF.FIFFB_ISOTRAK)
+
+ # Projectors
+ write_proj(fid, self.info['projs'])
+
+ # CTF compensation info
+ write_ctf_comp(fid, self.info['comps'])
+
+ # Bad channels
+ if len(self.info['bads']) > 0:
+ start_block(fid, FIFF.FIFFB_MNE_BAD_CHANNELS)
+ write_name_list(fid, FIFF.FIFF_MNE_CH_NAME_LIST, self.info['bads'])
+ end_block(fid, FIFF.FIFFB_MNE_BAD_CHANNELS)
+
+ end_block(fid, FIFF.FIFFB_MEAS_INFO)
+
+ # One or more evoked data sets
+ start_block(fid, FIFF.FIFFB_PROCESSED_DATA)
+ data_evoked = self.data
+ if not isinstance(data_evoked, list):
+ data_evoked = [data_evoked]
+
+ for evoked in data_evoked:
+ start_block(fid, FIFF.FIFFB_EVOKED)
+
+ # Comment is optional
+ if len(self.comment) > 0:
+ write_string(fid, FIFF.FIFF_COMMENT, self.comment)
+
+ # First and last sample
+ write_int(fid, FIFF.FIFF_FIRST_SAMPLE, self.first)
+ write_int(fid, FIFF.FIFF_LAST_SAMPLE, self.last)
+
+ # The epoch itself
+ start_block(fid, FIFF.FIFFB_ASPECT)
+
+ write_int(fid, FIFF.FIFF_ASPECT_KIND, self.aspect_kind)
+ write_int(fid, FIFF.FIFF_NAVE, self.nave)
+
+ decal = np.zeros((self.info['nchan'], self.info['nchan']))
+ for k in range(self.info['nchan']): # XXX : can be improved
+ decal[k, k] = 1.0 / self.info['chs'][k]['cal']
+
+ write_float_matrix(fid, FIFF.FIFF_EPOCH,
+ np.dot(decal, evoked))
+ end_block(fid, FIFF.FIFFB_ASPECT)
+ end_block(fid, FIFF.FIFFB_EVOKED)
+
+ end_block(fid, FIFF.FIFFB_PROCESSED_DATA)
+
+ end_block(fid, FIFF.FIFFB_MEAS)
+
+ end_file(fid)
+
+
def read_evoked(fname, setno=0, baseline=None):
"""Read an evoked dataset
@@ -37,230 +416,11 @@ def read_evoked(fname, setno=0, baseline=None):
-------
data: dict
The evoked dataset
-
"""
- if setno < 0:
- raise ValueError, 'Data set selector must be positive'
-
- print 'Reading %s ...' % fname
- fid, tree, _ = fiff_open(fname)
-
- # Read the measurement info
- info, meas = read_meas_info(fid, tree)
- info['filename'] = fname
-
- # Locate the data of interest
- processed = dir_tree_find(meas, FIFF.FIFFB_PROCESSED_DATA)
- if len(processed) == 0:
- fid.close()
- raise ValueError, 'Could not find processed data'
-
- evoked = dir_tree_find(meas, FIFF.FIFFB_EVOKED)
- if len(evoked) == 0:
- fid.close(fid)
- raise ValueError, 'Could not find evoked data'
-
- # Identify the aspects
- naspect = 0
- is_smsh = None
- sets = []
- for k in range(len(evoked)):
- aspects_k = dir_tree_find(evoked[k], FIFF.FIFFB_ASPECT)
- set_k = dict(aspects=aspects_k,
- naspect=len(aspects_k))
- sets.append(set_k)
-
- if sets[k]['naspect'] > 0:
- if is_smsh is None:
- is_smsh = np.zeros((1, sets[k]['naspect']))
- else:
- is_smsh = np.r_[is_smsh, np.zeros((1, sets[k]['naspect']))]
- naspect += sets[k]['naspect']
-
- saspects = dir_tree_find(evoked[k], FIFF.FIFFB_SMSH_ASPECT)
- nsaspects = len(saspects)
- if nsaspects > 0:
- sets[k]['naspect'] += nsaspects
- sets[k]['naspect'] = [sets[k]['naspect'], saspects] # XXX : potential bug
- is_smsh = np.r_[is_smsh, np.ones(1, sets[k]['naspect'])]
- naspect += nsaspects
-
- print '\t%d evoked data sets containing a total of %d data aspects' \
- ' in %s' % (len(evoked), naspect, fname)
-
- if setno >= naspect or setno < 0:
- fid.close()
- raise ValueError, 'Data set selector out of range'
-
- # Next locate the evoked data set
- #
- p = 0
- goon = True
- for k in range(len(evoked)):
- for a in range(sets[k]['naspect']):
- if p == setno:
- my_evoked = evoked[k]
- my_aspect = sets[k]['aspects'][a]
- goon = False
- break
- p += 1
- if not goon:
- break
- else:
- # The desired data should have been found but better to check
- fid.close(fid)
- raise ValueError, 'Desired data set not found'
-
- # Now find the data in the evoked block
- nchan = 0
- sfreq = -1
- q = 0
- chs = []
- comment = None
- for k in range(my_evoked.nent):
- kind = my_evoked.directory[k].kind
- pos = my_evoked.directory[k].pos
- if kind == FIFF.FIFF_COMMENT:
- tag = read_tag(fid, pos)
- comment = tag.data
- elif kind == FIFF.FIFF_FIRST_SAMPLE:
- tag = read_tag(fid, pos)
- first = tag.data
- elif kind == FIFF.FIFF_LAST_SAMPLE:
- tag = read_tag(fid, pos)
- last = tag.data
- elif kind == FIFF.FIFF_NCHAN:
- tag = read_tag(fid, pos)
- nchan = tag.data
- elif kind == FIFF.FIFF_SFREQ:
- tag = read_tag(fid, pos)
- sfreq = tag.data
- elif kind ==FIFF.FIFF_CH_INFO:
- tag = read_tag(fid, pos)
- chs.append(tag.data)
- q += 1
-
- if comment is None:
- comment = 'No comment'
-
- # Local channel information?
- if nchan > 0:
- if chs is None:
- fid.close()
- raise ValueError, 'Local channel information was not found ' \
- 'when it was expected.'
-
- if len(chs) != nchan:
- fid.close()
- raise ValueError, 'Number of channels and number of channel ' \
- 'definitions are different'
-
- info.chs = chs
- info.nchan = nchan
- print '\tFound channel information in evoked data. nchan = %d' % nchan
- if sfreq > 0:
- info['sfreq'] = sfreq
-
- nsamp = last - first + 1
- print '\tFound the data of interest:'
- print '\t\tt = %10.2f ... %10.2f ms (%s)' % (
- 1000*first/info['sfreq'], 1000*last/info['sfreq'], comment)
- if info['comps'] is not None:
- print '\t\t%d CTF compensation matrices available' % len(info['comps'])
-
- # Read the data in the aspect block
- nave = 1
- epoch = []
- for k in range(my_aspect.nent):
- kind = my_aspect.directory[k].kind
- pos = my_aspect.directory[k].pos
- if kind == FIFF.FIFF_COMMENT:
- tag = read_tag(fid, pos)
- comment = tag.data
- elif kind == FIFF.FIFF_ASPECT_KIND:
- tag = read_tag(fid, pos)
- aspect_kind = tag.data
- elif kind == FIFF.FIFF_NAVE:
- tag = read_tag(fid, pos)
- nave = tag.data
- elif kind == FIFF.FIFF_EPOCH:
- tag = read_tag(fid, pos)
- epoch.append(tag)
-
- print '\t\tnave = %d aspect type = %d' % (nave, aspect_kind)
-
- nepoch = len(epoch)
- if nepoch != 1 and nepoch != info.nchan:
- fid.close()
- raise ValueError, 'Number of epoch tags is unreasonable '\
- '(nepoch = %d nchan = %d)' % (nepoch, info.nchan)
-
- if nepoch == 1:
- # Only one epoch
- all_data = epoch[0].data
- # May need a transpose if the number of channels is one
- if all_data.shape[1] == 1 and info.nchan == 1:
- all_data = all_data.T
-
- else:
- # Put the old style epochs together
- all_data = epoch[0].data.T
- for k in range(1, nepoch):
- all_data = np.r_[all_data, epoch[k].data.T]
-
- if all_data.shape[1] != nsamp:
- fid.close()
- raise ValueError, 'Incorrect number of samples (%d instead of %d)' % (
- all_data.shape[1], nsamp)
-
- # Calibrate
- cals = np.array([info['chs'][k].cal for k in range(info['nchan'])])
- all_data = np.dot(np.diag(cals.ravel()), all_data)
-
- times = np.arange(first, last+1, dtype=np.float) / info['sfreq']
-
- # Run baseline correction
- if baseline is not None:
- print "Applying baseline correction ..."
- bmin = baseline[0]
- bmax = baseline[1]
- if bmin is None:
- imin = 0
- else:
- imin = int(np.where(times >= bmin)[0][0])
- if bmax is None:
- imax = len(times)
- else:
- imax = int(np.where(times <= bmax)[0][-1]) + 1
- all_data -= np.mean(all_data[:, imin:imax], axis=1)[:,None]
- else:
- print "No baseline correction applied..."
-
- # Put it all together
- data = dict(info=info, evoked=dict(aspect_kind=aspect_kind,
- is_smsh=is_smsh[setno],
- nave=nave, first=first,
- last=last, comment=comment,
- times=times,
- epochs=all_data))
-
- fid.close()
-
- return data
+ return Evoked(fname, setno)
-###############################################################################
-# Writing
-from .tree import copy_tree
-from .write import start_file, start_block, end_file, end_block, write_id, \
- write_float, write_int, write_coord_trans, write_ch_info, \
- write_dig_point, write_name_list, write_string, \
- write_float_matrix
-from .proj import write_proj
-from .ctf import write_ctf_comp
-
-
-def write_evoked(name, data):
+def write_evoked(name, evoked):
"""Write an evoked dataset to a file
Parameters
@@ -268,117 +428,7 @@ def write_evoked(name, data):
name: string
The file name.
- data: dict
- The evoked dataset obtained with read_evoked
+ evoked: object of type Evoked
+ The evoked dataset to save
"""
-
- # Create the file and save the essentials
- fid = start_file(name)
- start_block(fid, FIFF.FIFFB_MEAS)
- write_id(fid, FIFF.FIFF_BLOCK_ID)
- if data['info']['meas_id'] is not None:
- write_id(fid, FIFF.FIFF_PARENT_BLOCK_ID, data['info']['meas_id'])
-
- # Measurement info
- start_block(fid, FIFF.FIFFB_MEAS_INFO)
-
- # Blocks from the original
- blocks = [FIFF.FIFFB_SUBJECT, FIFF.FIFFB_HPI_MEAS, FIFF.FIFFB_HPI_RESULT,
- FIFF.FIFFB_ISOTRAK, FIFF.FIFFB_PROCESSING_HISTORY]
-
- have_hpi_result = False
- have_isotrak = False
- if len(blocks) > 0 and 'filename' in data['info'] and \
- data['info']['filename'] is not None:
- fid2, tree, _ = fiff_open(data['info']['filename'])
- for block in blocks:
- nodes = dir_tree_find(tree, block)
- copy_tree(fid2, tree['id'], nodes, fid)
- if block == FIFF.FIFFB_HPI_RESULT and len(nodes) > 0:
- have_hpi_result = True
- if block == FIFF.FIFFB_ISOTRAK and len(nodes) > 0:
- have_isotrak = True
- fid2.close()
-
-
- # General
- write_float(fid, FIFF.FIFF_SFREQ, data['info']['sfreq'])
- write_float(fid, FIFF.FIFF_HIGHPASS, data['info']['highpass'])
- write_float(fid, FIFF.FIFF_LOWPASS, data['info']['lowpass'])
- write_int(fid, FIFF.FIFF_NCHAN, data['info']['nchan'])
- if data['info']['meas_date'] is not None:
- write_int(fid, FIFF.FIFF_MEAS_DATE, data['info']['meas_date'])
-
- # Coordinate transformations if the HPI result block was not there
- if not have_hpi_result:
- if data['info']['dev_head_t'] is not None:
- write_coord_trans(fid, data['info']['dev_head_t'])
-
- if data['info']['ctf_head_t'] is not None:
- write_coord_trans(fid, data['info']['ctf_head_t'])
-
- # Channel information
- for k in range(data['info']['nchan']):
- # Scan numbers may have been messed up
- data['info']['chs'][k]['scanno'] = k
- write_ch_info(fid, data['info']['chs'][k])
-
- # Polhemus data
- if data['info']['dig'] is not None and not have_isotrak:
- start_block(fid, FIFF.FIFFB_ISOTRAK)
- for d in data['info']['dig']:
- write_dig_point(fid, d)
-
- end_block(fid, FIFF.FIFFB_ISOTRAK)
-
- # Projectors
- write_proj(fid, data['info']['projs'])
-
- # CTF compensation info
- write_ctf_comp(fid, data['info']['comps'])
-
- # Bad channels
- if len(data['info']['bads']) > 0:
- start_block(fid, FIFF.FIFFB_MNE_BAD_CHANNELS)
- write_name_list(fid, FIFF.FIFF_MNE_CH_NAME_LIST, data['info']['bads'])
- end_block(fid, FIFF.FIFFB_MNE_BAD_CHANNELS)
-
- end_block(fid, FIFF.FIFFB_MEAS_INFO)
-
- # One or more evoked data sets
- start_block(fid, FIFF.FIFFB_PROCESSED_DATA)
- data_evoked = data['evoked']
- if not isinstance(data_evoked, list):
- data_evoked = [data_evoked]
-
- for evoked in data_evoked:
- start_block(fid, FIFF.FIFFB_EVOKED)
-
- # Comment is optional
- if len(evoked['comment']) > 0:
- write_string(fid, FIFF.FIFF_COMMENT, evoked['comment'])
-
- # First and last sample
- write_int(fid, FIFF.FIFF_FIRST_SAMPLE, evoked['first'])
- write_int(fid, FIFF.FIFF_LAST_SAMPLE, evoked['last'])
-
- # The epoch itself
- start_block(fid, FIFF.FIFFB_ASPECT)
-
- write_int(fid, FIFF.FIFF_ASPECT_KIND, evoked['aspect_kind'])
- write_int(fid, FIFF.FIFF_NAVE, evoked['nave'])
-
- decal = np.zeros((data['info']['nchan'], data['info']['nchan']))
- for k in range(data['info']['nchan']): # XXX : can be improved
- decal[k, k] = 1.0 / data['info']['chs'][k]['cal']
-
- write_float_matrix(fid, FIFF.FIFF_EPOCH,
- np.dot(decal, evoked['epochs']))
- end_block(fid, FIFF.FIFFB_ASPECT)
- end_block(fid, FIFF.FIFFB_EVOKED)
-
- end_block(fid, FIFF.FIFFB_PROCESSED_DATA)
-
- end_block(fid, FIFF.FIFFB_MEAS)
-
- end_file(fid)
+ evoked.save(name)
diff --git a/mne/fiff/pick.py b/mne/fiff/pick.py
index 9bf3e72..894762a 100644
--- a/mne/fiff/pick.py
+++ b/mne/fiff/pick.py
@@ -130,8 +130,8 @@ def pick_channels_evoked(orig, include=[], exclude=[]):
Parameters
----------
- orig : dict
- One evoked data
+ orig : Evoked object
+ One evoked dataset
include : list of string, (optional)
List of channels to include. (if None, include all available)
@@ -149,7 +149,7 @@ def pick_channels_evoked(orig, include=[], exclude=[]):
if len(include) == 0 and len(exclude) == 0:
return orig
- sel = pick_channels(orig['info']['ch_names'], include=include,
+ sel = pick_channels(orig.info['ch_names'], include=include,
exclude=exclude)
if len(sel) == 0:
@@ -159,11 +159,11 @@ def pick_channels_evoked(orig, include=[], exclude=[]):
#
# Modify the measurement info
#
- res['info'] = pick_info(res['info'], sel)
+ res.info = pick_info(res.info, sel)
#
# Create the reduced data set
#
- res['evoked']['epochs'] = res['evoked']['epochs'][sel,:]
+ res.data = res.data[sel,:]
return res
diff --git a/mne/fiff/tests/test_evoked.py b/mne/fiff/tests/test_evoked.py
index c69b55a..ac0f833 100644
--- a/mne/fiff/tests/test_evoked.py
+++ b/mne/fiff/tests/test_evoked.py
@@ -1,4 +1,3 @@
-import os
import os.path as op
from numpy.testing import assert_array_almost_equal, assert_equal
@@ -10,20 +9,14 @@ fname = op.join(op.dirname(__file__), 'data', 'test-ave.fif')
def test_io_evoked():
"""Test IO for noise covariance matrices
"""
- data = read_evoked(fname)
+ ave = read_evoked(fname)
- write_evoked('evoked.fif', data)
- data2 = read_evoked('evoked.fif')
+ write_evoked('evoked.fif', ave)
+ ave2 = read_evoked('evoked.fif')
- print assert_array_almost_equal(data['evoked']['epochs'],
- data2['evoked']['epochs'])
- print assert_array_almost_equal(data['evoked']['times'],
- data2['evoked']['times'])
- print assert_equal(data['evoked']['nave'],
- data2['evoked']['nave'])
- print assert_equal(data['evoked']['aspect_kind'],
- data2['evoked']['aspect_kind'])
- print assert_equal(data['evoked']['last'],
- data2['evoked']['last'])
- print assert_equal(data['evoked']['first'],
- data2['evoked']['first'])
+ print assert_array_almost_equal(ave.data, ave2.data)
+ print assert_array_almost_equal(ave.times, ave2.times)
+ print assert_equal(ave.nave, ave2.nave)
+ print assert_equal(ave.aspect_kind, ave2.aspect_kind)
+ print assert_equal(ave.last, ave2.last)
+ print assert_equal(ave.first, ave2.first)
diff --git a/mne/inverse.py b/mne/inverse.py
index 5038ebd..b35260f 100644
--- a/mne/inverse.py
+++ b/mne/inverse.py
@@ -12,7 +12,7 @@ from .fiff.tag import find_tag
from .fiff.matrix import _read_named_matrix, _transpose_named_matrix
from .fiff.proj import read_proj, make_projector
from .fiff.tree import dir_tree_find
-from .fiff.evoked import read_evoked
+from .fiff.evoked import Evoked
from .fiff.pick import pick_channels_evoked
from .cov import read_cov
@@ -441,7 +441,7 @@ def compute_inverse(fname_data, setno, fname_inv, lambda2, dSPM=True,
#
# Read the data first
#
- data = read_evoked(fname_data, setno, baseline=baseline)
+ evoked = Evoked(fname_data, setno, baseline=baseline)
#
# Then the inverse operator
#
@@ -450,14 +450,14 @@ def compute_inverse(fname_data, setno, fname_inv, lambda2, dSPM=True,
# Set up the inverse according to the parameters
#
if nave is None:
- nave = data['evoked']['nave']
+ nave = evoked.nave
inv = prepare_inverse_operator(inv, nave, lambda2, dSPM)
#
# Pick the correct channels from the data
#
- data = pick_channels_evoked(data, inv['noise_cov']['names'])
- print 'Picked %d channels from the data' % data['info']['nchan']
+ evoked = pick_channels_evoked(evoked, inv['noise_cov']['names'])
+ print 'Picked %d channels from the data' % evoked.info['nchan']
print 'Computing inverse...',
#
# Simple matrix multiplication followed by combination of the
@@ -469,7 +469,7 @@ def compute_inverse(fname_data, setno, fname_inv, lambda2, dSPM=True,
trans = inv['reginv'][:,None] * reduce(np.dot, [inv['eigen_fields']['data'],
inv['whitener'],
inv['proj'],
- data['evoked']['epochs']])
+ evoked.data])
#
# Transformation into current distributions by weighting the eigenleads
@@ -504,8 +504,8 @@ def compute_inverse(fname_data, setno, fname_inv, lambda2, dSPM=True,
res = dict()
res['inv'] = inv
res['sol'] = sol
- res['tmin'] = float(data['evoked']['first']) / data['info']['sfreq']
- res['tstep'] = 1.0 / data['info']['sfreq']
+ res['tmin'] = float(evoked.first) / evoked.info['sfreq']
+ res['tstep'] = 1.0 / evoked.info['sfreq']
print '[done]'
return res
--
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/debian-med/python-mne.git
More information about the debian-med-commit
mailing list