Skip to content

Instantly share code, notes, and snippets.

@hvardhanx
Last active June 28, 2019 16:30
Show Gist options
  • Save hvardhanx/365309cce2a3a992af4cea4e4c893f08 to your computer and use it in GitHub Desktop.
Save hvardhanx/365309cce2a3a992af4cea4e4c893f08 to your computer and use it in GitHub Desktop.
import fcntl
import json
import os
import select
import socket
def udp_server(host='127.0.0.1', port=1234, bufsize=65535):
    """Yield the ``"data"`` field of JSON messages received over UDP.

    This is a generator: the socket is created and bound on the first
    ``next()`` call and is closed when the generator is closed, garbage
    collected, or terminated by an exception.

    Each time the socket becomes readable, every datagram that is already
    queued is read and concatenated into one message (so a sender may split
    one JSON document across consecutive datagrams).  The combined bytes are
    decoded as UTF-8 once, parsed as JSON, and the value stored under the
    top-level ``"data"`` key is yielded.  Messages that are not valid UTF-8,
    not valid JSON, or that have no top-level ``"data"`` key are logged
    (where malformed) and skipped instead of terminating the server.

    Args:
        host: Local IPv4 address to bind to.
        port: Local UDP port to bind to.
        bufsize: Maximum number of bytes read per datagram.  The default is
            large enough for any UDP datagram, so nothing is truncated.

    Yields:
        The value of the ``"data"`` key of each well-formed JSON message.

    Raises:
        OSError: If the socket cannot be bound, or if receiving fails before
            any part of a message has been read.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.setblocking(False)
        sock.bind((host, port))
        # Log only after bind() succeeded so the message is never misleading.
        logger.info(f"Listening on udp {host}:{port}")

        while True:
            readable, _, _ = select.select([sock], [], [])
            if sock not in readable:
                continue

            # Drain everything that is queued right now.  Raw bytes are
            # collected and decoded once, so a multi-byte UTF-8 character
            # split across two datagrams is still decoded correctly.
            chunks = []
            while True:
                try:
                    chunk, _sender = sock.recvfrom(bufsize)
                except BlockingIOError:
                    # Queue is empty (or select() woke us up spuriously).
                    break
                except OSError as exc:
                    if not chunks:
                        # Nothing buffered yet: surface the failure.
                        raise
                    # Keep what was already received; best-effort drain.
                    logger.warning("udp receive failed while draining: %s", exc)
                    break
                if not chunk:
                    # An empty datagram ends the current message.
                    break
                chunks.append(chunk)

            try:
                text = b"".join(chunks).decode('utf-8')
            except UnicodeDecodeError as exc:
                logger.warning("dropping udp message, not valid UTF-8: %s", exc)
                continue

            # Cheap pre-filter: skip JSON parsing when the key cannot be there.
            if 'data' not in text:
                continue

            try:
                payload = json.loads(text)
            except json.JSONDecodeError as exc:
                logger.warning("dropping udp message, not valid JSON: %s", exc)
                continue

            # The substring check above can match text that is not a
            # top-level key, so verify the parsed structure before indexing.
            if isinstance(payload, dict) and 'data' in payload:
                yield payload['data']
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment