Last active
November 22, 2023 15:49
-
-
Save samiles/431b077a70b53059b6308877ff276d8b to your computer and use it in GitHub Desktop.
Make a custom podcast stored in S3 from YouTube audio
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
This script downloads the audio and thumbnail of a YouTube video, uploads them to Amazon S3, and updates a podcast RSS feed stored in the same bucket.
The script uses yt-dlp, boto3, feedgen, feedparser, requests, python-slugify, and tqdm.
Input:
    The YouTube video URL is read from an interactive prompt when the script starts.
Prints:
    Video title
    Thumbnail download status
    Upload status
    Public URLs for audio and thumbnail
    RSS feed update status
    Time taken for the entire process
More Info:
    https://samiles.com/blog/2023/04/custom-youtube-to-podcast-generator-with-python/
""" | |
import os | |
import yt_dlp | |
import boto3 | |
from feedgen.feed import FeedGenerator | |
from pathlib import Path | |
from urllib.parse import urljoin | |
from slugify import slugify | |
import requests | |
import feedparser | |
from tqdm import tqdm | |
from botocore.exceptions import ClientError | |
import time | |
# Bucket that receives the audio files, the thumbnails and the RSS feed.
S3_BUCKET = "mypodcast"

# One shared S3 client, built from the "personal" profile of the local AWS
# configuration, is used by every function in this script.
session = boto3.session.Session(profile_name="personal")
s3_client = session.client(service_name="s3")
def download_thumbnail(url, output_path, timeout=30):
    """Download the file at ``url`` and write it to ``output_path``.

    The body is streamed to disk in 8 KiB chunks instead of being loaded
    into memory at once.

    Args:
        url: Address of the thumbnail image.
        output_path: Local path to write; an existing file is overwritten.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests`` can block indefinitely on a stalled server.

    Raises:
        requests.HTTPError: If the server answers with an error status. The
            output file is not created in that case.
        requests.RequestException: On connection problems or timeout.
    """
    # With stream=True the connection stays checked out until the response is
    # closed; the context manager releases it even if writing the file fails.
    with requests.get(url, stream=True, timeout=timeout) as response:
        response.raise_for_status()
        with open(output_path, 'wb') as f:
            for chunk in response.iter_content(chunk_size=8192):
                f.write(chunk)
def download_audio(url):
    """Fetch a video's audio track as MP3 and report its title and thumbnail.

    The video is queried twice: once for metadata only, then again to
    download the best available audio stream, which the FFmpegExtractAudio
    post-processor converts to MP3 (preferred quality '192'). The output
    file name is built from the slugified video title.

    Args:
        url: URL of the video to process.

    Returns:
        A ``(video_title, thumbnail_url)`` tuple taken from the metadata.
    """
    # Metadata pass: nothing is downloaded here.
    with yt_dlp.YoutubeDL() as probe:
        metadata = probe.extract_info(url, download=False)
    title = metadata['title']
    thumbnail = metadata['thumbnail']

    extract_mp3 = {
        'key': 'FFmpegExtractAudio',
        'preferredcodec': 'mp3',
        'preferredquality': '192',
    }
    options = {
        'format': 'bestaudio/best',
        'postprocessors': [extract_mp3],
        'quiet': True,
        # yt-dlp fills in %(ext)s itself.
        'outtmpl': slugify(title) + '.%(ext)s',
    }

    # Download pass.
    with yt_dlp.YoutubeDL(options) as downloader:
        downloader.download([url])

    return title, thumbnail
def download_rss_from_s3(local_rss_file_path, remote_rss_file_path):
    """Download the RSS feed object from the bucket to a local file.

    Args:
        local_rss_file_path: Local path to write the feed to.
        remote_rss_file_path: Key of the feed object inside ``S3_BUCKET``.

    Returns:
        True if the object was downloaded, False if it does not exist.
        The local file is opened for writing before the download starts, so
        when False is returned it exists but is empty.

    Raises:
        botocore.exceptions.ClientError: For any S3 error other than a
            missing object (for example access denied).
    """
    # S3 reports a missing object under different codes depending on which
    # request fails, so all of the "not found" spellings are accepted.
    not_found_codes = ('404', 'NoSuchKey', 'NotFound')
    try:
        with open(local_rss_file_path, 'wb') as f:
            s3_client.download_fileobj(S3_BUCKET, remote_rss_file_path, f)
    except ClientError as e:
        if e.response.get('Error', {}).get('Code') in not_found_codes:
            return False
        # Bare raise keeps the original traceback.
        raise
    return True
def upload_to_s3(file_path, remote_path):
    """Upload a local file to ``S3_BUCKET`` as a publicly readable object.

    A tqdm progress bar sized to the file is shown while the upload runs.

    Args:
        file_path: Path of the local file to upload.
        remote_path: Object key to store the file under.

    Returns:
        The ``https://<bucket>.s3.amazonaws.com/<key>`` URL of the object.
        ``remote_path`` is inserted as-is, without URL-encoding.

    Raises:
        OSError: If ``file_path`` does not exist or cannot be read.
        Any exception raised by the boto3 transfer is propagated.
    """
    # Explicit import so this function does not rely on the submodule having
    # been loaded as a side effect of something else.
    from boto3.s3.transfer import S3Transfer, TransferConfig

    file_size = os.path.getsize(file_path)
    transfer = S3Transfer(s3_client, TransferConfig(use_threads=False))

    # upload_file reads the file from its path, so no handle is opened here.
    # The context manager closes the bar even when the upload raises.
    with tqdm(
        total=file_size, unit='B', unit_scale=True, desc='Uploading', ncols=80
    ) as progress:
        transfer.upload_file(
            file_path,
            S3_BUCKET,
            remote_path,
            # Called with the number of bytes sent since the previous call.
            callback=progress.update,
            extra_args={'ACL': 'public-read'},
        )

    return f'https://{S3_BUCKET}.s3.amazonaws.com/{remote_path}'
def update_rss_feed_s3(remote_audio_url, public_thumbnail_url, video_title, audio_length):
    """Add one episode to the podcast feed stored in S3 and re-upload it.

    If ``podcast.rss`` already exists in ``S3_BUCKET`` it is downloaded and
    parsed, and its feed-level fields and entries are copied into a new feed.
    Otherwise a new feed with default metadata is started. The new episode is
    then added, the feed is written to a local ``podcast.rss`` and uploaded
    with a public-read ACL.

    Args:
        remote_audio_url: Public URL of the audio file; also used as entry id.
        public_thumbnail_url: Public URL of the episode thumbnail. Used as
            the feed logo when a new feed is created.
        video_title: Title of the new episode.
        audio_length: Size of the audio file in bytes, written as the
            enclosure length.

    Raises:
        botocore.exceptions.ClientError: If S3 answers the existence check
            with anything other than "not found", or a transfer fails.
        AttributeError: If the existing feed lacks a field read below
            (title, link, image, subtitle, language, or an entry field).
    """
    from boto3.s3.transfer import S3Transfer, TransferConfig

    remote_rss_file_path = 'podcast.rss'
    local_rss_file_path = 'podcast.rss'

    # Check if the RSS file exists on S3. Only a "not found" answer means the
    # feed is missing. Any other error (permissions, throttling, network) is
    # re-raised; otherwise the published feed would be overwritten by a fresh
    # one containing just this episode.
    try:
        s3_client.head_object(Bucket=S3_BUCKET, Key=remote_rss_file_path)
        rss_file_found = True
    except ClientError as err:
        code = err.response.get('Error', {}).get('Code')
        if code not in ('404', 'NoSuchKey', 'NotFound'):
            raise
        rss_file_found = False

    rss_feed = FeedGenerator()

    if rss_file_found:
        # If the RSS file exists on S3, download and parse it
        with open(local_rss_file_path, 'wb') as f:
            s3_client.download_fileobj(S3_BUCKET, remote_rss_file_path, f)

        # Hand feedparser the raw bytes so it decodes them itself rather than
        # going through the platform's default text encoding.
        with open(local_rss_file_path, 'rb') as f:
            existing_feed = feedparser.parse(f)

        # Set feed-level information from the existing feed
        rss_feed.title(existing_feed.feed.title)
        rss_feed.link(href=existing_feed.feed.link, rel='alternate')
        rss_feed.logo(existing_feed.feed.image.href)
        rss_feed.subtitle(existing_feed.feed.subtitle)
        rss_feed.language(existing_feed.feed.language)

        # Add existing entries to the feed. add_entry() prepends by default,
        # which would reverse the old episodes on every run; appending keeps
        # them in the order they had in the downloaded feed.
        for old_entry in existing_feed.entries:
            copied = rss_feed.add_entry(order='append')
            copied.id(old_entry.id)
            copied.title(old_entry.title)
            copied.link(
                href=old_entry.links[0].href,
                rel='enclosure',
                type=old_entry.links[0].type,
            )
            copied.enclosure(
                old_entry.enclosures[0].href,
                old_entry.enclosures[0].length,
                old_entry.enclosures[0].type,
            )
    else:
        # Initialize a new feed if the file does not exist
        rss_feed.title('Sam\'s Podcast')
        rss_feed.link(href='your_podcast_link', rel='alternate')
        rss_feed.logo(public_thumbnail_url)
        rss_feed.subtitle('Just my stuff')
        rss_feed.language('en')

    # Add the new entry to the feed (default order: placed first).
    # The thumbnail link is set before the audio enclosure on purpose; keep
    # this call order.
    entry = rss_feed.add_entry()
    entry.id(remote_audio_url)
    entry.title(video_title)
    entry.link(href=public_thumbnail_url, rel='enclosure', type='image/jpeg')
    entry.enclosure(url=remote_audio_url, length=str(audio_length), type='audio/mpeg')

    # Save the feed to a local file
    rss_content = rss_feed.rss_str(pretty=True)
    with open(local_rss_file_path, 'wb') as f:
        f.write(rss_content)

    # Upload the feed to S3
    transfer = S3Transfer(s3_client, TransferConfig(use_threads=False))
    transfer.upload_file(
        local_rss_file_path,
        S3_BUCKET,
        remote_rss_file_path,
        extra_args={'ACL': 'public-read'},
    )
def main():
    """Run the whole pipeline for one video.

    Prompts for a video URL, downloads its audio and thumbnail, uploads both
    to S3, adds the episode to the podcast RSS feed and prints the elapsed
    time.

    Raises:
        SystemExit: If no URL is entered at the prompt.
    """
    start_time = time.time()

    video_url = input("Enter the YouTube video URL: ").strip()
    if not video_url:
        # Nothing to process; stop before any network call is made.
        raise SystemExit("No video URL entered.")

    video_title, thumbnail_url = download_audio(video_url)
    print(f"Audio downloaded: {video_title}")

    # download_audio names its output after the slugified title, so the same
    # slug locates the MP3 and names the thumbnail.
    slug_title = slugify(video_title)
    local_audio_file_path = f'{slug_title}.mp3'
    local_thumbnail_file_path = f'{slug_title}.jpg'
    audio_length = os.path.getsize(local_audio_file_path)

    download_thumbnail(thumbnail_url, local_thumbnail_file_path)
    print("Thumbnail downloaded.")

    print("Uploading to S3.")
    remote_audio_file_path = f'{slug_title}.mp3'
    remote_thumbnail_file_path = f'{slug_title}.jpg'
    public_audio_url = upload_to_s3(local_audio_file_path, remote_audio_file_path)
    public_thumbnail_url = upload_to_s3(local_thumbnail_file_path, remote_thumbnail_file_path)
    print("Audio and thumbnail uploaded to S3.")
    print(f"Public audio URL: {public_audio_url}")
    print(f"Public thumbnail URL: {public_thumbnail_url}")

    # This download only decides whether to print the notice below:
    # update_rss_feed_s3 checks for, downloads and rewrites the feed itself.
    local_rss_file_path = 'podcast.rss'
    remote_rss_file_path = 'podcast.rss'
    if not download_rss_from_s3(local_rss_file_path, remote_rss_file_path):
        print('RSS feed not found in S3. Creating a new one.')

    update_rss_feed_s3(public_audio_url, public_thumbnail_url, video_title, audio_length)
    print("RSS feed updated.")

    minutes, seconds = divmod(int(time.time() - start_time), 60)
    print(f"Time taken: {minutes} mins {seconds} secs")


if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment