Skip to content

Instantly share code, notes, and snippets.

@io53
Last active September 14, 2021 09:11
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save io53/84069d8838472742ad442c595de9807f to your computer and use it in GitHub Desktop.
Save io53/84069d8838472742ad442c595de9807f to your computer and use it in GitHub Desktop.
Fetch and parse data from Ruuvi Gateway
import asyncio
import aiohttp
from simple_ruuvitag import decoder
def parse(data):
    """Decode the Ruuvi broadcasts found in a gateway history response.

    Args:
        data: Decoded JSON response. ``data["data"]["tags"]`` must map each
            tag MAC to a dict with the keys ``"rssi"`` and ``"data"``
            (presumably the raw advertisement as a hex string).

    Returns:
        A dict mapping each MAC to its decoded reading, with the tag's
        ``"rssi"`` value added. Tags whose payload lacks the ``FF9904``
        marker, uses a data format other than 3 or 5, or cannot be decoded
        are reported with ``print`` and left out of the result.

    Raises:
        KeyError: If the response is missing one of the expected keys.
    """
    # Data-format byte (as two hex characters) -> decoder id.
    supported_formats = {"03": 3, "05": 5}

    out = {}
    for mac, tag in data["data"]["tags"].items():
        rssi = tag["rssi"]
        raw = tag["data"]

        # Everything after the first company-id marker is the broadcast.
        _, marker, broadcast = raw.partition("FF9904")
        if not marker:
            print("ruuvi company id not found in data")
            continue

        df = broadcast[0:2]
        if df not in supported_formats:
            print("dataformat "+df+" is not supported")
            continue

        rt = decoder.get_decoder(supported_formats[df]).decode_data(broadcast)
        # NOTE(review): the decoders appear to return None for a malformed
        # payload instead of raising; skip such tags so a single bad
        # broadcast cannot abort the whole batch with a TypeError below.
        if rt is None:
            print("could not decode data for " + str(mac))
            continue

        rt["rssi"] = rssi
        out[mac] = rt
    return out
async def fetchData(ip, pollRate):
    """Request the gateway's ``/history`` endpoint and parse the reply.

    Args:
        ip: Host part of the gateway URL (string).
        pollRate: Value sent as the ``time`` query parameter.

    Returns:
        The result of ``parse`` on the JSON body when the gateway answers
        with HTTP 200; otherwise the status is printed and ``None`` is
        returned.
    """
    url = 'http://' + ip + '/history?time=' + str(pollRate)
    async with aiohttp.ClientSession() as session, session.get(url) as response:
        if response.status != 200:
            print("Response status:", response.status)
            return None
        payload = await response.json()
        return parse(payload)
async def main():
    """Fetch readings once from the hard-coded gateway and print them."""
    readings = await fetchData("10.99.20.9", 30)
    print(readings)
# Script entry point. asyncio.run() creates and closes its own event loop;
# calling asyncio.get_event_loop() with no running loop is deprecated.
# The guard keeps an import of this module from triggering a network call.
if __name__ == "__main__":
    asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment