Last active
February 7, 2022 18:54
-
-
Save VMois/690d0f7bae47fd00a80f82cab4d5a74b to your computer and use it in GitHub Desktop.
Simple background thread in Flask app with graceful shutdown
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Link to article: https://vmois.dev/python-flask-background-thread/ | |
import os | |
import logging | |
import signal | |
from flask import Flask, request, jsonify | |
from background_thread import BackgroundThreadFactory, TASKS_QUEUE | |
logging.basicConfig(level=logging.INFO, force=True) | |
def create_app(): | |
app = Flask(__name__) | |
@app.route('/task', methods=['POST']) | |
def submit_task(): | |
task = request.json | |
logging.info(f'Received task: {task}') | |
TASKS_QUEUE.put(task) | |
return jsonify({'success': 'OK'}) | |
notification_thread = BackgroundThreadFactory.create('notification') | |
# this condition is needed to prevent creating duplicated thread in Flask debug mode | |
if not (app.debug or os.environ.get('FLASK_ENV') == 'development') or os.environ.get('WERKZEUG_RUN_MAIN') == 'true': | |
notification_thread.start() | |
original_handler = signal.getsignal(signal.SIGINT) | |
def sigint_handler(signum, frame): | |
notification_thread.stop() | |
# wait until thread is finished | |
if notification_thread.is_alive(): | |
notification_thread.join() | |
original_handler(signum, frame) | |
try: | |
signal.signal(signal.SIGINT, sigint_handler) | |
except ValueError as e: | |
logging.error(f'{e}. Continuing execution...') | |
return app |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Link to article: https://vmois.dev/python-flask-background-thread/ | |
import logging | |
import queue | |
import threading | |
import time | |
from queue import Queue | |
from abc import abstractmethod, ABC | |
from typing import Dict | |
TASKS_QUEUE = Queue() | |
class BackgroundThread(threading.Thread, ABC): | |
def __init__(self): | |
super().__init__() | |
self._stop_event = threading.Event() | |
def stop(self) -> None: | |
self._stop_event.set() | |
def _stopped(self) -> bool: | |
return self._stop_event.is_set() | |
@abstractmethod | |
def startup(self) -> None: | |
""" | |
Method that is called before the thread starts. | |
Initialize all necessary resources here. | |
:return: None | |
""" | |
raise NotImplementedError() | |
@abstractmethod | |
def shutdown(self) -> None: | |
""" | |
Method that is called shortly after stop() method was called. | |
Use it to clean up all resources before thread stops. | |
:return: None | |
""" | |
raise NotImplementedError() | |
@abstractmethod | |
def handle(self) -> None: | |
""" | |
Method that should contain business logic of the thread. | |
Will be executed in the loop until stop() method is called. | |
Must not block for a long time. | |
:return: None | |
""" | |
raise NotImplementedError() | |
def run(self) -> None: | |
""" | |
This method will be executed in a separate thread | |
when start() method is called. | |
:return: None | |
""" | |
self.startup() | |
while not self._stopped(): | |
self.handle() | |
self.shutdown() | |
class NotificationThread(BackgroundThread): | |
def startup(self) -> None: | |
logging.info('NotificationThread started') | |
def shutdown(self) -> None: | |
logging.info('NotificationThread stopped') | |
def handle(self) -> None: | |
try: | |
task = TASKS_QUEUE.get(block=False) | |
# send_notification(task) | |
logging.info(f'Notification for {task} was sent.') | |
except queue.Empty: | |
time.sleep(1) | |
class BackgroundThreadFactory: | |
@staticmethod | |
def create(thread_type: str) -> BackgroundThread: | |
if thread_type == 'notification': | |
return NotificationThread() | |
# if thread_type == 'some_other_type': | |
# return SomeOtherThread() | |
raise NotImplementedError('Specified thread type is not implemented.') |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment