Skip to content

Instantly share code, notes, and snippets.

@tonyahowe
Created November 28, 2021 16:27
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save tonyahowe/000e78ff7a846e9bdaca910a13e2009a to your computer and use it in GitHub Desktop.
Save tonyahowe/000e78ff7a846e9bdaca910a13e2009a to your computer and use it in GitHub Desktop.
IMDB_fetcher
import requests
from bs4 import BeautifulSoup
import pandas as pd
def findByTagAndAttrs(soup, ifFailed, *args):
    """
    return the stripped text of the element reached by a chain of lookups

    every entry of `args` is a (tag, attrs) pair. the first pair is looked up
    in `soup`, each following pair inside the element found by the previous
    one - the same as writing soup.find(...).find(...).text.strip()

    soup: object exposing find(tag, attrs), e.g. a parsed page or element
    ifFailed: value returned when no pair is given, a lookup finds nothing,
              or the text cannot be extracted
    args: one or more (tag, attrs) pairs, outermost element first
    returns the stripped text of the innermost element, or `ifFailed`
    """
    if not args:
        return ifFailed
    node = soup
    try:
        for selector in args:
            # descend from the previous hit; restarting from `soup` on every
            # pass would make all but the last pair dead work
            node = node.find(selector[0], selector[1])
        return node.text.strip()
    except Exception:
        # best-effort helper: any lookup/extraction problem yields the
        # fallback, but KeyboardInterrupt/SystemExit are no longer swallowed
        return ifFailed
def findByText(soup, attr, text_to_find, next_attr):
    """
    return the stripped text of the element that follows a labelled element

    finds the first `attr` element whose text is `text_to_find`, then the next
    `next_attr` element after it, and returns that element's stripped text

    soup: object exposing find(name, text=...)
    attr: tag name of the labelled element
    text_to_find: text the labelled element must have
    next_attr: tag name of the element to read after the labelled one
    returns the stripped text, or None when either element is missing
    """
    try:
        labelled = soup.find(attr, text=text_to_find)
        return labelled.find_next(next_attr).text.strip()
    except Exception:
        # a missing element shows up as AttributeError on None; interrupts
        # (KeyboardInterrupt/SystemExit) are deliberately not caught here
        return None
def fetch_record_info(id) -> None:
    """
    scrape one IMDb title and print the scraped fields (manual test harness)

    downloads the title page and the plot-summary page for `id` and prints, in
    this order: countries of origin, storyline, budget, opening weekend, first
    production company, synopsis. a field whose markup is not found prints
    None instead of aborting the remaining fields.

    id: IMDb title id such as "tt0078748" (parameter name kept for existing
        callers even though it shadows the builtin inside this function)
    returns None - nothing is written back to a source row yet
    raises whatever requests.get raises, including a timeout after 30 seconds
    """
    def show(extract):
        # soup.find() returns None for a missing element, so the chained
        # attribute access raises AttributeError - report that field as None
        try:
            value = extract()
        except AttributeError:
            value = None
        print(value)

    # timeout so a stalled connection cannot hang the script indefinitely
    page = requests.get(f"https://www.imdb.com/title/{id}/", timeout=30)
    soup = BeautifulSoup(page.text, "html.parser")

    show(lambda: ", ".join(
        link.text
        for link in soup.find("li", {"data-testid": "title-details-origin"}).find_all("a")))
    show(lambda: soup.find("div", {"data-testid": "storyline-plot-summary"})
         .find("div").find("div").text)
    # the budget text is followed by a qualifier after a space; keep the amount only
    show(lambda: soup.find("li", {"data-testid": "title-boxoffice-budget"})
         .find("li").find("span").text.strip().split(" ")[0])
    show(lambda: soup.find("li", {"data-testid": "title-boxoffice-openingweekenddomestic"})
         .find("li").find("span").text)
    show(lambda: soup.find("li", {"data-testid": "title-details-companies"})
         .find("li").find("a").text)

    summary_page = requests.get(f"https://www.imdb.com/title/{id}/plotsummary", timeout=30)
    summary_soup = BeautifulSoup(summary_page.text, "html.parser")
    show(lambda: summary_soup.find("ul", {"id": "plot-synopsis-content"}).find("li").text)

    # TODO: store these values in the ADDITIONAL_COLUMNS of a source row
    # (storyline, synopsis, country, budget, opening weekend, production
    # company, distributor) and return that row
# manual smoke test: scrape one hard-coded IMDb title when run as a script
if __name__ == "__main__":
    fetch_record_info("tt0078748")
# * ************************************
# * version
# * ************************************
__version__ = "1.0.0"
# * ************************************
# * imports
# * ************************************
import os
import sys
import logging
from time import sleep
from numpy import product
# from typing import List
# import numpy as np
import pandas as pd
from bs4 import BeautifulSoup
import requests
import re # re = regular expression
from imdb_data_settings import *
# * ***************************************
# * convenience variables - do not modify
# * ***************************************
# column names of the source file and of the output file - filled in by init()
__source_column_names = []
__output_column_names = []

# lower cased titletype to include in search
__titletype_to_include = []

# lower cased genres to include in search
__genres_to_include = []

# lower cased genres to exclude in search
__genres_to_exclude = []
# * ************************************
# * functions
# * ************************************
# need to add a count function for all films in file with horror in genre and either movie or short in titletype
# after specific titles subsetted, need a count of all these films
def init():
    """
    initialize the program:
    - fill the module-level convenience variables (lower-cased search terms,
      source and output column names)
    - create an empty, header-only output file when OVERWRITE_OUTPUT_FILE is
      set or the output file does not exist yet; an existing file is otherwise
      left untouched so later batches append to it
    """
    # these module-level variables are (re)assigned here
    global __source_column_names, __output_column_names
    global __genres_to_include, __genres_to_exclude, __titletype_to_include

    # lower-case the search terms once to avoid case mismatches later on
    __genres_to_include = [genre.lower() for genre in GENRES_TO_INCLUDE]
    __genres_to_exclude = [genre.lower() for genre in GENRES_TO_EXCLUDE]
    __titletype_to_include = [titletype.lower() for titletype in TITLETYPE_TO_INCLUDE]

    if SOURCE_COLUMN_NAMES:
        __source_column_names = SOURCE_COLUMN_NAMES
    else:
        # nrows=0 reads no data rows but still parses the header row
        header_only = pd.read_csv(SOURCE_FILENAME, nrows=0, sep=SOURCE_FILE_DELIMITER)
        __source_column_names = header_only.columns.values.tolist()
    logging.info("Source file column names: \n%s",
                 __source_column_names)

    __output_column_names = __source_column_names + ADDITIONAL_COLUMNS
    logging.info("Destination file column names: \n%s",
                 __output_column_names)

    if OVERWRITE_OUTPUT_FILE or not os.path.isfile(OUTPUT_FILENAME):
        # write just the header row; batches are appended below it later
        pd.DataFrame(columns=__output_column_names).to_csv(
            OUTPUT_FILENAME, sep=OUTPUT_FILE_DELIMITER, index=False)
def fetch_and_save_batch(start_at) -> bool:
    """
    fetch the next batch. if no records left, return false

    reads BATCH_SIZE rows of the source file starting at line `start_at`,
    scrapes every row that passes the genre and titletype filters, and appends
    the scraped rows to the output file

    start_at: number of lines to skip at the top of the source file
    returns False when the source file has no rows left at `start_at`,
            True otherwise (even when no row of the batch matched)
    """
    logging.info(f"fetching records {start_at} to {start_at + BATCH_SIZE}")
    # header=None because a batch from the middle of the file has no header
    # row; the column names come from init() instead
    source_df = pd.read_csv(SOURCE_FILENAME, sep=SOURCE_FILE_DELIMITER, header=None,
                            skiprows=start_at, nrows=BATCH_SIZE, names=__source_column_names)
    logging.debug(source_df)
    if source_df.empty:
        return False

    fetched_rows = []
    for _, source_row in source_df.iterrows():
        # the *_COLUMN_INDEX settings are positions, so read them with .iloc;
        # plain integer keys on a label-indexed Series are deprecated
        genres = source_row.iloc[GENRES_COLUMN_INDEX]
        titletype = source_row.iloc[TITLE_TYPE_COLUMN_INDEX]
        if not match_genre_search_criteria(
            text=genres,
            include_list=__genres_to_include,
            exclude_list=__genres_to_exclude,
            match_all=MATCH_ALL_GENRES
        ):
            continue
        if not match_titletype_search_criteria(
            text=titletype,
            include_list=__titletype_to_include,
            match_all=MATCH_ALL_TITLETYPES
        ):
            continue
        logging.debug("fetching data for: %s", source_row.to_dict())
        destination_row = fetch_record_info(source_row)
        logging.debug("fetched: %s", destination_row.to_dict())
        # collect rows and build the frame once: DataFrame.append no longer
        # exists in pandas 2.x
        fetched_rows.append(destination_row)
        sleep(DELAY_BETWEEN_RECORDS)

    # columns= fixes the column order and leaves unscraped fields empty
    output_df = pd.DataFrame(fetched_rows, columns=__output_column_names)
    output_df.to_csv(OUTPUT_FILENAME, mode="a",  # a = append
                     sep=OUTPUT_FILE_DELIMITER, index=False, header=False)
    logging.info("Saving %i records", len(output_df))
    return True
def fetch_record_info(source_row) -> pd.core.series.Series:
    """
    fetch a record from website/api and scrape it

    reads the IMDb id at position ID_COLUMN_INDEX of `source_row`, downloads
    the title page and the plot-summary page, and stores every value it finds
    in `source_row` under the output column whose position is given by the
    matching *_COLUMN_INDEX setting. a value whose markup is missing is
    skipped and its column is left unset.

    source_row: pandas Series with one row of the source file; modified in place
    returns the same `source_row` object with the scraped columns added
    raises whatever requests.get raises (including a timeout after 30 seconds)
           and IndexError when a *_COLUMN_INDEX setting does not exist in the
           output columns prepared by init()
    """
    title_id = str(source_row.iloc[ID_COLUMN_INDEX]).strip()

    def load(url):
        # timeout so a stalled connection cannot block the whole batch
        return BeautifulSoup(requests.get(url, timeout=30).text, "html.parser")

    def store(column_index, extract):
        # soup.find() returns None for a missing element, so the chained
        # attribute access raises AttributeError - skip just that field
        try:
            value = extract()
        except AttributeError:
            logging.debug("no value for output column %s of %s", column_index, title_id)
            return
        # the *_COLUMN_INDEX settings are positions in the output row, not in
        # ADDITIONAL_COLUMNS, so resolve the column name via the output names
        source_row[__output_column_names[column_index]] = value

    title_page = load(f"https://www.imdb.com/title/{title_id}/")
    store(COUNTRY_COLUMN_INDEX, lambda: ", ".join(
        link.text
        for link in title_page.find("li", {"data-testid": "title-details-origin"}).find_all("a")))
    store(STORYLINE_COLUMN_INDEX, lambda: title_page.find(
        "div", {"data-testid": "storyline-plot-summary"}).find("div").find("div").text)
    # the budget text is followed by a qualifier after a space; keep the amount only
    store(BUDGET_COLUMN_INDEX, lambda: title_page.find(
        "li", {"data-testid": "title-boxoffice-budget"}).find("li").find("span").text.strip().split(" ")[0])
    store(OPENING_WEEKEND_COLUMN_INDEX, lambda: title_page.find(
        "li", {"data-testid": "title-boxoffice-openingweekenddomestic"}).find("li").find("span").text)
    store(PRODUCTION_COMPANY_COLUMN_INDEX, lambda: title_page.find(
        "li", {"data-testid": "title-details-companies"}).find("li").find("a").text)
    store(RATING_COLUMN_INDEX, lambda: title_page.find(
        "div", {"data-testid": "hero-rating-bar__aggregate-rating__score"}).find("span").text)
    store(NUMBER_OF_VOTES_COLUMN_INDEX, lambda: title_page.find(
        "div", {"class": "AggregateRatingButton__TotalRatingAmount-sc-1ll29m0-3"}).text)

    summary_page = load(f"https://www.imdb.com/title/{title_id}/plotsummary")
    store(SYNOPSIS_COLUMN_INDEX, lambda: summary_page.find(
        "ul", {"id": "plot-synopsis-content"}).find("li").text)

    return source_row
# * ************************************
# * main
# * ************************************
if __name__ == "__main__":
    logging.info("**** Welcome to data fetcher ****")
    init()
    start_at = START_AT_INDEX
    # keep fetching while batches come back non-empty
    while fetch_and_save_batch(start_at):
        start_at += BATCH_SIZE
        # SOURCE_RECORDS_TO_FETCH is a number of records, so compare it with
        # how many were processed in this run, not with the absolute position
        # in the source file (the two differ whenever START_AT_INDEX > 0)
        if SOURCE_RECORDS_TO_FETCH and start_at - START_AT_INDEX >= SOURCE_RECORDS_TO_FETCH:
            break
        logging.info(
            f"sleeping between batches for {DELAY_BETWEEN_BATCHES} sec...")
        sleep(DELAY_BETWEEN_BATCHES)
# * ************************************
# * version
# * ************************************
__version__ = "1.0.0"
import logging
import sys
# * ************************************
# * knobs - modify as needed
# * ************************************
# minimum severity of the log messages that get printed
#   logging.DEBUG -> print everything including python logs (only for troubleshooting)
#                    use logging.debug("my message") for this level
#   logging.INFO  -> print only informational messages (RECOMMENDED)
#                    use logging.info("my message") for this level
DEBUG_LEVEL = logging.DEBUG
logging.basicConfig(stream=sys.stdout, level=DEBUG_LEVEL)

# ---- source file ----

# source file with all film ids and genres
SOURCE_FILENAME = "titles.tsv"

# delimiter used in source file - can be "\t" or ","
SOURCE_FILE_DELIMITER = "\t"

# list of strings representing the column names, one per source file column.
# MUST BE SET if the source file does not include a header row;
# set to None to use the names from the source file's header row
SOURCE_COLUMN_NAMES = None

# ---- column positions (first column -> index = 0, etc.) ----

ID_COLUMN_INDEX = 0
TITLE_TYPE_COLUMN_INDEX = 1
TITLE_COLUMN_INDEX = 2
YEAR_COLUMN_INDEX = 5
GENRES_COLUMN_INDEX = 8

STORYLINE_COLUMN_INDEX = 9
SYNOPSIS_COLUMN_INDEX = 10
COUNTRY_COLUMN_INDEX = 11
BUDGET_COLUMN_INDEX = 12
OPENING_WEEKEND_COLUMN_INDEX = 13
PRODUCTION_COMPANY_COLUMN_INDEX = 14
RATING_COLUMN_INDEX = 15
NUMBER_OF_VOTES_COLUMN_INDEX = 16

# ---- search filters ----

TITLETYPE_TO_INCLUDE = ["short", "movie"]
MATCH_ALL_TITLETYPES = False

# genre(s) to search by - search can be for ALL or for ANY
GENRES_TO_INCLUDE = ["Horror"]

# set to True to match ALL genres, False to match ANY genre
MATCH_ALL_GENRES = True

# exclude any films that have ANY of these genre(s)
GENRES_TO_EXCLUDE = []

# ---- output file ----

# file to output result to
OUTPUT_FILENAME = "imdb_horror_data_TH.tsv"

# delimiter used in output file - can be "\t" or ","
OUTPUT_FILE_DELIMITER = "\t"

# additional columns to add to output - update as needed in fetch_record_info()
ADDITIONAL_COLUMNS = [
    "storyline",
    "synopsis",
    "countries_of_origin",
    "budget",
    "opening_weekend",
    "production_company",
    "rating",
    "number_of_votes",
]

# ---- batching ----

# set to 1 to start at the beginning of the source file - 0 if the file does
# not have a header row. set to a higher value to skip that many records.
# in case a previous run was aborted, make sure OVERWRITE_OUTPUT_FILE is False
START_AT_INDEX = 101

# if True, create a new output file and overwrite the existing one;
# if False, data is appended to the existing output file
OVERWRITE_OUTPUT_FILE = False

# number of records to process from source file each batch
BATCH_SIZE = 100

# total number of records to process from source file - set to None to process all
SOURCE_RECORDS_TO_FETCH = 100

# number of seconds to delay between fetching records
DELAY_BETWEEN_RECORDS = 2

# number of seconds for additional delay between batches
DELAY_BETWEEN_BATCHES = 5
# number of records in output file to process and plot - set to None for all
OUTPUT_RECORDS_TO_PLOT = 150

# set to True to match ALL words - False to match ANY of the words
MATCH_ALL_SEARCH_WORDS = False

# numbers can cause problems for some plotting packages
IGNORE_NUMBERS_IN_SEARCH = True

# key words to search for in title, summary and synopsis
SEARCH_WORDS = [
    "artist", "author", "filmmaker", "musician",
    "sculptor", "painter", "painting", "dancer", "director",
    "create", "creator", "creative", "created",
    "creates", "photographer", "photograph", "music", "dance", "graffiti",
]

# exclude records that have ANY of these words in title, summary or synopsis
SEARCH_WORDS_EXCLUDED = []

# name of the html plot file
PLOT_HTML_FILE_NAME = "imdb_horror_data.html"
# * ************************************
# * common functions
# * ************************************
def match_genre_search_criteria(text, include_list, exclude_list=(), match_all=False) -> bool:
    """
    check if text matches search criteria (exclusion THEN inclusion list)

    text: value to test; it is lower-cased before comparing. anything that is
          not a string (e.g. the NaN pandas produces for an empty cell)
          never matches
    include_list: words to look for - compared as given, so pass them lower-cased
    exclude_list: words that reject the text as soon as one of them occurs
    match_all: if True, ALL words in include_list must occur, otherwise one is
               enough. an empty include_list therefore matches when match_all
               is True and does not match when it is False
    returns True when the text passes both lists

    NOTE(review): words are matched as substrings of the whole text, not as
    separate genre names - confirm that this is intended
    """
    if not isinstance(text, str):
        return False
    text = text.lower()

    # check exclusion list first
    if any(word in text for word in exclude_list):
        return False

    # check inclusion list
    hits = (word in text for word in include_list)
    return all(hits) if match_all else any(hits)
def match_titletype_search_criteria(text, include_list, match_all=False) -> bool:
    """
    check if a titletype matches the inclusion list

    text: value to test; it is lower-cased before comparing. anything that is
          not a string (e.g. the NaN pandas produces for an empty cell)
          never matches
    include_list: words to look for - compared as given, so pass them lower-cased
    match_all: if True, ALL words in include_list must occur, otherwise one is
               enough. an empty include_list therefore matches when match_all
               is True and does not match when it is False
    returns True when the text passes the inclusion list

    NOTE(review): words are matched as substrings, so "movie" also matches a
    titletype such as "tvMovie" - confirm that this is intended
    """
    if not isinstance(text, str):
        return False
    text = text.lower()
    hits = (word in text for word in include_list)
    return all(hits) if match_all else any(hits)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment