Skip to content

Instantly share code, notes, and snippets.

@ProfAndreaPollini
Created May 4, 2024 06:53
Show Gist options
  • Save ProfAndreaPollini/02a70f4dc9618ce1560f944f13dbb8fa to your computer and use it in GitHub Desktop.
Save ProfAndreaPollini/02a70f4dc9618ce1560f944f13dbb8fa to your computer and use it in GitHub Desktop.
mds
# import sqlite3 as sq
# from datetime import datetime
# import json
# def get_last_data(days=1):
# rows = []
# with sq.connect("db.sqlite") as conn:
# conn.row_factory = sq.Row
# c = conn.cursor()
# condition = f"orario >= datetime('now', 'localtime', '-{days} day')"
# if days == 1:
# condition = f"date('orario', 'localtime') = date('now', 'localtime')"
# sql = f"""SELECT
# *
# FROM
# misurazioni
# WHERE
# {condition}
# ORDER BY orario ASC"""
# res = c.execute(sql)
# res = [dict(x) for x in res.fetchall()]
# for i in range(len(res)):
# res[i]["orario"] = res[i]["orario"].strip().replace("Z", " ").replace("T", " ")
# res[i]["orario"] = str(datetime.fromisoformat(res[i]["orario"].strip()))
# rows = res
# return rows
# data = {
# "today":get_last_data(days=1),
# "last_week":get_last_data(days=7),
# "last_update":datetime.now().isoformat(),
# "last_month":get_last_data(days=30)
# }
# # remove data.json if it exists
# import os
# if os.path.exists("data.json"):
# os.remove("data.json")
# with open("data.json", "w") as f:
# json.dump(data, f, indent=2)
import sqlite3 as sq
from datetime import datetime
import json
from tqdm import tqdm
def get_last_data(days=1):
rows = []
with sq.connect("db.sqlite") as conn:
conn.row_factory = sq.Row
c = conn.cursor()
condition = f"orario >= datetime('now', 'localtime', '-{days} day')"
if days == 1:
condition = f"date('orario', 'localtime') = date('now', 'localtime')"
sql = f"""SELECT
*
FROM
misurazioni
WHERE
{condition}
ORDER BY orario ASC"""
res = c.execute(sql)
res = [dict(x) for x in res.fetchall()]
for i in range(len(res)):
res[i]["orario"] = res[i]["orario"].strip().replace("Z", " ").replace("T", " ")
res[i]["orario"] = str(datetime.fromisoformat(res[i]["orario"].strip()))
misurazioni = res
if len(misurazioni) == 0:
return
df = []
for misurazione in tqdm(misurazioni):
# misurazione["orario"] =
misurazione = dict(misurazione)
valvole = [misurazione[v] for v in ["valvola_1", "valvola_2", "valvola_3", "valvola_4","valvola_5", "valvola_6"]]
vin=0
try:
vin = valvole.index(1)+1
except:
pass
vout=0
try:
vout = valvole.index(2)+1
except:
pass
# for v in ["valvola_1", "valvola_2", "valvola_3", "valvola_4","valvola_5", "valvola_6"]:
# del misurazione[v]
misurazione["vin"] = vin
misurazione["vout"] = vout
id_misurazione = misurazione["id"]
#print('misurazione:', orario)
c = conn.cursor()
rs = c.execute('SELECT * FROM misurazioni_sensori JOIN sensori ON (misurazioni_sensori.id_sensore = sensori.id) WHERE id_misurazione = ?', (id_misurazione,))
sensori_aggiuntivi = {}
for row in rs.fetchall():
# print(row["descrizione"], row["valore"])
sensori_aggiuntivi[row["descrizione"]] = row["valore"]
# misurazione_completa = {**dict(misurazioni),**dict(sensori_aggiuntivi)}
dict_misurazioni = dict(misurazione) | sensori_aggiuntivi
df.append(dict_misurazioni)
# df = pd.DataFrame(df)
# df.to_json("misurazioni.json", orient="records")
rows = res
return df
data = {
"today":get_last_data(days=1),
"last_week":get_last_data(days=7),
"last_update":datetime.now().isoformat(),
"last_month":get_last_data(days=30)
}
# remove data.json if it exists
import os
if os.path.exists("data.json"):
os.remove("data.json")
with open("data.json", "w") as f:
json.dump(data, f, indent=2)
import sqlite3 as sq
import json
import os
from datetime import datetime
import pandas as pd
from tqdm import tqdm
def get_last_measurement():
with sq.connect("db.sqlite") as conn:
conn.row_factory = sq.Row
c = conn.cursor()
sql = f"""SELECT
*
FROM
misurazioni
ORDER BY orario DESC
LIMIT 1"""
res = c.execute(sql)
res = res.fetchone()
res = dict(res)
res["orario"] = res["orario"].strip().replace("Z", " ").replace("T", " ")
res["orario"] = str(datetime.fromisoformat(res["orario"].strip()))
#c = conn.cursor()
#rs = c.execute(f"SELECT * FROM misurazioni where substr(orario,0,8) = '{year}'")
df = []
misurazione = res
# misurazione["orario"] =
#misurazione = dict(misurazione)
valvole = [misurazione[v] for v in ["valvola_1", "valvola_2", "valvola_3", "valvola_4","valvola_5", "valvola_6"]]
vin=0
try:
vin = valvole.index(1)+1
except:
pass
vout=0
try:
vout = valvole.index(2)+1
except:
pass
#for v in ["valvola_1", "valvola_2", "valvola_3", "valvola_4","valvola_5", "valvola_6"]:
# del misurazione[v]
misurazione["vin"] = vin
misurazione["vout"] = vout
#print(valvole)
#convert misurazione["orario"] to datetime
#orario = datetime.strptime(misurazione["orario"].strip(), '%Y-%m-%d %H:%M:%S')
#misurazione["orario"] = orario
id_misurazione = misurazione["id"]
print('misurazione:', misurazione)
c = conn.cursor()
rs = c.execute('SELECT * FROM misurazioni_sensori JOIN sensori ON (misurazioni_sensori.id_sensore = sensori.id) WHERE id_misurazione = ?', (id_misurazione,))
sensori_aggiuntivi = {}
for row in rs.fetchall():
# print(row["descrizione"], row["valore"])
sensori_aggiuntivi[row["descrizione"]] = row["valore"]
# misurazione_completa = {**dict(misurazioni),**dict(sensori_aggiuntivi)}
dict_misurazioni = dict(misurazione) | sensori_aggiuntivi
df.append(dict_misurazioni)
df = pd.DataFrame(df)
df.to_json("misurazioni.json", orient="records")
return res
# remove last.json file if exists
if os.path.exists("last.json"):
os.remove("last.json")
with open("last.json", "w") as f:
json.dump(get_last_measurement(), f, indent=2)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment