Skip to content

Instantly share code, notes, and snippets.

@netixc
Last active February 3, 2025 09:38
Show Gist options
  • Save netixc/d28f3ac3ba774f67d8df7310ec53cdab to your computer and use it in GitHub Desktop.
chatbot ollama/tts kokoro/stt speaches
import pyaudio
import wave
import time
import warnings
from openai import OpenAI
import json
import sys
import io
import numpy
import requests
import os
# Globally suppress DeprecationWarning noise (presumably emitted by the
# audio/API dependencies — TODO confirm which package triggers them).
warnings.filterwarnings("ignore", category=DeprecationWarning)
class Config:
    """Central configuration: feature toggles, model names and service endpoints."""

    DEBUG = False  # Enable debug output on stderr (see debug_print)
    USE_TEXT_TO_SPEECH = True
    USE_SPEECH_TO_TEXT = True
    AUDIO_RATE = 24000  # Playback sample rate in Hz (must match the TTS PCM output)
    MAX_RESPONSE_TOKENS = 150  # Cap on tokens per chat completion
    STT_LANGUAGE = "en"  # Default language for speech recognition
    # Model Names
    MODEL_NAME = "llama3.2:latest"  # Chat model (served by Ollama)
    TTS_MODEL = "kokoro"  # TTS model
    STT_MODEL = "Systran/faster-whisper-medium"  # STT model
    # API Base URLs (placeholders — fill in the real host addresses)
    BASE_URL_CHAT = 'http://xxx.xxx.xx.xx:11434/v1'
    BASE_URL_TTS = 'http://xxx.xxx.xx.xx:8880/v1'
    BASE_URL_STT = 'http://xxx.xxx.xx.xx:8000/v1'
    # Voice Options
    VOICE_ID = "af"  # Default TTS voice
def debug_print(message):
    """Emit *message* on stderr in dim grey, but only when Config.DEBUG is on."""
    if not Config.DEBUG:
        return
    sys.stderr.write(f"\033[90m[DEBUG] {message}\033[0m\n")
    sys.stderr.flush()
class AudioRecorder:
    """Records microphone audio using simple RMS-based voice activity detection."""

    def __init__(self):
        self.chunk = 1024  # frames per buffer read
        self.format = pyaudio.paInt16
        self.channels = 1
        self.rate = 16000  # capture sample rate in Hz
        self.threshold = 30  # RMS level above which a chunk counts as speech
        self.silence_limit = 1.5  # seconds of silence before stopping
        self.pre_buffer_size = 10  # chunks kept before speech detection triggers
        self.p = pyaudio.PyAudio()
        # Print all available input devices and select the first usable one.
        print("\nAvailable Audio Devices:")
        info = self.p.get_host_api_info_by_index(0)
        numdevices = info.get('deviceCount')
        self.input_device_index = None
        for i in range(0, numdevices):
            device_info = self.p.get_device_info_by_index(i)
            if device_info.get('maxInputChannels') > 0:
                print(f"Input Device id {i} - {device_info.get('name')}")
                # Select the first input device we find
                if self.input_device_index is None:
                    self.input_device_index = i
                    print(f"Selected device {i} for input")
        if self.input_device_index is None:
            print("No input devices found!")
            raise Exception("No audio input devices available")

    def is_speech(self, data_chunk):
        """Return True if the RMS level of *data_chunk* exceeds the threshold."""
        try:
            samples = numpy.frombuffer(data_chunk, dtype=numpy.int16)
            # BUG FIX: cast to float before squaring. numpy.square on an int16
            # array wraps around on overflow, so loud samples produced a
            # meaningless (often tiny) RMS value and broke speech detection.
            rms = numpy.sqrt(numpy.mean(numpy.square(samples.astype(numpy.float64))))
            if Config.DEBUG:
                print(f"Current audio level: {rms}")
            return rms > self.threshold
        except Exception as e:
            print(f"Error in is_speech: {e}", file=sys.stderr)
            return False

    def record_audio(self):
        """Record one utterance with voice activity detection and a pre-buffer.

        Returns the recording as in-memory WAV bytes, or None on failure or
        when nothing was recorded.
        """
        print("\nListening... (Speak to start recording)")
        print(f"Using threshold: {self.threshold}")
        # BUG FIX: bind stream/frames before the try so the finally block and
        # the post-try code cannot raise NameError if self.p.open() fails.
        stream = None
        frames = []
        try:
            stream = self.p.open(
                format=self.format,
                channels=self.channels,
                rate=self.rate,
                input=True,
                input_device_index=self.input_device_index,
                frames_per_buffer=self.chunk
            )
            print("Audio stream opened successfully")
            pre_buffer = []  # rolling buffer to catch speech onset
            recording_started = False
            silence_frames = 0
            max_frames = int(self.rate * 30 / self.chunk)  # 30 seconds maximum
            # Start with a small delay to ensure the system is ready
            time.sleep(0.1)
            while True:
                try:
                    data = stream.read(self.chunk, exception_on_overflow=False)
                    # Keep a rolling buffer of recent audio
                    pre_buffer.append(data)
                    if len(pre_buffer) > self.pre_buffer_size:
                        pre_buffer.pop(0)
                    if self.is_speech(data):
                        if not recording_started:
                            print("Speech detected! Recording started...")
                            recording_started = True
                            # Include the pre-buffer to capture early speech
                            frames.extend(pre_buffer)
                        silence_frames = 0
                        frames.append(data)
                    elif recording_started:
                        silence_frames += 1
                        frames.append(data)
                        silence_duration = silence_frames * self.chunk / self.rate
                        if silence_duration > self.silence_limit:
                            print("Silence detected, stopping recording...")
                            break
                    if len(frames) >= max_frames:
                        print("Maximum recording time reached")
                        break
                except IOError as e:
                    print(f"IOError during recording: {e}")
                    continue
        except Exception as e:
            print(f"Error during recording: {e}", file=sys.stderr)
            return None
        finally:
            print("Recording finished.")
            if stream is not None:
                stream.stop_stream()
                stream.close()
        if not frames:
            print("No audio data recorded")
            return None
        try:
            wav_buffer = io.BytesIO()
            with wave.open(wav_buffer, 'wb') as wf:
                wf.setnchannels(self.channels)
                wf.setsampwidth(self.p.get_sample_size(self.format))
                wf.setframerate(self.rate)
                wf.writeframes(b''.join(frames))
            print("WAV file created successfully")
            return wav_buffer.getvalue()
        except Exception as e:
            print(f"Error creating WAV file: {e}")
            return None

    def cleanup(self):
        """Terminate the PyAudio session."""
        self.p.terminate()
class AudioManager:
    """Stateless helpers for streaming TTS audio to the default output device."""

    @staticmethod
    def setup_stream():
        """Open a 16-bit mono PyAudio output stream at the configured rate."""
        p = pyaudio.PyAudio()
        stream = p.open(
            format=pyaudio.paInt16,
            channels=1,
            rate=Config.AUDIO_RATE,
            output=True
        )
        return stream, p

    @staticmethod
    def play_audio(client, text, voice=None):
        """Synthesize *text* via the TTS service and play it on local audio.

        voice defaults to Config.VOICE_ID, resolved at call time (the original
        froze the default when the class body executed, so later config
        changes were silently ignored).
        """
        if voice is None:
            voice = Config.VOICE_ID
        if not text.strip():
            return
        stream, p = AudioManager.setup_stream()
        try:
            # Add natural pause at start
            AudioManager.add_silence(stream, 0.2)
            with client.audio.speech.with_streaming_response.create(
                model=Config.TTS_MODEL,
                voice=voice,
                response_format="pcm",
                input=text
            ) as response:
                for chunk in response.iter_bytes(chunk_size=1024):
                    if chunk:
                        stream.write(chunk)
            # Add natural pause at end
            AudioManager.add_silence(stream, 0.1)
        except Exception as e:
            print(f"\nTTS Error: {e}", file=sys.stderr)
        finally:
            stream.stop_stream()
            stream.close()
            p.terminate()

    @staticmethod
    def _silence_bytes(rate, duration):
        """Return *duration* seconds of 16-bit mono silence at *rate* Hz."""
        # BUG FIX: each 16-bit sample is TWO bytes. The original emitted
        # int(rate * duration) single bytes, i.e. only half the intended
        # pause length at paInt16.
        return b'\x00\x00' * int(rate * duration)

    @staticmethod
    def add_silence(stream, duration):
        """Write *duration* seconds of silence to *stream* (natural pause)."""
        stream.write(AudioManager._silence_bytes(Config.AUDIO_RATE, duration))
class ConversationManager:
    """Drives the chat loop: records speech, queries the LLM, speaks replies."""

    def __init__(self):
        debug_print("Initializing ConversationManager")
        self.setup_clients()
        self.conversation_history = []
        self.load_system_prompt()
        self.audio_recorder = AudioRecorder()

    def setup_clients(self):
        """Create OpenAI-compatible clients for the chat and TTS endpoints."""
        debug_print("Setting up API clients")
        try:
            self.chatbot_client = OpenAI(base_url=Config.BASE_URL_CHAT, api_key='ollama')
            self.tts_client = OpenAI(api_key="sk-111111111", base_url=Config.BASE_URL_TTS)
            debug_print("Successfully connected to servers")
        except Exception as e:
            print(f"Error setting up clients: {e}", file=sys.stderr)
            raise

    def load_system_prompt(self):
        """Load system_prompt.txt if present, else fall back to a built-in persona."""
        try:
            with open("system_prompt.txt", "r", encoding='utf-8') as file:
                base_prompt = file.read().strip()
            debug_print("Loaded system prompt successfully")
        except FileNotFoundError:
            base_prompt = """You are Sarah, a friendly and casual conversational partner. Keep responses short and natural:
- Match the length of user messages
- Use 1-2 short sentences for brief replies
- Stay concise and to the point
- Talk casually, like a friend
- Ask one question at a time"""
            debug_print("Using default system prompt")
        self.conversation_history = [{"role": "system", "content": base_prompt}]

    def get_user_input(self):
        """Get user input either through voice (STT) or the keyboard.

        Returns the transcribed/typed text; "" when no speech was detected.
        Falls back to keyboard input on any STT failure.
        """
        if not Config.USE_SPEECH_TO_TEXT:
            return input("You: ").strip()
        try:
            audio_data = self.audio_recorder.record_audio()
            if audio_data is None:
                print("No speech detected. Please try again.")
                return ""
            try:
                # Save a temporary file for the multipart upload
                with open("temp_audio.wav", "wb") as f:
                    f.write(audio_data)
                url = f"{Config.BASE_URL_STT}/audio/transcriptions"
                # Open the file in binary mode; field name must be 'file'
                with open("temp_audio.wav", "rb") as audio_file:
                    files = {
                        'file': ('audio.wav', audio_file, 'audio/wav')
                    }
                    data = {
                        'model': Config.STT_MODEL,
                        'language': Config.STT_LANGUAGE
                    }
                    print(f"\nSending STT request to: {url}")
                    response = requests.post(url, files=files, data=data)
                print(f"Response status: {response.status_code}")
                if response.status_code == 200:
                    try:
                        result = response.json()
                        user_input = result.get("text", "").strip()
                        print(f"You (voice): {user_input}")
                        return user_input
                    except Exception as e:
                        print(f"Error parsing response: {e}")
                        return input("Please type your message: ").strip()
                print(f"STT Error: Status code {response.status_code}")
                print(f"Error details: {response.text}")
                return input("Please type your message: ").strip()
            finally:
                # BUG FIX: the original removal was placed after the returns
                # and therefore unreachable, leaking temp_audio.wav on every
                # call; a finally block runs on all paths.
                if os.path.exists("temp_audio.wav"):
                    os.remove("temp_audio.wav")
        except Exception as e:
            print(f"\nError with speech recognition: {e}", file=sys.stderr)
            print("Falling back to text input...")
            return input("You: ").strip()

    def generate_response(self, user_input):
        """Stream a chat completion for *user_input*, speaking complete sentences.

        Appends both sides of the exchange to the history and returns the full
        response text (or a fallback message on error; None for blank input).
        """
        if not user_input.strip():
            return
        debug_print(f"Generating response for input: {user_input}")
        try:
            # NOTE: the original issued a throwaway "test" completion before
            # every message; it doubled latency and any connection failure is
            # caught by the except below anyway, so it was removed.
            self.conversation_history.append({"role": "user", "content": user_input})
            debug_print("Starting response generation")
            # Create the stream
            stream = self.chatbot_client.chat.completions.create(
                model=Config.MODEL_NAME,
                messages=self.conversation_history,
                stream=True,
                max_tokens=Config.MAX_RESPONSE_TOKENS,
                temperature=0.7
            )
            complete_response = ""
            current_sentence = ""
            response_started = False
            for chunk in stream:
                if chunk.choices[0].delta.content is None:
                    continue
                text_chunk = chunk.choices[0].delta.content
                if not response_started:
                    # Print the AI prefix only once, at the start
                    print("\nAI: ", end="", flush=True)
                    response_started = True
                print(text_chunk, end="", flush=True)
                current_sentence += text_chunk
                complete_response += text_chunk
                # Speak each sentence as soon as it is complete
                if any(text_chunk.endswith(p) for p in ".!?") and len(current_sentence.strip()) > 0:
                    sentence = current_sentence.strip()
                    if Config.USE_TEXT_TO_SPEECH and len(sentence) > 0:
                        # Ensure debug message appears on a new line
                        print("\n", end="", flush=True)
                        debug_print(f"Processing TTS for: {sentence}")
                        AudioManager.play_audio(self.tts_client, sentence)
                    current_sentence = ""
            # Add newline after the complete response
            print("\n")
            # Speak any trailing text that did not end with punctuation
            if current_sentence.strip():
                final_text = current_sentence.strip()
                if Config.USE_TEXT_TO_SPEECH:
                    debug_print(f"Processing final TTS: {final_text}")
                    AudioManager.play_audio(self.tts_client, final_text)
            self.conversation_history.append({
                "role": "assistant",
                "content": complete_response.strip()
            })
            debug_print("Response completed successfully")
            return complete_response.strip()
        except Exception as e:
            print(f"\nError generating response: {str(e)}", file=sys.stderr)
            debug_print(f"Error details: {str(e)}")
            return "I encountered an error. Could you please repeat that?"

    def cleanup(self):
        """Release audio resources."""
        if hasattr(self, 'audio_recorder'):
            self.audio_recorder.cleanup()
def main():
    """Entry point: print the configuration, then run the chat loop until exit."""
    print("\nChatbot Configuration:")
    print(f"Text-to-Speech: {'Enabled' if Config.USE_TEXT_TO_SPEECH else 'Disabled'}")
    print(f"Speech-to-Text: {'Enabled' if Config.USE_SPEECH_TO_TEXT else 'Disabled'}")
    print("\nType 'exit', 'quit', or 'bye' to end the conversation\n")
    # BUG FIX: bind conversation before the try; if ConversationManager()
    # raises, the original finally hit NameError and masked the real error.
    conversation = None
    try:
        conversation = ConversationManager()
        last_input_empty = False
        while True:
            # Add a blank line before the prompt, except after empty input
            if not last_input_empty:
                print()
            user_input = conversation.get_user_input()
            last_input_empty = not bool(user_input.strip())
            if user_input.lower() in ["exit", "quit", "bye"]:
                print("\nAI: Goodbye! It was nice chatting with you!\n")
                if Config.USE_TEXT_TO_SPEECH:
                    AudioManager.play_audio(conversation.tts_client, "Goodbye! It was nice chatting with you!")
                break
            if user_input.strip():
                response = conversation.generate_response(user_input)
                if not response:
                    debug_print("No response generated")
    except KeyboardInterrupt:
        print("\nExiting chatbot...")
    except Exception as e:
        print(f"\nAn unexpected error occurred: {e}", file=sys.stderr)
        debug_print(f"Critical error: {str(e)}")
    finally:
        if conversation is not None:
            conversation.cleanup()
        print("\nChatbot session ended.")


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment