@PaulskPt
Last active April 21, 2024 09:30
DataToGoogle_Raspi5_V2

VERSION 2

(Using pushingbox.com service as 'man-in-the-middle')

The Python script raspi5_datatogoogle.py (in this gist) performs several tasks:

a) reads temperature and humidity data from an I2C-connected Adafruit AHT20 sensor;
b) reads button presses and/or joystick X/Y-axis values from a connected Adafruit Mini I2C Gamepad QT board;
c) gets the datetime stamp from the Raspberry Pi OS (it is assumed that the Raspberry Pi is connected via WiFi or Ethernet);
d) at intervals, sends the datetime stamp, the temperature and humidity data and the board ID to pushingbox.com, which in turn
sends the data to a Google Apps Script macro. This macro writes the data to a Google Sheets spreadsheet.
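Task b) depends on the seesaw chip returning all button states as a single bitmask. The sketch below shows how such a value can be decoded. The pin numbers are the ones set in the script's State class; the helper name `pressed_buttons` is hypothetical and not part of the script. Because the pins are configured with pull-ups, a cleared bit means the button is pressed.

```python
# Seesaw pin numbers as set in the script's State class.
BUTTON_PINS = {"X": 6, "Y": 2, "A": 5, "B": 1, "Select": 0, "Start": 16}

# One bit per button; with pull-ups enabled, a set bit means "released".
BUTTON_MASK = 0
for pin in BUTTON_PINS.values():
    BUTTON_MASK |= 1 << pin


def pressed_buttons(raw: int) -> list[str]:
    """Return the names of all buttons whose bit is cleared in `raw`.

    Hypothetical helper: `raw` stands for the value returned by the
    seesaw bulk read for BUTTON_MASK.
    """
    return [name for name, pin in BUTTON_PINS.items() if not raw & (1 << pin)]
```

When no button is pressed the raw value equals the mask itself (65639), which is the constant the script compares against.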

This Python script uses three Classes:

  1. State class. This class contains a lot of variables. The use of this class minimizes the need for global variables;
  2. sensor_tmp class. This class is used to hold temperature data;
  3. sensor_hum class. This class is used to hold humidity data.

Certain settings can be modified in the file settings.toml. The Python script reads these settings into memory at startup. One can, for example, change the name(s) of the logfile(s) and/or the name(s) of the logger(s).
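As an illustration of how these settings can be read, here is a minimal sketch. The key names are the ones the script reads from settings.toml. The function names `parse_settings` and `load_settings` and the default values are assumptions made for this example only. tomllib requires Python 3.11 or newer.

```python
def parse_settings(raw: dict) -> dict:
    """Convert raw TOML values to the types the script works with.

    The defaults used here are assumptions for this sketch; the script
    itself expects every key to be present in settings.toml.
    """
    return {
        "my_debug": bool(int(raw.get("MY_DEBUG", 0))),
        "use_tag": bool(int(raw.get("USE_TAG", 0))),
        "log_qt": bool(int(raw.get("LOG_QT", 1))),
        "log_aht": bool(int(raw.get("LOG_AHT", 1))),
        "interval_seconds": int(raw.get("INTERVAL_SECONDS", 3600)),
        "pushing_box_devid": raw.get("PUSHING_BOX_DEVID", ""),
        "aht20_logfile": raw.get("AHT20_LOGFILE", "aht20.log"),
        "gamepad_logfile": raw.get("GAMEPAD_LOGFILE", "gamepadqt.log"),
    }


def load_settings(path: str = "settings.toml") -> dict:
    """Read and convert the settings file (Python 3.11+ for tomllib)."""
    import tomllib  # imported here so parse_settings() works on older Pythons

    with open(path, "rb") as f:  # tomllib requires a binary file handle
        return parse_settings(tomllib.load(f))
```

Calling `int()` on each flag accepts both `MY_DEBUG = 1` and `MY_DEBUG = "1"`, which mirrors the conversion the script applies.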

NOTE: One needs an account with pushingbox.com, the service that acts as the man-in-the-middle. The basic service level is free. As far as I remember, a donation is only required when one needs more than 1000 send events per day. In settings.toml I have set INTERVAL_SECONDS to 3600 (1 hour), so the Python script sends data to pushingbox.com only once per hour.

In pushingbox.com one has to create at least one Service and one Scenario. The Service has a name, in our case DataToGoogle_RASPI5, and contains the URL for a GET request. This URL points to the Google Apps Script macro, in our case: https://script.google.com/macros/s/<URL_of_the_script_deployment>/exec The Scenario has to contain at least one Action, in the case of this Python script: ?date=$date$&time=$time$&temp=$temp$&hum=$hum$&id=$id$. The Scenario that you created gets a DeviceID. Copy this DeviceID string into settings.toml, on the line PUSHING_BOX_DEVID.
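The request that reaches pushingbox.com is a plain GET with the DeviceID and the measured values in the query string. The following is a sketch of one way to build it, not the script's own code: the script concatenates the string by hand and wraps the date and id values in double quotes, whereas this example uses `urllib.parse.urlencode` and leaves the quotes out. The function name `build_pushingbox_url` is hypothetical.

```python
from urllib.parse import urlencode

# Endpoint as used in the script.
PUSHINGBOX_URL = "http://api.pushingbox.com/pushingbox"


def build_pushingbox_url(devid: str, date: str, temp: float, hum: float, board_id: str) -> str:
    """Build the GET URL for one measurement (hypothetical helper).

    The parameter names match the placeholders of the Scenario's Action
    (date, temp, hum, id); devid selects the Scenario.
    """
    query = urlencode(
        {
            "devid": devid,
            "date": date,
            "temp": str(round(temp, 4)),  # the script also rounds to 4 decimals
            "hum": str(round(hum, 4)),
            "id": board_id,
        }
    )
    return f"{PUSHINGBOX_URL}?{query}"
```

`urlencode` percent-encodes characters such as the colons in the timestamp, so the values arrive unchanged after the server decodes the query string.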

The Python script uses I2C bus #3. For this the following two lines need to be present in file: /boot/firmware/config.txt:

       dtparam=i2c_arm=on
       dtoverlay=i2c-gpio,bus=3,i2c_gpio_delay_us=1,i2c_gpio_sda=17,i2c_gpio_scl=27
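Before starting the script it can help to confirm that both lines are really active in config.txt. The sketch below is an optional check under the assumption that the lines appear exactly as written above; the helper name `missing_config_lines` is hypothetical. Once the overlay is loaded after a reboot, the bus is available as /dev/i2c-3.

```python
REQUIRED_LINES = (
    "dtparam=i2c_arm=on",
    "dtoverlay=i2c-gpio,bus=3,i2c_gpio_delay_us=1,i2c_gpio_sda=17,i2c_gpio_scl=27",
)


def missing_config_lines(config_text: str) -> list[str]:
    """Return the required lines that are absent or commented out."""
    active = {
        line.strip()
        for line in config_text.splitlines()
        if line.strip() and not line.lstrip().startswith("#")
    }
    return [req for req in REQUIRED_LINES if req not in active]
```

A possible use is `missing_config_lines(open("/boot/firmware/config.txt").read())`, which returns an empty list when both lines are present.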

The Python script has to be run in a virtual environment. See: https://docs.python.org/3/library/venv.html Several modules have to be installed using the pip3 installer (from within the virtual environment):

  • Adafruit-Blinka (see: https://learn.adafruit.com/circuitpython-on-raspberrypi-linux/installing-circuitpython-on-raspberry-pi);
  • Adafruit-Python-extended-Bus (see: https://pypi.org/project/adafruit-extended-bus/);
  • adafruit_ahtx0 (see: https://pypi.org/project/adafruit-circuitpython-ahtx0/);
  • adafruit-seesaw (see: https://pypi.org/project/Adafruit_seesaw/).

The temperature and humidity values are read every 2 minutes. These values are printed to the console and logged to the file aht20.log. The button presses and the joystick movement values are monitored constantly. The values of the buttons A, B, X, Y, Select and Start are printed to the console and logged to the file gamepadqt.log. The Start button initiates a reboot of the Raspberry Pi 5, but not before asking the user to confirm with Y/n. The warning message, the user's choice and the resulting action are logged to gamepadqt.log.
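The script therefore runs two timers side by side inside one loop: a short one for reading the sensor (120 seconds) and a long one for sending to pushingbox.com (INTERVAL_SECONDS, 3600 by default). A minimal sketch of that pattern follows. The class name `Schedule` is hypothetical; the script itself compares rounded timestamps directly in main().

```python
class Schedule:
    """Fire once every `interval` seconds, measured from the last firing."""

    def __init__(self, interval: float, now: float) -> None:
        self.interval = interval
        self.deadline = now + interval

    def poll(self, now: float) -> bool:
        """Return True (and re-arm the timer) when the deadline has passed."""
        if now < self.deadline:
            return False
        self.deadline = now + self.interval
        return True
```

In a main loop one would create `Schedule(120, now)` and `Schedule(3600, now)` once and then call `poll()` on both in every pass, with `now` taken from a clock such as `time.monotonic()`.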

Don't worry that the Google Apps Script script and the Google Sheets spreadsheet carry the name MAGTAG in their filenames. These files were initially created for a project in which I was using an Adafruit MAGTAG board.

# SPDX-FileCopyrightText: 2024 Paulus @PaulskPt on Github
# SPDX-FileCopyrightText: 2021 ladyada for Adafruit Industries
# SPDX-FileCopyrightText: 2023 Kattni Rembor for Adafruit Industries
# SPDX-License-Identifier: MIT
"""
This is a Python script to test:
A Raspberry Pi 5B-8GB single board computer with, connected via I2C:
a) an Adafruit Mini I2C Gamepad with seesaw - STEMMA QT / Qwiic (https://www.adafruit.com/product/5743);
b) an Adafruit AHT20 Temperature & Humidity Sensor Breakout Board - STEMMA QT / Qwiic (https://www.adafruit.com/product/4566).
Added functionality to save data to two separate log files:
1) gamepadqt.log for some system info and data from the Gamepad QT;
2) aht20.log for some system info and data from the AHT20 sensor.
This script has been successfully tested on a Raspberry Pi 5B-8GB
within a virtual environment (env) from within its own project directory (/home/<user>/projects/rpi5_tmp_hum/).
This script needs the Adafruit-Blinka repo to be installed on the Raspberry Pi 5B-8GB.
It also uses the Adafruit-Python-extended-Bus repo to make I2C communication possible.
The following lines need to be present in the file: /boot/firmware/config.txt:
    dtparam=i2c_arm=on
    dtoverlay=i2c-gpio,bus=3,i2c_gpio_delay_us=1,i2c_gpio_sda=17,i2c_gpio_scl=27
Want more? See my repos on Github.com/PaulskPt and my gists on Github.com/Gist/PaulskPt.
"""
import time as _time
from datetime import datetime, timedelta
import os
import sys
import gc
import board
from micropython import const
from lib.adafruit_ahtx0 import AHTx0
from adafruit_seesaw.seesaw import Seesaw
from adafruit_extended_bus import ExtendedI2C as I2C
import microcontroller
import logging # see: https://docs.python.org/3/howto/logging-cookbook.html#logging-cookbook
import tomllib # see: https://docs.python.org/3/library/tomllib.html
from urllib.request import urlopen
import urllib.error
# Read the user settings once at startup; the "with" block closes the file.
with open("settings.toml", "rb") as f:
    data = tomllib.load(f)
del f
class State:
    def __init__(self, saved_state_json=None):
        self.board_id = None
        self.USE_TAG = None
        self.my_debug = False
        self.seesaw = None
        self.tempSensor = None
        self.tmp_cl = None
        self.hum_cl = None
        self.msg1_shown = False
        self.last_x = 0
        self.last_y = 0
        self.qt_btns_present = False
        self.aht20_present = False
        self.AHT20_LOGFILE = ""
        self.GAMEPAD_LOGFILE = ""
        self.AHT20_LOGGER_NAME = ""
        self.GAMEPAD_LOGGER_NAME = ""
        self.LOG_QT = None
        self.LOG_AHT = None
        self.PUSHING_BOX_DEVID = ""
        self.stat_result = ""
        self.msg_sent = False
        self.msg_nr_sent = 0
        self.get_INT_RTC = True
        self.SYS_RTC_is_set = False
        self.curr_timestamp = None
        self.curr_tm = None  # struct_time
        self.INTERVAL_SECONDS = 3600  # 1 hour; overridden by settings.toml
        self.RTCtpl = None
        # Keys match struct_time.tm_wday (0 = Monday)
        self.RTCtpl_DOW = {
            0: "Monday",
            1: "Tuesday",
            2: "Wednesday",
            3: "Thursday",
            4: "Friday",
            5: "Saturday",
            6: "Sunday"
        }
        # Keys match struct_time.tm_mon (1 = January); 0 is a placeholder
        self.RTCtpl_MONTH = {
            0: "Dummy",
            1: "January",
            2: "February",
            3: "March",
            4: "April",
            5: "May",
            6: "June",
            7: "July",
            8: "August",
            9: "September",
            10: "October",
            11: "November",
            12: "December"
        }
        # Seesaw pin numbers of the Gamepad QT buttons
        self.BUTTON_X = const(6)
        self.BUTTON_Y = const(2)
        self.BUTTON_A = const(5)
        self.BUTTON_B = const(1)
        self.BUTTON_SELECT = const(0)
        self.BUTTON_START = const(16)
        # One bit per button, used for the bulk pin-mode and bulk read calls
        self.button_mask = const(
            (1 << self.BUTTON_X)
            | (1 << self.BUTTON_Y)
            | (1 << self.BUTTON_A)
            | (1 << self.BUTTON_B)
            | (1 << self.BUTTON_SELECT)
            | (1 << self.BUTTON_START)
        )
state = State()
# Copy the settings from settings.toml into the state object
state.my_debug = bool(int(data["MY_DEBUG"]))
state.USE_TAG = bool(int(data["USE_TAG"]))
state.LOG_QT = bool(int(data["LOG_QT"]))
state.LOG_AHT = bool(int(data["LOG_AHT"]))
state.INTERVAL_SECONDS = int(data["INTERVAL_SECONDS"])
state.PUSHING_BOX_DEVID = data["PUSHING_BOX_DEVID"]
state.AHT20_LOGFILE = data["AHT20_LOGFILE"]
state.GAMEPAD_LOGFILE = data["GAMEPAD_LOGFILE"]
state.AHT20_LOGGER_NAME = data["AHT20_LOGGER_NAME"]
state.GAMEPAD_LOGGER_NAME = data["GAMEPAD_LOGGER_NAME"]
del data
# Set up one logger (and one log file) per device
logfile_aht = state.AHT20_LOGFILE
logfile_qt = state.GAMEPAD_LOGFILE
logging_level = logging.DEBUG
logger_aht = logging.getLogger(state.AHT20_LOGGER_NAME)
logger_qt = logging.getLogger(state.GAMEPAD_LOGGER_NAME)
logger_aht.setLevel(logging_level)
logger_qt.setLevel(logging_level)
log_format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'  # avoids shadowing the builtin format()
formatter = logging.Formatter(log_format, datefmt="%Y-%m-%d %H:%M:%S")
file_handler_aht = logging.FileHandler(logfile_aht)
file_handler_aht.setFormatter(formatter)
file_handler_qt = logging.FileHandler(logfile_qt)
file_handler_qt.setFormatter(formatter)
#console = logging.StreamHandler()
#console.setFormatter(formatter)
if state.LOG_QT:
    logger_qt.addHandler(file_handler_qt)
else:
    logger_qt.addHandler(logging.NullHandler())  # Don't log
#logger_qt.addHandler(console)
if state.LOG_AHT:
    logger_aht.addHandler(file_handler_aht)
else:
    logger_aht.addHandler(logging.NullHandler())  # Don't log
#logger_aht.addHandler(console)
del logfile_qt
del logfile_aht
del logging_level
del log_format
del formatter
del file_handler_qt
del file_handler_aht
class sensor_tmp:
    def __init__(self, tmp, dt):
        self._tmp = tmp
        self._tmp_old = 0.00
        self._dt = dt
        self._dt_old = None

    @property
    def tmp(self):
        return self._tmp

    @property
    def tmp_old(self):
        return self._tmp_old

    @tmp.setter
    def tmp(self, tmp):
        # Accept floats and ints only; other types are silently ignored
        if isinstance(tmp, float):
            self._tmp = tmp
        elif isinstance(tmp, int):
            self._tmp = float(tmp)

    @tmp_old.setter
    def tmp_old(self, tmp):
        if isinstance(tmp, float):
            self._tmp_old = tmp
        elif isinstance(tmp, int):
            self._tmp_old = float(tmp)

    @property
    def last_upd(self):
        return self._dt

    @last_upd.setter
    def last_upd(self, dt):
        if isinstance(dt, str):
            self._dt = dt

    @property
    def last_upd_old(self):
        return self._dt_old

    @last_upd_old.setter
    def last_upd_old(self, dt):
        if isinstance(dt, str):
            self._dt_old = dt


class sensor_hum:
    def __init__(self, hum, dt):
        self._hum = hum
        self._hum_old = 0.00
        self._dt = dt

    @property
    def hum(self):
        return self._hum

    @property
    def hum_old(self):
        return self._hum_old

    @hum.setter
    def hum(self, hum):
        # Accept floats and ints only; other types are silently ignored
        if isinstance(hum, float):
            self._hum = hum
        elif isinstance(hum, int):
            self._hum = float(hum)

    @hum_old.setter
    def hum_old(self, hum):
        if isinstance(hum, float):
            self._hum_old = hum
        elif isinstance(hum, int):
            self._hum_old = float(hum)

    @property
    def last_upd(self):
        return self._dt

    @last_upd.setter
    def last_upd(self, dt):
        if isinstance(dt, str):
            self._dt = dt
# Long date/time string for the start-up banner,
# e.g.: "Thursday, April 18, 2024, at: 10:39:10 pm"
def get_dt() -> str:
    now = datetime.now()
    dt = now.timetuple()
    dt1 = "{:s}, {:s} {:02d}, {:d},".format(state.RTCtpl_DOW[dt.tm_wday], state.RTCtpl_MONTH[dt.tm_mon], dt.tm_mday, dt.tm_year)
    if dt.tm_hour < 12:
        hh = dt.tm_hour
        ampm = "am"
    elif dt.tm_hour == 12:
        hh = dt.tm_hour
        ampm = "pm"
    else:
        hh = dt.tm_hour - 12
        ampm = "pm"
    dt2 = "at: {:02d}:{:02d}:{:02d} {:s} ".format(hh, dt.tm_min, dt.tm_sec, ampm)
    return dt1 + " " + dt2
line = "-" * 55
print(line)
logger_qt.info(line)
logger_aht.info(line)
s = "New run:"
print(s)
logger_qt.info(s)
logger_aht.info(s)
s = f"from python script: \"{sys.argv[0]}\"" # __file__ includes the full path to the file
print(s)
logger_qt.info(s)
logger_aht.info(s)
s = "Runtime: "+get_dt()
print(s)
logger_qt.info(s)
logger_aht.info(s)
print(line)
logger_qt.info(line)
logger_aht.info(line)
del line
state.board_id = board.board_id
logger_qt.info(f"board id: {state.board_id}")
logger_aht.info(f"board id: {state.board_id}")
s= "using Adafruit_Python_extended_Bus. Using I2C bus #3" # github.com/adafruit/Adafruit_Python_extended_Bus
logger_qt.info(s)
logger_aht.info(s)
del s
i2c_bus = I2C(3)  # I2C bus to be used: /dev/i2c-3
if i2c_bus is None:
    s = f"Error: type(i2c_bus)= {type(i2c_bus)}. Exiting..."
    logger_qt.error(s)
    logger_aht.error(s)
    sys.exit()
try:
    seesaw = Seesaw(i2c_bus, addr=0x50)
    seesaw.pin_mode_bulk(state.button_mask, seesaw.INPUT_PULLUP)
    if state.my_debug:
        print(f"global(): type(seesaw) = {type(seesaw)}")
    if seesaw:
        state.seesaw = seesaw
        state.qt_btns_present = True
    del seesaw
except Exception as e:
    logger_qt.error(f"global(): Error while creating an instance of the seesaw class: {e}")
    state.qt_btns_present = False
try:
    tempSensor = AHTx0(i2c_bus)  # initialise the sensor
    if tempSensor:
        state.tempSensor = tempSensor
        state.aht20_present = True
    del tempSensor
except Exception as e:
    logger_aht.error(f"global(): Error while creating an instance of the AHT20 sensor class: {e}")
    state.aht20_present = False
del i2c_bus
s = "Gamepad QT is{} present".format("" if state.qt_btns_present else " not")
logger_qt.info(s)
s = "AHT20 sensor is{} present".format("" if state.aht20_present else " not")
logger_aht.info(s)
del s
gc.collect()
def test_msg(state: object):
    TAG = "gamepad_test(): "
    s1 = s2 = ""
    if state.qt_btns_present:
        s1 = "the Gamepad QT"
    if state.aht20_present:
        s2 = "the AHT20 sensor"
    if not state.msg1_shown:
        if not state.qt_btns_present and not state.aht20_present:
            s = TAG + "neither the Gamepad QT nor the AHT20 sensor is present. Check wiring. Exiting..."
            print(s)
            logger_qt.error(s)
            logger_aht.error(s)
            sys.exit()
        else:
            state.msg1_shown = True
            # Name only the devices that were actually found
            devices = " and ".join(d for d in (s1, s2) if d)
            print(TAG + f"We're going to test {devices} with this {state.board_id}.")
            if state.qt_btns_present:
                print("\t\tPress any of the buttons (X, Y, A, B, Select or Start) on the Gamepad QT.\n\t\t" +
                      f"To reboot {state.board_id} press Gamepad QT button Start.\n")
# Ask for confirmation; loops until the user answers Y or N
def ck_usr_answr() -> bool:
    ret = False
    ays = "Are you sure? (Y/n)+<Enter>: "
    answer = ""
    while True:
        logger_qt.warning(ays)
        logger_aht.warning(ays)
        answer = input(ays)
        s = f"You answered: '{answer}'"
        print(s)
        logger_qt.info(s)
        logger_aht.info(s)
        if answer.upper() == "Y":
            ret = True
            break
        elif answer.upper() == "N":
            s = "not rebooting"
            logger_qt.info(s)
            logger_aht.info(s)
            break
    return ret


def reboot():
    s = "\nRebooting..."
    print(s)
    logger_qt.info(s)
    logger_aht.info(s)
    _time.sleep(3)
    os.system("sudo reboot")  # for Raspberry Pi boards
    #microcontroller.reset()  # for CircuitPython boards


# res is the index of the button: 0=X, 1=Y, 2=A, 3=B, 4=Select, 5=Start
def pr_btn_name(res: int):
    btns = ["X", "Y", "A", "B", "Select", "Start"]
    if res >= 0 and res < len(btns):
        #blink_led(state)
        s = "Button " + btns[res] + " pressed"
        print(s)
        logger_qt.info(s)
# Check for button presses on the Gamepad QT
def ck_qt_btns(state: object):
    TAG = "ck_qt_btns(): "
    if not state.qt_btns_present:
        print(TAG + f"state.qt_btns_present= {state.qt_btns_present}")
        return
    gc.collect()
    _time.sleep(0.2)
    try:
        # get the joystick x and y axis value
        x = 1023 - state.seesaw.analog_read(14)
        y = 1023 - state.seesaw.analog_read(15)
    except OSError as e:  # e.g. errno 121: Remote I/O Error
        logger_qt.error(f"Error: {e}")
        return  # no valid reading; try again on the next call
    diff_x = abs(x - state.last_x)
    diff_y = abs(y - state.last_y)
    if (diff_x > 3) or (diff_y > 3):
        s = f"joystick: (x, y)= ({x}, {y})"
        print(s)
        logger_qt.info(s)
        state.last_x = x
        state.last_y = y
    # Get the button presses, if any...
    buttons = state.seesaw.digital_read_bulk(state.button_mask)
    if state.my_debug:
        s = "\n" + TAG + f"buttons = {buttons}"
        print(s)
        logger_qt.info(s)
    if buttons == state.button_mask:  # all bits set (65639): no button pressed
        if state.my_debug:
            logger_qt.info(TAG + "Gamepad QT: no button pressed")
        return
    if buttons:
        # Same order as the names in pr_btn_name(); a cleared bit means "pressed"
        btn_pins = (state.BUTTON_X, state.BUTTON_Y, state.BUTTON_A,
                    state.BUTTON_B, state.BUTTON_SELECT, state.BUTTON_START)
        for idx, pin in enumerate(btn_pins):
            if buttons & (1 << pin):
                continue  # this button is not pressed
            pr_btn_name(idx)
            if pin == state.BUTTON_START:
                s = f"About to reboot the {state.board_id}"
                logger_qt.info(s)
                logger_aht.info(s)
                if ck_usr_answr():
                    reboot()  # Reboot the board
                else:
                    state.msg1_shown = False
                    test_msg(state)
            break  # handle only the first pressed button per call
    _time.sleep(0.01)
def ck_sensor(state: object) -> tuple:
    TAG = "rd_sensor(): "
    tpl = ("", 0.00, 0.00)  # default
    if not state.tempSensor:
        try:
            # The module-level i2c_bus is deleted after start-up, so open bus 3 again
            tempSensor = AHTx0(I2C(3))  # initialise the sensor
            if tempSensor:
                state.tempSensor = tempSensor
                state.aht20_present = True
            del tempSensor
        except ValueError as e:
            state.aht20_present = False
            s = TAG + f"Error: {e}. Check wiring!"
            print(s)
            logger_aht.error(s)
            return tpl
    try:
        tmp = state.tempSensor.temperature  # Celsius
        hum = state.tempSensor.relative_humidity  # percent
        dts = get_dt_for_pushingbox(state)  # datetime stamp from the system clock
        state.tmp_cl.tmp = tmp
        state.tmp_cl.last_upd = dts
        if tmp != state.tmp_cl.tmp_old:
            state.tmp_cl.tmp_old = tmp
        state.hum_cl.hum = hum  # use the property setter, as for the temperature
        state.hum_cl.last_upd = dts
        if hum != state.hum_cl.hum_old:
            state.hum_cl.hum_old = hum
        tpl = (dts, tmp, hum)
    except ValueError as e:
        s = TAG + f"Error: {e}. Check wiring!"
        print(s)
        logger_aht.error(s)
    return tpl
def get_temphum(state: object):
    TAG = "get_temphum(): "
    if not state.aht20_present:
        return
    tpl = ck_sensor(state)
    if isinstance(tpl, tuple):
        # Convert temperature and humidity values into a string and print the data
        lst = [
            "last update: {:s}".format(tpl[0]),
            "temperature: {:5.2f} °C".format(tpl[1]),
            "humidity: {:5.2f} %".format(tpl[2])
        ]
        print()
        for item in lst:
            s = TAG + item
            print(s)
            logger_aht.info(s)
def get_rnd_timestamp() -> int:
    return round(datetime.timestamp(datetime.now()))


# return a rounded timestamp in the future
def calc_future_timestamp(n: datetime, nr_secs: int) -> int:
    n2 = n + timedelta(seconds=nr_secs)
    return round(n2.timestamp())


# Note: this second definition replaces the get_dt() defined earlier,
# which has already been used for the start-up banner by this point.
def get_dt() -> str:
    now = datetime.now()
    dt = now.timetuple()
    dt1 = "{:d}/{:02d}/{:02d}".format(dt.tm_mon, dt.tm_mday, dt.tm_year)
    dt2 = "{:02d}:{:02d}:{:02d} weekday: {:s}".format(dt.tm_hour, dt.tm_min, dt.tm_sec, state.RTCtpl_DOW[dt.tm_wday])
    return dt1 + " " + dt2


# Datetime stamp for the spreadsheet, e.g.: 2024-04-18T22:39:11Z
# Note: the value is local time, although a trailing "Z" normally denotes UTC.
def get_dt_for_pushingbox(state: object) -> str:
    TAG = "get_dt_for_pushingbox(): "
    now = datetime.now()
    dt = now.timetuple()
    dts = "{:d}-{:02d}-{:02d}T{:02d}:{:02d}:{:02d}Z".format(dt.tm_year, dt.tm_mon, dt.tm_mday, dt.tm_hour, dt.tm_min, dt.tm_sec)
    if state.my_debug:
        print(TAG + f"dts= {dts}")
    return dts
def get_INT_RTC(state: object):
    if not state.get_INT_RTC:
        return
    TAG = "get_INT_RTC(): "
    dt = None
    now = None
    try:
        now = datetime.now()
        state.RTCtpl = now.timetuple()
    except OSError as e:
        logger_qt.error(TAG + f"Error: {e}")
        raise
    if state.RTCtpl is not None:
        if state.my_debug:
            logger_aht.info(f"type(state.RTCtpl) = {type(state.RTCtpl)}")
            logger_qt.info(TAG + f"state.RTCtpl: {state.RTCtpl}")
            logger_aht.info(TAG + f"state.RTCtpl: {state.RTCtpl}")
        state.SYS_RTC_is_set = True
        dt = state.RTCtpl
        dt1 = TAG + "{:d}/{:02d}/{:02d}".format(dt.tm_mon, dt.tm_mday, dt.tm_year)
        dt2 = TAG + "{:02d}:{:02d}:{:02d} weekday: {:s}".format(dt.tm_hour, dt.tm_min, dt.tm_sec, state.RTCtpl_DOW[dt.tm_wday])
        logger_qt.info(dt1)
        logger_aht.info(dt1)
        logger_qt.info(dt2)
        logger_aht.info(dt2)
def get_th_direct(state: object):
    # datetime e.g.: 2022-08-10T16:40:16Z
    TAG = "get_th_direct(): "
    if not state.aht20_present:
        return
    if state.my_debug:
        tpl = ck_sensor(state)
        if isinstance(tpl, tuple):
            lst = ["updated at", "Temperature:", "Humidity"]
            for _ in range(len(tpl)):
                s = TAG + f"{lst[_]} {tpl[_]}"
                print(s)
                logger_aht.info(s)


def pr_th_msg(state: object) -> bool:
    TAG = "pr_th_msg(): "
    if state.stat_result is None:
        state.stat_result = ""
    ret = True
    upd_dt = state.tmp_cl.last_upd  # e.g.: 2022-08-10T16:40:16Z
    tmp_s = str(round(state.tmp_cl.tmp, 4))
    hum_s = str(round(state.hum_cl.hum, 4))
    text_items = ["response to send: " + state.stat_result, "-" * 23, upd_dt, "Temperature: " + tmp_s + " C", "Humidity: " + hum_s + " %", "-" * 23]
    for s in text_items:
        print(s)
        logger_aht.info(s)
    print()
    return ret
def send_to_pushingbox(state: object) -> bool:
    TAG = "send_to_pushingbox(): "
    ret = True
    tmp_s = None
    hum_s = None
    response = None
    status = -1  # stays -1 when no valid HTTP status was received
    state.stat_result = ""
    devid = state.PUSHING_BOX_DEVID  # device ID on Pushingbox for our Scenario
    upd_dt = state.tmp_cl.last_upd  # e.g.: 2022-08-10T16:40:16Z
    upd_dt_old = state.tmp_cl.last_upd_old
    # --------------------------------------------------------------
    tmp = round(state.tmp_cl.tmp, 4)
    tmp_s = str(tmp)
    tmp_old = state.tmp_cl.tmp_old
    if state.my_debug:
        print(TAG + f"tmp_s= '{tmp_s}', tmp_old= {tmp_old}")
    # --------------------------------------------------------------
    hum = round(state.hum_cl.hum, 4)
    hum_s = str(hum)
    hum_old = state.hum_cl.hum_old
    if state.my_debug:
        print(TAG + f"hum_s= '{hum_s}', hum_old= {hum_old}")
    # --------------------------------------------------------------
    if (upd_dt_old == upd_dt) and (tmp == tmp_old) and (hum == hum_old):
        print(TAG + "datetime stamp unchanged. Waiting for new sensor data...")
        return ret  # We don't want to send duplicates
    state.tmp_cl.last_upd_old = upd_dt
    state.msg_nr_sent += 1  # increase the messages sent count
    # Action defined in the Pushingbox Scenario (2024-04-15: added board_id, &id=$id$):
    # ?date=$date$&time=$time$&temp=$temp$&hum=$hum$&id=$id$
    s = "http://api.pushingbox.com/pushingbox?devid="
    s += devid
    s += "&date=\"" + upd_dt + "\""
    s += "&temp=" + tmp_s
    s += "&hum=" + hum_s
    s += "&id=\"" + state.board_id + "\""
    if state.my_debug:
        print(TAG + upd_dt, end='')
        print(". Sending Data message nr: ", end='')
        print(state.msg_nr_sent, end='')
        print(" to middle-man server...", end='\n')
        print(TAG + "Going to send:", end='')
        print("\n\"" + s, end='')
        print("\"", end='\n')
    try:
        webUrl = urllib.request.urlopen(s)
        response = str(webUrl.getcode())
    except OSError as e:
        s = TAG + f"Error: {e}"
        print(s)
        logger_aht.error(s)
        if e.args and e.args[0] == -2:  # gaierror
            return False  # main() treats a falsy result as a failed send
    except RuntimeError as e:
        s = TAG + f"Error: {e}"
        print(s)
        logger_aht.error(s)
        return False  # main() treats a falsy result as a failed send
    if state.my_debug:
        s = TAG + "Data Sent"
        logger_aht.info(s)
        print(s)
        print()
        # response is None when the request failed, so guard the len() call
        s = TAG + f"response= {response}, len(response)= {len(response) if response else 0}, type(response)= {type(response)}"
        print(s)
        logger_aht.info(s)
    if response and len(response) == 3:
        status = int(response)
        print()
        print(TAG, end='')
        if status == 404:
            s = "error 404: file not found on this server."
            print(s)
            logger_aht.warning(s)
        elif status != 200:
            s = f"response= {response}"
            print(s)
            logger_aht.info(s)
    state.stat_result = "OK" if status == 200 else "NO"
    s1 = "response="
    s2 = "{} (= {})".format(status, state.stat_result)
    if state.my_debug:
        print()
        s = TAG + f"{s1} {s2}"
        print(s)
        logger_aht.info(s)
    s = TAG + f"send result: {status}"
    if status == 200:
        logger_aht.info(s)
    else:
        logger_aht.warning(s)
    response = None
    if state.my_debug:
        print(TAG + f"upd_dt= {upd_dt}, tmp_s= {tmp_s}, hum_s= {hum_s}")
    pr_th_msg(state)
    return ret
# Print and log a list of at most 5 lines, padded with blank lines
def pr_msg(msg):
    text_items = []
    le_max = 5
    if isinstance(msg, list):
        le_msg = len(msg)
        if le_msg > 0 and le_msg <= le_max:
            for _ in range(len(msg)):
                text_items.append(msg[_])
        le = len(text_items)
        if le_max - le > 0:  # fill up with blank lines
            for _ in range(le_max - le):
                text_items.append(" ")
        for s in text_items:
            print(s)
            logger_aht.info(s)


def setup(state: object):
    TAG = "setup(): "
    if state.get_INT_RTC:
        if state.my_debug:
            logger_aht.info(TAG + "Going to get datetime from internal (SYS) RTC")
        get_INT_RTC(state)
    # Create instances of the sensor_tmp and sensor_hum classes
    state.tmp_cl = sensor_tmp(0.00, "")  # create class instance with default values
    state.hum_cl = sensor_hum(0.00, "")  # same
    gc.collect()
def main():
    TAG = "main(): "
    loopnr = 0
    setup(state)
    state.curr_timestamp = get_rnd_timestamp()
    start = True
    test_msg(state)
    s = "\n\nAdafruit Gamepad QT test:\n"
    logger_qt.info(s)
    s = "\n\nAdafruit AHT20 test:\n"
    logger_aht.info(s)
    s = f"DataToGoogle interval: {state.INTERVAL_SECONDS} seconds"
    print(s)
    logger_aht.info(s)
    if state.aht20_present:
        logger_aht.info('logged values of temperature and humidity from Adafruit AHT20 sensor:')
    if not state.INTERVAL_SECONDS:  # None or 0
        state.INTERVAL_SECONDS = 3600  # Set for 1 hour
    interval = state.INTERVAL_SECONDS
    togo_send_secs_old = 0.00
    togo_temphum_secs_old = 0.00
    do_temphum = False
    do_send = False
    interval_short = 120  # read the sensor every 120 seconds = 2 minutes
    interval_long = interval  # send interval; default: 3600 seconds = 1 hour
    future_send_timestamp = calc_future_timestamp(datetime.now(), interval_long)
    future_temphum_timestamp = calc_future_timestamp(datetime.now(), interval_short)
    while True:
        try:
            loopnr += 1
            ck_qt_btns(state)
            if state.SYS_RTC_is_set:
                curr_timestamp = get_rnd_timestamp()
                do_send = curr_timestamp >= future_send_timestamp
                do_temphum = curr_timestamp >= future_temphum_timestamp
                togo_temphum_secs = future_temphum_timestamp - curr_timestamp
                togo_send_secs = future_send_timestamp - curr_timestamp
                # Print a countdown line every 10 seconds
                if togo_temphum_secs_old != togo_temphum_secs:
                    togo_temphum_secs_old = togo_temphum_secs
                    if togo_temphum_secs % 10 == 0:
                        print()
                        print(TAG + "next get_temphum() in: {:4d} seconds".format(togo_temphum_secs))
                if togo_send_secs_old != togo_send_secs:
                    togo_send_secs_old = togo_send_secs
                    if togo_send_secs % 10 == 0:
                        print(TAG + "next SendToGoogle in: {:4d} seconds".format(togo_send_secs))
            if start or do_temphum:
                if state.msg_sent:
                    state.msg_sent = False
                get_temphum(state)
                future_temphum_timestamp = calc_future_timestamp(datetime.now(), interval_short)
            if (start or do_send or (not state.tempSensor)) and (not state.msg_sent):
                start = False
                get_th_direct(state)
                gc.collect()
                if not send_to_pushingbox(state):
                    s = TAG + "send data to Pushingbox.com failed!"
                    logger_aht.error(s)
                    raise KeyboardInterrupt
                else:
                    state.msg_sent = True
                    future_send_timestamp = calc_future_timestamp(datetime.now(), interval_long)
        except KeyboardInterrupt:
            s = TAG + "KeyboardInterrupt. Exiting..."
            print(s)
            logger_qt.info(s)
            logger_aht.info(s)
            break
        if loopnr >= 1000:
            loopnr = 0
    sys.exit()


if __name__ == '__main__':
    main()
2024-04-18 22:39:10 - RPi5_aht20_test - INFO - -------------------------------------------------------
2024-04-18 22:39:10 - RPi5_aht20_test - INFO - New run:
2024-04-18 22:39:10 - RPi5_aht20_test - INFO - from python script: "raspi5_datatogoogle_v2.py"
2024-04-18 22:39:10 - RPi5_aht20_test - INFO - Runtime: Thursday, April 18, 2024, at: 10:39:10 pm
2024-04-18 22:39:10 - RPi5_aht20_test - INFO - -------------------------------------------------------
2024-04-18 22:39:10 - RPi5_aht20_test - INFO - board id: RASPBERRY_PI_5
2024-04-18 22:39:10 - RPi5_aht20_test - INFO - using Adafruit_Python_extended_Bus. Using I2C bus #3
2024-04-18 22:39:10 - RPi5_aht20_test - INFO - AHT20 sensor is present
2024-04-18 22:39:10 - RPi5_aht20_test - INFO - get_INT_RTC(): 4/18/2024
2024-04-18 22:39:10 - RPi5_aht20_test - INFO - get_INT_RTC(): 22:39:10 weekday: Thursday
2024-04-18 22:39:10 - RPi5_aht20_test - INFO -
Adafruit AHT20 test:
2024-04-18 22:39:10 - RPi5_aht20_test - INFO - DataToGoogle interval: 3600 seconds
2024-04-18 22:39:10 - RPi5_aht20_test - INFO - logged values of temperature and humidity from Adafruit AHT20 sensor:
2024-04-18 22:39:11 - RPi5_aht20_test - INFO - get_temphum(): last update: 2024-04-18T22:39:11Z
2024-04-18 22:39:11 - RPi5_aht20_test - INFO - get_temphum(): temperature: 26.06 °C
2024-04-18 22:39:11 - RPi5_aht20_test - INFO - get_temphum(): humidity: 55.88 %
2024-04-18 22:39:13 - RPi5_aht20_test - INFO - send_to_pushingbox(): send result: 200
2024-04-18 22:39:13 - RPi5_aht20_test - INFO - response to send: OK
2024-04-18 22:39:13 - RPi5_aht20_test - INFO - -----------------------
2024-04-18 22:39:13 - RPi5_aht20_test - INFO - 2024-04-18T22:39:11Z
2024-04-18 22:39:13 - RPi5_aht20_test - INFO - Temperature: 26.0624 C
2024-04-18 22:39:13 - RPi5_aht20_test - INFO - Humidity: 55.8798 %
2024-04-18 22:39:13 - RPi5_aht20_test - INFO - -----------------------
2024-04-18 22:39:36 - RPi5_aht20_test - INFO - About to reboot the RASPBERRY_PI_5
2024-04-18 22:39:36 - RPi5_aht20_test - WARNING - Are you sure? (Y/n)+<Enter>:
2024-04-18 22:39:38 - RPi5_aht20_test - INFO - You answered: 'n'
2024-04-18 22:39:38 - RPi5_aht20_test - INFO - not rebooting
2024-04-18 22:39:48 - RPi5_aht20_test - INFO - main(): KeyboardInterrrupt. Exiting...
2024-04-18 22:39:10 - RPi5_gamepadqt_test - INFO - -------------------------------------------------------
2024-04-18 22:39:10 - RPi5_gamepadqt_test - INFO - New run:
2024-04-18 22:39:10 - RPi5_gamepadqt_test - INFO - from python script: "raspi5_datatogoogle_v2.py"
2024-04-18 22:39:10 - RPi5_gamepadqt_test - INFO - Runtime: Thursday, April 18, 2024, at: 10:39:10 pm
2024-04-18 22:39:10 - RPi5_gamepadqt_test - INFO - -------------------------------------------------------
2024-04-18 22:39:10 - RPi5_gamepadqt_test - INFO - board id: RASPBERRY_PI_5
2024-04-18 22:39:10 - RPi5_gamepadqt_test - INFO - using Adafruit_Python_extended_Bus. Using I2C bus #3
2024-04-18 22:39:10 - RPi5_gamepadqt_test - INFO - Gamepad QT is present
2024-04-18 22:39:10 - RPi5_gamepadqt_test - INFO - get_INT_RTC(): 4/18/2024
2024-04-18 22:39:10 - RPi5_gamepadqt_test - INFO - get_INT_RTC(): 22:39:10 weekday: Thursday
2024-04-18 22:39:10 - RPi5_gamepadqt_test - INFO -
Adafruit Gamepad QT test:
2024-04-18 22:39:11 - RPi5_gamepadqt_test - INFO - joystick: (x, y)= (504, 510)
2024-04-18 22:39:28 - RPi5_gamepadqt_test - INFO - Button A pressed
2024-04-18 22:39:30 - RPi5_gamepadqt_test - INFO - Button B pressed
2024-04-18 22:39:31 - RPi5_gamepadqt_test - INFO - Button X pressed
2024-04-18 22:39:32 - RPi5_gamepadqt_test - INFO - Button Y pressed
2024-04-18 22:39:32 - RPi5_gamepadqt_test - INFO - Button Y pressed
2024-04-18 22:39:34 - RPi5_gamepadqt_test - INFO - Button Select pressed
2024-04-18 22:39:36 - RPi5_gamepadqt_test - INFO - Button Start pressed
2024-04-18 22:39:36 - RPi5_gamepadqt_test - INFO - About to reboot the RASPBERRY_PI_5
2024-04-18 22:39:36 - RPi5_gamepadqt_test - WARNING - Are you sure? (Y/n)+<Enter>:
2024-04-18 22:39:38 - RPi5_gamepadqt_test - INFO - You answered: 'n'
2024-04-18 22:39:38 - RPi5_gamepadqt_test - INFO - not rebooting
2024-04-18 22:39:41 - RPi5_gamepadqt_test - INFO - joystick: (x, y)= (504, 353)
2024-04-18 22:39:42 - RPi5_gamepadqt_test - INFO - joystick: (x, y)= (102, 3)
2024-04-18 22:39:42 - RPi5_gamepadqt_test - INFO - joystick: (x, y)= (504, 489)
2024-04-18 22:39:43 - RPi5_gamepadqt_test - INFO - joystick: (x, y)= (504, 495)
2024-04-18 22:39:48 - RPi5_gamepadqt_test - INFO - main(): KeyboardInterrrupt. Exiting...
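The log lines above share one layout: timestamp, logger name, level, message. As a minimal sketch (not necessarily how raspi5_datatogoogle_v2.py configures its loggers), the standard `logging` module can produce that layout as follows. The helper name `make_logger` and the in-memory stream are assumptions made for illustration; the real script writes to the files named in settings.toml.

```python
import io
import logging

# Layout inferred from the log excerpts: "<date time> - <logger name> - <LEVEL> - <message>"
LOG_FORMAT = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
DATE_FORMAT = "%Y-%m-%d %H:%M:%S"


def make_logger(name, stream):
    """Hypothetical helper: return a logger writing to `stream` in the layout above."""
    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)
    logger.propagate = False  # keep records out of the root logger
    handler = logging.StreamHandler(stream)
    handler.setFormatter(logging.Formatter(LOG_FORMAT, datefmt=DATE_FORMAT))
    logger.addHandler(handler)
    return logger


buffer = io.StringIO()  # stands in for gamepadqt.log
log = make_logger("RPi5_gamepadqt_test", buffer)
log.info("Button A pressed")
log.warning("Are you sure? (Y/n)+<Enter>:")
log_text = buffer.getvalue()
```

To write to a file instead, `logging.FileHandler("gamepadqt.log")` can replace the `StreamHandler`.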
/*
* -----------------------------------------------
* Originally published by Mogsdad@Stackoverflow
* Modified for jarkomdityaz.appspot.com
* Adapted for use on an Arduino Nano RP2040 Connect.
* 2022-08-01 adapted for use on an Adafruit MAGTAG datetime script by @Paulskpt Github.
* 2024-04-17 adapted for use on a Raspberry Pi. Added column D for board_id
* 2024-04-17 Created a new deployment (nr 8)
* -----------------------------------------------
*
* GET request query:
* https://script.google.com/macros/s/<URL_of_the_script_deployment>/exec?date=$date$&temp=$temp$&hum=$hum$&id=$id$
*
* 2024-03-24: Renamed copy of original spreadsheet. New name: "TempHumAHT20_fm_RASPI5"
* https://docs.google.com/spreadsheets/d/<spreadsheet_id>/edit#gid=0
*
* Example with value:
* https://script.google.com/macros/s/<URL_of_the_script_deployment>/exec?date="2024-05-07T19:54:06Z"&temp=28.3878&hum=53.8874&id="RASPBERRY_PI_5"
*
* Using spreadsheet API
*
*/
function doGet(e) {
  // Example of received data string: "date=\"2022-08-09T19:54:06Z\"&temp=28.087806701660156&hum=53.8874626159668";
  Logger.log(JSON.stringify(e)); // view parameters
  var result = 'Ok'; // assume success
  // Guard against a missing event object or an empty query string
  if (!e || !e.parameter || Object.keys(e.parameter).length === 0) {
    result = 'No Parameters';
  }
  else
  { // Spreadsheet filename: TempHumAHT20_fm_RASPI5
    var id = '<spreadsheet_id>'; // Spreadsheet ID
    var sheet = SpreadsheetApp.openById(id).getActiveSheet();
    var newRow = sheet.getLastRow() + 1;
    // Columns A..D; a missing parameter leaves its cell empty so that
    // the execution timestamp always lands in column E
    var rowData = ['', '', '', ''];
    for (var param in e.parameter)
    {
      Logger.log('In for loop, param=' + param);
      var value = stripQuotes(e.parameter[param]);
      Logger.log(param + ':' + e.parameter[param]);
      Logger.log('value (stripped)= ' + value);
      switch (param)
      {
        case 'date': // column_A Date (updated_at) e.g.: 2022-08-09T02:05:27Z
          rowData[0] = value;
          break;
        case 'temp': // column_B Temperature
          rowData[1] = value;
          break;
        case 'hum': // column_C Humidity
          rowData[2] = value;
          break;
        case 'id': // column_D board_id
          rowData[3] = value;
          break;
        default:
          result = "unsupported parameter";
      }
    }
    rowData.push(new Date()); // column_E: datetime of execution of this macro
    Logger.log(JSON.stringify(rowData));
    // Write new row below
    var newRange = sheet.getRange(newRow, 1, 1, rowData.length);
    newRange.setValues([rowData]);
  }
  // Return result of operation
  return ContentService.createTextOutput(result);
}
/**
 * Remove one leading and one trailing single or double quote
 */
function stripQuotes( value ) {
  return value.replace(/^["']|['"]$/g, "");
}
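The macro above expects the four query parameters `date`, `temp`, `hum` and `id`, and maps them to columns A to D. The following is a minimal Python sketch of the client side of that contract, plus a Python counterpart of the macro's mapping that can be used to check a query string locally. The function names `build_query`, `strip_quotes` and `row_from_query` are hypothetical, and the four-decimal formatting is only an assumption based on the example URL in the macro's header comment.

```python
from urllib.parse import parse_qs, urlencode

# Parameter names in column order A..D, as in the macro's switch statement
COLUMNS = ("date", "temp", "hum", "id")


def build_query(date, temp, hum, board_id):
    """Encode one reading as a URL query string (hypothetical helper)."""
    return urlencode(
        {"date": date, "temp": f"{temp:.4f}", "hum": f"{hum:.4f}", "id": board_id}
    )


def strip_quotes(value):
    """Python counterpart of stripQuotes(): drop one leading and one trailing quote."""
    if value[:1] in ("'", '"'):
        value = value[1:]
    if value[-1:] in ("'", '"'):
        value = value[:-1]
    return value


def row_from_query(query):
    """Mirror the macro's mapping: one cell per known parameter, empty if missing."""
    params = {key: values[0] for key, values in parse_qs(query).items()}
    return [strip_quotes(params.get(name, "")) for name in COLUMNS]


query = build_query("2024-04-18T23:38:42Z", 26.1044, 56.3754, "RASPBERRY_PI_5")
row = row_from_query(query)
```

`urlencode` percent-encodes the colons in the timestamp, so the resulting string can be appended after `exec?` without further escaping.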
(env) pi@raspi5B-8GB:~/projects/rpi5_tmp_hum $ python3.11 raspi5_datatogoogle_v2.py
-------------------------------------------------------
New run:
from python script: "raspi5_datatogoogle_v2.py"
Runtime: Thursday, April 18, 2024, at: 11:38:41 pm
-------------------------------------------------------
gamepad_test(): We're going to test the Gamepad QT and the AHT20 sensor with this RASPBERRY_PI_5.
Press any of the buttons (X, Y, A, B, Select or Start) on the Gamepad QT.
To reboot RASPBERRY_PI_5 press Gamepad QT button Start.
DataToGoogle interval: 3600 seconds
joystick: (x, y)= (504, 495)
main(): next get_temphum() in: 120 seconds
main(): next SendToGoogle in: 3600 seconds
get_temphum(): last update: 2024-04-18T23:38:42Z
get_temphum(): temperature: 26.10 °C
get_temphum(): humidity: 56.38 %
send_to_pushingbox(): response to send: OK
-----------------------
2024-04-18T23:38:42Z
Temperature: 26.1044 C
Humidity: 56.3754 %
-----------------------
main(): next get_temphum() in: 110 seconds
main(): next SendToGoogle in: 3590 seconds
main(): next get_temphum() in: 100 seconds
main(): next SendToGoogle in: 3580 seconds
main(): next get_temphum() in: 90 seconds
main(): next SendToGoogle in: 3570 seconds
[...]
main(): next get_temphum() in: 10 seconds
main(): next SendToGoogle in: 3490 seconds
main(): next get_temphum() in: 0 seconds
get_temphum(): last update: 2024-04-18T23:40:41Z
get_temphum(): temperature: 26.11 °C
get_temphum(): humidity: 56.40 %
main(): next get_temphum() in: 120 seconds
main(): next SendToGoogle in: 3480 seconds
main(): next get_temphum() in: 110 seconds
main(): next SendToGoogle in: 3470 seconds
main(): next get_temphum() in: 100 seconds
main(): next SendToGoogle in: 3460 seconds
[...]
main(): next get_temphum() in: 20 seconds
main(): next SendToGoogle in: 3380 seconds
main(): next get_temphum() in: 10 seconds
main(): next SendToGoogle in: 3370 seconds
main(): next get_temphum() in: 0 seconds
get_temphum(): last update: 2024-04-18T23:42:41Z
get_temphum(): temperature: 26.11 °C
get_temphum(): humidity: 56.43 %
main(): next get_temphum() in: 120 seconds
main(): next SendToGoogle in: 3360 seconds
main(): next get_temphum() in: 110 seconds
main(): next SendToGoogle in: 3350 seconds
main(): next get_temphum() in: 100 seconds
main(): next SendToGoogle in: 3340 seconds
main(): next get_temphum() in: 90 seconds
main(): next SendToGoogle in: 3330 seconds
[...]
main(): next get_temphum() in: 20 seconds
main(): next SendToGoogle in: 3260 seconds
main(): next get_temphum() in: 10 seconds
main(): next SendToGoogle in: 3250 seconds
main(): next get_temphum() in: 0 seconds
get_temphum(): last update: 2024-04-18T23:44:41Z
get_temphum(): temperature: 26.22 °C
get_temphum(): humidity: 56.03 %
main(): next get_temphum() in: 120 seconds
main(): next SendToGoogle in: 3240 seconds
main(): next get_temphum() in: 110 seconds
main(): next SendToGoogle in: 3230 seconds
main(): next get_temphum() in: 100 seconds
main(): next SendToGoogle in: 3220 seconds
main(): next get_temphum() in: 90 seconds
main(): next SendToGoogle in: 3210 seconds
[...]
main(): next get_temphum() in: 20 seconds
main(): next SendToGoogle in: 3140 seconds
main(): next get_temphum() in: 10 seconds
main(): next SendToGoogle in: 3130 seconds
main(): next get_temphum() in: 0 seconds
get_temphum(): last update: 2024-04-18T23:46:41Z
get_temphum(): temperature: 26.22 °C
get_temphum(): humidity: 56.94 %
main(): next get_temphum() in: 120 seconds
main(): next SendToGoogle in: 3120 seconds
main(): next get_temphum() in: 110 seconds
main(): next SendToGoogle in: 3110 seconds
main(): next get_temphum() in: 100 seconds
main(): next SendToGoogle in: 3100 seconds
main(): next get_temphum() in: 90 seconds
main(): next SendToGoogle in: 3090 seconds
main(): KeyboardInterrrupt. Exiting...
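The console output above shows two independent countdowns: a sensor reading every 120 seconds and a send every 3600 seconds, with a status line every 10 seconds. A minimal sketch of such a two-timer loop, written as a simulation without any sleeping, could look like this. The function name `simulate` and its parameters are assumptions for illustration; the actual loop in raspi5_datatogoogle_v2.py may be structured differently.

```python
def simulate(total_seconds, step=10, read_interval=120, send_interval=3600):
    """Return (elapsed_seconds, action) tuples for a two-timer polling loop."""
    events = []
    next_read = 0  # read immediately on start, as the first run in the output does
    next_send = 0  # send immediately on start as well
    for elapsed in range(0, total_seconds + 1, step):
        if elapsed >= next_read:
            events.append((elapsed, "get_temphum"))
            next_read = elapsed + read_interval
        if elapsed >= next_send:
            events.append((elapsed, "send_to_pushingbox"))
            next_send = elapsed + send_interval
    return events


# Eight minutes of simulated run time, roughly the span of the output above
events = simulate(480)
```

In a real loop, `elapsed` would come from `time.monotonic()` and each iteration would end with a `time.sleep(step)`.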
MY_DEBUG = "0"
USE_TAG = "1"
LOG_QT = "1" # Send GamepadQT data to gamepadqt.log
LOG_AHT = "1" # Send AHT20 data to aht20.log
INTERVAL_SECONDS = 3600 # = 1 hour - set for a multiple of 60
PUSHING_BOX_DEVID = "<Your_Pushingbox.com_Virtual_Scenario_ID>" # e.g. "vDDA.......253C"
AHT20_LOGFILE = "aht20.log"
GAMEPAD_LOGFILE = "gamepadqt.log"
AHT20_LOGGER_NAME = "RPi5_aht20_test"
GAMEPAD_LOGGER_NAME = "RPi5_gamepadqt_test"
PaulskPt commented Apr 18, 2024

Hardware: I2C wiring
See the images of pushingbox.com, the Google Apps Script macro and the Google Sheets spreadsheet: tools
See the script outline
Image of console output and data in spreadsheet: moment of data sent to spreadsheet
