CSV ingestor which forward-fills missing daily data and loads splits/dividends from Yahoo Finance
""" | |
Module for building a complete dataset from local directory with csv files. | |
""" | |
import os | |
import glob | |
import logbook | |
import requests | |
#from retry import retry | |
from numpy import empty, NaN | |
from pandas import DataFrame, read_csv, Index, Timedelta, NaT, concat | |
from pandas_datareader.data import DataReader | |
from pandas_datareader._utils import RemoteDataError | |
from zipline.utils.calendars import register_calendar_alias | |
from zipline.data.resample import minute_frame_to_session_frame | |
from zipline.utils.cli import maybe_show_progress | |
logger = logbook.Logger(__name__) | |


def get_all_symbols(directory):
    files = glob.glob('%s/HC*iqfeed.csv.gz' % directory)
    symbols = {os.path.basename(e).replace('HC-', '', 1).split('-1M-')[0] for e in files}
    return symbols


def load_csvs(csv_dir, symbol):
    all_dfs = []
    all_files = glob.glob('%s/HC-%s-*iqfeed.csv.gz' % (csv_dir, symbol))
    for file_ in all_files:
        df = read_csv(file_, parse_dates=[0], infer_datetime_format=True,
                      index_col=0)
        all_dfs.append(df)
    merged_df = concat(all_dfs)

    # Convert from US/Eastern to UTC Naive
    merged_df.index = merged_df.index.tz_localize('US/Eastern').tz_convert(None)
    merged_df.sort_index(inplace=True)
    # Convert cumulative volumes to per-bar ones:
    # the IQFeed CSV files accumulate volume over the day, restarting every day,
    # so we take the difference between the previous and the current bar.
    # As the df contains multiple days, grouping by date is needed so that values
    # are not carried over from one day to the next. The fillna ensures that there
    # are no NaN values at the beginning of each day.
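    # As an illustration (made-up numbers, not from any actual CSV): a day whose
    # cumulative 'Volume' column reads
    #     09:31 -> 100, 09:32 -> 250, 09:33 -> 400
    # ends up with per-bar volumes
    #     09:31 -> 100, 09:32 -> 150, 09:33 -> 150
    # since diff() yields NaN, 150, 150 and the fillna() restores the first bar.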
    merged_df['Date'] = merged_df.index.date
    merged_df['Volume'] = merged_df.groupby('Date')['Volume'].diff().fillna(merged_df['Volume'])
    merged_df.drop(['Date'], inplace=True, axis=1)

    merged_df.rename(
        columns={
            'DateTime': 'date',
            'Open': 'open',
            'High': 'high',
            'Low': 'low',
            'Close': 'close',
            'Volume': 'volume',
        },
        inplace=True,
    )
    return merged_df


# @retry(tries=3, delay=1, backoff=2)
def RetryingDataReader(*args, **kwargs):
    return DataReader(*args, **kwargs)


def csvdir_equities(tframes=None, csvdir=None, daily_from_minute=True):
    """
    Generate an ingest function for a custom data bundle.

    This function can be used in ~/.zipline/extension.py
    to register a bundle with custom parameters, e.g. with
    a custom trading calendar.

    Parameters
    ----------
    tframes : tuple, optional
        The data time frames. Supported timeframes: 'daily' and 'minute'.
    csvdir : string, optional, default: CSVDIR environment variable
        The path to a directory with this structure:
            <directory>/<timeframe1>/<symbol1>.csv
            <directory>/<timeframe1>/<symbol2>.csv
            <directory>/<timeframe1>/<symbol3>.csv
            <directory>/<timeframe2>/<symbol1>.csv
            <directory>/<timeframe2>/<symbol2>.csv
            <directory>/<timeframe2>/<symbol3>.csv
    daily_from_minute : bool, optional, default: True
        Derive the daily bars from the ingested minute data instead of
        reading them from a separate 'daily' directory.

    Returns
    -------
    ingest : callable
        The bundle ingest function

    Examples
    --------
    This code should be added to ~/.zipline/extension.py

    .. code-block:: python

        from zipline.data.bundles import csvdir_equities, register

        register('custom-csvdir-bundle',
                 csvdir_equities(["daily", "minute"],
                                 '/full/path/to/the/csvdir/directory'))
    """
    return CSVDIRBundle(tframes, csvdir, daily_from_minute).ingest


class CSVDIRBundle:
    """
    Wrapper class to call csvdir_bundle with the provided
    list of time frames and a path to the csvdir directory.
    """

    def __init__(self, tframes=None, csvdir=None, daily_from_minute=True):
        self.tframes = tframes
        self.csvdir = csvdir
        self.daily_from_minute = daily_from_minute

    def ingest(self,
               environ,
               asset_db_writer,
               minute_bar_writer,
               daily_bar_writer,
               adjustment_writer,
               calendar,
               start_session,
               end_session,
               cache,
               show_progress,
               output_dir):
        csvdir_bundle(environ,
                      asset_db_writer,
                      minute_bar_writer,
                      daily_bar_writer,
                      adjustment_writer,
                      calendar,
                      start_session,
                      end_session,
                      cache,
                      show_progress,
                      output_dir,
                      self.tframes,
                      self.csvdir,
                      self.daily_from_minute)


def csvdir_bundle(environ,
                  asset_db_writer,
                  minute_bar_writer,
                  daily_bar_writer,
                  adjustment_writer,
                  calendar,
                  start_session,
                  end_session,
                  cache,
                  show_progress,
                  output_dir,
                  tframes=None,
                  csvdir=None,
                  daily_from_minute=True):
    """
    Build a zipline data bundle from the directory with csv files.
    """
    if not csvdir:
        csvdir = environ.get('CSVDIR')
        if not csvdir:
            raise ValueError("CSVDIR environment variable is not set")

    if not os.path.isdir(csvdir):
        raise ValueError("%s is not a directory" % csvdir)

    if not tframes:
        tframes = set(["daily", "minute"]).intersection(os.listdir(csvdir))

        if not tframes:
            raise ValueError("'daily' and 'minute' directories "
                             "not found in '%s'" % csvdir)

    # Deriving daily bars from minute data only makes sense when minute data is
    # ingested and no separate 'daily' directory is ingested as well.
    assert not (daily_from_minute and 'daily' in tframes)
    assert not (daily_from_minute and 'minute' not in tframes)

    divs_splits = {'dividends': DataFrame(),
                   'splits': DataFrame()}
    for tframe in tframes:
        ddir = os.path.join(csvdir, tframe)

        symbols = sorted(get_all_symbols(ddir))

        if not symbols:
            raise ValueError("no ingestable CSV files found in %s" % ddir)

        dtype = [('start_date', 'datetime64[ns]'),
                 ('end_date', 'datetime64[ns]'),
                 ('auto_close_date', 'datetime64[ns]'),
                 ('symbol', 'object')]
        metadata = DataFrame(empty(len(symbols), dtype=dtype))

        if tframe == 'minute':
            writer = minute_bar_writer
        else:
            writer = daily_bar_writer

        writer.write(_pricing_iter(ddir, symbols, metadata,
                                   divs_splits, show_progress, calendar,
                                   daily_from_minute=False),
                     show_progress=show_progress)

        if daily_from_minute:
            logger.info("Deriving daily bars from minute data")
            daily_bar_writer.write(_pricing_iter(ddir, symbols, metadata,
                                                 divs_splits, show_progress, calendar,
                                                 daily_from_minute=True),
                                   show_progress=show_progress)

    # Hardcode the exchange to "CSVDIR" for all assets and (elsewhere)
    # register "CSVDIR" to resolve to the NYSE calendar, because these
    # are all equities and thus can use the NYSE calendar.
    metadata['exchange'] = "CSVDIR"

    asset_db_writer.write(equities=metadata)

    with requests.Session() as session:
        splits, dividends = download_splits_and_dividends(symbols, metadata, session)
        adjustment_writer.write(splits=splits, dividends=dividends)


def download_splits_and_dividends(symbols, metadata, session):
    adjustments = []
    for sid, symbol in enumerate(symbols):
        try:
            logger.debug("Downloading splits/dividends for %s" % symbol)
            df = RetryingDataReader(symbol,
                                    'yahoo-actions',
                                    metadata.iloc[sid].start_date,
                                    metadata.iloc[sid].end_date,
                                    session=session).sort_index()
        except RemoteDataError:
            logger.warning("No data returned from Yahoo for %s" % symbol)
            df = DataFrame(columns=['value', 'action'])

        df['sid'] = sid
        adjustments.append(df)
    adj_df = concat(adjustments)
    adj_df.index.name = 'date'
    adj_df.reset_index(inplace=True)

    splits_df = adj_df[adj_df['action'] == 'SPLIT']
    splits_df = splits_df.rename(
        columns={'value': 'ratio', 'date': 'effective_date'},
    )
    splits_df.drop('action', axis=1, inplace=True)
    splits_df.reset_index(inplace=True, drop=True)

    dividends_df = adj_df[adj_df['action'] == 'DIVIDEND']
    dividends_df = dividends_df.rename(
        columns={'value': 'amount', 'date': 'ex_date'},
    )
    dividends_df.drop('action', axis=1, inplace=True)

    # we do not have this data in the yahoo dataset
    dividends_df['record_date'] = NaT
    dividends_df['declared_date'] = NaT
    dividends_df['pay_date'] = NaT
    dividends_df.reset_index(inplace=True, drop=True)

    return splits_df, dividends_df


def _pricing_iter(csvdir, symbols, metadata, divs_splits, show_progress, calendar,
                  daily_from_minute=False):
    with maybe_show_progress(symbols, show_progress,
                             label='Loading custom pricing data: ') as it:
        for sid, symbol in enumerate(it):
            logger.debug('%s: sid %s' % (symbol, sid))

            dfr = load_csvs(csvdir, symbol)

            start_date = dfr.index[0]
            end_date = dfr.index[-1]

            # The auto_close date is the day after the last trade.
            ac_date = end_date + Timedelta(days=1)
            metadata.iloc[sid] = start_date, end_date, ac_date, symbol
            if 'split' in dfr.columns:
                tmp = 1. / dfr[dfr['split'] != 1.0]['split']
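                # Presumably the CSV 'split' column holds the split factor
                # (e.g. 2.0 for a hypothetical 2-for-1 split), so its inverse
                # (0.5) is the price ratio applied to pre-split bars.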
                split = DataFrame(data=tmp.index.tolist(),
                                  columns=['effective_date'])
                split['ratio'] = tmp.tolist()
                split['sid'] = sid

                splits = divs_splits['splits']
                index = Index(range(splits.shape[0],
                                    splits.shape[0] + split.shape[0]))
                split.set_index(index, inplace=True)
                divs_splits['splits'] = splits.append(split)

            if 'dividend' in dfr.columns:
                # ex_date   amount  sid record_date declared_date pay_date
                tmp = dfr[dfr['dividend'] != 0.0]['dividend']
                div = DataFrame(data=tmp.index.tolist(),
                                columns=['ex_date'])
                div['record_date'] = NaT
                div['declared_date'] = NaT
                div['pay_date'] = NaT
                div['amount'] = tmp.tolist()
                div['sid'] = sid

                divs = divs_splits['dividends']
                ind = Index(range(divs.shape[0],
                                  divs.shape[0] + div.shape[0]))
                div.set_index(ind, inplace=True)
                divs_splits['dividends'] = divs.append(div)
            if daily_from_minute:
                dfr_daily = minute_frame_to_session_frame(dfr, calendar)

                calendar_sessions = calendar.sessions_in_range(start_date, end_date)
                missing_dates = calendar_sessions.difference(dfr_daily.index)
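                # For example (hypothetical data): if the calendar has a session
                # on 2017-08-07 but the CSVs contain no bars for that day, a NaN
                # row is inserted for it below and later forward-filled with the
                # previous session's OHLCV by the fillna(method='pad') call.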
                for missing_date in missing_dates:
                    logger.warning('Forward filling data for %s: %s' % (symbol, str(missing_date)))
                    dfr_daily.loc[missing_date] = NaN

                # Need to sort as missing_date fillings are put at the end of the dataframe
                dfr_daily.sort_index(inplace=True)

                yield sid, dfr_daily.fillna(method='pad')
            else:
                yield sid, dfr


register_calendar_alias("CSVDIR", "NYSE")
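
# Usage sketch (assumptions: the module name `csvdir_iqfeed`, the bundle name
# 'custom-csvdir-bundle', and the csvdir path below are placeholders): with this
# file importable, the bundle could be registered in ~/.zipline/extension.py as
#
#     from zipline.data.bundles import register
#     from csvdir_iqfeed import csvdir_equities
#
#     register('custom-csvdir-bundle',
#              csvdir_equities(tframes=['minute'],
#                              csvdir='/full/path/to/the/csvdir/directory',
#                              daily_from_minute=True))
#
# and ingested with `zipline ingest -b custom-csvdir-bundle`. With the default
# daily_from_minute=True, only a 'minute' directory is ingested and the daily
# bars are derived from it.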