-
-
Save sahib/2f5343041c7ba3c97db0 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# encoding: utf-8 | |
""" | |
Currently this is a testbed for an event driven approach to the input boards. | |
Here's a very short overview: | |
A button-switch is connected to IO_1_4 of the input board, which is mapped to | |
the GPIO Pin 110 on the gnublin over the gpio-sysfs. State-changes should get | |
noticed once the button is pressed, since the gnublin receives an interrupt on | |
GPIO 11. At the same time, an MQTT client is polled for changes in the same
thread.
Pin overview of the input board is here: | |
http://wiki.gnublin.org/index.php/Port_Expander_PCA9555 | |
Before trying this, you need to: | |
- Solder a piece of copper between the interrupt output (pin 1, top left)
  of the pca9555 chip on the input board and pin 11 of the gnublin
  connector. Put a (pull-up) resistor between 3.3V and GPIO11 on the Gnublin board.
(Yes, really. Shame on you, Mr. Sauter.) | |
- Load the pca9555 gpio sysfs:: | |
$ modprobe pca953x cnt=1 addr1=0x20 # addr2=0x21 | |
0x20 is the address of the input board, which is given by the 3 jumpers | |
on the input board. If all of them are set to low it will be 0x20, if all | |
are set to high it will be 0x28. | |
- Load the network module:: | |
$ modprobe enc28j60 irq_pin=12 cs_pin=19 | |
  You need to be connected to the internet, of course, e.g. with dhclient eth0.
  If the gnublin suddenly reboots, get another one; the hardware is most
  likely broken.
- You do not need to export the pins, all accessed pins should | |
get exported automatically by this script. (you're welcome.) | |
""" | |
# stdlib: | |
import os | |
import io | |
import select | |
# External libraries:
import paho.mqtt.client as mqtt | |
########################### | |
# GPIO Utilities # | |
########################### | |
class Pin(io.FileIO):
    """File-like wrapper around one GPIO pin of the gpio-sysfs interface.

    The object *is* the opened ``/sys/class/gpio/gpioXY/value`` file, so
    ``fileno()`` yields a descriptor that can be registered with
    ``select.epoll``. Unlike a plain file, ``read()`` returns the state of
    the pin as a bool.

    The edge (interrupt behaviour) and the direction (input or output pin)
    can be set via the respective properties.

    As utility method, the /sys/class/gpio/gpioXY directory can be obtained
    with ``Pin.directory(XY)``.
    """

    # Values understood by the sysfs ``direction`` attribute.
    _DIRECTIONS = ('in', 'out')
    # Values understood by the sysfs ``edge`` attribute.
    _EDGES = ('none', 'rising', 'falling', 'both')

    def __init__(self, pin, direction=None, edge=None):
        """Open a pin, exporting it first if it is not exported yet.

        :param pin: The pin number to open.
        :param direction: Either 'in' or 'out'; ``None`` leaves the
                          direction of the pin untouched.
        :param edge: The interrupt behaviour ('none', 'rising', 'falling'
                     or 'both'); ``None`` leaves it untouched.
        :raises ValueError: If direction or edge is not a supported value.
        :raises OSError: If one of the sysfs files cannot be accessed.
        """
        # The cached values are only updated by the setters after the
        # respective sysfs write succeeded.
        self._pin, self._direction, self._edge = pin, None, None

        if not os.access(Pin.directory(pin), os.F_OK):
            Pin.export(pin)

        if edge is not None:
            self.edge = edge

        if direction is not None:
            self.direction = direction

        io.FileIO.__init__(
            self, os.open(
                os.path.join(Pin.directory(pin), 'value'),
                os.O_RDONLY | os.O_NONBLOCK
            )
        )

    ################
    #  Properties  #
    ################

    @property
    def direction(self):
        """The direction last written through this object (or None)."""
        return self._direction

    @direction.setter
    def direction(self, new_direction):
        # Validate first, so a rejected value never ends up in the cache.
        if new_direction not in Pin._DIRECTIONS:
            raise ValueError('direction must be either "in" or "out"')

        dir_file = os.path.join(Pin.directory(self._pin), 'direction')
        with open(dir_file, 'w') as handle:
            handle.write(new_direction + '\n')

        self._direction = new_direction

    @property
    def edge(self):
        """The edge setting last written through this object (or None)."""
        return self._edge

    @edge.setter
    def edge(self, new_edge):
        # Validate first, so a rejected value never ends up in the cache.
        if new_edge not in Pin._EDGES:
            raise ValueError(
                'edge must be one of "none", "rising", "falling", "both"'
            )

        edge_file = os.path.join(Pin.directory(self._pin), 'edge')
        with open(edge_file, 'w') as handle:
            handle.write(new_edge)

        self._edge = new_edge

    #################
    # Class Methods #
    #################

    @staticmethod
    def directory(pin):
        """Return the sysfs directory belonging to the pin number *pin*."""
        return '/sys/class/gpio/gpio{p}'.format(p=pin)

    @staticmethod
    def export(pin):
        """Ask the kernel to create the sysfs directory for *pin*."""
        with open('/sys/class/gpio/export', 'w') as handle:
            handle.write(str(pin) + '\n')

    @staticmethod
    def unexport(pin):
        """Ask the kernel to remove the sysfs directory for *pin*."""
        with open('/sys/class/gpio/unexport', 'w') as handle:
            handle.write(str(pin) + '\n')

    #################
    #  Real Methods #
    #################

    def read(self):
        """Return the current state of the pin as a bool."""
        # We would love to use pread() (which is seek() + read() in one),
        # but python 3.1 does not offer this - silly Debian. This would
        # be nice since pread() is atomic, which should not matter much
        # though. Therefore we do what we can do best: fake stuff.
        self.seek(0)
        return bool(int(io.FileIO.read(self, 1)))

    def close(self):
        """Close the value file and unexport the pin.

        Calling close() on an already closed (or never opened) pin is a
        no-op, so the pin is unexported at most once.
        """
        if self.closed:
            return

        try:
            io.FileIO.close(self)
        finally:
            # unexport() is a staticmethod, it needs the pin number.
            Pin.unexport(self._pin)
########################################### | |
# STUPID TEST MAIN # | |
########################################### | |
# This is just basically the paho example, | |
# but with own mainloop instead of .loop_forever() | |
def create_mqtt_client(host="iot.eclipse.org", port=1883, keepalive=60,
                       topic="plant/#"):
    """Create a paho MQTT client and connect it to a broker.

    No network loop is started here; the caller is expected to drive the
    returned client itself (loop_read(), loop_write(), loop_misc()).

    :param host: Hostname of the broker to connect to.
    :param port: TCP port of the broker.
    :param keepalive: Keepalive interval in seconds passed to connect().
    :param topic: Topic filter that is subscribed on every (re)connect.
    :returns: The connected ``mqtt.Client`` instance.
    """
    # NOTE(review): the default host is a public sandbox broker - confirm
    # that it is still reachable before relying on the default.

    # The callback for when the client receives a CONNACK response
    def on_connect(client, userdata, flags, rc):
        print("Connected with result code", rc)

        # Subscribing in on_connect() means that if we lose the connection
        # and reconnect, the subscription is renewed.
        client.subscribe(topic)

    # The callback for when a PUBLISH message is received from the server.
    def on_message(client, userdata, msg):
        payload = msg.payload
        if isinstance(payload, bytes):
            # Print the text itself instead of the b'...' repr.
            payload = payload.decode('utf-8', 'replace')

        print('{topic}: {payload}'.format(
            topic=msg.topic, payload=payload
        ))

    client = mqtt.Client()
    client.on_connect = on_connect
    client.on_message = on_message
    client.connect(host, port, keepalive)
    return client
def main():
    """Run the testbed: wait for GPIO interrupts and MQTT traffic at once.

    A single epoll object watches the interrupt pin (GPIO 11) and the socket
    of the MQTT client. On every interrupt the state of the switch on
    GPIO 110 is read and published to 'plant/sensor/110'. The loop runs
    until it is interrupted with Ctrl-C.
    """
    # GPIO Pin 11 is connected to the Interrupt output
    # of the pca9555 chip on the input boards
    pin_intrpt = Pin(11, edge='both')

    # A button/switch used for testing
    # is connected to this pin on the testsetup.
    pin_switch = Pin(110)

    # TODO: make this configurable with host and port.
    dummy_mqtt = create_mqtt_client()

    # Sorry for the cheap jokes in variable names.
    epolla = select.epoll()
    intrpt_fd = pin_intrpt.fileno()
    epolla.register(intrpt_fd, select.EPOLLPRI | select.EPOLLERR)
    epolla.register(dummy_mqtt.socket().fileno())

    def handle_hw_interrupt(event):
        if event & select.EPOLLPRI:
            # There is urgent data to read.
            intrpt_state = pin_intrpt.read()
            switch_state = pin_switch.read()
            print('int={i} switch is {s}pressed'.format(
                i="active" if intrpt_state else "inactive",
                s="" if switch_state else "not "
            ))
            dummy_mqtt.publish('plant/sensor/110', switch_state)
        else:
            print('fatal: interrupt pin vanished.')

    def handle_mqtt_client_event(event):
        if event & select.EPOLLIN or event & select.EPOLLPRI:
            dummy_mqtt.loop_read()
        elif event & select.EPOLLOUT:
            dummy_mqtt.loop_write()
        elif event & select.EPOLLHUP:
            print('socket hangup (oops)')

    #############################################
    # Mainloop - maybe use GLib or libuv later. #
    #############################################

    try:
        while True:
            # mqtt client tells us when he wants to write data,
            # so epoll does not wake up all the time, since
            # the socket will be writable all the time usually.
            flags = select.EPOLLIN | select.EPOLLPRI | select.EPOLLHUP
            if dummy_mqtt.want_write():
                flags |= select.EPOLLOUT

            mqtt_fd = dummy_mqtt.socket().fileno()
            epolla.modify(mqtt_fd, flags)

            for fd, event in epolla.poll(timeout=30.0):
                # File descriptors are plain ints: compare them by value,
                # identity ("is") only works by accident for small numbers.
                if fd == intrpt_fd:
                    handle_hw_interrupt(event)
                elif fd == mqtt_fd:
                    handle_mqtt_client_event(event)

            # Let the mqtt client do some misc IO (ping etc.) once per
            # cycle; this also covers the case that poll() timed out
            # without any event.
            dummy_mqtt.loop_misc()
    except KeyboardInterrupt:
        print('Ctrl-C\'d')
    finally:
        # Release the epoll descriptor again.
        epolla.close()
# Script entry point: only start the testbed when this file is executed
# directly, so importing the module just defines Pin and the helpers.
if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment