[med-svn] [python-mne] 235/376: ENH : basic EOG + ECG artifacts detectors
Yaroslav Halchenko
debian at onerussian.com
Fri Nov 27 17:22:54 UTC 2015
This is an automated email from the git hooks/post-receive script.
yoh pushed a commit to annotated tag v0.1
in repository python-mne.
commit 8565be6846d8f9a0b0cfd3e03c8702035d286d6f
Author: Alexandre Gramfort <alexandre.gramfort at inria.fr>
Date: Thu May 5 16:08:05 2011 -0400
ENH : basic EOG + ECG artifacts detectors
---
examples/plot_find_ecg_artifacts.py | 47 +++++++++
examples/plot_find_eog_artifacts.py | 45 +++++++++
mne/__init__.py | 1 +
mne/artifacts/__init__.py | 2 +
mne/artifacts/ecg.py | 113 ++++++++++++++++++++++
mne/artifacts/eog.py | 66 +++++++++++++
mne/artifacts/peak_finder.py | 164 ++++++++++++++++++++++++++++++++
mne/artifacts/tests/test_peak_finder.py | 8 ++
8 files changed, 446 insertions(+)
diff --git a/examples/plot_find_ecg_artifacts.py b/examples/plot_find_ecg_artifacts.py
new file mode 100644
index 0000000..bbcc76c
--- /dev/null
+++ b/examples/plot_find_ecg_artifacts.py
@@ -0,0 +1,47 @@
+"""
+==================
+Find ECG artifacts
+==================
+
+Locate QRS component of ECG.
+
+"""
+# Authors: Alexandre Gramfort <gramfort at nmr.mgh.harvard.edu>
+#
+# License: BSD (3-clause)
+
+print __doc__
+
+import numpy as np
+import pylab as pl
+import mne
+from mne import fiff
+from mne.datasets import sample
+data_path = sample.data_path('.')
+
+###############################################################################
+# Set parameters
+raw_fname = data_path + '/MEG/sample/sample_audvis_raw.fif'
+
+# Setup for reading the raw data
+raw = fiff.Raw(raw_fname)
+
+event_id = 999
+ecg_events = mne.artifacts.find_ecg_events(raw, event_id)
+
+# Read epochs
+picks = fiff.pick_types(raw.info, meg=False, eeg=False, stim=False, eog=False,
+ include=['MEG 1531'])
+tmin, tmax = -0.1, 0.1
+epochs = mne.Epochs(raw, ecg_events, event_id, tmin, tmax, picks=picks,
+ proj=False)
+data = epochs.get_data()
+
+print "Number of detected EOG artifacts : %d" % len(data)
+
+###############################################################################
+# Plot EOG artifacts
+pl.plot(1e3 * epochs.times, np.squeeze(data).T)
+pl.xlabel('Times (ms)')
+pl.ylabel('ECG')
+pl.show()
\ No newline at end of file
diff --git a/examples/plot_find_eog_artifacts.py b/examples/plot_find_eog_artifacts.py
new file mode 100644
index 0000000..22ef4d1
--- /dev/null
+++ b/examples/plot_find_eog_artifacts.py
@@ -0,0 +1,45 @@
+"""
+==================
+Find EOG artifacts
+==================
+
+Locate peaks of EOG to spot blinks and general EOG artifacts.
+
+"""
+# Authors: Alexandre Gramfort <gramfort at nmr.mgh.harvard.edu>
+#
+# License: BSD (3-clause)
+
+print __doc__
+
+import numpy as np
+import pylab as pl
+import mne
+from mne import fiff
+from mne.datasets import sample
+data_path = sample.data_path('.')
+
+###############################################################################
+# Set parameters
+raw_fname = data_path + '/MEG/sample/sample_audvis_filt-0-40_raw.fif'
+
+# Setup for reading the raw data
+raw = fiff.Raw(raw_fname)
+
+event_id = 998
+eog_events = mne.artifacts.find_eog_events(raw, event_id)
+
+# Read epochs
+picks = fiff.pick_types(raw.info, meg=False, eeg=False, stim=False, eog=True)
+tmin, tmax = -0.2, 0.2
+epochs = mne.Epochs(raw, eog_events, event_id, tmin, tmax, picks=picks)
+data = epochs.get_data()
+
+print "Number of detected EOG artifacts : %d" % len(data)
+
+###############################################################################
+# Plot EOG artifacts
+pl.plot(1e3 * epochs.times, np.squeeze(data).T)
+pl.xlabel('Times (ms)')
+pl.ylabel('EOG (muV)')
+pl.show()
\ No newline at end of file
diff --git a/mne/__init__.py b/mne/__init__.py
index e81addc..64c4a42 100755
--- a/mne/__init__.py
+++ b/mne/__init__.py
@@ -11,3 +11,4 @@ from .epochs import Epochs
from .label import label_time_courses, read_label
from .misc import parse_config, read_reject_parameters
import fiff
+import artifacts
diff --git a/mne/artifacts/__init__.py b/mne/artifacts/__init__.py
new file mode 100644
index 0000000..19a3252
--- /dev/null
+++ b/mne/artifacts/__init__.py
@@ -0,0 +1,2 @@
+from .eog import find_eog_events
+from .ecg import find_ecg_events
diff --git a/mne/artifacts/ecg.py b/mne/artifacts/ecg.py
new file mode 100644
index 0000000..ede3990
--- /dev/null
+++ b/mne/artifacts/ecg.py
@@ -0,0 +1,113 @@
+import numpy as np
+
+from .. import fiff
+from ..filter import band_pass_filter
+
+
+def qrs_detector(sfreq, ecg, thresh_value=0.6, levels=2.5, n_thresh=3):
+ """Detect QRS component in ECG channels.
+
+ QRS is the main wave on the heart beat.
+
+ Parameters
+ ----------
+ sfreq : float
+ Sampling rate
+ ecg : array
+ ECG signal
+ thresh_value: float
+ qrs detection threshold
+ levels: float
+ number of std from mean to include for detection
+ n_thresh: int
+ max number of crossings
+
+ Returns
+ -------
+ events : array
+ Indices of ECG peaks
+ """
+ win_size = round((60.0 * sfreq) / 120.0)
+
+ filtecg = band_pass_filter(ecg, sfreq, 5, 35)
+ n_points = len(filtecg)
+
+ absecg = np.abs(filtecg)
+ init = int(sfreq)
+
+ maxpt = np.empty(3)
+ maxpt[0] = np.max(absecg[:init])
+ maxpt[1] = np.max(absecg[init:init * 2])
+ maxpt[2] = np.max(absecg[init * 2:init * 3])
+
+ init_max = np.mean(maxpt)
+
+ thresh1 = init_max * thresh_value
+
+ numcross = []
+ time = []
+ rms = []
+ i = 0
+ while i < (n_points - win_size):
+ window = absecg[i:i + win_size]
+ if window[0] > thresh1:
+ maxTime = np.argmax(window)
+ time.append(i + maxTime)
+ numcross.append(np.sum(np.diff(window > thresh1) == 1))
+ rms.append(np.sqrt(np.mean(window ** 2)))
+ i += win_size
+ else:
+ i += 1
+
+ time = np.array(time)
+ rms_mean = np.mean(rms)
+ rms_std = np.std(rms)
+ rms_thresh = rms_mean + (rms_std * levels)
+ b = np.where(rms < rms_thresh)[0]
+ a = np.array(numcross)[b]
+ clean_events = time[b[a < n_thresh]]
+
+ return clean_events
+
+
+def find_ecg_events(raw, event_id=999):
+ """Find ECG peaks
+
+ Parameters
+ ----------
+ raw : instance of Raw
+ The raw data
+ event_id : int
+ The index to assign to found events
+
+ Returns
+ -------
+ ecg_events : array
+ Events
+ """
+ info = raw.info
+
+ # Geting ECG Channel
+ ch_ECG = fiff.pick_types(info, meg=False, eeg=False, stim=False,
+ eog=False, ecg=True, emg=False)
+
+ if len(ch_ECG) == 0:
+ # closest to the heart normally, In future we can search for it.
+ ch_ECG = fiff.pick_channels(raw.ch_names, include='MEG 1531')
+ print 'Using channel index %d to identify heart beats' % ch_ECG
+ else:
+ print 'ECG channel index for this subject is: %s' % ch_ECG
+
+ assert len(ch_ECG) == 1
+ ecg, times = raw[ch_ECG, :]
+
+ # detecting QRS and generating event file
+ ecg_events = qrs_detector(info['sfreq'], ecg.ravel())
+ n_events = len(ecg_events)
+ average_pulse = 60.0 * (times[-1] - times[0]) / n_events
+ print ("Number of ECG events detected : %d (average pulse %d / min.)"
+ % (n_events, average_pulse))
+
+ ecg_events = np.c_[ecg_events + raw.first_samp, np.zeros(n_events),
+ event_id * np.ones(n_events)]
+ return ecg_events
diff --git a/mne/artifacts/eog.py b/mne/artifacts/eog.py
new file mode 100644
index 0000000..afc0808
--- /dev/null
+++ b/mne/artifacts/eog.py
@@ -0,0 +1,66 @@
+import numpy as np
+
+from .peak_finder import peak_finder
+from .. import fiff
+from ..filter import band_pass_filter
+
+
+def find_eog_events(raw, event_id=998):
+ """Locate EOG artifacts
+
+ Parameters
+ ----------
+
+
+ Returns
+ -------
+ """
+ info = raw.info
+
+ # Geting EOG Channel
+ ch_EOG = fiff.pick_types(info, meg=False, eeg=False, stim=False,
+ eog=True, ecg=False, emg=False)
+
+ if len(ch_EOG) == 0:
+ print 'No EOG channels found'
+ print 'Trying with EEG 061 and EEG 062'
+ ch_EOG = fiff.pick_channels(raw.ch_names,
+ include=['EEG 061', 'EEG 062'])
+ if len(ch_EOG) != 2:
+ raise ValueError('EEG 61 or EEG 62 channel not found !!')
+
+ print 'EOG channel index for this subject is: %s' % ch_EOG
+
+ sampRate = info['sfreq']
+
+ eog, _ = raw[ch_EOG, :]
+
+ print ('Filtering the data to remove DC offset to help distinguish '
+ 'blinks from saccades')
+
+ # filtering to remove dc offset so that we know which is blink and saccades
+ filteog = np.array([band_pass_filter(x, sampRate, 2, 45) for x in eog])
+ temp = np.sqrt(np.sum(filteog ** 2, axis=1))
+
+ indexmax = np.argmax(temp)
+
+ # easy to detect peaks with this filtering.
+ filteog = band_pass_filter(eog[indexmax], sampRate, 1, 10)
+
+ # detecting eog blinks and generating event file
+
+ print 'Now detecting blinks and generating corresponding event file'
+
+ temp = filteog - np.mean(filteog)
+ if np.abs(np.max(temp)) > np.abs(np.min(temp)):
+ eog_events, _ = peak_finder(filteog, extrema=1)
+ else:
+ eog_events, _ = peak_finder(filteog, extrema=-1)
+
+ print 'Saving event file'
+ n_events = len(eog_events)
+ print "Number of EOG events detected : %d" % n_events
+ eog_events = np.c_[eog_events + raw.first_samp, np.zeros(n_events),
+ event_id * np.ones(n_events)]
+
+ return eog_events
\ No newline at end of file
diff --git a/mne/artifacts/peak_finder.py b/mne/artifacts/peak_finder.py
new file mode 100644
index 0000000..85186c7
--- /dev/null
+++ b/mne/artifacts/peak_finder.py
@@ -0,0 +1,164 @@
+import numpy as np
+from math import ceil
+
+
+def peak_finder(x0, thresh=None, extrema=1):
+ """Noise tolerant fast peak finding algorithm
+
+ Parameters
+ ----------
+ x0: 1d array
+ A real vector from the maxima will be found (required)
+ thresh: float
+ The amount above surrounding data for a peak to be
+ identified (default = (max(x0)-min(x0))/4). Larger values mean
+ the algorithm is more selective in finding peaks.
+ extrema: {-1, 1}
+ 1 if maxima are desired, -1 if minima are desired
+ (default = maxima, 1)
+
+ Returns
+ -------
+ peak_loc: array
+ The indicies of the identified peaks in x0
+ peak_mag: array
+ The magnitude of the identified peaks
+
+ Note
+ ----
+ If repeated values are found the first is identified as the peak.
+ Conversion from initial Matlab code from:
+ Nathanael C. Yoder (ncyoder at purdue.edu)
+
+ Exemple
+ -------
+ t = 0:.0001:10;
+ x = 12*sin(10*2*pi*t)-3*sin(.1*2*pi*t)+randn(1,numel(t));
+ x(1250:1255) = max(x);
+ peak_finder(x)
+ """
+
+ x0 = np.asanyarray(x0)
+
+ if x0.ndim >= 2:
+ raise ValueError('The input data must be a 1D vector')
+
+ s = x0.size
+
+ if thresh is None:
+ thresh = (np.max(x0) - np.min(x0)) / 4
+
+ assert extrema in [-1, 1]
+
+ if extrema == -1:
+ x0 = extrema * x0 # Make it so we are finding maxima regardless
+
+ dx0 = np.diff(x0) # Find derivative
+ # This is so we find the first of repeated values
+ dx0[dx0 == 0] = -np.finfo(float).eps
+ # Find where the derivative changes sign
+ ind = np.where(dx0[:-1:] * dx0[1::] < 0)[0] + 1
+
+ # Include endpoints in potential peaks and valleys
+ x = np.concatenate((x0[:1], x0[ind], x0[-1:]))
+ ind = np.concatenate(([0], ind, [s - 1]))
+
+ # x only has the peaks, valleys, and endpoints
+ length = x.size
+ min_mag = np.min(x)
+
+ if length > 2: # Function with peaks and valleys
+
+ # Set initial parameters for loop
+ temp_mag = min_mag
+ found_peak = False
+ left_min = min_mag
+
+ # Deal with first point a little differently since tacked it on
+ # Calculate the sign of the derivative since we taked the first point
+ # on it does not neccessarily alternate like the rest.
+ signDx = np.sign(np.diff(x[:3]))
+ if signDx[0] <= 0: # The first point is larger or equal to the second
+ ii = -1
+ if signDx[0] == signDx[1]: # Want alternating signs
+ x = np.concatenate((x[:1], x[2:]))
+ ind = np.concatenate((ind[:1], ind[2:]))
+ length -= 1
+
+ else: # First point is smaller than the second
+ ii = 0
+ if signDx[0] == signDx[1]: # Want alternating signs
+ x = x[1:]
+ ind = ind[1:]
+ length -= 1
+
+ # Preallocate max number of maxima
+ maxPeaks = ceil(length / 2.0)
+ peak_loc = np.zeros(maxPeaks, dtype=np.int)
+ peak_mag = np.zeros(maxPeaks)
+ c_ind = 0
+ # Loop through extrema which should be peaks and then valleys
+ while ii < (length - 1):
+ ii += 1 # This is a peak
+ # Reset peak finding if we had a peak and the next peak is bigger
+ # than the last or the left min was small enough to reset.
+ if found_peak and ((x[ii] > peak_mag[-1])
+ or (left_min < peak_mag[-1] - thresh)):
+ temp_mag = min_mag
+ found_peak = False
+
+ # Make sure we don't iterate past the length of our vector
+ if ii == length - 1:
+ break # We assign the last point differently out of the loop
+
+ # Found new peak that was lager than temp mag and threshold larger
+ # than the minimum to its left.
+ if (x[ii] > temp_mag) and (x[ii] > left_min + thresh):
+ temp_loc = ii
+ temp_mag = x[ii]
+
+ ii += 1 # Move onto the valley
+ # Come down at least thresh from peak
+ if not found_peak and (temp_mag > (thresh + x[ii])):
+ found_peak = True # We have found a peak
+ left_min = x[ii]
+ peak_loc[c_ind] = temp_loc # Add peak to index
+ peak_mag[c_ind] = temp_mag
+ c_ind += 1
+ elif x[ii] < left_min: # New left minima
+ left_min = x[ii]
+
+ # Check end point
+ if (x[-1] > temp_mag) and (x[-1] > (left_min + thresh)):
+ peak_loc[c_ind] = length - 1
+ peak_mag[c_ind] = x[-1]
+ c_ind += 1
+ elif not found_peak and temp_mag > min_mag:
+ # Check if we still need to add the last point
+ peak_loc[c_ind] = temp_loc
+ peak_mag[c_ind] = temp_mag
+ c_ind += 1
+
+ # Create output
+ peak_inds = ind[peak_loc[:c_ind]]
+ peak_mags = peak_mag[:c_ind]
+ else: # This is a monotone function where an endpoint is the only peak
+ x_ind = np.argmax(x)
+ peak_mags = x[x_ind]
+ if peak_mags > (min_mag + thresh):
+ peak_inds = ind[x_ind]
+ else:
+ peak_mags = []
+ peak_inds = []
+
+ # Change sign of data if was finding minima
+ if extrema < 0:
+ peak_mags *= -1.0
+ x0 = -x0
+
+ # Plot if no output desired
+ if len(peak_inds) == 0:
+ print 'No significant peaks found'
+
+ return peak_inds, peak_mags
+
diff --git a/mne/artifacts/tests/test_peak_finder.py b/mne/artifacts/tests/test_peak_finder.py
new file mode 100644
index 0000000..522f7fc
--- /dev/null
+++ b/mne/artifacts/tests/test_peak_finder.py
@@ -0,0 +1,8 @@
+import numpy as np
+from ..peak_finder import peak_finder
+
+def test_peak_finder():
+ """Test the peak detection method"""
+ x = [0, 2, 5, 0, 6, -1]
+ peak_inds, peak_mags = peak_finder(x)
+ print peak_inds
--
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/debian-med/python-mne.git
More information about the debian-med-commit
mailing list