Last active
May 15, 2017 09:58
-
-
Save K-Ko/4bb5a0aa35e7d51926a26ae64402e7bd to your computer and use it in GitHub Desktop.
S0 GPIO reader enhanced
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# http://raspberrypi.stackexchange.com/a/36245 | |
import sys, getopt, datetime, time, signal | |
import RPi.GPIO as GPIO | |
try: | |
from urllib.request import Request, urlopen # Python 3 | |
except: | |
from urllib2 import Request, urlopen # Python 2 | |
### -------------------------------------------------------------------------- | |
### Settings | |
### -------------------------------------------------------------------------- | |
### GPIO numbers (BCM numbering, see GPIO.setmode below) of the status LEDs
LED_heartbeat = 35      ### Pin for red LED
LED_impulse = 47        ### Pin for green LED

### Runtime configuration; the defaults can be overridden on the command line
options = {
    'pin': 23,              ### A common GPIO pin
    'resolution': 1000,     ### Impulse per kilo watt hour
    'bounce': 300,          ### milli seconds
    'log': False,           ### Foreground
    ### sys.maxsize exists on Python 2.6+ and 3.x (sys.maxint is Python 2 only)
    'max': sys.maxsize,     ### Watts above are errors
    'csv': '',              ### If given, write also timestamp to log file
    'verbose': 0,           ### Off
}
### -------------------------------------------------------------------------- | |
### Functions | |
### -------------------------------------------------------------------------- | |
def usage(rc):
    """Print the command line help and terminate the script.

    Args:
        rc: Exit status handed to ``sys.exit``.

    Raises:
        SystemExit: Always, carrying ``rc``.
    """
    # A parenthesised single-argument print is valid on Python 2 and 3.
    # -u and -s are shown with their argument because the getopt spec
    # ("u:" / "s:") requires one.
    print('''
Listen for S0 impulses on a Raspberry GPIO pin

Usage: %s [options]

Options:
    -p <PIN>            GPIO pin to listen for impulses, default 23
    -r <resolution>     Impulses per kilo watt hour, default 1000
    -b <milli seconds>  Pin bounce time, default, 300ms
    -l <file name>      Log file to store watts values
                        If not given, script will run in foreground
    -u <watts>          Upper boundary, ignore Watts above as error
    -s <separator>      CSV separator, if given, write also timestamp to log file
    -v                  Verbose mode
    -h                  This help
''' % sys.argv[0])
    sys.exit(rc)
### -------------------------------------------------------------------------- | |
def log(level, format, *args):
    """Print a timestamped message if the verbosity is high enough.

    Args:
        level: Minimum value of ``options['verbose']`` required for the
            message to be printed.
        format: %-style format string.
        *args: Values interpolated into ``format``.
    """
    if options['verbose'] >= level:
        # Single-argument print(): valid on both Python 2 and Python 3.
        print('[' + str(datetime.datetime.now()) + '] ' + format % args)
### -------------------------------------------------------------------------- | |
### Make argument integer or throw error message | |
### -------------------------------------------------------------------------- | |
def int_arg(arg, msg):
    """Convert a command line argument to an integer or abort the script.

    Args:
        arg: Raw argument value as delivered by getopt.
        msg: Text printed in front of the offending value on failure.

    Returns:
        ``arg`` converted with ``int()``.

    Raises:
        SystemExit: With status 2 when ``arg`` is not a valid integer.
    """
    try:
        return int(arg)
    except (TypeError, ValueError):
        # Only conversion failures are handled; anything else propagates.
        print('\n%s: %s\n' % (msg, arg))
        # Non-zero status so callers/shell scripts can detect the bad input
        # (a bare sys.exit() would report success).
        sys.exit(2)
### -------------------------------------------------------------------------- | |
### Timestamp of the previous impulse; falsy until the 1st impulse was seen
last_timestamp = None
### --------------------------------------------------------------------------
def handleImpulse(event):
    """Edge-detection callback: derive the power from the impulse interval.

    The 1st impulse only starts the measurement. Every later impulse
    computes the average power since the previous one, logs it and, if a
    log file is configured, appends it to that file.

    Args:
        event: Value handed over by GPIO.add_event_detect (the channel per
            the RPi.GPIO documentation); not used here.
    """
    global last_timestamp

    ### At 1st remember actual time for precise calculations
    timestamp = time.time()

    ### Turn notification LED on, also during wait for 2nd impulse
    GPIO.output(LED_impulse, True)

    if not last_timestamp:
        log(1, 'Start measuring ...')
        last_timestamp = timestamp
        return

    elapsed = timestamp - last_timestamp

    if elapsed <= 0:
        ### time.time() is wall clock time and may be stepped backwards;
        ### skip the impulse instead of dividing by zero or
        ### logging negative watts
        log(1, 'Ignore impulse, invalid interval of %.3f s', elapsed)
    else:
        ### One impulse is 1/resolution kWh = 36e5/resolution watt seconds,
        ### divided by the seconds it took gives watts
        watts = 36e5 / elapsed / options['resolution']

        if watts > options['max']:
            log(1, 'Ignore %9.3f above %d', watts, options['max'])
        else:
            log(1, '%9.3f W', watts)

            if options['log']:
                if options['csv']:
                    timestr = str(int(round(timestamp)))
                    ### Format to german "<day>.<month>.<year> <hour>:<minute>:<second>"
                    # timestr = time.strftime('%d.%m.%Y %H:%M:%S', time.localtime(timestamp))
                    line = timestr + options['csv'] + str(watts)
                else:
                    line = str(watts)

                ### The with statement closes the file, no explicit close()
                with open(options['log'], 'a') as f:
                    f.write(line + '\n')

    ### Keep the LED lit long enough to be visible
    time.sleep(0.05)

    ### Turn notification LED off
    GPIO.output(LED_impulse, False)

    last_timestamp = timestamp
### -------------------------------------------------------------------------- | |
def cleanup(signal=None, frame=None):
    """Release the GPIO resources and exit with status 0.

    Usable as a ``signal.signal`` handler, which passes both arguments,
    and also callable directly without arguments.

    Args:
        signal: Signal number passed by the signal machinery; unused.
        frame: Interrupted stack frame passed by the signal machinery; unused.

    Raises:
        SystemExit: Always, with status 0.
    """
    GPIO.cleanup()
    sys.exit(0)
### -------------------------------------------------------------------------- | |
### Command line arguments | |
### -------------------------------------------------------------------------- | |
try:
    opts, args = getopt.getopt(sys.argv[1:], "p:r:b:l:u:s:vh")
except getopt.GetoptError as err:
    ### Tell the user what was wrong before showing the help
    print('\n%s' % err)
    usage(2)
### --------------------------------------------------------------------------
for opt, arg in opts:
    if opt == '-p':
        options['pin'] = int_arg(arg, 'Invalid PIN number')
    elif opt == '-r':
        options['resolution'] = int_arg(arg, 'Invalid resolution')
    elif opt == '-b':
        options['bounce'] = int_arg(arg, 'Invalid bounce time')
    elif opt == '-l':
        options['log'] = arg
        try:
            ### Opening in append mode creates the file if missing and
            ### proves it is writable before the 1st impulse arrives
            with open(options['log'], 'a'):
                pass
        except (IOError, OSError):
            print('\nCan\'t open log file: %s\n' % options['log'])
            usage(1)
    elif opt == '-u':
        options['max'] = int_arg(arg, 'Invalid upper Watts boundary')
    elif opt == '-s':
        options['csv'] = arg
    elif opt == '-v':
        ### Each -v raises the verbosity by one
        options['verbose'] += 1
    else:
        ### Only -h is left
        usage(0)
### -------------------------------------------------------------------------- | |
### Let's go | |
### -------------------------------------------------------------------------- | |
log(2, '--- Configuration ---')
for (key, value) in options.items():
    log(2, '%-10s = %s', key, value)
log(2, '---------------------')

### Check log file given
if not options['log']:
    ### Without a log file the values are only useful if they get printed
    options['verbose'] += 1
    log(1, 'No log file given, run in foreground')
    log(1, 'Press <Ctrl>+C to abort')

### Init GPIO
GPIO.setwarnings(False)
GPIO.setmode(GPIO.BCM)

### Init LEDs and turn them off
GPIO.setup(LED_heartbeat, GPIO.OUT)
GPIO.output(LED_heartbeat, False)
GPIO.setup(LED_impulse, GPIO.OUT)
GPIO.output(LED_impulse, False)

### Set up pin as input, pulled up to avoid false detection.
### It is wired to connect to GND on impulse.
### So we'll be setting up falling edge detection
GPIO.setup(options['pin'], GPIO.IN, pull_up_down=GPIO.PUD_UP)

### When a falling edge is detected, regardless of whatever
### else is happening in the program, the function handleImpulse will be run
### 'bouncetime' is the debounce period in milli seconds
GPIO.add_event_detect(options['pin'], GPIO.FALLING, callback=handleImpulse, bouncetime=options['bounce'])

### Catch <Ctrl>+C silently, and also a plain "kill" (SIGTERM) so the
### GPIO pins are released when a background run is stopped
signal.signal(signal.SIGINT, cleanup)
signal.signal(signal.SIGTERM, cleanup)

###
log(1, 'Wait for 1st impulse ...')

### Make the heartbeat LED flash all the time.
### The loop never ends by itself; the script is left through the signal
### handler, which does the GPIO clean up and exits.
while True:
    GPIO.output(LED_heartbeat, True)
    time.sleep(1)
    GPIO.output(LED_heartbeat, False)
    time.sleep(1)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment