@wangzaixiang
Created November 21, 2012 08:05
ForkJoin/MapReduce Demo Code
# -*- coding: utf-8 -*-
'''
Synchronously fetch channel online data from the data sources and save it
to MongoDB. This script is normally run from crontab, once every 30 seconds.
@author: wangzaixiang@gmail.com
'''
from app_logger import Logger, set_logging_path
from db import yy_mysql
import itertools
import time
import pymongo
from forkjoin import ForkJoin
from map_reduce import map_reduce
# TODO: move to configuration
db_session_list = [
    {"host": "1.2.3.4:3306", "db": "appDaemon",
     "user": "user", "password": "password"},
    {"host": "2.3.4.5:3306", "db": "appDaemon",
     "user": "user", "password": "password"},
]
def _query(name, sql, host, database, user, password):
    def totuple(row):
        # truncate the sync timestamp to minute granularity
        ts = int(time.mktime(row["sync_date"].timetuple()))
        ts = ts - (ts % 60)
        users = int(row["users"])
        sessions = int(row["sessions"])
        return (ts, users, sessions)
    for i in range(0, 3):  # retry up to 3 times
        try:
            conn = yy_mysql.Connection(host, database, user, password, connect_timeout=10)
            rows = conn.query(sql)
            tuples = map(totuple, rows)
            Logger().debug("%s read data ok: %s", name, tuples)
            return tuples
        except:
            Logger().exception("query database failed")
    # after 3 failed attempts the function falls through and returns None;
    # ForkJoin discards None results, so a dead database is simply skipped
def __query_datasource():
    # aggregate the last 5 minutes of tbl_session_his, grouped by sync_date
    sql = """
        select sync_date, sum(totalUser) as users, count(totalUser) as sessions
        from tbl_session_his
        where sync_date >= (
            select date_add( (select max(sync_date) from tbl_session_his), interval -5 minute)
        )
        group by sync_date
        order by sync_date
    """
    forkjoin = ForkJoin()
    for db in db_session_list:
        forkjoin.fork(target=_query, kwargs=dict(
            name=db["host"], sql=sql,
            host=db["host"], database=db["db"], user=db["user"], password=db["password"]
        ))
    results = forkjoin.join(30)
    Logger().info("query databases, result: %d", len(results))
    return results
def __save(mongodb, records):
    records = sorted(records, cmp=lambda r1, r2: r1[0] - r2[0])
    for (ts, users, sessions) in records:
        Logger().info("save channel: %d, %d, %d", ts, users, sessions)
        try:
            # conditional upsert: only touch the document when the stored
            # value is smaller than the new one. If a document with this _id
            # already holds a value >= users, the filter matches nothing and
            # the upsert raises DuplicateKeyError, which is ignored below.
            mongodb.online2["sum_channel_user"].update(
                {'_id': ts, "val": {"$lt": users}},
                {"$set": {"val": users, "sessions": sessions}},
                upsert=True, safe=True)
        except pymongo.errors.DuplicateKeyError:  # already present, no update needed
            pass
        except:
            Logger().exception("save channel: %d, %d, %d failed", ts, users, sessions)
"""
删除最近1分钟的数据, 频道数据比较准时,YY一般需要等待超过1分钟
"""
def __removeLatest(records):
now = time.time()
return [ rec for rec in records if now - rec[0] >= 60 ]
def channelonline_sync():
    Logger().info("ChannelOnline Data Sync Begin...")
    mongodb = pymongo.Connection("127.0.0.1", 27017)
    results = __query_datasource()  # [ [ (ts, users, sessions), .. ], .. ] per database
    # merge the per-database rows: for each timestamp keep the maximum
    # users and the maximum sessions seen across all databases
    merged = map_reduce(itertools.chain(*results),
        index=lambda row: (row[0], row),
        mapper=lambda key, row: [(key, row)],
        reducer=lambda key, rows: (key, max([row[1] for row in rows]), max([row[2] for row in rows]))
    )
    result = __removeLatest(merged.values())
    __save(mongodb, result)
    Logger().info("ChannelOnline Data Sync Completed\n")
if __name__ == "__main__":
    set_logging_path("logs/channelonline_sync.log")
    channelonline_sync()

# -*- coding: utf-8 -*-
'''
A simulated fork/join programming model built on threads.
@author: wangzaixiang@gmail.com
'''
import threading
import time
class ForkJoin:
    def __init__(self):
        self.condition = threading.Condition()
        # the creating thread holds the lock until join() waits, so workers
        # can only report completion while join() is blocked in wait()
        self.condition.acquire()
        self.threads = []
        self.results = []

    def fork(self, target, args=(), kwargs=None):
        if kwargs is None:
            kwargs = {}

        def run():
            try:
                ret = target(*args, **kwargs)
                if ret is not None:
                    self.results.append(ret)
            finally:
                # deregister under the lock, then wake join()
                self.condition.acquire()
                self.threads.remove(threading.currentThread())
                self.condition.notify()
                self.condition.release()

        thread = threading.Thread(target=run)
        # register before start() so join() cannot miss a fast thread
        self.threads.append(thread)
        thread.start()

    def join(self, seconds):
        # wait until every forked thread has finished or the timeout expires
        end = time.time() + seconds
        while len(self.threads) > 0:
            remain = end - time.time()
            if remain <= 0:
                break
            self.condition.wait(remain)
        self.condition.release()
        return self.results
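
# Note: join(seconds) is a bounded wait; if the timeout expires before every
# forked task finishes, the returned list only contains the results collected
# so far, so callers should tolerate a partial result list.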
if __name__ == "__main__":
    # pi/4 = sum( (-1)^n / (2*n+1) )  (Leibniz series)
    def task(begin, end):
        return sum([float(pow(-1, n)) / (2 * n + 1) for n in range(begin, end)])
    begin = time.time()
    fork_join = ForkJoin()
    loop = 1000000
    for i in range(0, 5):
        fork_join.fork(task, args=(i * loop, i * loop + loop))
    results = fork_join.join(30)
    end = time.time()
    print "results = %s, pi=%f time=%d" % (results, 4 * sum(results), end - begin)
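    # A second sketch (hypothetical): the same computation, but passing the
    # arguments through kwargs to exercise fork()'s keyword-argument path.
    fj2 = ForkJoin()
    for i in range(0, 5):
        fj2.fork(task, kwargs=dict(begin=i * loop, end=i * loop + loop))
    print "pi (via kwargs) = %f" % (4 * sum(fj2.join(30)))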

# -*- coding: utf-8 -*-
'''
A simple map/reduce framework.
@author: wangzaixiang@gmail.com
'''
def map_reduce(rows, index, mapper, reducer):
    """
    rows:    [ row0, row1, .. ]
    index:   lambda row: (k1, v1)
    mapper:  lambda k, v: [(k1, v1), (k2, v2)]
    reducer: lambda k, vals: val
    """
    group = {}
    # map phase: index each row, then group the mapped values by key
    for row in rows:
        (_key, _value) = index(row)
        for (k, v) in mapper(_key, _value):
            if k in group:
                group[k].append(v)
            else:
                group[k] = [v]
    # reduce phase: collapse each key's value list into a single value
    reduced = {}
    for (key, vals) in group.items():
        reduced[key] = reducer(key, vals)
    return reduced
if __name__ == "__main__":
    array = [
        {"_id": 1, "value": 100},
        {"_id": 2, "value": 200},
        {"_id": 3, "value": 300},
        {"_id": 1, "value": 103},
        {"_id": 2, "value": 195},
        {"_id": 3, "value": 320},
    ]
    results = map_reduce(
        array,
        index=lambda r: (r["_id"], r),
        mapper=lambda k, v: [(k, v)],
        reducer=lambda k, vals: max(vals, key=lambda val: val["value"])
    )
    print results
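    # A second, hypothetical example: the classic word count. Unlike the
    # identity mapper above, this mapper emits several (key, value) pairs
    # per input row, which is where the grouping step earns its keep.
    lines = ["a b a", "b c", "a"]
    counts = map_reduce(
        lines,
        index=lambda line: (None, line),
        mapper=lambda _, line: [(word, 1) for word in line.split()],
        reducer=lambda word, ones: sum(ones)
    )
    print counts  # expect {'a': 3, 'b': 2, 'c': 1} (key order may vary)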