Skip to content

Instantly share code, notes, and snippets.

@kizernis
Last active December 4, 2018 11:33
Show Gist options
  • Save kizernis/5c6df7b3c8a371937b9277d8356def4c to your computer and use it in GitHub Desktop.
Save kizernis/5c6df7b3c8a371937b9277d8356def4c to your computer and use it in GitHub Desktop.
# --- Configuration ----------------------------------------------------------
# Layout of "settings_csv.txt" as consumed below (0-based line indices):
#   line 0      - ignored
#   lines 1..7  - seven "key = value" settings
#   lines 8..9  - ignored
#   lines 10+   - times, consumed later as consecutive (start, end) pairs
settings = {}
with open('settings_csv.txt') as f:
    lines = f.read().splitlines()

for line in lines[1:7 + 1]:
    # Split on the first "=" only, so a value may itself contain "=".
    x, y = line.split('=', 1)
    # Spaces inside a key are dropped.
    settings[x.strip().replace(' ', '')] = y.strip()

# Boolean flags are written as the text "true" (any letter case).
settings['firefox'] = settings['firefox'].lower() == 'true'
settings['headless'] = settings['headless'].lower() == 'true'
# The file value is divided by 1000 before it is handed to time.sleep(),
# i.e. it is expected in milliseconds.
settings['additional_pause'] = int(settings['additional_pause']) / 1000

# Accept ".", "_" and "-" as alternatives to ":" and drop embedded spaces.
_TIME_SEPARATORS = str.maketrans({'.': ':', '_': ':', '-': ':', ' ': None})
times = [t.strip().translate(_TIME_SEPARATORS) for t in lines[10:] if t.strip()]
if len(times) % 2 != 0:
    # Explicit check instead of `assert`, which is stripped under `python -O`.
    raise ValueError(
        f'settings_csv.txt must list times in start/end pairs, '
        f'but {len(times)} time value(s) were found'
    )
# Left-pad every component with zeros to two characters ("9:5" -> "09:05").
times = [':'.join(f'{s:0>2}' for s in t.split(':')) for t in times]

# One data code per non-blank line; "/" becomes "." and spaces are removed.
with open('data_codes.txt') as f:
    data_codes = [
        s.strip().replace('/', '.').replace(' ', '')
        for s in f.read().splitlines()
        if s.strip()
    ]
import os
import time
from datetime import datetime
from glob import glob
from tqdm import tqdm
from selenium import webdriver
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support.ui import WebDriverWait
import selenium.webdriver.support.expected_conditions as EC
from selenium.webdriver.common.by import By
from selenium.common.exceptions import NoSuchElementException
# --- Browser start-up -------------------------------------------------------
# Builds the options object `o` for the configured browser and creates the
# module-level `driver` used by everything that follows.
if settings['firefox']:
    o = webdriver.FirefoxOptions()
else:
    o = webdriver.ChromeOptions()
# The same "--headless" argument is added for either browser.
if settings['headless']:
    o.add_argument('--headless')
if settings['firefox']:
    # Firefox: download handling is configured through profile preferences.
    # NOTE(review): judging by the preference names, these make "text/csv"
    # downloads land in settings['output_path'] without a prompt - confirm
    # against the Firefox preference reference.
    p = webdriver.FirefoxProfile()
    p.set_preference('browser.download.folderList', 2)
    p.set_preference('browser.download.manager.showWhenStarting', False)
    p.set_preference('browser.helperApps.neverAsk.saveToDisk', 'text/csv')
    p.set_preference('browser.download.dir', settings['output_path'])
    # The driver service log is sent to os.devnull.
    driver = webdriver.Firefox(options=o, firefox_profile=p, service_log_path=os.devnull)
    if not settings['headless']:
        driver.maximize_window()
else:
    # Chrome: download handling is configured through the "prefs" option,
    # pointing the default download directory at settings['output_path'].
    o.add_experimental_option('prefs', {'download.default_directory': settings['output_path'], 'download.prompt_for_download': False, 'download.directory_upgrade': True, 'safebrowsing.disable_download_protection': True})
    o.add_argument('--log-level=3')
    o.add_argument('--disable-infobars')
    if settings['headless']:
        o.add_argument('--disable-gpu')
    else:
        o.add_argument('--start-maximized')
    driver = webdriver.Chrome(options=o)
    if settings['headless']:
        # Registers an extra command endpoint on the driver (via a private
        # Selenium attribute) and uses it to send the DevTools command
        # "Page.setDownloadBehavior" with behavior "allow" and the output path.
        # NOTE(review): presumably needed because headless Chrome does not
        # honour the download prefs above - confirm.
        driver.command_executor._commands['send_command'] = ('POST', '/session/$sessionId/chromium/send_command')
        driver.execute('send_command', {'cmd': 'Page.setDownloadBehavior', 'params': {'behavior': 'allow', 'downloadPath': settings['output_path']}})
# Headless runs get a fixed window size (non-headless runs are maximized above).
if settings['headless']:
    driver.set_window_size(1440, 900)
# Log in: open the configured URL and fill in the credentials form.
driver.get(settings['url'])
username_field = driver.find_element_by_xpath('//input[@name="username"]')
username_field.send_keys(settings['login'])
password_field = driver.find_element_by_xpath('//input[@name="password"]')
# Keys.RETURN is sent right after the password text.
password_field.send_keys(settings['password'], Keys.RETURN)

# Wait (up to 300 s) for the currently selected tab to become visible and
# switch to "Trade Tape" unless it is already the selected one.
selected_tab_locator = (By.XPATH, '//li[@class="DragTabs__tab__3a1AS DragTabs__tabSelected__2D4o4"]')
e = WebDriverWait(driver, 300).until(EC.visibility_of_element_located(selected_tab_locator))
if e.text != 'Trade Tape':
    trade_tape_tab = driver.find_element_by_xpath('//span[@class="TabbedView__tabTitle__3I2Fo" and text()="Trade Tape"]')
    trade_tape_tab.click()
def wait_for_new_grid_rows(old_grid_rows):
    """Poll the trade-tape grid until it shows a row set different from *old_grid_rows*.

    Args:
        old_grid_rows: The list of row elements seen before the triggering
            action, or ``None`` when there is no previous snapshot.

    Returns:
        The new, non-empty list of row elements, or ``None`` as soon as the
        grid displays its "No data available." placeholder.

    Note:
        Polls every 0.05 s through the module-level ``driver`` and has no
        timeout of its own.
    """
    rows_xpath = '//div[@data-qa="tradeTapeRow"]'
    no_data_xpath = '//div[@class="EmptyGridView__centered__1IvU4" and text()="No data available."]'

    def grid_reports_no_data():
        return bool(driver.find_elements_by_xpath(no_data_xpath))

    # Phase 1: wait until the row list differs from the previous snapshot.
    time.sleep(0.05)
    current_rows = driver.find_elements_by_xpath(rows_xpath)
    while current_rows == old_grid_rows:
        if grid_reports_no_data():
            return None
        time.sleep(0.05)
        current_rows = driver.find_elements_by_xpath(rows_xpath)

    # Phase 2: the list changed but may still be empty while the grid loads.
    while not current_rows:
        if grid_reports_no_data():
            return None
        time.sleep(0.05)
        current_rows = driver.find_elements_by_xpath(rows_xpath)
    return current_rows
# Select number of rows
# Take the first snapshot of the grid (no previous rows to compare against).
grid_rows = wait_for_new_grid_rows(None)
# The rows-per-page dropdown placeholder, whatever "...00 rows" value it shows.
e = WebDriverWait(driver, 300).until(EC.visibility_of_element_located((By.XPATH, '//div[@class="Dropdown-placeholder" and contains(@title, "00 rows")]')))
# NOTE(review): the extent of this `if` body is inferred from the logic.
# When "10000 rows" is already selected, the script first switches to
# "1000 rows", so that the selection of "10000 rows" below is a real change
# that wait_for_new_grid_rows() can detect.
if e.text == '10000 rows':
    e.click()
    WebDriverWait(driver, 300).until(EC.visibility_of_element_located((By.XPATH, '//div[@class="Dropdown-option" and @data-label="1000 rows"]'))).click()
    e = WebDriverWait(driver, 300).until(EC.visibility_of_element_located((By.XPATH, '//div[@class="Dropdown-placeholder" and @title="1000 rows"]')))
# Always finish on "10000 rows" and wait for the grid to reload.
e.click()
WebDriverWait(driver, 300).until(EC.visibility_of_element_located((By.XPATH, '//div[@class="Dropdown-option" and @data-label="10000 rows"]'))).click()
grid_rows = wait_for_new_grid_rows(grid_rows)
# Select default view
e = driver.find_element_by_xpath('//span[@class="QuickViewsDropdown__listItemName__2nHlD"]')
# Only switch (and wait for a reload) when another view is currently active.
if e.text != 'Default View':
    e.click()
    WebDriverWait(driver, 300).until(EC.visibility_of_element_located((By.XPATH, '//div[@class="QuickViewsDropdown__listItem__1NmP0" and text()="Default View"]'))).click()
    grid_rows = wait_for_new_grid_rows(grid_rows)
def touch(fname, times=None):
    """Create *fname* if it does not exist and set its timestamps.

    Args:
        fname: Path of the file to create or update.
        times: Optional ``(atime, mtime)`` tuple forwarded to ``os.utime``;
            ``None`` uses the current time.

    Existing contents are preserved because the file is opened for appending.
    """
    handle = open(fname, 'a')
    try:
        os.utime(fname, times)
    finally:
        handle.close()
# The export step below recognises a finished download by globbing for
# "trades_*.csv" in the output folder, so stale files with that name pattern
# must be removed before the first export.
for erroneous_file_path in glob(f'{settings["output_path"]}/trades_*.csv'):
    os.unlink(erroneous_file_path)

# The date used in output file names can be overridden through the
# "csv_script_launch_date" environment variable; it defaults to today.
script_launch_date = os.getenv('csv_script_launch_date', f'{datetime.now():%m.%d.%Y}')
# `times` holds consecutive (start, end) pairs, so each data code accounts for
# len(times) // 2 progress steps.
ranges_per_code = len(times) // 2
progress_bar = tqdm(total=ranges_per_code * len(data_codes))


def _mark_data_code_skipped(marker_file_path):
    """Create an empty marker file and advance the bar past all time ranges of one data code."""
    touch(marker_file_path)
    for _ in range(ranges_per_code):
        progress_bar.update()


try:
    for data_code in data_codes:
        # Remove all filters. The reset control may be absent, in which case
        # there is nothing to click and nothing to wait for.
        try:
            reset_filters = driver.find_element_by_xpath('//div[@data-qa="tradeTapeToolbarResetFilters"]')
        except NoSuchElementException:
            pass
        else:
            reset_filters.click()
            grid_rows = wait_for_new_grid_rows(grid_rows)

        # Enter data code (skipped when the filter label already shows it).
        invalid_symbol_path = f'{settings["output_path"]}/{data_code} {script_launch_date} INVALID SYMBOL.csv'
        expected_label = f'== {data_code.lower()}'
        input_data_code = driver.find_element_by_xpath('//input[@data-qa="symbolNameInput"]')
        label_data_code = driver.find_element_by_xpath('//div[@class="FilterPreview__value__3bDRJ"]')
        if label_data_code.text.lower() != expected_label:
            input_data_code.send_keys(data_code)
            # Handle invalid data codes: the autocomplete reports "no matches".
            e = WebDriverWait(driver, 300).until(EC.visibility_of_element_located((By.XPATH, '//div[@class="SymbolSelector__autocomplete__EF_vQ"]')))
            if e.find_elements_by_xpath('.//span[@data-qa="matchingSymbolsList-noMatches"]'):
                input_data_code.send_keys(Keys.ESCAPE)
                _mark_data_code_skipped(invalid_symbol_path)
                continue
            # Wait until the autocomplete highlights exactly this code
            # (compared case-insensitively), then submit it.
            WebDriverWait(driver, 300).until(EC.visibility_of_element_located((By.XPATH, f'//mark[@class="SymbolSelector__searching__1234y " and translate(text(),"ABCDEFGHIJKLMNOPQRSTUVWXYZ","abcdefghijklmnopqrstuvwxyz")="{data_code.lower()}"]')))
            input_data_code.submit()
            grid_rows = wait_for_new_grid_rows(grid_rows)
            # Sometimes it doesn't say "No matches" but the data code is still invalid
            if label_data_code.text.lower() != expected_label:
                _mark_data_code_skipped(invalid_symbol_path)
                continue

        # No rows at all for this code even before any time filter is applied.
        if grid_rows is None:
            _mark_data_code_skipped(f'{settings["output_path"]}/{data_code} {script_launch_date} (no filters).csv')
            continue

        # Choose time filter: one export per (start, end) pair.
        for range_index, (range_start, range_end) in enumerate(zip(times[0::2], times[1::2])):
            progress_bar.update()
            output_file_path = f'{settings["output_path"]}/{data_code} {script_launch_date} {range_start.replace(":", ".")}-{range_end.replace(":", ".")}.csv'
            # Delete the output file if it already exists
            if os.path.isfile(output_file_path):
                os.unlink(output_file_path)

            driver.find_element_by_xpath('//div[@data-qa="TradeTimeColumnFilter"]').click()
            if range_index == 0:
                # Only the first range of a data code switches the filter
                # from "No filter" to "in"; later ranges just change the bounds.
                WebDriverWait(driver, 300).until(EC.visibility_of_element_located((By.XPATH, '//div[@title="No filter" and text()="No filter"]'))).click()
                WebDriverWait(driver, 300).until(EC.visibility_of_element_located((By.XPATH, '//div[@class="Dropdown-option" and text()="in"]'))).click()
            input_field_min = WebDriverWait(driver, 300).until(EC.visibility_of_element_located((By.XPATH, '//input[@data-qa="filterMinValue"]')))
            input_field_max = WebDriverWait(driver, 300).until(EC.visibility_of_element_located((By.XPATH, '//input[@data-qa="filterMaxValue"]')))
            # The digits are typed without ":" and with a leading "0".
            # NOTE(review): presumably required by the input mask - confirm.
            input_field_min.clear()
            input_field_min.send_keys('0' + range_start.replace(':', ''))
            input_field_max.clear()
            input_field_max.send_keys('0' + range_end.replace(':', ''))
            driver.find_element_by_xpath('//button[@data-qa="Apply"]').click()
            grid_rows = wait_for_new_grid_rows(grid_rows)
            if grid_rows is None:
                # Nothing to export for this range: leave an empty file.
                touch(output_file_path)
                continue

            # Export data to CSV
            driver.find_element_by_xpath('//span[@class="DropdownButton-placeholder" and text()="Export"]').click()
            WebDriverWait(driver, 300).until(EC.visibility_of_element_located((By.XPATH, '//div[@class="DropdownButton-option" and text()="to CSV"]'))).click()
            # Poll until the downloaded "trades_*.csv" appears, then rename it.
            while True:
                default_files_list = glob(f'{settings["output_path"]}/trades_*.csv')
                if default_files_list:
                    if len(default_files_list) != 1:
                        # Explicit check instead of `assert` (stripped under
                        # `python -O`): with several candidates the wrong
                        # file could be renamed.
                        raise RuntimeError(
                            f'expected exactly one downloaded "trades_*.csv" file, '
                            f'found {len(default_files_list)}: {default_files_list}'
                        )
                    os.rename(default_files_list[0], output_file_path)
                    break
                time.sleep(0.05)
            time.sleep(settings['additional_pause'])
finally:
    # Always release the progress bar and the browser, even when a step above
    # raises (e.g. one of the 300 s waits times out); otherwise the browser
    # process would be left running.
    progress_bar.close()
    driver.quit()
# input("\nPress Enter to finish...")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment