-
-
Save empirasign/09cfba470f561f3b7c1e329a4e67a251 to your computer and use it in GitHub Desktop.
Effective API use
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# -*- coding: utf-8 -*- | |
""" | |
bwic_poll.py fka bwic_watcher.py, name changed to avoid internal name clash
Construct a BWIC watcher database | |
https://gist.github.com/empirasign/09cfba470f561f3b7c1e329a4e67a251 | |
PSEUDO CODE | |
start of loop | |
if after 6pm, exit | |
if before 6pm, continue | |
get all bwics for a sector | |
are there any new bwics or changes to existing bwics
if yes: | |
run queries for bonds on new bwics or bwics with changes (e.g. new talk, or color) | |
write new data to local database (in this case, sqlite) | |
wait 60 seconds, return to top of loop | |
if no: | |
wait 120 seconds, return to top of loop. | |
at any time during the execution hit Ctrl-C to exit. | |
Sample Calls: | |
./bwic_watcher.py -s cmbs -C (watch CMBS sector, create new sqlite database)
./bwic_watcher.py -s agcmo naresi (query Agency CMO and Non-Ag resi, use existing DB) | |
TODO: | |
add options for limited lookback on historical data | |
""" | |
import argparse
import datetime
import hashlib  # needed to compute API request signatures
import json
import logging
import logging.handlers
import os
import sqlite3
import sys
import time

import requests
API_HOST = "https://www.empirasign.com"
# API CREDENTIALS
APP_ID = 'MY_ID'
PASS = 'MY_PASS'
WORK_DIR = ''  # where the sqlite and log files will be saved
DB_NAME = 'bwic_watcher.sqlite'  # name of sqlite database file (all tables live here)
LOG_NAME = 'bwic_watcher.log'
# stop polling for new market data 21:30 UTC, or 5:30 EST
# this parameter needs to be adjusted when daylight savings time switches
STOP_HOUR = 21
STOP_MINUTE = 30
PROXY_SERVER = ''  # e.g. proxy.mycompany.net:8080
# if you get a 407 Proxy Authentication Required error, you need to set
# PROXY_SERVER to something like username:password@proxy.mycompany.net:8080
if PROXY_SERVER:
    # http://docs.python-requests.org/en/latest/user/advanced/#proxies
    PROXIES_DICT = {'https': 'http://' + PROXY_SERVER}
else:
    PROXIES_DICT = {}
# BUG FIX: the original used WORK_DIR + '/' + LOG_NAME, which resolves to the
# filesystem root ('/bwic_watcher.log') when WORK_DIR is empty; os.path.join
# degrades gracefully to a relative path in that case.
rfh = logging.handlers.RotatingFileHandler(filename=os.path.join(WORK_DIR, LOG_NAME),
                                           maxBytes=5000000)
logging.basicConfig(level=logging.INFO,
                    format="%(asctime)s - %(levelname)-8s - %(message)s",
                    datefmt="%y-%m-%d %H:%M:%S",
                    handlers=[rfh, logging.StreamHandler(sys.stdout)])
logger = logging.getLogger()
CUR_DT = datetime.date.today()  # the script always queries "today" in local time
VALID_SECTORS = ('abs', 'agency', 'agarm', 'agcmo', 'clo', 'cmbs', 'conabs', 'euro', 'naresi',
                 'nonag')
# Schema for the `bwics` table: one row per BWIC (bid-wanted list).
BWIC_COLS = (
    ('list_id', 'TEXT'),  # unique BWIC identifier (primary key for upserts)
    ("num_bonds", 'INTEGER'),
    ("num_color", 'INTEGER'),
    ("num_talk", 'INTEGER'),
    ("uids", 'TEXT'),  # json encoded list of cusips and/or isins
)
# Schema for the `trade_data` table: one row per bond-level market-data record.
TRADE_COLS = (
    # record identity
    ('kind', 'TEXT'),  # bid, offer, market, bwic, pxtalk
    ('bbg_ticker', 'TEXT'),
    ('cusip', 'TEXT'),
    ('isin', 'TEXT'),
    # quoted levels
    ('of', 'REAL'),
    ('cf', 'REAL'),
    ('price', 'TEXT'),
    ('price32', 'TEXT'),
    ('spread', 'TEXT'),
    ('spread_dec', 'TEXT'),
    ('curve', 'TEXT'),
    # bid-side levels
    ('of_bid', 'REAL'),
    ('cf_bid', 'REAL'),
    ('price_bid', 'TEXT'),
    ('price32_bid', 'TEXT'),
    ('spread_bid', 'TEXT'),
    # provenance / timing
    ('color', 'TEXT'),
    ('dealer', 'TEXT'),
    ('list_id', 'TEXT'),
    ('trade_dt', 'TEXT'),  # ISO 8601 FORMAT
    ('offer_dt', 'TEXT'),  # ISO 8601 FORMAT
    ('bbgid', 'TEXT'),
    ('bsym_sec_type', 'TEXT'),
    ('settle_dt', 'TEXT'),  # ISO 8601 FORMAT
    ('trade_datetime_utc', 'TEXT'),  # ISO 8601 FORMAT
    ('dealer_notes', 'TEXT'),
)
def chunker(lst, chunk_size):
    """Split *lst* into consecutive sub-lists of at most *chunk_size* items."""
    return [lst[start:start + chunk_size] for start in range(0, len(lst), chunk_size)]
## Empirasign API interaction functions | |
def get_bwic_data(dt, sectors):
    """
    Fetch all BWICs announced on *dt* for every sector in *sectors*.

    Returns a dict keyed by list_id, each value holding the BWIC summary
    fields ('uids', 'num_bonds', 'num_color', 'num_talk').

    On any failed request an empty dict is returned (the original returned a
    bare list here, giving callers an inconsistent type on the error path).
    """
    dt_str = dt.strftime("%Y%m%d")
    data = {}
    api_url_bwic = API_HOST + "/api/bwic/"
    for sector in sectors:
        # request signature: sha1 over app_id + sector + date + password
        sig_keys = [APP_ID, sector, dt_str, PASS]
        req_sig = hashlib.sha1("".join(sig_keys).encode("utf-8")).hexdigest()
        params = {'sector': sector, 'app_id': APP_ID, 'dt': dt_str, 'req_sig': req_sig}
        resp = requests.get(api_url_bwic, params, proxies=PROXIES_DICT)
        # only attempt to decode JSON on a 200; error bodies may not be JSON
        payload = resp.json() if resp.status_code == 200 else None
        if payload is None or payload['meta']['results'] != "OK":
            logger.error("FAILURE! Query for %s, full url: %s (HTTP %s)", sector, api_url_bwic,
                         resp.status_code)
            return {}
        for row in payload['data']:
            data[row['list_id']] = {
                'uids': row['uids'],
                'num_bonds': row['num_bonds'],
                'num_color': row['num_color'],
                'num_talk': row['num_talk'],
            }
    return data
def get_trade_data(dt, uids, group_size=200):  # only works for today's date
    """
    Get trade data for a list of cusips/isins on date *dt*.

    *uids* can be of any length; it is split into groups of at most 200
    (the current maximum API query size).  Returns a list of row dicts.
    As soon as any chunk fails, [] is returned and partial results are
    discarded (matching the original all-or-nothing behavior).
    """
    group_size = min(group_size, 200)  # enforce max group size of 200
    api_url = API_HOST + "/api/bonds/"
    data = []
    for uid_grp in chunker(uids, group_size):
        bonds_str = ",".join(uid_grp)
        # request signature: sha1 over app_id + bond list + date + password
        sig_keys = [APP_ID, bonds_str, dt.strftime("%Y%m%d"), PASS]
        req_sig = hashlib.sha1("".join(sig_keys).encode("utf-8")).hexdigest()
        params = {'app_id': APP_ID, 'req_sig': req_sig, 'bonds': bonds_str}
        resp = requests.post(api_url, data=params, proxies=PROXIES_DICT)
        # BUG FIX: the original called resp.json() in the error branch even for
        # non-200 responses, which can raise on a non-JSON error body; decode once.
        payload = resp.json() if resp.status_code == 200 else None
        if payload is None or payload['meta']['results'] != 'OK':
            logger.error("API Error: %s",
                         payload['meta'] if payload else "HTTP {}".format(resp.status_code))
            return []
        # do the conversion b/c we jam both cusips and isins into cusip slot;
        # 12-character identifiers are ISINs, 9-character ones are CUSIPs
        for row in payload['data']:
            if row.get('cusip') and len(row['cusip']) == 12:
                row['isin'] = row['cusip']
                row['cusip'] = None
            data.append(row)
    return data
def requests_left():
    """
    Query the /api/mystatus/ endpoint for the remaining daily API quota.

    Returns a tuple ``(num_requests_left, errors_lst)``.
    """
    status_url = API_HOST + "/api/mystatus/"
    # request signature: sha1 over app_id + today's date + password
    today_str = datetime.date.today().strftime("%Y%m%d")
    req_sig = hashlib.sha1("".join([APP_ID, today_str, PASS]).encode("utf-8")).hexdigest()
    payload = requests.get(status_url,
                           {'app_id': APP_ID, 'req_sig': req_sig},
                           proxies=PROXIES_DICT).json()
    return payload["requests_left"], payload['meta']['errors']
# market data munging, diffing and database updating functions | |
def uids_from_bwics(bwics, items):
    """
    Collect the distinct cusips/isins across the BWICs named in *items*.

    A bond may appear on more than one BWIC in a given day, so deduplicate
    before querying.  Returns a list (order unspecified, as before).
    """
    unique = set()
    for list_id in items:
        unique.update(bwics[list_id]['uids'])
    return list(unique)
def get_changed_bwics(cursor, bwics):
    """
    Upsert every BWIC in *bwics* and report which are new vs. changed.

    Returns (new_bwics, changed_bwics): two lists of list_id strings that
    need fresh bond-level data retrievals.
    """
    new_bwics, changed_bwics = [], []
    for list_id, info in bwics.items():
        # build the row dict: summary fields plus the list_id key itself
        record = dict(info)
        record['list_id'] = list_id
        record['uids'] = json.dumps(record['uids'])  # sqlite can't store a list directly
        outcome = upsert(cursor, 'bwics', ('list_id',), record, ph="?")
        if outcome == 'INSERT':
            new_bwics.append(list_id)
        elif outcome == 'UPDATE':
            changed_bwics.append(list_id)
    return new_bwics, changed_bwics
def get_changed_bonds(cursor, mdata):
    """
    Upsert each bond-level record in *mdata* into the trade_data table.

    Returns (new_bonds, changed_bonds): two lists of bbg_ticker strings for
    records that were inserted or updated, respectively.
    """
    new_bonds, changed_bonds = [], []
    # natural keys per record kind: dealer quotes key on ticker+date+dealer,
    # while bwic/pxtalk records key on the BWIC's list_id
    pks = {
        'bid': ('kind', 'bbg_ticker', 'trade_dt', 'dealer'),
        'offer': ('kind', 'bbg_ticker', 'trade_dt', 'dealer'),
        'market': ('kind', 'bbg_ticker', 'trade_dt', 'dealer'),
        'bwic': ('kind', 'bbg_ticker', 'list_id'),
        'pxtalk': ('kind', 'bbg_ticker', 'list_id', 'dealer')
    }
    for bond in mdata:
        # rows with no 'kind' field are treated as plain bwic records
        res = upsert(cursor, 'trade_data', pks[bond.get("kind", "bwic")], bond, ph="?")
        if res == 'INSERT':
            new_bonds.append(bond['bbg_ticker'])
        elif res == 'UPDATE':
            changed_bonds.append(bond['bbg_ticker'])
    return new_bonds, changed_bonds
def utc_minutes():
    """Return the number of minutes past midnight UTC (loop-control helper)."""
    # datetime.utcnow() is deprecated (3.12+) and returns a naive datetime;
    # use an explicitly aware UTC timestamp instead — same hour/minute values.
    cur_time = datetime.datetime.now(datetime.timezone.utc)
    return cur_time.hour * 60 + cur_time.minute
############### Database helper functions #################### | |
def make_table(tbl, cols):
    """
    Generate the CREATE TABLE SQL for sqlite table *tbl*.

    *cols* is an iterable of (column_name, column_type) pairs; types are
    upper-cased and must be valid sqlite storage classes.

    Raises ValueError on an invalid column type.  (The original printed a
    message and returned None, which deferred the failure to the subsequent
    cursor.execute(None) call.)
    """
    valid_types = ('INTEGER', 'REAL', 'TEXT', 'BLOB')
    bad = [col_type for _, col_type in cols if col_type.upper() not in valid_types]
    if bad:
        raise ValueError("invalid column type(s): {}".format(bad))
    cols_str = ", ".join(
        ["`{}` {}".format(col_name, col_type.upper()) for col_name, col_type in cols])
    return "CREATE TABLE {} ({});".format(tbl, cols_str)
def make_insertp(tbl, col_lst, ph="%s"):
    """Build a parameterized INSERT statement for *tbl* over columns *col_lst*."""
    placeholders = ",".join(ph for _ in col_lst)
    return "INSERT INTO {}({}) VALUES ({});".format(tbl, ",".join(col_lst), placeholders)
def make_update(tbl, data, pks, ph='%s'):
    """
    Build (not execute) a parameterized UPDATE statement for *tbl*.

    SET columns come from the keys of *data*; the WHERE clause matches on the
    *pks* columns.  Caller supplies values in (data values + pk values) order.
    """
    assignments = ",".join("{}={}".format(col, ph) for col in data)
    where_clause = " AND ".join("{} = {}".format(col, ph) for col in pks)
    return "UPDATE {} SET {} WHERE {};".format(tbl, assignments, where_clause)
def upsert(cursor, tbl, pks, data, ph="%s", ignore_cols=None):
    """if record matches data on pks, do an update, otherwise do an insert

    Three-step logic:
      1. exact match on every (non-ignored) column -> "NO CHANGES"
      2. match on just the primary keys *pks*      -> UPDATE the row
      3. no match at all                           -> INSERT a new row

    data is a dict of column -> value.
    ignore_cols are for stuff like timestamps that will always change, but
    just because it changes doesn't mean the record has changed.

    Returns one of "NO CHANGES", "UPDATE", "INSERT".
    Raises ValueError when either match finds more than one row (dupes, or
    pks that don't constitute uniqueness).

    NOTE(review): the caller is responsible for committing; this function
    only executes statements on *cursor*.
    """
    # columns considered when testing for an exact match
    if ignore_cols:
        cols = set(data.keys()) - set(ignore_cols)
    else:
        cols = data.keys()
    match_clause = []
    query_keys = []
    # NULL values need "IS NULL" — "col = ?" never matches NULL in SQL
    for key in cols:
        if data[key] is None:
            match_clause.append("{} IS NULL".format(key))
        else:
            match_clause.append("{} = {}".format(key, ph))
            query_keys.append(key)
    if match_clause:
        match_clause = " AND ".join(match_clause)
    else:
        match_clause = "1" # degenerate case: matches every row, so any dupes raise below
    # step 1: is there already a row identical on all compared columns?
    q = "SELECT COUNT(*) FROM {} WHERE {};".format(tbl, match_clause)
    cursor.execute(q, tuple([data[key] for key in query_keys]))
    results = cursor.fetchall()[0][0]
    if results > 1:
        # we have dupes
        raise ValueError('duplicate record: {}'.format(data))
    elif results == 1:
        return "NO CHANGES"
    # step 2: check for partial match on pks (same identity, changed payload)
    match_clause = " AND ".join(["{} = {}".format(key, ph) for key in pks])
    q = "SELECT COUNT(*) FROM {} WHERE {};".format(tbl, match_clause)
    cursor.execute(q, tuple([data[key] for key in pks]))
    results = cursor.fetchall()[0][0]
    if results > 1:
        # our pks do not constitute uniqueness
        raise ValueError(
            'primary keys retrieve multiple records--uniqueness violated: {}'.format(pks))
    elif results == 1:
        # parameter order must mirror make_update: SET values first, then pk values
        q = make_update(tbl, data, pks, ph)
        match_vals = [data[key] for key in pks]
        cursor.execute(q, tuple(list(data.values()) + match_vals))
        return 'UPDATE'
    else:
        # step 3: no matches found; so do an insert
        q = make_insertp(tbl, data.keys(), ph)
        cursor.execute(q, tuple(data.values()))
        return 'INSERT'
def main():
    """
    Entry point: parse CLI options, optionally (re)build the sqlite schema,
    then poll the Empirasign API for BWIC updates until the UTC stop time.

    Options:
        --sectors S [S ...]  sectors to watch ("ALL" expands to every valid sector)
        -C                   drop and recreate the sqlite tables
        -o / --one-shot      run a single polling pass, then exit
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--sectors", dest="sectors", nargs="+", type=str)
    parser.add_argument("-C", dest='create_db', action="store_true")
    parser.add_argument("-o",
                        "--one-shot",
                        action='store_true',
                        dest="one_shot",
                        help="run once, no looping")
    args = parser.parse_args()
    create_db = args.create_db
    # BUG FIX: args.sectors is None when --sectors is omitted, so the original
    # `"ALL" in args.sectors` raised TypeError before its own usage error fired.
    sectors = args.sectors or []
    if "ALL" in sectors:
        sectors = VALID_SECTORS
    if not sectors:
        logger.error("Halting Execution: You must select one or more of the following valid sectors: {}".format(VALID_SECTORS))
        sys.exit(1)  # nonzero exit so shell callers can detect the failure
    invalid_sectors = set(sectors) - set(VALID_SECTORS)
    if invalid_sectors:
        logger.error("Halting Execution: Invalid sector(s): {}".format(invalid_sectors))
        sys.exit(1)
    # BUG FIX: WORK_DIR + DB_NAME dropped the path separator whenever WORK_DIR
    # was non-empty; os.path.join handles both the empty and non-empty cases.
    db_path = os.path.join(WORK_DIR, DB_NAME)
    if create_db:
        conn = sqlite3.connect(db_path)
        cursor = conn.cursor()
        cursor.execute("DROP TABLE IF EXISTS bwics;")
        cursor.execute(make_table('bwics', BWIC_COLS))
        cursor.execute("DROP TABLE IF EXISTS trade_data;")
        cursor.execute(make_table('trade_data', TRADE_COLS))
        # index columns for better performance
        cursor.execute("CREATE INDEX idx_bbg_ticker ON trade_data(bbg_ticker ASC);")
        cursor.execute("CREATE INDEX idx_list_id ON bwics(list_id ASC);")
        conn.commit()
        conn.close()
    conn = None  # so the finally block is safe even if connect() raises
    try:
        conn = sqlite3.connect(db_path)
        cursor = conn.cursor()
        run_number = 1
        cur_time = utc_minutes()
        stop_minutes = STOP_HOUR * 60 + STOP_MINUTE
        while args.one_shot or cur_time < stop_minutes:
            quota, err = requests_left()  # check number of api queries left
            if err:
                logger.error("API ERROR: {}".format(err))
                sys.exit(1)
            logger.info("starting run: {}, queries remaining: {}".format(run_number, quota))
            if quota < 50:
                logger.warning("running low on queries, now exiting")
                conn.close()
                sys.exit()
            bwics = get_bwic_data(CUR_DT, sectors)
            new_bwics, changed_bwics = get_changed_bwics(cursor, bwics)
            if new_bwics or changed_bwics:
                conn.commit()
                logger.info("{} new BWICs, {} changed BWICs".format(len(new_bwics), len(changed_bwics)))
                uids = uids_from_bwics(bwics, set(new_bwics + changed_bwics))
                # pull the market data for the uids (cusips or isins),
                # truncating so we never exceed the remaining query quota
                if len(uids) > quota:
                    uids = uids[:quota]
                logger.info("retrieving market data on {} unique bonds".format(len(uids)))
                mdata = get_trade_data(CUR_DT, uids)
                new_bonds, changed_bonds = get_changed_bonds(cursor, mdata)
                if new_bonds or changed_bonds:
                    conn.commit()
                    logger.info("{} new bonds, {} changed bonds".format(len(new_bonds), len(changed_bonds)))
                wait_seconds = 60  # DO NOT POLL SERVER MORE THAN ONCE PER MINUTE
            else:
                logger.info("0 new BWICs, 0 changed BWICs")
                wait_seconds = 120  # nothing new: wait two minutes before next poll
            if args.one_shot:
                logger.info("ran sector(s) %s once, now exiting main loop", sectors)
                break
            logger.info("waiting %d seconds before next iteration", wait_seconds)
            time.sleep(wait_seconds)
            run_number += 1  # BUG FIX: the original never incremented run_number
            cur_time = utc_minutes()
        if not args.one_shot:
            logger.info("it's after designated stop time {:02d}:{:02d} UTC, now exiting".format(STOP_HOUR, STOP_MINUTE))
    except KeyboardInterrupt:
        logger.warning("user hit CTRL-C, now exiting")
    except Exception:  # pylint: disable=broad-except
        logger.exception("fatal error", exc_info=True)
    finally:
        print("closing sqlite connection")
        if conn is not None:
            conn.close()


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment