@ychalier
Last active December 12, 2019 16:58
Web scraper for http://inatheque.ina.fr/. The official INA database is not accessible outside its impractical web interface. This script automatically gathers all results from a search query into a CSV file that is easy to process; a short pandas example follows the script.
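For example, with a hypothetical query:

    python ina.py "tour de france"

This writes the raw scraped table to ina_raw.tsv and the cleaned, chronologically sorted version to ina_clean.csv (both file names come from the PARAMETERS dictionary in the script).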
"""Web scraper for http://inatheque.ina.fr/
The official INA database is not accessible outside this unpracticle interface.
This scripts automatically gathers all results from a search query into a CSV
file, easy to process.
Usage:
python ina.py <search-query>
Install:
Download the Firefox driver for selenium at
https://github.com/mozilla/geckodriver/releases,
put it in /usr/local/bin/,
then install the following Python modules:
pip install selenium tqdm beautifulsoup4
"""
import unicodedata
import datetime
import logging
import time
import sys
import re
import tqdm
import pandas as pd
from bs4 import BeautifulSoup
from selenium import webdriver
from selenium.webdriver.firefox.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support.ui import Select


def init_driver():
    """Create the driver object from selenium"""
    logging.debug("Initializing web driver...")
    options = Options()
    # Run Firefox headless (no visible browser window)
    options.headless = True
    driver = webdriver.Firefox(
        options=options, executable_path="/usr/local/bin/geckodriver")
    driver.implicitly_wait(PARAMETERS["implicit_wait"])
    logging.info("Initialized web driver.")
    return driver


def search(driver, query):
    """Perform the search with selenium"""
    logging.info("Searching for '%s'", query)
    driver.get(SEARCH_URL)
    current_url = driver.current_url
    search_input = driver.find_element(By.XPATH, XPATHS["search_input"])
    search_input.send_keys(query)
    # Select the last option of the results-per-page dropdown
    select = Select(driver.find_element(
        By.XPATH, XPATHS["results_per_page_select"]))
    options = [o.get_attribute('value') for o in select.options]
    select.select_by_value(options[-1])
    form = driver.find_element(By.XPATH, XPATHS["search_form"])
    form.submit()
    # Wait until the results page has loaded (the URL changes after submission)
    WebDriverWait(driver, PARAMETERS["implicit_wait"]).until(
        EC.url_changes(current_url))


def scrap_page(html, out, i):
    """Scrape one result page and write its rows to the output file"""
    soup = BeautifulSoup(html, 'html.parser')
    table_div = soup.find("div", {"id": RESULT_TABLE_ID})
    first = True
    for row in table_div.find_all("tr"):
        if first:
            # Header row: write the column names once, on the first page only
            if i == 1:
                sub_first = True
                for header in row.find_all("th"):
                    if sub_first:
                        sub_first = False
                        continue
                    out.write(header.find("span").get_text().strip() + "\t")
                out.write("link\n")
            first = False
            continue
        # Data row: skip the first cell, then write the remaining fields
        sub_first = True
        for field in row.find_all("td"):
            if sub_first:
                sub_first = False
                continue
            out.write(field.get_text().strip() + "\t")
        link = row.find("a")
        if link is not None:
            out.write(link["href"] + "\n")
        else:
            out.write("nan\n")


def scrap(driver):
    """Scrape each result page, one at a time"""
    last_request = time.time()
    results_count_div = driver.find_element(
        By.XPATH, XPATHS["results_count_div"])
    results = int(results_count_div.text.split(" ")[-1])
    result_per_page = int(results_count_div.text.split(" ")[3])
    pages = 1 + ((results - 1) // result_per_page)
    logging.info(
        "Found %d results (%d per page, %d pages)",
        results,
        result_per_page,
        pages
    )
    logging.info("Starting to scrape, writing to '%s'",
                 PARAMETERS["raw_output"])
    pages = min(PARAMETERS["max_page_requests"], pages)
    iterator = tqdm.tqdm(range(1, pages + 1))
    out = open(PARAMETERS["raw_output"], "w")
    for i in iterator:
        scrap_page(driver.page_source, out, i)
        if i < pages:
            next_link = driver.find_elements(By.XPATH, XPATHS["next_link"])
            if len(next_link) == 0:
                logging.warning("Next link not found!")
                iterator.close()
                break
            next_link = next_link[0]
            # Throttle page loads so they stay roughly
            # PARAMETERS["delay"] seconds apart
            time_since_last_request = time.time() - last_request
            time_to_wait = max(
                0, PARAMETERS["delay"] - time_since_last_request)
            next_link.click()
            time.sleep(time_to_wait)
            last_request = time.time()
    out.close()


def extract_datetime(row):
    """Return a datetime from date and time of INA output"""
    if row.isnull()["diffusion_date"]:
        return row["diffusion_date"]
    string = row["diffusion_date"] + " " + row["diffusion_time"]
    try:
        return datetime.datetime.strptime(string, "%d/%m/%Y %H:%M:%S")
    except ValueError:
        return datetime.datetime.strptime(row["diffusion_date"], "%d/%m/%Y")


def strip_accents(string):
    """Normalize accents in a string"""
    return "".join(
        char for char in unicodedata.normalize("NFD", string)
        if unicodedata.category(char) != "Mn"
    )


def normalize_collection(row):
    """Normalize a collection title to cope with noise in the input data"""
    if row.isnull()["collection"]:
        return row["collection"]
    slug = re.sub("[ ']", "-", row["collection"].lower().strip())
    return strip_accents(slug)


def clean():
    """Clean the raw scraper output and write the final CSV"""
    logging.info("Cleaning data to '%s'", PARAMETERS["clean_output"])
    tsv = pd.read_csv(PARAMETERS["raw_output"], delimiter="\t")
    # Rename the French column headers to short English identifiers
    csv = tsv.rename(columns={
        "Chaîne de diffusion": "channel",
        "Date de diffusion": "diffusion_date",
        "Heure de diffusion": "diffusion_time",
        "Durée": "duration",
        "Titre propre": "title",
        "Titre collection": "collection",
        "Titre programme": "program",
        "Genre": "genre"
    })
    series_datetimes = list()
    series_collections = list()
    for _, row in tqdm.tqdm(csv.iterrows(), total=csv.shape[0]):
        series_datetimes.append(extract_datetime(row))
        series_collections.append(normalize_collection(row))
    csv["collection_slug"] = series_collections
    csv["diffusion"] = series_datetimes
    columns = [
        "diffusion", "title", "collection", "duration", "channel", "program",
        "genre", "link", "collection_slug"
    ]
    csv[columns].sort_values(by="diffusion").to_csv(
        PARAMETERS["clean_output"],
        index=False
    )


def main():
    """Main script"""
    args = sys.argv[1:]
    if len(args) == 0:
        print(__doc__)
        sys.exit()
    query = args[0]
    firefox_driver = init_driver()
    search(firefox_driver, query)
    scrap(firefox_driver)
    firefox_driver.quit()
    clean()


if __name__ == "__main__":
    SEARCH_URL = "http://inatheque.ina.fr/"
    PARAMETERS = {
        "delay": 1.5,
        "max_page_requests": 300,
        "raw_output": "ina_raw.tsv",
        "clean_output": "ina_clean.csv",
        "implicit_wait": 10,
    }
    XPATHS = {
        "search_form": "/html/body/div[5]/div/div[1]/div/div[2]/div[3]/form",
        "search_input": ("/html/body/div[5]/div/div[1]/div/div[2]/div[3]/form"
                         "/fieldset[1]/div/input[5]"),
        "next_link": ("/html/body/div[5]/div/div[2]/div[3]/div[3]/div[2]"
                      "/div[3]/div[3]/a"),
        "results_count_div": ("/html/body/div[5]/div/div[2]/div[3]/div[3]"
                              "/div[1]"),
        "results_per_page_select": ("/html/body/div[5]/div/div[1]/div/div[2]"
                                    "/div[3]/form/fieldset[3]/table/tbody/"
                                    "tr[2]/td[2]/select"),
    }
    RESULT_TABLE_ID = "result-tableau-1"
    LOG_FORMAT = "%(asctime)s\t%(levelname)s\t%(message)s"
    logging.basicConfig(format=LOG_FORMAT, level=logging.INFO)
    main()
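
Once the scraper has finished, the cleaned file is easy to explore with pandas. A minimal sketch, assuming the default ina_clean.csv output and the column names produced by clean() above (the year filter is an arbitrary illustration):

    import pandas as pd

    # Load the cleaned output; "diffusion" holds the broadcast datetime
    df = pd.read_csv("ina_clean.csv", parse_dates=["diffusion"])

    # Count results per channel, most frequent first
    print(df["channel"].value_counts())

    # Keep only broadcasts aired in a given (hypothetical) year
    print(df[df["diffusion"].dt.year == 2010])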