-
-
Save empirasign/c8640a60011fde1a9a9b4643b9736475 to your computer and use it in GitHub Desktop.
example script on how to download all market data for a given date
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# -*- coding: utf-8 -*- | |
""" | |
eod_data_grab.py (End-of-Day Data Grab) | |
Pull down all market observations for a specific date. Choose to narrow down your | |
search space by sector information or bond identifiers. | |
Requirements: | |
- api_oo_v1 | |
- sqlite3 | |
Example Usage: | |
./eod_data_grab.py empirasign_20240717.sqlite -d2024-07-17 | |
Resources: | |
- This script: https://gist.github.com/empirasign/c8640a60011fde1a9a9b4643b9736475 | |
- api_oo_v1.py: https://gist.github.com/empirasign/e1ffad84620fb0eddbe6a4297aff70ee | |
- sample Python module that wraps Empirasign API endpoints with easy-to-use methods | |
- More sample code: https://www.empirasign.com/developer-resources | |
""" | |
import argparse | |
import logging | |
import datetime | |
import time | |
import sqlite3 | |
from api_oo_v1 import EmpirasignAPI | |
PROXY_SERVER = None # e.g. "proxy.mycompany.net:8080" | |
# may need "username:password@proxy.mycompany.net:8080" if you see 407 Proxy Authentication Required | |
# all possible columns across /bonds/ market observations | |
TRADE_COLS = ( | |
('kind', 'TEXT'), # 'bid', 'offer', 'market', 'bwic', 'pxtalk' | |
('cusip', 'TEXT'), | |
('isin', 'TEXT'), | |
('figi', 'TEXT'), | |
('bbg_ticker', 'TEXT'), | |
('list_id', 'TEXT'), | |
('of', 'REAL'), | |
('cf', 'REAL'), | |
('of_bid', 'REAL'), | |
('of_offer', 'REAL'), | |
('cf_bid', 'REAL'), | |
('cf_offer', 'REAL'), | |
('color', 'TEXT'), | |
('price_raw', 'TEXT'), | |
('price', 'TEXT'), | |
('price_bid', 'TEXT'), | |
('price_offer', 'TEXT'), | |
('price32_bid', 'TEXT'), | |
('price32_offer', 'TEXT'), | |
('spread_raw', 'TEXT'), | |
('spread', 'TEXT'), | |
('spread_bid', 'TEXT'), | |
('spread_offer', 'TEXT'), | |
('curve', 'TEXT'), | |
('speed', 'REAL'), | |
('speed_type', 'TEXT'), | |
('cf_scenario', 'TEXT'), | |
('dealer', 'TEXT'), | |
('trade_dt', 'TEXT'), #ISO 8601 FORMAT | |
('trade_datetime_utc', 'TEXT'), #ISO 8601 FORMAT | |
('settle_dt', 'TEXT'), #ISO 8601 FORMAT | |
('dealer_notes', 'TEXT') | |
) | |
TRADE_FIELDS = [col[0] for col in TRADE_COLS] | |
# ------- Customize _is_match() logic to filter down to the bonds you care about ------ | |
def _is_match(bond): | |
""" | |
customize your own filter rules to capture the securities you are interested in | |
using specific identifiers or sector data from OpenFIGI | |
figi_securitytype2: https://www.openfigi.com/api/enumValues/v3/securityType2 | |
figi_securitytype: https://www.openfigi.com/api/enumValues/v3/securityType | |
This particular example is filtering for CLOs using security types | |
""" | |
sectype2, sectype = bond['figi_securitytype2'], bond['figi_securitytype'] | |
return sectype2 in ('LL', 'MML') or (sectype2 == 'ABS Other' and | |
sectype in ('CF', 'MV', 'HB', 'SN')) | |
# ------- Generic SQLite helpers ------ | |
def _make_insert_query(tbl, columns): | |
"make an INSERT SQL command with placeholders" | |
cols_str = ", ".join([f"`{col}`" for col in columns]) | |
vals_str = ", ".join(["?"] * len(columns)) | |
return f"INSERT INTO {tbl} ({cols_str}) VALUES ({vals_str});" | |
def _create_table(sqlite_conn, index_columns): | |
"create SQLite table with provided indexes" | |
cursor = sqlite_conn.cursor() | |
cursor.execute("DROP TABLE IF EXISTS trade_data;") | |
cols_str = ", ".join([f"`{col_name}` {col_type}" for col_name, col_type in TRADE_COLS]) | |
cursor.execute(f"CREATE TABLE trade_data ({cols_str});") | |
for column in index_columns: # index columns for better performance | |
cursor.execute(f"CREATE INDEX idx_{column} ON trade_data({column});") | |
sqlite_conn.commit() | |
cursor.close() | |
def _store_records(sqlite_conn, rows): | |
"insert records into SQLite table in reasonable chunks" | |
cursor = sqlite_conn.cursor() | |
insert_query = _make_insert_query("trade_data", TRADE_FIELDS) | |
insert_rows = [[row.get(field) for field in TRADE_FIELDS] for row in rows] | |
cursor.executemany(insert_query, insert_rows) | |
sqlite_conn.commit() | |
cursor.close() | |
# ------- Main Algo ------ | |
def main(): | |
""" | |
the main event | |
""" | |
parser = argparse.ArgumentParser(description='pull all market data for a given date') | |
parser.add_argument('db_path', help="SQLite path, e.g. my_prices.sqlite") | |
parser.add_argument('-d', dest='dt', help='trade date as YYYY-MM-DD') | |
args = parser.parse_args() | |
logging.basicConfig(filename="eod_data_grab.log", | |
format='%(asctime)s - %(levelname)s - %(message)s', | |
datefmt='%Y-%m-%d %H:%M:%SZ', | |
level=logging.INFO) | |
logging.Formatter.converter = time.gmtime | |
if args.dt: | |
trade_dt = datetime.datetime.strptime(args.dt, r"%Y-%m-%d") | |
else: | |
trade_dt = datetime.date.today() | |
# create SQLite database and table | |
conn = sqlite3.connect(args.db_path) | |
_create_table(conn, ['bbg_ticker', 'cusip', 'isin']) | |
logging.info("made table at SQLite database file: %s", args.db_path) | |
# create API object with my credentials | |
api = EmpirasignAPI('API_KEY', 'API_SECRET', PROXY_SERVER) | |
# Step 1: get all bonds that appeared on BWICs or Dealer Runs on specified trade date | |
all_active_bonds = api.get_active_bonds(trade_dt, "Mtge")['bonds'] | |
logging.info("%s ISINs have market data for date: %s", len(active_isins), trade_dt) | |
# Step 2: filter down active ISINs to those that pass our requirements | |
active_isins = [bond['isin'] for bond in all_active_bonds if _is_match(bond)] | |
logging.info("%s ISINs match our requirements: %s", len(active_isins), trade_dt) | |
# Step 3: pull down all market observations for filtered ISINs | |
resp = api.get_market_data(active_isins, date_args=trade_dt) | |
logging.info("%s errors, %s warnings, %s results", | |
len(resp['meta']['errors']), len(resp['meta']['warnings']), len(resp['data'])) | |
# store market observations in SQLite table | |
_store_records(conn, resp['data']) | |
logging.info("inserted %s market data records", len(resp['data'])) | |
conn.close() | |
logging.info("job done, db: %s closed", args.db_path) | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment