Skip to content

Instantly share code, notes, and snippets.

@waveform80 waveform80/delay.md
Last active Oct 13, 2016

Embed
What would you like to do?
Delay demo

Delay

The demo script (delay.py) produces a "delayed" preview, with recording facilities. The pot (connected to channel 1 of the MCP3008) controls the amount by which the preview is delayed (anywhere between 0 and 60 seconds), whilst one button (GPIO22) toggles recording and the other (GPIO17) terminates the script.

Recording is written to "output.h264" in the current directory. This file is truncated when the script starts; after that, every recording session is appended to it (i.e. stopping the recording, then starting it again, continues the file from where it left off).

The recording is not delayed like the preview (this would require a lot more work). The display and recording currently run at VGA (640x480) resolution. You may attempt higher resolutions by adjusting the main block at the end of the script but be aware that the script is quite CPU intensive. Higher resolutions may fail to perform well, or may crash outright.

Requirements

  • A reasonably fast Pi (2B or 3B)
  • A Pi camera module (V1 or V2)
  • MCP3008 analog-to-digital converter
  • 3-pin potentiometer
  • 2 push buttons
  • The following packages to be installed:
    • python3-picamera
    • python3-gpiozero
    • python3-pil
    • python3-spidev
    • fonts-freefont-ttf

Setup

Wire the components as shown in the breadboard diagram (a circuit schematic is also provided for reference with the same wiring colors as the breadboard diagram). Ensure all required packages are installed, then execute the script with Python 3:

$ python3 delay.py

Dial the pot up and down to increase the delay (this will result in buffering until enough seconds of video have been recorded) or decrease the delay (this will skip forward in the buffered frames, if enough have been recorded). Press the record button on GPIO22 to start and pause recording. Recording will be indicated by a red circle in the upper-right corner.

The current delay factor will be displayed at all times at the top left. Finally, press the quit button on GPIO17 to terminate the script.

import io
from collections import deque
from contextlib import ExitStack
from threading import Thread
from time import sleep

from gpiozero import MCP3008, Button
from picamera import PiCamera
from PIL import Image, ImageDraw, ImageFont
class DelayedRenderer(object):
    """File-like MJPEG sink that replays frames on an overlay after a delay.

    The object is handed to ``camera.start_recording(..., format='mjpeg')``
    as the output. ``write`` reassembles the incoming byte chunks into whole
    JPEG frames and queues them; a background thread (``delay_run``) pops
    frames off the queue and pushes them to an overlay renderer, keeping
    roughly ``max_delay * pot.value`` seconds of frames queued.

    :param camera: camera object providing ``resolution``, ``framerate`` and
        ``add_overlay`` (a ``PiCamera`` in this script).
    :param pot: object with a ``value`` attribute read on every loop
        iteration (an ``MCP3008`` in this script; presumably in the range
        0..1 -- confirm against gpiozero).
    :param max_delay: delay, in seconds, selected when ``pot.value`` is 1.
        Defaults to 60, the value that used to be hard-coded.
    """

    def __init__(self, camera, pot, max_delay=60):
        # Toggled from outside to show/hide the red "recording" dot
        self.recording = False
        self.resolution = camera.resolution
        self.framerate = camera.framerate
        self.pot = pot
        self.max_delay = max_delay
        self.font = ImageFont.truetype(
            "/usr/share/fonts/truetype/freefont/FreeSans.ttf", 24)
        img = Image.new('RGB', self.resolution)
        self.renderer = camera.add_overlay(img.tobytes())
        # Hack the renderer so it's got a few more buffers than usual (this
        # is only necessary for smooth catch-up)
        self.renderer.renderer.inputs[0].disable()
        self.renderer.renderer.inputs[0]._port[0].buffer_num = 5
        self.renderer.renderer.inputs[0].enable(callback=lambda port, buf: True)
        # Complete JPEG frames waiting to be displayed (oldest on the left)
        self.delay_buffer = deque()
        # The frame currently being assembled by write()
        self.img_buffer = io.BytesIO()
        self.stopped = False
        self.delay_thread = Thread(target=self.delay_run)
        self.delay_thread.start()

    def write(self, buf):
        """Accept a chunk of MJPEG data and queue each completed frame.

        A chunk beginning with the JPEG start-of-image marker (``FF D8``)
        starts a new frame; any other chunk is appended to the frame in
        progress.
        """
        if buf.startswith(b'\xff\xd8'):
            # Start of new frame, write old one to delay-buffer
            if self.img_buffer.tell():
                self.img_buffer.seek(0)
                self.delay_buffer.append(self.img_buffer)
            self.img_buffer = io.BytesIO(buf)
            # Position at the end so continuation chunks are appended
            self.img_buffer.seek(0, io.SEEK_END)
        else:
            self.img_buffer.write(buf)

    def flush(self):
        """Ask the display thread to stop and wait for it to finish."""
        self.stopped = True
        self.delay_thread.join()

    def _annotate(self, img, delay_text, status_text=None):
        """Draw the delay read-out, optional status line and record dot."""
        draw = ImageDraw.Draw(img)
        draw.text(
            xy=(10, 10),
            text=delay_text,
            font=self.font,
            fill=(255, 255, 255))
        if status_text is not None:
            draw.text(
                xy=(10, 50),
                text=status_text,
                font=self.font,
                fill=(255, 0, 0))
        if self.recording:
            width = self.resolution[0]
            draw.ellipse(
                xy=[(width - 30, 10), (width - 10, 30)],
                fill=(255, 0, 0))

    def delay_run(self):
        """Display loop run by the background thread until ``stopped``."""
        # The camera framerate may be a Fraction; time.sleep() rejects
        # Fraction arguments on many Python 3 versions, so work in floats.
        framerate = float(self.framerate)
        frame_period = 1.0 / framerate
        while not self.stopped:
            delay_frames = int(self.max_delay * self.pot.value * framerate)
            buffered = len(self.delay_buffer)
            # If the delay buffer's just been increased, just render blank
            # frames with a message until we fill the buffer
            if buffered < delay_frames - 3:
                img = Image.new('RGB', self.resolution)
                self._annotate(
                    img,
                    'Current delay: ~%.1fs' % (delay_frames / framerate),
                    'Buffering %.1fs' % ((delay_frames - buffered) / framerate))
                self.renderer.update(img.tobytes())
                # Sleep for full frame-delay while we're catching up
                sleep(frame_period)
            else:
                while len(self.delay_buffer) > delay_frames + 3:
                    # We're lagging way behind where we should be, drop
                    # frames until we're only three behind
                    self.delay_buffer.popleft()
                # Render up to 4 frames from the start of the delay-buffer
                num = 0
                while num < 4 and len(self.delay_buffer) > delay_frames:
                    img = Image.open(self.delay_buffer.popleft())
                    self._annotate(
                        img,
                        'Current delay: ~%.1f seconds' % (delay_frames / framerate))
                    self.renderer.update(img.tobytes())
                    num += 1
                # Sleep for a shorter time than the frame-delay to allow
                # catch-up in the event we fall a little behind
                sleep(frame_period / 3)
class DelayedApp(object):
    """Wire the pot, the two buttons and the camera into the delay demo.

    The button on GPIO22 toggles H.264 recording to ``output.h264`` and the
    button on GPIO17 requests shutdown. ``run`` blocks until shutdown is
    requested and then releases every resource.

    :param resolution: resolution passed to ``PiCamera``.
    :param framerate: framerate passed to ``PiCamera``.
    """

    def __init__(self, resolution, framerate):
        self.stopped = False
        self.pot = MCP3008(channel=1)
        self.rec_button = Button(22)
        self.quit_button = Button(17)
        # Open (and truncate) the output file before the camera and the
        # renderer thread exist, so a failure here cannot leave that
        # non-daemon thread running with nothing to stop it.
        self.rec_output = io.open('output.h264', 'wb')
        self.camera = PiCamera(resolution=resolution, framerate=framerate)
        self.delay_output = DelayedRenderer(self.camera, self.pot)
        # Attach the handlers last: they use self.camera and
        # self.delay_output, which must exist before a press can arrive.
        self.rec_button.when_pressed = self.start_rec
        self.quit_button.when_pressed = self.quit

    def quit(self):
        """Request that ``run`` leave its wait loop."""
        self.stopped = True

    def start_rec(self):
        """Start H.264 recording on splitter port 2 and arm the stop handler."""
        self.camera.start_recording(
            self.rec_output, splitter_port=2,
            format='h264', quality=20, bitrate=1000000)
        self.delay_output.recording = True
        self.rec_button.when_pressed = self.stop_rec

    def stop_rec(self):
        """Stop the splitter-port-2 recording and re-arm the start handler."""
        self.camera.stop_recording(splitter_port=2)
        self.delay_output.recording = False
        self.rec_button.when_pressed = self.start_rec

    def _stop_file_recording(self):
        """Stop the splitter-port-2 recording if the camera still reports one."""
        if self.camera.recording:
            self.camera.stop_recording(splitter_port=2)

    def run(self):
        """Feed MJPEG frames to the delayed renderer until quit is requested.

        On the way out (normal exit or exception) every shutdown step is
        attempted even if an earlier one raises; the error is re-raised once
        all steps have run.
        """
        self.camera.start_recording(self.delay_output, format='mjpeg')
        try:
            while not self.stopped:
                self.camera.wait_recording(0.5)
        finally:
            with ExitStack() as cleanup:
                # Callbacks run last-in, first-out, so they are registered
                # in the reverse of the order in which they must execute.
                cleanup.callback(self.pot.close)
                cleanup.callback(self.rec_button.close)
                cleanup.callback(self.quit_button.close)
                cleanup.callback(self.camera.close)
                cleanup.callback(self.rec_output.close)
                cleanup.callback(self._stop_file_recording)
                cleanup.callback(self.camera.stop_recording)
if __name__ == '__main__':
    # Script entry point: VGA at 30fps. Adjust the resolution here to
    # experiment with other sizes.
    DelayedApp(resolution=(640, 480), framerate=30).run()
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.