Created
December 11, 2016 06:30
-
-
Save nickoala/5353de7a7cf3d5a828990258c241fbb8 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" Raspberry Pi + GPS Car Camera | |
Achieved with four threads: | |
1. Main thread - | |
Continuously capture and display on a pygame screen. | |
Let user see what the camera is seeing. | |
Check for QUIT event. Allow user to press [X] to exit. | |
2. Video-recording thread - | |
Record indefinitely. | |
Split video into 15-minute clips. | |
3. GPS thread - | |
Read GPS location. | |
4. Annotate thread - | |
Put GPS location and datetime on image. | |
Refresh every one second to simulate a running clock. | |
Should be coupled with a cron job which deletes old video clips, | |
to prevent disk from filling up. | |
**Customize video size, display size, and clip directory before trying.** | |
""" | |
import picamera | |
import io | |
import pygame | |
from datetime import datetime | |
import threading | |
import time | |
import os.path | |
import gps | |
video_size = (640, 480) | |
photo_size = (320, 240) | |
clip_directory = '/home/pi/where/are/the/clips' | |
def record(camera): | |
def datetime_filename(): | |
while 1: | |
yield ( | |
os.path.join( | |
clip_directory, | |
datetime.now().strftime('%Y%m%d-%H%M') + '.h264')) | |
def seconds_to_cutoff(): | |
cutoffs = [15, 30, 45, 60] | |
now = datetime.now() | |
for c in cutoffs: | |
if c <= now.minute: | |
continue | |
return c*60 - now.minute*60 - now.second | |
for filename in camera.record_sequence(datetime_filename()): | |
camera.wait_recording(seconds_to_cutoff()) | |
latest_location = None | |
def gpsread(): | |
global latest_location | |
session = gps.gps(mode=gps.WATCH_ENABLE | gps.WATCH_NEWSTYLE) | |
for report in session: | |
if report['class'] == 'TPV': | |
if report['mode'] in [2,3]: | |
latest_location = (report['lat'], report['lon']) | |
else: | |
latest_location = None | |
def annotate(camera): | |
def compose_text(dt, location): | |
if location: | |
lat, lon = location | |
loctext = '%.4f%s %.4f%s' % (lat, | |
'N' if lat >= 0 else 'S', | |
lon, | |
'E' if lon >= 0 else 'W') | |
else: | |
loctext = '--lat-- --lon--' | |
return dt.strftime('%Y-%m-%d %H:%M:%S ') + loctext | |
global latest_location | |
camera.annotate_text_size = 24 | |
while 1: | |
camera.annotate_text = compose_text(datetime.now(), | |
latest_location) | |
time.sleep(1) | |
pygame.init() | |
pygame.display.set_caption('GPS Cam') | |
screen = pygame.display.set_mode(photo_size) | |
camera = picamera.PiCamera(resolution=video_size, framerate=30) | |
record_thread = threading.Thread(target=record, args=[camera]) | |
record_thread.daemon = True | |
record_thread.start() | |
gps_thread = threading.Thread(target=gpsread) | |
gps_thread.daemon = True | |
gps_thread.start() | |
annotate_thread = threading.Thread(target=annotate, args=[camera]) | |
annotate_thread.daemon = True | |
annotate_thread.start() | |
while 1: | |
buffer = io.BytesIO() | |
camera.capture(buffer, | |
format='bmp', | |
resize=photo_size, | |
use_video_port=True) | |
buffer.seek(0) | |
img = pygame.image.load(buffer, 'z.bmp') | |
screen.blit(img, (0,0)) | |
pygame.display.update() | |
for event in pygame.event.get(): | |
if event.type == pygame.QUIT: | |
exit() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment