Skip to content

Instantly share code, notes, and snippets.

@ethernetdan
Created October 17, 2018 21:16
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ethernetdan/548126751e793f4aa26f90cef54ad44e to your computer and use it in GitHub Desktop.
Save ethernetdan/548126751e793f4aa26f90cef54ad44e to your computer and use it in GitHub Desktop.
Application to open a door with WebSockets
#!/usr/bin/env python
import asyncio
from contextlib import contextmanager
import datetime
import RPi.GPIO as GPIO
import http
import random
import signal
import socket
import threading
import websockets
# Address the server listens on and that the served page's WebSocket URL
# points at; resolved once, at import time, from this machine's hostname.
SERVER_IP = socket.gethostbyname(socket.gethostname())
# TCP port used for both the HTML page and the WebSocket endpoint.
SERVER_PORT = 8080
# Pin that drives the door lock; pi_init() selects GPIO.BOARD numbering,
# so this is a physical header pin number.
PI_PIN = 8
def pi_init():
    """Configure the Raspberry Pi GPIO pin that drives the door lock.

    Selects physical (BOARD) pin numbering and makes ``PI_PIN`` an output.
    The pin is driven low as part of the setup, i.e. the same level
    ``pi_door_lock`` writes, so the door is in the locked state from the
    moment the pin becomes an output instead of at an unspecified level.
    """
    print("Setting up Pi hardware")
    GPIO.setmode(GPIO.BOARD)
    # initial=GPIO.LOW: start locked rather than inheriting the pin's level.
    GPIO.setup(PI_PIN, GPIO.OUT, initial=GPIO.LOW)
def pi_door_unlock():
    """Log the action and drive the lock pin high, which unlocks the door."""
    print("Unlocking door")
    GPIO.output(PI_PIN, GPIO.HIGH)
def pi_door_lock():
    """Log the action and drive the lock pin low, which locks the door."""
    print("Locking door")
    GPIO.output(PI_PIN, GPIO.LOW)
def render_page(ip, port):
    """Return the HTML document for the door-control page.

    The page opens a WebSocket to ``ws://<ip>:<port>/ws``, sends ``Ping!``
    once a second while it is open, and sends ``close`` when the page is
    hidden.

    Args:
        ip: Host part of the WebSocket URL embedded in the page.
        port: Port part of the WebSocket URL embedded in the page.

    Returns:
        The page as a ``str`` with both placeholders filled in.
    """
    # The two %s placeholders are the only format specifiers in the template.
    page_template = '''
<!DOCTYPE html>
<html>
<head>
<title>WebSocket demo</title>
</head>
<body>
<h1 id="status">Status</h1>
<script>
var open = true;
const ws = new WebSocket("ws://%s:%s/ws");
(function() {
var status = document.getElementById("status");
var ping = function () {
if (open) {
ws.send('Ping!');
status.innerHTML = "Door open!";
setTimeout(ping, 1000);
}
}
ws.addEventListener('open', function (event) {
ping();
});
document.addEventListener("pagehide", function (event) {
open = false;
ws.send('close');
status.innerHTML = "Closed";
});
})();
</script>
</body>
</html>
'''
    return page_template % (ip, port)
class DoorProto(websockets.WebSocketServerProtocol):
    """WebSocket server protocol that also serves the control page over HTTP."""

    async def process_request(self, path, request_headers):
        """Serve the control page for ``/``; do nothing for other paths.

        Args:
            path: Request path of the incoming HTTP request.
            request_headers: Headers of the incoming request (unused).

        Returns:
            A ``(status, headers, body)`` triple carrying the rendered page
            when ``path`` is ``/``; otherwise ``None``, which per the
            websockets documentation lets the WebSocket handshake continue.
        """
        if path != '/':
            return None

        print("Rendering page")
        content_type = ("Content-Type", "text/html; charset=utf-8")
        body = render_page(SERVER_IP, SERVER_PORT).encode()
        return http.HTTPStatus.OK, [content_type], body
async def time(websocket, path):
    """Keep the door unlocked while a client keeps sending messages.

    Every message other than ``"close"`` (re-)unlocks the door. The session
    ends, and the door is locked, when the client sends ``"close"``, when no
    message arrives within three seconds, or when the connection closes.

    The receive timeout uses ``asyncio.wait_for`` rather than a SIGALRM
    handler: a signal handler raises in whatever frame the main thread is
    executing (normally the event loop itself), so it cannot reliably
    interrupt this coroutine while it is suspended in ``recv()``.

    Args:
        websocket: The connection to the client.
        path: Request path of the connection (unused).

    Raises:
        Any unexpected exception from ``recv()`` propagates to the caller,
        but only after the door has been locked.
    """
    ping_timeout = 3  # seconds of silence tolerated before locking
    try:
        while True:
            try:
                msg = await asyncio.wait_for(websocket.recv(), timeout=ping_timeout)
            except asyncio.TimeoutError:
                # Client stopped pinging: end the session.
                return
            except websockets.ConnectionClosed:
                # Client went away: end the session.
                return
            if msg == "close":
                return
            pi_door_unlock()
    finally:
        # Single exit point: the door is locked however the session ends.
        pi_door_lock()
@contextmanager
def timeout(time):
    """Context manager that aborts its body after ``time`` seconds via SIGALRM.

    Installs ``raise_timeout`` as the SIGALRM handler and schedules an alarm.
    A ``TimeoutError`` that propagates out of the body is swallowed. On exit
    the pending alarm is cancelled and the handler that was active before
    entry is put back.

    NOTE(review): Python runs signal handlers in the main thread, so this
    only interrupts code running there; confirm that callers are not relying
    on it to interrupt a suspended coroutine.

    Args:
        time: Whole number of seconds before the alarm fires.
    """
    # Register a function to raise a TimeoutError on the signal, remembering
    # whatever handler was installed before.
    previous_handler = signal.signal(signal.SIGALRM, raise_timeout)
    # Schedule the signal to be sent after ``time``.
    signal.alarm(time)
    try:
        yield
    except TimeoutError:
        pass
    finally:
        # Cancel the alarm so it cannot fire after the block has finished.
        signal.alarm(0)
        # signal.signal() returns None when the previous handler was not
        # installed from Python; it cannot be reinstalled, so ignore the
        # signal in that case.
        if previous_handler is None:
            previous_handler = signal.SIG_IGN
        signal.signal(signal.SIGALRM, previous_handler)
def raise_timeout(signum, frame):
    """Signal handler that converts a delivered signal into ``TimeoutError``.

    Args:
        signum: Number of the signal being handled (unused).
        frame: Stack frame that was interrupted (unused).

    Raises:
        TimeoutError: Always.
    """
    raise TimeoutError()
# Configure the GPIO pin before the first write: pi_door_lock() calls
# GPIO.output(), which needs the channel to have been set up as an output.
pi_init()
# ensure door starts locked
pi_door_lock()
print(f"Listening on {SERVER_IP}:{SERVER_PORT}")
start_server = websockets.serve(time, SERVER_IP, SERVER_PORT, create_protocol=DoorProto, timeout=2)
loop = asyncio.get_event_loop()
loop.run_until_complete(start_server)
while True:
    try:
        loop.run_forever()
    except TimeoutError:
        # A SIGALRM-driven TimeoutError can surface out of the event loop;
        # re-lock (in the finally clause below) and resume serving.
        continue
    finally:
        # Lock the door whenever the loop stops, for any reason.
        pi_door_lock()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment