Skip to content

Instantly share code, notes, and snippets.

@samiles
Last active November 22, 2023 15:49
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save samiles/431b077a70b53059b6308877ff276d8b to your computer and use it in GitHub Desktop.
Save samiles/431b077a70b53059b6308877ff276d8b to your computer and use it in GitHub Desktop.
Make a custom podcast stored in S3 from YouTube audio
"""
This script downloads the audio and thumbnail of a YouTube video, uploads them to Amazon S3, and updates a podcast RSS feed stored in the same bucket.
The script uses yt-dlp, boto3, feedgen, feedparser, requests, slugify, and tqdm.

Args:
    video_url (str): URL of the YouTube video to process. It is not a command-line argument; the script asks for it at an interactive prompt when it starts.

Prints:
    Video title
    Thumbnail download status
    Upload status
    Public URLs for audio and thumbnail
    RSS feed update status
    Time taken for the entire process

More Info:
    https://samiles.com/blog/2023/04/custom-youtube-to-podcast-generator-with-python/
"""
import os
import yt_dlp
import boto3
from feedgen.feed import FeedGenerator
from pathlib import Path
from urllib.parse import urljoin
from slugify import slugify
import requests
import feedparser
from tqdm import tqdm
from botocore.exceptions import ClientError
import time
# Name of the S3 bucket that every download/upload helper in this file targets.
S3_BUCKET = 'mypodcast'
# boto3 session created from the named credentials profile 'personal'.
# NOTE(review): this runs at import time, so importing the module presumably
# requires that profile to exist in the local AWS configuration - confirm.
session = boto3.Session(profile_name='personal')
# Shared S3 client used by all functions below.
s3_client = session.client('s3')
def download_thumbnail(url, output_path, timeout=30):
    """Download the file at *url* and write it to *output_path*.

    The body is streamed to disk in 8 KiB chunks rather than being held in
    memory as a whole.

    Args:
        url: HTTP(S) address of the thumbnail image.
        output_path: Local path the bytes are written to (overwritten if it
            already exists).
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests`` can block indefinitely on a stalled server.

    Raises:
        requests.HTTPError: If the server answers with a 4xx/5xx status.
        requests.RequestException: On connection problems or timeouts.
    """
    # With stream=True the connection stays checked out until the response is
    # closed; the context manager guarantees that even if writing fails.
    with requests.get(url, stream=True, timeout=timeout) as response:
        # Check the status before creating the output file so a failed
        # request does not leave an empty file behind.
        response.raise_for_status()
        with open(output_path, 'wb') as f:
            for chunk in response.iter_content(chunk_size=8192):
                f.write(chunk)
def download_audio(url):
    """Fetch a video's audio track as an MP3 named after its slugified title.

    Two yt-dlp passes are made: the first only reads metadata (no download)
    so the title is known, the second downloads the audio using an output
    template built from the slugified title.

    Args:
        url: URL of the video to process.

    Returns:
        A ``(video_title, thumbnail_url)`` tuple taken from the video metadata.
    """
    # Pass 1: metadata only, to learn the title and thumbnail address.
    with yt_dlp.YoutubeDL() as probe:
        metadata = probe.extract_info(url, download=False)
        title = metadata['title']
        thumbnail = metadata['thumbnail']

    file_stem = slugify(title)

    # Post-processing step that converts the downloaded stream to MP3.
    extract_mp3 = {
        'key': 'FFmpegExtractAudio',
        'preferredcodec': 'mp3',
        'preferredquality': '192',
    }
    options = {
        'format': 'bestaudio/best',
        'postprocessors': [extract_mp3],
        'quiet': True,
        # yt-dlp substitutes %(ext)s itself; only the stem is fixed here.
        'outtmpl': f'{file_stem}.%(ext)s',
    }

    # Pass 2: the actual download and conversion.
    with yt_dlp.YoutubeDL(options) as downloader:
        downloader.download([url])

    return title, thumbnail
def download_rss_from_s3(local_rss_file_path, remote_rss_file_path):
    """Download the RSS file from the S3 bucket to a local path.

    Note that the local file is opened for writing before the download
    starts, so when the remote object is missing the local file is left
    empty (and any previous content is truncated).

    Args:
        local_rss_file_path: Local path the feed is written to.
        remote_rss_file_path: Key of the feed object inside ``S3_BUCKET``.

    Returns:
        True if the object was downloaded, False if it does not exist.

    Raises:
        botocore.exceptions.ClientError: For any S3 error other than
            "object not found" (for example access denied).
    """
    try:
        with open(local_rss_file_path, 'wb') as f:
            s3_client.download_fileobj(S3_BUCKET, remote_rss_file_path, f)
    except ClientError as e:
        # A missing object is reported as '404' by HeadObject and as
        # 'NoSuchKey' by GetObject; accept either spelling.
        error_code = e.response.get('Error', {}).get('Code')
        if error_code in ('404', 'NoSuchKey'):
            return False
        # Bare raise keeps the original traceback intact.
        raise
    return True
def upload_to_s3(file_path, remote_path):
    """Upload a local file to the S3 bucket with a progress bar.

    The object is uploaded with the ``public-read`` ACL.

    Args:
        file_path: Path of the local file to upload.
        remote_path: Key to store the object under inside ``S3_BUCKET``.

    Returns:
        The public HTTPS URL of the uploaded object.

    Raises:
        OSError: If *file_path* does not exist or its size cannot be read.
    """
    file_size = os.path.getsize(file_path)

    config = boto3.s3.transfer.TransferConfig(use_threads=False)
    transfer = boto3.s3.transfer.S3Transfer(s3_client, config)

    # upload_file() opens the path itself, so no file handle is opened here.
    # The context manager closes the progress bar even when the upload fails.
    with tqdm(
        total=file_size, unit='B', unit_scale=True, desc='Uploading', ncols=80
    ) as progress:
        transfer.upload_file(
            file_path,
            S3_BUCKET,
            remote_path,
            # Called with the number of bytes sent since the previous call.
            callback=progress.update,
            extra_args={'ACL': 'public-read'},
        )

    return f'https://{S3_BUCKET}.s3.amazonaws.com/{remote_path}'
def update_rss_feed_s3(remote_audio_url, public_thumbnail_url, video_title, audio_length):
    """Add a new episode to the podcast feed stored in S3.

    If ``podcast.rss`` already exists in the bucket it is downloaded, its
    feed-level fields and entries are copied into a fresh feed, and the new
    episode is added at the front. Otherwise a new feed is created with
    default feed-level fields. The result is written to a local
    ``podcast.rss`` and uploaded back with the ``public-read`` ACL.

    Args:
        remote_audio_url: Public URL of the uploaded audio file; also used as
            the entry id.
        public_thumbnail_url: Public URL of the uploaded thumbnail; used as
            the feed logo when a new feed is created.
        video_title: Title for the new entry.
        audio_length: Size of the audio file in bytes (enclosure length).

    Raises:
        botocore.exceptions.ClientError: For S3 errors other than the feed
            object not existing.
    """
    remote_rss_file_path = 'podcast.rss'
    local_rss_file_path = 'podcast.rss'

    # Check if the RSS file exists on S3. Only a genuine "not found" may fall
    # through to creating a new feed: treating e.g. an access-denied or
    # throttling error as "missing" would overwrite the existing feed and
    # drop every previous episode.
    try:
        s3_client.head_object(Bucket=S3_BUCKET, Key=remote_rss_file_path)
    except ClientError as err:
        error_code = err.response.get('Error', {}).get('Code')
        if error_code not in ('404', 'NoSuchKey'):
            raise
        rss_file_found = False
    else:
        rss_file_found = True

    rss_feed = FeedGenerator()

    if rss_file_found:
        # If the RSS file exists on S3, download and parse it
        with open(local_rss_file_path, 'wb') as f:
            s3_client.download_fileobj(S3_BUCKET, remote_rss_file_path, f)

        # Hand feedparser the raw bytes so it applies the encoding declared in
        # the XML instead of whatever the platform's default text encoding is.
        with open(local_rss_file_path, 'rb') as f:
            existing_feed = feedparser.parse(f.read())

        # Set feed-level information from the existing feed
        rss_feed.title(existing_feed.feed.title)
        rss_feed.link(href=existing_feed.feed.link, rel='alternate')
        rss_feed.logo(existing_feed.feed.image.href)
        rss_feed.subtitle(existing_feed.feed.subtitle)
        rss_feed.language(existing_feed.feed.language)

        # Re-add existing entries in their current order. add_entry() prepends
        # by default, which would reverse the episode list on every run.
        for old_entry in existing_feed.entries:
            copied = rss_feed.add_entry(order='append')
            copied.id(old_entry.id)
            copied.title(old_entry.title)
            copied.link(
                href=old_entry.links[0].href,
                rel='enclosure',
                type=old_entry.links[0].type,
            )
            copied.enclosure(
                old_entry.enclosures[0].href,
                old_entry.enclosures[0].length,
                old_entry.enclosures[0].type,
            )
    else:
        # Initialize a new feed if the file does not exist
        rss_feed.title('Sam\'s Podcast')
        rss_feed.link(href='your_podcast_link', rel='alternate')
        rss_feed.logo(public_thumbnail_url)
        rss_feed.subtitle('Just my stuff')
        rss_feed.language('en')

    # Add the new entry to the feed (default order: placed first)
    new_entry = rss_feed.add_entry()
    new_entry.id(remote_audio_url)
    new_entry.title(video_title)
    new_entry.link(href=public_thumbnail_url, rel='enclosure', type='image/jpeg')
    new_entry.enclosure(url=remote_audio_url, length=str(audio_length), type='audio/mpeg')

    # Save the feed to a local file (rss_str returns bytes)
    rss_content = rss_feed.rss_str(pretty=True)
    with open(local_rss_file_path, 'wb') as f:
        f.write(rss_content)

    # Upload the feed to S3
    config = boto3.s3.transfer.TransferConfig(use_threads=False)
    transfer = boto3.s3.transfer.S3Transfer(s3_client, config)
    transfer.upload_file(
        local_rss_file_path,
        S3_BUCKET,
        remote_rss_file_path,
        extra_args={'ACL': 'public-read'},
    )
def main():
    """Run the whole pipeline for one video URL entered at the prompt.

    Steps: download the audio, download the thumbnail, upload both to S3,
    then update the podcast RSS feed. Progress messages and the total elapsed
    time are printed along the way.
    """
    # Monotonic clock: elapsed time is unaffected by system clock changes.
    start_time = time.monotonic()

    # Strip stray whitespace/newlines that often come along with a pasted URL.
    video_url = input("Enter the YouTube video URL: ").strip()

    video_title, thumbnail_url = download_audio(video_url)
    print(f"Audio downloaded: {video_title}")

    # download_audio() names its output after the slugified title, so the
    # same slug is rebuilt here to locate the resulting MP3.
    slug_title = slugify(video_title)
    local_audio_file_path = f'{slug_title}.mp3'
    local_thumbnail_file_path = f'{slug_title}.jpg'
    audio_length = os.path.getsize(local_audio_file_path)

    download_thumbnail(thumbnail_url, local_thumbnail_file_path)
    print("Thumbnail downloaded.")

    print("Uploading to S3.")
    remote_audio_file_path = f'{slug_title}.mp3'
    remote_thumbnail_file_path = f'{slug_title}.jpg'
    public_audio_url = upload_to_s3(local_audio_file_path, remote_audio_file_path)
    public_thumbnail_url = upload_to_s3(local_thumbnail_file_path, remote_thumbnail_file_path)
    print("Audio and thumbnail uploaded to S3.")
    print(f"Public audio URL: {public_audio_url}")
    print(f"Public thumbnail URL: {public_thumbnail_url}")

    local_rss_file_path = 'podcast.rss'
    remote_rss_file_path = 'podcast.rss'

    # This call is only needed to tell the user whether a feed already
    # exists; update_rss_feed_s3() performs its own existence check and
    # download and always rewrites the local file, so no placeholder file or
    # second download is required here.
    if not download_rss_from_s3(local_rss_file_path, remote_rss_file_path):
        print('RSS feed not found in S3. Creating a new one.')

    update_rss_feed_s3(public_audio_url, public_thumbnail_url, video_title, audio_length)
    print("RSS feed updated.")

    minutes, seconds = divmod(int(time.monotonic() - start_time), 60)
    print(f"Time taken: {minutes} mins {seconds} secs")


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment