Create a gist now

Instantly share code, notes, and snippets.

@sahib /gnublin_epoll.py Secret
Created Dec 15, 2014

What would you like to do?
#!/usr/bin/env python
# encoding: utf-8
"""
Currently this is a testbed for an event driven approach to the input boards.
Here's a very short overview:
A button-switch is connected to IO_1_4 of the input board, which is mapped to
the GPIO Pin 110 on the gnublin over the gpio-sysfs. State-changes should get
noticed once the button is pressed, since the gnublin receives an interrupt on
GPIO 11. At the same time a mqtt client is polled for changes in the same
thread.
Pin overview of the input board is here:
http://wiki.gnublin.org/index.php/Port_Expander_PCA9555
Before trying this, you need to:
- Solder a piece of copper between the interrupt output (pin 1, top left)
of the pca9555 chip of the interrupt board to pin11 of the gnublin
connector. Put a resistor between 3.3V and GPIO11 on the Gnublin board.
(Yes, really. Shame on you, Mr. Sauter.)
- Load the pca9555 gpio sysfs::
$ modprobe pca953x cnt=1 addr1=0x20 # addr2=0x21
0x20 is the address of the input board, which is given by the 3 jumpers
on the input board. If all of them are set to low it will be 0x20, if all
are set to high it will be 0x28.
- Load the network module::
$ modprobe enc28j60 irq_pin=12 cs_pin=19
You need to connect to the internet ofc., e.g. with dhclient eth0. If the
gnublin is suddenly rebooting: get another one; hardware is most likely
broken.
- You do not need to export the pins, all accessed pins should
get exported automatically by this script. (you're welcome.)
"""
# stdlib:
import os
import io
import select
# External libraries:
import paho.mqtt.client as mqtt
###########################
# GPIO Utilities #
###########################
class Pin(io.FileIO):
    """Offer a file-like API for a GPIO pin exposed through the gpio-sysfs.

    The object wraps the pin's ``value`` file, but read() returns the state
    of the pin as a bool instead of raw bytes.

    The edge (interrupt behaviour) and the direction (input or output pin)
    can be set via the respective properties.

    As utility, the /sys/class/gpio/gpioXY directory of a pin can be obtained
    with the static method Pin.directory(XY). The lowlevel file descriptor
    (e.g. for epoll) is available via .fileno().
    """

    # Values the setters accept for the sysfs ``direction``/``edge`` files.
    _DIRECTIONS = ('in', 'out')
    _EDGES = ('none', 'rising', 'falling', 'both')

    def __init__(self, pin, direction=None, edge=None):
        """Open a pin, exporting it first if it is not exported yet.

        :param pin: The pin number to open.
        :param direction: Either 'in' or 'out'; None leaves the current
                          direction of the pin untouched.
        :param edge: The interrupt behaviour ('none', 'rising', 'falling'
                     or 'both'); None leaves the current setting untouched.
        :raises ValueError: If direction or edge is not an accepted value.
        :raises OSError: If one of the sysfs files cannot be accessed.
        """
        self._pin = pin
        self._direction = None
        self._edge = None

        if not os.access(Pin.directory(pin), os.F_OK):
            Pin.export(pin)

        if edge is not None:
            self.edge = edge
        if direction is not None:
            self.direction = direction

        value_file = os.path.join(Pin.directory(pin), 'value')
        super().__init__(os.open(value_file, os.O_RDONLY | os.O_NONBLOCK))

    ################
    #  Properties  #
    ################

    @property
    def direction(self):
        """The direction last set through this object (None if never set)."""
        return self._direction

    @direction.setter
    def direction(self, new_direction):
        # Validate first, so a rejected value never ends up in the cache.
        if new_direction not in self._DIRECTIONS:
            raise ValueError('direction must be either "in" or "out"')

        dir_file = os.path.join(Pin.directory(self._pin), 'direction')
        with open(dir_file, 'w') as handle:
            handle.write(new_direction + '\n')

        # Only remember the value once it was written successfully.
        self._direction = new_direction

    @property
    def edge(self):
        """The edge last set through this object (None if never set)."""
        return self._edge

    @edge.setter
    def edge(self, new_edge):
        # Validate first, so a rejected value never ends up in the cache.
        if new_edge not in self._EDGES:
            raise ValueError(
                'edge must be one of "none", "rising", "falling", "both"'
            )

        edge_file = os.path.join(Pin.directory(self._pin), 'edge')
        with open(edge_file, 'w') as handle:
            handle.write(new_edge)

        # Only remember the value once it was written successfully.
        self._edge = new_edge

    ##################
    # Static Methods #
    ##################

    @staticmethod
    def directory(pin):
        """Return the sysfs directory that belongs to the pin number."""
        return '/sys/class/gpio/gpio{p}'.format(p=pin)

    @staticmethod
    def export(pin):
        """Write the pin number to the sysfs ``export`` file."""
        with open('/sys/class/gpio/export', 'w') as handle:
            handle.write(str(pin) + '\n')

    @staticmethod
    def unexport(pin):
        """Write the pin number to the sysfs ``unexport`` file."""
        with open('/sys/class/gpio/unexport', 'w') as handle:
            handle.write(str(pin) + '\n')

    #################
    # Real Methods  #
    #################

    def read(self):
        """Return the current state of the pin as bool."""
        # We would love to use pread() (which is seek() + read() in one),
        # but python 3.1 does not offer this. pread() would be atomic, which
        # should not matter much here though, so seek() + read() it is.
        self.seek(0)
        return bool(int(io.FileIO.read(self, 1)))

    def close(self):
        """Close the value file and unexport the pin.

        Calling close() on an already closed (or never opened) pin does
        nothing, so the pin is unexported at most once.
        """
        if self.closed:
            return

        try:
            io.FileIO.close(self)
        finally:
            # unexport() is a staticmethod, so it needs the pin number.
            Pin.unexport(self._pin)
###########################################
# STUPID TEST MAIN #
###########################################
# This is just basically the paho example,
# but with own mainloop instead of .loop_forever()
def create_mqtt_client(host="iot.eclipse.org", port=1883, keepalive=60,
                       topic="plant/#"):
    """Create a paho mqtt client and connect it to a broker.

    The network loop is not started; the caller is expected to drive the
    client itself.

    :param host: Hostname of the broker to connect to.
    :param port: Port of the broker.
    :param keepalive: Keepalive value passed to connect().
    :param topic: Topic (filter) subscribed to in the on_connect callback.
    :returns: The client, after connect() has been called on it.
    """
    # The callback for when the client receives a CONNACK response
    def on_connect(client, userdata, flags, rc):
        print("Connected with result code", rc)
        # Subscribing in on_connect() means that the subscription is issued
        # again every time this callback runs, i.e. also after a reconnect.
        client.subscribe(topic)

    # The callback for when a PUBLISH message is received from the server.
    def on_message(client, userdata, msg):
        payload = msg.payload
        if isinstance(payload, bytes):
            # str() of bytes would print the b'...' repr instead of the text.
            payload = payload.decode('utf-8', 'replace')
        print('{topic}: {payload}'.format(topic=msg.topic, payload=payload))

    client = mqtt.Client()
    client.on_connect = on_connect
    client.on_message = on_message
    client.connect(host, port, keepalive)
    return client
def main():
    """Poll the interrupt pin and the mqtt client socket in one epoll loop.

    Every interrupt on pin 11 prints the state of the interrupt and the
    switch pin and publishes the switch state to 'plant/sensor/110'.
    The loop runs until it is interrupted with Ctrl-C.
    """
    # GPIO Pin 11 is connected to the Interrupt output
    # of the pca9555 chip on the input boards
    pin_intrpt = Pin(11, edge='both')

    # A button/switch used for testing
    # is connected to this pin on the testsetup.
    pin_switch = Pin(110)

    # TODO: make this configurable with host and port.
    dummy_mqtt = create_mqtt_client()

    # Sorry for the cheap jokes in variable names.
    epolla = select.epoll()
    intrpt_fd = pin_intrpt.fileno()
    epolla.register(intrpt_fd, select.EPOLLPRI | select.EPOLLERR)
    epolla.register(dummy_mqtt.socket().fileno())

    def handle_hw_interrupt(event):
        if event & select.EPOLLPRI:
            # There is urgent data to read.
            intrpt_state = pin_intrpt.read()
            switch_state = pin_switch.read()
            print('int={i} switch is {s}pressed'.format(
                i="active" if intrpt_state else "inactive",
                s="" if switch_state else "not "
            ))
            dummy_mqtt.publish('plant/sensor/110', switch_state)
        else:
            print('fatal: interrupt pin vanished.')

    def handle_mqtt_client_event(event):
        if event & select.EPOLLIN or event & select.EPOLLPRI:
            dummy_mqtt.loop_read()
        elif event & select.EPOLLOUT:
            dummy_mqtt.loop_write()
        elif event & select.EPOLLHUP:
            print('socket hangup (oops)')

    #############################################
    # Mainloop - maybe use GLib or libuv later. #
    #############################################
    try:
        while True:
            mqtt_fd = dummy_mqtt.socket().fileno()

            # mqtt client tells us when he wants to write data,
            # so epoll does not wake up all the time, since
            # the socket will be writable all the time usually.
            flags = select.EPOLLIN | select.EPOLLPRI | select.EPOLLHUP
            if dummy_mqtt.want_write():
                flags |= select.EPOLLOUT
            epolla.modify(mqtt_fd, flags)

            for fd, event in epolla.poll(timeout=30.0):
                # File descriptors are plain ints: compare by value (==).
                # An identity check (is) only works by accident for small
                # ints that the interpreter happens to cache.
                if fd == intrpt_fd:
                    handle_hw_interrupt(event)
                elif fd == mqtt_fd:
                    handle_mqtt_client_event(event)

            # Let the mqtt client do some misc IO (ping etc.) after every
            # wakeup, no matter if events arrived or the poll timed out.
            dummy_mqtt.loop_misc()
    except KeyboardInterrupt:
        print('Ctrl-C\'d')
    finally:
        # Release the epoll file descriptor on every way out of the loop.
        epolla.close()
# Run the test main only when executed as a script, not on import.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment