Skip to content

Instantly share code, notes, and snippets.

@voodoojello
Last active October 16, 2018 17:04
Show Gist options
  • Save voodoojello/e7fd157c17eeced500c0922d6422ea62 to your computer and use it in GitHub Desktop.
Save voodoojello/e7fd157c17eeced500c0922d6422ea62 to your computer and use it in GitHub Desktop.
A simple Python3 HTTP/CGI or CLI script for setting / reading GPIO pin states with the KS0212 keyestudio RPI 4-channel Relay Shield
#!/usr/bin/python3
# -*- coding: UTF-8 -*-
#
#-------------------------------------------------------------------------------
# KS0212 keyestudio RPI 4-channel Relay Shield CLI/CGI Interface
#-------------------------------------------------------------------------------
# A simple Python3 HTTP/CGI or CLI script for setting/reading GPIO pin states
# with the KS0212 keyestudio RPI 4-channel Relay Shield. Tested on Raspberry
# Pi 3 Model B Plus Rev 1.3 running Raspbian GNU/Linux 9.
#-------------------------------------------------------------------------------
# Author: Mark Page [mark@very3.net]
# Modified: Tue Oct 16 11:35:54 2018 -0500
# Version: 18.10.16.11
# Copyright: (c)2018 Mark Page (mark@very3.net)
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy
# of the License at: http://www.apache.org/licenses/LICENSE-2.0. Unless
# required by applicable law or agreed to in writing, software distributed
# under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
# CONDITIONS OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.
#-------------------------------------------------------------------------------
#
# Use "python3 ks0212.py -h" for command-line help
#
# HTTP GET VARS:
# action [GET/SET/PULSE]
# relay [1-4/ALL]
# state [ON/OFF]
# duration [seconds] (pulse mode only)
#
# Examples:
# Return relay names and pins:
# http://[IPADDRESS]/ks0212.py
#
# Return relay1 state:
# http://[IPADDRESS]/ks0212.py?action=GET&relay=1
#
# Return all relay states:
# http://[IPADDRESS]/ks0212.py?action=GET&relay=ALL
#
# Turn ON relay1:
# http://[IPADDRESS]/ks0212.py?action=SET&relay=1&state=ON
#
# Turn OFF relay1:
# http://[IPADDRESS]/ks0212.py?action=SET&relay=1&state=OFF
#
# Turn ON ALL relays:
# http://[IPADDRESS]/ks0212.py?action=SET&relay=ALL&state=ON
#
# Turn OFF ALL relays:
# http://[IPADDRESS]/ks0212.py?action=SET&relay=ALL&state=OFF
#
# Pulse ON ALL relays for 5 seconds:
# http://[IPADDRESS]/ks0212.py?action=PULSE&relay=ALL&state=ON&duration=5
#
# Pulse ON relay1 for 5 seconds:
# http://[IPADDRESS]/ks0212.py?action=PULSE&relay=1&state=ON&duration=5
#
# To facilitate name:value URLs try adding this to .htaccess:
#
# RewriteCond %{REQUEST_FILENAME} !-d
# RewriteRule ^([^/]+):([^/]+)(/.*)?$ $3?$1=$2 [N,QSA]
# RewriteRule ^(/[^/]+|[^/]+/|/?)$ /ks0212.py [L,QSA]
# DirectoryIndex ks0212.py
#
# Then try: http://[IPADDRESS]/action:PULSE/relay:1/state:ON/duration:5
#
#-------------------------------------------------------------------------------
#
from __future__ import print_function
from collections import defaultdict
import RPi.GPIO as gpio
import os
import sys
import time
import pprint
import cgitb
import cgi
import json
import argparse
debug = 0
gpio.setmode(gpio.BOARD)
gpio.setwarnings(False)
#-------------------------------------------------------------------------------
# Set your relay names here:
#-------------------------------------------------------------------------------
relays = defaultdict(dict)
relays = {
'r1': {
'pin': 7,
'name': 'DOCSIS Modem',
'apid': 1
},
'r2': {
'pin': 15,
'name': 'WAN Router',
'apid': 2
},
'r3': {
'pin': 31,
'name': 'PWS Controller',
'apid': 3
},
'r4': {
'pin': 37,
'name': 'PoE AP Controller',
'apid': 4
}
}
#-------------------------------------------------------------------------------
def main():
cgitb.enable(display=1, logdir="../logs")
parser = argparse.ArgumentParser(description="A simple script for manipulating the KS0212 relay board")
parser.add_argument("-a", action="store", dest="action", help="GET/SET/PULSE")
parser.add_argument("-r", action="store", dest="relay", help="1-4/ALL")
parser.add_argument("-s", action="store", dest="state", help="ON/OFF")
parser.add_argument("-d", action="store", dest="duration", help="Seconds (pulse mode only)")
args = parser.parse_args()
_get = cgi.FieldStorage()
_buffer = defaultdict(dict)
_start = time.time()
_buffer['time'] = str(time.ctime())
_buffer['request'] = os.environ.get("QUERY_STRING", "NO_QUERY")
if debug:
sys.stderr = sys.stdout
if not args.action:
print ("Content-Type: text/html;charset=utf-8\n")
_buffer['debug'] = str(_get)
pprint.pprint(os.environ.get)
else:
if not args.action:
print ("Content-Type: application/json;charset=utf-8\n")
if args.action:
a_action = args.action
a_relay = args.relay
a_state = args.state
a_duration = args.duration
else:
a_action = _get.getvalue("action")
a_relay = _get.getvalue("relay")
a_state = _get.getvalue("state")
a_duration = _get.getvalue("duration")
if a_action:
_buffer['action'] = a_action.upper()
if a_action.upper() == 'GET':
_buffer['data'] = gpio_get(a_relay)
if a_action.upper() == 'SET':
_buffer['data'] = gpio_set(a_relay,a_state)
if a_action.upper() == 'PULSE':
_buffer['pulse'] = a_duration
_buffer['data'] = gpio_pulse(a_relay,a_state,a_duration)
else:
_buffer['action'] = 'NONE'
_buffer['data'] = relays
_buffer['run_time'] = (time.time() - _start)
print (json.dumps(_buffer,sort_keys=True,indent=2))
#-------------------------------------------------------------------------------
#-------------------------------------------------------------------------------
def gpio_get(_r):
_return = defaultdict(dict)
_relays = []
if _r.upper() == 'ALL':
for k,v in relays.items(): _relays.append(k)
else:
_relays.append('r'+_r)
for __r in _relays:
try:
gpio.setup(int(relays[__r]['pin']), gpio.OUT)
_return[__r]['state'] = gpio.input(int(relays[__r]['pin']))
_return[__r]['error'] = 0
_return[__r]['name'] = relays[__r]['name']
_return[__r]['pin'] = relays[__r]['pin']
except:
_return['error'] = "PIN_RANGE_ERROR"
_return['pin'] = _r
return _return
return _return
#-------------------------------------------------------------------------------
#-------------------------------------------------------------------------------
def gpio_set(_r,_s):
_return = defaultdict(dict)
_relays = []
if _r.upper() == 'ALL':
for k,v in relays.items(): _relays.append(k)
else:
_relays.append('r'+_r)
for __r in _relays:
try:
gpio.setup(int(relays[__r]['pin']), gpio.OUT)
_return[__r]['prev_state'] = gpio.input(int(relays[__r]['pin']))
if _s.upper() == 'ON':
gpio.output(int(relays[__r]['pin']), gpio.HIGH)
elif _s.upper() == 'OFF':
gpio.output(int(relays[__r]['pin']), gpio.LOW)
else:
_return[__r]['error'] = "ILLEGAL_STATE_REQUEST"
_return[__r]['state'] = _s
return _return
_return[__r]['state'] = gpio.input(int(relays[__r]['pin']))
_return[__r]['error'] = 0
_return[__r]['name'] = relays[__r]['name']
_return[__r]['pin'] = relays[__r]['pin']
except:
_return['error'] = "STATE_RANGE_ERROR"
_return['pin'] = _r
return _return
return _return
#-------------------------------------------------------------------------------
#-------------------------------------------------------------------------------
def gpio_pulse(_r,_s,_d):
_return = defaultdict(dict)
_relays = []
if _r.upper() == 'ALL':
for k,v in relays.items(): _relays.append(k)
else:
_relays.append('r'+_r)
for __r in _relays:
_return[__r]['pin'] = relays[__r]['pin']
_return[__r]['name'] = relays[__r]['name']
try:
gpio.setup(int(relays[__r]['pin']), gpio.OUT)
_return[__r]['prev_state1'] = gpio.input(int(relays[__r]['pin']))
if _s.upper() == 'ON':
gpio.output(int(relays[__r]['pin']), gpio.HIGH)
elif _s.upper() == 'OFF':
gpio.output(int(relays[__r]['pin']), gpio.LOW)
else:
_return[__r]['error1'] = "ILLEGAL_STATE_REQUEST"
_return[__r]['req_state1'] = _s
return _return
_return[__r]['error1'] = 0
_return[__r]['state1'] = gpio.input(int(relays[__r]['pin']))
except:
_return['error1'] = "STATE_RANGE_ERROR"
return _return
time.sleep(int(_d))
for __r in _relays:
try:
gpio.setup(int(relays[__r]['pin']), gpio.OUT)
_return[__r]['prev_state2'] = gpio.input(int(relays[__r]['pin']))
if _s.upper() == 'ON':
gpio.output(int(relays[__r]['pin']), gpio.LOW)
elif _s.upper() == 'OFF':
gpio.output(int(relays[__r]['pin']), gpio.HI)
else:
_return[__r]['error2'] = "ILLEGAL_STATE_REQUEST"
_return[__r]['req_state2'] = _s
return _return
_return[__r]['error2'] = 0
_return[__r]['state2'] = gpio.input(int(relays[__r]['pin']))
except:
_return['error2'] = "STATE_RANGE_ERROR"
return _return
return _return
#-------------------------------------------------------------------------------
#-------------------------------------------------------------------------------
if __name__ == "__main__": main()
#-------------------------------------------------------------------------------
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment