-
-
Save rainey/2e1e95d47e6a37edd6ab79fa3a595020 to your computer and use it in GitHub Desktop.
restarting polygon socket
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import polygon | |
import pickle | |
import copy | |
import websocket | |
import time | |
import signal | |
import datetime | |
import os | |
from builtins import staticmethod | |
# Module-level state.
# message_dir is not referenced by any code in this file.
message_dir = ""
# Handle to the websocket client; sigint_handler reads it to close the client.
pg_client = None
def sigint_handler(sig, frame):
    """Handle SIGINT (Ctrl-C): close the active client, then exit.

    Args:
        sig: Signal number passed in by the ``signal`` module (unused).
        frame: Interrupted stack frame (unused).

    Raises:
        SystemExit: Always, so the process terminates instead of returning
            to the interrupted code.
    """
    if pg_client:
        print("Closing Client")
        pg_client.close()
    # Installing this handler replaces Python's default KeyboardInterrupt
    # behaviour, so without an explicit exit Ctrl-C could never stop the script.
    raise SystemExit(0)
class log_recorder(object):
    """Pickle each received message into a timestamped directory.

    A new output directory ``<basedir>/<timestamp>`` is created on
    construction and again on every call to ``incr_restarts``. The factory
    methods (``record_message_fn``, ``Err_fn``, ``Onclose_Fn``) return
    closures bound to this recorder so they can be handed out as callbacks.

    Attributes set by this class:
        basedir: Root directory under which session directories are created.
        dir_out: Directory currently receiving pickled messages.
        num_restarts: Number of times ``incr_restarts`` has been called.
        error_state: 0 initially; set to 1 by the error callback and to 2 by
            the close callback.
    """

    @staticmethod
    def time_now():
        """Return the current wall-clock time as an int in units of 100 ns."""
        return int(time.time() * 10000000)

    def __init__(self, basedir="stream_log"):
        """Create the first session directory under ``basedir``.

        Args:
            basedir: Root directory for all session directories. It is
                created if it does not exist.
        """
        self.basedir = basedir
        self.dir_out = self._new_session_dir()
        self.num_restarts = 0
        self.error_state = 0

    def _new_session_dir(self):
        """Create and return a fresh ``<basedir>/<timestamp>`` directory."""
        session_dir = "%s/%d" % (self.basedir, log_recorder.time_now())
        # makedirs also creates a missing basedir; a plain mkdir raised
        # FileNotFoundError when the root directory had not been made yet.
        os.makedirs(session_dir, exist_ok=True)
        return session_dir

    def incr_restarts(self):
        """Switch to a new session directory and count the restart."""
        self.dir_out = self._new_session_dir()
        self.num_restarts += 1

    def record_message(self, message):
        """Pickle ``message`` to ``<dir_out>/<timestamp>.pkl``.

        Args:
            message: Any picklable object.
        """
        message_time = log_recorder.time_now()
        with open("%s/%i.pkl" % (self.dir_out, message_time), "wb") as f_out:
            pickle.dump(message, f_out)

    def record_message_fn(self):
        """Return a one-argument callback that prints and records a message."""
        def rm_fn(message):
            print("(%i) Recording message %s" % (self.num_restarts, message))
            self.record_message(message)
        return rm_fn

    def Err_fn(self):
        """Return an ``(ws, err)`` callback that closes ``ws`` and flags an error."""
        def polygon_err(ws, err):
            # One-element tuple so an ``err`` that is itself a tuple still
            # formats instead of raising TypeError.
            print("Error %s" % (err,))
            ws.close()
            self.error_state = 1
        return polygon_err

    def Onclose_Fn(self):
        """Return a close callback that sets ``error_state`` to 2.

        The callback takes ``ws`` plus any extra positional arguments, so it
        works whether the caller passes only the socket or also a close
        status code and message.
        """
        def polygon_onclose(ws, *close_args):
            print("Closed ")
            self.error_state = 2
        return polygon_onclose
def start_pgclient(sub_list: list, lr: "log_recorder", polygon_key: str = "secret"):
    """Create a polygon websocket client, start it, and subscribe.

    The new client is stored in the module-level ``pg_client`` so that
    ``sigint_handler`` can close it.

    Args:
        sub_list: Channel names passed positionally to ``subscribe``.
        lr: Recorder that supplies the message, close and error callbacks.
        polygon_key: API key handed to the client. The default is the
            placeholder value the script originally hard-coded.

    Returns:
        The newly created ``polygon.WebSocketClient``.
    """
    # Without this declaration the assignment below only bound a local name
    # and the module-level handle stayed None.
    global pg_client
    pg_client = polygon.WebSocketClient(
        polygon.STOCKS_CLUSTER,
        polygon_key,
        lr.record_message_fn(),
        lr.Onclose_Fn(),
        lr.Err_fn(),
    )
    pg_client.run_async()
    pg_client.subscribe(*sub_list)
    return pg_client
if __name__ == "__main__":
    # Script entry point: start streaming and restart the client whenever
    # the recorder's callbacks flag an error or a close.
    tickers_list = "FB,AMZN,AAPL,NFLX,GOOG".split(",")
    lr = log_recorder()
    # Each subscription name is the ticker prefixed with "AM.".
    list_Ticks = ["AM.%s" % (ticker) for ticker in tickers_list]
    signal.signal(signal.SIGINT, sigint_handler)
    start_pgclient(list_Ticks, lr)
    # error_state lives on the recorder; the bare name raised NameError here.
    print("Error state is %i" % (lr.error_state))
    while True:
        time.sleep(1)
        if lr.error_state != 0:
            # Clear the flag first, then move to a new output directory and
            # start a fresh client.
            lr.error_state = 0
            lr.incr_restarts()
            start_pgclient(list_Ticks, lr)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment