Skip to content

Instantly share code, notes, and snippets.

@tam17aki
Last active December 4, 2023 04:21
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save tam17aki/7a766b263ebc08752539ecdff9514298 to your computer and use it in GitHub Desktop.
Save tam17aki/7a766b263ebc08752539ecdff9514298 to your computer and use it in GitHub Desktop.
# -*- coding: utf-8 -*-
"""音声パワーと基本周波数をリアルタイムモニタリングする.
Copyright (C) 2023 by Akira TAMAMORI
Copyright (C) 2022 by Koji INOUE
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
"""
import math
import queue
import struct
import sys
import numpy as np
import pyworld
import sounddevice as sd
class MicrophoneStream:
"""マイク音声入力のためのクラス."""
def __init__(self, rate, chunk):
"""音声入力ストリームを初期化する.
Args:
rate (int): サンプリングレート (Hz)
chunk (int): 音声データを受け取る単位(サンプル数)
"""
# マイク入力のパラメータ
self.rate = rate
self.chunk = chunk
# 入力された音声データを保持するデータキュー(バッファ)
self.buff = queue.Queue()
# マイク音声入力の初期化
self.input_stream = None
def open_stream(self):
"""入力ストリームを初期化する."""
self.input_stream = sd.RawInputStream(
samplerate=self.rate,
blocksize=self.chunk,
dtype="int16",
channels=1,
callback=self.callback,
)
def callback(self, indata, frames, time, status):
"""音声パワーに基づいて発話区間を判定.
Args:
indata: チャンクから取得した音声(バイナリ)データ
frames: 未使用(取得に成功したチャンクのサイズ)
time: 未使用
status: 異常発生時のステータス
"""
if status:
print(status, file=sys.stderr)
# 入力された音声データをキューへ保存
self.buff.put(bytes(indata))
def generator(self):
"""音声データを取得するための関数."""
while True: # キューに保存されているデータを全て取り出す
# 先頭のデータを取得
chunk = self.buff.get()
if chunk is None:
return
data = [chunk]
# まだキューにデータが残っていれば全て取得する
while True:
try:
chunk = self.buff.get(block=False)
if chunk is None:
return
data.append(chunk)
except queue.Empty:
break
# yieldにすることでキューのデータを随時取得できるようにする
yield b"".join(data)
def compute_power_fo(self, indata):
"""音声パワーと基本周波数を計算する関数.
Args:
indata (Bytes): チャンクから取得した音声データ.
"""
audio = struct.unpack(f"{len(indata) / 2:.0f}h", indata) # 2Byte単位でunpackする
audio = np.array(audio).astype(np.float64)
# 音声のパワー(音声データの二乗平均)を計算する
rms = math.sqrt(np.square(audio).mean())
power = 20 * math.log10(rms) if rms > 0.0 else -math.inf # RMSからデシベルへ
# 基本周波数を計算する
fo, _ = pyworld.dio(audio, self.rate)
nonzero_ind = np.nonzero(fo.astype(int))[0]
fo = fo[nonzero_ind] # foが非ゼロの部分を取り出すことで、推定をロバストにする
if len(fo) > 0:
fo = fo.mean() # フレーム平均
else:
fo = 0.0 # 空っぽだったら 0.0 Hz
return power, fo
def main(chunk_size=8000):
"""音量と基本周波数をモニタリングするデモンストレーションを実行.
Args:
chunk_size (int): 音声データを受け取る単位(サンプル数)
"""
# 入力デバイス情報に基づき、サンプリング周波数の情報を取得
input_device_info = sd.query_devices(kind="input")
sample_rate = int(input_device_info["default_samplerate"])
# マイク入力
mic_stream = MicrophoneStream(sample_rate, chunk_size)
try:
print("<収録開始>")
mic_stream.open_stream() # 入力ストリームを開く準備
with mic_stream.input_stream: # 入力ストリームから音声取得
audio_generator = mic_stream.generator() # 音声データ(のカタマリ)
for data in audio_generator: # チャンクごとに情報を表示してモニタリング
power, fo = mic_stream.compute_power_fo(data) # 音声パワーと基本周波数を取得
print(
"\r" + f"音声パワー {power:5.1f}[dB] " + f"基本周波数 {fo:5.1f}[Hz]",
end="",
)
continue
except KeyboardInterrupt: # Ctrl-C (MacだとCommand-C) で強制終了
print("\n<収録終了>")
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment