Skip to content

Instantly share code, notes, and snippets.

@ceshine
Last active October 29, 2018 04:01
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ceshine/630a7836b8554633c1e0aaa462af577b to your computer and use it in GitHub Desktop.
Save ceshine/630a7836b8554633c1e0aaa462af577b to your computer and use it in GitHub Desktop.
Scripts to scrape and extract data from the Tourism Bureau of Taiwan
# WARNING: this script is out-dated since the last update of the Tourism Bureau website.
from pathlib import Path
import pandas as pd
SCHEMAS = [
(201201, "schema/residence-2012-01.csv"),
(201101, "schema/residence-2011-01.csv")
]
DATA_FILE_PATTERN = "raw_data/{year}-{month}.xls"
SHEET = "Sheet3"
OUTPUT_FILE_PATTERN = "residence-{year}-{month}.csv"
OUTPUT_COLUMNS = ["Residence", "Region", "Sub-Region", "Total", "Period"]
def get_schema(year, month, data_path) -> pd.DataFrame:
i = 0
while SCHEMAS[i][0] > year * 100 + month:
i += 1
print(year, month, SCHEMAS[i])
schema = pd.read_csv(Path(data_path) / SCHEMAS[i][1])
return schema
def extract_from_excel(year: int, month: int, data_path: str = "../data"):
xl = pd.ExcelFile(
Path(data_path) / DATA_FILE_PATTERN.format(year=year, month=month))
df_data = xl.parse(SHEET, skiprows=2)
df_schema = get_schema(year, month, data_path)
df_schema["Total"] = 0
df_schema["Period"] = f"{year}-{month:02d}"
for i, row in df_schema.iterrows():
df_schema.loc[i, "Total"] = int(df_data.iloc[row["Row"] - 4, 3])
# Make sure the residences from both sources match
residence = df_schema.loc[i, "Residence"]
if ("Others" in residence or "Korea" in residence or
"United" in residence or "Russian" in residence):
# Exceptions
continue
data_str = df_data.iloc[row["Row"] - 4, 2]
if not isinstance(data_str, str) or data_str == "":
data_str = df_data.iloc[row["Row"] - 4, 1]
assert (
residence == " ".join(data_str.split(" ")[1:])
)
# Make sure the grand total is correct
assert (
df_schema["Total"].sum() ==
int(df_data.iloc[df_schema["Row"].max() - 4 + 1, 3])
)
del df_schema["Row"]
df_schema[OUTPUT_COLUMNS].to_csv(
Path(data_path) /
OUTPUT_FILE_PATTERN.format(year=year, month=month),
index=False, header=False
)
if __name__ == "__main__":
for year in range(2011, 2018):
for month in range(1, 13):
extract_from_excel(year, month)
for month in range(1, 9):
extract_from_excel(2018, month)
with open("../data/residence.csv", "w") as fout:
fout.write(",".join(OUTPUT_COLUMNS) + "\n")
# WARNING: this script is out-dated since the last update of the Tourism Bureau website.
import os
import time
import re
from typing import List, Tuple
import requests
from retrying import retry
from selenium.webdriver.support.ui import WebDriverWait, Select
from selenium.common.exceptions import TimeoutException, NoSuchElementException
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium import webdriver
PROXY_URL = os.environ.get("SOCKS_PROXY", "192.168.199.10:12133")
def retry_if_timeout(exception):
"""Return True if we should retry (in this case when it's an IOError), False otherwise"""
return isinstance(exception, TimeoutException)
@retry(retry_on_exception=retry_if_timeout, stop_max_attempt_number=5)
def get_url(driver, url):
print(f"Fetching {url}")
driver.get(url)
def get_driver(headless: bool = False):
options = webdriver.ChromeOptions()
options.binary_location = "/usr/bin/google-chrome"
if headless:
options.add_argument('headless')
# options.add_argument('window-size=1920x1080')
options.add_argument(f'--proxy-server=socks5://{PROXY_URL}')
options.add_argument('--proxy-bypass-list=127.0.0.1;localhost')
driver = webdriver.Chrome(
'/opt/chromedriver',
chrome_options=options)
return driver
def visitors_by_residence(timestamps: List[Tuple[int, int]], output_pattern: str = "../data/raw_data/{}-{}.xls"):
driver = get_driver()
try:
get_url(
driver, "https://admin.taiwan.net.tw/statistics/month_en.aspx?no=14")
for year, month in timestamps:
select = Select(driver.find_element_by_id(
"ctl00_ctl00_ContentPlaceHolder1_ContentPlaceHolder1_searItm"
))
select.select_by_visible_text("Visitor Arrivals by Residence")
select = Select(driver.find_element_by_id(
"ctl00_ctl00_ContentPlaceHolder1_ContentPlaceHolder1_searYear"
))
select.select_by_visible_text(str(year))
select = Select(driver.find_element_by_id(
"ctl00_ctl00_ContentPlaceHolder1_ContentPlaceHolder1_searMonth"
))
select.select_by_visible_text(str(month))
driver.find_element_by_id(
"ctl00_ctl00_ContentPlaceHolder1_ContentPlaceHolder1_imgSend"
).click()
link_elems = driver.find_elements_by_css_selector(
"div.cOneTableC9 a")
found = False
for elem in link_elems:
link = elem.get_attribute("href")
if link.endswith(".xls"):
found = True
print("Found link: ", link)
res = requests.get(
link, allow_redirects=True,
proxies={
"https": f"socks5://{PROXY_URL}"
}, verify=False
)
with open(output_pattern.format(year, month), 'wb') as fout:
fout.write(res.content)
if found is False:
print(f"WARNING: link not found for {year}-{month}")
except Exception as e:
driver.quit()
raise e
driver.quit()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment