Skip to content

Instantly share code, notes, and snippets.

@javidcf
Created November 5, 2018 18:37
Show Gist options
  • Save javidcf/2d57e8dd19f6cd3972097c6349930d41 to your computer and use it in GitHub Desktop.
Save javidcf/2d57e8dd19f6cd3972097c6349930d41 to your computer and use it in GitHub Desktop.
Basic msgpack server for Unreal Engine Python
import unreal_engine as ue
import asyncio
import threading
import queue
import msgpack
try:
import msgpack_numpy as mnp
mnp.patch()
except ModuleNotFoundError: pass
class MyServer:
DEFAULT_HOST = 'localhost'
DEFAULT_PORT = 7025
def __init__(self):
self._host = MyServer.DEFAULT_HOST
self._port = MyServer.DEFAULT_PORT
# Managed by main thread
self._queue = None
self._thread_ready = None
self._thread_closed = None
# Managed by server thread
self._loop = None
self._server = None
self._server_future = None
self._client_tasks = None
def begin_play(self):
if self._thread_ready is not None or self._thread_closed is not None:
ue.log_error('Server already initiated.')
raise Exception
self._queue = queue.Queue()
self._thread_ready = threading.Event()
self._thread_closed = threading.Event()
threading.Thread(target=self._run_server).start()
def tick(self, delta_time):
try:
while True:
self._queue.get(block=False)()
except queue.Empty: pass
def end_play(self, reason):
if self._thread_ready is None or self._thread_closed is None:
ue.log_error('Server not started.')
raise Exception
self._thread_ready.wait()
asyncio.run_coroutine_threadsafe(self._stop_server(), self._loop)
ue.log(f'Waiting for server to stop.')
self._thread_closed.wait()
self._queue = None
self._thread_ready = None
self._thread_closed = None
def _process(self, obj):
ue.log(f'Received: {obj}')
return 'Received'
def _run_server(self):
self._loop = asyncio.new_event_loop()
asyncio.set_event_loop(self._loop)
self._server_future = asyncio.ensure_future(self._spawn_server())
self._client_tasks = set()
self._thread_ready.set()
try:
try:
self._loop.run_forever()
finally:
tasks = asyncio.Task.all_tasks()
for task in tasks:
if not task.done():
task.cancel()
self._loop.run_until_complete(asyncio.gather(*tasks, return_exceptions=True))
except Exception as e:
ue.log_error(e)
finally:
self._client_tasks = None
self._server_future = None
self._loop.close()
self._loop = None
self._thread_closed.set()
async def _spawn_server(self):
try:
ue.log(f'Trying to start listen server on {self._host}:{self._port}.')
self._server = await asyncio.start_server(self._new_client_connected, self._host, self._port)
await self._server.wait_closed()
except asyncio.CancelledError: pass
except Exception as e:
ue.log_error(e)
finally:
self._server = None
ue.log('Tcp server ended')
async def _new_client_connected(self, reader, writer):
task = asyncio.Task.current_task()
self._client_tasks.add(task)
try:
name = writer.get_extra_info('peername')
ue.log(f'New client connection from {name}.')
packer = msgpack.Packer()
unpacker = msgpack.Unpacker()
finished = False
while not finished:
data = await reader.read(1 << 20)
if not data:
ue.log(f'Client {name} disconnected.')
break
unpacker.feed(data)
for obj in unpacker:
response = await self._run_in_main_thread(self._process, obj)
writer.write(packer.pack(response))
await writer.drain()
if obj is None:
finished = True
except asyncio.CancelledError:
ue.log(f'Dropping connection with {name}.')
except Exception as e:
ue.log_error(f'Error on connection with {name}: {e}.')
finally:
try:
writer.close()
if hasattr(writer, 'wait_closed'): # Since Python 3.7
await writer.wait_closed()
except Exception: pass
finally:
self._client_tasks.remove(task)
ue.log(f'Connection with {name} terminated.')
async def _stop_server(self):
if self._server is not None:
self._server.close()
for task in self._client_tasks:
if not task.done():
task.cancel()
await self._server_future
if self._loop is not None:
self._loop.stop()
async def _run_in_main_thread(self, callback, *args, **kwargs):
future = asyncio.Future()
cb = lambda: self._main_thread_callback_wrapper(future, callback, *args, **kwargs)
self._queue.put(cb, block=False)
result = await future
return result
def _main_thread_callback_wrapper(self, future, callback, *args, **kwargs):
result = callback(*args, **kwargs)
self._loop.call_soon_threadsafe(lambda: future.set_result(result))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment