Skip to content

Instantly share code, notes, and snippets.

@sandyjmacdonald
Last active July 3, 2022 10:40
Show Gist options
  • Save sandyjmacdonald/d0cce358c18b257cda4de6b547681e38 to your computer and use it in GitHub Desktop.
Save sandyjmacdonald/d0cce358c18b257cda4de6b547681e38 to your computer and use it in GitHub Desktop.
An example of simple API to set the colour of the Pimoroni Inventor 2040's LEDs
import gc
from machine import Pin, PWM
from pimoroni_i2c import PimoroniI2C
from plasma import WS2812
from motor import Motor
from servo import Servo
from encoder import Encoder, MMME_CPR
# IO Pin Constants
GP0 = 0
GP1 = 1
GP2 = 2
A0 = 26
A1 = 27
A2 = 28
GPIOS = (GP0, GP1, GP2, A0, A1, A2)
ADCS = (A0, A1, A2)
# Index Constants
MOTOR_A = 0
MOTOR_B = 1
SERVO_1 = 0
SERVO_2 = 1
SERVO_3 = 2
SERVO_4 = 3
SERVO_5 = 4
SERVO_6 = 5
LED_GP0 = 0
LED_GP1 = 1
LED_GP2 = 2
LED_A0 = 3
LED_A1 = 4
LED_A2 = 5
LED_SERVO_1 = 6
LED_SERVO_2 = 7
LED_SERVO_3 = 8
LED_SERVO_4 = 9
LED_SERVO_5 = 10
LED_SERVO_6 = 11
# Count Constants
NUM_GPIOS = 6
NUM_ADCS = 3
NUM_MOTORS = 2
NUM_SERVOS = 6
NUM_LEDS = 12
class Inventor2040W():
AMP_EN_PIN = 3
I2C_SDA_PIN = 4
I2C_SCL_PIN = 5
MOTOR_A_PINS = (6, 7)
MOTOR_B_PINS = (8, 9)
ENCODER_A_PINS = (19, 18)
ENCODER_B_PINS = (17, 16)
SERVO_1_PIN = 10
SERVO_2_PIN = 11
SERVO_3_PIN = 12
SERVO_4_PIN = 13
SERVO_5_PIN = 14
SERVO_6_PIN = 15
LED_DATA_PIN = 20
PWM_AUDIO_PIN = 21
USER_SW_PIN = 22
AMP_CORRECTION = 4
DEFAULT_VOLUME = 0.2
def __init__(self, motor_gear_ratio=50, init_motors=True, init_servos=True):
# Free up hardware resources
gc.collect()
# Set up the motors and encoders, if the user wants them
self.motors = None
if init_motors:
cpr = MMME_CPR * motor_gear_ratio
self.motors = [Motor(self.MOTOR_A_PINS), Motor(self.MOTOR_B_PINS)]
# Set the encoders to use PIO 0 and State Machines 0 and 1
self.encoders = [Encoder(0, 0, self.ENCODER_A_PINS, counts_per_rev=cpr, count_microsteps=True),
Encoder(0, 1, self.ENCODER_B_PINS, counts_per_rev=cpr, count_microsteps=True)]
# Set up the servos, if the user wants them
self.servos = None
if init_servos:
self.servos = [Servo(i) for i in range(self.SERVO_1_PIN, self.SERVO_6_PIN + 1)]
# Set up the i2c for Qw/st and Breakout Garden
self.i2c = PimoroniI2C(self.I2C_SDA_PIN, self.I2C_SCL_PIN, 100000)
# Set up the amp enable
self.__amp_en = Pin(self.AMP_EN_PIN, Pin.OUT)
self.__amp_en.off()
self.audio_pwm = PWM(Pin(self.PWM_AUDIO_PIN))
self.__volume = self.DEFAULT_VOLUME
# Set up the user switch
self.__switch = Pin(self.USER_SW_PIN, Pin.IN, Pin.PULL_DOWN)
# Set up the WS2812 LEDs, using PIO 0 and State Machine 2
self.leds = WS2812(NUM_LEDS, 0, 2, self.LED_DATA_PIN)
self.leds.start()
def switch_pressed(self):
return self.__switch.value()
def play_tone(self, frequency):
try:
self.audio_pwm.freq(frequency)
except ValueError:
self.play_silence()
raise ValueError("frequency of range. Expected greater than 0")
corrected_volume = (self.__volume ** 4) # Correct for RC Filter curve
self.audio_pwm.duty_u16(int(32768 * corrected_volume))
self.unmute_audio()
def play_silence(self):
self.audio_pwm.freq(44100)
corrected_volume = (self.__volume ** 4) # Correct for RC Filter curve
self.audio_pwm.duty_u16(int(32768 * corrected_volume))
self.unmute_audio()
def stop_playing(self):
self.audio_pwm.duty_u16(0)
self.mute_audio()
def volume(self, volume=None):
if volume is None:
return self.__volume
if volume < 0.01 or volume > 1.0:
raise ValueError("volume out of range. Expected 0.0 to 1.0")
self.__volume = volume
def mute_audio(self):
self.__amp_en.off()
def unmute_audio(self):
self.__amp_en.on()
import sys
CRITICAL = 50
ERROR = 40
WARNING = 30
INFO = 20
DEBUG = 10
NOTSET = 0
_level_dict = {
CRITICAL: "CRIT",
ERROR: "ERROR",
WARNING: "WARN",
INFO: "INFO",
DEBUG: "DEBUG",
}
_stream = sys.stderr
class LogRecord:
def __init__(self):
self.__dict__ = {}
def __getattr__(self, key):
return self.__dict__[key]
class Handler:
def __init__(self):
pass
def setFormatter(self, fmtr):
pass
class Logger:
level = NOTSET
handlers = []
record = LogRecord()
def __init__(self, name):
self.name = name
def _level_str(self, level):
l = _level_dict.get(level)
if l is not None:
return l
return "LVL%s" % level
def setLevel(self, level):
self.level = level
def isEnabledFor(self, level):
return level >= (self.level or _level)
def log(self, level, msg, *args):
if self.isEnabledFor(level):
levelname = self._level_str(level)
if args:
msg = msg % args
if self.handlers:
d = self.record.__dict__
d["levelname"] = levelname
d["levelno"] = level
d["message"] = msg
d["name"] = self.name
for h in self.handlers:
h.emit(self.record)
else:
print(levelname, ":", self.name, ":", msg, sep="", file=_stream)
def debug(self, msg, *args):
self.log(DEBUG, msg, *args)
def info(self, msg, *args):
self.log(INFO, msg, *args)
def warning(self, msg, *args):
self.log(WARNING, msg, *args)
def error(self, msg, *args):
self.log(ERROR, msg, *args)
def critical(self, msg, *args):
self.log(CRITICAL, msg, *args)
def exc(self, e, msg, *args):
self.log(ERROR, msg, *args)
sys.print_exception(e, _stream)
def exception(self, msg, *args):
self.exc(sys.exc_info()[1], msg, *args)
def addHandler(self, hndlr):
self.handlers.append(hndlr)
_level = INFO
_loggers = {}
def getLogger(name="root"):
if name in _loggers:
return _loggers[name]
l = Logger(name)
_loggers[name] = l
return l
def info(msg, *args):
getLogger().info(msg, *args)
def debug(msg, *args):
getLogger().debug(msg, *args)
def basicConfig(level=INFO, filename=None, stream=None, format=None):
global _level, _stream
_level = level
if stream:
_stream = stream
if filename is not None:
print("logging.basicConfig: filename arg is not supported")
if format is not None:
print("logging.basicConfig: format arg is not supported")
import time
from inventor import Inventor2040W, NUM_LEDS
import WIFI_CONFIG
from network_manager import NetworkManager
from urllib import urequest
import uasyncio
import tinyweb
## Requires the tinyweb library: https://github.com/belyalov/tinyweb
## Remember to set your SSID and PSK in the WIFI_CONFIG file!
board = Inventor2040W()
WIFI_COUNTRY = "GB"
network_manager = NetworkManager(WIFI_COUNTRY)
hue = 0.0
sat = 0.0
val = 0.0
state = 0
class LEDs():
def get(self, data):
global hue, sat, val, state
return {"message": {"hue": hue, "sat": sat, "val": val, "state": state}}, 201
def post(self, data):
global hue, sat, val, state
if all(x in ("h", "s", "v", "state") for x in data.keys()):
hue = float(data["h"])
sat = float(data["s"])
val = float(data["v"])
state = int(data["state"])
set_leds(hue, sat, val, state)
return {'message': 'LEDs updated'}, 201
else:
return not_found()
def not_found():
return {'message': 'no values provided'}, 404
def set_leds(hue, sat, val, state):
if state:
for i in range(NUM_LEDS):
board.leds.set_hsv(i, hue, sat, val)
else:
board.leds.clear()
def update():
uasyncio.get_event_loop().run_until_complete(network_manager.client(WIFI_CONFIG.SSID, WIFI_CONFIG.PSK))
def run():
app = tinyweb.webserver()
app.add_resource(LEDs, '/leds')
app.run(host='0.0.0.0', port=8081)
board.leds.clear()
set_leds(hue, sat, val, state)
update()
run()
while True:
continue
import rp2
import network
import machine
import uasyncio
class NetworkManager:
_ifname = ("Client", "Access Point")
def __init__(self, country="GB", client_timeout=30, access_point_timeout=5, status_handler=None):
rp2.country(country)
self._ap_if = network.WLAN(network.AP_IF)
self._sta_if = network.WLAN(network.STA_IF)
self._mode = network.STA_IF
self._client_timeout = client_timeout
self._access_point_timeout = access_point_timeout
self._status_handler = status_handler
self.UID = ("{:02X}" * 8).format(*machine.unique_id())
def isconnected(self):
return self._sta_if.isconnected() or self._ap_if.isconnected()
def ifaddress(self):
if self._sta_if.isconnected():
return self._sta_if.ifconfig()[0]
if self._ap_if.isconnected():
return self._ap_if.ifconfig()[0]
return '0.0.0.0'
def disconnect(self):
if self._sta_if.isconnected():
self._sta_if.disconnect()
if self._ap_if.isconnected():
self._ap_if.disconnect()
async def wait(self, mode):
while not self.isconnected():
self._handle_status(mode, None)
await uasyncio.sleep_ms(1000)
def _handle_status(self, mode, status):
if self._status_handler is not None:
self._status_handler(self._ifname[mode], status, self.ifaddress())
async def client(self, ssid, psk):
if self._sta_if.isconnected():
return
self._ap_if.disconnect()
self._ap_if.active(False)
self._ap_if.deinit()
self._sta_if = network.WLAN(network.STA_IF)
self._sta_if.active(True)
self._sta_if.connect(ssid, psk)
try:
await uasyncio.wait_for(self.wait(network.STA_IF), self._client_timeout)
self._handle_status(network.STA_IF, True)
except uasyncio.TimeoutError:
self._sta_if.active(False)
self._handle_status(network.STA_IF, False)
raise RuntimeError("WIFI Client Failed")
async def access_point(self):
if self._ap_if.isconnected():
return
self._sta_if.disconnect()
self._sta_if.active(False)
self._sta_if.deinit()
self._ap_if = network.WLAN(network.AP_IF)
self._ap_if.active(True)
try:
await uasyncio.wait_for(self.wait(network.AP_IF), self._access_point_timeout)
self._handle_status(network.AP_IF, True)
except uasyncio.TimeoutError:
self._sta_if.active(False)
self._handle_status(network.AP_IF, False)
raise RuntimeError("WIFI AP Failed")
SSID = ""
PSK = ""
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment