Skip to content

Instantly share code, notes, and snippets.

@Gadgetoid
Created June 19, 2018 13:59
Show Gist options
  • Save Gadgetoid/b24c4f9756ea97f066d07d44a46c1a72 to your computer and use it in GitHub Desktop.
Save Gadgetoid/b24c4f9756ea97f066d07d44a46c1a72 to your computer and use it in GitHub Desktop.
NaturewatchCameraServer with Button SHIM support
#!/usr/bin/env python
import json
import cv2
import os
import imutils
import buttonshim
from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
from SocketServer import ThreadingMixIn
from ChangeDetector import ChangeDetector
import time
from threading import Thread
# Load the server configuration from the project directory.  The file is
# opened in a ``with`` block so the handle is closed as soon as it is parsed.
os.chdir("/home/pi/NaturewatchCameraServer")
with open("config.json") as config_file:
    config = json.load(config_file)

# Switch the working directory to the photo folder for the rest of the run.
os.chdir("/var/www/html/photos")

# NatureCam implementation
changeDetectorInstance = ChangeDetector(config)
# Handle HTTP requests.
class CamHandler(BaseHTTPRequestHandler):
    """Serve the MJPEG preview stream and the camera control endpoints.

    Requests are routed by the *suffix* of the request path, checked in this
    order: ``.mjpg``, ``less``, ``more``, ``default``, ``start``, ``stop``,
    ``delete-final``, ``get-status``.  Any other path is answered with 404.
    """

    def _send_ok_headers(self, content_type):
        """Send a 200 status line followed by a single Content-type header."""
        self.send_response(200)
        self.send_header('Content-type', content_type)
        self.end_headers()

    def _send_success(self):
        """Send the plain ``success`` acknowledgement used by control endpoints."""
        self._send_ok_headers('text/html')
        self.wfile.write('success')

    def _set_sensitivity(self, config_key):
        """Acknowledge the request, then set minWidth and minHeight from config.

        Both detector attributes are set to the same ``config[config_key]``
        value.
        """
        self._send_success()
        changeDetectorInstance.minWidth = config[config_key]
        changeDetectorInstance.minHeight = config[config_key]

    def _stream_mjpg(self):
        """Push JPEG frames to the client until the connection is closed."""
        self.send_response(200)
        self.send_header('Content-type', 'multipart/x-mixed-replace; boundary=--jpgboundary')
        self.end_headers()
        print("Serving mjpg...")
        while True:
            img = changeDetectorInstance.get_current_image()
            r, buf = cv2.imencode(".jpg", img)
            try:
                self.wfile.write("--jpgboundary\r\n")
                self.send_header('Content-type', 'image/jpeg')
                self.send_header('Content-length', str(len(buf)))
                self.end_headers()
                self.wfile.write(bytearray(buf))
                self.wfile.write('\r\n')
            except IOError:
                # The viewer went away (socket errors are IOError subclasses);
                # end this handler thread quietly instead of looping forever
                # or leaking a traceback.
                return

    def _send_status(self):
        """Reply with a JSON object holding the detector mode and sensitivity."""
        self._send_ok_headers('application/json')
        # Map the detector's current minWidth back to a sensitivity label.
        sensitivity = "unknown"
        if changeDetectorInstance.minWidth == config["less_sensitivity"]:
            sensitivity = "less"
        elif changeDetectorInstance.minWidth == config["min_width"]:
            sensitivity = "default"
        elif changeDetectorInstance.minWidth == config["more_sensitivity"]:
            sensitivity = "more"
        send_data = {
            "mode": changeDetectorInstance.mode,
            "sensitivity": sensitivity
        }
        json_data = json.dumps(send_data)
        self.wfile.write(json_data)

    def do_GET(self):
        """Dispatch a GET request according to the suffix of ``self.path``."""
        print(self.path)
        if self.path.endswith('.mjpg'):
            self._stream_mjpg()
            return
        if self.path.endswith('less'):
            self._set_sensitivity("less_sensitivity")
            return
        if self.path.endswith('more'):
            self._set_sensitivity("more_sensitivity")
            return
        if self.path.endswith('default'):
            self._set_sensitivity("min_width")
            return
        if self.path.endswith('start'):
            # Update the Button SHIM LED before acknowledging and arming.
            buttonshim.set_pixel(128, 0, 0)
            self._send_success()
            changeDetectorInstance.arm()
            return
        if self.path.endswith('stop'):
            buttonshim.set_pixel(0, 128, 0)
            self._send_success()
            changeDetectorInstance.disarm()
            return
        if self.path.endswith('delete-final'):
            self._send_success()
            # Fixed command string; no request data is passed to the shell.
            os.system('rm /var/www/html/photos/*')
            return
        if self.path.endswith('get-status'):
            self._send_status()
            return
        # Unknown endpoint: answer explicitly rather than sending nothing.
        self.send_error(404)
# Threaded server
class ThreadedHTTPServer(ThreadingMixIn, HTTPServer):
    """Handle requests in separate threads"""

    # Handler threads may sit in an endless streaming loop; mark them as
    # daemon threads so they cannot keep the process alive at shutdown.
    daemon_threads = True
def main():
    """Start the change detector, register the button handlers and serve HTTP.

    Listens on port 9090 on all interfaces and blocks in ``serve_forever``.
    On KeyboardInterrupt or SystemExit the detector is cancelled and the
    server socket is closed, if the server had been created by then.
    """
    # Bound up front so the shutdown path never hits an unassigned name when
    # the interrupt arrives before the server has been constructed.
    server = None
    try:
        changeDetectorInstance.start()
        server = ThreadedHTTPServer(('', 9090), CamHandler)

        @buttonshim.on_press(buttonshim.BUTTON_B)
        def btn_arm(button, pressed):
            """Button B: arm the detector and update the LED."""
            changeDetectorInstance.arm()
            buttonshim.set_pixel(128, 0, 0)

        @buttonshim.on_press(buttonshim.BUTTON_C)
        def btn_disarm(button, pressed):
            """Button C: disarm the detector and update the LED."""
            changeDetectorInstance.disarm()
            buttonshim.set_pixel(0, 128, 0)

        @buttonshim.on_press(buttonshim.BUTTON_A)
        def btn_take_photo(button, pressed):
            """Button A: grab one hi-res frame and save it on a worker thread."""
            hrs = next(changeDetectorInstance.hiResStream)
            hi_res_image = hrs.array
            # Reset the capture buffer after taking the frame.
            changeDetectorInstance.hiResCapture.truncate(0)
            changeDetectorInstance.hiResCapture.seek(0)
            t_save = Thread(target=changeDetectorInstance.take_photo, args=[hi_res_image])
            t_save.start()
            changeDetectorInstance.numOfPhotos += 1
            changeDetectorInstance.lastPhotoTime = time.time()
            print("Photo snapped")

        print("server started")
        server.serve_forever()
    except (KeyboardInterrupt, SystemExit):
        changeDetectorInstance.cancel()
        if server is not None:
            server.server_close()
# Start the camera server only when this file is executed as a script.
if __name__ == "__main__":
    main()
@Gadgetoid
Copy link
Author

Gadgetoid commented Jun 19, 2018

Currently this code does:

  • Button A - Snap a photo on demand
  • Button B - Arm the camera, LED should light up red
  • Button C - Disarm the camera, LED should light up green

The LED will also change when arming/disarming via the web UI.

Suggestions for D and E welcome!

You can buy a Button SHIM from here: https://shop.pimoroni.com/products/button-shim
And you'll also need a header:

To get this code installed you need an HDMI adaptor, a USB adaptor, and a keyboard. Boot your camera with the HDMI display and keyboard connected, then run the commands below.

Save the Button SHIM python-buttonshim_0.0.2_all.deb package from https://github.com/pimoroni/button-shim/releases/tag/v0.0.2 onto the /boot partition of your camera along with this code, and install with:

  • sudo dpkg -i /boot/python-buttonshim_0.0.2_all.deb
  • mv ~/NaturewatchCameraServer/NaturewatchCameraServer.py ~/NaturewatchCameraServer/NaturewatchCameraServer-backup.py
  • cp /boot/NaturewatchCameraServer.py ~/NaturewatchCameraServer/NaturewatchCameraServer.py

You will also need to edit /boot/config.txt and uncomment the line #dtparam=i2c_arm=on so that it reads dtparam=i2c_arm=on

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment