Skip to content

Instantly share code, notes, and snippets.

@vxgmichel
Created February 9, 2017 16:52
Show Gist options
  • Save vxgmichel/474d9ab0d13cccfdb352767d83aa0910 to your computer and use it in GitHub Desktop.
Save vxgmichel/474d9ab0d13cccfdb352767d83aa0910 to your computer and use it in GitHub Desktop.
Testing UDP broadcast and asyncio
import asyncio
class EchoProtocol(asyncio.DatagramProtocol):
    """Datagram protocol that sends every received datagram back to its sender.

    Deriving from ``asyncio.DatagramProtocol`` supplies the callbacks this
    class does not define itself (notably ``connection_lost``), so the
    transport never trips over a missing method when it is closed.
    """

    def connection_made(self, transport):
        """Remember the transport so received datagrams can be answered."""
        self.transport = transport

    def datagram_received(self, data, addr):
        """Log the datagram and echo the raw bytes back to ``addr``.

        ``data`` is the received payload (bytes) and ``addr`` the sender's
        address, indexed as ``addr[0]`` (host) and ``addr[1]`` (port).
        """
        # The decoded text is only used for the log line.  Undecodable bytes
        # are replaced rather than raising, so an arbitrary broadcast packet
        # cannot prevent the echo below.
        message = data.decode(errors="replace")
        print("Echoing {0!r} from/to {1[0]}:{1[1]}".format(message, addr))
        # Echo the original bytes untouched, not the decoded text.
        self.transport.sendto(data, addr)

    def error_received(self, exc):
        """Report a send/receive error passed in by the transport."""
        print("Error received: {0!r}".format(exc))
class ClientProtocol(asyncio.DatagramProtocol):
    """Datagram protocol for the sending side: logs replies and sends text.

    ``send`` calls ``transport.sendto`` without an address, so the transport
    is expected to have been created with a remote address.

    Deriving from ``asyncio.DatagramProtocol`` supplies the callbacks this
    class does not define itself (notably ``connection_lost``).
    """

    def connection_made(self, transport):
        """Remember the transport used by ``send``."""
        self.transport = transport

    def datagram_received(self, data, addr):
        """Log a datagram received from ``addr`` (indexed as host, port)."""
        # Replace undecodable bytes instead of raising: the text is only
        # used for the log line.
        message = data.decode(errors="replace")
        print("Received {0!r} from {1[0]}:{1[1]}".format(message, addr))

    def error_received(self, exc):
        """Report a send/receive error passed in by the transport."""
        print("Error received: {0!r}".format(exc))

    def send(self, message):
        """Encode ``message`` and send it to the transport's remote peer."""
        data = message.encode()
        # Ask for the peer through the public transport API rather than
        # reading the private ``_address`` attribute, which is an
        # implementation detail of the concrete transport classes.
        addr = self.transport.get_extra_info('peername')
        print("Sending {0!r} to {1[0]}:{1[1]}".format(message, addr))
        self.transport.sendto(data)
async def main(addr=('194.47.253.255', 8000)):
    """Run an echo endpoint and a broadcasting client against ``addr``.

    An ``EchoProtocol`` endpoint is bound locally to ``addr`` and a
    ``ClientProtocol`` endpoint with broadcasting enabled is connected to the
    same ``addr``.  The client then sends ``'Hello!'`` every three seconds.

    The loop never finishes by itself; the coroutine ends only through
    cancellation or an error, and both transports are closed on the way out.
    """
    loop = asyncio.get_event_loop()
    echo_transport, _ = await loop.create_datagram_endpoint(
        EchoProtocol, local_addr=addr)
    try:
        client_transport, client = await loop.create_datagram_endpoint(
            ClientProtocol, remote_addr=addr, allow_broadcast=True)
        try:
            while True:
                client.send('Hello!')
                await asyncio.sleep(3)
        finally:
            client_transport.close()
    finally:
        # Also reached when creating the client endpoint fails, so the
        # already-bound echo socket is not leaked.
        echo_transport.close()
if __name__ == '__main__':
    # Create and own the loop explicitly instead of relying on an implicit
    # "current" loop being available outside of a coroutine.
    loop = asyncio.new_event_loop()
    task = loop.create_task(main())
    try:
        loop.run_until_complete(task)
    except KeyboardInterrupt:
        # main() never returns by itself, so Ctrl-C is the normal way to
        # stop.  Cancel the task and let it unwind so its cleanup can run.
        if not task.done():
            task.cancel()
            try:
                loop.run_until_complete(task)
            except asyncio.CancelledError:
                pass
    finally:
        # Close the loop on every exit path, including errors from main().
        loop.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment