Last active
October 17, 2017 17:52
-
-
Save niucool/9eccb46ebb467e099d3fb45a0b8d03d0 to your computer and use it in GitHub Desktop.
zipline-live
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import pandas as pd | |
#https://github.com/RomelTorres/alpha_vantage | |
from alpha_vantage.timeseries import TimeSeries | |
ALPHA_VANTAGE_KEY = 'XXXXXXXXXXXX' | |
################################## | |
# Alpha vantage version of data.history and data.current | |
def av_data_history(assets, fields, bar_count, frequency):
    """Alpha Vantage replacement for zipline's ``data.history``.

    Parameters
    ----------
    assets : iterable
        Assets exposing a ``.symbol`` attribute (zipline Asset objects).
    fields : str or list
        OHLCV column name(s) to return.
    bar_count : int
        Number of most-recent bars to keep per asset.
    frequency : str
        '1d' requests daily bars; anything else requests intraday bars.

    Returns
    -------
    pandas.DataFrame
        Bars indexed by timestamp; columns are (field, asset) after the
        swaplevel, so ``df[field]`` yields one column per asset.
    """
    ts = TimeSeries(key=ALPHA_VANTAGE_KEY, output_format='pandas')
    # 'compact' returns only the latest 100 data points; 'full' the
    # complete history, so anything beyond 100 bars needs 'full'.
    output_size = 'compact' if bar_count <= 100 else 'full'
    intraday = frequency != '1d'
    frames = []
    for asset in assets:
        if intraday:
            # NOTE(review): no `interval` is passed here, so the
            # alpha_vantage library default applies -- confirm that it
            # matches what callers mean by `frequency`.
            his_data, meta_data = ts.get_intraday(
                symbol=asset.symbol, outputsize=output_size)
        else:
            # maybe get_daily_adjusted is better
            his_data, meta_data = ts.get_daily(
                symbol=asset.symbol, outputsize=output_size)
        his_data = his_data[-bar_count:]
        his_data.index = pd.to_datetime(his_data.index)
        # Nest the asset as the outer column level so the per-asset
        # frames can be laid side by side.
        his_data.columns = pd.MultiIndex.from_product(
            [[asset, ], his_data.columns])
        frames.append(his_data)
    # Concatenate once instead of growing a DataFrame inside the loop;
    # repeated concat is quadratic in the number of assets.
    df = pd.concat(frames, axis=1)
    # Flip to (field, asset) ordering to mimic data.history's layout.
    df = df.swaplevel(0, 1, axis=1)
    return df[fields]
#TODO: only one asset and field is allowed, for a full version
#      we need to follow the definition of data.current:
#      https://www.quantopian.com/help#api-data-current
def av_data_current(asset, field):
    """Alpha Vantage replacement for zipline's ``data.current``.

    Fetches 1-minute intraday bars for *asset* and returns the most
    recent value of *field* (e.g. the latest close).

    Parameters
    ----------
    asset : object
        Asset exposing a ``.symbol`` attribute.
    field : str
        OHLCV column name to read.

    Returns
    -------
    The last value of the requested field (scalar).
    """
    ts = TimeSeries(key=ALPHA_VANTAGE_KEY, output_format='pandas')
    # 'compact' (latest 100 minutes) is plenty to read the last bar.
    output_size = 'compact'
    his_data, meta_data = ts.get_intraday(
        symbol=asset.symbol, interval='1min', outputsize=output_size)
    his_data.index = pd.to_datetime(his_data.index)
    return his_data[field].values[-1]
def data_history(data, assets, fields, bar_count, frequency):
    """Return historical bars, working in both backtest and live trading.

    In the 'IB' arena (live trading) zipline's data portal cannot serve
    history, so the request is routed to the Alpha Vantage helper;
    otherwise the normal ``data.history`` is used.
    """
    if get_environment('arena') != 'IB':
        return data.history(assets, fields, bar_count, frequency)
    return av_data_history(assets, fields, bar_count, frequency)
def data_current(data, asset, field):
    """Return the current value of *field*, in backtest or live trading.

    In the 'IB' arena (live trading) the price comes from Alpha Vantage;
    otherwise from zipline's ``data.current``. A NaN 'close' falls back
    to the synthetic 'price' field, which forward-fills the last trade.
    """
    if get_environment('arena') == 'IB':
        cur_price = av_data_current(asset, field)
    else:
        cur_price = data.current(asset, field)
    # pd.isnull replaces math.isnan: `math` was never imported in this
    # file (NameError on this path), and pandas is already imported.
    if pd.isnull(cur_price) and field == 'close':
        cur_price = data.current(asset, 'price')
    return cur_price
################################## |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Module for building a complete dataset from local directory with csv files. | |
""" | |
import os | |
import sys | |
import logbook | |
from numpy import empty | |
from pandas import DataFrame, read_csv, Index, Timedelta, NaT, concat | |
import requests | |
from pandas_datareader.data import DataReader | |
from pandas_datareader._utils import RemoteDataError | |
from zipline.utils.calendars import register_calendar_alias | |
from zipline.utils.cli import maybe_show_progress | |
logger = logbook.Logger(__name__) | |
def csvdir_equities(tframes=None, start=None, end=None):
    """
    Generate an ingest function for custom data bundle
    Parameters
    ----------
    tframes: list or tuple, optional
        The data time frames ('minute', 'daily'). Defaults to ['daily'].
    start : datetime, optional
        The start date to query for. By default this pulls the full history
        for the calendar.
    end : datetime, optional
        The end date to query for. By default this pulls the full history
        for the calendar.
    Returns
    -------
    ingest : callable
        The bundle ingest function for the given set of symbols.
    Examples
    --------
    This code should be added to ~/.zipline/extension.py
    .. code-block:: python
       from zipline.data.bundles import csvdir_equities, register
       register('custom-csvdir-bundle',
                csvdir_equities(['daily', 'minute']))
    Notes
    -----
    Environment variable CSVDIR must contain path to the directory with the
    following structure:
        daily/<symbol>.csv files
        minute/<symbol>.csv files
    for each symbol. The directory is read from the environment at
    ingest time, not passed to this function.
    """
    # Avoid a mutable default argument: a literal list default is shared
    # across every call and survives mutation between calls.
    if tframes is None:
        tframes = ['daily']
    return CSVDIRBundle(tframes, start, end).ingest
#@retry(tries=3, delay=1, backoff=2)
def RetryingDataReader(*args, **kwargs):
    """Thin pass-through to pandas_datareader's DataReader.

    Kept as a separate wrapper so a @retry decorator (commented out
    above) can be enabled without touching call sites.
    """
    return DataReader(*args, **kwargs)
def download_splits_and_dividends(symbols, metadata, session):
    """Download split and dividend actions for *symbols* from Yahoo.

    Parameters
    ----------
    symbols : sequence of str
        Ticker symbols; a symbol's position in the sequence is its sid.
    metadata : pandas.DataFrame
        Per-sid metadata whose ``start_date``/``end_date`` columns bound
        each query.
    session : requests.Session
        HTTP session handed through to pandas_datareader.

    Returns
    -------
    (splits_df, dividends_df) : tuple of pandas.DataFrame
        Frames shaped for zipline's adjustment writer: splits with
        (ratio, effective_date, sid); dividends with (amount, ex_date,
        sid, record_date, declared_date, pay_date).
    """
    adjustments = []
    for sid, symbol in enumerate(symbols):
        try:
            logger.debug("Downloading splits/dividends for %s" % symbol)
            # .iloc replaces the long-deprecated .ix indexer (removed in
            # pandas >= 1.0); sids are positional row numbers here.
            df = RetryingDataReader(symbol,
                                    'yahoo-actions',
                                    metadata.iloc[sid].start_date,
                                    metadata.iloc[sid].end_date,
                                    session=session).sort_index()
        except RemoteDataError:
            # Best effort: a symbol with no Yahoo data must not abort
            # the whole ingest; substitute an empty actions frame.
            logger.warning("No data returned from Yahoo for %s" % symbol)
            df = DataFrame(columns=['value', 'action'])
        df['sid'] = sid
        adjustments.append(df)
    adj_df = concat(adjustments)
    adj_df.index.name = 'date'
    adj_df.reset_index(inplace=True)

    # Splits: rename value -> ratio, date -> effective_date.
    # .copy() detaches the mask-selected rows from adj_df so the
    # in-place drop below does not mutate a view (SettingWithCopy).
    splits_df = adj_df[adj_df['action'] == 'SPLIT'].copy()
    splits_df = splits_df.rename(
        columns={'value': 'ratio', 'date': 'effective_date'},
    )
    splits_df.drop('action', axis=1, inplace=True)
    splits_df.reset_index(inplace=True, drop=True)

    # Dividends: rename value -> amount, date -> ex_date.
    dividends_df = adj_df[adj_df['action'] == 'DIVIDEND'].copy()
    dividends_df = dividends_df.rename(
        columns={'value': 'amount', 'date': 'ex_date'},
    )
    dividends_df.drop('action', axis=1, inplace=True)
    # we do not have this data in the yahoo dataset
    dividends_df['record_date'] = NaT
    dividends_df['declared_date'] = NaT
    dividends_df['pay_date'] = NaT
    dividends_df.reset_index(inplace=True, drop=True)
    return splits_df, dividends_df
class CSVDIRBundle:
    """
    Wrapper class to enable write access to self.splits and self.dividends
    from _pricing_iter method.
    """

    def __init__(self, tframes, start, end):
        # Time frames to ingest ('daily' and/or 'minute').
        self.tframes = tframes
        self.start = start
        self.end = end
        # Populated per-tframe during ingest().
        self.show_progress = None
        self.symbols = None
        self.metadata = None
        self.csvdir = None
        # Accumulated by _pricing_iter when CSVs carry their own
        # 'split'/'dividend' columns. NOTE(review): ingest currently
        # downloads adjustments from Yahoo instead of writing these.
        self.splits = None
        self.dividends = None

    def ingest(self, environ, asset_db_writer, minute_bar_writer,
               daily_bar_writer, adjustment_writer, calendar, start_session,
               end_session, cache, show_progress, output_dir):
        """Zipline bundle ingest entry point.

        Reads <CSVDIR>/<tframe>/<symbol>.csv files for each requested
        time frame, writes pricing data through the matching bar writer,
        writes asset metadata, then writes split/dividend adjustments
        downloaded from Yahoo.
        """
        csvdir = os.environ.get('CSVDIR')
        if not csvdir:
            logger.error("CSVDIR environment variable is not set")
            sys.exit(1)
        if not os.path.isdir(csvdir):
            logger.error("%s is not a directory" % csvdir)
            sys.exit(1)
        # Validate every tframe directory up front.
        for tframe in self.tframes:
            ddir = os.path.join(csvdir, tframe)
            if not os.path.isdir(ddir):
                logger.error("%s is not a directory" % ddir)
                # Abort, consistent with the CSVDIR checks above;
                # continuing would only crash on os.listdir below.
                sys.exit(1)
        for tframe in self.tframes:
            ddir = os.path.join(csvdir, tframe)
            # sid is the position of the symbol in this sorted list.
            self.symbols = sorted(item.split('.csv')[0]
                                  for item in os.listdir(ddir)
                                  if item.endswith('.csv'))
            if not self.symbols:
                logger.error("no <symbol>.csv files found in %s" % ddir)
                sys.exit(1)
            self.csvdir = ddir
            # Pre-sized metadata frame, filled row by row in
            # _pricing_iter.
            dtype = [('start_date', 'datetime64[ns]'),
                     ('end_date', 'datetime64[ns]'),
                     ('auto_close_date', 'datetime64[ns]'),
                     ('symbol', 'object')]
            self.metadata = DataFrame(empty(len(self.symbols), dtype=dtype))
            self.show_progress = show_progress
            if tframe == 'minute':
                writer = minute_bar_writer
            else:
                writer = daily_bar_writer
            writer.write(self._pricing_iter(), show_progress=show_progress)
        # Hardcode the exchange to "CSVDIR" for all assets and (elsewhere)
        # register "CSVDIR" to resolve to the NYSE calendar, because these
        # are all equities and thus can use the NYSE calendar.
        self.metadata['exchange'] = "CSVDIR"
        asset_db_writer.write(equities=self.metadata)
        # niucool: download splits and dividends from yahoo
        # code from: https://gist.github.com/tibkiss/d917ff389d479f32fff303f53893c3c5
        # adjustment_writer.write(splits=self.splits, dividends=self.dividends)
        with requests.Session() as session:
            splits, dividends = download_splits_and_dividends(
                self.symbols,
                self.metadata,
                session)
            adjustment_writer.write(splits=splits, dividends=dividends)

    def _pricing_iter(self):
        """Yield (sid, OHLCV DataFrame) per symbol.

        Side effects: records (start, end, auto-close, symbol) per sid
        in self.metadata, and accumulates self.splits / self.dividends
        when the CSVs contain 'split' / 'dividend' columns.
        """
        with maybe_show_progress(self.symbols, self.show_progress,
                                 label='Loading custom pricing data: ') as it:
            for sid, symbol in enumerate(it):
                logger.debug('%s: sid %s' % (symbol, sid))
                dfr = read_csv(os.path.join(self.csvdir, '%s.csv' % symbol),
                               parse_dates=[0], infer_datetime_format=True,
                               index_col=0).sort_index()
                # the start date is the date of the first trade and
                # the end date is the date of the last trade
                start_date = dfr.index[0]
                end_date = dfr.index[-1]
                # The auto_close date is the day after the last trade.
                ac_date = end_date + Timedelta(days=1)
                self.metadata.iloc[sid] = start_date, end_date, ac_date, symbol
                if 'split' in dfr.columns:
                    if self.splits is None:
                        self.splits = DataFrame()
                    # A ratio of 1.0 means "no split" -- skip those rows.
                    tmp = dfr[dfr['split'] != 1.0]['split']
                    split = DataFrame(data=tmp.index.tolist(),
                                      columns=['effective_date'])
                    split['ratio'] = tmp.tolist()
                    split['sid'] = sid
                    # Continue the running index so rows from successive
                    # symbols do not collide.
                    index = Index(range(self.splits.shape[0],
                                        self.splits.shape[0] + split.shape[0]))
                    split.set_index(index, inplace=True)
                    # concat replaces DataFrame.append (deprecated in
                    # pandas 1.4, removed in 2.0).
                    self.splits = concat([self.splits, split])
                if 'dividend' in dfr.columns:
                    if self.dividends is None:
                        self.dividends = DataFrame()
                    # ex_date amount sid record_date declared_date pay_date
                    # An amount of 0.0 means "no dividend" -- skip those.
                    tmp = dfr[dfr['dividend'] != 0.0]['dividend']
                    div = DataFrame(data=tmp.index.tolist(),
                                    columns=['ex_date'])
                    div['record_date'] = NaT
                    div['declared_date'] = NaT
                    div['pay_date'] = NaT
                    div['amount'] = tmp.tolist()
                    div['sid'] = sid
                    ind = Index(range(self.dividends.shape[0],
                                      self.dividends.shape[0] + div.shape[0]))
                    div.set_index(ind, inplace=True)
                    # (a dead duplicate `if self.dividends is None` check
                    # was removed here -- it is guaranteed non-None above)
                    self.dividends = concat([self.dividends, div])
                yield sid, dfr
register_calendar_alias("CSVDIR", "NYSE") |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment