Skip to content

Instantly share code, notes, and snippets.

@rpavlik
Created October 28, 2022 21:53
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save rpavlik/041a2bef03765f35b50099d5b3c9b212 to your computer and use it in GitHub Desktop.
Save rpavlik/041a2bef03765f35b50099d5b3c9b212 to your computer and use it in GitHub Desktop.
creature eyes mqtt
# SPDX-FileCopyrightText: 2020 Phillip Burgess for Adafruit Industries
# SPDX-FileCopyrightText: 2021-2022 Ryan Pavlik <ryan@ryanpavlik.com>
#
# SPDX-License-Identifier: MIT
"""
RASTER EYES for Adafruit Matrix Portal: animated spooky eyes.
Updated by Ryan to hide the head fully and have MQTT control
"""
# pylint: disable=import-error
import math
import random
import time
import displayio
import adafruit_imageload
from adafruit_matrixportal.matrix import Matrix
import microcontroller
# import neopixel
from network import mqtt_client, status_light, root_topic
from interval import Interval
# TO LOAD DIFFERENT EYE DESIGNS: change the middle word here (between
# 'eyes.' and '.data') to one of the folder names inside the 'eyes' folder:
from eyes.werewolf.data import EYE_DATA
# from eyes.cyclops.data import EYE_DATA
# from eyes.kobold.data import EYE_DATA
# from eyes.adabot.data import EYE_DATA
# from eyes.skull.data import EYE_DATA
# UTILITY FUNCTIONS AND CLASSES --------------------------------------------
# pylint: disable=too-few-public-methods
class Sprite(displayio.TileGrid):
    """TileGrid built from a single palettized BMP file.

    The bitmap and its palette are loaded with adafruit_imageload and handed
    to the TileGrid constructor, so the object can be appended to a
    displayio.Group like any other TileGrid.
    """

    def __init__(self, filename, transparent=None):
        """Load ``filename`` and create the TileGrid from it.

        filename: Path of a color-paletted BMP file.
        transparent: Optional palette entry to make transparent. A tuple or
            list is treated as an (R, G, B) color and the nearest palette
            color is chosen; an integer is used directly as a palette index;
            any other value (including the default None) leaves the palette
            untouched.
        """
        image, colors = adafruit_imageload.load(
            filename, bitmap=displayio.Bitmap, palette=displayio.Palette
        )
        assert colors
        if isinstance(transparent, (tuple, list)):
            # Pick the palette entry with the smallest squared RGB distance
            # to the requested color. Squared distance orders candidates the
            # same way as true distance, so no square root is needed.
            best_distance = 0x1000000  # Larger than any possible distance
            for index, packed in enumerate(colors):
                d_red = transparent[0] - ((packed >> 16) & 0xFF)
                d_green = transparent[1] - ((packed >> 8) & 0xFF)
                d_blue = transparent[2] - (packed & 0xFF)
                distance = d_red * d_red + d_green * d_green + d_blue * d_blue
                if distance < best_distance:
                    best_distance, best_index = distance, index
            colors.make_transparent(best_index)
        elif isinstance(transparent, int):
            colors.make_transparent(transparent)
        super().__init__(image, pixel_shader=colors)
# ONE-TIME INITIALIZATION --------------------------------------------------
MATRIX = Matrix(bit_depth=6)
DISPLAY = MATRIX.display

# Sprites are drawn in the order they are appended, so later entries sit on
# top of earlier ones: the opaque eye first, then the lower lid, then the
# upper lid (so it wins where the lids overlap), and finally the stencil.
SPRITES = displayio.Group()
SPRITES.append(Sprite(EYE_DATA["eye_image"]))
for _layer_key in ("lower_lid_image", "upper_lid_image", "stencil_image"):
    SPRITES.append(Sprite(EYE_DATA[_layer_key], EYE_DATA["transparent"]))

# PARENT holds the optional background followed by the eye sprites, so the
# sprite group is always the last element of PARENT.
PARENT = displayio.Group()
BG = Sprite(EYE_DATA["bg_image"]) if "bg_image" in EYE_DATA else None
if BG is not None:
    PARENT.append(BG)
PARENT.append(SPRITES)
DISPLAY.show(PARENT)
# Each constant below is an (x, y) pair computed per axis from EYE_DATA.

# Midpoint of the eye image's allowed travel: its 'neutral' pixel position.
EYE_CENTER = tuple(
    (EYE_DATA["eye_move_min"][axis] + EYE_DATA["eye_move_max"][axis]) / 2
    for axis in (0, 1)
)
# Half the allowed travel: the largest offset from center on each axis.
EYE_RANGE = tuple(
    abs(EYE_DATA["eye_move_max"][axis] - EYE_DATA["eye_move_min"][axis]) / 2
    for axis in (0, 1)
)
# Motion bounds of the eyelids: per-axis min/max of the open and closed
# positions, for the upper lid and then the lower lid.
UPPER_LID_MIN = tuple(
    min(EYE_DATA["upper_lid_open"][axis], EYE_DATA["upper_lid_closed"][axis])
    for axis in (0, 1)
)
UPPER_LID_MAX = tuple(
    max(EYE_DATA["upper_lid_open"][axis], EYE_DATA["upper_lid_closed"][axis])
    for axis in (0, 1)
)
LOWER_LID_MIN = tuple(
    min(EYE_DATA["lower_lid_open"][axis], EYE_DATA["lower_lid_closed"][axis])
    for axis in (0, 1)
)
LOWER_LID_MAX = tuple(
    max(EYE_DATA["lower_lid_open"][axis], EYE_DATA["lower_lid_closed"][axis])
    for axis in (0, 1)
)
# NOTE(review): these two module-level pairs are not read anywhere in the
# visible code (MoveStates keeps its own eye_prev/eye_next) - confirm whether
# they are still needed.
EYE_PREV = (0, 0)
EYE_NEXT = (0, 0)
class TimedStates:
    """
    Base class for a cyclic, time-driven state machine.

    States are numbered 0..num_states-1 and follow one another in a circle.
    Each state has a duration; once more than that much time has passed,
    update() advances to the next state and calls handle_state_change(),
    which subclasses implement (typically to pick the new duration).
    ratio() reports how far through the current state we are.
    """

    def __init__(self, now, first_duration, first_state, num_states):
        """
        Set up the machine in its initial state.

        now: Current timestamp
        first_duration: How long to remain in the initial state
        first_state: The initial state number
        num_states: The total number of states, must be >first_state
        """
        # Timestamp at which the current state was entered.
        self.time_of_last = now
        # Planned length of the current state.
        self.duration = first_duration
        # Number of the current state.
        self.state = first_state
        # How many states the cycle contains.
        self.num_states = num_states

    def update(self, now):
        """
        Advance to the next state if the current one has run its course.

        Subclasses may extend this, but must call this implementation.
        """
        expired = now - self.time_of_last > self.duration
        if not expired:
            return
        self.time_of_last = now
        self.state = (self.state + 1) % self.num_states
        self.handle_state_change(now, self.state)

    def ratio(self, now):
        """Return elapsed time in the current state divided by its duration."""
        elapsed = now - self.time_of_last
        return elapsed / self.duration

    def reset(self, now, state, duration=None):
        """Jump to ``state`` as of ``now``; replace the duration if one is given."""
        self.time_of_last = now
        self.state = state
        if duration is not None:
            self.duration = duration

    def handle_state_change(self, now, state):
        """Hook called by update() after entering ``state`` - must override."""
        raise NotImplementedError("Must override handle_state_change")
class MoveStates(TimedStates):
    """Two-state machine driving eye movement: state 0 holds, state 1 moves."""

    def __init__(self, now):
        # Target and origin of the current move, as offsets from the eye's
        # center position (not pixel coordinates).
        self.eye_next = (0, 0)
        self.eye_prev = (0, 0)
        super().__init__(now, random.uniform(0.1, 3), first_state=0, num_states=2)

    def update(self, now):
        """Advance the state machine and return the current (x, y) eye offset."""
        super().update(now)
        # Fraction of the state elapsed, shaped by the ease in/out curve
        # 3*t^2 - 2*t^3.
        t = self.ratio(now)
        eased = 3 * t * t - 2 * t * t * t
        start_x, start_y = self.eye_prev
        end_x, end_y = self.eye_next
        return (
            start_x + eased * (end_x - start_x),
            start_y + eased * (end_y - start_y),
        )

    def handle_state_change(self, now, state):
        """Pick the duration (and, for a move, the destination) of the new state."""
        if not state:
            # Entering a pause: choose the hold time and make the previous
            # destination the new origin.
            self.duration = random.uniform(0.04, 3)
            self.eye_prev = self.eye_next
            return
        # Entering a move: choose the move time, then a destination at a
        # random angle on the ellipse whose radii are EYE_RANGE.
        self.duration = random.uniform(0.08, 0.17)
        heading = random.uniform(0, math.pi * 2)
        self.eye_next = (
            math.cos(heading) * EYE_RANGE[0],
            math.sin(heading) * EYE_RANGE[1],
        )
class BlinkStates(TimedStates):
    """Three-state blink machine: 0 = not blinking, 1 = closing, 2 = opening."""

    def __init__(self, now):
        super().__init__(now, random.uniform(0.25, 0.5), first_state=2, num_states=3)

    def update(self, now):
        """Advance the state machine and return the blink amount.

        Returns 0 when not blinking. While closing the value is the elapsed
        fraction of the state; while opening it is one minus that fraction.
        """
        super().update(now)
        if not self.state:
            return 0
        progress = self.ratio(now)
        # Opening runs the same ramp backwards.
        return 1.0 - progress if self.state == 2 else progress

    def handle_state_change(self, now, state):
        """Choose how long the newly entered state lasts."""
        if state == 1:
            # Closing: a short random duration.
            self.duration = random.uniform(0.03, 0.07)
            return
        if state == 2:
            # Opening takes twice as long as the closing that preceded it.
            self.duration *= 2
            return
        # Blink finished: wait a random time, at least three times the
        # opening duration, before the next blink.
        self.duration = random.uniform(self.duration * 3, 4)
class HideStates(TimedStates):
    """Main state machine controlling whether the creature is shown at all.

    States: 0 = hidden, 1 = popping up, 2 = staying visible, 3 = hiding.
    State changes are reported on the ``{root_topic}/hide_state`` MQTT topic.
    """

    def __init__(self, now):
        super().__init__(now, random.uniform(0.25, 0.5), first_state=2, num_states=4)

    def update(self, now):
        """Advance the state machine and return the visible fraction.

        Returns 0 while hidden, 1 while fully visible, the elapsed fraction
        of the state while popping up, and one minus that while hiding.
        """
        super().update(now)
        phase = self.state
        if phase == 1:
            ratio = self.ratio(now)
        elif phase == 3:
            ratio = 1.0 - self.ratio(now)
        elif phase == 2:
            ratio = 1
        else:
            ratio = 0
        print(f"Hide state {self.state} ratio {ratio}")
        return ratio

    @property
    def hidden(self):
        """True only in state 0, i.e. when the creature is fully hidden."""
        return self.state == 0

    def reset(self, now, state, duration=None):
        """Publish the requested reset over MQTT, then perform it."""
        mqtt_client.publish(
            f"{root_topic}/hide_state",
            f"Reset hide state {state}, duration {duration}",
        )
        return super().reset(now, state, duration)

    def handle_state_change(self, now, state):
        """Set the new state's duration and side effects, then report it."""
        if state == 1:
            # Popping up: status light blue, short random rise time.
            status_light.fill((0, 0, 255))
            self.duration = random.uniform(0.1, 0.8)
        elif state == 2:
            # Staying visible for ten times as long as the pop-up took, with
            # the background randomly mirrored for some variety.
            self.duration *= 10
            assert BG
            BG.flip_x = random.choice((True, False))
            BG.flip_y = random.choice((True, False))
        elif state == 3:
            # Hiding again.
            self.duration = random.uniform(0.1, 1.2)
        else:
            # Fully hidden: status light green, long random wait.
            status_light.fill((0, 255, 0))
            self.duration = random.uniform(3, 90)
        summary = f"Hide state {state}, duration {self.duration}"
        print(summary)
        mqtt_client.publish(f"{root_topic}/hide_state", summary)
class FakeHideState:
    """Always-visible stand-in for HideStates.

    Used when there is no background to hide the creature behind. It offers
    the same ``hidden`` / update / reset / handle_state_change surface but
    never changes state.
    """

    def __init__(self) -> None:
        # Plain attribute mirroring the HideStates.hidden property; the
        # creature is never hidden.
        self.hidden = False

    def update(self, now):
        """Return 1 (fully visible) regardless of ``now``."""
        return 1

    def reset(self, now, state, duration=None):
        """Accept and ignore a reset request."""

    def handle_state_change(self, now, state):
        """Accept and ignore a state-change notification."""
# All state machines share one starting timestamp.
now = time.monotonic()
move_state = MoveStates(now)
blink_state = BlinkStates(now)
# Hiding only works with a background image; otherwise stay always visible.
hide_state = HideStates(now) if BG else FakeHideState()
# MAIN LOOP ----------------------------------------------------------------
def clamp(v, minval, maxval):
    """Return ``v`` limited to the range [minval, maxval].

    The lower bound is applied first and the upper bound second, so if the
    bounds are inverted (minval > maxval) the result is ``maxval``.
    """
    floored = minval if minval > v else v
    return maxval if maxval < floored else floored
# Register a last-will message on the status topic before connecting, then
# connect and announce ourselves on that same topic.
# NOTE(review): the trailing positional arguments (0, True) and (True) are
# presumably MiniMQTT's qos/retain and retain parameters - confirm against
# the library signature.
mqtt_client.will_set(f"{root_topic}/status", "DISCONNECTED", 0, True)
mqtt_client.connect()
mqtt_client.publish(f"{root_topic}/status", "CONNECTED", True)
def peekaboo(client, topic, msg):
    """MQTT topic callback: make a hidden creature pop up over half a second.

    Does nothing beyond logging when the creature is not currently hidden.

    client, topic, msg: Supplied by the MQTT client; not used here.
    """
    print("peekaboo")
    if not hide_state.hidden:
        return
    now = time.monotonic()
    # Run the normal "entering state 1" side effects first. That hook also
    # picks a random duration, so the reset must come afterwards; otherwise
    # the half-second duration requested here is silently overwritten.
    hide_state.handle_state_change(now, 1)
    # Trigger a reveal that takes half a second
    hide_state.reset(now, 1, 0.5)
    status_light.fill((0, 255, 255))
# Subscribe to the peekaboo topic and route its messages to peekaboo().
# NOTE(review): the second argument to subscribe() is presumably the QoS
# level - confirm against the MiniMQTT documentation.
TOPIC = f"{root_topic}/peekaboo"
mqtt_client.subscribe(TOPIC, 0)
mqtt_client.add_topic_callback(TOPIC, peekaboo)
# mqtt_interval = Interval(0.5)
# Animation loop: runs forever. Any ordinary error resets the board so the
# display recovers on its own; the error is printed first so it is visible on
# the serial console. Only Exception is caught, so KeyboardInterrupt and
# other BaseException-derived control signals are no longer swallowed.
while True:
    try:
        now = time.monotonic()

        # Update hide state and move/hide the creature as appropriate.
        # NOTE(review): 32 is presumably the panel height in pixels, so the
        # sprite group slides fully off the bottom when hidden - confirm.
        visible_ratio = hide_state.update(now)
        PARENT[-1].y = int(32 * (1.0 - visible_ratio))
        PARENT[-1].hidden = hide_state.hidden

        # Poll MQTT if we are hidden. The callback may un-hide the creature,
        # which is why hide_state.hidden is re-read below.
        if hide_state.hidden:
            print("before poll")
            mqtt_client.loop(1)
            print("after poll")

        # Shortcut the loop if we aren't at least a little visible
        update_eyes = BG is None or not hide_state.hidden
        if not update_eyes:
            continue

        # Eye movement -----------------------------------------------------
        eye_pos = move_state.update(now)

        # Blinking ---------------------------------------------------------
        blink_ratio = blink_state.update(now)

        # Eyelid tracking --------------------------------------------------
        # Initial estimate of 'tracked' eyelid positions
        UPPER_LID_POS = (
            EYE_DATA["upper_lid_center"][0] + eye_pos[0],
            EYE_DATA["upper_lid_center"][1] + eye_pos[1],
        )
        LOWER_LID_POS = (
            EYE_DATA["lower_lid_center"][0] + eye_pos[0],
            EYE_DATA["lower_lid_center"][1] + eye_pos[1],
        )
        # Then constrain these to the upper/lower lid motion bounds
        UPPER_LID_POS = (
            clamp(UPPER_LID_POS[0], UPPER_LID_MIN[0], UPPER_LID_MAX[0]),
            clamp(UPPER_LID_POS[1], UPPER_LID_MIN[1], UPPER_LID_MAX[1]),
        )
        LOWER_LID_POS = (
            clamp(LOWER_LID_POS[0], LOWER_LID_MIN[0], LOWER_LID_MAX[0]),
            clamp(LOWER_LID_POS[1], LOWER_LID_MIN[1], LOWER_LID_MAX[1]),
        )
        # Then interpolate between bounded tracked position to closed position
        UPPER_LID_POS = (
            UPPER_LID_POS[0]
            + blink_ratio * (EYE_DATA["upper_lid_closed"][0] - UPPER_LID_POS[0]),
            UPPER_LID_POS[1]
            + blink_ratio * (EYE_DATA["upper_lid_closed"][1] - UPPER_LID_POS[1]),
        )
        LOWER_LID_POS = (
            LOWER_LID_POS[0]
            + blink_ratio * (EYE_DATA["lower_lid_closed"][0] - LOWER_LID_POS[0]),
            LOWER_LID_POS[1]
            + blink_ratio * (EYE_DATA["lower_lid_closed"][1] - LOWER_LID_POS[1]),
        )

        # Move eye sprites -------------------------------------------------
        # SPRITES[0] is the eye, [1] the lower lid, [2] the upper lid. The
        # + 0.5 turns int() truncation into rounding for positive values.
        SPRITES[0].x, SPRITES[0].y = (
            int(EYE_CENTER[0] + eye_pos[0] + 0.5),
            int(EYE_CENTER[1] + eye_pos[1] + 0.5),
        )
        SPRITES[2].x, SPRITES[2].y = (
            int(UPPER_LID_POS[0] + 0.5),
            int(UPPER_LID_POS[1] + 0.5),
        )
        SPRITES[1].x, SPRITES[1].y = (
            int(LOWER_LID_POS[0] + 0.5),
            int(LOWER_LID_POS[1] + 0.5),
        )
    except Exception as err:  # pylint: disable=broad-except
        print("Unhandled error in main loop, resetting:", repr(err))
        microcontroller.reset()
""" Configuration data for the werewolf eyes """
# Directory portion of this module's path, including the trailing "/"
# (empty when __file__ contains no "/").
EYE_PATH = __file__[: 1 + __file__.rfind("/")]

# Image files, all located next to this module.
EYE_DATA = {
    image_key: EYE_PATH + bmp_name
    for image_key, bmp_name in (
        ("eye_image", "werewolf-eyes.bmp"),
        ("upper_lid_image", "werewolf-upper-lids.bmp"),
        ("lower_lid_image", "werewolf-lower-lids.bmp"),
        ("stencil_image", "werewolf-stencil.bmp"),
        # ADDED full panel size bitmap for background
        ("bg_image", "stars.bmp"),
    )
}
# Transparency color and (x, y) placement data.
EYE_DATA.update(
    {
        # Transparent color in the images above
        "transparent": (0, 255, 0),
        # eye_image (left, top) and (right, bottom) move limits
        "eye_move_min": (-3, -5),
        "eye_move_max": (7, 6),
        # upper_lid_image position when open / eye centered / closed
        "upper_lid_open": (7, -4),
        "upper_lid_center": (7, -1),
        "upper_lid_closed": (7, 8),
        # lower_lid_image position when open / eye centered / closed
        "lower_lid_open": (7, 22),
        "lower_lid_center": (7, 21),
        "lower_lid_closed": (7, 17),
    }
)
import time
class Interval:
    """Simple repeating timer that is polled rather than called back.

    poll() returns True once ``interval`` seconds (measured with
    time.monotonic) have passed since construction or since the last time
    poll() returned True.
    """

    def __init__(self, interval: float):
        """Start the timer; the first deadline is ``interval`` seconds from now."""
        self.interval = interval
        self.next = time.monotonic() + interval

    def poll(self):
        """Return True and re-arm if the deadline has passed, else False."""
        current = time.monotonic()
        due = current > self.next
        if due:
            # The next deadline is measured from this poll, not from the old
            # deadline.
            self.next = current + self.interval
        return due
import adafruit_esp32spi.adafruit_esp32spi_socket as socket
import adafruit_minimqtt.adafruit_minimqtt as MQTT
from adafruit_esp32spi import adafruit_esp32spi, adafruit_esp32spi_wifimanager
import board
import busio
import neopixel
from digitalio import DigitalInOut
# Get wifi details and more from a secrets.py file
try:
from secrets import secrets
except ImportError:
print("WiFi secrets are kept in secrets.py, please add them there!")
raise
# Prefix for every MQTT topic this device uses.
root_topic = secrets["root_topic"]

# ESP32 co-processor control pins, using the board's pre-defined pin names.
esp32_cs = DigitalInOut(board.ESP_CS)
esp32_ready = DigitalInOut(board.ESP_BUSY)
esp32_reset = DigitalInOut(board.ESP_RESET)
spi = busio.SPI(board.SCK, board.MOSI, board.MISO)
esp = adafruit_esp32spi.ESP_SPIcontrol(spi, esp32_cs, esp32_ready, esp32_reset)

# Print some diagnostics about the ESP32 and the networks it can see.
if esp.status == adafruit_esp32spi.WL_IDLE_STATUS:
    print("ESP32 found and in idle mode")
print("Firmware vers.", esp.firmware_version)
print("MAC addr:", list(map(hex, esp.MAC_address)))
for ap in esp.scan_networks():
    print("\t%s\t\tRSSI: %d" % (str(ap["ssid"], "utf-8"), ap["rssi"]))

# Bring up WiFi; the single NeoPixel is handed to the WiFi manager and is
# also exported as status_light for the main script.
print("Connecting to AP...")
status_light = neopixel.NeoPixel(board.NEOPIXEL, 1, brightness=0.2)
wifi = adafruit_esp32spi_wifimanager.ESPSPI_WiFiManager(esp, secrets, status_light)
print("Connecting to WiFi...")
wifi.connect()
print("Connected!")

# Initialize MQTT interface with the esp interface
MQTT.set_socket(socket, esp)

# Set up a MiniMQTT Client. The is_ssl=secrets["mqttssl"] and log=True
# options were left disabled by the author.
mqtt_client = MQTT.MQTT(broker=secrets["broker"], port=secrets["brokerPort"])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment