Skip to content

Instantly share code, notes, and snippets.

@efc
Last active Jun 28, 2022
Embed
What would you like to do?
Controller for lock based on Adafruit ESP32-S2 Feather
# Lock via wsgi server on Adafruit Feather ESP32S2
# SPDX-FileCopyrightText: 2021 Tenseg LLC
# SPDX-License-Identifier: MIT
# Save to CIRCUITPY as code.py
# Adafruit ESP32-S2 Feather pinouts...
# - (GND) to ground (and to LED resistor)
# - (A1) to red LED
# - (A2) to servo (TowerPro SG-5010) yellow data
# - (USB) to servo red power
# - (5) to reed switch
# - (6) to grey switch
# - (9) to white switch
# - (10) to pink switch
# - (11) to orange switch
# see https://learn.adafruit.com/adafruit-esp32-s2-feather/pinouts
import time
import ssl
import json
import microcontroller
import wifi
import socketpool
import adafruit_requests
import board
import pwmio
from adafruit_motor import servo
import wsgiserver as server
from adafruit_wsgi.wsgi_app import WSGIApp
import neopixel
import digitalio
print("Lock via wsgi server on ESP32S2")
# Depends on the definitions in secrets.py.
# NOTE: I have commented out the aio_user so that we
# effectively turn off AIO logging, which seems to have some
# sort of conflict with the HB listener. Not sure why?
try:
    from secrets import secrets
except ImportError:
    # Without secrets.py nothing below can work: print a template
    # for the missing file, then re-raise so startup stops here.
    print(
        """
Please create a secrets.py file with content like this...
secrets = {
'ssid': 'SSID',
'password': 'PASSWORD',
'homebridge_server': 'http://127.0.0.1',
'homebridge_auth': 'Basic BASE64_ENCODED_AUTH',
'homebridge_listener_port': '8282',
'homebridge_buttons_port': '5000',
# 'aio_user': 'ADAFRUIT USERNAME',
'aio_feed': 'ADAFRUIT FEED KEY',
'aio_key': 'ADAFRUIT SECRET KEY'
}
"""
    )
    raise
###########################################################
# TIMING
###########################################################
# Moment (on the monotonic clock) at which we reboot: six hours after startup.
REBOOT_TIME = time.monotonic() + (6 * 60 * 60)


def check_for_restart():
    """Reset the microcontroller once REBOOT_TIME has passed; otherwise do nothing."""
    if time.monotonic() > REBOOT_TIME:
        microcontroller.reset()
# Seconds to wait before an autolock fires (5 minutes); may be overridden by /unlock.
_autolock_delay = 5 * 60
# Monotonic timestamp recorded each time an autolock countdown is started.
_autolock_timer_start = 0


def set_autolock_delay(seconds=5 * 60):
    """Replace the number of seconds we wait before an autolock takes effect."""
    global _autolock_delay
    _autolock_delay = seconds
def set_autolock_timer(caller=""):
    """Begin the autolock countdown now and log who started it."""
    global _autolock_timer_start
    started = time.monotonic()
    _autolock_timer_start = started
    log(f"Autolock timer started at {int(started)} by {caller}")
def autolock_delay_is_expired():
    """Return True once more than the autolock delay has elapsed since the timer start."""
    cutoff = time.monotonic() - _autolock_delay
    return cutoff > _autolock_timer_start
###########################################################
# GOALS
###########################################################
# The current goal; only 'lock', 'autolock' and 'unlock' are significant.
_goal = ''


def set_goal(target="", caller=""):
    """
    Record a new goal and log who requested it.

    Only 'lock', 'autolock' and 'unlock' cause any action;
    every other value means "no current goal".
    """
    global _goal
    log(f"Goal set to {target} by {caller}")
    _goal = target
def get_goal():
    """Report the goal most recently stored by set_goal()."""
    return _goal
def set_goal_toggle(caller=""):
    """
    Flip the goal to the opposite of the lock's current state.

    The caller label gets "toggle" appended before it is logged.
    Returns "unlocking" when the door is currently locked,
    otherwise "locking".
    """
    label = "toggle" if caller == "" else caller + " toggle"
    if not door_is_locked():
        set_goal("lock", label)
        return "locking"
    set_goal("unlock", label)
    return "unlocking"
def enact_goal():
    """
    Act on the current goal.

    - 'lock' locks immediately.
    - 'autolock' locks only once the autolock delay has expired.
    - 'unlock' unlocks immediately.
    Any other goal value results in no action. The door-open check
    itself is performed inside lock_door() / unlock_door().
    """
    goal = get_goal()
    # Compare by value: `is` tests object identity and only appeared to
    # work when the interpreter happened to intern both strings.
    if goal == 'lock' or (goal == 'autolock' and autolock_delay_is_expired()):
        lock_door()
    elif goal == 'unlock':
        unlock_door()
###########################################################
# NETWORKING & WIFI
###########################################################
# Join the WiFi network named in secrets.py; everything below needs the radio up.
print("Connecting WiFi")
wifi.radio.connect(secrets['ssid'], secrets['password'])
# HOST/PORT are used in the "try: http://..." hints printed for each route.
HOST = repr(wifi.radio.ipv4_address)
PORT = 80
# One socket pool on the radio backs the shared HTTP session used for
# all outgoing requests (Homebridge and Adafruit IO).
pool = socketpool.SocketPool(wifi.radio)
requests = adafruit_requests.Session(pool, ssl.create_default_context())
def tell_homebridge_listener(path, queries=None):
    """
    Send a GET to the Homebridge listener and return the response text.

    path    -- path appended to the listener's base URL
    queries -- optional mapping of query parameters; each is sent as
               key="value" (the value is wrapped in double quotes)

    Returns the response body, or '' when the request raised an
    exception (the exception is logged rather than propagated).
    """
    # A fresh dict per call avoids the shared mutable default argument.
    if queries is None:
        queries = {}
    url = f"{secrets['homebridge_server']}:{secrets['homebridge_listener_port']}/{path}"
    query_string = "&".join(f'{key}="{value}"' for key, value in queries.items())
    if query_string:
        url = f"{url}?{query_string}"
    response_text = ''
    try:
        response = requests.get(url)
        response_text = response.text
        log(f"Sent {url} Response '{response_text}'")
    except Exception as e:
        # Best effort: a failed notification must not stop the lock loop.
        log(f"While attempting to send {url}")
        template = "Exception {0}:\n{1!r}"
        message = template.format(type(e).__name__, e.args)
        log(message)
    return response_text
def token_is_valid(token):
    """Ask the Homebridge listener to validate the incoming token; True only on 'valid'."""
    return tell_homebridge_listener('validate', {'token': token}) == 'valid'
def unauthorized(caller=""):
    """Log the rejected caller and build the 401 response tuple (status, headers, body)."""
    log(f"Unauthorized {caller}")
    status = "401 Unauthorized"
    return (status, [], status)
def homebridge_get(path):
    """
    Poke a Homebridge buttons API URL and return the response.

    The Authorization header is sent as-is; homebridge_auth in
    secrets.py is already base64 encoded. Returns None when the
    request raised an exception (which is logged).
    """
    try:
        target = "".join([
            secrets['homebridge_server'],
            ':',
            secrets['homebridge_buttons_port'],
            '/',
            path,
        ])
        auth_header = {'Authorization': secrets['homebridge_auth']}
        return requests.get(target, headers=auth_header)
    except Exception as e:
        log(f'While attempting homebridge path {path}')
        log("Exception {0}:\n{1!r}".format(type(e).__name__, e.args))
        return None
###########################################################
# LOGGING (NOTE: this is currently broken)
###########################################################
# Text accumulated for the next log entry.
_log = ""
# Monotonic time of the latest log() call; lets write_logs() delay reporting.
_last_log_time = 0


def log(message):
    """Print the message and append it, on its own line, to the pending log entry."""
    global _log, _last_log_time
    print(message)
    # Separate from earlier messages only when something is already pending.
    _log += "\n" if _log else ""
    _last_log_time = time.monotonic()
    _log += message
def write_logs():
    """
    Flush the pending log entry once logging has been quiet for a while.

    Waiting until more than 10 seconds have passed since the last
    message lets several messages share a single logging API call.
    """
    global _log
    if not _log:
        return
    quiet_since = time.monotonic() - 10
    if quiet_since > _last_log_time:
        adafruit_io_log(_log)
        _log = ""
def adafruit_io_log(message):
    """
    Post a log entry to the Adafruit IO feed named in secrets.py.

    Does nothing (returns None) unless aio_user, aio_feed and aio_key
    are all present in secrets. Returns the response on success, or
    None when the request raised an exception (which is printed).
    NOTE: this is currently not working, it causes conflicts with the
    Homebridge API. For now it has been disabled by commenting out the
    aio_user secret.
    """
    if not ('aio_user' in secrets and 'aio_feed' in secrets and 'aio_key' in secrets):
        return None
    try:
        feed_url = f"https://io.adafruit.com/api/v2/{secrets['aio_user']}/feeds/{secrets['aio_feed']}/data"
        request_headers = {
            "Content-Type": "application/json",
            "X-Aio-Key": secrets['aio_key'],
        }
        payload = json.dumps({"value": message})
        return requests.post(url=feed_url, headers=request_headers, data=payload)
    except Exception as e:
        # print() rather than log(): logging the failure would queue another entry.
        print('While attempting AIO log')
        print("Exception {0}:\n{1!r}".format(type(e).__name__, e.args))
        return None
###########################################################
# INDICATORS & SENSORS
###########################################################
# Set up the external red LED on pin A1 as a digital output
LED = digitalio.DigitalInOut(board.A1)
LED.direction = digitalio.Direction.OUTPUT
# Set up the single on-board neopixel at a very low brightness
PIXEL = neopixel.NeoPixel(
    board.NEOPIXEL, 1, brightness=0.008
)
# RGB colour tuples, with status aliases for the neopixel
BLACK = (0, 0, 0)
WHITE = (255, 255, 255)
LOCKED_COLOR = RED = (255, 0, 0) # locked
UNLOCKED_COLOR = GREEN = (0, 255, 0) # unlocked
OPENED_COLOR = BLUE = (0, 0, 255) # door just opened
ERROR_COLOR = YELLOW = (255, 255, 0) # error
ORANGE = (255, 150, 0) # shown while the orange (toggle lock) key is held
CLOSED_COLOR = PURPLE = (255, 0, 255) # door just closed
# Set up the servo assuming signal on pin A2
PWM = pwmio.PWMOut(board.A2, duty_cycle=2 ** 15, frequency=50)
MOTOR = servo.Servo(PWM)
# Servo angles (degrees) for the two lock positions
LOCKED_ANGLE = 160
UNLOCKED_ANGLE = 0
# Seconds to sleep after commanding a new servo angle
SERVO_CHANGE_DELAY = 0.2
class ToggelingPin:
    """Wrap a pulled-up digital input used as a simple two-state switch sensor."""

    def __init__(self, pin, name='', on_true=None, on_false=None, true_color=None, false_color=None):
        """
        Store the configuration and configure the pin as a pulled-up input.

        on_true / on_false are called with this object when the pin changes
        to that state; true_color / false_color, when given, are shown on
        the neopixel on the same change.
        """
        self.pin = pin
        self.name = name or f'{self.pin}'
        self.on_true = on_true
        self.on_false = on_false
        self.true_color = true_color
        self.false_color = false_color
        sensor = digitalio.DigitalInOut(pin)
        sensor.direction = digitalio.Direction.INPUT
        sensor.pull = digitalio.Pull.UP
        self.sensor = sensor
        # With the pull-up, an open circuit reads True, so start from that.
        self.state = True

    def check_state(self):
        """
        Read the pin and return its current true/false state.

        When the state differs from the last one seen, remember it, show
        the matching colour (if any) and call the matching callback; when
        no callback is set the change is logged instead.
        Note that true for these pins is an open circuit, false is closed.
        """
        current = self.sensor.value
        if current == self.state:
            return current
        self.state = current
        if current:
            if self.true_color:
                PIXEL.fill(self.true_color)
            if self.on_true:
                self.on_true(self)
            else:
                log(f"{time.monotonic():.1f}: {self.pin} true")
            return current
        if self.false_color:
            PIXEL.fill(self.false_color)
        if self.on_false:
            self.on_false(self)
        else:
            log(f"{time.monotonic():.1f}: {self.pin} false")
        return current

    def value(self):
        """
        Return the raw sensor value right now.

        Unlike check_state(), this neither compares against nor updates
        the remembered state, and triggers no callbacks.
        """
        return self.sensor.value
# References to the door sensor and the buttons, keyed by short name.
_sensors = {}


def setup_sensors():
    """Create a ToggelingPin for the door reed switch and each of the four keys."""
    # (key, pin, name, on_true, on_false, true_color, false_color)
    # on_true fires on release / door closed, on_false on press / door opened.
    layout = (
        ('door', board.D5, 'Door', door_closed, door_opened, None, None),
        ('white', board.D9, 'White Key', button_released, white_button_pressed, BLACK, WHITE),
        ('gray', board.D6, 'Gray Key', button_released, gray_button_pressed, BLACK, BLUE),
        # the pink key is actually red
        ('pink', board.D10, 'Pink Key', button_released, pink_button_pressed, BLACK, RED),
        ('orange', board.D11, 'Toggle Lock Key', button_released, orange_button_pressed, BLACK, ORANGE),
    )
    for key, pin, label, released, pressed, idle_color, active_color in layout:
        _sensors[key] = ToggelingPin(pin, label, released, pressed, idle_color, active_color)
def check_sensors():
    """Poll every registered sensor once; called from the event loop."""
    for pin in _sensors.values():
        pin.check_state()
def button_pressed(pin):
    """Shared press handler: log the timestamped press of the named button."""
    stamp = time.monotonic()
    log(f"{stamp:.1f}: {pin.name} pressed")
def orange_button_pressed(pin):
    """Handle the (actually green) "toggle door" button: log it and toggle the goal."""
    button_pressed(pin)
    set_goal_toggle("orange_button_pressed")
def pink_button_pressed(pin):
    """Handle the (actually red) "unlock garage" button: log it and poke Homebridge."""
    button_pressed(pin)
    # The response is not used; failures are logged inside homebridge_get().
    homebridge_get("?accessoryId=Kitchen.Pink&state=true")
def white_button_pressed(pin):
    """Handle the white "garage light" button: log it and poke Homebridge."""
    button_pressed(pin)
    # The response is not used; failures are logged inside homebridge_get().
    homebridge_get("?accessoryId=Kitchen.White&state=true")
def gray_button_pressed(pin):
    """Handle the (actually black) "lock everything" button: log it and poke Homebridge."""
    button_pressed(pin)
    # The response is not used; failures are logged inside homebridge_get().
    homebridge_get("?accessoryId=Kitchen.Gray&state=true")
def button_released(pin):
    """Shared release handler: log the timestamped release of the named button."""
    stamp = time.monotonic()
    log(f"{stamp:.1f}: {pin.name} released")
def door_opened(pin):
    """Reed-switch callback: log the door opening, flagging it as an error if locked."""
    if not door_is_locked():
        log(f"{time.monotonic():.1f}: {pin.name} opened")
        return
    log(f"{time.monotonic():.1f}: {pin.name} error, door opened while locked")
def door_closed(pin):
    """
    Reed-switch callback for the door closing.

    A close while the lock is engaged is logged as an error; otherwise
    the autolock countdown is restarted and the close is logged.
    """
    if not door_is_locked():
        set_autolock_timer('door_closed')
        log(f"{time.monotonic():.1f}: {pin.name} closed")
        return
    log(f"{time.monotonic():.1f}: {pin.name} error, door closed while locked")
def door_is_open():
    """Return True when the door reed switch currently reads false (closed circuit)."""
    reed_value = _sensors['door'].value()
    return not reed_value
def door_is_locked():
    """
    Return True when the servo angle is past the halfway point to LOCKED_ANGLE.

    NOTE(review): the upper bound of 600 is kept from the original; its
    purpose is not evident from this file.
    """
    angle = MOTOR.angle
    return LOCKED_ANGLE / 2 < angle < 600
###########################################################
# ACTIONS
###########################################################
def lock_door():
    """
    Drive the servo to the locked angle, but only when the door is closed.

    Either way the Homebridge listener is told the resulting state and the
    goal is replaced with a descriptive "none, ..." value so it is not retried.
    """
    if door_is_open():
        # Refuse to throw the bolt while the door is open.
        log(f"{time.monotonic():.1f}: error, locking not attempted while door open ({MOTOR.angle:.0f})")
        tell_homebridge_listener("unlocked")
        set_goal('none, tried to lock while door was open', 'lock_door')
        return
    MOTOR.angle = LOCKED_ANGLE
    time.sleep(SERVO_CHANGE_DELAY)  # give the servo time to move
    log(f"{time.monotonic():.1f}: Locking successful ({MOTOR.angle:.0f})")
    tell_homebridge_listener("locked")
    set_goal('none, successfully locked', 'lock_door')
def unlock_door():
    """
    Drive the servo to the unlocked angle, but only when the door is closed.

    After a successful unlock the goal becomes 'autolock' and the autolock
    countdown is started, so the door re-locks after the delay.
    """
    if door_is_open():
        log(f"{time.monotonic():.1f}: error, unlocking not attempted while door open ({MOTOR.angle:.0f})")
        tell_homebridge_listener("unlocked")
        set_goal('none, tried to unlock while already open', 'unlock_door')
        return
    MOTOR.angle = UNLOCKED_ANGLE
    time.sleep(SERVO_CHANGE_DELAY)  # give the servo time to move
    log(f"{time.monotonic():.1f}: Unlocking successful ({MOTOR.angle:.0f})")
    tell_homebridge_listener("unlocked")
    # Switch the goal to autolock so the door locks itself again later.
    set_goal('autolock', 'unlock_door')
    set_autolock_timer('door ulocked')
###########################################################
# WEB SERVER
###########################################################
# The WSGI application object on which all routes below are registered.
WEB_APP = WSGIApp()
print(f"try: http://{HOST}:{PORT}/lock")


@WEB_APP.route("/lock")
def lock_route(request):
    """GET /lock?token=... : set the goal to 'lock' when the token validates."""
    if not token_is_valid(request.query_params.get("token")):
        return unauthorized('/lock')
    set_goal('lock', 'api/lock')
    return ("200 OK", [], "Securing lock")
print(f"try: http://{HOST}:{PORT}/unlock")


@WEB_APP.route("/unlock")
def unlock_route(request):
    """
    GET /unlock?token=...[&auto=SECONDS] : set the goal to 'unlock'.

    When an integer `auto` query is supplied it replaces the autolock
    delay. A missing or non-numeric `auto` is ignored and the delay is
    left unchanged. Returns 401 when the token does not validate.
    """
    if not token_is_valid(request.query_params.get("token")):
        return unauthorized('/unlock')
    try:
        set_autolock_delay(int(request.query_params.get("auto")))
    except (TypeError, ValueError):
        # TypeError: no auto query (None); ValueError: not an integer.
        # Both are fine; only these expected failures are swallowed.
        pass
    set_goal('unlock', 'api/unlock')
    return ("200 OK", [], "Unsecuring lock")
print(f"try: http://{HOST}:{PORT}/toggle")


@WEB_APP.route("/toggle")
def toggle_route(request):
    """
    GET /toggle?token=... : flip the goal between lock and unlock.

    Responds "Securing lock" when the new goal is to lock, otherwise
    "Unsecuring lock". Returns 401 when the token does not validate.
    """
    if not token_is_valid(request.query_params.get("token")):
        return unauthorized('/toggle')
    # set_goal_toggle() reports which way it went, so compare its return
    # value by equality instead of testing the goal string with `is`.
    action = set_goal_toggle("api/toggle")
    verb = 'Securing' if action == "locking" else 'Unsecuring'
    return ("200 OK", [], f"{verb} lock")
print(f"try: http://{HOST}:{PORT}/currentstatus")


@WEB_APP.route("/currentstatus")
def servo_status(request):
    """GET /currentstatus?token=... : answer "locked" or "unlocked" from the servo position."""
    if not token_is_valid(request.query_params.get("token")):
        return unauthorized('/currentstatus')
    status = "locked" if door_is_locked() else "unlocked"
    return ("200 OK", [], status)
print(f"try: http://{HOST}:{PORT}/currentgoal")


@WEB_APP.route("/currentgoal")
def servo_goal(request):
    """GET /currentgoal?token=... : answer with the current goal string."""
    if not token_is_valid(request.query_params.get("token")):
        return unauthorized('/currentgoal')
    return ("200 OK", [], get_goal())
print(f"try: http://{HOST}:{PORT}/status")


@WEB_APP.route("/status")
def weblock_status(request):
    """
    GET /status?token=... : report current and target lock state as JSON.

    The body is {"target": T, "current": C} where 1 means locked and 0
    unlocked. The target follows a pending 'lock' / 'unlock' goal and
    otherwise equals the current state. Returns 401 on a bad token.
    """
    if not token_is_valid(request.query_params.get("token")):
        return unauthorized('/status')
    current = 1 if door_is_locked() else 0
    goal = get_goal()
    # Compare goal strings by value, not identity.
    if goal == 'lock':
        target = 1
    elif goal == 'unlock':
        target = 0
    else:
        target = current
    # Named `body` so the imported json module is not shadowed.
    body = f'{{"target": {target}, "current": {current}}}'
    return ("200 OK", [], body)
print(f"try: http://{HOST}:{PORT}/angle")


@WEB_APP.route("/angle")
def servo_angle(request):
    """GET /angle?token=... : answer with the servo angle rounded to a whole number."""
    if not token_is_valid(request.query_params.get("token")):
        return unauthorized('/angle')
    return ("200 OK", [], f'{MOTOR.angle:.0f}')
# Set up our server, passing in our web_app as the application
# using the wsgi server made for the ESP32.
# Listen on PORT (not a second hard-coded 80) so the server always
# matches the URLs printed and logged with HOST/PORT.
WEB_SERVER = server.WSGIServer(PORT, application=WEB_APP)
log(f"Listening at: http://{HOST}:{PORT}/")
###########################################################
# EVENT LOOP
###########################################################
log("Lock Restarting")
# We start with a locked door, just in case
set_goal('lock', 'startup')
setup_sensors()
# Start the web server
WEB_SERVER.start()
while True:
    # our main loop where we have the server poll for incoming requests
    WEB_SERVER.update_poll()
    # check the state of our sensors
    check_sensors()
    # take action based on the goals set by API and sensors
    enact_goal()
    # write stored logs
    write_logs()
    if door_is_locked():
        # flash the LED while locked: off for one second in every four.
        # Compare the remainder by value; `is not 0` tested identity.
        LED.value = int(time.monotonic()) % 4 != 0
        PIXEL.fill(LOCKED_COLOR)
    else:
        LED.value = False
        if door_is_open():
            PIXEL.fill(OPENED_COLOR)
        else:
            PIXEL.fill(UNLOCKED_COLOR)
    # reboot every now and then
    check_for_restart()
@efc
Copy link
Author

efc commented Jan 24, 2022

See http://eric.clst.org/tech/smoothlock/ for an example of how this code was used.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment