Skip to content

Instantly share code, notes, and snippets.

@waveform80
Created July 8, 2017 09:20
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save waveform80/d18a598e0c8621ba98461a6986270633 to your computer and use it in GitHub Desktop.
Save waveform80/d18a598e0c8621ba98461a6986270633 to your computer and use it in GitHub Desktop.
# WARNING: This probably won't work with current versions of the
# mmalobj layer; it was made while the layer was still being
# developed.
import io
import curses
import datetime as dt
from picamera import array, mmal, mmalobj as mo, PiCameraMMALError
from threading import Thread, Lock
from PIL import Image, ImageDraw, ImageFont
from time import sleep
from math import sin, cos, pi
from collections import namedtuple
class Coord(namedtuple('Coord', ('x', 'y'))):
    """A 2D point/vector with element-wise arithmetic.

    Every operator accepts either an indexable pair (another Coord, a tuple,
    a list, ...; only items 0 and 1 are used) or a scalar which is applied to
    both components. The result is always a new Coord.

    Note that ``+`` and ``*`` deliberately replace the concatenation and
    repetition behaviour inherited from ``tuple``.
    """

    @classmethod
    def clock(cls, radians):
        """Return the unit vector of a clock hand at angle *radians*.

        The angle is measured from the 12 o'clock position, which maps to
        (0, -1); a quarter turn (pi / 2) maps to (1, 0). With a y axis that
        points down, as in image coordinates, increasing angles therefore
        turn clockwise.
        """
        return cls(sin(radians), -cos(radians))

    def __add__(self, other):
        """Add a pair element-wise, or add a scalar to both components."""
        try:
            return Coord(self.x + other[0], self.y + other[1])
        except TypeError:
            # *other* is not subscriptable: treat it as a scalar
            return Coord(self.x + other, self.y + other)

    def __sub__(self, other):
        """Subtract a pair element-wise, or a scalar from both components."""
        try:
            return Coord(self.x - other[0], self.y - other[1])
        except TypeError:
            return Coord(self.x - other, self.y - other)

    def __mul__(self, other):
        """Multiply by a pair element-wise, or scale both components."""
        try:
            return Coord(self.x * other[0], self.y * other[1])
        except TypeError:
            return Coord(self.x * other, self.y * other)

    def __truediv__(self, other):
        """Divide by a pair element-wise, or divide both components.

        This method was previously misspelled ``__truedev__`` so the ``/``
        operator never reached it and raised TypeError instead.
        """
        try:
            return Coord(self.x / other[0], self.y / other[1])
        except TypeError:
            return Coord(self.x / other, self.y / other)

    # Python 2 dispatches "/" to __div__ unless true division is enabled
    __div__ = __truediv__

    def __floordiv__(self, other):
        """Floor-divide by a pair element-wise, or both components."""
        try:
            return Coord(self.x // other[0], self.y // other[1])
        except TypeError:
            return Coord(self.x // other, self.y // other)
class Demo(mo.MMALPythonComponent):
    """Python MMAL component that stamps demo overlays onto video frames.

    The component is created with two outputs. Each frame that arrives is
    copied to a buffer for output 0 with the enabled overlays pasted onto
    it; while ``recording`` is true the annotated frame is additionally
    replicated to a buffer for output 1.

    The overlays are toggled through three plain attributes which may be
    set from another thread: ``clock_enabled``, ``bar_enabled`` and
    ``recording``. The clock and bar images are re-rendered by a background
    thread that runs while the component is enabled.
    """

    def __init__(self):
        super(Demo, self).__init__(outputs=2)
        # Guards publication of _clock_image and _bar_image between the
        # rendering thread and the frame callback
        self._lock = Lock()
        self._font = ImageFont.truetype(
            '/usr/share/fonts/truetype/freefont/FreeSans.ttf', 24)
        self.clock_enabled = False
        self._clock_image = None
        self.bar_enabled = False
        self._bar_image = None
        self._demo_thread = None
        self._rec_image = None
        self.recording = False

    def _commit_port(self, port):
        """Commit *port*, rejecting any input format other than I420.

        Raises PiCameraMMALError (MMAL_EINVAL) for other input formats.
        """
        super(Demo, self)._commit_port(port)
        if port.type == 'in':
            if port.format.value != mmal.MMAL_ENCODING_I420:
                raise PiCameraMMALError(mmal.MMAL_EINVAL, 'invalid format')

    def _demo_run(self):
        """Render the overlay images until the component is disabled.

        Runs in the thread started by enable(). The static templates (clock
        face, bar outline, "REC" label) are drawn once; the clock hands and
        the progress bar are then redrawn roughly five times a second.
        """
        clock_face = Image.new('L', (100, 100))
        draw = ImageDraw.Draw(clock_face)
        draw.ellipse([(0, 0), (99, 99)], outline=(255,))
        bar_outline = Image.new('L', (300, 30))
        draw = ImageDraw.Draw(bar_outline)
        draw.rectangle([(0, 0), (249, 29)], outline=(255,))
        # Draw the label on a local image and publish it only once it is
        # complete, so the frame callback never sees a half-drawn label
        rec_image = Image.new('L', (100, 25))
        draw = ImageDraw.Draw(rec_image)
        draw.text((0, 0), 'REC', (200,), self._font)
        self._rec_image = rec_image
        while self.enabled:
            now = dt.datetime.now()
            midnight = now.replace(hour=0, minute=0, second=0, microsecond=0)
            # Seconds elapsed since local midnight
            timestamp = (now - midnight).total_seconds()

            # Always draw on a fresh copy: images that have been published
            # are never modified again, so the callback may use them freely
            clock_image = clock_face.copy()
            draw = ImageDraw.Draw(clock_image)
            center = Coord(49, 49)
            hour_pos = center + Coord.clock(2 * pi * (timestamp % 43200 / 43200)) * 30
            min_pos = center + Coord.clock(2 * pi * (timestamp % 3600 / 3600)) * 45
            sec_pos = center + Coord.clock(2 * pi * (timestamp % 60 / 60)) * 45
            draw.line([center, hour_pos], fill=(200,), width=2)
            draw.line([center, min_pos], fill=(200,), width=2)
            draw.line([center, sec_pos], fill=(200,), width=1)

            # The bar shows how far through the current minute we are
            bar_image = bar_outline.copy()
            draw = ImageDraw.Draw(bar_image)
            draw.rectangle([(2, 2), (2 + (247 * now.second / 60), 27)],
                           fill=(200,), outline=(200,))
            draw.text((254, 2), '%d%%' % (now.second * 100 / 60), (200,),
                      self._font)

            with self._lock:
                self._clock_image = clock_image
                self._bar_image = bar_image
            sleep(0.2)

    def _callback(self, port, buf):
        """Copy the frame in *buf* to the outputs, pasting overlays onto it.

        Returns True when sending to an output fails with MMAL_EINVAL and
        False otherwise; any other PiCameraMMALError propagates.
        NOTE(review): the meaning of the return value (presumably "stop
        processing") is defined by MMALPythonComponent -- confirm.
        """
        # Read the flag once so that the buffer request and the "REC" label
        # agree even if another thread toggles it mid-frame
        recording = self.recording
        # NOTE(review): False is presumably get_buffer's "block" argument,
        # i.e. return nothing rather than wait for a free buffer -- confirm
        target1 = self.outputs[0].get_buffer(False)
        target2 = self.outputs[1].get_buffer(False) if recording else None
        if target1:
            target1.copy_from(buf)
            with target1 as data:
                # An 'L' image of the frame size covers only the first
                # width * height bytes of the buffer (the luma plane of the
                # I420 frame, assuming unpadded rows), so overlays are drawn
                # in greyscale
                img = Image.frombuffer(
                    'L', self.inputs[0].framesize, data,
                    'raw', 'L', 0, 1)
                # Presumably lets PIL write through to the buffer in place
                img.readonly = 0
                # Paste positions are hard-coded; they suit the 1280x720
                # frames configured in main()
                rec_image = self._rec_image
                if recording and rec_image is not None:
                    img.paste(rec_image, (1180, 10), rec_image)
                # Take the references under the lock and use the snapshots
                # afterwards: checking the attribute outside the lock and
                # then using it inside raced with disable() resetting it
                with self._lock:
                    clock_image = self._clock_image
                    bar_image = self._bar_image
                if self.clock_enabled and clock_image is not None:
                    img.paste(clock_image, (10, 10), clock_image)
                if self.bar_enabled and bar_image is not None:
                    img.paste(bar_image, (10, 680), bar_image)
            if target2:
                target2.replicate(target1)
            try:
                self.outputs[0].send_buffer(target1)
            except PiCameraMMALError as e:
                if e.status != mmal.MMAL_EINVAL:
                    raise
                return True
        # NOTE(review): if no buffer was available for output 0, target2 is
        # still sent here without having been filled -- confirm intended
        if target2:
            try:
                self.outputs[1].send_buffer(target2)
            except PiCameraMMALError as e:
                if e.status != mmal.MMAL_EINVAL:
                    raise
                return True
        return False

    def enable(self):
        """Enable the component and start the overlay rendering thread."""
        super(Demo, self).enable()
        self._demo_thread = Thread(target=self._demo_run)
        self._demo_thread.daemon = True
        self._demo_thread.start()

    def disable(self):
        """Disable the component and wait for the rendering thread to end."""
        super(Demo, self).disable()
        if self._demo_thread:
            # _demo_run exits once self.enabled turns false
            self._demo_thread.join()
            self._demo_thread = None
            # Drop both rendered overlays (previously only the clock was
            # reset) so stale images are not pasted after a later enable()
            with self._lock:
                self._clock_image = None
                self._bar_image = None
def main(window):
    """Run the interactive overlay demo inside the curses *window*.

    Connects the camera's output 0 to the Demo transform, the transform's
    output 0 to a preview renderer and its output 1 to an H.264 encoder
    whose output is written to ``output.h264`` in the current directory.
    The keyboard is then polled until End is pressed:

    * printable ASCII -- append to the camera annotation text
    * Enter           -- clear the annotation text
    * Backspace       -- delete the last character
    * Home            -- toggle the clock overlay
    * Page Up         -- toggle the progress bar overlay
    * Page Down       -- toggle recording
    * End             -- quit
    """
    # NOTE(review): the components are module globals, presumably so they
    # stay reachable for debugging -- confirm before making them local
    global camera, preview, encoder, transform
    camera = mo.MMALCamera()
    preview = mo.MMALRenderer()
    encoder = mo.MMALVideoEncoder()
    transform = Demo()
    # Configure camera output 0
    camera.outputs[0].framesize = (1280, 720)
    camera.outputs[0].framerate = 24
    camera.outputs[0].commit()
    # Configure H.264 encoder
    encoder.outputs[0].format = mmal.MMAL_ENCODING_H264
    encoder.outputs[0].bitrate = 1000000
    encoder.outputs[0].commit()
    p = encoder.outputs[0].params[mmal.MMAL_PARAMETER_PROFILE]
    p.profile[0].profile = mmal.MMAL_VIDEO_PROFILE_H264_HIGH
    p.profile[0].level = mmal.MMAL_VIDEO_LEVEL_H264_41
    encoder.outputs[0].params[mmal.MMAL_PARAMETER_PROFILE] = p
    encoder.outputs[0].params[mmal.MMAL_PARAMETER_VIDEO_ENCODE_INLINE_HEADER] = True
    encoder.outputs[0].params[mmal.MMAL_PARAMETER_INTRAPERIOD] = 30
    encoder.outputs[0].params[mmal.MMAL_PARAMETER_VIDEO_ENCODE_INITIAL_QUANT] = 22
    encoder.outputs[0].params[mmal.MMAL_PARAMETER_VIDEO_ENCODE_MAX_QUANT] = 22
    encoder.outputs[0].params[mmal.MMAL_PARAMETER_VIDEO_ENCODE_MIN_QUANT] = 22
    encoder.inputs[0].params[mmal.MMAL_PARAMETER_VIDEO_IMMUTABLE_INPUT] = True
    # The context manager guarantees the file is flushed and closed however
    # the function exits (it was previously left open)
    with io.open('output.h264', 'wb') as output:

        def output_callback(port, buf):
            # Append the encoded data; returns True once the buffer carries
            # the end-of-stream flag
            output.write(buf.data)
            return bool(buf.flags & mmal.MMAL_BUFFER_HEADER_FLAG_EOS)

        # Connect everything up
        transform.connect(camera.outputs[0])
        preview.connect(transform.outputs[0])
        encoder.connect(transform.outputs[1])
        encoder.outputs[0].enable(output_callback)
        try:
            annotate = camera.control.params[mmal.MMAL_PARAMETER_ANNOTATE]
            # Wait up to 50ms for a key instead of polling in no-delay
            # mode, which kept a CPU core spinning; getch() still returns
            # -1 when no key arrives in time
            window.timeout(50)
            text = ''
            while True:
                key = window.getch()
                if 0x20 <= key < 0x7f:
                    text += chr(key)
                elif key == ord('\n'):
                    text = ''
                elif key == curses.KEY_BACKSPACE:
                    text = text[:-1]
                elif key == curses.KEY_HOME:
                    transform.clock_enabled = not transform.clock_enabled
                elif key == curses.KEY_PPAGE:
                    transform.bar_enabled = not transform.bar_enabled
                elif key == curses.KEY_NPAGE:
                    transform.recording = not transform.recording
                elif key == curses.KEY_END:
                    break
                else:
                    # No key pressed (or one we do not handle)
                    continue
                annotate.enable = 1
                annotate.text = text.encode('ascii')
                camera.control.params[mmal.MMAL_PARAMETER_ANNOTATE] = annotate
        finally:
            try:
                preview.disconnect()
                encoder.disconnect()
                transform.disconnect()
            finally:
                # Stop output_callback from being invoked before the file
                # it writes to is closed
                encoder.outputs[0].disable()
# Only start the demo when executed as a script, so that importing this
# module does not take over the terminal and the camera. curses.wrapper
# sets the terminal up, passes the root window to main() and restores the
# terminal afterwards, even if main() raises.
if __name__ == '__main__':
    curses.wrapper(main)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment