@IzumiSatoshi
Created September 22, 2023 10:43
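
"""
Stream microphone audio to Google Cloud Speech-to-Text (ja-JP, interim
results enabled) and, for every partial transcript, ask gpt-3.5-turbo for a
one-word answer, printing lines of the form "confidence<TAB>question --> answer".
"""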
import os
import time
import sys
import numpy as np
import pyaudio
import threading
import queue
import openai
from google.cloud import speech_v1p1beta1 as speech
from google.cloud.speech_v1p1beta1 import types
# Machine-specific path for a local helper module; timestamp() is only
# referenced in the commented-out block inside listen_print_loop() below.
sys.path.append("E:\\Projects\\GPT")
from my_utils.my_u import timestamp

# Read the OpenAI key from a local file, stripping the trailing newline so
# authentication does not fail, and closing the file handle properly.
with open("./openai_key.txt", "r") as f:
    openai.api_key = f.read().strip()
# Audio capture parameters.
RATE = 16000  # sample rate in Hz (must match the Speech config below)
CHUNK_DURATION = 0.2  # seconds of audio per chunk
CHUNK = int(RATE * CHUNK_DURATION)  # samples per chunk
THRESHOLD = 10  # normalized volume below which a chunk counts as silence
SILENCE_DURATION = 0.5  # seconds of silence before the mic is considered off
client = speech.SpeechClient()
config = types.RecognitionConfig(
    encoding=speech.RecognitionConfig.AudioEncoding.LINEAR16,
    sample_rate_hertz=RATE,
    language_code="ja-JP",
    # language_code="en-US",
    max_alternatives=1,
)
streaming_config = speech.StreamingRecognitionConfig(
    config=config, interim_results=True
)
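# interim_results=True makes the API return partial transcripts while the
# speaker is still talking, so GPT can be queried before the utterance ends.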

# Set by the mic thread: True while audio above THRESHOLD is being captured.
IS_MIC_ON = False

def call_gpt(message_history, prefix):
    # Send the conversation to GPT and print the reply next to the transcript.
    completion = openai.ChatCompletion.create(
        model="gpt-3.5-turbo",
        messages=message_history,
        temperature=0,
    )
    reply_content = completion.choices[0].message.content
    print(f"{prefix} --> {reply_content}")

def start_gpt_thread(user_input, prefix):
    # Few-shot prompt. The instruction (given as both system and user message)
    # says: "You are an AI that answers any question in a single word. Always
    # answer with exactly one word." The examples: "What is your favorite
    # color?" -> "Blue", "What is the weather today?" -> "Unknown".
    mh = [
        {
            "role": "system",
            "content": "あなたはどんな質問にも一単語で答えるAIです。必ず一単語で回答してください",
        },
        {
            "role": "user",
            "content": "あなたはどんな質問にも一単語で答えるAIです。必ず一単語で回答してください",
        },
        {
            "role": "user",
            "content": "好きな色は?",
        },
        {
            "role": "assistant",
            "content": "青",
        },
        {
            "role": "user",
            "content": "今日の天気は?",
        },
        {
            "role": "assistant",
            "content": "不明",
        },
        {
            "role": "user",
            "content": user_input,
        },
    ]
    # Run the API call on its own thread so transcription is never blocked.
    t = threading.Thread(target=call_gpt, args=(mh, prefix))
    t.start()

def listen_print_loop(responses):
    for response in responses:
        if not response.results:
            continue  # streaming responses can arrive with no results
        result = response.results[0]
        alt = result.alternatives[0]
        # Fire off a GPT request for every interim transcript, so an answer
        # can come back before the utterance is even finished.
        start_gpt_thread(
            alt.transcript,
            f"{round(alt.confidence, 3)}\t{alt.transcript}",
        )
        """
        if result.is_final:
            timestamp("transcribe completed")
        """

def calculate_volume(data):
    # Rough loudness: L2 norm of the int16 samples divided by sample count.
    audio_data = np.frombuffer(data, dtype=np.int16)
    volume = np.linalg.norm(audio_data) / len(audio_data)
    return volume

def microphone_stream(q):
    global IS_MIC_ON
    p = pyaudio.PyAudio()
    stream = p.open(
        format=pyaudio.paInt16,
        channels=1,
        rate=RATE,
        input=True,
        frames_per_buffer=CHUNK,
    )
    silence_count = 0
    while True:
        audio_chunk = stream.read(CHUNK)
        volume = calculate_volume(audio_chunk)
        if volume < THRESHOLD:
            silence_count += 1
        else:
            IS_MIC_ON = True
            silence_count = 0
        # Every chunk is forwarded to the recognizer, silent or not.
        q.put(audio_chunk)
        # After SILENCE_DURATION seconds of consecutive quiet chunks,
        # mark the mic as off.
        if silence_count * CHUNK / RATE > SILENCE_DURATION:
            if IS_MIC_ON:
                IS_MIC_ON = False

q = queue.Queue()
t = threading.Thread(target=microphone_stream, args=(q,))
t.daemon = True
t.start()

# Wrap each raw audio chunk in a streaming request. iter(q.get, None) blocks
# on the queue and would only stop if a None sentinel were ever enqueued.
requests = (
    types.StreamingRecognizeRequest(audio_content=content)
    for content in iter(q.get, None)
)

print("$ready")
print("confidence\tquestion --> answer")
responses = client.streaming_recognize(streaming_config, requests)
listen_print_loop(responses)
print("$listen_print_loop")