Skip to content

Instantly share code, notes, and snippets.

@VMois
Last active February 7, 2022 18:54
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save VMois/690d0f7bae47fd00a80f82cab4d5a74b to your computer and use it in GitHub Desktop.
Save VMois/690d0f7bae47fd00a80f82cab4d5a74b to your computer and use it in GitHub Desktop.
Simple background thread in Flask app with graceful shutdown
# Link to article: https://vmois.dev/python-flask-background-thread/
import os
import logging
import signal
from flask import Flask, request, jsonify
from background_thread import BackgroundThreadFactory, TASKS_QUEUE
# Configure the root logger to emit INFO and above; force=True replaces any
# handlers that were already attached to the root logger.
logging.basicConfig(level=logging.INFO, force=True)
def create_app():
    """
    Build the Flask application together with its background notification thread.

    Registers a POST /task route that puts the request's JSON body on
    TASKS_QUEUE. When the notification thread is started, a SIGINT handler is
    installed that stops the thread, waits for it to finish and then hands
    the signal over to whatever handler was active before.
    :return: the configured Flask application
    """
    app = Flask(__name__)

    @app.route('/task', methods=['POST'])
    def submit_task():
        """Put the JSON body of the request on the task queue."""
        task = request.json
        logging.info(f'Received task: {task}')
        TASKS_QUEUE.put(task)
        return jsonify({'success': 'OK'})

    notification_thread = BackgroundThreadFactory.create('notification')

    # this condition is needed to prevent creating duplicated thread in Flask debug mode
    if not (app.debug or os.environ.get('FLASK_ENV') == 'development') or os.environ.get('WERKZEUG_RUN_MAIN') == 'true':
        notification_thread.start()

        # May be a callable, signal.SIG_DFL, signal.SIG_IGN or None.
        original_handler = signal.getsignal(signal.SIGINT)

        def sigint_handler(signum, frame):
            """Stop the background thread, then delegate to the previous SIGINT handling."""
            notification_thread.stop()

            # wait until thread is finished
            if notification_thread.is_alive():
                notification_thread.join()

            if callable(original_handler):
                original_handler(signum, frame)
            elif original_handler == signal.SIG_DFL:
                # The previous disposition was the OS default: restore it and
                # deliver the signal again so the default action still happens.
                signal.signal(signum, signal.SIG_DFL)
                signal.raise_signal(signum)
            # SIG_IGN or None: there is nothing to call, the signal stays ignored.

        try:
            signal.signal(signal.SIGINT, sigint_handler)
        except ValueError as e:
            # signal.signal() raises ValueError when it is not called from the main thread
            logging.error(f'{e}. Continuing execution...')

    return app
# Link to article: https://vmois.dev/python-flask-background-thread/
import logging
import queue
import threading
import time
from queue import Queue
from abc import abstractmethod, ABC
from typing import Dict
# Module-level queue of pending tasks, consumed by NotificationThread.handle().
# Created without a maxsize, so put() never blocks on a full queue.
TASKS_QUEUE = Queue()
class BackgroundThread(threading.Thread, ABC):
    """
    Base class for a worker thread that can be asked to stop.

    Subclasses implement startup(), handle() and shutdown(). run() calls
    startup() once, then handle() in a loop until stop() was called, and
    always finishes with shutdown().
    """

    def __init__(self):
        super().__init__()
        # Set by stop(); checked by run() before every handle() call.
        self._stop_event = threading.Event()

    def stop(self) -> None:
        """
        Request the thread to stop.
        The loop in run() ends once the current handle() call returns.
        :return: None
        """
        self._stop_event.set()

    def _stopped(self) -> bool:
        """
        :return: True if stop() was called, False otherwise
        """
        return self._stop_event.is_set()

    @abstractmethod
    def startup(self) -> None:
        """
        Method that is called once at the beginning of run(),
        before the first handle() call.
        Initialize all necessary resources here.
        :return: None
        """
        raise NotImplementedError()

    @abstractmethod
    def shutdown(self) -> None:
        """
        Method that is called when the loop in run() ends, either because
        stop() was called or because handle() raised an exception.
        Use it to clean up all resources before thread stops.
        :return: None
        """
        raise NotImplementedError()

    @abstractmethod
    def handle(self) -> None:
        """
        Method that should contain business logic of the thread.
        Will be executed in the loop until stop() method is called.
        Must not block for a long time.
        :return: None
        """
        raise NotImplementedError()

    def run(self) -> None:
        """
        This method will be executed in a separate thread
        when start() method is called.
        :return: None
        """
        self.startup()
        try:
            while not self._stopped():
                self.handle()
        finally:
            # Release resources even when handle() raised; the exception
            # still propagates after shutdown() has run.
            self.shutdown()
class NotificationThread(BackgroundThread):
    """
    Background thread that takes tasks from TASKS_QUEUE and logs that a
    notification was sent for each of them.
    """

    def startup(self) -> None:
        """Log that the thread has started."""
        logging.info('NotificationThread started')

    def shutdown(self) -> None:
        """Log that the thread has stopped."""
        logging.info('NotificationThread stopped')

    def handle(self) -> None:
        """
        Process at most one task from TASKS_QUEUE.
        Waits up to one second for a task, so a task that arrives while the
        queue is empty is picked up immediately and the call still returns
        within about a second when nothing arrives.
        :return: None
        """
        try:
            task = TASKS_QUEUE.get(timeout=1)
        except queue.Empty:
            return
        # send_notification(task)
        logging.info(f'Notification for {task} was sent.')
class BackgroundThreadFactory:
    """Creates background thread instances from a type name."""

    @staticmethod
    def create(thread_type: str) -> BackgroundThread:
        """
        Create a new, not yet started thread for the given type name.
        :param thread_type: name of the thread type, e.g. 'notification'
        :return: new thread instance
        :raises NotImplementedError: if the type name is not known
        """
        known_types = (
            ('notification', NotificationThread),
            # ('some_other_type', SomeOtherThread),
        )
        for type_name, thread_class in known_types:
            if thread_type == type_name:
                return thread_class()

        raise NotImplementedError('Specified thread type is not implemented.')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment