A Python script and tool for speeding up stock market research.
"""
Investing Analytics
Author: Elijah Lopez
Version: 2.1
Created: April 3rd 2020
Updated: July 1st 2023
https://gist.github.com/elibroftw/2c374e9f58229d7cea1c14c6c4194d27
Resources:
Black-Scholes variables:
https://aaronschlegel.me/black-scholes-formula-python.html#Dividend-Paying-Black-Scholes-Formula
Black-Scholes formulas:
https://quantpie.co.uk/bsm_formula/bs_summary.php
Volatility (Standard Deviation) of a stock:
https://tinytrader.io/how-to-calculate-historical-price-volatility-with-python/
Concurrent Futures:
https://docs.python.org/3/library/concurrent.futures.html
"""
# standard library
import concurrent.futures
import csv
import json
import math
import random
import re
import sys
import time
from contextlib import suppress
from datetime import datetime, timedelta
from enum import IntEnum
from functools import cmp_to_key, lru_cache, wraps
from json.decoder import JSONDecodeError
# noinspection PyUnresolvedReferences
from pprint import pprint
from statistics import NormalDist, StatisticsError, median
from typing import Iterator
# 3rd party libraries
import feedparser
import numpy as np
import requests
import yfinance as yf
from bs4 import BeautifulSoup
from fuzzywuzzy import process
from pytz import timezone
from tabulate import tabulate
def time_cache(max_age, maxsize=None, typed=False):
"""Least-recently-used cache decorator with time-based cache invalidation.
Args:
max_age: Time to live for cached results (in seconds).
    maxsize: Maximum cache size (see `functools.lru_cache`).
typed: Cache on distinct input types (see `functools.lru_cache`).
"""
def _decorator(fn):
@lru_cache(maxsize=maxsize, typed=typed)
def _new(*args, __time_salt, **kwargs):
return fn(*args, **kwargs)
@wraps(fn)
def _wrapped(*args, **kwargs):
return _new(*args, **kwargs, __time_salt=int(time.time() / max_age))
return _wrapped
return _decorator
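# A minimal usage sketch (illustrative only; `_example_time_cache` is not part of
# the original gist): cached results are reused until the time salt rolls over.
def _example_time_cache():
    @time_cache(2)  # cache results for up to 2 seconds
    def slow_lookup(ticker):
        time.sleep(1)
        return ticker.upper()
    slow_lookup('amd')  # computed (~1 second)
    slow_lookup('amd')  # served from the cache (instant)
    time.sleep(2)
    return slow_lookup('amd')  # recomputed: the time salt has changed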
def timing(fn):
@wraps(fn)
def wrapper(*args, **kwargs):
_start = time.time()
result = fn(*args, **kwargs)
print(f'@timing {fn.__name__} ELAPSED TIME:', time.time() - _start)
return result
return wrapper
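# Illustrative sketch (not in the original gist): wrap any function with @timing
# to print how long each call takes.
def _example_timing():
    @timing
    def fetch_something():
        time.sleep(0.5)
        return 42
    return fetch_something()  # prints e.g. "@timing fetch_something ELAPSED TIME: 0.50..."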
NASDAQ_TICKERS_URL = 'https://api.nasdaq.com/api/screener/stocks?exchange=nasdaq&download=true'
OTC_TICKERS_URL = 'https://www.otcmarkets.com/research/stock-screener/api?securityType=Common%20Stock&market=20,21,22,10,6,5,2,1&sortField=symbol&pageSize=100000'
# NYSE_TICKERS_URL = 'https://api.nasdaq.com/api/screener/stocks?exchange=nyse&download=true'
NYSE_TICKERS_URL = 'https://www.nyse.com/api/quotes/filter'
NYSE_URL = 'https://www.nyse.com'
AMEX_TICKERS_URL = 'https://api.nasdaq.com/api/screener/stocks?exchange=amex&download=true'
TSX_TICKERS_URL = 'https://www.tsx.com/json/company-directory/search/tsx/^*'
PREMARKET_FUTURES_URL = 'https://ca.investing.com/indices/indices-futures'
DOW_URL = 'https://en.wikipedia.org/wiki/Dow_Jones_Industrial_Average'
SP500_URL = 'http://en.wikipedia.org/wiki/List_of_S%26P_500_companies'
RUT_2K_URL = 'https://api.vanguard.com/rs/ire/01/ind/fund/VTWO/portfolio-holding/stock.json'
TIP_RANKS_API = 'https://www.tipranks.com/api/stocks/'
CIK_LIST_URL = 'https://www.sec.gov/include/ticker.txt'
SORTED_INFO_CACHE = {}  # for when it's past 4 PM
GENERIC_HEADERS = {
'accept': 'text/html,application/xhtml+xml,application/json',
'user-agent': 'Mozilla/5.0'
}
# NOTE: something for later https://www.alphavantage.co/
# noinspection PyShadowingNames
def make_request(url, method='GET', headers=None, json=None, data=None):
if headers is None:
headers = GENERIC_HEADERS
if method == 'GET':
return requests.get(url, headers=headers)
elif method == 'POST':
        return requests.post(url, json=json, headers=headers, data=data)
raise ValueError(f'Invalid method {method}')
@time_cache(24 * 3600, maxsize=1)
def get_dow_tickers() -> dict:
resp = make_request(DOW_URL)
soup = BeautifulSoup(resp.text, 'html.parser')
# noinspection PyUnresolvedReferences
table = soup.find('table', {'id': 'constituents'}).find('tbody')
rows = table.find_all('tr')
tickers = dict()
for row in rows:
with suppress(IndexError):
ticker = row.find_all('td')[1].text.split(':')[-1].strip()
name = row.find('th').text.strip()
tickers[ticker] = {'symbol': ticker, 'name': name}
return tickers
@time_cache(24 * 3600, maxsize=1)
def get_sp500_tickers() -> dict:
resp = make_request(SP500_URL)
soup = BeautifulSoup(resp.text, 'html.parser')
table = soup.find('table', {'id': 'constituents'})
tickers = {}
# noinspection PyUnresolvedReferences
for row in table.findAll('tr')[1:]:
tds = row.findAll('td')
ticker = tds[0].text.strip()
if '.' not in ticker:
name = tds[1].text.strip()
tickers[ticker] = {'symbol': ticker, 'name': name}
return tickers
@time_cache(24 * 3600, maxsize=1)
def get_russel_2k_tickers() -> dict:
    '''
    Returns the Russell 2000 constituents using Vanguard's VTWO holdings
    instead of computing index membership manually.
    '''
data = make_request(RUT_2K_URL, headers={'Referer': RUT_2K_URL}).json()
tickers = {}
for stock in data['fund']['entity']:
ticker = stock['ticker']
# filter tickers
# if asset_class == 'Equity' and ticker != '-' and not bool(re.search(r'\d', ticker)):
tickers[ticker] = {
'symbol': ticker,
'name': stock['longName']
}
return tickers
def clean_ticker(ticker):
# remove everything except for letters and periods
regex = re.compile(r'[^a-zA-Z.]')
return regex.sub('', ticker).strip().upper()
def clean_name(name: str):
return name.replace('Common Stock', '').strip()
def clean_stock_info(info):
info['name'] = clean_name(info['name'])
return info
@time_cache(24 * 3600, maxsize=1)
def get_bats_tickers() -> dict:
    # NOTE: currently mirrors the NASDAQ screener; a BATS-specific source is still needed
    r = make_request(NASDAQ_TICKERS_URL).json()
tickers = {}
for stock in r['data']['rows']:
symbol = stock['symbol'].strip()
tickers[symbol] = {**clean_stock_info(stock), 'exchange': 'NASDAQ'}
return tickers
@time_cache(24 * 3600, maxsize=1)
def get_nasdaq_tickers() -> dict:
r = make_request(NASDAQ_TICKERS_URL).json()
tickers = {}
for stock in r['data']['rows']:
symbol = stock['symbol'].strip()
tickers[symbol] = {**clean_stock_info(stock), 'exchange': 'NASDAQ'}
return tickers
@time_cache(24 * 3600, maxsize=1)
def get_nyse_tickers() -> dict:
payload = {"instrumentType": "EQUITY", "pageNumber": 1, "sortColumn": "NORMALIZED_TICKER", "sortOrder": "ASC",
"maxResultsPerPage": 10000, "filterToken": ""}
r = make_request(NYSE_TICKERS_URL, method='POST', json=payload).json()
tickers = {}
for stock in r:
symbol = stock['symbol'] = stock['symbolTicker'].strip()
stock['name'] = stock['instrumentName']
tickers[symbol] = {**clean_stock_info(stock), 'exchange': 'NYSE'}
return tickers
@time_cache(24 * 3600, maxsize=1)
def get_amex_tickers() -> dict:
r = make_request(AMEX_TICKERS_URL).json()
tickers = {}
for stock in r['data']['rows']:
symbol = stock['symbol'].strip()
tickers[symbol] = {**clean_stock_info(stock), 'exchange': 'AMEX'}
return tickers
@time_cache(24 * 3600, maxsize=1)
def get_tsx_tickers() -> dict:
r = make_request(TSX_TICKERS_URL).json()
tickers = {}
for stock in r['results']:
ticker = stock['symbol'].strip() + '.TO'
name = stock['name'].replace('Common Stock', '').strip()
tickers[ticker] = {'symbol': ticker, 'name': name, 'exchange': 'TSX'}
return tickers
@time_cache(24 * 3600, maxsize=1)
def get_nyse_arca_tickers() -> dict:
post_data = {'instrumentType': 'EXCHANGE_TRADED_FUND', 'pageNumber': 1, 'sortColumn': 'NORMALIZED_TICKER',
'sortOrder': 'ASC', 'maxResultsPerPage': 5000, 'filterToken': ''}
r = requests.post('https://www.nyse.com/api/quotes/filter',
json=post_data).json()
tickers = {}
for stock in r:
symbol = stock['symbolTicker'].strip()
tickers[symbol] = {'symbol': symbol, 'name': stock['instrumentName'], 'exchange': 'NYSEARCA'}
return tickers
@time_cache(24 * 3600, maxsize=1)
def get_otc_tickers() -> dict:
r = make_request(OTC_TICKERS_URL).text.strip('"').replace('\\"', '"')
r = json.loads(r)['stocks']
tickers = {}
for stock in r:
symbol = stock['symbol'].strip()
info = {'symbol': stock['symbol'], 'name': stock['securityName'], 'exchange': 'OTC'}
tickers[symbol] = info
return tickers
# can cache this since info rarely changes
@time_cache(24 * 3600, maxsize=100)
def get_tickers(category, return_list=False) -> dict | list:
"""
    OPTIONS: ALL, US, CA, NASDAQ, NYSE, AMEX, NYSEARCA, TSX, OTC,
    SP500, DOW, RUT2K, DEFENCE, MREITS, O&G, CARS, TANKERS, UTILS
"""
category = category.upper()
tickers = dict()
# Indexes
if category in {'S&P500', 'S&P 500', 'SP500'}:
tickers.update(get_sp500_tickers())
if category in {'DOW', 'DJIA'}:
tickers.update(get_dow_tickers())
if category in {'RUT2K', 'RUSSEL2K'}:
tickers.update(get_russel_2k_tickers())
# Exchanges
if category in {'NASDAQ', 'NDAQ', 'US', 'ALL'}:
tickers.update(get_nasdaq_tickers())
if category in {'NYSE', 'US', 'ALL'}:
tickers.update(get_nyse_tickers())
if category in {'AMEX', 'US', 'ALL'}:
tickers.update(get_amex_tickers())
if category in {'ARCA', 'NYSEARCA', 'US', 'ALL'}:
tickers.update(get_nyse_arca_tickers())
if category in {'TSX', 'TMX', 'CA', 'ALL'}:
tickers.update(get_tsx_tickers())
if category in {'OTC', 'OTCMKTS', 'ALL'}:
tickers.update(get_otc_tickers())
# Industries
elif category == 'DEFENCE':
defence_tickers = {'LMT', 'BA', 'NOC', 'GD', 'RTX', 'LDOS'}
tickers = get_nyse_tickers()
return {k: v for k, v in tickers.items() if k in defence_tickers}
elif category in {'MORTGAGE REITS', 'MREITS'}:
mreits = {'NLY', 'STWD', 'AGNC', 'TWO', 'PMT', 'MITT', 'NYMT', 'MFA',
'IVR', 'NRZ', 'TRTX', 'RWT', 'DX', 'XAN', 'WMC'}
tickers = get_tickers('ALL')
return {k: v for k, v in tickers.items() if k in mreits}
elif category in {'OIL', 'OIL & GAS', 'O&G'}:
oil_and_gas = {'DNR', 'PVAC', 'ROYT', 'SWN', 'CPE', 'CEQP', 'PAA', 'PUMP', 'PBF'}
tickers = get_tickers('ALL')
return {k: v for k, v in tickers.items() if k in oil_and_gas}
elif category in {'AUTO', 'AUTOMOBILE', 'CARS', 'CAR'}:
autos = {'TSLA', 'GM', 'F', 'NIO', 'RACE', 'FCAU', 'HMC', 'TTM', 'TM', 'XPEV', 'LI', 'CCIV'}
tickers = get_tickers('ALL')
return {k: v for k, v in tickers.items() if k in autos}
elif category == 'TANKERS':
oil_tankers = {'EURN', 'TNK', 'TK', 'TNP', 'DSX', 'NAT',
'STNG', 'SFL', 'DHT', 'CPLP', 'DSSI', 'FRO', 'INSW', 'NNA', 'SBNA'}
tickers = get_tickers('ALL')
return {k: v for k, v in tickers.items() if k in oil_tankers}
elif category in {'UTILS', 'UTILITIES'}:
utilities = {'PCG', 'ELLO', 'AT', 'ELP', 'ES', 'EDN', 'IDA', 'HNP', 'GPJA', 'NEP', 'SO', 'CEPU', 'AES', 'ETR',
'KEP', 'OGE', 'EIX', 'NEE', 'TVC', 'TAC', 'EE', 'CIG', 'PNW', 'EMP', 'EBR.B', 'CPL', 'DTE', 'POR',
'EAI', 'NRG', 'CWEN', 'KEN', 'AGR', 'BEP', 'ORA', 'EAE', 'PPX', 'AZRE', 'ENIC', 'FE', 'CVA', 'BKH',
'ELJ', 'EZT', 'HE', 'VST', 'ELU', 'ELC', 'TVE', 'AQN', 'PAM', 'AEP', 'ENIA', 'EAB', 'PPL', 'CNP',
'D', 'PNM', 'EBR', 'FTS'}
tickers = get_tickers('ALL')
return {k: v for k, v in tickers.items() if k in utilities}
return list(tickers) if return_list else tickers
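# Usage sketch (assumes network access; `_example_get_tickers` is illustrative only):
# get_tickers returns {symbol: info_dict}; pass return_list=True for just the symbols.
def _example_get_tickers():
    dow = get_tickers('DOW')
    return dow.get('AAPL')  # e.g. {'symbol': 'AAPL', 'name': 'Apple Inc.'}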
@time_cache(24 * 3600, maxsize=1)
def get_cik_mapping():
r = make_request(CIK_LIST_URL)
cik_mapping = {}
for line in r.text.splitlines():
line = line.strip()
ticker, cik = line.split()
ticker = ticker.upper()
cik_mapping[ticker] = cik
return cik_mapping
@lru_cache(maxsize=10000)
def get_cik(ticker):
return get_cik_mapping()[ticker]
def get_company_name(ticker: str):
ticker = clean_ticker(ticker)
with suppress(KeyError):
return get_tickers('ALL')[ticker]['name']
if ticker.count('.TO'):
try:
return get_tsx_tickers()[ticker]['name']
except KeyError:
ticker = ticker.replace('.TO', '')
r = requests.get(f'https://www.tsx.com/json/company-directory/search/tsx/{ticker}')
results = {}
for s in r.json()['results']:
s['name'] = s['name'].upper()
if s['symbol'] == ticker: return s['name']
results[s['symbol']] = s
best_match = process.extractOne(ticker, list(results.keys()))[0]
return results[best_match]['name']
raise ValueError(f'could not get company name for {ticker}')
@time_cache(10000)
def get_financials_v2(ticker: str):
# TODO: https://api.nasdaq.com/api/company/IBM/financials?frequency=1
pass
@time_cache(10000)
def get_financials(ticker: str):
"""
Scrapes MarketWatch and returns Total Assets, Net Incomes, and Return on Assets (ROA) [for now]
for US Companies that file with the SEC.
Performance: ~5 seconds cold start, ~1.5 seconds thereafter
Args:
ticker: US ticker to get data for
aggregate: Whether to parse all 10K files. [future]
commit_db: Whether to handle the sqlite commit [future]
Returns:
"""
ticker = clean_ticker(ticker)
if ticker not in get_tickers('ALL'):
ticker = find_stock(ticker)[0][0]
    url = f'https://finance.yahoo.com/quote/{ticker}/financials?p={ticker}'
# TODO: make two functions?
r = make_request(url)
soup = BeautifulSoup(r.text, features='html.parser')
income_statement = soup.find('div', attrs={'class': 'M(0) Whs(n) BdEnd Bdc($seperatorColor) D(itb)'})
try:
periods = next(next(income_statement.children).children).children
except AttributeError:
print(ticker)
raise ValueError(f'Invalid ticker {ticker}')
data = list(income_statement.children)[1]
dates = []
financials = {'symbol': ticker, 'name': get_company_name(ticker)}
for period in periods:
period = period.text.lower()
if period == 'breakdown':
continue
if period == 'ttm':
dates.append(period)
else:
period = int(period.rsplit('/', 1)[1])
if 'latest_year' not in financials:
financials['latest_year'] = period
dates.append(period)
financials[period] = {}
for row in data.children:
with suppress(AttributeError):
values = row.find('div')
values = values.children
heading = next(values).text.lower().replace(' ', '_')
for i, value in enumerate(values):
key = dates[i]
try:
value = int(value.text.replace(',', ''))
except ValueError:
value = None
financials[key][heading] = value
url = f'https://finance.yahoo.com/quote/{ticker}/balance-sheet'
r = make_request(url)
soup = BeautifulSoup(r.text, features='html.parser')
balance_sheet = soup.find('div', attrs={'class': 'M(0) Whs(n) BdEnd Bdc($seperatorColor) D(itb)'})
periods = next(next(balance_sheet.children).children).children
data = list(balance_sheet.children)[1]
dates = []
for period in periods:
period = period.text.lower()
if period == 'breakdown':
continue
if period == 'ttm':
dates.append(period)
else:
period = int(period.rsplit('/', 1)[1])
if 'latest_year' not in financials:
financials['latest_year'] = period
dates.append(period)
if period not in financials:
financials[period] = {}
for row in data.children:
with suppress(AttributeError):
values = row.find('div')
values = values.children
heading = next(values).text.lower().replace(' ', '_')
for i, value in enumerate(values):
key = dates[i]
try:
value = int(value.text.replace(',', ''))
except ValueError:
value = None
financials[key][heading] = value
# calculate roa
latest_year = financials['latest_year']
while latest_year - 1 in financials:
assets_beginning = financials[latest_year - 1]['total_assets']
net_income = financials[latest_year]['net_income_common_stockholders']
financials[latest_year]['roa'] = net_income / assets_beginning
latest_year -= 1
financials['roa'] = financials[financials['latest_year']]['roa']
return financials
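# Usage sketch (illustrative; depends on live Yahoo Finance HTML, which changes often):
def _example_get_financials():
    fin = get_financials('AAPL')
    return fin['roa']  # most recent year's return on assets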
@time_cache(10000)
def get_financials_old(ticker: str, aggregate=False, commit_db=True):
"""
Parses 10K file and returns Total Assets, Net Incomes, and Return on Assets (ROA) [for now]
for US Companies that file with the SEC.
Performance: ~5 seconds cold start, ~1.5 seconds thereafter
Args:
ticker: US ticker to get data for
aggregate: Whether to parse all 10K files. [future]
commit_db: Whether to handle the sqlite commit [future]
Returns:
{'name': 'Apple Inc.',
'net_incomes': {2018: 59531000000, 2019: 55256000000, 2020: 57411000000},
'return_on_assets': {2020: 16.959611953349324},
'roa': 16.959611953349324,
'symbol': 'AAPL',
'total_assets': {2019: 338516000000, 2020: 323888000000}}
"""
# TODO: use a SQLITE database to cache data
# and the latest 10K url
ticker = clean_ticker(ticker)
if ticker not in get_tickers('ALL'):
ticker = find_stock(ticker)[0][0]
company_name = get_company_name(ticker)
cik = get_cik(ticker).rjust(10, '0')
submission = make_request(f'https://data.sec.gov/submissions/CIK{cik}.json').json()
form_index = submission['filings']['recent']['form'].index('10-K')
accession = submission['filings']['recent']['accessionNumber'][form_index].replace('-', '')
file_name = submission['filings']['recent']['primaryDocument'][form_index]
    file_name, ext = file_name.rsplit('.', 1)
url = f'https://www.sec.gov/Archives/edgar/data/{cik}/{accession}/{file_name}_{ext}.xml'
r = make_request(url).text
soup = BeautifulSoup(r, 'lxml')
def get_context_date(context_ref, is_balance_sheet=False, only_year=False):
try:
_date = context_ref.rsplit('_I' if is_balance_sheet else '-', 1)[1]
int(_date)
except (IndexError, ValueError):
if context_ref.startswith('As_Of') or context_ref.startswith('PAsOn'):
m, d, y = re.findall('[0-9]+_[0-9]+_[0-9]+', context_ref)[0].split('_')
m, d = int(m), int(d)
_date = f'{y}{m:02}{d:02}'
elif context_ref.startswith('FI') or context_ref.startswith('FD'):
# only the year is available
return int(re.findall('[1-9][0-9][0-9][0-9]', context_ref)[0])
else:
m, d, y = re.findall('[0-9]+_[0-9]+_[0-9]+', context_ref)[1].split('_')
m, d = int(m), int(d)
_date = f'{y}{m:02}{d:02}'
return int(_date[:4]) if only_year else _date
total_assets = {get_context_date(tag['contextref'], True, True): int(tag.text) for tag in soup.find_all('us-gaap:assets')}
net_income_loss = soup.find_all('us-gaap:netincomeloss')
# if tags not found use other alias
if not net_income_loss:
net_income_loss = soup.find_all('us-gaap:netincomelossavailabletocommonstockholdersbasic')
net_incomes = {}
for tag in net_income_loss:
try:
year = get_context_date(tag['contextref'], only_year=True)
if year not in net_incomes:
net_incomes[year] = int(tag.text)
except IndexError:
pprint(net_income_loss)
roas = {}
for year, value in total_assets.items():
with suppress(KeyError):
next_year = year + 1
roa = (net_incomes[next_year] / value) * 100 # %
roas[next_year] = roa
financials = {
'name': company_name,
'symbol': ticker,
'total_assets': total_assets,
'net_incomes': net_incomes,
'return_on_assets': roas,
        'roa': sorted(roas.items())[-1][1]  # ROA for the most recent year
}
return financials
def get_ticker_info(query: str, round_values=True):
"""
Uses WSJ instead of yfinance to get stock info summary
Sample Return:
{'annualized_dividend': 6.52,
'api_url': 'https://www.wsj.com/market-data/quotes/IBM?id={"ticker":"IBM","countryCode":"US","path":"IBM"}&type=quotes_chart',
'change': -0.15,
'change_percent': -0.12,
'close_price': 120.71,
'dividend_yield': 5.40,
'eps_ttm': 6.24,
'pe': 19.34,
'extended_hours': True,
'last_dividend': 1.63,
'latest_change': -0.01,
'latest_change_percent': -0.01,
'name': 'International Business Machines Corp.',
'previous_close_price': 120.86,
'price': 120.7,
'source': 'https://www.marketwatch.com/investing/stock/IBM?countrycode=US',
'symbol': 'IBM',
'timestamp': datetime.datetime(2021, 2, 23, 19, 59, 49, 906000, tzinfo=<StaticTzInfo 'GMT'>),
'volume': 4531464}
"""
ticker = clean_ticker(query)
try:
is_etf = ticker in get_nyse_arca_tickers() or 'ETF' in get_company_name(ticker).split()
except ValueError:
is_etf = False
country_code = 'CA' if '.TO' in ticker else 'US'
ticker = ticker.replace('.TO', '') # remove exchange
api_query = {
'ticker': ticker,
'countryCode': country_code,
'path': ticker
}
api_query = json.dumps(api_query, separators=(',', ':'))
source = f'https://www.marketwatch.com/investing/stock/{ticker}?countrycode={country_code}'
api_url = f'https://www.wsj.com/market-data/quotes/{ticker}?id={api_query}&type=quotes_chart'
if is_etf:
ckey = 'cecc4267a0'
entitlement_token = 'cecc4267a0194af89ca343805a3e57af'
source = f'https://www.marketwatch.com/investing/fund/{ticker}?countrycode={country_code}'
api_url = f'https://api.wsj.net/api/dylan/quotes/v2/comp/quoteByDialect?dialect=official&needed=Financials|CompositeTrading|CompositeBeforeHoursTrading|CompositeAfterHoursTrading&MaxInstrumentMatches=1&accept=application/json&EntitlementToken={entitlement_token}&ckey={ckey}&dialects=Charting&id=ExchangeTradedFund-US-{ticker}'
r = make_request(api_url)
if not r.ok:
try:
ticker = find_stock(query)[0][0]
if ticker != query:
return get_ticker_info(ticker)
except IndexError:
raise ValueError(f'Invalid ticker "{query}"')
try:
data = r.json() if is_etf else r.json()['data']
except JSONDecodeError:
raise ValueError(f'Invalid ticker "{query}"')
try:
quote_data = data['InstrumentResponses'][0]['Matches'][0] if is_etf else data['quoteData']
except IndexError:
raise ValueError(f'Invalid ticker "{query}"')
financials = quote_data['Financials']
name = quote_data['Instrument']['CommonName']
try:
previous_close = financials['Previous']['Price']['Value']
except TypeError:
raise ValueError(f'Invalid ticker "{query}"')
latest_price = closing_price = quote_data['CompositeTrading']['Last']['Price']['Value']
try:
latest_price = quote_data['CompositeBeforeHoursTrading']['Price']['Value']
except TypeError:
try:
latest_price = quote_data['CompositeAfterHoursTrading']['Price']['Value']
except TypeError:
closing_price = previous_close
volume = quote_data['CompositeTrading']['Volume']
if is_etf:
if quote_data['CompositeBeforeHoursTrading']:
market_state = 'Pre-Market'
elif quote_data['CompositeAfterHoursTrading']:
market_state = 'After-Market' if quote_data['CompositeAfterHoursTrading']['IsRealtime'] else 'Closed'
else:
market_state = 'Open'
else:
market_state = data['quote']['marketState'].get('CurrentState', 'Open')
extended_hours = market_state in {'After-Market', 'Closed', 'Pre-Market'}
if market_state in {'After-Market', 'Closed'} and quote_data['CompositeAfterHoursTrading']:
timestamp = quote_data['CompositeAfterHoursTrading']['Time']
elif market_state == 'Pre-Market' and quote_data['CompositeBeforeHoursTrading']:
timestamp = quote_data['CompositeBeforeHoursTrading']['Time']
else:
timestamp = quote_data['CompositeTrading']['Last']['Time']
try:
        timestamp = int(timestamp.split('(', 1)[1].split('+', 1)[0]) / 1e3
        timestamp = datetime.fromtimestamp(timestamp, tz=timezone('US/Eastern'))
except IndexError:
# time format is: 2021-02-25T18:52:44.677
timestamp = datetime.strptime(timestamp.rsplit('.', 1)[0], '%Y-%m-%dT%H:%M:%S')
change = closing_price - previous_close
try:
change_percent = change / previous_close * 100
except ZeroDivisionError:
change_percent = 0
latest_change = latest_price - closing_price
try:
latest_change_percent = latest_change / closing_price * 100
except ZeroDivisionError:
latest_change_percent = 0
try:
market_cap = financials['MarketCapitalization']['Value']
except TypeError:
try:
market_cap = financials['SharesOutstanding'] * latest_price
except TypeError:
market_cap = 0
try:
eps_ttm = financials['LastEarningsPerShare']['Value']
except TypeError:
eps_ttm = 0
try:
last_dividend = financials['LastDividendPerShare']['Value']
except TypeError:
last_dividend = None
dividend_yield = financials['Yield']
annualized_dividend = financials['AnnualizedDividend']
if annualized_dividend is None:
dividend_yield = 0
last_dividend = 0
annualized_dividend = 0
pe = financials['PriceToEarningsRatio']
if pe is None:
try:
pe = closing_price / eps_ttm
except ZeroDivisionError:
pe = 0 # 0 = N/A
if round_values:
previous_close = round(previous_close, 2)
latest_price = round(latest_price, 2)
closing_price = round(closing_price, 2)
change = round(change, 2)
change_percent = round(change_percent, 2)
latest_change = round(latest_change, 2)
latest_change_percent = round(latest_change_percent, 2)
dividend_yield = round(dividend_yield, 2)
last_dividend = round(last_dividend, 2)
eps_ttm = round(eps_ttm, 2)
market_cap = round(market_cap)
return_info = {
'name': name,
'symbol': ticker + ('.TO' if country_code == 'CA' else ''),
'volume': volume,
'eps_ttm': eps_ttm,
'pe': pe,
'dividend_yield': dividend_yield,
'last_dividend': last_dividend,
'annualized_dividend': annualized_dividend,
'price': latest_price,
'market_cap': market_cap,
'close_price': closing_price,
'previous_close_price': previous_close,
'change': change,
'change_percent': change_percent,
'latest_change': latest_change,
'latest_change_percent': latest_change_percent,
'extended_hours': extended_hours,
'timestamp': timestamp,
'source': source,
'api_url': api_url
}
return return_info
# noinspection PyUnboundLocalVariable
@time_cache(30) # cache for 30 seconds
def get_ticker_info_old(ticker: str, round_values=True, use_nasdaq=False) -> dict:
"""
Raises ValueError
Sometimes the dividend yield is incorrect
"""
ticker = clean_ticker(ticker)
if use_nasdaq:
url = f'https://api.nasdaq.com/api/quote/{ticker}/summary?assetclass=stocks'
r = make_request(url).json()
if r['status']['rCode'] < 400:
summary = {k: v['value'] for k, v in r['data']['summaryData'].items()}
url = f'https://api.nasdaq.com/api/quote/{ticker}/info?assetclass=stocks'
info = make_request(url).json()['data']
# name = get_tickers('ALL')[ticker]['name']
name = clean_name(info['companyName'])
volume = int(summary['ShareVolume'].replace(',', ''))
previous_close = float(summary['PreviousClose'].replace('$', ''))
eps_ttm = float(summary['EarningsPerShare'].replace('$', '').replace('N/A', '0'))
# annualized dividend
last_dividend = float(summary['AnnualizedDividend'].replace('$', '').replace('N/A', '0'))
dividend_yield = float(summary['Yield'].replace('%', '').replace('N/A', '0'))
# industry = summary['Industry']
else:
use_nasdaq = False
yf_ticker = yf.Ticker(ticker)
if not use_nasdaq:
try:
info = yf_ticker.info
name = info['longName']
volume = info['volume']
previous_close = info['regularMarketPreviousClose']
eps_ttm = info.get('trailingEps')
last_dividend = info.get('lastDividendValue')
dividend_yield = info['trailingAnnualDividendYield']
if last_dividend is None:
dividend_yield = 0
last_dividend = 0
except (KeyError, ValueError):
raise ValueError(f'Invalid ticker "{ticker}"')
data_latest = yf_ticker.history(period='5d', interval='1m', prepost=True)
timestamp = data_latest.last_valid_index()
latest_price = float(data_latest.tail(1)['Close'].iloc[0])
# if market is open: most recent close
# else: close before most recent close
# get most recent price
timestamp_ending = str(timestamp)[-6:]
    extended_hours = not (16 > timestamp.hour > 9 or (timestamp.hour == 9 and timestamp.minute >= 30))
if timestamp.hour >= 16: # timestamp is from post market
today = datetime(timestamp.year, timestamp.month, timestamp.day, 15, 59)
closing_timestamp = today.strftime(f'%Y-%m-%d %H:%M:%S{timestamp_ending}')
closing_price = data_latest.loc[closing_timestamp]['Open']
else:
# open-market / pre-market since timestamp is before 4:00 pm
# if pre-market, this close is after the previous close
latest_close = datetime(timestamp.year, timestamp.month,
timestamp.day, 15, 59) - timedelta(days=1)
while True:
try:
prev_day_timestamp = latest_close.strftime(f'%Y-%m-%d %H:%M:%S{timestamp_ending}')
closing_price = data_latest.loc[prev_day_timestamp]['Open']
break
except KeyError:
latest_close -= timedelta(days=1)
change = closing_price - previous_close
change_percent = change / previous_close * 100
latest_change = latest_price - closing_price
latest_change_percent = latest_change / closing_price * 100
if round_values:
previous_close = round(previous_close, 2)
latest_price = round(latest_price, 2)
closing_price = round(closing_price, 2)
change = round(change, 2)
change_percent = round(change_percent, 2)
latest_change = round(latest_change, 2)
latest_change_percent = round(latest_change_percent, 2)
try: dividend_yield = round(dividend_yield, 4)
except TypeError: dividend_yield = 0
last_dividend = round(last_dividend, 2)
with suppress(TypeError): eps_ttm = round(eps_ttm, 2)
return_info = {
'name': name,
'symbol': ticker,
'volume': volume,
'eps_ttm': eps_ttm,
'dividend_yield': dividend_yield,
'last_dividend': last_dividend,
'price': latest_price,
'close_price': closing_price,
'previous_close_price': previous_close,
'change': change,
'change_percent': change_percent,
'latest_change': latest_change,
'latest_change_percent': latest_change_percent,
'extended_hours': extended_hours,
'timestamp': timestamp
}
return return_info
def get_ticker_infos(tickers, round_values=True, errors_as_str=False, show_progress=False) -> tuple:
"""
returns: list[dict], list
uses a threadPoolExecutor instead of asyncio
"""
ticker_infos = []
tickers_not_found = []
progress_str = f'0 / {len(tickers)} (0 %)'
if show_progress:
sys.stderr.write(f'Downloading Tickers: {progress_str}')
with concurrent.futures.ThreadPoolExecutor(max_workers=35) as executor:
future_infos = [executor.submit(get_ticker_info, ticker, round_values=round_values) for ticker in tickers]
for future in concurrent.futures.as_completed(future_infos):
try:
ticker_infos.append(future.result())
except ValueError as e:
tickers_not_found.append(str(e) if errors_as_str else e)
if show_progress:
sys.stderr.write('\b' * len(progress_str))
completed = len(ticker_infos) + len(tickers_not_found)
percentage = completed / len(tickers) * 100
progress_str = f'{completed} / {len(tickers)} ({percentage:.2f} %)'
sys.stderr.write(progress_str)
sys.stderr.flush()
if show_progress:
sys.stderr.write('\n')
sys.stderr.flush()
return ticker_infos, tickers_not_found
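# Usage sketch (illustrative; assumes network access): quotes are fetched
# concurrently and failures are collected instead of raised.
def _example_get_ticker_infos():
    infos, not_found = get_ticker_infos(['AMD', 'MSFT'], errors_as_str=True)
    return len(infos), not_found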
def get_data(tickers: Iterator, start_date=None, end_date=None, period=None, group_by='ticker', interval='1d', show_progress=True):
"""
start_date: 'YYYY-MM-DD', _datetime, or epoch
end_date: 'YYYY-MM-DD', _datetime, or epoch
period (default '3mo'): 1d, 5d, 1mo, 3mo, 6mo, 1y, 2y, 5y, 10y, ytd, max
interval (default '1d'): 1m,2m,5m,15m,30m,60m,90m,1h,1d,5d,1wk,1mo,3mo
"""
if start_date is None and end_date is None and period is None:
period = '3mo'
assert (start_date is None and end_date is None and period is not None) or (start_date is not None and end_date is not None and period is None)
# http://www.datasciencemadesimple.com/union-and-union-all-in-pandas-dataframe-in-python-2/
# new format
# _key = ' '.join(tickers) + f' {start_date} {end_date} {period} {group_by}'
_data = yf.download(list(tickers), start_date, end_date, period=period, group_by=group_by, threads=3,
progress=show_progress, interval=interval)
return _data
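# Usage sketch (illustrative): last five trading days of minute bars, grouped by
# ticker, returned as a pandas DataFrame.
def _example_get_data():
    data = get_data(['AMD', 'MSFT'], period='5d', interval='1m')
    return data['AMD']['Close'].iloc[-1]  # AMD's most recent close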
def parse_info(_data, ticker, start_date, end_date, start_price_key='Open'):
"""
start_price_key: can be 'Open' or 'Close'
TODO: change parse_info keys to snake_case
"""
start_price = _data[ticker][start_price_key][start_date]
if math.isnan(_data[ticker]['Open'][end_date]):
end_date = _data[ticker]['Open'].last_valid_index()
end_price = _data[ticker]['Close'][end_date]
change = end_price - start_price
percent_change = round(change / start_price, 5)
try:
start_volume = round(_data[ticker]['Volume'][start_date])
except ValueError:
start_volume = 0
end_volume = round(_data[ticker]['Volume'][end_date])
    # NOTE: this averages the fetched window's per-row volume (e.g. minute bars),
    # not true average daily volume
    avg_volume = round(_data[ticker]['Volume'].mean())
return {'ticker': ticker, 'start_price': round(start_price, 3), 'end_price': round(end_price, 3), 'change': round(change, 3), 'percent_change': percent_change, 'open_volume': start_volume, 'close_volume': end_volume, 'avg_volume': avg_volume}
def get_parsed_data(_data=None, tickers: Iterator = None, market='ALL', sort_key=lambda x: x['percent_change'], of='day', start_date: datetime = None, end_date: datetime = None, sort_dec=True):
"""
returns the parsed trading data sorted by percent change
    :param _data: pre-fetched data from get_data; pass None unless you are reusing data across calls (e.g. >= 1 month of data)
    :param tickers: specify if you have your own custom tickers list, otherwise market is used to get the list
    :param market: the market if _data is None
    :param of: one of {'day', 'mtd', 'month', 'ytd', 'year', 'custom'} (plus their aliases)
    :param sort_key: a lambda expression; each item has the keys {'start_price', 'end_price', 'change', 'percent_change', 'open_volume', 'close_volume', 'avg_volume'}
        if None, a dict with the tickers as keys is returned instead of a list
    :param start_date: if of == 'custom', specify this value
    :param end_date: if of == 'custom', specify this value
"""
of = of.lower()
_today = datetime.today()
todays_date = _today.date()
if tickers is None:
tickers = list(get_tickers(market))
if _today.hour >= 16 and of == 'day':
# TODO: cache pre-market as well
# key format will be
with suppress(KeyError):
return SORTED_INFO_CACHE[of][str(todays_date)][','.join(tickers)]
if of == 'custom' or _data is not None:
if _data is None:
assert start_date and end_date
_data = get_data(tickers, start_date=start_date, end_date=end_date)
start_date, end_date = _data.first_valid_index(), _data.last_valid_index()
parsed_info = {}
for ticker in tickers:
info = parse_info(_data, ticker, start_date, end_date)
if not math.isnan(info['start_price']):
parsed_info[ticker] = info
elif of in {'day', '1d'}:
# TODO: use get_ticker_info instead
# ALWAYS USE LATEST DATA
_data = get_data(tickers, period='5d', interval='1m')
market_day = _data.last_valid_index().date() == todays_date
if not market_day or (_today.hour * 60 + _today.minute >= 645): # >= 10:45 AM
# movers of the latest market day [TODAY]
recent_day = _data.last_valid_index()
recent_start_day = recent_day.replace(hour=9, minute=30, second=0)
parsed_info = {}
for ticker in tickers:
try:
info = parse_info(_data, ticker, recent_start_day, recent_day)
if not math.isnan(info['start_price']):
parsed_info[ticker] = info
except ValueError:
# TODO: fix
print('ERROR: Could not get info for', ticker)
else: # movers of the second last market day
yest = _data.tail(2).first_valid_index() # assuming interval = 1d
parsed_info = {}
for ticker in tickers:
info = parse_info(_data, ticker, yest, yest)
if not math.isnan(info['start_price']):
parsed_info[ticker] = info
# TODO: custom day amount
elif of in {'mtd', 'month_to_date', 'monthtodate'}:
start_date = todays_date.replace(day=1)
if _data is None:
_data = get_data(tickers, start_date=start_date, end_date=_today)
while start_date not in _data.index and start_date < todays_date:
start_date += timedelta(days=1)
        if start_date >= todays_date:
            raise RuntimeError('No market days this month')
parsed_info = {}
for ticker in tickers:
info = parse_info(_data, ticker, start_date, todays_date)
if not math.isnan(info['start_price']):
parsed_info[ticker] = info
elif of in {'month', '1m', 'm'}:
start_date = todays_date - timedelta(days=30)
if _data is None:
_data = get_data(
tickers, start_date=start_date, end_date=_today)
while start_date not in _data.index:
start_date += timedelta(days=1)
parsed_info = {}
for ticker in tickers:
info = parse_info(_data, ticker, start_date, todays_date)
if not math.isnan(info['start_price']):
parsed_info[ticker] = info
# TODO: x months
elif of in {'ytd', 'year_to_date', 'yeartodate'}:
if _data is None:
_temp = _today.replace(day=1, month=1)
_data = get_data(tickers, start_date=_temp, end_date=_today)
start_date = _data.first_valid_index() # first market day of the year
else:
start_date = _today.replace(day=1, month=1).date() # Jan 1st
# find first market day of the year
while start_date not in _data.index:
start_date += timedelta(days=1)
end_date = _data.last_valid_index()
parsed_info = {}
for ticker in tickers:
info = parse_info(_data, ticker, start_date, end_date)
if not math.isnan(info['start_price']):
parsed_info[ticker] = info
elif of in {'year', '1yr', 'yr', 'y'}:
if _data is None:
_data = get_data(tickers, start_date=_today -
timedelta(days=365), end_date=_today)
start_date = _data.first_valid_index() # first market day of the year
        else:
            start_date = _today.date() - timedelta(days=365)
            # find the first market day within the provided data
            while start_date not in _data.index:
                start_date += timedelta(days=1)
end_date = _data.last_valid_index()
parsed_info = {}
for ticker in tickers:
info = parse_info(_data, ticker, start_date, end_date)
if not math.isnan(info['start_price']):
parsed_info[ticker] = info
# TODO: x years
else:
parsed_info = {} # invalid of
if sort_key is None:
return parsed_info
sorted_info = sorted(parsed_info.values(), key=sort_key, reverse=sort_dec)
if _today.hour >= 16 and of == 'day':
if of not in SORTED_INFO_CACHE:
SORTED_INFO_CACHE[of] = {}
if str(todays_date) not in SORTED_INFO_CACHE[of]:
SORTED_INFO_CACHE[of][str(todays_date)] = {}
SORTED_INFO_CACHE[of][str(todays_date)][','.join(tickers)] = sorted_info
return sorted_info
def winners(sorted_info=None, tickers: list = None, market='ALL', of='day', start_date=None, end_date=None, show=5):
# sorted_info is the return of get_parsed_data with non-None sort_key
if sorted_info is None:
sorted_info = get_parsed_data(tickers=tickers, market=market, of=of, start_date=start_date, end_date=end_date)
return list(reversed(sorted_info[-show:]))
def losers(sorted_info=None, tickers: list = None, market='ALL', of='day', start_date=None, end_date=None, show=5):
# sorted_info is the return of get_parsed_data with non-None sort_key
if sorted_info is None:
sorted_info = get_parsed_data(
tickers=tickers, market=market, of=of, start_date=start_date, end_date=end_date)
return sorted_info[:show]
# noinspection PyTypeChecker
def winners_and_losers(_data=None, tickers=None, market='ALL', of='day', start_date=None, end_date=None, show=5,
console_output=True, csv_output=''):
sorted_info = get_parsed_data(_data, tickers, market, of=of, start_date=start_date, end_date=end_date)
if console_output:
bulls = ''
bears = ''
length = min(show, len(sorted_info))
        for i in range(length):
            better_stock = sorted_info[-i - 1]
            worse_stock = sorted_info[i]
            open_close1 = f'{round(better_stock["start_price"], 2)}, {round(better_stock["end_price"], 2)}'
            open_close2 = f'{round(worse_stock["start_price"], 2)}, {round(worse_stock["end_price"], 2)}'
            bulls += f'\n{better_stock["ticker"]} [{open_close1}]: {round(better_stock["percent_change"] * 100, 2)}%'
            bears += f'\n{worse_stock["ticker"]} [{open_close2}]: {round(worse_stock["percent_change"] * 100, 2)}%'
header1 = f'TOP {length} WINNERS ({of})'
header2 = f'TOP {length} LOSERS ({of})'
line = '-' * len(header1)
print(f'{line}\n{header1}\n{line}{bulls}')
line = '-' * len(header2)
print(f'{line}\n{header2}\n{line}{bears}')
if csv_output:
with open(csv_output, 'w', newline='') as f:
writer = csv.writer(f)
            writer.writerow(sorted_info[0].keys())
            for info in sorted_info:
                writer.writerow(info.values())
return sorted_info
def top_movers(_data=None, tickers=None, market='ALL', of='day', start_date=None, end_date=None, show=5,
console_output=True, csv_output=''):
return winners_and_losers(_data=_data, tickers=tickers, market=market, of=of, start_date=start_date,
end_date=end_date, show=show, console_output=console_output, csv_output=csv_output)
@time_cache(3600) # cache for 1 hour
def get_target_price(ticker, round_values=True):
"""
ticker: yahoo finance ticker
    returns: {'avg': float, 'low': float, 'high': float, 'price': float,
              'eps_ttm': float, 'source': 'url', 'api_url': 'url'}
"""
try:
ticker_info = get_ticker_info(ticker)
price = ticker_info['price'] # get latest price
ticker = ticker_info['symbol'] # get fixed ticker
timestamp = datetime.now().timestamp()
query = f'{TIP_RANKS_API}getData/?name={ticker}&benchmark=1&period=3&break={timestamp}'
r = make_request(query).json()
total = 0
estimates = []
try:
# Assumed to be ttm
eps_ttm = r['portfolioHoldingData']['lastReportedEps']['reportedEPS']
except TypeError:
eps_ttm = 0
target_prices = {
'symbol': ticker,
'name': r['companyName'],
'high': 0,
'low': 100000,
'price': price,
'eps_ttm': eps_ttm,
'source': f'https://www.tipranks.com/stocks/{ticker}/forecast',
'api_url': query
}
for expert in r['experts']:
target_price = expert['ratings'][0]['priceTarget']
if target_price:
# if analysis had a price target
if target_price > target_prices['high']: target_prices['high'] = target_price
if target_price < target_prices['low']: target_prices['low'] = target_price
total += target_price
estimates.append(target_price)
target_prices['avg'] = total / len(estimates) if estimates else 0
try:
target_prices['median'] = median(estimates)
except StatisticsError:
target_prices['avg'] = target_prices['median'] = r['ptConsensus'][0]['priceTarget']
target_prices['high'] = r['ptConsensus'][0]['high']
target_prices['low'] = r['ptConsensus'][0]['low']
target_prices['estimates'] = estimates
target_prices['total_estimates'] = len(estimates)
target_prices['upside'] = 100 * target_prices['high'] / target_prices['price'] - 100
target_prices['downside'] = 100 * target_prices['low'] / target_prices['price'] - 100
if round_values:
target_prices['upside'] = round(target_prices['upside'], 2)
target_prices['downside'] = round(target_prices['downside'], 2)
return target_prices
except json.JSONDecodeError:
raise ValueError(f'No Data Found for ticker "{ticker}"')
def get_target_prices(tickers, errors_as_str=False) -> tuple:
"""
returns: list[dict], list
uses a threadPoolExecutor instead of asyncio
"""
target_prices = []
tickers_not_found = []
with concurrent.futures.ThreadPoolExecutor(max_workers=35) as executor:
future_infos = [executor.submit(get_target_price, ticker) for ticker in tickers]
for future in concurrent.futures.as_completed(future_infos):
try:
result = future.result()
target_prices.append(result)
except ValueError as e:
tickers_not_found.append(str(e) if errors_as_str else e)
return target_prices, tickers_not_found
def sort_by_dividend(tickers):
ticker_infos = get_ticker_infos(tickers)[0]
ticker_infos.sort(key=lambda v: v['dividend_yield'], reverse=True)
return ticker_infos
def sort_by_pe(tickers, output_to_csv='', console_output=True):
"""
Returns the tickers by price-earnings ratio (remove negatives)
:param tickers: iterable
:param output_to_csv:
:param console_output:
:return:
"""
@cmp_to_key
def _pe_sort(left, right):
left, right = left['pe'], right['pe']
# smallest positive to smallest negative
# 0.1 ... 30 ... 0 ... -0.1 ... -100000
if left > 0 and right > 0:
# both are positive
# return number that is smaller
return left - right
elif left <= 0 and right <= 0:
# both are non-positive
# return number that is bigger
return right - left
# one of the pe's is positive and the other isn't
# positive comes before negative
return -1 if left > 0 else 1
ticker_infos = get_ticker_infos(tickers)[0]
    ticker_infos.sort(key=_pe_sort)  # ascending: smallest positive P/E first, then negatives
if console_output:
header = 'TOP 5 (UNDER VALUED) TICKERS BY P/E'
line = '-' * len(header)
print(f'{header}\n{line}')
for i, ticker_info in enumerate(ticker_infos):
if i == 5:
break
ticker = ticker_info['symbol']
            pe = ticker_info['pe']
print(f'{ticker}: {round(pe, 2)}')
if output_to_csv and ticker_infos:
with open(output_to_csv, 'w') as csvfile:
writer = csv.DictWriter(csvfile, fieldnames=ticker_infos[0].keys())
writer.writeheader()
for ticker_info in ticker_infos:
writer.writerow(ticker_info)
return ticker_infos
def sort_by_volume(tickers):
ticker_infos = get_ticker_infos(tickers)[0]
ticker_infos.sort(key=lambda v: v['volume'], reverse=True)
return ticker_infos
def sort_by_roa(tickers):
financials = []
with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:
future_infos = [executor.submit(get_financials, ticker) for ticker in tickers]
for future in concurrent.futures.as_completed(future_infos):
with suppress(ValueError):
financials.append(future.result())
financials.sort(key=lambda v: v['roa'], reverse=True)
return financials
def sort_by_ev_ebitda(tickers):
# TODO: Entreprise Value / EBITDA
pass
def filter_market_cap(tickers, lower_bound, upper_bound, show_progress=False):
ticker_infos = get_ticker_infos(tickers, show_progress=show_progress)[0]
ticker_infos = filter(lambda item: lower_bound <= item['market_cap'] <= upper_bound, ticker_infos)
return sorted(ticker_infos, key=lambda item: item['market_cap'])
def get_index_futures():
resp = make_request(PREMARKET_FUTURES_URL)
soup = BeautifulSoup(resp.text, 'html.parser')
# noinspection PyUnresolvedReferences
quotes = soup.find('tbody').findAll('tr')
return_obj = {}
for quote in quotes:
index_name = quote.find('a').text.upper()
nums = quote.findAll('td')[3:]
price = nums[0].text
change = nums[3].text
percent_change = nums[4].text
return_obj[index_name] = {'name': index_name, 'price': price,
'change': change, 'percent_change': percent_change}
return return_obj
def get_random_stocks(n=1) -> set:
# return n stocks from NASDAQ and NYSE
if n < 1:
n = 1
us_stocks = get_nasdaq_tickers()
us_stocks.update(get_nyse_tickers())
return_stocks = set()
while len(return_stocks) < n:
stock = random.sample(list(us_stocks.keys()), 1)[0]
if not stock.count('.') and not stock.count('^'):
return_stocks.add(stock)
return return_stocks
def find_stock(query):
"""
    Returns at most 12 results based on a search query
TODO: return list of dictionaries
"""
results = []
if isinstance(query, str):
query = {part.upper() for part in query.split()}
else:
query = {part.upper() for part in query}
for info in get_tickers('ALL').values():
match, parts_matched = 0, 0
company_name = info['name'].upper()
symbol = info['symbol']
if len(query) == 1 and symbol == clean_ticker(tuple(query)[0]):
match += len(query) ** 2
parts_matched += 1
elif symbol in query or ''.join(query) in symbol:
match += len(symbol)
parts_matched += 1
for part in query:
occurrences = company_name.count(part)
part_factor = occurrences * len(part)
if part_factor:
match += part_factor
parts_matched += occurrences
match /= len(company_name)
if match:
results.append((symbol, info['name'], parts_matched, match))
# sort results by number of parts matched and % matched
results.sort(key=lambda item: (item[2], item[3]), reverse=True)
return results[:12]
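# Usage sketch (illustrative): free-text search over all known listings; results
# are (symbol, name, parts_matched, match) tuples, best matches first.
def _example_find_stock():
    results = find_stock('micro devices')
    return results[0][0] if results else None  # expected to be 'AMD'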
def get_trading_halts(days_back=0):
days_back = abs(days_back)
if days_back:
date = datetime.today() - timedelta(days=days_back)
date = date.strftime('%m%d%Y')
url = f'http://www.nasdaqtrader.com/rss.aspx?feed=tradehalts&haltdate={date}'
else:
url = 'http://www.nasdaqtrader.com/rss.aspx?feed=tradehalts'
feed = feedparser.parse(url)
del feed['headers']
halts = []
for halt in feed['entries']:
soup = BeautifulSoup(halt['summary'], 'html.parser')
values = [td.text.strip() for td in soup.find_all('tr')[1].find_all('td')]
halts.append({
'symbol': values[0],
'name': values[1],
'market': {'Q': 'NASDAQ'}.get(values[2], values[2]),
'reason_code': values[3],
'paused_price': values[4],
'halt_date': datetime.strptime(values[5], '%m/%d/%Y'),
'halt_time': values[6],
'resume_date': datetime.strptime(values[7], '%m/%d/%Y'),
'resume_quote_time': values[8],
'resume_trade_time': values[9]
})
return halts
# Options Section
# Enums are used for some calculations
class Option(IntEnum):
CALL = 1
PUT = -1
def get_month_and_year():
date = datetime.today()
    month = ['JAN', 'FEB', 'MAR', 'APR', 'MAY', 'JUNE',
             'JULY', 'AUG', 'SEP', 'OCT', 'NOV', 'DEC'][date.month - 1]
year = date.year
return f'{month} {year}'
@time_cache(24 * 60 * 60)  # cache for 24 hours
def get_risk_free_interest_rate():
"""
e.g. month_and_year = 'FEB 2021'
returns the risk free interest rate:
the average interest rate of US Treasury Bills
throws: RunTimeError if interest rate could not be fetched
"""
us_treasury_api = 'https://api.fiscaldata.treasury.gov/services/api/fiscal_service'
endpoint = f'{us_treasury_api}/v2/accounting/od/avg_interest_rates'
link = f'{endpoint}?page[size]=10000'
r = requests.get(link).json()
last_count = r['meta']['total-count']
for i in range(last_count - 1, 0, -1):
node = r['data'][i]
if node['security_desc'] == 'Treasury Bills':
return float(node['avg_interest_rate_amt']) / 100
raise RuntimeError('Could not get risk free interest rate')
@time_cache(24 * 60 * 60, 10000)  # cache for 24 hours
def get_volatility(stock_ticker, in_depth=False):
"""
Returns the annualized standard deviation return of the stock for the last 365 days
If in_depth, returns a dict
"""
end = datetime.today()
start = end - timedelta(days=365)
data = yf.download(stock_ticker, start=start, end=end, progress=False)
    data['return'] = data['Close'] / data['Close'].shift(1) - 1
    daily_volatility = np.std(data['return'])
    median_positive_rtn = np.median([rn for rn in data['return'] if rn > 0])
    median_negative_rtn = np.median([rn for rn in data['return'] if rn < 0])
    median_return = np.median([rn for rn in data['return'] if not math.isnan(rn)])
    # scale daily volatility by the square root of the number of trading days (~252 per year)
    annualized_volatility = daily_volatility * math.sqrt(len(data['return']))
if in_depth:
return {'daily': daily_volatility, 'annualized': annualized_volatility, 'median_return': median_return,
'median_neg_return': median_negative_rtn, 'median_pos_return': median_positive_rtn}
return annualized_volatility
def d1(market_price, strike_price, years_to_expiry, volatility, risk_free, dividend_yield):
    """
    Black-Scholes d1:
        d1 = [ln(S / K) + (r - q + sigma^2 / 2) * T] / (sigma * sqrt(T))
    where S = market price, K = strike price, T = years to expiry,
    sigma = volatility, r = risk-free rate, q = dividend yield
    """
    block_3 = volatility * math.sqrt(years_to_expiry)
    block_1 = math.log(market_price / strike_price)
    block_2 = years_to_expiry * (risk_free - dividend_yield + volatility ** 2 / 2)
    return (block_1 + block_2) / block_3
def csn(y):
"""
returns the Cumulative Standard Normal of y
which is the cumulative distribution function of y with
mean = 0 and standard deviation = 1
"""
return NormalDist().cdf(y)
def snd(y):
"""
returns the Standard Normal Density of y
which is the probability density function of y with
mean = 0 and standard deviation = 1
"""
return NormalDist().pdf(y)
def calc_option_price(market_price, strike_price, days_to_expiry, volatility,
risk_free=None, dividend_yield=0, option_type=Option.CALL):
if risk_free is None:
risk_free = get_risk_free_interest_rate()
years_to_expiry = days_to_expiry / 365
_d1 = option_type * d1(market_price, strike_price,
years_to_expiry, volatility, risk_free, dividend_yield)
_d2 = _d1 - option_type * volatility * math.sqrt(years_to_expiry)
block_1 = market_price * \
math.e ** (-dividend_yield * years_to_expiry) * csn(_d1)
block_2 = strike_price * math.e ** (-risk_free * years_to_expiry) * csn(_d2)
return option_type * (block_1 - block_2)
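# Worked sketch with assumed, hard-coded inputs (no live data; `_example_option_price`
# is illustrative only): price a 1-year call on a $100 stock struck at $105 with 25%
# annualized volatility, a 1% risk-free rate, and no dividends. Black-Scholes gives
# roughly $8.3 per share.
def _example_option_price():
    return calc_option_price(market_price=100, strike_price=105,
                             days_to_expiry=365, volatility=0.25,
                             risk_free=0.01, dividend_yield=0,
                             option_type=Option.CALL)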
def calc_option_delta(market_price, strike_price, days_to_expiry, volatility,
                      risk_free=None, dividend_yield=0, option_type=Option.CALL):
    # a default of get_risk_free_interest_rate() would fire a network request at import time
    if risk_free is None:
        risk_free = get_risk_free_interest_rate()
    years_to_expiry = days_to_expiry / 365
    block_1 = math.e ** (-dividend_yield * years_to_expiry)
    _d1 = d1(market_price, strike_price, years_to_expiry,
             volatility, risk_free, dividend_yield)
    return option_type * block_1 * csn(option_type * _d1)
def calc_option_gamma(market_price, strike_price, days_to_expiry, volatility,
                      risk_free=None, dividend_yield=0):
    if risk_free is None:
        risk_free = get_risk_free_interest_rate()
    years_to_expiry = days_to_expiry / 365
    block_1 = math.e ** (-dividend_yield * years_to_expiry)
    _d1 = d1(market_price, strike_price, years_to_expiry,
             volatility, risk_free, dividend_yield)
    return block_1 / (market_price * volatility * math.sqrt(years_to_expiry)) * snd(_d1)
def calc_option_vega(market_price, strike_price, days_to_expiry, volatility,
                     risk_free=None, dividend_yield=0):
    if risk_free is None:
        risk_free = get_risk_free_interest_rate()
    years_to_expiry = days_to_expiry / 365
    block_1 = market_price * math.e ** (-dividend_yield * years_to_expiry)
    _d1 = d1(market_price, strike_price, years_to_expiry,
             volatility, risk_free, dividend_yield)
    return block_1 * math.sqrt(years_to_expiry) * snd(_d1)
def calc_option_rho(market_price, strike_price, days_to_expiry, volatility,
                    risk_free=None, dividend_yield=0, option_type=Option.CALL):
    if risk_free is None:
        risk_free = get_risk_free_interest_rate()
    years_to_expiry = days_to_expiry / 365
    block_1 = strike_price * math.e ** (-risk_free * years_to_expiry) * years_to_expiry
    _d1 = d1(market_price, strike_price, years_to_expiry,
             volatility, risk_free, dividend_yield)
    _d2 = option_type * (_d1 - volatility * math.sqrt(years_to_expiry))
    return option_type * block_1 * csn(_d2)
def calc_option_theta(market_price, strike_price, days_to_expiry, volatility,
                      risk_free=None, dividend_yield=0, option_type=Option.CALL):
    if risk_free is None:
        risk_free = get_risk_free_interest_rate()
    years_to_expiry = days_to_expiry / 365
    _d1 = d1(market_price, strike_price, years_to_expiry,
             volatility, risk_free, dividend_yield)
    _d2 = _d1 - volatility * math.sqrt(years_to_expiry)
    # annualized theta, following the dividend-paying Black-Scholes summary linked above
    block_1 = dividend_yield * market_price * math.e ** (-dividend_yield * years_to_expiry) * csn(option_type * _d1)
    block_2 = risk_free * strike_price * math.e ** (-risk_free * years_to_expiry) * csn(option_type * _d2)
    block_3 = market_price * math.e ** (-dividend_yield * years_to_expiry)
    block_3 *= volatility / (2 * math.sqrt(years_to_expiry)) * snd(_d1)
    return option_type * (block_1 - block_2) - block_3
def pretty_info(info):
return {
'Ticker': info['ticker'],
'Change %': round(info['percent_change'] * 100, 3),
'First Price': info['start_price'],
'Last Price': info['end_price'],
'Change': info['change'],
# 'Open Volume': info['open_volume'],
# 'Close Volume': info['close_volume'],
# 'Average Volume': info['avg_volume']
}
def print_friendly(results: list, tablefmt='mixed_outline'):
# headers = ['ticker', 'percent_change', 'change', 'start_price', 'end_price', 'open_volume', 'close_volume', 'avg_volume']
print(tabulate((pretty_info(result) for result in results), headers='keys', tablefmt=tablefmt))
def run_tests():
print('Testing clean_ticker')
assert clean_ticker('ac.to') == 'AC.TO'
assert clean_ticker('23ac.to23@#0 ') == 'AC.TO'
print('Getting NASDAQ')
nasdaq_tickers = get_nasdaq_tickers()
assert nasdaq_tickers['AMD']['name'] == 'Advanced Micro Devices Inc.'
print('Getting NYSE')
assert get_nyse_tickers()['V']['name'] == 'VISA INC'
assert get_nyse_tickers()['VZ']['name'] == 'VERIZON COMMUNICATIONS'
print('Getting AMEX')
get_amex_tickers()
print('Getting NYSE ARCA')
assert get_nyse_arca_tickers()['SPY']['name'] == 'SPDR S&P 500 ETF TRUST'
print('Getting TSX')
assert 'SHOP.TO' in get_tsx_tickers()
print('Getting OTC')
assert get_otc_tickers()['HTZGQ']['name'] == 'HERTZ GLOBAL HOLDINGS INC'
print('Getting DOW')
dow_tickers = get_dow_tickers()
assert dow_tickers['AAPL']['name'] == 'Apple Inc.'
print('Getting S&P500')
sp500_tickers = get_sp500_tickers()
assert sp500_tickers['TSLA']['name'] == 'Tesla, Inc.'
print('Getting Russel 2k')
rut2k_tickers = get_russel_2k_tickers()
assert rut2k_tickers['PZZA']['name'] == "Papa John's International Inc."
print('Getting FUTURES')
get_index_futures()
print('Testing get_company_name')
assert get_company_name('NVDA') == 'NVIDIA CORP'
print('Getting 10 Random Stocks')
print(get_random_stocks(10))
print('Testing get ticker info')
real_tickers = ('RTX', 'PLTR', 'OVV.TO', 'SHOP.TO', 'AMD', 'CCIV', 'SPY', 'VOO')
for ticker in real_tickers:
# dividend, non-dividend, ca-dividend, ca-non-dividend, old
get_ticker_info(ticker)
# test invalid ticker
with suppress(ValueError):
get_ticker_info('ZWC')
# test get target prices
print('Testing get target price')
get_target_price('DOC')
with suppress(ValueError):
get_target_price('ZWC')
assert 0 < get_risk_free_interest_rate() < 1
print('Testing find_stock')
pprint(find_stock('entertainment'))
pprint(find_stock('TWLO'))
tickers = {'entertainment', 'Tesla', 'Twitter', 'TWLO', 'Paypal', 'Visa'}
for ticker in real_tickers:
try:
assert find_stock(ticker)
except AssertionError:
            print(f'TEST FAILED: find_stock({ticker})')
assert not find_stock('thisshouldfail')
print('Testing get ticker infos')
tickers_info, errors = get_ticker_infos(tickers)
assert tickers_info and not errors
print('Testing get target prices')
tickers = {'Tesla', 'Twitter', 'TWLO', 'Paypal', 'Visa', 'OPEN', 'CRSR', 'PLTR', 'PTON', 'ZM'}
target_prices, errors = get_target_prices(tickers)
assert target_prices and not errors
print('Testing sort tickers by dividend yield')
sort_by_dividend(get_dow_tickers())
sort_by_roa(get_dow_tickers())
print('Testing top movers')
top_movers(market='DOW')
def six_month_movers(stock_group='S&P500', limit=20):
tickers = get_tickers(stock_group, True)
data = get_data(tickers=tickers, period='6mo', interval='1h')
info1 = get_parsed_data(data, tickers=tickers, of='custom', sort_key=lambda x: abs(x['percent_change']))
print_friendly(info1[:limit])
if __name__ == '__main__':
pass
# run_tests()

eabase commented Sep 25, 2020

Hey man! This looks pretty cool. How do I use it?

elibroftw (Author) commented Sep 27, 2020

Sample usage: @eabase

from investing import *

pprint(get_ticker_info('OVV'))
print(find_stock('Shopify')[0])

market = 'DOW'
tickers = get_tickers(market)
by_div = sort_by_dividend(tickers)
movers = top_movers(tickers=tickers)

requirements.txt

yfinance~=0.1.55
lxml
fuzzywuzzy~=0.18.0
python-Levenshtein
pandas~=1.0.1
requests~=2.22.0
beautifulsoup4~=4.8.1
yahoo_fin
requests_html
numpy
pytz
grequests
feedparser


eabase commented Sep 29, 2020

Hi @elibroftw,
Thank you. However, I'm running into a bunch of dependency issues. What does your pip list say? And what Python version are you using?

I also had to install pretty-py3, fuzzywuzzy, python-Levenshtein, and yfinance, but I still get an error:

$ ./sample.py
'NoneType' object has no attribute 'find'
Traceback (most recent call last):
  File "./sample.py", line 6, in <module>
    assert get_ticker_info('IVR')['name'] == 'INVESCO MORTGAGE CAPITAL'
  File "/...../investing.py", line 248, in get_ticker_info
    pd_ratio = round(latest_price/dividend, 2)
ZeroDivisionError: float division by zero

elibroftw (Author) commented Sep 29, 2020

The error is because the dividend of the stock was 0 (I made the code when the stock was still paying dividends).
As for third-party libraries, you'll need bs4, fuzzywuzzy, pandas, requests, and yfinance.

elibroftw (Author) commented

I updated the code to fix that error.


eabase commented Sep 29, 2020

Great! Thank you Elijah. 👍

PS. For anyone else running this, be patient, as it takes a while (~1 minute) before you get any output...
