Skip to content

Instantly share code, notes, and snippets.

@grimmo
Last active April 9, 2020 10:03
Show Gist options
  • Save grimmo/a41d6bd5562b1e057b48abeecb37ceb5 to your computer and use it in GitHub Desktop.
Save grimmo/a41d6bd5562b1e057b48abeecb37ceb5 to your computer and use it in GitHub Desktop.
Monitoring cella lievitazione
#!/usr/bin/env python3
# Get values from Inkbird IBS-TH1 and write them to JSON
from time import sleep
import json
from bluepy import btle
import boto3
import logging
# Root logger: timestamped, level-aligned messages, everything from DEBUG up.
logging.basicConfig(
    level=logging.DEBUG,
    datefmt='%Y-%m-%d %H:%M:%S',
    format='%(asctime)s %(levelname)-8s %(message)s',
)
logger = logging.getLogger('cella')

# Address of the sensor to poll; replace the placeholder with the real value.
mac = "<SENSOR MAC ADDRESS>"

# Pause between two successful readings, in seconds.
read_interval = 150
def float_value(nums):
    """Decode a little-endian signed 16-bit reading into a float.

    The raw integer is expressed in hundredths, so it is divided by 100.

    Args:
        nums: Sequence of at least two integers in the range 0..255 (for
            example a two-byte ``bytes`` slice). ``nums[0]`` is the low
            byte and ``nums[1]`` is the high byte.

    Returns:
        The decoded value as a float; negative when bit 15 is set.
    """
    num = (nums[1] << 8) | nums[0]
    # Two's complement: every value with the sign bit set is negative, not
    # only those whose high byte is exactly 0xff (that would cover just
    # -2.56..-0.01 and turn anything colder into a large positive number).
    if num & 0x8000:
        num -= 0x10000
    return float(num) / 100
def c_to_f(temperature_c):
    """Convert a temperature from degrees Celsius to degrees Fahrenheit."""
    ratio = 9.0 / 5.0
    fahrenheit = ratio * temperature_c + 32
    return fahrenheit
def get_readings():
    """Connect to the sensor, read its measurement handle and disconnect.

    Opens a connection to the device at ``mac`` (public address type) and
    reads handle 0x002d. The connection is closed explicitly before
    returning instead of being left to garbage collection, so a connection
    is not kept open between polls.

    Returns:
        The raw value returned by ``readCharacteristic``, or ``False`` when
        connecting or reading failed (the error is logged).
    """
    dev = None
    try:
        dev = btle.Peripheral(mac, addrType=btle.ADDR_TYPE_PUBLIC)
        # NOTE(review): an earlier revision read handle 0x28 instead;
        # presumably the handle differs between firmware versions -- confirm.
        return dev.readCharacteristic(0x002d)
    except Exception as e:
        logger.error("Error reading BTLE: {}".format(e))
        return False
    finally:
        if dev is not None:
            try:
                dev.disconnect()
            except Exception as e:
                # Best effort: a failed disconnect must not mask the result.
                logger.debug("Error disconnecting BTLE: {}".format(e))
def submit_metric(namespace, metric_name, dimensions, value, unit='Count'):
    """Send one datapoint to CloudWatch through ``put_metric_data``.

    Args:
        namespace: Passed as ``Namespace``.
        metric_name: Passed as the datum's ``MetricName``.
        dimensions: Passed as the datum's ``Dimensions``.
        value: Passed as the datum's ``Value``.
        unit: Passed as the datum's ``Unit``; defaults to ``'Count'``.

    Returns:
        None. Any exception raised while creating the client or submitting
        is logged and swallowed.
    """
    datum = {
        'MetricName': metric_name,
        'Dimensions': dimensions,
        'Value': value,
        'Unit': unit,
    }
    try:
        cloudwatch = boto3.client('cloudwatch')
        cloudwatch.put_metric_data(Namespace=namespace, MetricData=[datum])
    except Exception as err:
        logger.error("Error submitting metric: {}".format(err))
def _write_status(temp, hum):
    """Overwrite sux.json with the latest status of the 'cella' sensor."""
    # The context manager closes the file even if serialisation fails.
    with open('sux.json', 'w') as status_file:
        status_file.write(json.dumps({'id': 'cella', 'temp': temp, 'hum': hum}))


# Pause before retrying a failed read. Without it a persistent failure
# makes the loop spin with no delay at all between connection attempts.
retry_interval = min(read_interval, 10)

print("Mi avvio...")
print("Aggiornamento ogni %i secondi" % read_interval)
while True:
    readings = get_readings()
    if not readings:
        # Read failed: publish placeholder values and retry shortly.
        logger.warning("'sensor':'cellalievitazione', 'temperature':NaN, 'humidity': NaN, 'connected': False")
        _write_status('unknown', 'unknown')
        sleep(retry_interval)
        continue
    # little endian, first two bytes are temp_c, second two bytes are humidity
    temperature_c = float_value(readings[0:2])
    humidity = float_value(readings[2:4])
    logger.info("'id':'cellalievitazione', 'temperature': temperature_c[{:0.2f}], 'humidity': humidity[{:0.2f}], 'connected': True".format(temperature_c, humidity))
    _write_status(temperature_c, humidity)
    sleep(read_interval)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment