Skip to content

Instantly share code, notes, and snippets.

@versusvoid
Last active November 24, 2021 05:18
Show Gist options
  • Save versusvoid/8c55b90e44d02dd3fa32888078b6a107 to your computer and use it in GitHub Desktop.
Save versusvoid/8c55b90e44d02dd3fa32888078b6a107 to your computer and use it in GitHub Desktop.
Abort WSGI request processing on client disconnect (gunicorn, gevent + sqlalchemy, psycopg2)
import sys
import gevent
import gevent.local
import gunicorn.app.wsgiapp
import psycopg2
import psycopg2.errors
import psycopg2.extensions
import sqlalchemy.util
from gevent._hub_primitives import wait_read, wait_write
from gevent.libuv.watcher import libuv
from gunicorn.workers.ggevent import GeventWorker
class AbortOnDisconnect(gevent.GreenletExit):
pass
def patch_psycopg2(event=None):
psycopg2.extensions.set_wait_callback(_gevent_wait_callback)
_exception_holder = gevent.local.local()
def _gevent_wait_callback(conn, timeout=None):
while True:
try:
state = conn.poll()
if state == psycopg2.extensions.POLL_OK:
break
elif state == psycopg2.extensions.POLL_READ:
wait_read(conn.fileno(), timeout=timeout)
elif state == psycopg2.extensions.POLL_WRITE:
wait_write(conn.fileno(), timeout=timeout)
else:
raise psycopg2.OperationalError(f'Bad result from poll: {state}')
except AbortOnDisconnect as e:
# If cancel throws exception here we let it bubble up
# instead of AbortOnDisconnect.
# psycopg2 and sqlalchemy can then close and dispose resources
# and wrap it as some connection error which likely will still abort
# normal request processihg
conn.cancel()
# If we allow AbortOnDisconnect bubble up, psycopg2 will close perfectly
# healthy connection, so we pass it around through greenlet context
_exception_holder.aborted = e
class AbortingCursor(psycopg2.extensions.cursor):
def execute(self, *args, **kwargs):
try:
result = super().execute(*args, **kwargs)
e = getattr(_exception_holder, 'aborted', None)
if e is not None:
# cancel() was ineffective
# bubbling AbortOnDisconnect up
raise e
return result
except psycopg2.errors.QueryCanceled:
e = getattr(_exception_holder, 'aborted', None)
if e is not None:
# Normal case, connection still healthy,
# bubbling AbortOnDisconnect up
raise e
else:
raise
class AbortingConnection(psycopg2.extensions.connection):
def cursor(self, *args, **kwargs):
kwargs.setdefault('cursor_factory', AbortingCursor)
return super().cursor(*args, **kwargs)
class AbortingGeventWorker(GeventWorker):
def handle(self, listener, socket, addr):
handle_greenlet = gevent.getcurrent()
disconnect_event = gevent.get_hub().loop.io(socket.fileno(), libuv.UV_DISCONNECT)
disconnect_event.start(self._handle_disconnect, addr, handle_greenlet, disconnect_event)
try:
super().handle(listener, socket, addr)
finally:
disconnect_event.close()
def _handle_disconnect(self, addr, handle_greenlet, disconnect_event):
self.log.debug('Connection to %s aborted', addr)
gevent.kill(handle_greenlet, exception=AbortOnDisconnect)
disconnect_event.stop()
def handle_request(self, listener_name, req, sock, addr):
try:
# Calling method on parent of GeventWorker
super(GeventWorker).handle_request(listener_name, req, sock, addr)
except AbortOnDisconnect:
# GeventWorker.handle_request simply ignores GreenletExit
# and proceeds to read from socket.
# If UV_DISCONNECT was signalled, it will then generate EBADF 'Bad file number' and spam it to logs
# On StopIteration it will simply cease any work with socket
raise StopIteration()
except gevent.GreenletExit:
pass
except SystemExit:
pass
_original_is_exit_exception = sqlalchemy.util.is_exit_exception
def _is_exit_exception(e):
# sqlalchemy just as psycopg2 hurries to close connection on any unknown exception
# But since we already certain connection is healthy (otherwise
# AbortingCursor.execute would raise another exception, not AbortOnDisconnect)
# we are telling sqlalchemy that connection is ok and it should
# just bubble AbortOnDisconnect further
if type(e) == AbortOnDisconnect:
return False
return _original_is_exit_exception(e)
def run_aborting_gunicorn():
# libev doesn't support DISCONNECT socket events
gevent.config.loop = "libuv"
# patching sqlalchemy to handle our custom exception
sqlalchemy.util.is_exit_exception = _is_exit_exception
# configuring gunicorn to use our worker
sys.argv.extend(['--worker-class', 'utils.gevent.AbortingGeventWorker'])
gunicorn.app.wsgiapp.run()
# ...
[tool.poetry.scripts]
aborting-gunicorn = 'aborting:run_aborting_gunicorn'
[tool.poetry.plugins."gevent.plugins.monkey.did_patch_builtins"]
patch_psycopg2_for_gevent = 'aborting:patch_psycopg2'
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment