Skip to content

Instantly share code, notes, and snippets.

@abhaystoic
Last active May 13, 2016 09:52
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save abhaystoic/b63a28c86c07fb9ade566cd37e405430 to your computer and use it in GitHub Desktop.
Save abhaystoic/b63a28c86c07fb9ade566cd37e405430 to your computer and use it in GitHub Desktop.
A prototype to track human movement
__author__ = 'Abhay Gupta'
__version__ = 0.1
import pyhs2
import time
import MySQLdb
import datetime
import sys
import pytz
def DATE_TIME():
return datetime.datetime.now(pytz.timezone('Asia/Calcutta'))
def FORMATTED_TIME():
return datetime.datetime.strptime(str(DATE_TIME()).split('.')[0], '%Y-%m-%d %H:%M:%S')
#Spinner
def DrawSpinner(counter):
if counter % 4 == 0:
sys.stdout.write("/")
elif counter % 4 == 1:
sys.stdout.write("-")
elif counter % 4 == 2:
sys.stdout.write("\\")
elif counter % 4 == 3:
sys.stdout.write("|")
sys.stdout.flush()
sys.stdout.write('\b')
#Generator
def neighourhood(iterable):
iterator = iter(iterable)
prev = None
item = iterator.next() # throws StopIteration if empty.
for next in iterator:
yield (prev,item,next)
prev = item
item = next
yield (prev,item,None)
#hive cursor
def get_cursor():
conn = pyhs2.connect(host='',
port=10000,
authMechanism="PLAIN",
user='hadoop',
password='',
database='test')
return conn.cursor()
def get_mysql_cursor():
conn = MySQLdb.connect(user='db', passwd='',
host='',
db='test')
return conn.cursor()
def get_records():
cur = get_cursor()
cur.execute("select * from user_location_history")
#Fetch table results
return cur.fetchall()
def get_user_movement():
#Initializing
location_dict = {}
#Fetching all the records
records = get_records()
counter = 0
for record in records:
counter = counter + 1
DrawSpinner(counter)
if location_dict.has_key(record[1]):
location_dict[record[1]].append(int(record[3]))
else:
location_dict[record[1]] = [int(record[3])]
return location_dict
#For performance improvements use list instead of dictionary as we don't need count of the users here
def prepare_movement_data():
#Initializing
city_movement_data_dict = {}
user_location_dict = get_user_movement()
counter = 0
for user_id, user_movement_path in user_location_dict.iteritems():
counter = counter + 1
DrawSpinner(counter)
if len(set(user_movement_path)) > 1:
if city_movement_data_dict.has_key(tuple(user_movement_path)):
city_movement_data_dict[tuple(user_movement_path)] = city_movement_data_dict[tuple(user_movement_path)] + 1
else:
city_movement_data_dict[tuple(user_movement_path)] = 1
return city_movement_data_dict
def store_mining_results(unique_movement_map_tuple):
sql_query = None
insert_flag = False
update_flag = False
cur = get_mysql_cursor()
for prev, current, next in neighourhood(unique_movement_map_tuple):
#Execute query
cur.execute('select * from user_flow_location')
#Handling the empty table case
fetched_data = cur.fetchall()
if len(fetched_data) == 0 and prev is not None and current is not None and prev != current and current != next:
sql_query = 'insert into user_flow_location(location_from, location_to, count)\
values('+ str(prev) + ',' + str(current) + ', 1 )'
cur.execute(sql_query)
else:
insert_flag = False
update_flag = False
for record in fetched_data:
if record[2] == prev and record[3] == current:
update_flag = True
sql_query = 'update user_flow_location set count = ' + str(record[3] + 1) +\
' where id = ' + str(record[0])
cur.execute(sql_query)
elif prev is not None and current is not None and prev != current and current != next:
insert_flag = True
if update_flag == False and insert_flag == True:
#Insert only if the entry doesn't exists
#A quick fix. It can be improved later.
cur2 = get_mysql_cursor()
cur2.execute('select * from user_flow_location where location_from = ' +\
str(prev) + ' and location_to = ' + str(current))
if len(cur2.fetchall()) == 0:
sql_query = 'insert into user_flow_location\
(location_from, location_to, count)\
values('+ str(prev) + ',' + str(current)+', 1)'
cur.execute(sql_query)
cur2.close()
cur.close()
if __name__ == '__main__':
start_time = time.time()
#spinner = spinning_cursor()
print 'Preparing data...'
city_movement_data_dict = prepare_movement_data()
sys.stdout.flush()
sys.stdout.write('\b')
print '\nData prepared for mining.'
print 'Processing data and storing results...'
counter = 0
for unique_movement_map, unique_user_count in city_movement_data_dict.iteritems():
#print str(unique_movement_map), ' : ', str(unique_user_count), ' user(s) relocated through this path'
store_mining_results(unique_movement_map)
counter = counter + 1
DrawSpinner(counter)
sys.stdout.flush()
sys.stdout.write('\b')
end_time = time.time()
print '\nData mining complete!'
print '\nTotal execution time = ', str(end_time - start_time), ' seconds\n'
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment