Last active
December 8, 2018 09:18
-
-
Save lefirea/93a8c2ec6090878a405e05a52e50c6c4 to your computer and use it in GitHub Desktop.
pyWORLD使って音声加工
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
外部ライブラリは次の通り。 | |
・wxPython:GUI関係 | |
・pyWORLD:音声解析 | |
外部だったか忘れたライブラリは次の通り。 | |
・pyaudio | |
処理はマルチスレッド。以下の5枚のスレッドで行ってる。 | |
(1)キー入力監視 | |
(2)マイク入力取得 | |
(3)音声加工 | |
(4)加工音声再生 | |
(1)でEscキー、Qキーの入力を監視。押されれば録音終了。 | |
(2)で加工用のキューにスタックを積んでいく。 | |
(3)で加工用キューから取り出して加工。それを再生用キューに積んでいく。 | |
(4)で再生用キューから取り出して再生。 | |
=================================================================== | |
=================================================================== | |
現状、問題として、再生し続けるとどんどんラグができてしまう。 | |
理由としては、キューに追加して行くからだろう。 | |
・キューを使わない場合 | |
録音->加工->再生->録音->…… | |
・キューを使う場合 | |
録音->録音キューに追加->録音->…… | |
録音キューから取り出す->音声加工->再生キューに追加->…… | |
再生キューから取り出し->再生->…… | |
という流れになる。 | |
キューをつかわない場合は、一連の流れが一本の流れとして行われるので、入力は飛び飛びになってしまうが、ラグは増えない。ほぼ一律。 | |
キューを使うと、入力は飛び飛びにならないが、入力の方がスパンが早く、計算によってどんどんズレが蓄積していく。これは図を描いて間隔を測ればすぐ分かる。 | |
けど、これは正直どうしようもない気がする。全部自分で計算を書けばワンチャン早くなるだろうけど、それでもラグは発生するだろうなぁって。 | |
キューがあろうがなかろうが、大して変わらないと思う。 | |
=================================================================== | |
=================================================================== | |
pySPTK使う方法も試したけど、こっちは処理が重すぎてキューの有無に関わらずプツプツ切れる。電波が悪いときの通話みたいな感じ。 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
import wx | |
import numpy as np | |
import pyaudio | |
import pyworld as pw | |
import threading | |
import queue | |
from pynput.keyboard import Key, Controller, Listener | |
class MyApp(wx.Frame): | |
f0ChangeFlag=False | |
spChangeFlag=False | |
def __init__(self, *args, **kw): | |
super(MyApp, self).__init__(*args, **kw) | |
self.initUI() | |
def initUI(self): | |
font=wx.Font(20, | |
wx.FONTFAMILY_DEFAULT, | |
wx.FONTSTYLE_NORMAL, | |
wx.FONTWEIGHT_NORMAL) | |
self.SetTitle('test') | |
self.SetBackgroundColour((255, 200, 150)) | |
self.SetPosition((200, 100)) | |
self.SetSize((300, 300)) | |
self.Show() | |
#self.frame=wx.Frame() | |
self.Bind(wx.EVT_CLOSE, self.PushedCloseButton) | |
#f0のON,OFFを切り替える | |
self.f0label=wx.StaticText(self, wx.ID_ANY, 'f0変換してないよ', pos=(50,50)) | |
self.f0label.SetFont(font) | |
self.f0CoefBox=wx.TextCtrl(self, -1, pos=(50, 80), size=(50,-1)) | |
self.f0CoefBox.SetLabel(str(f0ChangeCoef)) | |
self.Button1=wx.Button(self, -1, 'OFF', pos=(50,110), size=(50,20)) | |
self.Button1.Bind(wx.EVT_BUTTON, self.f0Button_clicked) | |
#spのON,OFFを切り替える | |
self.splabel=wx.StaticText(self, wx.ID_ANY, 'sp変換してないよ', pos=(50,150)) | |
self.splabel.SetFont(font) | |
self.spCoefBox=wx.TextCtrl(self, -1, pos=(50, 180), size=(50,-1)) | |
self.spCoefBox.SetLabel(str(spChangeCoef)) | |
self.spButton=wx.Button(self, -1, 'OFF', pos=(50,210), size=(50,20)) | |
self.spButton.Bind(wx.EVT_BUTTON, self.spButton_clicked) | |
def f0Button_clicked(self, event): | |
global f0ChangeCoef | |
if self.f0ChangeFlag==False: | |
self.f0ChangeFlag=True | |
self.Button1.SetLabel('ON') | |
f0ChangeCoef=float(self.f0CoefBox.GetValue()) | |
if f0ChangeCoef < 0: | |
f0ChangeCoef=1.0 | |
self.f0CoefBox.SetLabel(str(self.f0ChangeCoef)) | |
self.f0label.SetLabel('f0変換中') | |
self.f0CoefBox.Disable() | |
elif self.f0ChangeFlag==True: | |
self.f0ChangeFlag=False | |
self.Button1.SetLabel('OFF') | |
f0ChangeCoef=1.0 | |
self.f0label.SetLabel('f0変換してないよ') | |
self.f0CoefBox.Enable() | |
def spButton_clicked(self, event): | |
global spChangeCoef | |
if self.spChangeFlag==False: | |
self.spChangeFlag=True | |
self.spButton.SetLabel('ON') | |
spChangeCoef=float(self.spCoefBox.GetValue()) | |
if spChangeCoef<1.0: | |
spChangeCoef=1.0 | |
self.spCoefBox.SetLabel(str(self.spChangeCoef)) | |
self.splabel.SetLabel('sp変換中') | |
self.spCoefBox.Disable() | |
elif self.spChangeFlag==True: | |
self.spChangeFlag=False | |
self.spButton.SetLabel('OFF') | |
spChangeCoef=1.0 | |
self.splabel.SetLabel('sp変換してないよ') | |
self.spCoefBox.Enable() | |
def PushedCloseButton(self, event): | |
print('push close button') | |
#listener.join()を誘発 | |
keyboard=Controller() | |
keyboard.press(Key.esc) | |
keyboard.release(Key.esc) | |
KeyLoggerThread.join() | |
VoiceChangeThread.join() | |
StreamWriteThread.join() | |
RecordThread.join() | |
stream.stop_stream() | |
stream.close() | |
p.terminate() | |
self.Destroy() | |
def on_press(key): | |
global listener | |
global QuitKeyPushed | |
try: | |
char=key.char | |
except: | |
#修飾キー | |
char=key | |
finally: | |
if char==Key.esc or char=='q': | |
print('quit...') | |
QuitKeyPushed=True | |
listener.stop() | |
KeyLoggerThread.join() | |
stream.stop_stream() | |
stream.close() | |
p.terminate() | |
def on_release(key): | |
pass | |
def KeyLogger(): | |
try: | |
with Listener(on_press=on_press, on_release=on_release) as listener: | |
listener.join() | |
except: | |
pass | |
def Escape(): | |
#listener.join()を誘発 | |
keyboard=Controller() | |
keyboard.press(Key.esc) | |
keyboard.release(Key.esc) | |
KeyLoggerThread.join() | |
VoiceChangeThread.join() | |
StreamWriteThread.join() | |
RecordThread.join() | |
stream.stop_stream() | |
stream.close() | |
p.terminate() | |
def ChangeVoice(): | |
PlayThreRunned=False | |
while True: | |
data=RecQ.get() #録音データ取り出し | |
if data==None: #データが空なら | |
print('RecQ is None.') | |
Escape() | |
break; | |
#byte型->np.array() | |
data=np.frombuffer(data, dtype=np.int16).astype(np.float) | |
#f0解析 | |
_f0, t=pw.dio(data, Rate) | |
f0=pw.stonemask(data, _f0, t, Rate) | |
#sp | |
sp=pw.cheaptrick(data, f0, t, Rate) | |
#ap解析 | |
ap=pw.d4c(data, f0, t, Rate) | |
#f0加工 | |
convert_f0=f0*f0ChangeCoef | |
#sp加工 | |
convert_sp=np.zeros_like(sp) | |
for f in range(convert_sp.shape[1]): | |
convert_sp[:, f]=sp[:, int(f/spChangeCoef)] | |
#音声再生成 | |
synthe=pw.synthesize(convert_f0, convert_sp, ap, Rate).astype(np.int16) | |
PlayQ.put(synthe.tobytes()) #キューに追加 | |
if PlayThreRunned==False: | |
StreamWriteThread.start() | |
PlayThreRunned=True | |
def StreamWriter(): | |
global stream | |
global PlayQ | |
while True: | |
#加工済み音声取り出し | |
data=PlayQ.get() | |
if data==None: | |
print('PlayQ is None.') | |
Escape() | |
break; | |
stream.write(data) #再生 | |
def Recorder(): | |
ChaThreRunned=False | |
while stream.is_active(): | |
try: | |
#マイク入力取得 | |
data=stream.read(Chunk) | |
RecQ.put(data) #録音データをキューに追加 | |
if ChaThreRunned==False: | |
VoiceChangeThread.start() | |
ChaThreRunned=True | |
if QuitKeyPushed==True: | |
break; | |
except: | |
print('except') | |
Escape() | |
break; | |
finally: | |
pass | |
stream.stop_stream() | |
stream.close() | |
p.terminate() | |
print('Stop Streaming...') | |
Chunk=int(1024*2) | |
Format=pyaudio.paInt16 | |
Channels=1 | |
Rate=int(17.5*1000) | |
QuitKeyPushed=False #終了キー入力 | |
f0ChangeCoef=1.0 #f0変換係数 | |
spChangeCoef=1.0 #sp変換係数 | |
#キー入力監視スレッド | |
KeyLoggerThread=threading.Thread(target=KeyLogger) | |
KeyLoggerThread.start() | |
#マイク入力設定 | |
p=pyaudio.PyAudio() | |
stream=p.open(format=Format, | |
channels=Channels, | |
rate=Rate, | |
input=True, | |
output=True, | |
#frames_per_buffer=Chunk | |
) | |
#マイク音取得開始 | |
stream.start_stream() | |
#音声加工スレッド | |
VoiceChangeThread=threading.Thread(target=ChangeVoice) | |
#再生スレッド | |
StreamWriteThread=threading.Thread(target=StreamWriter) | |
RecQ=queue.Queue() #録音されたデータのキュー | |
PlayQ=queue.Queue() #加工された音声のキュー | |
#録音スレッド | |
RecordThread=threading.Thread(target=Recorder) | |
RecordThread.start() | |
#ウィンドウ作成 | |
app=wx.App() | |
MyApp(None) | |
app.MainLoop() | |
del app #app.close()的な感じ。これが無いとエラーが出る | |
#各キューの中身を吐きだす | |
while not RecQ.empty(): | |
RecQ.get() | |
while not PlayQ.empty(): | |
PlayQ.get() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment