Skip to content

Instantly share code, notes, and snippets.

@halflearned
Last active April 17, 2023 16:32
Show Gist options
  • Save halflearned/530eb6c3d01742434f86f4b8f4d886ff to your computer and use it in GitHub Desktop.
Save halflearned/530eb6c3d01742434f86f4b8f4d886ff to your computer and use it in GitHub Desktop.
Downloading YouTube audio for VGG-Sound
import argparse
import csv
import json
import subprocess
from multiprocessing import Pool
from os import remove, rename, makedirs
from os.path import exists, getsize
from time import sleep
def parse_arguments():
    """Parse the script's command-line options from ``sys.argv``.

    Recognized options:
        --input-file  (required) path of the CSV file listing the videos.
        --processes   (int, default 8) number of concurrent worker processes.

    Returns:
        The ``argparse.Namespace`` with ``input_file`` and ``processes`` set.
    """
    cli = argparse.ArgumentParser(description='Download audio from YouTube videos.')
    # Each entry is (flag, keyword arguments for add_argument).
    option_table = (
        ('--input-file', {'required': True, 'help': 'CSV file containing video data.'}),
        ('--processes', {'type': int, 'default': 8,
                         'help': 'Number of concurrent processes. (default: 8)'}),
    )
    for flag, spec in option_table:
        cli.add_argument(flag, **spec)
    return cli.parse_args()
def run_ytdlp(video_id, start_time):
    """Invoke ``yt-dlp`` to fetch a video's audio track and convert it to WAV.

    The output template is ``<video_id>-<start_time>_raw.%(ext)s``, relative to
    the current working directory. ``start_time`` only contributes to the file
    name here; no trimming is requested from yt-dlp.

    Returns:
        The ``subprocess.CompletedProcess`` of the call, with stdout and
        stderr captured as text.
    """
    url = f'https://www.youtube.com/watch?v={video_id}'
    template = f"{video_id}-{start_time}_raw.%(ext)s"
    extraction_opts = ['-x', '-f', 'bestaudio/best', '--audio-format', 'wav']
    reporting_opts = ['--print-json', '--output', template]
    return subprocess.run(
        ['yt-dlp', *extraction_opts, *reporting_opts, url],
        capture_output=True,
        text=True,
    )
def is_download_successful(result, start_time, video_id):
    """Tell whether a download attempt produced a usable raw WAV file.

    The attempt counts as successful only when the process exited with code 0
    and ``<video_id>-<start_time>_raw.wav`` exists and is larger than 250
    bytes (smaller files are treated as corrupted).
    """
    if result.returncode != 0:
        return False
    raw_path = f"{video_id}-{start_time}_raw.wav"
    if not exists(raw_path):
        return False
    return getsize(raw_path) > 250
def is_retryable_error(result):
    """Return True when the process's stderr contains a retryable error marker.

    The markers are "HTTP Error 429" and "No such file or directory"; any
    other stderr content is treated as non-retryable.
    """
    for marker in ("HTTP Error 429", "No such file or directory"):
        if marker in result.stderr:
            return True
    return False
def handle_retryable_errors(video_id, retry_count, max_retries):
    """Log a retryable failure and back off before the next attempt, if any.

    Args:
        video_id: Video identifier; used only in the log line.
        retry_count: Zero-based index of the attempt that just failed.
        max_retries: Total number of attempts the caller is going to make.

    Sleeps ``30 * 2 ** retry_count`` seconds when another attempt will follow.
    When the failed attempt was the last one, returns immediately: nothing is
    retried afterwards, so waiting would only delay the caller.
    """
    attempts_made = retry_count + 1
    if attempts_made >= max_retries:
        print(f"Error: {video_id}: giving up after {attempts_made} attempts")
        return
    wait_time = 30 * 2 ** retry_count  # exponential backoff
    # Report the attempt number one-based so the first failure reads "1 of N".
    print(f"Error: {video_id}: Waiting {wait_time} seconds (attempt {attempts_made} of {max_retries})")
    sleep(wait_time)
def run_ffmpeg(input_file, output_file, start_time, end_time):
    """Cut a segment out of ``input_file`` and write it as 16 kHz mono PCM WAV.

    Args:
        input_file: Path of the audio file to read.
        output_file: Path of the WAV file to write.
        start_time: Segment start, passed to ffmpeg's ``-ss``.
        end_time: Segment end, passed to ffmpeg's ``-to``.

    Returns:
        The ``subprocess.CompletedProcess`` of the ffmpeg call, with stdout
        and stderr captured as text.
    """
    argv = ['ffmpeg', '-i', input_file, '-vn', '-acodec', 'pcm_s16le']
    argv += ['-ss', str(start_time), '-to', str(end_time)]  # segment bounds
    argv += ['-ar', '16000', '-ac', '1']  # 16 kHz sample rate, one channel
    # "--" precedes the output path; presumably so a name that begins with
    # "-" is not parsed as an option.
    argv += ["--", output_file]
    return subprocess.run(argv, capture_output=True, text=True)
def download_and_convert(args):
    """Download one clip's audio, trim it to 10 seconds and file it by subset.

    Args:
        args: A four-item row ``(video_id, start_time, <ignored>, subset)``.
            ``start_time`` is the clip start in seconds (a value ``int()``
            accepts); ``subset`` names the destination folder.

    Returns:
        A dict with ``video_id`` and ``start_time``, plus:
        * ``download_returncode`` (``-1`` when the clip was already present)
          and ``download_stderr`` once a download was attempted;
        * ``conversion_returncode`` / ``conversion_stderr`` only when the
          download exited with code 0 and ffmpeg was run.
    """
    video_id, start_time, _, subset = args
    # Build every file name from the start_time exactly as given, so the name
    # handed to ffmpeg and the name moved afterwards can never disagree.
    clip_name = f"{video_id}-{start_time}"
    final_path = f"{subset}/{clip_name}.wav"
    output = {"video_id": video_id, "start_time": start_time}

    if exists(final_path):
        print(f"{clip_name} [already downloaded]")
        output.update({"download_returncode": -1})
        return output

    max_attempts = 10
    for attempt in range(max_attempts):
        result = run_ytdlp(video_id, start_time)
        # Stop on a successful, plausibly-sized download, or on an error that
        # retrying cannot fix.
        if is_download_successful(result, start_time, video_id):
            break
        if not is_retryable_error(result):
            break
        handle_retryable_errors(video_id, attempt, max_attempts)
    output.update({"download_returncode": result.returncode, "download_stderr": result.stderr})

    if result.returncode == 0:
        input_file = f"{clip_name}_raw.wav"
        output_file = f"{clip_name}.wav"
        clip_start = int(start_time)
        result = run_ffmpeg(input_file, output_file, clip_start, clip_start + 10)
        converted = result.returncode == 0
        if converted:
            remove(input_file)
        output.update({"conversion_returncode": result.returncode, "conversion_stderr": result.stderr})
        # Move the file to its subset folder only after a clean conversion; a
        # partial file left by a failed ffmpeg run would otherwise be treated
        # as "already downloaded" on the next invocation.
        if converted and exists(output_file):
            rename(output_file, final_path)

    # conversion_returncode is absent when the download failed, hence .get().
    print(f'{clip_name}: {output["download_returncode"]}, {output.get("conversion_returncode")}')
    return output
def process_data(input_file, processes):
    """Read the clip list from a CSV file and download/convert every row.

    Args:
        input_file: Path of the CSV file; each non-empty row is passed as-is
            to ``download_and_convert``.
        processes: Number of worker processes. ``1`` runs everything in the
            current process (handy for debugging).

    Returns:
        A list with one ``download_and_convert`` result per row, in row order.
    """
    # newline='' is what the csv module asks for when it is given a file
    # object; blank lines come back as empty rows and are dropped because the
    # worker expects four columns.
    with open(input_file, 'r', newline='') as csvfile:
        data = [row for row in csv.reader(csvfile) if row]

    # Create subset folders
    for subset_dir in ("train", "test"):
        makedirs(subset_dir, exist_ok=True)

    # Download and convert audio
    if processes == 1:
        # Use single process for debugging
        return [download_and_convert(row) for row in data]
    with Pool(processes) as pool:
        # Use multiple processes
        return pool.map(download_and_convert, data)
def main():
    """Script entry point: parse options, process the CSV, save the results.

    The per-row results returned by ``process_data`` are written as indented
    JSON to ``download_results.json`` in the current working directory.
    """
    cli_args = parse_arguments()
    results = process_data(cli_args.input_file, cli_args.processes)
    with open("download_results.json", "w") as report:
        json.dump(obj=results, fp=report, indent=2)
# Run the downloader only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment