Skip to content

Instantly share code, notes, and snippets.

@micuat
Last active June 22, 2017 00:02
Show Gist options
  • Save micuat/bb0b012a95673151715e7f60f19364a8 to your computer and use it in GitHub Desktop.
Save micuat/bb0b012a95673151715e7f60f19364a8 to your computer and use it in GitHub Desktop.
import numpy as np
from pylsl import StreamInfo, StreamOutlet, StreamInlet, resolve_stream
from pythonosc import udp_client
def compute_feature_vector(eegdata, Fs):
# https://github.com/bcimontreal/bci_workshop/blob/master/bci_workshop_tools.py
eegdata = np.array([eegdata]).T
# 1. Compute the PSD
winSampleLength = len(eegdata)
# Apply Hamming window
w = np.hamming(winSampleLength)
dataWinCentered = eegdata - np.mean(eegdata, axis=0) # Remove offset
dataWinCenteredHam = (dataWinCentered.T*w).T
NFFT = nextpow2(winSampleLength)
Y = np.fft.fft(dataWinCenteredHam, n=NFFT, axis=0)/winSampleLength
PSD = 2*np.abs(Y[0:int(NFFT/2),:])
f = Fs/2*np.linspace(0,1,NFFT/2)
# SPECTRAL FEATURES
# Average of band powers
# Delta <4
ind_delta, = np.where(f<4)
meanDelta = np.mean(PSD[ind_delta,:],axis=0)
# Theta 4-8
ind_theta, = np.where((f>=4) & (f<=8))
meanTheta = np.mean(PSD[ind_theta,:],axis=0)
# Alpha 8-12
ind_alpha, = np.where((f>=8) & (f<=12))
meanAlpha = np.mean(PSD[ind_alpha,:],axis=0)
# Beta 12-30
ind_beta, = np.where((f>=12) & (f<30))
meanBeta = np.mean(PSD[ind_beta,:],axis=0)
feature_vector = np.concatenate((meanDelta, meanTheta, meanAlpha, meanBeta),axis=0)
feature_vector = np.log10(feature_vector)
return feature_vector
def nextpow2(i):
"""
Find the next power of 2 for number i
"""
n = 1
while n < i:
n *= 2
return n
if __name__ == "__main__":
fft_srate = 4
fft_bands = 4
# set OSC client
client = udp_client.SimpleUDPClient("localhost", 12000)
# set inlet (EEG)
print("looking for an EEG stream...")
streams = resolve_stream('type', 'EEG')
inlet = StreamInlet(streams[0])
# set outlet (FFT)
info = StreamInfo(inlet.info().name() + '-FFT', 'FFT', fft_bands, fft_srate, 'float32', 'myuid34234')
outlet = StreamOutlet(info)
srate = inlet.info().nominal_srate()
print("now sending data...")
eegArray = []
while True:
sample, timestamp = inlet.pull_sample()
eeg = sample[1]
eegArray.append(eeg)
if len(eegArray) >= srate:
eegArray = eegArray[0:int(srate) - 1]
feat_vector = compute_feature_vector(eegArray, srate)
eegArray = eegArray[int(srate / fft_srate):]
print(feat_vector)
outlet.push_sample(feat_vector)
client.send_message("/eeg/feat/", feat_vector)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment