Skip to content

Instantly share code, notes, and snippets.

@oeway
Created June 28, 2023 19:43
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save oeway/4073b408ebf79c8cdde79e43835e64f6 to your computer and use it in GitHub Desktop.
Save oeway/4073b408ebf79c8cdde79e43835e64f6 to your computer and use it in GitHub Desktop.
import asyncio
import threading
from concurrent.futures import ThreadPoolExecutor
from imjoy_rpc.hypha import connect_to_server
import time
class BackgroundServer:
    """Keep a Hypha server connection alive on a background event loop.

    The asyncio event loop runs in a daemon thread so that the calling
    thread stays free. Plain (blocking) functions passed to
    ``register_service`` are executed on a single-worker thread pool, which
    keeps them off the event-loop thread.
    """

    def __init__(self, server_url):
        """Remember ``server_url``; nothing is started until ``run()``."""
        self.loop = None  # event loop created by start_loop()
        self.thread = None  # daemon thread running start_loop()
        self.server = None  # connection object set by start_server()
        # One worker only: service functions are executed one at a time.
        self.executor = ThreadPoolExecutor(max_workers=1)
        self.server_url = server_url

    async def start_server(self):
        """Connect to ``self.server_url`` and store the connection."""
        self.server = await connect_to_server({"server_url": self.server_url})

    def start_loop(self):
        """Create an event loop for the current thread and run it forever.

        Intended as the target of the background thread; it only returns
        once the loop is stopped.
        """
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        self.loop = loop
        loop.run_forever()

    def run(self):
        """Start the background loop (if needed) and connect to the server.

        Blocks the calling thread until the connection is established and
        re-raises any exception raised while connecting.
        """
        if not self.loop:
            self.thread = threading.Thread(target=self.start_loop, daemon=True)
            self.thread.start()
        # Sleep between polls instead of spinning, so the waiting thread
        # does not burn a CPU core while the loop thread starts up.
        while not self.loop or not self.loop.is_running():
            time.sleep(0.001)
        future = asyncio.run_coroutine_threadsafe(self.start_server(), self.loop)
        future.result()  # Wait for the server to start

    def _wrap_blocking(self, func):
        """Return a coroutine function that runs ``func`` on the executor.

        Taking ``func`` as a parameter gives each wrapper its own binding;
        closing over a loop variable instead would make every wrapper call
        the last value the loop visited.
        """

        async def wrapper(*args, **kwargs):
            loop = asyncio.get_running_loop()
            # run_in_executor forwards both the return value and any
            # exception raised by func back to the awaiting coroutine.
            return await loop.run_in_executor(
                self.executor, lambda: func(*args, **kwargs)
            )

        return wrapper

    def register_service(self, service):
        """Register ``service`` with the connected server.

        Every callable value in ``service`` is replaced, in a copy of the
        dict, by an async wrapper that runs it on ``self.executor``. The
        caller's dict is left unmodified, so it can be registered again
        without its functions being wrapped twice.

        Args:
            service: dict describing the service; must contain ``'id'``.

        Returns:
            The ``concurrent.futures.Future`` of the registration, which is
            scheduled on the background loop. Call ``.result()`` on it to
            wait for completion or to re-raise a registration error.

        Raises:
            AssertionError: if ``'id'`` is missing or the server has not
                been started yet.
        """
        assert 'id' in service, "Each service must have an 'id'"
        assert self.server, "Server has not been started yet"

        service_id = service["id"]
        function_names = [key for key, value in service.items() if callable(value)]
        wrapped = {
            key: self._wrap_blocking(value) if callable(value) else value
            for key, value in service.items()
        }

        async def register_service_async():
            await self.server.register_service(wrapped)
            print("==============>Service registered: ", wrapped)
            config = self.server.config
            print(f"Services registered at workspace: {config.workspace}")
            base_url = (
                f"{config.public_base_url}/{config.workspace}"
                f"/services/{service_id}"
            )
            for name in function_names:
                print(f"Test them with the HTTP proxy: {base_url}/{name}")

        def report_failure(done_future):
            # Without this, an error raised during registration would be
            # lost whenever the caller ignores the returned future.
            if done_future.cancelled():
                return
            error = done_future.exception()
            if error is not None:
                print(f"Failed to register service {service_id!r}: {error!r}")

        future = asyncio.run_coroutine_threadsafe(
            register_service_async(), self.loop
        )
        future.add_done_callback(report_failure)
        return future
if __name__ == "__main__":
    # Demo: expose a slow, blocking function as a public service and keep
    # the main thread visibly busy while requests are served elsewhere.

    def hello(name):
        """Greet ``name`` after a 5-second pause, reporting the thread used."""
        greeting = "Hello " + name
        print(greeting)
        # Show which thread runs this call, to check whether it is the
        # main thread.
        print("Current thread id: ", threading.get_ident(), threading.current_thread())
        time.sleep(5)
        return greeting

    server_url = "https://ai.imjoy.io"
    bg_server = BackgroundServer(server_url)
    bg_server.run()

    hello_world_service = {
        "name": "Hello World",
        "id": "hello-world",
        "config": {
            "visibility": "public",
            "run_in_executor": True,
        },
        "hello": hello,
    }
    bg_server.register_service(hello_world_service)

    # Print a dot every second, forever, from the main thread.
    while True:
        print(".", end="", flush=True)
        time.sleep(1)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment