Skip to content

Instantly share code, notes, and snippets.

@solaris33
Last active June 26, 2022 07:13
Show Gist options
  • Save solaris33/59b24fb65f115ff896041364d01c64c1 to your computer and use it in GitHub Desktop.
Save solaris33/59b24fb65f115ff896041364d01c64c1 to your computer and use it in GitHub Desktop.
# 업비트 웹소켓 현재가(ticker) 수신 예제
# Reference : https://wikidocs.net/117440
import websockets
import asyncio
import json
import datetime
verbose = True
async def upbit_ws_client():
uri = "wss://api.upbit.com/websocket/v1"
async with websockets.connect(uri, ping_interval=60) as websocket:
subscribe_fmt = [
{"ticket":"test"},
{
"type": "ticker",
"codes":["KRW-BTC"],
"isOnlyRealtime": True
},
{"format":"SIMPLE"}
]
subscribe_data = json.dumps(subscribe_fmt)
await websocket.send(subscribe_data)
while True:
data = await websocket.recv()
data = json.loads(data)
if verbose == True:
print('------------------------------------------------------')
print('타입 :', data['ty'])
print('마켓 코드 :', data['cd'])
print('시가 :', data['op'])
print('고가 :', data['hp'])
print('저가 :', data['lp'])
print('현재가 :', data['tp'])
print('전일 종가 :', data['pcp'])
print('전일 대비 :', data['c'])
print('부호 없는 전일 대비 값 :', data['cp'])
print('전일 대비 값 :', data['scp'])
print('부호 없는 전일 대비 등락율 :', data['cr'])
print('전일 대비 등락율 :', data['scr'])
print('가장 최근 거래량 :', data['tv'])
print('누적 거래량(UTC 0시 기준) :', data['atv'])
print('24시간 누적 거래량 :', data['atv24h'])
print('누적 거래대금(UTC 0시 기준) :', data['atp'])
print('24시간 누적 거래대금 :', data['atp24h'])
print('최근 거래 일자(UTC) :', data['tdt'])
print('최근 거래 시각(UTC) :', data['ttm'])
print('체결 타임스탬프 (milliseconds) :', data['ttms'])
print('매수/매도 구분 :', data['ab'])
print('누적 매도량 :', data['aav'])
print('누적 매수량 :', data['abv'])
print('52주 최고가 :', data['h52wp'])
print('52주 최고가 달성일 :', data['h52wdt'])
print('52주 최저가 :', data['l52wp'])
print('52주 최저가 달성일 :', data['l52wdt'])
print('거래상태 :', data['ms'])
print('거래 정지 여부 :', data['its'])
print('상장폐지일 :', data['dd'])
print('유의 종목 여부 :', data['mw'])
print('타임스탬프 (milliseconds) :', data['tms'])
print('스트림 타입 :', data['st'])
else:
print(data['cd'], datetime.datetime.fromtimestamp(data['tms']/1000), data['tp'])
async def main():
await upbit_ws_client()
if __name__ == "__main__":
# 메인 loop 실행
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment