Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Stocks Dataset using AlphaVantage API
# Stocks Data Assembly and Preprocessing
# ------------------------------------------------------------------------------
import os
import csv
import time
from pathlib import Path
from functools import reduce
import pandas as pd
from alpha_vantage.timeseries import TimeSeries
from argparse_prompt import PromptParser
# Configuration Options
# ------------------------------------------------------------------------------
# Initialize the command-line options parser
parser = PromptParser()
# Because we’re using non-idempotent trailing raw data, partially downloading
# can potentially result in undefined behavior of the dataset. Skipping the
# download will only re-preprocess the dataset from existing raw data.
parser.add_argument('--skip_download', default=False, prompt=False, action='store_true',
help='Whether to skip downloading raw data and only use existing cached data'
)
# A clean run of the script will re-download and re-process everything from
# scratch and ignore any locally cached files.
parser.add_argument('--ignore_cache', default=False, prompt=False, action='store_true',
help='Whether to ignore cached data and download raw data from scratch'
)
# The Alpha Vantage API supports up to 5 requests per minute (and 500 requests
# per day). By default we pause between requests to avoid reaching the limit.
# See: https://www.alphavantage.co/premium/
parser.add_argument('--no_pause', default=False, prompt=False, action='store_true',
help='Whether to pause between API requests to avoid being rate limited'
)
# The number of historical months to use in the dataset, counting from the most
# recent month. The AlphaVantage API supports up to two years of trailing
# historical data.
parser.add_argument('--months', type=int, default=1, choices=range(1,24),
metavar='[1-24]', help='Number of historical months to use'
)
# The time interval between two consecutive data points in the time series.
parser.add_argument('--interval', type=str, default='15min',
choices=['1min', '5min', '15min', '30min', '60min'],
help='The tick interval between two consecutive data points'
)
# Whether to include data points from the pre-market and after-hours market.
parser.add_argument('--extended_hours', default=False, prompt=False, action='store_true',
help='Whether to include data on trading before or after the market is open'
)
# Parse known configuration options
options, unparsed = parser.parse_known_args()
# Settings
# ------------------------------------------------------------------------------
# Full path to where the data should be saved: the script’s directory.
data_path = Path(__file__).parent.resolve()
# Get the Alpha Vantage API key from an environment variable
alpha_vantage_api_key = os.environ.get('ALPHA_VANTAGE_API_KEY')
# Initialize a TimeSeries, passing the Alpha Vantage API key
ts = TimeSeries(key=alpha_vantage_api_key, output_format='csv')
# The stocks to download
stocks = [
'MSFT', 'F', 'VOO', 'IBM', 'BA',
'AMZN', 'AAPL', 'M', 'NFLX', 'INTC',
'DIS', 'STNE', 'PTON', 'JNJ', 'UAA',
'CMG', 'BABA', 'ADSK', 'CPB', 'CVX'
# Not included:
# 'AVGO', 'BRK.B'
# These stocks are too volatile or extreme and can skew results too much:
# 'GME', 'AMC', 'ETSY', 'MRNA', 'TSLA', 'AMD', 'SHOP',
# For some reason or another, these stocks do not have enough intraday tick
# entries. Consequently, merging with these will drastically reduce the
# amount of rows available in the final dataset.
# 'WW', 'WIX', 'QDEL', 'MDB',
]
# The time period slices to download
time_slices = [
'year1month1', 'year1month2', 'year1month3', 'year1month4',
'year1month5', 'year1month6', 'year1month7', 'year1month8',
'year1month9', 'year1month10', 'year1month11', 'year1month12',
'year2month1', 'year2month2', 'year2month3', 'year2month4',
'year2month5', 'year2month6', 'year2month7', 'year2month8',
'year2month9', 'year2month10', 'year2month11', 'year2month12'
][:options.months]
# Helpers
# ------------------------------------------------------------------------------
def timestamp() -> str:
"""Returns the current timestamp in the format of '14:25:32'"""
return time.strftime('%H:%M:%S')
def log(text) -> None:
"""Prints the given text to stdout with a timestamp"""
print(f'{timestamp()} {text}')
# Data Download
# ------------------------------------------------------------------------------
def download_data() -> None:
"""Based on the config options and settings, download the raw dataset."""
# Only download the data if requested.
if options.skip_download:
log('Skipping download. Starting preprocessing.')
return
# Iterate over each of the stocks
for stock in stocks:
# Iterate over each of the time slices
for time_slice in time_slices:
# Download a time slice of the stock (returns a boolean indicating
# whether the data was freshly downloaded or was it already cached)
if download_slice(stock, time_slice):
# If we want to pause to avoid being rate limited, sleep for a while
if not options.no_pause:
log('Pausing to avoid rate limit...')
time.sleep(12.5)
# Report that the download was completed successfully.
log('Download completed. Starting preprocessing.')
log('---------------------------------------------')
def download_slice(stock: str, time_slice: str) -> None:
"""
Download a `time_slice` of the given `stock` to a local CSV file.
Args:
stock: The ticker symbol of the stock to download.
time_slice: The time slice to download.
Returns:
A boolean noting whether the data was newly-downloaded or already cached.
"""
# Full file name that we want to save the data to
file_name = f'{data_path}/raw/{stock}-{time_slice}-{options.interval}.csv'
# If the file already exists, and this is not a clean run of the script,
# skip downloading this time slice of the stock.
if os.path.isfile(file_name) and not options.ignore_cache:
log(f'Skipping download of slice {time_slice} of {stock} stock.')
return False
# Log what time slice of which stock we’re downloading
log(f'Downloading slice {time_slice} of {stock} stock...')
# Send a request to Alpha Vantage to get the data as a CSV
data, metadata = ts.get_intraday_extended(
symbol=stock, interval=options.interval, slice=time_slice
)
# Log the name of the CSV file the data will be saved into
log(f'Saving slice {time_slice} of {stock} stock to {file_name}')
# Open the CSV file and write the data into it row by row
with open(file_name, mode='w') as csv_file:
csv_writer = csv.writer(csv_file)
for row in data: csv_writer.writerow(row)
# Indicates we’ve downloaded a fresh copy of the data
return True
# Dataset Preprocessing
# ------------------------------------------------------------------------------
def normalize_stock_slice(stock: str, time_slice: str) -> pd.DataFrame:
"""Normalizes a stock time slice:
- Reads the stock time slice data from a csv to a pandas data frame.
- Removes any pre- or post-market trading ticks, if necessary.
- Adds a typical price column, an average of the high, low, and close columns.
- Converts the time slice timestamp to epoch time in seconds.
- Renames columns to include the stock ticker name (e.g. 'open' -> 'X open').
Args:
stock: The stock ticker symbol
time_slice: The time slice to normalize
Returns:
A pandas dataframe of the normalized stock ticks
"""
df = pd.read_csv(f'{data_path}/raw/{stock}-{time_slice}-{options.interval}.csv')
# Convert the 'time' column type to a datetime
df['time'] = pd.to_datetime(df['time'])
# If required, remove pre-market and after-hours trading ticks (anything
# before 09:31:00 or after 16:00:00).
# See: https://stackoverflow.com/q/42213578/
if not options.extended_hours:
df = df[df.time.dt.strftime('%H:%M:%S').between('09:31:00','16:00:00')]
# Add a column for the typical price of the stock
# See: https://en.wikipedia.org/wiki/Typical_price
df['middle'] = (df['close'] + df['high'] + df['low']) / 3.0
# Convert the `time` column to epoch time in seconds
# See: https://stackoverflow.com/a/63948244/119959
df['time'] = df['time'].apply(lambda x: int(x.timestamp()))
# Rename columns to include the stock ticker
df.rename(inplace=True, columns= {
'open' : f'{stock} open',
'high' : f'{stock} high',
'low' : f'{stock} low',
'close' : f'{stock} close',
'volume': f'{stock} volume',
'middle': f'{stock} middle'
})
# Return the normalized data frame
return df
def normalize_stock(stock: str) -> pd.DataFrame:
"""Normalizes a stock dataset:
- Normalizes all the stock’s time slices
- Add together all of the stock time slices
- Reverses the order of the data frame to start from the earliest tick
- Adds further enriched columns of stock statistics:
- The simple moving average of the stock close price using several windows
- The standard deviation of the stock close price using several windows
Args:
stock: The stock ticker name
Returns:
A pandas dataframe with the normalized stock data.
"""
log(f'Normalizing {stock} time slices...')
# Normalize all the stock slices
slice_dfs = [normalize_stock_slice(stock, slice) for slice in time_slices]
# And add them together to one dataframe
df = reduce(lambda df1, df2: df1.append(df2), slice_dfs)
# Reverse the dataframe (earlier ticks should be listed first). Note that
# for some reason this give s a `SettingWithCopyWarning` when there’s only
# one time slice per stock.
df = df.iloc[::-1]
# Add several indicators of different window sizes
for window in [3, 10, 20, 50, 100]:
# Add a simple moving average indicator
df.loc[:,f'{stock} SMA {window}'] = sma(df[f'{stock} close'], window)
# Add a standard deviation indicator
df.loc[:,f'{stock} STD {window}'] = std(df[f'{stock} close'], window)
# Return the normalized data frame
return df
def sma(column: pd.Series, window: int) -> pd.Series:
"""Returns the simple moving average of the `column` using the `window`."""
return column.rolling(window=window, min_periods=1).mean()
def std(column: pd.Series, window: int) -> pd.Series:
"""Returns the standard deviation of the `column` using the `window`."""
# @TODO Since we currently heavily rely on percentage change in the
# experiment, zeros are forbidden as values, to avoid a division by
# zero. So, if the standard deviation is zero (which it will be only
# in the first row), we change its value to 0.1.
return (column.rolling(window=window, min_periods=1)
.std(ddof=0)
.apply(lambda x: x or 0.1))
# Main Entry Point
# ------------------------------------------------------------------------------
if __name__ == '__main__':
# Download the data
download_data()
# Normalize each of the stocks
stock_dfs = [normalize_stock(stock) for stock in stocks]
# Log the number of entries we get per stock
for i, d in enumerate(stock_dfs): log(f'{stocks[i]}: {len(d.index)} rows')
# Then merge them together based on the time column
df = reduce(lambda df1, df2: pd.merge(df1, df2, on='time'), stock_dfs)
# Normalize the time so that the first data point will be at point 10,000
# and the rest offset by their difference in seconds. We start at such a
# high number (rather than zero or one) so that the log difference in the
# first few frames will not be that larger than the difference that follows.
df.loc[:, 'time'] -= df.loc[0, 'time'] - 10000
# Log some information about the dataset
print('-' * 80)
df.info()
print('-' * 80)
print(df)
print('-' * 80)
# Save the dataframe to a dataset CSV file, without the index
df.to_csv(f'{data_path}/dataset-{options.interval}.csv',index=False)
@Radagaisus
Copy link
Author

Radagaisus commented Aug 18, 2021

  • To generate the dataset run python data/stocks/generate.py --help.

  • The AlphaVantage API key must be set as the ALPHA_VANTAGE_API_KEY environment variable.

  • The list of stocks that are used to generate the dataset is hardcoded in the script. Note that for some stocks a lot of data points are missing. Consequently, since we’re using an inner join when merging the data, a lot of rows can potentially be dropped if a stock with sparse data is added to the list.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment