Example script showing how to download all market data for a given date.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
eod_data_grab.py
More. More data. More. Do it!
- Max Zorin (apocryphal)
get all the current data for all the bonds on a given date
most up to date copy can be found here:
https://gist.github.com/empirasign/c8640a60011fde1a9a9b4643b9736475
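
usage (the date and output file below are only placeholders):
    python3 eod_data_grab.py -d 2023-01-31 -o my_prices.sqlite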
"""
import argparse
import datetime
import hashlib
import logging
import sqlite3
import time
import requests
API_HOST = "https://www.empirasign.com"
# API CREDENTIALS
APP_ID = 'MY_ID'
PASS = 'MY_PASS'
PROXY_SERVER = '' #e.g. proxy.mycompany.net:8080
# if you get a 407 Proxy Authentication Required error, you need to set
# PROXY_SERVER to something like username:password@proxy.mycompany.net:8080
if PROXY_SERVER:
    # http://docs.python-requests.org/en/latest/user/advanced/#proxies
    PROXIES_DICT = {'https': 'http://' + PROXY_SERVER}
else:
    PROXIES_DICT = {}
# information about bonds
TRADE_COLS = (
    ('kind', 'TEXT'),  # bid, offer, market, bwic, pxtalk
    ('bbg_ticker', 'TEXT'),
    ('cusip', 'TEXT'),
    ('isin', 'TEXT'),
    ('of', 'REAL'),
    ('cf', 'REAL'),
    ('price', 'TEXT'),
    ('price32', 'TEXT'),
    ('spread', 'TEXT'),
    ('spread_dec', 'TEXT'),
    ('curve', 'TEXT'),
    ('of_bid', 'REAL'),
    ('cf_bid', 'REAL'),
    ('price_bid', 'TEXT'),
    ('price32_bid', 'TEXT'),
    ('spread_bid', 'TEXT'),
    ('color', 'TEXT'),
    ('dealer', 'TEXT'),
    ('list_id', 'TEXT'),
    ('trade_dt', 'TEXT'),  # ISO 8601 FORMAT
    ('offer_dt', 'TEXT'),  # ISO 8601 FORMAT
    ('bbgid', 'TEXT'),
)
BULK_CHUNK_SIZE = 500 # maximum number of bond identifiers per bulk data request
def chunker(lst, chunk_size):
"""
break down large lists into managable chunks
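
    For example (purely illustrative):

        >>> chunker([1, 2, 3, 4, 5], 2)
        [[1, 2], [3, 4], [5]]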
"""
chunks = []
for i in range(0, len(lst), chunk_size):
chunks.append(lst[i:i + chunk_size])
return chunks
## Empirasign API interaction functions
def get_all_bonds(dt, figi_marketsector, kind=None):
"""
return a list of all bonds that appeared on BWICs or Dealer Runs for a given date / sector
figi_marketsector is exactly equivalent to marketSecDes as defined by OpenFIGI
kind: None (all data), bwics (bwics only), runs (runs only)
https://www.openfigi.com/api#openapi-schema
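
    For example (the date here is only a placeholder),
    get_all_bonds(datetime.date(2023, 9, 8), "Mtge", kind="bwics")
    would return just the bonds that appeared on BWICs in the Mtge sector that day.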
"""
api_url = API_HOST + "/api/all-bonds/"
# compute the request signature
sig_keys = [APP_ID, figi_marketsector, dt.strftime("%Y%m%d"), PASS]
req_sig = hashlib.sha1("".join(sig_keys).encode("utf-8")).hexdigest()
url_params = {
'app_id': APP_ID,
'figi_marketsector': figi_marketsector,
'dt': dt.strftime("%Y%m%d"),
'req_sig': req_sig
}
if kind is not None:
url_params["kind"] = kind
resp = requests.get(api_url, url_params, proxies=PROXIES_DICT)
return resp.json()["bonds"]
def get_trade_data(dt, uids):
"""
Get trade data for a list of cusips, uids can be of any length, but will
be split up in to groups of 500 (which is current max api query size
"""
    if len(uids) > BULK_CHUNK_SIZE:
        raise ValueError("cannot pass in more than {} identifiers".format(BULK_CHUNK_SIZE))
    api_url = API_HOST + "/api/bonds/"
    data = []
    # the date used here is the current date and not necessarily the query date!
    sig_keys = [APP_ID, ",".join(uids), datetime.date.today().strftime("%Y%m%d"), PASS]
    req_sig = hashlib.sha1("".join(sig_keys).encode("utf-8")).hexdigest()
    params = {
        'd0': dt.strftime("%Y%m%d"),
        'd1': dt.strftime("%Y%m%d"),
        'app_id': APP_ID,
        'req_sig': req_sig,
        'bonds': ",".join(uids)
    }
    resp = requests.post(api_url, data=params, proxies=PROXIES_DICT)
    if resp.status_code == 200 and resp.json()['meta']['results'] == 'OK':
        # do the conversion b/c we jam both cusips and isins into cusip slot
        logging.info("http status code: %s, num records: %s", 200, len(resp.json()['data']))
        for row in resp.json()['data']:
            if row.get('cusip') and len(row['cusip']) == 12:
                row['isin'] = row['cusip']
                row['cusip'] = None
            data.append(row)
    else:
        err_msg = "API Error: {}".format(resp.json()['meta'])
        logging.error(err_msg)
    return resp.status_code, data
############### Database helper functions ####################
def make_table(tbl, cols):
"generate the SQL necessary to make an sqlite table"
valid_types = ('INTEGER', 'REAL', 'TEXT', 'BLOB')
if any((b.upper() not in valid_types for a, b in cols)):
print("you have an invalid column type")
return None
cols_str = ", ".join(
["`{}` {}".format(col_name, col_type.upper()) for col_name, col_type in cols])
return "CREATE TABLE {} ({});".format(tbl, cols_str)
def make_insertp(tbl, col_lst, ph="%s"):
"make an INSERT SQL command with placeholders"
col_str = ", ".join(["`{}`".format(col) for col in col_lst])
val_str = ", ".join([ph] * len(col_lst))
return "INSERT INTO " + tbl + "(" + col_str + ") VALUES (" + val_str + ");"
def main():
"""
the main event
"""
parser = argparse.ArgumentParser(description='get all the market data for a given date',
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
example script constructed to illustrate the utility of the new /all-bonds/
API endpoint
""")
parser.add_argument('-d', dest='dt', help='trade date YYYY-MM-DD')
parser.add_argument('-o', dest='db_path', help='sqlite path, e.g. my_prices.sqlite')
args = parser.parse_args()
logging.basicConfig(filename="eod_data_grab.log",
format='%(asctime)s - %(levelname)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%SZ',
level=logging.INFO)
logging.Formatter.converter = time.gmtime
    if args.dt:
        trade_dt = datetime.datetime.strptime(args.dt, r"%Y-%m-%d")
    else:
        trade_dt = datetime.date.today()
    # create the database
    conn = sqlite3.connect(args.db_path)
    cursor = conn.cursor()
    cursor.execute("DROP TABLE IF EXISTS trade_data;")
    table_q = make_table('trade_data', TRADE_COLS)
    cursor.execute(table_q)
    logging.info("made database %s and table", args.db_path)
    # index columns for better performance
    for q in ("CREATE INDEX idx_bbg_ticker ON trade_data(bbg_ticker);",
              "CREATE INDEX idx_cusip ON trade_data(cusip);",
              "CREATE INDEX idx_isin ON trade_data(isin);"):
        cursor.execute(q)
    conn.commit()
    logging.info("added indexes for bbg_ticker, cusip, and isin")
    all_bonds = get_all_bonds(trade_dt, "Mtge")
    isins = [x["isin"] for x in all_bonds if x.get("isin")]
    logging.info("%s unique isins have market data for date: %s", len(isins), trade_dt)
    q = make_insertp("trade_data", [col[0] for col in TRADE_COLS], "?")
    for i, cur_isins in enumerate(chunker(isins, BULK_CHUNK_SIZE)):
        logging.info("starting chunk: %s", i + 1)
        http_status, data = get_trade_data(trade_dt, cur_isins)
        if http_status == 200 and data:
            ins_data = []
            for row in data:
                ins_data.append([row.get(col[0]) for col in TRADE_COLS])
            cursor.executemany(q, ins_data)
            logging.info("inserted %s market data records", len(ins_data))
            conn.commit()
    conn.close()
    logging.info("job done, db: %s closed", args.db_path)
if __name__ == "__main__":
    main()
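
Once the script has run, the output is an ordinary SQLite file that can be read back with the
standard library. A minimal sketch (the file name is whatever was passed to -o, and the query is
just one illustration):

import sqlite3

conn = sqlite3.connect("my_prices.sqlite")
cursor = conn.cursor()
# count how many records of each kind (bid, offer, market, bwic, pxtalk) were captured
for kind, n in cursor.execute("SELECT kind, COUNT(*) FROM trade_data GROUP BY kind;"):
    print(kind, n)
conn.close()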