Skip to content

Instantly share code, notes, and snippets.

@vgavro
Created January 14, 2018 13:00
Show Gist options
  • Save vgavro/afb69d3676b120f018f57b4d49ab433f to your computer and use it in GitHub Desktop.
Save vgavro/afb69d3676b120f018f57b4d49ab433f to your computer and use it in GitHub Desktop.
Gevent check concurrency
def check_concurrency(sleep='time.sleep', callback=None):
if isinstance(sleep, str):
module = __import__(''.join(sleep.split('.')[:-1]))
sleep = getattr(module, sleep.split('.')[-1])
callback = callback or (lambda x: print('concurrency={}'.format(x)))
check_concurrency._flag = False
def _set_concurrency():
sleep(0.01)
check_concurrency._flag = True
def _check_concurrency():
sleep(0.02)
callback(check_concurrency._flag)
import gevent
gevent.joinall([
gevent.spawn(_check_concurrency),
gevent.spawn(_set_concurrency),
])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment