Skip to content

Instantly share code, notes, and snippets.

@pssolanki111
Last active November 15, 2023 16:01
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save pssolanki111/07913e9c88ff8e80c8470871345c46c4 to your computer and use it in GitHub Desktop.
Save pssolanki111/07913e9c88ff8e80c8470871345c46c4 to your computer and use it in GitHub Desktop.
This is an advanced use case for streaming a large number of symbols over Polygon websockets. It combines uvloop, orjson, aioredis, batching of messages before transmission, and async streaming to achieve high throughput. It can easily be modified or extended to support simpler or more complex use cases. The key idea is keeping the str…
import asyncio
import polygon
from polygon.enums import StreamCluster
import config
import uvloop
from orjson import dumps
import aioredis
from datetime import datetime, time
# Install uvloop's event loop policy at import time, so the event loop that
# asyncio.run() creates later on is a uvloop loop.
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
# Redis connection URL, built from the host/port values of the local `config` module.
REDIS_URL = f'redis://{config.redis_host}:{config.redis_port}'
# Cut-off time (17:05:00). It is compared against the naive local time returned by
# datetime.now(), so it is interpreted in the timezone of the machine running the script.
# NOTE(review): presumably chosen to fall shortly after the market close - confirm.
CLOSE_TIME = time(17, 5, 0)
class StockStreamer:
    """
    Streams stock trades from the polygon websocket and forwards them to redis in batches.

    Incoming messages are buffered in ``self.messages``. Once ``batch_size`` messages have
    accumulated, the whole buffer is serialized with ``orjson.dumps`` and pushed as a single
    element onto the redis list ``stocktrades``.
    """

    def __init__(self, batch_size: int = 5000):
        """
        :param batch_size: number of buffered messages that triggers a push to redis. Defaults to 5000.
        """
        self.api_key, self.redis_pool, self.redis = config.polygon_key, None, None
        # NOTE(review): the ``None`` values presumably disable the receive-queue limit and the
        # keep-alive pings of the underlying websocket - confirm against the polygon library docs.
        self.stream_client = polygon.AsyncStreamClient(self.api_key, StreamCluster.STOCKS, max_memory_queue=None,
                                                       ping_timeout=None, ping_interval=None)
        self.messages = []
        self.batch_size = batch_size
        # Strong references to in-flight push tasks. The event loop only keeps weak references
        # to tasks, so a task that nobody else references can be garbage-collected before it finishes.
        self._pending_tasks = set()

    async def initialize_redis_connections(self):
        """
        Create the redis connection pool and the client bound to it (``self.redis_pool`` / ``self.redis``).
        """
        # The encoding options must be given to the pool: a client that is handed an existing
        # ``connection_pool`` does not apply its own ``encoding`` / ``decode_responses`` arguments.
        self.redis_pool = aioredis.ConnectionPool.from_url(REDIS_URL, encoding='utf-8', decode_responses=True)
        self.redis = aioredis.Redis(connection_pool=self.redis_pool)

    async def stock_trades_handler(self, msg: str):
        """
        The handler function for the 'trades' service on stocks cluster

        Schedules :meth:`push_to_redis` as a background task, so the handler itself never waits on redis.

        :param msg: the message received from stream client.
        """
        task = asyncio.create_task(self.push_to_redis(msg))
        self._pending_tasks.add(task)
        task.add_done_callback(self._on_task_done)

    def _on_task_done(self, task):
        """Drop the finished task from the pending set and report its failure, if any."""
        self._pending_tasks.discard(task)
        if task.cancelled():
            return
        error = task.exception()
        if error is not None:
            print(f'failed to push messages to redis: {error!r}')

    async def subscribe(self):
        """
        Subscribe to the stock trades stream, registering :meth:`stock_trades_handler` as its handler.
        """
        await self.stream_client.subscribe_stock_trades(handler_function=self.stock_trades_handler)
        # await self.stream_client.change_handler('status', self.stock_trades_handler)

    async def run(self):
        """
        Process stream messages until the local time passes ``CLOSE_TIME``.

        Whatever is still buffered is flushed to redis on the way out, both on a normal stop and
        when message handling raises, so a partial batch is not silently lost.
        """
        try:
            while self.time_within_bounds():
                await self.stream_client.handle_messages()
        finally:
            await self.flush()
        print(f'current time past the close time: {CLOSE_TIME} | Terminating... See Ya Tomorrow')

    async def push_to_redis(self, msg: str):
        """
        Coroutine to push messages to redis queue ``stocktrades``

        The message is only buffered; the actual push happens once ``batch_size`` messages are waiting.

        :param msg: The message as received by the handler
        """
        self.messages.append(msg)
        if len(self.messages) >= self.batch_size:
            await self._push_batch()

    async def flush(self):
        """
        Wait for the in-flight push tasks, then push any remaining buffered messages regardless of batch size.
        """
        pending = list(self._pending_tasks)
        if pending:
            # Failures are reported by ``_on_task_done``; they must not prevent the final push.
            await asyncio.gather(*pending, return_exceptions=True)
        if self.messages:
            await self._push_batch()

    async def _push_batch(self):
        """Serialize the current buffer and append it to the ``stocktrades`` redis list as one element."""
        # Swap the buffer out before awaiting, so messages arriving during the push land in a fresh list.
        batch, self.messages = self.messages, []
        await self.redis.rpush('stocktrades', dumps(batch))  # orjson encodes the list of messages to bytes
        print(f'{len(batch)} msgs pushed to redis at {datetime.now()}')

    @staticmethod
    def time_within_bounds() -> bool:
        """
        :return: ``True`` while the current local time is at or before ``CLOSE_TIME``, ``False`` afterwards.
        """
        return datetime.now().time() <= CLOSE_TIME
async def main():
    """
    Entry point: connect to redis, subscribe to the trades stream and process messages until
    ``StockStreamer.run`` returns. The redis connection pool is released on the way out.
    """
    streamer = StockStreamer()
    await streamer.initialize_redis_connections()
    try:
        await streamer.subscribe()
        await streamer.run()
    finally:
        # Release the pooled redis connections even when streaming stops because of an error.
        if streamer.redis_pool is not None:
            await streamer.redis_pool.disconnect()


if __name__ == '__main__':
    asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment