Skip to content

Instantly share code, notes, and snippets.

@vytas7
Forked from kgriffs/background_queue_falcon.py
Created March 21, 2021 20:18
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save vytas7/bdc8af92f54bec7829ad53253ef44a32 to your computer and use it in GitHub Desktop.
Save vytas7/bdc8af92f54bec7829ad53253ef44a32 to your computer and use it in GitHub Desktop.
Pattern for doing background work (e.g. saving data) from a Falcon app, using a queue from the Python standard library
# Credit: @vytas7 (Vytautas Liuolia)
import queue
import signal
import threading
import time
import uuid
import falcon
class Worker:
    """Background worker that processes submitted datasets on its own thread.

    Datasets are handed over through a ``queue.Queue`` and handled one at a
    time by ``run()``, which executes on the thread created in ``__init__``.
    """

    # Unique sentinel object; putting it on the queue tells ``run()`` to exit.
    _QUIT = object()

    def __init__(self):
        self._thread = threading.Thread(target=self.run)
        self._queue = queue.Queue()

    def start(self):
        """Start the worker thread."""
        self._thread.start()

    def run(self):
        """Consume the queue until the quit sentinel is received."""
        while True:
            # Keep the ``try`` limited to the ``get()`` call so that a
            # ``queue.Empty`` raised while saving a dataset is never mistaken
            # for an idle queue (which would silently drop that dataset).
            try:
                dataset = self._queue.get(timeout=1.0)
            except queue.Empty:
                continue

            if dataset is self._QUIT:
                print('QUIT received; exiting the worker.')
                break

            self._save(dataset)

    def _save(self, dataset):
        """Persist a single dataset; subclasses may override this step.

        Args:
            dataset: the object previously passed to ``submit()``.
        """
        print(f'Saving to the DB: {dataset}...')
        time.sleep(5)  # <== Time consuming operation
        print(f'Saved: {dataset}')

    def submit(self, dataset):
        """Enqueue ``dataset`` for processing by the worker thread."""
        self._queue.put(dataset)

    def stop(self):
        """Ask the worker to quit and block until its thread has finished.

        The sentinel is enqueued behind any datasets already submitted, so
        those are processed before the thread exits.
        """
        self._queue.put(self._QUIT)
        self._thread.join()
class Dataset:
    """A unit of work identified by a UUID generated at construction time."""

    def __init__(self):
        self._id = uuid.uuid4()

    def __repr__(self):
        """Return a short debugging label that embeds the identifier."""
        return f'Dataset<{self._id}>'

    def marshal(self):
        """Return a plain ``dict`` representation of this dataset.

        Returns:
            A dict with the stringified identifier under ``'id'`` and a
            fixed greeting under ``'message'``.
        """
        payload = {'id': str(self._id)}
        payload['message'] = 'Hello, World!'
        return payload
class Store:
    """Resource that answers POST requests and queues the work for later."""

    def __init__(self, worker):
        # The worker only needs to expose ``submit(dataset)``.
        self._worker = worker

    def on_post(self, req, resp):
        """Create a new dataset, return it to the client, then enqueue it.

        Args:
            req: the incoming request object (unused).
            resp: the response object; its ``media`` attribute is set to the
                marshalled dataset.
        """
        job = Dataset()
        body = job.marshal()
        resp.media = body
        # Hand the dataset to the worker after the response body is set.
        self._worker.submit(job)
# ``falcon.API`` was renamed to ``falcon.App`` in Falcon 3.0 and kept only as a
# deprecated alias; prefer the new name and fall back for older releases.
_AppBase = getattr(falcon, 'App', None) or falcon.API


class App(_AppBase):
    """Falcon application that owns a background ``Worker``.

    The worker is created and started when the application is constructed,
    and is shared with the ``/store`` resource.
    """

    def __init__(self):
        super().__init__()
        self._worker = Worker()
        self.add_route('/store', Store(self._worker))
        self._worker.start()

    def stop(self, signum, frame):
        """Stop the background worker; usable as a ``signal`` handler.

        Args:
            signum: number of the received signal (unused).
            frame: stack frame at the time of the signal (unused).
        """
        print('SIGQUIT received, shutting down!')
        self._worker.stop()
# Build the Falcon application at import time; constructing App also starts
# its background worker thread.
app = App()
# Stop the worker when the process receives SIGQUIT.
# NOTE(review): signal.SIGQUIT is not defined on Windows, and signal.signal()
# must be called from the main thread — confirm both hold where this runs.
signal.signal(signal.SIGQUIT, app.stop)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment