Skip to content

Instantly share code, notes, and snippets.

@cristianmiranda
Created July 20, 2016 23:49
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save cristianmiranda/89790ddd10cfe3149ca43a06696fb994 to your computer and use it in GitHub Desktop.
Save cristianmiranda/89790ddd10cfe3149ca43a06696fb994 to your computer and use it in GitHub Desktop.
#!/usr/bin/python
# open a microphone in pyAudio and listen for taps
# brew install portaudio
# pip install --allow-external pyaudio --allow-unverified pyaudio pyaudio
import pyaudio
import struct
import math
import time
import requests
INITIAL_TAP_THRESHOLD = 0.050
FORMAT = pyaudio.paInt16
SHORT_NORMALIZE = (1.0/32768.0)
CHANNELS = 2
RATE = 44100
INPUT_BLOCK_TIME = 0.05
INPUT_FRAMES_PER_BLOCK = int(RATE*INPUT_BLOCK_TIME)
# if we get this many noisy blocks in a row, increase the threshold
OVERSENSITIVE = 15.0/INPUT_BLOCK_TIME
# if we get this many quiet blocks in a row, decrease the threshold
UNDERSENSITIVE = 120.0/INPUT_BLOCK_TIME
# if the noise was longer than this many blocks, it's not a 'tap'
MAX_TAP_BLOCKS = 0.15/INPUT_BLOCK_TIME
ALLOWED_DIFF_BETWEEN_CLAPS = 1000
def get_rms( block ):
# RMS amplitude is defined as the square root of the
# mean over time of the square of the amplitude.
# so we need to convert this string of bytes into
# a string of 16-bit samples...
# we will get one short out for each
# two chars in the string.
count = len(block)/2
format = "%dh"%(count)
shorts = struct.unpack( format, block )
# iterate over the block.
sum_squares = 0.0
for sample in shorts:
# sample is a signed short in +/- 32768.
# normalize it to 1.0
n = sample * SHORT_NORMALIZE
sum_squares += n*n
return math.sqrt( sum_squares / count )
class TapTester(object):
def __init__(self):
self.pa = pyaudio.PyAudio()
self.stream = self.open_mic_stream()
self.tap_threshold = INITIAL_TAP_THRESHOLD
self.noisycount = MAX_TAP_BLOCKS+1
self.quietcount = 0
self.errorcount = 0
self.last_timestamp = 0
def stop(self):
self.stream.close()
def find_input_device(self):
device_index = None
for i in range( self.pa.get_device_count() ):
devinfo = self.pa.get_device_info_by_index(i)
print( "Device %d: %s"%(i,devinfo["name"]) )
for keyword in ["mic","input"]:
if keyword in devinfo["name"].lower():
print( "Found an input: device %d - %s"%(i,devinfo["name"]) )
device_index = i
return device_index
if device_index == None:
print( "No preferred input found; using default input device." )
return device_index
def open_mic_stream( self ):
device_index = self.find_input_device()
stream = self.pa.open( format = FORMAT,
channels = CHANNELS,
rate = RATE,
input = True,
input_device_index = device_index,
frames_per_buffer = INPUT_FRAMES_PER_BLOCK)
return stream
def tapDetected(self):
global consecutive_taps
millis = int(round(time.time() * 1000))
diff = millis - self.last_timestamp
# Reset consecutive timestamp
if millis - self.last_timestamp > ALLOWED_DIFF_BETWEEN_CLAPS:
self.last_timestamp = 0
if not self.last_timestamp:
self.last_timestamp = millis
print "Tap!"
elif diff <= ALLOWED_DIFF_BETWEEN_CLAPS:
print "Consecutive Tap!. Diff %d" % diff
self.last_timestamp = 0
self.toggle_light(self.get_light_id())
else:
self.last_timestamp = 0
print "Tap!"
def listen(self):
try:
block = self.stream.read(INPUT_FRAMES_PER_BLOCK)
except IOError, e:
# dammit.
self.errorcount += 1
print( "(%d) Error recording: %s"%(self.errorcount,e) )
self.noisycount = 1
time.sleep(2)
#raise
amplitude = get_rms( block )
if amplitude > self.tap_threshold:
# noisy block
self.quietcount = 0
self.noisycount += 1
if self.noisycount > OVERSENSITIVE:
# turn down the sensitivity
self.tap_threshold *= 1.1
else:
# quiet block.
if 1 <= self.noisycount <= MAX_TAP_BLOCKS:
self.tapDetected()
self.noisycount = 0
self.quietcount += 1
if self.quietcount > UNDERSENSITIVE:
# turn up the sensitivity
self.tap_threshold *= 0.9
'''
LIFX API methods
'''
def get_light_id(self):
return 'd073d5123254'
def get_token(self):
return 'cb3d4e8f6494e59f09913a6cb42df876d5623ad46f0653fd063c59029c7a25b5'
def _do_lifx_auth_post(self, url, data):
try:
token = self.get_token()
headers = {
'Content-Type': 'application/json',
'Authorization': 'Bearer ' + token,
}
if data:
response = requests.post(url, data=data, headers=headers)
else:
response = requests.post(url, headers=headers)
print 'Response: {0}'.format(response)
return response
except requests.HTTPError as e:
print 'Unable to submit post data {url} - {error}'.format(url=url, error=e.reason)
raise
def toggle_light(self, light_id):
print 'Toggle State to light {light_id}.'.format(light_id=light_id)
url = 'https://api.lifx.com/v1/lights/id:' + light_id + '/toggle'
try:
return self._do_lifx_auth_post(url, {})
except:
return None
if __name__ == "__main__":
tt = TapTester()
while True:
time.sleep(0.05)
try:
tt.listen()
except:
print 'Error. Resetting stream...'
tt.stop()
tt = TapTester()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment