# 370_RINGTONE.py
# Created by Ian Hattwick with Fred Kelly
# Modified by Shannon Peng
# Original file created May 4 2020
# Debug switches for incoming data (1 = on, 0 = off).
# When on, checkSerial() prints each byte it reads as an integer.
RAW_INCOMING_SERIAL_MONITOR = 0
# When on, decoded packets are printed as (address, value), subject to the
# FILTER_ON / FILTER_ADDRESSES settings.
PACKET_INCOMING_SERIAL_MONITOR = 1
import asyncio
import socket
import struct
import sys
import time

import serial
import serial.tools.list_ports
from pythonosc import osc_message_builder
from pythonosc import udp_client
from pythonosc.dispatcher import Dispatcher
from pythonosc.osc_server import AsyncIOOSCUDPServer
######################
# SET COMMUNICATION MODE
######################
# don't forget to set the ESP32 firmware to match!
# Exactly one transport is expected to be enabled; readNextMessage() checks
# SERIAL_ENABLE first and only falls back to WIFI_ENABLE when it is off.
SERIAL_ENABLE = 1
WIFI_ENABLE = 0 # must run Python script after ESP32 reset
######################
# FILTER MESSAGES
######################
# quick filters: ready-made address lists that can be assigned to
# FILTER_ADDRESSES below
accel_addresses = ["/accelX", "/accelY", "/accelZ"]
gyro_addresses = ["/gyroX", "/gyroY", "/gyroZ"]
capsense_addresses = [ "/capsense" + str(i) for i in range(12)]
# When FILTER_ON is True, only addresses listed in FILTER_ADDRESSES are
# printed by the packet monitor; with the empty list below nothing is printed.
FILTER_ON = True
# Print every outgoing LED message (see setLed).
PRINT_LED_MSGS = False
# Print state-machine log lines from Ringtone.run().
PRINT_LOG_MSGS = True
FILTER_ADDRESSES = []
######################
# CAP VALUES
######################
# A capacitive reading at or above CAP_THRESHOLD counts as "touched";
# readings above CAP_MAX are discarded by Ringtone.interpretMessage().
CAP_THRESHOLD = 120
CAP_MAX = 4000
######################
# ACCEL VALUES
######################
# Per-axis (X, Y, Z) offsets subtracted from the raw 16-bit accelerometer
# reading; the result is then divided by ACCEL_SCALE.
ACCEL_NORMS = [32767+0, 32767-27, 32767+13]
ACCEL_SCALE = 10
# A normalized accelerometer value at or above this raises the shake flag.
SHAKE_THRESHOLD = 120
######################
# GYRO VALUES
######################
# Per-axis (X, Y, Z) offsets subtracted from the raw 16-bit gyro reading;
# the result is then divided by GYRO_SCALE.
GYRO_NORMS = [32767+1020, 32767, 32767+70]
GYRO_SCALE = 10
######################
# SETUP SERIAL PORT
######################
# Open the configured serial port if (and only if) it is currently present.
# On success the global `ser` is created; on failure SERIAL_ENABLE is cleared
# so that code guarded by `if SERIAL_ENABLE:` does not touch an undefined `ser`.
if SERIAL_ENABLE:
    # find serial port
    CUR_SERIAL_PORT = "/dev/cu.usbserial-14240"
    ports = list(serial.tools.list_ports.comports())
    # print ports
    # print("available serial ports:")
    # for port in ports:
    #     print(port)

    # check if cur port is available (open it once, report failure once)
    if any(CUR_SERIAL_PORT in port for port in ports):
        ser = serial.Serial(CUR_SERIAL_PORT)
        ser.baudrate = 115200
        ser.setDTR(False)  # Drop DTR
        time.sleep(0.022)  # Read somewhere that 22ms is what the UI does.
        ser.setDTR(True)  # UP the DTR back
        ser.read(ser.in_waiting)  # if anything in input buffer, discard it
        print(CUR_SERIAL_PORT + " connected\n")
        SERIAL_ENABLE = 1
    else:
        print(CUR_SERIAL_PORT + " not available\n")
        SERIAL_ENABLE = 0
######################
# SETUP OSC
######################
# initialize UDP client: every outgoing OSC message in this script is sent
# through `client` to 127.0.0.1:5005
client = udp_client.SimpleUDPClient("127.0.0.1", 5005)
# dispatcher in charge of executing functions in response to RECEIVED OSC messages
dispatcher = Dispatcher()
print("Sending data to port", 5005)
# sensor inputs
# Maps an input id (the first byte of an incoming packet) to its settings:
#   'address' - OSC address the value is forwarded to
#   'enable'  - 1 to have the device report this input, 0 to disable it
#   'rate'    - data rate in ms, sent to the device as a single byte
#   'mode'    - a key of ANALOG_MODES
# The ids appear to be ESP32 pin numbers (per the inline notes), with 150-156
# used for IMU channels - TODO confirm against the firmware.
OSC_ADDRESSES = {
    27:{ 'address':'/analog0', 'enable': 1, 'rate':250, 'mode':'DIGITAL' },
    33:{ 'address':'/analog1', 'enable': 0, 'rate':200, 'mode':'MEAN' },
    32:{ 'address':'/analog2', 'enable': 0, 'rate':200, 'mode':'MEAN' },
    14:{ 'address':'/analog3', 'enable': 0, 'rate':200, 'mode':'MEAN' },
    4: { 'address':'/analog4', 'enable': 0, 'rate':200, 'mode':'MEAN' },
    0: { 'address':'/analog5', 'enable': 0, 'rate':200, 'mode':'MEAN' },# pulled high by ESP32
    15:{ 'address':'/analog6', 'enable': 0, 'rate':200, 'mode':'MEAN' },# boot fail if pulled low
    13:{ 'address':'/analog7', 'enable': 0, 'rate':200, 'mode':'MEAN' },
    36:{ 'address':'/analog8', 'enable': 0, 'rate':200, 'mode':'MEAN' },
    39:{ 'address':'/analog9', 'enable': 0, 'rate':200, 'mode':'MEAN' },
    # alternate analog inputs
    34:{ 'address':'/button0', 'enable': 0, 'rate':125, 'mode':'MEAN' }, # button
    35:{ 'address':'/button1', 'enable': 0, 'rate':125, 'mode':'MEAN' }, # button
    2: { 'address':'/analog10', 'enable': 0, 'rate':200, 'mode':'MEAN' }, # CS0
    12:{ 'address':'/analog11', 'enable': 0, 'rate':200, 'mode':'MEAN' }, # CS1, boot fail if pulled high
    25:{ 'address':'/analog12', 'enable': 0, 'rate':125, 'mode':'MEAN' }, # DAC1
    26:{ 'address':'/analog13', 'enable': 0, 'rate':125, 'mode':'MEAN' }, # DAC2
    # I2C and SPI pins, mode must be 'DIGITAL', can't be used if I2C or SPI are being used
    18:{ 'address':'/digital0', 'enable': 0, 'rate':200, 'mode':'DIGITAL' },# CLK
    19:{ 'address':'/digital1', 'enable': 0, 'rate':200, 'mode':'DIGITAL' },# MISO
    21:{ 'address':'/digital2', 'enable': 0, 'rate':200, 'mode':'DIGITAL' },# I2C
    22:{ 'address':'/digital3', 'enable': 0, 'rate':200, 'mode':'DIGITAL' },# I2C
    23:{ 'address':'/digital4', 'enable': 0, 'rate':200, 'mode':'DIGITAL' },# MOSI
    5:{ 'address':'/digital5', 'enable': 0, 'rate':200, 'mode':'DIGITAL' },# MIDI, boot fail if pulled low
    # IMU
    150:{ 'address':'/gyroX', 'enable': 0, 'rate':20, 'mode':'MEAN' },
    151:{ 'address':'/gyroY', 'enable': 0, 'rate':20, 'mode':'MEAN' },
    152:{ 'address':'/gyroZ', 'enable': 0, 'rate':20, 'mode':'MEAN' },
    153:{ 'address':'/accelX', 'enable': 0, 'rate':20, 'mode':'MEAN' },
    154: { 'address':'/accelY', 'enable': 0, 'rate':20, 'mode':'MEAN' },
    155: { 'address':'/accelZ', 'enable': 0, 'rate':20, 'mode':'MEAN' },
    156: { 'address':'/temp', 'enable': 0, 'rate':250, 'mode':'MEAN' }
}
# Ids that are actually configured and decoded. The POSITION of an id in this
# list is the input number sent to the device by setEnables(), so the order
# matters.
# NOTE(review): ids 2, 12, 25, 26 and the digital pins (18, 19, 21, 22, 23, 5)
# are defined above but absent here, so they are never configured or
# forwarded - confirm this is intended.
OSC_INDEX_ARRAY = [27, 33, 32, 14, 4, 0, 15, 13, 36, 39, 34, 35, #analog pins
                   150, 151, 152, 153, 154, 155, 156 ] #IMU
# Numeric code sent to the device for each 'mode' name used in OSC_ADDRESSES.
ANALOG_MODES = {
    'MEAN': 0,
    'MEDIAN': 1,
    'MIN': 2,
    'MAX': 3,
    'PEAK_DEVIATION': 4,
    'CAP_SENSE': 5,
    'DIGITAL': 6,
    'ECHO': 11,
    'TRIG': 10
}
# Reusable 4-byte configuration packet: [command, input#, value, end byte].
# It starts out as an "enable" command (1); setEnables() rewrites the first
# three bytes in place before every send. 255 is the end-of-message marker.
enableMsg = [1, 0, 0, 255]
def setEnables():
    """Send the enable flag, data rate and mode of every input to the device.

    Three passes are made over OSC_INDEX_ARRAY, one per command byte:
    1 = enable status, 2 = data rate in ms, 3 = mode code (looked up in
    ANALOG_MODES). Each packet is written into the shared `enableMsg` list
    and sent over whichever transports are enabled, with a 25 ms pause after
    every packet. The input number sent is the position in OSC_INDEX_ARRAY,
    not the id stored there.
    """
    def _transmit(command, slot, value):
        # Fill the shared packet in place, send it, then pace the link.
        enableMsg[0] = command
        enableMsg[1] = slot
        enableMsg[2] = value
        if SERIAL_ENABLE:
            ser.write(bytearray(enableMsg))
        if WIFI_ENABLE:
            s.sendto(bytearray(enableMsg), (clientAddress))
        time.sleep(0.025)

    time.sleep(0.25)

    # setting enabled inputs <input#><enableStatus>
    for slot, inputId in enumerate(OSC_INDEX_ARRAY):
        _transmit(1, slot, OSC_ADDRESSES[inputId]['enable'])

    # setting sensor data rate <input#><dataRateInMS>
    for slot, inputId in enumerate(OSC_INDEX_ARRAY):
        _transmit(2, slot, OSC_ADDRESSES[inputId]['rate'])

    # setting sensor data mode <input#><mode>
    for slot, inputId in enumerate(OSC_INDEX_ARRAY):
        modeName = OSC_ADDRESSES[inputId]['mode']
        _transmit(3, slot, ANALOG_MODES[modeName])
######################
# INITIALIZE CAPACITIVE INPUTS
######################
# Values sent to the device by setCapSense().
NUM_ELECTRODES = 12  # set NUM_ELECTRODES to 0 if not using MPR121
chargeCurrent = 63  # 0-63
capInterval = 100
def setCapSense():
    """Send the capacitive-sensing configuration to the device over serial.

    Three kinds of packets are written, each terminated by the end byte 255:
    [10, NUM_ELECTRODES], [12, chargeCurrent], and one
    [11, electrode, capInterval] per electrode. The meaning of the command
    bytes 10/11/12 is defined by the firmware; judging by the values sent
    they presumably set electrode count, per-electrode interval and charge
    current - TODO confirm.

    Does nothing when serial is disabled, matching the SERIAL_ENABLE guard
    used by setEnables(); `ser` only exists when the port was opened.
    """
    if not SERIAL_ENABLE:
        return
    ser.write(bytearray([10, NUM_ELECTRODES, 255]))
    ser.write(bytearray([12, chargeCurrent, 255]))
    for electrode in range(NUM_ELECTRODES):
        ser.write(bytearray([11, electrode, capInterval, 255]))
######################
# COMMUNICATION INPUT
######################
def readNextMessage():
    """Fetch the next incoming message from the active transport.

    Serial takes priority over wifi. Returns whatever the transport reader
    returns, or None when neither transport is enabled.
    """
    if not SERIAL_ENABLE:
        if WIFI_ENABLE:
            return checkWiFi()
        return None
    return checkSerial()
#dispatcher.map("/serialRate", rateHandler)
def checkSerial():
    """Read one framed message from the serial port and return its payload.

    Framing: byte 255 ends a message and byte 254 escapes the byte that
    follows it, so that payload byte is stored literally. Neither reserved
    byte is stored in the returned payload.

    The very first byte of a message is stored as-is without checking it
    against the reserved values.
    NOTE(review): that means a stray leading 255/254 becomes message[0] -
    confirm the firmware never starts a packet with a reserved byte.

    When RAW_INCOMING_SERIAL_MONITOR is set, every byte read is also printed
    as an integer; bytes are still framed normally, so the function keeps
    returning messages while the monitor is on.

    Returns:
        bytes: the message payload without the terminating end byte.
    """
    # reserved bytes signifying end of message and escape character
    endByte = bytes([255])
    escByte = bytes([254])

    def _readByte():
        # Blocking single-byte read, mirrored to stdout in raw-monitor mode.
        received = ser.read(1)
        if RAW_INCOMING_SERIAL_MONITOR:
            print(int.from_bytes(received, byteorder='big'))
        return received

    bytesTotal = bytearray(_readByte())
    while True:
        curByte = _readByte()
        if curByte == endByte:
            # true end byte: a full message has been read
            return bytes(bytesTotal)
        if curByte == escByte:
            # true escape byte: drop it and keep the next byte literally
            curByte = _readByte()
        bytesTotal += curByte
prevUdpMsgNum = 0
######################
#COMMUNICATION OUTPUT
######################
def slipOutPacket(val=()):
    """Frame a sequence of byte values and write it to the serial port.

    The output uses the same framing that checkSerial() decodes: any payload
    byte equal to one of the reserved values (255 = end of message,
    254 = escape) is preceded by the escape byte, and the packet is
    terminated by a single end byte.
    NOTE(review): this assumes the firmware decodes escapes the same way
    checkSerial() does - confirm.

    Args:
        val: iterable of ints in 0-255 (e.g. a bytearray). Defaults to an
            empty payload, which sends just the end byte.

    Only the serial transport is used; nothing is sent over wifi here.
    """
    END_BYTE = 255
    ESC_BYTE = 254
    outMessage = bytearray()
    for byte in val:
        # Iterating a bytearray yields ints, so compare against ints.
        if byte in (END_BYTE, ESC_BYTE):
            outMessage.append(ESC_BYTE)
        outMessage.append(byte)
    outMessage.append(END_BYTE)
    ser.write(outMessage)
def setLed(add, num, r, g, b):
    """Send one LED color message to the device.

    Args:
        add: ignored; it is the first positional argument supplied by the
            OSC dispatcher when this is used as the "/led" handler.
        num: LED index.
        r, g, b: color components.

    All values are converted with int() and sent as the packet
    [50, num, r, g, b] through slipOutPacket().
    """
    ledMsg = [50] + [int(component) for component in (num, r, g, b)]
    if PRINT_LED_MSGS:
        print('ledMsg:', ledMsg)
    slipOutPacket(bytearray(ledMsg))
def setLedFromList(num, rgb):
    """Set LED `num` from an (r, g, b) sequence; only the first three items are used."""
    red, green, blue = rgb[0], rgb[1], rgb[2]
    setLed(50, num, red, green, blue)


# Incoming OSC "/led" messages are handled by setLed.
dispatcher.map("/led", setLed)
######################
# HSV TO RGB
######################
# h: 0-360, s: 0-1, v: 0-1 (https://www.rapidtables.com/convert/color/hsv-to-rgb.html)
def HSVtoRGB(h, s, v):
    """Convert an HSV color to an (r, g, b) tuple of ints in 0-254.

    Args:
        h: hue in degrees; wrapped into [0, 360).
        s: saturation; clamped to [0, 1].
        v: value (brightness); clamped to [0, 1].

    Returns:
        tuple: (r, g, b). Each channel is scaled by 254 and truncated with
        int(), so the largest value produced is 254.
    """
    h = h % 360
    s = max(min(s, 1), 0)
    v = max(min(v, 1), 0)  # clamp v itself (the old code clamped s into v)
    c = v * s  # chroma
    x = c * (1 - abs((h / 60.0) % 2 - 1))  # second-largest component
    m = v - c  # amount added to every channel to reach the requested value
    r, g, b = (0, 0, 0)
    if h < 60:
        r, g, b = (c, x, 0)
    elif h < 120:
        r, g, b = (x, c, 0)
    elif h < 180:
        r, g, b = (0, c, x)
    elif h < 240:
        r, g, b = (0, x, c)
    elif h < 300:
        r, g, b = (x, 0, c)
    elif h < 360:
        r, g, b = (c, 0, x)
    return (int((r + m) * 254), int((g + m) * 254), int((b + m) * 254))
######################
# RINGTONE CLASS
######################
class Ringtone:
    """State machine for the Ringtone instrument.

    Decodes incoming sensor packets (interpretMessage), tracks capacitive
    button/note states, maintains the eight note pitches and the LED colors,
    and advances a four-mode state machine once per main-loop pass (run).
    Outgoing OSC goes through the module-level `client`; LED updates go
    through the module-level `setLedFromList`.
    """
    # BUTTONS (capacitive electrode indices; electrodes 0-7 are the note pads)
    B_BUTTON = 8
    DOWN_BUTTON = 9
    UP_BUTTON = 10
    K_BUTTON = 11
    # LEDs
    NUM_LEDS = 32
    NUM_BUTTONS = 4
    B_LED = 9
    DOWN_LED = 10
    UP_LED = 11
    K_LED = 12
    NOTE_LED_START = 16
    LED_UPDATE_INTERVAL = 20  # run() passes between key-color animation steps
    KEY_COLOR_LENGTH = 3  # LEDs per key-color segment
    # NOTES/PITCHES
    NUM_NOTES = 8
    # Scales as semitone offsets from the start pitch.
    MAJOR_SCALE = [0, 2, 4, 5, 7, 9, 11]
    MAJOR_BLUES_SCALE = [0, 3, 5, 6, 7, 10]
    MINOR_NATURAL_SCALE = [0, 2, 3, 5, 7, 8, 10]
    MINOR_HARMONIC_SCALE = [0, 2, 3, 5, 7, 8, 11]
    MINOR_MELODIC_SCALE = [0, 2, 3, 5, 7, 9, 11]
    SCALES = [MAJOR_SCALE, MAJOR_BLUES_SCALE, MINOR_NATURAL_SCALE, MINOR_HARMONIC_SCALE, MINOR_MELODIC_SCALE]
    # Inclusive (low, high) limits.
    ROLL_OFFSET_BOUNDS = (-7, 7)
    START_PITCH_BOUNDS = (36, 108)

    def __init__(self):
        """Initialize all state and push the initial button/note LED colors."""
        # cap states
        self.capStates = [0 for i in range(12)]
        self.capFlags = [0 for i in range(12)]  # 0 no change, 1 for 0->1, 2 for 1->0
        # accel states
        self.shakeFlag = 0  # 0 no change, 1 for 0->1, 2 for 1->0
        # note/pitch states
        self.startPitch = 60
        self.scaleIndex = 0
        self.bassPitch = 0
        self.bassLock = False  # bass pitch changed once per B button down
        self.rollOffset = 0
        self.pitches = [60, 62, 64, 65, 67, 69, 71, 72]
        # LED states
        self.ledColors = [(0, 0, 0)] * Ringtone.NUM_LEDS
        self.noteColors = [self.pitchToRGB(i) for i in range(60, 73)]
        self.buttonColors = [(0, 0, 0)] * Ringtone.NUM_BUTTONS
        self.keyColor = self.pitchToRGB(self.startPitch % 12 + 72)
        self.keyColorStart = 0
        # device state (0 play, 1 bass select, 2 key select, 3 octave select)
        self.state = 0
        # LED update counter
        self.ledUpdateCounter = 0
        self.updateButtonColors()
        self.updateNoteColors()

    # INTERNAL HELPERS
    @staticmethod
    def _clamp(value, bounds):
        """Limit value to the inclusive (low, high) pair in bounds."""
        return min(max(value, bounds[0]), bounds[1])

    @staticmethod
    def _log(*parts):
        """Print a log line when PRINT_LOG_MSGS is enabled."""
        if PRINT_LOG_MSGS:
            print(*parts)

    def _shiftStartPitch(self, semitones):
        """Move the start pitch, clamp it to START_PITCH_BOUNDS, rebuild pitches."""
        self.startPitch = Ringtone._clamp(self.startPitch + semitones, Ringtone.START_PITCH_BOUNDS)
        self.updatePitches()

    def _shiftRollOffset(self, degrees):
        """Move the roll offset, clamp it to ROLL_OFFSET_BOUNDS, rebuild pitches."""
        self.rollOffset = Ringtone._clamp(self.rollOffset + degrees, Ringtone.ROLL_OFFSET_BOUNDS)
        self.updatePitches()

    # NOTE/PITCH METHODS
    def rollUpOctave(self):
        """Raise the start pitch by 12 semitones (clamped)."""
        self._shiftStartPitch(12)

    def rollDownOctave(self):
        """Lower the start pitch by 12 semitones (clamped)."""
        self._shiftStartPitch(-12)

    def rollUpNote(self):
        """Shift the note pads up by one scale degree (clamped)."""
        self._shiftRollOffset(1)

    def rollDownNote(self):
        """Shift the note pads down by one scale degree (clamped)."""
        self._shiftRollOffset(-1)

    def incrementKey(self):
        """Raise the start pitch by one semitone (clamped)."""
        self._shiftStartPitch(1)

    def decrementKey(self):
        """Lower the start pitch by one semitone (clamped)."""
        self._shiftStartPitch(-1)

    def cycleScale(self):
        """Select the next scale in SCALES, wrapping around, and rebuild pitches."""
        self.scaleIndex += 1
        self.scaleIndex %= len(Ringtone.SCALES)
        self.updatePitches()

    def setBassPitch(self, note):  # note: pitch of note button pressed
        """Set the bass pitch an octave below `note` unless already locked, then send it."""
        if not self.bassLock:
            self.bassPitch = note - 12
            self.bassLock = True
        client.send_message('/basspitch', self.bassPitch)

    def clearBassPitch(self):
        """Reset the bass pitch to 0, release the lock and send the new value."""
        self.bassPitch = 0
        self.bassLock = False
        client.send_message('/basspitch', self.bassPitch)

    def sendBassNoteOff(self):
        """Send '/bassnote' 0."""
        client.send_message('/bassnote', 0)

    def sendBassNoteOn(self):
        """Send '/bassnote' 1."""
        client.send_message('/bassnote', 1)

    def sendPitches(self):
        """Send every note pitch as '/pitch<i>'."""
        for i, val in enumerate(self.pitches):
            client.send_message('/pitch' + str(i), val)

    def updatePitches(self):
        """Rebuild self.pitches from start pitch, scale and roll offset.

        Pad n plays scale degree n + rollOffset. Degrees outside the scale
        wrap around and move by whole octaves; floor division keeps this
        correct for negative degrees, including exact multiples of the scale
        length (e.g. degree -7 of a 7-note scale is exactly one octave down).
        """
        scale = Ringtone.SCALES[self.scaleIndex]
        newPitches = []
        for n in range(Ringtone.NUM_NOTES):
            octave, degree = divmod(n + self.rollOffset, len(scale))
            newPitches.append(self.startPitch + scale[degree] + 12 * octave)
        self.pitches = newPitches

    # LED methods
    def pitchToRGB(self, pitch):
        """Map a pitch to an RGB color via HSVtoRGB.

        Hue follows the pitch class (30 degrees per semitone, offset by 320),
        saturation follows the octave (0.70 at octave 4, +/-0.05 per octave),
        and a value of 0.75 is passed.
        """
        p = pitch % 12
        o = int(pitch / 12.0) - 1
        return HSVtoRGB((p*30+320) % 360, 0.70 + (o-4)*0.05, 0.75)

    def updateButtonColors(self):
        """Recompute button LED colors from the cap states, store and send them."""
        first = Ringtone.B_BUTTON
        self.buttonColors = [
            (128, 128, 128) if self.capStates[i] == 1 else (0, 0, 0)
            for i in range(first, first + Ringtone.NUM_BUTTONS)
        ]
        self.ledColors[Ringtone.B_LED:Ringtone.B_LED+Ringtone.NUM_BUTTONS] = self.buttonColors
        self.sendButtonColors()

    def updateNoteColors(self):
        """Recompute note LED colors from the current pitches, store and send them."""
        self.noteColors = [self.pitchToRGB(p) for p in self.pitches]
        self.ledColors[Ringtone.NOTE_LED_START:Ringtone.NOTE_LED_START+Ringtone.NUM_NOTES] = self.noteColors
        self.sendNoteColors()

    def updateKeyColor(self):
        """Recompute the key color from the start pitch's pitch class."""
        self.keyColor = self.pitchToRGB(self.startPitch % 12 + 72)

    def cycleKeyColor(self):
        """Advance the key-color animation by one LED and redraw every LED.

        Three segments of KEY_COLOR_LENGTH LEDs, separated by 4 dark LEDs,
        are drawn starting at keyColorStart; button and note colors are then
        laid over them and the whole strip is sent.
        """
        self.keyColorStart += 1
        self.keyColorStart %= Ringtone.NUM_LEDS
        self.ledColors = [(0, 0, 0)] * Ringtone.NUM_LEDS
        length = Ringtone.KEY_COLOR_LENGTH
        for segment in range(3):
            begin = segment * (length + 4)
            for i in range(begin, begin + length):
                self.ledColors[(self.keyColorStart+i) % Ringtone.NUM_LEDS] = self.keyColor
        self.updateButtonColors()
        self.updateNoteColors()
        self.sendAllLedColors()

    def sendLedFlash(self):
        """Set every LED to a dim white (64, 64, 64)."""
        for i in range(len(self.ledColors)):
            setLedFromList(i, (64, 64, 64))

    def sendButtonColors(self):
        """Send the stored button colors to the button LEDs."""
        for i, color in enumerate(self.buttonColors):
            setLedFromList(Ringtone.B_LED+i, color)

    def sendNoteColors(self):
        """Send the stored note colors to the note LEDs."""
        for i, color in enumerate(self.noteColors):
            setLedFromList(Ringtone.NOTE_LED_START+i, color)

    def sendAllLedColors(self):
        """Send the stored color of every LED."""
        for i, color in enumerate(self.ledColors):
            setLedFromList(i, color)

    # reset methods
    def resetCapFlags(self):
        """Clear all capacitive transition flags."""
        self.capFlags = [0 for i in range(12)]

    def resetShakeFlag(self):
        """Clear the shake flag."""
        self.shakeFlag = 0

    def interpretMessage(self, message):
        """Decode one incoming packet and update state / forward it over OSC.

        Args:
            message: indexable sequence of byte values, or None.
                message[0] is the input id and message[1], message[2] form a
                big-endian 16-bit value.

        None is ignored. Packets shorter than 3 bytes are dropped and, on
        serial, the pending input buffer is discarded. Id 1 is only printed.
        Ids in OSC_INDEX_ARRAY are forwarded to their OSC address (gyro and
        accel values are normalized first; accel values also drive the shake
        flag). Ids 64-75 are capacitive electrodes 0-11 and update
        capStates/capFlags.
        """
        if message is None:
            return
        if len(message) < 3:
            if SERIAL_ENABLE:
                ser.read(ser.in_waiting)
            return
        inputId = message[0]
        if inputId == 1:
            print(message[1], message[2])
            return
        raw = (message[1] << 8) + message[2]
        if inputId in OSC_INDEX_ARRAY:
            self._handleAnalog(inputId, raw)
        elif 64 <= inputId < 76:
            self._handleCapacitive(inputId - 64, raw - 4096)

    def _handleAnalog(self, inputId, val):
        """Normalize (IMU only), optionally print, and forward one analog value."""
        address = OSC_ADDRESSES[inputId]['address']
        if inputId in [150, 151, 152]:
            # gyro normalize
            val = int((val - GYRO_NORMS[int(inputId - 150)]) / GYRO_SCALE)
        elif inputId in [153, 154, 155]:
            # accel normalize
            val = int((val - ACCEL_NORMS[int(inputId - 153)]) / ACCEL_SCALE)
            if val >= SHAKE_THRESHOLD and self.shakeFlag == 0:
                self.shakeFlag = 1
            elif val < SHAKE_THRESHOLD and self.shakeFlag == 1:
                self.shakeFlag = 2
            else:
                self.shakeFlag = 0
        if PACKET_INCOMING_SERIAL_MONITOR:
            if not FILTER_ON or address in FILTER_ADDRESSES:
                print(address, val)
        client.send_message(address, val)

    def _handleCapacitive(self, num, val):
        """Update touch state/flag of electrode `num` and send state changes."""
        address = '/capsense' + str(num)
        state_address = '/capstate' + str(num)
        if val > CAP_MAX:
            return
        touched = 1 if val >= CAP_THRESHOLD else 0
        # cap state transitions
        if touched and self.capStates[num] == 0:  # cap down
            self.capFlags[num] = 1
            client.send_message(state_address, 1)
        elif not touched and self.capStates[num] == 1:  # cap up
            self.capFlags[num] = 2
            client.send_message(state_address, 0)
        else:
            self.capFlags[num] = 0
        self.capStates[num] = touched
        if PACKET_INCOMING_SERIAL_MONITOR:
            if not FILTER_ON or address in FILTER_ADDRESSES:
                print(address, val)  # /capsenseX
            if not FILTER_ON or state_address in FILTER_ADDRESSES:
                print(state_address, self.capStates[num])  # /capstateX
            if not FILTER_ON or '/capflag' + str(num) in FILTER_ADDRESSES:
                print('cap_flag' + str(num), self.capFlags[num])

    def run(self):
        '''
        Advance the instrument by one main-loop pass.

        Refreshes LEDs, sends the pitches, runs the handler of the current
        mode, then clears the cap and shake flags.

        * handled in Pd
        0 - PLAY MODE
        8 down/up: note on/off*
        B up: note off (8vb), clear bass pitch
        + down: roll +1 scale degree
        - down: roll -1 scale degree
        B down: enter bass select mode (1)
        K down: enter key select mode (2)
        1 - BASS SELECT MODE (B down)
        8 down: change bass pitch, note on (8vb), enter play mode (0)
        B up: note off (8vb), clear bass pitch, enter play mode (0)
        K down: enter octave select mode (3)
        2 - KEY SELECT MODE (K down)
        8 down/up: note on/off*
        + down: increase key 1 semitone
        - down: decrease key 1 semitone
        shake: change scale (major/minor, etc.)
        K up: enter play mode (0)
        B down: enter octave select mode (3)
        3 - OCTAVE SELECT MODE (K and B down)
        8 down/up: note on/off*
        + down: roll +1 octave
        - down: roll -1 octave
        K up: enter bass select mode (1)
        B up: enter key select mode (2)
        '''
        # update button LEDs (updateButtonColors also sends them, so no
        # second send is needed)
        first = Ringtone.B_BUTTON
        if sum(self.capFlags[first:first + Ringtone.NUM_BUTTONS]) > 0:
            self.updateButtonColors()
        # update key color cycle for all LEDs
        if self.ledUpdateCounter == 0:
            self.cycleKeyColor()
            self.ledUpdateCounter = Ringtone.LED_UPDATE_INTERVAL
        else:
            self.ledUpdateCounter -= 1
        # send pitches to Pd
        self.sendPitches()

        handlers = (
            self._runPlayMode,
            self._runBassSelectMode,
            self._runKeySelectMode,
            self._runOctaveSelectMode,
        )
        pitchesChangedFlag = False
        if self.state in (0, 1, 2, 3):
            pitchesChangedFlag = handlers[self.state]()

        # reset cap flags bc they're all consumed by now
        self.resetCapFlags()
        # reset shake flag too
        self.resetShakeFlag()
        if pitchesChangedFlag:
            # note pitches changed: recolor the note LEDs (this also sends them)
            self.updateNoteColors()

    def _releaseBass(self):
        """Send bass note off and clear the bass pitch, logging both steps."""
        self.sendBassNoteOff()
        self._log('bass: note off')
        self.clearBassPitch()
        self._log('bass: pitch cleared')

    def _runPlayMode(self):
        """Handle state 0; return True when the note pitches changed."""
        flags = self.capFlags
        changed = False
        if flags[Ringtone.B_BUTTON] == 2:  # B up
            self._releaseBass()
        if flags[Ringtone.UP_BUTTON] == 1:  # + down
            self.rollUpNote()
            changed = True
            self._log('rollUpNote: new offset', self.rollOffset)
        if flags[Ringtone.DOWN_BUTTON] == 1:  # - down
            self.rollDownNote()
            changed = True
            self._log('rollDownNote: new offset', self.rollOffset)
        if flags[Ringtone.B_BUTTON] == 1 and flags[Ringtone.K_BUTTON] == 1:  # B and K down
            self.state = 3
            self._log('state: 0 to 3')
        elif flags[Ringtone.B_BUTTON] == 1:  # B down
            self.state = 1
            self._log('state: 0 to 1')
        elif flags[Ringtone.K_BUTTON] == 1:  # K down
            self.state = 2
            self._log('state: 0 to 2')
        return changed

    def _runBassSelectMode(self):
        """Handle state 1; never changes the note pitches, so returns False."""
        flags = self.capFlags
        if sum(flags) > 0:  # 8 down
            for n in range(Ringtone.NUM_NOTES):
                if flags[n] == 1:  # find first note down
                    self.setBassPitch(self.pitches[n])
                    self._log('setBassPitch:', self.bassPitch)
                    self.sendBassNoteOn()
                    self._log('bass: note on')
                    self.state = 0
                    break
        if flags[Ringtone.B_BUTTON] == 2:  # B up
            self._releaseBass()
            self.state = 0
            self._log('state: 1 to 0')
        if flags[Ringtone.B_BUTTON] == 2 and flags[Ringtone.K_BUTTON] == 1:  # B up and K down
            self.state = 2
            self._log('state: 1 to 2')
        elif flags[Ringtone.K_BUTTON] == 1:  # K down
            self.state = 3
            self._log('state: 1 to 3')
        return False

    def _runKeySelectMode(self):
        """Handle state 2; return True when the note pitches changed."""
        flags = self.capFlags
        changed = False
        if flags[Ringtone.UP_BUTTON] == 1:  # + down
            self.incrementKey()
            self.updateKeyColor()
            changed = True
            self._log('key: incremented, new start pitch ', self.startPitch)
        if flags[Ringtone.DOWN_BUTTON] == 1:  # - down
            self.decrementKey()
            self.updateKeyColor()
            changed = True
            self._log('key: decremented, new start pitch ', self.startPitch)
        if self.shakeFlag:
            self.cycleScale()
            self.sendLedFlash()
            changed = True
            self._log('scale: new index', self.scaleIndex)
        if flags[Ringtone.K_BUTTON] == 2 and flags[Ringtone.B_BUTTON] == 1:  # K up and B down
            self.state = 1
            self._log('state: 2 to 1')
        elif flags[Ringtone.K_BUTTON] == 2:  # K up
            self.state = 0
            self._log('state: 2 to 0')
        elif flags[Ringtone.B_BUTTON] == 1:  # B down
            self.state = 3
            self._log('state: 2 to 3')
        return changed

    def _runOctaveSelectMode(self):
        """Handle state 3; return True when the note pitches changed."""
        flags = self.capFlags
        changed = False
        if flags[Ringtone.UP_BUTTON] == 1:  # + down
            self.rollUpOctave()
            changed = True
            self._log('octave: incremented, new start pitch ', self.startPitch)
        if flags[Ringtone.DOWN_BUTTON] == 1:  # - down
            self.rollDownOctave()
            changed = True
            self._log('octave: decremented, new start pitch ', self.startPitch)
        if flags[Ringtone.K_BUTTON] == 2 and flags[Ringtone.B_BUTTON] == 2:  # K up and B up
            self.state = 0
            self._log('state: 3 to 0')
        elif flags[Ringtone.K_BUTTON] == 2:  # K up
            self.state = 1
            self._log('state: 3 to 1')
        elif flags[Ringtone.B_BUTTON] == 2:  # B up
            self.state = 2
            self._log('state: 3 to 2')
        return changed
######################
#LOOP
######################
async def loop():
    """Configure the device, then read and process messages forever.

    Startup: pause briefly, discard pending serial input, send the
    input/cap-sense configuration and create the Ringtone state machine.
    Each pass then reads one message, lets the state machine interpret it,
    runs the state machine, and sleeps for 1 ms.

    Pauses use asyncio.sleep so the event loop (and the OSC server that
    shares it) can run during them instead of being blocked by time.sleep.
    """
    await asyncio.sleep(0.1)
    if SERIAL_ENABLE:
        # reset_input_buffer() is the pyserial 3.x name for flushInput()
        ser.reset_input_buffer()
    await asyncio.sleep(0.1)
    setEnables()
    setCapSense()
    rt = Ringtone()
    while True:
        currentMessage = readNextMessage()  # can be None if nothing in input buffer
        rt.interpretMessage(currentMessage)
        rt.run()
        await asyncio.sleep(0.001)
async def init():
    """Start the OSC server on 127.0.0.1:5006 and run the main loop.

    Incoming OSC messages are routed through the module-level `dispatcher`.
    The server transport is closed when the main loop exits for any reason,
    including an exception or cancellation.
    """
    server = AsyncIOOSCUDPServer(("127.0.0.1", 5006), dispatcher, asyncio.get_running_loop())
    transport, protocol = await server.create_serve_endpoint()
    try:
        if SERIAL_ENABLE:
            ser.read(ser.in_waiting)  # discard anything already buffered
        await loop()
    finally:
        transport.close()
# Script entry point: start the OSC server and the main processing loop.
asyncio.run(init())
# End of file.