async chunk queue for Python's `tts`
# async_tts.py, for the `TTS` pypi package (also requires `pydub` and `soundfile`)
# $ pip install -U tts pydub soundfile
#
# v0.02
# changes:
# - now uses pydub, normalizes audio
#
# performs staggered synthesis / playback of the audio,
# where the next chunk is synthesized while the previous one is still playing
#
# this is just a quick test, and probably sub-optimal
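#
# usage (with the argument handling in main_async below):
#   $ python async_tts.py            # prompts for text interactively
#   $ python async_tts.py input.txt  # reads the text from a file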
import asyncio
import os
import platform
import re
import sys
from threading import Thread, Event

import numpy as np
import soundfile as sf
import torch
from pydub import AudioSegment
from TTS.api import TTS
def preprocess_text(text):
    # treat bare newlines (ones not already ending a sentence) as sentence breaks
    text = re.sub(r'(?<![.!?])\s*\n+', '. ', text)
    # collapse runs of spaces
    text = re.sub(r' +', ' ', text)
    return text.strip()
def split_text(text, max_length=400):
    # greedy sentence packing: accumulate sentences until adding the next one
    # would push the chunk past max_length, then start a new chunk
    sentences = re.split(r'(?<=[.!?])\s+', text)
    chunks = []
    current_chunk = ""
    for sentence in sentences:
        sentence = sentence.strip()
        if len(current_chunk + ' ' + sentence) <= max_length:
            current_chunk = f"{current_chunk} {sentence}".strip()
        else:
            if current_chunk:
                chunks.append(current_chunk)
            current_chunk = sentence
            # hard-split any single sentence that is longer than max_length,
            # breaking at the last space before the limit where possible
            while len(current_chunk) > max_length:
                split_index = current_chunk.rfind(' ', 0, max_length)
                if split_index == -1:
                    split_index = max_length
                chunks.append(current_chunk[:split_index])
                current_chunk = current_chunk[split_index:].strip()
    if current_chunk:
        chunks.append(current_chunk)
    return chunks
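
# a quick worked example of the greedy packing above:
#   split_text("One. Two. Three.", max_length=10)
#   -> ["One. Two.", "Three."]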
def play_audio(file_path, finished_event):
    # shell out to a platform audio player and block until playback ends,
    # then signal the event loop that the next chunk may start
    if platform.system() == "Windows":
        os.system(f"start /wait {file_path}")
    elif platform.system() == "Darwin":  # macOS
        os.system(f"afplay {file_path}")
    else:  # Linux
        os.system(f"aplay {file_path}")
    finished_event.set()
def normalize_audio(file_path):
    sound = AudioSegment.from_file(file_path, format="wav")
    # peak-normalize: raise the loudest sample to 0 dBFS
    # (max_dBFS is negative, so this is a positive gain that cannot clip)
    normalized_sound = sound.apply_gain(-sound.max_dBFS)
    return normalized_sound
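
# a quick sanity check of the gain math: a chunk peaking at -6.3 dBFS gets
# +6.3 dB of gain, so its new peak lands exactly at 0 dBFS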
async def synthesize_text(tts, text):
    # run the blocking tts.tts() call in a worker thread so the event loop
    # stays free while the previous chunk is still playing
    loop = asyncio.get_event_loop()
    wav = await loop.run_in_executor(None, tts.tts, text)
    if isinstance(wav, list):
        wav = np.array(wav)
    return wav
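
# note: on Python 3.9+ the run_in_executor call above can equivalently be
# spelled `wav = await asyncio.to_thread(tts.tts, text)`, the same helper
# main_async() already uses to wait on playback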
async def main_async():
    device = "cuda" if torch.cuda.is_available() else "cpu"
    # model_name = 'tts_models/en/ljspeech/tacotron2-DDC'
    # model_name = 'tts_models/en/ljspeech/vits--neon'
    model_name = 'tts_models/en/jenny/jenny'
    tts = TTS(model_name).to(device)
    # read text from a file given as the first argument, else prompt
    if len(sys.argv) > 1:
        text = open(sys.argv[1], 'r').read().strip()
    else:
        text = input("Enter text to synthesize (type 'exit' to quit): ")
    if text.lower() == 'exit':
        return
    text = preprocess_text(text)
    sentences = split_text(text)
    finished_event = Event()
    # write at the model's native sample rate (exposed by the Coqui
    # synthesizer); a hard-coded 44100 plays models trained at other
    # rates back at the wrong pitch and speed
    sample_rate = tts.synthesizer.output_sample_rate
    for i, sentence in enumerate(sentences):
        # synthesize the next chunk while the previous one is still playing
        wav = await synthesize_text(tts, sentence)
        if len(wav.shape) == 1:
            wav = np.expand_dims(wav, axis=1)
        output_path = f"chunk_{i}.wav"
        sf.write(output_path, wav, sample_rate)
        normalized_sound = normalize_audio(output_path)
        normalized_path = f"normalized_chunk_{i}.wav"
        normalized_sound.export(normalized_path, format="wav")
        # don't start this chunk until the previous playback thread signals
        if i > 0:
            await asyncio.to_thread(finished_event.wait)
            finished_event.clear()
        Thread(target=play_audio, args=(normalized_path, finished_event)).start()
    # wait for the last audio to finish playing
    await asyncio.to_thread(finished_event.wait)
    print("All chunks processed and played.")

if __name__ == "__main__":
    asyncio.run(main_async())
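
# possible refinement (a sketch, not part of the script above): play each
# chunk in-process with the `sounddevice` package instead of writing temp
# files and shelling out to aplay/afplay/start; assumes
# `pip install sounddevice`
#
# import sounddevice as sd
#
# def play_array(wav, sample_rate, finished_event):
#     sd.play(wav, samplerate=sample_rate)  # starts playback, returns immediately
#     sd.wait()                             # blocks until playback finishes
#     finished_event.set()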