Skip to content

Instantly share code, notes, and snippets.

@oeway
Forked from sunliqun123/janus_demo_1
Last active October 31, 2018 15:59
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save oeway/7f58937eb9a7a86f43533b779c2a798d to your computer and use it in GitHub Desktop.
Save oeway/7f58937eb9a7a86f43533b779c2a798d to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import janus
import asyncio
import time
import threading
from bson.objectid import ObjectId
from autobahn.asyncio.wamp import ApplicationSession
from autobahn.asyncio.wamp import ApplicationRunner
import uuid
task_q = janus.Queue()
result_q = janus.Queue()
futureDict = {}
# This is y2
async def process_result():
while True:
result = result_q.async_q.get()
if result['task_id'] in futureDict:
future = futureDict[result['task_id']]
if result['success']:
future.set_result(result['result'])
else:
future.set_exception(result['exception'])
del futureDict[result['task_id']]
def heavy_calc(*args, **kwargs):
time.sleep(2)
return [1, 2, 4]
# this is t3
def process_tasks():
while True:
task = task_q.sync_q.get()
if task['name'] == 'heavy_calc':
try:
ret = heavy_calc(**task['kwargs'])
result_q.sync_q.put({'task_id': task['id'], 'success': True, 'result': ret})
except Exception as e:
result_q.sync_q.put({'task_id': task['id'], 'success': False, 'exception': e})
else:
result_q.sync_q.put({'task_id': task['id'], 'success': False, 'exception': NotImplemented })
class FrontendSession(ApplicationSession):
def onConnect(self):
self.join(self.config.realm, [u"ticket"], u'backend')
def onChallenge(self, challenge):
if challenge.method == u"ticket":
return u'sg-ai.com'
else:
raise Exception("Invalid authmethod {}".format(challenge.method))
async def onJoin(self, details):
print(self._session_id)
asyncio.ensure_future(process_result())
@wamp.register('com.example.heavy_calculation')
async def heavy_calculation(self, fname, reg):
new_id = str(uuid.uuid4())
task_q.async_q.put({'id': new_id, 'name': 'heavy_calc', 'kwargs': {'a': 1, 'b': 2}})
future = asyncio.Future()
futureDict[new_id] = future
return await future
def onLeave(self, details):
print("Client session left: {}".format(details))
self.disconnect()
def onDisconnect(self):
print("Client session disconnected.")
if __name__ == '__main__':
t = threading.Thread(target=process_tasks)
t.setDaemon(True)
t.start()
# this is y1
runner = ApplicationRunner(url='wss://dcrossbar.sg-ai.com/ws', realm='realm1')
runner.run(FrontendSession, log_level='debug')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment