import asyncio
import json
import os
from autobahn.asyncio.wamp import ApplicationSession, ApplicationRunner
from collections import OrderedDict
from custom_encoder import CustomEncoder
from datetime import datetime, timedelta
from db import DB
"""
https://github.com/bemasher/rtlamr
"""
# Connection settings for the WAMP router and the readings database.
# Each value can be overridden with the POWERTRAC_* environment variable named
# in its lookup; the literal is the fallback used when the variable is unset,
# so existing deployments keep working unchanged. Prefer setting
# POWERTRAC_DB_PASSWORD in the environment over editing the default here.
ROUTER_URL = os.environ.get("POWERTRAC_ROUTER_URL", "ws://192.168.1.50:9443/ws")
ROUTER_REALM = os.environ.get("POWERTRAC_ROUTER_REALM", "realm1")
DB_HOST = os.environ.get("POWERTRAC_DB_HOST", "192.168.1.51")
DB_DATABASE = os.environ.get("POWERTRAC_DB_DATABASE", "power")
DB_USER = os.environ.get("POWERTRAC_DB_USER", "dave")
DB_PASSWORD = os.environ.get("POWERTRAC_DB_PASSWORD", "password")
class SessionComponent(ApplicationSession):
    """WAMP session that stores rtlamr meter readings and serves them back.

    After joining the realm it registers ``com.powertrac.get_readings`` and
    then tails the JSON output of an ``rtlamr`` subprocess: every decoded
    message is inserted into the database and followed by a
    ``com.powertrac.reading_event`` publication.
    """

    # Shell command whose stdout is tailed for JSON-encoded meter messages.
    RTLAMR_COMMAND = "/home/dave/go/bin/rtlamr -msgtype=scm+ --format=json --filterid=69206873"
    # Pause before rtlamr is started again after its output stream ends.
    RESTART_DELAY_SECONDS = 3

    def __init__(self, config):
        """Create the session and the database handle.

        Args:
            config: Component configuration forwarded to ``ApplicationSession``.
        """
        super().__init__(config)
        # Kept as a public attribute for backward compatibility; the
        # subprocess API no longer accepts an explicit loop.
        self.loop = asyncio.get_event_loop()
        self.db = DB(host=DB_HOST, database=DB_DATABASE, user=DB_USER, password=DB_PASSWORD)

    async def onJoin(self, details):
        """Register the RPC endpoint, then tail rtlamr until the session ends.

        Args:
            details: Session details passed by the framework (unused).
        """
        try:
            # await self.db.create_tables()
            # NOTE(review): fixed 5 s pause whose purpose is not evident from
            # this file -- presumably lets the database come up; confirm.
            await asyncio.sleep(5)
            print("Joined {} @ {}!".format(ROUTER_REALM, ROUTER_URL))
            # TODO: Comment For Debug ===============
            procedure = "com.powertrac.get_readings"
            await self.register(self.get_readings, procedure)
            print("Registered: {}".format(procedure))
            # Awaited directly (instead of through asyncio.wait) so that a
            # failure to start the reader reaches the handler below rather
            # than being lost inside an un-inspected task.
            await self.tail_readings()
            # TODO: =================================
            # TODO: For Debug
            # self.subscribe(self.got_readings, "com.powertrac.reading_event")
            # print("Subscribed: {}".format("com.powertrac.reading_event"))
            # await self.get_readings()
        except Exception as e:
            print(e)

    def onLeave(self, details):
        """Disconnect the transport once the session has left the realm.

        Args:
            details: Close details passed by the framework (unused).
        """
        self.disconnect()
        print("Left {} @ {}!".format(ROUTER_REALM, ROUTER_URL))

    # async def got_readings(self):
    #     print(json.dumps(await self.get_readings(), sort_keys=True, indent=3, cls=CustomEncoder))

    async def tail_readings(self):
        """Run rtlamr forever, storing and announcing each reading it prints.

        When the subprocess output ends, the process is reaped and a new one
        is started after ``RESTART_DELAY_SECONDS``. If this coroutine is
        cancelled, the running subprocess is killed before the cancellation
        propagates.
        """
        while True:
            # No ``loop=`` argument: it was removed from the asyncio
            # subprocess API in Python 3.10; the running loop is used.
            process = await asyncio.create_subprocess_shell(
                self.RTLAMR_COMMAND,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.STDOUT,
                cwd=os.getcwd(),
                env=os.environ.copy(),
            )
            try:
                await self._pump_readings(process)
                exit_code = await process.wait()
            finally:
                # Reached on cancellation or an unexpected error as well:
                # do not leave an orphaned rtlamr behind.
                if process.returncode is None:
                    try:
                        process.kill()
                    except ProcessLookupError:
                        pass
            print("rtlamr exited with code {}; restarting in {}s".format(
                exit_code, self.RESTART_DELAY_SECONDS))
            await asyncio.sleep(self.RESTART_DELAY_SECONDS)

    async def _pump_readings(self, process):
        """Consume *process* stdout line by line until end of stream."""
        while True:
            try:
                raw_line = await process.stdout.readline()
                if not raw_line:
                    # Empty bytes means EOF: the subprocess closed its output.
                    return
                if not raw_line.strip():
                    continue
                reading = json.loads(raw_line.decode())
                await self.insert_reading(reading)
                print(json.dumps(reading, indent=3, sort_keys=True))
                self.publish("com.powertrac.reading_event")
            except asyncio.CancelledError:
                # CancelledError derives from Exception before Python 3.8;
                # never swallow it or the task becomes uncancellable.
                raise
            except Exception as e:
                # stderr is merged into stdout, so non-JSON lines are expected;
                # report the problem and keep reading.
                print(e)

    async def get_readings(self, start_date=None, end_date=None):
        """Return stored readings plus derived averages as a JSON string.

        Args:
            start_date: Inclusive lower bound on the reading time. Defaults
                to 24 hours before now when falsy.
            end_date: Exclusive upper bound on the reading time. No upper
                bound is applied when falsy.

        Returns:
            A JSON array (serialised with ``CustomEncoder``) of objects with
            the keys ``group``, ``consumption`` and ``time``, or ``None`` if
            the query or the post-processing raised (the error is printed).
        """
        try:
            readings = self.db.readings
            # NOTE(review): the default bound is a naive local timestamp;
            # confirm it matches how the ``time`` column is stored.
            lower_bound = start_date or (datetime.now() - timedelta(days=1))
            # Chained single-argument where() calls are AND-ed together and,
            # unlike where(a, b), also work on SQLAlchemy releases before 1.4.
            query = readings.select().where(readings.c.time >= lower_bound)
            if end_date:
                query = query.where(readings.c.time < end_date)
            query = query.order_by(readings.c.time.asc())
            rows = await self.db.query(query)
            return json.dumps(self._build_series(rows), sort_keys=True, cls=CustomEncoder)
        except Exception as e:
            print(e)
            return None

    @staticmethod
    def _build_series(rows):
        """Turn time-ordered reading rows into raw and derived data points.

        Every row yields a ``raw`` point. Whenever the consumption rises
        above that of the current baseline row (initially the first row), two
        more points are emitted at the midpoint of the interval: ``avg``
        (the consumption delta / 100) and ``avg_kwh`` (that value divided by
        the elapsed hours); the row then becomes the new baseline.
        """
        series = []
        baseline = None
        for row in rows:
            raw = OrderedDict()
            raw["group"] = "raw"
            raw["consumption"] = row["consumption"]
            raw["time"] = row["time"]
            series.append(raw)

            if baseline is None:
                baseline = row
                continue
            if row["consumption"] <= baseline["consumption"]:
                continue

            elapsed_seconds = (row["time"] - baseline["time"]).total_seconds()
            if elapsed_seconds <= 0:
                # Identical timestamps would divide by zero below; keep the
                # baseline so the delta is attributed to a later row instead.
                continue

            midpoint = row["time"] - timedelta(seconds=elapsed_seconds / 2)
            # NOTE: Meter reads in decawatt; divide the power value by 100 to get kilowatt
            delta_kilowatt = (row["consumption"] - baseline["consumption"]) / 100
            per_hour = delta_kilowatt / (elapsed_seconds / 3600)
            baseline = row

            avg = OrderedDict()
            avg["group"] = "avg"
            avg["consumption"] = delta_kilowatt
            avg["time"] = midpoint
            series.append(avg)

            avg_kwh = OrderedDict()
            avg_kwh["group"] = "avg_kwh"
            avg_kwh["consumption"] = per_hour
            avg_kwh["time"] = midpoint
            series.append(avg_kwh)
        return series

    async def insert_reading(self, text_dict):
        """Insert one decoded rtlamr message and print the returned row(s).

        Args:
            text_dict: A decoded rtlamr JSON message. Sample output of
                ``go/bin/rtlamr -msgtype=scm+ --format=json --filterid=69206873``::

                    {
                        "Time":"2019-01-25T16:36:22.635038758-05:00",
                        "Offset":0,
                        "Length":0,
                        "Type":"SCM+",
                        "Message":{
                            "FrameSync":5795,
                            "ProtocolID":30,
                            "EndpointType":7,
                            "EndpointID":69206873,
                            "Consumption":5086180,
                            "Tamper":512,
                            "PacketCRC":33947
                        }
                    }

        Raises:
            KeyError: If an expected key is missing from ``text_dict``.
        """
        message = text_dict["Message"]
        readings = self.db.readings
        statement = readings.insert().values(
            packet_crc=message["PacketCRC"],
            tamper=message["Tamper"],
            consumption=message["Consumption"],
            endpoint_id=message["EndpointID"],
            endpoint_type=message["EndpointType"],
            protocol_id=message["ProtocolID"],
            frame_sync=message["FrameSync"],
            type=text_dict["Type"],
            length=text_dict["Length"],
            offset=text_dict["Offset"],
            time=text_dict["Time"],
        ).returning(readings.c.id)
        for inserted_row in await self.db.query(statement):
            print(json.dumps(inserted_row, indent=3))
if __name__ == "__main__":
    # Connect to the configured router/realm and run the session component
    # until the connection ends.
    ApplicationRunner(url=ROUTER_URL, realm=ROUTER_REALM).run(SessionComponent)
# Source: GitHub gist singlecheeze/702b876a6e1b56aaa45bdba47c8e897a ("PowerTrac"),
# last active May 24, 2022 19:44.