Skip to content

Instantly share code, notes, and snippets.

@cheery
Created March 16, 2014 15:56
Show Gist options
  • Save cheery/9585299 to your computer and use it in GitHub Desktop.
Save cheery/9585299 to your computer and use it in GitHub Desktop.
from asyncio import schedule, http, log
from asyncio.log import Logger
import sys
import stream
# Device node that main() opens in binary mode and hands to read_midi().
# NOTE(review): the name suggests an Axis-49 MIDI controller on ALSA card 2 — confirm.
axis49 = "/dev/snd/midiC2D0"
def main():
    """Schedule the MIDI reader and the HTTP server, then run the scheduler.

    The MIDI device at ``axis49`` is opened through the ``stream`` module and
    handed to ``read_midi``; an ``http.Server`` serving ``application`` is
    created on port 8000 with a logger that writes to stdout.
    """
    # MIDI input task.
    midi_in = stream.open(axis49, 'rb')
    schedule(read_midi, midi_in)

    # HTTP / websocket server task; both logger streams go to stdout.
    logger = Logger(sys.stdout, sys.stdout)
    server = http.Server(application, logger, port=8000)
    schedule(server.run)

    # Hand control to the scheduler.
    schedule.run()
def read_midi(fd):
    """Read MIDI bytes from ``fd`` and print note on/off events.

    Each status byte is split into a message type (high nibble) and a
    channel (low nibble).  For note-on (0x90) and note-off (0x80) messages
    the following two bytes are read as note number and velocity; velocity
    is scaled to a float by dividing by 127.0.  Any other byte is reported
    as ignored.

    fd -- object whose ``read(1)`` returns a one-byte string, or an empty
          string at end of input.

    Returns when ``fd`` reaches end of input (also when that happens in the
    middle of a note message) instead of failing in ``ord()``.
    """
    def read_byte():
        # An empty read means end of input; ord('') would raise TypeError.
        data = fd.read(1)
        if not data:
            return None
        return ord(data)

    while True:
        code = read_byte()
        if code is None:
            return
        channel = code & 0xF
        kind = code & 0xF0
        if kind != 0x90 and kind != 0x80:
            print("ignored hex=%x" % code)
            continue

        note = read_byte()
        if note is None:
            return
        raw_velocity = read_byte()
        if raw_velocity is None:
            return
        velocity = raw_velocity / 127.0

        if kind == 0x90:  # note on
            print("note on ch=%i note=%i vel=%f" % (channel, note, velocity))
        else:  # note off
            print("note off ch=%i note=%i vel=%f" % (channel, note, velocity))
def application(request, response):
    """Handle one HTTP request.

    Plain requests get the body ``"websocket server"``.  Requests flagged as
    websocket are upgraded with ``websocket_callback`` as the handler, and
    nothing is returned.
    """
    if not request.websocket:
        return "websocket server"
    response.upgrade(websocket_callback)
def websocket_callback(command, payload):
    """Log websocket events to stdout.

    command -- event name; 'connect' and 'close' are handled specially,
               anything else is printed together with its payload.
    payload -- event data.  On 'connect' it is not retained (the original
               bound it to an unused local).
               NOTE(review): presumably the connection object — store it
               somewhere if the server should later send on it.
    """
    if command == 'connect':
        print("ready")
    elif command == 'close':
        print("discon %s" % (payload,))
    else:
        print("%s %s" % (command, payload))
# Run main() only when this file is executed as a script.
if __name__ == '__main__':
    main()
from greenlet import getcurrent
from asyncio import schedule
from select import select
def open(path, flags):
    """Open ``path`` and wrap it in a Stream driven by the module's FileSelect.

    path  -- filesystem path to open.
    flags -- mode string passed to ``file()``.

    Returns a ``Stream`` whose reads wait on the module-level ``streams``
    strategy.
    """
    # Open unbuffered (buffering=0).  Readiness is decided by select() on the
    # raw descriptor; with stdio buffering, bytes already pulled into the
    # user-space buffer are invisible to select(), so a reader would stall
    # until more data arrived on the descriptor.
    return Stream(file(path, flags, 0), streams)
class FileSelect(object):
    """Scheduler strategy that parks greenlets until their stream is readable.

    A greenlet calls ``read(stream)`` to suspend itself; ``step()`` polls the
    waiting streams with ``select()`` and resumes the greenlets whose streams
    have become readable.
    """

    def __init__(self, timeout=0.01):
        """Create the strategy.

        timeout -- seconds passed to ``select()`` on every ``step()``.
        """
        self.timeout = timeout
        # Greenlet that created this object; read() switches back to it.
        self.main = getcurrent()
        # Streams that currently have a greenlet waiting on them.
        self.reading = []

    @property
    def tasks(self):
        """Number of streams currently waiting for input."""
        return len(self.reading)

    def read(self, stream):
        """Suspend the calling greenlet until ``stream`` is readable.

        The current greenlet is stored on ``stream.reading_greenlet`` and
        control switches to ``self.main``.
        """
        stream.reading_greenlet = getcurrent()
        self.reading.append(stream)
        self.main.switch()

    def step(self):
        """Poll the waiting streams once and resume the ones that are ready.

        Streams reported in select()'s exceptional set get a ``StreamError``
        thrown into their waiting greenlet.
        """
        # Snapshot the list: resumed greenlets may register new reads.
        waiting = list(self.reading)
        # Also watch for exceptional conditions; with an empty third list
        # select() can never report any, leaving the error loop dead.
        readable, _writable, failed = select(waiting, [], waiting, self.timeout)
        for stream in readable:
            self.switchReading(stream)
        for stream in failed:
            # throw() ignores streams that were already resumed above.
            self.throw(stream, StreamError('error during select'))

    def switchReading(self, stream):
        """Unregister ``stream`` and resume the greenlet waiting on it."""
        greenlet = stream.reading_greenlet
        stream.reading_greenlet = None
        self.reading.remove(stream)
        greenlet.switch()

    def throw(self, stream, error):
        """Raise ``error`` inside the greenlet waiting on ``stream``, if any.

        Does nothing when ``stream`` is not currently registered.
        """
        if stream in self.reading:
            greenlet = stream.reading_greenlet
            stream.reading_greenlet = None
            # Was remove(socket): an undefined name that raised NameError.
            self.reading.remove(stream)
            greenlet.throw(error)
class Stream(object):
    """File-like wrapper whose reads first wait on a scheduling strategy."""

    def __init__(self, fd, strategy):
        """Wrap ``fd``; ``strategy.read(self)`` is called before every read."""
        self.strategy = strategy
        self.fd = fd
        # Expose the wrapped object's fileno so select() accepts the stream.
        self.fileno = fd.fileno

    def read(self, count):
        """Wait via the strategy, then read ``count`` from the wrapped file."""
        waiter = self.strategy
        waiter.read(self)
        data = self.fd.read(count)
        return data
class StreamError(Exception):
    """Raised in a waiting greenlet when select() reports an error on its stream."""
# Module-wide FileSelect instance; open() attaches every Stream to it.
streams = FileSelect()
# Register the instance in the scheduler's list of strategies.
# NOTE(review): presumably the scheduler calls step() on each entry — confirm.
schedule.strategies.append(streams)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment