Last active
May 18, 2024 08:38
-
-
Save and7ey/af7a53f491365111d35ee89fdee89e46 to your computer and use it in GitHub Desktop.
Home Assistant pyscript to get data from OICO / Etalon UK
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
service: pyscript.oico_get_counters
data:
  address_id: 310379
  counters:
    - counter_id: 123456 # electricity
      mqtt_topic1: 'oico/energy-meter/t1'
      mqtt_topic2: 'oico/energy-meter/t2'
      mqtt_topic3: 'oico/energy-meter/t3'
    - counter_id: 7890123 # heating
      mqtt_topic1: 'oico/heating-meter'
    - counter_id: 112233 # cold water, toilet
      mqtt_topic1: 'oico/water-meter/toilet-cold'
    - counter_id: 445566 # hot water, toilet
      mqtt_topic1: 'oico/water-meter/toilet-hot'
    - counter_id: 778899 # cold water, bathroom
      mqtt_topic1: 'oico/water-meter/bathroom-cold'
    - counter_id: 443322 # hot water, bathroom
      mqtt_topic1: 'oico/water-meter/bathroom-hot'

service: pyscript.oico_get_price
data:
  address_id: 310379
  services:
    - name: "Холодное водоснабжение"
      topic: "oico/water/cold/price"
    - name: "Водоотведение "
      topic: "oico/water/out/price"
    - name: "Отопление"
      topic: "oico/heating/price"
""" | |
import aiohttp | |
import base64 | |
import json | |
# Base URL of the OICO API; each request below appends its endpoint path to it.
url = 'https://oico.app/prod/api/m/'
async def auth(brand):
    """Register a device session at the OICO server and return its token.

    Posts a device-registration payload for ``brand`` to ``account/dev`` and
    reads the ``auth_Token`` field from the JSON response.

    NOTE(review): ``resp.text()`` is called without ``await``; this file is a
    pyscript module and presumably relies on pyscript awaiting coroutines
    implicitly -- confirm against the pyscript reference before running it
    under plain asyncio.

    Args:
        brand: Brand identifier sent in the registration payload.

    Returns:
        The token, base64-encoded and decoded back to ``str`` so it can be
        placed after ``Basic `` in an ``Authorization`` header, or ``None``
        when the request fails or the response carries no token.
    """
    log.info(f'Starting to authorize at oico server, brand {brand}...')
    url_auth = url + 'account/dev'
    auth_req_data = {'devType':'W','pushToken':'firebaseSecretToken','brand':brand,'ver':'1.9.1'}
    auth_token_b64 = None
    async with aiohttp.ClientSession() as session:
        async with session.post(url_auth, json=auth_req_data) as resp:
            if resp.status == 200:
                # The body must be read while the response is still open.
                data_str = resp.text()
                data_json = json.loads(data_str)
                auth_token = data_json.get('auth_Token', '')
                log.debug(f'Auth token received: {auth_token}')
                if auth_token:
                    auth_token_b64 = base64.b64encode(auth_token.encode('utf-8')).decode('utf-8')
                    log.debug(auth_token_b64)
                else:
                    # An empty token would only produce a useless "Basic " header.
                    log.error('Failed to authorize, the response contains no auth token')
            else:
                log.error(f'Failed to authorize, the error code is {resp.status}')
    # Logged on every path, not only on failure.
    log.info('Finished authorization process')
    return auth_token_b64
@service | |
def oico_get_counters(address_id, counters, brand='ETALON'): | |
auth_token_b64 = auth(brand) | |
log.info(f'Starting to get counters values for address {address_id}...') | |
url_get_counters = url + f'gku/counter/{address_id}' | |
headers = {'Authorization':f'Basic {auth_token_b64}'} | |
async with aiohttp.ClientSession() as session: | |
async with session.get(url_get_counters, headers=headers) as resp: | |
if resp.status == 200: | |
data_str = resp.text() | |
log.debug(data_str) | |
data_json = json.loads(data_str) | |
for counter in counters: | |
for c in data_json: | |
if c.get('factory_num', '') == str(counter['counter_id']): # номер счетчика | |
counter_type = c.get('service_str', None) # тип счетчика, "ХВС" | |
counter_id = counter['counter_id'] | |
counter_t1_current_val = c.get('current_val', None) # Т1 (день), "1.0000" | |
counter_t2_current_val = c.get('current_val2', None) # T2 (ночь), "2.0000" | |
counter_t3_current_val = c.get('current_val3', None) # T3 (пик), "3.0000" | |
counter_val_date = c.get('current_val_date', None) # дата передачи показаний, "24.03.2023" | |
if counter_t1_current_val: counter_t1_current_val = float(counter_t1_current_val) | |
if counter_t2_current_val: counter_t2_current_val = float(counter_t2_current_val) | |
if counter_t3_current_val: counter_t3_current_val = float(counter_t3_current_val) | |
log.info(f'Counter {counter_id} values on {counter_val_date}: 1 - {counter_t1_current_val}, 2 - {counter_t2_current_val}, 3 - {counter_t3_current_val}') | |
log.debug('Sending MQTT message to update sensors') | |
# https://hacs-pyscript.readthedocs.io/en/stable/reference.html#calling-services | |
if counter.get('mqtt_topic1', None) and counter_t1_current_val: | |
service.call('mqtt', 'publish', topic=counter['mqtt_topic1'], payload=counter_t1_current_val, retain=True) | |
if counter.get('mqtt_topic2', None) and counter_t2_current_val: | |
service.call('mqtt', 'publish', topic=counter['mqtt_topic2'], payload=counter_t2_current_val, retain=True) | |
if counter.get('mqtt_topic3', None) and counter_t3_current_val: | |
service.call('mqtt', 'publish', topic=counter['mqtt_topic3'], payload=counter_t3_current_val, retain=True) | |
else: | |
log.error(f'Failed to get counters values, the error code is {resp.status}') | |
log.info(f'Finished to get counters values') | |
async def get_latest_bill_id(auth_token_b64, address_id, brand='ETALON'):
    """Return the id of the first bill listed for an address.

    Requests ``gku/bill/{address_id}/detail`` and reads ``bill_id`` from the
    first element of the JSON response (presumably the most recent bill --
    the ordering is decided by the server, verify against the API).

    NOTE(review): ``resp.text()` is called without ``await``; presumably
    relies on pyscript awaiting coroutines implicitly -- confirm.

    Args:
        auth_token_b64: Base64 token placed in the ``Authorization`` header.
        address_id: Address identifier interpolated into the request URL.
        brand: Unused here; kept so existing callers keep working.

    Returns:
        The ``bill_id`` value, or ``None`` when the request fails or the
        response lists no bill.
    """
    log.info(f'Starting to get latest bill id for address {address_id}...')
    url_get_latest_bill_id = url + f'gku/bill/{address_id}/detail'
    headers = {'Authorization':f'Basic {auth_token_b64}'}
    bill_id = None
    async with aiohttp.ClientSession() as session:
        async with session.get(url_get_latest_bill_id, headers=headers) as resp:
            if resp.status == 200:
                data_str = resp.text()
                log.debug(data_str)
                data_json = json.loads(data_str)
                # Guard against an empty list before indexing the first bill.
                latest = data_json[0] if data_json else None
                if latest and latest.get('bill_id', None):
                    bill_id = latest.get('bill_id')
                    bill_month = latest.get('month_str')  # e.g. "Март 2023"
                    log.info(f'Latest bill is {bill_id} for {bill_month}')
                else:
                    log.error(f'No bill found for address {address_id}')
            else:
                log.error(f'Failed to get latest bill id, the error code is {resp.status}')
    log.info('Finished to get latest bill id')
    return bill_id
async def get_bill_details(auth_token_b64, bill_id, brand='ETALON'):
    """Fetch the line items of a bill and return the price of each service.

    Requests ``gku/bill/detail/{bill_id}`` and collects ``price`` keyed by
    ``service_name`` for every entry that has both fields set.

    NOTE(review): ``resp.text()`` is called without ``await``; presumably
    relies on pyscript awaiting coroutines implicitly -- confirm.

    Args:
        auth_token_b64: Base64 token placed in the ``Authorization`` header.
        bill_id: Bill identifier interpolated into the request URL.
        brand: Unused here; kept so existing callers keep working.

    Returns:
        A dict mapping service name to price, or ``None`` when ``bill_id``
        is missing or the request fails.
    """
    if not bill_id:
        # Avoid requesting ".../detail/None" when the bill lookup failed upstream.
        log.error('Failed to get bill details, no bill id was provided')
        return None

    log.info(f'Starting to get bill {bill_id} details...')
    url_get_bill_details = url + f'gku/bill/detail/{bill_id}'
    headers = {'Authorization':f'Basic {auth_token_b64}'}
    prices = None
    async with aiohttp.ClientSession() as session:
        async with session.get(url_get_bill_details, headers=headers) as resp:
            if resp.status == 200:
                data_str = resp.text()
                log.debug(data_str)
                data_json = json.loads(data_str)
                prices = {}
                for c in data_json:
                    service_name = c.get('service_name', None)
                    price = c.get('price', None)
                    if service_name and price:
                        prices[service_name] = price
                        log.debug(f'Стоимость {service_name} {price}')
            else:
                log.error(f'Failed to get bill details, the error code is {resp.status}')
    log.info('Finished to get bill details')
    return prices
@service
def oico_get_price(address_id, services=None, brand='ETALON'):
    """Publish the prices from the latest bill of an address over MQTT.

    Authorizes, looks up the latest bill id, loads that bill's per-service
    prices and publishes, as retained messages, the price of every requested
    service that appears in the bill.

    NOTE(review): the helper coroutines are called without ``await``; this
    presumably relies on pyscript awaiting coroutines implicitly -- confirm.

    Args:
        address_id: Address identifier used to look up the latest bill.
        services: Iterable of dicts with ``name`` (service name exactly as it
            appears in the bill) and ``topic`` (MQTT topic). ``None`` means
            nothing is published.
        brand: Brand identifier forwarded to ``auth``.
    """
    auth_token_b64 = auth(brand)
    if not auth_token_b64:
        log.error('Failed to get prices, authorization did not return a token')
        return

    bill_id = get_latest_bill_id(auth_token_b64, address_id)
    if not bill_id:
        log.error(f'Failed to get prices, no bill found for address {address_id}')
        return

    prices = get_bill_details(auth_token_b64, bill_id)
    if not prices:
        # None (request failed) or empty dict: nothing to publish either way.
        log.error(f'Failed to get prices, bill {bill_id} has no price details')
        return

    log.debug('Sending MQTT message to update sensors')
    # "services" defaults to None, so fall back to an empty list before iterating.
    for s in services or []:
        n = s.get('name')
        t = s.get('topic')
        if n and t and prices.get(n, None):
            service.call('mqtt', 'publish', topic=t, payload=prices[n], retain=True)
# the code below is needed just for local testing | |
# import asyncio | |
# import sys | |
# import logging as log | |
# log.basicConfig(stream=sys.stdout, level=log.DEBUG) | |
# asyncio.run(get_counters(address_id=112233)) | |
# asyncio.run(get_bill_details(bill_id=445566)) | |
# asyncio.run(get_latest_bill_id(address_id=112233)) | |
# asyncio.run(get_price(address_id=112233)) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment