-
-
Save empirasign/3a019fdb91fa253f466b7f879b0b61e4 to your computer and use it in GitHub Desktop.
periodically poll Empirasign API for BWIC data, then save to disk in a format INTEXCalc can automatically process
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
bwics_to_intex.py
https://gist.github.com/empirasign/3a019fdb91fa253f466b7f879b0b61e4

Poll Empirasign API for BWIC information in selected sector(s)
Export data to folder INTEXCalc watches for BWIC information in BWIC/JSON format
rinse/repeat

PSEUDO CODE
start of loop
    if after the configured stop time (STOP_HOUR:STOP_MINUTE UTC), exit
    get bwics for all selected sectors
    for bwic in bwics
        transform bwic to INTEXCalc format, save to market_data/bwic/YYYYMMDD/...
    if before the configured stop time, continue

Sample Calls:
    ./bwics_to_intex.py cmbs
    ./bwics_to_intex.py agcmo naresi
    ./bwics_to_intex.py ALL # do all sectors in BWIC_SECTORS
    ./bwics_to_intex.py agcmo --one-shot # poll agcmo sector once, then exit

THIS SCRIPT SHOULD BE PLATFORM INDEPENDENT
"""
import argparse | |
import datetime | |
import hashlib | |
import json | |
import logging | |
import logging.handlers | |
from pathlib import Path | |
import sys | |
import tempfile | |
import time | |
import requests | |
API_SCHEME = "https"
API_HOST = "www.empirasign.com"
PROXY_SERVER = ''  # e.g. proxy.mycompany.net:8080
# if you get a 407 Proxy Authentication Required error, you need to set
# PROXY_SERVER to something like username:password@proxy.mycompany.net:8080

## START OF USER CONFIG

# API CREDENTIALS
APP_ID = 'MY_ID'
PASS = 'MY_PASS'

# Folder the BWIC files are written to. Put the INTEXCalc market_data/bwic
# path in the empty string; while it is empty the system temp dir is used.
INTEX_BWIC_DIR = Path('' or tempfile.gettempdir())
# often looks like this:
# C:\Users\WINDOWS_USERNAME\AppData\Roaming\intex\settings\market_data\bwic
# If your org has a shared location for Intex market_data, the path will be different

# stop polling for new market data at 21:30 UTC, i.e. 5:30 PM US Eastern
# during daylight time (EDT) and 4:30 PM during standard time (EST)
# this parameter needs to be adjusted when daylight savings time switches
STOP_HOUR = 21
STOP_MINUTE = 30

## END OF USER CONFIG

BWIC_SECTORS = ('abs', 'agency', 'agarm', 'agcmo', 'clo', 'cmbs', 'conabs', 'euro', 'naresi',
                'nonag', 'spec')

# Log to a size-capped file in the temp dir and to stdout.
# backupCount must be non-zero: with the default of 0 the handler never rolls
# over, so maxBytes would be ignored and the log would grow without bound.
rfh = logging.handlers.RotatingFileHandler(
    filename=Path(tempfile.gettempdir()) / "bwics_to_intex.log",
    maxBytes=5000000,
    backupCount=3)
logging.basicConfig(level=logging.INFO,
                    format="%(asctime)s - %(levelname)-8s - %(message)s",
                    datefmt="%Y-%m-%d %H:%M:%S",
                    handlers=[rfh, logging.StreamHandler(sys.stdout)])
logger = logging.getLogger()
## Empirasign API interaction functions | |
def _proxies_dict(proxy_server): | |
""" | |
return proxy dictionary as needed by requests library | |
if this helper function is not comprehensive enough for your use case, consult | |
http://docs.python-requests.org/en/latest/user/advanced/#proxies | |
""" | |
if proxy_server: | |
return {'https': 'http://' + PROXY_SERVER} | |
return {} | |
def _make_req_sig(args):
    """
    build the request signature for an API call

    The signature is the SHA-1 hex digest of APP_ID, every item of args in
    order, and PASS, all concatenated and encoded as UTF-8.
    as a convenience, datetime.date objects autoconvert to YYYYMMDD strings
    """
    pieces = [APP_ID]
    for item in args:
        if isinstance(item, datetime.date):
            item = item.strftime("%Y%m%d")
        pieces.append(item)
    pieces.append(PASS)
    payload = "".join(pieces).encode('utf-8')
    return hashlib.sha1(payload).hexdigest()
## copied from https://gist.github.com/empirasign/cbd204ae7f45bc4eadae | |
def get_bwics(sector, d0, d1=None, fmt='json', bsym_sector=False, timeout=60):
    """
    get summary level data for BWICs in a given sector. This
    query does not return actual market data so it does not count
    against the daily quota

    sector: sector name; descriptions can be found here: www.empirasign.com/api-mbs/
    d0: start of date range (max 30 day lookback), a date/datetime object
    d1: end of date range; when omitted, a single-date query is sent with
        d0 passed under the 'dt' parameter name
    fmt: 'json' (default) or anything else for csv
    bsym_sector: forwarded unchanged as the 'bsym_sec_type' query parameter
    timeout: seconds to wait on the server before requests raises, so a
        stalled connection cannot hang the polling loop forever

    returns the decoded JSON object when fmt is json, else the response text
    """
    api_url = '{}://{}/api/bwics/'.format(API_SCHEME, API_HOST)
    want_json = fmt.lower() == 'json'

    # the dates in the request signature and in the query parameters must be
    # the very same YYYYMMDD strings
    d0 = d0.strftime("%Y%m%d")
    if d1:
        d1 = d1.strftime("%Y%m%d")
        date_params = {'d0': d0, 'd1': d1}
        sig_args = [sector, d0, d1]
    else:
        # search on single date and pass dt as arg name
        date_params = {'dt': d0}
        sig_args = [sector, d0]

    url_params = {'sector': sector, 'app_id': APP_ID}
    url_params.update(date_params)
    url_params['bsym_sec_type'] = bsym_sector
    url_params['req_sig'] = _make_req_sig(sig_args)
    if not want_json:
        url_params['csv'] = True

    resp = requests.get(api_url,
                        params=url_params,
                        proxies=_proxies_dict(PROXY_SERVER),
                        timeout=timeout)
    logger.debug("api url: %s", resp.url)
    if want_json:
        return resp.json()
    return resp.text
def _make_intex_data(bwic): | |
""" | |
take API bwic object and reshape into form that INTEXCalc likes | |
""" | |
idata = {"Size": 0, "Securities": []} | |
for bond in bwic["bonds"]: | |
idata["Size"] += round(bond["of"] * 1e6) | |
idata["Securities"].append({ | |
"Orig Face": round(bond["of"] * 1e6), | |
"Tranche ID": bond["isin"] | |
}) | |
idata["Notes"] = bwic.get("description") or "" | |
if bwic.get("seller"): | |
idata["Notes"] += "Seller: {}".format(bwic["seller"]) | |
idata["Settle Date"] = bwic["settle_dt"] | |
idata["Date"] = bwic["trade_dt_utc"][:-1] + "-00:00" | |
fpath = bwic["trade_dt"][:10].replace("-", "") + "/" + bwic["list_id"] + ".bwic" | |
return fpath, idata | |
def utc_minutes():
    """
    helper function for loop control, returns minutes after midnight UTC

    returns an int in the range 0..1439
    """
    # timezone-aware replacement for the deprecated datetime.utcnow()
    cur_time = datetime.datetime.now(datetime.timezone.utc)
    return cur_time.hour * 60 + cur_time.minute
def _save_bwic(bwic):
    """
    write one API bwic to INTEX_BWIC_DIR as a .bwic JSON file

    an already existing file is left untouched
    """
    fpath, idata = _make_intex_data(bwic)
    json_path = INTEX_BWIC_DIR / fpath
    if json_path.is_file():
        logger.info("not overwriting existing bwic json file: %s", json_path)
        return
    # create the YYYYMMDD folder (and any missing parents) without racing a
    # concurrent creator
    json_path.parent.mkdir(parents=True, exist_ok=True)
    json_bytes = json.dumps(idata, ensure_ascii=False, indent=2).encode("utf-8")
    json_path.write_bytes(json_bytes)
    logger.info("saved bwic: %s, bonds: %s, bytes: %s path: %s", bwic["list_id"],
                len(bwic["bonds"]), len(json_bytes), json_path)


def _poll_sectors(sectors, d0, d1):
    """
    fetch the BWICs of every sector for the d0..d1 range and save each one

    raises RuntimeError when the API reports anything other than OK
    """
    for sector in sectors:
        resp = get_bwics(sector, d0, d1)
        if resp["meta"]["results"] != "OK":
            logger.error("api call error: %s", resp)
            raise RuntimeError("api call error for sector {}".format(sector))
        logger.info("sector: %s num_results: %s, API meta data: %s", sector,
                    len(resp["data"]), resp["meta"])
        for bwic in resp["data"]:
            _save_bwic(bwic)


def main():
    """
    the main event

    Parse the command line, validate the requested sectors, then poll the
    API once per minute until STOP_HOUR:STOP_MINUTE UTC (or just once with
    --one-shot), saving every BWIC found for today through the next 7 days.

    Exits with status 1 on invalid sectors or on a fatal error; CTRL-C ends
    the loop quietly.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('sectors', nargs='+')
    parser.add_argument("--one-shot",
                        action='store_true',
                        dest="one_shot",
                        help="run once, no looping")
    args = parser.parse_args()

    cur_dt = datetime.date.today()
    fwd_dt = cur_dt + datetime.timedelta(days=7)

    # nargs='+' already guarantees at least one sector, so only validity
    # needs checking here
    sectors = BWIC_SECTORS if "ALL" in args.sectors else args.sectors
    invalid_sectors = set(sectors) - set(BWIC_SECTORS)
    if invalid_sectors:
        logger.error("Halting Execution: Invalid sector(s): %s", invalid_sectors)
        sys.exit(1)

    stop_minutes = STOP_HOUR * 60 + STOP_MINUTE
    try:
        run_number = 1
        while args.one_shot or utc_minutes() < stop_minutes:
            logger.info("starting run: %s", run_number)
            _poll_sectors(sectors, cur_dt, fwd_dt)
            if args.one_shot:
                logger.info("ran sector(s) %s once, now exiting main loop", sectors)
                break
            logger.info("waiting 60 seconds before next iteration")
            time.sleep(60)  # DO NOT POLL SERVER MORE THAN ONCE PER MINUTE
            run_number += 1
        if not args.one_shot:
            logger.info("it's after designated stop time %02d:%02d UTC, now exiting",
                        STOP_HOUR, STOP_MINUTE)
    except KeyboardInterrupt:
        logger.warning("user hit CTRL-C, now exiting")
    except Exception:  # pylint: disable=broad-except
        logger.exception("fatal error")
        # report failure to the caller, like the other error paths above
        sys.exit(1)
# run the poller only when executed as a script, not when imported
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment