Skip to content

Instantly share code, notes, and snippets.

@kamiazya
Last active February 26, 2020 08:54
Show Gist options
  • Save kamiazya/f7a0c483722d18e1469eda45e4be5fde to your computer and use it in GitHub Desktop.
Save kamiazya/f7a0c483722d18e1469eda45e4be5fde to your computer and use it in GitHub Desktop.
WebSocketとRxPyの連携
import json
import threading
from websocket import create_connection
from rx.subjects import Subject
from rx.concurrency import ThreadPoolScheduler
def observable():
# ThreadPool
pool_scheduler = ThreadPoolScheduler(1)
stream = Subject()
# スレッドプロセス
def ws(stream):
ws = create_connection('ws://{domain}/messages'.format(domain = 'domain'))
while True:
stream.on_next(json.loads(ws.recv()))
th = threading.Thread(target=ws, name='ws', args=(stream,))
th.start()
return stream.subscribe_on(pool_scheduler)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment