Skip to content

Instantly share code, notes, and snippets.

@emwdx
Created March 29, 2022 08:38
Show Gist options
  • Save emwdx/151d6b5ead1276208319f06066ac103b to your computer and use it in GitHub Desktop.
Save emwdx/151d6b5ead1276208319f06066ac103b to your computer and use it in GitHub Desktop.
# Every five seconds, this publishes the voltages read on two analog pins to two Adafruit IO feeds.
# SPDX-FileCopyrightText: 2021 ladyada for Adafruit Industries
# SPDX-License-Identifier: MIT
import time
from random import randint
import ssl
import socketpool
import wifi
import board
import adafruit_minimqtt.adafruit_minimqtt as MQTT
from adafruit_io.adafruit_io import IO_MQTT
## Set up analog in
from analogio import AnalogIn
# The two analog channels read by the publish loop: board pins A0 and A5.
input_voltage_1, input_voltage_2 = AnalogIn(board.A0), AnalogIn(board.A5)
def get_voltage(pin, v_ref=3.3, full_scale=65536):
    """Scale a raw analog reading to a voltage.

    Args:
        pin: Object exposing a numeric ``value`` attribute (in this script,
            an ``AnalogIn`` instance).
        v_ref: Voltage that a full-scale reading represents. Defaults to 3.3.
        full_scale: Divisor applied to the raw reading. Defaults to 65536.

    Returns:
        ``(pin.value * v_ref) / full_scale``.
    """
    return (pin.value * v_ref) / full_scale
### WiFi ###
# Add a secrets.py to your filesystem that has a dictionary called secrets with "ssid" and
# "password" keys with your WiFi credentials. DO NOT share that file or commit it into Git or other
# source control.
# pylint: disable=no-name-in-module,wrong-import-order
try:
    # secrets.py must define a dict named `secrets`; this script reads its
    # "ssid", "password", "aio_username" and "aio_key" entries.
    from secrets import secrets
except ImportError:
    # Tell the user what is missing, then re-raise so the script stops here.
    print("WiFi secrets are kept in secrets.py, please add them there!")
    raise
# Set your Adafruit IO Username and Key in secrets.py
# (visit io.adafruit.com if you need to create an account,
# or if you need your Adafruit IO key.)
# Adafruit IO account credentials, taken from the secrets dictionary.
aio_username, aio_key = secrets["aio_username"], secrets["aio_key"]

# Join the WiFi network named in secrets, announcing it before and after.
ssid = secrets["ssid"]
print("Connecting to %s" % ssid)
wifi.radio.connect(ssid, secrets["password"])
print("Connected to %s!" % ssid)
# Define callback functions which will be called when certain events happen.
# pylint: disable=unused-argument
def connected(client):
    """Handle a successful connection to Adafruit IO.

    Installed as ``io.on_connect``. Prints a status line and subscribes the
    client to the feed named "DemoFeed".

    Args:
        client: The Adafruit IO MQTT client; only its ``subscribe`` method
            is used here.
    """
    print("Connected to Adafruit IO! Listening for DemoFeed changes...")
    # Subscribe to changes on a feed named DemoFeed.
    client.subscribe("DemoFeed")
def subscribe(client, userdata, topic, granted_qos):
    """Report a completed subscription.

    Installed as ``io.on_subscribe``. Only ``topic`` and ``granted_qos`` are
    used; ``client`` and ``userdata`` are accepted to match the callback
    signature.
    """
    print("Subscribed to {0} with QOS level {1}".format(topic, granted_qos))
def unsubscribe(client, userdata, topic, pid):
    """Report a completed unsubscription.

    Installed as ``io.on_unsubscribe``. Only ``topic`` and ``pid`` are used;
    ``client`` and ``userdata`` are accepted to match the callback signature.
    """
    print("Unsubscribed from {0} with PID {1}".format(topic, pid))
# pylint: disable=unused-argument
def disconnected(client):
    """Report that the client has disconnected from Adafruit IO.

    Installed as ``io.on_disconnect``. ``client`` is unused.
    """
    print("Disconnected from Adafruit IO!")
# pylint: disable=unused-argument
def message(client, feed_id, payload):
    """Report a new value received on a subscribed feed.

    Installed as ``io.on_message``.

    Args:
        client: Unused; accepted to match the callback signature.
        feed_id: Identifier of the feed that changed.
        payload: The new value for that feed.
    """
    print("Feed {0} received new value: {1}".format(feed_id, payload))
# Socket pool bound to the WiFi radio; the MQTT client takes its sockets from it.
pool = socketpool.SocketPool(wifi.radio)

# Low-level MQTT client for the Adafruit IO broker, authenticated with the
# account credentials and given a default SSL context.
tls_context = ssl.create_default_context()
mqtt_client = MQTT.MQTT(
    broker="io.adafruit.com",
    username=aio_username,
    password=aio_key,
    socket_pool=pool,
    ssl_context=tls_context,
)

# Wrap the MQTT client in the Adafruit IO helper.
io = IO_MQTT(mqtt_client)

# Register the event callbacks defined earlier in this file.
io.on_connect = connected
io.on_disconnect = disconnected
io.on_subscribe = subscribe
io.on_unsubscribe = unsubscribe
io.on_message = message

# Open the connection to Adafruit IO.
print("Connecting to Adafruit IO...")
io.connect()
# Below is an example of manually publishing a new value to Adafruit IO.
# time.monotonic() reading taken after the most recent publish attempt.
# Starting at 0 means the first publish happens once monotonic time reaches 5.
last = 0
print("Publishing a new message every 5 seconds...")
while True:
    # Explicitly pump the message loop.
    io.loop()
    # Send a new pair of readings every 5 seconds.
    if (time.monotonic() - last) >= 5:
        input_value_1 = get_voltage(input_voltage_1)
        input_value_2 = get_voltage(input_voltage_2)
        print("Publishing {0} and {1} to solar-stuff feeds.".format(input_value_1, input_value_2))
        try:
            io.publish("solar-stuff-1", input_value_1)
            io.publish("solar-stuff-2", input_value_2)
        # The original `except(e):` evaluated the undefined name `e`, so a
        # failed publish raised NameError instead of reaching the reconnect.
        # NOTE(review): OSError (socket failures) and MMQTTException (MiniMQTT
        # errors) are the expected failure types -- confirm against the
        # installed library versions.
        except (OSError, MQTT.MMQTTException) as err:
            print("Publish failed:", err)
            print("reconnecting")
            io.connect()
        # Restart the 5-second interval whether or not the publish succeeded.
        last = time.monotonic()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment