Skip to content

Instantly share code, notes, and snippets.

@idatsy
Created February 10, 2024 14:15
Show Gist options
  • Save idatsy/19cb92e5481a28f8a30a7b71fe8a9b35 to your computer and use it in GitHub Desktop.
Save idatsy/19cb92e5481a28f8a30a7b71fe8a9b35 to your computer and use it in GitHub Desktop.
Download YouTube audio and remove silences
import logging
import multiprocessing
import re
from pathlib import Path
from typing import Any
from pydub import silence, AudioSegment
from pydub.utils import make_chunks
from pytube import YouTube
from tqdm import tqdm
# Emit timestamped, logger-tagged INFO lines for every stage of the script.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# Shared download progress bar; stays None until on_progress creates it.
progress_bar = None
def on_progress(stream: Any, chunk: bytes, _bytes_remaining: int):
    """Advance the shared download progress bar by the size of ``chunk``.

    Passed to ``YouTube`` as its ``on_progress_callback``. On the first call the
    bar is created, sized from ``stream.filesize``, and stored in the
    module-level ``progress_bar``; every call then adds ``len(chunk)`` to it.

    Args:
        stream: The stream being downloaded; only ``filesize`` is read.
        chunk: The bytes that were just received.
        _bytes_remaining: Unused; accepted to match the callback signature.
    """
    global progress_bar
    if progress_bar is None:
        # Lazily build the bar because the total size is only known from the stream.
        progress_bar = tqdm(
            total=stream.filesize,
            desc="Downloading",
            unit='B',
            unit_scale=True,
        )
    progress_bar.update(len(chunk))
def on_complete(_stream: Any, filepath: str):
    """Close the download progress bar and log where the file was written.

    Passed to ``YouTube`` as its ``on_complete_callback``.

    Args:
        _stream: Unused; accepted to match the callback signature.
        filepath: Location of the downloaded file, included in the log message.
    """
    global progress_bar
    # Test against None instead of truthiness: a tqdm bar's truthiness follows
    # its total, so a bar created for a zero-byte stream would never be closed.
    if progress_bar is not None:
        progress_bar.close()
        # Forget the closed bar so that a later download in the same process
        # gets a fresh one instead of updating a bar that is already closed.
        progress_bar = None
    logger.info(f"Download completed to: {filepath}")
def clean_filename(filename: str) -> str:
    """Turn an arbitrary title into a lowercase, underscore-separated name.

    Characters that are neither word characters (letters, digits, underscore)
    nor whitespace are dropped, each run of whitespace becomes a single
    underscore, and the result is lowercased.

    Args:
        filename: The raw name to sanitise.

    Returns:
        The sanitised name; an empty string if nothing survives.
    """
    # Strip punctuation and symbols first so they cannot leave stray separators.
    without_symbols = re.sub(r'[^\w\s]', '', filename)
    # Collapse whitespace runs to underscores to avoid spaces in file names.
    return re.sub(r'\s+', '_', without_symbols).lower()
def download_youtube_audio(url: str) -> Path:
    """Download the first audio-only stream of a YouTube video and export it as WAV.

    Both the raw download and the converted file are written to an ``audio``
    directory located next to this script, regardless of the current working
    directory.

    Args:
        url: URL of the YouTube video.

    Returns:
        Path of the exported ``.wav`` file, named after the cleaned video title.

    Raises:
        RuntimeError: If the video offers no audio-only stream.
    """
    yt = YouTube(url, on_progress_callback=on_progress, on_complete_callback=on_complete)
    logger.info(f"Downloading: '{yt.title}' @ {yt.length} seconds by '{yt.author}'")

    audio_stream = yt.streams.filter(only_audio=True).first()
    if audio_stream is None:
        # first() yields None for an empty query; fail with a clear message
        # instead of an AttributeError on the download call below.
        raise RuntimeError(f"No audio-only stream available for: {url}")

    audio_dir = Path(__file__).parent / "audio"
    audio_dir.mkdir(parents=True, exist_ok=True)
    # Download into the directory that was just created; a bare relative
    # "audio" would resolve against the working directory instead.
    downloaded_file = audio_stream.download(output_path=str(audio_dir))

    audio = AudioSegment.from_file(downloaded_file)
    new_file = audio_dir / (clean_filename(yt.title) + '.wav')
    audio.export(new_file, format='wav')
    logger.info(f"Wav file saved to: {new_file}")
    return new_file
def process_chunk(chunk_data):
    """Return one audio chunk with its silent stretches removed.

    Takes a single tuple argument so it can be used directly with ``Pool.map``.

    Args:
        chunk_data: A ``(chunk, min_silence_len, silence_thresh)`` tuple.
            ``silence_thresh`` is subtracted from the chunk's own ``dBFS`` to
            obtain the threshold passed to ``silence.detect_nonsilent``.

    Returns:
        An ``AudioSegment`` built by joining the chunk's non-silent ranges in order.
    """
    segment, min_len, thresh_offset = chunk_data
    kept_ranges = silence.detect_nonsilent(
        segment,
        min_silence_len=min_len,
        silence_thresh=segment.dBFS - thresh_offset,
    )
    # Stitch the surviving slices together, starting from an empty segment.
    return sum(
        (segment[begin:stop] for begin, stop in kept_ranges),
        AudioSegment.empty(),
    )
def remove_silence(
    audio_file: Path,
    *,
    silence_thresh: float = 30,
    min_silence_len: int = 100,
) -> Path:
    """Strip silent stretches from an audio file and save the result as WAV.

    The audio is split into roughly one chunk per CPU core, each chunk is
    cleaned by ``process_chunk`` in a worker process, and the cleaned chunks
    are joined back together in their original order.

    Args:
        audio_file: Path of the audio file to process.
        silence_thresh: Offset subtracted from each chunk's own ``dBFS`` to
            obtain that chunk's silence threshold.
        min_silence_len: Passed through as ``min_silence_len`` to the silence
            detection for every chunk.

    Returns:
        Path of the written file, ``<stem>_processed.wav`` next to ``audio_file``.
    """
    logger.info(f"Processing audio {audio_file.name}")
    audio = AudioSegment.from_file(audio_file)

    # Use all available CPU cores to split the audio into chunks
    num_cores = multiprocessing.cpu_count()
    # Clamp to 1: audio shorter than num_cores would give a chunk length of 0,
    # and make_chunks divides by the chunk length.
    chunk_length_ms = max(1, len(audio) // num_cores)
    logger.info(f"Splitting audio into {num_cores} chunks, each ~{chunk_length_ms}ms")
    chunks = make_chunks(audio, chunk_length_ms)
    chunk_data = [(chunk, min_silence_len, silence_thresh) for chunk in chunks]

    # The context manager tears the pool down even if a worker raises.
    with multiprocessing.Pool(processes=num_cores) as pool:
        processed_chunks = pool.map(process_chunk, chunk_data)

    concatenated_audio = sum(processed_chunks, AudioSegment.empty())
    output_file = audio_file.with_name(f"{audio_file.stem}_processed.wav")
    concatenated_audio.export(output_file, format="wav")
    logger.info(f"Processed audio file saved: {output_file.name}")
    return output_file
if __name__ == "__main__":
    # Fetch the example video's audio, then write a silence-free copy beside it.
    remove_silence(
        download_youtube_audio(
            "https://www.youtube.com/watch?v=zMYvGf7BA9o&ab_channel=LexFridman"
        )
    )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment