Skip to content

Instantly share code, notes, and snippets.

@atx
Created August 1, 2019 11:50
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save atx/ef4df535f81389dea4905761ae7e05f7 to your computer and use it in GitHub Desktop.
Save atx/ef4df535f81389dea4905761ae7e05f7 to your computer and use it in GitHub Desktop.
Hacky script to export Mozilla webthings gateway values to influxdb
#! /usr/bin/env python3
import argparse
import asyncio
import configparser
import datetime
import influxdb
import json
import pathlib
import requests
import websockets
from urllib.parse import urljoin
class Translator:
def __init__(self, config):
self.gateway_url = config["gateway"]["url"]
self.gateway_token = config["gateway"]["token"]
self.session = requests.Session()
self.session.headers["Accept"] = "application/json"
self.session.headers["Authorization"] = \
"Bearer {}".format(config["gateway"]["token"])
self.connected_to = set()
self.influx = influxdb.InfluxDBClient(
host=config["influx"]["host"],
port=int(config["influx"]["port"])
)
self.influx.switch_database(config["influx"]["database"])
def update_things(self):
loop = asyncio.get_event_loop()
jsn = self.session.get(urljoin(self.gateway_url, "/things")).json()
for thing in jsn:
ws_url = urljoin(self.gateway_url.replace("http", "ws"),
thing["href"] + "?jwt={}".format(self.gateway_token))
if ws_url in self.connected_to:
continue
print("Connecting to {}".format(thing["href"]))
self.connected_to.add(ws_url)
loop.create_task(self._thing_task(thing["href"], ws_url))
return jsn
@staticmethod
def _metric_name(thing_name, property_name):
return thing_name + "_" + property_name
def _send_measurement(self, thing_name, property_name, value):
metric_name = self.__class__._metric_name(thing_name, property_name)
timestamp = datetime.datetime.utcnow().isoformat() + "Z"
print("Sending {} = {} @{}".format(metric_name, value, timestamp))
json_body = {
"measurement": metric_name,
"time": timestamp,
"fields": {
"value": value
}
}
self.influx.write_points([json_body])
async def _thing_task(self, thing_name, ws_url):
async with websockets.connect(ws_url) as ws:
while True:
msg = json.loads(await ws.recv())
if msg["messageType"] != "propertyStatus":
continue
for property_name, property_value in msg["data"].items():
if not isinstance(property_value, float):
property_value = float(property_value)
self._send_measurement(thing_name, property_name, property_value)
async def run(self):
while True:
self.update_things()
await asyncio.sleep(10)
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("-c", "--config", type=pathlib.Path,
default=(pathlib.Path(__file__).parent / "config.ini"))
args = parser.parse_args()
print("Loading config file from '{}'".format(args.config))
config = configparser.RawConfigParser()
config.read(args.config)
print("Starting translator...")
translator = Translator(config)
loop = asyncio.get_event_loop()
loop.run_until_complete(translator.run())
loop.run_forever()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment