Skip to content

Instantly share code, notes, and snippets.

@mosquito
Created September 25, 2015 13:19
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mosquito/aa20b94fec8b406d9a51 to your computer and use it in GitHub Desktop.
Save mosquito/aa20b94fec8b406d9a51 to your computer and use it in GitHub Desktop.
Python MySQL connector performance comparison
# encoding: utf-8
import uuid
import sys
from tornado.gen import coroutine, sleep
from tornado.concurrent import futures
from tornado.ioloop import IOLoop, PeriodicCallback
from cymysql import connect
from functools import wraps, partial
# Single process-wide tornado event loop driving every benchmark below.
io_loop = IOLoop.current()
def db_exec(conn, q, x=0):
    """Execute *q* on *conn*, commit, and return all fetched rows.

    Intended to run on a worker thread (blocking driver calls).

    Parameters:
        conn: DB-API connection providing cursor() and commit().
        q: SQL statement to execute.
        x: when non-zero, written to stdout as a \r progress counter.

    Returns:
        The full result set from cursor.fetchall().
    """
    cursor = conn.cursor()
    try:
        # Original leaked the cursor if execute()/fetchall() raised;
        # always release it.
        cursor.execute(q)
        ret = cursor.fetchall()
    finally:
        cursor.close()
    conn.commit()
    if x:
        # \r rewrites the same console line as a progress indicator.
        sys.stdout.write('\r%04d ' % x)
    return ret
# MySQL connection settings shared by every benchmark
# (local server, default root account, `test` schema).
CONN_PARAMS = {
    'host': '127.0.0.1',
    'port': 3306,
    'user': 'root',
    'passwd': '',
    'db': 'test'
}
class ConnWraper(object):
    """Context manager that borrows one connection from a free-set.

    Constructing the wrapper immediately moves a connection from
    *pool* (the free connections) into *busy*; leaving the ``with``
    block moves it back.  Exceptions are not suppressed.
    """

    def __init__(self, pool, busy):
        self._free = pool
        self._busy = busy
        self._conn = pool.pop()
        busy.add(self._conn)

    def __enter__(self):
        # Hand the raw driver connection to the caller.
        return self._conn

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Return the connection to the free set regardless of errors.
        self._busy.remove(self._conn)
        self._free.add(self._conn)
class Pool(object):
    """Thread-pool backed pool of blocking cymysql connections.

    Every blocking driver call is pushed onto a ThreadPoolExecutor so
    the tornado IOLoop itself is never blocked.
    """

    def __init__(self, size, **kwargs):
        # size doubles as connection count and worker-thread count, so a
        # checked-out connection always has a worker available.
        self._size = size
        self.__thread_pool = futures.ThreadPoolExecutor(self._size)
        # None until run() completes; get_conn() polls for it.
        self.__pool = None
        self.__busy = set()
        # Connection parameters forwarded verbatim to cymysql.connect().
        self.__kwargs = kwargs

    @coroutine
    def run(self):
        """Open all connections concurrently on the thread pool."""
        tp = self.__thread_pool
        # Yielding a list of futures makes tornado wait for all of them.
        conns = set((yield [tp.submit(connect, **self.__kwargs) for _ in range(self._size)]))
        self.__pool = conns

    def call(self, func, *a, **kw):
        """Submit *func* to the thread pool; returns a yieldable future."""
        return self.__thread_pool.submit(func, *a, **kw)

    @coroutine
    def get_conn(self):
        """Wait for a free connection, then wrap it in a ConnWraper.

        Busy-polls every 1 ms while the pool is unset (run() pending)
        or empty (every connection checked out).
        """
        while not self.__pool:
            yield sleep(0.001)
        return ConnWraper(self.__pool, self.__busy)
# Global connection pool; created by prepare() before any test runs.
POOL = None
@coroutine
def prepare():
    """Create the global 30-connection pool and open its connections."""
    global POOL
    POOL = Pool(30, **CONN_PARAMS)
    yield POOL.run()
@coroutine
def test_db_sleep():
    """Run SELECT SLEEP(1) on a pooled connection via a worker thread."""
    def blocking_sleep_query(conn):
        # Runs on the thread pool: the driver call blocks its thread.
        cursor = conn.cursor()
        try:
            cursor.execute("select sleep(1)")
            return cursor.fetchall()
        finally:
            cursor.close()

    with (yield POOL.get_conn()) as conn:
        rows = yield POOL.call(blocking_sleep_query, conn)
    return rows
@coroutine
def test_table():
    """Create the benchmark table, then replay data.sql line by line."""
    q = (
        'CREATE TABLE IF NOT EXISTS `test`.`test_table` ( '
        '`id` int NOT NULL STORAGE MEMORY AUTO_INCREMENT, '
        '`key` varchar(20) NOT NULL STORAGE MEMORY, '
        '`value` varchar(20) NOT NULL STORAGE MEMORY, '
        '`int` bigint NOT NULL STORAGE MEMORY, '
        '`text` text CHARACTER SET utf8 COLLATE utf8_bin NOT NULL STORAGE MEMORY, '
        '`ts` datetime STORAGE MEMORY DEFAULT CURRENT_TIMESTAMP, '
        'PRIMARY KEY (`id`) '
        ') ENGINE=`InnoDB` COMMENT="";'
    )
    with (yield POOL.get_conn()) as conn:
        result = yield POOL.call(db_exec, conn, q)
    # data.sql: external fixture file, presumably one INSERT per line
    # — TODO confirm against the linked download.
    with open('data.sql') as f:
        for x, q in enumerate(f):
            # A fresh connection is checked out per statement, which
            # deliberately exercises pool churn; x drives the progress
            # counter inside db_exec.
            with (yield POOL.get_conn()) as conn:
                result = yield POOL.call(db_exec, conn, q, x)
@coroutine
def test_table_select():
    """Fetch up to 1M rows with one bulk fetchall() and print the count."""
    query = 'select * from test_table limit 1000000'
    with (yield POOL.get_conn()) as conn:
        rows = yield POOL.call(db_exec, conn, query)
    print(len(rows))
@coroutine
def test_table_select_one_by_one():
    """Stream up to 1M rows one fetchone() at a time and print the count."""
    def one_by_one(conn, q):
        # Runs on the thread pool; counts rows without keeping them.
        cur = conn.cursor()
        try:
            cur.execute(q)
            c = 0
            data = cur.fetchone()
            while data:
                data = cur.fetchone()
                c += 1
            return c
        finally:
            # Original leaked the cursor; always release it.
            cur.close()

    with (yield POOL.get_conn()) as conn:
        result = yield POOL.call(one_by_one, conn, 'select * from test_table limit 1000000')
    print(result)
@coroutine
def test():
    """Launch 1800 concurrent SELECT SLEEP(1) calls and wait for all.

    Yielding the list makes tornado wait for every coroutine; the
    results themselves are discarded (the dead local binding from the
    original is dropped).
    """
    yield [test_db_sleep() for _ in range(1800)]
if __name__ == '__main__':
    # Flush stdout every second so the \r progress counter stays visible.
    flusher = PeriodicCallback(sys.stdout.flush, 1000)
    io_loop.add_callback(flusher.start)

    t = io_loop.time()
    print(io_loop.run_sync(prepare))
    print("Preparing time: {}".format(io_loop.time() - t))

    t = io_loop.time()
    print("Starting test: Table creation")
    print(io_loop.run_sync(test_table))
    print("Execution time: {}".format(io_loop.time() - t))

    t = io_loop.time()
    print("Starting test: Table select one by one")
    print(io_loop.run_sync(test_table_select_one_by_one))
    print("Execution time: {}".format(io_loop.time() - t))

    t = io_loop.time()
    print("Starting test: Table select")
    print(io_loop.run_sync(test_table_select))
    print("Execution time: {}".format(io_loop.time() - t))

    t = io_loop.time()
    print("Starting test: Sleep concurrency")
    print(io_loop.run_sync(test))
    print("Execution time: {}".format(io_loop.time() - t))

    @coroutine
    def cleanup():
        """Drop the benchmark table."""
        with (yield POOL.get_conn()) as conn:
            yield POOL.call(db_exec, conn, "DROP TABLE test_table")

    print(io_loop.run_sync(cleanup))
Preparing time: 0.05134987831115723
Starting test: Table creation
9989 None
Execution time: 54.28241682052612
Starting test: Table select one by one
1000000
None
Execution time: 37.34457802772522
Starting test: Table select
1000000
None
Execution time: 43.45820212364197
Starting test: Sleep concurrency
None
Execution time: 69.3703100681305
# encoding: utf-8
import uuid
import sys
import tormysql
from tornado.gen import coroutine, sleep
from tornado.concurrent import futures
from tornado.ioloop import IOLoop, PeriodicCallback
from functools import wraps
# Process-wide tornado event loop.
io_loop = IOLoop.current()

# MySQL connection settings (local server, root account, `test` schema).
CONN_PARAMS = {
    'host': '127.0.0.1',
    'port': 3306,
    'user': 'root',
    'passwd': '',
    'db': 'test'
}

# Global tormysql connection pool; created by prepare().
POOL = None
@coroutine
def prepare():
    """Create the global tormysql pool (up to 30 connections)."""
    global POOL
    POOL = tormysql.ConnectionPool(
        max_connections=30,
        idle_seconds=7200,
        charset="utf8",
        **CONN_PARAMS
    )
@coroutine
def test_db_sleep():
    """Run SELECT SLEEP(1) on a pooled tormysql connection."""
    connection = yield POOL.Connection()
    cursor = connection.cursor()
    yield cursor.execute("SELECT SLEEP(%s)", (1,))
    # Defer the close calls to the IOLoop so they don't delay the result.
    io_loop.add_callback(cursor.close)
    io_loop.add_callback(connection.close)
    return cursor.fetchall()
@coroutine
def test_table():
    """Create the benchmark table, then replay data.sql line by line."""
    q = (
        'CREATE TABLE IF NOT EXISTS `test`.`test_table` ( '
        '`id` int NOT NULL STORAGE MEMORY AUTO_INCREMENT, '
        '`key` varchar(20) NOT NULL STORAGE MEMORY, '
        '`value` varchar(20) NOT NULL STORAGE MEMORY, '
        '`int` bigint NOT NULL STORAGE MEMORY, '
        '`text` text CHARACTER SET utf8 COLLATE utf8_bin NOT NULL STORAGE MEMORY, '
        '`ts` datetime STORAGE MEMORY DEFAULT CURRENT_TIMESTAMP, '
        'PRIMARY KEY (`id`) '
        ') ENGINE=`InnoDB` COMMENT="";'
    )
    conn = yield POOL.Connection()
    cursor = conn.cursor()
    yield cursor.execute(q)
    cursor.close()
    conn.close()
    # data.sql: external fixture file, presumably one INSERT per line
    # — TODO confirm against the linked download.
    with open('data.sql') as f:
        for x, q in enumerate(f):
            conn = yield POOL.Connection()
            cur = conn.cursor()
            yield cur.execute(q)
            # Closes are deferred onto the IOLoop so the loop body
            # does not wait on the release.
            io_loop.add_callback(cur.close)
            io_loop.add_callback(conn.close)
            # \r rewrites the same console line as a progress counter.
            sys.stdout.write('\r%04d ' % x)
@coroutine
def test_table_select():
    """Bulk-fetch up to 1M rows with fetchall() and print the count."""
    connection = yield POOL.Connection()
    cursor = connection.cursor()
    yield cursor.execute('select * from test_table limit 1000000')
    rows = cursor.fetchall()
    print(len(rows))
    # Defer the close calls to the IOLoop.
    io_loop.add_callback(cursor.close)
    io_loop.add_callback(connection.close)
@coroutine
def test_table_select_one_by_one():
    """Count up to 1M rows by fetching them one fetchone() at a time."""
    connection = yield POOL.Connection()
    cursor = connection.cursor()
    yield cursor.execute('select * from test_table limit 1000000')
    count = 0
    row = cursor.fetchone()
    while row:
        row = cursor.fetchone()
        count += 1
    print(count)
    # Defer the close calls to the IOLoop.
    io_loop.add_callback(cursor.close)
    io_loop.add_callback(connection.close)
@coroutine
def test():
    """Launch 1800 concurrent SELECT SLEEP(1) calls and wait for all.

    Yielding the list makes tornado wait for every coroutine; the
    results themselves are discarded (the dead local binding from the
    original is dropped).
    """
    yield [test_db_sleep() for _ in range(1800)]
if __name__ == '__main__':
    # Flush stdout every second so the \r progress counter stays visible.
    flusher = PeriodicCallback(sys.stdout.flush, 1000)
    io_loop.add_callback(flusher.start)

    t = io_loop.time()
    print(io_loop.run_sync(prepare))
    print("Preparing time: {}".format(io_loop.time() - t))

    t = io_loop.time()
    print("Starting test: Table creation")
    print(io_loop.run_sync(test_table))
    print("Execution time: {}".format(io_loop.time() - t))

    t = io_loop.time()
    print("Starting test: Table select one by one")
    print(io_loop.run_sync(test_table_select_one_by_one))
    print("Execution time: {}".format(io_loop.time() - t))

    t = io_loop.time()
    print("Starting test: Table select")
    print(io_loop.run_sync(test_table_select))
    print("Execution time: {}".format(io_loop.time() - t))

    t = io_loop.time()
    print("Starting test: Sleep concurrency")
    print(io_loop.run_sync(test))
    print("Execution time: {}".format(io_loop.time() - t))

    @coroutine
    def cleanup():
        """Drop the benchmark table and release the connection."""
        conn = yield POOL.Connection()
        cur = conn.cursor()
        yield cur.execute("DROP TABLE test_table")
        yield cur.close()
        conn.close()

    print(io_loop.run_sync(cleanup))
Preparing time: 0.00026798248291015625
Starting test: Table creation
9989 None
Execution time: 36.6736900806427
Starting test: Table select one by one
1000000
None
Execution time: 26.509829998016357
Starting test: Table select
1000000
None
Execution time: 25.42108702659607
Starting test: Sleep concurrency
None
Execution time: 60.63144087791443
# encoding: utf-8
import uuid
import sys
from tornado.gen import coroutine, sleep
from tornado.concurrent import futures
from tornado.ioloop import IOLoop, PeriodicCallback
from tornado_mysql import pools
from functools import wraps
# Process-wide tornado event loop.
io_loop = IOLoop.current()

# MySQL connection settings (local server, root account, `test` schema).
CONN_PARAMS = {
    'host': '127.0.0.1',
    'port': 3306,
    'user': 'root',
    'passwd': '',
    'db': 'test'
}

# Global tornado_mysql pool; created by prepare().
POOL = None
@coroutine
def prepare():
    """Create the global tornado_mysql pool.

    NOTE(review): max_recycle_sec=3 presumably recycles connections
    idle for more than 3 s, forcing frequent reconnects during these
    long-running tests — confirm against tornado_mysql's pool docs.
    """
    global POOL
    POOL = pools.Pool(
        CONN_PARAMS,
        max_open_connections=30,
        max_idle_connections=1,
        max_recycle_sec=3
    )
@coroutine
def test_db_sleep():
    """Run SELECT SLEEP(1) through the pool's execute() helper."""
    cursor = yield POOL.execute("SELECT SLEEP(%s)", (1,))
    return cursor.fetchall()
@coroutine
def test_table():
    """Create the benchmark table, then replay data.sql line by line."""
    q = (
        'CREATE TABLE IF NOT EXISTS `test`.`test_table` ( '
        '`id` int NOT NULL STORAGE MEMORY AUTO_INCREMENT, '
        '`key` varchar(20) NOT NULL STORAGE MEMORY, '
        '`value` varchar(20) NOT NULL STORAGE MEMORY, '
        '`int` bigint NOT NULL STORAGE MEMORY, '
        '`text` text CHARACTER SET utf8 COLLATE utf8_bin NOT NULL STORAGE MEMORY, '
        '`ts` datetime STORAGE MEMORY DEFAULT CURRENT_TIMESTAMP, '
        'PRIMARY KEY (`id`) '
        ') ENGINE=`InnoDB` COMMENT="";'
    )
    cur = yield POOL.execute(q)
    cur.close()
    # data.sql: external fixture file, presumably one INSERT per line
    # — TODO confirm against the linked download.
    with open('data.sql') as f:
        for x, q in enumerate(f):
            cur = yield POOL.execute(q)
            # Close deferred onto the IOLoop so the loop isn't delayed.
            io_loop.add_callback(cur.close)
            # \r rewrites the same console line as a progress counter.
            sys.stdout.write('\r%04d ' % x)
@coroutine
def test_table_select():
    """Bulk-fetch up to 1M rows with fetchall() and print the count."""
    cursor = yield POOL.execute('select * from test_table limit 1000000')
    rows = cursor.fetchall()
    print(len(rows))
    cursor.close()
@coroutine
def test_table_select_one_by_one():
    """Count up to 1M rows by fetching them one fetchone() at a time."""
    cursor = yield POOL.execute('select * from test_table limit 1000000')
    count = 0
    row = cursor.fetchone()
    while row:
        row = cursor.fetchone()
        count += 1
    print(count)
    cursor.close()
@coroutine
def test():
    """Launch 1800 concurrent SELECT SLEEP(1) calls and wait for all.

    Yielding the list makes tornado wait for every coroutine; the
    results themselves are discarded (the dead local binding from the
    original is dropped).
    """
    yield [test_db_sleep() for _ in range(1800)]
if __name__ == '__main__':
    # Flush stdout every second so the \r progress counter stays visible.
    flusher = PeriodicCallback(sys.stdout.flush, 1000)
    io_loop.add_callback(flusher.start)

    t = io_loop.time()
    print(io_loop.run_sync(prepare))
    print("Preparing time: {}".format(io_loop.time() - t))

    t = io_loop.time()
    print("Starting test: Table creation")
    print(io_loop.run_sync(test_table))
    print("Execution time: {}".format(io_loop.time() - t))

    t = io_loop.time()
    print("Starting test: Table select one by one")
    print(io_loop.run_sync(test_table_select_one_by_one))
    print("Execution time: {}".format(io_loop.time() - t))

    t = io_loop.time()
    print("Starting test: Table select")
    print(io_loop.run_sync(test_table_select))
    print("Execution time: {}".format(io_loop.time() - t))

    t = io_loop.time()
    print("Starting test: Sleep concurrency")
    print(io_loop.run_sync(test))
    print("Execution time: {}".format(io_loop.time() - t))

    @coroutine
    def cleanup():
        """Drop the benchmark table."""
        cur = yield POOL.execute("DROP TABLE test_table")
        cur.close()

    print(io_loop.run_sync(cleanup))
Preparing time: 0.00041604042053222656
Starting test: Table creation
9989 None
Execution time: 48.617724895477295
Starting test: Table select one by one
1000000
None
Execution time: 154.39642095565796
Starting test: Table select
1000000
None
Execution time: 193.75107789039612
Starting test: Sleep concurrency
None
Execution time: 61.97779083251953
None
@mosquito
Copy link
Author

data.sql is available there https://yadi.sk/d/cbxxmyELjKiYZ

@ocswor
Copy link

ocswor commented Dec 20, 2018

Hi, your data.sql file is not found.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment