|
# SPDX-FileCopyrightText: 2024 Paulus @PaulskPt on Github |
|
# SPDX-FileCopyrightText: 2021 ladyada for Adafruit Industries |
|
# SPDX-FileCopyrightText: 2023 Kattni Rembor for Adafruit Industries |
|
# SPDX-License-Identifier: MIT |
|
""" |
|
This is a Python script to test: |
|
A Raspberry Pi 5B-8GB single board computer with, via I2C, connected: |
|
a) an Adafruit Mini I2C Gamepad with seesaw - STEMMA QT / Qwiic (https://www.adafruit.com/product/5743); |
|
b) an Adafruit AHT20 Temperatur & Humidity Sensor Breakout Board - STEMMA QT / Qwiic (https://www.adafruit.com/product/4566). |
|
Added functionality to save data to two separate log files: |
|
1) gamepadqt.log for some system info and data from the Gamepad QT; |
|
2) aht20.log for some system info and data from the AHT20 sensor. |
|
This script has been successfully tested on a Raspberry Pi 5B-8GB |
|
within a virtual enviroment (env) from within an own project directory (/home/<user>/projects/rpi5_tmp_hum/). |
|
This script needs the Adafruit-Blinka repo to be installed on to Raspberry Pi 5B-8GB. |
|
It also uses the Adafruit-Python-extended-Bus repo to make I2C communication possible. |
|
The following lines need to be present in the file: /boot/firmware/config.txt: |
|
dtparam=i2c_arm=on |
|
dtoverlay=i2c-gpio,bus=3,i2c_gpio_delay_us=1,i2c_gpio_sda=17,i2c_gpio_scl=27 |
|
Want more? See my repos on Github.com/PaulskPt and my gists on Github.com/Gist/PaulskPt. |
|
""" |
|
import time as _time |
|
from datetime import datetime, timedelta |
|
import os |
|
import sys |
|
import gc |
|
import board |
|
from micropython import const |
|
from lib.adafruit_ahtx0 import AHTx0 |
|
from adafruit_seesaw.seesaw import Seesaw |
|
from adafruit_extended_bus import ExtendedI2C as I2C |
|
import microcontroller |
|
import logging as logging # see: https://docs.python.org/3/howto/logging-cookbook.html#logging-cookbook |
|
import tomllib # see: https://docs.python.org/3/library/tomllib.html |
|
from urllib.request import urlopen |
|
import urllib.error |
|
|
|
with open("settings.toml", "rb") as f: |
|
data = tomllib.load(f) |
|
if f: |
|
f.close() |
|
del f |
|
|
|
class State: |
|
def __init__(self, saved_state_json=None): |
|
self.board_id = None |
|
self.USE_TAG = None |
|
self.my_debug = False |
|
self.seesaw = None |
|
self.tempSensor = None |
|
self.tmp_cl = None |
|
self.hum_cl = None |
|
self.msg1_shown = False |
|
self.last_x = 0 |
|
self.last_y = 0 |
|
self.qt_btns_present = False |
|
self.aht20_present = False |
|
self.AHT20_LOGFILE = "" |
|
self.GAMEPAD_LOGFILE = "" |
|
self.AHT20_LOGGER_NAME = "" |
|
self.GAMEPAD_LOGGER_NAME = "" |
|
self.LOG_QT = None |
|
self.LOG_AHT = None |
|
self.PUSHING_BOX_DEVID = "" |
|
self.stat_result = "" |
|
self.msg_sent = False |
|
self.msg_nr_sent = 0 |
|
self.get_INT_RTC = True |
|
self.SYS_RTC_is_set = False |
|
self.curr_timestamp = None |
|
self.curr_tm = None # struct_time |
|
self.INTERVAL_SECONDS = 3600 # 10 Minutes |
|
self.RTCtpl = None |
|
self.RTCtpl_DOW = DOW = \ |
|
{ |
|
0: "Monday", |
|
1: "Tuesday", |
|
2: "Wednesday", |
|
3: "Thursday", |
|
4: "Friday", |
|
5: "Saturday", |
|
6: "Sunday" |
|
} |
|
self.RTCtpl_MONTH = \ |
|
{ |
|
0: "Dummy", |
|
1: "January", |
|
2: "February", |
|
3: "March", |
|
4: "April", |
|
5: "May", |
|
6: "June", |
|
7: "July", |
|
8: "August", |
|
9: "September", |
|
10: "October", |
|
11: "November", |
|
12: "December" |
|
} |
|
|
|
self.BUTTON_X = const(6) |
|
self.BUTTON_Y = const(2) |
|
self.BUTTON_A = const(5) |
|
self.BUTTON_B = const(1) |
|
self.BUTTON_SELECT = const(0) |
|
self.BUTTON_START = const(16) |
|
self.button_mask = const( |
|
(1 << self.BUTTON_X) |
|
| (1 << self.BUTTON_Y) |
|
| (1 << self.BUTTON_A) |
|
| (1 << self.BUTTON_B) |
|
| (1 << self.BUTTON_SELECT) |
|
| (1 << self.BUTTON_START) |
|
) |
|
|
|
state = State() |
|
|
|
state.my_debug = True if int(data["MY_DEBUG"]) else False |
|
state.USE_TAG = True if int(data["USE_TAG"]) else False |
|
state.LOG_QT = True if int(data["LOG_QT"]) else False |
|
state.LOG_AHT = True if int(data["LOG_AHT"]) else False |
|
state.INTERVAL_SECONDS = int(data["INTERVAL_SECONDS"]) |
|
state.PUSHING_BOX_DEVID = data["PUSHING_BOX_DEVID"] |
|
state.AHT20_LOGFILE = data["AHT20_LOGFILE"] |
|
state.GAMEPAD_LOGFILE = data["GAMEPAD_LOGFILE"] |
|
state.AHT20_LOGGER_NAME = data["AHT20_LOGGER_NAME"] |
|
state.GAMEPAD_LOGGER_NAME = data["GAMEPAD_LOGGER_NAME"] |
|
|
|
del data |
|
|
|
logfile_aht = state.AHT20_LOGFILE |
|
logfile_qt = state.GAMEPAD_LOGFILE |
|
logging_level = logging.DEBUG |
|
|
|
logger_aht = logging.getLogger(state.AHT20_LOGGER_NAME) # .addHandler(logging.NullHandler()) |
|
logger_qt = logging.getLogger(state.GAMEPAD_LOGGER_NAME) # .addHandler(logging.NullHandler()) |
|
logger_aht.setLevel(logging_level) |
|
logger_qt.setLevel(logging_level) |
|
|
|
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' |
|
|
|
formatter = logging.Formatter(format, datefmt="%Y-%m-%d %H:%M:%S") |
|
|
|
file_handler_aht = logging.FileHandler(logfile_aht) |
|
file_handler_aht.setFormatter(formatter) |
|
file_handler_qt = logging.FileHandler(logfile_qt) |
|
file_handler_qt.setFormatter(formatter) |
|
|
|
#console = logging.StreamHandler() |
|
#console.setFormatter(formatter) |
|
|
|
if state.LOG_QT: |
|
logger_qt.addHandler(file_handler_qt) |
|
else: |
|
logger_qt.addHandler(logging.NullHandler()) # Don't log |
|
|
|
#logger_qt.addHandler(console) |
|
if state.LOG_AHT: |
|
logger_aht.addHandler(file_handler_aht) |
|
else: |
|
logger_aht.addHandler(logging.NullHandler()) # Don't log |
|
#logger_aht.addHandler(console) |
|
|
|
del logfile_qt |
|
del logfile_aht |
|
del logging_level |
|
del format |
|
del formatter |
|
del file_handler_qt |
|
del file_handler_aht |
|
|
|
class sensor_tmp: |
|
def __init__(self, tmp, dt): |
|
self._tmp = tmp |
|
self._tmp_old = 0.00 |
|
self._dt = dt |
|
self._dt_old = None |
|
|
|
@property |
|
def tmp(self): |
|
return self._tmp |
|
|
|
@property |
|
def tmp_old(self): |
|
return self._tmp_old |
|
|
|
@tmp.setter |
|
def tmp(self, tmp): |
|
if isinstance(tmp, float): |
|
self._tmp = tmp |
|
elif isinstance(tmp, int): |
|
self._tmp = float(tmp) |
|
|
|
@tmp_old.setter |
|
def tmp_old(self, tmp): |
|
if isinstance(tmp, float): |
|
self._tmp_old = tmp |
|
elif isinstance(tmp, int): |
|
self._tmp_old = float(tmp) |
|
|
|
@property |
|
def last_upd(self): |
|
return self._dt |
|
|
|
@last_upd.setter |
|
def last_upd(self, dt): |
|
if isinstance(dt, str): |
|
self._dt = dt |
|
|
|
@property |
|
def last_upd_old(self): |
|
return self._dt_old |
|
|
|
@last_upd_old.setter |
|
def last_upd_old(self, dt): |
|
if isinstance(dt, str): |
|
self._dt_old = dt |
|
|
|
class sensor_hum: |
|
def __init__(self, hum, dt): |
|
self._hum = hum |
|
self._hum_old = 0.00 |
|
self._dt = dt |
|
|
|
@property |
|
def hum(self): |
|
return self._hum |
|
|
|
@property |
|
def hum_old(self): |
|
return self._hum_old |
|
|
|
@hum.setter |
|
def hum(self, hum): |
|
if isinstance(hum, float): |
|
self._hum = hum |
|
elif isinstance(hum, int): |
|
self._hum = float(hum) |
|
|
|
@hum_old.setter |
|
def hum_old(self, hum): |
|
if isinstance(hum, float): |
|
self._hum_old = hum |
|
elif isinstance(hum, int): |
|
self._hum_old = float(hum) |
|
|
|
@property |
|
def last_upd(self): |
|
return self._dt |
|
|
|
@last_upd.setter |
|
def last_upd(self, dt): |
|
if isinstance(dt, str): |
|
self._dt = dt |
|
|
|
|
|
def get_dt() -> str: |
|
now = datetime.now() |
|
dt = now.timetuple() |
|
dt1 = "{:s}, {:s} {:02d}, {:02d},".format(state.RTCtpl_DOW[dt.tm_wday], state.RTCtpl_MONTH[dt.tm_mon], dt.tm_mday, dt.tm_year) |
|
if dt.tm_hour < 12: |
|
hh = dt.tm_hour |
|
ampm = "am" |
|
elif dt.tm_hour == 12: |
|
hh = dt.tm_hour |
|
ampm = "pm" |
|
elif dt.tm_hour >= 13: |
|
hh = dt.tm_hour - 12 |
|
ampm = "pm" |
|
|
|
dt2 = "at: {:02d}:{:02d}:{:02d} {:s} ".format(hh, dt.tm_min, dt.tm_sec, ampm ) |
|
return dt1+" "+dt2 |
|
|
|
line = "-" * 55 |
|
print(line) |
|
logger_qt.info(line) |
|
logger_aht.info(line) |
|
s = "New run:" |
|
print(s) |
|
logger_qt.info(s) |
|
logger_aht.info(s) |
|
s = f"from python script: \"{sys.argv[0]}\"" # __file__ includes the full path to the file |
|
print(s) |
|
logger_qt.info(s) |
|
logger_aht.info(s) |
|
s = "Runtime: "+get_dt() |
|
print(s) |
|
logger_qt.info(s) |
|
logger_aht.info(s) |
|
print(line) |
|
logger_qt.info(line) |
|
logger_aht.info(line) |
|
del line |
|
|
|
|
|
state.board_id = board.board_id |
|
logger_qt.info(f"board id: {state.board_id}") |
|
logger_aht.info(f"board id: {state.board_id}") |
|
|
|
s= "using Adafruit_Python_extended_Bus. Using I2C bus #3" # github.com/adafruit/Adafruit_Python_extended_Bus |
|
logger_qt.info(s) |
|
logger_aht.info(s) |
|
del s |
|
|
|
i2c_bus = I2C(3) # I2C bus to be used: /dev/i2c-3 |
|
|
|
if i2c_bus is None: |
|
s = f"Error: type(i2c_bus)= {type(i2c_bus)}. Exiting..." |
|
logger_qt.error(s) |
|
logger_aht.error(s) |
|
sys.exit() |
|
|
|
e = None |
|
|
|
try: |
|
seesaw = Seesaw(i2c_bus, addr=0x50) |
|
seesaw.pin_mode_bulk(state.button_mask, seesaw.INPUT_PULLUP) |
|
if state.my_debug: |
|
print(f"global(): type(seesaw) = {type(seesaw)}") |
|
if seesaw: |
|
state.seesaw = seesaw |
|
state.qt_btns_present = True |
|
del seesaw |
|
except Exception as e: |
|
logger_qt.error(f"global(): Error while creating an instance seesaw class: {e}") |
|
state.qt_btns_present = False |
|
pass |
|
|
|
try: |
|
# tempSensor = PiicoDev_TMP117() # initialise the sensor |
|
tempSensor = AHTx0(i2c_bus) # initialise the sensor |
|
#logger_aht.info(f"type(tempSensor)= {type(tempSensor)}") |
|
if tempSensor: |
|
state.tempSensor = tempSensor |
|
state.aht20_present = True |
|
del tempSensor |
|
except Exception as e: |
|
logger_aht.error(f"global(): Error while creating an instance ath20 sensor class: {e}") |
|
state.aht20_present = False |
|
pass |
|
|
|
del e |
|
del i2c_bus |
|
|
|
s = "Gamepad QT is {} present".format("" if state.qt_btns_present else " not") |
|
logger_qt.info(s) |
|
s = "AHT20 sensor is {} present".format("" if state.aht20_present else " not") |
|
logger_aht.info(s) |
|
del s |
|
|
|
gc.collect() |
|
|
|
def test_msg(state: object): |
|
TAG = "gamepad_test(): " |
|
s1 = s2 = "" |
|
if state.qt_btns_present: |
|
s1 = "the Gamepad QT" |
|
if state.aht20_present: |
|
s2 = "and the AHT20 sensor" |
|
|
|
if not state.msg1_shown: |
|
if not state.qt_btns_present and not state.aht20_present: |
|
s = TAG+f"neither of the Gamepad QT or the AHT20 sensor is present. Check wiring. Exiting..." |
|
print(s) |
|
logger_qt.error(s) |
|
logger_aht.error(s) |
|
# _time.sleep(3) |
|
sys.exit() |
|
elif state.qt_btns_present or state.aht20_present: |
|
state.msg1_shown = True |
|
print(TAG+f"We\'re going to test {s1} {s2} with this {state.board_id}.") |
|
if state.qt_btns_present: |
|
print("\t\tPress any of the buttons (X, Y, A, B, Select or Start) on the Gamepad QT.\n\t\t" + \ |
|
f"To reboot {state.board_id} press Gamepad QT button Start.\n") |
|
|
|
|
|
def ck_usr_answr() -> bool: |
|
ret = False |
|
ays = "Are you sure? (Y/n)+<Enter>: " |
|
answer = "" |
|
while True: |
|
logger_qt.warning(ays) |
|
logger_aht.warning(ays) |
|
answer = input(ays) |
|
s = f"You answered: \'{answer}\'" |
|
print(s) |
|
logger_qt.info(s) |
|
logger_aht.info(s) |
|
if answer.upper() == "Y": |
|
ret = True |
|
break |
|
elif answer.upper() == "N": |
|
s = "not rebooting" |
|
logger_qt.info(s) |
|
logger_aht.info(s) |
|
break |
|
return ret |
|
|
|
|
|
def reboot(): |
|
s = "\nRebooting..." |
|
print(s) |
|
logger_qt.info(s) |
|
logger_aht.info(s) |
|
_time.sleep(3) |
|
os.system("sudo reboot") # for Raspberry Pi boards |
|
#microcontroller.reset() # for CircuitPython boards |
|
|
|
|
|
def pr_btn_name(res: int): |
|
btns = ["X", "Y", "A", "B", "Select", "Start"] |
|
if res >= 0 and res < len(btns): |
|
#blink_led(state) |
|
s = "Button "+btns[res]+" pressed" |
|
print(s) |
|
logger_qt.info(s) |
|
|
|
|
|
# Check for button presses on the Gamepad QT |
|
def ck_qt_btns(state: object): |
|
TAG = "ck_qt_btns(): " |
|
|
|
if not state.qt_btns_present: |
|
print(TAG+f"state.qt_btns_present= {state.qt_btns_present}") |
|
return |
|
|
|
nr_btns = 6 |
|
res_x = res_y = res_a = res_b = res_sel = res_sta = -1 |
|
elapsed_t = None |
|
interval_t = 36 |
|
interval_cnt = 0 |
|
gc.collect() |
|
_time.sleep(0.2) |
|
|
|
try: |
|
# get the joystick x and y axis value |
|
x = 1023 - state.seesaw.analog_read(14) |
|
y = 1023 - state.seesaw.analog_read(15) |
|
except Exception as e: |
|
if e.errno == 121: # Remote I/O Error |
|
logger_qt.error(f"Error: {e}") |
|
pass |
|
if x >= state.last_x: |
|
diff_x = abs(x - state.last_x) |
|
else: |
|
diff_x = abs(state.last_x - x) |
|
|
|
if y >= state.last_y: |
|
diff_y = abs(y - state.last_y) |
|
else: |
|
diff_y = abs(state.last_y - y) |
|
|
|
if (diff_x > 3) or (diff_y > 3): |
|
s = f"joystick: (x, y)= ({x}, {y})" |
|
print(s) |
|
logger_qt.info(s) |
|
# print(TAG+f"diff_x= {diff_x}, diff_y= {diff_y}") |
|
state.last_x = x |
|
state.last_y = y |
|
|
|
# Get the button presses, if any... |
|
buttons = state.seesaw.digital_read_bulk(state.button_mask) |
|
if state.my_debug: |
|
s = "\n"+TAG+f"buttons = {buttons}" |
|
print(s) |
|
logger_qt.info(s) |
|
if buttons == 65639: |
|
if state.my_debug: |
|
logger_qt.info(TAG+f"Gamepad QT: no button pressed") |
|
return |
|
# _time.sleep(0.5) |
|
start_t = _time.monotonic() |
|
|
|
if buttons: |
|
res = -1 |
|
for _ in range(nr_btns): |
|
if _ == 0: |
|
bz = 1 << state.BUTTON_X |
|
if not buttons & (bz): |
|
res = _ |
|
if res_x != res: |
|
pr_btn_name(res) |
|
res_x = res |
|
break |
|
if _ == 1: |
|
bz = 1 << state.BUTTON_Y |
|
if not buttons & (bz): |
|
res = _ |
|
if res_y != res: |
|
pr_btn_name(res) |
|
res_y = res |
|
break |
|
if _ == 2: |
|
bz = 1 << state.BUTTON_A |
|
if not buttons & (bz): |
|
res = _ |
|
if res_a != res: |
|
pr_btn_name(res) |
|
res_a = res |
|
break |
|
if _ == 3: |
|
bz = 1 << state.BUTTON_B |
|
if not buttons & (bz): |
|
res = _ |
|
if res_b != res: |
|
pr_btn_name(res) |
|
res_b = res |
|
break |
|
if _ == 4: |
|
bz = 1 << state.BUTTON_SELECT |
|
if not buttons & (bz): |
|
res = _ |
|
if res_sel != res: |
|
pr_btn_name(res) |
|
res_sel = res |
|
break |
|
if _ == 5: |
|
bz = 1 << state.BUTTON_START |
|
if not buttons & (bz): |
|
res = _ |
|
if res_sta != res: |
|
pr_btn_name(res) |
|
res_sta = res |
|
s = f"About to reboot the {state.board_id}" |
|
logger_qt.info(s) |
|
logger_aht.info(s) |
|
if ck_usr_answr(): |
|
reboot() # Reboot the board |
|
else: |
|
state.msg1_shown = False |
|
res_sta = -2 |
|
test_msg(state) |
|
break |
|
|
|
curr_t = _time.monotonic() |
|
elapsed_t = (curr_t - start_t) * 1000 |
|
if elapsed_t >= interval_t: |
|
interval_cnt += 1 |
|
if interval_cnt >= 100: |
|
interval_cnt = 0 |
|
res_x = res_y = res_a = res_b = res_sel = res_sta = -2 |
|
start_t = curr_t |
|
|
|
_time.sleep(0.01) |
|
|
|
|
|
def ck_sensor(state: object) -> tuple: |
|
TAG = "rd_sensor(): " |
|
|
|
tpl = ("", 0.00, 0.00) # default |
|
|
|
if not state.tempSensor: |
|
try: |
|
tempSensor = AHTx0(i2c_bus) # initialise the sensor |
|
if tempSensor: |
|
state.tempSensor = tempSensor |
|
state.aht20_present = True |
|
del tempSensor |
|
except ValueError as e: |
|
state.aht20_present = False |
|
s = TAG+f"Error: {e}. Check wiring!" |
|
print(s) |
|
logger_aht.error(s) |
|
return tpl |
|
|
|
try: |
|
tmp = state.tempSensor.temperature # Celsius |
|
hum = state.tempSensor.relative_humidity # |
|
dts = get_dt_for_pushingbox(state) # get datetime from builtin rtc |
|
|
|
state.tmp_cl.tmp = tmp |
|
state.tmp_cl.last_upd = dts |
|
|
|
if tmp != state.tmp_cl.tmp_old: |
|
state.tmp_cl.tmp_old = tmp |
|
|
|
state.hum_cl._hum = hum |
|
state.hum_cl.last_upd = dts |
|
|
|
if hum != state.hum_cl.hum_old: |
|
state.hum_cl.hum_old = hum |
|
|
|
tpl = (dts, tmp, hum) |
|
except ValueError as e: |
|
s = TAG+f"Error: {e}. Check wiring!" |
|
print(s) |
|
logger_aht.error(s) |
|
pass |
|
return tpl |
|
|
|
def get_temphum(state: object): |
|
TAG = "get_temphum(): " |
|
|
|
if not state.aht20_present: |
|
return |
|
|
|
tpl = ck_sensor(state) |
|
|
|
if isinstance(tpl, tuple): |
|
# Convert temperature and humidity values into a string and print the data |
|
lst = [ |
|
"last update: {:s}".format(tpl[0]), |
|
"temperature: {:5.2f} °C".format(tpl[1]), |
|
"humidity: {:5.2f} %".format(tpl[2]) |
|
] |
|
print() |
|
for _ in range(len(tpl)): |
|
s = TAG+f"{lst[_]}" # was: state.tmp_cl.last_upd |
|
print(s) |
|
logger_aht.info(s) |
|
|
|
|
|
def get_rnd_timestamp() -> int: |
|
return round(datetime.timestamp(datetime.now())) |
|
|
|
|
|
# return a rounded timestamp in the future |
|
def calc_future_timestamp(n: datetime, nr_secs: int) -> int: |
|
n2 = n + timedelta(seconds=nr_secs) |
|
return round(n2.timestamp()) |
|
|
|
|
|
def get_dt() -> str: |
|
now = datetime.now() |
|
dt = now.timetuple() |
|
dt1 = "{:d}/{:02d}/{:02d}".format(dt.tm_mon, dt.tm_mday, dt.tm_year) |
|
dt2 = "{:02d}:{:02d}:{:02d} weekday: {:s}".format(dt.tm_hour, dt.tm_min, dt.tm_sec, state.RTCtpl_DOW[dt.tm_wday]) |
|
return dt1+" "+dt2 |
|
|
|
|
|
def get_dt_for_pushingbox(state: object) -> str: |
|
TAG = "get_dt_for_pushingbox(): " |
|
now = datetime.now() |
|
dt = now.timetuple() |
|
dts = "{:d}-{:02d}-{:02d}T{:02d}:{:02d}:{:02d}Z".format(dt.tm_year, dt.tm_mon, dt.tm_mday, dt.tm_hour, dt.tm_min, dt.tm_sec) |
|
if state.my_debug: |
|
print(TAG+f"dts= {dts}") #, dt.tm_isdst= {dt.tm_isdst}") |
|
return dts |
|
|
|
|
|
def get_INT_RTC(state: object): |
|
if not state.get_INT_RTC: |
|
return |
|
|
|
TAG = "get_INT_RTC(): " |
|
dt = None |
|
now = None |
|
|
|
try: |
|
now = datetime.now() |
|
state.RTCtpl = now.timetuple() |
|
except OSError as e: |
|
logger_qt.error(TAG+f"Error: {e}") |
|
raise |
|
except Exception as e: |
|
raise |
|
|
|
if state.RTCtpl is not None: |
|
if state.my_debug: |
|
logger_aht.info(f"type(state.RTCtpl) = {type(state.RTCtpl)}") |
|
|
|
if state.my_debug: |
|
logger_qt.info(TAG+f"state.RTCtpl: {state.RTCtpl}") |
|
logger_aht.info(TAG+f"state.RTCtpl: {state.RTCtpl}") |
|
state.SYS_RTC_is_set = True |
|
|
|
dt = state.RTCtpl |
|
# print(TAG+f"dt= {dt}") |
|
|
|
dt1 =TAG+"{:d}/{:02d}/{:02d}".format(dt.tm_mon, dt.tm_mday, dt.tm_year) |
|
dt2 = TAG+"{:02d}:{:02d}:{:02d} weekday: {:s}".format(dt.tm_hour, dt.tm_min, dt.tm_sec, state.RTCtpl_DOW[dt.tm_wday]) |
|
logger_qt.info(dt1) |
|
logger_aht.info(dt1) |
|
logger_qt.info(dt2) |
|
logger_aht.info(dt2) |
|
|
|
|
|
def get_th_direct(state: object): |
|
# datetime e.g.: 2022-08-10T16:40:16Z |
|
TAG = "get_th_direct(): " |
|
|
|
if not state.aht20_present: |
|
return |
|
|
|
if state.my_debug: |
|
tpl = ck_sensor(state) |
|
if isinstance(tpl, tuple): |
|
lst = ["updated at", "Temperature:", "Humidity"] |
|
for _ in range(len(tpl)): |
|
s = TAG+f"{lst[_]} {tpl[_]}" # was: state.tmp_cl.last_upd |
|
print(s) |
|
logger_aht.info(s) |
|
|
|
def pr_th_msg(state: object) -> bool: |
|
TAG = "pr_th_msg(): " |
|
if state.stat_result is None: |
|
state.stat_result = "" |
|
|
|
ret = True |
|
upd_dt = state.tmp_cl.last_upd # [:10] # e.g.: 2022-08-10T16:40:16Z |
|
tmp_s = str(round(state.tmp_cl.tmp, 4)) |
|
hum_s = str(round(state.hum_cl.hum, 4)) |
|
text_items = ["response to send: "+state.stat_result, "-" * 23, upd_dt, "Temperature: "+tmp_s+" C", "Humidity: "+hum_s+" %", "-" * 23] |
|
|
|
for _ in range(len(text_items)): |
|
s = text_items[_] |
|
print(s) |
|
logger_aht.info(s) |
|
print() |
|
#logger_aht.info("-" * 23) |
|
|
|
return ret |
|
|
|
def send_to_pushingbox(state: object) -> bool: |
|
TAG = "send_to_pushingbox(): " |
|
ret = True |
|
tmp_s = None |
|
hum_s = None |
|
response = None |
|
state.stat_result = "" |
|
devid = state.PUSHING_BOX_DEVID # device ID on Pushingbox for our Scenario |
|
|
|
upd_dt = state.tmp_cl.last_upd # [:10] # e.g.: 2022-08-10T16:40:16Z |
|
upd_dt_old = state.tmp_cl.last_upd_old |
|
|
|
# -------------------------------------------------------------- |
|
tmp = round(state.tmp_cl.tmp, 4) |
|
tmp_s = str(tmp) |
|
tmp_old = state.tmp_cl.tmp_old |
|
|
|
if state.my_debug: |
|
print(TAG+f"tmp_s= \'{tmp_s}\', tmp_old= {tmp_old}") |
|
# -------------------------------------------------------------- |
|
hum = round(state.hum_cl.hum, 4) |
|
hum_s = str(hum) |
|
hum_old = state.hum_cl.hum_old |
|
|
|
if state.my_debug: |
|
print(TAG+f"hum_s= \'{hum_s}\', hum_old= {hum_old}") |
|
# -------------------------------------------------------------- |
|
|
|
# le_old = len(upd_dt_old) |
|
# if (le_old > 0) and (tmp == tmp_old) and (hum == hum_old): # and upd_dt == upd_dt_old |
|
if (upd_dt_old == upd_dt) and (tmp == tmp_old) and (hum == hum_old): # and upd_dt == upd_dt_old |
|
print(TAG+"datetime stamp unchanged. Waiting for new sensor data...") |
|
return ret # We don't want to send duplicates |
|
else: |
|
upd_dt_old = upd_dt # |
|
state.tmp_cl.last_upd_old = upd_dt |
|
#upd_tm = t_dict["updated_at"][11:19] |
|
|
|
state.msg_nr_sent += 1 # increase the messages sent count |
|
# dteData = upd_dt # Added 2024-03-22 because this variable was missing. |
|
# ?date=$date$&time=$time$&temp=$temp$&hum=$hum$ |
|
# 2024-04-15 added board_id: &id=$id$ |
|
# ?date=$date$&time=$time$&temp=$temp$&hum=$hum$&id=$id$ |
|
|
|
s = "http://api.pushingbox.com/pushingbox?devid=" |
|
s += devid |
|
s += "&date=\"" + upd_dt + "\"" |
|
s += "&temp=" + tmp_s |
|
s += "&hum=" + hum_s |
|
s += "&id=\"" + state.board_id + "\"" |
|
|
|
if state.my_debug: |
|
print(TAG+upd_dt, end='') |
|
print(". Sending Data message nr: ", end='') |
|
print(state.msg_nr_sent, end='') |
|
print(" to middle-man server...", end='\n') |
|
print(TAG+"Going to send:", end='') |
|
print("\n\""+s, end='') |
|
print("\"", end='\n') # To complete the message to the Serial (REPL) window |
|
|
|
try: |
|
webUrl=urllib.request.urlopen(s) |
|
response = str(webUrl.getcode()) |
|
except OSError as e: |
|
s = TAG+f"Error: {e}" |
|
print(s) |
|
logger_aht.error(s) |
|
if e.args[0] == -2: # gaierror |
|
# See: https://docs.circuitpython.org/_/downloads/en/6.3.x/pdf/, page 216 |
|
return # do nothing |
|
except RuntimeError as e: # can be activated by adafruit_requests, OutOfRetries error |
|
# or by get_socket. RuntimeError, EHOSTUNREACH errno 118 |
|
s = TAG+f"Error: {e}" |
|
print(s) |
|
logger_aht.error(s) |
|
return # do nothing |
|
|
|
if state.my_debug: |
|
s = TAG+"Data Sent" |
|
logger_aht.info(s) |
|
print(s) |
|
print() |
|
s = TAG+f"response= {response}, len(response)= {len(response)}, type(response)= {type(response)}" |
|
print(s) |
|
logger_aht.info(s) |
|
if response: |
|
le = len(response) |
|
if le == 3: |
|
status = int(response) |
|
print() |
|
print(TAG,end='') |
|
if status == 404: |
|
s = "error 404: file not found on this server." |
|
print(s) |
|
logger_aht.warning(s) |
|
elif status != 200 and status != 404: |
|
s = f"response= {response}" |
|
print(s) |
|
logger_aht.info(s) |
|
else: |
|
status = -1 |
|
|
|
state.stat_result = "OK" if status == 200 else "NO" |
|
s1 = "response=" |
|
s2 = "{} (= {})".format(status, state.stat_result) |
|
if state.my_debug: |
|
print() |
|
s = TAG+f"{s1} {s2}" |
|
print(s) |
|
logger_aht.info(s) |
|
#blink_leds(RED) |
|
|
|
s =TAG+f"send result: {status}" |
|
if status == 200: |
|
logger_aht.info(s) |
|
#blink_leds(GRN) |
|
else: |
|
logger_aht.warning(s) |
|
#pr_msg(["send result = "+s2]) |
|
response = None |
|
if state.my_debug: |
|
print(TAG+f"upd_dt= {upd_dt}, tmp_s= {tmp_s}, hum_s= {hum_s}") |
|
pr_th_msg(state) |
|
return ret |
|
|
|
def pr_msg(msg): |
|
text_items = [] |
|
le_max = 5 |
|
if isinstance(msg, list): |
|
le_msg = len(msg) |
|
if le_msg > 0 and le_msg <= le_max: |
|
for _ in range(len(msg)): |
|
text_items.append(msg[_]) |
|
le = len(text_items) |
|
if le_max - le > 0: # fill up with blank lines |
|
for _ in range(le_max - le): |
|
text_items.append(" ") |
|
for i in range(len(text_items)): |
|
s = text_items[i] # Display the dt, tm and tz |
|
print(s) |
|
logger_aht.info(s) |
|
|
|
def setup(state: object): |
|
TAG = "setup(): " |
|
|
|
if state.get_INT_RTC : |
|
if state.my_debug: |
|
logger_aht.info(TAG+"Going to get datetime from internal (SYS) RTC") |
|
get_INT_RTC(state) |
|
|
|
# Create instances of the sensor_tmp and sensor_hum classes |
|
state.tmp_cl = sensor_tmp(0.00, "") # create class instance with default values |
|
state.hum_cl = sensor_hum(0.00, "") # same |
|
|
|
gc.collect() |
|
|
|
|
|
def main(): |
|
TAG= "main(): " |
|
loopnr = 0 |
|
setup(state) |
|
# state.curr_tm = _time.time() # set curr_tm for the first time |
|
state.curr_timestamp = get_rnd_timestamp() |
|
start = True |
|
test_msg(state) |
|
|
|
s = "\n\nAdafruit Gamepad QT test:\n" |
|
logger_qt.info(s) |
|
s = "\n\nAdafruit AHT20 test:\n" |
|
logger_aht.info(s) |
|
s = f"DataToGoogle interval: {state.INTERVAL_SECONDS} seconds" |
|
print(s) |
|
logger_aht.info(s) |
|
|
|
if state.aht20_present: |
|
logger_aht.info('logged values of temperature and humidity from Adafruit AHT20 sensor:') |
|
|
|
if state.INTERVAL_SECONDS is None: |
|
state.INTERVAL_SECONDS = 3600 # Set for 1 hour |
|
elif state.INTERVAL_SECONDS == 0: |
|
state.INTERVAL_SECONDS = 3600 # Same |
|
|
|
interval = state.INTERVAL_SECONDS |
|
|
|
togo_send_secs_old = 0.00 |
|
togo_temphum_secs_old = 0.00 |
|
do_temphum = False |
|
do_send = False |
|
interval_short = 120 # default: 3600 // 6 = 600 seconds = 10 minutes. See setup(). |
|
interval_long = interval # interval # default: 3600 seconds = 1 hour |
|
future_send_timestamp = calc_future_timestamp(datetime.now(), interval_long) |
|
future_temphum_timestamp = calc_future_timestamp(datetime.now(), interval_short) |
|
while True: |
|
try: |
|
loopnr += 1 |
|
# print(f"\nLoopnr: {loopnr}") |
|
ck_qt_btns(state) |
|
# _time.sleep(0.5) |
|
|
|
if state.SYS_RTC_is_set: |
|
curr_timestamp = get_rnd_timestamp() |
|
do_send = True if curr_timestamp >= future_send_timestamp else False |
|
do_temphum = True if curr_timestamp >= future_temphum_timestamp else False |
|
togo_temphum_secs = future_temphum_timestamp - curr_timestamp |
|
togo_send_secs = future_send_timestamp - curr_timestamp |
|
|
|
if togo_temphum_secs_old != togo_temphum_secs: |
|
togo_temphum_secs_old = togo_temphum_secs |
|
if togo_temphum_secs % 10 == 0: |
|
print() |
|
print(TAG+"next get_temphum() in: {:4d} seconds".format(togo_temphum_secs)) |
|
|
|
if togo_send_secs_old != togo_send_secs: |
|
togo_send_secs_old = togo_send_secs |
|
if togo_send_secs % 10 == 0: |
|
print(TAG+"next SendToGoogle in: {:4d} seconds".format(togo_send_secs)) |
|
|
|
if start or do_temphum: |
|
if state.msg_sent: |
|
state.msg_sent = False |
|
get_temphum(state) |
|
future_temphum_timestamp = calc_future_timestamp(datetime.now(), interval_short) # in practice we will use var 'interval' |
|
|
|
if (start or (do_send) or (not state.tempSensor)) and (not state.msg_sent): |
|
start = False |
|
#blink_leds() # switch off LEDs |
|
get_th_direct(state) |
|
gc.collect() |
|
if not send_to_pushingbox(state): |
|
s = TAG+f"send data to Pushingbox.com failed!" |
|
logger_aht.error(s) |
|
raise KeyboardInterrupt |
|
else: |
|
state.msg_sent = True |
|
future_send_timestamp = calc_future_timestamp(datetime.now(), interval_long) |
|
# if c2 == 30: |
|
# sys.exit() # Temporary forced end of execution |
|
|
|
|
|
#_time.sleep(1) |
|
|
|
except KeyboardInterrupt: |
|
s = TAG+"KeyboardInterrrupt. Exiting..." |
|
print(s) |
|
logger_qt.info(s) |
|
logger_aht.info(s) |
|
break |
|
|
|
if loopnr >= 1000: |
|
loopnr = 0 |
|
# break |
|
|
|
sys.exit() |
|
|
|
if __name__ == '__main__': |
|
main() |
Hardware: I2C wiring
See the images of pushingbox.com, Google Apps Scripts script and Google Sheets spreadsheet: tools
See the script outline
Image of console output and data in spreadsheet: moment of data sent to spreadsheet