Created
July 17, 2017 08:11
-
-
Save leafsummer/2ace73396076420314d365df53cb21d1 to your computer and use it in GitHub Desktop.
a writer thead to database with queue for handling the many requests
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class AsyncCursor(object): | |
def __init__(self, event, sql, params, timeout): | |
self._event = event # Used to signal when results are ready. | |
self.sql = sql | |
self.params = params | |
self.timeout = timeout | |
self._cursor = None | |
self._rows = None | |
self._ready = False | |
def set_result(self, cursor): | |
# This method is called once the worker thread has executed the | |
# query (self.sql). | |
self._cursor = cursor | |
self._rows = cursor.fetchall() | |
self._event.set() # Wake up the thread that's waiting on the event. | |
return self | |
def _wait(self, timeout=None): | |
# This method is used by the caller to block until results are ready, | |
# or, optionally, raise an exception if the query takes longer than | |
# is acceptable. | |
timeout = timeout if timeout is not None else self.timeout | |
# Call the event's `wait()` method. | |
if not self._event.wait(timeout=timeout) and timeout: | |
raise ResultTimeout('results not ready, timed out.') | |
self._ready = True | |
def __iter__(self): | |
# If the caller attempts to iterate over the Cursor, first ensure | |
# that the results are ready before exposing a row iterator. | |
if not self._ready: | |
self._wait() | |
return iter(self._rows) | |
@property | |
def lastrowid(self): | |
# If the caller requests the ID of the most recently inserted row, | |
# we need to first make sure the query was actually executed. | |
if not self._ready: | |
self._wait() | |
return self._cursor.lastrowid | |
@property | |
def rowcount(self): | |
# Like lastrowid(), make sure the query was executed before returning | |
# the number of affected rows. | |
if not self._ready: | |
self._wait() | |
return self._cursor.rowcount | |
class Writer(object): | |
def __init__(self, database, queue): | |
self.database = database | |
self.queue = queue | |
def run(self): | |
conn = self.database.get_conn() | |
while self.loop(conn): | |
pass | |
def loop(self, conn): | |
obj = self.queue.get() | |
if isinstance(obj, AsyncCursor): | |
self.execute(obj) | |
return True | |
elif obj is SHUTDOWN: | |
return False | |
def execute(self, async_cursor): | |
# Call the base-class implementation of execute_sql to avoid entering | |
# an endless chain of recursion. | |
db_cursor = SqliteExtDatabase.execute_sql( | |
self, | |
async_cursor.sql, | |
async_cursor.params) | |
return async_cursor.set_result(db_cursor) | |
class SqliteQueueDatabase(SqliteExtDatabase): | |
def execute_sql(self, sql, params=None, require_commit=True, timeout=None): | |
if require_commit: # Treat this as a write query. | |
# Create an AsyncCursor object to encapsulate the execution | |
# of our write query, add it to the writer thread's queue, and | |
# return the wrapper to the caller. | |
async_cursor = AsyncCursor( | |
event=threading.Event(), | |
sql=sql, | |
params=params, | |
timeout=timeout) | |
self.write_queue.put(async_cursor) | |
return async_cursor | |
else: # Regular read query. | |
return super(SqliteQueueDatabase, self).execute_sql( | |
sql, params, require_commit) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment