-
-
Save cgtobi/703b8e1ebc465424df1625213dfb44c9 to your computer and use it in GitHub Desktop.
Self test script for pyatmo 4
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import argparse
import logging

# from PIL import Image
# import io
import pyatmo  # for version >=3.2.0
from pyatmo.auth import ClientAuth, ALL_SCOPES
# Condition names that the weather-station check leaves out of its
# per-sensor listing (timestamps rather than measurements, judging by name).
IGNORE_TYPES = ["When", "date_max_temp", "date_min_temp"]
# monkey patch the post_request function to gather data
def new_post_request(auth, url, params=None, timeout=10):
    """Send a POST request and log the exchange with credentials masked.

    This function is not meant to be called directly: only its code object
    is used, grafted onto ``ClientAuth.post_request``.  A grafted code
    object keeps running with the globals of the function it was grafted
    onto, which has two consequences for this body:

    * every module it needs is imported at function scope, and
    * ``LOG`` is looked up in the patched function's module, not in this
      file.  NOTE(review): this presumes the module defining ``ClientAuth``
      exposes a ``LOG`` logger -- confirm against the installed pyatmo.

    The body must also stay free of closures over outer variables, because
    a code object can only replace another one with the same number of
    free variables.

    Args:
        auth: Object whose ``_oauth`` session performs authenticated
            requests (takes the place of ``self`` once grafted).
        url: Endpoint to post to.  URLs containing ``http://`` bypass the
            OAuth session and are sent with plain ``requests``.
        params: Optional request parameters.  They are never modified.
        timeout: Request timeout handed to the HTTP call.

    Returns:
        The decoded JSON body of the response.

    Raises:
        pyatmo.ApiError: If the response is not JSON, i.e. the body is
            only available as raw bytes.
    """
    import json

    import requests
    import pyatmo

    def scrmbl(data):
        """Return a copy of *data* with credential fields replaced by dummies.

        Only top-level keys of a dict are masked; any other value is
        returned unchanged.  Working on a copy matters: the same dict is
        returned to the caller afterwards and must keep its real tokens.
        """
        if not isinstance(data, dict):
            return data
        keywords = {
            "password": "my-secret-password",
            "client_id": "1234a56789b01c2d345ef67a",
            "client_secret": "TH1S1SMYV3RYPR1V4T3CL13NTS33CR3T",
            "username": "user.name@mail.com",
            "access_token": "a1b2c3d4e5f6a7b8c9d0e1f2a30|a09b87c65d43e21f09a87b65c43d2",
        }
        return {key: keywords.get(key, value) for key, value in data.items()}

    if "http://" in url:
        resp = requests.post(url, data=params, timeout=timeout)
    else:
        resp = auth._oauth.post(url=url, params=params, timeout=timeout)

    if not resp.ok:
        LOG.error("The Netatmo API returned %s", resp.status_code)
        LOG.debug("Netato API error: %s", resp)

    # A response without a content-type header must not break the check.
    content_type = resp.headers.get("content-type") or ""
    payload = resp.json() if "application/json" in content_type else resp.content

    LOG.debug("url: %s", url)
    if params:
        LOG.debug("params: %s", json.dumps(scrmbl(params)))
    LOG.debug("Response:")
    if isinstance(payload, (str, dict)):
        LOG.debug(json.dumps(scrmbl(payload)))
    if isinstance(payload, bytes):
        raise pyatmo.ApiError
    return payload
# Swap only the code object: existing references to ClientAuth.post_request
# keep working, and the method retains its own globals and default values
# while executing the body of new_post_request.
ClientAuth.post_request.__code__ = new_post_request.__code__
# Module-level logger shared by every check below; handlers and level are
# configured in main() once the --mode option is known.
LOG = logging.getLogger(__name__)

# Modules exercised when --modules is not given on the command line.
DEFAULT_MODULES = [
    "Energy",
    "Public",
    "Camera",
    "WeatherStation",
    "HomeCoach",
    "Smokedetector",
]

# Root logger level selected by each --mode value.
LOG_LEVELS = {
    "debug": logging.DEBUG,
    "long": logging.INFO,
    "test": logging.WARNING,
}


def parse_args(argv=None):
    """Parse the command line of the self-test script.

    Args:
        argv: Argument list to parse; ``None`` makes argparse read
            ``sys.argv``.

    Returns:
        The parsed namespace.  ``modules`` is always a list of names, so
        membership tests behave the same with and without ``--modules``.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--CLIENT_ID", required=True)
    parser.add_argument("--CLIENT_SECRET", required=True)
    parser.add_argument("--USERNAME", required=True)
    parser.add_argument("--PASSWORD", required=True)
    parser.add_argument(
        "--modules",
        nargs="*",
        default=DEFAULT_MODULES,
        help="List of supported modules, e.g. "
        "'Energy Public Camera WeatherStation HomeCoach Smokedetector'",
    )
    parser.add_argument(
        "--mode", default="test", choices=["test", "debug", "long"], help=""
    )
    return parser.parse_args(argv)


def check_webhook(authorization):
    """Register a placeholder webhook URL and drop it again."""
    webhook_url = "http://foo.bar"
    authorization.addwebhook(webhook_url)
    authorization.dropwebhook()


def check_weather_station(authorization):
    """List weather stations, their modules and the latest sensor values."""
    try:
        LOG.warning("\nChecking for available weather stations...")
        LOG.debug("<pyatmo.WeatherStation>")
        LOG.debug("<pyatmo.WeatherStationData>")
        weather_data = pyatmo.WeatherStationData(authorization)
        LOG.warning(weather_data.stations.keys())
        stations = [(i, s["station_name"]) for i, s in weather_data.stations.items()]
        for _id, station in stations:
            LOG.warning(" %s (%s)", station, _id)
            modules = weather_data.get_modules(station_id=_id)
            for m_id in modules:
                LOG.warning(
                    " %s %s (%s)",
                    modules[m_id]["station_name"],
                    modules[m_id]["module_name"],
                    m_id,
                )
                data = weather_data.get_last_data(station_id=_id)
                LOG.debug("lastData() %s", data)
                if not data:
                    # Nothing to report for this station; skip its
                    # remaining modules.
                    break
                for sensor in weather_data.get_monitored_conditions(module_id=m_id):
                    if sensor not in IGNORE_TYPES:
                        LOG.info(
                            " %s - %s",
                            sensor,
                            data.get(m_id, {}).get(sensor, None),
                        )
        LOG.warning("WeatherStation [OK]")
    except pyatmo.NoDevice:
        LOG.error("warning, no weather station available for testing")


def check_home_coach(authorization):
    """List home coach stations and their latest sensor values."""
    try:
        LOG.warning("\nChecking for available home coach stations...")
        LOG.debug("<pyatmo.HomeCoach>")
        LOG.debug("<pyatmo.HomeCoachData>")
        home_coach_data = pyatmo.HomeCoachData(authorization)
        stations = [
            (i, s["station_name"]) for i, s in home_coach_data.stations.items()
        ]
        for _id, station in stations:
            LOG.warning(" %s (%s)", station, _id)
            LOG.warning(
                " Modules %s", home_coach_data.get_module_names(station_id=_id)
            )
            data = home_coach_data.get_last_data(station_id=_id)
            if not data:
                LOG.info(" No data available")
                continue
            for sensor in data[_id]:
                LOG.info(" %s - %s", sensor, data[_id][sensor])
        LOG.warning("HomeCoach [OK]")
    except pyatmo.NoDevice:
        LOG.error("warning, no home coach station available for testing")


def check_camera(authorization):
    """List the cameras of every home, their URLs and the persons at home."""
    camera_types = {"NACamera": "Welcome", "NOC": "Presence", "NDB": "Doorbell"}
    try:
        LOG.warning("\nChecking for available cameras...")
        LOG.debug("<pyatmo.Camera>")
        LOG.debug("<pyatmo.CameraData>")
        home_data = pyatmo.HomeData(authorization)
        homes = [(i, d["name"]) for i, d in home_data.homes.items()]
        LOG.warning("CAMERA HOMES: %s", homes)
        camera_data = pyatmo.CameraData(authorization)
        LOG.warning("camera_data")
        for hid, home in homes:
            LOG.warning(" %s (%s)", home, hid)
            cameras = [
                (i, c["name"], camera_types[c["type"]])
                for i, c in camera_data.cameras[hid].items()
            ]
            if not cameras:
                continue
            for c_id, c_name, c_type in cameras:
                LOG.warning(' %s Camera "%s" (%s)', c_type, c_name, c_id)
                LOG.debug("<pyatmo.CameraData.cameraUrls>")
                vpn, local = camera_data.camera_urls(camera_id=c_id)
                LOG.info(" VPN URL <%s>", vpn)
                LOG.info(" Local URL <%s>", local)
                # Snapshot preview, disabled (needs PIL and io):
                # for event_id, event in camera_data.events.get(c_id).items():
                #     if "snapshot" in event:
                #         im_bytes, im_type = camera_data.get_camera_picture(
                #             event["snapshot"]["id"], event["snapshot"]["key"]
                #         )
                #         im = Image.open(io.BytesIO(im_bytes))
                #         out = im.resize((10, 10))
                #         with io.BytesIO() as output:
                #             out.save(output, format="JPEG")
                #             LOG.debug(output.getvalue())
                #         break
            at_home = camera_data.persons_at_home(home_id=hid)
            if at_home:
                LOG.info(" Persons at home %s", at_home)
        LOG.warning("Camera [OK]")
    except pyatmo.NoDevice:
        LOG.error("warning, no camera available for testing")
    except IndexError:
        LOG.error("warning, no default camera specified")


def check_smokedetector(authorization):
    """List the smoke detectors of every home."""
    detector_types = {"NSD": "Smoke detector"}
    try:
        LOG.warning("\nChecking for available smoke detectors...")
        LOG.debug("<pyatmo.Camera>")
        LOG.debug("<pyatmo.CameraData>")
        home_data = pyatmo.HomeData(authorization)
        homes = [(i, d["name"]) for i, d in home_data.homes.items()]
        smoke_data = pyatmo.CameraData(authorization)
        for hid, home in homes:
            LOG.warning(" %s (%s)", home, hid)
            smokedetectors = [
                (i, s["name"], detector_types[s["type"]])
                for i, s in smoke_data.smokedetectors[hid].items()
            ]
            for s_id, s_name, *_ in smokedetectors:
                LOG.warning(' Smoke detector "%s" (%s)', s_name, s_id)
        LOG.warning("Smoke detector [OK]")
    except pyatmo.NoDevice:
        LOG.error("warning, no smoke detectors available for testing")
    except IndexError:
        LOG.error("warning, no default smoke detector specified")


def check_public(authorization):
    """Query public weather data for a fixed bounding box."""
    try:
        LOG.warning("\nChecking for public data...")
        LOG.debug("<pyatmo.PublicData>")
        pd = pyatmo.PublicData(
            authorization, lat_ne=50.23, lon_ne=8.80, lat_sw=50.20, lon_sw=8.79
        )
        LOG.warning(" Average temperature: %.2f°C", pd.get_average_temperature())
        LOG.warning(" Average humidity: %.2f %%", pd.get_average_humidity())
        LOG.warning(" Available stations: %i", pd.stations_in_area())
        LOG.warning("Public [OK]")
    except pyatmo.NoDevice:
        LOG.error("no public weather data available for testing")


def _log_energy_modules(home_status, rid, modules, device_types):
    """Log one line per thermostat/valve of a room plus its status values."""
    for mid, module in modules:
        LOG.warning(
            " %s %s (%s)",
            device_types[module["type"]],
            module["name"],
            mid,
        )
        if module.get("type") == "NATherm1":
            LOG.info(" Boilerstatus: %s", home_status.boiler_status(mid))
            LOG.info(
                " Setpoint Mode: %s",
                home_status.get_room(rid)["therm_setpoint_mode"],
            )
            LOG.info(
                " Battery Level: %s",
                home_status.get_thermostat(mid).get("battery_level"),
            )
        else:
            LOG.info(
                " Heating Request: %s",
                home_status.get_room(rid).get("heating_power_request"),
            )
            LOG.info(
                " Setpoint Mode: %s",
                home_status.get_room(rid).get("therm_setpoint_mode"),
            )
            LOG.info(
                " Battery Level: %s",
                home_status.get_valve(mid).get("battery_level"),
            )


def check_energy(authorization):
    """List thermostats and valves per home and room with their status."""
    device_types = {"NATherm1": "Thermostat", "NRV": "Valve"}
    try:
        LOG.warning("\nChecking for available thermostats/valves...")
        LOG.debug("<pyatmo.Thermostat>")
        LOG.debug("<pyatmo.HomeData>")
        home_data = pyatmo.HomeData(authorization)
        homes = [(i, d["name"]) for i, d in home_data.homes.items()]
        for hid, home in homes:
            LOG.debug("<pyatmo.HomeStatus> %s (%s)", home, hid)
            try:
                home_status = pyatmo.HomeStatus(authorization, home_id=hid)
            except pyatmo.NoDevice:
                LOG.debug("no devices for %s (%s)", home, hid)
                continue
            LOG.warning(" %s (%s)", home, hid)

            # Report the relay plug of the home, if it has one (the last
            # one wins when several are listed).
            plug = {}
            for m in home_data.homes[hid]["modules"]:
                if m.get("type") == "NAPlug":
                    plug = m
            if plug.get("type") == "NAPlug":
                LOG.warning(
                    " %s %s (%s)",
                    plug.get("type"),
                    plug.get("name"),
                    plug.get("id"),
                )

            rooms = [(i, d["name"]) for i, d in home_data.rooms[hid].items()]
            for rid, room in rooms:
                modules = [
                    (i, d)
                    for i, d in home_data.modules[hid].items()
                    if d.get("room_id") == rid and d.get("type") in device_types
                ]
                if not modules:
                    continue
                r_temp = home_status.measured_temperature(rid)
                LOG.warning(" %s (%s)", room, rid)
                LOG.info(" Measured room temperature %s°C", r_temp)
                _log_energy_modules(home_status, rid, modules, device_types)

        # Positional home-id call, checked against the first home only;
        # skipped when the account has no homes at all.
        if homes:
            LOG.debug("<pyatmo.HomeStatus> %s", homes[0][0])
            try:
                pyatmo.HomeStatus(authorization, homes[0][0])
            except pyatmo.InvalidHome:
                LOG.debug("error when checking home status")
        LOG.warning("Energy [OK]")
    except pyatmo.NoDevice:
        LOG.error("warning, no energy devices available for testing")


# Checks in the order they are run, keyed by the name accepted by --modules.
CHECKS = (
    ("webhook", check_webhook),
    ("WeatherStation", check_weather_station),
    ("HomeCoach", check_home_coach),
    ("Camera", check_camera),
    ("Smokedetector", check_smokedetector),
    ("Public", check_public),
    ("Energy", check_energy),
)


def main(argv=None):
    """Authenticate and run every check selected on the command line.

    Args:
        argv: Argument list to parse; ``None`` makes argparse read
            ``sys.argv``.
    """
    args = parse_args(argv)

    logging.basicConfig(level=LOG_LEVELS[args.mode], format="%(message)s")
    if args.mode != "test":
        LOG.debug(args.mode.upper())

    scope = " ".join(ALL_SCOPES)
    authorization = pyatmo.ClientAuth(
        client_id=args.CLIENT_ID,
        client_secret=args.CLIENT_SECRET,
        username=args.USERNAME,
        password=args.PASSWORD,
        scope=scope,
    )

    print(args.modules)
    for name, check in CHECKS:
        if name in args.modules:
            check(authorization)


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment