Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
# 370_RINGTONE.py
# Created by Ian Hattwick with Fred Kelly
# Modified by Shannon Peng
# Original file created May 4 2020
# Debug switches for incoming serial traffic (1 = on, 0 = off).
# RAW_INCOMING_SERIAL_MONITOR: checkSerial() prints the integer value of
# incoming bytes as they are read.
RAW_INCOMING_SERIAL_MONITOR = 0
# PACKET_INCOMING_SERIAL_MONITOR: Ringtone.interpretMessage() prints decoded
# address/value pairs, subject to FILTER_ON / FILTER_ADDRESSES.
PACKET_INCOMING_SERIAL_MONITOR = 1
import serial, serial.tools.list_ports, socket, sys
from pythonosc import osc_message_builder
from pythonosc import udp_client
from pythonosc.osc_server import AsyncIOOSCUDPServer
from pythonosc.dispatcher import Dispatcher
import asyncio
import struct
import time
######################
# SET COMMUNICATION MODE
######################
# don't forget to set the ESP32 firmware to match!
# SERIAL_ENABLE is checked before `ser` is used in setEnables(),
# readNextMessage(), interpretMessage(), loop() and init().
# WIFI_ENABLE selects the socket sends in setEnables() and the checkWiFi()
# path in readNextMessage().
SERIAL_ENABLE = 1
WIFI_ENABLE = 0 # must run Python script after ESP32 reset
######################
# FILTER MESSAGES
######################
# quick filters
# Pre-built lists of OSC addresses. They are not referenced elsewhere in this
# file; presumably meant to be assigned or added to FILTER_ADDRESSES by hand.
accel_addresses = ["/accelX", "/accelY", "/accelZ"]
gyro_addresses = ["/gyroX", "/gyroY", "/gyroZ"]
capsense_addresses = [ "/capsense" + str(i) for i in range(12)]
# FILTER_ON: when True, interpretMessage() prints only addresses listed in
# FILTER_ADDRESSES; when False it prints every decoded message.
FILTER_ON = True
# PRINT_LED_MSGS: setLed() prints each outgoing LED packet.
PRINT_LED_MSGS = False
# PRINT_LOG_MSGS: Ringtone.run() prints state / pitch / bass log lines.
PRINT_LOG_MSGS = True
# Addresses allowed through the print filter. Empty list + FILTER_ON = True
# means no decoded message is printed.
FILTER_ADDRESSES = []
######################
# CAP VALUES
######################
# CAP_THRESHOLD: a cap reading at or above this counts as "touched" in
# interpretMessage(). CAP_MAX: readings above this are discarded there.
CAP_THRESHOLD = 120
CAP_MAX = 4000
######################
# ACCEL VALUES
######################
# ACCEL_NORMS: per-axis (X, Y, Z) offsets subtracted from the raw 16-bit
# reading; the difference is then divided by ACCEL_SCALE.
# SHAKE_THRESHOLD: scaled accel value at or above which interpretMessage()
# raises the shake flag.
ACCEL_NORMS = [32767+0, 32767-27, 32767+13]
ACCEL_SCALE = 10
SHAKE_THRESHOLD = 120
######################
# GYRO VALUES
######################
# GYRO_NORMS / GYRO_SCALE: same offset-then-divide normalisation, applied to
# the gyro X, Y, Z readings.
GYRO_NORMS = [32767+1020, 32767, 32767+70]
GYRO_SCALE = 10
######################
# SETUP SERIAL PORT
######################
if SERIAL_ENABLE:
    # Device path of the serial port to open; edit to match your machine.
    CUR_SERIAL_PORT = "/dev/cu.usbserial-14240"
    ports = list(serial.tools.list_ports.comports())
    # Uncomment to list the detected ports:
    # print("available serial ports:")
    # for port in ports:
    #     print(port)

    # Open the configured port if it appears among the detected ones.
    for port in ports:
        if CUR_SERIAL_PORT in port:
            ser = serial.Serial(CUR_SERIAL_PORT)
            ser.baudrate = 115200
            ser.setDTR(False)  # Drop DTR
            time.sleep(0.022)  # Read somewhere that 22ms is what the UI does.
            ser.setDTR(True)  # UP the DTR back
            ser.read(ser.in_waiting)  # if anything in input buffer, discard it
            print(CUR_SERIAL_PORT + " connected\n")
            SERIAL_ENABLE = 1
            break  # stop scanning once the port is open
    else:
        # Runs only when the loop finished without `break`, i.e. no detected
        # port matched (including the case of an empty port list). The
        # message is therefore printed exactly once instead of once per
        # non-matching port.
        # NOTE(review): `ser` stays undefined in this case while
        # SERIAL_ENABLE is still 1, so later serial calls will raise
        # NameError - same as before this change.
        print(CUR_SERIAL_PORT + " not available\n")
######################
# SETUP OSC
######################
# initialize UDP client
# All outgoing OSC messages (sensor values, cap states, pitches, bass note)
# are sent through this client to 127.0.0.1:5005.
client = udp_client.SimpleUDPClient("127.0.0.1", 5005)
# dispatcher in charge of executing functions in response to RECEIVED OSC messages
# (handlers are registered with dispatcher.map(); init() passes this
# dispatcher to the OSC server it creates on port 5006)
dispatcher = Dispatcher()
print("Sending data to port", 5005)
# sensor inputs
# Keyed by the input ID that arrives as the first byte of an incoming packet
# (see Ringtone.interpretMessage). Per entry:
#   'address' - OSC address the value is forwarded to
#   'enable'  - sent to the device by setEnables() (command byte 1)
#   'rate'    - sent by setEnables() (command byte 2); its comment there calls
#               this "dataRateInMS"
#   'mode'    - key into ANALOG_MODES, sent by setEnables() (command byte 3)
# NOTE(review): each of these values is sent as a single byte in a packet
# terminated by 255, so presumably they must stay below 254 - confirm against
# the firmware.
OSC_ADDRESSES = {
27:{ 'address':'/analog0', 'enable': 1, 'rate':250, 'mode':'DIGITAL' },
33:{ 'address':'/analog1', 'enable': 0, 'rate':200, 'mode':'MEAN' },
32:{ 'address':'/analog2', 'enable': 0, 'rate':200, 'mode':'MEAN' },
14:{ 'address':'/analog3', 'enable': 0, 'rate':200, 'mode':'MEAN' },
4: { 'address':'/analog4', 'enable': 0, 'rate':200, 'mode':'MEAN' },
0: { 'address':'/analog5', 'enable': 0, 'rate':200, 'mode':'MEAN' },# pulled high by ESP32
15:{ 'address':'/analog6', 'enable': 0, 'rate':200, 'mode':'MEAN' },# boot fail if pulled low
13:{ 'address':'/analog7', 'enable': 0, 'rate':200, 'mode':'MEAN' },
36:{ 'address':'/analog8', 'enable': 0, 'rate':200, 'mode':'MEAN' },
39:{ 'address':'/analog9', 'enable': 0, 'rate':200, 'mode':'MEAN' },
# alternate analog inputs
34:{ 'address':'/button0', 'enable': 0, 'rate':125, 'mode':'MEAN' }, # button
35:{ 'address':'/button1', 'enable': 0, 'rate':125, 'mode':'MEAN' }, # button
2: { 'address':'/analog10', 'enable': 0, 'rate':200, 'mode':'MEAN' }, # CS0
12:{ 'address':'/analog11', 'enable': 0, 'rate':200, 'mode':'MEAN' }, # CS1, boot fail if pulled high
25:{ 'address':'/analog12', 'enable': 0, 'rate':125, 'mode':'MEAN' }, # DAC1
26:{ 'address':'/analog13', 'enable': 0, 'rate':125, 'mode':'MEAN' }, # DAC2
# I2C and SPI pins, mode must be 'DIGITAL', can't be used if I2C or SPI are being used
18:{ 'address':'/digital0', 'enable': 0, 'rate':200, 'mode':'DIGITAL' },# CLK
19:{ 'address':'/digital1', 'enable': 0, 'rate':200, 'mode':'DIGITAL' },# MISO
21:{ 'address':'/digital2', 'enable': 0, 'rate':200, 'mode':'DIGITAL' },# I2C
22:{ 'address':'/digital3', 'enable': 0, 'rate':200, 'mode':'DIGITAL' },# I2C
23:{ 'address':'/digital4', 'enable': 0, 'rate':200, 'mode':'DIGITAL' },# MOSI
5:{ 'address':'/digital5', 'enable': 0, 'rate':200, 'mode':'DIGITAL' },# MIDI, boot fail if pulled low
# IMU
150:{ 'address':'/gyroX', 'enable': 0, 'rate':20, 'mode':'MEAN' },
151:{ 'address':'/gyroY', 'enable': 0, 'rate':20, 'mode':'MEAN' },
152:{ 'address':'/gyroZ', 'enable': 0, 'rate':20, 'mode':'MEAN' },
153:{ 'address':'/accelX', 'enable': 0, 'rate':20, 'mode':'MEAN' },
154: { 'address':'/accelY', 'enable': 0, 'rate':20, 'mode':'MEAN' },
155: { 'address':'/accelZ', 'enable': 0, 'rate':20, 'mode':'MEAN' },
156: { 'address':'/temp', 'enable': 0, 'rate':250, 'mode':'MEAN' }
}
# Input IDs that are actually configured and forwarded. setEnables() sends the
# POSITION in this list as the input number, and interpretMessage() only
# forwards packets whose ID is in this list. Entries of OSC_ADDRESSES that are
# not listed here (IDs 2, 12, 25, 26, 18, 19, 21, 22, 23, 5) are therefore
# never configured or forwarded by this script.
OSC_INDEX_ARRAY = [27, 33, 32, 14, 4, 0, 15, 13, 36, 39, 34, 35, #analog pins
150, 151, 152, 153, 154, 155, 156 ] #IMU
# Maps the 'mode' names used in OSC_ADDRESSES to the numeric code that
# setEnables() sends to the device.
ANALOG_MODES = {
'MEAN': 0,
'MEDIAN': 1,
'MIN': 2,
'MAX': 3,
'PEAK_DEVIATION': 4,
'CAP_SENSE': 5,
'DIGITAL': 6,
'ECHO': 11,
'TRIG': 10
}
# Reusable 4-byte configuration packet: [command, input index, value, 255].
# setEnables() overwrites the first three slots before every send; the
# trailing 255 matches the end-of-message byte used by checkSerial().
enableMsg = [0, 0, 0, 255]
enableMsg[0] = 1;
def setEnables():
    """Push the enable flag, data rate and mode of every input to the device.

    Three passes are made over OSC_INDEX_ARRAY, one per setting. Each pass
    sends one packet per input of the form [command, index, value, 255],
    where command is 1 (enable), 2 (rate) or 3 (mode) and index is the
    input's position in OSC_INDEX_ARRAY. The shared module-level list
    ``enableMsg`` is reused as the packet buffer, so it holds the last packet
    sent when the function returns. A 25 ms pause follows every packet.
    """
    # (command byte, how to derive the value byte from an OSC_ADDRESSES entry)
    passes = (
        (1, lambda entry: entry['enable']),
        (2, lambda entry: entry['rate']),
        (3, lambda entry: ANALOG_MODES[entry['mode']]),
    )

    time.sleep(0.25)
    for command, value_of in passes:
        for position, input_id in enumerate(OSC_INDEX_ARRAY):
            enableMsg[0] = command
            enableMsg[1] = position
            enableMsg[2] = value_of(OSC_ADDRESSES[input_id])
            if SERIAL_ENABLE:
                ser.write(bytearray(enableMsg))
            if WIFI_ENABLE:
                s.sendto(bytearray(enableMsg), (clientAddress))
            time.sleep(0.025)
######################
# INITIALIZE CAPACITIVE INPUTS
######################
# Capacitive-sensing settings that setCapSense() sends to the device:
# NUM_ELECTRODES with command byte 10, chargeCurrent with command byte 12,
# and capInterval once per electrode with command byte 11.
NUM_ELECTRODES = 12; # set NUM_ELECTRODES to 0 if not using MPR121
chargeCurrent = 63; # 0-63
capInterval = 100;
def setCapSense():
    """Send the capacitive-sensing configuration over the serial port.

    Packets, each terminated by 255:
      [10, NUM_ELECTRODES]        - number of electrodes
      [12, chargeCurrent]         - charge current
      [11, electrode, capInterval] - one per electrode index

    NOTE: writes to ``ser`` directly, without checking SERIAL_ENABLE.
    """
    end_marker = 255
    ser.write(bytearray([10, NUM_ELECTRODES, end_marker]))
    ser.write(bytearray([12, chargeCurrent, end_marker]))
    for electrode in range(NUM_ELECTRODES):
        ser.write(bytearray([11, electrode, capInterval, end_marker]))
######################
# COMMUNICATION INPUT
######################
def readNextMessage():
    """Return the next incoming message from the active transport.

    Serial takes precedence when SERIAL_ENABLE is set; otherwise WiFi is used
    when WIFI_ENABLE is set. Returns None when neither transport is enabled.
    """
    if SERIAL_ENABLE:
        return checkSerial()
    return checkWiFi() if WIFI_ENABLE else None
#dispatcher.map("/serialRate", rateHandler)
def checkSerial():
    """Read one framed message from the serial port and return its payload.

    Framing: byte 255 ends a message; byte 254 is an escape marker meaning
    "take the next byte literally" (so payload bytes 254 and 255 can be sent).
    The escape markers and the end byte are not included in the result.

    Every byte, including the first one, goes through the same end/escape
    handling. Empty frames (an end byte with no payload before it) are
    skipped, so a stray end byte cannot corrupt the following message.

    When RAW_INCOMING_SERIAL_MONITOR is set, the integer value of each byte
    read is printed in addition to being decoded.

    Returns:
        bytes: the decoded payload of the next non-empty message.
    """
    # reserved bytes signifying end of message and escape character
    endByte = bytes([255])
    escByte = bytes([254])

    payload = bytearray()
    while True:
        curByte = ser.read(1)
        if RAW_INCOMING_SERIAL_MONITOR:
            print(int.from_bytes(curByte, byteorder='big'))

        if curByte == endByte:
            if payload:
                # a true end byte after some data: the message is complete
                return bytes(payload)
            continue  # empty frame, keep waiting for real data

        if curByte == escByte:
            # escape marker: drop it and take the following byte literally
            curByte = ser.read(1)
            if RAW_INCOMING_SERIAL_MONITOR:
                print(int.from_bytes(curByte, byteorder='big'))

        payload += curByte
# Not referenced anywhere else in this file; presumably a leftover from the
# WiFi/UDP receive path - confirm before removing.
prevUdpMsgNum = 0
######################
#COMMUNICATION OUTPUT
######################
def slipOutPacket(val=()):
    """Frame a sequence of byte values and write it to the serial port.

    Uses the same framing that checkSerial() decodes: a payload byte equal to
    the end byte (255) or the escape byte (254) is preceded by an escape byte,
    and the packet is terminated by a single end byte.

    Args:
        val: iterable of ints in 0..255 (a list or bytearray). Defaults to an
            empty payload, which sends only the end byte.

    Raises:
        ValueError: if an element is outside 0..255.

    NOTE(review): assumes the device un-escapes 254-prefixed bytes the same
    way checkSerial() does - confirm against the firmware.
    """
    END_BYTE = 255
    ESC_BYTE = 254

    outMessage = bytearray()
    for byte in val:
        if byte == END_BYTE or byte == ESC_BYTE:
            # reserved value inside the payload: mark it as literal data
            outMessage.append(ESC_BYTE)
        outMessage.append(byte)
    outMessage.append(END_BYTE)
    ser.write(outMessage)
def setLed(add, num, r, g, b):
    """Send an LED colour packet [50, num, r, g, b] via slipOutPacket().

    Also registered as the handler for incoming "/led" OSC messages, which is
    why the first parameter exists; ``add`` is not used by the function.
    The remaining arguments are converted with int() before sending.
    """
    ledMsg = [50] + [int(field) for field in (num, r, g, b)]
    if PRINT_LED_MSGS:
        print('ledMsg:', ledMsg)
    slipOutPacket(bytearray(ledMsg))
def setLedFromList(num, rgb):
    """Set LED ``num`` from an (r, g, b) sequence by delegating to setLed()."""
    red, green, blue = rgb[0], rgb[1], rgb[2]
    # the first argument fills setLed's unused OSC-address slot
    setLed(50, num, red, green, blue)


# Incoming "/led" OSC messages are routed to setLed.
dispatcher.map("/led", setLed)
######################
# HSV TO RGB
######################
# h: 0-360, s: 0-1, v: 0-1 (https://www.rapidtables.com/convert/color/hsv-to-rgb.html)
def HSVtoRGB(h, s, v):
    """Convert an HSV colour to an integer RGB tuple.

    Args:
        h: hue in degrees; wrapped into [0, 360) with a modulo.
        s: saturation; clamped to [0, 1].
        v: value (brightness); clamped to [0, 1].

    Returns:
        (r, g, b) tuple of ints, each in 0..254. The channels are scaled by
        254 rather than 255 (255 is the end-of-message byte of the serial
        framing used in this file).
    """
    h = h % 360
    s = max(min(s, 1), 0)
    v = max(min(v, 1), 0)  # clamp v itself (previously s was clamped twice)

    c = v * s                                # chroma
    x = c * (1 - abs((h / 60.0) % 2 - 1))    # second-largest component
    m = v - c                                # offset added to every channel

    # pick the channel layout for the 60-degree sector the hue falls in
    if h < 60:
        r, g, b = (c, x, 0)
    elif h < 120:
        r, g, b = (x, c, 0)
    elif h < 180:
        r, g, b = (0, c, x)
    elif h < 240:
        r, g, b = (0, x, c)
    elif h < 300:
        r, g, b = (x, 0, c)
    elif h < 360:
        r, g, b = (c, 0, x)
    else:
        # only reachable if float rounding makes h % 360 come out as 360.0
        r, g, b = (0, 0, 0)

    return (int((r + m) * 254), int((g + m) * 254), int((b + m) * 254))
######################
# RINGTONE CLASS
######################
class Ringtone:
    """Controller state for the Ringtone instrument.

    Decodes incoming sensor packets (interpretMessage), keeps track of the
    touch state of 12 capacitive electrodes, and runs a four-mode state
    machine (run) that turns button presses into pitch / key / octave / bass
    changes. Results are sent out as OSC messages through the module-level
    ``client`` and as LED packets through setLedFromList().

    Electrodes 0-7 are the note pads; electrodes 8-11 are the B, -, + and K
    buttons.
    """

    # BUTTONS (electrode indices)
    B_BUTTON = 8
    DOWN_BUTTON = 9
    UP_BUTTON = 10
    K_BUTTON = 11
    # LEDs
    NUM_LEDS = 32
    NUM_BUTTONS = 4
    B_LED = 9
    DOWN_LED = 10
    UP_LED = 11
    K_LED = 12
    NOTE_LED_START = 16
    LED_UPDATE_INTERVAL = 20   # run() calls between two key-colour animation steps
    KEY_COLOR_LENGTH = 3       # LEDs per lit segment of the key-colour animation
    # NOTES/PITCHES (scales are semitone offsets from the start pitch)
    NUM_NOTES = 8
    MAJOR_SCALE = [0, 2, 4, 5, 7, 9, 11]
    MAJOR_BLUES_SCALE = [0, 3, 5, 6, 7, 10]
    MINOR_NATURAL_SCALE = [0, 2, 3, 5, 7, 8, 10]
    MINOR_HARMONIC_SCALE = [0, 2, 3, 5, 7, 8, 11]
    MINOR_MELODIC_SCALE = [0, 2, 3, 5, 7, 9, 11]
    SCALES = [MAJOR_SCALE, MAJOR_BLUES_SCALE, MINOR_NATURAL_SCALE, MINOR_HARMONIC_SCALE, MINOR_MELODIC_SCALE]
    ROLL_OFFSET_BOUNDS = (-7, 7)
    START_PITCH_BOUNDS = (36, 108)

    def __init__(self):
        """Set the default state and push the initial button/note LED colours."""
        # cap states
        self.capStates = [0 for i in range(12)]
        self.capFlags = [0 for i in range(12)]  # 0 no change, 1 for 0->1, 2 for 1->0
        # accel states
        self.shakeFlag = 0  # 0 no change, 1 for 0->1, 2 for 1->0
        # note/pitch states
        self.startPitch = 60
        self.scaleIndex = 0
        self.bassPitch = 0
        self.bassLock = False  # bass pitch changed once per B button down
        self.rollOffset = 0
        self.pitches = [60, 62, 64, 65, 67, 69, 71, 72]
        # LED states
        self.ledColors = [(0, 0, 0)] * Ringtone.NUM_LEDS
        self.noteColors = [self.pitchToRGB(i) for i in range(60, 73)]
        self.buttonColors = [(0, 0, 0)] * Ringtone.NUM_BUTTONS
        self.keyColor = self.pitchToRGB(self.startPitch % 12 + 72)
        self.keyColorStart = 0
        # device state (0 play, 1 bass select, 2 key select, 3 octave select)
        self.state = 0
        # LED update counter
        self.ledUpdateCounter = 0
        self.updateButtonColors()
        self.updateNoteColors()

    # PRIVATE HELPERS
    @staticmethod
    def _clamp(value, bounds):
        """Return value limited to the inclusive (low, high) range in bounds."""
        low, high = bounds
        return min(max(value, low), high)

    @staticmethod
    def _log(*args):
        """Print a log line when PRINT_LOG_MSGS is enabled."""
        if PRINT_LOG_MSGS:
            print(*args)

    def _shiftOctave(self, semitones):
        """Move the start pitch by whole octaves, refusing out-of-range moves.

        A shift that would leave START_PITCH_BOUNDS is ignored instead of
        being clamped: clamping would land on the boundary pitch and silently
        change the key (e.g. 100 + 12 clamped to 108).
        """
        low, high = Ringtone.START_PITCH_BOUNDS
        candidate = self.startPitch + semitones
        if low <= candidate <= high:
            self.startPitch = candidate
        self.updatePitches()

    # NOTE/PITCH METHODS
    def rollUpOctave(self):
        """Raise the start pitch by one octave if that stays within bounds."""
        self._shiftOctave(12)

    def rollDownOctave(self):
        """Lower the start pitch by one octave if that stays within bounds."""
        self._shiftOctave(-12)

    def rollUpNote(self):
        """Shift the pad-to-scale-degree mapping up one degree (clamped)."""
        self.rollOffset = self._clamp(self.rollOffset + 1, Ringtone.ROLL_OFFSET_BOUNDS)
        self.updatePitches()

    def rollDownNote(self):
        """Shift the pad-to-scale-degree mapping down one degree (clamped)."""
        self.rollOffset = self._clamp(self.rollOffset - 1, Ringtone.ROLL_OFFSET_BOUNDS)
        self.updatePitches()

    def incrementKey(self):
        """Raise the start pitch by one semitone (clamped to the bounds)."""
        self.startPitch = self._clamp(self.startPitch + 1, Ringtone.START_PITCH_BOUNDS)
        self.updatePitches()

    def decrementKey(self):
        """Lower the start pitch by one semitone (clamped to the bounds)."""
        self.startPitch = self._clamp(self.startPitch - 1, Ringtone.START_PITCH_BOUNDS)
        self.updatePitches()

    def cycleScale(self):
        """Select the next scale in SCALES, wrapping around at the end."""
        self.scaleIndex = (self.scaleIndex + 1) % len(Ringtone.SCALES)
        self.updatePitches()

    def setBassPitch(self, note):  # note: pitch of note button pressed
        """Set the bass pitch one octave below ``note`` unless it is locked.

        The lock is taken on the first call and released by clearBassPitch().
        The current bass pitch is sent to '/basspitch' either way.
        """
        if not self.bassLock:
            self.bassPitch = note - 12
            self.bassLock = True
        client.send_message('/basspitch', self.bassPitch)

    def clearBassPitch(self):
        """Reset the bass pitch to 0, release the lock and send '/basspitch'."""
        self.bassPitch = 0
        self.bassLock = False
        client.send_message('/basspitch', self.bassPitch)

    def sendBassNoteOff(self):
        """Send '/bassnote' 0."""
        client.send_message('/bassnote', 0)

    def sendBassNoteOn(self):
        """Send '/bassnote' 1."""
        client.send_message('/bassnote', 1)

    def sendPitches(self):
        """Send every pad pitch as '/pitch<i>'."""
        for i, pitch in enumerate(self.pitches):
            client.send_message('/pitch' + str(i), pitch)

    def updatePitches(self):
        """Recompute the pitch of each of the NUM_NOTES pads.

        Pad n plays scale degree ``n + rollOffset`` of the current scale,
        counted from startPitch. Degrees outside one octave wrap around the
        scale and move by 12 semitones per wrap; divmod() gives a floor
        division, so negative degrees that are exact multiples of the scale
        length land exactly one octave per multiple below the start pitch.
        """
        scale = Ringtone.SCALES[self.scaleIndex]
        newPitches = []
        for n in range(Ringtone.NUM_NOTES):
            octave, degree = divmod(n + self.rollOffset, len(scale))
            newPitches.append(self.startPitch + 12 * octave + scale[degree])
        self.pitches = newPitches

    # LED methods
    def pitchToRGB(self, pitch):
        """Map a pitch to an RGB colour: hue from pitch class, saturation from octave."""
        p = pitch % 12
        o = int(pitch / 12.0) - 1
        return HSVtoRGB((p * 30 + 320) % 360, 0.70 + (o - 4) * 0.05, 0.75)

    def updateButtonColors(self):
        """Recompute the four button LED colours from capStates and send them."""
        first = Ringtone.B_BUTTON
        self.buttonColors = [
            (128, 128, 128) if self.capStates[i] == 1 else (0, 0, 0)
            for i in range(first, first + Ringtone.NUM_BUTTONS)
        ]
        self.ledColors[Ringtone.B_LED:Ringtone.B_LED + Ringtone.NUM_BUTTONS] = self.buttonColors
        self.sendButtonColors()

    def updateNoteColors(self):
        """Recompute the note LED colours from the current pitches and send them."""
        self.noteColors = [self.pitchToRGB(p) for p in self.pitches]
        self.ledColors[Ringtone.NOTE_LED_START:Ringtone.NOTE_LED_START + Ringtone.NUM_NOTES] = self.noteColors
        self.sendNoteColors()

    def updateKeyColor(self):
        """Recompute the key colour from the pitch class of the start pitch."""
        self.keyColor = self.pitchToRGB(self.startPitch % 12 + 72)

    def cycleKeyColor(self):
        """Advance the key-colour animation by one LED and resend all LEDs.

        Three segments of KEY_COLOR_LENGTH LEDs, spaced KEY_COLOR_LENGTH + 4
        apart, are lit in the key colour starting at keyColorStart; button
        and note LEDs are then drawn on top.
        """
        self.keyColorStart = (self.keyColorStart + 1) % Ringtone.NUM_LEDS
        self.ledColors = [(0, 0, 0)] * Ringtone.NUM_LEDS
        stride = Ringtone.KEY_COLOR_LENGTH + 4
        for segment in range(3):
            first = segment * stride
            for i in range(first, first + Ringtone.KEY_COLOR_LENGTH):
                self.ledColors[(self.keyColorStart + i) % Ringtone.NUM_LEDS] = self.keyColor
        # NOTE: both update calls also transmit their own LED slices, so
        # those LEDs are sent again by sendAllLedColors() below.
        self.updateButtonColors()
        self.updateNoteColors()
        self.sendAllLedColors()

    def sendLedFlash(self):
        """Set every LED to a dim white (64, 64, 64)."""
        for i in range(len(self.ledColors)):
            setLedFromList(i, (64, 64, 64))

    def sendButtonColors(self):
        """Send the button colours to LEDs B_LED onwards."""
        for i, color in enumerate(self.buttonColors):
            setLedFromList(Ringtone.B_LED + i, color)

    def sendNoteColors(self):
        """Send the note colours to LEDs NOTE_LED_START onwards."""
        for i, color in enumerate(self.noteColors):
            setLedFromList(Ringtone.NOTE_LED_START + i, color)

    def sendAllLedColors(self):
        """Send the full ledColors buffer, one LED at a time."""
        for i, color in enumerate(self.ledColors):
            setLedFromList(i, color)

    # reset methods
    def resetCapFlags(self):
        """Clear all electrode transition flags."""
        self.capFlags = [0 for i in range(12)]

    def resetShakeFlag(self):
        """Clear the shake flag."""
        self.shakeFlag = 0

    # MESSAGE DECODING
    def interpretMessage(self, message):
        """Decode one incoming packet and update state / forward it over OSC.

        Args:
            message: payload bytes as returned by readNextMessage(), or None.
                Byte 0 is the message ID; bytes 1 and 2 form a big-endian
                16-bit value.

        None is ignored. A packet shorter than 3 bytes is dropped and, when
        serial is enabled, the serial input buffer is discarded. ID 1 is
        printed as-is. IDs listed in OSC_INDEX_ARRAY are analog / IMU
        readings; IDs 64-75 are capacitive electrodes 0-11. Anything else is
        ignored.
        """
        if message is None:
            return
        if len(message) < 3:
            if SERIAL_ENABLE:
                ser.read(ser.in_waiting)
            return
        msgId = message[0]
        if msgId == 1:
            print(message[1], message[2])
            return
        if msgId in OSC_INDEX_ARRAY:
            self._interpretAnalog(message)
        elif 64 <= msgId < 76:
            self._interpretCapSense(message)

    def _interpretAnalog(self, message):
        """Normalise an analog / IMU reading, update shakeFlag, forward over OSC."""
        msgId = message[0]
        address = OSC_ADDRESSES[msgId]['address']
        val = (message[1] << 8) + message[2]
        if msgId in (150, 151, 152):
            # gyro normalize
            val = int((val - GYRO_NORMS[msgId - 150]) / GYRO_SCALE)
        elif msgId in (153, 154, 155):
            # accel normalize
            val = int((val - ACCEL_NORMS[msgId - 153]) / ACCEL_SCALE)
            if val >= SHAKE_THRESHOLD and self.shakeFlag == 0:
                self.shakeFlag = 1
            elif val < SHAKE_THRESHOLD and self.shakeFlag == 1:
                self.shakeFlag = 2
            else:
                self.shakeFlag = 0
        if PACKET_INCOMING_SERIAL_MONITOR:
            if not FILTER_ON or address in FILTER_ADDRESSES:
                print(address, val)
        client.send_message(address, val)

    def _interpretCapSense(self, message):
        """Update touch state and transition flag for one capacitive electrode."""
        num = message[0] - 64
        address = '/capsense' + str(num)
        state_address = '/capstate' + str(num)
        val = (message[1] << 8) + message[2] - 4096
        if val > CAP_MAX:
            return
        touched = 1 if val >= CAP_THRESHOLD else 0
        # cap state transitions
        if touched and self.capStates[num] == 0:  # cap down
            self.capFlags[num] = 1
            client.send_message(state_address, 1)
        elif not touched and self.capStates[num] == 1:  # cap up
            self.capFlags[num] = 2
            client.send_message(state_address, 0)
        else:
            self.capFlags[num] = 0
        # Record the new state on every path, so a touch produces exactly one
        # "down" flag and a release exactly one "up" flag.
        self.capStates[num] = touched
        if PACKET_INCOMING_SERIAL_MONITOR:
            if not FILTER_ON or address in FILTER_ADDRESSES:
                print(address, val)  # /capsenseX
            if not FILTER_ON or state_address in FILTER_ADDRESSES:
                print(state_address, self.capStates[num])  # /capstateX
            if not FILTER_ON or '/capflag' + str(num) in FILTER_ADDRESSES:
                print('cap_flag' + str(num), self.capFlags[num])

    # STATE MACHINE
    def run(self):
        '''
        Run one step of the state machine; call once per incoming message.

        * handled in Pd
        0 - PLAY MODE
        8 down/up: note on/off*
        B up: note off (8vb), clear bass pitch
        + down: roll +1 scale degree
        - down: roll -1 scale degree
        B down: enter bass select mode (1)
        K down: enter key select mode (2)
        1 - BASS SELECT MODE (B down)
        8 down: change bass pitch, note on (8vb), enter play mode (0)
        B up: note off (8vb), clear bass pitch, enter play mode (0)
        K down: enter octave select mode (3)
        2 - KEY SELECT MODE (K down)
        8 down/up: note on/off*
        + down: increase key 1 semitone
        - down: decrease key 1 semitone
        shake: change scale (major/minor, etc.)
        K up: enter play mode (0)
        B down: enter octave select mode (3)
        3 - OCTAVE SELECT MODE (K and B down)
        8 down/up: note on/off*
        + down: roll +1 octave
        - down: roll -1 octave
        K up: enter bass select mode (1)
        B up: enter key select mode (2)
        '''
        # update button LEDs (updateButtonColors also sends them)
        first = Ringtone.B_BUTTON
        if sum(self.capFlags[first:first + Ringtone.NUM_BUTTONS]) > 0:
            self.updateButtonColors()

        # update key color cycle for all LEDs
        if self.ledUpdateCounter == 0:
            self.cycleKeyColor()
            self.ledUpdateCounter = Ringtone.LED_UPDATE_INTERVAL
        else:
            self.ledUpdateCounter -= 1

        # send pitches to Pd
        self.sendPitches()

        # The handler is chosen from the state at entry; a state change made
        # by the handler takes effect on the next call.
        handlers = {
            0: self._runPlayMode,
            1: self._runBassSelectMode,
            2: self._runKeySelectMode,
            3: self._runOctaveSelectMode,
        }
        handler = handlers.get(self.state)
        pitchesChangedFlag = handler() if handler is not None else False

        # reset cap flags bc they're all consumed by now
        self.resetCapFlags()
        # reset shake flag too
        self.resetShakeFlag()

        if pitchesChangedFlag:
            # note pitches changed: recompute and send the note LED colours
            self.updateNoteColors()

    def _runPlayMode(self):
        """Handle state 0; return True if the pad pitches changed."""
        flags = self.capFlags
        changed = False
        if flags[Ringtone.B_BUTTON] == 2:  # B up
            self.sendBassNoteOff()
            self._log('bass: note off')
            self.clearBassPitch()
            self._log('bass: pitch cleared')
        if flags[Ringtone.UP_BUTTON] == 1:  # + down
            self.rollUpNote()
            changed = True
            self._log('rollUpNote: new offset', self.rollOffset)
        if flags[Ringtone.DOWN_BUTTON] == 1:  # - down
            self.rollDownNote()
            changed = True
            self._log('rollDownNote: new offset', self.rollOffset)
        if flags[Ringtone.B_BUTTON] == 1 and flags[Ringtone.K_BUTTON] == 1:  # B and K down
            self.state = 3
            self._log('state: 0 to 3')
        elif flags[Ringtone.B_BUTTON] == 1:  # B down
            self.state = 1
            self._log('state: 0 to 1')
        elif flags[Ringtone.K_BUTTON] == 1:  # K down
            self.state = 2
            self._log('state: 0 to 2')
        return changed

    def _runBassSelectMode(self):
        """Handle state 1; pad pitches never change here, so return False."""
        flags = self.capFlags
        for n in range(Ringtone.NUM_NOTES):
            if flags[n] == 1:  # find first note down
                self.setBassPitch(self.pitches[n])
                self._log('setBassPitch:', self.bassPitch)
                self.sendBassNoteOn()
                self._log('bass: note on')
                self.state = 0
                break
        if flags[Ringtone.B_BUTTON] == 2:  # B up
            self.sendBassNoteOff()
            self._log('bass: note off')
            self.clearBassPitch()
            self._log('bass: pitch cleared')
            self.state = 0
            self._log('state: 1 to 0')
        if flags[Ringtone.B_BUTTON] == 2 and flags[Ringtone.K_BUTTON] == 1:  # B up and K down
            self.state = 2
            self._log('state: 1 to 2')
        elif flags[Ringtone.K_BUTTON] == 1:  # K down
            self.state = 3
            self._log('state: 1 to 3')
        return False

    def _runKeySelectMode(self):
        """Handle state 2; return True if the pad pitches changed."""
        flags = self.capFlags
        changed = False
        if flags[Ringtone.UP_BUTTON] == 1:  # + down
            self.incrementKey()
            self.updateKeyColor()
            changed = True
            self._log('key: incremented, new start pitch ', self.startPitch)
        if flags[Ringtone.DOWN_BUTTON] == 1:  # - down
            self.decrementKey()
            self.updateKeyColor()
            changed = True
            self._log('key: decremented, new start pitch ', self.startPitch)
        if self.shakeFlag:
            self.cycleScale()
            self.sendLedFlash()
            changed = True
            self._log('scale: new index', self.scaleIndex)
        if flags[Ringtone.K_BUTTON] == 2 and flags[Ringtone.B_BUTTON] == 1:  # K up and B down
            self.state = 1
            self._log('state: 2 to 1')
        elif flags[Ringtone.K_BUTTON] == 2:  # K up
            self.state = 0
            self._log('state: 2 to 0')
        elif flags[Ringtone.B_BUTTON] == 1:  # B down
            self.state = 3
            self._log('state: 2 to 3')
        return changed

    def _runOctaveSelectMode(self):
        """Handle state 3; return True if the pad pitches changed."""
        flags = self.capFlags
        changed = False
        if flags[Ringtone.UP_BUTTON] == 1:  # + down
            self.rollUpOctave()
            changed = True
            self._log('octave: incremented, new start pitch ', self.startPitch)
        if flags[Ringtone.DOWN_BUTTON] == 1:  # - down
            self.rollDownOctave()
            changed = True
            self._log('octave: decremented, new start pitch ', self.startPitch)
        if flags[Ringtone.K_BUTTON] == 2 and flags[Ringtone.B_BUTTON] == 2:  # K up and B up
            self.state = 0
            self._log('state: 3 to 0')
        elif flags[Ringtone.K_BUTTON] == 2:  # K up
            self.state = 1
            self._log('state: 3 to 1')
        elif flags[Ringtone.B_BUTTON] == 2:  # B up
            self.state = 2
            self._log('state: 3 to 2')
        return changed
######################
#LOOP
######################
async def loop():
    """Configure the device, then process incoming messages forever.

    Start-up: flush the serial input (when serial is enabled), send the input
    configuration (setEnables) and the cap-sense configuration (setCapSense),
    and create the Ringtone state machine. Afterwards each iteration reads one
    message, decodes it, runs one state-machine step and then yields to the
    asyncio event loop for about 1 ms.
    """
    # The start-up pauses intentionally use blocking sleeps: nothing else
    # (e.g. an incoming "/led" OSC message writing to the serial port) should
    # run in between the configuration steps.
    time.sleep(0.1)
    if SERIAL_ENABLE:
        ser.flushInput()
    time.sleep(0.1)
    setEnables()
    setCapSense()
    rt = Ringtone()
    while True:
        currentMessage = readNextMessage()  # can be None if nothing in input buffer
        rt.interpretMessage(currentMessage)
        rt.run()
        # Pause ~1 ms without blocking the event loop, so other asyncio tasks
        # can run during the pause instead of only at a zero-length yield.
        await asyncio.sleep(0.001)
async def init():
    """Start the OSC server on 127.0.0.1:5006 and run the main loop.

    Incoming OSC messages are routed through the module-level ``dispatcher``.
    The server transport is closed when the main loop exits for any reason,
    including an exception or cancellation.
    """
    server = AsyncIOOSCUDPServer(("127.0.0.1", 5006), dispatcher, asyncio.get_running_loop())
    transport, protocol = await server.create_serve_endpoint()
    try:
        if SERIAL_ENABLE:
            ser.read(ser.in_waiting)  # discard anything already buffered
        await loop()
    finally:
        transport.close()
# Script entry point: runs init(), which starts the OSC server and then the
# main processing loop.
asyncio.run(init())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment