Skip to content

Instantly share code, notes, and snippets.

@wolever
Last active July 10, 2018 21:03
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save wolever/6b23f57b247cb9bc56572dd34b00af7b to your computer and use it in GitHub Desktop.
Save wolever/6b23f57b247cb9bc56572dd34b00af7b to your computer and use it in GitHub Desktop.
Point-form notes about using gevent in production
  • I've been using gevent in production for about 6 years, and I've only encountered a handful of issues:
    • In gevent pre-1.0 there were issues with the DNS resolver
    • I've had a couple of issues with gevent-openssl, which I've fixed by using a custom version: mjs/gevent_openssl#14 and mjs/gevent_openssl#12
    • Celery hangs when gevent is monkeypatched. I haven't figured out why, and this might have been fixed in newer versions of celery (the one I'm using is fairly old).
  • All of my experience using gevent is in fully monkeypatched mode. It's certainly possible to use it outside of monkeypatch mode, but I don't know anything about that.
  • See the monkeypatches.py and gevent_.py files for my implementation of monkeypatching.
  • The gevent_.py file contains a couple things:
    • Monkeypatches
    • Tests which verify whether or not the monkeypatches succeed (there's a view in my application which runs gevent_.run_checks so it's easy to verify whether or not the patches have been applied correctly)
    • A custom spawn function which injects some thread local state into new threads to help with logging.
    • The lazy_imap helper, which applies a function concurrently while guaranteeing that at most a certain number of threads and outstanding (i.e., unconsumed) results exist at any one time. After writing this I realized that there is a maxsize argument to Pool.imap, which I believe does the same thing.
  • With all of the above in place, I haven't had any issues with common libraries (requests, urllib*, redis-py), and this can be verified with the checks implemented in gevent_.py.
  • I use worker_class = "gevent" in gunicorn
import os
import sys
import heapq
# Module-level flag: try_init_gevent() flips this to True when the USE_GEVENT
# environment variable enables gevent; run_checks() reports on it.
using_gevent = False
def try_init_gevent():
    """ Switch the process into fully monkeypatched gevent mode on request.

    Returns immediately (changing nothing) when the ``USE_GEVENT`` environment
    variable is unset, empty, ``"0"`` or ``"false"``. Otherwise it sets the
    module-level ``using_gevent`` flag and then, in this order:

    1. asserts that Django has not been imported yet,
    2. runs ``gevent.monkey.patch_all(subprocess=True)``,
    3. runs ``gevent_openssl.monkey_patch()``,
    4. replaces ``gevent_openssl.SSL.Connection.sendall``,
    5. runs psycogreen's ``patch_psycopg()``,
    6. calls ``monkeypatch_gevent_greenlet()``.

    Raises ``AssertionError`` if ``"django"`` is already in ``sys.modules``.
    """
    global using_gevent

    flag = os.environ.get("USE_GEVENT")
    if flag in ("0", "false") or not flag:
        return
    using_gevent = True

    # For some reason, as of Django 1.8 it can't be imported before gevent has
    # done its monkey patching. Double check that here.
    django_loaded = "django" in sys.modules
    assert not django_loaded, \
        "Django was imported before try_init_gevent was called"

    from gevent import monkey
    monkey.patch_all(subprocess=True)

    import gevent_openssl
    gevent_openssl.monkey_patch()

    from gevent_openssl.SSL import Connection

    def Connection_sendall_fixed(self, buf, flags=0):
        # There is a bug with gevent which causes `sendall` to crash when
        # the buffer is large. Work around this by manually implementing
        # `sendall`: keep calling `send` and drop whatever it reports as sent
        # until nothing is left.
        # See also: https://github.com/gevent/gevent/issues/736
        while buf:
            buf = buf[self.send(buf, flags):]

    Connection.sendall = Connection_sendall_fixed

    from psycogreen.gevent import patch_psycopg
    patch_psycopg()

    monkeypatch_gevent_greenlet()

    # The blocking detector is currently switched off; this is how it would
    # be started:
    #import gevent
    #bd_thread = gevent.spawn(start_blocking_detector)
    #_blocking_detector[0] = bd_thread
# One-element holder for the blocking-detector greenlet. stop_blocking_detector()
# kills whatever is stored in slot 0; the only code that would fill the slot
# (at the end of try_init_gevent) is commented out in the visible source.
_blocking_detector = [None]
def start_blocking_detector():
    """ Run ``gevent_helpers.BlockingDetector`` after a five second delay.

    The detector is created with ``timeout=1.5``, ``raise_exc=False`` and
    ``aggressive=False`` and then called; this function returns only when that
    call returns.
    """
    from gevent import sleep

    # Sleep for a little while to give the application a chance to do all the
    # imports + initialization (which may take a little while)
    sleep(5)

    from gevent_helpers import BlockingDetector
    detector = BlockingDetector(timeout=1.5, raise_exc=False, aggressive=False)
    detector()
def stop_blocking_detector():
    """ Kill the object stored in ``_blocking_detector[0]``, if there is one. """
    detector = _blocking_detector[0]
    if detector is None:
        return
    detector.kill()
def check(a, b):
    """ Compare two values and describe the outcome as a status string.

    Returns ``"OK"`` when ``a == b``; otherwise returns an ``"ERROR: ..."``
    string showing the ``repr`` of both values.
    """
    return "OK" if a == b else "ERROR: %r != %r" % (a, b)
def check_yields(f):
    """ Report whether a spawned greenlet got to run while ``f()`` executed.

    A greenlet that appends ``"okay"`` to a list is spawned, ``f`` is called,
    and the list is then compared against ``["okay"]`` with ``check``. The
    return value is the status string produced by ``check``.
    """
    import gevent

    seen = []

    def mark():
        seen.append("okay")

    gevent.spawn(mark)
    f()
    return check(seen, ["okay"])
def run_checks():
    """ Yield ``(name, status)`` pairs describing the gevent set-up.

    The first pair carries the raw ``USE_GEVENT`` environment value (``""``
    when unset or empty). Every following pair carries the status string
    returned by the corresponding check helper.

    This is a generator, so each check only runs when its pair is requested.
    """
    yield ("USE_GEVENT", os.environ.get("USE_GEVENT") or "")
    yield ("using_gevent", check(using_gevent, True))
    yield ("spawn", check_spawn())
    yield ("requests", check_requests())
    yield ("psycopg2", check_psycopg2())
    # The "subprocess" row must run the subprocess check; it previously re-ran
    # the psycopg2 check, so the subprocess patch was never verified.
    yield ("subprocess", check_subprocess())
    yield ("Timeout", check_timeout())
def check_spawn():
    """ Check that spawned greenlets finish in order of their sleep times.

    Three greenlets are spawned: one appends ``"a"`` straight away, one
    appends ``"c"`` after sleeping 0.2s and one appends ``"b"`` after sleeping
    0.1s. After joining all of them the recorded order is compared with
    ``["a", "b", "c"]`` and the status string from ``check`` is returned.
    """
    import gevent

    order = []

    def append_later(label, delay=None):
        def task():
            if delay is not None:
                gevent.sleep(delay)
            order.append(label)
        return task

    workers = [
        gevent.spawn(append_later("a")),
        gevent.spawn(append_later("c", 0.2)),
        gevent.spawn(append_later("b", 0.1)),
    ]
    gevent.joinall(workers)
    return check(order, ["a", "b", "c"])
def check_requests():
    """ Run ``check_yields`` around an HTTP GET of ``http://example.com``
    made with ``requests`` and return its status string. """
    import requests

    def fetch():
        return requests.get("http://example.com")

    return check_yields(fetch)
def check_psycopg2():
    """ Run ``check_yields`` around a Django ORM query that fetches the first
    ``User`` and return its status string.

    NOTE(review): the name suggests the query goes through psycopg2; that
    depends on the Django database configuration, which is not visible here.
    """
    from django.contrib.auth.models import User

    def first_user():
        return User.objects.all().first()

    return check_yields(first_user)
def check_timeout():
    """ Check that ``gevent.Timeout`` interrupts a sleeping greenlet.

    A 0.1s sleep runs under a 0.001s ``Timeout(…, False)``. Returns ``"OK"``
    when the sleep is cut short and ``"ERROR"`` when it runs to completion.
    """
    from gevent import Timeout, sleep

    status = "OK"
    with Timeout(0.001, False):
        sleep(0.1)
        # Only reached when the timeout failed to interrupt the sleep.
        status = "ERROR"
    return status
def check_subprocess():
    """ Run ``check_yields`` around ``subprocess.call(["true"])`` and return
    its status string. """
    import subprocess

    def run_true():
        return subprocess.call(["true"])

    return check_yields(run_true)
def spawn_helper(log_context, post_spawn, func, args, kwargs):
    """ Call ``post_spawn()`` and then ``func(*args, **kwargs)``.

    ``log_context`` is accepted but not used by the body. ``args`` is a
    sequence of positional arguments and ``kwargs`` a mapping of keyword
    arguments. Returns whatever ``func`` returns.
    """
    post_spawn()
    outcome = func(*args, **kwargs)
    return outcome
def spawn(func, *args, **kwargs):
    """ Spawn ``func(*args, **kwargs)`` in a new greenlet and return it.

    If ``Greenlet.__init__`` has already been replaced by a function whose
    name starts with ``"akindi"`` (see ``monkeypatch_gevent_greenlet``), plain
    ``gevent.spawn`` is used. Otherwise the call is routed through
    ``spawn_helper`` so that the hook from ``get_post_spawn()`` runs inside
    the new greenlet before ``func``.
    """
    import gevent
    from gevent.greenlet import Greenlet

    if Greenlet.__init__.__name__.startswith("akindi"):
        return gevent.spawn(func, *args, **kwargs)

    # spawn_helper takes five positional parameters, the first of which
    # (log_context) its body never reads. Pass an explicit placeholder so the
    # remaining arguments line up; previously only four were supplied, which
    # made spawn_helper fail with a TypeError inside the greenlet.
    return gevent.spawn(
        spawn_helper, None, get_post_spawn(), func, args, kwargs
    )
def get_post_spawn(func=None):
    """ Build the hook that a newly spawned greenlet runs first.

    Called in the spawning context: increments ``g._num_child_threads``,
    remembers the current ``g.log_context`` object, and derives a thread label
    of the form ``"<parent>:<n>"`` (or just ``"<n>"`` when the current context
    has no truthy ``"thread"`` entry).

    The returned ``akindi_post_spawn(*args, **kwargs)`` updates
    ``g.log_context`` with the remembered context, sets its ``"thread"`` entry
    to the derived label and, when ``func`` was given, returns
    ``func(*args, **kwargs)``; otherwise it returns ``None``.
    """
    from .global_ import g

    child_num = g.get("_num_child_threads", 0) + 1
    log_context = g.log_context
    g._num_child_threads = child_num

    parent_name = g.log_context.get("thread")
    prefix = parent_name + ":" if parent_name else ""
    child_thread = "%s%s" % (prefix, child_num)

    def akindi_post_spawn(*args, **kwargs):
        g.log_context.update(log_context)
        g.log_context["thread"] = child_thread
        if func is None:
            return None
        return func(*args, **kwargs)

    return akindi_post_spawn
def pool(size=None):
    """ Create and return a ``gevent.pool.Pool`` built with ``size=size``. """
    from gevent.pool import Pool

    new_pool = Pool(size=size)
    return new_pool
def monkeypatch_gevent_greenlet():
    """ Wrap ``Greenlet.__init__`` so each greenlet runs the post-spawn hook.

    The replacement ``__init__`` builds a hook with ``get_post_spawn`` around
    the greenlet's callable (``run`` if given, otherwise ``self._run``) and
    passes that hook to the original ``__init__`` in place of ``run``.

    Calling this function again after the patch is in place does nothing.
    """
    from gevent.greenlet import Greenlet

    Greenlet__init__ = Greenlet.__init__

    # Same detection that spawn() uses. Without this guard a second call
    # would wrap the already-wrapped __init__, so get_post_spawn would run
    # twice for every greenlet.
    if Greenlet__init__.__name__.startswith("akindi"):
        return

    def akindi_greenlet__init__(self, run=None, *args, **kwargs):
        post_spawn = get_post_spawn(func=(run or self._run))
        Greenlet__init__(self, post_spawn, *args, **kwargs)

    Greenlet.__init__ = akindi_greenlet__init__
class lazy_imap(object):
    """ Lazily apply ``func`` over ``iter`` in greenlets, yielding results in
    input order while keeping at most ``size`` elements outstanding (running
    greenlets plus finished-but-unconsumed results)::

        >>> def some_func(x):
        ...     print("Running some_func with: %s" % (x, ))
        ...     sleep(0.1)
        ...     return x
        ...
        >>> items = lazy_imap(some_func, range(100), size=3)
        >>> next(items)
        Running some_func with: 0
        Running some_func with: 1
        Running some_func with: 2
        0

    ``size`` defaults to 10 when it is ``None`` or otherwise falsy. If
    ``func`` raised for an element, that exception is re-raised when the
    element's turn comes up. Works as an iterator under both the Python 2
    (``next``) and Python 3 (``__next__``) protocols.
    """

    def __init__(self, func, iter, size=None):
        from gevent.event import Event
        self.size = size or 10
        self.func = func
        self.iter_with_idx = enumerate(iter)
        # Set whenever a greenlet finishes, to wake up a waiting next().
        self.next_ready = Event()
        # Min-heap of (input index, exception, value) for finished greenlets.
        self.pending_results = []
        # Number of greenlets spawned but not yet finished.
        self.pending_threads = 0
        # Input index of the result that next() must return next.
        self.next_result_idx = 0
        self.iter_exhausted = False
        self._spawn_next()

    def _is_done(self):
        """ True once the input is exhausted and nothing is running or
        waiting to be consumed. """
        return (
            self.iter_exhausted and
            not self.pending_threads and
            not self.pending_results
        )

    def _spawn_next(self):
        """ Spawn greenlets for further input elements until ``size``
        elements are outstanding or the input runs out. """
        if self.iter_exhausted:
            return
        from gevent import spawn
        while self.pending_threads + len(self.pending_results) < self.size:
            try:
                # Builtin next() works on both Python 2 and Python 3
                # iterators (the .next() method exists only on Python 2).
                idx, val = next(self.iter_with_idx)
            except StopIteration:
                self.iter_exhausted = True
                return
            self.pending_threads += 1
            greenlet = spawn(self.func, val)
            # Double-underscore attribute: name-mangled by the class body,
            # consistently here and in _on_thread_result.
            greenlet.__result_index = idx
            greenlet.rawlink(self._on_thread_result)

    def _on_thread_result(self, greenlet):
        """ ``rawlink`` callback: record a finished greenlet's outcome, wake
        any waiter and top the outstanding set back up. """
        heapq.heappush(self.pending_results, (
            greenlet.__result_index,
            greenlet.exception,
            greenlet.value,
        ))
        self.pending_threads -= 1
        self.next_ready.set()
        self._spawn_next()

    def __iter__(self):
        return self

    def next(self):
        """ Return the result for the next input element, in input order.

        Raises ``StopIteration`` when everything has been consumed, or the
        exception recorded for the element if its greenlet failed.
        """
        if self._is_done():
            raise StopIteration
        while True:
            has_next = (
                self.pending_results and
                self.pending_results[0][0] == self.next_result_idx
            )
            if has_next:
                break
            # Wait for a greenlet to finish (at most a second), then re-check
            # whether the smallest finished index is the one we need.
            self.next_ready.wait(timeout=1)
            self.next_ready.clear()
        self.next_result_idx += 1
        _, exc, value = heapq.heappop(self.pending_results)
        # A slot was just freed, so more work may be started.
        self._spawn_next()
        # Explicit None test: do not depend on the truthiness of the
        # exception object.
        if exc is not None:
            raise exc
        return value

    # Python 3 iterator protocol.
    __next__ = next
"""
I import this file and run `monkeypatch_everything` from my application's `__init__.py`
to guarantee that monkeypatching takes place before anything else is imported.
"""
import logging
# Guard flag: monkeypatch_everything() sets this to True after applying its
# patches and returns immediately on later calls.
did_init = False
def _patch_urllib3(module):
    """ Best-effort: inject PyOpenSSL into the urllib3 package at ``module``.

    ``module`` is the dotted path of a urllib3 package (for example
    ``"urllib3"``). Its ``contrib.pyopenssl.inject_into_urllib3`` function is
    imported and called. Any ``Exception`` raised while doing so is not
    propagated; instead a greenlet is spawned that logs the error one second
    later.
    """
    from django.utils.module_loading import import_string
    try:
        inject_into_urllib3 = import_string(module + ".contrib.pyopenssl.inject_into_urllib3")
        inject_into_urllib3()
    except Exception as e:
        import gevent

        # log_error runs after this except block has finished. Python 3
        # unbinds the ``as`` name when the block ends, so keep a separate
        # reference for the closure to use.
        error = e

        def log_error():
            # Give the rest of Django a chance to warm up and configure
            # loggers, otherwise this message won't go anywhere.
            gevent.sleep(1)
            log = logging.getLogger("akindi.__init__")
            log.error(
                "Error injecting PyOpenSSL into %s: %s "
                "(things will still work, but SSL certificates "
                "with SNI will fail to validate)",
                module, error,
            )

        gevent.spawn(log_error)
def _patch_openssl():
    """ Wrap ``set_tlsext_host_name`` on gevent_openssl's real connection
    class so text host names are IDNA-encoded before being passed on.

    The original method stays reachable as the ``old`` attribute of the
    replacement function.
    """
    from gevent_openssl.SSL import _real_connection as Connection

    # ``unicode`` on Python 2, ``str`` on Python 3; the bare name ``unicode``
    # does not exist on Python 3.
    text_type = type(u"")

    old_set_tlsext_host_name = Connection.set_tlsext_host_name

    def set_tlsext_host_name(self, name):
        if isinstance(name, text_type):
            name = name.encode("idna")
        return old_set_tlsext_host_name(self, name)

    set_tlsext_host_name.old = old_set_tlsext_host_name
    Connection.set_tlsext_host_name = set_tlsext_host_name
def monkeypatch_everything():
    """ Apply every monkeypatch once; later calls are no-ops.

    Call this as early as possible, either from manage.py or akindi.wsgi.
    Runs ``try_init_gevent``, ``_patch_openssl`` and ``_patch_urllib3`` (for
    ``urllib3`` and ``requests.packages.urllib3``), then records completion in
    the module-level ``did_init`` flag.
    """
    global did_init
    if not did_init:
        from akindi.gevent_ import try_init_gevent
        try_init_gevent()
        _patch_openssl()
        for urllib3_module in ("urllib3", "requests.packages.urllib3"):
            _patch_urllib3(urllib3_module)
        did_init = True
@srikiraju
Copy link

any word on the celery+gevent hanging? :)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment