Skip to content

Instantly share code, notes, and snippets.

@ikonst
Last active May 14, 2019 19:19
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ikonst/056554fc108727053bea9df62759b39c to your computer and use it in GitHub Desktop.
Save ikonst/056554fc108727053bea9df62759b39c to your computer and use it in GitHub Desktop.
from contextlib import contextmanager
from functools import wraps
from socket import socket as socket_type, MSG_PEEK
from typing import Set
import flask
import gevent
def _abort_on_eof_watchdog(socket: socket_type, greenlets: Set[gevent.Greenlet]) -> bool:
data = socket.recv(1, MSG_PEEK)
if len(data) > 0:
print('unexpected incoming data on socket after request was received')
return False
else:
print('detected socket shutdown')
for gr in greenlets:
print(f'killing greenlet {gr}')
gr.kill()
return True
@contextmanager
def spawn_abort_on_eof_watchdog():
    """
    Context manager running an EOF watchdog for the current Flask request.

    On entry, the calling greenlet is registered through
    add_greenlet_to_request and a watchdog greenlet is spawned on the
    request's ``gunicorn.socket``. The watchdog greenlet is yielded to the
    caller and killed again when the block is left.
    """
    watcher = None
    try:
        # Register the caller first so it is part of the set handed to the watchdog.
        add_greenlet_to_request(gevent.getcurrent())
        request_socket = flask.request.environ['gunicorn.socket']
        watcher = gevent.spawn(
            _abort_on_eof_watchdog,
            socket=request_socket,
            greenlets=flask.g.greenlets,
        )
        yield watcher
    finally:
        # Nothing to clean up if setup failed before the spawn.
        if watcher:
            watcher.kill()
def add_greenlet_to_request(greenlet: gevent.Greenlet) -> None:
    """
    Registers a greenlet to be killed if the client disconnects.

    The greenlet is added to the ``greenlets`` set kept on ``flask.g``; the
    set is created on first use. The EOF watchdog kills every greenlet in
    that set when it detects that the request socket was shut down.

    :param greenlet: greenlet to kill on client disconnect.
    """
    flask.g.setdefault('greenlets', set()).add(greenlet)
def abort_on_eof(f):
    """
    Decorator making a Flask view function abort when the client disconnects.

    The view is executed inside spawn_abort_on_eof_watchdog(), and its return
    value is passed through unchanged.
    """
    def wrapper(*args, **kwargs):
        with spawn_abort_on_eof_watchdog():
            result = f(*args, **kwargs)
        return result

    # Carry over the view's name, docstring and other metadata.
    return wraps(f)(wrapper)
class AbortOnEofMiddleware(object):
    """
    A middleware doing the same as @abort_on_eof but applying on an entire app.

    NOTE(review): spawn_abort_on_eof_watchdog reads ``flask.request`` and
    ``flask.g``; confirm that a Flask request context is active at the point
    where this middleware is called.
    """

    def __init__(self, app):
        """
        :param app: the WSGI application to wrap.
        """
        self.app = app

    def __call__(self, environ, start_response):
        """
        Calls the wrapped application while the EOF watchdog is running.

        :param environ: WSGI environ, passed through to the wrapped app.
        :param start_response: WSGI start_response callable, passed through.
        :return: whatever the wrapped application returns.
        """
        # The helper in this module is spawn_abort_on_eof_watchdog; the
        # previously used name does not exist and raised NameError.
        with spawn_abort_on_eof_watchdog():
            return self.app(environ, start_response)
"""
Run with:
$ gunicorn -k gevent --reload test:app
$ curl http://localhost:8000/test
^C
$ curl http://localhost:8000/test-worker
^C
"""
# Apply gevent's monkey patching before the remaining imports below run.
from gevent import monkey
monkey.patch_all()
import gevent
from flask import Flask, copy_current_request_context
from abort_on_eof import abort_on_eof, add_greenlet_to_request
# WSGI application object; the run instructions refer to it as ``test:app``.
app = Flask(__name__)
# Module-level flag set by the /test and /test-worker views and reported by /state.
completed = False
@app.route('/test')
@abort_on_eof
def test():
    """
    Demo view: sleeps for two seconds inside the view greenlet itself.

    ``completed`` is reset on entry and only set to True once the sleep has
    finished, so /state shows whether the view ran to the end.
    """
    global completed
    completed = False

    print('start sleep')
    gevent.sleep(2)
    print('end sleep')

    completed = True
    return 'test completed\n'
@app.route('/state')
def get_state():
    """
    Reports the current value of the module-level ``completed`` flag,
    followed by a newline.
    """
    # Read-only access to the module global; no ``global`` declaration needed.
    return f'{completed}\n'
@app.route('/test-worker')
@abort_on_eof
def test_worker():
    """
    Demo view: runs the two-second sleep in a separate worker greenlet.

    The worker is registered with add_greenlet_to_request and the view waits
    for it with join(). ``completed`` is reset on entry and set to True only
    after the join has returned.
    """
    global completed
    completed = False

    @copy_current_request_context
    def do_some_work():
        # do some work here, it can access flask.request like you
        # would otherwise in the view function.
        print('start sleep')
        gevent.sleep(2)
        print('end sleep')

    worker = gevent.spawn(do_some_work)
    # Register the worker so it is killed together with the request.
    add_greenlet_to_request(worker)

    print('start join')
    worker.join()
    print('end join')

    completed = True
    return 'test-worker completed\n'
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment