-
-
Save mikurins/f54a4fabae92c81d7e4bf763e7884d33 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
"""マイク音声入力によるストリーミング音声認識(発話区間検出付き)via VOSK. | |
Copyright (C) 2022 by Akira TAMAMORI | |
Copyright (C) 2022 by Koji INOUE | |
Permission is hereby granted, free of charge, to any person obtaining a copy | |
of this software and associated documentation files (the "Software"), to deal | |
in the Software without restriction, including without limitation the rights | |
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | |
copies of the Software, and to permit persons to whom the Software is | |
furnished to do so, subject to the following conditions: | |
The above copyright notice and this permission notice shall be included in all | |
copies or substantial portions of the Software. | |
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | |
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | |
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | |
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | |
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | |
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE | |
SOFTWARE. | |
本ソースはオリジナルに対し、簡易GUI、クリップボードによる | |
仮想キーボード機能を付加したものです。 | |
""" | |
import json | |
import math | |
import queue | |
import struct | |
import sys | |
from collections import namedtuple | |
from typing import NamedTuple | |
import numpy as np | |
import sounddevice as sd | |
from vosk import KaldiRecognizer, Model, SetLogLevel | |
import tkinter as tk | |
import threading | |
import re | |
# グローバル変数でGUI要素へのアクセスを共有する | |
text_area = None | |
enable_checkbox = None | |
checked = None | |
# 引数のグローバル変数 | |
chunk_size=8000 | |
threshold=45 | |
vad_start=0.3 | |
vad_end=1.0 | |
# クリップボードを使用して仮想キーボードとして動作させる | |
import pyperclip | |
from pynput.keyboard import Controller, Key | |
import time | |
keyboard = Controller() | |
# 文字列をクリップボードにコピーして貼り付ける関数 | |
def paste_text(input_text): | |
pyperclip.copy(input_text) # クリップボードにテキストをコピー | |
time.sleep(0.1) # クリップボードへのコピーが完了するのを少し待つ | |
with keyboard.pressed(Key.ctrl): | |
keyboard.press('v') # Ctrl+Vを押して貼り付け | |
keyboard.release('v') | |
class MicrophoneStream: | |
"""マイク音声入力のためのクラス.""" | |
def __init__(self, rate, chunk, vad_config): | |
"""音声入力ストリームを初期化する. | |
Args: | |
rate (int): サンプリングレート (Hz) | |
chunk (int): 音声データを受け取る単位(サンプル数) | |
vad_config (VadConfig): 発話区間検出の設定 | |
""" | |
# マイク入力のパラメータ | |
self.rate = rate | |
self.chunk = chunk | |
# 入力された音声データを保持するデータキュー(バッファ) | |
self.buff = queue.Queue() | |
# 発話区間検出のパラメータ | |
self.vad_config = { | |
"threshold": vad_config.threshold, | |
"vad_start": vad_config.vad_start, | |
"vad_end": vad_config.vad_end, | |
} | |
# 発話区間検出の作業用変数 | |
self.workspace = { | |
"is_speaking": False, # 現在発話区間を認定しているか | |
"count_on": 0, # 現在まででしきい値以上の区間が連続している数 | |
"count_off": 0, # 現在まででしきい値以下の区間が連続している数 | |
"voice_end": False, # 発話が終了したか | |
"str_current_power": "", # 現在のパワーの値を確認するための文字列(音声認識のクラスから参照) | |
} | |
# マイク音声入力の初期化 | |
self.input_stream = None | |
def open_stream(self): | |
"""マイク音声入力の開始""" | |
self.input_stream = sd.RawInputStream( | |
samplerate=self.rate, | |
blocksize=self.chunk, | |
dtype="int16", | |
channels=1, | |
callback=self.callback, | |
) | |
def callback(self, indata, frames, time, status): | |
"""音声入力の度に呼び出される関数. | |
音声パワーに基づいて発話区間を判定. | |
""" | |
if status: | |
print(status, file=sys.stderr) | |
# 入力された音声データをキューへ保存 | |
self.buff.put(bytes(indata)) | |
# 音声のパワー(音声データの二乗平均)を計算する | |
indata2 = struct.unpack(f"{len(indata) / 2:.0f}h", indata) | |
rms = math.sqrt(np.square(indata2).mean()) | |
power = 20 * math.log10(rms) if rms > 0.0 else -math.inf # RMSからデシベルへ | |
self.workspace["str_current_power"] = f"音声パワー {power:5.1f}[dB] " | |
print("\r" + self.workspace["str_current_power"], end="") | |
# 音声パワーがしきい値以上、かつ発話区間をまだ認定していない場合 | |
if ( | |
power >= self.vad_config["threshold"] | |
and self.workspace["is_speaking"] is False | |
): | |
# しきい値以上の区間のカウンタを増やす | |
self.workspace["count_on"] += 1 | |
# しきい値以上の区間の長さを秒単位に変換 | |
count_on_sec = float(self.workspace["count_on"] * self.chunk) / self.rate | |
# 発話区間の開始を認定するしきい値と比較 | |
if count_on_sec >= self.vad_config["vad_start"]: | |
self.workspace["is_speaking"] = True | |
self.workspace["count_on"] = 0 | |
if power < self.vad_config["threshold"] and self.workspace["is_speaking"]: | |
# しきい値以下の区間のカウンタを増やす | |
self.workspace["count_off"] += 1 | |
# しきい値以下の区間の長さを秒単位に変換 | |
count_off_sec = float(self.workspace["count_off"] * self.chunk) / self.rate | |
# 発話区間の終了を認定するしきい値と比較 | |
if count_off_sec >= self.vad_config["vad_end"]: | |
self.workspace["voice_end"] = True | |
self.workspace["count_off"] = 0 | |
# しきい値と比較して、反対の条件のカウンタをリセット | |
if power >= self.vad_config["threshold"]: | |
self.workspace["count_off"] = 0 | |
else: | |
self.workspace["count_on"] = 0 | |
def generator(self): | |
"""音声認識に必要な音声データを取得するための関数.""" | |
while True: # キューに保存されているデータを全て取り出す | |
# 先頭のデータを取得 | |
chunk = self.buff.get() | |
if chunk is None: | |
return | |
data = [chunk] | |
# まだキューにデータが残っていれば全て取得する | |
while True: | |
try: | |
chunk = self.buff.get(block=False) | |
if chunk is None: | |
return | |
data.append(chunk) | |
except queue.Empty: | |
break | |
# yieldにすることでキューのデータを随時取得できるようにする | |
yield b"".join(data) | |
def get_asr_result(vosk_asr): | |
"""音声認識APIを実行して最終的な認識結果を得る. | |
Args: | |
vosk_asr (VoskStreamingASR): 音声認識モジュール | |
Returns: | |
recog_text (str): 音声認識結果 | |
""" | |
mic_stream = vosk_asr.microphone_stream | |
mic_stream.open_stream() | |
with mic_stream.input_stream: | |
audio_generator = mic_stream.generator() | |
for content in audio_generator: | |
if vosk_asr.recognizer.AcceptWaveform(content): | |
recog_result = json.loads(vosk_asr.recognizer.Result()) | |
recog_text = recog_result["text"].split() | |
recog_text = "".join(recog_text) # 空白記号を除去 | |
return recog_text | |
return None | |
class VadConfig(NamedTuple): | |
"""発話区間検出を設定するクラス. | |
threshold (int): 発話区間検出を判定するパワーのしきい値 (dB) | |
vad_start (float): 発話区間を開始判定する秒数(sec) | |
vad_end (float): 発話区間を終了判定する秒数 (sec) | |
""" | |
threshold: int = 45 | |
vad_start: float = 0.3 | |
vad_end: float = 1.0 | |
# 全角英数字を半角に変換する関数 | |
def zenkaku_to_hankaku(input_string): | |
# 全角英数字を半角に変換する正規表現パターン | |
pattern = r'[A-Za-z0-9]' # 全角英字、全角数字の正規表現 | |
# 正規表現にマッチする全角文字を半角に変換 | |
def replace(match): | |
full_width_char = match.group(0) | |
half_width_char = chr(ord(full_width_char) - 0xFEE0) | |
return half_width_char | |
# 正規表現パターンにマッチする部分を置換 | |
result_string = re.sub(pattern, replace, input_string) | |
return result_string | |
# テキストボックスに認識結果を追加する関数(別スレッドで実行) | |
def add_text_from_thread(): | |
global text_area,checked | |
text_area.insert(tk.END, f"マイク入力と音声認識器を初期化しています...\n") | |
# 入力デバイス情報に基づき、サンプリング周波数の情報を取得 | |
input_device_info = sd.query_devices(kind="input") | |
sample_rate = int(input_device_info["default_samplerate"]) | |
# 発話区間検出の設定 | |
vad_config = VadConfig(threshold, vad_start, vad_end) | |
# マイク入力を初期化・開始 | |
mic_stream = MicrophoneStream(sample_rate, chunk_size, vad_config) | |
# 音声認識器を構築 | |
recognizer = KaldiRecognizer(Model("model"), sample_rate) | |
# マイク入力ストリームおよび音声認識器をまとめて保持 | |
VoskStreamingASR = namedtuple( | |
"VoskStreamingASR", ["microphone_stream", "recognizer"] | |
) | |
vosk_asr = VoskStreamingASR(mic_stream, recognizer) | |
text_area.insert(tk.END, f"準備完了\n\n") | |
text_area.insert(tk.END, f"※クリップボード経由の自動入力を使うとクリップボードが上書きされます\n\n") | |
while True: | |
recog_result = get_asr_result(vosk_asr) | |
recog_result = zenkaku_to_hankaku(recog_result) | |
# 認識結果が空でない場合のみ出力処理 | |
if not recog_result == "": | |
text_area.insert(tk.END, f"{recog_result}\n") | |
# カーソルを最後尾に移動 | |
text_area.mark_set("insert", "end") | |
# カーソル位置が見えるようにスクロール | |
text_area.see("insert") | |
# クリップボード有効ならペースト | |
if checked.get()==1: | |
paste_text(f"{recog_result}") | |
# テキストエリアをクリアする関数 | |
def clear_textarea(): | |
global text_area | |
text_area.delete(1.0, tk.END) # 入力欄をクリア | |
def main(ichunk_size=8000, ithreshold=45, ivad_start=0.3, ivad_end=1.0): | |
"""音声認識デモンストレーションを実行. | |
Args: | |
chunk_size (int): 音声データを受け取る単位(サンプル数) | |
threshold (int): 発話区間検出を判定するパワーのしきい値 (dB) | |
vad_start (float): 発話区間を開始判定する秒数(sec) | |
vad_end (float): 発話区間を終了判定する秒数 (sec) | |
""" | |
SetLogLevel(-1) # VOSK起動時のログ表示を抑制 | |
global text_area,enable_checkbox,checked | |
global chunk_size,threshold,vad_start,vad_end | |
chunk_size = ichunk_size | |
threshold = ithreshold | |
vad_start = ivad_start | |
vad_end = ivad_end | |
# ウィンドウの作成 | |
root = tk.Tk() | |
root.title("Vosk音声認識") | |
root.attributes('-topmost', True) | |
# テキストエリアの作成 | |
text_area = tk.Text(root) | |
text_area.pack() | |
# テキストをクリアするボタン | |
clear_button = tk.Button(root, text="clear", command=clear_textarea) | |
clear_button.pack() | |
checked = tk.IntVar(value=1) | |
enable_checkbox = tk.Checkbutton(root, text="クリップボード経由の自動入力を有効化", variable=checked) | |
enable_checkbox.pack() | |
# スレッドでバックグラウンド処理を開始 | |
thread = threading.Thread(target=add_text_from_thread) | |
thread.daemon = True | |
thread.start() | |
# ウィンドウを表示 | |
root.mainloop() | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment