chatbot ollama/tts kokoro/stt speaches
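"""Voice chatbot: Ollama for chat, Kokoro for TTS, speaches for STT.

All three services are reached through OpenAI-compatible endpoints (see
Config.BASE_URL_* below), so the standard openai client can talk to each of
them with only a base_url swap. Microphone capture and playback use PyAudio.
"""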
import pyaudio
import wave
import time
import warnings
from openai import OpenAI
import json
import sys
import io
import numpy
import requests
import os

# Ignore DeprecationWarning
warnings.filterwarnings("ignore", category=DeprecationWarning)
class Config:
    DEBUG = False  # Enable debug output
    USE_TEXT_TO_SPEECH = True
    USE_SPEECH_TO_TEXT = True
    AUDIO_RATE = 24000
    MAX_RESPONSE_TOKENS = 150
    STT_LANGUAGE = "en"  # Default language for speech recognition

    # Model names
    MODEL_NAME = "llama3.2:latest"  # Chat model
    TTS_MODEL = "kokoro"  # TTS model
    STT_MODEL = "Systran/faster-whisper-medium"  # STT model

    # API base URLs
    BASE_URL_CHAT = 'http://xxx.xxx.xx.xx:11434/v1'
    BASE_URL_TTS = 'http://xxx.xxx.xx.xx:8880/v1'
    BASE_URL_STT = 'http://xxx.xxx.xx.xx:8000/v1'

    # Voice options
    VOICE_ID = "af"
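# Note: the xxx.xxx.xx.xx addresses above are placeholders left by the gist
# author; point them at your own hosts. Port 11434 is Ollama's default, and
# "af" is one of Kokoro's bundled voices (assumptions based on those
# projects' defaults, not something this script checks).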
def debug_print(message):
    """Print debug messages to stderr with proper formatting."""
    if Config.DEBUG:
        print(f"\033[90m[DEBUG] {message}\033[0m", file=sys.stderr, flush=True)
class AudioRecorder:
    def __init__(self):
        self.chunk = 1024
        self.format = pyaudio.paInt16
        self.channels = 1
        self.rate = 16000
        self.threshold = 30  # Lower threshold for better sensitivity
        self.silence_limit = 1.5  # Time of silence before stopping
        self.pre_buffer_size = 10  # Number of chunks to keep before speech detection
        self.p = pyaudio.PyAudio()

        # Print all available input devices
        print("\nAvailable Audio Devices:")
        info = self.p.get_host_api_info_by_index(0)
        numdevices = info.get('deviceCount')
        self.input_device_index = None
        for i in range(numdevices):
            device_info = self.p.get_device_info_by_index(i)
            if device_info.get('maxInputChannels') > 0:
                print(f"Input Device id {i} - {device_info.get('name')}")
                # Select the first input device we find
                if self.input_device_index is None:
                    self.input_device_index = i
                    print(f"Selected device {i} for input")
        if self.input_device_index is None:
            print("No input devices found!")
            raise Exception("No audio input devices available")
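    # Voice activity detection below is a simple RMS energy gate: each
    # 1024-sample chunk is reduced to its root-mean-square level and
    # compared against self.threshold. At 16 kHz / 16-bit mono, a threshold
    # of 30 is very sensitive; raise it if ambient noise keeps triggering
    # the recorder.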
    def is_speech(self, data_chunk):
        """Check if the audio chunk contains speech."""
        try:
            # Cast to float before squaring: squaring raw int16 samples
            # overflows and corrupts the RMS value
            samples = numpy.frombuffer(data_chunk, dtype=numpy.int16).astype(numpy.float64)
            rms = numpy.sqrt(numpy.mean(numpy.square(samples)))
            if Config.DEBUG:
                print(f"Current audio level: {rms}")
            return rms > self.threshold
        except Exception as e:
            print(f"Error in is_speech: {e}", file=sys.stderr)
            return False
    def record_audio(self):
        """Record audio with voice activity detection and pre-buffer."""
        print("\nListening... (Speak to start recording)")
        print(f"Using threshold: {self.threshold}")
        frames = []
        stream = None  # Defined before the try so the finally block is safe
        try:
            stream = self.p.open(
                format=self.format,
                channels=self.channels,
                rate=self.rate,
                input=True,
                input_device_index=self.input_device_index,
                frames_per_buffer=self.chunk
            )
            print("Audio stream opened successfully")

            # Pre-buffer to catch early speech
            pre_buffer = []
            recording_started = False
            silence_frames = 0
            max_frames = int(self.rate * 30 / self.chunk)  # 30 seconds maximum

            # Start with a small delay to ensure the system is ready
            time.sleep(0.1)

            while True:
                try:
                    data = stream.read(self.chunk, exception_on_overflow=False)

                    # Keep a rolling buffer of recent audio
                    pre_buffer.append(data)
                    if len(pre_buffer) > self.pre_buffer_size:
                        pre_buffer.pop(0)

                    if self.is_speech(data):
                        if not recording_started:
                            print("Speech detected! Recording started...")
                            recording_started = True
                            # Add pre-buffer to capture early speech
                            frames.extend(pre_buffer)
                        silence_frames = 0
                        frames.append(data)
                    elif recording_started:
                        silence_frames += 1
                        frames.append(data)
                        silence_duration = silence_frames * self.chunk / self.rate
                        if silence_duration > self.silence_limit:
                            print("Silence detected, stopping recording...")
                            break

                    if len(frames) >= max_frames:
                        print("Maximum recording time reached")
                        break
                except IOError as e:
                    print(f"IOError during recording: {e}")
                    continue
        except Exception as e:
            print(f"Error during recording: {e}", file=sys.stderr)
            return None
        finally:
            print("Recording finished.")
            if stream is not None:
                stream.stop_stream()
                stream.close()

        if not frames:
            print("No audio data recorded")
            return None

        try:
            wav_buffer = io.BytesIO()
            with wave.open(wav_buffer, 'wb') as wf:
                wf.setnchannels(self.channels)
                wf.setsampwidth(self.p.get_sample_size(self.format))
                wf.setframerate(self.rate)
                wf.writeframes(b''.join(frames))
            print("WAV file created successfully")
            return wav_buffer.getvalue()
        except Exception as e:
            print(f"Error creating WAV file: {e}")
            return None

    def cleanup(self):
        self.p.terminate()
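# Playback below runs at Config.AUDIO_RATE (24000 Hz) rather than the 16 kHz
# capture rate: the TTS response is requested as raw PCM, and Kokoro
# generates 24 kHz audio, so the output stream has to match that rate.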
class AudioManager:
    @staticmethod
    def setup_stream():
        p = pyaudio.PyAudio()
        stream = p.open(
            format=pyaudio.paInt16,
            channels=1,
            rate=Config.AUDIO_RATE,
            output=True
        )
        return stream, p

    @staticmethod
    def play_audio(client, text, voice=Config.VOICE_ID):
        if not text.strip():
            return
        stream, p = AudioManager.setup_stream()
        try:
            # Add natural pause at start
            AudioManager.add_silence(stream, 0.2)
            with client.audio.speech.with_streaming_response.create(
                model=Config.TTS_MODEL,
                voice=voice,
                response_format="pcm",
                input=text
            ) as response:
                for chunk in response.iter_bytes(chunk_size=1024):
                    if chunk:
                        stream.write(chunk)
            # Add natural pause at end
            AudioManager.add_silence(stream, 0.1)
        except Exception as e:
            print(f"\nTTS Error: {e}", file=sys.stderr)
        finally:
            stream.stop_stream()
            stream.close()
            p.terminate()
    @staticmethod
    def add_silence(stream, duration):
        # 16-bit samples are 2 bytes each, so double the sample count to get
        # the intended duration of silence
        silence = b'\x00' * (2 * int(Config.AUDIO_RATE * duration))
        stream.write(silence)
class ConversationManager:
    def __init__(self):
        debug_print("Initializing ConversationManager")
        self.setup_clients()
        self.conversation_history = []
        self.load_system_prompt()
        self.audio_recorder = AudioRecorder()

    def setup_clients(self):
        debug_print("Setting up API clients")
        try:
            self.chatbot_client = OpenAI(base_url=Config.BASE_URL_CHAT, api_key='ollama')
            self.tts_client = OpenAI(api_key="sk-111111111", base_url=Config.BASE_URL_TTS)
            debug_print("Successfully connected to servers")
        except Exception as e:
            print(f"Error setting up clients: {e}", file=sys.stderr)
            raise

    def load_system_prompt(self):
        try:
            with open("system_prompt.txt", "r", encoding='utf-8') as file:
                base_prompt = file.read().strip()
            debug_print("Loaded system prompt successfully")
        except FileNotFoundError:
            base_prompt = """You are Sarah, a friendly and casual conversational partner. Keep responses short and natural:
- Match the length of user messages
- Use 1-2 short sentences for brief replies
- Stay concise and to the point
- Talk casually, like a friend
- Ask one question at a time"""
            debug_print("Using default system prompt")
        self.conversation_history = [{"role": "system", "content": base_prompt}]
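    # STT uses a raw multipart POST via requests instead of the openai
    # client: the speaches server mirrors OpenAI's /v1/audio/transcriptions
    # route, which takes the WAV upload under the form field 'file' plus
    # 'model' and 'language' form fields.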
    def get_user_input(self):
        """Get user input either through voice or text."""
        if Config.USE_SPEECH_TO_TEXT:
            try:
                # Record audio
                audio_data = self.audio_recorder.record_audio()
                if audio_data is None:
                    print("No speech detected. Please try again.")
                    return ""

                # Save a temporary file for the multipart upload
                with open("temp_audio.wav", "wb") as f:
                    f.write(audio_data)

                try:
                    url = f"{Config.BASE_URL_STT}/audio/transcriptions"

                    # Open the file in binary mode; the endpoint expects the
                    # upload under the field name 'file'
                    with open("temp_audio.wav", "rb") as audio_file:
                        files = {
                            'file': ('audio.wav', audio_file, 'audio/wav')
                        }
                        data = {
                            'model': Config.STT_MODEL,
                            'language': Config.STT_LANGUAGE
                        }
                        print(f"\nSending STT request to: {url}")
                        response = requests.post(url, files=files, data=data)
                        print(f"Response status: {response.status_code}")

                    if response.status_code == 200:
                        try:
                            result = response.json()
                            user_input = result.get("text", "").strip()
                            print(f"You (voice): {user_input}")
                            return user_input
                        except Exception as e:
                            print(f"Error parsing response: {e}")
                            return input("Please type your message: ").strip()
                    else:
                        print(f"STT Error: Status code {response.status_code}")
                        print(f"Error details: {response.text}")
                        return input("Please type your message: ").strip()
                finally:
                    # Clean up the temporary file even when returning early
                    # (in the original layout this ran after the returns and
                    # was never reached)
                    if os.path.exists("temp_audio.wav"):
                        os.remove("temp_audio.wav")
            except Exception as e:
                print(f"\nError with speech recognition: {e}", file=sys.stderr)
                print("Falling back to text input...")
                return input("You: ").strip()
        else:
            return input("You: ").strip()
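    # Streaming strategy for the reply: tokens are printed as they arrive
    # and accumulated into current_sentence; whenever a chunk ends in
    # . ! or ?, the finished sentence is handed to TTS right away, so speech
    # starts after the first sentence instead of after the full completion.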
    def generate_response(self, user_input):
        if not user_input.strip():
            return
        debug_print(f"Generating response for input: {user_input}")
        try:
            debug_print("Testing API connection...")
            # Tiny throwaway request used purely as a connectivity check
            self.chatbot_client.chat.completions.create(
                model=Config.MODEL_NAME,
                messages=[{"role": "user", "content": "test"}],
                max_tokens=5,
                stream=False
            )
            debug_print("API connection test successful")

            self.conversation_history.append({"role": "user", "content": user_input})
            debug_print("Starting response generation")

            # Create the stream
            stream = self.chatbot_client.chat.completions.create(
                model=Config.MODEL_NAME,
                messages=self.conversation_history,
                stream=True,
                max_tokens=Config.MAX_RESPONSE_TOKENS,
                temperature=0.7
            )

            # Initialize response tracking
            complete_response = ""
            current_sentence = ""
            response_started = False

            # Process the stream
            for chunk in stream:
                if chunk.choices[0].delta.content is not None:
                    text_chunk = chunk.choices[0].delta.content

                    # Print the AI prefix only once, at the start
                    if not response_started:
                        print("\nAI: ", end="", flush=True)
                        response_started = True

                    # Print the text chunk
                    print(text_chunk, end="", flush=True)
                    current_sentence += text_chunk
                    complete_response += text_chunk

                    # Process complete sentences
                    if any(text_chunk.endswith(p) for p in ".!?") and len(current_sentence.strip()) > 0:
                        sentence = current_sentence.strip()
                        if Config.USE_TEXT_TO_SPEECH and len(sentence) > 0:
                            # Ensure debug message appears on a new line
                            print("\n", end="", flush=True)
                            debug_print(f"Processing TTS for: {sentence}")
                            AudioManager.play_audio(self.tts_client, sentence)
                        current_sentence = ""

            # Add newline after complete response
            print("\n")

            # Handle any remaining text
            if current_sentence.strip():
                final_text = current_sentence.strip()
                if Config.USE_TEXT_TO_SPEECH:
                    debug_print(f"Processing final TTS: {final_text}")
                    AudioManager.play_audio(self.tts_client, final_text)

            self.conversation_history.append({
                "role": "assistant",
                "content": complete_response.strip()
            })
            debug_print("Response completed successfully")
            return complete_response.strip()
        except Exception as e:
            print(f"\nError generating response: {str(e)}", file=sys.stderr)
            debug_print(f"Error details: {str(e)}")
            return "I encountered an error. Could you please repeat that?"
    def cleanup(self):
        """Cleanup resources."""
        if hasattr(self, 'audio_recorder'):
            self.audio_recorder.cleanup()
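# To run this script you will need PyAudio, the openai client, requests and
# numpy (e.g. `pip install pyaudio openai requests numpy` -- an assumption
# based on the imports above), plus an Ollama server with llama3.2 pulled,
# a Kokoro TTS server, and a speaches STT server reachable at the
# Config.BASE_URL_* addresses.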
def main():
    print("\nChatbot Configuration:")
    print(f"Text-to-Speech: {'Enabled' if Config.USE_TEXT_TO_SPEECH else 'Disabled'}")
    print(f"Speech-to-Text: {'Enabled' if Config.USE_SPEECH_TO_TEXT else 'Disabled'}")
    print("\nType 'exit', 'quit', or 'bye' to end the conversation\n")

    conversation = None  # Defined before the try so the finally block is safe
    try:
        conversation = ConversationManager()
        last_input_empty = False
        while True:
            # Get user input with proper formatting
            if not last_input_empty:
                print()  # Add space before the input prompt
            user_input = conversation.get_user_input()
            last_input_empty = not bool(user_input.strip())

            if user_input.lower() in ["exit", "quit", "bye"]:
                print("\nAI: Goodbye! It was nice chatting with you!\n")
                if Config.USE_TEXT_TO_SPEECH:
                    AudioManager.play_audio(conversation.tts_client, "Goodbye! It was nice chatting with you!")
                break

            if user_input.strip():
                response = conversation.generate_response(user_input)
                if not response:
                    debug_print("No response generated")
    except KeyboardInterrupt:
        print("\nExiting chatbot...")
    except Exception as e:
        print(f"\nAn unexpected error occurred: {e}", file=sys.stderr)
        debug_print(f"Critical error: {str(e)}")
    finally:
        # Guard against ConversationManager failing to initialize
        if conversation is not None:
            conversation.cleanup()
        print("\nChatbot session ended.")
if __name__ == "__main__":
    main()