Last active
August 29, 2015 14:22
-
-
Save TinajaLabs/e0a4a9c54303b70d620f to your computer and use it in GitHub Desktop.
watch bunny - code that detects motion and sends a tweet when detected
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# example code - https://github.com/bear/python-twitter/blob/master/examples/tweet.py | |
# motion detector - https://www.adafruit.com/products/189 - | |
# running on Raspberry Pi A+ | |
import RPi.GPIO as io | |
import os, sys | |
import time | |
import datetime | |
import ConfigParser | |
import logging | |
import urllib2 | |
logging.basicConfig(filename='/var/log/detectMotion.log',level=logging.DEBUG,format='%(asctime)s - %(message)s', datefmt='%m/%d/%Y %I:%M:%S %p') | |
import twitter | |
configFile = "/home/pi/.tweetrc" | |
print("loading... watchbunny.py...") | |
logging.info("loading... watchbunny.py...") | |
io.setmode(io.BCM) | |
pir_pin = 18 | |
led_pin = 4 | |
io.setup(pir_pin, io.IN) # activate input | |
io.setup(led_pin, io.OUT) # activate output for LED | |
timeoutSeconds = 60*3 # 3 minutes | |
nextCheck = datetime.datetime.now() + datetime.timedelta(seconds=timeoutSeconds) | |
lastMotion = datetime.datetime.now() | |
class TweetRc(object): | |
def __init__(self): | |
self._config = None | |
def GetConsumerKey(self): | |
return self._GetOption('consumer_key') | |
def GetConsumerSecret(self): | |
return self._GetOption('consumer_secret') | |
def GetAccessKey(self): | |
return self._GetOption('access_key') | |
def GetAccessSecret(self): | |
return self._GetOption('access_secret') | |
def _GetOption(self, option): | |
try: | |
return self._GetConfig().get('Tweet', option) | |
except: | |
return None | |
def _GetConfig(self): | |
if not self._config: | |
# print("not self._config: ") | |
self._config = ConfigParser.ConfigParser() | |
self._config.read([os.path.expanduser(configFile)]) | |
return self._config | |
def sendTwitter(msg): | |
api = twitter.Api( | |
consumer_key=consumer_key, consumer_secret=consumer_secret, | |
access_token_key=access_key, access_token_secret=access_secret,input_encoding="utf-8") | |
try: | |
status = api.PostUpdate(msg) | |
except UnicodeDecodeError: | |
logging.warning("Your message could not be encoded. Perhaps it contains non-ASCII characters? ") | |
logging.warning("Try explicitly specifying the encoding with the --encoding flag") | |
except: | |
# print "Unexpected error:", sys.exc_info()[0] | |
logging.warning("Unexpected error:", sys.exc_info()[0]) | |
raise | |
def eyeball(state): | |
if state == "ON": | |
io.output(led_pin,True) | |
# print("LED On") | |
else: | |
io.output(led_pin,False) | |
# print("LED Off") | |
def internet_on(): | |
try: | |
response=urllib2.urlopen('http://74.125.228.100',timeout=1) | |
return True | |
except urllib2.URLError as err: pass | |
return False | |
if not (os.path.isfile(configFile)): | |
logging.warning("config file not found...") | |
sys.exit(2) | |
rc = TweetRc() | |
consumer_key = rc.GetConsumerKey() | |
consumer_secret = rc.GetConsumerSecret() | |
access_key = rc.GetAccessKey() | |
access_secret = rc.GetAccessSecret() | |
#print(consumer_key) | |
#print(consumer_secret) | |
#print(access_key) | |
#print(access_secret) | |
print("starting... watchbunny.py...") | |
logging.info("starting... watchbunny.py...") | |
while True: | |
# get the current state of the sensor | |
pirState = io.input(pir_pin) | |
# print(" Now: %s" % datetime.datetime.now()) | |
# print("next: %s" % nextCheck) | |
if (pirState and nextCheck <= datetime.datetime.now()): | |
nextCheck = datetime.datetime.now() + datetime.timedelta(seconds=timeoutSeconds) | |
io.output(led_pin,True) | |
logging.info(">>>>> PIR re-triggerred at: %s <<<<<<<<<<<<<<<<<<<<<<<<<<<<\a" % datetime.datetime.now()) | |
logging.info('sending twitter...') | |
if (internet_on): | |
sendTwitter("somebody moved at: %s" % datetime.datetime.now()) | |
else: | |
if (nextCheck <= datetime.datetime.now()): | |
logging.debug("waiting for next event...") | |
io.output(led_pin,False) | |
else: | |
logging.debug("on hold until: %s" % nextCheck) | |
io.output(led_pin,True) | |
if pirState: | |
io.output(led_pin,False) | |
time.sleep(0.1) | |
nextCheck = datetime.datetime.now() + datetime.timedelta(seconds=timeoutSeconds) | |
io.output(led_pin,True) | |
time.sleep(0.5) | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment