Created
January 20, 2022 07:37
-
-
Save cheekybastard/ad9d470dd8643bb1fc2274d768c53100 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Install dependencies:
    pip install httpx orjson websockets loguru chime

To run:
    python -u "<folder_path>/wss_logger.py"
"""
import asyncio
import sys
from typing import Any, Callable, Dict

import chime
import httpx
import orjson
import websockets
from loguru import logger
def make_filter(name: str) -> Callable[[Dict[str, Any]], bool]:
    """Build a loguru filter that accepts only records bound to *name*.

    Args:
        name: Value that a record's ``extra["name"]`` entry must equal for
            the record to pass the filter.

    Returns:
        A predicate that takes a log record and returns ``True`` when the
        record's ``extra`` mapping holds a ``"name"`` entry equal to *name*.
    """

    def filter_logs(record: Dict[str, Any]) -> bool:
        # A record with no bound "name" yields None from .get() and is
        # therefore rejected for any non-None *name*.
        return record["extra"].get("name") == name

    return filter_logs
# File sink: only records bound with name="wss" pass the filter and are
# written to wss_logger.log.
logger.add(
    "wss_logger.log",
    level="INFO",
    # rotation="00:00",
    rotation="10 MB",
    retention="3 days",
    format="{time} {level} {message}",
    filter=make_filter("wss"),
    colorize=True,
    catch=True,
    enqueue=True,
    buffering=512,
    diagnose=False,
)
# Console sink: the format must contain {message}, otherwise stdout lines show
# only the timestamp and level and the logged text is lost.
logger.add(
    sys.stdout,
    colorize=True,
    format="<green>{time}</green> <blue>{level}</blue> {message}",
)
# Logger bound with name="wss" so its records pass the file sink's filter.
LOG = logger.bind(name="wss")
LOG.info('wss_logger starting...')
chime.theme('zelda')
# Uncomment TRADING_VARS and FROM_SYMBOLS (and the CLOSES lines in gtrade_wss) for the price data firehose.
# TRADING_VARS = httpx.get('https://gains-farm-v2-mainnet.herokuapp.com/trading-variables').json()
# FROM_SYMBOLS = [x['from'] for x in TRADING_VARS['pairs']]
async def gtrade_wss(*, reconnect_delay: float = 1.0) -> None:
    """Read the gains-farm websocket forever and log ``currentBlock`` messages.

    Opens a websocket connection, then receives and decodes JSON messages in
    a loop. Messages with a truthy ``closes`` entry are skipped; messages
    whose ``name`` is ``currentBlock`` have their ``value`` logged. When the
    connection closes, or cannot be established at all, the error is logged
    and a new connection is attempted after ``reconnect_delay`` seconds.

    Args:
        reconnect_delay: Seconds to wait before opening a new connection
            after the previous one closed or failed to open. Prevents a
            busy loop when the server keeps refusing or dropping us.
    """
    while True:
        try:
            # https://websockets.readthedocs.io/en/stable/reference/client.html
            async with websockets.connect(
                'wss://gains-farm-v2-mainnet.herokuapp.com',
                extra_headers={
                    'Pragma': 'no-cache',
                    'Connection': 'keep-alive, Upgrade',
                    'Cache-Control': 'no-cache',
                    'User-Agent': 'FBI: CSS Fashion Crimes Taskforce.'
                },
                ping_interval=20,
                ping_timeout=20,
            ) as websocket:
                while True:
                    try:
                        raw_msg = await asyncio.wait_for(websocket.recv(), timeout=3)
                        msg = orjson.loads(raw_msg)
                        if msg.get('closes'):
                            # CLOSES eg: {'BTC': 41865.8, 'ETH': 3214.34, ...}
                            # CLOSES = dict(zip(FROM_SYMBOLS, msg['closes']))
                            # LOG.info(CLOSES)
                            continue
                        if msg.get('name') == 'currentBlock':
                            LOG.info(f'polygon: currentBlock - {msg["value"]}')
                    except asyncio.CancelledError:
                        # Never swallow task cancellation (it is an Exception
                        # subclass on Python 3.7).
                        raise
                    except asyncio.TimeoutError:
                        LOG.error("websocket - recv timed out after 3 seconds")
                        chime.error()
                        continue
                    except websockets.ConnectionClosed:
                        LOG.error("websockets.ConnectionClosed: connection closed, reconnecting")
                        chime.error()
                        # Leave the receive loop so the outer loop reconnects.
                        break
                    except Exception as e:
                        # Bad payloads (invalid JSON, unexpected shape) must
                        # not kill the stream; log and keep reading.
                        LOG.exception(f"gtrade_wss: {e}")
                        chime.error()
                        continue
        except asyncio.CancelledError:
            raise
        except Exception as e:
            # Failures while opening (or tearing down) the connection land
            # here; without this handler they would end the reconnect loop.
            LOG.exception(f"gtrade_wss: connection failed, reconnecting: {e}")
            chime.error()
        await asyncio.sleep(reconnect_delay)
async def main():
    """Run the websocket logger and log any exception it finishes with.

    ``asyncio.gather(..., return_exceptions=True)`` hands task exceptions
    back as ordinary results instead of raising them, so the results are
    inspected explicitly; otherwise a crashed task would go unreported.
    """
    try:
        results = await asyncio.gather(gtrade_wss(), return_exceptions=True)
    except Exception as e:
        LOG.exception(f"main: {e}")
        return
    for result in results:
        if isinstance(result, BaseException):
            # Attach the returned exception so its traceback is logged.
            LOG.opt(exception=result).error(f"main: {result}")
# Start the event loop only when executed as a script, so importing this
# module does not open a websocket connection.
if __name__ == "__main__":
    asyncio.run(main(), debug=True)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment