Created
October 17, 2018 04:22
-
-
Save lefirea/93be0b48b3307f14fadefe7af82a41f1 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
import wx | |
import numpy as np | |
import pyaudio | |
import pyworld as pw | |
import threading | |
import queue | |
from pynput.keyboard import Key, Controller, Listener | |
class MyApp(wx.Frame):
    """Control window with two sliders that tune the voice conversion.

    The first slider rewrites the module-level ``f0ChangeCoef`` and the
    second rewrites the module-level ``spChangeCoef``. Closing the window
    requests shutdown by calling ``on_press(Key.esc)``.
    """

    # Class-level flags; nothing inside this class reads or updates them.
    f0ChangeFlag = False
    spChangeFlag = False

    def __init__(self, *args, **kw):
        """Forward every argument to ``wx.Frame`` and build the widgets."""
        super().__init__(*args, **kw)
        self.initUI()

    def initUI(self):
        """Configure the frame, show it, and create both labelled sliders."""
        label_font = wx.Font(
            20,
            wx.FONTFAMILY_DEFAULT,
            wx.FONTSTYLE_NORMAL,
            wx.FONTWEIGHT_NORMAL,
        )

        self.SetTitle('test')
        self.SetBackgroundColour((255, 200, 150))
        self.SetPosition((200, 100))
        self.SetSize((300, 300))
        self.Show()

        self.Bind(wx.EVT_CLOSE, self.PushedCloseButton)

        # Slider for the f0 coefficient (slider value / 10 -> 0.5 .. 2.0).
        self.f0label, self.f0Slider = self._build_labelled_slider(
            'こえのたかさ', label_font, 50, (5, 20, 10), self.f0SliderChanged
        )

        # Slider for the sp coefficient (slider value / 100 -> 1.0 .. 1.5).
        self.splabel, self.spSlider = self._build_labelled_slider(
            'こえのとくちょう', label_font, 150, (100, 150, 100), self.spSliderChanged
        )

    def _build_labelled_slider(self, caption, font, top, limits, handler):
        """Create a caption at y=``top`` with a slider 30 px below it.

        ``limits`` is ``(minimum, maximum, initial)`` for the slider and
        ``handler`` is bound to ``wx.EVT_SLIDER``. Returns the
        ``(label, slider)`` pair.
        """
        minimum, maximum, initial = limits

        label = wx.StaticText(self, wx.ID_ANY, caption, pos=(50, top))
        label.SetFont(font)

        slider = wx.Slider(
            self,
            pos=(50, top + 30),
            size=(150, 50),
            style=wx.SL_LABELS | wx.SL_AUTOTICKS,
        )
        slider.SetMin(minimum)
        slider.SetMax(maximum)
        slider.SetValue(initial)
        slider.Bind(wx.EVT_SLIDER, handler)
        return label, slider

    def PushedCloseButton(self, event):
        """Handle ``wx.EVT_CLOSE``: request shutdown, then destroy the frame."""
        print('push close button')
        on_press(Key.esc)
        self.Destroy()
        print('Destroy')

    def f0SliderChanged(self, event):
        """Store the slider position divided by 10 in ``f0ChangeCoef``."""
        global f0ChangeCoef
        f0ChangeCoef = self.f0Slider.GetValue() / 10

    def spSliderChanged(self, event):
        """Store the slider position divided by 100 in ``spChangeCoef``."""
        global spChangeCoef
        spChangeCoef = self.spSlider.GetValue() / 100
def PushEscKey():
    """Request shutdown by raising the quit flag and emitting a synthetic Esc.

    Sets the module-level ``QuitKeyPushed`` flag, sends an Esc press/release
    through a pynput ``Controller`` so the key listener sees a quit key,
    stops the listener explicitly, and then waits for ``KeyLoggerThread``.
    """
    # Without this declaration the assignment below only created a local
    # variable and the shared quit flag was never raised.
    global QuitKeyPushed
    QuitKeyPushed = True

    # Emit Esc so the listener's on_press callback observes a quit key.
    keyboard = Controller()
    keyboard.press(Key.esc)
    keyboard.release(Key.esc)

    # Stop the listener before joining: if the synthetic key event is never
    # delivered, joining first would wait forever on the key-logger thread.
    listener.stop()
    KeyLoggerThread.join()
def on_press(key):
    """Handle a key press; Esc or 'q' requests shutdown.

    Used both as the pynput listener callback and as a direct call from
    other threads (``on_press(Key.esc)``). On a quit key it raises the
    module-level ``QuitKeyPushed`` flag, stops the listener, waits for
    ``KeyLoggerThread`` when that is safe, and prints ``'quit...'``.

    Args:
        key: the pressed key. Keys with a ``char`` attribute are compared by
            that character; any other key is compared as-is.
    """
    global QuitKeyPushed

    # Special keys (e.g. Key.esc) have no ``char``; fall back to the key itself.
    char = getattr(key, 'char', key)
    if char != Key.esc and char != 'q':
        return

    QuitKeyPushed = True

    # ``listener`` is a plain placeholder until KeyLogger() has created the
    # real Listener, so only call stop() when it actually exists.
    stop = getattr(listener, 'stop', None)
    if stop is not None:
        stop()

    # When invoked as the listener callback we are running on the listener
    # thread, which KeyLoggerThread is itself waiting on; joining from here
    # would deadlock. A thread also cannot join itself.
    current = threading.current_thread()
    if current is not listener and current is not KeyLoggerThread:
        KeyLoggerThread.join()
    print('quit...')
def on_release(key):
    """Key-release callback for the listener; deliberately does nothing."""
    return None
# Placeholder; KeyLogger() rebinds this name to the running pynput Listener.
listener=[]


def KeyLogger():
    """Run the keyboard listener until it is stopped.

    Binds the module-level ``listener`` to a pynput ``Listener`` wired to
    ``on_press`` / ``on_release`` and blocks in ``listener.join()``.
    Failures are reported on stdout instead of being silently discarded,
    and still do not propagate out of the thread.
    """
    global listener
    try:
        with Listener(on_press=on_press, on_release=on_release) as listener:
            listener.join()
    except Exception as exc:
        # Best-effort thread: keep the application alive, but say what failed.
        print('KeyLogger stopped by error:', repr(exc))
def ChangeVoice():
    """Convert recorded chunks and queue the result for playback.

    Loop until ``QuitKeyPushed`` is set: take a raw int16 chunk from
    ``RecQ``, analyse it with pyworld (``dio`` -> ``stonemask`` ->
    ``cheaptrick`` -> ``d4c``), scale f0 by ``f0ChangeCoef``, warp the
    second axis of the spectral envelope by ``spChangeCoef``, resynthesize,
    and put the int16 bytes on ``PlayQ``. ``StreamWriteThread`` is started
    after the first converted chunk has been queued.

    A ``None`` item on ``RecQ`` is treated as end-of-stream: it triggers
    ``on_press(Key.esc)`` and ends the loop.
    """
    play_thread_started = False
    int16_range = np.iinfo(np.int16)

    while not QuitKeyPushed:
        # Poll with a timeout so a quit request is noticed even when the
        # recorder has stopped producing data.
        try:
            data = RecQ.get(timeout=0.1)
        except queue.Empty:
            continue

        if data is None:
            print('RecQ is None.')
            on_press(Key.esc)
            break

        # np.float was removed in NumPy 1.24; float64 is the type it aliased.
        samples = np.frombuffer(data, dtype=np.int16).astype(np.float64)

        _f0, t = pw.dio(samples, Rate)
        f0 = pw.stonemask(samples, _f0, t, Rate)
        sp = pw.cheaptrick(samples, f0, t, Rate)
        ap = pw.d4c(samples, f0, t, Rate)

        convert_f0 = f0 * f0ChangeCoef

        # Output column k copies input column int(k / spChangeCoef). The
        # index is clamped so a coefficient below 1 cannot run past the
        # last column.
        n_bins = sp.shape[1]
        source_bins = (np.arange(n_bins) / spChangeCoef).astype(int)
        source_bins = np.minimum(source_bins, n_bins - 1)
        # Fancy indexing does not guarantee C order; make it explicit, as
        # the original zeros_like + fill produced a C-ordered array.
        convert_sp = np.ascontiguousarray(sp[:, source_bins])

        synthe = pw.synthesize(convert_f0, convert_sp, ap, Rate)
        # Clip before the cast so out-of-range samples saturate rather than
        # wrap around.
        synthe = np.clip(synthe, int16_range.min, int16_range.max).astype(np.int16)
        PlayQ.put(synthe.tobytes())

        if not play_thread_started:
            StreamWriteThread.start()
            play_thread_started = True
def StreamWriter():
    """Write converted audio from ``PlayQ`` to the output stream.

    Loop until ``QuitKeyPushed`` is set. A ``None`` item is treated as
    end-of-stream: it triggers ``on_press(Key.esc)`` and ends the loop.
    """
    while not QuitKeyPushed:
        # Poll with a timeout so a quit request is noticed even when no
        # further audio is ever queued.
        try:
            data = PlayQ.get(timeout=0.1)
        except queue.Empty:
            continue

        if data is None:
            print('PlayQ is None.')
            on_press(Key.esc)
            break

        stream.write(data)
def Recorder():
    """Read microphone chunks into ``RecQ`` and tear the audio down on exit.

    While the stream is active, read ``Chunk`` frames, queue them, and start
    ``VoiceChangeThread`` after the first chunk. The loop ends when
    ``QuitKeyPushed`` is set or when reading fails. Afterwards the worker
    threads are woken with a ``None`` end-of-stream marker, joined if they
    are running, and the stream and PyAudio instance are closed.
    """
    converter_started = False

    while stream.is_active():
        try:
            data = stream.read(Chunk)
            RecQ.put(data)
            if not converter_started:
                VoiceChangeThread.start()
                converter_started = True
            if QuitKeyPushed:
                break
        except Exception as exc:
            print('except')
            # Show the cause as well; the bare marker alone hid what failed.
            print(repr(exc))
            break

    # Both consumers treat None as end-of-stream, so send it to unblock a
    # worker that is still waiting on its queue.
    RecQ.put(None)
    # join() raises RuntimeError on a thread that was never started (for
    # example when the very first read failed), so only join live threads.
    if VoiceChangeThread.is_alive():
        VoiceChangeThread.join()

    PlayQ.put(None)
    if StreamWriteThread.is_alive():
        StreamWriteThread.join()

    stream.stop_stream()
    stream.close()
    p.terminate()
    print('Stop Streaming...')
# ---- Audio parameters ----
Chunk = 2 * 1024          # frames requested per stream.read() call
Format = pyaudio.paInt16
Channels = 1
Rate = 17500              # sampling rate passed to PyAudio and pyworld

# ---- State shared between the worker threads and the GUI ----
QuitKeyPushed = False     # becomes True once a quit key has been handled
f0ChangeCoef = 1.0        # f0 conversion coefficient
spChangeCoef = 1.0        # sp conversion coefficient
RecQ = queue.Queue()      # raw recorded chunks
PlayQ = queue.Queue()     # converted chunks waiting for playback

# Keyboard monitoring thread.
KeyLoggerThread = threading.Thread(target=KeyLogger)
KeyLoggerThread.start()

# Microphone / speaker stream. frames_per_buffer is left at the PyAudio
# default (the original kept ``frames_per_buffer=Chunk`` commented out).
p = pyaudio.PyAudio()
stream = p.open(
    format=Format,
    channels=Channels,
    rate=Rate,
    input=True,
    output=True,
)

# Start capturing microphone audio.
stream.start_stream()

# Created here; started later by Recorder() and ChangeVoice() respectively.
VoiceChangeThread = threading.Thread(target=ChangeVoice)
StreamWriteThread = threading.Thread(target=StreamWriter)

# Recording thread.
RecordThread = threading.Thread(target=Recorder)
RecordThread.start()

# Create the window and run the GUI event loop until it exits.
app = wx.App()
MyApp(None)
app.MainLoop()
del app  # acts like an app.close(); the author notes an error occurs without it

RecordThread.join()

# Discard whatever is still sitting in either queue.
for leftover in (RecQ, PlayQ):
    while not leftover.empty():
        leftover.get()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment