Skip to content

Instantly share code, notes, and snippets.

@rasathus
Created March 26, 2013 23:51
Show Gist options
  • Save rasathus/5250381 to your computer and use it in GitHub Desktop.
Save rasathus/5250381 to your computer and use it in GitHub Desktop.
# -*- coding: utf-8 -*-
from __future__ import with_statement
import os
import time
import logging
import shutil
import tempfile
import Queue
from sys import exit
from uuid import uuid4
import boto
import pygame
import pygame.camera
from pygame.locals import *
from gslib.third_party.oauth2_plugin import oauth2_plugin
from pigredients.ics import lpd6803 as lpd6803
from controlmypi import ControlMyPi
# ControlMyPi account and panel settings, passed to ControlMyPi(...) in the
# __main__ block. The credential values look like placeholders -- replace
# them with real ones before running.
jabber_id = 'Jabber ID'
password = 'password'
panel_id = 'circMan'
panel_name = 'Lights In A Box'
panel_status = 'pub'
# Module-level handles; the __main__ block assigns the real service objects.
led_setter = None
capture_service = None
# Colours chosen on the panel: filled by controlmypi_callback, drained by the
# main loop in the __main__ block.
snap_queue = Queue.Queue()
# Lookup table from every two-character lowercase hex pair ('00' .. 'ff') to
# its integer value. Idea borrowed from
# http://stackoverflow.com/questions/4296249/how-do-i-convert-a-hex-triplet-to-an-rgb-tuple-and-back
HEX = '0123456789abcdef'
HEX2 = dict(
    (high + low, 16 * high_value + low_value)
    for high_value, high in enumerate(HEX)
    for low_value, low in enumerate(HEX)
)
def hex_to_rgb(triplet):
    """Convert a six-character hex colour string (no leading '#') to an RGB.

    The string is lower-cased before lookup, so upper-case digits are
    accepted. Each channel is multiplied by 0.2 and truncated to an int
    (presumably to dim the LEDs -- confirm against the hardware).

    Raises KeyError if any two-character pair is not valid hex.
    """
    digits = triplet.lower()
    red, green, blue = [
        int(HEX2[digits[start:start + 2]] * 0.2) for start in (0, 2, 4)
    ]
    return RGB(red, green, blue)
def triplet(rgb):
    """Pack an indexable (r, g, b) sequence into a lowercase hex string.

    The three channels are combined into one integer (r in the high byte)
    and rendered zero-padded to at least six hex digits.
    """
    packed = rgb[0] << 16
    packed |= rgb[1] << 8
    packed |= rgb[2]
    return '%06x' % packed
class Capture_Upload:
    """Grab webcam frames with pygame and publish them to Google Storage.

    Each say_cheese() call saves a frame, uploads it to the bucket through
    boto, marks it publicly readable and reports its URL to the ControlMyPi
    panel as the 'box_image' status.
    """

    def __init__(self, controlmypi):
        """Start the camera and create a scratch directory for captures.

        controlmypi -- connection object; its update_status() is called with
                       the URL of every uploaded image.
        """
        self.image_id = 0
        self.controlmypi = controlmypi
        self.project_id = 'rasathus-raspberrypi'
        self.storage_bucket = 'image-store'
        self.uri = boto.storage_uri(self.storage_bucket, 'gs')
        self.temp_dir = tempfile.mkdtemp(prefix='googlestorage')
        # A short random prefix so file names from different runs don't clash.
        self.entr = str(uuid4())[0:8]
        pygame.init()
        pygame.camera.init()
        self.cam = pygame.camera.Camera('/dev/video0', (640, 480), 'RGB')
        self.cam.start()

    def say_cheese(self):
        """Capture one frame, upload it and announce its public URL."""
        filename = "capture-%s-%d.jpeg" % (self.entr, self.image_id)
        self.image_id += 1
        local_path = os.path.join(self.temp_dir, filename)
        object_path = self.storage_bucket + '/' + filename

        # For some reason the cam is always two pictures behind, so discard
        # the first two frames. Suspected to be a pygame bug, as Jeremy
        # independently found the same issue.
        for _ in range(2):
            self.cam.get_image()
        surf = self.cam.get_image()

        try:
            pygame.image.save(surf, local_path)
            # JPEG data is binary: open in 'rb', and let the context manager
            # close the handle even if the upload raises.
            with open(local_path, 'rb') as image_data:
                dst_uri = boto.storage_uri(object_path, 'gs')
                dst_uri.new_key().set_contents_from_file(image_data)
        finally:
            # The local copy is only needed for the upload; remove it so the
            # scratch directory does not grow for the life of the process.
            if os.path.exists(local_path):
                os.remove(local_path)

        uri = boto.storage_uri(object_path, 'gs')
        uri.set_canned_acl('public-read')

        image_url = "http://%s.storage.googleapis.com/%s" % (
            self.storage_bucket, filename)
        logging.info(
            "Image should be available @ %s , updating controlmypi.com"
            % image_url)
        self.controlmypi.update_status({'box_image': image_url})

    def spring_cleaning(self):
        """Delete one object from the bucket once more than 10 images exist.

        Best effort: any error is logged and swallowed so the main loop keeps
        running.
        """
        if self.image_id <= 10:
            return
        try:
            file_listing = boto.storage_uri(self.storage_bucket, 'gs')
            # Buckets don't support indexing, so start iterating and stop
            # after the first object.
            for obj in file_listing.get_bucket():
                logging.info("Deleting file @ '%s://%s/%s'" % (
                    file_listing.scheme, file_listing.bucket_name, obj.name))
                obj.delete()
                break
        except Exception:
            # Not a bare except: KeyboardInterrupt/SystemExit must still be
            # able to stop the program.
            logging.exception("Error whilst deleting, looks like you'll have to clear up manually.")

    def shutdown(self):
        """Stop the camera and remove the scratch directory."""
        try:
            self.cam.stop()
        finally:
            # Clean up the temp folder even if stopping the camera fails.
            shutil.rmtree(self.temp_dir)
class RGB(object):
    """Plain holder for a colour's red, green and blue channel values."""

    def __init__(self, r=0, g=0, b=0):
        """Store the three channels; each defaults to 0."""
        self.r, self.g, self.b = r, g, b

    def __str__(self):
        """Return the channels formatted as '[R <r>, G <g>, B <b>]'."""
        channels = (self.r, self.g, self.b)
        return "[R %d, G %d, B %d]" % channels
class Led_Setter:
    """Drive an LPD6803 LED chain, setting it to a single colour at a time."""

    def __init__(self):
        """Create the chain object and show an initial colour."""
        self.led_chain = lpd6803.LPD6803_Chain(ics_in_chain=5)
        initial_colour = RGB(28, 9, 46)
        self.set(initial_colour)

    def set(self, rgb):
        """Apply the r/g/b attributes of `rgb` to the chain and write it out."""
        chain = self.led_chain
        # Stage the new colour on the chain ...
        chain.set_rgb([rgb.r, rgb.g, rgb.b])
        # ... then push the staged state out.
        chain.write()

    def shutdown(self):
        """Set every channel to zero."""
        off = RGB(0, 0, 0)
        self.set(off)
def controlmypi_callback(connection, key, value):
    """Handle a control message from controlmypi.com.

    Every message is logged. For the 'wheel' control the panel's 'indicator'
    is updated with the received value, and the colour (characters 1-6 of the
    value, i.e. skipping the first character) is queued on snap_queue.
    """
    logging.info("Got message from controlmypi.com key : %s Value : %s", key, value)
    if key != 'wheel':
        return
    connection.update_status({'indicator': value})
    colour = hex_to_rgb(value[1:7])
    snap_queue.put(colour)
if __name__ == '__main__':
    # Seconds to block on the queue before retrying. On Python 2 a
    # Queue.get() with no timeout cannot be interrupted by Ctrl-C, so the
    # main loop polls with a short timeout instead.
    QUEUE_POLL_SECONDS = 1

    logging.basicConfig(filename='box.log', level=logging.INFO)
    logging.info("Running up Control My Pi connection.")
    panel_description = [
        [['L', 'Select a colour, and few seconds later your selection should appear.']],
        [['L', 'To find out how it all works head on over to http://rasathus.blogspot.co.uk']],
        [['L', '']],
        [['P', 'box_image', 'http://commondatastorage.googleapis.com/image-store/default.jpeg']],
        [['L', 'Pick a colour:'], ['W', 'wheel'], ['L', 'Current lighting State:'], ['I', 'indicator', '#000000']],
    ]
    controlmypi_connection = ControlMyPi(jabber_id, password, panel_id, panel_name, panel_description, controlmypi_callback, panel_status)
    logging.info("Instantiating our capture service.")
    capture_service = Capture_Upload(controlmypi=controlmypi_connection)
    led_setter = Led_Setter()

    connected = False
    try:
        connected = controlmypi_connection.start_control()
        if connected:
            # Publish an initial image before any colour has been picked.
            capture_service.say_cheese()
            while True:
                logging.info("Queue length is : %d" % snap_queue.qsize())
                # Wait for the next colour, waking regularly so a
                # KeyboardInterrupt is delivered promptly.
                while True:
                    try:
                        rgb = snap_queue.get(block=True, timeout=QUEUE_POLL_SECONDS)
                        break
                    except Queue.Empty:
                        continue
                led_setter.set(rgb)
                capture_service.say_cheese()
                capture_service.spring_cleaning()
        else:
            logging.error("Could not start the ControlMyPi connection.")
    except KeyboardInterrupt:
        logging.warning("Caught keyboard interupt. Shutting down ...")
    except Exception:
        # Record the failure in the log file, then let it propagate once the
        # cleanup below has run.
        logging.exception("Unexpected error in main loop. Shutting down ...")
        raise
    finally:
        # Release the camera, temp files and LEDs however the loop ended.
        if connected:
            logging.info("Calling shutdown on ControlMyPi.com")
            controlmypi_connection.stop_control()
        logging.info("Calling shutdown on capture service")
        capture_service.shutdown()
        logging.info("Calling shutdown on led chain")
        led_setter.shutdown()
        # NOTE: no scheduler is created in this file; the message is kept so
        # the log output matches earlier versions.
        logging.info("Calling shutdown on scheduler")
        logging.info("Shutting down logger and exiting ...")
        logging.shutdown()
    exit(0)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment