Created
June 24, 2020 20:49
-
-
Save shannonpeng/b1bae365484b4318e9cc7819a4ecad7b to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# 370_RINGTONE.py | |
# Created by Ian Hattwick with Fred Kelly | |
# Modified by Shannon Peng | |
# Original file created May 4 2020 | |
RAW_INCOMING_SERIAL_MONITOR = 0 | |
PACKET_INCOMING_SERIAL_MONITOR = 1 | |
import serial, serial.tools.list_ports, socket, sys | |
from pythonosc import osc_message_builder | |
from pythonosc import udp_client | |
from pythonosc.osc_server import AsyncIOOSCUDPServer | |
from pythonosc.dispatcher import Dispatcher | |
import asyncio | |
import struct | |
import time | |
###################### | |
# SET COMMUNICATION MODE | |
###################### | |
# don't forget to set the ESP32 firmware to match! | |
SERIAL_ENABLE = 1 | |
WIFI_ENABLE = 0 # must run Python script after ESP32 reset | |
###################### | |
# FILTER MESSAGES | |
###################### | |
# quick filters | |
accel_addresses = ["/accelX", "/accelY", "/accelZ"] | |
gyro_addresses = ["/gyroX", "/gyroY", "/gyroZ"] | |
capsense_addresses = [ "/capsense" + str(i) for i in range(12)] | |
FILTER_ON = True | |
PRINT_LED_MSGS = False | |
PRINT_LOG_MSGS = True | |
FILTER_ADDRESSES = [] | |
###################### | |
# CAP VALUES | |
###################### | |
CAP_THRESHOLD = 120 | |
CAP_MAX = 4000 | |
###################### | |
# ACCEL VALUES | |
###################### | |
ACCEL_NORMS = [32767+0, 32767-27, 32767+13] | |
ACCEL_SCALE = 10 | |
SHAKE_THRESHOLD = 120 | |
###################### | |
# GYRO VALUES | |
###################### | |
GYRO_NORMS = [32767+1020, 32767, 32767+70] | |
GYRO_SCALE = 10 | |
###################### | |
# SETUP SERIAL PORT | |
###################### | |
if( SERIAL_ENABLE): | |
# find serial port | |
CUR_SERIAL_PORT = "/dev/cu.usbserial-14240" | |
ports = list(serial.tools.list_ports.comports()) | |
# print ports | |
# print("available serial ports:") | |
# for x in range(len(ports)): | |
# print(ports[x] ) | |
# check if cur port is available | |
for x in range(len(ports)): | |
if CUR_SERIAL_PORT in ports[x]: | |
ser = serial.Serial(CUR_SERIAL_PORT) | |
ser.baudrate=115200 | |
ser.setDTR(False) # Drop DTR | |
time.sleep(0.022) # Read somewhere that 22ms is what the UI does. | |
ser.setDTR(True) # UP the DTR back | |
ser.read(ser.in_waiting) # if anything in input buffer, discard it | |
print(CUR_SERIAL_PORT + " connected\n") | |
SERIAL_ENABLE = 1 | |
else: | |
print(CUR_SERIAL_PORT + " not available\n") | |
###################### | |
# SETUP OSC | |
###################### | |
# initialize UDP client | |
client = udp_client.SimpleUDPClient("127.0.0.1", 5005) | |
# dispatcher in charge of executing functions in response to RECEIVED OSC messages | |
dispatcher = Dispatcher() | |
print("Sending data to port", 5005) | |
# sensor inputs | |
OSC_ADDRESSES = { | |
27:{ 'address':'/analog0', 'enable': 1, 'rate':250, 'mode':'DIGITAL' }, | |
33:{ 'address':'/analog1', 'enable': 0, 'rate':200, 'mode':'MEAN' }, | |
32:{ 'address':'/analog2', 'enable': 0, 'rate':200, 'mode':'MEAN' }, | |
14:{ 'address':'/analog3', 'enable': 0, 'rate':200, 'mode':'MEAN' }, | |
4: { 'address':'/analog4', 'enable': 0, 'rate':200, 'mode':'MEAN' }, | |
0: { 'address':'/analog5', 'enable': 0, 'rate':200, 'mode':'MEAN' },# pulled high by ESP32 | |
15:{ 'address':'/analog6', 'enable': 0, 'rate':200, 'mode':'MEAN' },# boot fail if pulled low | |
13:{ 'address':'/analog7', 'enable': 0, 'rate':200, 'mode':'MEAN' }, | |
36:{ 'address':'/analog8', 'enable': 0, 'rate':200, 'mode':'MEAN' }, | |
39:{ 'address':'/analog9', 'enable': 0, 'rate':200, 'mode':'MEAN' }, | |
# alternate analog inputs | |
34:{ 'address':'/button0', 'enable': 0, 'rate':125, 'mode':'MEAN' }, # button | |
35:{ 'address':'/button1', 'enable': 0, 'rate':125, 'mode':'MEAN' }, # button | |
2: { 'address':'/analog10', 'enable': 0, 'rate':200, 'mode':'MEAN' }, # CS0 | |
12:{ 'address':'/analog11', 'enable': 0, 'rate':200, 'mode':'MEAN' }, # CS1, boot fail if pulled high | |
25:{ 'address':'/analog12', 'enable': 0, 'rate':125, 'mode':'MEAN' }, # DAC1 | |
26:{ 'address':'/analog13', 'enable': 0, 'rate':125, 'mode':'MEAN' }, # DAC2 | |
# I2C and SPI pins, mode must be 'DIGITAL', can't be used if I2C or SPI are being used | |
18:{ 'address':'/digital0', 'enable': 0, 'rate':200, 'mode':'DIGITAL' },# CLK | |
19:{ 'address':'/digital1', 'enable': 0, 'rate':200, 'mode':'DIGITAL' },# MISO | |
21:{ 'address':'/digital2', 'enable': 0, 'rate':200, 'mode':'DIGITAL' },# I2C | |
22:{ 'address':'/digital3', 'enable': 0, 'rate':200, 'mode':'DIGITAL' },# I2C | |
23:{ 'address':'/digital4', 'enable': 0, 'rate':200, 'mode':'DIGITAL' },# MOSI | |
5:{ 'address':'/digital5', 'enable': 0, 'rate':200, 'mode':'DIGITAL' },# MIDI, boot fail if pulled low | |
# IMU | |
150:{ 'address':'/gyroX', 'enable': 0, 'rate':20, 'mode':'MEAN' }, | |
151:{ 'address':'/gyroY', 'enable': 0, 'rate':20, 'mode':'MEAN' }, | |
152:{ 'address':'/gyroZ', 'enable': 0, 'rate':20, 'mode':'MEAN' }, | |
153:{ 'address':'/accelX', 'enable': 0, 'rate':20, 'mode':'MEAN' }, | |
154: { 'address':'/accelY', 'enable': 0, 'rate':20, 'mode':'MEAN' }, | |
155: { 'address':'/accelZ', 'enable': 0, 'rate':20, 'mode':'MEAN' }, | |
156: { 'address':'/temp', 'enable': 0, 'rate':250, 'mode':'MEAN' } | |
} | |
OSC_INDEX_ARRAY = [27, 33, 32, 14, 4, 0, 15, 13, 36, 39, 34, 35, #analog pins | |
150, 151, 152, 153, 154, 155, 156 ] #IMU | |
ANALOG_MODES = { | |
'MEAN': 0, | |
'MEDIAN': 1, | |
'MIN': 2, | |
'MAX': 3, | |
'PEAK_DEVIATION': 4, | |
'CAP_SENSE': 5, | |
'DIGITAL': 6, | |
'ECHO': 11, | |
'TRIG': 10 | |
} | |
enableMsg = [0, 0, 0, 255] | |
enableMsg[0] = 1; | |
def setEnables(): | |
# print('\nsetting enabled inputs <input#><enableStatus>') | |
time.sleep(0.25) | |
for i in range(len(OSC_INDEX_ARRAY)): | |
enableMsg[0] = 1; | |
enableMsg[1]=i; | |
enableMsg[2]=OSC_ADDRESSES[OSC_INDEX_ARRAY[i]]['enable'] | |
if( SERIAL_ENABLE ): ser.write(bytearray(enableMsg)) | |
if( WIFI_ENABLE ): s.sendto(bytearray(enableMsg), (clientAddress) ) | |
# print('enable', enableMsg[1], enableMsg[2]) | |
time.sleep(0.025) | |
# print('\nsetting sensor data rate <input#><dataRateInMS>') | |
for i in range(len(OSC_INDEX_ARRAY)): | |
enableMsg[0] = 2; | |
enableMsg[1]=i; | |
enableMsg[2]=OSC_ADDRESSES[OSC_INDEX_ARRAY[i]]['rate'] | |
if( SERIAL_ENABLE ): ser.write(bytearray(enableMsg)) | |
if( WIFI_ENABLE ): s.sendto(bytearray(enableMsg), (clientAddress) ) | |
# print('rate', enableMsg[1], enableMsg[2]) | |
time.sleep(0.025) | |
# print('\nsetting sensor data mode <input#><mode>') | |
for i in range(len(OSC_INDEX_ARRAY)): | |
enableMsg[0] = 3; | |
enableMsg[1]=i; | |
_mode = OSC_ADDRESSES[OSC_INDEX_ARRAY[i]]['mode'] | |
enableMsg[2]=ANALOG_MODES[_mode] | |
if( SERIAL_ENABLE ): ser.write(bytearray(enableMsg)) | |
if( WIFI_ENABLE ): s.sendto(bytearray(enableMsg), (clientAddress) ) | |
# print('mode', enableMsg[1], enableMsg[2]) | |
time.sleep(0.025) | |
###################### | |
# INITIALIZE CAPACITIVE INPUTS | |
###################### | |
NUM_ELECTRODES = 12; # set NUM_ELECTRODES to 0 if not using MPR121 | |
chargeCurrent = 63; # 0-63 | |
capInterval = 100; | |
def setCapSense(): | |
capMessage = [10,NUM_ELECTRODES, 255] | |
ser.write(bytearray(capMessage)) | |
capMessage = [12,chargeCurrent, 255] | |
ser.write(bytearray(capMessage)) | |
capMessage = [11,0,capInterval, 255] | |
for i in range(NUM_ELECTRODES): | |
capMessage[1] = i | |
ser.write(bytearray(capMessage)) | |
###################### | |
# COMMUNICATION INPUT | |
###################### | |
def readNextMessage(): | |
"""Reads new messages over Serial or wifi.""" | |
if SERIAL_ENABLE: | |
return checkSerial() | |
elif WIFI_ENABLE: | |
return checkWiFi() | |
#dispatcher.map("/serialRate", rateHandler) | |
def checkSerial(): | |
"""Checks Serial port for incoming messages.""" | |
bytesTotal = bytes() | |
# defines reserved bytes signifying end of message and escape character | |
endByte = bytes([255]) | |
escByte = bytes([254]) | |
curByte = ser.read(1) | |
bytesTotal += curByte | |
msgInProgress = 1 | |
while(msgInProgress): | |
curByte = ser.read(1) | |
if(RAW_INCOMING_SERIAL_MONITOR): | |
#print ("raw serial: ", int.from_bytes(curByte,byteorder='big')) | |
print(int.from_bytes(curByte,byteorder='big')) | |
elif (curByte == endByte): | |
#if we reach a true end byte, we've read a full message, return buffer | |
msgInProgress = 0 | |
return bytesTotal | |
elif (curByte == escByte): | |
# if we reach a true escape byte, set the flag, but don't write the reserved byte to the buffer | |
curByte = ser.read() | |
bytesTotal += curByte | |
else: | |
bytesTotal += curByte | |
prevUdpMsgNum = 0 | |
###################### | |
#COMMUNICATION OUTPUT | |
###################### | |
def slipOutPacket(val = []): | |
"""Send SLIP encoded values to serial or wifi.""" | |
#print ('val', val) | |
outMessage = [] | |
endByte = bytes([255]) | |
escByte = bytes([254]) | |
for i in val: | |
if i == endByte: | |
outMessage.append(escByte) | |
outMessage.append(i) | |
else : | |
outMessage.append(i) | |
outMessage += (endByte) | |
#print(outMessage) | |
#outMessage = [0,0,34 ,255] | |
ser.write(bytearray(outMessage)) | |
def setLed(add, num, r, g, b): | |
ledMsg = [50,int(num),int(r), int(g),int(b)] | |
if PRINT_LED_MSGS: | |
print('ledMsg:', ledMsg) | |
slipOutPacket(bytearray(ledMsg)) | |
def setLedFromList(num, rgb): | |
setLed(50, num, rgb[0], rgb[1], rgb[2]) | |
dispatcher.map("/led", setLed) | |
###################### | |
# HSV TO RGB | |
###################### | |
# h: 0-360, s: 0-1, v: 0-1 (https://www.rapidtables.com/convert/color/hsv-to-rgb.html) | |
def HSVtoRGB(h, s, v): | |
h = h % 360 | |
s = max(min(s, 1), 0) | |
v = max(min(s, 1), 0) | |
c = v * s | |
x = c * (1 - abs((h / 60.0) % 2 - 1)) | |
m = v - c | |
r, g, b = (0, 0, 0) | |
if h < 60: | |
r,g,b = (c, x, 0) | |
elif h < 120: | |
r,g,b = (x, c, 0) | |
elif h < 180: | |
r,g,b = (0, c, x) | |
elif h < 240: | |
r,g,b = (0, x, c) | |
elif h < 300: | |
r,g,b = (x, 0, c) | |
elif h < 360: | |
r,g,b = (c, 0, x) | |
rgb = (int((r+m)*254), int((g+m)*254), int((b+m)*254)) | |
return rgb | |
###################### | |
# RINGTONE CLASS | |
###################### | |
class Ringtone: | |
# BUTTONS | |
B_BUTTON = 8 | |
DOWN_BUTTON = 9 | |
UP_BUTTON = 10 | |
K_BUTTON = 11 | |
# LEDs | |
NUM_LEDS = 32 | |
NUM_BUTTONS = 4 | |
B_LED = 9 | |
DOWN_LED = 10 | |
UP_LED = 11 | |
K_LED = 12 | |
NOTE_LED_START = 16 | |
LED_UPDATE_INTERVAL = 20 | |
KEY_COLOR_LENGTH = 3 | |
# NOTES/PITCHES | |
NUM_NOTES = 8 | |
MAJOR_SCALE = [0, 2, 4, 5, 7, 9, 11] | |
MAJOR_BLUES_SCALE = [0, 3, 5, 6, 7, 10] | |
MINOR_NATURAL_SCALE = [0, 2, 3, 5, 7, 8, 10] | |
MINOR_HARMONIC_SCALE = [0, 2, 3, 5, 7, 8, 11] | |
MINOR_MELODIC_SCALE = [0, 2, 3, 5, 7, 9, 11] | |
SCALES = [MAJOR_SCALE, MAJOR_BLUES_SCALE, MINOR_NATURAL_SCALE, MINOR_HARMONIC_SCALE, MINOR_MELODIC_SCALE] | |
ROLL_OFFSET_BOUNDS = (-7, 7) | |
START_PITCH_BOUNDS = (36, 108) | |
def __init__(self): | |
# cap states | |
self.capStates = [0 for i in range(12)] | |
self.capFlags = [0 for i in range(12)] # 0 no change, 1 for 0->1, 2 for 1->0 | |
# accel states | |
self.shakeFlag = 0 # 0 no change, 1 for 0->1, 2 for 1->0 | |
# note/pitch states | |
self.startPitch = 60 | |
self.scaleIndex = 0 | |
self.bassPitch = 0 | |
self.bassLock = False # bass pitch changed once per B button down | |
self.rollOffset = 0 | |
self.pitches = [60, 62, 64, 65, 67, 69, 71, 72] | |
# LED states | |
self.ledColors = [(0, 0, 0)] * Ringtone.NUM_LEDS | |
self.noteColors = [self.pitchToRGB(i) for i in range(60, 73)] | |
self.buttonColors = [(0, 0, 0)] * Ringtone.NUM_BUTTONS | |
self.keyColor = self.pitchToRGB(self.startPitch % 12 + 72) | |
self.keyColorStart = 0 | |
# device state | |
self.state = 0 | |
# LED update counter | |
self.ledUpdateCounter = 0 | |
self.updateButtonColors() | |
self.updateNoteColors() | |
# NOTE/PITCH METHODS | |
def rollUpOctave(self): | |
self.startPitch += 12 | |
self.startPitch = max(self.startPitch, Ringtone.START_PITCH_BOUNDS[0]) | |
self.startPitch = min(self.startPitch, Ringtone.START_PITCH_BOUNDS[1]) | |
self.updatePitches() | |
def rollDownOctave(self): | |
self.startPitch -= 12 | |
self.startPitch = max(self.startPitch, Ringtone.START_PITCH_BOUNDS[0]) | |
self.startPitch = min(self.startPitch, Ringtone.START_PITCH_BOUNDS[1]) | |
self.updatePitches() | |
def rollUpNote(self): | |
self.rollOffset += 1 | |
self.rollOffset = max(self.rollOffset, Ringtone.ROLL_OFFSET_BOUNDS[0]) | |
self.rollOffset = min(self.rollOffset, Ringtone.ROLL_OFFSET_BOUNDS[1]) | |
self.updatePitches() | |
def rollDownNote(self): | |
self.rollOffset -= 1 | |
self.rollOffset = max(self.rollOffset, Ringtone.ROLL_OFFSET_BOUNDS[0]) | |
self.rollOffset = min(self.rollOffset, Ringtone.ROLL_OFFSET_BOUNDS[1]) | |
self.updatePitches() | |
def incrementKey(self): | |
self.startPitch += 1 | |
self.startPitch = max(self.startPitch, Ringtone.START_PITCH_BOUNDS[0]) | |
self.startPitch = min(self.startPitch, Ringtone.START_PITCH_BOUNDS[1]) | |
self.updatePitches() | |
def decrementKey(self): | |
self.startPitch -= 1 | |
self.startPitch = max(self.startPitch, Ringtone.START_PITCH_BOUNDS[0]) | |
self.startPitch = min(self.startPitch, Ringtone.START_PITCH_BOUNDS[1]) | |
self.updatePitches() | |
def cycleScale(self): | |
self.scaleIndex += 1 | |
self.scaleIndex %= len(Ringtone.SCALES) | |
self.updatePitches() | |
def setBassPitch(self, note): # note: pitch of note button pressed | |
if not self.bassLock: | |
self.bassPitch = note - 12 | |
self.bassLock = True | |
client.send_message('/basspitch', self.bassPitch) | |
def clearBassPitch(self): | |
self.bassPitch = 0 | |
self.bassLock = False | |
client.send_message('/basspitch', self.bassPitch) | |
def sendBassNoteOff(self): | |
client.send_message('/bassnote', 0) | |
def sendBassNoteOn(self): | |
client.send_message('/bassnote', 1) | |
def sendPitches(self): | |
for i in range(len(self.pitches)): | |
address = '/pitch' + str(i) | |
val = self.pitches[i] | |
client.send_message(address, val) | |
def updatePitches(self): | |
newPitches = [] | |
for n in range(Ringtone.NUM_NOTES): | |
s = Ringtone.SCALES[self.scaleIndex] | |
i = n + self.rollOffset | |
p = self.startPitch + s[i % len(s)] | |
if i < 0: | |
p -= 12 * (int(-i / len(s)) + 1) | |
elif i >= len(s): | |
p += 12 * (int(i / len(s))) | |
newPitches.append(p) | |
self.pitches = newPitches | |
# LED methods | |
def pitchToRGB(self, pitch): | |
p = pitch % 12 | |
o = int(pitch / 12.0) - 1 | |
return HSVtoRGB((p*30+320) % 360, 0.70 + (o-4)*0.05, 0.75) | |
def updateButtonColors(self): | |
self.buttonColors = [(128, 128, 128) if self.capStates[i] == 1 else (0, 0, 0) for i in range(Ringtone.B_BUTTON, Ringtone.B_BUTTON+Ringtone.NUM_BUTTONS)] | |
self.ledColors[Ringtone.B_LED:Ringtone.B_LED+Ringtone.NUM_BUTTONS] = self.buttonColors | |
self.sendButtonColors() | |
def updateNoteColors(self): | |
self.noteColors = [self.pitchToRGB(p) for p in self.pitches] | |
self.ledColors[Ringtone.NOTE_LED_START:Ringtone.NOTE_LED_START+Ringtone.NUM_NOTES] = self.noteColors | |
self.sendNoteColors() | |
def updateKeyColor(self): | |
self.keyColor = self.pitchToRGB(self.startPitch % 12 + 72) | |
def cycleKeyColor(self): | |
self.keyColorStart += 1 | |
self.keyColorStart %= Ringtone.NUM_LEDS | |
self.ledColors = [(0, 0, 0)] * Ringtone.NUM_LEDS | |
for i in range(Ringtone.KEY_COLOR_LENGTH): | |
self.ledColors[(self.keyColorStart+i) % Ringtone.NUM_LEDS] = self.keyColor | |
for i in range(Ringtone.KEY_COLOR_LENGTH + 4, 2*Ringtone.KEY_COLOR_LENGTH + 4): | |
self.ledColors[(self.keyColorStart+i) % Ringtone.NUM_LEDS] = self.keyColor | |
for i in range(2*Ringtone.KEY_COLOR_LENGTH + 8, 3*Ringtone.KEY_COLOR_LENGTH + 8): | |
self.ledColors[(self.keyColorStart+i) % Ringtone.NUM_LEDS] = self.keyColor | |
self.updateButtonColors() | |
self.updateNoteColors() | |
self.sendAllLedColors() | |
def sendLedFlash(self): | |
for i in range(len(self.ledColors)): | |
setLedFromList(i, (64, 64, 64)) | |
def sendButtonColors(self): | |
for i in range(len(self.buttonColors)): | |
setLedFromList(Ringtone.B_LED+i, self.buttonColors[i]) | |
def sendNoteColors(self): | |
for i in range(len(self.noteColors)): | |
setLedFromList(Ringtone.NOTE_LED_START+i, self.noteColors[i]) | |
def sendAllLedColors(self): | |
for i in range(len(self.ledColors)): | |
setLedFromList(i, self.ledColors[i]) | |
# reset methods | |
def resetCapFlags(self): | |
self.capFlags = [0 for i in range(12)] | |
def resetShakeFlag(self): | |
self.shakeFlag = 0 | |
def interpretMessage(self, message): | |
if message is None: | |
return | |
if(0): | |
print ('mirror', message) | |
if(len(message) < 3): | |
if( SERIAL_ENABLE ): ser.read(ser.in_waiting) | |
return | |
if(message[0] == 1): | |
print(message[1], message[2]) | |
return | |
# analog inputs | |
if(message[0] in OSC_INDEX_ARRAY): | |
address = OSC_ADDRESSES[message[0]]['address'] | |
val = (message[1]<<8) + message[2] | |
# gyro normalize | |
if (message[0] in [150, 151, 152]): | |
val = int((val - GYRO_NORMS[int(message[0] - 150)]) / GYRO_SCALE) | |
# accel normalize | |
elif (message[0] in [153, 154, 155]): | |
val = int((val - ACCEL_NORMS[int(message[0] - 153)]) / ACCEL_SCALE) | |
if val >= SHAKE_THRESHOLD and self.shakeFlag == 0: | |
self.shakeFlag = 1 | |
elif val < SHAKE_THRESHOLD and self.shakeFlag == 1: | |
self.shakeFlag = 2 | |
else: | |
self.shakeFlag = 0 | |
if(PACKET_INCOMING_SERIAL_MONITOR ): | |
if not FILTER_ON or address in FILTER_ADDRESSES: | |
print(address,val) | |
client.send_message(address, val) | |
# capacitive values | |
elif(message[0] >= 64 and message[0] < 76): | |
num = message[0] - 64 | |
address = '/capsense' + str(num); | |
state_address = '/capstate' + str(num) | |
val = (message[1] << 8) + message[2] - 4096 | |
if val > CAP_MAX: | |
return | |
# cap state transitions | |
if val >= CAP_THRESHOLD and self.capStates[num] == 0: # cap down | |
self.capFlags[num] = 1 | |
client.send_message(state_address, 1) | |
elif val < CAP_THRESHOLD and self.capStates[num] == 1: # cap up | |
self.capFlags[num] = 2 | |
self.capStates[num] = 1 if val >= CAP_THRESHOLD else 0 | |
client.send_message(state_address, 0) | |
else: | |
self.capFlags[num] = 0 | |
self.capStates[num] = 1 if val >= CAP_THRESHOLD else 0 | |
if( PACKET_INCOMING_SERIAL_MONITOR ): | |
if not FILTER_ON or address in FILTER_ADDRESSES: | |
print(address, val) # /capsenseX | |
if not FILTER_ON or state_address in FILTER_ADDRESSES: | |
print(state_address, self.capStates[num]) # /capstateX | |
if not FILTER_ON or '/capflag' + str(num) in FILTER_ADDRESSES: | |
print('cap_flag' + str(num), self.capFlags[num]) | |
# client.send_message(address, val) | |
# client.send_message(state_address, self.capStates[num]) | |
def run(self): | |
''' | |
* handled in Pd | |
0 - PLAY MODE | |
8 down/up: note on/off* | |
B up: note off (8vb), clear bass pitch | |
+ down: roll +1 scale degree | |
- down: roll -1 scale degree | |
B down: enter bass select mode (1) | |
K down: enter key select mode (2) | |
1 - BASS SELECT MODE (B down) | |
8 down: change bass pitch, note on (8vb), enter play mode (0) | |
B up: note off (8vb), clear bass pitch, enter play mode (0) | |
K down: enter octave select mode (3) | |
2 - KEY SELECT MODE (K down) | |
8 down/up: note on/off* | |
+ down: increase key 1 semitone | |
- down: decrease key 1 semitone | |
shake: change scale (major/minor, etc.) | |
K up: enter play mode (0) | |
B down: enter octave select mode (3) | |
3 - OCTAVE SELECT MODE (K and B down) | |
8 down/up: note on/off* | |
+ down: roll +1 octave | |
- down: roll -1 octave | |
K up: enter bass select mode (1) | |
B up: enter key select mode (2) | |
''' | |
pitchesChangedFlag = False | |
# update button LEDs | |
if sum(self.capFlags[Ringtone.B_BUTTON:Ringtone.B_BUTTON+Ringtone.NUM_BUTTONS]) > 0: | |
self.updateButtonColors() | |
# if PRINT_LOG_MSGS: print('LEDs: button colors updated') | |
self.sendButtonColors() | |
# if PRINT_LOG_MSGS: print('LEDs: button colors sent') | |
# update key color cycle for all LEDs | |
if self.ledUpdateCounter == 0: | |
self.cycleKeyColor() | |
# if PRINT_LOG_MSGS: print('LEDs: all LEDs sent') | |
self.ledUpdateCounter = Ringtone.LED_UPDATE_INTERVAL | |
else: | |
self.ledUpdateCounter -= 1 | |
# send pitches to Pd | |
self.sendPitches() | |
if self.state == 0: # PLAY MODE | |
if self.capFlags[Ringtone.B_BUTTON] == 2: # B up | |
self.sendBassNoteOff() | |
if PRINT_LOG_MSGS: print('bass: note off') | |
self.clearBassPitch() | |
if PRINT_LOG_MSGS: print('bass: pitch cleared') | |
if self.capFlags[Ringtone.UP_BUTTON] == 1: # + down | |
self.rollUpNote() | |
pitchesChangedFlag = True | |
if PRINT_LOG_MSGS: print('rollUpNote: new offset', self.rollOffset) | |
if self.capFlags[Ringtone.DOWN_BUTTON] == 1: # - down | |
self.rollDownNote() | |
pitchesChangedFlag = True | |
if PRINT_LOG_MSGS: print('rollDownNote: new offset', self.rollOffset) | |
if self.capFlags[Ringtone.B_BUTTON] == 1 and self.capFlags[Ringtone.K_BUTTON] == 1: # B and K down | |
self.state = 3 | |
if PRINT_LOG_MSGS: print('state: 0 to 3') | |
elif self.capFlags[Ringtone.B_BUTTON] == 1: # B down | |
self.state = 1 | |
if PRINT_LOG_MSGS: print('state: 0 to 1') | |
elif self.capFlags[Ringtone.K_BUTTON] == 1: # K down | |
self.state = 2 | |
if PRINT_LOG_MSGS: print('state: 0 to 2') | |
elif self.state == 1: # BASS SELECT MODE | |
if sum(self.capFlags) > 0: # 8 down | |
for n in range(Ringtone.NUM_NOTES): | |
if self.capFlags[n] == 1: # find first note down | |
self.setBassPitch(self.pitches[n]) | |
if PRINT_LOG_MSGS: print('setBassPitch:', self.bassPitch) | |
self.sendBassNoteOn() | |
if PRINT_LOG_MSGS: print('bass: note on') | |
self.state = 0 | |
break | |
if self.capFlags[Ringtone.B_BUTTON] == 2: # B up | |
self.sendBassNoteOff() | |
if PRINT_LOG_MSGS: print('bass: note off') | |
self.clearBassPitch() | |
if PRINT_LOG_MSGS: print('bass: pitch cleared') | |
self.state = 0 | |
if PRINT_LOG_MSGS: print('state: 1 to 0') | |
if self.capFlags[Ringtone.B_BUTTON] == 2 and self.capFlags[Ringtone.K_BUTTON] == 1: # B up and K down | |
self.state = 2 | |
if PRINT_LOG_MSGS: print('state: 1 to 2') | |
elif self.capFlags[Ringtone.K_BUTTON] == 1: # K down | |
self.state = 3 | |
if PRINT_LOG_MSGS: print('state: 1 to 3') | |
elif self.state == 2: # KEY SELECT MODE | |
if self.capFlags[Ringtone.UP_BUTTON] == 1: # + down | |
self.incrementKey() | |
self.updateKeyColor() | |
pitchesChangedFlag = True | |
if PRINT_LOG_MSGS: print('key: incremented, new start pitch ', self.startPitch) | |
if self.capFlags[Ringtone.DOWN_BUTTON] == 1: # - down | |
self.decrementKey() | |
self.updateKeyColor() | |
pitchesChangedFlag = True | |
if PRINT_LOG_MSGS: print('key: decremented, new start pitch ', self.startPitch) | |
if self.shakeFlag: | |
self.cycleScale() | |
self.sendLedFlash() | |
pitchesChangedFlag = True | |
if PRINT_LOG_MSGS: print('scale: new index', self.scaleIndex) | |
if self.capFlags[Ringtone.K_BUTTON] == 2 and self.capFlags[Ringtone.B_BUTTON] == 1: # K up and B down | |
self.state = 1 | |
if PRINT_LOG_MSGS: print('state: 2 to 1') | |
elif self.capFlags[Ringtone.K_BUTTON] == 2: # K up | |
self.state = 0 | |
if PRINT_LOG_MSGS: print('state: 2 to 0') | |
elif self.capFlags[Ringtone.B_BUTTON] == 1: # B down | |
self.state = 3 | |
if PRINT_LOG_MSGS: print('state: 2 to 3') | |
elif self.state == 3: # OCTAVE SELECT MODE | |
if self.capFlags[Ringtone.UP_BUTTON] == 1: # + down | |
self.rollUpOctave() | |
pitchesChangedFlag = True | |
if PRINT_LOG_MSGS: print('octave: incremented, new start pitch ', self.startPitch) | |
if self.capFlags[Ringtone.DOWN_BUTTON] == 1: # - down | |
self.rollDownOctave() | |
pitchesChangedFlag = True | |
if PRINT_LOG_MSGS: print('octave: decremented, new start pitch ', self.startPitch) | |
if self.capFlags[Ringtone.K_BUTTON] == 2 and self.capFlags[Ringtone.B_BUTTON] == 2: # K up and B up | |
self.state = 0 | |
if PRINT_LOG_MSGS: print('state: 3 to 0') | |
elif self.capFlags[Ringtone.K_BUTTON] == 2: # K up | |
self.state = 1 | |
if PRINT_LOG_MSGS: print('state: 3 to 1') | |
elif self.capFlags[Ringtone.B_BUTTON] == 2: # B up | |
self.state = 2 | |
if PRINT_LOG_MSGS: print('state: 3 to 2') | |
# reset cap flags bc they're all consumed by now | |
self.resetCapFlags() | |
# reset shake flag too | |
self.resetShakeFlag() | |
if pitchesChangedFlag: # if note pitches changed, update LED colors | |
self.updateNoteColors() | |
# if PRINT_LOG_MSGS: print('LEDs: note colors updated') | |
self.sendNoteColors() | |
# if PRINT_LOG_MSGS: print('LEDs: note colors sent') | |
###################### | |
#LOOP | |
###################### | |
async def loop(): | |
debugVal = 0 | |
time.sleep(0.1) | |
if SERIAL_ENABLE: ser.flushInput() | |
time.sleep(0.1) | |
setEnables() | |
setCapSense() | |
rt = Ringtone() | |
while(1): | |
currentMessage = readNextMessage() # can be None if nothing in input buffer | |
rt.interpretMessage(currentMessage) | |
rt.run() | |
await asyncio.sleep(0) | |
time.sleep(0.001) | |
async def init(): | |
server = AsyncIOOSCUDPServer(("127.0.0.1", 5006), dispatcher, asyncio.get_event_loop()) | |
transport, protocol = await server.create_serve_endpoint() | |
if SERIAL_ENABLE: ser.read(ser.in_waiting) | |
await loop() | |
transport.close() | |
asyncio.run(init()) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment