@jalbarl
Forked from FlyingFathead/async_tts.py
Created February 6, 2025 21:59
async chunk queue for Python's `tts`
# async_tts.py, for `TTS` pypi package (also requires `pydub`)
# $ pip install -U tts pydub soundfile
#
# v0.02
# changes:
# - now uses pydub, normalizes audio
#
# performs staggered synthesis / playback of the audio,
# where the next chunk is synthesized while the previous one is still playing
#
# this is just a quick test, and probably sub-optimal
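#
# usage (with the argv handling in main_async below):
# $ python async_tts.py             # prompts for text interactively
# $ python async_tts.py input.txt   # reads the text to synthesize from a file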
import asyncio
import os
import platform
import re
import sys
import torch
import numpy as np
from TTS.api import TTS
from threading import Thread, Event
from pydub import AudioSegment
import soundfile as sf

def preprocess_text(text):
    # join hard line breaks into sentences and collapse runs of spaces
    text = re.sub(r'(?<![.!?])\s*\n+', '. ', text)
    text = re.sub(r' +', ' ', text)
    return text.strip()
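# e.g.: preprocess_text("line one\nline two") -> "line one. line two"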

def split_text(text, max_length=400):
    # split on sentence boundaries, then pack sentences into chunks of at most max_length chars
    sentences = re.split(r'(?<=[.!?])\s+', text)
    chunks = []
    current_chunk = ""
    for sentence in sentences:
        sentence = sentence.strip()
        if len(current_chunk + ' ' + sentence) <= max_length:
            current_chunk = f"{current_chunk} {sentence}".strip()
        else:
            if current_chunk:
                chunks.append(current_chunk)
            current_chunk = sentence
            # hard-split any single sentence longer than max_length at a word boundary
            while len(current_chunk) > max_length:
                split_index = current_chunk.rfind(' ', 0, max_length)
                if split_index == -1:
                    split_index = max_length
                chunks.append(current_chunk[:split_index])
                current_chunk = current_chunk[split_index:].strip()
    if current_chunk:
        chunks.append(current_chunk)
    return chunks
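# e.g.: split_text("First sentence. Second one?", max_length=20)
#       -> ["First sentence.", "Second one?"]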

def play_audio(file_path, finished_event):
    # block until the system player exits, then signal the main loop;
    # assumes a player is on PATH (start on Windows, afplay on macOS, aplay on Linux)
    if platform.system() == "Windows":
        os.system(f'start /wait "" "{file_path}"')
    elif platform.system() == "Darwin":  # macOS
        os.system(f'afplay "{file_path}"')
    else:  # Linux
        os.system(f'aplay "{file_path}"')
    finished_event.set()

def normalize_audio(file_path):
    sound = AudioSegment.from_file(file_path, format="wav")
    # peak-normalize: max_dBFS is the (negative) headroom to full scale,
    # so applying -max_dBFS brings the loudest peak up to 0 dBFS without clipping
    normalized_sound = sound.apply_gain(-sound.max_dBFS)
    return normalized_sound
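# note: pydub also ships pydub.effects.normalize(sound, headroom=0.1),
# which does the same peak normalization with a small configurable headroom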

async def synthesize_text(tts, text):
    # run the blocking tts() call in a worker thread so the event loop stays responsive
    loop = asyncio.get_running_loop()
    wav = await loop.run_in_executor(None, tts.tts, text)
    if isinstance(wav, list):
        wav = np.array(wav)
    return wav

async def main_async():
    device = "cuda" if torch.cuda.is_available() else "cpu"
    # model_name = 'tts_models/en/ljspeech/tacotron2-DDC'
    # model_name = 'tts_models/en/ljspeech/vits--neon'
    model_name = 'tts_models/en/jenny/jenny'
    tts = TTS(model_name).to(device)

    # read the text from a file passed as the first argument, otherwise prompt for it
    if len(sys.argv) > 1:
        with open(sys.argv[1], 'r') as f:
            text = f.read().strip()
    else:
        text = input("Enter text to synthesize (type 'exit' to quit): ")
    if text.lower() == 'exit':
        return

    text = preprocess_text(text)
    sentences = split_text(text)
    finished_event = Event()
    # write chunks at the model's native rate rather than a hardcoded 44100 Hz
    sample_rate = tts.synthesizer.output_sample_rate
    for i, sentence in enumerate(sentences):
        wav = await synthesize_text(tts, sentence)
        if len(wav.shape) == 1:
            wav = np.expand_dims(wav, axis=1)
        output_path = f"chunk_{i}.wav"
        sf.write(output_path, wav, sample_rate)
        normalized_sound = normalize_audio(output_path)
        normalized_path = f"normalized_chunk_{i}.wav"
        normalized_sound.export(normalized_path, format="wav")
        if i > 0:
            # wait until the previous chunk finishes playing before starting this one
            await asyncio.to_thread(finished_event.wait)
            finished_event.clear()
        Thread(target=play_audio, args=(normalized_path, finished_event)).start()
    # wait for the last chunk to finish playing
    await asyncio.to_thread(finished_event.wait)
    print("All chunks processed and played.")

if __name__ == "__main__":
    asyncio.run(main_async())