Skip to content

Instantly share code, notes, and snippets.

@aidiary
Last active November 1, 2021 01:30
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save aidiary/1b29bec4f079d465be4fc36e45d5a3d4 to your computer and use it in GitHub Desktop.
Save aidiary/1b29bec4f079d465be4fc36e45d5a3d4 to your computer and use it in GitHub Desktop.
Phase Vocoderのサンプル1
import numpy as np
import sounddevice as sd
import soundfile as sf
sd.default.device = 'MacBook Proのスピーカー'
filename = '../slow-drum-loop.wav'
# filename = '../DIALOG_001_01.wav'
data, fs = sf.read(filename)
data = data[:, 0] # stereo -> mono
current_frame = 0
fft_size = 1024
hop_size = 256
# input curcular buffer
# ファイルやマイクからの入力サンプルを書き込むバッファ
buffer_size = 16384
input_buffer = np.zeros(buffer_size)
input_buffer_pointer = 0
hop_counter = 0
# output circular buffer
output_buffer = np.zeros(buffer_size)
output_buffer_write_pointer = hop_size
output_buffer_read_pointer = 0
def process_fft(in_buffer, in_pointer, out_buffer, out_pointer):
unwrapped_buffer = np.zeros(fft_size)
for n in range(0, fft_size):
# in_pointerの位置から過去fft_sizeの範囲をfftする
# リアルタイム処理はcausalなので過去のサンプルしか使えない
# buffer_sizeはindexが負の値になったときにバッファの逆側から使うため
circular_buffer_index = (in_pointer - fft_size + n + buffer_size) % buffer_size
unwrapped_buffer[n] = in_buffer[circular_buffer_index]
spectrum = np.fft.fft(unwrapped_buffer)
reconstructed_wave = np.fft.ifft(spectrum)
# out_pointerの位置から書き出す
for n in range(0, fft_size):
circular_buffer_index = (out_pointer + n) % buffer_size
# ifftの結果は複素数なので実部のみ取り出す
# 上書きではなく前の結果と足し算するのがポイント(Overlap-Addのため)
out_buffer[circular_buffer_index] += reconstructed_wave[n].real
def callback(outdata, frames, time, status):
global current_frame, input_buffer_pointer, hop_counter, output_buffer_read_pointer, output_buffer_write_pointer
for n in range(0, frames):
# 入力処理
# 入力音声をinput_bufferにためていく
input = data[current_frame]
input_buffer[input_buffer_pointer] = input
input_buffer_pointer += 1
# circular buffer
if input_buffer_pointer >= buffer_size:
input_buffer_pointer = 0
# 出力処理
# 処理済みのoutput_bufferの音声をスピーカーに出力
# output_bufferから現在のサンプルでの出力を取得
out = output_buffer[output_buffer_read_pointer]
# 読み込んだら次のoverlap-addで使うためにそなえてそこの場所はクリアしておく
# overlap-addは値を上書きをせずに足し算するだけなのでクリアしておかないといけない
output_buffer[output_buffer_read_pointer] = 0
# 波形がオーバラップしてる部分はオーバラップの回数分平均して出力
out *= hop_size / fft_size
output_buffer_read_pointer += 1
# circular buffer
if output_buffer_read_pointer >= buffer_size:
output_buffer_read_pointer = 0
# input_bufferにhop_sizeたまるたびにFFTを開始
hop_counter += 1
if hop_counter >= hop_size:
hop_counter = 0
# input_buffer_pointerの位置から過去fft_size分のサンプルを使う
# output_buffer_write_pointerの位置からfft_size分だけ結果を書き込む
process_fft(input_buffer, input_buffer_pointer, output_buffer,
output_buffer_write_pointer)
# Overlap Addするためにoutput_buffer_write_pointerはhop_size分だけすすめる
# output_buffer_read_pointerよりは常に先行している必要がある
output_buffer_write_pointer = (output_buffer_write_pointer + hop_size) % buffer_size
outdata[n, :] = out
current_frame += 1
# 最後まで再生したら終了
if current_frame == len(data):
raise sd.CallbackStop()
with sd.OutputStream(samplerate=fs, callback=callback, blocksize=512):
input()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment