Skip to content

Instantly share code, notes, and snippets.

@eni23
Created December 15, 2022 21:03
Show Gist options
  • Save eni23/8ac6cb896f35ae9f6a0699d34c300341 to your computer and use it in GitHub Desktop.
Save eni23/8ac6cb896f35ae9f6a0699d34c300341 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
import asyncio
import bleak
import igsparser
import influxdb_client
VALID_SENSORS={
"80:6F:B0:C5:4B:35": {
"name": "draussen",
"fields": [
"battery",
"humidity",
"temperature"
]
}
}
INFLUX_URL="http://8.8.8.8:8086"
INFLUX_TOKEN="123qwe"
INFLUX_ORG="bucket"
INFLUX_BUCKET="temp"
async def main():
stop_event = asyncio.Event()
influx_client = influxdb_client.InfluxDBClient(
url=INFLUX_URL,
token=INFLUX_TOKEN,
org=INFLUX_ORG,
bucket=INFLUX_BUCKET
)
influx = influx_client.write_api()
def callback(device, advertising_data):
if device.address not in VALID_SENSORS:
return
# TODO: do it nicer
dlen = list(advertising_data.manufacturer_data)[0]
mfdata = advertising_data.manufacturer_data[dlen]
data = igsparser.MsdParser.parse(b'\r\x00'+mfdata)
settings = VALID_SENSORS[device.address]
point = influxdb_client.Point("climate")
point.tag("device", settings["name"])
for field in settings["fields"]:
point.field(field, vars(data).get(field))
influx.write(org=INFLUX_ORG, bucket=INFLUX_BUCKET, record=point)
influx.flush()
print(point)
pass
async with bleak.BleakScanner(callback) as scanner:
await stop_event.wait()
asyncio.run(main())
async-timeout==4.0.2
bleak==0.19.5
certifi==2022.12.7
dbus-fast==1.82.0
influxdb-client==1.35.0
ingics-message-parser @ git+https://github.com/ingics/ingics-message-parser-py@1b5500f03ed57575d9cf43acb4f046ac5d1cc903
python-dateutil==2.8.2
reactivex==4.0.4
six==1.16.0
typing_extensions==4.4.0
urllib3==1.26.13
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment