Skip to content

Instantly share code, notes, and snippets.

@meejah
Created June 20, 2019 23:21
Show Gist options
  • Save meejah/57e5120a75fa5e8b036ff4045c3e0e53 to your computer and use it in GitHub Desktop.
Save meejah/57e5120a75fa5e8b036ff4045c3e0e53 to your computer and use it in GitHub Desktop.
import asyncio
class Protocol:
    """Minimal protocol that reports connection events on stdout.

    The transport is aborted as soon as the connection is made, so this
    class only exists to show which callbacks fire and in what order.
    """

    def connection_made(self, transport):
        """Print the transport that was handed over, then abort it."""
        message = "connection_made: {}".format(transport)
        print(message)
        transport.abort()

    def connection_lost(self, why):
        """Print the value passed as the reason the connection was lost."""
        message = "connection_lost: {}".format(why)
        print(message)
def make_proto():
    """Announce on stdout that a protocol is being created, then return a new Protocol."""
    print("creating protocol")
    protocol = Protocol()
    return protocol
async def main(loop):
    """Try to connect to localhost:8080 via ``loop`` and print the outcome.

    On success the resulting transport and protocol are printed; any
    ``Exception`` raised while awaiting or printing is reported as
    ``error: ...`` instead of propagating.
    """
    pending = loop.create_connection(make_proto, host="localhost", port="8080")
    try:
        result = await pending
        transport, proto = result
        print("transport: {}".format(transport))
        print("proto: {}".format(proto))
    except Exception as exc:
        print("error: {}".format(exc))
if __name__ == "__main__":
    # Create the loop explicitly: asyncio.get_event_loop() is deprecated when
    # no loop is running and raises RuntimeError on Python 3.14+.
    loop = asyncio.new_event_loop()
    try:
        loop.run_until_complete(main(loop))
    finally:
        # Release the selector and any other loop resources on the way out.
        loop.close()
@meejah
Copy link
Author

meejah commented Jun 20, 2019

Output when something is listening on localhost port 8080:

creating protocol
connection_made: <_SelectorSocketTransport fd=7 read=idle write=<idle, bufsize=0>>
connection_lost: None
transport: <_SelectorSocketTransport closed fd=7>
proto: <__main__.Protocol object at 0x7fae386874a8>

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment