Skip to content

Instantly share code, notes, and snippets.

@chareice
Created November 16, 2013 07:58
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save chareice/7497321 to your computer and use it in GitHub Desktop.
Save chareice/7497321 to your computer and use it in GitHub Desktop.
test
# -*- coding: utf-8 -*-
# Names exported by ``from <module> import *``: only the DB helper class.
__all__ = ["DB", ]
import contextlib
import psycopg2
from psycopg2 import extensions
from gevent.socket import wait_read, wait_write
def gevent_wait_callback(conn, timeout=None):
    """Wait callback that lets Psycopg cooperate with gevent.

    Polls ``conn`` repeatedly until it reports ``POLL_OK``.  While the
    connection asks for a read or a write, the wait is delegated to
    ``gevent.socket.wait_read`` / ``wait_write`` on the connection's file
    descriptor, forwarding ``timeout`` unchanged.

    Raises:
        psycopg2.OperationalError: if ``conn.poll()`` returns a state other
            than ``POLL_OK``, ``POLL_READ`` or ``POLL_WRITE``.
    """
    poll_state = conn.poll()
    while poll_state != extensions.POLL_OK:
        if poll_state == extensions.POLL_READ:
            wait_read(conn.fileno(), timeout=timeout)
        elif poll_state == extensions.POLL_WRITE:
            wait_write(conn.fileno(), timeout=timeout)
        else:
            raise psycopg2.OperationalError(
                "Bad result from poll: %r" % poll_state)
        # Ask the connection again after each wait.
        poll_state = conn.poll()
# Register gevent_wait_callback with psycopg2 as soon as this module is loaded.
extensions.set_wait_callback(gevent_wait_callback)
class DB(object):
    """Small PostgreSQL helper that uses one fresh connection per operation.

    Every use of :meth:`cursor` (and therefore of :meth:`fetchone`,
    :meth:`fetchall` and :meth:`execute`) opens a new connection through
    :meth:`create_connection`, commits on success, rolls back on error and
    closes the connection in both cases.
    """

    def __init__(self, host=None, port=None, database=None, user=None, password=None):
        """Store the connection settings; no connection is opened here."""
        self.host = host
        self.port = port
        self.database = database
        self.user = user
        self.password = password

    def create_connection(self):
        """Open and return a new psycopg2 connection from the stored settings."""
        return psycopg2.connect(host=self.host, port=self.port, database=self.database,
                                user=self.user, password=self.password)

    @contextlib.contextmanager
    def cursor(self):
        """Yield a cursor on a freshly created connection.

        On normal exit of the ``with`` body the connection is committed; if
        the body (or connection setup) raises, the connection is rolled back
        and the exception propagates.  The connection is closed on every
        path so that failed operations do not leak connections.

        Raises:
            psycopg2.OperationalError: if the connection is found closed
                when it is time to commit.
        """
        conn = None
        try:
            conn = self.create_connection()
            yield conn.cursor()
        except BaseException:
            # Same reach as a bare ``except``: roll back for any exception
            # type, then re-raise it untouched.
            if conn is not None and not conn.closed:
                self._rollback(conn)
            raise
        else:
            if conn.closed:
                raise psycopg2.OperationalError(
                    "Cannot commit because connection was closed: %r" % (conn, ))
            conn.commit()
        finally:
            # Release the connection on success *and* on failure.
            if conn is not None and not conn.closed:
                conn.close()

    def _rollback(self, conn):
        """Roll back ``conn`` on a best-effort basis and return it.

        A failure of the rollback itself is reported to the gevent hub
        instead of being raised, so it cannot mask the exception that
        triggered the rollback.
        """
        try:
            conn.rollback()
        except BaseException:
            # Imported here so this method does not depend on module-level
            # imports of ``sys``/``gevent`` being present.
            import sys
            import gevent
            gevent.get_hub().handle_error(conn, *sys.exc_info())
        return conn

    def fetchone(self, *args):
        """Execute a statement (``args`` go to ``cursor.execute``) and return one row."""
        with self.cursor() as cursor:
            cursor.execute(*args)
            return cursor.fetchone()

    def fetchall(self, *args):
        """Execute a statement (``args`` go to ``cursor.execute``) and return all rows."""
        with self.cursor() as cursor:
            cursor.execute(*args)
            return cursor.fetchall()

    def execute(self, *args):
        """Execute a statement (``args`` go to ``cursor.execute``) and return ``rowcount``."""
        with self.cursor() as cursor:
            cursor.execute(*args)
            return cursor.rowcount
# TODO: the original note was left unfinished (it read only "need ...").
# Connection settings passed to the module-level DB instance below.
host = "localhost"
port = 5432
# NOTE(review): "datbase" is a misspelling of "database"; the name is kept
# because the DB(...) call below reads it.
datbase = "postgres"
user = "postgres"
password = ""
# Shared DB instance; test() issues its queries through it.
db = DB(host=host, port=port, database=datbase, user=user, password=password)
import gevent
def test(iterations=1000):
    """Query the row count of table ``test`` repeatedly and print each result.

    Each iteration calls ``db.fetchone("SELECT count(*) FROM test;")`` on the
    module-level ``db`` instance and prints the returned row.

    Args:
        iterations: number of queries to issue; defaults to 1000, the value
            that was previously hard-coded.
    """
    for _ in range(iterations):
        # A single parenthesised argument prints the same on Python 2 and 3.
        print(db.fetchone("SELECT count(*) FROM test;"))
import time

# NOTE(review): this benchmark runs whenever the module is loaded; there is
# no ``if __name__ == "__main__"`` guard around it.
# Spawn 250 greenlets that each run test().
l = [gevent.spawn(test) for i in range(250)]

# Time how long it takes for all of them to finish, then print the seconds.
t1 = time.time()
gevent.joinall(l)
t2 = time.time()
print(t2-t1)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment