Skip to content

Instantly share code, notes, and snippets.

@harrylincoln
Last active June 2, 2022 17:24
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save harrylincoln/4007186a0d6b4c31999844eb927b0bf8 to your computer and use it in GitHub Desktop.
Raspberry Pi script that listens for baby cries and predicts what the baby might need.
# Standard library
import audioop
import io
import logging
import socketserver
import threading
import time
import wave
from datetime import datetime
from http import server
from threading import Condition

# Third-party
import picamera
import pyaudio
from pushbullet import Pushbullet
from tflite_support.task import audio
from tflite_support.task import core
from tflite_support.task import processor
# Initialisation for PyAudio
CHUNK = 1024                 # frames per PyAudio buffer read
FORMAT = pyaudio.paInt16     # 16-bit signed samples (width 2, see audioop.max calls)
CHANNELS = 1                 # mono capture
RATE = 16000                 # sample rate in Hz
RECORD_SECONDS = 1           # length of each listening window
# Minimum jump in peak amplitude (vs the previous window) that counts as crying.
threshold = 200
# Peak amplitude of the current window; updated each loop iteration.
reading = 0
# Peak amplitude of the previous window, for the jump comparison.
previousreading = 0
# Destination for the 30-second clip fed to the TFLite classifier.
wav_output_filename = 'bub-30-sec-clip.wav'
# PyAudio Object
ppAudio = pyaudio.PyAudio()
# Initialisation for Pushbullet
# NOTE(review): placeholder credential — supply a real API key via config or
# environment rather than committing it to source.
pb = Pushbullet('your-api-key')
# Static HTML page served at /index.html; embeds the MJPEG stream endpoint.
PAGE = """\
<html>
<head>
<title>picamera MJPEG streaming demo</title>
</head>
<body>
<h1>PiCamera MJPEG Streaming Demo</h1>
<img src="stream.mjpg" width="640" height="480" />
</body>
</html>
"""
class StreamingOutput(object):
    """File-like sink that picamera writes MJPEG data into.

    Completed JPEG frames are published on ``self.frame``; HTTP clients
    block on ``self.condition`` until a fresh frame is available.
    """

    def __init__(self):
        self.frame = None             # most recently completed JPEG frame
        self.buffer = io.BytesIO()    # accumulates the frame in progress
        self.condition = Condition()  # wakes waiting clients on a new frame

    def write(self, buf):
        # A JPEG SOI marker means the encoder is starting a new frame, so
        # whatever the buffer currently holds is a complete previous frame.
        starts_new_frame = buf.startswith(b'\xff\xd8')
        if starts_new_frame:
            self.buffer.truncate()
            with self.condition:
                self.frame = self.buffer.getvalue()
                self.condition.notify_all()
            self.buffer.seek(0)
        return self.buffer.write(buf)
class StreamingHandler(server.BaseHTTPRequestHandler):
    """Serves the demo page and the multipart MJPEG camera stream.

    Reads frames from the module-level ``output`` (a StreamingOutput set up
    when the camera starts recording).
    """

    def do_GET(self):
        if self.path == '/':
            # Redirect the bare root to the demo page.
            self.send_response(301)
            self.send_header('Location', '/index.html')
            self.end_headers()
            return
        if self.path == '/index.html':
            body = PAGE.encode('utf-8')
            self.send_response(200)
            self.send_header('Content-Type', 'text/html')
            self.send_header('Content-Length', len(body))
            self.end_headers()
            self.wfile.write(body)
            return
        if self.path == '/stream.mjpg':
            self._serve_stream()
            return
        self.send_error(404)
        self.end_headers()

    def _serve_stream(self):
        # One long-lived multipart response per client; each part is a JPEG.
        self.send_response(200)
        self.send_header('Age', 0)
        self.send_header('Cache-Control', 'no-cache, private')
        self.send_header('Pragma', 'no-cache')
        self.send_header(
            'Content-Type', 'multipart/x-mixed-replace; boundary=FRAME')
        self.end_headers()
        try:
            while True:
                # Block until StreamingOutput publishes the next frame.
                with output.condition:
                    output.condition.wait()
                    frame = output.frame
                self.wfile.write(b'--FRAME\r\n')
                self.send_header('Content-Type', 'image/jpeg')
                self.send_header('Content-Length', len(frame))
                self.end_headers()
                self.wfile.write(frame)
                self.wfile.write(b'\r\n')
        except Exception as exc:
            # Client disconnects surface here; log and drop the connection.
            logging.warning(
                'Removed streaming client %s: %s',
                self.client_address, str(exc))
class StreamingServer(socketserver.ThreadingMixIn, server.HTTPServer):
    """HTTP server that handles each streaming client on its own thread."""

    # Allow immediate rebinding of port 8000 after a restart (skips TIME_WAIT).
    allow_reuse_address = True
    # Client-handler threads must not keep the process alive on shutdown.
    daemon_threads = True
# hangs until script is cancelled
# Main monitor loop: capture ~1 s of audio, and when the peak amplitude jumps
# by more than `threshold` over the previous window, treat it as crying:
# push a notification, start an MJPEG camera stream on port 8000, record a
# 30-second clip, and classify it with the TFLite cry model.
while True:
    stream = ppAudio.open(format=FORMAT,
                          channels=CHANNELS,
                          rate=RATE,
                          input=True,
                          frames_per_buffer=CHUNK)
    frames = []
    for i in range(0, int(RATE / CHUNK * RECORD_SECONDS)):
        # BUGFIX: read a full CHUNK per iteration — the loop count is
        # computed from CHUNK, but the original read only 70 frames at a
        # time, sampling a fraction of the intended window.
        data = stream.read(CHUNK, exception_on_overflow=False)
        frames.append(data)
        time.sleep(0.001)
    # Peak amplitude of 16-bit samples across the WHOLE window (the original
    # inspected only the final chunk).
    reading = audioop.max(b''.join(frames), 2)
    # BUGFIX: close the listening stream exactly once per iteration; the
    # original stopped/closed it twice on the triggered path, which raises.
    stream.stop_stream()
    stream.close()

    if reading - previousreading > threshold:
        print(reading)
        pb.push_note("Raspberry Pi",
                     f'Baby started crying at: {datetime.now().strftime("%H:%M:%S")}')
        with picamera.PiCamera(resolution='640x480', framerate=24) as camera:
            output = StreamingOutput()
            camera.start_recording(output, format='mjpeg')
            address = ('', 8000)
            # BUGFIX: serve_forever() blocks, so nothing after it ever ran
            # (no notification, no 30 s recording, no classification). Run
            # the HTTP server on a daemon thread instead. Also renamed the
            # variable: `server` shadowed the imported http.server module.
            stream_server = StreamingServer(address, StreamingHandler)
            server_thread = threading.Thread(
                target=stream_server.serve_forever, daemon=True)
            server_thread.start()
            print("camera starting")
            pb.push_note(
                "Raspberry Pi", 'Camera started at http://your-pis-ip:8000')

            # Record 30 seconds of audio for the classifier.
            thirtySecStream = ppAudio.open(
                format=FORMAT, channels=CHANNELS, rate=RATE, input=True,
                frames_per_buffer=CHUNK)
            thirtySecStreamFrames = []
            print("recording audio")
            for ii in range(0, int((RATE / CHUNK) * 30)):
                thirtySecData = thirtySecStream.read(
                    CHUNK, exception_on_overflow=False)
                thirtySecStreamFrames.append(thirtySecData)
            print("finished recording audio")
            pb.push_note(
                "Raspberry Pi",
                f'Audio captured at {datetime.now().strftime("%H:%M:%S")}')
            thirtySecStream.stop_stream()
            thirtySecStream.close()

            # Save the audio frames as a .wav file (context manager ensures
            # the file is closed even if a write fails).
            with wave.open(wav_output_filename, 'wb') as wavefile:
                wavefile.setnchannels(CHANNELS)
                wavefile.setsampwidth(ppAudio.get_sample_size(FORMAT))
                wavefile.setframerate(RATE)
                wavefile.writeframes(b''.join(thirtySecStreamFrames))

            # TFLite audio classifier: top-2 predictions on the saved clip.
            base_options = core.BaseOptions(
                file_name='./cries_model.tflite')
            classification_options = processor.ClassificationOptions(
                max_results=2)
            options = audio.AudioClassifierOptions(
                base_options=base_options,
                classification_options=classification_options)
            classifier = audio.AudioClassifier.create_from_options(options)
            # Run inference
            audio_file = audio.TensorAudio.create_from_wav_file(
                f'./{wav_output_filename}',
                classifier.required_input_buffer_size)
            audio_result = classifier.classify(audio_file)
            print(f'Audio result: {audio_result}')
            pb.push_note("Raspberry Pi", f'Predictions: {audio_result}')

            print("camera stopping")
            camera.stop_recording()
            # BUGFIX: release port 8000 so the next detection cycle can
            # bind a fresh server.
            stream_server.shutdown()
            stream_server.server_close()

    previousreading = reading

# Unreachable while the loop above runs; kept to document intended cleanup.
ppAudio.terminate()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment