Skip to content

Instantly share code, notes, and snippets.

@itdaniher
Last active August 6, 2017 21:36
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save itdaniher/bd1c66db41259eac438b95045ad99598 to your computer and use it in GitHub Desktop.
Save itdaniher/bd1c66db41259eac438b95045ad99598 to your computer and use it in GitHub Desktop.
log all the ports!
import aioudp
import asyncio
async def in_or_nothing(reader, duration = 1):
if reader == None:
return None
try:
return await asyncio.wait_for(reader.read(128), duration)
except asyncio.TimeoutError:
return None
async def tcp_logger(reader, writer):
peer = writer.get_extra_info('peername')[0:2]
port = writer.get_extra_info('sockname')[1]
resp = await in_or_nothing(reader)
writer.close()
print('tcp',(port, (resp, peer)))
class Logger(object):
def __init__(self, loop = None):
if loop == None:
loop = asyncio.get_event_loop()
self.loop = loop
self.listeners_udp = [None]
self.listeners_tcp = []
async def monitor_udp(self, max_port=1024):
for port in range(1,max_port):
try:
listener = await aioudp.open_local_endpoint(port=port, loop=self.loop)
except OSError as e:
print(e)
self.listeners_udp.append(listener)
while True:
results = await asyncio.gather(*[in_or_nothing(l) for l in self.listeners_udp])
actual_results = [x for x in enumerate(results) if x[1] != None]
for result in actual_results:
if result != None:
print('udp', result)
async def monitor_tcp(self, max_port=1024):
for port in range(1,max_port):
try:
listener = await asyncio.start_server(tcp_logger, port=port, reuse_port=True)
self.listeners_tcp.append(listener)
except OSError as e:
print(e)
if __name__ == "__main__":
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
asyncio.ensure_future(Logger(loop).monitor_udp())
asyncio.ensure_future(Logger(loop).monitor_tcp())
loop.run_forever()
"""Provide high-level UDP endpoints for asyncio"""
__all__ = ['open_local_endpoint', 'open_remote_endpoint']
import asyncio
import warnings
class DatagramEndpointProtocol(asyncio.DatagramProtocol):
def __init__(self, endpoint):
self._endpoint = endpoint
# Protocol methods
def connection_made(self, transport):
self._endpoint._transport = transport
def connection_lost(self, exc):
if exc is not None:
msg = 'Endpoint lost the connection: {!r}'
warnings.warn(msg.format(exc))
self._endpoint.close()
# Datagram protocol methods
def datagram_received(self, data, addr):
self._endpoint.feed_datagram(data, addr)
def error_received(self, exc):
msg = 'Endpoint received an error: {!r}'
warnings.warn(msg.format(exc))
class Endpoint:
def __init__(self, queue_size=None):
if queue_size is None:
queue_size = 0
self._queue = asyncio.Queue(queue_size)
self._closed = False
self._transport = None
# Protocol callbacks
def feed_datagram(self, data, addr):
try:
self._queue.put_nowait((data, addr))
except asyncio.QueueFull:
warnings.warn('Endpoint queue is full')
def close(self):
self._closed = True
self.feed_datagram(None, None)
if self._transport:
self._transport.close()
# User methods
def write(self, data, addr):
if self._closed:
raise IOError("Enpoint is closed")
self._transport.sendto(data, addr)
async def read(self, n = 0):
if self._closed:
raise IOError("Enpoint is closed")
data, addr = await self._queue.get()
if data is None:
raise IOError("Enpoint is closed")
return data, addr
def abort(self):
if self._closed:
raise IOError("Enpoint is closed")
self._transport.abort()
# Properties
@property
def address(self):
return self._transport._sock.getsockname()
@property
def closed(self):
return self._closed
class LocalEndpoint(Endpoint):
pass
class RemoteEndpoint(Endpoint):
def write(self, data):
super().write(data, None)
async def read(self):
data, addr = await super().read()
return data
async def open_datagram_endpoint(host='0.0.0.0', port=0, *,
endpoint_factory=Endpoint,
remote=False, loop=None,
**kwargs):
if loop is None:
loop = asyncio.get_event_loop()
kwargs['remote_addr' if remote else 'local_addr'] = host, port
endpoint = endpoint_factory()
factory = lambda: DatagramEndpointProtocol(endpoint)
await loop.create_datagram_endpoint(factory, **kwargs)
return endpoint
async def open_local_endpoint(host='0.0.0.0', port=0, *,
queue_size=None, loop=None, **kwargs):
endpoint_factory = lambda: LocalEndpoint(queue_size)
return await open_datagram_endpoint(host, port, remote=False,
endpoint_factory=endpoint_factory,
loop=loop, **kwargs)
async def open_remote_endpoint(host='0.0.0.0', port=0, *,
queue_size=None, loop=None, **kwargs):
endpoint_factory = lambda: RemoteEndpoint(queue_size)
return await open_datagram_endpoint(host, port, remote=True,
endpoint_factory=endpoint_factory,
loop=loop, **kwargs)
if __name__ == '__main__':
async def main():
local = await open_local_endpoint()
remote = await open_remote_endpoint(*local.address)
remote.write(b'Hey Hey, My My')
return await local.read()
loop = asyncio.get_event_loop()
data, addr = loop.run_until_complete(main())
message = "Got {data!r} from {addr[0]} port {addr[1]}"
print(message.format(data=data.decode(), addr=addr))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment