Skip to content

Instantly share code, notes, and snippets.

@aidiary
Created November 8, 2021 00:03
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save aidiary/767786370ba6ffc36e2a1577024ceaf6 to your computer and use it in GitHub Desktop.
Save aidiary/767786370ba6ffc36e2a1577024ceaf6 to your computer and use it in GitHub Desktop.
リングバッファを用いたパススルー再生
import numpy as np
import sounddevice as sd
import soundfile as sf
from ring_buffer import InputRingBuffer
input_device = 'Yeti Stereo Microphone'
output_device = 'MacbookProのスピーカー'
fft_size = 1024
hop_size = 256
analysis_window = np.hanning(fft_size)
synthesis_window = np.hanning(fft_size)
# input curcular buffer
# ファイルやマイクからの入力サンプルを書き込むバッファ
buffer_size = 16384
input_buffer = InputRingBuffer(buffer_size)
hop_counter = 0
# output circular buffer
output_buffer = np.zeros(buffer_size)
output_buffer_write_pointer = hop_size
output_buffer_read_pointer = 0
def process_fft(in_buffer, out_buffer, out_pointer):
unwrapped_buffer = np.zeros(fft_size)
for n in range(0, fft_size):
# in_pointerの位置から過去fft_sizeの範囲をfftする
# リアルタイム処理はcausalなので過去のサンプルしか使えない
# buffer_sizeはindexが負の値になったときにバッファの逆側から使うため
unwrapped_buffer[n] = in_buffer.get(in_buffer.head - fft_size + n) * analysis_window[n]
spectrum = np.fft.fft(unwrapped_buffer)
# IFFTで音声波形を復元
reconstructed_wave = np.fft.ifft(spectrum)
# out_pointerの位置から書き出す
for n in range(0, fft_size):
circular_buffer_index = (out_pointer + n) % buffer_size
# ifftの結果は複素数なので実部のみ取り出す
# 上書きではなく前の結果と足し算するのがポイント(Overlap-Addのため)
# 音量が小さくなるので5倍する
out_buffer[circular_buffer_index] += reconstructed_wave[n].real * synthesis_window[n] * 5.0
def callback(indata, outdata, frames, time, status):
global hop_counter, output_buffer_read_pointer, output_buffer_write_pointer
for n in range(0, frames):
# 入力処理
# 入力音声をinput_bufferにためていく
# 1チャンネルの音声のみ利用
input = indata[n, 0]
input_buffer.add(input)
# 出力処理
# 処理済みのoutput_bufferの音声をスピーカーに出力
# output_bufferから現在のサンプルでの出力を取得
out = output_buffer[output_buffer_read_pointer]
# 読み込んだら次のoverlap-addで使うためにそなえてそこの場所はクリアしておく
# overlap-addは値を上書きをせずに足し算するだけなのでクリアしておかないといけない
output_buffer[output_buffer_read_pointer] = 0
# 波形がオーバラップしてる部分はオーバラップの回数分平均して出力
out *= hop_size / fft_size
output_buffer_read_pointer += 1
# circular buffer
if output_buffer_read_pointer >= buffer_size:
output_buffer_read_pointer = 0
# input_bufferにhop_sizeたまるたびにFFTを開始
hop_counter += 1
if hop_counter >= hop_size:
hop_counter = 0
# input_buffer_pointerの位置から過去fft_size分のサンプルを使う
# output_buffer_write_pointerの位置からfft_size分だけ結果を書き込む
process_fft(input_buffer, output_buffer, output_buffer_write_pointer)
# Overlap Addするためにoutput_buffer_write_pointerはhop_size分だけすすめる
# output_buffer_read_pointerよりは常に先行している必要がある
output_buffer_write_pointer = (output_buffer_write_pointer + hop_size) % buffer_size
outdata[n, 0] = out
outdata[n, 1] = out
with sd.Stream(device=(input_device, output_device),
samplerate=44100,
channels=2,
blocksize=512,
latency=0,
callback=callback):
input()
@aidiary
Copy link
Author

aidiary commented Nov 18, 2021

テストだよ。

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment