Skip to content

Instantly share code, notes, and snippets.

@mr-yoo
Created January 27, 2021 15:34
Show Gist options
  • Save mr-yoo/eeb4e703ec5145f8bb1845ce2692eba8 to your computer and use it in GitHub Desktop.
Save mr-yoo/eeb4e703ec5145f8bb1845ce2692eba8 to your computer and use it in GitHub Desktop.
websocket - 후보1
import websockets
import asyncio
import threading
import queue
import json
class WebSocketManager(threading.Thread):
def __init__(self, type, symbols, ticktype=None, qsize=1000):
self.__aloop = asyncio.get_event_loop()
self.__q = queue.Queue(qsize)
self.type = type
self.symbols = symbols
self.ticktype = ticktype
if self.ticktype == None:
self.ticktype = ["1H"]
super().__init__()
async def __connect_socket(self):
uri = "wss://pubwss.bithumb.com/pub/ws"
async with websockets.connect(uri, ping_interval=None) as websocket:
greeting = await websocket.recv()
print(greeting)
data = "{" + f"'type':'{self.type}', 'symbols': {self.symbols}, 'tickTypes': {self.ticktype}" + "}"
await websocket.send(data)
while True:
recv_data = await websocket.recv()
self.__q.put(json.loads(recv_data))
def run(self):
self.__aloop.run_until_complete(self.__connect_socket())
def get(self):
return self.__q.get()
def terminate():
self.__aloop.close()
def connect_socket(type, symbols, ticktype=None, qsize=1000):
wm = WebSocketManager(type, symbols, ticktype, qsize)
wm.start()
return wm
if __name__ == "__main__":
wm = connect_socket("ticker", ["BTC_KRW"], ["30M"])
while True:
data = wm.get()
print(data)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment