Skip to content

Instantly share code, notes, and snippets.

@kizernis
Last active May 22, 2019 00:13
Show Gist options
  • Save kizernis/d49da5d98467a9662da845babc77b6f5 to your computer and use it in GitHub Desktop.
Save kizernis/d49da5d98467a9662da845babc77b6f5 to your computer and use it in GitHub Desktop.
# Get data from a complex webapp using a headless browser

# settings_scrap.txt layout, as consumed below: line 1 is skipped, lines 2-10
# are "key = value" pairs, lines 11-12 are skipped, and the remaining
# non-blank lines hold the two boundary dates of the range to scrape.
settings = {}
with open('settings_scrap.txt') as f:
    lines = f.read().splitlines()
for line in lines[1:9+1]:
    key, value = line.split('=', 1)
    # Keys are stored with every space removed; values are only trimmed.
    settings[key.strip().replace(' ', '')] = value.strip()
settings['firefox'] = settings['firefox'].lower() == 'true'
settings['headless'] = settings['headless'].lower() == 'true'
# The file gives the pause in milliseconds; time.sleep() later takes seconds.
settings['additional_pause'] = int(settings['additional_pause']) / 1000
settings['ram_threshold'] = float(settings['ram_threshold'])
settings['logging'] = settings['logging'].lower() == 'true'


def _date_sort_key(date_str):
    """Return (year, month, day) as ints for a normalised 'MM.DD.YYYY' string."""
    month, day, year = date_str.split('.')
    return int(year), int(month), int(day)


# Accept ':', '_' or '-' as separators and drop spaces, so that every date is
# dot-separated before it is zero-padded to MM.DD.YYYY.
dates = [
    date.strip().replace(':', '.').replace('_', '.').replace('-', '.').replace(' ', '')
    for date in lines[12:]
    if date.strip() != ''
]
# Explicit check instead of `assert`, which is stripped when running with -O.
if len(dates) != 2:
    raise ValueError(
        f'settings_scrap.txt must contain exactly 2 dates, found {len(dates)}'
    )
dates = ['{:0>2}.{:0>2}.{:0>4}'.format(*(date.split('.'))) for date in dates]
# Newest first: dates[0] is where the backwards walk starts, dates[1] where it
# ends. Sort chronologically, not as text: as plain strings "12.31.2018" would
# sort after "01.02.2019" and the range would come out inverted.
dates.sort(key=_date_sort_key, reverse=True)
# TODO: let it have a head some day
settings['headless'] = True
with open('data_codes.txt') as f:
    # One data code per non-blank line; '/' becomes '.' and spaces are removed.
    data_codes = [
        s.strip().replace('/', '.').replace(' ', '')
        for s in f.read().splitlines()
        if s.strip() != ''
    ]
import os
import time
import csv
import gc
import psutil
from tqdm import tqdm
from datetime import datetime, timedelta
from bs4 import BeautifulSoup
from selenium import webdriver
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support.ui import WebDriverWait
import selenium.webdriver.support.expected_conditions as EC
from selenium.webdriver.common.by import By
from selenium.common.exceptions import TimeoutException, NoSuchElementException
# Build the browser options object `o` once; it is reused by every session.
if not settings['firefox']:
    o = webdriver.ChromeOptions()
    o.add_argument('--log-level=3')
    o.add_argument('--disable-infobars')
    if not settings['headless']:
        # A visible Chrome window starts maximised.
        o.add_argument('--start-maximized')
    else:
        o.add_argument('--headless')
        o.add_argument('--disable-gpu')
else:
    o = webdriver.FirefoxOptions()
    if settings['headless']:
        o.add_argument('--headless')
# Full English month names, indexed by month number - 1.
months = (
    'January', 'February', 'March', 'April', 'May', 'June',
    'July', 'August', 'September', 'October', 'November', 'December',
)
# Three-letter abbreviation -> month number (1-12), in calendar order.
months_short_numbers = {
    name[:3]: number for number, name in enumerate(months, start=1)
}
# CSV header; 'Date' is filled in by the script, the rest come from grid cells.
csv_column_names = (
    'Date',
    'Time',
    'Root',
    'Option',
    'Qty',
    'Price',
    'Exchange',
    'Condition',
    'Market',
    'Trade IV',
    'Underlying Price',
)
def touch(fname, times=None):
    """Create *fname* if it does not exist and set its timestamps.

    The file is opened in append mode, so existing content is kept.
    *times* is passed straight to ``os.utime``; ``None`` means "now".
    """
    handle = open(fname, 'a')
    try:
        os.utime(fname, times)
    finally:
        handle.close()
def wait_for_new_grid_rows(old_grid_rows):
    """Poll the page until the data grid differs from *old_grid_rows*.

    Returns the list of grid-row elements once at least one row is present,
    or ``None`` as soon as the page shows its "No data available." message.
    There is no timeout: the function polls every 0.05 s until one of those
    two outcomes occurs.
    """
    rows_xpath = '//div[@data-qa="gridRow"]'
    no_data_xpath = '//div[@class="EmptyGridView__centered__1IvU4"]/span[text()="No data available."]'

    # Phase 1: wait until the row list is no longer equal to the old one.
    while True:
        time.sleep(0.05)
        current_rows = driver.find_elements_by_xpath(rows_xpath)
        if current_rows != old_grid_rows:
            break
        try:
            if driver.find_elements_by_xpath(no_data_xpath):
                return None
        except TimeoutException:
            continue

    # Phase 2: the grid changed; wait until it actually contains rows.
    while not current_rows:
        try:
            if driver.find_elements_by_xpath(no_data_xpath):
                return None
        except TimeoutException:
            continue
        time.sleep(0.05)
        current_rows = driver.find_elements_by_xpath(rows_xpath)
    return current_rows
# XPath locators shared by the functions and the main loop below.
# NOTE(review): the class-based locators end in what look like build-generated
# suffixes (e.g. "__3Nqp2"); presumably they change when the webapp is
# rebuilt — confirm before relying on them long-term.
# Text input that the data code is typed into.
xpath_input_data_code = '//input[@data-qa="symbolNameInput"]'
# Element whose text is compared against the requested data code.
xpath_label_data_code = '//div[@class="DailySnapshot__symbolName__3Nqp2"]'
# Button clicked to move to the previous day; its "disabled" attribute is
# checked to detect that no earlier day can be selected.
xpath_button_back = '//button[@data-qa="dailySnapshotPrevDay"]'
# Element showing the currently selected date as "DD Mon YYYY".
xpath_picked_date = '//span[@class="DateChange__customDayPickerInput__3ac_D"]'
def enter_data_code():
    """Switch the page to the global ``data_code`` and report whether it is valid.

    Returns ``True`` when the page label already shows, or ends up showing,
    the requested code (compared case-insensitively). Returns ``False`` when
    the code is rejected; in that case an empty "<code> is INVALID.csv"
    marker file is touched in the output directory.
    """
    global grid_rows, label_data_code, input_data_code
    wanted = data_code.lower()

    label_data_code = driver.find_element_by_xpath(xpath_label_data_code)
    if label_data_code.text.lower() == wanted:
        # Already on the requested code; nothing to type.
        return True

    input_data_code = driver.find_element_by_xpath(xpath_input_data_code)
    input_data_code.send_keys(data_code)

    # Handle invalid data codes
    autocomplete_locator = (By.XPATH, '//div[@class="SymbolSelector__autocomplete__EF_vQ"]')
    dropdown = WebDriverWait(driver, 60).until(
        EC.visibility_of_element_located(autocomplete_locator)
    )
    if dropdown.find_elements_by_xpath('.//span[@data-qa="matchingSymbolsList-noMatches"]'):
        input_data_code.send_keys(Keys.ESCAPE)
        touch(f'{settings["output_path"]}/{data_code} is INVALID.csv')
        return False

    # Wait for a suggestion whose lower-cased text equals the code, then submit.
    match_locator = (By.XPATH, f'//mark[@class="SymbolSelector__searching__1234y " and translate(text(),"ABCDEFGHIJKLMNOPQRSTUVWXYZ","abcdefghijklmnopqrstuvwxyz")="{data_code.lower()}"]')
    WebDriverWait(driver, 60).until(EC.visibility_of_element_located(match_locator))
    input_data_code.submit()
    grid_rows = wait_for_new_grid_rows(grid_rows)

    # Sometimes it doesn't say "No matches" but the data code is still invalid
    if label_data_code.text.lower() != wanted:
        touch(f'{settings["output_path"]}/{data_code} is INVALID.csv')
        return False
    return True
def new_webdriver_session():
    """Start a fresh browser session, log in and re-enter the current data code.

    Any existing session is quit first (and logged when logging is enabled).
    After login the "Historical Snapshots" tab is selected if it is not
    already the active tab. Returns the result of ``enter_data_code()``.
    """
    global driver, grid_rows, input_data_code, label_data_code, button_back, picked_date
    # A `global` statement applies to the whole function wherever it appears.
    global time1, time2
    logging_enabled = settings['logging']

    if driver is not None:
        if logging_enabled:
            time2 = datetime.now()
            log_file.write(f'{time2 - time1} Restarting session: {datetime.now():%H-%M-%S}\n')
            time1 = time2
        driver.quit()
        driver = None
        gc.collect()

    headless = settings['headless']
    if settings['firefox']:
        driver = webdriver.Firefox(options=o, service_log_path=os.devnull)
        if not headless:
            driver.maximize_window()
    else:
        driver = webdriver.Chrome(options=o)
    # NOTE(review): the original indentation was lost, so it is unclear whether
    # this sizing applied to Chrome only or to both browsers; it is applied to
    # both here — confirm against the original script.
    if headless:
        driver.set_window_size(1440, 1080)

    # Login
    driver.get(settings['url'])
    username_box = driver.find_element_by_xpath('//input[@name="username"]')
    username_box.send_keys(settings['login'])
    password_box = driver.find_element_by_xpath('//input[@name="password"]')
    password_box.send_keys(settings['password'], Keys.RETURN)

    # Select "Historical Snapshots" tab
    selected_tab_locator = (By.XPATH, '//li[@class="DragTabs__tab__3a1AS DragTabs__tabSelected__2D4o4"]')
    selected_tab = WebDriverWait(driver, 60).until(
        EC.visibility_of_element_located(selected_tab_locator)
    )
    if selected_tab.text != 'Historical Snapshots':
        snapshots_tab = driver.find_element_by_xpath('//span[@class="TabbedView__tabTitle__3I2Fo" and text()="Historical Snapshots"]')
        snapshots_tab.click()

    grid_rows = wait_for_new_grid_rows(None)
    return enter_data_code()
def enter_first_date(date_current_str):
    """Select the date given as 'MM.DD.YYYY' in the page's date picker.

    Does nothing beyond refreshing the global ``picked_date`` element when
    the page already shows that date. Otherwise picks year, month and day in
    the date picker and waits for the grid to change.
    """
    global grid_rows, picked_date
    month_number, day, year = map(int, date_current_str.split('.'))
    month = months[month_number - 1]
    month_short = tuple(months_short_numbers)[month_number - 1]

    picked_date = driver.find_element_by_xpath(xpath_picked_date)
    if picked_date.text == f'{day:02d} {month_short} {year}':
        return

    picked_date.click()
    date_picker_month = WebDriverWait(driver, 60).until(
        EC.visibility_of_element_located((By.XPATH, '//div[@class="react-datepicker__month"]'))
    )

    year_select = driver.find_element_by_xpath('//select[@class="react-datepicker__year-select"]')
    year_select.find_element_by_xpath(f'.//option[text()="{year}"]').click()

    month_select = driver.find_element_by_xpath('//select[@class="react-datepicker__month-select"]')
    month_select.find_element_by_xpath(f'.//option[text()="{month}"]').click()

    day_cell = date_picker_month.find_element_by_xpath(f'.//div[@class="react-datepicker__day" and text()="{day}"]')
    day_cell.click()
    grid_rows = wait_for_new_grid_rows(grid_rows)
# Main run: for every data code, walk backwards day by day from dates[0] to
# dates[1] and write the grid shown for each day into "<data code>.csv".
if settings['logging']:
    log_file = open(f'log {datetime.now():%Y-%m-%d %H-%M-%S}.txt', 'w', encoding='utf-8')
    time1 = datetime.now()
driver = None
# Created after the first session starts; stays None if there are no data
# codes, so the cleanup below must not assume it exists.
progress_bar = None
date_last = datetime.strptime(dates[1], '%m.%d.%Y')
days_total = (datetime.strptime(dates[0], '%m.%d.%Y') - date_last).days + 1
try:
    for data_code in data_codes:
        if driver is None:
            data_code_is_valid = new_webdriver_session()
            progress_bar = tqdm(total=len(data_codes) * days_total)
        elif psutil.virtual_memory().percent > settings['ram_threshold']:
            data_code_is_valid = new_webdriver_session()
        else:
            data_code_is_valid = enter_data_code()
        if not data_code_is_valid:
            # Count every day of a rejected code as done so the bar stays in step.
            for _ in range(0, days_total):
                progress_bar.update()
            continue
        if settings['logging']:
            time2 = datetime.now()
            log_file.write(f'{time2 - time1} Data code: {data_code}\n')
            time1 = time2
        # `with` closes the CSV even if scraping fails part-way through.
        with open(f'{settings["output_path"]}/{data_code}.csv', 'w', newline='', encoding='utf-8') as output_file:
            writer = csv.DictWriter(output_file, fieldnames=csv_column_names)
            writer.writeheader()
            if settings['logging']:
                time2 = datetime.now()
                log_file.write(f'{time2 - time1} {dates[0]}\n')
                time1 = time2
            date_current_str = dates[0]
            enter_first_date(date_current_str)
            date_current = datetime.strptime(date_current_str, '%m.%d.%Y')
            date_next = date_current - timedelta(days=1)
            while True:
                # date_current_str is None on every pass except the first one
                # (it is reset at the bottom of the loop), meaning "step back".
                if date_current_str is None:
                    # Select previous date
                    button_back = driver.find_element_by_xpath(xpath_button_back)
                    button_back.click()
                    grid_rows = wait_for_new_grid_rows(grid_rows)
                    picked_date = driver.find_element_by_xpath(xpath_picked_date)
                    day_str, month_short, year_str = picked_date.text.split()
                    date_current_str = f'{months_short_numbers[month_short]:02d}.{day_str:0>2}.{year_str:0>4}'
                    date_current = datetime.strptime(date_current_str, '%m.%d.%Y')
                    # The page may jump over several calendar days at once;
                    # advance the bar for each day that was skipped.
                    for _ in range(0, (date_next - date_current).days):
                        progress_bar.update()
                    date_next = date_current - timedelta(days=1)
                    if settings['logging']:
                        time2 = datetime.now()
                        log_file.write(f'{time2 - time1} {date_current_str} {psutil.virtual_memory().percent}%\n')
                        time1 = time2
                progress_bar.update()
                if grid_rows is not None:
                    # Save data
                    soup = BeautifulSoup(driver.page_source, 'lxml')
                    for grid_row in soup.find_all('div', attrs={'data-qa': 'gridRow'}):
                        row = {'Date': date_current_str}
                        # Cells fill the columns after 'Date', in page order.
                        for i, cell in enumerate(grid_row.find_all('div', class_='react-grid-Cell__value'), start=1):
                            row[csv_column_names[i]] = cell.get_text().strip()
                        writer.writerow(row)
                    soup.decompose()
                    soup = None
                    gc.collect()
                try:
                    button_back = driver.find_element_by_xpath(xpath_button_back)
                except NoSuchElementException:
                    button_back = WebDriverWait(driver, 60).until(EC.visibility_of_element_located((By.XPATH, xpath_button_back)))
                if date_current <= date_last or button_back.get_attribute('disabled') is not None:
                    break
                # Restart WebDriver session if too much RAM consumed
                if psutil.virtual_memory().percent > settings['ram_threshold']:
                    new_webdriver_session()
                    enter_first_date(date_current_str)
                date_current_str = None
        output_file = None
        writer = None
        gc.collect()
        time.sleep(settings['additional_pause'])
finally:
    # Always release the log file, the progress bar and the browser process,
    # including when the run aborts or there was nothing to process.
    if settings['logging']:
        log_file.close()
    if progress_bar is not None:
        progress_bar.close()
    if driver is not None:
        driver.quit()
# input("\nPress Enter to finish...")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment