Created
October 17, 2018 21:16
-
-
Save ethernetdan/548126751e793f4aa26f90cef54ad44e to your computer and use it in GitHub Desktop.
Application to open door with WebSockets
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import asyncio | |
from contextlib import contextmanager | |
import datetime | |
import RPi.GPIO as GPIO | |
import http | |
import random | |
import signal | |
import socket | |
import threading | |
import websockets | |
# Address the WebSocket server listens on; it is also embedded in the
# WebSocket URL of the page produced by render_page(). Resolved once, at
# import time, from this machine's hostname.
# NOTE(review): if the hostname maps to a loopback address in /etc/hosts this
# resolves to that loopback address -- confirm on the target Pi.
SERVER_IP = socket.gethostbyname(socket.gethostname())
# TCP port shared by the HTTP page and the WebSocket endpoint.
SERVER_PORT = 8080
# Pin that drives the door lock; interpreted with GPIO.BOARD numbering
# (see pi_init()).
PI_PIN = 8
def pi_init():
    """Prepare the Raspberry Pi GPIO pin that controls the door.

    Selects ``GPIO.BOARD`` pin numbering and configures ``PI_PIN`` as an
    output. Pin numbering has to be chosen before any channel is set up.
    """
    print("Setting up Pi hardware")
    numbering_scheme, pin_direction = GPIO.BOARD, GPIO.OUT
    GPIO.setmode(numbering_scheme)
    GPIO.setup(PI_PIN, pin_direction)
def pi_door_unlock():
    """Unlock the door by driving ``PI_PIN`` high."""
    print("Unlocking door")
    # GPIO.HIGH is the library's named constant for the level written by True.
    GPIO.output(PI_PIN, GPIO.HIGH)
def pi_door_lock():
    """Lock the door by driving ``PI_PIN`` low."""
    print("Locking door")
    # GPIO.LOW is the library's named constant for the level written by False.
    GPIO.output(PI_PIN, GPIO.LOW)
def render_page(ip, port):
    """Return the HTML for the door-control page.

    The page opens a WebSocket to ``ws://<ip>:<port>/ws``. Once the socket is
    open it sends ``Ping!`` every second, and on ``pagehide`` it stops pinging
    and sends ``close``.

    Args:
        ip: Host substituted into the WebSocket URL.
        port: Port substituted into the WebSocket URL.

    Returns:
        The page markup as a ``str``.
    """
    # The two %s placeholders in the WebSocket URL are the only substitution
    # points in the template.
    page_template = '''
<!DOCTYPE html>
<html>
<head>
<title>WebSocket demo</title>
</head>
<body>
<h1 id="status">Status</h1>
<script>
var open = true;
const ws = new WebSocket("ws://%s:%s/ws");
(function() {
var status = document.getElementById("status");
var ping = function () {
if (open) {
ws.send('Ping!');
status.innerHTML = "Door open!";
setTimeout(ping, 1000);
}
}
ws.addEventListener('open', function (event) {
ping();
});
document.addEventListener("pagehide", function (event) {
open = false;
ws.send('close');
status.innerHTML = "Closed";
});
})();
</script>
</body>
</html>
'''
    return page_template % (ip, port)
class DoorProto(websockets.WebSocketServerProtocol):
    """Server protocol that also answers plain HTTP requests for ``/``."""

    async def process_request(self, path, request_headers):
        """Serve the control page for ``/``; defer every other path.

        Args:
            path: Request path of the incoming connection.
            request_headers: Request headers (unused).

        Returns:
            A ``(status, headers, body)`` triple carrying the rendered page
            when ``path`` is ``/``; ``None`` for any other path.
        """
        if path != '/':
            return None

        print("Rendering page")
        body = render_page(SERVER_IP, SERVER_PORT).encode()
        content_type = ("Content-Type", "text/html; charset=utf-8")
        return http.HTTPStatus.OK, [content_type], body
async def time(websocket, path):
    """Keep the door unlocked while a client keeps sending messages.

    Every received message other than ``"close"`` (re)unlocks the door. The
    session ends when the client sends ``"close"``, stays silent for more
    than 3 seconds, or disconnects. The door is locked whenever the session
    ends, including when an unexpected error propagates.

    Args:
        websocket: Connection object; only its ``recv()`` coroutine is used.
        path: Request path of the connection (unused).
    """
    try:
        while True:
            try:
                # Bound the wait with asyncio so that only this coroutine is
                # interrupted. A SIGALRM handler raises in whatever frame the
                # main thread is executing, which for a suspended coroutine
                # is the event loop rather than this handler.
                msg = await asyncio.wait_for(websocket.recv(), timeout=3)
            except asyncio.TimeoutError:
                # No message in time: end the session instead of falling
                # through and unlocking on a missing or stale message.
                return
            if msg == "close":
                return
            pi_door_unlock()
    except websockets.ConnectionClosed:
        # Client disconnected; nothing more to do than lock up below.
        pass
    finally:
        # lock after session
        pi_door_lock()
@contextmanager
def timeout(time):
    """Abort the enclosed block if it runs longer than ``time`` seconds.

    Installs ``raise_timeout`` as the ``SIGALRM`` handler and schedules an
    alarm. If the alarm fires inside the block, the resulting
    ``TimeoutError`` is swallowed and execution continues after the ``with``
    statement. On exit the pending alarm is cancelled and the handler that
    was installed beforehand is put back.

    Note that any ``TimeoutError`` raised by the block itself is swallowed as
    well, and that the alarm and its handler are process-wide.

    Args:
        time: Whole number of seconds before the alarm fires, passed to
            ``signal.alarm``.
    """
    # Register a function to raise a TimeoutError on the signal, remembering
    # whatever handler was there so it can be restored afterwards.
    previous_handler = signal.signal(signal.SIGALRM, raise_timeout)
    if previous_handler is None:
        # The previous handler was not installed from Python and cannot be
        # re-installed through signal.signal(); fall back to ignoring.
        previous_handler = signal.SIG_IGN
    # Schedule the signal to be sent after ``time``.
    signal.alarm(time)
    try:
        yield
    except TimeoutError:
        pass
    finally:
        # Cancel the alarm so it cannot fire after the block has finished,
        # then restore the earlier handler instead of leaving SIGALRM ignored.
        signal.alarm(0)
        signal.signal(signal.SIGALRM, previous_handler)


def raise_timeout(signum, frame):
    """Signal handler that turns the received signal into ``TimeoutError``.

    Args:
        signum: Number of the signal being handled (unused).
        frame: Stack frame that was interrupted (unused).

    Raises:
        TimeoutError: Always.
    """
    raise TimeoutError
# Configure the GPIO pin before it is driven: pi_door_lock() writes to PI_PIN,
# which must first be set up as an output by pi_init().
pi_init()

# ensure door starts locked
pi_door_lock()

print(f"Listening on {SERVER_IP}:{SERVER_PORT}")
start_server = websockets.serve(time, SERVER_IP, SERVER_PORT, create_protocol=DoorProto, timeout=2)
asyncio.get_event_loop().run_until_complete(start_server)

# Keep serving. A TimeoutError that escapes the event loop (for example one
# raised by the raise_timeout signal handler) restarts the loop; any other
# exception propagates and ends the script. In every case the door is locked
# as the loop is left.
while True:
    try:
        asyncio.get_event_loop().run_forever()
    except TimeoutError:
        # Locking is handled by the finally clause below.
        pass
    finally:
        pi_door_lock()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment