@MtkN1
Last active August 25, 2023 01:38
Oanda WebSocket Sample
import asyncio
import json

import aiohttp
from rich.pretty import pprint


async def main():
    async with aiohttp.ClientSession() as session:
        async with session.ws_connect(
            "wss://widget.oanda.jp/ws/app/oanda_websocket?protocol=7", heartbeat=10.0
        ) as ws:
            # Subscribe to the live FX rate channel (Pusher-style subscribe event).
            await ws.send_str(
                r'{"event":"pusher:subscribe","data":{"auth":"","channel":"live-rate-fx"}}'
            )

            async for msg in ws:
                data = msg.json()

                if data["event"] == "live-rate":
                    # The "data" field is itself a JSON-encoded string; decode it again.
                    data = json.loads(data["data"])
                    try:
                        # Pick the first entry whose key is USD_JPY.
                        usdjpy = next(
                            filter(lambda x: x["key"] == "USD_JPY", data["rates"])
                        )
                        pprint(usdjpy, expand_all=True)
                    except StopIteration:
                        # This update carried no USD_JPY entry; skip it.
                        pass


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Exit quietly on Ctrl+C.
        pass
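
The message handling in the sample can be tried offline, without a network connection. The following is a minimal sketch, assuming the frame layout implied by the script: an envelope with an "event" field and a "data" field that holds a second JSON-encoded string containing a "rates" list. The helper name `extract_rate` and the "bid"/"ask" fields and values in the sample frame are hypothetical and only for illustration; the real feed may carry different fields.

```python
import json
from typing import Any, Optional


def extract_rate(raw: str, key: str = "USD_JPY") -> Optional[dict[str, Any]]:
    """Return the rate entry for `key` from a raw text frame, or None."""
    envelope = json.loads(raw)
    if envelope.get("event") != "live-rate":
        # Not a rate update (for example, a connection or subscription event).
        return None
    # The "data" field is a JSON string nested inside the envelope.
    payload = json.loads(envelope["data"])
    # next() with a default avoids the try/except StopIteration dance.
    return next((r for r in payload["rates"] if r["key"] == key), None)


# Hypothetical frame: the "bid"/"ask" fields and values are made up.
sample = json.dumps({
    "event": "live-rate",
    "data": json.dumps({"rates": [
        {"key": "EUR_USD", "bid": "1.0", "ask": "1.1"},
        {"key": "USD_JPY", "bid": "135.0", "ask": "135.1"},
    ]}),
})

print(extract_rate(sample))
```

Inside the `async for` loop, a helper like this could be called as `extract_rate(msg.data)` for text frames, which keeps the parsing logic separate from the connection handling.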
MtkN1 commented Jul 7, 2022

Requirements

pip install -U aiohttp rich
