Skip to content

Instantly share code, notes, and snippets.

@lefirea
Last active December 8, 2018 09:18
Show Gist options
  • Save lefirea/93a8c2ec6090878a405e05a52e50c6c4 to your computer and use it in GitHub Desktop.
Save lefirea/93a8c2ec6090878a405e05a52e50c6c4 to your computer and use it in GitHub Desktop.
pyWORLD使って音声加工
外部ライブラリは次の通り。
・wxPython:GUI関係
・pyWORLD:音声解析
外部だったか忘れたライブラリは次の通り。
・pyaudio
処理はマルチスレッド。以下の5枚のスレッドで行ってる。
(1)キー入力監視
(2)マイク入力取得
(3)音声加工
(4)加工音声再生
(1)でEscキー、Qキーの入力を監視。押されれば録音終了。
(2)で加工用のキューにスタックを積んでいく。
(3)で加工用キューから取り出して加工。それを再生用キューに積んでいく。
(4)で再生用キューから取り出して再生。
===================================================================
===================================================================
現状、問題として、再生し続けるとどんどんラグができてしまう。
理由としては、キューに追加して行くからだろう。
・キューを使わない場合
録音->加工->再生->録音->……
・キューを使う場合
録音->録音キューに追加->録音->……
録音キューから取り出す->音声加工->再生キューに追加->……
再生キューから取り出し->再生->……
という流れになる。
キューをつかわない場合は、一連の流れが一本の流れとして行われるので、入力は飛び飛びになってしまうが、ラグは増えない。ほぼ一律。
キューを使うと、入力は飛び飛びにならないが、入力の方がスパンが早く、計算によってどんどんズレが蓄積していく。これは図を描いて間隔を測ればすぐ分かる。
けど、これは正直どうしようもない気がする。全部自分で計算を書けばワンチャン早くなるだろうけど、それでもラグは発生するだろうなぁって。
キューがあろうがなかろうが、大して変わらないと思う。
===================================================================
===================================================================
pySPTK使う方法も試したけど、こっちは処理が重すぎてキューの有無に関わらずプツプツ切れる。電波が悪いときの通話みたいな感じ。
# -*- coding: utf-8 -*-
import wx
import numpy as np
import pyaudio
import pyworld as pw
import threading
import queue
from pynput.keyboard import Key, Controller, Listener
class MyApp(wx.Frame):
f0ChangeFlag=False
spChangeFlag=False
def __init__(self, *args, **kw):
super(MyApp, self).__init__(*args, **kw)
self.initUI()
def initUI(self):
font=wx.Font(20,
wx.FONTFAMILY_DEFAULT,
wx.FONTSTYLE_NORMAL,
wx.FONTWEIGHT_NORMAL)
self.SetTitle('test')
self.SetBackgroundColour((255, 200, 150))
self.SetPosition((200, 100))
self.SetSize((300, 300))
self.Show()
#self.frame=wx.Frame()
self.Bind(wx.EVT_CLOSE, self.PushedCloseButton)
#f0のON,OFFを切り替える
self.f0label=wx.StaticText(self, wx.ID_ANY, 'f0変換してないよ', pos=(50,50))
self.f0label.SetFont(font)
self.f0CoefBox=wx.TextCtrl(self, -1, pos=(50, 80), size=(50,-1))
self.f0CoefBox.SetLabel(str(f0ChangeCoef))
self.Button1=wx.Button(self, -1, 'OFF', pos=(50,110), size=(50,20))
self.Button1.Bind(wx.EVT_BUTTON, self.f0Button_clicked)
#spのON,OFFを切り替える
self.splabel=wx.StaticText(self, wx.ID_ANY, 'sp変換してないよ', pos=(50,150))
self.splabel.SetFont(font)
self.spCoefBox=wx.TextCtrl(self, -1, pos=(50, 180), size=(50,-1))
self.spCoefBox.SetLabel(str(spChangeCoef))
self.spButton=wx.Button(self, -1, 'OFF', pos=(50,210), size=(50,20))
self.spButton.Bind(wx.EVT_BUTTON, self.spButton_clicked)
def f0Button_clicked(self, event):
global f0ChangeCoef
if self.f0ChangeFlag==False:
self.f0ChangeFlag=True
self.Button1.SetLabel('ON')
f0ChangeCoef=float(self.f0CoefBox.GetValue())
if f0ChangeCoef < 0:
f0ChangeCoef=1.0
self.f0CoefBox.SetLabel(str(self.f0ChangeCoef))
self.f0label.SetLabel('f0変換中')
self.f0CoefBox.Disable()
elif self.f0ChangeFlag==True:
self.f0ChangeFlag=False
self.Button1.SetLabel('OFF')
f0ChangeCoef=1.0
self.f0label.SetLabel('f0変換してないよ')
self.f0CoefBox.Enable()
def spButton_clicked(self, event):
global spChangeCoef
if self.spChangeFlag==False:
self.spChangeFlag=True
self.spButton.SetLabel('ON')
spChangeCoef=float(self.spCoefBox.GetValue())
if spChangeCoef<1.0:
spChangeCoef=1.0
self.spCoefBox.SetLabel(str(self.spChangeCoef))
self.splabel.SetLabel('sp変換中')
self.spCoefBox.Disable()
elif self.spChangeFlag==True:
self.spChangeFlag=False
self.spButton.SetLabel('OFF')
spChangeCoef=1.0
self.splabel.SetLabel('sp変換してないよ')
self.spCoefBox.Enable()
def PushedCloseButton(self, event):
print('push close button')
#listener.join()を誘発
keyboard=Controller()
keyboard.press(Key.esc)
keyboard.release(Key.esc)
KeyLoggerThread.join()
VoiceChangeThread.join()
StreamWriteThread.join()
RecordThread.join()
stream.stop_stream()
stream.close()
p.terminate()
self.Destroy()
def on_press(key):
global listener
global QuitKeyPushed
try:
char=key.char
except:
#修飾キー
char=key
finally:
if char==Key.esc or char=='q':
print('quit...')
QuitKeyPushed=True
listener.stop()
KeyLoggerThread.join()
stream.stop_stream()
stream.close()
p.terminate()
def on_release(key):
pass
def KeyLogger():
try:
with Listener(on_press=on_press, on_release=on_release) as listener:
listener.join()
except:
pass
def Escape():
#listener.join()を誘発
keyboard=Controller()
keyboard.press(Key.esc)
keyboard.release(Key.esc)
KeyLoggerThread.join()
VoiceChangeThread.join()
StreamWriteThread.join()
RecordThread.join()
stream.stop_stream()
stream.close()
p.terminate()
def ChangeVoice():
PlayThreRunned=False
while True:
data=RecQ.get() #録音データ取り出し
if data==None: #データが空なら
print('RecQ is None.')
Escape()
break;
#byte型->np.array()
data=np.frombuffer(data, dtype=np.int16).astype(np.float)
#f0解析
_f0, t=pw.dio(data, Rate)
f0=pw.stonemask(data, _f0, t, Rate)
#sp
sp=pw.cheaptrick(data, f0, t, Rate)
#ap解析
ap=pw.d4c(data, f0, t, Rate)
#f0加工
convert_f0=f0*f0ChangeCoef
#sp加工
convert_sp=np.zeros_like(sp)
for f in range(convert_sp.shape[1]):
convert_sp[:, f]=sp[:, int(f/spChangeCoef)]
#音声再生成
synthe=pw.synthesize(convert_f0, convert_sp, ap, Rate).astype(np.int16)
PlayQ.put(synthe.tobytes()) #キューに追加
if PlayThreRunned==False:
StreamWriteThread.start()
PlayThreRunned=True
def StreamWriter():
global stream
global PlayQ
while True:
#加工済み音声取り出し
data=PlayQ.get()
if data==None:
print('PlayQ is None.')
Escape()
break;
stream.write(data) #再生
def Recorder():
ChaThreRunned=False
while stream.is_active():
try:
#マイク入力取得
data=stream.read(Chunk)
RecQ.put(data) #録音データをキューに追加
if ChaThreRunned==False:
VoiceChangeThread.start()
ChaThreRunned=True
if QuitKeyPushed==True:
break;
except:
print('except')
Escape()
break;
finally:
pass
stream.stop_stream()
stream.close()
p.terminate()
print('Stop Streaming...')
Chunk=int(1024*2)
Format=pyaudio.paInt16
Channels=1
Rate=int(17.5*1000)
QuitKeyPushed=False #終了キー入力
f0ChangeCoef=1.0 #f0変換係数
spChangeCoef=1.0 #sp変換係数
#キー入力監視スレッド
KeyLoggerThread=threading.Thread(target=KeyLogger)
KeyLoggerThread.start()
#マイク入力設定
p=pyaudio.PyAudio()
stream=p.open(format=Format,
channels=Channels,
rate=Rate,
input=True,
output=True,
#frames_per_buffer=Chunk
)
#マイク音取得開始
stream.start_stream()
#音声加工スレッド
VoiceChangeThread=threading.Thread(target=ChangeVoice)
#再生スレッド
StreamWriteThread=threading.Thread(target=StreamWriter)
RecQ=queue.Queue() #録音されたデータのキュー
PlayQ=queue.Queue() #加工された音声のキュー
#録音スレッド
RecordThread=threading.Thread(target=Recorder)
RecordThread.start()
#ウィンドウ作成
app=wx.App()
MyApp(None)
app.MainLoop()
del app #app.close()的な感じ。これが無いとエラーが出る
#各キューの中身を吐きだす
while not RecQ.empty():
RecQ.get()
while not PlayQ.empty():
PlayQ.get()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment