Created
February 25, 2015 06:30
-
-
Save stormxuwz/765b30bb3c64607981cb to your computer and use it in GitHub Desktop.
Activity recognition by VAR model (HAR dataset)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import statsmodels.tsa as st | |
import numpy as np | |
import numpy.linalg as nl | |
import statsmodels.tsa as tsa | |
def solver_leastSquare(A, y):
    """Solve the least-squares problem min ||A.w - y||^2.

    Returns numpy's full (solution, residuals, rank, singular_values)
    tuple unchanged, so callers can unpack whichever parts they need.
    """
    return nl.lstsq(A, y)
def read_data(filename):
    """Parse a whitespace-delimited numeric text file.

    Returns a 2-D numpy array with one row per line of the file and one
    column per whitespace-separated token, all converted to float.
    """
    with open(filename, "r") as input_file:
        rows = [[float(token) for token in line.split()]
                for line in input_file]
    return np.array(rows)
def construct_from_one_sequence(seq, lag):
    """Build the VAR design matrix and target vectors for one sequence.

    Parameters
    ----------
    seq : ndarray, shape (3, T)
        Rows are the x, y, z acceleration channels.  The HAR dataset uses
        T = 128, but any T > lag now works (generalized from the
        hard-coded 128 in the original).
    lag : int
        VAR model order: each design row holds the previous `lag` samples.

    Returns
    -------
    A : ndarray, shape (T - lag, 3 * lag)
        Row i-lag is [x[i-1], y[i-1], z[i-1], x[i-2], ..., z[i-lag]].
    xt_seq, yt_seq, zt_seq : ndarray, shape (T - lag,)
        Per-channel targets seq[c, lag:].
    """
    T = seq.shape[1]
    xt_seq = seq[0, lag:]
    yt_seq = seq[1, lag:]
    zt_seq = seq[2, lag:]
    A = np.zeros((T - lag, 3 * lag))
    for i in range(lag, T):
        for j in range(lag):
            # Column group j holds the (j+1)-steps-back sample of each channel.
            A[i - lag, 3 * j] = seq[0, i - j - 1]
            A[i - lag, 3 * j + 1] = seq[1, i - j - 1]
            A[i - lag, 3 * j + 2] = seq[2, i - j - 1]
    return A, xt_seq, yt_seq, zt_seq
def solver_coefficients(seq_list, lag):
    """Fit one VAR(lag) model with intercept to a list of sequences.

    Stacks the per-sequence design matrices and targets, appends a bias
    column of ones, and solves the joint least-squares problem.

    Parameters
    ----------
    seq_list : list of ndarray, each shape (3, T)
        Training sequences (x/y/z channels as rows); all must share T.
    lag : int
        VAR model order.

    Returns
    -------
    coeff : ndarray, shape (3*lag + 1, 3)
        Coefficient matrix; the last row is the intercept for the
        x, y, z channels respectively.
    """
    num_seqs = len(seq_list)
    # Rows each sequence contributes; generalized from the hard-coded 128.
    T = seq_list[0].shape[1]
    n = T - lag
    xt_seq_all = np.zeros(num_seqs * n)
    yt_seq_all = np.zeros(num_seqs * n)
    zt_seq_all = np.zeros(num_seqs * n)
    A = np.zeros((num_seqs * n, 3 * lag + 1))
    A[:, -1] = 1.0  # intercept column
    for seq_index, seq in enumerate(seq_list):
        A_seq, xt_seq, yt_seq, zt_seq = construct_from_one_sequence(seq, lag)
        rows = slice(seq_index * n, (seq_index + 1) * n)
        A[rows, :-1] = A_seq
        xt_seq_all[rows] = xt_seq
        yt_seq_all[rows] = yt_seq
        zt_seq_all[rows] = zt_seq
    # Targets as an (num_seqs*n, 3) matrix: one column per channel.
    coeff_Y = np.transpose(np.array([xt_seq_all, yt_seq_all, zt_seq_all]))
    coeff, res, rank, s = solver_leastSquare(A, coeff_Y)
    return coeff
def predict(seq_list, lag, coeff_walking, coeff_walking_up, coeff_walking_down, coeff_sitting):
    """Classify each sequence by the VAR model with the lowest error.

    For every sequence the one-step prediction error is computed under
    each activity's coefficient set; the activity with the smallest
    summed squared error wins.

    Returns a list with one label per sequence: 0 = walking,
    1 = walking up, 2 = walking down, 3 = sitting (argmin order below).
    """
    def cal_error(A, xt_seq, yt_seq, zt_seq, type_coeff):
        # Original captured A/xt_seq/... from the enclosing loop via a
        # fragile closure; they are now passed explicitly.
        n = A.shape[0]  # generalized from the hard-coded 128 - lag
        bigA = np.ones((n, A.shape[1] + 1))  # last column = intercept
        bigA[:, :-1] = A
        pred = np.dot(bigA, type_coeff)
        x_error = np.sum((pred[:, 0] - xt_seq) ** 2)
        y_error = np.sum((pred[:, 1] - yt_seq) ** 2)
        z_error = np.sum((pred[:, 2] - zt_seq) ** 2)
        return x_error + y_error + z_error

    seqs_type = []
    for seq in seq_list:
        A, xt_seq, yt_seq, zt_seq = construct_from_one_sequence(seq, lag)
        errors = [cal_error(A, xt_seq, yt_seq, zt_seq, c)
                  for c in (coeff_walking, coeff_walking_up,
                            coeff_walking_down, coeff_sitting)]
        seqs_type.append(np.argmin(errors))
    print(seqs_type)  # print() call works under both Python 2 and 3
    return seqs_type
def assembleData(x_data, y_data, z_data, index):
    """Split the rows named by `index` into training/testing halves.

    Each selected row i is stacked into one (3, T) array whose rows are
    the x, y, z channels.  The first half of `index` becomes the
    training list, the second half the testing list.
    """
    def _stack(i):
        # Bundle the three channel rows for sample i into one matrix.
        return np.array([x_data[i, :], y_data[i, :], z_data[i, :]])

    mid = int(len(index) / 2)
    training_list = [_stack(i) for i in index[:mid]]
    testing_list = [_stack(i) for i in index[mid:]]
    return training_list, testing_list
def main():
    """Train one VAR model per activity on half of each class's test-set
    sequences and print a 4x4 confusion matrix on the held-out half."""
    lag = 5
    x_data = read_data("./UCI HAR Dataset/test/Inertial Signals/total_acc_x_test.txt")
    y_data = read_data("./UCI HAR Dataset/test/Inertial Signals/total_acc_y_test.txt")
    z_data = read_data("./UCI HAR Dataset/test/Inertial Signals/total_acc_z_test.txt")
    label = read_data("./UCI HAR Dataset/test/y_test.txt")

    # Row indices of each activity class (labels are 1-based in the dataset).
    walking_index = np.where(label == 1)[0]
    walking_up_index = np.where(label == 2)[0]
    walking_down_index = np.where(label == 3)[0]
    sitting_index = np.where(label == 4)[0]

    np.random.seed(1)
    # BUG FIX: np.random.shuffle(list(idx)) shuffled a temporary copy and
    # left the index arrays untouched, so the train/test split was never
    # randomized.  Shuffle the arrays themselves in place.
    np.random.shuffle(walking_index)
    np.random.shuffle(walking_up_index)
    np.random.shuffle(walking_down_index)
    np.random.shuffle(sitting_index)

    training_walking_seqlist, testing_walking_seqlist = assembleData(x_data, y_data, z_data, walking_index)
    training_walking_up_seqlist, testing_walking_up_seqlist = assembleData(x_data, y_data, z_data, walking_up_index)
    training_walking_down_seqlist, testing_walking_down_seqlist = assembleData(x_data, y_data, z_data, walking_down_index)
    training_sitting_seqlist, testing_sitting_seqlist = assembleData(x_data, y_data, z_data, sitting_index)

    coeff_walking = solver_coefficients(training_walking_seqlist, lag)
    coeff_walking_up = solver_coefficients(training_walking_up_seqlist, lag)
    coeff_walking_down = solver_coefficients(training_walking_down_seqlist, lag)
    coeff_sitting = solver_coefficients(training_sitting_seqlist, lag)

    walking_predicted = predict(testing_walking_seqlist, lag, coeff_walking, coeff_walking_up, coeff_walking_down, coeff_sitting)
    walking_up_predicted = predict(testing_walking_up_seqlist, lag, coeff_walking, coeff_walking_up, coeff_walking_down, coeff_sitting)
    walking_down_predicted = predict(testing_walking_down_seqlist, lag, coeff_walking, coeff_walking_up, coeff_walking_down, coeff_sitting)
    sitting_predicted = predict(testing_sitting_seqlist, lag, coeff_walking, coeff_walking_up, coeff_walking_down, coeff_sitting)

    # confusionMatrix[true_class, predicted_class]
    confusionMatrix = np.zeros((4, 4))
    for row, predicted in enumerate((walking_predicted, walking_up_predicted,
                                     walking_down_predicted, sitting_predicted)):
        for i in predicted:
            # BUG FIX: the sitting row used confusionMatrix[3, 1] += 1,
            # crediting every sitting prediction to column 1 regardless
            # of the actual predicted label.
            confusionMatrix[row, i] += 1
    print(confusionMatrix)
# Script entry point: run the full train/evaluate pipeline.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment