Created
July 19, 2024 12:40
-
-
Save tapanrachchh/d3a5f13168f2c8690ea3aa36b9bbf637 to your computer and use it in GitHub Desktop.
Fetch popular TMDB movie data, including cast, crew, and credits, and save it to a CSV file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import requests | |
import csv | |
import time | |
from requests.adapters import HTTPAdapter | |
from requests.packages.urllib3.util.retry import Retry # type: ignore | |
# TMDb credentials and the root URL shared by every endpoint below.
API_KEY = 'API_KEY'
BASE_URL = 'https://api.themoviedb.org/3'

# Retry policy: up to 5 attempts with a backoff factor of 1, re-trying on
# rate limiting (429) and on the listed 5xx server errors.
retry = Retry(
    status_forcelist=[429, 500, 502, 503, 504],
    backoff_factor=1,
    total=5,
)
adapter = HTTPAdapter(max_retries=retry)

# One shared session so that every request goes through the retrying adapter,
# whichever scheme the URL uses.
session = requests.Session()
session.mount("http://", adapter)
session.mount("https://", adapter)
def fetch_genres(api_key, timeout=10):
    """Return a mapping of TMDb movie genre id to genre name.

    Args:
        api_key: TMDb API key, sent as the ``api_key`` query parameter.
        timeout: Seconds to wait for the server before giving up.

    Returns:
        dict mapping each genre's ``id`` to its ``name``. Empty when the
        response carries no ``genres`` entry.

    Raises:
        requests.exceptions.RequestException: On connection failure,
            timeout, or an HTTP error status.
    """
    params = {
        'api_key': api_key,
        'language': 'en-US',
    }
    # Without an explicit timeout a stalled connection would block forever.
    response = session.get(
        f"{BASE_URL}/genre/movie/list", params=params, timeout=timeout
    )
    response.raise_for_status()
    genres = response.json().get('genres', [])
    return {genre['id']: genre['name'] for genre in genres}
def fetch_movies(api_key, page, original_language='hi', timeout=10):
    """Fetch one page of results from the TMDb ``/discover/movie`` endpoint.

    Args:
        api_key: TMDb API key, sent as the ``api_key`` query parameter.
        page: Page number to request.
        original_language: Value for the ``with_original_language`` filter.
            Defaults to ``'hi'``.
        timeout: Seconds to wait for the server before giving up.

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.exceptions.RequestException: On connection failure,
            timeout, or an HTTP error status.
    """
    params = {
        'api_key': api_key,
        'with_original_language': original_language,
        'page': page,
    }
    # Without an explicit timeout a stalled connection would block forever.
    response = session.get(
        f"{BASE_URL}/discover/movie", params=params, timeout=timeout
    )
    response.raise_for_status()
    return response.json()
def fetch_movie_details(api_key, movie_id, timeout=10):
    """Fetch credits, keywords, videos and core details for one movie.

    Four endpoints under ``/movie/{movie_id}`` are queried (credits,
    keywords, videos and the movie record itself) and every response is
    checked for an HTTP error before its body is used. Watch-provider data
    is not part of the returned mapping, so that endpoint is not queried.

    Args:
        api_key: TMDb API key, sent as the ``api_key`` query parameter.
        movie_id: TMDb id of the movie.
        timeout: Seconds to wait for each response before giving up.

    Returns:
        dict with these keys:
            cast, crew: comma-separated names of the first 10 entries of
                the respective credits list.
            keywords: comma-separated keyword names.
            trailer_url: YouTube URL of the first video of type "Trailer",
                or None when there is none.
            release_year: text before the first "-" of ``release_date``
                ("" when the date is missing).
            runtime: the ``runtime`` field of the movie record, unchanged.
            directors, producers, writers: comma-separated crew names
                grouped by job title.

    Raises:
        requests.exceptions.RequestException: On connection failure,
            timeout, or an HTTP error status from any of the endpoints.
    """
    params = {'api_key': api_key, 'language': 'en-US'}

    def _get_json(suffix):
        # Fetch one sub-resource of the movie and fail loudly on HTTP errors.
        response = session.get(
            f"{BASE_URL}/movie/{movie_id}{suffix}",
            params=params,
            timeout=timeout,
        )
        response.raise_for_status()
        return response.json()

    credits_data = _get_json("/credits")
    keywords_data = _get_json("/keywords")
    videos_data = _get_json("/videos")
    details = _get_json("")

    cast_members = credits_data.get('cast', [])
    crew_members = credits_data.get('crew', [])

    # Only the first 10 entries of each list are kept for the summary columns.
    cast_names = [m['name'] for m in cast_members[:10] if m.get('name')]
    crew_names = [m['name'] for m in crew_members[:10] if m.get('name')]

    writer_jobs = {'Writer', 'Screenplay', 'Story'}
    producer_jobs = {'Producer', 'Executive Producer', 'Co-Producer'}
    directors = []
    writers = []
    producers = []
    for member in crew_members:
        job = member.get('job')
        name = member.get('name')
        if not name:
            # An entry without a name cannot contribute to any list.
            continue
        if job == 'Director':
            directors.append(name)
        elif job in writer_jobs:
            writers.append(name)
        elif job in producer_jobs:
            producers.append(name)

    keywords_list = [
        kw['name'] for kw in keywords_data.get('keywords', []) if kw.get('name')
    ]

    # First YouTube trailer wins; None when the movie has no such video.
    trailer_url = next(
        (
            f"https://www.youtube.com/watch?v={video['key']}"
            for video in videos_data.get('results', [])
            if video.get('type') == 'Trailer'
            and video.get('site') == 'YouTube'
            and video.get('key')
        ),
        None,
    )

    # `release_date` may be absent or None; treat both as an empty date.
    release_date = details.get('release_date') or ''

    return {
        'cast': ', '.join(cast_names),
        'crew': ', '.join(crew_names),
        'keywords': ', '.join(keywords_list),
        'trailer_url': trailer_url,
        'release_year': release_date.split('-')[0],
        'runtime': details.get('runtime'),
        'directors': ', '.join(directors),
        'producers': ', '.join(producers),
        'writers': ', '.join(writers),
    }
def generate_vector_data(movie_info):
    """Build a single " | "-separated text summary of a movie record.

    The summary concatenates title, overview, cast, directors, producers,
    writers, release year and keywords, each introduced by a fixed label.
    A field that is missing or None contributes an empty string instead of
    raising, so an incomplete record still yields a summary.

    Args:
        movie_info: Mapping with (some of) the keys ``title``, ``overview``,
            ``cast``, ``directors``, ``producers``, ``writers``,
            ``release_year`` and ``keywords``.

    Returns:
        The summary string.
    """
    def _text(key):
        # Missing/None fields become "" so concatenation can never fail.
        value = movie_info.get(key)
        return "" if value is None else str(value)

    return (
        f"{_text('title')} | {_text('overview')}"
        f" | Cast includes {_text('cast')}"
        f" | Directed by {_text('directors')}"
        f" | Produced by {_text('producers')}"
        f" | Written by {_text('writers')}"
        f" | Released on {_text('release_year')}"
        f" | Movie is about {_text('keywords')}"
    )
def extract_movie_info(api_key, movies, genre_mapping):
    """Enrich each discovered movie with details, genre names and a summary.

    For every movie the per-movie details are fetched, the ``genre_ids`` are
    translated through ``genre_mapping`` (ids without a mapping become
    "Unknown"), and a ``vector_data`` summary string is attached.

    Args:
        api_key: TMDb API key forwarded to the details lookup.
        movies: Iterable of movie dicts; each must contain an ``id``.
        genre_mapping: Mapping of genre id to genre name.

    Returns:
        A new list of enriched copies; the input dicts are not modified.
    """
    def _enrich(movie):
        details = fetch_movie_details(api_key, movie['id'])
        genre_labels = [
            genre_mapping.get(gid, "Unknown")
            for gid in movie.get('genre_ids', [])
        ]
        # Start from a copy of the raw record, then layer the extras on top.
        record = movie.copy()
        record['genres'] = ', '.join(genre_labels)
        record.update(details)
        record['vector_data'] = generate_vector_data(record)
        return record

    return [_enrich(movie) for movie in movies]
def fetch_movie_data(api_key, start=1, limit=10, max_retries=5, retry_delay=5):
    """Collect up to ``limit`` enriched movies, paging through discovery.

    Pages are requested one after another starting at ``start`` until
    enough movies have been gathered, a page comes back empty, or the
    response's ``total_pages`` value (when present) has been reached.
    Details are only fetched for as many movies as are still needed.

    Args:
        api_key: TMDb API key.
        start: First page number to request.
        limit: Maximum number of movies to return.
        max_retries: How many consecutive failed attempts on a page are
            retried before the error is re-raised.
        retry_delay: Seconds to sleep between retries.

    Returns:
        List of at most ``limit`` enriched movie dicts.

    Raises:
        requests.exceptions.RequestException: When the genre lookup fails,
            or when a page keeps failing after ``max_retries`` retries.
    """
    genre_mapping = fetch_genres(api_key)
    all_movies = []
    page = start
    consecutive_failures = 0

    while len(all_movies) < limit:
        try:
            response = fetch_movies(api_key, page)
            movies = response.get('results', [])
            if not movies:
                break
            # Avoid per-movie detail requests for entries that would be
            # discarded by the final slice anyway.
            remaining = limit - len(all_movies)
            enriched = extract_movie_info(
                api_key, movies[:remaining], genre_mapping
            )
        except requests.exceptions.RequestException:
            consecutive_failures += 1
            if consecutive_failures > max_retries:
                # A persistent error must surface instead of looping forever.
                raise
            time.sleep(retry_delay)  # Wait before retrying the same page
            continue

        consecutive_failures = 0
        all_movies.extend(enriched)

        total_pages = response.get('total_pages')
        if total_pages is not None and page >= total_pages:
            break
        page += 1

    return all_movies[:limit]
def save_to_csv(movies, filename):
    """Write a list of movie dicts to ``filename`` as UTF-8 CSV.

    The header is the union of the keys of all rows, in first-seen order,
    so a row carrying a key that earlier rows lack does not abort the
    write; cells a row has no value for are left empty. An empty ``movies``
    list produces an empty file rather than raising.

    Args:
        movies: List of dicts, one per CSV row.
        filename: Path of the file to create or overwrite.
    """
    # dict.fromkeys de-duplicates while preserving first-seen column order.
    fieldnames = list(dict.fromkeys(key for movie in movies for key in movie))

    # newline='' lets the csv module control line endings itself.
    with open(filename, 'w', newline='', encoding='utf-8') as output_file:
        if not fieldnames:
            return
        dict_writer = csv.DictWriter(output_file, fieldnames=fieldnames)
        dict_writer.writeheader()
        dict_writer.writerows(movies)
# Fetch and save the movies to a CSV file
if __name__ == "__main__":
    import os

    # Allow the key to be supplied via the environment so it does not have
    # to be written into the source; fall back to the module constant.
    api_key = os.environ.get("TMDB_API_KEY", API_KEY)

    top_movies = fetch_movie_data(api_key, 1)
    if top_movies:
        save_to_csv(top_movies, 'movies_data.csv')
        print("Movies have been saved!")
    else:
        # Nothing came back, so do not claim that a file was written.
        print("No movies were returned; nothing was saved.")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment