-
-
Save capjamesg/fcb78d15586f7e4173ac888420ab2320 to your computer and use it in GitHub Desktop.
Code for the "Transforming Raspberry Pi into a Squirrel Sentry with Computer Vision" post on roboflow.com. Code by Warren, the author of the post.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Standard-library imports
import base64
import time
from time import sleep

# Third-party imports
import cv2
import imutils
import pygame as pg
import requests
import RPi.GPIO as GPIO
from flask import Flask, Response, render_template
from imutils.video import VideoStream
from PIL import Image
from roboflow import Roboflow

# Initialize GPIO - this is for our relay
GPIO.setwarnings(False)
GPIO.setmode(GPIO.BOARD)  # physical (board) pin numbering
GPIO.setup(8, GPIO.OUT, initial=GPIO.LOW)  # relay off by default

# Define our project variables
# ** Replace the API_KEY, PROJECT_NAME, and PROJECT_VERSION with your own **
API_KEY = ""
PROJECT_NAME = "squirrel-detector-1.1"
PROJECT_VERSION = 1

# Get model from Roboflow (served by the local inference container)
rf = Roboflow(api_key=API_KEY)
project = rf.workspace().project(PROJECT_NAME)
model = project.version(PROJECT_VERSION, local="http://127.0.0.1:9001/").model

# Build the Infer API URL from the project variables so it always matches the
# model configured above (previously the name/version were hard-coded here,
# which broke silently when PROJECT_NAME or PROJECT_VERSION changed).
infer_url = f"http://127.0.0.1:9001/{PROJECT_NAME}/{PROJECT_VERSION}?api_key={API_KEY}"

# Initialize Flask app
app = Flask(__name__)
# Configure web routes
@app.route('/')
def index():
    """Render the landing page that embeds the video stream."""
    page = render_template('index.html')
    return page
# Main function
def get_video_feed():
    """Yield MJPEG multipart chunks from the camera.

    Each frame is run through simple background-subtraction motion
    detection; when motion is seen, the JPEG is sent to the local Roboflow
    Infer API, and any prediction above 80% confidence pulses the relay on
    GPIO pin 8 for 3 seconds. Chunks are formatted for a
    ``multipart/x-mixed-replace`` Flask Response.
    """
    # Minimum contour area (pixels) that counts as motion.
    min_area = 2000

    # Initialize camera and allow it to warm up.
    camera = VideoStream(src=0).start()
    time.sleep(2.0)

    # First frame serves as the static background for motion detection.
    # NOTE(review): it is never refreshed, so lighting changes after startup
    # will register as motion — confirm this is acceptable for the setup.
    firstFrame = None

    try:
        while True:
            # Grab the current frame from the camera.
            frame = camera.read()
            if frame is None:
                # Camera feed ended or failed; stop streaming.
                break

            # Text overlay shown on the frame (empty when no motion).
            text = ""

            # The following section of code is used to detect motion.
            # Credit to PyImageSearch for the motion detection code.
            # ======================================================
            # Resize the frame, convert it to grayscale, and blur it.
            frame = imutils.resize(frame, width=500)
            gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
            gray = cv2.GaussianBlur(gray, (21, 21), 0)

            # If the first frame is None, initialize it.
            if firstFrame is None:
                firstFrame = gray
                print("First frame initialized")
                continue

            # Absolute difference between current frame and background,
            # thresholded and dilated to fill in holes.
            frameDelta = cv2.absdiff(firstFrame, gray)
            thresh = cv2.threshold(frameDelta, 25, 255, cv2.THRESH_BINARY)[1]
            thresh = cv2.dilate(thresh, None, iterations=2)
            cnts = cv2.findContours(thresh.copy(), cv2.RETR_EXTERNAL,
                                    cv2.CHAIN_APPROX_SIMPLE)
            cnts = imutils.grab_contours(cnts)

            # Any sufficiently large contour counts as motion.
            for c in cnts:
                if cv2.contourArea(c) < min_area:
                    continue
                text = "Motion detected"

            # Place text on screen.
            cv2.putText(frame, text, (10, 20), cv2.FONT_HERSHEY_SIMPLEX,
                        0.5, (0, 0, 255), 2)
            # End of motion detection code
            # ======================================================

            # JPEG-encode and Base64 the frame for the Infer API.
            # NOTE(review): the overlay text is drawn before encoding, so the
            # model receives the annotated image — confirm this is intended.
            ret, buffer = cv2.imencode('.jpg', frame)
            img_str = base64.b64encode(buffer)

            # If motion is detected, get predictions from Roboflow Infer API.
            if text == "Motion detected":
                # Send image to the Infer API - gets a JSON object back.
                resp = requests.post(
                    infer_url,
                    data=img_str,
                    headers={"Content-Type": "application/x-www-form-urlencoded"},
                    stream=True,
                ).json()['predictions']
                print(resp)
                for prediction in resp:
                    # 80%+ confidence that we detected a squirrel:
                    # pulse the relay (GPIO pin 8 HIGH for 3 seconds).
                    if prediction["confidence"] > .8:
                        GPIO.output(8, GPIO.HIGH)
                        sleep(3)
                        GPIO.output(8, GPIO.LOW)

            # Send the JPEG bytes to the browser as one multipart chunk.
            yield (b'--frame\r\n'
                   b'Content-Type: image/jpeg\r\n\r\n' + buffer.tobytes() + b'\r\n\r\n')
    finally:
        # BUG FIX: imutils.video.VideoStream has no release() method — the
        # correct shutdown call is stop() (release() belongs to
        # cv2.VideoCapture). The try/finally also guarantees the camera
        # thread is stopped when the browser disconnects and the generator
        # is closed (GeneratorExit), which the original never handled.
        camera.stop()
# Configure video feed route - this is where the video will be streamed to the browser
@app.route('/video_feed')
def video_feed():
    """Stream the annotated camera feed as an MJPEG multipart response."""
    stream = get_video_feed()
    return Response(stream, mimetype='multipart/x-mixed-replace; boundary=frame')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.