Skip to content

Instantly share code, notes, and snippets.

@cheekybastard
Created January 20, 2022 07:37
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save cheekybastard/ad9d470dd8643bb1fc2274d768c53100 to your computer and use it in GitHub Desktop.
Save cheekybastard/ad9d470dd8643bb1fc2274d768c53100 to your computer and use it in GitHub Desktop.
"""
pip install httpx orjson websockets loguru chime
to run:
python -u "<folder_path>/wss_logger.py"
"""
import sys
import asyncio
import httpx
import orjson
import websockets
import chime
from loguru import logger
def make_filter(name: str):
"""loguru filter"""
def filter_logs(record):
return record["extra"].get("name") == name
return filter_logs
logger.add(
"wss_logger.log",
level="INFO",
# rotation="00:00",
rotation="10 MB",
retention="3 days",
format="{time} {level} {message}",
filter=make_filter("wss"),
colorize=True,
catch=True,
enqueue=True,
buffering=512,
diagnose=False,
)
logger.add(sys.stdout, colorize=True, format="<green>{time}</green> <blue>{level}</blue>")
LOG = logger.bind(name="wss")
LOG.info('wss_logger starting...')
chime.theme('zelda')
# uncomment TRADING_VARS, FROM_SYMBOLS, CLOSES for price data firehose.
# TRADING_VARS = httpx.get('https://gains-farm-v2-mainnet.herokuapp.com/trading-variables').json()
# FROM_SYMBOLS = [x['from'] for x in TRADING_VARS['pairs']]
async def gtrade_wss():
while True:
# https://websockets.readthedocs.io/en/stable/reference/client.html
async with websockets.connect(
'wss://gains-farm-v2-mainnet.herokuapp.com',
extra_headers={
'Pragma': 'no-cache',
'Connection': 'keep-alive, Upgrade',
'Cache-Control': 'no-cache',
'User-Agent': 'FBI: CSS Fashion Crimes Taskforce.'
},
ping_interval=20,
ping_timeout=20) as websocket:
while True:
try:
raw_msg = await asyncio.wait_for(websocket.recv(), timeout=3)
msg = orjson.loads(raw_msg)
if msg.get('closes'):
continue
# CLOSES eg: {'BTC': 41865.8, 'ETH': 3214.34, ...}
# CLOSES = dict(zip(FROM_SYMBOLS, msg['closes']))
# LOG.info(CLOSES)
if msg.get('name'):
if msg['name'] == 'currentBlock':
LOG.info(f'polygon: currentBlock - {msg["value"]}')
except asyncio.TimeoutError:
LOG.error("websocket - recv timed out after 3 seconds")
chime.error()
continue
except websockets.ConnectionClosed:
LOG.error(f"websockets.ConnectionClosed: connection closed, reconnecting")
chime.error()
break
except Exception as e:
LOG.exception(f"gtrade_wss: {e}")
chime.error()
continue
async def main():
try:
await asyncio.gather(gtrade_wss(), return_exceptions=True)
except Exception as e:
LOG.exception(f"main: {e}")
asyncio.run(main(), debug=True)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment