Skip to content

Instantly share code, notes, and snippets.

@empirasign
Last active November 3, 2023 18:01
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save empirasign/3a019fdb91fa253f466b7f879b0b61e4 to your computer and use it in GitHub Desktop.
Save empirasign/3a019fdb91fa253f466b7f879b0b61e4 to your computer and use it in GitHub Desktop.
periodically poll Empirasign API for BWIC data, then save to disk in a format INTEXCalc can automatically process
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
bwics_to_intex.py
https://gist.github.com/empirasign/3a019fdb91fa253f466b7f879b0b61e4
Poll Empirasign API for BWIC information in selected sector(s)
Export data to folder INTEXCalc watches for BWIC information in BWIC/JSON format
rinse/repeat
PSEUDO CODE
start of loop
if after 6pm, exit
get bwics for all selected sectors
for bwic in bwics
transform bwic to INTEXCalc format, save to market_data/bwic/YYYYMMDD/...
if before 6pm, continue
Sample Calls:
./bwics_to_intex.py cmbs
./bwics_to_intex.py agcmo naresi
./bwics_to_intex.py ALL # do all sectors in BWIC_SECTORS
./bwics_to_intex.py agcmo --one-shot # poll agcmo sector once, then exit
THIS SCRIPT SHOULD BE PLATFORM INDEPENDENT
"""
import argparse
import datetime
import hashlib
import json
import logging
import logging.handlers
from pathlib import Path
import sys
import tempfile
import time
import requests
API_SCHEME = "https"
API_HOST = "www.empirasign.com"
PROXY_SERVER = '' #e.g. proxy.mycompany.net:8080
# if you get a 407 Proxy Authentication Required error, you need to set
# PROXY_SERVER to something like username:password@proxy.mycompany.net:8080
## START OF USER CONFIG
# API CREDENTIALS
APP_ID = 'MY_ID'
PASS = 'MY_PASS'
INTEX_BWIC_DIR = Path('' or tempfile.gettempdir())
# often looks like this:
# C:\Users\WINDOWS_USERNAME\AppData\Roaming\intex\settings\market_data\bwic
# If your org has a shared location for Intex market_data, the path will be different
# stop polling for new market data 21:30 UTC, or 5:30 EST
# this parameter needs to be adjusted when daylight savings time switches
STOP_HOUR = 21
STOP_MINUTE = 30
## END OF USER CONFIG
BWIC_SECTORS = ('abs', 'agency', 'agarm', 'agcmo', 'clo', 'cmbs', 'conabs', 'euro', 'naresi',
'nonag', 'spec')
rfh = logging.handlers.RotatingFileHandler(filename=tempfile.gettempdir() + "/bwics_to_intex.log",
maxBytes=5000000)
logging.basicConfig(level=logging.INFO,
format="%(asctime)s - %(levelname)-8s - %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
handlers=[rfh, logging.StreamHandler(sys.stdout)])
logger = logging.getLogger()
## Empirasign API interaction functions
def _proxies_dict(proxy_server):
"""
return proxy dictionary as needed by requests library
if this helper function is not comprehensive enough for your use case, consult
http://docs.python-requests.org/en/latest/user/advanced/#proxies
"""
if proxy_server:
return {'https': 'http://' + PROXY_SERVER}
return {}
def _make_req_sig(args):
"""
generate request signature
as a convenience, datetime.date objects autoconvert to YYYYMMDD strings
"""
args = [arg.strftime("%Y%m%d") if isinstance(arg, datetime.date) else arg for arg in args]
sig_keys = [APP_ID] + args + [PASS]
return hashlib.sha1("".join(sig_keys).encode('utf-8')).hexdigest()
## copied from https://gist.github.com/empirasign/cbd204ae7f45bc4eadae
def get_bwics(sector, d0, d1=None, fmt='json', bsym_sector=False):
"""
get summary level data for BWICs in a given sector. This
query does not return actual market data so it does not count
against the daily quota
d0: start of date range (max 30 day lookback)
d1: end of date range
when d0 = d1, this returns same results as get_bwics_old()
sector descriptions can be found here: www.empirasign.com/api-mbs/
fmt can be json or csv
"""
api_url = '{}://{}/api/bwics/'.format(API_SCHEME, API_HOST)
# compute the request signature
# the trick here is the dt for the request signature and dt for the
# query parameter are the one and the same
if d0 and d1:
d0 = d0.strftime("%Y%m%d")
d1 = d1.strftime("%Y%m%d")
req_sig = _make_req_sig([sector, d0, d1])
url_params = {
'sector': sector,
'app_id': APP_ID,
'd0': d0,
'd1': d1,
'bsym_sec_type': bsym_sector,
'req_sig': req_sig
}
else:
# search on single date and pass dt as arg name
d0 = d0.strftime("%Y%m%d")
req_sig = _make_req_sig([sector, d0])
url_params = {
'sector': sector,
'app_id': APP_ID,
'dt': d0,
'bsym_sec_type': bsym_sector,
'req_sig': req_sig
}
if fmt.lower() != 'json':
url_params['csv'] = True
resp = requests.get(api_url, url_params, proxies=_proxies_dict(PROXY_SERVER))
logger.debug("api url: %s", resp.url)
if fmt.lower() == 'json':
return resp.json()
return resp.text
def _make_intex_data(bwic):
"""
take API bwic object and reshape into form that INTEXCalc likes
"""
idata = {"Size": 0, "Securities": []}
for bond in bwic["bonds"]:
idata["Size"] += round(bond["of"] * 1e6)
idata["Securities"].append({
"Orig Face": round(bond["of"] * 1e6),
"Tranche ID": bond["isin"]
})
idata["Notes"] = bwic.get("description") or ""
if bwic.get("seller"):
idata["Notes"] += "Seller: {}".format(bwic["seller"])
idata["Settle Date"] = bwic["settle_dt"]
idata["Date"] = bwic["trade_dt_utc"][:-1] + "-00:00"
fpath = bwic["trade_dt"][:10].replace("-", "") + "/" + bwic["list_id"] + ".bwic"
return fpath, idata
def utc_minutes():
"helper function for loop control, returns minutes after midnight UTC"
cur_time = datetime.datetime.utcnow()
return cur_time.hour * 60 + cur_time.minute
def main():
"""
the main event
"""
parser = argparse.ArgumentParser()
parser.add_argument('sectors', nargs='+')
parser.add_argument("--one-shot",
action='store_true',
dest="one_shot",
help="run once, no looping")
args = parser.parse_args()
cur_dt = datetime.date.today()
fwd_dt = cur_dt + datetime.timedelta(days=7)
sectors = args.sectors
if "ALL" in args.sectors:
sectors = BWIC_SECTORS
if not sectors:
logger.error(
"Halting Execution: You must select one or more of the following valid sectors: %s",
BWIC_SECTORS)
sys.exit(1)
invalid_sectors = set(sectors) - set(BWIC_SECTORS)
if invalid_sectors:
logger.error("Halting Execution: Invalid sector(s): %s", invalid_sectors)
sys.exit(1)
try:
run_number = 1
cur_time = utc_minutes()
while args.one_shot or cur_time < (STOP_HOUR * 60 + STOP_MINUTE):
logger.info("starting run: %s", run_number)
for sector in sectors:
resp = get_bwics(sector, cur_dt, fwd_dt)
if resp["meta"]["results"] != "OK":
logger.error("api call error: %s", resp)
raise RuntimeError
logger.info("sector: %s num_results: %s, API meta data: %s", sector,
len(resp["data"]), resp["meta"])
for bwic in resp["data"]:
fpath, idata = _make_intex_data(bwic)
cur_dir = INTEX_BWIC_DIR / fpath.split("/")[0]
if not cur_dir.is_dir():
cur_dir.mkdir()
json_path = INTEX_BWIC_DIR / fpath
if not json_path.is_file():
# save to disk
json_bytes = json.dumps(idata, ensure_ascii=False, indent=2).encode("utf-8")
with open(json_path, "wb") as fp:
fp.write(json_bytes)
logger.info("saved bwic: %s, bonds: %s, bytes: %s path: %s",
bwic["list_id"], len(bwic["bonds"]), len(json_bytes), json_path)
else:
logger.info("not overwriting existing bwic json file: %s", json_path)
if args.one_shot:
logger.info("ran sector(s) %s once, now exiting main loop", sectors)
break
logger.info("waiting 60 seconds before next iteration")
time.sleep(60) # DO NOT POLL SERVER MORE THAN ONCE PER MINUTE
cur_time = utc_minutes()
if not args.one_shot:
logger.info("it's after designated stop time {:02d}:{:02d} UTC, now exiting".format(
STOP_HOUR, STOP_MINUTE))
except KeyboardInterrupt:
logger.warning("user hit CTRL-C, now exiting")
except Exception: # pylint: disable=broad-except
logger.exception("fatal error", exc_info=True)
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment