@tibkiss
Created September 20, 2017 05:05
Zipline IQFeed CSV Ingestor
import os
import glob

import numpy as np
import pandas as pd
import requests

from pandas_datareader.data import DataReader
from pandas_datareader._utils import RemoteDataError

from zipline.utils.calendars import register_calendar_alias
from zipline.data.resample import minute_frame_to_session_frame
from zipline.utils.cli import maybe_show_progress


def _cachpath(symbol, type_):
    return '-'.join((symbol.replace(os.path.sep, '_'), type_))


def load_csv(csv_dir, symbol, start, end):
    end = pd.to_datetime('today', utc=True) if not end else end

    # One file per symbol per calendar year: build the list of years to read.
    years = np.unique(pd.date_range(start, end, freq='D').to_period('A').to_timestamp('A').year)

    df = pd.DataFrame()
    for year in years:
        try:
            filename = '%s/HC-%s-1M-%s-iqfeed.csv.gz' % (csv_dir, symbol, year)
            new_df = pd.read_csv(filename, index_col='DateTime', parse_dates=True)
            df = df.append(new_df)
        except IOError:
            # Years with no file for this symbol are simply skipped.
            pass

    # Convert from US/Eastern to UTC naive
    df.index = df.index.tz_localize('US/Eastern').tz_convert(None)

    return df.sort_index()


def get_all_symbols(directory):
    files = glob.glob('%s/HC*iqfeed.csv.gz' % directory)
    symbols = {os.path.basename(e).replace('HC-', '', 1).split('-1M-')[0] for e in files}
    return symbols
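
# Input layout (inferred from load_csv/get_all_symbols above; not documented in
# the original gist): csv_dir is expected to hold one gzipped CSV per symbol per
# year, named HC-<SYMBOL>-1M-<YEAR>-iqfeed.csv.gz, with a DateTime column of
# US/Eastern minute timestamps and Open/High/Low/Close/Volume columns.
#
# For example (hypothetical path and symbol):
#
#     df = load_csv('/data/iqfeed', 'SPY', pd.Timestamp('2016-01-01'), None)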
def iqfeed_equities(csv_dir, symbols, start=None, end=None):
    """Create a data bundle ingest function from a set of symbols loaded from
    IQFeed CSV files.

    Parameters
    ----------
    csv_dir : str
        Directory holding the gzipped IQFeed CSV files.
    symbols : iterable[str]
        The ticker symbols to load data for.
    start : datetime, optional
        The start date to query for. By default this pulls the full history
        for the calendar.
    end : datetime, optional
        The end date to query for. By default this pulls the full history
        for the calendar.

    Returns
    -------
    ingest : callable
        The bundle ingest function for the given set of symbols.

    Notes
    -----
    The sids for each symbol will be the index into the symbols sequence.
    A sketch of how to register the resulting bundle is given at the bottom
    of this file.
    """
    # store this in memory so that we can reiterate over it
    symbols = tuple(symbols)
    def ingest(environ,
               asset_db_writer,
               minute_bar_writer,
               daily_bar_writer,
               adjustment_writer,
               calendar,
               start_session,
               end_session,
               cache,
               show_progress,
               output_dir,
               # bind the outer start/end as defaults so they can be rebound here
               start=start,
               end=end):
        if start is None:
            start = start_session
        if end is None:
            # end=None is handled by load_csv, which interprets it as 'today'
            end = None

        metadata = pd.DataFrame(np.empty(len(symbols), dtype=[
            ('start_date', 'datetime64[ns]'),
            ('end_date', 'datetime64[ns]'),
            ('auto_close_date', 'datetime64[ns]'),
            ('symbol', 'object'),
        ]))
        def _pricing_iter(daily=False):
            sid = 0
            with maybe_show_progress(
                    symbols,
                    show_progress,
                    label='Ingesting IQFEED pricing data: ') as it, \
                    requests.Session() as session:
                for symbol in it:
                    path = _cachpath(symbol, 'ohlcv')
                    try:
                        df = cache[path]
                    except KeyError:
                        print(symbol)
                        df = cache[path] = load_csv(csv_dir, symbol, start, end)

                    # the start date is the date of the first trade and
                    # the end date is the date of the last trade
                    start_date = df.index[0]
                    end_date = df.index[-1]

                    # The auto_close date is the day after the last trade.
                    ac_date = end_date + pd.Timedelta(days=1)
                    metadata.iloc[sid] = start_date, end_date, ac_date, symbol

                    df.rename(
                        columns={
                            'DateTime': 'date',
                            'Open': 'open',
                            'High': 'high',
                            'Low': 'low',
                            'Close': 'close',
                            'Volume': 'volume',
                        },
                        inplace=True,
                    )
                    # Scale volume down by 1000 (presumably to keep large
                    # volumes within the bar writer's storage range).
                    df['volume'] = df['volume'] / 1000

                    if daily:
                        daily_bars = minute_frame_to_session_frame(df, calendar)
                        calendar_sessions = calendar.sessions_in_range(start_date, end_date)
                        missing_dates = calendar_sessions.difference(daily_bars.index)
                        for missing_date in missing_dates:
                            print('Forward filling data for %s: %s' % (symbol, str(missing_date)))
                            daily_bars.loc[missing_date] = np.NaN

                        # Need to sort as missing_date fillings are put at the
                        # end of the dataframe
                        daily_bars.sort_index(inplace=True)

                        yield sid, daily_bars.fillna(method='pad')
                    else:
                        yield sid, df

                    sid += 1
        minute_bar_writer.write(_pricing_iter(False), show_progress=show_progress)
        daily_bar_writer.write(_pricing_iter(True), show_progress=show_progress)

        symbol_map = pd.Series(metadata.symbol.index, metadata.symbol)

        metadata['exchange'] = "IQFEED"
        asset_db_writer.write(equities=metadata)
        adjustments = []
        with maybe_show_progress(
                symbols,
                show_progress,
                label='Downloading Yahoo adjustment data: ') as it, \
                requests.Session() as session:
            for symbol in it:
                path = _cachpath(symbol, 'adjustment')
                sid = symbol_map[symbol]
                try:
                    df = cache[path]
                except KeyError:
                    try:
                        df = cache[path] = DataReader(
                            symbol,
                            'yahoo-actions',
                            metadata.loc[sid].start_date,
                            metadata.loc[sid].end_date,
                            session=session,
                        ).sort_index()
                    except RemoteDataError:
                        print("No data returned from Yahoo for %s" % symbol)
                        df = pd.DataFrame(columns=['value', 'action'])

                df['sid'] = sid
                adjustments.append(df)
        adj_df = pd.concat(adjustments)
        adj_df.index.name = 'date'
        adj_df.reset_index(inplace=True)

        splits = adj_df[adj_df.action == 'SPLIT']
        splits = splits.rename(
            columns={'value': 'ratio', 'date': 'effective_date'},
        )
        splits.drop('action', axis=1, inplace=True)

        dividends = adj_df[adj_df.action == 'DIVIDEND']
        dividends = dividends.rename(
            columns={'value': 'amount', 'date': 'ex_date'},
        )
        dividends.drop('action', axis=1, inplace=True)

        # we do not have this data in the yahoo dataset
        dividends['record_date'] = pd.NaT
        dividends['declared_date'] = pd.NaT
        dividends['pay_date'] = pd.NaT

        adjustment_writer.write(splits=splits, dividends=dividends)

    return ingest


register_calendar_alias("IQFEED", "NYSE")