Last active
December 12, 2019 16:58
-
-
Save ychalier/ee7a9e43db4790667a91e0755799daac to your computer and use it in GitHub Desktop.
Web scraper for http://inatheque.ina.fr/. The official INA database is not accessible outside its impractical interface. This script automatically gathers all results from a search query into a CSV file, easy to process.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Web scraper for http://inatheque.ina.fr/ | |
The official INA database is not accessible outside this unpracticle interface. | |
This scripts automatically gathers all results from a search query into a CSV | |
file, easy to process. | |
Usage: | |
python ina.py <search-query> | |
Install: | |
Download the Firefox driver for selenium at | |
https://github.com/mozilla/geckodriver/releases, | |
put it in /usr/local/bin/, | |
then install the following Python modules: | |
pip install selenium tqdm beautifulsoup4 | |
""" | |
import datetime
import logging
import re
import sys
import time
import unicodedata

import pandas as pd
import tqdm
from bs4 import BeautifulSoup
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.firefox.options import Options
from selenium.webdriver.firefox.service import Service
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import Select
from selenium.webdriver.support.ui import WebDriverWait
def init_driver():
    """Create and return a headless Firefox selenium driver.

    Expects the geckodriver binary at /usr/local/bin/geckodriver (see the
    module docstring's install instructions).

    Returns:
        A configured ``selenium.webdriver.Firefox`` instance with the
        implicit wait from ``PARAMETERS`` applied.
    """
    logging.debug("Initializing web driver...")
    options = Options()
    # `Options.headless` and the `executable_path=` keyword were removed in
    # Selenium 4.13 / 4.10 respectively; the argument + Service APIs are the
    # supported replacements and match the Selenium-4 style (`By.XPATH`)
    # already used elsewhere in this file.
    options.add_argument("-headless")
    driver = webdriver.Firefox(
        options=options,
        service=Service(executable_path="/usr/local/bin/geckodriver"),
    )
    driver.implicitly_wait(PARAMETERS["implicit_wait"])
    logging.info("Initialized web driver.")
    return driver
def search(driver, query):
    """Submit *query* on the INA search form.

    Fills the search input, selects the largest available "results per
    page" option, submits the form, then blocks until the browser has
    navigated away from the search page.

    Args:
        driver: A selenium WebDriver already initialized.
        query: The search string to type into the form.
    """
    logging.info("Searching for '%s'", query)
    driver.get(SEARCH_URL)
    url_before_submit = driver.current_url
    # Type the query into the main search field.
    driver.find_element(By.XPATH, XPATHS["search_input"]).send_keys(query)
    # Maximize the page size so fewer pages need to be fetched.
    per_page = Select(
        driver.find_element(By.XPATH, XPATHS["results_per_page_select"]))
    available_values = [opt.get_attribute('value') for opt in per_page.options]
    per_page.select_by_value(available_values[-1])
    driver.find_element(By.XPATH, XPATHS["search_form"]).submit()
    # Wait for the results page to load (URL change signals navigation).
    WebDriverWait(driver, PARAMETERS["implicit_wait"]).until(
        EC.url_changes(url_before_submit))
def scrap_page(html, out, i):
    """Parse one result page and append its rows to *out* as TSV.

    Args:
        html: Page source of a results page.
        out: Writable text file receiving tab-separated rows.
        i: 1-based page index; the header line is written only for i == 1.

    The first <th>/<td> of every row (a checkbox column) is skipped, and a
    trailing "link" column holds the row's first anchor href, or "nan".
    """
    soup = BeautifulSoup(html, 'html.parser')
    table_div = soup.find("div", {"id": RESULT_TABLE_ID})
    all_rows = table_div.find_all("tr")
    if not all_rows:
        return
    header_row, data_rows = all_rows[0], all_rows[1:]
    # Column headers are emitted once, while scraping the first page.
    if i == 1:
        for header_cell in header_row.find_all("th")[1:]:
            out.write(header_cell.find("span").get_text().strip() + "\t")
        out.write("link\n")
    for data_row in data_rows:
        for cell in data_row.find_all("td")[1:]:
            out.write(cell.get_text().strip() + "\t")
        anchor = data_row.find("a")
        if anchor is None:
            out.write("nan\n")
        else:
            out.write(anchor["href"] + "\n")
def scrap(driver):
    """Scrape every result page, one at a time, into the raw TSV output.

    Reads the result count from the page to compute the number of pages
    (capped by PARAMETERS["max_page_requests"]), writes each page through
    scrap_page, and clicks the "next" link between pages while keeping at
    least PARAMETERS["delay"] seconds between page loads.

    Args:
        driver: A selenium WebDriver currently showing the first results
            page (i.e. after search()).
    """
    last_request = time.time()
    results_count_div = driver.find_element(
        By.XPATH, XPATHS["results_count_div"])
    # The count div reads like "... 1 à 100 sur 1234"; presumably word 4 is
    # the page size and the last word the total — TODO confirm on live page.
    results = int(results_count_div.text.split(" ")[-1])
    result_per_page = int(results_count_div.text.split(" ")[3])
    pages = 1 + ((results - 1) // result_per_page)  # ceiling division
    logging.info(
        "Found %d results (%d per page, %d pages)",
        results,
        result_per_page,
        pages
    )
    logging.info("Starting to scrap, writing to '%s'", PARAMETERS["raw_output"])
    pages = min(PARAMETERS["max_page_requests"], pages)
    iterator = tqdm.tqdm(range(1, pages + 1))
    # Context manager ensures the output file is closed even if a selenium
    # call raises mid-scrape (the original leaked the handle on error).
    with open(PARAMETERS["raw_output"], "w") as out:
        for i in iterator:
            scrap_page(driver.page_source, out, i)
            if i < pages:
                # Selenium 4 API: find_elements_by_xpath was removed; this
                # matches the By-based calls used elsewhere in the file.
                next_links = driver.find_elements(By.XPATH, XPATHS["next_link"])
                if not next_links:
                    logging.warning("Next link not found!")
                    iterator.close()
                    break
                # Throttle politely: keep at least `delay` seconds between
                # consecutive page requests.
                time_since_last_request = time.time() - last_request
                time_to_wait = max(
                    0, PARAMETERS["delay"] - time_since_last_request)
                next_links[0].click()
                time.sleep(time_to_wait)
                last_request = time.time()
def extract_datetime(row):
    """Return a datetime parsed from the INA date/time columns of *row*.

    Args:
        row: A pandas row with "diffusion_date" ("%d/%m/%Y") and
            "diffusion_time" ("%H:%M:%S") string columns, either possibly NaN.

    Returns:
        A ``datetime.datetime``; a date-only datetime (midnight) when the
        time is missing or malformed; the NaN value itself when the date
        is missing.
    """
    if row.isnull()["diffusion_date"]:
        # No date at all: propagate the NaN so pandas keeps it as missing.
        return row["diffusion_date"]
    if row.isnull()["diffusion_time"]:
        # Time missing: parse the date alone. The original concatenated
        # str + NaN(float) here, raising a TypeError that the ValueError
        # handler below could not catch.
        return datetime.datetime.strptime(row["diffusion_date"], "%d/%m/%Y")
    string = row["diffusion_date"] + " " + row["diffusion_time"]
    try:
        return datetime.datetime.strptime(string, "%d/%m/%Y %H:%M:%S")
    except ValueError:
        # Malformed time field: fall back to the date-only parse.
        return datetime.datetime.strptime(row["diffusion_date"], "%d/%m/%Y")
def strip_accents(string):
    """Return *string* with combining accent marks removed (e.g. 'é' -> 'e').

    Decomposes the string to NFD so accents become separate combining
    characters (Unicode category "Mn"), then drops those characters.
    """
    decomposed = unicodedata.normalize("NFD", string)
    kept = []
    for char in decomposed:
        if unicodedata.category(char) == "Mn":
            continue
        kept.append(char)
    return "".join(kept)
def normalize_collection(row):
    """Return a slug for the "collection" field of *row*.

    Lowercases and trims the collection title, replaces spaces and
    apostrophes with dashes, and strips accents — so noisy variants of
    the same collection name map to one slug. NaN values pass through
    unchanged so pandas keeps them as missing.
    """
    if row.isnull()["collection"]:
        return row["collection"]
    lowered = row["collection"].lower().strip()
    dashed = re.sub("[ ']", "-", lowered)
    return strip_accents(dashed)
def clean():
    """Turn the raw scraped TSV into the final, sorted CSV output.

    Reads PARAMETERS["raw_output"], renames the French column headers to
    short English names, derives a combined diffusion datetime and a
    collection slug per row, then writes the reordered columns sorted by
    diffusion date to PARAMETERS["clean_output"].
    """
    logging.info("Cleaning data to '%s'", PARAMETERS["clean_output"])
    raw = pd.read_csv(PARAMETERS["raw_output"], delimiter="\t")
    # Map the French column headers emitted by the INA table to short names.
    table = raw.rename(columns={
        "Chaîne de diffusion": "channel",
        "Date de diffusion": "diffusion_date",
        "Heure de diffusion": "diffusion_time",
        "Durée": "duration",
        "Titre propre": "title",
        "Titre collection": "collection",
        "Titre programme": "program",
        "Genre": "genra"
    })
    diffusions = list()
    slugs = list()
    # Row-by-row derivation, with a tqdm progress bar for large dumps.
    for _, row in tqdm.tqdm(table.iterrows(), total=table.shape[0]):
        diffusions.append(extract_datetime(row))
        slugs.append(normalize_collection(row))
    table["collection_slug"] = slugs
    table["diffusion"] = diffusions
    ordered_columns = [
        "diffusion", "title", "collection", "duration", "channel", "program",
        "genra", "link", "collection_slug"
    ]
    table[ordered_columns].sort_values(by="diffusion").to_csv(
        PARAMETERS["clean_output"],
        index=False
    )
def main():
    """Main script: read the query from argv, scrape INA, clean the output.

    With no argument, prints the module docstring (usage) and exits.
    """
    args = sys.argv[1:]
    if not args:
        print(__doc__)
        sys.exit()
    query = args[0]
    firefox_driver = init_driver()
    try:
        search(firefox_driver, query)
        scrap(firefox_driver)
    finally:
        # Always release the browser process, even when scraping fails
        # (the original leaked a headless Firefox on any exception).
        firefox_driver.quit()
    clean()
if __name__ == "__main__": | |
SEARCH_URL = "http://inatheque.ina.fr/" | |
PARAMETERS = { | |
"delay": 1.5, | |
"max_page_requests": 300, | |
"raw_output": "ina_raw.tsv", | |
"clean_output": "ina_clean.csv", | |
"implicit_wait": 10, | |
} | |
XPATHS = { | |
"search_form": "/html/body/div[5]/div/div[1]/div/div[2]/div[3]/form", | |
"search_input": ("/html/body/div[5]/div/div[1]/div/div[2]/div[3]/form" | |
"/fieldset[1]/div/input[5]"), | |
"next_link": ("/html/body/div[5]/div/div[2]/div[3]/div[3]/div[2]" | |
"/div[3]/div[3]/a"), | |
"results_count_div": ("/html/body/div[5]/div/div[2]/div[3]/div[3]" | |
"/div[1]"), | |
"results_per_page_select": ("/html/body/div[5]/div/div[1]/div/div[2]" | |
"/div[3]/form/fieldset[3]/table/tbody/" | |
"tr[2]/td[2]/select"), | |
} | |
RESULT_TABLE_ID = "result-tableau-1" | |
LOG_FORMAT = "%(asctime)s\t%(levelname)s\t%(message)s" | |
logging.basicConfig(format=LOG_FORMAT, level=logging.INFO) | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment