Skip to content

Instantly share code, notes, and snippets.

@rlamy
Last active September 10, 2019 11:53
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save rlamy/53712623ea16c9c920c574dd2266dbef to your computer and use it in GitHub Desktop.
Save rlamy/53712623ea16c9c920c574dd2266dbef to your computer and use it in GitHub Desktop.
import sys
import os
import threading
import time
import importlib
import contextlib
import faulthandler
import _thread
def trace_calls(frame, event, arg):
if event != 'call':
return
co = frame.f_code
func_name = co.co_name
if func_name == 'write':
# Ignore write() calls from print statements
return
func_line_no = frame.f_lineno
func_filename = co.co_filename
caller = frame.f_back
caller_line_no = caller.f_lineno
caller_filename = caller.f_code.co_filename
if caller_filename == '<frozen importlib._bootstrap>' and False:
print('%s: Call to %s on line %s of %s from line %s of %s' % \
(_thread.get_ident(), func_name, func_line_no, func_filename,
caller_line_no, caller_filename))
if func_name == 'has_deadlock':
tid = _thread.get_ident()
print(f"{tid} called has_deadlock() on {frame.f_locals['self']}")
_blocking_on = frame.f_globals['_blocking_on']
ml = _blocking_on.get(tid)
print(_thread.get_ident(), frame.f_globals['_blocking_on'])
if ml:
print(f'Lock owner: {ml.owner}')
print(vars(ml))
elif func_name == 'acquire':
tid = _thread.get_ident()
print(f"{tid} called acquire() on {frame.f_locals['self']}")
elif func_name == 'release':
tid = _thread.get_ident()
print(f"{tid} called release() on {frame.f_locals['self']}")
@contextlib.contextmanager
def start_threads(threads, unlock=None):
threads = list(threads)
started = []
try:
for t in threads:
t.start()
started.append(t)
yield
finally:
try:
if unlock:
unlock()
endtime = starttime = time.monotonic()
for timeout in range(1, 11):
endtime += 1
for t in started:
t.join(max(endtime - time.monotonic(), 0.01))
started = [t for t in started if t.is_alive()]
if not started:
break
finally:
started = [t for t in started if t.is_alive()]
if started:
faulthandler.dump_traceback(sys.stdout)
raise AssertionError('Unable to join %d threads' % len(started))
def delay_has_deadlock(frame, event, arg):
if event == 'call' and frame.f_code.co_name == 'has_deadlock':
time.sleep(0.2)
def test_concurrency():
sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'data'))
try:
exc = None
def run():
print('start thread\n')
sys.settrace(delay_has_deadlock)
event.wait()
try:
importlib.import_module('package')
except BaseException as e:
nonlocal exc
exc = e
print('end thread\n')
for i in range(1000):
event = threading.Event()
threads = [threading.Thread(target=run) for x in range(2)]
try:
with start_threads(threads, event.set):
time.sleep(0)
finally:
sys.modules.pop('package', None)
sys.modules.pop('package.submodule', None)
if exc is not None:
raise exc
print('*****************')
print('')
finally:
del sys.path[0]
test_concurrency()
import sys
from time import sleep
import threading
from importlib._bootstrap import _ModuleLockManager, _blocking_on
def run1():
print('T1: 0')
with _ModuleLockManager('xyz'):
sleep(0.2)
print('T1: 1')
with _ModuleLockManager('xyz'):
print('T1: 2')
pass
def delay_has_deadlock(frame, event, arg):
if event == 'call' and frame.f_code.co_name == 'has_deadlock':
sleep(0.2)
def run2():
sys.settrace(delay_has_deadlock)
print('T2: 0')
sleep(0.1)
print('T2: 1')
with _ModuleLockManager('xyz'):
print('T2: 2')
def test():
thread1 = threading.Thread(target=run1)
thread2 = threading.Thread(target=run2)
thread1.start()
thread2.start()
thread1.join()
thread2.join()
test()
print(_blocking_on)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment