async chunk queue for Python's `tts`
# async_tts.py, for the `TTS` PyPI package (also requires `pydub` and `soundfile`)
# $ pip install -U tts pydub soundfile
#
# v0.02
# changes:
# - now uses pydub, normalizes audio
#
# performs staggered synthesis and playback of the audio:
# the next chunk is synthesized while the previous one is still playing
#
# this is just a quick test, and probably sub-optimal
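#
# rough timeline of the staggering (illustrative sketch, not script output):
#
#   synth chunk 0 | play chunk 0 (thread) ............|
#                 | synth chunk 1, then wait for 0    | play chunk 1 (thread) ...
#                                                     | synth chunk 2, ...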
import asyncio
import os
import platform
import re
import sys
import torch
import numpy as np
from TTS.api import TTS
from threading import Thread, Event
from pydub import AudioSegment
import soundfile as sf

def preprocess_text(text):
    # turn line breaks that don't follow sentence punctuation into ". ",
    # then collapse runs of spaces
    text = re.sub(r'(?<![.!?])\s*\n+', '. ', text)
    text = re.sub(r' +', ' ', text)
    return text.strip()
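
# what preprocess_text does in practice (hand-traced examples, so treat
# them as a sketch):
#   preprocess_text("hello\nworld\n")  -> "hello. world."
#   preprocess_text("one.\ntwo")       -> "one.\ntwo"   (newline after "." is kept)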

def split_text(text, max_length=400):
    # split on sentence boundaries, then pack sentences into chunks of
    # at most max_length characters
    sentences = re.split(r'(?<=[.!?])\s+', text)
    chunks = []
    current_chunk = ""
    for sentence in sentences:
        sentence = sentence.strip()
        if len(current_chunk + ' ' + sentence) <= max_length:
            # join with a single-space separator
            current_chunk = (current_chunk + ' ' + sentence).strip()
        else:
            if current_chunk:
                chunks.append(current_chunk)
            current_chunk = sentence
            # a single sentence longer than max_length is split at word boundaries
            while len(current_chunk) > max_length:
                split_index = current_chunk.rfind(' ', 0, max_length)
                if split_index == -1:
                    split_index = max_length
                chunks.append(current_chunk[:split_index])
                current_chunk = current_chunk[split_index:].strip()
    if current_chunk:
        chunks.append(current_chunk)
    return chunks
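
# quick sanity check (hand-traced, so a sketch rather than gospel):
#   split_text("A. B. C.", max_length=6) -> ["A. B.", "C."]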

def play_audio(file_path, finished_event):
    # blocking playback via the platform's command-line player;
    # signal the main loop once playback has finished
    # (paths are quoted so filenames with spaces survive the shell)
    if platform.system() == "Windows":
        os.system(f'start /wait "" "{file_path}"')
    elif platform.system() == "Darwin":  # macOS
        os.system(f'afplay "{file_path}"')
    else:  # Linux
        os.system(f'aplay "{file_path}"')
    finished_event.set()

def normalize_audio(file_path):
    sound = AudioSegment.from_file(file_path, format="wav")
    # peak-normalize: apply just enough gain to bring the peak to 0 dBFS
    # (e.g. a peak at -6 dBFS gets +6 dB of gain), so nothing clips
    normalized_sound = sound.apply_gain(-sound.max_dBFS)
    return normalized_sound

async def synthesize_text(tts, text):
    # run the blocking TTS call in a worker thread so the event loop
    # (and hence playback scheduling) stays responsive
    loop = asyncio.get_running_loop()
    wav = await loop.run_in_executor(None, tts.tts, text)
    if isinstance(wav, list):
        wav = np.array(wav)
    return wav
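
# usage inside a coroutine (sketch; Coqui's tts.tts() typically returns a
# list of float samples, hence the np.array conversion above):
#   wav = await synthesize_text(tts, "Hello there.")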

async def main_async():
    device = "cuda" if torch.cuda.is_available() else "cpu"
    # model_name = 'tts_models/en/ljspeech/tacotron2-DDC'
    # model_name = 'tts_models/en/ljspeech/vits--neon'
    model_name = 'tts_models/en/jenny/jenny'
    tts = TTS(model_name).to(device)

    if len(sys.argv) > 2:
        # read the input text from the file named by the second CLI argument
        with open(sys.argv[2], 'r') as f:
            text = f.read().strip()
    else:
        text = input("Enter text to synthesize (type 'exit' to quit): ")
    if text.lower() == 'exit':
        return

    text = preprocess_text(text)
    sentences = split_text(text)
    finished_event = Event()
    for i, sentence in enumerate(sentences):
        wav = await synthesize_text(tts, sentence)
        if len(wav.shape) == 1:
            wav = np.expand_dims(wav, axis=1)
        output_path = f"chunk_{i}.wav"
        # note: 44100 Hz is hardcoded; if the model's native rate differs,
        # playback will be pitch-shifted
        sf.write(output_path, wav, 44100)
        normalized_sound = normalize_audio(output_path)
        normalized_path = f"normalized_chunk_{i}.wav"
        normalized_sound.export(normalized_path, format="wav")
        if i > 0:
            # wait (off the event loop) for the previous chunk to finish playing
            await asyncio.to_thread(finished_event.wait)
            finished_event.clear()
        Thread(target=play_audio, args=(normalized_path, finished_event)).start()
    # wait for the last audio to finish playing
    await asyncio.to_thread(finished_event.wait)
    print("All chunks processed and played.")


if __name__ == "__main__":
    asyncio.run(main_async())
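
To try it out (a sketch based on the argument handling above: the script reads the file path from `sys.argv[2]`, so the first argument is just a placeholder):

# interactive mode
$ python async_tts.py

# read the text from a file; the first argument can be anything
$ python async_tts.py x input.txt

Intermediate `chunk_N.wav` / `normalized_chunk_N.wav` files are written to the working directory and are not cleaned up.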