Effective API use
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
bwic_poll.py, fka bwic_watcher.py; name changed to avoid an internal name clash
Construct a BWIC watcher database
https://gist.github.com/empirasign/09cfba470f561f3b7c1e329a4e67a251

PSEUDO CODE
start of loop
    if after the stop time (21:30 UTC by default), exit
    if before the stop time, continue
    get all bwics for a sector
    are there any new bwics or changes to existing bwics?
    if yes:
        run queries for bonds on new bwics or bwics with changes (e.g. new talk, or color)
        write new data to the local database (in this case, sqlite)
        wait 60 seconds, return to top of loop
    if no:
        wait 120 seconds, return to top of loop
at any time during execution, hit Ctrl-C to exit

Sample Calls:
./bwic_watcher.py -s cmbs -C (watch CMBS sector, create new sqlite database)
./bwic_watcher.py -s agcmo naresi (query Agency CMO and Non-Ag resi, use existing DB)

TODO:
add options for limited lookback on historical data
"""
import sys
import os
import hashlib  # needed to compute API request signatures
import datetime
import json
import sqlite3
import time
import argparse
import logging
import logging.handlers

import requests
API_HOST = "https://www.empirasign.com"
# API CREDENTIALS
APP_ID = 'MY_ID'
PASS = 'MY_PASS'
WORK_DIR = '' # where the sqlite and log files will be saved
DB_NAME = 'bwic_watcher.sqlite' # name of sqlite database file (all tables live here)
LOG_NAME = 'bwic_watcher.log'
# stop polling for new market data at 21:30 UTC, i.e. 5:30pm EDT
# this parameter needs to be adjusted when daylight saving time switches
STOP_HOUR = 21
STOP_MINUTE = 30
PROXY_SERVER = '' #e.g. proxy.mycompany.net:8080
# if you get a 407 Proxy Authentication Required error, you need to set
# PROXY_SERVER to something like username:password@proxy.mycompany.net:8080
if PROXY_SERVER:
    # http://docs.python-requests.org/en/latest/user/advanced/#proxies
    PROXIES_DICT = {'https': 'http://' + PROXY_SERVER}
else:
    PROXIES_DICT = {}
rfh = logging.handlers.RotatingFileHandler(filename=os.path.join(WORK_DIR, LOG_NAME),
                                           maxBytes=5000000)
logging.basicConfig(level=logging.INFO,
                    format="%(asctime)s - %(levelname)-8s - %(message)s",
                    datefmt="%y-%m-%d %H:%M:%S",
                    handlers=[rfh, logging.StreamHandler(sys.stdout)])
logger = logging.getLogger()
CUR_DT = datetime.date.today()
VALID_SECTORS = ('abs', 'agency', 'agarm', 'agcmo', 'clo', 'cmbs', 'conabs', 'euro', 'naresi',
                 'nonag')
# information about bwics
BWIC_COLS = (
    ('list_id', 'TEXT'),
    ('num_bonds', 'INTEGER'),
    ('num_color', 'INTEGER'),
    ('num_talk', 'INTEGER'),
    ('uids', 'TEXT'),  # json encoded list of cusips and/or isins
)
# information about bonds
TRADE_COLS = (
    ('kind', 'TEXT'),  # bid, offer, market, bwic, pxtalk
    ('bbg_ticker', 'TEXT'),
    ('cusip', 'TEXT'),
    ('isin', 'TEXT'),
    ('of', 'REAL'),
    ('cf', 'REAL'),
    ('price', 'TEXT'),
    ('price32', 'TEXT'),
    ('spread', 'TEXT'),
    ('spread_dec', 'TEXT'),
    ('curve', 'TEXT'),
    ('of_bid', 'REAL'),
    ('cf_bid', 'REAL'),
    ('price_bid', 'TEXT'),
    ('price32_bid', 'TEXT'),
    ('spread_bid', 'TEXT'),
    ('color', 'TEXT'),
    ('dealer', 'TEXT'),
    ('list_id', 'TEXT'),
    ('trade_dt', 'TEXT'),  # ISO 8601 format
    ('offer_dt', 'TEXT'),  # ISO 8601 format
    ('bbgid', 'TEXT'),
    ('bsym_sec_type', 'TEXT'),
    ('settle_dt', 'TEXT'),  # ISO 8601 format
    ('trade_datetime_utc', 'TEXT'),  # ISO 8601 format
    ('dealer_notes', 'TEXT'),
)
def chunker(lst, chunk_size):
    """
    break down large lists into manageable chunks
    """
    chunks = []
    for i in range(0, len(lst), chunk_size):
        chunks.append(lst[i:i + chunk_size])
    return chunks
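
# For example, chunker(list(range(5)), 2) returns [[0, 1], [2, 3], [4]];
# the final chunk simply carries whatever remainder is left over.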
## Empirasign API interaction functions
def get_bwic_data(dt, sectors):
    "perform a request for each specified sector and return all relevant data"
    dt_str = dt.strftime("%Y%m%d")
    data = {}
    for sector in sectors:
        api_url_bwic = API_HOST + "/api/bwic/"
        # compute the request signature
        sig_keys = [APP_ID, sector, dt_str, PASS]
        req_sig = hashlib.sha1("".join(sig_keys).encode("utf-8")).hexdigest()
        params = {'sector': sector, 'app_id': APP_ID, 'dt': dt_str, 'req_sig': req_sig}
        resp = requests.get(api_url_bwic, params, proxies=PROXIES_DICT)
        if resp.status_code != 200 or resp.json()['meta']['results'] != "OK":
            logger.error("FAILURE! Query for %s, full url: %s, status: %s", sector,
                         api_url_bwic, resp.status_code)
            return {}
        for row in resp.json()['data']:
            data[row['list_id']] = {
                'uids': row['uids'],
                'num_bonds': row['num_bonds'],
                'num_color': row['num_color'],
                'num_talk': row['num_talk'],
            }
    return data
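
# get_bwic_data returns a dict keyed by list_id; an illustrative entry
# (the key and all values below are made up) looks like:
# {'20230911-XYZ': {'uids': ['12345ABC6'], 'num_bonds': 12, 'num_color': 0, 'num_talk': 5}}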
def get_trade_data(dt, uids, group_size=200):  # only works for today's date
    """
    Get trade data for a list of cusips/isins. uids can be of any length, but
    will be split up into groups of 200 (the current max api query size).
    """
    group_size = min(group_size, 200)  # enforce max group size of 200
    api_url = API_HOST + "/api/bonds/"
    data = []
    for uid_grp in chunker(uids, group_size):
        sig_keys = [APP_ID, ",".join(uid_grp), dt.strftime("%Y%m%d"), PASS]
        req_sig = hashlib.sha1("".join(sig_keys).encode("utf-8")).hexdigest()
        params = {'app_id': APP_ID, 'req_sig': req_sig, 'bonds': ",".join(uid_grp)}
        resp = requests.post(api_url, data=params, proxies=PROXIES_DICT)
        if resp.status_code == 200 and resp.json()['meta']['results'] == 'OK':
            # do the conversion b/c we jam both cusips and isins into the cusip
            # slot (isins are 12 characters, cusips are 9)
            for row in resp.json()['data']:
                if row.get('cusip') and len(row['cusip']) == 12:
                    row['isin'] = row['cusip']
                    row['cusip'] = None
                data.append(row)
        else:
            logger.error("API Error: %s", resp.json()['meta'])
            return []
    return data
def requests_left():
    """
    returns the number of API requests left for the current day as
    (num_requests_left, errors_lst)
    """
    api_url = API_HOST + "/api/mystatus/"
    # compute the request signature
    sig_keys = [APP_ID, datetime.date.today().strftime("%Y%m%d"), PASS]
    req_sig = hashlib.sha1("".join(sig_keys).encode("utf-8")).hexdigest()
    params = {'app_id': APP_ID, 'req_sig': req_sig}
    resp = requests.get(api_url, params, proxies=PROXIES_DICT).json()
    return resp["requests_left"], resp['meta']['errors']
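
# All three API helpers above sign requests the same way: SHA-1 over the
# concatenation of APP_ID, any request-specific fields (sector or bond list),
# the YYYYMMDD date string, and PASS. A minimal sketch of that shared recipe
# (illustrative only; this helper is not called anywhere in this script):
def example_req_sig(fields):
    "fields is a list of strings, e.g. ['cmbs', '20230911'] for /api/bwic/"
    return hashlib.sha1("".join([APP_ID] + fields + [PASS]).encode("utf-8")).hexdigest()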
# market data munging, diffing and database updating functions
def uids_from_bwics(bwics, items):
    """
    used to make sure we only query unique cusips/isins as a bond may
    appear on more than one bwic in a given day
    """
    uids = []
    for list_id in items:
        uids.extend(bwics[list_id]['uids'])
    return list(set(uids))
def get_changed_bwics(cursor, bwics):
    "figure out which bwics are new, or have changes that need new data retrievals"
    new_bwics = []
    changed_bwics = []
    # search for changes
    for list_id in bwics:
        # do the upsert
        obj = dict(bwics[list_id], list_id=list_id)
        obj['uids'] = json.dumps(obj['uids'])  # so we can put into sqlite
        pks = ('list_id',)
        res = upsert(cursor, 'bwics', pks, obj, ph="?")
        if res == 'INSERT':
            new_bwics.append(list_id)
        elif res == 'UPDATE':
            changed_bwics.append(list_id)
    return new_bwics, changed_bwics
def get_changed_bonds(cursor, mdata):
    "figure out which bonds are new, or have changes that need to be recorded"
    new_bonds, changed_bonds = [], []
    # primary keys differ by quote kind: dealer quotes are keyed by dealer and
    # trade date, while bwic/pxtalk records are keyed by the list they sit on
    pks = {
        'bid': ('kind', 'bbg_ticker', 'trade_dt', 'dealer'),
        'offer': ('kind', 'bbg_ticker', 'trade_dt', 'dealer'),
        'market': ('kind', 'bbg_ticker', 'trade_dt', 'dealer'),
        'bwic': ('kind', 'bbg_ticker', 'list_id'),
        'pxtalk': ('kind', 'bbg_ticker', 'list_id', 'dealer')
    }
    for bond in mdata:
        res = upsert(cursor, 'trade_data', pks[bond.get("kind", "bwic")], bond, ph="?")
        if res == 'INSERT':
            new_bonds.append(bond['bbg_ticker'])
        elif res == 'UPDATE':
            changed_bonds.append(bond['bbg_ticker'])
    return new_bonds, changed_bonds
def utc_minutes():
    "helper function for loop control, returns minutes after midnight UTC"
    cur_time = datetime.datetime.utcnow()
    return cur_time.hour * 60 + cur_time.minute
############### Database helper functions ####################
def make_table(tbl, cols):
    "generate the SQL necessary to make an sqlite table"
    valid_types = ('INTEGER', 'REAL', 'TEXT', 'BLOB')
    if any(col_type.upper() not in valid_types for _, col_type in cols):
        raise ValueError("invalid column type in {}".format(cols))
    cols_str = ", ".join(
        ["`{}` {}".format(col_name, col_type.upper()) for col_name, col_type in cols])
    return "CREATE TABLE {} ({});".format(tbl, cols_str)
def make_insertp(tbl, col_lst, ph="%s"):
    "make an INSERT SQL command with placeholders"
    col_str = ",".join(col_lst)
    val_str = ",".join([ph] * len(col_lst))
    return "INSERT INTO " + tbl + "(" + col_str + ") VALUES (" + val_str + ");"
def make_update(tbl, data, pks, ph='%s'):
    "build a complete UPDATE SQL command"
    match_clause = " AND ".join(["{} = {}".format(key, ph) for key in pks])
    set_clause = ",".join(["{}={}".format(key, ph) for key in data])
    return "UPDATE {} SET {} WHERE {};".format(tbl, set_clause, match_clause)
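
# Worked examples of the two builders above (column values illustrative):
#   make_insertp('bwics', ['list_id', 'num_bonds'], ph='?')
#     -> "INSERT INTO bwics(list_id,num_bonds) VALUES (?,?);"
#   make_update('trade_data', {'price': '99-16', 'color': 'cover'}, ('list_id',), ph='?')
#     -> "UPDATE trade_data SET price=?,color=? WHERE list_id = ?;"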
def upsert(cursor, tbl, pks, data, ph="%s", ignore_cols=None):
    """if record matches data on pks, do an update, otherwise do an insert
    data is a dict
    ignore_cols are for stuff like timestamps that will always change, but
    just because it changes doesn't mean the record has changed
    """
    if ignore_cols:
        cols = set(data.keys()) - set(ignore_cols)
    else:
        cols = data.keys()
    match_clause = []
    query_keys = []
    for key in cols:
        if data[key] is None:
            match_clause.append("{} IS NULL".format(key))
        else:
            match_clause.append("{} = {}".format(key, ph))
            query_keys.append(key)
    if match_clause:
        match_clause = " AND ".join(match_clause)
    else:
        match_clause = "1"  # match everything so the dupe check below raises
    q = "SELECT COUNT(*) FROM {} WHERE {};".format(tbl, match_clause)
    cursor.execute(q, tuple([data[key] for key in query_keys]))
    results = cursor.fetchall()[0][0]
    if results > 1:
        # we have dupes
        raise ValueError('duplicate record: {}'.format(data))
    elif results == 1:
        return "NO CHANGES"
    # check for a partial match on pks
    match_clause = " AND ".join(["{} = {}".format(key, ph) for key in pks])
    q = "SELECT COUNT(*) FROM {} WHERE {};".format(tbl, match_clause)
    cursor.execute(q, tuple([data[key] for key in pks]))
    results = cursor.fetchall()[0][0]
    if results > 1:
        # our pks do not constitute uniqueness
        raise ValueError(
            'primary keys retrieve multiple records--uniqueness violated: {}'.format(pks))
    elif results == 1:
        q = make_update(tbl, data, pks, ph)
        match_vals = [data[key] for key in pks]
        cursor.execute(q, tuple(list(data.values()) + match_vals))
        return 'UPDATE'
    else:
        # no matches found, so do an insert
        q = make_insertp(tbl, data.keys(), ph)
        cursor.execute(q, tuple(data.values()))
        return 'INSERT'
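
# A minimal sketch of the upsert contract against a throwaway in-memory
# database (the table and values here are hypothetical; this function is not
# called by the script):
def _upsert_demo():
    conn = sqlite3.connect(":memory:")
    cur = conn.cursor()
    cur.execute("CREATE TABLE demo (k TEXT, v TEXT);")
    assert upsert(cur, 'demo', ('k',), {'k': 'a', 'v': '1'}, ph="?") == 'INSERT'
    assert upsert(cur, 'demo', ('k',), {'k': 'a', 'v': '1'}, ph="?") == 'NO CHANGES'
    assert upsert(cur, 'demo', ('k',), {'k': 'a', 'v': '2'}, ph="?") == 'UPDATE'
    conn.close()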
def main():
    """
    the main event
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("-s", "--sectors", dest="sectors", nargs="+", type=str)
    parser.add_argument("-C", dest='create_db', action="store_true")
    parser.add_argument("-o",
                        "--one-shot",
                        action='store_true',
                        dest="one_shot",
                        help="run once, no looping")
    args = parser.parse_args()
    create_db = args.create_db
    sectors = args.sectors
    if sectors and "ALL" in sectors:
        sectors = VALID_SECTORS
    if not sectors:
        logger.error("Halting Execution: You must select one or more of the following "
                     "valid sectors: {}".format(VALID_SECTORS))
        sys.exit()
    invalid_sectors = set(sectors) - set(VALID_SECTORS)
    if invalid_sectors:
        logger.error("Halting Execution: Invalid sector(s): {}".format(invalid_sectors))
        sys.exit()
    if create_db:
        conn = sqlite3.connect(os.path.join(WORK_DIR, DB_NAME))
        cursor = conn.cursor()
        cursor.execute("DROP TABLE IF EXISTS bwics;")
        table_q = make_table('bwics', BWIC_COLS)
        cursor.execute(table_q)
        cursor.execute("DROP TABLE IF EXISTS trade_data;")
        table_q = make_table('trade_data', TRADE_COLS)
        cursor.execute(table_q)
        # index columns for better performance
        q = "CREATE INDEX idx_bbg_ticker ON trade_data(bbg_ticker ASC);"
        cursor.execute(q)
        q = "CREATE INDEX idx_list_id ON bwics(list_id ASC);"
        cursor.execute(q)
        conn.commit()
        conn.close()
    try:
        conn = sqlite3.connect(os.path.join(WORK_DIR, DB_NAME))
        cursor = conn.cursor()
        run_number = 1
        cur_time = utc_minutes()
        while args.one_shot or cur_time < (STOP_HOUR * 60 + STOP_MINUTE):
            quota, err = requests_left()  # check number of api queries left
            if err:
                logger.error("API ERROR: {}".format(err))
                sys.exit()
            else:
                logger.info("starting run: {}, queries remaining: {}".format(run_number, quota))
            if quota < 50:
                logger.warning("running low on queries, now exiting")
                conn.close()
                sys.exit()
            bwics = get_bwic_data(CUR_DT, sectors)
            new_bwics, changed_bwics = get_changed_bwics(cursor, bwics)
            if new_bwics or changed_bwics:
                conn.commit()
                logger.info("{} new BWICs, {} changed BWICs".format(
                    len(new_bwics), len(changed_bwics)))
                uids = uids_from_bwics(bwics, set(new_bwics + changed_bwics))
                # pull the market data for the uids (cusips or isins)
                if len(uids) > quota:
                    uids = uids[:quota]  # truncate number of queries made
                logger.info("retrieving market data on {} unique bonds".format(len(uids)))
                mdata = get_trade_data(CUR_DT, uids)
                new_bonds, changed_bonds = get_changed_bonds(cursor, mdata)
                if new_bonds or changed_bonds:
                    conn.commit()
                    logger.info("{} new bonds, {} changed bonds".format(
                        len(new_bonds), len(changed_bonds)))
                if args.one_shot:
                    logger.info("ran sector(s) %s once, now exiting main loop", sectors)
                    break
                logger.info("waiting 60 seconds before next iteration")
                time.sleep(60)  # DO NOT POLL SERVER MORE THAN ONCE PER MINUTE
            else:
                logger.info("0 new BWICs, 0 changed BWICs")
                if args.one_shot:
                    logger.info("ran sector(s) %s once, now exiting main loop", sectors)
                    break
                logger.info("waiting 120 seconds before next iteration")
                time.sleep(120)  # IF NOTHING NEW, WAIT TWO MINUTES BEFORE NEXT POLL
            run_number += 1
            cur_time = utc_minutes()
        if not args.one_shot:
            logger.info("it's after the designated stop time {:02d}:{:02d} UTC, "
                        "now exiting".format(STOP_HOUR, STOP_MINUTE))
    except KeyboardInterrupt:
        logger.warning("user hit CTRL-C, now exiting")
    except Exception:  # pylint: disable=broad-except
        logger.exception("fatal error")
    finally:
        logger.info("closing sqlite connection")
        conn.close()
if __name__ == "__main__":
    main()