-
-
Save empirasign/e19abc1649286167fce59f6645f71658 to your computer and use it in GitHub Desktop.
Export Empirasign market data to the INTEX market_data/market_color directory in .mktd format.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# -*- coding: utf-8 -*- | |
""" | |
intex_mktd.py

Leverage the /api/all-bonds/ and /api/bonds/ API endpoints to fill the INTEX
market_data/market_color directory with fresh market data for selected sectors.

https://gist.github.com/empirasign/e19abc1649286167fce59f6645f71658

THIS SCRIPT SHOULD BE PLATFORM INDEPENDENT
""" | |
import argparse | |
from collections import defaultdict | |
import datetime | |
import hashlib | |
import json | |
import logging | |
import logging.handlers | |
import math | |
import re | |
import sys | |
import tempfile | |
import requests | |
# --- Empirasign API endpoint ---
API_SCHEME = "https"
API_HOST = "www.empirasign.com"

# Optional outbound proxy, e.g. proxy.mycompany.net:8080
# A 407 Proxy Authentication Required error means the proxy wants credentials;
# set PROXY_SERVER to something like username:password@proxy.mycompany.net:8080
PROXY_SERVER = ''

# bulk API queries are split into chunks no larger than this
CHUNK_SIZE = 250

## START OF USER CONFIG
# API CREDENTIALS
APP_ID = 'MY_ID'
PASS = 'MY_PASS'

# Directory the .mktd files are written to.
# The Default Location looks something like below
# C:\Users\WINDOWS_USERNAME\AppData\Roaming\intex\settings\market_data\market_color
# If your org has a shared location for Intex market_data, the path will be different
INTEX_MKTD_DIR = tempfile.gettempdir()
## END OF USER CONFIG

# https://www.openfigi.com/api#post-v3-mapping
# values accepted for figi_securitytype
sectypes = (
    "MBS 30yr",
    "Agncy CMO Other",
    "MBS 15yr",
    "MBS Other",
    "MBS ARM",
    "Prvt CMO Other",
    "Agncy CMO IO",
    "CF",
    "CMBS",
    "MBS 20yr",
    "ABS Other",
    "Agncy CMO FLT",
    "Prvt CMO FLT",
    "Agncy CMO Z",
    "Agncy CMBS",
    "ABS Home",
    "Agncy CMO INV",
    "Prvt CMO IO",
    "Agncy CMO PO",
    "MBS 10yr",
    "ABS Auto",
    "SBA Pool",
    "MBS balloon",
    "Prvt CMO PO",
    "Agncy ABS Other",
    "ABS Card",
    "Prvt CMO Z",
    "SN",
    "Prvt CMO INV",
    "HB",
    "Agncy ABS Home",
    "Canadian",
    "MV",
)

# values accepted for figi_securitytype2
sectypes2 = (
    "Pool",
    "CMO",
    "Whole Loan",
    "ABS",
    "TBA",
    "CMBS",
    "LL",
    "ABS Other",
    "MML",
    "CRE",
    "SME",
    "ABS/MEZZ",
    "ABS/HG",
    "TRP",
    "HY",
    "CDO2",
    "RMBS",
    "IG",
    "CDS",
    "TRP/REIT",
    "CDS(CRP)",
    "MEZZ",
    "CDS(ABS)",
    "2ND LIEN",
    "TRP/BK",
    "SME/MEZZ",
)
# Rotating log file in the system temp directory.
# backupCount must be non-zero: with the default backupCount=0 the handler
# never rolls over and maxBytes is ignored, so the log would grow without bound.
rfh = logging.handlers.RotatingFileHandler(filename=tempfile.gettempdir() + "/intex_mktd.log",
                                           maxBytes=5000000,
                                           backupCount=3)
# Send every record both to the rotating file and to stdout.
logging.basicConfig(level=logging.INFO,
                    format="%(asctime)s - %(levelname)-8s - %(message)s",
                    datefmt="%Y-%m-%d %H:%M:%S",
                    handlers=[rfh, logging.StreamHandler(sys.stdout)])
logger = logging.getLogger()
## Empirasign API interaction functions | |
def _proxies_dict(proxy_server):
    """
    return proxy dictionary as needed by requests library

    proxy_server: "host:port" or "user:password@host:port"; empty/None means
    no proxy, in which case an empty dict is returned

    if this helper function is not comprehensive enough for your use case, consult
    http://docs.python-requests.org/en/latest/user/advanced/#proxies
    """
    if proxy_server:
        # use the argument that was passed in, not the module-level setting
        return {'https': 'http://' + proxy_server}
    return {}
def _make_req_sig(args):
    """
    build the request signature: the SHA-1 hex digest of
    APP_ID + each argument (in order) + PASS, encoded as UTF-8

    for convenience, datetime.date arguments are rendered as YYYYMMDD strings
    """
    pieces = [APP_ID]
    for arg in args:
        if isinstance(arg, datetime.date):
            pieces.append(arg.strftime("%Y%m%d"))
        else:
            pieces.append(arg)
    pieces.append(PASS)
    payload = "".join(pieces).encode('utf-8')
    return hashlib.sha1(payload).hexdigest()
## copied from https://gist.github.com/empirasign/cbd204ae7f45bc4eadae | |
def get_bonds(uids, d0=None, d1=None, fmt='json', nport=False, timeout=120):
    """
    get bwic and dealer runs data for a list of cusips/ISINs/BBG Tickers

    the maximum length of uids processed by server is 200, items 201 and
    higher will be ignored.
    NOTE(review): CHUNK_SIZE in this file is 250 -- confirm the current server limit

    d0 and d1 are not required, they indicate start and end (inclusive)
    of date range to search, must be datetime.date type; the range is only
    sent when both are given
    fmt can be json or csv
    nport is passed to the API as the 'nport' form field
    (presumably requests N-PORT filing data -- confirm against API docs)
    timeout is the number of seconds requests waits on the server before
    raising requests.exceptions.Timeout; without it a stalled connection
    would block the script forever

    returns the decoded JSON payload when fmt is json, otherwise the raw
    response text
    """
    api_url = '{}://{}/api/bonds/'.format(API_SCHEME, API_HOST)
    bonds = ",".join(uids)
    want_json = fmt.lower() == 'json'
    # compute the request signature
    req_sig = _make_req_sig([bonds, datetime.date.today()])
    req_params = {'app_id': APP_ID, 'req_sig': req_sig, 'nport': nport, 'bonds': bonds}
    if d0 and d1:
        req_params['d0'] = d0.strftime("%Y%m%d")
        req_params['d1'] = d1.strftime("%Y%m%d")
    if not want_json:
        req_params['csv'] = True
    resp = requests.post(api_url,
                         data=req_params,
                         proxies=_proxies_dict(PROXY_SERVER),
                         timeout=timeout)
    if want_json:
        return resp.json()
    return resp.text
def get_all_bonds(dt, figi_marketsector, kind=None, fmt='json', timeout=120):
    """
    return a list of all bonds that appeared on BWICs or Dealer Runs for a given date / sector

    dt must be a datetime.date (it is sent as YYYYMMDD)
    figi_marketsector is exactly equivalent to marketSecDes as defined by OpenFIGI
    https://www.openfigi.com/api#openapi-schema
    kind = bwics (show only list of all BWIC bonds for a given date)
    kind = runs (show only list of all runs bonds for a given date)
    fmt can be json or csv
    timeout is the number of seconds requests waits on the server before
    raising requests.exceptions.Timeout; without it a stalled connection
    would block the script forever

    returns the decoded JSON payload when fmt is json, otherwise the raw
    response text
    """
    api_url = '{}://{}/api/all-bonds/'.format(API_SCHEME, API_HOST)
    dt = dt.strftime("%Y%m%d")
    want_json = fmt.lower() == 'json'
    # compute the request signature
    req_sig = _make_req_sig([figi_marketsector, dt])
    url_params = {
        'app_id': APP_ID,
        'figi_marketsector': figi_marketsector,
        'dt': dt,
        'req_sig': req_sig,
    }
    if kind is not None:
        url_params["kind"] = kind
    if not want_json:
        url_params['csv'] = True
    resp = requests.get(api_url,
                        url_params,
                        proxies=_proxies_dict(PROXY_SERVER),
                        timeout=timeout)
    if want_json:
        return resp.json()
    return resp.text
def chunker(lst, chunk_size):
    """
    split a long list into consecutive slices of at most chunk_size items;
    the final slice holds whatever is left over, and an empty list yields
    no slices at all
    """
    return [lst[start:start + chunk_size] for start in range(0, len(lst), chunk_size)]
def _comp_px(handle, ticks):
    """
    helper function for price32_to_dec

    handle: whole-number part of the price, as a string
    ticks: the part after the dash, as a string: 32nds, with an optional
           third digit counting 8ths of a 32nd and an optional trailing "+"
           worth half a 32nd

    returns the decimal price as a float
    raises ValueError when either part is not numeric (including empty ticks)
    """
    tot_px = 0
    # endswith() is safe on an empty string, where ticks[-1] would raise
    # IndexError -- an exception the callers do not catch
    if ticks.endswith("+"):
        tot_px += .5 / 32.0
        ticks = ticks[:-1]
    if len(ticks) == 3:
        tot_px += float(ticks[-1]) / 8.0 / 32.0
        tot_px += float(ticks[:2]) / 32.0
    else:
        # float("") raises ValueError, which the callers handle
        tot_px += float(ticks) / 32.0
    tot_px += float(handle)
    return tot_px
def price32_to_dec(price32):
    """
    convert a well-formed price32 or price with handle suffix to a
    decimal price

    accepts numbers (returned as float) or strings; a leading "$" is ignored
    returns a float, or None when the input is empty/zero or cannot be parsed
    """
    if not price32:
        return None
    if isinstance(price32, (int, float)):
        return float(price32)

    # drop everything up to and including a dollar sign
    m = re.search(r"( *\$ *)(.+)", price32)
    if m:
        price32 = m.group(2)

    # plain decimal string
    try:
        return float(price32)
    except ValueError:
        pass

    # digits followed by "h" / "-h": the code adds 0.25 to the handle
    m = re.search(r'^(\d+)-?h$', price32, re.I)
    if m:
        return int(m.group(1)) + 0.25

    # bare handle with trailing dash, e.g. "99-"
    m = re.search(r'^(\d+)-$', price32, re.I)
    if m:
        return float(m.group(1))

    # negative price32 - specs
    if re.search(r'^\-\d', price32, re.I):
        parts = price32.strip().split("-")[-2:]
        if len(parts) == 2 and "" not in parts:
            try:
                return _comp_px(*parts) * -1
            except (ValueError, IndexError):
                pass
        return None

    price32 = price32.lower().replace("a", "")
    parts = price32.strip().split("-")
    # same empty-part guard as the negative branch: input such as "99.5-"
    # used to reach _comp_px with empty ticks and crash with IndexError
    if len(parts) == 2 and "" not in parts:
        try:
            return _comp_px(*parts)
        except (ValueError, IndexError):
            pass
    return None
def _to_intex(item):
    """
    take market data item from /api/bonds/ end point and convert to format
    suitable for .mktd files for INTEXcalc
    keys: Tranche ID, As Of, Quote, Quote Type, Color, Provider

    item is read only; it is never modified

    returns the converted dict, or {} when the item kind is not supported
    or a required field could not be filled in. N-PORT items are always
    returned (their Quote / Size may be None when they cannot be computed).
    """
    required_fields = ("Tranche ID", "As Of", "Color", "Quote", "Quote Type")
    kind = item["kind"]
    idata = {"Tranche ID": item["isin"]}

    # handle nport data
    if kind == 'nport':
        idata["As Of"] = item["repPdDate"] + "T08:00:00-05:00"
        idata["Color"] = "N-PORT"
        idata["Provider"] = "EDGAR"
        try:
            balance_usd = item['balance'] / item['balanceExchangeRt']
        except ZeroDivisionError:
            # a zero exchange rate cannot be converted; leave size and quote unset
            balance_usd = None
        idata["Size"] = round(balance_usd, 3) if balance_usd is not None else None
        if balance_usd:
            idata["Quote"] = round(100 * item['valUSD'] / balance_usd, 3)
        else:
            # zero (or unknown) balance: price per 100 face is undefined
            idata["Quote"] = None
        idata["Quote Type"] = "Price"
        idata["Fund-Name"] = item.get("seriesName")
        idata["Fund-Company"] = item.get("regName")
        return idata

    # size is quoted in millions; it is optional, so skip it when absent
    if kind in ("bwic", "pxtalk", "offer"):
        size_mm = item.get("of")
    elif kind == "bid":
        size_mm = item.get("of_bid")
    else:
        size_mm = None
    if size_mm is not None:
        idata["Size"] = round(size_mm * 1e6)

    if kind in ("bwic", "pxtalk"):
        # drop the last character (presumably a trailing "Z" -- confirm) and
        # append an explicit offset
        idata["As Of"] = item["trade_datetime_utc"][:-1] + "-00:00"
    else:
        # work on a local copy so the caller's dict is not modified
        trade_dt = item["trade_dt"]
        if isinstance(trade_dt, datetime.date):
            trade_dt = trade_dt.strftime("%Y-%m-%d")
        idata["As Of"] = trade_dt + "T08:00:00-05:00"  # assume 8AM EST

    if kind == "bwic":
        idata["Provider"] = item["list_id"]
    else:
        idata["Provider"] = item.get("dealer", "unk")

    # handle price or spread data
    if kind in ("bwic", "pxtalk", "offer"):
        if item.get("price") is not None:
            idata["Quote"] = item["price"]
            idata["Quote Type"] = "Price"
        elif item.get("price32") is not None:
            idata["Quote"] = price32_to_dec(item["price32"])
            idata["Quote Type"] = "Price"
        elif item.get("spread_dec") is not None:
            idata["Quote"] = item["spread_dec"]
            idata["curve"] = item.get("curve")
            idata["Quote Type"] = "Spread"
        elif item.get("spread") is not None:
            idata["Quote"] = item["spread"]
            idata["curve"] = item.get("curve")
            idata["Quote Type"] = "Spread"
        if kind == "offer":
            idata["Color"] = "Offer"
        elif kind == "pxtalk":
            idata["Color"] = "PX Talk"
        elif kind == 'bwic' and item.get('color'):
            # .get(): a BWIC without a color key is skipped instead of raising
            idata["Color"] = item['color']
    elif kind == "bid":
        if item.get("price_bid") is not None:
            idata["Quote"] = item["price_bid"]
            idata["Quote Type"] = "Price"
        elif item.get("price32_bid") is not None:
            idata["Quote"] = price32_to_dec(item["price32_bid"])
            idata["Quote Type"] = "Price"
        elif item.get("spread_bid") is not None:
            idata["Quote"] = item["spread_bid"]
            idata["Quote Type"] = "Spread"
            idata["curve"] = item.get("curve")
        idata["Color"] = "Bid"
    else:
        # market is not yet implemented
        return {}

    if all(key in idata for key in required_fields):
        return idata
    return {}
def split_mkts(item):
    """
    break a two-sided market into separate offer and bid quotes

    returns (offer, bid) when item carries at least one bid-side key and at
    least one offer-side key; each half keeps every other key of item.
    otherwise returns (item, None) with item untouched
    """
    bid_side = {'of_bid', 'cf_bid', 'price_bid', 'price32_bid', 'spread_bid'}
    offer_side = {'of', 'cf', 'price', 'price32', 'spread'}

    has_bid = not bid_side.isdisjoint(item)
    has_offer = not offer_side.isdisjoint(item)
    if not (has_bid and has_offer):
        return item, None

    offer = {key: val for key, val in item.items() if key not in bid_side}
    bid = {key: val for key, val in item.items() if key not in offer_side}
    return offer, bid
def main():
    """
    the main event

    1. parse the command line and validate the requested sectors
    2. fetch today's list of "Mtge" bonds from /api/all-bonds/
    3. keep the ISINs whose security type matches the requested sectors
    4. query /api/bonds/ in chunks of CHUNK_SIZE and write one
       <ISIN>.mktd JSON file per bond into INTEX_MKTD_DIR

    exits with status 1 on invalid sectors or exhausted API quota;
    raises ValueError when neither --sectype nor --sectype2 is given and
    RuntimeError when the API reports an error
    """
    parser = argparse.ArgumentParser(description='INTEXcalc Market Data',
                                     epilog="""
When listing sectors with spaces, surround each sector with quotes, e.g.
python intex_mktd.py --sectype 'ABS Auto'
python intex_mktd.py --sectype 'ABS Auto' SN 'MBS 15yr'
""",
                                     formatter_class=argparse.RawDescriptionHelpFormatter)
    parser.add_argument('sectors', nargs='+', help="list of securitytype or securitytype2 sectors")
    parser.add_argument("--sectype",
                        action='store_true',
                        dest="sectype",
                        help="filter sectors on OpenFIGI securitytype")
    parser.add_argument("--sectype2",
                        action='store_true',
                        dest="sectype2",
                        help="filter sectors on OpenFIGI securitytype2")
    args = parser.parse_args()

    if args.sectype:
        filter1 = set(args.sectors)
        filter2 = []
        invalid_sectors = filter1 - set(sectypes)
    elif args.sectype2:
        filter1 = []
        filter2 = set(args.sectors)
        invalid_sectors = filter2 - set(sectypes2)
    else:
        raise ValueError("must specify --sectype or --sectype2")

    if invalid_sectors:
        logger.error("Halting Execution: Invalid sector(s): %s", invalid_sectors)
        sys.exit(1)
    if not filter1 and not filter2:
        logger.error("Halting Execution: No sectors selected")
        sys.exit(1)
    logger.info("sector filters: sectype1: %s, sectype2: %s", filter1, filter2)

    resp = get_all_bonds(datetime.date.today(), "Mtge")
    if resp["meta"]["results"] != "OK":
        logger.error("api call error: %s", resp)
        raise RuntimeError("api call error: {}".format(resp))
    req_left = resp["meta"]['requests_left']
    logger.info("total result set: %s bonds, quota remaining: %s", len(resp["bonds"]), req_left)
    logger.info("sample list: %s", resp["bonds"][:10])

    if filter1:
        query_bonds = [x["isin"] for x in resp["bonds"] if x["figi_securitytype"] in filter1]
    else:
        query_bonds = [x["isin"] for x in resp["bonds"] if x["figi_securitytype2"] in filter2]
    # filter out any null results and duplicates; sort so chunking is deterministic
    query_bonds = sorted({x for x in query_bonds if x})

    if query_bonds and req_left <= 0:
        logger.error("early exit, no API quota left")
        sys.exit(1)
    if len(query_bonds) > req_left:
        logger.warning("truncating bonds to query, %s requests left, but %s bonds to query",
                       req_left, len(query_bonds))
        # actually perform the truncation the warning announces
        # (assumes each bond consumes one unit of quota -- confirm against API docs)
        query_bonds = query_bonds[:req_left]

    num_chunks = math.ceil(len(query_bonds) / CHUNK_SIZE)
    for i, isins in enumerate(chunker(query_bonds, CHUNK_SIZE)):
        logger.info("starting API query for chunk %s of %s", i + 1, num_chunks)
        if req_left <= 0:
            logger.error("early exit, no API quota left")
            sys.exit(1)
        resp = get_bonds(isins, nport=True)
        if resp["meta"]["results"] != "OK":
            logger.error("api call error: %s", resp)
            raise RuntimeError("api call error: {}".format(resp))
        req_left = resp["meta"]['requests_left']

        # group the returned quotes by bond
        quotes = defaultdict(list)
        for quote in resp["data"]:
            quotes[quote["isin"]].append(quote)

        for isin, bond_quotes in quotes.items():
            export_quotes = []
            for raw_quote in bond_quotes:
                # split two-way markets, then convert each side to intex mktd format
                for side in split_mkts(raw_quote):
                    if not side:
                        continue
                    converted = _to_intex(side)
                    if converted:
                        export_quotes.append(converted)
            if export_quotes:
                data_path = INTEX_MKTD_DIR + "/" + isin + ".mktd"
                with open(data_path, "wb") as fp:
                    fp.write(
                        json.dumps(export_quotes, ensure_ascii=False, indent=2).encode("utf-8"))
                logger.info("saved bond data for: %s, num_quotes: %s path: %s", isin,
                            len(export_quotes), data_path)
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment