Navigation Menu

Skip to content

Instantly share code, notes, and snippets.

@deostroll
Last active November 12, 2017 19:51
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save deostroll/f51f9c6064a16c68cc56f86b78de9ae3 to your computer and use it in GitHub Desktop.
Save deostroll/f51f9c6064a16c68cc56f86b78de9ae3 to your computer and use it in GitHub Desktop.
simple alarm system

Here is the source code for my simple alarm system:

Scripts for the wemos wifi mini:

boot.py
main.py
httpclient.py

Scripts for the web server:

express.py
onboard.py

On the raspberry pi, you simply bring up the server by running as root:

sudo python onboard.py 3000

Scripts for testing the web server:

led_on.sh
led_off.sh
from machine import Pin
import network
import time
# Logic-level constants (not referenced elsewhere in this script).
HIGH = 1
LOW = 0

# Station and access-point interfaces, plus the LED on pin 16 that
# mirrors the Wi-Fi connection state.
sta = network.WLAN(network.STA_IF)
ap = network.WLAN(network.AP_IF)
wifi = Pin(16, Pin.OUT)

# Start from a known state: both interfaces down, LED off.
ap.active(False)
sta.active(False)
wifi.off()
time.sleep(2)

# Bring the station interface up and join the network.
sta.active(True)
sta.connect('SSID', 'password') #change settings here

# Blink the LED (0.3 s on / 0.3 s off) until the link is up.
while not sta.isconnected():
    wifi.on()
    time.sleep(0.3)
    wifi.off()
    time.sleep(0.3)

print('Connected: ', sta.isconnected())

# Leave the LED showing the final connection state.
if not sta.isconnected():
    wifi.off()
else:
    wifi.on()

# Switch input and alarm LED output; the alarm LED starts off.
switch = Pin(15, Pin.IN)
led = Pin(14, Pin.OUT)
led.off()
print('switch active')
#!/usr/bin/env python
# """
# This code is based on a minimal http server by bradmontgomery
# https://gist.github.com/bradmontgomery/2219997
#
# Some part of it was inspired by express (expressjs.com)
# """
from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
import SocketServer
class ExpressServerFactory:
    """Collect path -> handler mappings and build a request handler class.

    A handler is a plain callable that receives the active
    ``BaseHTTPRequestHandler`` instance as its only argument.

    All registrations share one table keyed by path only, so a handler
    registered with ``get`` also answers POST requests for that path
    (and vice versa).
    """

    def __init__(self):
        # path -> handler callable
        self.cache = {}

    def add(self, path, fn):
        """Register ``fn`` as the handler for ``path``."""
        self.cache[path] = fn

    def get(self, path, fn):
        """Register ``fn`` for ``path`` (same table as ``add``)."""
        self.add(path, fn)

    def post(self, path, fn):
        """Register ``fn`` for ``path`` (same table as ``add``)."""
        self.add(path, fn)

    def createClass(self):
        """Return a ``BaseHTTPRequestHandler`` subclass bound to the routes.

        The returned class shares this factory's route table, so routes
        added after this call are still served.
        """
        cache = self.cache

        def fn404(self):
            # Report the path that was actually requested, and terminate
            # the header section so the client sees a complete response.
            self.send_response(404, 'path: "%s" is not defined...' % self.path)
            self.end_headers()

        def dispatch(self):
            # Unknown paths fall back to the 404 responder.
            cache.get(self.path, fn404)(self)

        class B(BaseHTTPRequestHandler):
            do_GET = dispatch
            do_POST = dispatch

        return B
def noop(*args, **kwargs):
    """Accept any arguments, do nothing, and return ``None``."""
    return None
def run(server_class=HTTPServer, handler_class=None, port=80, stop_handler=noop):
    """Serve requests until interrupted from the keyboard.

    ``server_class`` is instantiated with ``('', port)`` and
    ``handler_class``.  When ``serve_forever`` is interrupted by
    ``KeyboardInterrupt``, ``stop_handler`` is called with the server
    instance.
    """
    httpd = server_class(('', port), handler_class)
    print('Starting httpd...')
    try:
        httpd.serve_forever()
    except KeyboardInterrupt:
        stop_handler(httpd)
if __name__ == "__main__":
    # Demo: serve a single /foo route that prints whatever is POSTed to it.
    from sys import argv

    # The factory class defined in this module is ExpressServerFactory;
    # the name previously used here was never defined.
    srv = ExpressServerFactory()

    def foo(self):
        """Read the request body and print its type and contents."""
        length = int(self.headers['Content-Length'])
        post_data = self.rfile.read(length)
        print('%s %s' % (type(post_data), post_data))

    srv.post('/foo', foo)
    handlerClass = srv.createClass()

    # An optional single command-line argument selects the port.
    if len(argv) == 2:
        run(port=int(argv[1]), handler_class=handlerClass)
    else:
        run(handler_class=handlerClass)
import usocket
import ujson
import time

# TLS is optional: fall back to ``None`` when the ussl module is missing.
try:
    import ussl
except ImportError:
    ussl = None

# Feature flags consulted by request().
SUPPORT_SSL = ussl is not None
SUPPORT_TIMEOUT = hasattr(usocket.socket, 'settimeout')

CONTENT_TYPE_JSON = 'application/json'
class Response(object):
    """HTTP response wrapping a status code and a readable stream.

    The body is read from ``raw`` on first access to ``content``; the
    stream is closed at that point and ``raw`` is reset to ``None``.
    """

    def __init__(self, status_code, raw):
        self.status_code = status_code
        self.raw = raw
        # ``False`` means "not read yet"; ``None`` means "closed unread".
        self._content = False
        self.encoding = 'utf-8'

    @property
    def content(self):
        """Body as returned by ``raw.read()``, read once and then cached."""
        if self._content is not False:
            return self._content
        stream = self.raw
        self._content = stream.read()
        stream.close()
        self.raw = None
        return self._content

    @property
    def text(self):
        """Body decoded with ``self.encoding``; ``''`` when the body is empty."""
        body = self.content
        if not body:
            return ''
        return str(body, self.encoding)

    def close(self):
        """Close the stream without reading it; a no-op if already released."""
        stream = self.raw
        if stream is None:
            return
        self._content = None
        stream.close()
        self.raw = None

    def json(self):
        """Parse the response text with ``ujson.loads``."""
        return ujson.loads(self.text)

    def raise_for_status(self):
        """Raise ``OSError`` for 4xx and 5xx status codes."""
        code = self.status_code
        if 400 <= code < 500:
            raise OSError('Client error: %s' % code)
        if 500 <= code < 600:
            raise OSError('Server error: %s' % code)
# Adapted from upip
def request(method, url, json=None, timeout=None, headers=None):
    """Send one HTTP/1.0 request and return a ``Response``.

    Args:
        method: HTTP verb, written verbatim into the request line.
        url: ``http://`` or ``https://`` URL.  An explicit ``host:port``
            overrides the default port (80 / 443).
        json: optional object; when given it is serialized with
            ``ujson.dumps`` and sent as the body with a JSON content type.
        timeout: optional value passed to the socket's ``settimeout``.
        headers: optional dict of extra request headers.

    Returns:
        A ``Response`` holding the status code and the still-open socket,
        positioned just after the response headers.

    Raises:
        OSError: the URL scheme is neither http nor https.
        ValueError: the status line is malformed.
        AssertionError: a timeout or HTTPS was requested but the platform
            lacks support for it.

    If anything fails after the socket has been created, the socket is
    closed before the exception is re-raised.
    """
    urlparts = url.split('/', 3)
    proto = urlparts[0]
    host = urlparts[2]
    urlpath = '' if len(urlparts) < 4 else urlparts[3]

    if proto == 'http:':
        port = 80
    elif proto == 'https:':
        port = 443
    else:
        raise OSError('Unsupported protocol: %s' % proto[:-1])

    if ':' in host:
        host, port = host.split(':')
        port = int(port)

    content = None if json is None else ujson.dumps(json)

    ai = usocket.getaddrinfo(host, port)
    addr = ai[0][4]

    sock = usocket.socket()
    try:
        if timeout is not None:
            assert SUPPORT_TIMEOUT, 'Socket does not support timeout'
            sock.settimeout(timeout)

        sock.connect(addr)

        if proto == 'https:':
            assert SUPPORT_SSL, 'HTTPS not supported: could not find ussl'
            sock = ussl.wrap_socket(sock)

        sock.write('%s /%s HTTP/1.0\r\nHost: %s\r\n' % (method, urlpath, host))

        if headers is not None:
            for header in headers.items():
                sock.write('%s: %s\r\n' % header)

        if content is not None:
            sock.write('content-length: %s\r\n' % len(content))
            sock.write('content-type: %s\r\n' % CONTENT_TYPE_JSON)
            sock.write('\r\n')
            sock.write(content)
        else:
            sock.write('\r\n')

        # The reason phrase is optional, so only the first two fields of
        # the status line are required.
        status_line = sock.readline()
        fields = status_line.split(None, 2)
        if len(fields) < 2:
            raise ValueError('Malformed HTTP status line: %r' % status_line)
        status = int(fields[1])

        # Skip headers.  An empty read means the peer closed the
        # connection; stop there instead of polling a dead socket forever.
        while True:
            line = sock.readline()
            if not line or line == b'\r\n':
                break
    except Exception:
        # Do not leak the socket when the exchange fails part-way.
        sock.close()
        raise

    return Response(status, sock)
def get(url, **kwargs):
    """Issue a GET for ``url``; keyword arguments are forwarded to ``request``."""
    response = request('GET', url, **kwargs)
    return response
def post(url, **kwargs):
    """Issue a POST to ``url``; keyword arguments are forwarded to ``request``."""
    response = request('POST', url, **kwargs)
    return response
if __name__ == '__main__':
    # Manual smoke test: fetch /status, then post value 1 and, five
    # seconds later, value 0.
    #resp = get('http://192.168.0.7:8080/sample.txt')
    resp = get('http://192.168.0.4:3000/status')
    print('Response:', resp.text)

    first = post('http://192.168.0.4:3000/post', json={ "value": 1 })
    first.close()
    time.sleep(5)
    second = post('http://192.168.0.4:3000/post', json={ "value": 0 })
    second.close()
#!/bin/sh
# POST {"value" : 0} as JSON to the server's /post endpoint, then print a
# newline after curl's output.
url='http://192.168.0.4:3000/post'
curl "$url" \
    --data '{"value" : 0}' \
    --header 'Content-Type:application/json'
echo
#!/bin/sh
# POST {"value" : 1} as JSON to the server's /post endpoint, then print a
# newline after curl's output.
url='http://192.168.0.4:3000/post'
curl "$url" \
    --data '{"value" : 1}' \
    --header 'Content-Type:application/json'
echo
from machine import Pin
import time
import httpclient as http
import network
# Endpoint that receives the alarm state.
URL = 'http://192.168.0.4:3000/post'

# Hardware pins: switch input, Wi-Fi status LED, alarm LED.
switch = Pin(15, Pin.IN)
wifi_led = Pin(16, Pin.OUT)
alarm_led = Pin(14, Pin.OUT)

station = network.WLAN(network.STA_IF)


class L:
    """Bare attribute holder used for shared mutable state."""


# ``p.WIFI_CONNECTED`` caches the station's last known connection state.
p = L()
p.WIFI_CONNECTED = station.isconnected()
def set_state(value):
    """Drive the alarm LED with ``value`` and, when Wi-Fi is up, POST it to ``URL``."""
    print('value:', value)
    alarm_led.value(value)
    if not p.WIFI_CONNECTED:
        return
    response = http.post(URL, json={'value': value})
    response.close()
def debounce(fn, p, interval=20):
    """Wrap ``fn`` in a callback that fires only when pin ``p`` is stable.

    The returned callback reads ``p.value()`` once, then samples it
    ``interval`` more times about 1 ms apart.  ``fn`` is called with the
    initial reading only if every sample matched it.  Arguments passed to
    the callback itself are ignored.
    """
    def cb(*args, **kwargs):
        reference = p.value()
        matches = 0
        polls = 0
        while polls < interval:
            polls += 1
            if p.value() == reference:
                matches += 1
            time.sleep(0.001)
        if matches == interval:
            fn(reference)

    return cb
print('setting switch trigger...')
# Fire on both edges; the debounced callback passes the settled pin value
# to set_state.
switch.irq(
    trigger=Pin.IRQ_RISING | Pin.IRQ_FALLING,
    handler=debounce(set_state, switch, 20),
)
print('running infinitely...')
def do_wifi_check():
    """Refresh ``p.WIFI_CONNECTED`` from the station and mirror it on ``wifi_led``."""
    connected = station.isconnected()
    p.WIFI_CONNECTED = connected
    wifi_led.value(connected)
# Poll the Wi-Fi state twice a second, forever.
while 1:
    do_wifi_check()
    time.sleep(0.5)
# raspberry pi on board led controlling:
# https://raspberrypi.stackexchange.com/a/1504/74294
import os
from express import run, ExpressServerFactory as Factory
from BaseHTTPServer import HTTPServer
import json
class CBServer(HTTPServer):
    """HTTPServer that notifies its handler class on activate and close.

    The handler class must provide ``post_start()`` and ``post_stop()``
    callables that take no arguments.
    """

    def server_activate(self):
        """Activate the server, then call the handler class's ``post_start``."""
        HTTPServer.server_activate(self)
        hooks = self.RequestHandlerClass
        hooks.post_start()

    def server_close(self):
        """Close the server, then call the handler class's ``post_stop``."""
        HTTPServer.server_close(self)
        hooks = self.RequestHandlerClass
        hooks.post_stop()
# Route registry; the '/post' and '/status' handlers are attached to it
# further down, and createClass() turns it into the request handler base.
sfact = Factory()
def post_start(self):
    """Print a notice that the server is running; ``self`` is not used."""
    print('server is running...')
def process(self):
    """Handle a POST carrying ``{"value": <int>}`` and switch the LED.

    The request is validated before any response is sent.  A missing or
    non-numeric Content-Length, a body that is not valid JSON, or a JSON
    document without a ``value`` key gets a 400 response.  Otherwise the
    reply is 200 followed by an echo of the request body; an integer
    ``value`` then turns the LED on (non-zero) or off (zero).  A
    non-integer ``value`` is accepted but changes nothing.
    """
    try:
        length = int(self.headers['Content-Length'])
        data = self.rfile.read(length)
        val = json.loads(data)['value']
    except (KeyError, TypeError, ValueError):
        # Malformed request: answer explicitly instead of raising after a
        # 200 has already gone out.
        self.send_response(400, 'Bad Request')
        self.end_headers()
        return

    self.send_response(200, 'Ok')
    self.end_headers()
    self.wfile.write(data)

    # Booleans are instances of int but were never treated as LED values,
    # so keep excluding them.
    if isinstance(val, int) and not isinstance(val, bool):
        if val:
            brightness_on()
        else:
            brightness_off()
def stat(self):
    """Answer a status request with 200 and the body ``hello world``."""
    self.send_response(200, 'Ok')
    out = self.wfile
    # A bare CRLF ends the header section before the body.
    out.write('\r\n')
    out.write('hello world')
# Register the two routes, then build the handler base class that the
# Handler class below derives from.
sfact.post('/post', process)
sfact.get('/status', stat)
sclass = sfact.createClass()
def brightness_on():
    """Write 1 to led0's sysfs brightness file through the shell."""
    command = "echo 1 > /sys/class/leds/led0/brightness"
    os.system(command)
def brightness_off():
    """Write 0 to led0's sysfs brightness file through the shell."""
    command = "echo 0 > /sys/class/leds/led0/brightness"
    os.system(command)
class Handler(sclass):
    """Request handler that also supplies the hooks called by CBServer."""

    @classmethod
    def post_start(cls):
        """Set led0's trigger to ``none`` via its sysfs trigger file."""
        print('setting led trigger...')
        os.system('echo none > /sys/class/leds/led0/trigger')
        print('done')

    @classmethod
    def post_stop(cls):
        """Set led0's trigger back to ``mmc0`` via its sysfs trigger file."""
        print('defaulting led trigger...')
        os.system('echo mmc0 > /sys/class/leds/led0/trigger')
        print('done')
def on_stop(self):
    """Close the server passed in as ``self``, printing progress messages."""
    print('server closing...')
    server = self
    server.server_close()
    print('server closed')
if __name__ == '__main__':
    import sys

    # Options common to both invocations; a single command-line argument
    # additionally selects the port.
    options = dict(handler_class=Handler, server_class=CBServer,
                   stop_handler=on_stop)
    if len(sys.argv) == 2:
        p = int(sys.argv[1])
        options['port'] = p
    run(**options)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment