Skip to content

Instantly share code, notes, and snippets.

@stormxuwz
Created February 25, 2015 06:30
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save stormxuwz/765b30bb3c64607981cb to your computer and use it in GitHub Desktop.
Save stormxuwz/765b30bb3c64607981cb to your computer and use it in GitHub Desktop.
Activity recognition by VAR model (HAR dataset)
import statsmodels.tsa as st
import numpy as np
import numpy.linalg as nl
import statsmodels.tsa as tsa
def solver_leastSquare(A,y):
return nl.lstsq(A,y);
def read_data(filename):
x=[]
with open(filename,"r") as input_file:
lines=input_file.readlines()
for line in lines:
line_splitted= line.split()
x.append([float(y) for y in line_splitted])
x=np.array(x)
# print x.shape
return x
def construct_from_one_sequence(seq,lag):
'''
seq is a matrix with 3 rows as x,y,z. 128 columns
'''
xt_seq=seq[0,lag:]
yt_seq=seq[1,lag:]
zt_seq=seq[2,lag:]
A=np.zeros((128-lag,3*lag));
for i in range(lag,128):
for j in range(lag):
# print i,j
A[i-lag,3*j]=seq[0,i-j-1];
A[i-lag,3*j+1]=seq[1,i-j-1];
A[i-lag,3*j+2]=seq[2,i-j-1];
# A[i-lag,4*j+3]=1;
return A,xt_seq,yt_seq,zt_seq
def solver_coefficients(seq_list,lag):
num_seqs=len(seq_list)
xt_seq_all=np.zeros(num_seqs*(128-lag))
yt_seq_all=np.zeros(num_seqs*(128-lag))
zt_seq_all=np.zeros(num_seqs*(128-lag))
A=np.zeros((num_seqs*(128-lag),3*lag+1))
A[:,-1]=np.ones(num_seqs*(128-lag))
for seq_index,seq in enumerate(seq_list):
A_seq,xt_seq,yt_seq,zt_seq=construct_from_one_sequence(seq,lag);
A[seq_index*(128-lag):(seq_index+1)*(128-lag),:-1]=A_seq
xt_seq_all[seq_index*(128-lag):(seq_index+1)*(128-lag)]=xt_seq
yt_seq_all[seq_index*(128-lag):(seq_index+1)*(128-lag)]=yt_seq
zt_seq_all[seq_index*(128-lag):(seq_index+1)*(128-lag)]=zt_seq
coeff_Y=np.array([xt_seq_all,yt_seq_all,zt_seq_all])
coeff_Y=np.transpose(coeff_Y)
# print A;
# print coeff_Y;
coeff,res,rank,s=solver_leastSquare(A,coeff_Y);
# print A.shape,coeff_Y.shape,coeff.shape;
# print np.dot(A,coeff).shape
return coeff # return a
def predict(seq_list,lag,coeff_walking,coeff_walking_up,coeff_walking_down,coeff_sitting):
def cal_error(type_coeff):
bigA=np.zeros(((128-lag,3*lag+1)))
bigA[:,:-1]=A
bigA[:,-1]=np.ones(128-lag)
pred=np.dot(bigA,type_coeff)
# print pred.shape
# print pred[0,:],xt_seq[0],yt_seq[0],zt_seq[0]
x_error=sum((pred[:,0]-xt_seq)**2)
y_error=sum((pred[:,1]-yt_seq)**2)
z_error=sum((pred[:,2]-zt_seq)**2)
return z_error+y_error+x_error
seqs_type=[]
for seq in seq_list:
A,xt_seq,yt_seq,zt_seq=construct_from_one_sequence(seq,lag);
err_walking=cal_error(coeff_walking)
err_walking_up=cal_error(coeff_walking_up)
err_walking_down=cal_error(coeff_walking_down)
err_sitting=cal_error(coeff_sitting)
# print([err_walking,err_walking_up,err_walking_down,err_sitting])
seqs_type.append(np.argmin([err_walking,err_walking_up,err_walking_down,err_sitting]))
print seqs_type
return seqs_type
def assembleData(x_data,y_data,z_data,index):
mid=int(len(index)/2)
training_list=[]
testing_list=[]
for i in index[:mid]:
xyz_seq=np.array([x_data[i,:],y_data[i,:],z_data[i,:]]);
training_list.append(xyz_seq);
for i in index[mid:]:
xyz_seq=np.array([x_data[i,:],y_data[i,:],z_data[i,:]]);
testing_list.append(xyz_seq);
return training_list,testing_list
def main():
lag=5
x_data=read_data("./UCI HAR Dataset/test/Inertial Signals/total_acc_x_test.txt");
y_data=read_data("./UCI HAR Dataset/test/Inertial Signals/total_acc_y_test.txt");
z_data=read_data("./UCI HAR Dataset/test/Inertial Signals/total_acc_z_test.txt");
label=read_data("./UCI HAR Dataset/test/y_test.txt");
# print label
walking_index=np.where(label==1)[0]
walking_up_index=np.where(label==2)[0]
walking_down_index=np.where(label==3)[0]
sitting_index=np.where(label==4)[0]
# print walking_index
np.random.seed(1)
np.random.shuffle(list(walking_index))
np.random.shuffle(list(walking_up_index))
np.random.shuffle(list(walking_down_index))
np.random.shuffle(list(sitting_index))
training_walking_seqlist,testing_walking_seqlist=assembleData(x_data,y_data,z_data,walking_index)
training_walking_up_seqlist,testing_walking_up_seqlist=assembleData(x_data,y_data,z_data,walking_up_index)
training_walking_down_seqlist,testing_walking_down_seqlist=assembleData(x_data,y_data,z_data,walking_down_index)
training_sitting_seqlist,testing_sitting_seqlist=assembleData(x_data,y_data,z_data,sitting_index)
coeff_walking=solver_coefficients(training_walking_seqlist,lag)
coeff_walking_up=solver_coefficients(training_walking_up_seqlist,lag)
coeff_walking_down=solver_coefficients(training_walking_down_seqlist,lag)
coeff_sitting=solver_coefficients(training_sitting_seqlist,lag)
walking_predicted=predict(testing_walking_seqlist,lag,coeff_walking,coeff_walking_up,coeff_walking_down,coeff_sitting)
walking_up_predicted=predict(testing_walking_up_seqlist,lag,coeff_walking,coeff_walking_up,coeff_walking_down,coeff_sitting)
walking_down_predicted=predict(testing_walking_down_seqlist,lag,coeff_walking,coeff_walking_up,coeff_walking_down,coeff_sitting)
sitting_predicted=predict(testing_sitting_seqlist,lag,coeff_walking,coeff_walking_up,coeff_walking_down,coeff_sitting)
confusionMatrix=np.zeros((4,4))
for i in walking_predicted:
confusionMatrix[0,i]+=1
for i in walking_up_predicted:
confusionMatrix[1,i]+=1
for i in walking_down_predicted:
confusionMatrix[2,i]+=1
for i in sitting_predicted:
confusionMatrix[3,1]+=1
print confusionMatrix
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment