Created September 20, 2017 05:05.
Save tibkiss/b3cd8264a444763bf1cf5b36a418fade to your computer and use it in GitHub Desktop.
Zipline IQFeed CSV Ingestor — a zipline data-bundle ingest function that reads 1-minute IQFeed bars from gzipped CSV files and downloads split/dividend adjustments from Yahoo.
import os | |
import glob | |
import numpy as np | |
import pandas as pd | |
import requests | |
from pandas_datareader.data import DataReader | |
from pandas_datareader._utils import RemoteDataError | |
from zipline.utils.calendars import register_calendar_alias | |
from zipline.data.resample import minute_frame_to_session_frame | |
from zipline.utils.cli import maybe_show_progress | |
def _cachpath(symbol, type_): | |
return '-'.join((symbol.replace(os.path.sep, '_'), type_)) | |
def load_csv(csv_dir, symbol, start, end):
    """Load 1-minute IQFeed OHLCV bars for *symbol* from yearly gzipped CSVs.

    Parameters
    ----------
    csv_dir : str
        Directory containing files named 'HC-<symbol>-1M-<year>-iqfeed.csv.gz'.
    symbol : str
        Ticker symbol to load.
    start, end : datetime-like
        Date range; one CSV file is read for every calendar year touched by
        [start, end]. A falsy *end* defaults to today (UTC).

    Returns
    -------
    pd.DataFrame
        Bars indexed by naive-UTC ``DateTime``, sorted ascending. Empty
        (with an empty DatetimeIndex) if no files were found.
    """
    if not end:
        end = pd.to_datetime('today', utc=True)
    # One file per calendar year in the requested range.
    years = np.unique(
        pd.date_range(start, end, freq='D').to_period('A').to_timestamp('A').year)
    frames = []
    for year in years:
        filename = '%s/HC-%s-1M-%s-iqfeed.csv.gz' % (csv_dir, symbol, year)
        try:
            frames.append(
                pd.read_csv(filename, index_col='DateTime', parse_dates=True))
        except IOError:
            # Best-effort: a missing year simply contributes no rows.
            pass
    if not frames:
        # No data for any year: return an empty, properly-indexed frame
        # instead of crashing on tz conversion below.
        return pd.DataFrame(index=pd.DatetimeIndex([]))
    # Concatenate once instead of the quadratic append-per-year pattern.
    df = pd.concat(frames)
    # The CSVs are stamped in US/Eastern; convert to naive UTC for zipline.
    df.index = df.index.tz_localize('US/Eastern').tz_convert(None)
    return df.sort_index()
def get_all_symbols(directory):
    """Return the set of ticker symbols that have IQFeed CSV files in *directory*."""
    pattern = '%s/HC*iqfeed.csv.gz' % directory
    symbols = set()
    for path in glob.glob(pattern):
        # 'HC-AAPL-1M-2017-iqfeed.csv.gz' -> 'AAPL'
        name = os.path.basename(path).replace('HC-', '', 1)
        symbols.add(name.split('-1M-')[0])
    return symbols
def iqfeed_equities(csv_dir, symbols, start=None, end=None):
    """Create a data bundle ingest function from a set of symbols loaded from
    iqfeed csv.

    Parameters
    ----------
    csv_dir : str
        Directory holding the 'HC-<symbol>-1M-<year>-iqfeed.csv.gz' files.
    symbols : iterable[str]
        The ticker symbols to load data for.
    start : datetime, optional
        The start date to query for. By default this pulls the full history
        for the calendar.
    end : datetime, optional
        The end date to query for. By default this pulls the full history
        for the calendar.

    Returns
    -------
    ingest : callable
        The bundle ingest function for the given set of symbols.

    Notes
    -----
    The sids for each symbol will be the index into the symbols sequence.
    """
    # Materialize in memory so that we can iterate over the symbols twice
    # (minute pass and daily pass).
    symbols = tuple(symbols)

    def ingest(environ,
               asset_db_writer,
               minute_bar_writer,
               daily_bar_writer,
               adjustment_writer,
               calendar,
               start_session,
               end_session,
               cache,
               show_progress,
               output_dir,
               # Bound as defaults so the closure captures the outer
               # start/end at definition time.
               start=start,
               end=end):
        if start is None:
            start = start_session

        # One row per sid; filled in by _pricing_iter as bars are read.
        metadata = pd.DataFrame(np.empty(len(symbols), dtype=[
            ('start_date', 'datetime64[ns]'),
            ('end_date', 'datetime64[ns]'),
            ('auto_close_date', 'datetime64[ns]'),
            ('symbol', 'object'),
        ]))

        def _pricing_iter(daily=False):
            """Yield (sid, bars): minute bars, or session bars when *daily*."""
            sid = 0
            with maybe_show_progress(
                    symbols,
                    show_progress,
                    label='Ingesting IQFEED pricing data: ') as it:
                for symbol in it:
                    path = _cachpath(symbol, 'ohlcv')
                    try:
                        df = cache[path]
                    except KeyError:
                        print(symbol)  # py3 call syntax (was a py2 print stmt)
                        df = cache[path] = load_csv(csv_dir, symbol, start, end)

                    # the start date is the date of the first trade and
                    # the end date is the date of the last trade
                    start_date = df.index[0]
                    end_date = df.index[-1]
                    # The auto_close date is the day after the last trade.
                    ac_date = end_date + pd.Timedelta(days=1)
                    metadata.iloc[sid] = start_date, end_date, ac_date, symbol

                    df.rename(
                        columns={
                            'DateTime': 'date',
                            'Open': 'open',
                            'High': 'high',
                            'Low': 'low',
                            'Close': 'close',
                            'Volume': 'volume',
                        },
                        inplace=True,
                    )
                    # NOTE(review): volume is scaled down by 1000 here —
                    # presumably to fit the writer's expected units; confirm.
                    df['volume'] = df['volume'] / 1000

                    if daily:
                        daily_bars = minute_frame_to_session_frame(df, calendar)
                        calendar_sessions = calendar.sessions_in_range(
                            start_date, end_date)
                        # Sessions with no trades get a NaN row which is then
                        # forward-filled, so every calendar session has a bar.
                        missing_dates = calendar_sessions.difference(
                            daily_bars.index)
                        for missing_date in missing_dates:
                            print('Forward filling data for %s: %s'
                                  % (symbol, str(missing_date)))
                            daily_bars.loc[missing_date] = np.nan
                        # Need to sort as missing_date fillings are put at the
                        # end of the dataframe
                        daily_bars.sort_index(inplace=True)
                        yield sid, daily_bars.ffill()
                    else:
                        yield sid, df

                    sid += 1

        minute_bar_writer.write(_pricing_iter(False),
                                show_progress=show_progress)
        daily_bar_writer.write(_pricing_iter(True),
                               show_progress=show_progress)

        # Map symbol -> sid; sid is the row position in ``metadata``.
        symbol_map = pd.Series(metadata.symbol.index, metadata.symbol)

        metadata['exchange'] = "IQFEED"
        asset_db_writer.write(equities=metadata)

        # Splits/dividends come from Yahoo over each asset's traded range.
        adjustments = []
        with maybe_show_progress(
                symbols,
                show_progress,
                label='Downloading Yahoo adjustment data: ') as it, \
                requests.Session() as session:
            for symbol in it:
                path = _cachpath(symbol, 'adjustment')
                sid = symbol_map[symbol]
                try:
                    df = cache[path]
                except KeyError:
                    try:
                        df = cache[path] = DataReader(
                            symbol,
                            'yahoo-actions',
                            # .loc replaces the removed .ix indexer.
                            metadata.loc[sid].start_date,
                            metadata.loc[sid].end_date,
                            session=session,
                        ).sort_index()
                    except RemoteDataError:
                        print("No data returned from Yahoo for %s" % symbol)
                        df = pd.DataFrame(columns=['value', 'action'])
                df['sid'] = sid
                adjustments.append(df)

        adj_df = pd.concat(adjustments)
        adj_df.index.name = 'date'
        adj_df.reset_index(inplace=True)

        splits = adj_df[adj_df.action == 'SPLIT']
        splits = splits.rename(
            columns={'value': 'ratio', 'date': 'effective_date'},
        )
        splits.drop('action', axis=1, inplace=True)

        dividends = adj_df[adj_df.action == 'DIVIDEND']
        dividends = dividends.rename(
            columns={'value': 'amount', 'date': 'ex_date'},
        )
        dividends.drop('action', axis=1, inplace=True)
        # we do not have this data in the yahoo dataset
        dividends['record_date'] = pd.NaT
        dividends['declared_date'] = pd.NaT
        dividends['pay_date'] = pd.NaT

        adjustment_writer.write(splits=splits, dividends=dividends)

    return ingest
# Trading sessions for the "IQFEED" exchange name used above follow the NYSE calendar.
register_calendar_alias("IQFEED", "NYSE")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment