|
import io |
|
from time import sleep |
|
from threading import Thread |
|
from collections import deque |
|
|
|
from picamera import PiCamera |
|
from gpiozero import MCP3008, Button |
|
from PIL import Image, ImageDraw, ImageFont |
|
|
|
class DelayedRenderer(object):
    """File-like MJPEG sink that replays frames on an overlay after a delay.

    ``write`` splits the incoming byte stream into frames at each JPEG
    start-of-image marker and queues them.  A background thread, started by
    the constructor, pops queued frames once the queue is longer than the
    delay currently selected by ``pot`` and pushes them, annotated, to an
    overlay renderer.  ``flush`` stops that thread.
    """

    def __init__(self, camera, pot):
        """Create the overlay renderer and start the render thread.

        camera: object providing ``resolution``, ``framerate`` and
            ``add_overlay`` (a ``PiCamera`` elsewhere in this file).
        pot: object whose ``value`` attribute scales the delay; the target
            delay is ``60 * pot.value`` seconds.
        """
        self.recording = False
        self.resolution = camera.resolution
        self.framerate = camera.framerate
        self.pot = pot
        self.font = ImageFont.truetype(
            "/usr/share/fonts/truetype/freefont/FreeSans.ttf", 24)
        img = Image.new('RGB', self.resolution)
        self.renderer = camera.add_overlay(img.tobytes())
        # Hack the renderer so it's got a few more buffers than usual (this
        # is only necessary for smooth catch-up)
        self.renderer.renderer.inputs[0].disable()
        self.renderer.renderer.inputs[0]._port[0].buffer_num = 5
        self.renderer.renderer.inputs[0].enable(callback=lambda port, buf: True)
        self.delay_buffer = deque()
        self.img_buffer = io.BytesIO()
        self.stopped = False
        self.delay_thread = Thread(target=self.delay_run)
        self.delay_thread.start()

    def write(self, buf):
        """Accept one chunk of MJPEG data.

        A chunk starting with the JPEG start-of-image marker begins a new
        frame: the frame accumulated so far (if any) is rewound and queued on
        ``delay_buffer``.  Any other chunk is appended to the current frame.
        """
        if buf.startswith(b'\xff\xd8'):
            # Start of new frame, write old one to delay-buffer
            if self.img_buffer.tell():
                self.img_buffer.seek(0)
                self.delay_buffer.append(self.img_buffer)
            self.img_buffer = io.BytesIO(buf)
            # Position at the end so later chunks extend the frame
            self.img_buffer.seek(0, io.SEEK_END)
        else:
            self.img_buffer.write(buf)

    def flush(self):
        """Signal the render thread to stop and wait for it to finish."""
        self.stopped = True
        self.delay_thread.join()

    def _fps(self):
        """Return the frame rate as a plain float.

        NOTE(review): picamera reports ``framerate`` as a Fraction-like
        value; arithmetic on it yields Fraction results, which ``time.sleep``
        may reject or truncate on Python 3.  Coercing here keeps all timing
        arithmetic in floats.
        """
        return float(self.framerate)

    def _draw_recording_marker(self, draw):
        """Draw a red dot in the top-right corner while ``recording`` is set."""
        if self.recording:
            width = self.resolution[0]
            draw.ellipse(
                xy=[(width - 30, 10), (width - 10, 30)],
                fill=(255, 0, 0))

    def _render_buffering(self, delay_frames, fps):
        """Show a blank frame with the target delay and remaining buffer time."""
        img = Image.new('RGB', self.resolution)
        draw = ImageDraw.Draw(img)
        draw.text(
            xy=(10, 10),
            text='Current delay: ~%.1fs' % (delay_frames / fps),
            font=self.font,
            fill=(255, 255, 255))
        draw.text(
            xy=(10, 50),
            text='Buffering %.1fs' % (
                (delay_frames - len(self.delay_buffer)) / fps),
            font=self.font,
            fill=(255, 0, 0))
        self._draw_recording_marker(draw)
        self.renderer.update(img.tobytes())

    def _render_frame(self, frame, delay_frames, fps):
        """Decode one queued frame, annotate it and push it to the overlay."""
        img = Image.open(frame)
        draw = ImageDraw.Draw(img)
        draw.text(
            xy=(10, 10),
            text='Current delay: ~%.1f seconds' % (delay_frames / fps),
            font=self.font,
            fill=(255, 255, 255))
        self._draw_recording_marker(draw)
        self.renderer.update(img.tobytes())

    def delay_run(self):
        """Render-thread body: display delayed frames until ``stopped`` is set."""
        while not self.stopped:
            fps = self._fps()
            delay_frames = int(60 * self.pot.value * fps)
            # If the delay buffer's just been increased, just render blank
            # frames with a message until we fill the buffer
            if len(self.delay_buffer) < delay_frames - 3:
                self._render_buffering(delay_frames, fps)
                # Sleep for full frame-delay while we're catching up
                sleep(1 / fps)
            else:
                while len(self.delay_buffer) > delay_frames + 3:
                    # We're lagging way behind where we should be, drop
                    # frames until we're only three behind
                    self.delay_buffer.popleft()
                # Render up to 4 frames from the start of the delay-buffer
                num = 0
                while num < 4 and len(self.delay_buffer) > delay_frames:
                    self._render_frame(
                        self.delay_buffer.popleft(), delay_frames, fps)
                    num += 1
                # Sleep for a shorter time than the frame-delay to allow
                # catch-up in the event we fall a little behind
                sleep(1 / (fps * 3))
|
|
|
|
|
class DelayedApp(object):
    """Delayed camera preview controlled by a potentiometer and two buttons.

    The potentiometer (MCP3008 channel 1) is handed to ``DelayedRenderer``
    to select the delay, the button on pin 22 toggles an H.264 recording to
    ``output.h264`` and the button on pin 17 ends ``run``.
    """

    def __init__(self, resolution, framerate):
        """Open the input devices, the camera, the renderer and the output file.

        resolution: passed to ``PiCamera`` as ``resolution``.
        framerate: passed to ``PiCamera`` as ``framerate``.
        """
        self.stopped = False
        self.pot = MCP3008(channel=1)
        self.rec_button = Button(22)
        self.quit_button = Button(17)
        self.rec_button.when_pressed = self.start_rec
        self.quit_button.when_pressed = self.quit
        self.camera = PiCamera(resolution=resolution, framerate=framerate)
        self.delay_output = DelayedRenderer(self.camera, self.pot)
        self.rec_output = io.open('output.h264', 'wb')

    def quit(self):
        """Ask ``run`` to leave its wait loop."""
        self.stopped = True

    def start_rec(self):
        """Start the H.264 recording on splitter port 2 and arm ``stop_rec``."""
        self.camera.start_recording(
            self.rec_output, splitter_port=2,
            format='h264', quality=20, bitrate=1000000)
        self.delay_output.recording = True
        # The same button now stops the recording
        self.rec_button.when_pressed = self.stop_rec

    def stop_rec(self):
        """Stop the H.264 recording on splitter port 2 and re-arm ``start_rec``."""
        self.camera.stop_recording(splitter_port=2)
        self.delay_output.recording = False
        self.rec_button.when_pressed = self.start_rec

    def _shutdown(self, preview_started):
        """Release every resource, continuing past individual failures.

        preview_started: whether the MJPEG recording was started by ``run``.

        Each step runs even if an earlier one raised; the first error seen is
        re-raised once all steps have been attempted.
        """
        def stop_preview():
            if preview_started:
                self.camera.stop_recording()

        def stop_file_recording():
            if self.camera.recording:
                self.camera.stop_recording(splitter_port=2)

        steps = (
            stop_preview,
            stop_file_recording,
            # Stop the render thread explicitly so it cannot outlive the
            # camera; flush() only sets a flag and joins, so repeating it
            # is harmless.
            self.delay_output.flush,
            self.rec_output.close,
            self.camera.close,
            self.quit_button.close,
            self.rec_button.close,
            self.pot.close,
        )
        first_error = None
        for step in steps:
            try:
                step()
            except Exception as exc:
                # Keep going so the remaining resources are still released
                if first_error is None:
                    first_error = exc
        if first_error is not None:
            raise first_error

    def run(self):
        """Feed the MJPEG stream to the renderer until ``quit`` is called.

        All resources are released on the way out, including when starting
        the recording fails or the wait loop raises.
        """
        preview_started = False
        try:
            self.camera.start_recording(self.delay_output, format='mjpeg')
            preview_started = True
            while not self.stopped:
                self.camera.wait_recording(0.5)
        finally:
            self._shutdown(preview_started)
|
|
|
|
|
if __name__ == '__main__':
    # Script entry point: build the app for a 640x480, 30 fps camera and run
    # it until its run loop exits.
    DelayedApp(resolution=(640, 480), framerate=30).run()