Skip to content

Instantly share code, notes, and snippets.

@h-a-graham
Created December 21, 2020 09:38
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save h-a-graham/a785063535a4dd8cd663484fb3062592 to your computer and use it in GitHub Desktop.
Save h-a-graham/a785063535a4dd8cd663484fb3062592 to your computer and use it in GitHub Desktop.
A function to automate the scraping of EA lidar URL tokens for a list of zipped shp files.
from selenium import webdriver
from selenium.webdriver.support.ui import Select
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
# from selenium.webdriver.firefox.options import Options
import pandas as pd
import math
from glob import glob
import time
import os
import re
from pathlib import Path
from datetime import datetime
import warnings
def scrapestuff(gecko_exe, zip_list):
    """Scrape EA lidar download URL tokens for a list of zipped shapefiles.

    Drives the Defra data-download site with Selenium: uploads each zipped
    shapefile, selects the 'LIDAR Composite DTM' (or DSM) product, and pulls
    the ArcGIS object id out of the first download link.

    Parameters
    ----------
    gecko_exe : str
        Path to the geckodriver executable
        (download from https://github.com/mozilla/geckodriver/releases).
    zip_list : list of str
        File paths of zipped shapefiles to upload. 10 km OS grid tiles work
        well (https://github.com/charlesroper/OSGB_Grids); each file name
        must contain 'Tile_<id>.zip' so the tile id can be parsed out.

    Returns
    -------
    tuple
        (combine_dfs, combine_errs) — pandas DataFrames with columns
        ['tile_n', 'arc_code'] and ['tile_n', 'error_message'] respectively;
        either is an empty list when there was nothing to concatenate.

    Notes
    -----
    A headless driver did not work against this site, so a visible Firefox
    window is used.
    """
    startTime = datetime.now()  # start timer
    link = 'https://environment.data.gov.uk/DefraDataDownload/?Mode=survey'
    # The site limits uploads to 10 per session, so process the tiles in
    # chunks of 10, each chunk in a fresh browser session.
    zip_chunks = chunks(zip_list, 10)
    fail_list = []  # one single-row error DataFrame per failed tile
    dump_list = []  # one single-row result DataFrame per scraped tile
    for chunk in zip_chunks:
        browser = webdriver.Firefox(executable_path=gecko_exe)
        browser.implicitly_wait(20)
        browser.get(link)
        WebDriverWait(browser, 60).until(
            EC.element_to_be_clickable((By.CSS_SELECTOR, '#polygon')))
        for file in chunk:
            # Tile id is parsed from the file name, e.g. 'Tile_42.zip' -> 42.
            # Adjust the pattern if your tiles are named differently.
            tile_n = int(re.search('Tile_(.*).zip', file).group(1))
            try:
                _scrape_tile(browser, file, tile_n, dump_list)
            except Exception as e:
                # Deliberately broad: any per-tile failure (timeouts, missing
                # elements, ...) is recorded and the run continues with the
                # remaining tiles.
                warnings.warn("Error has occurred for Tile {0}".format(tile_n))
                error_out = [[tile_n, str(e)]]
                error_out_pd = pd.DataFrame(
                    error_out, columns=['tile_n', 'error_message'])
                fail_list.append(error_out_pd)
        browser.quit()
    # Join the per-tile single-row DataFrames into one DataFrame each.
    try:
        combine_dfs = pd.concat(dump_list).reset_index(drop=True)
    except ValueError:
        # pd.concat raises ValueError when dump_list is empty.
        warnings.warn('No data to join - somethings gone horribly wrong!!!')
        combine_dfs = []
    try:
        combine_errs = pd.concat(fail_list).reset_index(drop=True)
        warnings.warn('Errors have occurred - check Error log with .$error_df')
    except ValueError:
        # pd.concat raises ValueError when fail_list is empty -> no errors.
        print('No Errors Occurred - YAY!!')
        combine_errs = []
    endTime = datetime.now() - startTime
    print('Python Script completed in {0}'.format(endTime))
    return combine_dfs, combine_errs


def _wait_for_overlay(browser):
    """Wait (up to 60 s each) for the site's dojox 'Standby' loading overlay
    elements to become invisible before interacting with the page."""
    WebDriverWait(browser, 60).until(EC.invisibility_of_element_located(
        (By.CSS_SELECTOR, '#dojox_widget_Standby_0 > div:nth-child(1)')))
    WebDriverWait(browser, 60).until(EC.invisibility_of_element_located(
        (By.CSS_SELECTOR, '#dojox_widget_Standby_0 > img:nth-child(2)')))


def _scrape_tile(browser, file, tile_n, dump_list):
    """Upload one zipped shapefile and append its scrape result to dump_list.

    Appends a single-row DataFrame with columns ['tile_n', 'arc_code'] where
    arc_code is the ArcGIS object id from the first download link, or the
    sentinel 'NO_DTM_COMP' when no composite DTM/DSM product is offered for
    this tile. Raises whatever Selenium raises on timeouts/missing elements;
    the caller handles and logs those.
    """
    # Wait for the loading overlay, then send the file path straight to the
    # (hidden) '#fileid' file input -- this uploads without a native dialog.
    _wait_for_overlay(browser)
    WebDriverWait(browser, 60).until(EC.invisibility_of_element_located(
        (By.CSS_SELECTOR, '#fileid'))).send_keys(file)
    # Wait again, then click 'get available tiles'.
    _wait_for_overlay(browser)
    WebDriverWait(browser, 60).until(EC.element_to_be_clickable(
        (By.CSS_SELECTOR, '.grid-item-container'))).click()
    # Wait once more before reading the product dropdown.
    _wait_for_overlay(browser)
    select = Select(browser.find_element_by_css_selector('#productSelect'))
    # Collect the product names on offer. The first and last entries are
    # skipped, matching the original range(1, len(options) - 1) scrape.
    option_list = [opt.text for opt in select.options[1:-1]]
    if 'LIDAR Composite DTM' in option_list or 'LIDAR Composite DSM' in option_list:
        # Prefer the DTM product; fall back to the DSM.
        if 'LIDAR Composite DTM' in option_list:
            select.select_by_visible_text('LIDAR Composite DTM')
        else:
            select.select_by_visible_text('LIDAR Composite DSM')
        time.sleep(1)
        # The first download link's URL carries the ArcGIS object id.
        down_link = browser.find_element_by_css_selector(
            '.data-ready-container > a:nth-child(1)').get_property('href')
        arc_code = re.search('interactive/(.*)/LIDARCOMP', down_link).group(1)
    else:
        arc_code = 'NO_DTM_COMP'  # temp solution...
    # Reset the upload window ready for the next tile.
    browser.find_element_by_css_selector(
        'div.result-options:nth-child(7) > input:nth-child(1)').click()
    dump_list.append(
        pd.DataFrame([[tile_n, arc_code]], columns=['tile_n', 'arc_code']))
def chunks(l, n):
    """Split *l* into successive slices of at most *n* items.

    *n* is clamped to a minimum of 1 so a zero/negative chunk size still
    yields single-item slices. Returns a lazy generator of slices.
    """
    step = max(1, n)
    return (l[start:start + step] for start in range(0, len(l), step))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment