Created
January 5, 2024 23:20
-
-
Save moomou/d5ff6af6716d20b33026f53f209502af to your computer and use it in GitHub Desktop.
MusicCaps data download script
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3
import csv
import multiprocessing
import os
import pathlib as pl
import subprocess
import sys
import time
def format_seconds(seconds):
    """Render a duration in seconds as a zero-padded ``HH:MM:SS`` string.

    Args:
        seconds: Duration in seconds (int or float). Fractional seconds are
            truncated. Intended for non-negative values; negative input is
            not handled specially.

    Returns:
        The duration as ``HH:MM:SS``. Hours are not wrapped at 24, so very
        long durations produce more than two hour digits.
    """
    hours, remainder = divmod(seconds, 3600)
    minutes, secs = divmod(remainder, 60)
    return f"{int(hours):02}:{int(minutes):02}:{int(secs):02}"
def download_audio_with_start_end_time(ytid, output_path, start_sec, end_sec):
    """Download one YouTube clip as MP3, trimmed to the given time window.

    If ``<output_path>.mp3`` already exists, it is decoded once with ffmpeg.
    When ffmpeg exits successfully the file is kept and nothing is
    downloaded; otherwise the file is deleted and fetched again.

    Args:
        ytid: YouTube video id; appended to the watch URL.
        output_path: Destination path *without* extension. The finished
            audio is expected at ``<output_path>.mp3``.
        start_sec: Clip start in seconds (number or numeric string such as
            ``"30"`` or ``"30.0"``); fractions are truncated.
        end_sec: Clip end in seconds, same accepted forms as ``start_sec``.

    Returns:
        None. A failed yt-dlp run is reported on stderr and not re-raised,
        so one bad video does not stop a batch.
    """
    video_url = 'https://www.youtube.com/watch?v=' + ytid
    # Go through float() so CSV values like "30.0" do not raise in int().
    start_time = format_seconds(int(float(start_sec)))
    end_time = format_seconds(int(float(end_sec)))
    mp3_path = pl.Path(str(output_path) + ".mp3")

    if mp3_path.exists():
        # Decode the existing file to the null muxer purely to validate it.
        # NOTE(review): only the exit status is checked; ffmpeg may log
        # decode errors on stderr while still exiting 0 -- confirm whether
        # result.stderr should be inspected as well.
        check_cmd = [
            "ffmpeg",
            "-v", "error",
            "-i", str(mp3_path),
            "-f", "null",
            "-",
        ]
        result = subprocess.run(check_cmd, capture_output=True, text=True)
        if result.returncode == 0:
            print('good', mp3_path)
            return
        print('bad', mp3_path, 'removing')
        mp3_path.unlink()

    download_cmd = [
        'yt-dlp',
        '--extract-audio',
        '--audio-format', 'mp3',
        '--audio-quality', '0',
        # The trim is done by handing -ss/-to to yt-dlp's post-processor.
        '--postprocessor-args', f'-ss {start_time} -to {end_time}',
        '-o', str(pl.Path(output_path)),
        video_url,
    ]
    try:
        subprocess.run(download_cmd, check=True)
    except (subprocess.CalledProcessError, OSError) as e:
        # Best-effort: report and continue (unavailable video, missing
        # yt-dlp binary, ...).
        print(e, file=sys.stderr)
def consumer(queue):
    """Process download jobs from ``queue`` until a ``None`` sentinel arrives.

    Args:
        queue: Object with a blocking ``get()`` method. Each item is either
            the positional-argument tuple for
            ``download_audio_with_start_end_time`` or ``None`` to stop.

    Returns:
        None, once the sentinel has been received.
    """
    # iter(callable, sentinel) keeps calling queue.get() and stops as soon
    # as it yields None; job tuples never compare equal to None.
    for args in iter(queue.get, None):
        download_audio_with_start_end_time(*args)
def main(csv_path='musiccaps-public.csv', output_dir='videos', num_workers=1):
    """Queue every row of the MusicCaps CSV for download and wait for completion.

    Each row is turned into a job ``(ytid, output_dir/"<ytid>-<row index>",
    start_s, end_s)`` and handed to worker processes running ``consumer``.

    Args:
        csv_path: CSV file with at least the columns ``ytid``, ``start_s``
            and ``end_s``.
        output_dir: Directory that receives the downloads; created if
            missing.
        num_workers: Number of consumer processes to start (at least 1).

    Raises:
        ValueError: If ``num_workers`` is smaller than 1.
    """
    if num_workers < 1:
        raise ValueError("num_workers must be at least 1")

    output_dir = pl.Path(output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)

    shared_queue = multiprocessing.Queue()
    workers = [
        multiprocessing.Process(target=consumer, args=(shared_queue,))
        for _ in range(num_workers)
    ]
    for worker in workers:
        worker.start()

    try:
        # newline='' is what the csv module expects from the file object.
        with open(csv_path, mode='r', newline='', encoding='utf-8') as csv_file:
            # Each row is a dict keyed by the CSV column names.
            for i, row in enumerate(csv.DictReader(csv_file)):
                shared_queue.put((
                    row['ytid'],
                    output_dir / f"{row['ytid']}-{i}",
                    row['start_s'],
                    row['end_s'],
                ))
    finally:
        # Always send one sentinel per worker and wait for them, even when
        # reading the CSV failed; otherwise the workers would block forever
        # on queue.get() and keep the program from exiting.
        for _ in workers:
            shared_queue.put(None)
        for worker in workers:
            worker.join()

    print("All processes have finished.")
# Run only when executed as a script. The guard also keeps worker processes
# that re-import this module (multiprocessing "spawn" start method) from
# starting the whole download again.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment