# -*- encoding: utf-8 -*-

"""
Fetch & Update Historic Exchange Rates from API

Fetch the currency exchange rates using the free-tier "ExchangeRatesAPI"
and populate the database. Please note that the free-tier API is
unreliable, and the connection might be forcibly closed before all the
requested dates are fetched.

```shell
ExchangeRatesAPI$ python fetch_exch_rates.py

# or, to use a custom api-key, pass it directly from the command line as:
ExchangeRatesAPI$ python fetch_exch_rates.py --api-key=<key>
```

@author: Debmalya Pramanik
@version: v0.0.1
"""
|
|
|
import argparse # fetch command line arguments
import datetime as dt # for manipulating date object
import json # write failed dates under log file
import os # operating system functionalities
import shutil # terminal size lookup with a safe fallback
import time # module to manipulate time values
from typing import Iterable
from uuid import uuid4 as UUID

import pandas as pd # dataframe manipulation/analytics lib
import requests # http library for api base ops
from sqlalchemy import create_engine
from sqlalchemy import text
from tqdm import tqdm as TQ

import datetime_ as dt_ # udf for dt_.date_range()
|
|
|
def logPrint(message, type_):
    """Print ``message`` to stdout, prefixed with the current time and severity.

    The emitted line has the shape ``<time.ctime()>: <TYPE>: <message>``.

    :param message: Text to display.
    :param type_: Severity label (e.g. ``"INFO"``); it is upper-cased before
        printing, so it must be a string.
    """
    print(f"{time.ctime()}: {type_.upper()}: {message}")
|
|
|
def create_frame(responses : Iterable[dict], outfile : str = None) -> pd.DataFrame:
    """Build the historic exchange-rates table from decoded API payloads.

    Every payload must provide the keys ``"date"`` (a ``YYYY-MM-DD`` string),
    ``"base"`` and ``"rates"`` (a mapping of currency symbol to rate). The
    result holds one row per payload with the columns ``EffectiveDate``
    (``datetime.date`` objects), ``BaseCurrencyID`` and one column per currency
    symbol found in any payload; a symbol absent from a payload is stored as a
    missing value on that row.

    :param responses: Iterable of payloads; one-shot iterators/generators are
        accepted as well.
    :param outfile: Optional path; when given, the frame is also pickled there.
    :return: The assembled dataframe.
    :raises KeyError: If a payload lacks ``"date"``, ``"base"`` or ``"rates"``.
    """
    # the payloads are traversed more than once below, so a one-shot iterator
    # must be materialised first or the later passes would see nothing
    responses = list(responses)

    # sorted union of all symbols => column order is reproducible across runs
    currency_symbols = sorted({symbol for response in responses for symbol in response["rates"]})

    tblHistoricExchRates = {
        "EffectiveDate" : [response["date"] for response in responses],
        "BaseCurrencyID" : [response["base"] for response in responses],
    }
    for symbol in currency_symbols:
        # `.get()` => None (missing value) when a day does not quote the symbol
        tblHistoricExchRates[symbol] = [response["rates"].get(symbol) for response in responses]

    tblHistoricExchRates = pd.DataFrame(tblHistoricExchRates) # create dataframe from dict object
    tblHistoricExchRates["EffectiveDate"] = pd.to_datetime(
        tblHistoricExchRates["EffectiveDate"], format = "%Y-%m-%d"
    ).apply(lambda x : x.date()) # keep plain `date` objects, not timestamps

    if outfile:
        tblHistoricExchRates.to_pickle(outfile) # TODO: only pickle, setup for csv/excel

    return tblHistoricExchRates
|
|
|
if __name__ == "__main__":
    # Script entry point: top up `tblFXRates` with daily rates fetched from the API.
    #   1. resolve the api-key (command line first, then the environment variable)
    #   2. ask the database which dates are already stored
    #   3. download one payload per day of the period, recording the days that failed
    #   4. append whatever was fetched to the database

    REQUEST_TIMEOUT = 30 # seconds; without a timeout a stalled connection blocks forever
    REQUIRED_KEYS = ("date", "base", "rates") # keys `create_frame()` reads from each payload

    # ? allow to get arguments from command line, or directly from environment variables
    # command line api-key has a precedence over the environment variable
    parser = argparse.ArgumentParser()
    parser.add_argument("--api-key", help = """
        Provide a valid API Key from https://exchangeratesapi.io/.
        Additionally, you can also define a ${PATH} variable `{EXCHANGE_RATES_API_KEY}`, however,
        the command line argument (if provided) has precedence over the ${PATH} variable.
    """)
    arguments = parser.parse_args()

    EXCHANGE_RATES_API_KEY = arguments.api_key or os.getenv("EXCHANGE_RATES_API_KEY")
    if not EXCHANGE_RATES_API_KEY:
        # fail fast: otherwise every request would be sent with `access_key=None`
        parser.error("API Key is not defined, use `--api-key` or set `EXCHANGE_RATES_API_KEY`.")

    # NOTE(review): the fallback URI embeds a plain-text credential in the source,
    # prefer defining `EXCHANGE_RATES_DB_URI` in the environment instead
    engine = create_engine(
        os.getenv("EXCHANGE_RATES_DB_URI", "mysql+pymysql://root:Admin159@localhost:3306/ndprdsys")
    )

    # `shutil` falls back to a default width when stdout is not a terminal
    # (piped/redirected), where `os.get_terminal_size()` raises `OSError`
    width = shutil.get_terminal_size().columns
    print("Welcome to Exchange Rates API Management Tool".center(width))
    print("=============================================".center(width), end = "\n\n")

    logPrint("Fetching Records to Check Available Data.", type_ = "INFO")
    query = "SELECT MIN(EffectiveDate), MAX(EffectiveDate) FROM tblFXRates"
    with engine.connect() as connection:
        # `Engine.execute()` no longer exists in SQLAlchemy 2.x, an explicit
        # connection with a `text()` construct is accepted by all versions
        records = connection.execute(text(query)).fetchall()

    # ? get user inputs to fetch data from the api
    print(f" >> Dates available in DB: {records[0][0]} to {records[0][1]}")
    END_DATE = dt.datetime.now().date() - dt.timedelta(days = 1) # yesterday

    if records[0][1] is None:
        # empty table: there is no last date to continue from
        START_DATE = None
        print(" >> No records available in DB, a custom period is required.")
        _fetch_data_for_user_defined_date_range = "Y"
    else:
        START_DATE = records[0][1] + dt.timedelta(days = 1) # day after the last stored date
        print(f" >> Attempting auto-download for the period: {START_DATE} to {END_DATE}")

        # ! allow user to fetch for a specific period, or dynamically populate
        # an empty answer means "no", hence the `(y/N)` hint
        _fetch_data_for_user_defined_date_range = input(
            "Do you want to Fetch for a Custom Period? (y/N): "
        ).strip() or "n"

    if _fetch_data_for_user_defined_date_range.upper() == "Y":
        START_DATE = dt.datetime.strptime(input("Start Date [YYYY-MM-DD]: ").strip(), "%Y-%m-%d").date()
        END_DATE = dt.datetime.strptime(input("Final Date [YYYY-MM-DD]: ").strip(), "%Y-%m-%d").date()
        print(f" >> Overwriting download period: {START_DATE} to {END_DATE}")

    logPrint(f"Trying to Fetch Data for the Period {START_DATE} to {END_DATE}", type_ = "INFO")
    dates = list(dt_.date_range(start = START_DATE, end = END_DATE))

    responses = [] # the connection is abruptly closed, re-run with modified dates on below

    # *** Code to Fetch the Data from API *** #
    failed_dates = {
        "api-error" : [], # append dates when `response.status_code != 200` or payload is unusable
        "con-error" : [] # append dates for any other errors, typically connection abort
    }

    # NOTE(review): the messages below say "Stopping", yet the loop moves on to
    # the next date after a failure - confirm which behaviour is intended
    for date in TQ(dates):
        URI = f"http://api.exchangeratesapi.io/v1/{date}?access_key={EXCHANGE_RATES_API_KEY}"
        try:
            response = requests.get(URI, timeout = REQUEST_TIMEOUT)
        except Exception as err:
            # best-effort download: record the date and carry on with the next one
            failed_dates["con-error"].append(date)
            print(f" {time.ctime()} : ERROR: L114 - Failed on {date}. Stopping Query, Error = {err}.")
            continue

        if response.status_code != 200:
            failed_dates["api-error"].append(date)
            print(f" {time.ctime()} : ERROR: L111 - Stopping on {date} as Received Status Code = {response.status_code}")
            continue

        try:
            payload = response.json()
        except ValueError: # body is not valid json
            payload = None

        if isinstance(payload, dict) and all(key in payload for key in REQUIRED_KEYS):
            responses.append(payload)
        else:
            # a payload without these keys would crash `create_frame()` later on
            failed_dates["api-error"].append(date)
            print(f" {time.ctime()} : ERROR: Skipping {date} as Payload is Missing one of {REQUIRED_KEYS}")

    _api_failed = len(failed_dates["api-error"])
    _con_failed = len(failed_dates["con-error"])

    _tot_failed = _api_failed + _con_failed
    if _tot_failed > 0:
        print(f" >> Please re-run on failed dates (count = {_tot_failed:,} | api-error : {_api_failed:,}, con-error : {_con_failed:,}). ")

        # TODO: Modularize logs to a specific folder dynamically in the user's home directory
        with open(f"{dt.datetime.now().date()} #{str(UUID()).upper()[:7]} Failed Dates.json", "w", encoding = "utf-8") as f:
            # `default = str` serializes the date objects held in the lists
            json.dump(failed_dates, f, indent = 2, sort_keys = False, default = str)

    if responses:
        tblHistoricExchRates = create_frame(responses)
        tblHistoricExchRates.to_sql("tblFXRates", engine, if_exists = "append", index = False) # add new records to db
    else:
        logPrint("No Records Fetched, Database is Left Unchanged.", type_ = "WARNING")