Created
December 21, 2020 09:38
-
-
Save h-a-graham/a785063535a4dd8cd663484fb3062592 to your computer and use it in GitHub Desktop.
A function to automate scraping of Environment Agency (EA) LiDAR download URL tokens for a list of zipped shapefiles (.shp).
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from selenium import webdriver | |
from selenium.webdriver.support.ui import Select | |
from selenium.webdriver.common.by import By | |
from selenium.webdriver.support.ui import WebDriverWait | |
from selenium.webdriver.support import expected_conditions as EC | |
# from selenium.webdriver.firefox.options import Options | |
import pandas as pd | |
import math | |
from glob import glob | |
import time | |
import os | |
import re | |
from pathlib import Path | |
from datetime import datetime | |
import warnings | |
def scrapestuff(gecko_exe, zip_list):
    """Scrape EA LiDAR ArcGIS download tokens for a list of zipped shapefiles.

    Parameters
    ----------
    gecko_exe : str
        Path to the geckodriver executable; download from
        https://github.com/mozilla/geckodriver/releases
    zip_list : list of str
        File paths of zipped shapefiles to upload. 10 km OS grid tiles
        (https://github.com/charlesroper/OSGB_Grids) work well. Each file
        name must match ``Tile_<n>.zip`` so the tile number can be parsed.

    Returns
    -------
    tuple
        ``(results, errors)`` where ``results`` is a DataFrame with columns
        ``['tile_n', 'arc_code']`` (or an empty list when nothing was
        scraped) and ``errors`` is a DataFrame with columns
        ``['tile_n', 'error_message']`` (or an empty list when no errors
        occurred).

    Notes
    -----
    Runs a visible Firefox session; headless mode did not work against this
    site at the time of writing. The site limits uploads to 10 per session,
    so the input list is processed in chunks of 10 with a fresh browser for
    each chunk.
    """
    start_time = datetime.now()  # start timer
    link = 'https://environment.data.gov.uk/DefraDataDownload/?Mode=survey'
    fail_list = []
    dump_list = []
    for chunk in chunks(zip_list, 10):
        # Selenium 3-style constructor, consistent with the rest of this file.
        browser = webdriver.Firefox(executable_path=gecko_exe)
        browser.implicitly_wait(20)
        browser.get(link)
        WebDriverWait(browser, 60).until(
            EC.element_to_be_clickable((By.CSS_SELECTOR, '#polygon')))
        for file in chunk:
            # NOTE(review): the tile id is parsed from the 'Tile_<n>.zip'
            # naming convention -- adjust this pattern for other names.
            tile_n = int(re.search('Tile_(.*).zip', file).group(1))
            try:
                # Upload the zipped shapefile via the (hidden) file input.
                _wait_for_standby(browser)
                WebDriverWait(browser, 60).until(
                    EC.invisibility_of_element_located(
                        (By.CSS_SELECTOR, '#fileid'))).send_keys(file)
                # Click 'get available tiles' once the overlay has cleared.
                _wait_for_standby(browser)
                WebDriverWait(browser, 60).until(
                    EC.element_to_be_clickable(
                        (By.CSS_SELECTOR, '.grid-item-container'))).click()
                _wait_for_standby(browser)
                select = Select(
                    browser.find_element_by_css_selector('#productSelect'))
                # Skip index 0 (placeholder) but include every real product.
                # BUGFIX: the original range(1, len(options) - 1) silently
                # dropped the LAST product in the dropdown.
                option_list = [opt.text for opt in select.options[1:]]
                if ('LIDAR Composite DTM' in option_list
                        or 'LIDAR Composite DSM' in option_list):
                    # Prefer the DTM product; fall back to the DSM.
                    if 'LIDAR Composite DTM' in option_list:
                        select.select_by_visible_text('LIDAR Composite DTM')
                    else:
                        select.select_by_visible_text('LIDAR Composite DSM')
                    time.sleep(1)  # let the results pane render
                    # The first download link embeds the ArcGIS object id.
                    down_link = browser.find_element_by_css_selector(
                        '.data-ready-container > a:nth-child(1)'
                    ).get_property('href')
                    result = re.search(
                        'interactive/(.*)/LIDARCOMP', down_link).group(1)
                    dump_list.append(pd.DataFrame(
                        [[tile_n, result]],
                        columns=['tile_n', 'arc_code']))
                else:
                    # No composite DTM/DSM product exists for this tile.
                    dump_list.append(pd.DataFrame(
                        [[tile_n, 'NO_DTM_COMP']],
                        columns=['tile_n', 'arc_code']))
                # Reset the upload window ready for the next tile.
                browser.find_element_by_css_selector(
                    'div.result-options:nth-child(7) > input:nth-child(1)'
                ).click()
            except Exception as e:
                # Deliberate best-effort catch-all: record the failure for
                # this tile and carry on with the remaining tiles.
                warnings.warn("Error has occurred for Tile {0}".format(tile_n))
                fail_list.append(pd.DataFrame(
                    [[tile_n, str(e)]],
                    columns=['tile_n', 'error_message']))
        browser.quit()
    # Combine the per-tile frames; explicit emptiness checks replace the
    # original broad try/except around pd.concat.
    if dump_list:
        combine_dfs = pd.concat(dump_list).reset_index(drop=True)
    else:
        warnings.warn('No data to join - somethings gone horribly wrong!!!')
        combine_dfs = []
    if fail_list:
        combine_errs = pd.concat(fail_list).reset_index(drop=True)
        warnings.warn('Errors have occurred - check Error log with .$error_df')
    else:
        print('No Errors Occurred - YAY!!')
        combine_errs = []
    end_time = datetime.now() - start_time
    print('Python Script completed in {0}'.format(end_time))
    return combine_dfs, combine_errs


def _wait_for_standby(browser):
    """Block until the site's dojo 'Standby' loading overlay is hidden."""
    WebDriverWait(browser, 60).until(EC.invisibility_of_element_located(
        (By.CSS_SELECTOR, '#dojox_widget_Standby_0 > div:nth-child(1)')))
    WebDriverWait(browser, 60).until(EC.invisibility_of_element_located(
        (By.CSS_SELECTOR, '#dojox_widget_Standby_0 > img:nth-child(2)')))
def chunks(l, n):
    """Yield successive slices of *l*, each at most *n* items long.

    A non-positive *n* is clamped to 1 so the generator always advances.
    Slices are produced lazily, in order, and together cover all of *l*.
    """
    step = max(1, n)
    for start in range(0, len(l), step):
        yield l[start:start + step]
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment