Skip to content

Instantly share code, notes, and snippets.

@mcbridejc
Created December 17, 2019 00:48
Show Gist options
  • Save mcbridejc/4aa71711f546e15f9494518b6346dabe to your computer and use it in GitHub Desktop.
Save mcbridejc/4aa71711f546e15f9494518b6346dabe to your computer and use it in GitHub Desktop.
MJPEG streaming from a Raspberry Pi camera to a browser
import cv2
import numpy as np
import threading
from picamera import PiCamera
class Video(object):
    """Capture camera frames on a background thread and serve them as MJPEG.

    A daemon thread (``run``) captures BGR frames from a ``PiCamera`` into a
    ring of ``NBUFFER`` preallocated arrays.  After every capture it records
    the freshly written buffer in ``active_buffer``, increments
    ``frame_number`` and wakes all waiters on ``frame_cv``.  Every call to
    ``mjpeg_frame_generator`` creates an independent consumer that
    JPEG-encodes each newly published frame it observes.

    Locking:
        * ``lock`` (wrapped by ``frame_cv``) guards ``active_buffer`` and
          ``frame_number``.
        * ``frame_locks[i]`` guards the pixel data of ``frames[i]``.  The
          capture thread holds it while writing; a consumer holds it while
          encoding.
        * A consumer acquires a buffer lock while holding ``frame_cv``; the
          capture thread never holds both at once, so the two orders cannot
          deadlock.
    """

    WIDTH = 1024
    HEIGHT = 768
    NBUFFER = 3

    def __init__(self):
        """Allocate the frame buffers and locks, then start the capture thread."""
        self.frame_number = 0
        self.thread = threading.Thread(target=self.run)
        self.thread.daemon = True
        # Buffers are allocated directly in image shape so the capture loop
        # never has to rebind list entries while consumers may be indexing.
        self.frames = [
            np.empty((self.HEIGHT, self.WIDTH, 3), dtype=np.uint8)
            for _ in range(self.NBUFFER)
        ]
        self.frame_locks = [threading.Lock() for _ in range(self.NBUFFER)]
        self.lock = threading.Lock()
        self.frame_cv = threading.Condition(self.lock)
        self.active_buffer = 0
        # Started last so every attribute ``run`` touches already exists.
        self.thread.start()

    def run(self):
        """Capture loop executed on the background thread; never returns."""
        print("Running capture thread")
        with PiCamera() as camera:
            camera.resolution = (self.WIDTH, self.HEIGHT)
            camera.framerate = 30
            camera.start_preview()
            while True:
                # Always write into the buffer after the published one, so the
                # frame consumers are told about is never the one being filled.
                next_buffer = (self.active_buffer + 1) % self.NBUFFER
                with self.frame_locks[next_buffer]:
                    # Blocks here if a slow consumer is still encoding this
                    # buffer from an earlier lap of the ring.
                    camera.capture(self.frames[next_buffer], 'bgr', use_video_port=True)
                # The buffer lock is released before publishing so this thread
                # never holds a buffer lock and the condition lock together.
                with self.frame_cv:
                    self.active_buffer = next_buffer
                    self.frame_number += 1
                    self.frame_cv.notify_all()

    def _encode_jpeg(self, frame):
        """JPEG-encode one frame; returns ``cv2.imencode``'s ``(ok, data)`` pair."""
        return cv2.imencode(".jpg", frame)

    def mjpeg_frame_generator(self):
        """Yield one multipart MJPEG chunk per newly published frame, forever.

        Each chunk is ``--frame``, a ``Content-Type: image/jpeg`` header, the
        JPEG bytes and a trailing CRLF, suitable for a
        ``multipart/x-mixed-replace; boundary=frame`` response.  Frames that
        fail to encode are reported with ``print`` and skipped.
        """
        last_fn = 0
        while True:
            with self.frame_cv:
                # Predicate loop: sleep until a frame newer than the last one
                # this consumer handled has been published.
                while self.frame_number <= last_fn:
                    self.frame_cv.wait()
                last_fn = self.frame_number
                index = self.active_buffer
                buffer_lock = self.frame_locks[index]
                # Pin the buffer while still holding the condition: the
                # capture thread cannot publish (and later lap the ring) until
                # the condition is released, so this buffer cannot be
                # overwritten before the lock is ours.
                buffer_lock.acquire()
            try:
                # Encoding happens outside the condition lock so the capture
                # thread can keep publishing while this frame is compressed.
                flag, encoded = self._encode_jpeg(self.frames[index])
            finally:
                buffer_lock.release()
            if not flag:
                print("Error encoding image %d" % last_fn)
                continue
            # Yield with no locks held; the consumer may stall indefinitely.
            yield b'--frame\r\n' b'Content-Type: image/jpeg\r\n\r\n' + bytearray(encoded) + b'\r\n'
from flask import Flask, Response, render_template
from camera import Video
import os
# Single module-level capture object shared by every /video response.
# Constructing it starts Video's background capture thread, so capture begins
# as a side effect of importing this module.
camera = Video()
def create_app(test_config=None):
    """Application factory: build the Flask app that serves the camera stream.

    Args:
        test_config: Optional mapping of configuration values applied to
            ``app.config``.  Nothing is applied when it is ``None``.

    Returns:
        The ``Flask`` application with the ``/`` and ``/video`` routes
        registered.
    """
    # create and configure the app
    app = Flask(__name__, instance_relative_config=True)
    if test_config is not None:
        # Previously this argument was accepted but silently ignored.
        app.config.from_mapping(test_config)

    # Ensure the instance folder exists.  This is deliberately best effort:
    # OSError is raised when the directory is already there, which is fine.
    # NOTE: it also hides other failures (e.g. permissions).
    try:
        os.makedirs(app.instance_path)
    except OSError:
        pass

    @app.route('/')
    def index():
        """Render the ``index.html`` template."""
        return render_template("index.html")

    @app.route('/video')
    def video():
        """Stream MJPEG chunks from the shared module-level ``camera``."""
        # The boundary name must match the ``--frame`` marker that
        # ``mjpeg_frame_generator`` puts in front of every JPEG.
        return Response(
            camera.mjpeg_frame_generator(),
            mimetype="multipart/x-mixed-replace; boundary=frame")

    return app
def main():
    """Build the application with ``create_app`` and run it with default settings."""
    create_app().run()
# Script entry point: only start the server when this file is executed
# directly, not when it is imported.
# Consider running me with: `FLASK_APP=pd-stream flask run --host 0.0.0.0`
if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment