Skip to content

Instantly share code, notes, and snippets.

@fantix
Created October 9, 2018 12:39
Show Gist options
  • Save fantix/a1d8b850f0623a39f635ca380bc76948 to your computer and use it in GitHub Desktop.
Save fantix/a1d8b850f0623a39f635ca380bc76948 to your computer and use it in GitHub Desktop.
uvloop#197 reproduced crash
import asyncio
import ssl
import threading
import socket
import gc
import uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
def server():
s = socket.socket()
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind(('0.0.0.0', 8088))
s.listen(2000)
while True:
ss, addr = s.accept()
while ss.recv(1024):
pass
threading.Thread(target=server, daemon=True).start()
async def sub():
try:
ctx = ssl.SSLContext()
ctx.verify_mode = ssl.CERT_NONE
await asyncio.wait_for(
asyncio.open_connection('127.0.0.1', 8088, ssl=ctx), 0.1)
except asyncio.TimeoutError:
pass
async def main():
fs = []
for i in range(1000):
fs.append(asyncio.ensure_future(sub()))
await asyncio.wait(fs)
gc.collect()
def test():
asyncio.get_event_loop().run_until_complete(main())
if __name__ == '__main__':
test()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment