Created
November 16, 2013 07:58
-
-
Save chareice/7497321 to your computer and use it in GitHub Desktop.
test
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
__all__ = ["DB", ] | |
import contextlib | |
import psycopg2 | |
from psycopg2 import extensions | |
from gevent.socket import wait_read, wait_write | |
def gevent_wait_callback(conn, timeout=None):
    """Drive ``conn.poll()`` to completion, yielding to gevent while it waits.

    Polls the connection in a loop. When the poll result asks for a read or
    a write, the file descriptor of ``conn`` is handed to gevent's
    ``wait_read`` / ``wait_write`` (with the given ``timeout``) and the
    connection is polled again. The function returns once the poll result is
    ``POLL_OK``.

    Raises:
        psycopg2.OperationalError: if ``conn.poll()`` returns a value other
            than ``POLL_OK``, ``POLL_READ`` or ``POLL_WRITE``.
    """
    while True:
        status = conn.poll()
        if status == extensions.POLL_OK:
            return
        if status == extensions.POLL_READ:
            wait_read(conn.fileno(), timeout=timeout)
            continue
        if status == extensions.POLL_WRITE:
            wait_write(conn.fileno(), timeout=timeout)
            continue
        raise psycopg2.OperationalError(
            "Bad result from poll: %r" % status)


# Install the callback above as psycopg2's wait callback. This is a top-level
# statement, so it runs once when the module is imported.
extensions.set_wait_callback(gevent_wait_callback)
class DB(object):
    """Run SQL statements, each on its own short-lived psycopg2 connection.

    Every call to :meth:`cursor` (and therefore to :meth:`fetchone`,
    :meth:`fetchall` and :meth:`execute`) opens a new connection through
    :meth:`create_connection`, commits on success, rolls back on failure and
    closes the connection afterwards.
    """

    def __init__(self, host=None, port=None, database=None, user=None, password=None):
        """Store the connection parameters; no connection is opened here."""
        self.host = host
        self.port = port
        self.database = database
        self.user = user
        self.password = password

    def create_connection(self):
        """Open and return a new psycopg2 connection from the stored parameters."""
        return psycopg2.connect(
            host=self.host, port=self.port, database=self.database,
            user=self.user, password=self.password)

    @contextlib.contextmanager
    def cursor(self):
        """Yield a cursor on a fresh connection, as a context manager.

        On normal exit the transaction is committed. If the ``with`` body
        raises, the transaction is rolled back (when the connection is still
        open) and the exception is re-raised. In every case the connection
        is closed before control returns to the caller.

        Raises:
            psycopg2.OperationalError: if the body finished without error but
                the connection was found closed, so nothing can be committed.
        """
        # If connecting fails there is nothing to clean up; let it propagate.
        conn = self.create_connection()
        try:
            # Debug trace kept from the original; this form prints the same
            # text on Python 2 and Python 3.
            print('conn= %s' % (conn,))
            yield conn.cursor()
        except BaseException:
            # Catch-all on purpose: whatever interrupted the body, undo the
            # open transaction and then re-raise the original exception.
            if not conn.closed:
                self._rollback(conn)
            raise
        else:
            if conn.closed:
                raise psycopg2.OperationalError(
                    "Cannot commit because connection was closed: %r" % (conn, ))
            conn.commit()
        finally:
            # Previously the connection was closed only after a successful
            # commit, so every failed statement leaked a connection.
            if not conn.closed:
                conn.close()

    def _rollback(self, conn):
        """Roll back ``conn``, reporting (not raising) a rollback failure.

        A failure of ``conn.rollback()`` is passed to the gevent hub's
        ``handle_error`` so that it does not replace the exception that
        triggered the rollback. Returns ``conn`` in either case.
        """
        try:
            conn.rollback()
        except BaseException:
            # Imported here because the module's import block provides
            # neither ``sys`` nor ``gevent`` at this point in the file.
            import sys
            import gevent
            gevent.get_hub().handle_error(conn, *sys.exc_info())
        return conn

    def fetchone(self, *args):
        """Execute a statement and return its first row.

        ``args`` are passed unchanged to ``cursor.execute``.
        """
        # Debug trace kept from the original (Python 2/3 neutral form).
        print('args= %s' % (args,))
        with self.cursor() as cursor:
            cursor.execute(*args)
            return cursor.fetchone()

    def fetchall(self, *args):
        """Execute a statement and return all of its rows.

        ``args`` are passed unchanged to ``cursor.execute``.
        """
        with self.cursor() as cursor:
            cursor.execute(*args)
            return cursor.fetchall()

    def execute(self, *args):
        """Execute a statement and return the cursor's ``rowcount``.

        ``args`` are passed unchanged to ``cursor.execute``.
        """
        with self.cursor() as cursor:
            cursor.execute(*args)
            return cursor.rowcount
# TODO: (translated; the original note is cut off after "need ...")
# Connection settings used by the benchmark below.
host = "localhost"
port = 5432
database = "postgres"
user = "postgres"
password = ""
db = DB(host=host, port=port, database=database, user=user, password=password)

import gevent
import time

# Size of the benchmark: number of greenlets, and queries issued by each one.
WORKER_COUNT = 250
QUERIES_PER_WORKER = 1000


def test():
    """Issue QUERIES_PER_WORKER count queries through ``db``, printing each row."""
    # An insert-based variant was sketched here originally:
    #   db.execute("insert into test(name) values(%s);", [name, ])
    for _ in range(QUERIES_PER_WORKER):
        # Single-argument print() emits the same text on Python 2 and 3.
        print(db.fetchone("SELECT count(*) FROM test;"))


# Spawn all workers, wait for every one to finish, and print the elapsed
# wall-clock time in seconds.
workers = [gevent.spawn(test) for _ in range(WORKER_COUNT)]
started = time.time()
gevent.joinall(workers)
finished = time.time()
print(finished - started)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment