Last active
August 29, 2015 17:32
-
-
Save Nama/5c78fc8c2e6717ffa164 to your computer and use it in GitHub Desktop.
Control radio sockets and Senseo coffee machine with NetIO-App and do some other stuff. Github Repo: https://github.com/Nama/HomeControl
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python | |
import os | |
import sys | |
import socket | |
import time | |
from PIL import Image | |
from lglcd import LogitechLcd | |
from threading import Thread | |
def load_image(sprite): | |
im = Image.open(sprite) | |
im = im.convert('1') | |
return list(im.getdata()) | |
def background(sprite): | |
lcd.set_background(load_image(sprite)) | |
lcd.update() | |
def exit(): | |
s.close() | |
os._exit(0) | |
def status(e): | |
while process_status.is_alive: | |
if process_status_work: | |
s.send(b'senseo status') | |
data = s.recv(buffer_size) | |
status = int(data.decode('utf-8')) | |
if status: | |
background(sprite_on) | |
else: | |
background(sprite_off) | |
time.sleep(1.5) | |
def button_pressed(button, short_press): | |
global mode | |
if button == 1: | |
if short_press: | |
if mode == 'senseo': | |
s.send(b'senseo on-off') | |
data = s.recv(buffer_size) | |
else: | |
s.send(b'light 1 0') | |
data = s.recv(buffer_size) | |
exit() | |
else: | |
if mode == 'lights': | |
s.send(b'light 1 1') | |
data = s.recv(buffer_size) | |
exit() | |
elif button == 2: | |
if short_press: | |
if mode == 'senseo': | |
global process_status_work | |
process_status_work = False | |
s.send(b'senseo coffee') | |
background(sprite_heating) | |
data = s.recv(buffer_size) | |
for i in range(60): | |
background(sprite_coffe1) | |
time.sleep(0.3) | |
background(sprite_coffe2) | |
time.sleep(0.3) | |
background(sprite_coffe3) | |
time.sleep(10) | |
process_status_work = True | |
else: | |
s.send(b'light 2 0') | |
data = s.recv(buffer_size) | |
exit() | |
else: | |
if mode == 'lights': | |
s.send(b'light 2 1') | |
data = s.recv(buffer_size) | |
exit() | |
elif button == 4: | |
if short_press: | |
if mode == 'senseo': | |
process_status_work = False | |
mode = 'lights' | |
background(sprite_lights) | |
else: | |
s.send(b'light 3 0') | |
data = s.recv(buffer_size) | |
exit() | |
else: | |
if mode == 'lights': | |
s.send(b'light 3 1') | |
data = s.recv(buffer_size) | |
exit() | |
elif button == 8: | |
if short_press: | |
if mode == 'senseo': | |
exit() | |
else: | |
s.send(b'light 4 0') | |
data = s.recv(buffer_size) | |
exit() | |
else: | |
if mode == 'lights': | |
s.send(b'light 4 1') | |
data = s.recv(buffer_size) | |
exit() | |
def buttons(button): | |
while process_buttons.is_alive: | |
time.sleep(0.001) | |
if lcd.is_button_pressed(button): | |
pressed = time.time() | |
while lcd.is_button_pressed(button): | |
time.sleep(0.001) | |
pressed_time = int((time.time() - pressed) * 1000) | |
if pressed_time < 350: | |
short_press = True | |
else: | |
short_press = False | |
button_pressed(button, short_press) | |
if __name__ == "__main__": | |
lcd = LogitechLcd('HomeControl Client') | |
sprite_init = 'init.png' | |
sprite_on = 'on.png' | |
sprite_off = 'off.png' | |
sprite_heating = 'preheating.png' | |
sprite_lights = 'lights.png' | |
sprite_coffe1 = 'coffee1.png' | |
sprite_coffe2 = 'coffee2.png' | |
sprite_coffe3 = 'coffee3.png' | |
global mode | |
mode = 'senseo' | |
# Set initialization image, | |
# till connection ist established and status-thread is running | |
background(sprite_init) | |
# Socket-Server | |
ip = 'senseo' | |
port = 54321 | |
buffer_size = 1024 | |
display_time = 300 | |
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | |
s.connect((ip, port)) | |
# Start threads for status and buttons | |
process_status_work = True | |
process_status = Thread(target=status, args=('yorp', )) | |
process_status.start() | |
for button in 1, 2, 4, 8: | |
process_buttons = Thread(target=buttons, args=(button, )) | |
process_buttons.start() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python | |
import os | |
import sys | |
import time | |
import subprocess | |
import socketserver | |
import RPi.GPIO as GPIO | |
help = ''' | |
Possible commands: | |
#cmd device off or on | |
light <1-4> <0,1> | |
#cmd do | |
senseo status # Returns LED status | |
senseo on-off # Turns either on or off, same command for both | |
senseo coffee # Does a cup of coffee. If the Senseo is turned off, it turns on and waits till it preheats and does coffee. | |
#other | |
cmd xbmc status # service xbmc status | |
cmd xbmc start # service xbmc start | |
cmd xbmc stop # service xbmc stop | |
cmd temp # Shows CPU temperature | |
cmd hdd sd # Sends all "df -h" output of precoded devices | |
''' | |
GPIO.setmode(GPIO.BOARD) | |
GPIO.setwarnings(False) | |
'''Settings for Light-Switch''' | |
# Change the key[] variable below according to the dipswitches on your Elro receivers. | |
default_key = [0, 1, 1, 0, 1] | |
# Pin for 433MHz transmitter | |
default_pin = 16 | |
# Number of transmissions | |
repeat = 10 | |
# microseconds | |
pulselength = 300 | |
'''Settings for Senseo''' | |
# On-Off Pin | |
power_pin = 11 | |
# LED-Pin | |
led_pin = 13 | |
# Pin for making coffee | |
coffee_pin = 15 | |
GPIO.setup(led_pin, GPIO.IN) | |
class MyTCPHandler(socketserver.BaseRequestHandler): | |
def handle(self): | |
while 1: | |
# self.request is the TCP socket connected to the client | |
self.data = self.request.recv(1024).strip() | |
if not self.data: | |
break | |
client_data = self.data.decode('utf-8') | |
client_words = client_data.split(' ') | |
res = 'OK 200' | |
# Light-Switch | |
if client_words[0] == 'light': | |
device = RemoteSwitch(device=int(client_words[1]), | |
key=default_key, | |
pin=default_pin) | |
if int(client_words[2]): | |
device.switchOn() | |
else: | |
device.switchOff() | |
# Senseo | |
elif client_words[0] == 'senseo': | |
if client_words[1] == 'status': | |
res = self.senseo_status() | |
elif client_words[1] == 'on-off': | |
self.senseo_on_off() | |
elif client_words[1] == 'coffee': | |
self.senseo_coffee() | |
# Other commands | |
elif client_words[0] == 'cmd': | |
if client_words[1] == 'xbmc': | |
if client_words[2] == 'status': | |
res = xbmc_status() | |
elif client_words[2] == 'start': | |
os.popen('service xbmc start') | |
elif client_words[2] == 'stop': | |
os.popen('service xbmc stop') | |
elif client_words[1] == "temp": | |
res = 'CPU Temp: ' + os.popen('/opt/vc/bin/vcgencmd measure_temp').readline().replace("temp=", "").replace("'C\n", "") + ' °C' | |
elif client_words[1] == "hdd": | |
if client_words[2] == "sd": | |
space = self.df('/dev/root') | |
res = ' SD: ' | |
res += str(space[1:5]).replace(",", "").replace("'", "").replace("(", "").replace(")", "") | |
# Other cmd's here | |
else: | |
res = 'Not all words could be found. ' + help | |
self.request.sendall(res.encode('utf-8')) | |
# Get Senseo status | |
def senseo_status(self): | |
return str(GPIO.input(13)) | |
# Turn Senseo on/off | |
def senseo_on_off(self): | |
try: | |
GPIO.output(power_pin, 0) | |
except RuntimeError: | |
GPIO.setup(power_pin, GPIO.OUT) | |
time.sleep(0.2) | |
GPIO.output(power_pin, 1) | |
# Make coffee | |
def senseo_coffee(self): | |
on = 0 | |
while not on: | |
status_1 = GPIO.input(13) | |
time.sleep(1.5) | |
status_2 = GPIO.input(13) | |
if status_1 and status_2: | |
on = 1 | |
try: | |
GPIO.output(coffee_pin, 0) | |
except RuntimeError: | |
GPIO.setup(coffee_pin, GPIO.OUT) | |
time.sleep(0.2) | |
GPIO.output(coffee_pin, 1) | |
elif not status_1 and not status_2: | |
self.senseo_on_off() | |
else: | |
time.sleep(10) | |
# Get the status of the service xbmc | |
def xbmc_status(): | |
status = subprocess.Popen(['service', 'xbmc', 'status'], stdout=subprocess.PIPE) | |
output = status.communicate()[0] | |
status = output.decode('utf-8') | |
if status == 'xbmc stop/waiting\n': | |
res = '0' | |
else: | |
res = '1' | |
return res | |
# Pretify "df -h" | |
def df(self, filename): | |
df = subprocess.Popen(["df", "-h", filename], stdout=subprocess.PIPE) | |
output = df.communicate()[0] | |
output = output.decode('utf-8') | |
device, size, used, available, percent, mountpoint = output.split("\n")[1].split() | |
return device, size, used, available, percent, mountpoint | |
class RemoteSwitch(object): | |
repeat = repeat | |
pulselength = pulselength | |
def __init__(self, device, key=[1, 1, 1, 1, 1], pin=11): | |
self.pin = pin | |
self.key = key | |
self.device = device | |
GPIO.setup(self.pin, GPIO.OUT) | |
def switchOn(self): | |
self._switch(GPIO.HIGH) | |
def switchOff(self): | |
self._switch(GPIO.LOW) | |
def _switch(self, switch): | |
self.bit = [142, 142, 142, 142, 142, 142, 142, 142, 142, 142, 142, 136, 128, 0, 0, 0] | |
for t in range(5): | |
if self.key[t]: | |
self.bit[t] = 136 | |
x = 1 | |
for i in range(1, 6): | |
if self.device & x > 0: | |
self.bit[4 + i] = 136 | |
x = x << 1 | |
if switch == GPIO.HIGH: | |
self.bit[10] = 136 | |
self.bit[11] = 142 | |
bangs = [] | |
for y in range(16): | |
x = 128 | |
for i in range(1, 9): | |
b = (self.bit[y] & x > 0) and GPIO.HIGH or GPIO.LOW | |
bangs.append(b) | |
x = x >> 1 | |
GPIO.output(self.pin, GPIO.LOW) | |
for z in range(self.repeat): | |
for b in bangs: | |
GPIO.output(self.pin, b) | |
time.sleep(self.pulselength / 1000000.) | |
if __name__ == "__main__": | |
HOST, PORT = "", 54321 | |
# Create the server, binding to localhost on port 9999 | |
server = socketserver.TCPServer((HOST, PORT), MyTCPHandler) | |
# Activate the server; this will keep running until you | |
# interrupt the program with Ctrl-C | |
server.serve_forever() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment