Skip to content

Instantly share code, notes, and snippets.

@SolidStill
Last active July 3, 2024 12:45
Show Gist options
  • Save SolidStill/030becc0024c22d29f62e5a0022521aa to your computer and use it in GitHub Desktop.
EODHD_ETL_workflow_py Development
import os
from dotenv import load_dotenv
# Credentials are kept in a local .env file so they never live in source control.
load_dotenv() # Load environment variables from .env file
# NOTE(review): os.getenv returns None when a variable is absent — a missing
# key/credential only surfaces later, at API-request or DB-connect time.
EODHD_API_KEY = os.getenv("EODHD_API_KEY")
SQL_USER = os.getenv("SQL_USER")
SQL_PASS = os.getenv("SQL_PASS")
SQL_HOST = os.getenv("SQL_HOST")
# BELOW temp config code block for use during initial development in notebook
# from google.colab import userdata
# my_password = userdata.get('SQLPass')
# my_user = userdata.get('SQLUser')
# my_host = userdata.get('host')
import requests
import pandas as pd
# from config import EODHD_API_KEY
def get_eod_data(symbol, api_token):
    """Fetch the full EOD (End of Day) price history for a symbol from the EODHD API.

    Parameters
    ----------
    symbol : str
        EODHD ticker, e.g. "UK10Y.GBOND".
    api_token : str
        EODHD API key.

    Returns
    -------
    pandas.DataFrame
        Columns date (datetime64), open, high, low, close, adjusted_close,
        sorted by date with the newest row first. The 'volume' column is
        dropped because it is not relevant for these series.

    Raises
    ------
    requests.HTTPError
        If the API responds with a non-2xx status.
    """
    base_url = "https://eodhd.com/api/eod"
    # Pass the query string via `params` so values are URL-encoded properly,
    # and set a timeout so a dead connection cannot hang the ETL run forever.
    response = requests.get(
        f"{base_url}/{symbol}",
        params={"api_token": api_token, "fmt": "json"},
        timeout=30,
    )
    response.raise_for_status()  # Raise an exception if the request failed
    data = response.json()
    df = pd.DataFrame(data)
    df = df.drop("volume", axis=1)  # Drop the 'volume' column because the data is not relevant
    df["date"] = pd.to_datetime(df["date"])
    df.sort_values("date", ascending=False, inplace=True)
    return df
import psycopg2
"""
Helper functions in this .py file:
create_connection(),
create_database_tables(symbol),
load_data_into_database(df, symbol)
"""
def create_connection(database="pagila"):
    """Open and return a psycopg2 connection to the PostgreSQL server.

    Returning the raw connection lets callers manage it with graceful
    'with' statements.

    Parameters
    ----------
    database : str, optional
        Database name to connect to; defaults to "pagila" (the value the
        rest of this pipeline uses), so existing callers are unaffected.

    Returns
    -------
    psycopg2.extensions.connection

    Raises
    ------
    psycopg2.Error
        If the connection attempt fails (re-raised after logging).
    """
    try:
        # Credentials come from the module-level env-var constants.
        conn = psycopg2.connect(
            database=database,
            user=SQL_USER,
            host=SQL_HOST,
            password=SQL_PASS,
            port=5432,
        )
        return conn
    except psycopg2.Error as e:
        print(f"Error connecting to database: {e}")
        raise  # Re-raise the error for potential error handling in 'higher-level' calling functions
def create_database_tables(symbol):
    """Create the per-symbol bond table under the "student" schema if it is absent.

    The table name is derived from the symbol's maturity class and country
    (looked up in bond_symbols_dict). Raises psycopg2.Error on failure.
    """
    maturity_class, country = bond_symbols_dict[symbol]
    table_name = f"de10_cdw_{country}_{maturity_class}_gbond"
    # DDL is idempotent thanks to IF NOT EXISTS; 'date' is the natural key.
    ddl = f"""
    CREATE TABLE IF NOT EXISTS student.{table_name} (
    date DATE PRIMARY KEY,
    open NUMERIC,
    high NUMERIC,
    low NUMERIC,
    close NUMERIC,
    adjusted_close NUMERIC
    )
    """
    try:
        # 'with' statements release the connection/cursor objects gracefully.
        with create_connection() as conn, conn.cursor() as cur:
            cur.execute(ddl)
            conn.commit()
    except psycopg2.Error as e:
        print(f"Error creating table '{table_name}': {e}")
        raise  # Re-raise the error for potential error handling in 'higher-level' calling functions
def load_data_into_database(df, symbol):
    """Insert every row of *df* into the symbol's table in the student schema.

    Parameters
    ----------
    df : pandas.DataFrame
        Must contain columns date, open, high, low, close, adjusted_close.
    symbol : str
        Key into bond_symbols_dict; determines the target table name.

    Raises
    ------
    psycopg2.Error
        On any database failure (re-raised after logging).
    """
    maturity_class, country = bond_symbols_dict[symbol]
    table_name = f"de10_cdw_{country}_{maturity_class}_gbond"
    insert_sql = f"""
    INSERT INTO student.{table_name} (date, open, high, low, close, adjusted_close)
    VALUES (%s, %s, %s, %s, %s, %s)
    """
    # Materialize the rows once, in column order matching the INSERT statement.
    columns = ["date", "open", "high", "low", "close", "adjusted_close"]
    rows = list(df[columns].itertuples(index=False, name=None))
    try:
        with create_connection() as conn:
            create_database_tables(symbol)  # Ensure tables exist before loading
            with conn.cursor() as cur:
                # executemany with '%s' placeholders:
                # 1) prevents SQL injection,
                # 2) lets psycopg2 convert Python types to PostgreSQL types,
                # 3) avoids a Python-level loop of individual execute() calls.
                cur.executemany(insert_sql, rows)
            conn.commit()
    except psycopg2.Error as error:
        print(f"Database error: {error}")
        raise  # Re-raise the error for potential handling in 'higher-level' calling functions
# from config import EODHD_API_KEY, SQL_USER, SQL_PASS, SQL_HOST
if __name__ == "__main__":
# NOTE(review): in this concatenated listing, update_data and
# bond_symbols_dict are defined further down the file — if these snippets
# are assembled into one module in this order, this call raises NameError.
# Confirm the real module layout keeps this entry point after the definitions.
update_data(bond_symbols_dict)
import pandas
import psycopg2
#from config import api_token
# from extract_transform import get_eod_data, clean_and_transform_data
# from load import create_connection, load_data_into_database
# Maps government-bond symbols (UK/US/DE) to [maturity_class, country].
# Values are deliberately lower case — substantial issues arose when they
# were UPPER CASE initially, since they feed into PostgreSQL table names.
_MATURITIES_BY_COUNTRY = {
    "uk": ["1y", "2y", "3y", "5y", "10y", "30y"],
    "us": ["1y", "2y", "3y", "5y", "10y", "30y"],
    "de": ["1y", "2y", "5y", "10y", "30y"],  # note: no 3y German bond here
}
bond_symbols_dict = {
    f"{country.upper()}{maturity.upper()}.GBOND": [maturity, country]
    for country, maturities in _MATURITIES_BY_COUNTRY.items()
    for maturity in maturities
}
def fetch_latest_ingested_date(symbol):
    """Return the newest 'date' already loaded for *symbol*, or None.

    None means the table does not exist or is empty, which signals the
    caller to load the full history. Database errors are logged and
    re-raised rather than returning None, so a transient connection
    failure cannot be mistaken for "no data yet" (which would lead to
    duplicate-primary-key INSERT attempts).
    """
    maturity_class, country = bond_symbols_dict[symbol]
    table_name = f"de10_cdw_{country}_{maturity_class}_gbond"
    print(f"Checking for table: {table_name}") # Debug print
    try:
        with create_connection() as conn, conn.cursor() as cur:
            # Does the table exist under the 'student' schema?
            cur.execute(
                "SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_schema = 'student' AND table_name = %s)",
                (table_name,),  # trailing comma ensures arg read as a tuple and not a string
            )
            table_exists = cur.fetchone()[0]
            print(f"Table exists: {table_exists}") # Debug print
            if not table_exists:
                print("Table does not exist.") # Debug print
                return None
            # Table exists — is it empty?
            cur.execute(f"SELECT COUNT(*) FROM student.{table_name};")
            row_count = cur.fetchone()[0]
            print(f"Row count: {row_count}") # Debug print
            if row_count == 0:
                print("Table is empty.") # Debug print
                return None
            cur.execute(f"SELECT MAX(date) FROM student.{table_name};")
            latest_date = cur.fetchone()[0]
            print(f"Latest date found: {latest_date}") # Debug print
            return latest_date
    except psycopg2.Error as e:
        print(f"Database error while fetching latest date: {e}")
        raise
def update_data(symbol_dict):
    """Fetch, transform, and load the latest data for every symbol in *symbol_dict*.

    For each bond the entire API history is fetched (only ~30KB per symbol)
    and filtered down to rows strictly newer than the table's current
    MAX(date); only that delta is inserted. If the table is missing or empty,
    the full history is loaded. Each symbol gets up to three attempts, to
    tolerate intermittent connection errors; after the third failure the
    exception propagates to the caller.
    """
    for symbol, (maturity_class, country) in symbol_dict.items():
        attempts = 0
        while attempts < 3:
            try:
                last_loaded = fetch_latest_ingested_date(symbol)
                full_history = get_eod_data(symbol, EODHD_API_KEY)
                if last_loaded:
                    # Keep only rows newer than what the DB already holds.
                    cutoff = pd.Timestamp(last_loaded)
                    pending = full_history[full_history['date'] > cutoff]
                else:
                    # No table / empty table: the whole API history is new.
                    # CAUTION: if fetch_latest_ingested_date ever returned None
                    # on a transient error (it re-raises instead), this branch
                    # would re-load duplicate data.
                    pending = full_history
                if pending.empty:
                    print(f"No new data found for {symbol}")
                else:
                    load_data_into_database(pending, symbol)
                    print(f"Successfully updated data for {symbol}")
                break  # Exit the retry loop after a successful update
            except Exception as e:
                print(f"Error updating data for {symbol}: {e}")
                attempts += 1
                if attempts >= 3:
                    raise  # Give up on this symbol; let higher-level code decide
    print("***Update completed for all symbols***")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment