Skip to content

Instantly share code, notes, and snippets.

@singlecheeze
Last active May 24, 2022 19:44
Show Gist options
  • Save singlecheeze/702b876a6e1b56aaa45bdba47c8e897a to your computer and use it in GitHub Desktop.
Save singlecheeze/702b876a6e1b56aaa45bdba47c8e897a to your computer and use it in GitHub Desktop.
PowerTrac
import asyncio
import json
import os
from autobahn.asyncio.wamp import ApplicationSession, ApplicationRunner
from collections import OrderedDict
from custom_encoder import CustomEncoder
from datetime import datetime, timedelta
from db import DB

"""
https://github.com/bemasher/rtlamr
"""

# Connection settings. Each value can be overridden through an environment
# variable so that deployment details (and in particular the database
# password) do not have to be edited into the source; the literals below are
# only the fallbacks used when the variable is not set.
ROUTER_URL = os.environ.get("POWERTRAC_ROUTER_URL", "ws://192.168.1.50:9443/ws")
ROUTER_REALM = os.environ.get("POWERTRAC_ROUTER_REALM", "realm1")

DB_HOST = os.environ.get("POWERTRAC_DB_HOST", "192.168.1.51")
DB_DATABASE = os.environ.get("POWERTRAC_DB_DATABASE", "power")
DB_USER = os.environ.get("POWERTRAC_DB_USER", "dave")
DB_PASSWORD = os.environ.get("POWERTRAC_DB_PASSWORD", "password")


class SessionComponent(ApplicationSession):
    """WAMP session that stores rtlamr meter readings and serves them over RPC.

    After joining the realm the session registers the procedure
    ``com.powertrac.get_readings`` and then tails an ``rtlamr`` subprocess:
    every JSON message it prints is inserted into the database and announced
    by publishing ``com.powertrac.reading_event``.
    """

    # Shell command whose output is read line by line in tail_readings().
    RTLAMR_COMMAND = "/home/dave/go/bin/rtlamr -msgtype=scm+ --format=json --filterid=69206873"

    def __init__(self, config):
        """Create the session and the database helper it uses.

        :param config: component configuration handed over by the runner and
            passed on unchanged to ``ApplicationSession``.
        """
        ApplicationSession.__init__(self, config)
        # Kept as an attribute for backward compatibility; it is no longer
        # passed to asyncio APIs (see tail_readings).
        self.loop = asyncio.get_event_loop()
        self.db = DB(host=DB_HOST, database=DB_DATABASE, user=DB_USER, password=DB_PASSWORD)

    async def onJoin(self, details):
        """Register the RPC endpoint, then tail meter readings.

        Any exception raised during setup or while tailing is printed and
        otherwise swallowed, so this coroutine never raises ``Exception``.

        :param details: session details supplied by the router (unused).
        """
        try:
            # await self.db.create_tables()
            # NOTE(review): fixed 5 s pause before announcing the join;
            # presumably gives the database time to come up - confirm.
            await asyncio.sleep(5)
            print("Joined {} @ {}!".format(ROUTER_REALM, ROUTER_URL))

            # TODO: Comment For Debug ===============
            procedure = "com.powertrac.get_readings"
            await self.register(self.get_readings, procedure)
            print("Registered: {}".format(procedure))

            # Awaited directly (instead of via ensure_future + asyncio.wait)
            # so that a failure to start the subprocess reaches the handler
            # below instead of being lost in a never-retrieved task.
            await self.tail_readings()
            # TODO: =================================

            # TODO: For Debug
            # self.subscribe(self.got_readings, "com.powertrac.reading_event")
            # print("Subscribed: {}".format("com.powertrac.reading_event"))
            # await self.get_readings()
        except Exception as e:
            print(e)

    def onLeave(self, details):
        """Disconnect the transport once the session has left the realm.

        :param details: close details supplied by the router (unused).
        """
        self.disconnect()
        print("Left {} @ {}!".format(ROUTER_REALM, ROUTER_URL))

    # async def got_readings(self):
    #     print(json.dumps(await self.get_readings(), sort_keys=True, indent=3, cls=CustomEncoder))

    async def tail_readings(self):
        """Run rtlamr and process every line it prints until it exits.

        Each non-empty line is decoded, parsed as JSON, stored through
        ``insert_reading``, echoed to stdout and announced by publishing
        ``com.powertrac.reading_event``. Errors while handling a single line
        (for example a non-JSON line, since stderr is merged into stdout) are
        printed and the loop continues with the next line.

        When the output stream reaches end-of-file the method waits for the
        child process, prints its exit code and returns.
        """
        # No ``loop=`` argument: asyncio removed that parameter in Python 3.10
        # and uses the running event loop on its own.
        tail = await asyncio.create_subprocess_shell(
            self.RTLAMR_COMMAND,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.STDOUT,
            cwd=os.getcwd(),
            env=os.environ.copy())

        while True:
            try:
                line = await tail.stdout.readline()
                if not line:
                    # readline() returns b"" only at end-of-file, i.e. the
                    # child closed its output; polling again cannot yield
                    # more data, so report the exit and stop.
                    return_code = await tail.wait()
                    print("rtlamr exited with code {}".format(return_code))
                    return

                text_dict = json.loads(line.decode())
                await self.insert_reading(text_dict)

                print(json.dumps(text_dict, indent=3, sort_keys=True))
                self.publish("com.powertrac.reading_event")
            except Exception as e:
                # Deliberately broad: one bad line or a failed insert must
                # not stop the tail loop.
                print(e)

    @staticmethod
    def _reading_point(group, consumption, when):
        """Return one output point with the keys ``group``, ``consumption`` and ``time``."""
        point = OrderedDict()
        point["group"] = group
        point["consumption"] = consumption
        point["time"] = when
        return point

    @classmethod
    def _build_reading_points(cls, rows):
        """Turn time-ordered reading rows into raw and derived points.

        Every row yields a ``raw`` point. Whenever a row's consumption is
        higher than that of the current baseline row (initially the first
        row), two more points are emitted at the midpoint between both
        timestamps, and that row becomes the new baseline:

        * ``avg``: the consumption increase divided by 100
        * ``avg_kwh``: that value divided by the elapsed time in hours

        Rows whose timestamp is not later than the baseline's cannot give a
        rate (division by zero), so they only contribute their ``raw`` point
        and leave the baseline untouched.
        """
        points = []
        baseline = None
        for row in rows:
            points.append(cls._reading_point("raw", row["consumption"], row["time"]))

            if baseline is None:
                baseline = row
                continue
            if row["consumption"] <= baseline["consumption"]:
                continue

            elapsed_seconds = (row["time"] - baseline["time"]).total_seconds()
            if elapsed_seconds <= 0:
                continue

            midpoint = row["time"] - timedelta(seconds=(elapsed_seconds / 2))
            elapsed_hours = elapsed_seconds / 3600

            # NOTE: Meter reads in decawatt; divide the power value by 100 to get kilowatt
            consumption_delta_kilowatt = (row["consumption"] - baseline["consumption"]) / 100
            avg_kwh_consumption = consumption_delta_kilowatt / elapsed_hours
            baseline = row

            points.append(cls._reading_point("avg", consumption_delta_kilowatt, midpoint))
            points.append(cls._reading_point("avg_kwh", avg_kwh_consumption, midpoint))
        return points

    async def get_readings(self, start_date=None, end_date=None):
        """Return stored readings plus derived averages as a JSON string.

        :param start_date: inclusive lower bound on the reading time;
            defaults to 24 hours before now when omitted.
        :param end_date: exclusive upper bound on the reading time; no upper
            bound is applied when omitted.
        :return: JSON array (string) of points ordered by reading time, each
            with ``group`` (``raw``, ``avg`` or ``avg_kwh``), ``consumption``
            and ``time``; ``None`` if an error occurred (the error is
            printed).
        """
        try:
            readings = self.db.readings

            if start_date:
                window_start = start_date
            else:
                window_start = datetime.now() - timedelta(days=1)

            # Each bound is applied on its own, so supplying only one of the
            # two dates is honoured instead of being silently ignored.
            query_params = readings.select().where(readings.c.time >= window_start)
            if end_date:
                query_params = query_params.where(readings.c.time < end_date)
            query_params = query_params.order_by(readings.c.time.asc())

            data = await self.db.query(query_params)
            points = self._build_reading_points(data)
            return json.dumps(points, sort_keys=True, cls=CustomEncoder)

        except Exception as e:
            print(e)
            return None

    async def insert_reading(self, text_dict):
        """Insert one decoded rtlamr message and print the returned row(s).

        :param text_dict: parsed rtlamr JSON message. Example output of
            ``go/bin/rtlamr -msgtype=scm+ --format=json --filterid=69206873``::

                {
                    "Time":"2019-01-25T16:36:22.635038758-05:00",
                    "Offset":0,
                    "Length":0,
                    "Type":"SCM+",
                    "Message":{
                        "FrameSync":5795,
                        "ProtocolID":30,
                        "EndpointType":7,
                        "EndpointID":69206873,
                        "Consumption":5086180,
                        "Tamper":512,
                        "PacketCRC":33947
                    }
                }

        :raises KeyError: if an expected key is missing from ``text_dict``.
        """
        message = text_dict["Message"]
        readings = self.db.readings
        statement = readings.insert().values(
            packet_crc=message["PacketCRC"],
            tamper=message["Tamper"],
            consumption=message["Consumption"],
            endpoint_id=message["EndpointID"],
            endpoint_type=message["EndpointType"],
            protocol_id=message["ProtocolID"],
            frame_sync=message["FrameSync"],
            type=text_dict["Type"],
            length=text_dict["Length"],
            offset=text_dict["Offset"],
            time=text_dict["Time"]).returning(readings.c.id)

        # The statement returns the id of the inserted row; echo it.
        for reading_row in await self.db.query(statement):
            print(json.dumps(reading_row, indent=3))


if __name__ == "__main__":
    # Script entry point: connect to the configured WAMP router/realm and run
    # SessionComponent until the runner returns.
    ApplicationRunner(url=ROUTER_URL, realm=ROUTER_REALM).run(SessionComponent)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment