Skip to content

Instantly share code, notes, and snippets.

@tru
Created March 5, 2014 16:27
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save tru/9370622 to your computer and use it in GitHub Desktop.
Save tru/9370622 to your computer and use it in GitHub Desktop.
Discover servers & players via GDM
__author__ = 'tru'
import asyncio
import netifaces
import socket
import aiohttp
# UDP port that browse requests are broadcast to; also the default ``port``
# of GDMClient.
GDM_PORT_SERVER = 32414
# Not referenced anywhere in the visible code -- presumably the port used for
# discovering players rather than servers (TODO confirm).
GDM_PORT_PLAYER = 32413
def get_broadcast_addrs():
    """Yield the IPv4 broadcast address of each local network interface.

    Only the first AF_INET entry of every interface is inspected.
    Interfaces that cannot be queried, have no IPv4 address, or have no
    broadcast address (e.g. loopback or point-to-point links) are skipped.

    Yields:
        The broadcast address of an interface, as reported by netifaces.
    """
    for interface in netifaces.interfaces():
        try:
            addrdict = netifaces.ifaddresses(interface)
        except (ValueError, OSError):
            # The interface may have disappeared between enumeration and
            # lookup; discovery is best-effort, so just move on.  Catching
            # specific errors keeps KeyboardInterrupt/SystemExit working.
            continue

        inet_addrs = addrdict.get(netifaces.AF_INET)
        if not inet_addrs:
            continue

        first_addr = inet_addrs[0]
        if "broadcast" in first_addr:
            yield first_addr["broadcast"]
class GDMClient(asyncio.DatagramProtocol):
    """Datagram protocol that broadcasts a browse request and parses replies.

    As soon as the endpoint is connected, an ``M-SEARCH`` request is
    broadcast on every local IPv4 broadcast address.  Each reply is parsed
    into a dict of header fields and stored in ``servers``, keyed by the
    address the reply came from.

    Args:
        port: UDP port the browse request is sent to.
    """

    def __init__(self, port=GDM_PORT_SERVER):
        self.port = port
        self.transport = None
        # Most recent parsed reply per responder, keyed by sender address.
        self.servers = {}

    @staticmethod
    def _parse_response(data):
        """Parse a raw reply datagram into a ``{header: value}`` dict.

        The first line (the status line) is skipped.  Each remaining line is
        split on its first ``:`` only, so values that themselves contain
        colons (such as ``host:port``) are kept intact.  Lines without a
        colon are ignored.
        """
        text = data.decode("utf-8", errors="replace")
        headers = {}
        for line in text.splitlines()[1:]:
            key, sep, value = line.partition(":")
            if sep:
                headers[key.strip()] = value.strip()
        return headers

    def connection_made(self, transport):
        """Enable broadcasting on the socket and send the browse request."""
        self.transport = transport
        # Use the public accessor instead of the private ``_sock`` attribute.
        sock = transport.get_extra_info("socket")
        if sock is not None:
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
        self.send_browse()

    def datagram_received(self, data, addr):
        """Parse a reply and remember it under the sender's address."""
        self.servers[addr] = self._parse_response(data)

    def connection_lost(self, exc):
        """Report that the endpoint was closed."""
        print("Lost connection: %s" % exc)

    def error_received(self, exc):
        """Re-raise send/receive errors so they are not silently dropped."""
        raise exc

    def send_browse(self):
        """Broadcast the browse request to ``self.port`` on every interface."""
        print("Sending browse package")
        request = "M-SEARCH * HTTP/1.1\r\n".encode()
        for addr in get_broadcast_addrs():
            # Honour the port given to the constructor rather than always
            # targeting the server port.
            self.transport.sendto(request, (addr, self.port))
def get_gdm_task(loop: asyncio.AbstractEventLoop):
    """Create the GDM discovery endpoint on *loop* and wait until it is up.

    A UDP endpoint bound to an ephemeral port on all IPv4 interfaces is
    created with GDMClient as its protocol.  The loop is run until the
    endpoint exists, which also triggers the initial browse broadcast.

    Args:
        loop: Event loop to create the endpoint on.  It must not already be
            running, because this function calls ``run_until_complete``.

    Returns:
        The completed task; ``task.result()`` is the
        ``(transport, protocol)`` pair of the endpoint.
    """
    # Wrap the coroutine in a Task so the caller gets back something that
    # still carries the result, rather than an already-consumed coroutine.
    task = loop.create_task(
        loop.create_datagram_endpoint(
            GDMClient, local_addr=('0.0.0.0', 0), family=socket.AF_INET
        )
    )
    loop.run_until_complete(task)
    return task
if __name__ == '__main__':
    # Create the loop explicitly: calling get_event_loop() without a running
    # loop is deprecated on current Python versions.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        get_gdm_task(loop)
        # Keep listening for replies until interrupted.
        loop.run_forever()
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop the script; exit quietly.
        pass
    finally:
        loop.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment