Forked from sfinkens/gist:6853a5084eac032f21d2102d93e34ccb
Created
February 18, 2020 09:14
-
-
Save zxdawn/fbcf2d9c5c212ddc510ee50b6683ab12 to your computer and use it in GitHub Desktop.
H8 Downloader
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
methods to download himawari data from | |
http://himawari.diasjp.net/expert/original/bin/original-main.cgi | |
""" | |
import datetime | |
import glob | |
import json | |
import logging | |
import os | |
import time | |
import shutil | |
import selenium as sel | |
from selenium.webdriver.common.by import By | |
from selenium.webdriver.common.keys import Keys | |
from selenium.webdriver.support import expected_conditions as EC | |
from selenium.webdriver.support.ui import WebDriverWait | |
import sys | |
import himawari_orders as hwo | |
# Default logging levels. The numeric values mirror the constants of the
# standard ``logging`` module so they can be passed straight to logger calls.
CRITICAL, ERROR, WARNING, INFO, DEBUG, NOTSET = 50, 40, 30, 20, 10, 0
class Website:
    def __init__(self, driver, url, date, period, time_out=5*60):
        """
        methods manipulating the elements on the website

        :param date: timestamp of the first satellite image to search for
        :param driver: web browser driver
        :param period: search period in hours
                       <date + period> defines the upper boundary of the
                       satellite images that will be included in the
                       download.
        :param time_out: standard maximum waiting time in seconds before a
                         timeout exception will be raised
        :param url: website url
        """
        self.driver = driver
        # encode the search parameters directly into the query string so a
        # single GET opens the order page with the form pre-filled
        date_start = date.strftime('%Y-%m-%dT%H:%M')
        self.url = ''.join([url,
                            '?span=', str(period),
                            '&origin=from&datetime=', date_start,
                            '&timezone=UTC&type=HS&area=FLDK'])
        self.time_out = time_out

    def login(self, user, max_trial=20):
        """
        send login information to url

        :param user: login information [username, password]
        :param max_trial: maximal number of approaches to login
        :raises TimeoutException: if the login page never becomes available
        :raises ValueError: if the credentials are rejected
        """
        self.driver.get(self.url)
        check = self.check_login_available()
        count = 0
        # retry until the authentication page shows up or the trial budget
        # is exhausted
        while check is False and count <= max_trial:
            print('website not available ... try again in 10 seconds')
            time.sleep(10)
            self.driver.get(self.url)
            check = self.check_login_available()
            count += 1
        if count > max_trial:
            raise sel.common.exceptions.TimeoutException('website not available')
        login = self.driver.find_element_by_id('username')
        login.clear()
        login.send_keys(user[0])
        pword = self.driver.find_element_by_id('password')
        pword.clear()
        pword.send_keys(user[1])
        pword.send_keys(Keys.RETURN)
        # give the server time to process the credentials
        time.sleep(10)
        login_check = self.check_login_available()
        # still on the authentication page after submitting -> rejected
        if check is True and login_check is True:
            raise ValueError('Login failed: please check username and password '
                             'for correctness')

    def check_website(self):
        """
        check whether accessed website is correct

        :return: True if the 'current' navigation link of the loaded page
                 points at the expected url, False otherwise
        """
        xpath = '//div[@id="navigator"]/ul/li[@class="current"]/a'
        wait = WebDriverWait(self.driver, self.time_out)
        nav = wait.until(EC.presence_of_element_located((By.XPATH, xpath)))
        return nav.get_attribute('href') == self.url

    def check_login_available(self):
        """
        check whether login website is available

        :return: True if the page title is the DIAS authentication title,
                 False otherwise
        """
        xpath = '//head/title'
        wait = WebDriverWait(self.driver, self.time_out)
        site = wait.until(EC.presence_of_element_located((By.XPATH, xpath)))
        return site.text == 'Authentication for DIAS Systems'

    def download(self):
        """
        start download procedure by clicking the download button
        """
        xpath = '//input[@id="download-selected-files"]'
        self._click_element(xpath)

    def get_download_list(self):
        """
        get list of files that shall be downloaded

        :return: sorted list with one [col2, col3, col4] text triple per
                 table row (column semantics defined by the website table
                 layout -- TODO confirm against the live page)
        """
        xpath = '//tbody[@id="selected-files-body"]/*'
        num_files = len(self.driver.find_elements_by_xpath(xpath))
        file_list = []
        # xpath indices are 1-based; columns 2-4 hold the file information
        for row in range(1, num_files + 1):
            file_info = []
            for col in range(2, 5):
                cell_xpath = ''.join(['//tbody[@id="selected-files-body"]/tr[',
                                      str(row), ']/td[', str(col), ']'])
                element = self.get_element(cell_xpath, element_num=0)
                file_info.append(str(element.text))
            file_list.append(file_info)
        file_list.sort()
        return file_list

    def get_download_size(self):
        """
        get total size of all selected downloads

        :return: download size in bytes as int
        """
        xpath = '//span[@id="selected-files-summary"]'
        summary = self.get_element(xpath).text
        # the summary text contains ', <size> byte'; cut out the number
        # between the comma and the word 'byte'
        size_str = summary.split(',')[1].split('byte')[0]
        return int(size_str)

    def get_element(self, xpath, element_num=None):
        """
        search and return a website element specified by its xpath

        :param element_num: index of the wanted element. Needed to
                            distinguish between elements that share the
                            same xpath.
        :param xpath: xpath of the website element
        :raises NoSuchElementException: if no element matches the xpath
        :raises ValueError: if several elements match and element_num was
                            not given
        """
        # split xpath into single nodes, because the selenium
        # find_element_by_xpath functions only work on a single node (level)
        xpath_list = xpath.split('//')
        if xpath_list[0] == '':
            del xpath_list[0]
        # walk down the node hierarchy, expanding the candidate list at
        # every level
        element_list = []
        for level, node in enumerate(xpath_list):
            if level == 0:
                element_list = self.driver.find_elements_by_xpath(
                    ''.join(['//', node]))
            else:
                sub_xpath = ''.join(['.//', node])
                subelement_list = []
                for element in element_list:
                    subelement_list += element.find_elements_by_xpath(sub_xpath)
                element_list = subelement_list
        # check whether there is a unique element; the original code only
        # printed a message and implicitly returned None here, which made
        # callers fail later with an opaque AttributeError
        if not element_list:
            raise sel.common.exceptions.NoSuchElementException(
                'element not found: %s' % xpath)
        if len(element_list) > 1 and element_num is None:
            raise ValueError('too many elements match xpath pattern')
        return element_list[element_num if element_num is not None else 0]

    def _click_element(self, xpath, **kwargs):
        """
        search and select elements by clicking

        :param xpath: element given by its xpath
        """
        element = self.get_element(xpath, **kwargs)
        element.click()

    def select_bands(self, bands):
        """
        select satellite bands for downloading

        :param bands: list of channels/bands to download
        """
        if bands == list(range(1, 17)):
            # all 16 Himawari bands requested -> use the single button
            self.select_bands_all()
        else:
            # deselect chosen bands
            xpath = ''.join(['//div[@id="controller"]',
                             '//input[@name="band" and @value="Clear All"]'])
            self._click_element(xpath)
            # select desired bands; checkbox values look like 'B01' .. 'B16'
            for band_num in bands:
                xpath = ''.join(['//input[@class="band" and @value="B',
                                 '%02d' % band_num, '"]'])
                self._click_element(xpath)

    def select_bands_all(self):
        """
        select all available satellite bands for downloading
        """
        xpath = ''.join(['//div[@id="controller"]',
                         '//input[@name="band" and @value="Select All"]'])
        self._click_element(xpath)

    def select2download_all(self):
        """
        select all files to download
        """
        xpath = '//input[@name="action=select,type=HS,area=FLDK"]'
        self._click_element(xpath)

    def wait(self, xpath, max_time_out, min_time_out=30, sleep_time=0.5):
        """
        wait until a website has been fully loaded by checking whether a
        website element vanishes

        :param max_time_out: maximal waiting time in seconds
        :param min_time_out: minimal waiting time (time in between finished
                             loading of the website and first occurrence of
                             the element)
        :param sleep_time: waiting time in between single tests of element
                           presence
        :param xpath: xpath of the element to check
        :raises TimeoutException: if the element is still present after
                                  max_time_out seconds
        """
        time.sleep(min_time_out)
        tend = datetime.datetime.today() \
            + datetime.timedelta(seconds=max_time_out)
        while datetime.datetime.today() <= tend:
            try:
                self.driver.find_element_by_xpath(xpath)
            except sel.common.exceptions.NoSuchElementException:
                # element vanished -> page finished loading; return right
                # away instead of sleeping one more interval like the
                # original finally clause did
                return
            time.sleep(sleep_time)
        raise sel.common.exceptions.TimeoutException(
            'website server overallocated')
class Browser:
    def __init__(self, download_dir):
        """
        Methods to handle a special firefox profile used to download
        himawari satellite images. The profile won't be listed in the
        profile manager of the firefox web browser.

        :param download_dir: directory in which himawari data will be
                             downloaded
        """
        self.download_dir = os.path.normpath(download_dir)
        # template profile directory shipped next to this module
        self.himawari = os.path.join(os.path.dirname(__file__),
                                     'profile_himawari')

    def get_profile(self):
        """
        return profile to use with the selenium webdriver

        :raises NotImplementedError: if the template profile directory
                                     'profile_himawari' does not exist next
                                     to this module
        """
        # check whether the special profile directory exists; without it
        # the handlers.json template cannot be copied
        if not os.path.isdir(self.himawari):
            raise NotImplementedError
        profile = sel.webdriver.firefox.firefox_profile.FirefoxProfile()
        # copy the prepared download handlers into the fresh profile
        source = os.path.abspath(os.path.join(self.himawari, 'handlers.json'))
        destination = os.path.join(profile.path, 'handlers.json')
        shutil.copyfile(source, destination)
        # folderList=2 tells firefox to use the custom download directory
        profile.set_preference('browser.download.dir', self.download_dir)
        profile.set_preference('browser.download.folderList', 2)
        profile.update_preferences()
        return profile

    def set_tar_handler(self, profile):
        """
        add an automatic save action for .tar files to the profile

        :param profile: firefox profile that shall be manipulated
        """
        template = os.path.join(self.himawari, 'handlers.json')
        with open(template, 'r') as fid:
            data = json.load(fid)
        # check whether there is already an automatic reaction on
        # downloading tar-files; action 0 means 'save to disk'
        new_key = 'application/x-tar'
        if new_key not in data['mimeTypes']:
            data['mimeTypes'][new_key] = {'action': 0, 'extensions': ['tar']}
        # write updated firefox handler into the live profile
        handler_file = os.path.join(profile.path, 'handlers.json')
        with open(handler_file, 'w') as fid:
            json.dump(data, fid)
def check_download_finished(download_dir, max_time_out=120*60, sleep_time=10,
                            settle_time=10):
    """
    check whether the download of an archive is finished

    :param download_dir: directory, where the archive was downloaded into
    :param max_time_out: maximal time, in which the download shall be
                         finished, given in seconds
    :param sleep_time: amount of seconds the program waits in between
                       single checks
    :param settle_time: grace period in seconds after the archive shows
                        up, so the browser can finish writing it
                        (previously a hard-coded 10 s sleep)
    :return: True once 'files.tar' exists in download_dir
    :raises TimeoutException: if the archive never appears within
                              max_time_out seconds
    """
    tar_pattern = os.path.join(download_dir, 'files.tar')
    tend = datetime.datetime.today() \
        + datetime.timedelta(seconds=max_time_out)
    while datetime.datetime.today() <= tend:
        if glob.glob(tar_pattern):
            # archive found: wait a moment for the file to settle
            time.sleep(settle_time)
            return True
        time.sleep(sleep_time)
    # typo fix: was 'download takes to much time'
    raise sel.common.exceptions.TimeoutException('download takes too much time')
def log(job_id, stime, msg, lvl, log_lvl):
    """
    emit a log record for a download job to stdout

    :param job_id: numeric id of the order the message belongs to
    :param stime: start time of the order (shown in the record)
    :param msg: message text
    :param lvl: level of this message
    :param log_lvl: minimum level the 'h8dl' logger will emit
    """
    # basicConfig is a no-op after the first call, so repeating it here
    # is harmless
    logging.basicConfig(
        format='%(asctime)-25s job id: %(job_id)6d %(tstart)-16s %(message)s',
        stream=sys.stdout)
    logger = logging.getLogger('h8dl')
    logger.setLevel(level=log_lvl)
    logger.log(level=lvl, msg=msg, extra={'job_id': job_id, 'tstart': stime})
def set_download_dir(path):
    """
    get an available temporary directory for the current downloading job

    :param path: folder, which shall contain the job directory
    :raises IOError: if all 99 job slots ('job01' .. 'job99') are taken
    """
    existing = glob.glob(os.path.join(path, 'job??'))
    # probe job01 .. job99 and hand out the first slot that is still free
    for num in range(1, 100):
        candidate = os.path.join(path, 'job%02d' % num)
        if candidate not in existing:
            return candidate
    raise IOError('number of temporary directories exceed 99 - job canceled')
def downloader_run(download_path, dbfile, period, enddate, bands, user, url,
                   log_lvl, debug):
    """
    run and control a single download process

    :param bands: channels/bands that shall be downloaded
    :param dbfile: order database file
    :param debug: flag switching debug mode on/off
    :param download_path: download directory
    :param enddate: timestamp of the data where to stop download
                    procedure
    :param log_lvl: level of messages to recognize for logging procedure
    :param period: search period in hours
                   <date + period> defines the upper boundary of the
                   satellite images that will be included in the
                   download.
    :param user: login information [username, password]
    :param url: website url
    :return: tuple (job_id, ref_file_list) with the processed order id and
             the sorted list of files that were selected for download
    """
    dt = datetime.timedelta(hours=period)
    # reserve a fresh jobNN directory; a failure here is logged with a
    # dummy job id because no database order exists yet
    try:
        download_path = set_download_dir(download_path)
    except BaseException as ex:
        job_id = 0
        tcur = 'unknown'
        log(job_id=job_id, stime=tcur, msg=str(ex), lvl=ERROR,
            log_lvl=log_lvl)
        raise
    # get failed jobs from database and restart the oldest one;
    # alternatively start a new job
    db = hwo.connect2db(dbfile=dbfile)
    failed = db.get_orders(status=hwo.FAILED, sort_by=['tstart'])
    if len(failed) > 0:
        # Select least recent failed order and mark it as in progress
        tcur = failed[0]['tstart']
        job_id = failed[0]['id']
        db.update_order(order_id=job_id, status=hwo.DOWNLOADING)
        db.commit_changes()
    else:
        # add new order
        mro = db.get_most_recent_order(enddate=enddate)
        if mro is None:
            db.close()
            msg = 'Getting most recent order failed: Cannot find any ' \
                  'matching database entries for given timespan.'
            raise ValueError(msg)
        else:
            # continue right where the most recent order ended
            tcur = mro['tend']
            tend = tcur + dt
            job_id = db.insert_order(tstart=tcur, tend=tend)
            db.commit_changes()
    if os.path.exists(download_path) is False:
        os.makedirs(download_path)
    himawari_profile = Browser(download_path).get_profile()
    # start Firefox and login
    try:
        driver = sel.webdriver.Firefox(firefox_profile=himawari_profile)
    except BaseException as ex:
        # browser could not be started: mark the order as failed, clean
        # up the job directory (unless debugging) and re-raise
        del user
        if debug is False:
            shutil.rmtree(download_path)
        db.update_order(order_id=job_id, status=hwo.FAILED)
        db.commit_changes()
        db.close()
        # log errors
        log(job_id=job_id, stime=tcur, msg=str(ex), lvl=ERROR,
            log_lvl=log_lvl)
        raise
    try:
        log(job_id=job_id, stime=tcur, msg='start website login',
            lvl=DEBUG, log_lvl=log_lvl)
        website = Website(driver=driver, url=url, date=tcur, period=period)
        website.login(user)
        # check and fill search form
        log(job_id=job_id, stime=tcur, msg='fill search form',
            lvl=DEBUG, log_lvl=log_lvl)
        website.wait(xpath='//div[@class="loadmask-msg"]',
                     max_time_out=period*3*60,  # dynamic timeout see below
                     min_time_out=30,
                     sleep_time=10)
        website.select_bands(bands=bands)
        # prepare download and download selected files
        log(job_id=job_id, stime=tcur, msg='download selected list',
            lvl=DEBUG, log_lvl=log_lvl)
        website.select2download_all()
        ref_file_list = website.get_download_list()
        website.download()
        # set maximal timeout depending on the download data size (period);
        # results from trial and error reveal the larger the period the more
        # time the server needs to provide the data
        log(job_id=job_id, stime=tcur, msg='check download end',
            lvl=DEBUG, log_lvl=log_lvl)
        time_out = period*20*60  # in seconds
        fine = check_download_finished(download_path, max_time_out=time_out)
        if fine is True:
            db.update_order(order_id=job_id, status=hwo.FINISHED)
            db.commit_changes()
    except BaseException as ex:
        # any failure during the web session marks the order as failed so
        # a later run can retry it
        if debug is False:
            shutil.rmtree(download_path)
        db.update_order(order_id=job_id, status=hwo.FAILED)
        db.commit_changes()
        # log errors
        log(job_id=job_id, stime=tcur, msg=str(ex), lvl=ERROR,
            log_lvl=log_lvl)
        raise
    finally:
        # NOTE(review): 'del user' only drops the local reference to the
        # credentials; it does not wipe them from memory
        del user
        driver.quit()
        db.close()
    return job_id, ref_file_list
def get_reference(dbfile, period, bands, user, url, log_lvl):
    """
    get a new reference list of an unchecked, finished download

    :param bands: channels/bands that shall be downloaded
    :param dbfile: order database file
    :param log_lvl: level of messages that will be logged
    :param period: search period in hours
                   <date + period> defines the upper boundary of the
                   satellite images that will be included in the
                   download.
    :param user: login information [username, password]
    :param url: website url
    :return: tuple (job_id, ref_file_list); both are None when no finished
             order is waiting to be checked
    """
    # get unchecked jobs from database and restart the oldest one;
    db = hwo.connect2db(dbfile=dbfile)
    unchecked = db.get_orders(status=hwo.FINISHED, sort_by=['tstart'])
    if len(unchecked) > 0:
        # Select least recent failed order
        tcur = unchecked[0]['tstart']
        job_id = unchecked[0]['id']
        # start Firefox and login
        try:
            driver = sel.webdriver.Firefox()
        except BaseException as ex:
            # browser could not be started: clean up and re-raise
            del user
            db.close()
            # log errors
            log(job_id=job_id, stime=tcur, msg=str(ex), lvl=ERROR,
                log_lvl=log_lvl)
            raise
        try:
            log(job_id=job_id, stime=tcur, msg='start website login',
                lvl=DEBUG, log_lvl=log_lvl)
            website = Website(driver=driver, url=url, date=tcur, period=period)
            website.login(user)
            # check and fill search form
            log(job_id=job_id, stime=tcur, msg='fill search form',
                lvl=DEBUG, log_lvl=log_lvl)
            website.wait('//div[@class="loadmask-msg"]', period*3*60, 30, 10)
            website.select_bands(bands=bands)
            # prepare download and download selected files
            log(job_id=job_id, stime=tcur, msg='create reference list',
                lvl=DEBUG, log_lvl=log_lvl)
            website.select2download_all()
            ref_file_list = website.get_download_list()
        except BaseException as ex:
            # log errors
            log(job_id=job_id, stime=tcur, msg=str(ex), lvl=ERROR,
                log_lvl=log_lvl)
            raise
        finally:
            # NOTE(review): 'del user' only drops the local reference to
            # the credentials; it does not wipe them from memory
            del user
            driver.quit()
            db.close()
    else:
        db.close()
        job_id = None
        ref_file_list = None
    return job_id, ref_file_list
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment