Skip to content

Instantly share code, notes, and snippets.

@briancline
Created January 6, 2014 06:58
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save briancline/8279225 to your computer and use it in GitHub Desktop.
Save briancline/8279225 to your computer and use it in GitHub Desktop.
First pass at rudimentary doge bark detection using a webcam mic attached to a Raspberry Pi. It also continuously stores the current max sample value in Redis (not dB, not RMS, just the peak sample value) so that other things can grab the value(s).
#!/usr/bin/env python
from __future__ import print_function
import alsaaudio
import audioop
import sys
import time
import redis
# Upper bound expected for a peak sample value; the output callbacks print a
# warning when a peak exceeds it.
# NOTE(review): presumably the magnitude ceiling of signed 16-bit samples
# (matching the default sample_byte_width of 2) -- confirm.
AUDIO_MAX = 32768
# Peaks above HIGH_THRESHOLD are drawn with '!' by console_output; peaks below
# LOW_THRESHOLD are drawn with '.'; anything in between is drawn with '~'.
HIGH_THRESHOLD = 28000
LOW_THRESHOLD = 3200
# Connection settings handed to redis.StrictRedis in redis_setup().
REDIS_HOST = 'file02'
REDIS_PORT = 6379
REDIS_DB = 0
# String prepended to every key name by redis_key().
REDIS_PREFIX = 'doge:'
# Module-level Redis client; None until redis_setup() assigns it.
cache = None
def redis_key(s):
    """Return the key name ``s`` with the module's REDIS_PREFIX prepended."""
    return ''.join((REDIS_PREFIX, s))
def redis_setup():
    """Create the Redis client and store it in the module-level ``cache``.

    The client is built from REDIS_HOST, REDIS_PORT and REDIS_DB. This must
    run before redis_output() is used, since that function reads ``cache``.
    """
    global cache
    connection_settings = {
        'host': REDIS_HOST,
        'port': REDIS_PORT,
        'db': REDIS_DB,
    }
    cache = redis.StrictRedis(**connection_settings)
def redis_output(length, data, sample_byte_width):
    """Publish the peak value of one captured fragment to Redis.

    The peak (as computed by ``audioop.max``) is written unconverted to the
    prefixed ``current`` key every time a fragment is received. ``length`` is
    accepted for callback-signature compatibility but is not used here.
    """
    peak = audioop.max(data, sample_byte_width)
    if peak > AUDIO_MAX:
        # Unexpectedly large peak: report it, but still publish the value.
        print('*** MAX_VAL IS %d' % peak)

    with cache.pipeline() as pipe:
        pipe.set(redis_key('current'), peak)
        # TODO: Add as a member to a sorted set so that the last N samples
        # can always be analyzed together for slightly more advanced analysis
        # (i.e., "10 seconds of silence",
        #        "godawful music for the last 20 seconds",
        #        "probable barking within the last 12 seconds",
        #        etc.)
        pipe.execute()
def console_output(length, data, sample_byte_width):
    """Draw a one-line, mono-channel VU meter on the console.

    The meter is redrawn in place each time a fragment is received. Its bar
    length is proportional to the fragment's peak value (``audioop.max``)
    relative to AUDIO_MAX, and the bar character depends on that same peak:
    '!' if it exceeds HIGH_THRESHOLD, '.' if it falls under LOW_THRESHOLD,
    and '~' otherwise.

    Args:
        length: Number of frames in the fragment (shown in the meter prefix).
        data: Raw audio fragment as returned by the capture device.
        sample_byte_width: Width in bytes of each sample in ``data``.
    """
    max_val = audioop.max(data, sample_byte_width)
    if max_val > AUDIO_MAX:
        print('*** MAX_VAL IS %d' % max_val)

    vu_max_chars = 100
    # Scale against the AUDIO_MAX constant (the original referenced an
    # undefined lowercase name and raised NameError). Clamp so that an
    # over-range peak cannot draw a bar wider than the meter itself.
    vu_chars = min(vu_max_chars, vu_max_chars * max_val // AUDIO_MAX)
    headroom_chars = vu_max_chars - vu_chars

    if max_val > HIGH_THRESHOLD:
        vu = '!' * vu_chars
    elif max_val < LOW_THRESHOLD:
        vu = '.' * vu_chars
    else:
        vu = '~' * vu_chars
    headroom = ' ' * headroom_chars

    # Emit plenty of backspaces to return the cursor to the start of the
    # previously drawn meter before overwriting it.
    sys.stdout.write('\b' * (vu_max_chars * 4))
    sys.stdout.write('[%04d frm / %05d max] [%s%s]' %
                     (length, max_val, vu, headroom))
    sys.stdout.flush()
def sample_forever(card=None, period_size=160, sample_byte_width=2,
                   post_callback=None):
    """Capture mono audio from an ALSA device forever, one period at a time.

    Opens the capture device in non-blocking mode and loops indefinitely,
    handing every successfully read fragment to ``post_callback``.

    Args:
        card: ALSA card identifier; ``None`` selects ``'default'``. Any other
            value is converted with ``str()``.
        period_size: Period size passed to ``pcm.setperiodsize``.
        sample_byte_width: Forwarded unchanged to ``post_callback``; it is not
            used to configure the device here.
        post_callback: Optional callable invoked as
            ``post_callback(length, data, sample_byte_width)`` per fragment.

    This function never returns.
    """
    card = 'default' if card is None else str(card)
    pcm = alsaaudio.PCM(type=alsaaudio.PCM_CAPTURE,
                        mode=alsaaudio.PCM_NONBLOCK,
                        card=card)
    pcm.setchannels(1)
    pcm.setperiodsize(period_size)

    while True:
        length, data = pcm.read()
        if length <= 0:
            # Non-blocking read: zero means no period is ready yet, and
            # pyalsaaudio documents a negative size on overrun. Neither
            # carries usable audio, so skip the callback and yield the CPU
            # briefly instead of spinning in a tight busy-wait loop.
            time.sleep(0.001)
            continue
        if post_callback:
            post_callback(length, data, sample_byte_width)
if __name__ == '__main__':
    # Script entry point: connect to Redis, then stream peak values to it
    # from the capture device forever.
    card_id = 1              # ALSA card identifier handed to sample_forever
    period_size = 160        # period size requested from the device
    sample_byte_width = 2    # bytes per sample, forwarded to the callback

    redis_setup()
    sample_forever(card=card_id,
                   period_size=period_size,
                   sample_byte_width=sample_byte_width,
                   post_callback=redis_output)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment