Created
October 16, 2021 06:26
-
-
Save srisankethu/f1b1f636b5db5da58f3713b4e761a22f to your computer and use it in GitHub Desktop.
Wolfram Virtual Assistant
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import speech_recognition as sr | |
from gtts import gTTS | |
from pydub import AudioSegment | |
from pydub.playback import play | |
from io import BytesIO | |
from threading import Thread | |
import wolframalpha | |
import os | |
class VirtualAssistant:
    """Voice assistant that listens for a trigger word and answers spoken
    questions by querying the WolframAlpha API, replying with synthesized
    speech (gTTS + pydub playback).

    Requires the ``WOLFRAM_ALPHA_APP_ID`` environment variable and a
    working microphone.
    """

    def __init__(self, trigger_word="Alexa"):
        """Set up the recognizer, microphone and WolframAlpha client.

        Args:
            trigger_word: Word that must appear in a transcription before
                the assistant starts a question/answer exchange.

        Raises:
            KeyError: If WOLFRAM_ALPHA_APP_ID is not set in the environment.
        """
        self.trigger_word = trigger_word
        self.recognizer = sr.Recognizer()
        self.microphone = sr.Microphone()
        self.wolframalpha_client = wolframalpha.Client(os.environ["WOLFRAM_ALPHA_APP_ID"])
        # Main-loop flag: set True by start_listening, False by start_process
        # when the user says "stop listening".
        self.loop = None

    def prepare_audio_file(self, audio_file_name):
        """Read an audio file and return its contents as AudioData.

        Args:
            audio_file_name: Path to a file readable by sr.AudioFile
                (WAV/AIFF/FLAC).

        Returns:
            sr.AudioData covering the whole file.
        """
        with sr.AudioFile(audio_file_name) as source:
            return self.recognizer.record(source)

    def get_text(self, audio=None, audio_file_name=None):
        """Transcribe speech from either in-memory audio or a file.

        Exactly one of ``audio`` / ``audio_file_name`` must be provided;
        if neither or both are given, None is returned.

        Args:
            audio: sr.AudioData to transcribe.
            audio_file_name: Path to an audio file to transcribe instead.

        Returns:
            The transcription string, or None on ambiguous/missing input.
        """
        # Reject ambiguous input: caller supplied both sources (or neither).
        if audio is not None and audio_file_name is not None:
            return None
        if audio is None:
            if audio_file_name is None:
                return None
            audio = self.prepare_audio_file(audio_file_name)
        return self.recognizer.recognize_google(audio)

    def get_voice_command(self):
        """Listen on the microphone once and transcribe what was heard.

        Returns:
            dict with keys:
                "success": False only if the recognition API was unreachable.
                "error": Human-readable error message, or None.
                "transcription": The recognized text, or None on failure.
        """
        with self.microphone as source:
            self.recognizer.adjust_for_ambient_noise(source)
            audio = self.recognizer.listen(source)
        response = {
            "success": True,
            "error": None,
            "transcription": None,
        }
        try:
            response["transcription"] = self.recognizer.recognize_google(audio)
        except sr.RequestError:
            # API was unreachable or unresponsive
            response["success"] = False
            response["error"] = "API unavailable"
        except sr.UnknownValueError:
            # speech was unintelligible
            response["error"] = "Unable to recognize speech"
        return response

    def speak(self, response):
        """Synthesize ``response`` to speech and play it through the speakers.

        Args:
            response: Text to speak aloud.
        """
        speech = gTTS(text=response, lang="en-IN", slow=False)
        # Render the MP3 into memory instead of a temp file, then play it.
        mp3_fp = BytesIO()
        speech.write_to_fp(mp3_fp)
        mp3_fp.seek(0)
        voice = AudioSegment.from_file(mp3_fp, format="mp3")
        play(voice)

    def start_process(self):
        """Run one question/answer exchange with the user.

        Prompts for a question, sends the transcription to WolframAlpha and
        speaks the first result. Saying "stop listening" ends the main loop.
        """
        self.speak("Hi! Ask me a question")
        response = self.get_voice_command()
        if response["transcription"]:
            if "stop listening" in response["transcription"]:
                self.loop = False
                return
            try:
                answer = next(self.wolframalpha_client.query(response["transcription"]).results).text
                self.speak(answer)
            except Exception:
                # Narrowed from a bare except: covers StopIteration (no
                # results) and network/API failures, but no longer swallows
                # KeyboardInterrupt/SystemExit.
                self.speak("Failed to get response to your question")
        elif response["error"]:
            self.speak(response["error"])

    def start_listening(self):
        """Main loop: wait for the trigger word, then handle one question.

        Exits when recognition becomes unavailable ("success" False) or
        start_process clears self.loop after hearing "stop listening".
        """
        response = None
        process = None
        self.loop = True
        while self.loop:
            if response is None:
                response = self.get_voice_command()
            elif response["transcription"]:
                if self.trigger_word in response["transcription"]:
                    # Spawn a worker only if the previous one has finished
                    # (it always has, since we join() it below).
                    if process is None or not process.is_alive():
                        process = Thread(target=self.start_process)
                        process.start()
                        process.join()
                        response = None
                else:
                    # Heard something, but not the trigger word.
                    response = None
            elif not response["success"]:
                # Recognition API unreachable: give up rather than spin.
                break
            else:
                # Unintelligible speech; listen again.
                response = None
if __name__ == "__main__":
    # Start the assistant; say the trigger word ("Jarvis") to ask a question.
    assistant = VirtualAssistant(trigger_word="Jarvis")
    assistant.start_listening()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Setup: before running, export your WolframAlpha App ID as an environment variable:
export WOLFRAM_ALPHA_APP_ID="<APP_ID>"