Skip to content

Instantly share code, notes, and snippets.

@dewaldabrie
Last active June 13, 2021 09:57
Show Gist options
  • Save dewaldabrie/f8e7599d48731329903b7b428fa1f656 to your computer and use it in GitHub Desktop.
Save dewaldabrie/f8e7599d48731329903b7b428fa1f656 to your computer and use it in GitHub Desktop.
"""
Collect historical OHLC data for ASX ETFs from Yahoo! Finance.

A list of ASX ETF symbols can be found at asxetfs.com.
"""
import os
import time
import logging
import pickle
import csv
import pandas as pd
import yfinance as yf
def get_symbols(symbol_csv_path='20200601-etfs.csv'):
    """
    Generate exchange symbols from a local CSV file.

    The first two lines of the file are treated as headers and skipped. For
    every remaining row, the first column is taken as the ticker code and
    yielded with the '.AX' suffix appended.

    :param symbol_csv_path: path of the CSV file to read; defaults to the
        file name this script has always used.
    :return: generator of symbol strings such as 'VAS.AX'.
    :raises OSError: if the CSV file cannot be opened.
    """
    # newline='' is what the csv module documents for files handed to a reader.
    with open(symbol_csv_path, 'r', newline='') as fh:
        reader = csv.reader(fh)
        # Skip both header lines. The default keeps a short file from leaking
        # StopIteration out of this generator, which Python would turn into a
        # RuntimeError.
        next(reader, None)
        next(reader, None)
        for row in reader:
            # Blank lines parse to an empty list; row[0] would raise IndexError.
            if not row:
                continue
            code = row[0].strip()
            if not code:
                continue
            yield code + '.AX'
def symbol_data(symbol):
    """
    Download the full price history for one symbol via yfinance.

    :param symbol: ticker string passed straight to ``yf.Ticker``.
    :return: ``(symbol, history)`` when the returned history is non-empty,
        otherwise ``None``. Any exception raised while fetching is logged
        and also results in ``None`` so one bad symbol does not stop a run.
    """
    try:
        print("Getting data for {symbol} ...".format(symbol=symbol))
        history = yf.Ticker(symbol).history(period="max")
        # Nothing came back for this symbol: report "no result".
        return None if history.empty else (symbol, history)
    except Exception as exc:
        # Best effort: record the failure and fall through to None.
        logging.exception(str(exc))
    return None
def agg_data_deserialise():
    """
    Load previously collected data from 'data.pkl' in the working directory.

    :return: a context dict of the form ``{'agg_data': <loaded object>}``;
        the value is an empty dict when 'data.pkl' is not an existing file.
    """
    # First run (or file removed): start from an empty collection.
    if not os.path.isfile('data.pkl'):
        return {'agg_data': {}}
    with open("data.pkl", 'rb') as pickle_file:
        return {'agg_data': pickle.load(pickle_file)}
def agg_data_update(inpt, context=None):
    """
    Update our existing data with new data.

    Rows are identified by their index label, so the index is preserved in
    the merged frame. When the stored and the new frame both contain a row
    for the same label, the new row replaces the stored one.

    NOTE(review): this assumes the frames are indexed by date, as yfinance's
    ``Ticker.history`` is expected to return them -- confirm against the
    yfinance version in use.

    :param inpt: ``(symbol, dataframe)`` pair with the newly fetched data.
    :param context: dict holding the symbol -> DataFrame mapping under the
        'agg_data' key; it is modified in place.
    :return: None.
    """
    agg_data = context['agg_data']
    sym, data = inpt
    if sym not in agg_data:
        agg_data[sym] = data
        return
    combined = pd.concat([agg_data[sym], data])
    # De-duplicate on the index rather than on column values: two different
    # days may legitimately carry identical prices, while the same day may
    # come back with revised values. keep='last' lets the fresh data win.
    combined = combined.loc[~combined.index.duplicated(keep='last')]
    try:
        combined = combined.sort_index()
    except TypeError:
        # Index labels of mixed, non-comparable types (e.g. data stored
        # earlier without dates) cannot be ordered; keep concatenation order.
        pass
    agg_data[sym] = combined
def agg_data_serialise(context=None):
    """
    Serialise updated data to disk again.

    The pickle is written to a scratch file first and only moved over
    'data.pkl' once it has been written completely, so a failure while
    pickling leaves the previously saved data untouched.

    :param context: dict holding the data to store under the 'agg_data' key.
    :return: None.
    :raises Exception: whatever ``pickle.dump`` or the file operations raise;
        the scratch file is removed before the error propagates.
    """
    agg_data = context['agg_data']
    target = "data.pkl"
    scratch = target + ".tmp"
    try:
        with open(scratch, 'wb') as fh:
            pickle.dump(agg_data, fh)
        # Swap the finished file into place in one step.
        os.replace(scratch, target)
    finally:
        # Only still present when something above failed.
        if os.path.exists(scratch):
            os.remove(scratch)
def main_sync():
    """
    Run the collector sequentially: load, fetch each symbol, save.

    Symbols are processed one at a time in the order ``get_symbols`` yields
    them; symbols for which ``symbol_data`` returns nothing are skipped.
    """
    # Load whatever was stored by an earlier run.
    context = agg_data_deserialise()
    # map() is lazy, so each symbol is fetched and merged before the next.
    for fetched in map(symbol_data, get_symbols()):
        if not fetched:
            continue
        agg_data_update(fetched, context)
    # Persist the merged result.
    agg_data_serialise(context)
if __name__ == '__main__':
    # Time the whole collection run and report it on stdout.
    started_at = time.time()
    main_sync()
    print("Duration: {} seconds.".format(time.time() - started_at))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment