# Source: GitHub Gist by @Mizzlr (gist id 6fa9a399eb7f51d40222bb3021137de7),
# created September 15, 2021 11:05.
# from selenium import webdriver
from seleniumwire import webdriver
from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.chrome.options import Options
import time
import os
import calendar
import traceback
import json
from tqdm import tqdm
import random
from fabulous.color import bold, green, red
# Abbreviated month names from the calendar module. Index 0 is the empty
# string, so MONTHS.index('Jan') == 1, i.e. the index is the month number.
MONTHS = [*calendar.month_abbr]

# Directory handed to the browser as "download.default_directory"; it is also
# where run_driver looks for already-downloaded archives.
DOWNLOAD_DIR = os.path.expanduser('~/Downloads/selenium-dataset/')

# REQUESTED: flag intended to mark an archive request as in flight; no active
# code reads it (the wait loop that used it is commented out).
# SEEN_ALL_REPORTS_JS: only referenced by `global` declarations and by
# commented-out code.
REQUESTED = SEEN_ALL_REPORTS_JS = False
def make_driver(binary_location='/Applications/Brave Browser.app/Contents/MacOS/Brave Browser'):
    """Create a selenium-wire Chrome driver with request/response interceptors.

    The browser is configured to save downloads into ``DOWNLOAD_DIR`` without
    prompting, to allow pop-ups, and to open DevTools for every tab.

    Args:
        binary_location: Path to the Chromium-based browser executable.
            Defaults to Brave on macOS; the Google Chrome equivalent is
            '/Applications/Google Chrome.app/Contents/MacOS/Google Chrome'.

    Returns:
        The configured ``seleniumwire.webdriver.Chrome`` instance.
    """
    options = Options()
    options.binary_location = binary_location
    chrome_prefs = {
        "profile.default_content_settings.popups": 0,
        "download.prompt_for_download": False,
        'devtools.open_docked': True,
        "download.default_directory": DOWNLOAD_DIR,
    }
    options.add_experimental_option("prefs", chrome_prefs)
    options.add_argument("disable-popup-blocking")
    options.add_argument("--auto-open-devtools-for-tabs")

    # Requests whose URL contains any of these fragments are aborted.
    blocked_fragments = (
        'go-mpulse.net',
        'google-analytics.com',
        'googletagmanager.com/gtm.js',
        'akam/11/',
    )
    archive_api = 'https://www.nseindia.com/api/reports?archives='

    def request_interceptor(request):
        """Log every request, abort blocked ones, flag archive requests."""
        # Without this declaration the assignment below would only create a
        # local variable and the module-level flag would never change.
        global REQUESTED
        url = str(request.url)
        print(bold(green('GET')), request.url)
        if any(fragment in url for fragment in blocked_fragments):
            print(red('aborting ... '), request.url)
            request.abort()
        elif archive_api in url:
            REQUESTED = True

    def response_interceptor(request, response):
        """Clear the archive flag and report non-200 responses."""
        global REQUESTED
        if archive_api in str(request.url):
            REQUESTED = False
        # NOTE(review): the failure log is applied to every response, not only
        # to archive responses -- confirm this matches the intended nesting.
        if response.status_code != 200:
            print(red(' >> FAILED'), response.status_code, request.url)

    driver = webdriver.Chrome(options=options)
    driver.request_interceptor = request_interceptor
    driver.response_interceptor = response_interceptor
    return driver
def pick_date(day, month, desired_year, driver):
    """Pick a date in the date-picker of the ``cr_equity_archives`` section.

    Opens the calendar widget, pages month by month until the displayed
    period matches ``month``/``desired_year``, then clicks the requested day.

    Args:
        day: Day of the month; matched against the ``day`` attribute of the
            calendar's ``<td>`` cells.
        month: Month abbreviation as it appears in ``MONTHS`` (e.g. ``'Jan'``).
        desired_year: Year as an ``int``.
        driver: Selenium WebDriver with the all-reports page already loaded.

    Raises:
        ValueError: If ``month`` (or the month shown by the widget) is not in
            ``MONTHS``, or the displayed year cannot be parsed.
    """
    # The find_element_by_* helpers were removed in Selenium 4.3;
    # find_element(By..., ...) works on both old and new releases.
    from selenium.webdriver.common.by import By

    # NOTE(review): XPaths starting with '//' search the whole document even
    # when evaluated on `cal`; kept as in the original -- confirm whether a
    # relative './/' was intended.
    period_xpath = '//*/div[@role="period"]'

    # Loop-invariant, and validates `month` before touching the page.
    desired_month_index = MONTHS.index(month)

    dp_button = driver.find_element(
        By.XPATH,
        '//*[@id="cr_equity_archives"]/div/div[1]/div[2]/div/div/div[2]/span/button')
    time.sleep(2)
    dp_button.click()
    cal = driver.find_element(By.XPATH, '/html/body/div[@role="calendar"]')

    while True:
        period = cal.find_element(By.XPATH, period_xpath)
        # The caption is read as '<Mon> ... <YYYY>': first three characters
        # are the month abbreviation, last four the year.
        month_year = str(period.get_attribute('innerHTML')).strip()
        current_month_index = MONTHS.index(month_year[:3])
        current_year = int(month_year[-4:])
        print(f'{desired_year=}, {current_year=}, {desired_month_index=}, {current_month_index=}')

        desired = (desired_year, desired_month_index)
        shown = (current_year, current_month_index)
        if desired > shown:
            # Target is later than the displayed month: page forward.
            cal.find_element(By.CLASS_NAME, "fa-chevron-right").click()
            period = cal.find_element(By.XPATH, period_xpath)
            print('moved right', period.get_attribute('innerHTML'))
        elif desired < shown:
            # Target is earlier than the displayed month: page backward.
            cal.find_element(By.CLASS_NAME, "fa-chevron-left").click()
            period = cal.find_element(By.XPATH, period_xpath)
            print('moved left', period.get_attribute('innerHTML'))
        else:
            day_cell = cal.find_element(By.XPATH, '//*/td[@day="' + str(day) + '"]')
            day_cell.click()
            print(' selectedday after click', cal.get_attribute('selectedday'))
            time.sleep(2)
            break
def _bhav_copy_sort_key(file_name):
    """Return the ``YYYYMMDD`` key encoded in *file_name*, or ``None`` if it does not parse."""
    day, month, year = file_name[2:4], file_name[4:7], file_name[7:11]
    if len(day) != 2 or not day.isdecimal():
        return None
    if len(year) != 4 or not year.isdecimal():
        return None
    # MONTHS[0] is '', so an empty slice must be rejected explicitly.
    if not month or month not in MONTHS:
        return None
    return f'{year}{MONTHS.index(month):02d}{day}'


def sorted_bhav_copy_files(directory='~/bhavcopy-dataset/basic/'):
    """List the bhavcopy files in *directory*, newest date first.

    The date is read from fixed positions in each file name: characters 2-3
    are the day, 4-6 the month abbreviation (as in ``MONTHS``) and 7-10 the
    year.

    Entries whose name does not match that layout (for example ``.DS_Store``)
    are reported and skipped instead of aborting the whole listing. If two
    names encode the same date, the one listed last is returned once.

    Args:
        directory: Directory to scan; ``~`` is expanded.

    Returns:
        A list of file names sorted by encoded date, most recent first.
    """
    file_map = {}
    entries = os.listdir(os.path.expanduser(directory))
    for file_name in tqdm(entries, desc='Sorting Bhavcopy files'):
        key = _bhav_copy_sort_key(file_name)
        if key is None:
            # tqdm.write keeps the message from garbling the progress bar.
            tqdm.write(f'Skipping unrecognised file name: {file_name}')
            continue
        file_map[key] = file_name
    # Keys are zero-padded YYYYMMDD strings, so text order is date order.
    return [file_map[key] for key in sorted(file_map, reverse=True)]
def _pr_zip_candidates(date):
    """Return the file names under which the PR archive for *date* may be stored."""
    month_number = f'{MONTHS.index(date[4:7]):02d}'
    stem = f'PR{date[2:4]}{month_number}{date[9:11]}'
    # The ' (1)' variant is presumably what the browser writes when the same
    # archive is downloaded a second time.
    return [f'{stem}.zip', f'{stem} (1).zip']


def _find_downloaded(date):
    """Return the name of an archive for *date* already in DOWNLOAD_DIR, else ``None``."""
    for candidate in _pr_zip_candidates(date):
        path = os.path.join(DOWNLOAD_DIR, candidate)
        print('Checking for file', path)
        if os.path.isfile(path):
            return candidate
    return None


def run_driver(driver, files):
    """Download the PR archive for every entry of *files* not yet on disk.

    For each pending entry the date is chosen in the page's date-picker, the
    report checkbox is ticked and the download control is clicked.

    Ordinary exceptions are printed and swallowed (best effort).
    ``KeyboardInterrupt`` is deliberately not swallowed, so the caller can
    stop the run. The driver is always shut down with ``quit()``.

    Args:
        driver: Selenium WebDriver, e.g. the one returned by ``make_driver``.
        files: Bhavcopy file names with the day at characters 2-3, the month
            abbreviation at 4-6 and the year at 7-10.
    """
    # The find_element_by_* helpers were removed in Selenium 4.3.
    from selenium.webdriver.common.by import By

    checkbox_xpath = '//*[@id="cr_equity_archives"]/div/div[3]/div[11]/div/div/label'
    download_xpath = '//*[@id="cr_equity_archives"]/div/div[3]/div[11]/div/div/span'

    def scroll(by):
        """Scroll the window vertically by *by* pixels."""
        driver.execute_script('window.scrollBy(0,' + str(by) + ');')

    try:
        try:
            # implicitly_wait sets the element-lookup timeout for the session;
            # it is not a pause, so it is configured once up front.
            driver.implicitly_wait(10)
            driver.get('https://www.nseindia.com/all-reports#cr_equity_archives')
            time.sleep(10)

            filtered_files = []
            for date in files:
                existing = _find_downloaded(date)
                if existing is not None:
                    print(f'File {existing} already downloaded, skipping it ...')
                    continue
                filtered_files.append(date)
            print('ALL FILES:', len(files))
            print('FILTERED :', len(filtered_files))

            for date in tqdm(filtered_files, desc='Downloading Bhavcopy PR'):
                # Re-check at download time: a duplicate entry in `files` may
                # already have been fetched earlier in this run.
                existing = _find_downloaded(date)
                if existing is not None:
                    print(f'File {existing} already downloaded, skipping it ...')
                    continue
                day, month, year = int(date[2:4]), date[4:7], int(date[7:11])
                print(f'Downloading file {_pr_zip_candidates(date)[0]}, ...')

                pick_date(day, month, year, driver)
                scroll(500)
                time.sleep(2)
                driver.find_element(By.XPATH, checkbox_xpath).click()
                time.sleep(2)
                scroll(-500)
                time.sleep(2)
                download_button = driver.find_element(By.XPATH, download_xpath)
                print('clicking download button')
                download_button.click()
                scroll(-500)

            print('Completed downloading all the files.')
            time.sleep(1000)
        except Exception:
            traceback.print_exc()
        # Keep the browser open for a while before shutting down, presumably
        # so in-flight downloads can finish. Skipped on KeyboardInterrupt.
        time.sleep(1000)
    finally:
        # quit() ends the whole session and the driver process; close() would
        # only close the current window and leak a process per retry.
        driver.quit()
if __name__ == "__main__":
    # Entry point: list the bhavcopy files once, then run the downloader in an
    # endless retry loop with a fresh browser per attempt.
    files = sorted_bhav_copy_files()
    time.sleep(10)
    while True:
        try:
            driver = make_driver()
            # Pass a slice of `files` here to restrict a run to a sub-range.
            run_driver(driver, files)
        except KeyboardInterrupt:
            traceback.print_exc()
            # SystemExit does not depend on the `site`-provided exit() helper.
            raise SystemExit(1) from None
        except Exception:
            print('Ignoring exception, continuing anyways ...')
            # Show what was ignored so repeated failures can be diagnosed.
            traceback.print_exc()
            # Short pause so a persistent failure (e.g. the browser cannot be
            # started) does not spin the loop at full speed.
            time.sleep(5)
# (End of gist.)