Skip to content

Instantly share code, notes, and snippets.

@seckcoder
Created October 28, 2014 06:19
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save seckcoder/ba0a4da718dd6fe578ad to your computer and use it in GitHub Desktop.
Save seckcoder/ba0a4da718dd6fe578ad to your computer and use it in GitHub Desktop.
Cloud computing: MySQL bulk load. Fuck you, cloud computing!!!
#!/usr/bin/env python
__author__ = 'seckcoder'
import sys
import json
from mysql.connector.pooling import MySQLConnectionPool
import traceback
from mysql.connector.conversion import MySQLConverter
def loadConfig():
    """Read and parse the JSON configuration file.

    Returns:
        The decoded content of ``./mysql_mapper_config_cached.json``; the
        path is relative to the current working directory.
    """
    with open("./mysql_mapper_config_cached.json", "r") as config_file:
        settings = json.load(config_file)
    return settings
def packIter(handle, num=10, fin=sys.stdin):
    """Parse lines with ``handle`` and yield the results in batches.

    Args:
        handle: Callable applied to every line; falsy results are dropped.
        num: Batch size (expected to be positive).
        fin: Iterable of lines; defaults to standard input.

    Yields:
        Lists of parsed items. Every list holds ``num`` items except
        possibly the last one, which holds whatever is left over. An empty
        list is never yielded.
    """
    batch = []
    for raw_line in fin:
        parsed = handle(raw_line)
        if not parsed:
            continue
        batch.append(parsed)
        if len(batch) < num:
            continue
        # Batch is full: hand it out and start a fresh list so the caller
        # keeps sole ownership of the one just yielded.
        yield batch
        batch = []
    if batch:
        yield batch
def print_e(*args):
    """Print the traceback of the exception currently being handled.

    Args:
        *args: Accepted so callers can pass the exception object, but
            ignored; the report is built from ``sys.exc_info()``.
    """
    exc_type, exc_value, exc_tb = sys.exc_info()
    report = ''.join(traceback.format_exception(exc_type, exc_value, exc_tb))
    # A single parenthesised argument prints the same text under the
    # Python 2 print statement.
    print(report)
def processLine(line):
    """Parse one tab-separated input record into a row for insertion.

    A record consists of five tab-separated fields: ``uid``,
    ``created_at``, ``tid``, ``score`` and ``cs_text``. Only the first four
    tabs split the record, so any further tabs stay inside ``cs_text``.

    Args:
        line: One raw input line (a Python 2 byte string).

    Returns:
        ``(uid, created_at, rst)`` where ``uid`` and ``created_at`` are
        ints and ``rst`` is ``"tid:score:text"`` with the text decoded from
        unicode-escape notation and re-encoded as UTF-8. An empty tuple is
        returned for a blank line or a malformed record; for the latter the
        traceback is printed via ``print_e``.
    """
    line = line.strip()
    if not line:
        return ()
    try:
        # The unpacking lives inside the try block so that a record with
        # too few fields is reported and skipped like any other malformed
        # record instead of raising into (and aborting) the caller's loop.
        uid, created_at, tid, score, cs_text = line.split('\t', 4)
        uid = int(uid)
        created_at = int(created_at)
        cs_text = cs_text.decode("unicode-escape").encode("utf-8")
        rst = ":".join((tid, score, cs_text))
        return (uid, created_at, rst)
    except Exception as e:
        print_e(e)
        return ()
def itemIter(handle, fin=sys.stdin):
    """Yield the parsed form of every line that ``handle`` accepts.

    Args:
        handle: Callable applied to every line; falsy results are skipped.
        fin: Iterable of lines; defaults to standard input.

    Yields:
        Each truthy value returned by ``handle``, in input order.
    """
    for raw_line in fin:
        parsed = handle(raw_line)
        if not parsed:
            continue
        yield parsed
def getUidsAndItems(items):
    """Group items by their integer uid.

    Args:
        items: Iterable of sequences whose first element is the uid (an
            int or anything ``int()`` accepts).

    Returns:
        A dict mapping each uid (as int) to the list of items carrying that
        uid, in input order. Items whose uid cannot be converted are
        reported via ``print_e`` and left out.
    """
    uids_items = {}
    for item in items:
        # Only the uid conversion can fail, so only it is guarded.
        try:
            uid = int(item[0])
        except Exception as e:
            print_e(e)
            continue
        # setdefault replaces the deprecated dict.has_key() branch.
        uids_items.setdefault(uid, []).append(item)
    return uids_items
def insertItems(cluster, items):
    """Insert a batch of rows into the ``Tweet`` table of the cluster.

    The rows are grouped by uid, ``cluster.nextConns`` pairs each group
    with a connection, and every group is written with one ``executemany``
    call followed by a commit. A failure on one connection is reported via
    ``print_e`` and does not stop the remaining connections from being
    processed.

    Args:
        cluster: Object providing ``nextConns(uid_to_items)``, which
            returns ``(connection, rows)`` pairs.
        items: Iterable of ``(uid, created_at, text)`` rows.
    """
    sql = "insert into Tweet values (%s, %s, %s)"
    for conn, rows in cluster.nextConns(getUidsAndItems(items)):
        try:
            cursor = conn.cursor()
            try:
                cursor.execute("set NAMES utf8mb4")  # support for utf8mb4
                cursor.executemany(sql, rows)
                conn.commit()
            finally:
                # Close the cursor on the failure path as well, not only
                # after a successful commit.
                cursor.close()
        except Exception as e:
            print_e(e)
        finally:
            conn.close()
# Loaded once at import time: PoolCluster.base_config below reads these
# settings while the class body is being executed, and main() reads the
# "servers" and "pack_num" entries.
config = loadConfig()
class PoolCluster(object):
    """A group of MySQL connection pools sharded by integer key.

    One pool is created per server configuration. A key is routed to the
    pool at index ``key % number_of_pools``.
    """

    # Settings shared by every pool; the per-server settings passed to
    # __init__ are layered on top of a copy of this dict.
    base_config = {
        "pool_size": config["mysql_pool_concurrency"],
        "database": config["database"],
        "user": config["user"],
        "password": config["password"],
        "port": config["port"],
        "charset": "utf8",
    }

    def __init__(self, configs):
        """Create one connection pool per entry of ``configs``.

        Args:
            configs: Iterable of dicts with per-server settings (for
                example ``{"host": ...}``) that override ``base_config``.
        """
        self.pools = []
        # Named server_config so it does not shadow the module-level config.
        for server_config in configs:
            pool_config = self.base_config.copy()
            pool_config.update(server_config)
            self.pools.append(MySQLConnectionPool(**pool_config))

    def _poolIndex(self, key):
        """Return the index of the pool responsible for ``key``."""
        return key % len(self.pools)

    def nextConn(self, key):
        """Return a connection from the pool responsible for ``key``."""
        return self.pools[self._poolIndex(key)].get_connection()

    def nextConns(self, keys_items):
        """Pair the items of every key with a connection to its pool.

        Keys that map to the same pool share one connection and their items
        are merged into a single list.

        Args:
            keys_items: Dict mapping an integer key to a list of items.

        Returns:
            A list of ``[connection, items]`` pairs, one per pool that was
            reached. When obtaining a connection fails, the error is
            reported via ``print_e`` and the items of that key are left
            out of the result.
        """
        conn_dict = {}
        for key, items in keys_items.items():
            idx = self._poolIndex(key)
            if idx in conn_dict:
                conn_dict[idx][1].extend(items)
                continue
            try:
                conn = self.pools[idx].get_connection()
            except Exception as e:
                print_e(e)
                continue
            # Store a copy: this list is extended when further keys map to
            # the same pool, and that must not modify the caller's lists.
            conn_dict[idx] = [conn, list(items)]
        return list(conn_dict.values())
def main():
    """Bulk-load the records read from standard input into MySQL.

    Builds a pool cluster with one pool per configured server, then parses
    the input in batches of ``config["pack_num"]`` records and inserts each
    batch. Any exception raised while reading or inserting is reported via
    ``print_e`` and ends the run.
    """
    server_configs = ({"host": serv} for serv in config["servers"])
    cluster = PoolCluster(server_configs)
    try:
        batches = packIter(processLine, config["pack_num"])
        for batch in batches:
            insertItems(cluster, batch)
    except Exception as err:
        print_e(err)
# Script entry point.
# NOTE(review): this call runs unconditionally, so importing the module also
# starts the load; consider an `if __name__ == "__main__":` guard.
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment