# Python 2; uses the original BeautifulSoup 3 package, not bs4
import requests
from BeautifulSoup import BeautifulSoup
from datetime import datetime
'''
To use (assumes a local settings module that defines HTML_PATH):

num_pages_to_follow = 85
scraper = EmmaScraper('CAP APPREC')
for idx in xrange(0, num_pages_to_follow):
    page = scraper.next_page()
    with open(settings.HTML_PATH + 'lists/%s_%s.html' % (idx, datetime.now().date()), 'wb') as f:
        f.write(page.prettify())
    results = scraper.parse_results_view(page)
    # do what you will to store the results locally in your own format
'''
class EmmaScraper():
    def __init__(self, search_term):
        self.p_session = requests.session()
        self.base_url = 'http://emma.msrb.org'
        self.search_url = 'http://emma.msrb.org/Search/Search.aspx?hlt=search'
        self.disclaimer_url = 'http://emma.msrb.org/Disclaimer.aspx?hlt=search'
        self.search_term = search_term
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_6_8) AppleWebKit/537.4 (KHTML, like Gecko) Chrome/22.0.1229.79 Safari/537.4'
        }
        self.query_form = self.login()
        self.page = 0
    def parse_results_view(self, soup):
        rows = None
        try:
            tbl = soup.findAll('table', {'id': 'ctl00_mainContentArea_SearchResultsControl1_searchResultsGridView'})[0]
            rows = tbl.findAll('tr')[1:]  # skip header
        except Exception as e:
            print e
        results = list()
        if rows is None:
            # session likely expired; log back in, restore the viewstate and retry.
            # this will still raise if the results table is genuinely absent.
            vs = self.get_viewstate()
            self.login()
            self.set_viewstate(vs)
            print 'logged back in'
            tbl = soup.findAll('table', {'id': 'ctl00_mainContentArea_SearchResultsControl1_searchResultsGridView'})[0]
            rows = tbl.findAll('tr')[1:]  # skip header
        for row in rows:
            try:
                cols = row.findAll('td')
                obj = {}
                issuer = cols[0].findAll('a')[0]
                obj['issuer_url'] = issuer['href']
                obj['issuer'] = issuer.text
                issue_d = cols[1].findAll('a')[0]
                obj['issue_description'] = issue_d.text
                obj['issue_description_url'] = issue_d['href']
                obj['cusip'] = issuer['href'].split('cusip=')[1]
                obj['details_id'] = issue_d['href'].split('id=')[1]
                obj['dated'] = datetime.strptime(cols[2].text, '%m/%d/%Y').date() if cols[2].text != ' ' else None
            except Exception as e:
                print e
            results.append(obj)
        return results
    def get_details(self, url, soup=None):
        if soup is None:
            resp = self.p_session.get(url=url, headers=self.headers)
            soup = BeautifulSoup(resp.content)
        result = {}
        result['soup'] = soup
        result['securities'] = list()
        '''
        try:
            result['closing_date'] = soup.findAll('span', {'id': 'ctl00_mainContentArea_closingDateDataLabel'})[0].text
            result['closing_date'] = datetime.strptime(result['closing_date'], '%m/%d/%Y')
        except Exception as e:
            print e
        '''
        try:
            tbl = soup.findAll('table', {'id': 'ctl00_mainContentArea_cusipListTable'})[0]
            rows = tbl.findAll('tr')[1:]  # skip header
            for row in rows:
                cols = row.findAll('td')
                obj = {}
                try:
                    obj['cusip_url'] = cols[0].findAll('input')[0]['src']
                except Exception as e:
                    print 'no cusip url found e=%s details_url=%s' % (e, url)
                try:
                    obj['maturity_date'] = datetime.strptime(cols[1].text, '%m/%d/%Y')
                    obj['interest_rate'] = float(cols[2].text) if cols[2].text != ' ' else -1
                    obj['principal_amt'] = float(''.join(cols[3].text.split(','))) if cols[3].text != ' ' else -1
                    obj['initial_offering_price'] = float(cols[4].text) if cols[4].text != ' ' else -1
                    obj['description'] = cols[5].text
                    result['securities'].append(obj)
                except Exception as e:
                    print 'no data found for security e=%s details_url=%s' % (e, url)
        except Exception as e:
            print e
        try:
            link_tbl = soup.findAll('table', {'id': 'ctl00_mainContentArea_submissionListTable'})[0]
            links_to_parse = link_tbl.findAll('a')
            links = list()
            for link in links_to_parse:
                link_result = {}
                link_result['document_url'] = self.base_url + link['href']
                link_result['filename'] = link['href'].split('/')[1]
                links.append(link_result)
            result['link_results'] = links
        except Exception as e:
            print 'no official statement found for url=%s e=%s' % (url, e)
        return result
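    # A sketch (untested) of downloading the documents that get_details() lists
    # under 'link_results', reusing the scraper's authenticated session:
    #
    #   details = scraper.get_details(url)
    #   for doc in details.get('link_results', []):
    #       resp = scraper.p_session.get(doc['document_url'], headers=scraper.headers)
    #       with open(doc['filename'], 'wb') as f:
    #           f.write(resp.content)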
    def next_page(self):
        # re-post the advanced search form; every field except the issue name
        # is submitted with its on-page default
        self.query_form['ctl00$mainContentArea$RatingsSearchControl1$ratingEqualityOpFitch'] = 'EQGT'
        self.query_form['ctl00$mainContentArea$RatingsSearchControl1$ratingEqualityOpSnp'] = 'EQGT'
        self.query_form['ctl00$mainContentArea$AdvancedSearchControl1$issueNameTextBox'] = self.search_term
        self.query_form['ctl00$Masthead$searchTextBox'] = 'Enter CUSIP or Name'
        self.query_form['ctl00$mainContentArea$RatingsSearchControl1$cusipTextBox'] = ''
        self.query_form['ctl00$mainContentArea$RatingsSearchControl1$stateDropDown'] = ''
        self.query_form['ctl00$mainContentArea$RatingsSearchControl1$ratingValueFitch'] = ''
        self.query_form['ctl00$mainContentArea$RatingsSearchControl1$ratingValueSnp'] = ''
        self.query_form['ctl00$mainContentArea$AdvancedSearchControl1$cusipTextBox'] = ''
        self.query_form['ctl00$mainContentArea$AdvancedSearchControl1$issuerNameTextBox'] = ''
        self.query_form['ctl00$mainContentArea$AdvancedSearchControl1$stateDropDown'] = ''
        self.query_form['ctl00$mainContentArea$AdvancedSearchControl1$maturityDateBeginTextBox'] = ''
        self.query_form['ctl00$mainContentArea$AdvancedSearchControl1$approxIssueDateBeginTextBox'] = ''
        self.query_form['ctl00$mainContentArea$AdvancedSearchControl1$couponRateBeginTextBox'] = ''
        self.query_form['ctl00$mainContentArea$AdvancedSearchControl1$maturityDateEndTextBox'] = ''
        self.query_form['ctl00$mainContentArea$AdvancedSearchControl1$approxIssueDateEndTextBox'] = ''
        self.query_form['ctl00$mainContentArea$AdvancedSearchControl1$couponRateEndTextBox'] = ''
        # the search button is an image-type input, so its click coordinates ride along
        self.query_form['ctl00$mainContentArea$AdvancedSearchControl1$searchButton1.x'] = '48'
        self.query_form['ctl00$mainContentArea$AdvancedSearchControl1$searchButton1.y'] = '10'
        if self.page > 0:
            # after the first request, advance the results grid by firing its "next" postback
            self.query_form['__EVENTTARGET'] = 'ctl00$mainContentArea$SearchResultsControl1$nextButtonBottom'
        resp = self.p_session.post(url=self.search_url, data=self.query_form, headers=self.headers)
        soup = BeautifulSoup(resp.content)
        # carry the fresh hidden-field values forward for the next request
        self.query_form = self.get_page_vars(soup)
        self.page += 1
        return soup
    def set_viewstate(self, viewstate):
        self.query_form['__VIEWSTATE'] = viewstate

    def get_viewstate(self):
        try:
            return self.query_form['__VIEWSTATE']
        except:
            return ''

    def get_page(self):
        return self.page

    def set_page(self, page):
        self.page = page
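    # Background for the two methods below: EMMA is an ASP.NET WebForms app, so
    # every page embeds server-generated hidden fields (__VIEWSTATE,
    # __EVENTVALIDATION, __EVENTTARGET, __EVENTARGUMENT) that must be echoed
    # back on the next POST or the server rejects the postback. get_page_vars()
    # harvests those fields from a page, and login() round-trips them through
    # the disclaimer form (the yesButton.x/.y pair mimics the click coordinates
    # of an image-type submit button).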
    def get_page_vars(self, soup):
        try:
            evnt_validation = soup.findAll('input', {'id': '__EVENTVALIDATION'})[0]['value']
        except:
            evnt_validation = ''
        try:
            evnt_target = soup.findAll('input', {'id': '__EVENTTARGET'})[0]['value']
        except:
            evnt_target = ''
        try:
            evnt_arg = soup.findAll('input', {'id': '__EVENTARGUMENT'})[0]['value']
        except:
            evnt_arg = ''
        try:
            evnt_viewstate = soup.findAll('input', {'id': '__VIEWSTATE'})[0]['value']
        except:
            evnt_viewstate = ''
        validation_form = {
            '__EVENTTARGET': evnt_target,
            '__EVENTARGUMENT': evnt_arg,
            '__VIEWSTATE': evnt_viewstate,
            '__EVENTVALIDATION': evnt_validation,
            'ctl00$Masthead$searchTextBox': 'Enter CUSIP or Name',
            'ctl00$mainContentArea$disclaimerContent$yesButton.x': '25',
            'ctl00$mainContentArea$disclaimerContent$yesButton.y': '12'
        }
        return validation_form
    def login(self):
        # make the initial request
        resp = self.p_session.get(url=self.search_url, headers=self.headers)
        soup = BeautifulSoup(resp.content)
        # scrape the validation vars out of the form
        validation_form = self.get_page_vars(soup)
        # accept the disclaimer
        resp = self.p_session.post(url=self.disclaimer_url, data=validation_form, headers=self.headers)
        # do a get to the search page so we're ready to go
        resp = self.p_session.get(url=self.search_url, headers=self.headers)
        # get the new viewstate
        soup = BeautifulSoup(resp.content)
        validation_form = self.get_page_vars(soup)
        return validation_form
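A minimal end-to-end sketch (untested) of how the pieces above fit together; it assumes issue_description_url is an absolute URL, so prepend scraper.base_url if EMMA serves it relative:

scraper = EmmaScraper('CAP APPREC')
for idx in xrange(0, 3):
    page = scraper.next_page()
    for res in scraper.parse_results_view(page):
        details = scraper.get_details(res['issue_description_url'])
        print res['cusip'], res['issuer'], len(details['securities'])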
I'm new to Python. Suppose I have a CUSIP or a list of CUSIPs: can I use this program to download trading data?