@gorkemgoknar
Forked from reuben/coqui_api_stream.py
Last active October 19, 2023 13:20
"""
Example of how to synthesize speech using the Coqui Studio API.
Streams the download/playback of the audio.
Usage:
$ COQUI_API_TOKEN="put your API token here" python coqui_api_stream.py --text "Hi there!"
To specify the voice to use, pass eg: `--voice 98d4af7d-aca0-4a70-a26e-4ca59023a248`
To save the audio to a file after playback, pass `--save_dest audio.wav`. Audio will be
written to the specified file path.
To use the V1 model instead of XTTS, pass `--model v1`.
"""
import argparse
import shutil
import subprocess
import requests
import os
from typing import Iterator
import nltk # we'll use this to split into sentences
nltk.download("punkt", quiet=True)

def is_installed(lib_name: str) -> bool:
    # shutil.which returns None when the executable is not on PATH
    return shutil.which(lib_name) is not None

def play(audio: bytes) -> None:
    if not is_installed("ffplay"):
        message = (
            "ffplay from ffmpeg not found, necessary to play audio. "
            "On mac you can install it with 'brew install ffmpeg'. "
            "On linux and windows you can install it from https://ffmpeg.org/"
        )
        raise ValueError(message)
    # ffplay reads the audio from stdin ("-") and exits when playback finishes
    args = ["ffplay", "-autoexit", "-", "-nodisp"]
    proc = subprocess.Popen(
        args=args,
        stdout=subprocess.PIPE,
        stdin=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    proc.communicate(input=audio)

def save(audio: bytes, filename: str) -> None:
    with open(filename, "wb") as f:
        f.write(audio)

def stream(audio_stream: Iterator[bytes]) -> bytes:
    if not is_installed("mpv"):
        message = (
            "mpv not found, necessary to stream audio. "
            "On mac you can install it with 'brew install mpv'. "
            "On linux and windows you can install it from https://mpv.io/"
        )
        raise ValueError(message)
    # mpv reads the audio from its stdin (fd://0) as chunks arrive
    mpv_command = ["mpv", "--no-cache", "--no-terminal", "--", "fd://0"]
    mpv_process = subprocess.Popen(
        mpv_command,
        stdin=subprocess.PIPE,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
    )
    audio = b""
    for chunk in audio_stream:
        if chunk is not None:
            # pipe each chunk to mpv as soon as it arrives, and keep a copy
            mpv_process.stdin.write(chunk)  # type: ignore
            mpv_process.stdin.flush()  # type: ignore
            audio += chunk
    if mpv_process.stdin:
        mpv_process.stdin.close()
    mpv_process.wait()
    return audio

try:
    COQUI_API_TOKEN = os.environ["COQUI_API_TOKEN"]
except KeyError:
    raise RuntimeError("Set COQUI_API_TOKEN environment variable to your API key")

def tts(
    text: str, voice_id: str, model: str = "xtts", language: str = "en"
) -> Iterator[bytes]:
    if model == "xtts":
        url = "https://app.coqui.ai/api/v2/samples/xtts/stream/"
    else:
        url = "https://app.coqui.ai/api/v2/samples?format=wav"
    json_data = {
        "text": text,
        "voice_id": voice_id,
        "language": language,
        "speed": 1,
    }
    res = requests.post(
        url,
        json=json_data,
        headers={"Authorization": f"Bearer {COQUI_API_TOKEN}"},
        # stream the response body so playback can start before the download finishes
        stream=True,
    )
    if res.status_code == 400:
        message = f"ERROR:{res.status_code}:{res.reason}: {res.text}"
        raise ValueError(message)
    elif 401 <= res.status_code < 500:
        print(res.reason)
        message = (
            "There was an error in the response; make sure COQUI_API_TOKEN is set correctly "
            f"and that the text is not too long, status code: {res.status_code}: {res.text}"
        )
        raise ValueError(message)
    elif res.status_code >= 500:
        message = f"There seems to be a server-side error, status code: {res.status_code}"
        raise ValueError(message)
    for chunk in res.iter_content(chunk_size=2048):
        if chunk:
            yield chunk

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    group = parser.add_mutually_exclusive_group(required=True)
    group.add_argument("--text", help="Text to synthesize")
    group.add_argument("--text_file_name", help="Text filename to synthesize")
    parser.add_argument("--language", help="Language, default 'en'", default="en")
    parser.add_argument(
        "--voice",
        help="ID of the voice to use for synthesis",
        default="98d4af7d-aca0-4a70-a26e-4ca59023a248",
    )
    parser.add_argument(
        "--save_dest", help="Optional path to save the audio file to, after playback."
    )
    parser.add_argument(
        "--model",
        help="Which model to use for synthesis (v1 or xtts), default xtts",
        choices=["v1", "xtts"],
        default="xtts",
    )
    args = parser.parse_args()

    if args.text_file_name is not None:
        print("Processing text file:", args.text_file_name)
        # Split the text file into a list of sentences
        with open(args.text_file_name, "r", encoding="utf8") as f:
            text = f.read()
        # remove empty lines
        text = "\n".join([ll.rstrip() for ll in text.splitlines() if ll.strip()])
        # split the remaining text into sentences
        text = nltk.sent_tokenize(text.replace("\n", " ").strip())
    else:
        text = args.text

    voice_id = args.voice
    save_dest = args.save_dest

    if isinstance(text, list):
        # Speak each sentence in turn, accumulating the audio for the optional save below
        audio = b""
        for line in text:
            print(line)
            audio += stream(tts(line, voice_id, args.model, language=args.language))
    else:
        audio = stream(tts(text, voice_id, args.model, language=args.language))

    if save_dest:
        save(audio, save_dest)
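
# A minimal sketch of using the helpers above from another Python script instead of
# the CLI (assumes this file is saved as coqui_api_stream.py and COQUI_API_TOKEN is
# set in the environment; "hello.wav" is just an example output path):
#
#   from coqui_api_stream import tts, stream, save
#
#   audio = stream(tts("Hello there!", "98d4af7d-aca0-4a70-a26e-4ca59023a248"))
#   save(audio, "hello.wav")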