Skip to content

Instantly share code, notes, and snippets.

@niucool
Last active October 17, 2017 17:52
Show Gist options
  • Save niucool/9eccb46ebb467e099d3fb45a0b8d03d0 to your computer and use it in GitHub Desktop.
Save niucool/9eccb46ebb467e099d3fb45a0b8d03d0 to your computer and use it in GitHub Desktop.
zipline-live
import pandas as pd
#https://github.com/RomelTorres/alpha_vantage
from alpha_vantage.timeseries import TimeSeries
# API key handed to every alpha_vantage TimeSeries client created below.
# NOTE(review): 'XXXXXXXXXXXX' looks like a placeholder -- replace it with a
# real key before running.
ALPHA_VANTAGE_KEY = 'XXXXXXXXXXXX'
##################################
# Alpha vantage version of data.history and data.current
def av_data_history(assets, fields, bar_count, frequency):
    """Alpha Vantage replacement for ``data.history``.

    Parameters
    ----------
    assets : iterable
        Objects exposing a ``symbol`` attribute; one request is made per
        asset.
    fields : str or list of str
        Selector applied to the first column level (the Alpha Vantage
        column names) of the combined frame.
    bar_count : int
        Number of trailing rows kept for each asset.
    frequency : str
        ``'1d'`` requests daily bars; any other value requests 1-minute
        intraday bars.

    Returns
    -------
    pandas.DataFrame
        ``df[fields]`` of a frame whose columns are a two-level
        (field, asset) MultiIndex. An empty frame is returned when
        ``assets`` is empty.
    """
    ts = TimeSeries(key=ALPHA_VANTAGE_KEY, output_format='pandas')
    # 'compact' is requested for up to 100 bars, 'full' otherwise.
    # NOTE(review): presumably 100 is the size of the compact payload --
    # confirm against the Alpha Vantage documentation.
    output_size = 'compact' if bar_count <= 100 else 'full'
    intraday = frequency != '1d'

    frames = []
    for asset in assets:
        if intraday:
            # Request 1-minute bars explicitly (as av_data_current does)
            # instead of relying on the library's default interval.
            his_data, _ = ts.get_intraday(
                symbol=asset.symbol, interval='1min',
                outputsize=output_size)
        else:
            # maybe get_daily_adjusted is better
            his_data, _ = ts.get_daily(
                symbol=asset.symbol, outputsize=output_size)
        his_data = his_data[-bar_count:]
        his_data.index = pd.to_datetime(his_data.index)
        # Tag every column with its asset so frames can sit side by side.
        his_data.columns = pd.MultiIndex.from_product(
            [[asset], his_data.columns])
        frames.append(his_data)

    if not frames:
        return pd.DataFrame()

    # Concatenate once instead of re-copying the growing frame per asset.
    df = pd.concat(frames, axis=1)
    # Put the field level first so ``df[fields]`` selects by field name.
    df = df.swaplevel(0, 1, axis=1)
    return df[fields]
# TODO: only one asset and one field are supported; for a full version
# we need to follow the definition of data.current:
# https://www.quantopian.com/help#api-data-current
def av_data_current(asset, field):
    """Alpha Vantage replacement for ``data.current`` (one asset, one field).

    Parameters
    ----------
    asset : object
        Must expose a ``symbol`` attribute.
    field : str
        Column of the returned intraday frame to read.

    Returns
    -------
    scalar
        The last value of ``field`` in the compact 1-minute intraday frame.
    """
    ts = TimeSeries(key=ALPHA_VANTAGE_KEY, output_format='pandas')
    # Only the final row is needed, so the compact payload is sufficient.
    his_data, _ = ts.get_intraday(
        symbol=asset.symbol, interval='1min', outputsize='compact')
    # NOTE(review): this takes the last row as delivered by the library and
    # assumes that is the most recent bar -- confirm the ordering.
    return his_data[field].values[-1]
def data_history(data, assets, fields, bar_count, frequency):
    """Return price history from the source matching the current arena.

    When ``get_environment('arena')`` is ``'IB'`` the history comes from
    ``av_data_history``; otherwise the call is delegated to
    ``data.history`` with the same arguments, so one algorithm can run in
    both backtest and live trading.
    """
    live = get_environment('arena') == 'IB'
    if not live:
        return data.history(assets, fields, bar_count, frequency)
    return av_data_history(assets, fields, bar_count, frequency)
def data_current(data, asset, field):
    """Return the current value of ``field`` for ``asset``.

    Uses ``av_data_current`` when ``get_environment('arena')`` is ``'IB'``
    and ``data.current`` otherwise, so the same algorithm works for both
    backtest and live trading. If the value is NaN and ``field`` is
    ``'close'``, ``data.current(asset, 'price')`` is returned instead.

    Parameters
    ----------
    data : object
        Must provide ``current(asset, field)``.
    asset : object
        Single asset to query.
    field : str
        Single field name to query.

    Returns
    -------
    scalar
        The current value for the requested field.
    """
    # Imported locally because the surrounding file never imports ``math``;
    # without this the NaN check below raises NameError.
    import math

    if get_environment('arena') == 'IB':
        cur_price = av_data_current(asset, field)
    else:
        cur_price = data.current(asset, field)
    if math.isnan(cur_price) and field == 'close':
        cur_price = data.current(asset, 'price')
    return cur_price
##################################
"""
Module for building a complete dataset from local directory with csv files.
"""
import os
import sys
import logbook
from numpy import empty
from pandas import DataFrame, read_csv, Index, Timedelta, NaT, concat
import requests
from pandas_datareader.data import DataReader
from pandas_datareader._utils import RemoteDataError
from zipline.utils.calendars import register_calendar_alias
from zipline.utils.cli import maybe_show_progress
# Module-level logbook logger, named after this module.
logger = logbook.Logger(__name__)
def csvdir_equities(tframes=None, start=None, end=None):
    """
    Generate an ingest function for custom data bundle

    Parameters
    ----------
    tframes : list or tuple, optional
        The data time frames ('minute', 'daily'). Defaults to ``['daily']``.
    start : datetime, optional
        The start date to query for. By default this pulls the full history
        for the calendar.
    end : datetime, optional
        The end date to query for. By default this pulls the full history
        for the calendar.

    Returns
    -------
    ingest : callable
        The bundle ingest function (``CSVDIRBundle.ingest`` bound to a new
        bundle instance).

    Examples
    --------
    This code should be added to ~/.zipline/extension.py

    .. code-block:: python

        from zipline.data.bundles import csvdir_equities, register
        register('custom-csvdir-bundle',
                 csvdir_equities(['daily', 'minute']))

    Notes
    -----
    Environment variable CSVDIR must contain path to the directory with the
    following structure:
        daily/<symbol>.csv files
        minute/<symbol>.csv files
    for each symbol.
    """
    # Build a fresh list per call rather than sharing one mutable default
    # object between every bundle created with the default argument.
    if tframes is None:
        tframes = ['daily']
    return CSVDIRBundle(tframes, start, end).ingest
#@retry(tries=3, delay=1, backoff=2)
def RetryingDataReader(*args, **kwargs):
    """Forward every argument to ``DataReader`` and return its result.

    The retry decorator on the line above is commented out, so despite the
    name no retry happens here.
    """
    result = DataReader(*args, **kwargs)
    return result
def download_splits_and_dividends(symbols, metadata, session):
    """Download split and dividend records for ``symbols`` from Yahoo.

    Parameters
    ----------
    symbols : sequence of str
        Ticker symbols; the position of a symbol is used as its ``sid``.
    metadata : pandas.DataFrame
        Row ``i`` must provide ``start_date`` and ``end_date`` for
        ``symbols[i]``.
    session : requests.Session
        Passed through to the data reader.

    Returns
    -------
    (splits_df, dividends_df) : tuple of pandas.DataFrame
        ``splits_df`` has columns ``effective_date``, ``ratio``, ``sid``.
        ``dividends_df`` has columns ``ex_date``, ``amount``, ``sid`` plus
        ``record_date``, ``declared_date`` and ``pay_date`` filled with NaT.
        Both are empty when ``symbols`` is empty.
    """
    adjustments = []
    for sid, symbol in enumerate(symbols):
        logger.debug("Downloading splits/dividends for %s" % symbol)
        # Positional lookup: sid is the enumerate() position of the symbol.
        row = metadata.iloc[sid]
        try:
            df = RetryingDataReader(symbol,
                                    'yahoo-actions',
                                    row.start_date,
                                    row.end_date,
                                    session=session).sort_index()
        except RemoteDataError:
            logger.warning("No data returned from Yahoo for %s" % symbol)
            df = DataFrame(columns=['value', 'action'])
        df['sid'] = sid
        adjustments.append(df)

    if adjustments:
        adj_df = concat(adjustments)
    else:
        # concat([]) raises; keep the expected schema for the empty case.
        adj_df = DataFrame(columns=['value', 'action', 'sid'])
    adj_df.index.name = 'date'
    adj_df = adj_df.reset_index()

    # rename/drop return new frames, so nothing below writes into a
    # filtered view of adj_df.
    splits_df = adj_df[adj_df['action'] == 'SPLIT'].rename(
        columns={'value': 'ratio', 'date': 'effective_date'},
    ).drop('action', axis=1).reset_index(drop=True)

    dividends_df = adj_df[adj_df['action'] == 'DIVIDEND'].rename(
        columns={'value': 'amount', 'date': 'ex_date'},
    ).drop('action', axis=1).reset_index(drop=True)
    # we do not have this data in the yahoo dataset
    dividends_df['record_date'] = NaT
    dividends_df['declared_date'] = NaT
    dividends_df['pay_date'] = NaT

    return splits_df, dividends_df
class CSVDIRBundle(object):
    """
    Wrapper class to enable write access to self.splits and self.dividends
    from _pricing_iter method.

    The instance also carries the symbols, metadata frame and source
    directory of the time frame currently being ingested.
    """

    def __init__(self, tframes, start, end):
        """Store the requested time frames and date bounds.

        Parameters
        ----------
        tframes : list or tuple
            Names of the sub-directories of CSVDIR to ingest.
        start, end : datetime or None
            Stored on the instance; not read by the methods of this class.
        """
        self.tframes = tframes
        self.start = start
        self.end = end

        # Populated per time frame by ingest() / _pricing_iter().
        self.show_progress = None
        self.symbols = None
        self.metadata = None
        self.csvdir = None
        self.splits = None
        self.dividends = None

    def ingest(self, environ, asset_db_writer, minute_bar_writer,
               daily_bar_writer, adjustment_writer, calendar, start_session,
               end_session, cache, show_progress, output_dir):
        """Ingest every configured time frame from the CSVDIR directory.

        The directory is read from the ``CSVDIR`` variable of ``os.environ``
        (the ``environ`` argument is not consulted). For each time frame the
        ``<symbol>.csv`` files are written through the minute writer
        (``'minute'``) or the daily writer (anything else), the asset
        metadata is written, and splits/dividends downloaded from Yahoo are
        written through ``adjustment_writer``.

        Exits the process with status 1 when CSVDIR is unset, is not a
        directory, lacks a time-frame sub-directory, or a time-frame
        directory holds no ``.csv`` files.
        """
        csvdir = os.environ.get('CSVDIR')
        if not csvdir:
            logger.error("CSVDIR environment variable is not set")
            sys.exit(1)

        if not os.path.isdir(csvdir):
            logger.error("%s is not a directory" % csvdir)
            sys.exit(1)

        # Report every missing time-frame directory, then stop: going on
        # would only fail later inside os.listdir().
        missing_dir = False
        for tframe in self.tframes:
            ddir = os.path.join(csvdir, tframe)
            if not os.path.isdir(ddir):
                logger.error("%s is not a directory" % ddir)
                missing_dir = True
        if missing_dir:
            sys.exit(1)

        for tframe in self.tframes:
            ddir = os.path.join(csvdir, tframe)

            # splitext removes only the trailing '.csv', so a symbol whose
            # name itself contains '.csv' is not truncated.
            self.symbols = sorted(os.path.splitext(item)[0]
                                  for item in os.listdir(ddir)
                                  if item.endswith('.csv'))
            if not self.symbols:
                logger.error("no <symbol>.csv files found in %s" % ddir)
                sys.exit(1)

            self.csvdir = ddir

            dtype = [('start_date', 'datetime64[ns]'),
                     ('end_date', 'datetime64[ns]'),
                     ('auto_close_date', 'datetime64[ns]'),
                     ('symbol', 'object')]
            # One uninitialised row per symbol; _pricing_iter fills them in.
            self.metadata = DataFrame(empty(len(self.symbols), dtype=dtype))

            self.show_progress = show_progress

            if tframe == 'minute':
                writer = minute_bar_writer
            else:
                writer = daily_bar_writer

            writer.write(self._pricing_iter(), show_progress=show_progress)

            # Hardcode the exchange to "CSVDIR" for all assets and (elsewhere)
            # register "CSVDIR" to resolve to the NYSE calendar, because these
            # are all equities and thus can use the NYSE calendar.
            self.metadata['exchange'] = "CSVDIR"
            asset_db_writer.write(equities=self.metadata)

            # niucool: download splits and dividends from yahoo
            # code from: https://gist.github.com/tibkiss/d917ff389d479f32fff303f53893c3c5
            # The splits/dividends parsed from the CSV files (self.splits,
            # self.dividends) are therefore not what gets written here.
            with requests.Session() as session:
                splits, dividends = download_splits_and_dividends(
                    self.symbols,
                    self.metadata,
                    session)
            adjustment_writer.write(splits=splits, dividends=dividends)

    def _pricing_iter(self):
        """Yield ``(sid, frame)`` for every symbol of the current time frame.

        While iterating, the symbol's row of ``self.metadata`` is filled and
        any ``split`` / ``dividend`` columns found in the CSV are
        accumulated into ``self.splits`` / ``self.dividends``.
        """
        with maybe_show_progress(self.symbols, self.show_progress,
                                 label='Loading custom pricing data: ') as it:
            for sid, symbol in enumerate(it):
                logger.debug('%s: sid %s' % (symbol, sid))

                dfr = read_csv(os.path.join(self.csvdir, '%s.csv' % symbol),
                               parse_dates=[0], infer_datetime_format=True,
                               index_col=0).sort_index()

                # the start date is the date of the first trade and
                # the end date is the date of the last trade
                start_date = dfr.index[0]
                end_date = dfr.index[-1]

                # The auto_close date is the day after the last trade.
                ac_date = end_date + Timedelta(days=1)
                self.metadata.iloc[sid] = start_date, end_date, ac_date, symbol

                if 'split' in dfr.columns:
                    self.splits = self._accumulate(
                        self.splits, self._split_rows(dfr, sid))

                if 'dividend' in dfr.columns:
                    self.dividends = self._accumulate(
                        self.dividends, self._dividend_rows(dfr, sid))

                yield sid, dfr

    @staticmethod
    def _split_rows(dfr, sid):
        """Build the split records (rows where ``split`` != 1.0) of ``dfr``."""
        ratios = dfr[dfr['split'] != 1.0]['split']
        split = DataFrame(data=ratios.index.tolist(),
                          columns=['effective_date'])
        split['ratio'] = ratios.tolist()
        split['sid'] = sid
        return split

    @staticmethod
    def _dividend_rows(dfr, sid):
        """Build the dividend records (rows where ``dividend`` != 0.0)."""
        # ex_date   amount  sid record_date declared_date pay_date
        amounts = dfr[dfr['dividend'] != 0.0]['dividend']
        div = DataFrame(data=amounts.index.tolist(), columns=['ex_date'])
        div['record_date'] = NaT
        div['declared_date'] = NaT
        div['pay_date'] = NaT
        div['amount'] = amounts.tolist()
        div['sid'] = sid
        return div

    @staticmethod
    def _accumulate(existing, new):
        """Append ``new`` below ``existing``, keeping a running 0..n-1 index."""
        if existing is None:
            return new
        # DataFrame.append is gone in current pandas; concat with
        # ignore_index reproduces the hand-built continuous index.
        return concat([existing, new], ignore_index=True)
# Resolve the "CSVDIR" exchange name (assigned to every asset by
# CSVDIRBundle.ingest) to the NYSE trading calendar.
register_calendar_alias("CSVDIR", "NYSE")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment