Skip to content

Instantly share code, notes, and snippets.

@empirasign
Last active December 26, 2023 16:42
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save empirasign/e19abc1649286167fce59f6645f71658 to your computer and use it in GitHub Desktop.
Save empirasign/e19abc1649286167fce59f6645f71658 to your computer and use it in GitHub Desktop.
Export Empirasign market data to the INTEX market_data/market_color directory in .mktd format.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
intex_mktd.py
leverage the /api/all-bonds/ and /api/bonds/ API endpoints to fill up the INTEX
market_data/market_color directory with fresh market data for selected sectors
https://gist.github.com/empirasign/e19abc1649286167fce59f6645f71658
THIS SCRIPT SHOULD BE PLATFORM INDEPENDENT
"""
import argparse
from collections import defaultdict
import datetime
import hashlib
import json
import logging
import logging.handlers
import math
import re
import sys
import tempfile
import requests
# Scheme and host used to build the /api/bonds/ and /api/all-bonds/ endpoint URLs.
API_SCHEME = "https"
API_HOST = "www.empirasign.com"
# Optional HTTP proxy, handed to _proxies_dict() on every API call; leave empty for none.
PROXY_SERVER = '' # e.g. proxy.mycompany.net:8080
# if you get a 407 Proxy Authentication Required error, you need to set
# PROXY_SERVER to something like username:password@proxy.mycompany.net:8080
# NOTE(review): the get_bonds docstring says the server ignores items beyond
# the 200th, which is smaller than this chunk size -- confirm the real limit.
CHUNK_SIZE = 250 # chunk API bulk queries no larger than this
## START OF USER CONFIG
# API CREDENTIALS
# Both values are mixed into the SHA-1 request signature built by _make_req_sig().
APP_ID = 'MY_ID'
PASS = 'MY_PASS'
# Directory where main() writes one <isin>.mktd file per bond; defaults to the
# system temp directory until pointed at the real INTEX location.
INTEX_MKTD_DIR = tempfile.gettempdir()
# The Default Location looks something like below
# C:\Users\WINDOWS_USERNAME\AppData\Roaming\intex\settings\market_data\market_color
# If your org has a shared location for Intex market_data, the path will be different
## END OF USER CONFIG
# https://www.openfigi.com/api#post-v3-mapping
# figi_securitytype
# Accepted values for the --sectype command-line filter (validated in main()).
sectypes = ("MBS 30yr", "Agncy CMO Other", "MBS 15yr", "MBS Other", "MBS ARM", "Prvt CMO Other",
"Agncy CMO IO", "CF", "CMBS", "MBS 20yr", "ABS Other", "Agncy CMO FLT", "Prvt CMO FLT",
"Agncy CMO Z", "Agncy CMBS", "ABS Home", "Agncy CMO INV", "Prvt CMO IO",
"Agncy CMO PO", "MBS 10yr", "ABS Auto", "SBA Pool", "MBS balloon", "Prvt CMO PO",
"Agncy ABS Other", "ABS Card", "Prvt CMO Z", "SN", "Prvt CMO INV", "HB",
"Agncy ABS Home", "Canadian", "MV")
# figi_securitytype2
# Accepted values for the --sectype2 command-line filter (validated in main()).
sectypes2 = ("Pool", "CMO", "Whole Loan", "ABS", "TBA", "CMBS", "LL", "ABS Other", "MML", "CRE",
"SME", "ABS/MEZZ", "ABS/HG", "TRP", "HY", "CDO2", "RMBS", "IG", "CDS", "TRP/REIT",
"CDS(CRP)", "MEZZ", "CDS(ABS)", "2ND LIEN", "TRP/BK", "SME/MEZZ")
# Log to a size-capped rotating file in the temp directory and echo to stdout.
# backupCount must be non-zero: with the default of 0, RotatingFileHandler never
# rolls over, so maxBytes would be ignored and the log would grow without bound.
rfh = logging.handlers.RotatingFileHandler(filename=tempfile.gettempdir() + "/intex_mktd.log",
                                           maxBytes=5000000,
                                           backupCount=3)
logging.basicConfig(level=logging.INFO,
                    format="%(asctime)s - %(levelname)-8s - %(message)s",
                    datefmt="%Y-%m-%d %H:%M:%S",
                    handlers=[rfh, logging.StreamHandler(sys.stdout)])
# Root logger: every message goes to both handlers configured above.
logger = logging.getLogger()
## Empirasign API interaction functions
def _proxies_dict(proxy_server):
    """
    return proxy dictionary as needed by requests library

    proxy_server is a "host:port" (optionally "user:password@host:port") string;
    an empty or None value yields an empty dict, i.e. no proxy.

    if this helper function is not comprehensive enough for your use case, consult
    http://docs.python-requests.org/en/latest/user/advanced/#proxies
    """
    if proxy_server:
        # build the URL from the argument that was passed in, not the module global
        return {'https': 'http://' + proxy_server}
    return {}
def _make_req_sig(args):
    """
    build the request signature for an API call

    the signature is the SHA-1 hex digest of APP_ID, every item of args and
    PASS concatenated in that order; datetime.date items are rendered as
    YYYYMMDD strings first, all other items are used unchanged
    """
    pieces = [APP_ID]
    for piece in args:
        if isinstance(piece, datetime.date):
            piece = piece.strftime("%Y%m%d")
        pieces.append(piece)
    pieces.append(PASS)
    payload = "".join(pieces)
    digest = hashlib.sha1(payload.encode('utf-8'))
    return digest.hexdigest()
## copied from https://gist.github.com/empirasign/cbd204ae7f45bc4eadae
def get_bonds(uids, d0=None, d1=None, fmt='json', nport=False, timeout=120):
    """
    get bwic and dealer runs data for a list of cusips/ISINs/BBG Tickers

    uids: list of identifier strings, sent comma-joined in a single POST.
        Per the upstream note, the server processes at most 200 uids per
        call; items 201 and higher will be ignored.
    d0, d1: optional start and end (inclusive) of the date range to search;
        both must be datetime.date and are only sent when both are given
    fmt: 'json' returns the decoded JSON body, anything else requests csv
        and returns the raw response text
    nport: forwarded to the API as the 'nport' request parameter
    timeout: seconds passed to requests as the connect/read timeout, so a
        stalled connection raises instead of hanging the script forever
    """
    api_url = '{}://{}/api/bonds/'.format(API_SCHEME, API_HOST)
    bonds = ",".join(uids)
    want_json = fmt.lower() == 'json'
    # the request signature covers the bond list and today's date
    req_sig = _make_req_sig([bonds, datetime.date.today()])
    req_params = {'app_id': APP_ID, 'req_sig': req_sig, 'nport': nport, 'bonds': bonds}
    if d0 and d1:
        req_params['d0'] = d0.strftime("%Y%m%d")
        req_params['d1'] = d1.strftime("%Y%m%d")
    if not want_json:
        req_params['csv'] = True
    resp = requests.post(api_url,
                         data=req_params,
                         proxies=_proxies_dict(PROXY_SERVER),
                         timeout=timeout)
    if want_json:
        return resp.json()
    return resp.text
def get_all_bonds(dt, figi_marketsector, kind=None, fmt='json', timeout=120):
    """
    return a list of all bonds that appeared on BWICs or Dealer Runs for a given date / sector

    dt: datetime.date to query, sent to the API as YYYYMMDD
    figi_marketsector is exactly equivalent to marketSecDes as defined by OpenFIGI
    https://www.openfigi.com/api#openapi-schema
    kind = bwics (show only list of all BWIC bonds for a given date)
    kind = runs (show only list of all runs bonds for a given date)
    kind = None (the 'kind' parameter is not sent at all)
    fmt: 'json' returns the decoded JSON body, anything else requests csv
        and returns the raw response text
    timeout: seconds passed to requests as the connect/read timeout, so a
        stalled connection raises instead of hanging the script forever
    """
    api_url = '{}://{}/api/all-bonds/'.format(API_SCHEME, API_HOST)
    dt = dt.strftime("%Y%m%d")
    want_json = fmt.lower() == 'json'
    # the request signature covers the sector and the formatted date
    req_sig = _make_req_sig([figi_marketsector, dt])
    url_params = {
        'app_id': APP_ID,
        'figi_marketsector': figi_marketsector,
        'dt': dt,
        'req_sig': req_sig,
    }
    if kind is not None:
        url_params["kind"] = kind
    if not want_json:
        url_params['csv'] = True
    resp = requests.get(api_url,
                        params=url_params,
                        proxies=_proxies_dict(PROXY_SERVER),
                        timeout=timeout)
    if want_json:
        return resp.json()
    return resp.text
def chunker(lst, chunk_size):
    """
    split lst into consecutive slices of at most chunk_size items

    returns a list of slices in original order; only the final slice may be
    shorter than chunk_size, and an empty input gives an empty list
    """
    starts = range(0, len(lst), chunk_size)
    return [lst[start:start + chunk_size] for start in starts]
def _comp_px(handle, ticks):
    """
    helper function for price32_to_dec

    handle: whole-number part of the price, as a string
    ticks: fractional part in 32nds, as a string:
        - a trailing "+" adds half of a 32nd
        - three characters: the first two are 32nds, the third is eighths of a 32nd
        - otherwise the whole string is the number of 32nds

    returns the decimal price as a float
    raises ValueError if handle or ticks is empty or not numeric; the callers
    rely on ValueError (and only ValueError) to reject malformed prices
    """
    if not ticks:
        # indexing ticks[-1] below would raise IndexError, which callers do not catch
        raise ValueError("price32 ticks component is empty")
    tot_px = 0
    if ticks[-1] == "+":
        tot_px += .5 / 32.0
        ticks = ticks[:-1]
    if len(ticks) == 3:
        tot_px += float(ticks[-1]) / 8.0 / 32.0
        tot_px += float(ticks[:2]) / 32.0
    else:
        tot_px += float(ticks) / 32.0
    tot_px += float(handle)
    return tot_px
def price32_to_dec(price32):
    """
    convert a well-formed price32 or price with handle suffix to a
    decimal price

    price32 may be a number (returned as float) or a string in one of these
    forms, optionally preceded by a "$" and surrounded by whitespace:
        "99.5"      plain decimal
        "100h"      handle followed by "h" (also "100-h"): handle + 0.25
        "100-"      bare handle
        "100-16+"   handle-ticks, see _comp_px for the ticks grammar
        "-1-16"     negative handle-ticks

    returns None for falsy input and for anything that cannot be parsed
    """
    if not price32:
        return None
    if isinstance(price32, (int, float)):
        return float(price32)
    m = re.search(r"( *\$ *)(.+)", price32)
    if m:
        price32 = m.group(2)
    # strip once up front so the anchored patterns below are not defeated by
    # stray leading/trailing whitespace
    price32 = price32.strip()
    try:
        return float(price32)
    except ValueError:
        pass
    m = re.search(r'^(\d+)-?h$', price32, re.I)
    if m:
        return int(m.group(1)) + 0.25
    m = re.search(r'^(\d+)-$', price32)
    if m:
        return float(m.group(1))
    # negative price32 - specs
    negative = re.search(r'^-\d', price32) is not None
    if negative:
        parts = price32.split("-")[-2:]
    else:
        parts = price32.lower().replace("a", "").strip().split("-")
    # both branches need exactly a non-empty handle and non-empty ticks;
    # an empty piece (e.g. "x-") must be rejected here rather than crash
    if len(parts) != 2 or "" in parts:
        return None
    try:
        px = _comp_px(*parts)
    except (ValueError, IndexError):
        return None
    return px * -1 if negative else px
def _to_intex(item):
    """
    take market data item from /api/bonds/ end point and convert to format
    suitable for .mktd files for INTEXcalc
    keys: Tranche ID, As Of, Quote, Quote Type, Color, Provider

    item: dict for a single quote; item["kind"] selects the conversion
        ("nport", "bwic", "pxtalk", "offer" or "bid"). item is not modified.

    returns the converted dict, or {} when the kind is unsupported or one of
    the required fields (Tranche ID, As Of, Color, Quote, Quote Type) could
    not be filled in. N-PORT items are returned without that check.
    """
    required_fields = ("Tranche ID", "As Of", "Color", "Quote", "Quote Type")
    idata = {"Tranche ID": item["isin"]}
    kind = item["kind"]

    # handle nport data
    if kind == 'nport':
        idata["As Of"] = item["repPdDate"] + "T08:00:00-05:00"
        idata["Color"] = "N-PORT"
        idata["Provider"] = "EDGAR"
        balance_usd = item['balance'] / item['balanceExchangeRt']
        idata["Size"] = round(balance_usd, 3)
        try:
            idata["Quote"] = round(100 * item['valUSD'] / balance_usd, 3)
        except ZeroDivisionError:
            idata["Quote"] = None
        idata["Quote Type"] = "Price"
        idata["Fund-Name"] = item.get("seriesName")
        idata["Fund-Company"] = item.get("regName")
        return idata

    offer_like = kind in ("bwic", "pxtalk", "offer")
    if not offer_like and kind != "bid":
        # market is not yet implemented
        return {}

    # Size is optional in the output, so a missing or null size must not
    # abort the conversion of an otherwise usable quote
    size = item.get("of") if offer_like else item.get("of_bid")
    if size is not None:
        idata["Size"] = round(size * 1e6)

    if kind in ("bwic", "pxtalk"):
        # drop the last character of the UTC timestamp and append an explicit offset
        # NOTE(review): presumably the dropped character is a trailing "Z" -- confirm
        idata["As Of"] = item["trade_datetime_utc"][:-1] + "-00:00"
    else:
        # work on a local copy so the caller's item is left untouched
        trade_dt = item["trade_dt"]
        if isinstance(trade_dt, datetime.date):
            trade_dt = trade_dt.strftime("%Y-%m-%d")
        idata["As Of"] = trade_dt + "T08:00:00-05:00"  # assume 8AM EST

    if kind == "bwic":
        idata["Provider"] = item["list_id"]
    else:
        idata["Provider"] = item.get("dealer", "unk")

    # handle price or spread data
    if offer_like:
        if item.get("price") is not None:
            idata["Quote"] = item["price"]
            idata["Quote Type"] = "Price"
        elif item.get("price32") is not None:
            idata["Quote"] = price32_to_dec(item["price32"])
            idata["Quote Type"] = "Price"
        elif item.get("spread_dec") is not None:
            idata["Quote"] = item["spread_dec"]
            idata["curve"] = item.get("curve")
            idata["Quote Type"] = "Spread"
        elif item.get("spread") is not None:
            idata["Quote"] = item["spread"]
            idata["curve"] = item.get("curve")
            idata["Quote Type"] = "Spread"
        if kind == "offer":
            idata["Color"] = "Offer"
        elif kind == "pxtalk":
            idata["Color"] = "PX Talk"
        elif item.get('color'):
            # bwic: only lists that carry color pass the required-fields check
            idata["Color"] = item['color']
    else:
        if item.get("price_bid") is not None:
            idata["Quote"] = item["price_bid"]
            idata["Quote Type"] = "Price"
        elif item.get("price32_bid") is not None:
            idata["Quote"] = price32_to_dec(item["price32_bid"])
            idata["Quote Type"] = "Price"
        elif item.get("spread_bid") is not None:
            idata["Quote"] = item["spread_bid"]
            idata["Quote Type"] = "Spread"
            idata["curve"] = item.get("curve")
        idata["Color"] = "Bid"

    if all(key in idata for key in required_fields):
        return idata
    return {}
def split_mkts(item):
    """
    separate a two-sided market into its offer side and its bid side

    when item carries at least one bid-side key and at least one offer-side
    key, returns (offer, bid): two new dicts, each a copy of item without the
    other side's keys. otherwise returns (item, None) with item unchanged
    """
    bid_side = ['of_bid', 'cf_bid', 'price_bid', 'price32_bid', 'spread_bid']
    offer_side = ['of', 'cf', 'price', 'price32', 'spread']
    has_bid = any(key in item for key in bid_side)
    has_offer = any(key in item for key in offer_side)
    if not (has_bid and has_offer):
        return item, None
    offer = {key: val for key, val in item.items() if key not in bid_side}
    bid = {key: val for key, val in item.items() if key not in offer_side}
    return offer, bid
def main():
    """
    the main event

    1. parse the command line: one or more sectors plus --sectype or --sectype2
    2. fetch today's "Mtge" bond list from /api/all-bonds/ and keep the ISINs
       whose figi_securitytype (or figi_securitytype2) is in the chosen sectors
    3. query /api/bonds/ in chunks of CHUNK_SIZE, convert every quote to the
       INTEX format and write one <isin>.mktd JSON file per bond to INTEX_MKTD_DIR

    raises ValueError if neither --sectype nor --sectype2 is given and
    RuntimeError if an API call does not report "OK"; exits with status 1 on
    invalid sectors or when no API quota is left
    """
    import os  # local import: only main() builds filesystem paths

    parser = argparse.ArgumentParser(description='INTEXcalc Market Data',
                                     epilog="""
When listing sectors with spaces, surround each sector with quotes, e.g.
python intex_mktd.py --sectype 'ABS Auto'
python intex_mktd.py --sectype 'ABS Auto' SN 'MBS 15yr'
""",
                                     formatter_class=argparse.RawDescriptionHelpFormatter)
    parser.add_argument('sectors', nargs='+', help="list of securitytype or securitytype2 sectors")
    parser.add_argument("--sectype",
                        action='store_true',
                        dest="sectype",
                        help="filter sectors on OpenFIGI securitytype")
    parser.add_argument("--sectype2",
                        action='store_true',
                        dest="sectype2",
                        help="filter sectors on OpenFIGI securitytype2")
    args = parser.parse_args()

    # --sectype wins if both flags are given
    if args.sectype:
        filter1 = set(args.sectors)
        filter2 = []
        invalid_sectors = filter1 - set(sectypes)
    elif args.sectype2:
        filter1 = []
        filter2 = set(args.sectors)
        invalid_sectors = filter2 - set(sectypes2)
    else:
        raise ValueError("must specify --sectype or --sectype2")
    if invalid_sectors:
        logger.error("Halting Execution: Invalid sector(s): %s", invalid_sectors)
        sys.exit(1)
    if not filter1 and not filter2:
        logger.error("Halting Execution: No sectors selected")
        sys.exit(1)
    logger.info("sector filters: sectype1: %s, sectype2: %s", filter1, filter2)

    resp = get_all_bonds(datetime.date.today(), "Mtge")
    if resp["meta"]["results"] != "OK":
        logger.error("api call error: %s", resp)
        raise RuntimeError("/api/all-bonds/ call did not return OK")
    req_left = resp["meta"]['requests_left']
    logger.info("total result set: %s bonds, quota remaining: %s", len(resp["bonds"]), req_left)
    logger.info("sample list: %s", resp["bonds"][:10])

    if filter1:
        query_bonds = [x.get("isin") for x in resp["bonds"]
                       if x.get("figi_securitytype") in filter1]
    else:
        query_bonds = [x.get("isin") for x in resp["bonds"]
                       if x.get("figi_securitytype2") in filter2]
    # filter out any null results and duplicates; sort so runs are repeatable
    query_bonds = sorted({x for x in query_bonds if x})

    if len(query_bonds) > req_left:
        logger.warning("truncating bonds to query, %s requests left, but %s bonds to query",
                       req_left, len(query_bonds))
        if req_left <= 0:
            logger.error("early exit, no API quota left")
            sys.exit(1)
        # actually perform the truncation the warning announces
        # NOTE(review): assumes the quota is counted per bond, as the comparison
        # above implies -- confirm against the API documentation
        query_bonds = query_bonds[:req_left]

    num_chunks = math.ceil(len(query_bonds) / CHUNK_SIZE)
    for i, isins in enumerate(chunker(query_bonds, CHUNK_SIZE)):
        logger.info("starting API query for chunk %s of %s", i + 1, num_chunks)
        if req_left <= 0:
            logger.error("early exit, no API quota left")
            sys.exit(1)
        resp = get_bonds(isins, nport=True)
        if resp["meta"]["results"] != "OK":
            logger.error("api call error: %s", resp)
            raise RuntimeError("/api/bonds/ call did not return OK")
        req_left = resp["meta"]['requests_left']

        # group the returned quotes by bond
        quotes = defaultdict(list)
        for quote in resp["data"]:
            quotes[quote["isin"]].append(quote)

        for isin, bond_quotes in quotes.items():
            export_quotes = []
            for quote in bond_quotes:
                # convert to intex mktd format; a two-sided market yields two quotes
                for side in split_mkts(quote):
                    if not side:
                        continue
                    converted = _to_intex(side)
                    if converted:
                        export_quotes.append(converted)
            if not export_quotes:
                continue
            data_path = os.path.join(INTEX_MKTD_DIR, isin + ".mktd")
            with open(data_path, "wb") as fp:
                fp.write(json.dumps(export_quotes, ensure_ascii=False, indent=2).encode("utf-8"))
            logger.info("saved bond data for: %s, num_quotes: %s path: %s", isin,
                        len(export_quotes), data_path)
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment