Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save zxdawn/fbcf2d9c5c212ddc510ee50b6683ab12 to your computer and use it in GitHub Desktop.
Save zxdawn/fbcf2d9c5c212ddc510ee50b6683ab12 to your computer and use it in GitHub Desktop.
H8 Downloader
"""
methods to download himawari data from
http://himawari.diasjp.net/expert/original/bin/original-main.cgi
"""
import datetime
import glob
import json
import logging
import os
import time
import shutil
import selenium as sel
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
import sys
import himawari_orders as hwo
# Severity levels accepted by log(); they mirror the numeric levels of the
# standard logging module so callers do not need to import logging themselves.
CRITICAL = logging.CRITICAL
ERROR = logging.ERROR
WARNING = logging.WARNING
INFO = logging.INFO
DEBUG = logging.DEBUG
NOTSET = logging.NOTSET
class Website:
    """
    Methods manipulating the elements of the Himawari download website
    through a selenium web driver.
    """

    def __init__(self, driver, url, date, period, time_out=5*60):
        """
        build the search url and store the driver
        :param date: timestamp of the first satellite image to search for
        :param driver: web browser driver
        :param period: search period in hours
                       <date + period> defines the upper boundary of the
                       satellite images that will be included in the
                       download.
        :param time_out: standard maximum waiting time (in seconds) before
                         a timeout exception will be raised
        :param url: website url (without query string)
        """
        self.driver = driver
        date_start = date.strftime('%Y-%m-%dT%H:%M')
        # the search parameters are passed to the website as query string
        self.url = ''.join([url,
                            '?span=', str(period),
                            '&origin=from&datetime=', date_start,
                            '&timezone=UTC&type=HS&area=FLDK'])
        self.time_out = time_out

    def login(self, user, max_trial=20):
        """
        send login information to url
        :param user: login information [username, password]
        :param max_trial: maximal number of retries to reach the login page
        :raises TimeoutException: login page still not available after all
                                  retries
        :raises ValueError: login page is still shown after submitting the
                            credentials
        """
        self.driver.get(self.url)
        available = self.check_login_available()
        count = 0
        while available is False and count <= max_trial:
            print('website not available ... try again in 10 seconds')
            time.sleep(10)
            self.driver.get(self.url)
            available = self.check_login_available()
            count += 1
        # decide on the availability itself, not on the retry counter:
        # otherwise a success on the very last retry is reported as failure
        if available is False:
            raise sel.common.exceptions.TimeoutException(
                'website not available')
        login = self.driver.find_element(By.ID, 'username')
        login.clear()
        login.send_keys(user[0])
        pword = self.driver.find_element(By.ID, 'password')
        pword.clear()
        pword.send_keys(user[1])
        pword.send_keys(Keys.RETURN)
        # give the server time to process the login request
        time.sleep(10)
        # still being on the login page means the credentials were rejected
        if self.check_login_available() is True:
            raise ValueError('Login failed: please check username and password '
                             'for correctness')

    def check_website(self):
        """
        check whether accessed website is correct
        :return: True if the link of the current navigator entry equals the
                 search url, False otherwise
        """
        xpath = '//div[@id="navigator"]/ul/li[@class="current"]/a'
        wait = WebDriverWait(self.driver, self.time_out)
        nav = wait.until(EC.presence_of_element_located((By.XPATH, xpath)))
        return nav.get_attribute('href') == self.url

    def check_login_available(self):
        """
        check whether login website is available
        :return: True if the page title is the one of the login page,
                 False otherwise
        """
        xpath = '//head/title'
        wait = WebDriverWait(self.driver, self.time_out)
        site = wait.until(EC.presence_of_element_located((By.XPATH, xpath)))
        return site.text == 'Authentication for DIAS Systems'

    def download(self):
        """
        start download procedure
        """
        self._click_element('//input[@id="download-selected-files"]')

    def get_download_list(self):
        """
        get list of files that shall be downloaded
        :return: sorted list with one entry per table row; each entry holds
                 the text of the table columns 2 to 4 of that row
        """
        rows = self.driver.find_elements(
            By.XPATH, '//tbody[@id="selected-files-body"]/*')
        file_list = []
        # xpath indices are 1-based
        for row in range(1, len(rows) + 1):
            file_info = []
            for col in range(2, 5):
                xpath = ''.join(['//tbody[@id="selected-files-body"]/tr[',
                                 str(row), ']/td[', str(col), ']'])
                element = self.get_element(xpath, element_num=0)
                file_info.append(str(element.text))
            file_list.append(file_info)
        file_list.sort()
        return file_list

    def get_download_size(self):
        """
        get total size of all selected downloads
        :return: integer found between the first comma and the word 'byte'
                 in the summary text
        """
        element = self.get_element('//span[@id="selected-files-summary"]')
        summary = element.text
        size_text = summary.split(',')[1].split('byte')[0]
        return int(size_text)

    def get_element(self, xpath, element_num=None):
        """
        search and return a website element specified by its xpath
        :param element_num: index of the wanted element. Needed to distinguish
                            between elements that match the same xpath.
        :param xpath: xpath of the website element
        :return: the matching website element
        :raises NoSuchElementException: no element matches the xpath
        :raises ValueError: xpath is empty, or several elements match and no
                            element_num was given
        """
        # split xpath at every '//' and resolve it level by level: the first
        # part is searched in the whole document, each following part
        # relative to the elements found so far
        nodes = xpath.split('//')
        if nodes[0] == '':
            del nodes[0]
        if not nodes:
            raise ValueError('empty xpath')
        element_list = self.driver.find_elements(
            By.XPATH, ''.join(['//', nodes[0]]))
        for node in nodes[1:]:
            sub_xpath = ''.join(['.//', node])
            matches = []
            for parent in element_list:
                matches.extend(parent.find_elements(By.XPATH, sub_xpath))
            element_list = matches
        # check whether there is a unique element; fail loudly instead of
        # returning None, which only surfaces later as an AttributeError
        if len(element_list) < 1:
            raise sel.common.exceptions.NoSuchElementException(
                'element not found')
        if len(element_list) > 1:
            if element_num is None:
                raise ValueError('to many elements match xpath pattern')
            return element_list[element_num]
        return element_list[0]

    def _click_element(self, xpath, **kwargs):
        """
        search and select elements by clicking
        :param xpath: element given by its xpath
        :param kwargs: forwarded to get_element
        """
        self.get_element(xpath, **kwargs).click()

    def select_bands(self, bands):
        """
        select satellite bands for downloading
        :param bands: list of channels/bands to download
        """
        if bands == list(range(1, 17)):
            self.select_bands_all()
            return
        # deselect chosen bands
        xpath = ''.join(['//div[@id="controller"]',
                         '//input[@name="band" and @value="Clear All"]'])
        self._click_element(xpath)
        # select desired bands; the checkbox values look like B01 ... B16
        for number in bands:
            band = 'B%02d' % number
            xpath = ''.join(['//input[@class="band" and @value="',
                             band, '"]'])
            self._click_element(xpath)

    def select_bands_all(self):
        """
        select all available satellite bands for downloading
        """
        xpath = ''.join(['//div[@id="controller"]',
                         '//input[@name="band" and @value="Select All"]'])
        self._click_element(xpath)

    def select2download_all(self):
        """
        select all files to download
        """
        self._click_element('//input[@name="action=select,type=HS,area=FLDK"]')

    def wait(self, xpath, max_time_out, min_time_out=30, sleep_time=0.5):
        """
        wait until a website has been fully loaded by checking whether a
        website element vanishes
        :param max_time_out: maximal waiting time in seconds
        :param min_time_out: minimal waiting time in seconds (time in between
                             finished loading of the website and first
                             occurrence of the element)
        :param sleep_time: waiting time in seconds in between single tests of
                           element presence
        :param xpath: xpath of the element to check
        :raises TimeoutException: element is still present after
                                  max_time_out seconds
        """
        time.sleep(min_time_out)
        tend = datetime.datetime.today() \
            + datetime.timedelta(seconds=max_time_out)
        vanished = False
        while datetime.datetime.today() <= tend:
            try:
                self.driver.find_element(By.XPATH, xpath)
            except sel.common.exceptions.NoSuchElementException:
                vanished = True
            # pause after every probe, including the successful one
            time.sleep(sleep_time)
            if vanished:
                break
        # report a timeout only if the element never vanished; comparing
        # clocks here could fail a wait that succeeded just before the limit
        if not vanished:
            msg = ' '.join(['website server overallocated'])
            raise sel.common.exceptions.TimeoutException(msg)
class Browser:
    """
    Methods to handle a special firefox profile used to download
    himawari satellite images.
    """

    def __init__(self, download_dir):
        """
        store the download directory and the location of the profile template
        :param download_dir: directory in which himawari data will be
                             downloaded
        """
        self.download_dir = os.path.normpath(download_dir)
        # template directory expected next to this module
        self.himawari = os.path.join(os.path.dirname(__file__),
                                     'profile_himawari')

    def get_profile(self):
        """
        return profile to use with selenium webdriver
        :return: new firefox profile with the template's handlers.json and
                 the download directory configured
        :raises NotImplementedError: the template directory does not exist
        """
        # check, whether a special profile directory already exists
        if not glob.glob(self.himawari):
            raise NotImplementedError(
                'firefox profile template not found: ' + self.himawari)
        profile = sel.webdriver.firefox.firefox_profile.FirefoxProfile()
        source = os.path.abspath(os.path.join(self.himawari, 'handlers.json'))
        destination = os.path.join(profile.path, 'handlers.json')
        shutil.copyfile(source, destination)
        profile.set_preference('browser.download.dir', self.download_dir)
        # folderList = 2: use the directory given in browser.download.dir
        profile.set_preference('browser.download.folderList', 2)
        profile.update_preferences()
        return profile

    def set_tar_handler(self, profile):
        """
        add an automatic handler for saving .tar-files
        :param profile: firefox profile that shall be manipulated; only its
                        attribute <path> is used
        """
        handler_file = os.path.join(self.himawari, 'handlers.json')
        with open(handler_file, 'r') as fid:
            data = json.load(fid)
        # check, whether there is already an automatic reaction on
        # downloading tar-files
        new_key = 'application/x-tar'
        if new_key not in data['mimeTypes']:
            # create input to save tar-file automatically
            data['mimeTypes'][new_key] = {'action': 0, 'extensions': ['tar']}
        # always write, so the profile ends up with a handler file that
        # contains the tar entry
        handler_file = os.path.join(profile.path, 'handlers.json')
        with open(handler_file, 'w') as fid:
            json.dump(data, fid)
def check_download_finished(download_dir, max_time_out=120*60, sleep_time=10,
                            settle_time=10):
    """
    check whether the download of an archive is finished
    :param download_dir: directory, where the archive was downloaded into
    :param max_time_out: maximal time, in which the download shall be
                         finished, given in seconds
    :param sleep_time: amount of seconds the program waits in between
                       single checks
    :param settle_time: amount of seconds to wait after the archive has been
                        detected as complete
    :return: True once <download_dir>/files.tar is complete
    :raises TimeoutException: archive not complete within max_time_out
    """
    tar_file = os.path.join(download_dir, 'files.tar')
    # Firefox normally writes into '<name>.part' while a download is running
    # and creates the final file name right at the start, so the mere
    # existence of files.tar does not prove that the download is complete
    part_file = tar_file + '.part'
    tend = datetime.datetime.today() + datetime.timedelta(seconds=max_time_out)
    finished = False
    while finished is False and datetime.datetime.today() <= tend:
        if os.path.exists(tar_file) and not os.path.exists(part_file):
            time.sleep(settle_time)
            finished = True
        else:
            time.sleep(sleep_time)
    if finished is False:
        msg = ' '.join(['download takes to much time'])
        raise sel.common.exceptions.TimeoutException(msg)
    return finished
def log(job_id, stime, msg, lvl, log_lvl):
    """
    write a job related message to standard output
    :param job_id: id of the order the message belongs to (integer)
    :param log_lvl: lowest level the logger 'h8dl' shall handle
    :param lvl: level of this message
    :param msg: message text
    :param stime: start time of the order, printed as text
    """
    # basicConfig only takes effect on the first call; later calls are
    # ignored by the logging module as long as the root logger has a handler
    logging.basicConfig(
        format='%(asctime)-25s job id: %(job_id)6d %(tstart)-16s %(message)s',
        stream=sys.stdout)
    h8_logger = logging.getLogger('h8dl')
    h8_logger.setLevel(log_lvl)
    # job id and start time are consumed by the format string above
    h8_logger.log(lvl, msg, extra={'job_id': job_id, 'tstart': stime})
def set_download_dir(path):
    """
    get an available temporary directory for the current downloading job
    :param path: folder, which shall contain the job directory
    :return: <path>/jobNN with the lowest NN in 01..99 that is not in use;
             the directory itself is not created here
    :raises IOError: job01 to job99 are all in use
    """
    taken = glob.glob(os.path.join(path, 'job??'))
    for number in range(1, 100):
        candidate = os.path.join(path, 'job%02d' % number)
        if candidate not in taken:
            return candidate
    raise IOError('number of temporary directories exceed 99 - job canceled')
def _mark_order_failed(db, job_id, download_path, debug):
    """
    clean up after a failed download: remove the job directory (unless in
    debug mode) and set the order status to FAILED
    """
    if debug is False:
        # the directory may not exist yet or may be partly removed; a cleanup
        # problem must not prevent the status update below
        shutil.rmtree(download_path, ignore_errors=True)
    db.update_order(order_id=job_id, status=hwo.FAILED)
    db.commit_changes()


def downloader_run(download_path, dbfile, period, enddate, bands, user, url,
                   log_lvl, debug):
    """
    run and control a single download process
    :param bands: channels/bands that shall be downloaded
    :param dbfile: order database file
    :param debug: flag switching debug mode on/off; in debug mode the job
                  directory is kept after a failure
    :param download_path: folder in which the job directory will be created
    :param enddate: timestamp of the data where to stop download
                    procedure
    :param log_lvl: level of messages to recognize for logging procedure
    :param period: search period in hours
                   <date + period> defines the upper boundary of the
                   satellite images that will be included in the
                   download.
    :param user: login information [username, password]
    :param url: website url
    :return: tuple (job id, list of the files selected for download)
    :raises ValueError: no most recent order found for the given timespan
    """
    dt = datetime.timedelta(hours=period)
    try:
        download_path = set_download_dir(download_path)
    except BaseException as ex:
        # no order exists yet, so log with placeholder job information
        log(job_id=0, stime='unknown', msg=str(ex), lvl=ERROR,
            log_lvl=log_lvl)
        raise
    # get failed jobs from database and restart the oldest one;
    # alternatively start a new job
    db = hwo.connect2db(dbfile=dbfile)
    failed = db.get_orders(status=hwo.FAILED, sort_by=['tstart'])
    if len(failed) > 0:
        # select least recent failed order
        tcur = failed[0]['tstart']
        job_id = failed[0]['id']
        db.update_order(order_id=job_id, status=hwo.DOWNLOADING)
        db.commit_changes()
    else:
        # add new order that starts where the most recent one ends
        mro = db.get_most_recent_order(enddate=enddate)
        if mro is None:
            db.close()
            msg = 'Getting most recent order failed: Cannot find any ' \
                  'matching database entries for given timespan.'
            raise ValueError(msg)
        tcur = mro['tend']
        tend = tcur + dt
        job_id = db.insert_order(tstart=tcur, tend=tend)
        db.commit_changes()
    # create job directory and profile, start Firefox; all three steps are
    # guarded so that a failure never leaves the order in state DOWNLOADING
    # or the database connection open
    try:
        if os.path.exists(download_path) is False:
            os.makedirs(download_path)
        himawari_profile = Browser(download_path).get_profile()
        driver = sel.webdriver.Firefox(firefox_profile=himawari_profile)
    except BaseException as ex:
        del user
        _mark_order_failed(db, job_id, download_path, debug)
        db.close()
        # log errors
        log(job_id=job_id, stime=tcur, msg=str(ex), lvl=ERROR,
            log_lvl=log_lvl)
        raise
    try:
        log(job_id=job_id, stime=tcur, msg='start website login',
            lvl=DEBUG, log_lvl=log_lvl)
        website = Website(driver=driver, url=url, date=tcur, period=period)
        website.login(user)
        # check and fill search form
        log(job_id=job_id, stime=tcur, msg='fill search form',
            lvl=DEBUG, log_lvl=log_lvl)
        website.wait(xpath='//div[@class="loadmask-msg"]',
                     max_time_out=period*3*60,  # dynamic timeout see below
                     min_time_out=30,
                     sleep_time=10)
        website.select_bands(bands=bands)
        # prepare download and download selected files
        log(job_id=job_id, stime=tcur, msg='download selected list',
            lvl=DEBUG, log_lvl=log_lvl)
        website.select2download_all()
        ref_file_list = website.get_download_list()
        website.download()
        # set maximal timeout depending on the download data size (period);
        # results from trial and error reveal the larger the period the more
        # time the server needs to provide the data
        log(job_id=job_id, stime=tcur, msg='check download end',
            lvl=DEBUG, log_lvl=log_lvl)
        time_out = period*20*60  # in seconds
        fine = check_download_finished(download_path, max_time_out=time_out)
        if fine is True:
            db.update_order(order_id=job_id, status=hwo.FINISHED)
            db.commit_changes()
    except BaseException as ex:
        _mark_order_failed(db, job_id, download_path, debug)
        # log errors
        log(job_id=job_id, stime=tcur, msg=str(ex), lvl=ERROR,
            log_lvl=log_lvl)
        raise
    finally:
        # drop the local reference to the credentials and release browser
        # and database in any case
        del user
        driver.quit()
        db.close()
    return job_id, ref_file_list
def get_reference(dbfile, period, bands, user, url, log_lvl):
    """
    get a new reference list of an unchecked, finished download
    :param bands: channels/bands that shall be selected
    :param dbfile: order database file
    :param log_lvl: level of messages that will be logged
    :param period: search period in hours
                   <date + period> defines the upper boundary of the
                   satellite images that will be included in the
                   selection.
    :param user: login information [username, password]
    :param url: website url
    :return: tuple (job id, list of the files the website offers for that
             order); (None, None) if there is no order with status FINISHED
    """
    db = hwo.connect2db(dbfile=dbfile)
    unchecked = db.get_orders(status=hwo.FINISHED, sort_by=['tstart'])
    if not len(unchecked) > 0:
        # nothing to check
        db.close()
        return None, None

    # the orders are sorted by start time: take the least recent one
    oldest = unchecked[0]
    tcur = oldest['tstart']
    job_id = oldest['id']

    def report(message, level):
        # every message of this run carries the same job information
        log(job_id=job_id, stime=tcur, msg=message, lvl=level,
            log_lvl=log_lvl)

    # start Firefox
    try:
        driver = sel.webdriver.Firefox()
    except BaseException as ex:
        del user
        db.close()
        report(str(ex), ERROR)
        raise

    try:
        report('start website login', DEBUG)
        website = Website(driver=driver, url=url, date=tcur, period=period)
        website.login(user)
        # wait for the search result and choose the bands
        report('fill search form', DEBUG)
        website.wait(xpath='//div[@class="loadmask-msg"]',
                     max_time_out=period*3*60,
                     min_time_out=30,
                     sleep_time=10)
        website.select_bands(bands=bands)
        # select all files and read the resulting list; nothing is downloaded
        report('create reference list', DEBUG)
        website.select2download_all()
        ref_file_list = website.get_download_list()
    except BaseException as ex:
        report(str(ex), ERROR)
        raise
    finally:
        del user
        driver.quit()
        db.close()
    return job_id, ref_file_list
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment