Skip to content

Instantly share code, notes, and snippets.

@K-Ko
Last active May 15, 2017 09:58
Show Gist options
  • Save K-Ko/4bb5a0aa35e7d51926a26ae64402e7bd to your computer and use it in GitHub Desktop.
Save K-Ko/4bb5a0aa35e7d51926a26ae64402e7bd to your computer and use it in GitHub Desktop.
S0 GPIO reader enhanced
#!/usr/bin/env python
# http://raspberrypi.stackexchange.com/a/36245
import sys, getopt, datetime, time, signal
import RPi.GPIO as GPIO
try:
from urllib.request import Request, urlopen # Python 3
except:
from urllib2 import Request, urlopen # Python 2
### --------------------------------------------------------------------------
### Settings
### --------------------------------------------------------------------------
### GPIO numbers of the two status LEDs. They are interpreted in BCM numbering
### because the script calls GPIO.setmode(GPIO.BCM) before using them.
LED_heartbeat = 35  ### Pin for red LED, toggled once per second by the main loop
LED_impulse = 47  ### Pin for green LED, lit while an impulse is processed

### Run time configuration; every entry can be overridden on the command line.
options = {
    'pin': 23,  ### A common GPIO pin (-p)
    'resolution': 1000,  ### Impulse per kilo watt hour (-r)
    'bounce': 300,  ### milli seconds (-b)
    'log': False,  ### Foreground; replaced by the log file name (-l)
    ### Watts above are errors (-u).
    ### sys.maxsize exists on Python 2.6+ and 3.x, sys.maxint only on 2.x.
    'max': sys.maxsize,
    'csv': '',  ### If given, write also timestamp to log file (-s)
    'verbose': 0,  ### Off; incremented by every -v
}
### --------------------------------------------------------------------------
### Functions
### --------------------------------------------------------------------------
def usage(rc):
    """Print the command line help and terminate the script.

    rc -- process exit status handed to sys.exit().

    Never returns; always raises SystemExit.
    """
    ### A single parenthesised argument keeps print valid on Python 2 and 3.
    ### -u and -s are shown with their argument because the getopt spec
    ### declares both as options that require one.
    print('''
Listen for S0 impulses on a Raspberry GPIO pin
Usage: %s [options]
Options:
-p <PIN> GPIO pin to listen for impulses, default 23
-r <resolution> Impulses per kilo watt hour, default 1000
-b <milli seconds> Pin bounce time, default, 300ms
-l <file name> Log file to store watts values
If not given, script will run in foreground
-u <watts> Upper boundary, ignore Watts above as error
-s <separator> CSV separator, if given, write also timestamp to log file
-v Verbose mode
-h This help
''' % sys.argv[0])
    sys.exit(rc)
### --------------------------------------------------------------------------
def log(level, format, *args):
    """Print a time stamped message when the verbosity is high enough.

    level  -- minimum value of options['verbose'] required to print
    format -- %-style format string
    args   -- values interpolated into format
    """
    if options['verbose'] >= level:
        ### Single-argument call form: valid as Python 2 statement and
        ### as Python 3 function call.
        print('[' + str(datetime.datetime.now()) + '] ' + format % args)
### --------------------------------------------------------------------------
### Make argument integer or throw error message
### --------------------------------------------------------------------------
def int_arg(arg, msg):
    """Convert a command line argument to int or abort the script.

    arg -- raw option value
    msg -- text printed in front of the offending value on failure

    Returns the integer value. On a value int() rejects, the message is
    printed and the script exits with status 2, the same status used for
    getopt errors, so that callers can detect the failure.
    """
    try:
        return int(arg)
    except (TypeError, ValueError):
        ### Only conversion failures are handled here; anything else
        ### (e.g. KeyboardInterrupt) must not be swallowed.
        print('\n%s: %s\n' % (msg, arg))
        sys.exit(2)
### --------------------------------------------------------------------------
### Init last timestamp pointer
last_timestamp = False


### --------------------------------------------------------------------------
def handleImpulse(event):
    """Edge callback: turn the time between two impulses into watts.

    event -- argument supplied by the GPIO event detection; not used here.

    The first impulse only stores its time stamp. Every later impulse
    computes the average power since the previous one, logs it and, if a
    log file is configured, appends it to that file.
    """
    global last_timestamp

    ### At 1st remember actual time for precise calculations
    timestamp = time.time()
    ### Turn notification LED on, also during wait for 2nd impulse
    GPIO.output(LED_impulse, True)

    if not last_timestamp:
        log(1, 'Start measuring ...')
        last_timestamp = timestamp
        return

    elapsed = timestamp - last_timestamp
    if elapsed <= 0:
        ### Identical time stamps would divide by zero and a clock stepped
        ### backwards would yield negative watts, so skip this reading.
        log(1, 'Ignore impulse, invalid interval %.3f s', elapsed)
    else:
        ### One impulse is 1/resolution kWh = 36e5/resolution watt seconds;
        ### divided by the seconds elapsed this gives watts.
        watts = 36e5 / elapsed / options['resolution']
        if watts > options['max']:
            log(1, 'Ignore %9.3f above %d', watts, options['max'])
        else:
            log(1, '%9.3f W', watts)
            if options['log']:
                ### The with statement closes the file, no explicit close
                with open(options['log'], 'a') as f:
                    if options['csv']:
                        timestr = str(int(round(timestamp)))
                        ### Format to german "<day>.<month>.<year> <hour>:<minute>:<second>"
                        # timestr = time.strftime('%d.%m.%Y %H:%M:%S', time.localtime(timestamp))
                        wattstr = str(watts)
                        ### Round watts and make integer
                        # wattstr = str(int(round(watts)))
                        f.write(timestr + options['csv'] + wattstr + '\n')
                    else:
                        f.write(str(watts) + '\n')

    ### Short pause before switching the LED off again
    ### (presumably to make the flash visible)
    time.sleep(0.05)
    ### Turn notification LED off
    GPIO.output(LED_impulse, False)
    last_timestamp = timestamp
### --------------------------------------------------------------------------
def cleanup(signal=None, frame=None):
    """Release the GPIO channels and exit with status 0.

    signal, frame -- arguments passed by the signal module when this is
                     used as a signal handler; both are ignored. They
                     default to None so the function can also be called
                     directly without arguments.
    """
    GPIO.cleanup()
    sys.exit(0)
### --------------------------------------------------------------------------
### Command line arguments
### --------------------------------------------------------------------------
try:
    opts, args = getopt.getopt(sys.argv[1:], "p:r:b:l:u:s:vh")
except getopt.GetoptError as err:
    ### Tell the user what was wrong before showing the help
    print('\n%s' % err)
    usage(2)
### --------------------------------------------------------------------------
for opt, arg in opts:
    if opt == '-p':
        options['pin'] = int_arg(arg, 'Invalid PIN number')
    elif opt == '-r':
        options['resolution'] = int_arg(arg, 'Invalid resolution')
    elif opt == '-b':
        options['bounce'] = int_arg(arg, 'Invalid bounce time')
    elif opt == '-l':
        options['log'] = arg
        ### Probe early that the file can be opened for appending
        ### (this creates the file if it does not exist yet).
        try:
            with open(options['log'], 'a'):
                pass
        except (IOError, OSError):
            print('\nCan\'t open log file: %s\n' % options['log'])
            usage(1)
    elif opt == '-u':
        options['max'] = int_arg(arg, 'Invalid upper Watts boundary')
    elif opt == '-s':
        options['csv'] = arg
    elif opt == '-v':
        options['verbose'] += 1
    else:
        ### Only -h is left here
        usage(0)
### --------------------------------------------------------------------------
### Let's go
### --------------------------------------------------------------------------
log(2, '--- Configuration ---')
for key, value in options.items():
    log(2, '%-10s = %s', key, value)
log(2, '---------------------')

### Check log file given
if not options['log']:
    ### Without a log file the measured values are only visible on the
    ### console, so make sure level 1 messages are printed.
    options['verbose'] += 1
    log(1, 'No log file given, run in foreground')
    log(1, 'Press <Ctrl>+C to abort')

### Init GPIO, all pin numbers are BCM channel numbers
GPIO.setwarnings(False)
GPIO.setmode(GPIO.BCM)

### Init LEDs and turn them off
GPIO.setup(LED_heartbeat, GPIO.OUT)
GPIO.output(LED_heartbeat, False)
GPIO.setup(LED_impulse, GPIO.OUT)
GPIO.output(LED_impulse, False)

### Set up pin as input, pulled up to avoid false detection.
### It is wired to connect to GND on impulse.
### So we'll be setting up falling edge detection
GPIO.setup(options['pin'], GPIO.IN, pull_up_down=GPIO.PUD_UP)

### When a falling edge is detected, regardless of whatever
### else is happening in the program, the function handleImpulse will be run.
### 'bouncetime' (milli seconds) suppresses further edges caused by
### contact bounce.
GPIO.add_event_detect(options['pin'], GPIO.FALLING, callback=handleImpulse, bouncetime=options['bounce'])

### Catch <CRTL>+C silently and also clean up on a plain "kill" (SIGTERM),
### which is how a script running in the background is usually stopped
signal.signal(signal.SIGINT, cleanup)
signal.signal(signal.SIGTERM, cleanup)

###
log(1, 'Wait for 1st impulse ...')

# Make the heartbeat LED flash all the time.
# The loop never ends on its own; the script is left via the signal
# handler installed above.
while True:
    GPIO.output(LED_heartbeat, True)
    time.sleep(1)
    GPIO.output(LED_heartbeat, False)
    time.sleep(1)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment