Skip to content

Instantly share code, notes, and snippets.

@kived
Forked from abhigenie92/twisted_kivy_pyaudio.py
Last active April 7, 2016 20:41
Show Gist options
  • Save kived/588888f450d794e650b85e55b0229a8b to your computer and use it in GitHub Desktop.
Save kived/588888f450d794e650b85e55b0229a8b to your computer and use it in GitHub Desktop.
from twisted.internet.protocol import Protocol, ClientFactory
from twisted.internet.defer import inlineCallbacks
from twisted.internet.threads import deferToThread
import pdb
# audio imports
import pyaudio,wave
class AudioClientFactory(ClientFactory):
def __init__(self, canvas_obj):
self.canvas_obj = canvas_obj
def buildProtocol(self, addr):
return StrokeClient(self)
class AudioClient(Protocol):
#audio parameters
CHUNK = 1024
FORMAT = pyaudio.paInt16
CHANNELS = 2
RATE = 44100
def __init__(self, factory):
self. factory = factory
self.recv_data=''
self.pyaudio_obj=pyaudio.PyAudio()
self.stream=pyaudio_obj.open(format=self.FORMAT,channels=self.CHANNELS,rate=self.RATE,input=True,
frames_per_buffer=self.CHUNK)
self.receive_audio_pyobj=pyaudio.PyAudio()
self.stream_receive = self.receive_audio_pyobj.open(format=self.FORMAT,channels=self.CHANNELS,
rate=self.RATE,output=True)
def connectionMade(self):
self.start_audio_stream()
print "Connected to remote server"
@inlineCallbacks
def start_audio_stream(self):
while True: # or maybe while connected, while running, etc.
audio_data = yield deferToThread(self.stream.read, self.CHUNK)
do_something_with_audio_data(audio_data)
def dataReceived(self,data):
self.stream_receive.write(data)
if __name__ == '__main__':
from twisted.internet import reactor
reactor.connectTCP("127.0.0.1",3036,StrokeClientFactory())
reactor.run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment