Created
October 28, 2014 06:19
-
-
Save seckcoder/ba0a4da718dd6fe578ad to your computer and use it in GitHub Desktop.
cloud computing mysql bulk load. Fuck you cloud computing!!!
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
__author__ = 'seckcoder' | |
import sys | |
import json | |
from mysql.connector.pooling import MySQLConnectionPool | |
import traceback | |
from mysql.connector.conversion import MySQLConverter | |
def loadConfig(path="./mysql_mapper_config_cached.json"):
    """Load the loader's JSON configuration file and return the parsed object.

    Args:
        path: Location of the JSON file.  Defaults to the cached mapper
            config in the current working directory, which is the file this
            script has always read.

    Returns:
        Whatever ``json.load`` produces for the file; the rest of the script
        indexes it like a dict.

    Raises:
        IOError / OSError: if the file cannot be opened.
        ValueError: if the file does not contain valid JSON.
    """
    with open(path, "r") as fin:
        return json.load(fin)
def packIter(handle, num=10, fin=sys.stdin):
    """Yield lists of parsed items, at most *num* per list.

    Every line of *fin* is passed through *handle*; falsy results are
    dropped.  A list is yielded as soon as it holds *num* items, and whatever
    is left once the input is exhausted is yielded as a final, shorter list.

    Args:
        handle: Callable turning one input line into an item.
        num: Number of items per yielded list.
        fin: Iterable of lines; standard input by default.
    """
    batch = []
    for raw in fin:
        parsed = handle(raw)
        if not parsed:
            continue
        batch.append(parsed)
        if len(batch) >= num:
            yield batch
            # Start a fresh list so the one just handed out is not reused.
            batch = []
    if batch:
        yield batch
def print_e(*args):
    """Write the traceback of the exception being handled to standard output.

    Args:
        *args: Ignored.  Call sites pass the caught exception, but the
            traceback is always taken from ``sys.exc_info()``, so the call is
            only meaningful from inside an ``except`` block.
    """
    report = ''.join(traceback.format_exception(*sys.exc_info()))
    # write(report + newline) emits exactly what the Python 2 ``print``
    # statement did, while also being valid syntax on Python 3.
    sys.stdout.write(report + '\n')
def processLine(line):
    """Parse one tab-separated input line into a row for the Tweet table.

    A line carries five fields -- uid, created_at, tid, score and text --
    separated by tabs.  Only the first four tabs split the line, so the text
    itself may contain tabs.  Backslash escape sequences in the text are
    expanded with the ``unicode-escape`` codec.

    Args:
        line: One raw input line.

    Returns:
        ``(uid, created_at, "tid:score:text")`` with ``uid`` and
        ``created_at`` converted to int, or an empty tuple when the line is
        blank or cannot be parsed.  Callers drop falsy results.
    """
    line = line.strip()
    if not line:
        return ()
    try:
        # The unpacking lives inside the try on purpose: a line with fewer
        # than five fields raises ValueError, and that must skip this one
        # line rather than escape and abort the whole load.
        uid, created_at, tid, score, cs_text = line.split('\t', 4)
        uid = int(uid)
        created_at = int(created_at)
        # unicode-escape decodes bytes.  A Python 2 str already is bytes; a
        # Python 3 str has to be encoded first.
        if not isinstance(cs_text, bytes):
            cs_text = cs_text.encode("utf-8")
        cs_text = cs_text.decode("unicode-escape")
        # The join below needs the same native string type as tid and score:
        # on Python 2 that means re-encoding the unicode result as UTF-8.
        if not isinstance(cs_text, str):
            cs_text = cs_text.encode("utf-8")
        return (uid, created_at, ":".join((tid, score, cs_text)))
    except Exception as e:
        print_e(e)
        return ()
def itemIter(handle, fin=sys.stdin):
    """Yield ``handle(line)`` for each line of *fin*, skipping falsy results.

    Args:
        handle: Callable turning one input line into an item.
        fin: Iterable of lines; standard input by default.
    """
    for raw in fin:
        parsed = handle(raw)
        if not parsed:
            continue
        yield parsed
def getUidsAndItems(items):
    """Group rows by their integer user id.

    Args:
        items: Iterable of rows whose first element is the uid (anything
            ``int()`` accepts).

    Returns:
        dict mapping ``int(uid)`` to the list of rows carrying that uid, in
        input order.  A row whose uid cannot be read or converted is
        reported through ``print_e`` and left out of the result.
    """
    uids_items = {}
    for item in items:
        try:
            uid = int(item[0])
        except Exception as e:
            print_e(e)
            continue
        # setdefault replaces the has_key()/else pair; dict.has_key no longer
        # exists on Python 3.
        uids_items.setdefault(uid, []).append(item)
    return uids_items
def insertItems(cluster, items):
    """Insert a batch of rows into the Tweet table, sharded across the cluster.

    The rows are grouped by uid with getUidsAndItems and handed to
    ``cluster.nextConns``, which pairs each connection with the rows routed
    to it.  For every pair the rows are written with one ``executemany`` and
    committed.  A failure on one connection is reported through ``print_e``
    and does not stop the remaining connections; every connection is closed
    when its rows are done, whether or not the insert succeeded.

    Args:
        cluster: Object exposing ``nextConns(uid_to_rows)`` (a PoolCluster).
        items: Rows shaped like ``(uid, created_at, text)``, matching the
            three placeholders of the insert statement.
    """
    insert_sql = "insert into Tweet values (%s, %s, %s)"
    for conn, rows in cluster.nextConns(getUidsAndItems(items)):
        try:
            cursor = conn.cursor()
            try:
                cursor.execute("set NAMES utf8mb4")  # support for utf8mb4
                cursor.executemany(insert_sql, rows)
                conn.commit()
            finally:
                # Close the cursor on the failure path as well; an error
                # raised here is still caught by the handler below.
                cursor.close()
        except Exception as e:
            print_e(e)
        finally:
            conn.close()
# Parsed once at import time; PoolCluster.base_config and main() read from it.
config = loadConfig()


class PoolCluster(object):
    """A list of MySQL connection pools, one per server, sharded by key.

    An integer key is routed to the pool at index ``key % len(self.pools)``.
    """

    # Connection arguments shared by every pool.  Each per-server dict given
    # to __init__ is layered on top of a copy of this one.
    base_config = {
        "pool_size": config["mysql_pool_concurrency"],
        "database": config["database"],
        "user": config["user"],
        "password": config["password"],
        "port": config["port"],
        "charset": "utf8",
    }

    def __init__(self, configs):
        """Create one connection pool per entry of *configs*.

        Args:
            configs: Iterable of dicts with per-server connection arguments
                (main() passes ``{"host": ...}``).  Their keys override or
                extend ``base_config``.
        """
        self.pools = []
        # Named server_config so it does not shadow the module-level config.
        for server_config in configs:
            cf = self.base_config.copy()
            cf.update(server_config)
            self.pools.append(MySQLConnectionPool(**cf))

    def _poolIndex(self, key):
        """Return the index of the pool that *key* is routed to."""
        return key % len(self.pools)

    def nextConn(self, key):
        """Return a connection from the pool that *key* is routed to."""
        return self.pools[self._poolIndex(key)].get_connection()

    def nextConns(self, keys_items):
        """Pair one connection per pool with all the items routed to it.

        Args:
            keys_items: dict mapping an integer key to a list of items.

        Returns:
            A list of ``[connection, items]`` pairs, one per pool that
            received at least one key.  When a connection cannot be obtained
            for a pool, the error is reported through ``print_e`` and the
            items of that key are not included in the result.
        """
        conn_dict = {}
        for key, items in keys_items.items():
            idx = self._poolIndex(key)
            if idx in conn_dict:
                conn_dict[idx][1].extend(items)
                continue
            try:
                conn = self.pools[idx].get_connection()
            except Exception as e:
                print_e(e)
                continue
            # Store a copy: the list is extended with other keys' items
            # later, and that must not grow the caller's own list.
            conn_dict[idx] = [conn, list(items)]
        return list(conn_dict.values())
def main():
    """Read rows from standard input in batches and load them into MySQL.

    Builds a PoolCluster with one pool per host in ``config["servers"]``,
    then feeds batches of ``config["pack_num"]`` parsed lines to
    insertItems.  An exception escaping the batch loop is reported through
    ``print_e`` and ends the run.
    """
    cluster = PoolCluster({"host": serv} for serv in config["servers"])
    try:
        for items in packIter(processLine, config["pack_num"]):
            insertItems(cluster, items)
    except Exception as e:
        print_e(e)


# Start the load only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment