Skip to content

Instantly share code, notes, and snippets.

@duncangh
Forked from elibroftw/investing.py
Created April 1, 2023 05:00
Show Gist options
  • Save duncangh/93d068510c557704f8c8f3d47053f8e1 to your computer and use it in GitHub Desktop.
Save duncangh/93d068510c557704f8c8f3d47053f8e1 to your computer and use it in GitHub Desktop.
A Python script and tool for speeding up stock market research.
"""
Investing Quick Analytics
Author: Elijah Lopez
Version: 1.49
Created: April 3rd 2020
Updated: March 16th 2021
https://gist.github.com/elibroftw/2c374e9f58229d7cea1c14c6c4194d27
Resources:
Black-Scholes variables:
https://aaronschlegel.me/black-scholes-formula-python.html#Dividend-Paying-Black-Scholes-Formula
Black-Scholes formulas:
https://quantpie.co.uk/bsm_formula/bs_summary.php
Volatility (Standard Deviation) of a stock:
https://tinytrader.io/how-to-calculate-historical-price-volatility-with-python/
Concurrent Futures:
https://docs.python.org/3/library/concurrent.futures.html
"""
from contextlib import suppress
import csv
import concurrent.futures
from datetime import datetime, timedelta
import math
from statistics import NormalDist, median, StatisticsError
# noinspection PyUnresolvedReferences
from pprint import pprint
from typing import Iterator
# 3rd party libraries
from bs4 import BeautifulSoup
from fuzzywuzzy import process
import random
import requests
import json
import yfinance as yf
from enum import IntEnum
import numpy as np
from pytz import timezone
from functools import lru_cache, wraps, cmp_to_key
import time
import re
import feedparser
def time_cache(max_age, maxsize=128, typed=False):
    """LRU cache decorator whose entries are invalidated over time.

    The wrapped function is memoised with ``functools.lru_cache``. A hidden
    keyword argument carrying ``int(time.time() / max_age)`` is added to every
    call, so the cache key changes each time the clock moves into a new
    ``max_age``-second bucket and the function is evaluated again.

    Args:
        max_age: Width of a time bucket (in seconds).
        maxsize: Maximum cache size (see `functools.lru_cache`).
        typed: Cache on distinct input types (see `functools.lru_cache`).
    """
    def _decorator(fn):
        @lru_cache(maxsize=maxsize, typed=typed)
        def _salted(*args, __time_salt, **kwargs):
            # __time_salt only takes part in the cache key; fn never sees it
            return fn(*args, **kwargs)

        @wraps(fn)
        def _entry_point(*args, **kwargs):
            bucket = int(time.time() / max_age)
            return _salted(*args, **kwargs, __time_salt=bucket)

        return _entry_point

    return _decorator
def timing(fn):
    """Decorator that prints how long each call to ``fn`` took (wall clock, seconds)."""
    @wraps(fn)
    def timed(*args, **kwargs):
        began = time.time()
        outcome = fn(*args, **kwargs)
        elapsed = time.time() - began
        print(f'@timing {fn.__name__} ELAPSED TIME:', elapsed)
        return outcome

    return timed
# --- Ticker-listing endpoints (one per exchange / market) ---
NASDAQ_TICKERS_URL = 'https://api.nasdaq.com/api/screener/stocks?exchange=nasdaq&download=true'
# NOTE: the name ends in "URK" (sic); it is referenced under this exact name by get_otc_tickers.
OTC_TICKERS_URK = 'https://www.otcmarkets.com/research/stock-screener/api?securityType=Common%20Stock&market=20,21,22,10,6,5,2,1&sortField=symbol&pageSize=100000'
# NYSE_TICKERS_URL = 'https://api.nasdaq.com/api/screener/stocks?exchange=nyse&download=true'
# Queried with a POST JSON payload (see get_nyse_tickers).
NYSE_TICKERS_URL = 'https://www.nyse.com/api/quotes/filter'
NYSE_URL = 'https://www.nyse.com'
AMEX_TICKERS_URL = 'https://api.nasdaq.com/api/screener/stocks?exchange=amex&download=true'
TSX_TICKERS_URL = 'https://www.tsx.com/json/company-directory/search/tsx/^*'
PREMARKET_FUTURES_URL = 'https://ca.investing.com/indices/indices-futures'
# --- Index constituent sources ---
DOW_URL = 'https://en.wikipedia.org/wiki/Dow_Jones_Industrial_Average'
SP500_URL = 'http://en.wikipedia.org/wiki/List_of_S%26P_500_companies'
# Holdings of the Vanguard VTWO fund, used by get_russel_2k_tickers.
RUT_2K_URL ='https://api.vanguard.com/rs/ire/01/ind/fund/VTWO/portfolio-holding/stock.json'
TIP_RANKS_API = 'https://www.tipranks.com/api/stocks/'
# Plain-text ticker -> CIK listing parsed by get_cik_mapping.
CIK_LIST_URL = 'https://www.sec.gov/include/ticker.txt'
# Sorted 'day' results of get_parsed_data, reused once it's past 4 PM.
# Layout: {of: {date string: {comma-joined tickers: sorted info}}}
SORTED_INFO_CACHE = {} # for when it's past 4 PM
# Default headers sent by make_request when the caller supplies none.
GENERIC_HEADERS = {
'accept': 'text/html,application/xhtml+xml,application/json',
'user-agent': 'Mozilla/5.0'
}
# NOTE: something for later https://www.alphavantage.co/
# noinspection PyShadowingNames
def make_request(url, method='GET', headers=None, json=None, data=None):
    """Send a GET or POST request, defaulting to the module-wide headers.

    Args:
        url: Address to request.
        method: Either 'GET' or 'POST'.
        headers: Request headers; GENERIC_HEADERS is used when None.
        json: JSON-serialisable body, sent with POST requests only.
        data: Form/raw body, sent with POST requests only.

    Returns:
        The response object produced by ``requests``.

    Raises:
        ValueError: if ``method`` is neither 'GET' nor 'POST'.
    """
    if headers is None:
        headers = GENERIC_HEADERS
    if method == 'GET':
        return requests.get(url, headers=headers)
    if method == 'POST':
        # forward the caller's ``data`` as well as ``json``
        return requests.post(url, json=json, headers=headers, data=data)
    raise ValueError(f'Invalid method {method}')
@time_cache(24 * 3600, maxsize=1)
def get_dow_tickers() -> dict:
    """Scrape the 'constituents' table of the page at DOW_URL.

    Rows that do not have a second ``<td>`` cell (e.g. header rows) are skipped.

    Returns:
        {ticker: {'symbol': ticker, 'name': company name}}
    """
    page = make_request(DOW_URL)
    document = BeautifulSoup(page.text, 'html.parser')
    # noinspection PyUnresolvedReferences
    body = document.find('table', {'id': 'constituents'}).find('tbody')
    constituents = {}
    for tr in body.find_all('tr'):
        try:
            # the cell may read 'EXCHANGE: SYMBOL'; keep what follows the last colon
            symbol = tr.find_all('td')[1].text.split(':')[-1].strip()
        except IndexError:
            continue
        company = tr.find('th').text.strip()
        constituents[symbol] = {'symbol': symbol, 'name': company}
    return constituents
@time_cache(24 * 3600, maxsize=1)
def get_sp500_tickers() -> dict:
    """Scrape the 'constituents' table of the page at SP500_URL.

    Tickers containing a period are left out. Rows with fewer than two
    ``<td>`` cells are skipped instead of raising IndexError.

    Returns:
        {ticker: {'symbol': ticker, 'name': company name}}
    """
    resp = make_request(SP500_URL)
    soup = BeautifulSoup(resp.text, 'html.parser')
    table = soup.find('table', {'id': 'constituents'})
    tickers = {}
    # noinspection PyUnresolvedReferences
    # find_all is the current spelling of the deprecated findAll alias
    for row in table.find_all('tr')[1:]:  # [1:] skips the header row
        tds = row.find_all('td')
        if len(tds) < 2:
            continue
        ticker = tds[0].text.strip()
        if '.' not in ticker:
            name = tds[1].text.strip()
            tickers[ticker] = {'symbol': ticker, 'name': name}
    return tickers
@time_cache(24 * 3600, maxsize=1)
def get_russel_2k_tickers() -> dict:
    """
    Instead of calculating the russel 2k every time, read the holdings
    published at RUT_2K_URL (cached for 24 hours).

    Returns:
        {ticker: {'symbol': ticker, 'name': long name}}
    """
    response = make_request(RUT_2K_URL, headers={'Referer': RUT_2K_URL})
    holdings = response.json()['fund']['entity']
    # no filtering is applied; every holding is kept
    # if asset_class == 'Equity' and ticker != '-' and not bool(re.search(r'\d', ticker)):
    return {
        holding['ticker']: {
            'symbol': holding['ticker'],
            'name': holding['longName']
        }
        for holding in holdings
    }
def clean_ticker(ticker):
    """Return ``ticker`` upper-cased with everything but ASCII letters and periods removed."""
    letters_and_periods = re.sub(r'[^a-zA-Z.]', '', ticker)
    return letters_and_periods.upper()
def clean_name(name: str):
    """Drop every 'Common Stock' occurrence from ``name`` and trim surrounding whitespace."""
    without_suffix = name.replace('Common Stock', '')
    return without_suffix.strip()
def clean_stock_info(info):
    """Clean ``info['name']`` in place with clean_name and return the same dict."""
    info.update(name=clean_name(info['name']))
    return info
@time_cache(24 * 3600, maxsize=1)
def get_bats_tickers() -> dict:
    """Return {symbol: stock info} built from the NASDAQ_TICKERS_URL screener.

    NOTE(review): despite the function name, this requests NASDAQ_TICKERS_URL
    and tags every row with exchange 'NASDAQ', exactly like
    get_nasdaq_tickers - confirm whether a BATS source was intended.
    """
    rows = make_request(NASDAQ_TICKERS_URL).json()['data']['rows']
    listed = {}
    for row in rows:
        key = row['symbol'].strip()
        cleaned = clean_stock_info(row)
        listed[key] = dict(cleaned, exchange='NASDAQ')
    return listed
@time_cache(24 * 3600, maxsize=1)
def get_nasdaq_tickers() -> dict:
    """Return {symbol: stock info} for the rows served by NASDAQ_TICKERS_URL.

    Each value is the screener row (name cleaned by clean_stock_info) plus
    an 'exchange' entry set to 'NASDAQ'.
    """
    screener_rows = make_request(NASDAQ_TICKERS_URL).json()['data']['rows']
    return {
        row['symbol'].strip(): {**clean_stock_info(row), 'exchange': 'NASDAQ'}
        for row in screener_rows
    }
@time_cache(24 * 3600, maxsize=1)
def get_nyse_tickers() -> dict:
    """Return {symbol: stock info} for equities served by NYSE_TICKERS_URL.

    Each row of the response gets 'symbol' (from 'symbolTicker', stripped)
    and 'name' (from 'instrumentName', cleaned by clean_stock_info) added,
    and the resulting dict is tagged with exchange 'NYSE'.

    The response is no longer dumped to 'test2.json' in the working
    directory; that was a debugging artifact.
    """
    payload = {"instrumentType": "EQUITY", "pageNumber": 1, "sortColumn": "NORMALIZED_TICKER", "sortOrder": "ASC",
               "maxResultsPerPage": 10000, "filterToken": ""}
    r = make_request(NYSE_TICKERS_URL, method='POST', json=payload).json()
    tickers = {}
    for stock in r:
        symbol = stock['symbol'] = stock['symbolTicker'].strip()
        stock['name'] = stock['instrumentName']
        tickers[symbol] = {**clean_stock_info(stock), 'exchange': 'NYSE'}
    return tickers
@time_cache(24 * 3600, maxsize=1)
def get_amex_tickers() -> dict:
    """Return {symbol: stock info} for the rows served by AMEX_TICKERS_URL.

    Each value is the screener row (name cleaned by clean_stock_info) plus
    an 'exchange' entry set to 'AMEX'.
    """
    response = make_request(AMEX_TICKERS_URL)
    by_symbol = {}
    for entry in response.json()['data']['rows']:
        stripped = entry['symbol'].strip()
        by_symbol[stripped] = dict(clean_stock_info(entry), exchange='AMEX')
    return by_symbol
@time_cache(24 * 3600, maxsize=1)
def get_tsx_tickers() -> dict:
    """Return {ticker: {'symbol', 'name', 'exchange'}} from TSX_TICKERS_URL.

    Every symbol gets a '.TO' suffix; 'Common Stock' is removed from names.
    """
    listings = make_request(TSX_TICKERS_URL).json()['results']
    companies = {}
    for listing in listings:
        suffixed = listing['symbol'].strip() + '.TO'
        companies[suffixed] = {
            'symbol': suffixed,
            'name': listing['name'].replace('Common Stock', '').strip(),
            'exchange': 'TSX',
        }
    return companies
@time_cache(24 * 3600, maxsize=1)
def get_nyse_arca_tickers() -> dict:
    """Return {symbol: {'symbol', 'name', 'exchange'}} for exchange traded funds.

    Posts an EXCHANGE_TRADED_FUND filter (up to 5000 results) to the NYSE
    quotes endpoint and tags every instrument with exchange 'NYSEARCA'.
    """
    query = {
        'instrumentType': 'EXCHANGE_TRADED_FUND',
        'pageNumber': 1,
        'sortColumn': 'NORMALIZED_TICKER',
        'sortOrder': 'ASC',
        'maxResultsPerPage': 5000,
        'filterToken': '',
    }
    instruments = requests.post('https://www.nyse.com/api/quotes/filter', json=query).json()
    funds = {}
    for instrument in instruments:
        code = instrument['symbolTicker'].strip()
        funds[code] = {'symbol': code, 'name': instrument['instrumentName'], 'exchange': 'NYSEARCA'}
    return funds
@time_cache(24 * 3600, maxsize=1)
def get_otc_tickers() -> dict:
    """Return {symbol: {'symbol', 'name', 'exchange'}} from OTC_TICKERS_URK.

    The response text is unwrapped (surrounding quotes stripped, escaped
    quotes restored) before it is parsed as JSON. The stored 'symbol' is
    the same stripped value that is used as the dict key.
    """
    raw = make_request(OTC_TICKERS_URK).text.strip('"').replace('\\"', '"')
    stocks = json.loads(raw)['stocks']
    tickers = {}
    for stock in stocks:
        symbol = stock['symbol'].strip()
        tickers[symbol] = {'symbol': symbol, 'name': stock['securityName'], 'exchange': 'OTC'}
    return tickers
# can cache this since info rarely changes
@time_cache(24 * 3600, maxsize=100)
def get_tickers(category) -> dict:
    """
    Return {ticker: info} for an index, exchange group or industry basket.

    OPTIONS (case-insensitive):
        Indexes: SP500 (S&P500, S&P 500), DOW (DJIA)
        Exchanges: NASDAQ (NDAQ), NYSE, AMEX, ARCA (NYSEARCA), TSX (TMX, CA),
            OTC (OTCMKTS), US, ALL
        Industries: DEFENCE, MREITS (MORTGAGE REITS), OIL (OIL & GAS, O&G),
            CARS (CAR, AUTO, AUTOMOBILE), TANKERS, UTILS (UTILITIES)
    An unrecognised category yields an empty dict.
    """
    category = category.upper()
    # (aliases, loader): every loader whose aliases match contributes, in this order
    listings = (
        ({'S&P500', 'S&P 500', 'SP500'}, get_sp500_tickers),
        ({'DOW', 'DJIA'}, get_dow_tickers),
        ({'NASDAQ', 'NDAQ', 'US', 'ALL'}, get_nasdaq_tickers),
        ({'NYSE', 'US', 'ALL'}, get_nyse_tickers),
        ({'AMEX', 'US', 'ALL'}, get_amex_tickers),
        ({'ARCA', 'NYSEARCA', 'US', 'ALL'}, get_nyse_arca_tickers),
        ({'TSX', 'TMX', 'CA', 'ALL'}, get_tsx_tickers),
        ({'OTC', 'OTCMKTS', 'ALL'}, get_otc_tickers),
    )
    tickers = dict()
    for aliases, loader in listings:
        if category in aliases:
            tickers.update(loader())

    # (aliases, member symbols, universe the members are looked up in)
    baskets = (
        ({'DEFENCE'},
         {'LMT', 'BA', 'NOC', 'GD', 'RTX', 'LDOS'},
         'NYSE'),
        ({'MORTGAGE REITS', 'MREITS'},
         {'NLY', 'STWD', 'AGNC', 'TWO', 'PMT', 'MITT', 'NYMT', 'MFA',
          'IVR', 'NRZ', 'TRTX', 'RWT', 'DX', 'XAN', 'WMC'},
         'ALL'),
        ({'OIL', 'OIL & GAS', 'O&G'},
         {'DNR', 'PVAC', 'ROYT', 'SWN', 'CPE', 'CEQP', 'PAA', 'PUMP', 'PBF'},
         'ALL'),
        ({'AUTO', 'AUTOMOBILE', 'CARS', 'CAR'},
         {'TSLA', 'GM', 'F', 'NIO', 'RACE', 'FCAU', 'HMC', 'TTM', 'TM', 'XPEV', 'LI', 'CCIV'},
         'ALL'),
        ({'TANKERS'},
         {'EURN', 'TNK', 'TK', 'TNP', 'DSX', 'NAT',
          'STNG', 'SFL', 'DHT', 'CPLP', 'DSSI', 'FRO', 'INSW', 'NNA', 'SBNA'},
         'ALL'),
        ({'UTILS', 'UTILITIES'},
         {'PCG', 'ELLO', 'AT', 'ELP', 'ES', 'EDN', 'IDA', 'HNP', 'GPJA', 'NEP', 'SO', 'CEPU', 'AES', 'ETR',
          'KEP', 'OGE', 'EIX', 'NEE', 'TVC', 'TAC', 'EE', 'CIG', 'PNW', 'EMP', 'EBR.B', 'CPL', 'DTE', 'POR',
          'EAI', 'NRG', 'CWEN', 'KEN', 'AGR', 'BEP', 'ORA', 'EAE', 'PPX', 'AZRE', 'ENIC', 'FE', 'CVA', 'BKH',
          'ELJ', 'EZT', 'HE', 'VST', 'ELU', 'ELC', 'TVE', 'AQN', 'PAM', 'AEP', 'ENIA', 'EAB', 'PPL', 'CNP',
          'D', 'PNM', 'EBR', 'FTS'},
         'ALL'),
    )
    for aliases, members, universe in baskets:
        if category in aliases:
            # only the defence basket is looked up in the NYSE listing alone
            pool = get_nyse_tickers() if universe == 'NYSE' else get_tickers('ALL')
            return {k: v for k, v in pool.items() if k in members}
    return tickers
@time_cache(24 * 3600, maxsize=1)
def get_cik_mapping():
    """Download CIK_LIST_URL and return {upper-cased ticker: CIK string}.

    Every line must split on whitespace into exactly two fields; any other
    line raises ValueError.
    """
    listing = make_request(CIK_LIST_URL).text
    fields = (row.strip().split() for row in listing.splitlines())
    return {symbol.upper(): cik for symbol, cik in fields}
@lru_cache(maxsize=10000)
def get_cik(ticker):
    """Return the CIK listed for ``ticker`` in get_cik_mapping (KeyError if absent)."""
    mapping = get_cik_mapping()
    return mapping[ticker]
def get_company_name(ticker: str):
    """Look up the company name for ``ticker``.

    Lookup order:
        1. the combined listing from get_tickers('ALL');
        2. for tickers containing '.TO': the TSX listing, then the TSX
           company-directory search (exact symbol match first, otherwise the
           closest fuzzy match; names from the search are upper-cased).

    Raises:
        ValueError: if no name can be found, including when the TSX search
            returns no results.
    """
    ticker = clean_ticker(ticker)
    with suppress(KeyError):
        return get_tickers('ALL')[ticker]['name']
    if '.TO' in ticker:
        with suppress(KeyError):
            return get_tsx_tickers()[ticker]['name']
        ticker = ticker.replace('.TO', '')
        r = requests.get(f'https://www.tsx.com/json/company-directory/search/tsx/{ticker}')
        results = {}
        for s in r.json()['results']:
            s['name'] = s['name'].upper()
            if s['symbol'] == ticker:
                return s['name']
            results[s['symbol']] = s
        # extractOne returns None for an empty choice list, so only call it with candidates
        if results:
            best_match = process.extractOne(ticker, list(results.keys()))[0]
            return results[best_match]['name']
    raise ValueError(f'could not get company name for {ticker}')
@time_cache(10000)
def get_financials(ticker: str, aggregate=False, commit_db=True):
    """
    Parses 10K file and returns Total Assets, Net Incomes, and Return on Assets (ROA) [for now]
    for US Companies that file with the SEC.
    Performance: ~5 seconds cold start, ~1.5 seconds thereafter
    Args:
        ticker: US ticker to get data for
        aggregate: Whether to parse all 10K files. [future]
        commit_db: Whether to handle the sqlite commit [future]
    Returns:
        {'name': 'Apple Inc.',
         'net_incomes': {2018: 59531000000, 2019: 55256000000, 2020: 57411000000},
         'return_on_assets': {2020: 16.959611953349324},
         'roa': 16.959611953349324,
         'symbol': 'AAPL',
         'total_assets': {2019: 338516000000, 2020: 323888000000}}
        'roa' is None when no return on assets could be computed.
    Raises:
        ValueError: if the recent filings contain no '10-K' form.
    """
    # TODO: use a SQLITE database to cache data
    #  and the latest 10K url
    ticker = clean_ticker(ticker)
    if ticker not in get_tickers('ALL'):
        print(ticker)
        ticker = find_stock(ticker)[0][0]
    company_name = get_company_name(ticker)
    cik = get_cik(ticker).rjust(10, '0')
    submission = make_request(f'https://data.sec.gov/submissions/CIK{cik}.json').json()
    recent = submission['filings']['recent']
    form_index = recent['form'].index('10-K')
    accession = recent['accessionNumber'][form_index].replace('-', '')
    file_name = recent['primaryDocument'][form_index]
    # split on the last period only so names with several periods still unpack
    file_name, ext = file_name.rsplit('.', 1)
    url = f'https://www.sec.gov/Archives/edgar/data/{cik}/{accession}/{file_name}_{ext}.xml'
    r = make_request(url).text
    soup = BeautifulSoup(r, 'lxml')

    def get_context_date(context_ref, is_balance_sheet=False, only_year=False):
        # the date is whatever follows the last '_I' (balance sheet) or the last '-'
        if is_balance_sheet:
            _date = context_ref.rsplit('_I', 1)[1]
        else:
            _date = context_ref.rsplit('-', 1)[1]
        return int(_date[:4]) if only_year else _date

    total_assets = {get_context_date(tag['contextref'], True, True): int(tag.text)
                    for tag in soup.find_all('us-gaap:assets')}
    net_income_loss = soup.find_all('us-gaap:netincomeloss')
    # if tags not found use other alias
    if not net_income_loss:
        net_income_loss = soup.find_all('us-gaap:netincomelossavailabletocommonstockholdersbasic')
    net_incomes = {}
    for tag in net_income_loss:
        year = get_context_date(tag['contextref'], only_year=True)
        # first tag seen for a year wins
        if year not in net_incomes:
            net_incomes[year] = int(tag.text)
    roas = {}
    for year, value in total_assets.items():
        # ROA for a year = that year's net income over the prior year's total assets
        with suppress(KeyError):
            next_year = year + 1
            roa = (net_incomes[next_year] / value) * 100  # %
            roas[next_year] = roa
    financials = {
        'name': company_name,
        'symbol': ticker,
        'total_assets': total_assets,
        'net_incomes': net_incomes,
        'return_on_assets': roas,
        # NOTE(review): this is the earliest year's ROA; confirm the latest was not intended
        'roa': roas[min(roas)] if roas else None
    }
    return financials
def get_ticker_info(query: str, round_values=True):
    """
    Uses WSJ instead of yfinance to get stock info summary

    Args:
        query: ticker symbol; a '.TO' suffix selects the Canadian listing
        round_values: round prices, changes, dividends and EPS to 2 decimals
            and the market cap to an integer
    Raises:
        ValueError: if the ticker cannot be resolved
    Sample Return (a 'market_cap' key is included as well):
    {'annualized_dividend': 6.52,
    'api_url': 'https://www.wsj.com/market-data/quotes/IBM?id={"ticker":"IBM","countryCode":"US","path":"IBM"}&type=quotes_chart',
    'change': -0.15,
    'change_percent': -0.12,
    'close_price': 120.71,
    'dividend_yield': 5.40,
    'eps_ttm': 6.24,
    'pe': 19.34,
    'extended_hours': True,
    'last_dividend': 1.63,
    'latest_change': -0.01,
    'latest_change_percent': -0.01,
    'name': 'International Business Machines Corp.',
    'previous_close_price': 120.86,
    'price': 120.7,
    'source': 'https://www.marketwatch.com/investing/stock/IBM?countrycode=US',
    'symbol': 'IBM',
    'timestamp': datetime.datetime(2021, 2, 23, 14, 59, 49, 906000, tzinfo=<DstTzInfo 'US/Eastern' EST-1 day, 19:00:00 STD>),
    'volume': 4531464}
    """
    ticker = clean_ticker(query)
    try:
        is_etf = ticker in get_nyse_arca_tickers() or 'ETF' in get_company_name(ticker).split()
    except ValueError:
        is_etf = False
    country_code = 'CA' if '.TO' in ticker else 'US'
    ticker = ticker.replace('.TO', '')  # remove exchange
    api_query = {
        'ticker': ticker,
        'countryCode': country_code,
        'path': ticker
    }
    api_query = json.dumps(api_query, separators=(',', ':'))
    source = f'https://www.marketwatch.com/investing/stock/{ticker}?countrycode={country_code}'
    api_url = f'https://www.wsj.com/market-data/quotes/{ticker}?id={api_query}&type=quotes_chart'
    if is_etf:
        ckey = 'cecc4267a0'
        entitlement_token = 'cecc4267a0194af89ca343805a3e57af'
        source = f'https://www.marketwatch.com/investing/fund/{ticker}?countrycode={country_code}'
        api_url = f'https://api.wsj.net/api/dylan/quotes/v2/comp/quoteByDialect?dialect=official&needed=Financials|CompositeTrading|CompositeBeforeHoursTrading|CompositeAfterHoursTrading&MaxInstrumentMatches=1&accept=application/json&EntitlementToken={entitlement_token}&ckey={ckey}&dialects=Charting&id=ExchangeTradedFund-US-{ticker}'
    r = make_request(api_url)
    if not r.ok:
        # the request failed: retry once with the closest known ticker, keeping
        # the caller's rounding preference; never parse the failed response
        try:
            suggestion = find_stock(query)[0][0]
        except IndexError:
            raise ValueError(f'Invalid ticker "{query}"')
        if suggestion == query:
            raise ValueError(f'Invalid ticker "{query}"')
        return get_ticker_info(suggestion, round_values=round_values)
    data = r.json() if is_etf else r.json()['data']
    try:
        quote_data = data['InstrumentResponses'][0]['Matches'][0] if is_etf else data['quoteData']
    except IndexError:
        raise ValueError(f'Invalid ticker "{query}"')
    financials = quote_data['Financials']
    name = quote_data['Instrument']['CommonName']
    try:
        previous_close = financials['Previous']['Price']['Value']
    except TypeError:
        raise ValueError(f'Invalid ticker "{query}"')
    latest_price = closing_price = quote_data['CompositeTrading']['Last']['Price']['Value']
    # prefer a pre-market quote, then an after-hours quote; a missing section
    # is None, so subscripting it raises TypeError
    try:
        latest_price = quote_data['CompositeBeforeHoursTrading']['Price']['Value']
    except TypeError:
        try:
            latest_price = quote_data['CompositeAfterHoursTrading']['Price']['Value']
        except TypeError:
            closing_price = previous_close
    volume = quote_data['CompositeTrading']['Volume']
    if is_etf:
        if quote_data['CompositeBeforeHoursTrading']:
            market_state = 'Pre-Market'
        elif quote_data['CompositeAfterHoursTrading']:
            market_state = 'After-Market' if quote_data['CompositeAfterHoursTrading']['IsRealtime'] else 'Closed'
        else:
            market_state = 'Open'
    else:
        market_state = data['quote']['marketState'].get('CurrentState', 'Open')
    extended_hours = market_state in {'After-Market', 'Closed', 'Pre-Market'}
    if market_state in {'After-Market', 'Closed'} and quote_data['CompositeAfterHoursTrading']:
        timestamp = quote_data['CompositeAfterHoursTrading']['Time']
    elif market_state == 'Pre-Market' and quote_data['CompositeBeforeHoursTrading']:
        timestamp = quote_data['CompositeBeforeHoursTrading']['Time']
    else:
        timestamp = quote_data['CompositeTrading']['Last']['Time']
    try:
        # millisecond epoch between '(' and '+'
        epoch_ms = int(timestamp.split('(', 1)[1].split('+', 1)[0])
    except IndexError:
        # time format is: 2021-02-25T18:52:44.677
        timestamp = datetime.strptime(timestamp.rsplit('.', 1)[0], '%Y-%m-%dT%H:%M:%S')
    else:
        # build the aware datetime straight from the epoch so the result does
        # not depend on the machine's local timezone
        timestamp = datetime.fromtimestamp(epoch_ms / 1e3, tz=timezone('US/Eastern'))
    change = closing_price - previous_close
    change_percent = change / previous_close * 100
    latest_change = latest_price - closing_price
    latest_change_percent = latest_change / closing_price * 100
    try:
        market_cap = financials['MarketCapitalization']['Value']
    except TypeError:
        try:
            market_cap = financials['SharesOutstanding'] * latest_price
        except TypeError:
            market_cap = 0
    try:
        eps_ttm = financials['LastEarningsPerShare']['Value']
    except TypeError:
        eps_ttm = 0
    try:
        last_dividend = financials['LastDividendPerShare']['Value']
    except TypeError:
        last_dividend = None
    dividend_yield = financials['Yield']
    annualized_dividend = financials['AnnualizedDividend']
    if annualized_dividend is None:
        dividend_yield = 0
        last_dividend = 0
        annualized_dividend = 0
    pe = financials['PriceToEarningsRatio']
    if pe is None:
        try:
            pe = closing_price / eps_ttm
        except ZeroDivisionError:
            pe = 0  # 0 = N/A
    if round_values:
        previous_close = round(previous_close, 2)
        latest_price = round(latest_price, 2)
        closing_price = round(closing_price, 2)
        change = round(change, 2)
        change_percent = round(change_percent, 2)
        latest_change = round(latest_change, 2)
        latest_change_percent = round(latest_change_percent, 2)
        # either value may still be None when only part of the dividend data is present
        if dividend_yield is not None:
            dividend_yield = round(dividend_yield, 2)
        if last_dividend is not None:
            last_dividend = round(last_dividend, 2)
        eps_ttm = round(eps_ttm, 2)
        market_cap = round(market_cap)
    return_info = {
        'name': name,
        'symbol': ticker + ('.TO' if country_code == 'CA' else ''),
        'volume': volume,
        'eps_ttm': eps_ttm,
        'pe': pe,
        'dividend_yield': dividend_yield,
        'last_dividend': last_dividend,
        'annualized_dividend': annualized_dividend,
        'price': latest_price,
        'market_cap': market_cap,
        'close_price': closing_price,
        'previous_close_price': previous_close,
        'change': change,
        'change_percent': change_percent,
        'latest_change': latest_change,
        'latest_change_percent': latest_change_percent,
        'extended_hours': extended_hours,
        'timestamp': timestamp,
        'source': source,
        'api_url': api_url
    }
    return return_info
# noinspection PyUnboundLocalVariable
@time_cache(30)  # cache for 30 seconds
def get_ticker_info_old(ticker: str, round_values=True, use_nasdaq=False) -> dict:
    """
    Older quote summary built from yfinance (optionally the NASDAQ API).

    Args:
        ticker: ticker symbol
        round_values: round prices and changes to 2 decimals, the dividend
            yield to 4 decimals
        use_nasdaq: read the summary fields from api.nasdaq.com; falls back
            to yfinance when the API reports an rCode of 400 or more
    Raises ValueError
    Sometimes the dividend yield is incorrect
    """
    ticker = clean_ticker(ticker)
    if use_nasdaq:
        url = f'https://api.nasdaq.com/api/quote/{ticker}/summary?assetclass=stocks'
        r = make_request(url).json()
        if r['status']['rCode'] < 400:
            summary = {k: v['value'] for k, v in r['data']['summaryData'].items()}
            url = f'https://api.nasdaq.com/api/quote/{ticker}/info?assetclass=stocks'
            info = make_request(url).json()['data']
            # name = get_tickers('ALL')[ticker]['name']
            name = clean_name(info['companyName'])
            volume = int(summary['ShareVolume'].replace(',', ''))
            previous_close = float(summary['PreviousClose'].replace('$', ''))
            eps_ttm = float(summary['EarningsPerShare'].replace('$', '').replace('N/A', '0'))
            # annualized dividend
            last_dividend = float(summary['AnnualizedDividend'].replace('$', '').replace('N/A', '0'))
            dividend_yield = float(summary['Yield'].replace('%', '').replace('N/A', '0'))
            # industry = summary['Industry']
        else:
            use_nasdaq = False
    yf_ticker = yf.Ticker(ticker)
    if not use_nasdaq:
        try:
            info = yf_ticker.info
            name = info['longName']
            volume = info['volume']
            previous_close = info['regularMarketPreviousClose']
            eps_ttm = info.get('trailingEps')
            last_dividend = info.get('lastDividendValue')
            dividend_yield = info['trailingAnnualDividendYield']
            if last_dividend is None:
                dividend_yield = 0
                last_dividend = 0
        except (KeyError, ValueError):
            raise ValueError(f'Invalid ticker "{ticker}"')
    data_latest = yf_ticker.history(period='5d', interval='1m', prepost=True)
    timestamp = data_latest.last_valid_index()
    latest_price = float(data_latest.tail(1)['Close'].iloc[0])
    # if market is open: most recent close
    # else: close before most recent close
    # get most recent price
    timestamp_ending = str(timestamp)[-6:]
    # regular session runs 9:30 - 16:00; use .minute (the minute of the hour),
    # not .min (the smallest representable timestamp)
    regular_hours = 16 > timestamp.hour > 9 or (timestamp.hour == 9 and timestamp.minute >= 30)
    extended_hours = not regular_hours
    if timestamp.hour >= 16:  # timestamp is from post market
        today = datetime(timestamp.year, timestamp.month, timestamp.day, 15, 59)
        closing_timestamp = today.strftime(f'%Y-%m-%d %H:%M:%S{timestamp_ending}')
        closing_price = data_latest.loc[closing_timestamp]['Open']
    else:
        # open-market / pre-market since timestamp is before 4:00 pm
        # if pre-market, this close is after the previous close
        latest_close = datetime(timestamp.year, timestamp.month,
                                timestamp.day, 15, 59) - timedelta(days=1)
        # walk back one day at a time until a 15:59 bar exists
        while True:
            try:
                prev_day_timestamp = latest_close.strftime(f'%Y-%m-%d %H:%M:%S{timestamp_ending}')
                closing_price = data_latest.loc[prev_day_timestamp]['Open']
                break
            except KeyError:
                latest_close -= timedelta(days=1)
    change = closing_price - previous_close
    change_percent = change / previous_close * 100
    latest_change = latest_price - closing_price
    latest_change_percent = latest_change / closing_price * 100
    if round_values:
        previous_close = round(previous_close, 2)
        latest_price = round(latest_price, 2)
        closing_price = round(closing_price, 2)
        change = round(change, 2)
        change_percent = round(change_percent, 2)
        latest_change = round(latest_change, 2)
        latest_change_percent = round(latest_change_percent, 2)
        try:
            dividend_yield = round(dividend_yield, 4)
        except TypeError:
            dividend_yield = 0
        last_dividend = round(last_dividend, 2)
        with suppress(TypeError):
            eps_ttm = round(eps_ttm, 2)
    return_info = {
        'name': name,
        'symbol': ticker,
        'volume': volume,
        'eps_ttm': eps_ttm,
        'dividend_yield': dividend_yield,
        'last_dividend': last_dividend,
        'price': latest_price,
        'close_price': closing_price,
        'previous_close_price': previous_close,
        'change': change,
        'change_percent': change_percent,
        'latest_change': latest_change,
        'latest_change_percent': latest_change_percent,
        'extended_hours': extended_hours,
        'timestamp': timestamp
    }
    return return_info
def get_ticker_infos(tickers, round_values=True, errors_as_str=False) -> tuple:
    """
    Fetch get_ticker_info for many tickers on a pool of up to 35 threads.

    returns: list[dict], list
        the infos in completion order, and the ValueErrors raised for the
        remaining tickers (as strings when errors_as_str is set)
    uses a threadPoolExecutor instead of asyncio
    """
    found, missing = [], []
    with concurrent.futures.ThreadPoolExecutor(max_workers=35) as pool:
        pending = [pool.submit(get_ticker_info, symbol, round_values=round_values) for symbol in tickers]
        for finished in concurrent.futures.as_completed(pending):
            try:
                info = finished.result()
            except ValueError as error:
                missing.append(str(error) if errors_as_str else error)
            else:
                found.append(info)
    return found, missing
def get_data(tickers: Iterator, start_date=None, end_date=None, period='3mo', group_by='ticker', interval='1d',
             show_progress=True):
    """Download price history for ``tickers`` through ``yf.download`` (3 threads).

    All arguments are forwarded unchanged; ``show_progress`` is passed as
    the ``progress`` option.
    """
    # http://www.datasciencemadesimple.com/union-and-union-all-in-pandas-dataframe-in-python-2/
    # new format
    # _key = ' '.join(tickers) + f' {start_date} {end_date} {period} {group_by}'
    options = dict(period=period, group_by=group_by, threads=3,
                   progress=show_progress, interval=interval)
    return yf.download(tickers, start_date, end_date, **options)
def parse_info(_data, ticker, start_date, end_date, start_price_key='Open'):
    """
    Summarise the move of ``ticker`` between ``start_date`` and ``end_date``.

    start_price_key: can be 'Open' or 'Close'
    If the 'Open' value at ``end_date`` is NaN, the last valid 'Open' index is
    used as the end date instead. A start volume that cannot be rounded
    (ValueError) counts as 0.
    TODO: change parse_info keys to snake_case
    """
    quotes = _data[ticker]
    first_price = quotes[start_price_key][start_date]
    if math.isnan(quotes['Open'][end_date]):
        end_date = quotes['Open'].last_valid_index()
    last_price = quotes['Close'][end_date]
    delta = last_price - first_price
    relative_delta = delta / first_price
    try:
        first_volume = round(quotes['Volume'][start_date])
    except ValueError:
        first_volume = 0
    last_volume = round(quotes['Volume'][end_date])
    return {
        'Start': first_price,
        'End': last_price,
        'Change': delta,
        'Percent Change': relative_delta,
        'Open Volume': first_volume,
        'Close Volume': last_volume,
        'Avg Volume': (first_volume + last_volume) / 2,
    }
def get_parsed_data(_data=None, tickers: list = None, market='ALL', sort_key='Percent Change',
of='day', start_date: datetime = None, end_date: datetime = None):
"""
returns the parsed trading data sorted by percent change
(ascending by sort_key: a list of (ticker, info) tuples as built by parse_info)
:param _data: if you are doing a lot of parsing but None is recommended unless you are dealing with >= 1 month
:param tickers: specify if you have your own custom tickers list, otherwise market is used to get the list
:param market: the market if data is None
:param of: one of {'day', 'mtd', 'ytd', '1m', '1yr'}
(also accepted: 'custom', '1d', 'month', 'm', 'year', 'yr', 'y' and the long *_to_date spellings;
any other value yields an empty result)
:param sort_key: one of {'Start', 'End', 'Change', 'Percent Change', 'Open Volume', 'Close Volume', 'Avg Volume'}
if None, a dict with the tickers as keys is returned instead of a list
:param start_date: if of == 'custom' specify this values
:param end_date: if of == 'custom' specify this value
"""
of = of.lower()
_today = datetime.today()
todays_date = _today.date()
if tickers is None:
tickers = list(get_tickers(market))
# From 16:00 (local clock) onwards, reuse a 'day' result already stored in
# SORTED_INFO_CACHE for the same date and ticker list.
# NOTE(review): the cached value is the sorted list, even when sort_key is None - confirm.
if _today.hour >= 16 and of == 'day':
# TODO: cache pre-market as well
# key format will be
with suppress(KeyError):
return SORTED_INFO_CACHE[of][str(todays_date)][','.join(tickers)]
if of == 'custom':
assert start_date and end_date
if _data is None:
_data = get_data(tickers, start_date=start_date, end_date=end_date)
# the requested dates are replaced by the first/last valid index of the data
start_date, end_date = _data.first_valid_index(), _data.last_valid_index()
parsed_info = {}
for ticker in tickers:
info = parse_info(_data, ticker, start_date, end_date)
# tickers whose start price is NaN are left out
if not math.isnan(info['Start']):
parsed_info[ticker] = info
elif of in {'day', '1d'}:
# TODO: use get_ticker_info instead
# ALWAYS USE LATEST DATA
# (a passed-in _data is ignored here: minute bars for the last 5 days are downloaded)
_data = get_data(tickers, period='5d', interval='1m')
market_day = _data.last_valid_index().date() == todays_date
if not market_day or (_today.hour * 60 + _today.minute >= 645): # >= 10:45 AM
# movers of the latest market day [TODAY]
recent_day = _data.last_valid_index()
recent_start_day = recent_day.replace(hour=9, minute=30, second=0)
parsed_info = {}
for ticker in tickers:
try:
info = parse_info(_data, ticker, recent_start_day, recent_day)
if not math.isnan(info['Start']):
parsed_info[ticker] = info
except ValueError:
# TODO: fix
print('ERROR: Could not get info for', ticker)
else: # movers of the second last market day
# NOTE(review): _data above was downloaded with interval='1m', while the
# comment below assumes daily bars - confirm which row this selects.
yest = _data.tail(2).first_valid_index() # assuming interval = 1d
parsed_info = {}
for ticker in tickers:
info = parse_info(_data, ticker, yest, yest)
if not math.isnan(info['Start']):
parsed_info[ticker] = info
# TODO: custom day amount
elif of in {'mtd', 'month_to_date', 'monthtodate'}:
start_date = todays_date.replace(day=1)
if _data is None:
_data = get_data(tickers, start_date=start_date, end_date=_today)
# advance to the first date present in the data, giving up at today's date
# NOTE(review): start_date is a datetime.date; confirm the membership test
# against _data.index matches as intended.
while start_date not in _data.index and start_date < todays_date:
start_date += timedelta(days=1)
if start_date >= todays_date:
raise RuntimeError(
'No market days this month')
parsed_info = {}
for ticker in tickers:
info = parse_info(_data, ticker, start_date, todays_date)
if not math.isnan(info['Start']):
parsed_info[ticker] = info
elif of in {'month', '1m', 'm'}:
start_date = todays_date - timedelta(days=30)
if _data is None:
_data = get_data(
tickers, start_date=start_date, end_date=_today)
# NOTE(review): unlike the month-to-date branch, this loop has no upper bound.
while start_date not in _data.index:
start_date += timedelta(days=1)
parsed_info = {}
for ticker in tickers:
info = parse_info(_data, ticker, start_date, todays_date)
if not math.isnan(info['Start']):
parsed_info[ticker] = info
# TODO: x months
elif of in {'ytd', 'year_to_date', 'yeartodate'}:
if _data is None:
_temp = _today.replace(day=1, month=1)
_data = get_data(tickers, start_date=_temp, end_date=_today)
start_date = _data.first_valid_index() # first market day of the year
else:
start_date = _today.replace(day=1, month=1).date() # Jan 1st
# find first market day of the year
while start_date not in _data.index:
start_date += timedelta(days=1)
end_date = _data.last_valid_index()
parsed_info = {}
for ticker in tickers:
info = parse_info(_data, ticker, start_date, end_date)
if not math.isnan(info['Start']):
parsed_info[ticker] = info
elif of in {'year', '1yr', 'yr', 'y'}:
if _data is None:
_data = get_data(tickers, start_date=_today -
timedelta(days=365), end_date=_today)
start_date = _data.first_valid_index() # first market day of the year
else:
start_date = _today.date() - timedelta(days=365)
# NOTE(review): this download starts on Jan 1st of the current year although
# start_date above is 365 days back; the original nesting of this statement
# is not recoverable from this copy of the file - confirm the intent.
_data = get_data(tickers, start_date=_today.replace(
day=1, month=1), end_date=_today)
end_date = _data.last_valid_index()
parsed_info = {}
for ticker in tickers:
info = parse_info(_data, ticker, start_date, end_date)
if not math.isnan(info['Start']):
parsed_info[ticker] = info
# TODO: x years
else:
parsed_info = {} # invalid of
if sort_key is None:
return parsed_info
# ascending order: losers first, winners last
sorted_info = sorted(parsed_info.items(),
key=lambda item: item[1][sort_key])
# store the result for later calls made after 16:00 on the same date
# (only for of == 'day'; the '1d' spelling is not cached)
if _today.hour >= 16 and of == 'day':
if of not in SORTED_INFO_CACHE:
SORTED_INFO_CACHE[of] = {}
if str(todays_date) not in SORTED_INFO_CACHE[of]:
SORTED_INFO_CACHE[of][str(todays_date)] = {}
SORTED_INFO_CACHE[of][str(todays_date)][','.join(tickers)] = sorted_info
return sorted_info
def winners(sorted_info=None, tickers: list = None, market='ALL', of='day', start_date=None, end_date=None, show=5):
    """Return the last ``show`` entries of ``sorted_info``, best first.

    sorted_info is the return of get_parsed_data with non-None sort_key;
    when omitted it is computed from the remaining arguments.
    """
    if sorted_info is None:
        sorted_info = get_parsed_data(tickers=tickers, market=market, of=of,
                                      start_date=start_date, end_date=end_date)
    tail = sorted_info[-show:]
    return list(tail[::-1])
def losers(sorted_info=None, tickers: list = None, market='ALL', of='day', start_date=None, end_date=None, show=5):
    """Return the first ``show`` entries of ``sorted_info`` (worst first).

    sorted_info is the return of get_parsed_data with non-None sort_key;
    when omitted it is computed from the remaining arguments.
    """
    ranking = sorted_info
    if ranking is None:
        ranking = get_parsed_data(tickers=tickers, market=market, of=of,
                                  start_date=start_date, end_date=end_date)
    return ranking[:show]
# noinspection PyTypeChecker
def winners_and_losers(_data=None, tickers=None, market='ALL', of='day', start_date=None, end_date=None, show=5,
                       console_output=True, csv_output=''):
    """
    Fetch sorted price-change info and optionally report it.

    _data, tickers, market, of, start_date, end_date: forwarded to get_parsed_data
    show: number of winners and of losers to print
    console_output: print a "TOP n WINNERS" and a "TOP n LOSERS" table
    csv_output: path of a CSV file to write every entry to; falsy to skip.
        Nothing is written when there are no entries.
    returns: the sorted info as returned by get_parsed_data
    """
    sorted_info = get_parsed_data(_data, tickers, market, of=of, start_date=start_date, end_date=end_date)
    if console_output:
        length = min(show, len(sorted_info))

        def _describe(entry):
            # one report line: "\nSYMBOL [start, end]: change%"
            symbol, stats = entry
            start_end = f'{round(stats["Start"], 2)}, {round(stats["End"], 2)}'
            return f'\n{symbol} [{start_end}]: {round(stats["Percent Change"] * 100, 2)}%'

        # winners are read from the end of the ascending list, losers from the front
        bulls = ''.join(_describe(sorted_info[-i - 1]) for i in range(length))
        bears = ''.join(_describe(sorted_info[i]) for i in range(length))
        for header, body in ((f'TOP {length} WINNERS ({of})', bulls),
                             (f'TOP {length} LOSERS ({of})', bears)):
            line = '-' * len(header)
            print(f'{line}\n{header}\n{line}{body}')
    # The header row is taken from the first entry, so an empty result cannot
    # be written (sorted_info[0] would raise IndexError).
    if csv_output and sorted_info:
        with open(csv_output, 'w', newline='') as f:
            writer = csv.writer(f)
            writer.writerow(['TICKER'] + list(sorted_info[0][1].keys()))
            writer.writerows([symbol] + list(stats.values()) for symbol, stats in sorted_info)
    return sorted_info
def top_movers(_data=None, tickers=None, market='ALL', of='day', start_date=None, end_date=None, show=5,
               console_output=True, csv_output=''):
    """Alias of winners_and_losers: every argument is forwarded unchanged."""
    forwarded = dict(
        _data=_data,
        tickers=tickers,
        market=market,
        of=of,
        start_date=start_date,
        end_date=end_date,
        show=show,
        console_output=console_output,
        csv_output=csv_output,
    )
    return winners_and_losers(**forwarded)
@time_cache(3600)  # cache for 1 hour
def get_target_price(ticker, round_values=True):
    """
    ticker: yahoo finance ticker
    round_values: round 'upside' and 'downside' to 2 decimal places
    returns: {'symbol': str, 'name': str, 'high': float, 'low': float, 'price': float,
              'eps_ttm': float, 'source': 'url', 'api_url': 'url', 'avg': float,
              'median': float, 'estimates': list, 'total_estimates': int,
              'upside': float, 'downside': float}
        'upside' / 'downside' are the percent distance from 'price' to 'high' / 'low'
    raises: ValueError if the response could not be decoded as JSON
    """
    try:
        ticker_info = get_ticker_info(ticker)
        price = ticker_info['price']  # get latest price
        ticker = ticker_info['symbol']  # get fixed ticker
        timestamp = datetime.now().timestamp()
        query = f'{TIP_RANKS_API}getData/?name={ticker}&benchmark=1&period=3&break={timestamp}'
        r = make_request(query).json()
        try:
            # Assumed to be ttm
            eps_ttm = r['portfolioHoldingData']['lastReportedEps']['reportedEPS']
        except TypeError:
            eps_ttm = 0
        target_prices = {
            'symbol': ticker,
            'name': r['companyName'],
            'high': 0,  # placeholder, filled in below
            'low': 0,  # placeholder, filled in below
            'price': price,
            'eps_ttm': eps_ttm,
            'source': f'https://www.tipranks.com/stocks/{ticker}/forecast',
            'api_url': query
        }
        # keep only the analyses that had a price target
        estimates = []
        for expert in r['experts']:
            target_price = expert['ratings'][0]['priceTarget']
            if target_price:
                estimates.append(target_price)
        if estimates:
            # min/max instead of a fixed "low = 100000" sentinel, which reported a
            # wrong low whenever every target was above 100000
            target_prices['high'] = max(estimates)
            target_prices['low'] = min(estimates)
            target_prices['avg'] = sum(estimates) / len(estimates)
            target_prices['median'] = median(estimates)
        else:
            # no individual estimates: fall back to the consensus figures
            consensus = r['ptConsensus'][0]
            target_prices['avg'] = target_prices['median'] = consensus['priceTarget']
            target_prices['high'] = consensus['high']
            target_prices['low'] = consensus['low']
        target_prices['estimates'] = estimates
        target_prices['total_estimates'] = len(estimates)
        target_prices['upside'] = 100 * target_prices['high'] / target_prices['price'] - 100
        target_prices['downside'] = 100 * target_prices['low'] / target_prices['price'] - 100
        if round_values:
            target_prices['upside'] = round(target_prices['upside'], 2)
            target_prices['downside'] = round(target_prices['downside'], 2)
        return target_prices
    except json.JSONDecodeError as err:
        raise ValueError(f'No Data Found for ticker "{ticker}"') from err
def get_target_prices(tickers, errors_as_str=False) -> tuple:
    """
    Run get_target_price for every ticker on a thread pool.

    errors_as_str: collect str(error) instead of the ValueError objects
    returns: (list of get_target_price results in completion order,
              list of the ValueErrors raised for the other tickers)
    """
    found, not_found = [], []
    with concurrent.futures.ThreadPoolExecutor(max_workers=35) as pool:
        pending = [pool.submit(get_target_price, symbol) for symbol in tickers]
        for finished in concurrent.futures.as_completed(pending):
            try:
                found.append(finished.result())
            except ValueError as error:
                not_found.append(str(error) if errors_as_str else error)
    return found, not_found
def sort_by_dividend(tickers):
    """Return the ticker infos of `tickers`, highest 'dividend_yield' first."""
    def _dividend_yield(info):
        return info['dividend_yield']

    infos = get_ticker_infos(tickers)[0]
    # sorted in place: the returned list is the one get_ticker_infos produced
    infos.sort(key=_dividend_yield, reverse=True)
    return infos
def sort_by_pe(tickers, output_to_csv='', console_output=True):
    """
    Returns the ticker infos ordered by price-earnings ratio: positive ratios
    first (smallest to largest), then non-positive ratios (largest to smallest)
    :param tickers: iterable of tickers, passed to get_ticker_infos
    :param output_to_csv: path of a CSV file to write the sorted infos to; falsy to skip
    :param console_output: print the first five tickers with their P/E
    :return: list of ticker info dicts in the order described above
    """
    @cmp_to_key
    def _pe_sort(left, right):
        left, right = left['pe'], right['pe']
        # smallest positive to smallest negative
        # 0.1 ... 30 ... 0 ... -0.1 ... -100000
        if left > 0 and right > 0:
            # both are positive
            # return number that is smaller
            return left - right
        elif left <= 0 and right <= 0:
            # both are non-positive
            # return number that is bigger
            return right - left
        # one of the pe's is positive and the other isn't
        # positive comes before negative
        return -1 if left > 0 else 1

    ticker_infos = get_ticker_infos(tickers)[0]
    # The comparator already encodes the final order, so sort ascending.
    # reverse=True flipped it and listed the most negative P/E as "top".
    ticker_infos.sort(key=_pe_sort)
    if console_output:
        header = 'TOP 5 (UNDER VALUED) TICKERS BY P/E'
        line = '-' * len(header)
        print(f'{header}\n{line}')
        for ticker_info in ticker_infos[:5]:
            ticker = ticker_info['symbol']
            pe = ticker_info['pe']
            print(f'{ticker}: {round(pe, 2)}')
    if output_to_csv and ticker_infos:
        # newline='' lets the csv module control line endings itself
        with open(output_to_csv, 'w', newline='') as csvfile:
            writer = csv.DictWriter(csvfile, fieldnames=ticker_infos[0].keys())
            writer.writeheader()
            writer.writerows(ticker_infos)
    return ticker_infos
def sort_by_volume(tickers):
    """Return the ticker infos of `tickers`, highest 'volume' first."""
    def _volume(info):
        return info['volume']

    infos = get_ticker_infos(tickers)[0]
    # sorted in place: the returned list is the one get_ticker_infos produced
    infos.sort(key=_volume, reverse=True)
    return infos
def sort_by_roa(tickers):
    """
    Fetch get_financials for every ticker on a thread pool.

    returns: list of the results, highest 'roa' first. Tickers whose
        get_financials call raised ValueError are left out.
    """
    collected = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=20) as pool:
        jobs = [pool.submit(get_financials, symbol) for symbol in tickers]
        for job in concurrent.futures.as_completed(jobs):
            try:
                collected.append(job.result())
            except ValueError:
                # this ticker is dropped from the result
                pass
    collected.sort(key=lambda entry: entry['roa'], reverse=True)
    return collected
def get_index_futures():
    """
    Scrape the table rows of the page at PREMARKET_FUTURES_URL.

    returns: {INDEX NAME: {'name': ..., 'price': ..., 'change': ..., 'percent_change': ...}}
        keyed by the upper-cased link text of each row; the other values are
        the raw texts of the row's cells
    """
    page = make_request(PREMARKET_FUTURES_URL)
    soup = BeautifulSoup(page.text, 'html.parser')
    futures = {}
    # noinspection PyUnresolvedReferences
    for row in soup.find('tbody').findAll('tr'):
        name = row.find('a').text.upper()
        # the first three cells are skipped; indexes below count from the fourth
        cells = row.findAll('td')[3:]
        futures[name] = {
            'name': name,
            'price': cells[0].text,
            'change': cells[3].text,
            'percent_change': cells[4].text,
        }
    return futures
def get_random_stocks(n=1) -> set:
    """
    Return n distinct random stocks from NASDAQ and NYSE.

    Tickers containing '.' or '^' are never returned.
    n: number of tickers wanted; values below 1 are treated as 1
    returns: set of tickers. It holds fewer than n tickers only when fewer
        eligible tickers exist.
    """
    if n < 1:
        n = 1
    # Merge into a NEW dict: calling update() on the mapping returned by
    # get_nasdaq_tickers() would modify that mapping for every other user of it.
    us_stocks = {**get_nasdaq_tickers(), **get_nyse_tickers()}
    # Filter once up front instead of rebuilding the key list for every draw.
    candidates = [stock for stock in us_stocks if '.' not in stock and '^' not in stock]
    # Capping the sample size avoids looping forever when n is too large.
    return set(random.sample(candidates, min(n, len(candidates))))
def find_stock(query):
    """
    Returns at most 12 results based on a search query
    query: a string of whitespace separated parts, or an iterable of parts
    returns: list of (symbol, name, parts_matched, match) tuples sorted by
        parts matched and then by match score, best first.
        An empty query returns an empty list.
    TODO: return list of dictionaries
    """
    if isinstance(query, str):
        query = {part.upper() for part in query.split()}
    else:
        query = {part.upper() for part in query if part}
    if not query:
        # ''.join(set()) == '' is a substring of every symbol, so an empty
        # query would otherwise match everything
        return []
    # loop invariants
    joined_query = ''.join(query)
    exact_symbol = clean_ticker(tuple(query)[0]) if len(query) == 1 else None
    results = []
    for info in get_tickers('ALL').values():
        match, parts_matched = 0, 0
        company_name = info['name'].upper()
        symbol = info['symbol']
        if exact_symbol is not None and symbol == exact_symbol:
            match += len(query) ** 2
            parts_matched += 1
        elif symbol in query or joined_query in symbol:
            match += len(symbol)
            parts_matched += 1
        for part in query:
            occurrences = company_name.count(part)
            part_factor = occurrences * len(part)
            if part_factor:
                match += part_factor
                parts_matched += occurrences
        if company_name:
            # normalise by name length; skipped for an empty name (division by zero)
            match /= len(company_name)
        if match:
            results.append((symbol, info['name'], parts_matched, match))
    # sort results by number of parts matched and % matched
    results.sort(key=lambda item: (item[2], item[3]), reverse=True)
    return results[:12]
def get_trading_halts(days_back=0):
    """
    Read trading halts from the nasdaqtrader.com trade halts RSS feed.

    days_back: how many days before today to ask for; the sign is ignored and
        0 requests the feed without a halt date
    returns: list of dicts with the keys 'symbol', 'name', 'market',
        'reason_code', 'paused_price', 'halt_date', 'halt_time',
        'resume_date', 'resume_quote_time' and 'resume_trade_time'.
        'halt_date' and 'resume_date' are datetime objects, or None when the
        feed cell is blank; every other value is the cell text.
    """
    def _parse_date(text):
        # strptime raises ValueError on '', which presumably happens for a
        # halt that has no resumption date yet (TODO confirm against the feed)
        return datetime.strptime(text, '%m/%d/%Y') if text else None

    days_back = abs(days_back)
    if days_back:
        date = datetime.today() - timedelta(days=days_back)
        date = date.strftime('%m%d%Y')
        url = f'http://www.nasdaqtrader.com/rss.aspx?feed=tradehalts&haltdate={date}'
    else:
        url = 'http://www.nasdaqtrader.com/rss.aspx?feed=tradehalts'
    feed = feedparser.parse(url)
    halts = []
    for halt in feed['entries']:
        soup = BeautifulSoup(halt['summary'], 'html.parser')
        # the values are the cells of the second table row of the summary
        values = [td.text.strip() for td in soup.find_all('tr')[1].find_all('td')]
        halts.append({
            'symbol': values[0],
            'name': values[1],
            'market': {'Q': 'NASDAQ'}.get(values[2], values[2]),
            'reason_code': values[3],
            'paused_price': values[4],
            'halt_date': _parse_date(values[5]),
            'halt_time': values[6],
            'resume_date': _parse_date(values[7]),
            'resume_quote_time': values[8],
            'resume_trade_time': values[9]
        })
    return halts
# Options Section


# Enums are used for some calculations
class Option(IntEnum):
    """Option type; the integer value is used as a sign in the option formulas."""

    CALL = 1
    PUT = -1
def get_month_and_year():
    """
    returns: today's month and year, e.g. 'FEB 2021'
    """
    date = datetime.today()
    # One entry per calendar month. The table used to lack OCT and NOV, which
    # returned 'DEC' in October and raised IndexError in November and December.
    months = ('JAN', 'FEB', 'MAR', 'APR', 'MAY', 'JUNE',
              'JULY', 'AUG', 'SEP', 'OCT', 'NOV', 'DEC')
    return f'{months[date.month - 1]} {date.year}'
# noinspection PyUnusedLocal
@lru_cache(100)
def get_risk_free_interest_rate(month_and_year=None):
    """
    e.g. month_and_year = 'FEB 2021'
        The argument is not read by the body; it only takes part in the
        lru_cache key, so a new value triggers a new request.
    returns the risk free interest rate:
        the average interest rate of US Treasury Bills
    throws: RuntimeError if interest rate could not be fetched
    """
    us_treasury_api = 'https://api.fiscaldata.treasury.gov/services/api/fiscal_service'
    endpoint = f'{us_treasury_api}/v2/accounting/od/avg_interest_rates'
    link = f'{endpoint}?page[size]=10000'
    r = requests.get(link).json()
    # Walk the returned records from last to first (presumably newest first;
    # verify the API's default ordering). Iterating the records themselves,
    # rather than range(total-count - 1, 0, -1), also checks the first record
    # and cannot index past the page that was actually returned.
    for node in reversed(r['data']):
        if node['security_desc'] == 'Treasury Bills':
            return float(node['avg_interest_rate_amt']) / 100
    raise RuntimeError('Could not get risk free interest rate')
# noinspection PyUnusedLocal
@lru_cache(10000)
def get_volatility(stock_ticker, tll_hash=None):
    """
    Returns the (annualized) standard deviation of the daily log returns of
    the stock over the last 365 days.

    tll_hash: not read by the body; it only takes part in the lru_cache key,
        e.g. ttl_hash = time.time() / (3600 * 24)
    """
    today = datetime.today()
    one_year_ago = today - timedelta(days=365)
    data = yf.download(stock_ticker, start=one_year_ago, end=today, progress=False)
    close = data['Close']
    # log of each close over the following row's close
    data['Returns'] = np.log(close / close.shift(-1))
    # noinspection PyUnresolvedReferences
    daily_deviation = np.std(data['Returns'])
    # annualize with 252 periods per year
    return daily_deviation * math.sqrt(252)
def d1(market_price, strike_price, years_to_expiry, volatility, risk_free, dividend_yield):
    """
    returns the d1 term of the dividend-paying Black-Scholes formula:
    (ln(S / K) + T * (r - q + sigma ** 2 / 2)) / (sigma * sqrt(T))
    """
    sigma_root_t = volatility * math.sqrt(years_to_expiry)
    log_moneyness = math.log(market_price / strike_price)
    drift = years_to_expiry * (risk_free - dividend_yield + volatility ** 2 / 2)
    return (log_moneyness + drift) / sigma_root_t
def csn(y):
    """
    returns the Cumulative Standard Normal of y:
    the value at y of the cumulative distribution function of the
    normal distribution with mean = 0 and standard deviation = 1
    """
    standard_normal = NormalDist(mu=0.0, sigma=1.0)
    return standard_normal.cdf(y)
def snd(y):
    """
    returns the Standard Normal Density of y:
    the value at y of the probability density function of the
    normal distribution with mean = 0 and standard deviation = 1
    """
    standard_normal = NormalDist(mu=0.0, sigma=1.0)
    return standard_normal.pdf(y)
def calc_option_price(market_price, strike_price, days_to_expiry, volatility,
                      risk_free=None, dividend_yield=0, option_type=Option.CALL):
    """
    returns the Black-Scholes price of an option on a dividend-paying stock

    days_to_expiry: converted to years with a 365 day year
    risk_free: fetched with get_risk_free_interest_rate() when None
    option_type: Option.CALL or Option.PUT; its value (1 / -1) is used as a sign
    """
    if risk_free is None:
        risk_free = get_risk_free_interest_rate()
    years_to_expiry = days_to_expiry / 365
    # for a put both d terms are negated
    signed_d1 = option_type * d1(market_price, strike_price,
                                 years_to_expiry, volatility, risk_free, dividend_yield)
    signed_d2 = signed_d1 - option_type * volatility * math.sqrt(years_to_expiry)
    stock_leg = market_price * \
        math.e ** (-dividend_yield * years_to_expiry) * csn(signed_d1)
    strike_leg = strike_price * math.e ** (-risk_free * years_to_expiry) * csn(signed_d2)
    return option_type * (stock_leg - strike_leg)
def calc_option_delta(market_price, strike_price, days_to_expiry, volatility,
                      risk_free=None, dividend_yield=0, option_type=Option.CALL):
    """
    returns the Black-Scholes delta: option_type * e^(-qT) * N(option_type * d1)

    days_to_expiry: converted to years with a 365 day year
    risk_free: fetched with get_risk_free_interest_rate() when None
    option_type: Option.CALL or Option.PUT
    """
    # Resolved at call time. A default of get_risk_free_interest_rate() in the
    # signature ran a web request at import and froze the rate for the process.
    if risk_free is None:
        risk_free = get_risk_free_interest_rate()
    years_to_expiry = days_to_expiry / 365
    block_1 = math.e ** (-dividend_yield * years_to_expiry)
    _d1 = d1(market_price, strike_price, years_to_expiry,
             volatility, risk_free, dividend_yield)
    return option_type * block_1 * csn(option_type * _d1)
def calc_option_gamma(market_price, strike_price, days_to_expiry, volatility,
                      risk_free=None, dividend_yield=0):
    """
    returns the Black-Scholes gamma: e^(-qT) * n(d1) / (S * sigma * sqrt(T))

    days_to_expiry: converted to years with a 365 day year
    risk_free: fetched with get_risk_free_interest_rate() when None
    """
    # Resolved at call time. A default of get_risk_free_interest_rate() in the
    # signature ran a web request at import and froze the rate for the process.
    if risk_free is None:
        risk_free = get_risk_free_interest_rate()
    years_to_expiry = days_to_expiry / 365
    block_1 = math.e ** (-dividend_yield * years_to_expiry)
    _d1 = d1(market_price, strike_price, years_to_expiry,
             volatility, risk_free, dividend_yield)
    return block_1 / (market_price * volatility * math.sqrt(years_to_expiry)) * snd(_d1)
def calc_option_vega(market_price, strike_price, days_to_expiry, volatility,
                     risk_free=None, dividend_yield=0):
    """
    returns the Black-Scholes vega: S * e^(-qT) * sqrt(T) * n(d1)

    days_to_expiry: converted to years with a 365 day year
    risk_free: fetched with get_risk_free_interest_rate() when None
    """
    # Resolved at call time. A default of get_risk_free_interest_rate() in the
    # signature ran a web request at import and froze the rate for the process.
    if risk_free is None:
        risk_free = get_risk_free_interest_rate()
    years_to_expiry = days_to_expiry / 365
    block_1 = market_price * math.e ** (-dividend_yield * years_to_expiry)
    _d1 = d1(market_price, strike_price, years_to_expiry,
             volatility, risk_free, dividend_yield)
    return block_1 * math.sqrt(years_to_expiry) * snd(_d1)
def calc_option_rho(market_price, strike_price, days_to_expiry, volatility,
                    risk_free=None, dividend_yield=0, option_type=Option.CALL):
    """
    returns the Black-Scholes rho: option_type * K * T * e^(-rT) * N(option_type * d2)

    days_to_expiry: converted to years with a 365 day year
    risk_free: fetched with get_risk_free_interest_rate() when None
    option_type: Option.CALL or Option.PUT
    """
    # Resolved at call time. A default of get_risk_free_interest_rate() in the
    # signature ran a web request at import and froze the rate for the process.
    if risk_free is None:
        risk_free = get_risk_free_interest_rate()
    years_to_expiry = days_to_expiry / 365
    block_1 = strike_price * math.e ** (-risk_free * years_to_expiry) * years_to_expiry
    _d1 = d1(market_price, strike_price, years_to_expiry,
             volatility, risk_free, dividend_yield)
    _d2 = option_type * (_d1 - volatility * math.sqrt(years_to_expiry))
    return option_type * block_1 * csn(_d2)
def calc_option_theta(market_price, strike_price, days_to_expiry, volatility,
                      risk_free=None, dividend_yield=0, option_type=Option.CALL):
    """
    returns the Black-Scholes theta (per year) of an option on a dividend-paying stock:

        option_type * (q * S * e^(-qT) * N(option_type * d1)
                       - r * K * e^(-rT) * N(option_type * d2))
        - S * e^(-qT) * sigma * n(d1) / (2 * sqrt(T))

    days_to_expiry: converted to years with a 365 day year
    risk_free: fetched with get_risk_free_interest_rate() when None
    option_type: Option.CALL or Option.PUT
    """
    # Resolved at call time. A default of get_risk_free_interest_rate() in the
    # signature ran a web request at import and froze the rate for the process.
    if risk_free is None:
        risk_free = get_risk_free_interest_rate()
    years_to_expiry = days_to_expiry / 365
    root_t = math.sqrt(years_to_expiry)
    _d1 = d1(market_price, strike_price, years_to_expiry,
             volatility, risk_free, dividend_yield)
    _d2 = _d1 - volatility * root_t
    discounted_price = market_price * math.e ** (-dividend_yield * years_to_expiry)
    discounted_strike = strike_price * math.e ** (-risk_free * years_to_expiry)
    # The dividend term carries the factor q and the strike term the factor
    # N(+-d2). Both were missing, which produced e.g. a large positive theta
    # for a plain call with no dividend.
    block_1 = dividend_yield * discounted_price * csn(option_type * _d1)
    block_2 = risk_free * discounted_strike * csn(option_type * _d2)
    block_3 = discounted_price * volatility / (2 * root_t) * snd(_d1)
    return option_type * (block_1 - block_2) - block_3
def run_tests():
    """
    Smoke-test the module by calling its functions and printing progress.
    Raises AssertionError at the first check that fails, except for the
    find_stock checks on real tickers, which only print a message.
    """
    # --- ticker cleaning ---
    print('Testing clean_ticker')
    assert clean_ticker('ac.to') == 'AC.TO'
    assert clean_ticker('23ac.to23@#0 ') == 'AC.TO'

    # --- ticker lists per market / index ---
    print('Getting NASDAQ')
    assert get_nasdaq_tickers()['AMD']['name'] == 'Advanced Micro Devices Inc.'
    print('Getting NYSE')
    assert get_nyse_tickers()['V']['name'] == 'VISA INC'
    assert get_nyse_tickers()['VZ']['name'] == 'VERIZON COMMUNICATIONS'
    print('Getting AMEX')
    get_amex_tickers()
    print('Getting NYSE ARCA')
    assert get_nyse_arca_tickers()['SPY']['name'] == 'SPDR S&P 500 ETF TRUST'
    print('Getting TSX')
    assert 'SHOP.TO' in get_tsx_tickers()
    print('Getting OTC')
    assert get_otc_tickers()['HTZGQ']['name'] == 'HERTZ GLOBAL HOLDINGS INC'
    print('Getting DOW')
    assert get_dow_tickers()['AAPL']['name'] == 'Apple Inc.'
    print('Getting S&P500')
    assert get_sp500_tickers()['TSLA']['name'] == 'Tesla, Inc.'
    print('Getting Russel 2k')
    assert get_russel_2k_tickers()['PZZA']['name'] == "Papa John's International Inc."

    # --- misc lookups ---
    print('Getting FUTURES')
    get_index_futures()
    print('Testing get_company_name')
    assert get_company_name('NVDA') == 'NVIDIA CORP'
    print('Getting 10 Random Stocks')
    print(get_random_stocks(10))

    # --- ticker info ---
    print('Testing get ticker info')
    # dividend, non-dividend, ca-dividend, ca-non-dividend, old
    real_tickers = ('RTX', 'PLTR', 'OVV.TO', 'SHOP.TO', 'AMD', 'CCIV', 'SPY', 'VOO')
    for real_ticker in real_tickers:
        get_ticker_info(real_ticker)
    # test invalid ticker
    try:
        get_ticker_info('ZWC')
    except ValueError:
        pass

    # --- target prices and interest rate ---
    print('Testing get target price')
    get_target_price('DOC')
    try:
        get_target_price('ZWC')
    except ValueError:
        pass
    assert 0 < get_risk_free_interest_rate(0) < 1

    # --- search ---
    print('Testing find_stock')
    pprint(find_stock('entertainment'))
    pprint(find_stock('TWLO'))
    tickers = {'entertainment', 'Tesla', 'Twitter', 'TWLO', 'Paypal', 'Visa'}
    for real_ticker in real_tickers:
        try:
            assert find_stock(real_ticker)
        except AssertionError:
            print(f'TEST FAILED: find_stock({real_ticker}')
    assert not find_stock('thisshouldfail')

    # --- bulk helpers ---
    print('Testing get ticker infos')
    tickers_info, errors = get_ticker_infos(tickers)
    assert tickers_info and not errors
    print('Testing get target prices')
    tickers = {'Tesla', 'Twitter', 'TWLO', 'Paypal', 'Visa', 'OPEN', 'CRSR', 'PLTR', 'PTON', 'ZM'}
    target_prices, errors = get_target_prices(tickers)
    assert target_prices and not errors
    print('Testing sort tickers by dividend yield')
    sort_by_dividend(get_dow_tickers())
    print('Testing top movers')
    top_movers(market='DOW')
# Run the smoke tests only when this file is executed as a script.
if __name__ == '__main__':
    run_tests()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment