Skip to content

Instantly share code, notes, and snippets.

@maftieu
Forked from panchr/app.py
Last active August 29, 2015 14:26
Show Gist options
  • Save maftieu/24466af5667017a581d5 to your computer and use it in GitHub Desktop.
Save maftieu/24466af5667017a581d5 to your computer and use it in GitHub Desktop.
Queued (I/O intensive) operations in Python
from queue_db import QueuedWriter
from pymongo import Connection
from flask import Flask
from json import dumps
def main():
    '''Run an example Flask app that queues POSTed form data for a database.

    This is just an example using Flask and MongoDB/pymongo. Of course, you
    can use any web server and any database as long as you set up the
    methods appropriately.
    '''
    # Imported locally: the file-level flask import only brings in ``Flask``,
    # but the request handler below also needs the ``request`` proxy.
    from flask import request

    # ``Connection`` is the name imported from pymongo at file level; the bare
    # module name ``pymongo`` is never bound, so it cannot be used here.
    conn = Connection()
    writer = QueuedWriter(conn)
    app = Flask(__name__)

    @app.route("/write/", methods=["POST"])
    def write_data():
        '''Queue the submitted form fields and acknowledge right away.'''
        writer.put(request.form.to_dict())
        return dumps({"status": "queued", "success": True})

    writer.run()  # start the background writer thread
    try:
        app.run()  # blocks until the web server exits
    finally:
        # Stop the writer before the main thread exits, including on Ctrl-C.
        # NOTE(review): a SIGTERM is not turned into an exception by default,
        # so a handler from Python's signal module is still needed for that.
        writer.stop()
if __name__ == "__main__":
    # Only start the example server when this file is executed as a script.
    main()
from Queue import Queue, Empty # Python 2.7
# the module was renamed to just "queue" in Python 3.x
from threading import Thread
import time
class BackgroundProcessor(object):
    '''Background processor for queued operations.

    Items handed to ``put`` are taken off an internal queue by one worker
    thread. The worker stores each item in ``self.current`` and calls
    ``process_one``; an item whose processing fails is put back on the queue
    so it is attempted again later.

    Public attributes:
        queue   -- the underlying ``Queue`` of pending items
        thread  -- the worker ``Thread`` (not started until ``run`` is called)
        put     -- shortcut for ``queue.put``
        run     -- shortcut for ``thread.start``
        current -- the item being processed, or None when idle
    '''

    # Seconds an idle worker blocks on the queue before it re-checks whether
    # a stop was requested.
    poll_interval = 0.1
    # Seconds to pause after a failed item, so that an item which keeps
    # failing does not make the worker spin at full speed.
    retry_delay = 0.1

    def __init__(self):
        # Imported locally because only ``Thread`` is imported at file level.
        from threading import Event

        self.queue = Queue()
        self.thread = Thread(target=self.process_queue)
        self.put = self.queue.put  # for outside uses, instead of referencing BackgroundProcessor.queue.put
        self.run = self.thread.start
        self.current = None
        self._stop_event = Event()

    def stop(self, timeout=None):
        '''Ask the worker to finish and wait for it.

        Items already queued are still processed once each before the worker
        exits; failures are no longer re-queued after a stop is requested, so
        the worker is guaranteed to terminate.

        timeout -- passed to ``Thread.join``; None waits indefinitely.
        '''
        self._stop_event.set()
        # Joining a thread that was never started raises RuntimeError.
        if self.thread.is_alive():
            self.thread.join(timeout)

    def process_one(self):
        '''Process the current item - returns whether or not the process was successful
        Should be implemented in child classes'''
        return False

    def _process_current(self):
        '''Run ``process_one`` and report an exception as a failure.'''
        try:
            return bool(self.process_one())
        except Exception:
            # An escaping exception would kill the worker thread and silently
            # end all further processing, so log it and treat it as a failure.
            import logging
            logging.getLogger(__name__).exception(
                "process_one() raised for queued item %r", self.current)
            return False

    def process_queue(self):
        '''Process the queued operations until a stop is requested.

        This is the worker thread's target. It loops rather than recursing,
        so the call stack does not grow with the number of items. Returns
        False once a stop was requested and the queue has been drained.
        '''
        stop_requested = self._stop_event.is_set
        while True:
            stopping = stop_requested()
            try:
                # Block (with a timeout) while running normally; when
                # stopping, only take what is already queued.
                item = self.queue.get(not stopping, self.poll_interval)
            except Empty:
                if stopping:
                    break
                continue

            # Every fetched item is processed, including falsy ones such as
            # 0 or an empty dict.
            self.current = item
            retry = False
            try:
                success = self._process_current()
                # Re-check the event: a stop may have arrived while the item
                # was being processed.
                retry = not success and not stop_requested()
                if retry:  # attempt to redo the write later on
                    self.queue.put(item, False)
            finally:
                self.current = None
                # Mark the fetched task as done even on failure; a re-queued
                # item counts as a new task, so ``queue.join()`` stays exact.
                self.queue.task_done()

            if retry:
                # Back off, but wake up immediately if a stop is requested.
                self._stop_event.wait(self.retry_delay)
        return False
from background import BackgroundProcessor
class QueuedWriter(BackgroundProcessor):
    '''Database writer whose writes are performed from the background queue.'''

    def __init__(self, conn):
        '''Initialise the base processor, then remember the connection *conn*.'''
        super(QueuedWriter, self).__init__()
        self.conn = conn

    def process_one(self):
        '''Issue the write for the current object and return the query's result.'''
        # Placeholder call: some insertion query using self.current goes here.
        outcome = self.conn.query(...)
        return outcome
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment