Skip to content

Instantly share code, notes, and snippets.

@summer-liu
Created December 17, 2015 06:44
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save summer-liu/b376396e9f27ab4c1363 to your computer and use it in GitHub Desktop.
Save summer-liu/b376396e9f27ab4c1363 to your computer and use it in GitHub Desktop.
from pymongo import MongoClient
import math
import datetime
import calendar
import multiprocessing
import threading
# Parallelism: size of the process pool, and threads started inside each process.
NUM_OF_PROCESS = 6
NUM_OF_WORKERS = 10

# Bounds of the period to cache; the main block cuts it into NUM_OF_PROCESS ranges.
START_DATE = datetime.datetime(year=2015, month=2, day=8, hour=0, minute=0)
END_DATE = datetime.datetime(year=2015, month=12, day=18, hour=0, minute=0)

# Same bounds as epoch seconds (calendar.timegm reads the time tuple as UTC).
START_TIMESTAMP, END_TIMESTAMP = (
    calendar.timegm(bound.utctimetuple()) for bound in (START_DATE, END_DATE)
)

# Number of hour slots already handed out by assign() in this process.
n = 0
# Guards the shared hour counter ``n``: assign() is called concurrently by
# every worker thread of a process.
_assign_lock = threading.Lock()


def assign():
    """Hand out the next unprocessed hour slot of the current process.

    Returns:
        The 1-based index of the claimed hour (1 .. num_of_hours), or the
        string 'done' once every hour has been handed out.
    """
    global n
    # The bound check and the increment must happen as one step; without the
    # lock two threads can receive the same index while another is skipped.
    with _assign_lock:
        if n < num_of_hours:
            n += 1
            return n
    return 'done'
def process(start, end):
    """Cache every whole hour of the range ``[start, end)`` in this process.

    Publishes the MongoDB collections and the range description as module
    globals, then starts NUM_OF_WORKERS threads running worker() and waits
    for all of them.

    Args:
        start: Range start, in epoch seconds.
        end: Range end, in epoch seconds. A trailing partial hour is dropped.
    """
    global points, userAttr, deviceAttr, eventFlow, mobile
    global num_of_hours, beginning, n

    proc_name = multiprocessing.current_process().name
    client = MongoClient('10.8.8.111:27017')
    try:
        db = client['yangcong-prod25']
        points = db['points']
        userAttr = db['userAttrCache']
        deviceAttr = db['deviceAttrCache']
        eventFlow = db['eventFlow']
        mobile = {'android': 'imei', 'ios': 'idfa'}

        num_of_hours = int((end - start) / 3600)
        beginning = start
        # Restart the hour counter: a pool process may be given more than one
        # range, and a stale counter would make assign() report 'done' at once.
        n = 0
        print(proc_name + ' total hours: %d' % num_of_hours)

        threads = [threading.Thread(target=worker) for _ in range(NUM_OF_WORKERS)]
        for t in threads:
            t.start()
        for t in threads:
            t.join()
    finally:
        # Every thread has been joined (or start-up failed): release the sockets.
        client.close()
    print(proc_name + 'done.')
def worker():
    """Thread body: claim hour slots from assign() until it returns 'done'.

    For each claimed hour the ``points`` collection is aggregated over that
    hour and the results are written to the ``eventFlow``, ``deviceAttr`` and
    ``userAttr`` collections (module globals set up by process()).
    """
    while True:
        slot = assign()
        if slot == 'done':
            break

        start_timestamp = beginning + (slot - 1) * 3600
        end_timestamp = beginning + slot * 3600
        # NOTE(review): local time minus 8 hours only yields UTC when the host
        # clock is in UTC+8 - confirm the intended timezone before changing.
        shift = datetime.timedelta(hours=8)
        start_time = datetime.datetime.fromtimestamp(start_timestamp) - shift
        end_time = datetime.datetime.fromtimestamp(end_timestamp) - shift
        print(multiprocessing.current_process().name + ' ' + threading.current_thread().name + ' processing points data from ' + str(start_time) + ' to ' + str(end_time) + '......\n')

        # Half-open hour window shared by every $match stage below.
        window = {"$gte": start_time, "$lt": end_time}
        _cache_event_flow(window)
        _cache_device_attrs(window)
        _cache_user_attrs(window, 'pc', 'recentPCSession', 'recentMobileSession')
        _cache_user_attrs(window, {"$in": ['android', 'ios']},
                          'recentMobileSession', 'recentPCSession')


def _cache_event_flow(window):
    """Insert one eventFlow document per (device, user) pair seen in the window."""
    # eventFlow: device, user, startTime, endTime, eventFlow
    pipeline = [
        {"$match": {"createdBy": window}},
        # Sorted first so $first/$last/$push below follow time order.
        {"$sort": {"createdBy": 1}},
        {"$group": {
            "_id": {
                "device": {"$ifNull": ["$header.imei", "$header.idfa"]},
                "user": "$user",
            },
            "startTime": {"$first": "$createdBy"},
            "endTime": {"$last": "$createdBy"},
            "eventFlow": {"$push": "$eventKey"},
        }},
        {"$project": {
            "_id": 0,
            "device": "$_id.device",
            "user": "$_id.user",
            "startTime": 1,
            "endTime": 1,
            "eventFlow": 1,
        }},
    ]
    event_flow = list(points.aggregate(pipeline))
    if event_flow:  # insert_many rejects an empty list
        eventFlow.insert_many(event_flow)


def _cache_device_attrs(window):
    """Merge the window's per-device activity into deviceAttr."""
    # deviceAttr: device, activateDate, recentSession, platform, users
    pipeline = [
        {"$match": {"from": {"$in": ["android", "ios"]}, "createdBy": window}},
        {"$group": {
            "_id": {"$ifNull": ["$header.imei", "$header.idfa"]},
            "activateDate": {"$min": "$createdBy"},
            "recentSession": {"$max": "$createdBy"},
            "users": {"$addToSet": "$user"},
            "platform": {"$first": "$from"},
        }},
    ]
    for d in list(points.aggregate(pipeline)):
        # One atomic server-side merge instead of find_one + replace: other
        # threads and processes update the same device concurrently, and a
        # read-modify-write could overwrite their result.
        # Assumes an existing "users" field is always an array - verify
        # against documents written by other tools.
        deviceAttr.update_one(
            {"device": d['_id']},
            {
                "$min": {"activateDate": d['activateDate']},
                "$max": {"recentSession": d['recentSession']},
                "$addToSet": {"users": {"$each": d['users']}},
                "$set": {"platform": d['platform']},
            },
            upsert=True
        )


def _cache_user_attrs(window, source, session_field, other_field):
    """Merge the window's per-user activity for one client kind into userAttr.

    Args:
        window: ``createdBy`` range condition for the hour.
        source: Match condition for the ``from`` field.
        session_field: Field receiving the latest ``createdBy`` for this kind.
        other_field: The other session field; created as None on first
            insert and otherwise left untouched.
    """
    # userAttr: user, activateDate, recentPCSession, recentMobileSession
    pipeline = [
        {"$match": {
            "createdBy": window,
            "from": source,
            "user": {"$exists": True},
        }},
        {"$group": {
            "_id": "$user",
            "activateDate": {"$min": "$createdBy"},
            session_field: {"$max": "$createdBy"},
        }},
    ]
    for u in list(points.aggregate(pipeline)):
        # Atomic merge, same reasoning as for devices. $max also replaces a
        # stored None, because null sorts below dates in BSON comparison order.
        userAttr.update_one(
            {'user': u['_id']},
            {
                "$min": {"activateDate": u['activateDate']},
                "$max": {session_field: u[session_field]},
                "$setOnInsert": {other_field: None},
            },
            upsert=True
        )
if __name__ == '__main__':
    # Split [START_TIMESTAMP, END_TIMESTAMP) into NUM_OF_PROCESS equal ranges
    # and run process() on each of them in a pool of worker processes.
    print("Start cache script......")
    pool = multiprocessing.Pool(processes=NUM_OF_PROCESS)
    # Floor division keeps the chunk size an integer number of seconds.
    interval = (END_TIMESTAMP - START_TIMESTAMP) // NUM_OF_PROCESS
    pending = []
    for i in range(NUM_OF_PROCESS):
        chunk_start = START_TIMESTAMP + i * interval
        pending.append(
            pool.apply_async(process, (chunk_start, chunk_start + interval)))
    pool.close()
    pool.join()
    # apply_async stores any exception raised by process() in its result
    # object; without get() a failed range would go completely unnoticed.
    for i, result in enumerate(pending):
        try:
            result.get()
        except Exception as exc:
            print('process chunk %d failed: %r' % (i, exc))
    print('Cache script done.......')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment