Skip to content

Instantly share code, notes, and snippets.

@Akkiesoft
Created August 25, 2024 14:35
Show Gist options
  • Save Akkiesoft/06038bc4fd889cba60ed65b616daac22 to your computer and use it in GitHub Desktop.
Save Akkiesoft/06038bc4fd889cba60ed65b616daac22 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
# based on: https://github.com/raspberrypi/picamera2/blob/main/examples/mjpeg_server_2.py
import io
import logging
import socketserver
import threading
from http import server
from picamera2 import Picamera2
from picamera2.encoders import MJPEGEncoder
from picamera2.outputs import FileOutput
from libcamera import controls,Transform
class StreamingOutput(io.BufferedIOBase):
    """Write-only, file-like sink that retains only the latest buffer written.

    Attributes:
        frame: The object most recently passed to ``write()``; ``None`` until
            the first write.
        condition: ``threading.Condition`` that is notified on every write so
            waiting threads can pick up the new ``frame``.
    """

    def __init__(self):
        super().__init__()
        self.frame = None
        self.condition = threading.Condition()

    def write(self, buf):
        """Store ``buf`` as the current frame and wake every waiting thread.

        Args:
            buf: Bytes-like object holding one complete frame.

        Returns:
            The number of bytes accepted (always ``len(buf)``), as the
            ``io.BufferedIOBase.write`` contract requires.
        """
        with self.condition:
            self.frame = buf
            self.condition.notify_all()
        return len(buf)
class StreamingHandler(server.BaseHTTPRequestHandler):
    """HTTP request handler that serves the camera feeds as MJPEG streams.

    ``/stream1.mjpg`` and ``/stream2.mjpg`` are served from entries 0 and 1 of
    the module-level ``camera`` list; any other path receives a 404.
    """

    # Request path -> index into the module-level ``camera`` list.
    _STREAM_INDEX = {
        '/stream1.mjpg': 0,
        '/stream2.mjpg': 1,
    }

    def do_HEAD(self):
        """Answer every HEAD request with an empty 400 response."""
        self.send_response(400)
        # send_response() only buffers the status line and headers; they are
        # not written to the client until end_headers() is called.
        self.end_headers()

    def do_GET(self):
        """Dispatch a GET request to the matching stream, or reply 404."""
        index = self._STREAM_INDEX.get(self.path)
        if index is None:
            # send_error() emits the complete response (headers and body), so
            # no end_headers() call may follow it: that would append a stray
            # CRLF after the body, beyond the declared Content-Length.
            self.send_error(404)
            return
        self.send_stream(index)

    def send_stream(self, i):
        """Push frames from ``camera[i]`` to the client until it disconnects.

        The response is a ``multipart/x-mixed-replace`` body in which every
        part is one JPEG frame, delimited by the ``FRAME`` boundary.

        Args:
            i: Index into the module-level ``camera`` list.
        """
        self.send_response(200)
        self.send_header('Age', 0)
        self.send_header('Cache-Control', 'no-cache, private')
        self.send_header('Pragma', 'no-cache')
        self.send_header('Content-Type', 'multipart/x-mixed-replace; boundary=FRAME')
        self.end_headers()
        try:
            while True:
                output = camera[i].output
                # Block until the output is notified of a new frame, then take
                # a reference to it while still holding the lock.
                with output.condition:
                    output.condition.wait()
                    frame = output.frame
                self.wfile.write(b'--FRAME\r\n')
                self.send_header('Content-Type', 'image/jpeg')
                self.send_header('Content-Length', len(frame))
                self.end_headers()
                self.wfile.write(frame)
                self.wfile.write(b'\r\n')
        except Exception as e:
            # Any failure in the loop (typically the client closing the
            # connection) ends this stream; log it and let the handler return.
            logging.warning(
                'Removed streaming client %s: %s',
                self.client_address, str(e))
class StreamingServer(socketserver.ThreadingMixIn, server.HTTPServer):
    """HTTPServer combined with ThreadingMixIn so requests run on threads."""

    # Mark request threads as daemon threads.
    daemon_threads = True
    # Enable address reuse when the listening socket is bound.
    allow_reuse_address = True
class CameraStreaming():
    """Pair one Picamera2 device with the StreamingOutput it records into.

    Attributes:
        camera: The ``Picamera2`` instance opened for the requested id.
        output: The ``StreamingOutput`` that receives the MJPEG recording.
    """

    def __init__(self, id=0, af=False):
        """Open and configure the camera; recording begins only on start().

        Args:
            id: Value passed straight to ``Picamera2(...)`` to select the
                camera.
            af: When true, set the ``AfMode`` control to
                ``controls.AfModeEnum.Continuous``.
        """
        # Overwritten below once the camera object has been created.
        self.output = None
        self.camera = Picamera2(id)
        self.output = StreamingOutput()

        main_stream = {"size": (800, 600)}
        raw_stream = {"size": (1640, 1232)}
        video_config = self.camera.create_video_configuration(
            main=main_stream,
            raw=raw_stream,
        )
        self.camera.configure(video_config)

        if not af:
            return
        self.camera.set_controls({"AfMode": controls.AfModeEnum.Continuous})

    def rotation(self, r):
        """Assign ``r`` to the ``rotation`` attribute of the camera object.

        NOTE(review): nothing in the visible source reads this attribute, and
        the imported ``Transform`` is never used - confirm whether Picamera2
        honours ``rotation`` or whether this call has no effect on the image.
        """
        self.camera.rotation = r

    def start(self):
        """Start recording through an MJPEGEncoder into ``self.output``."""
        encoder = MJPEGEncoder()
        sink = FileOutput(self.output)
        self.camera.start_recording(encoder, sink)

    def stop(self):
        """Stop the recording started by start()."""
        self.camera.stop_recording()
# Started cameras, in the order StreamingHandler indexes them: camera[0]
# (constructed with id=1) feeds /stream1.mjpg and camera[1] (id=0) feeds
# /stream2.mjpg.
camera = []
try:
    # Camera start-up happens inside the try so that a failure while bringing
    # up the second camera still stops the first one.
    for camera_id in (1, 0):
        stream = CameraStreaming(id=camera_id, af=True)
        stream.start()
        # Register only after a successful start so the cleanup below never
        # tries to stop a camera that is not recording.
        camera.append(stream)

    address = ('', 8000)
    # Bound to ``httpd`` rather than ``server`` so the imported http.server
    # module is not shadowed; the with-block closes the listening socket.
    with StreamingServer(address, StreamingHandler) as httpd:
        httpd.serve_forever()
finally:
    for stream in camera:
        # Stop every camera even if an earlier one fails to shut down; the
        # failure is logged instead of aborting the remaining cleanup.
        try:
            stream.stop()
        except Exception:
            logging.exception('Failed to stop camera')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment