Skip to content

Instantly share code, notes, and snippets.

@deverton
Created September 13, 2022 08:48
Show Gist options
  • Save deverton/d17e302a162fa79f775b784bbe4edabc to your computer and use it in GitHub Desktop.
Save deverton/d17e302a162fa79f775b784bbe4edabc to your computer and use it in GitHub Desktop.
Simple Python script that fetches the Fronius Load Management status from the inverter's web interface and posts it to Home Assistant.
#!/usr/bin/env python3
import requests
import http.client as http_client
import logging
import hashlib
import os
import re
import threading
import time
# from enum import Enum
from requests.auth import AuthBase
from requests.compat import urlparse
from requests.cookies import extract_cookies_to_jar
from requests.utils import parse_dict_header
# Number of load-management GPIO pins to report; main() reads the keys
# "pin1" .. "pin<MAX_GPIOS>" from the inverter's status response.
MAX_GPIOS = 4
# FIXME: Not sure whether this list of values is complete yet
# class LoadStatusReason(Enum):
# PSTR_EMRS_REASON_0 = "disabled by config"
# PSTR_EMRS_REASON_1 = "PSTR_EMRS_REASON_1"
# PSTR_EMRS_REASON_2 = "PSTR_EMRS_REASON_2"
# PSTR_EMRS_REASON_3 = "minimum consecutive ON-time running"
# PSTR_EMRS_REASON_4 = "power crosses low-limit"
# PSTR_EMRS_REASON_5 = "power crosses high-limit"
# PSTR_EMRS_REASON_6 = "intended runtime requires activation"
# Copied from the requests lib because Fronius uses x-www-authenticate as the header rather than www-authenticate
class HTTPFroniusDigestAuth(AuthBase):
    """Attach HTTP Digest Authentication to a request, Fronius style.

    This mirrors the digest-auth handler shipped with ``requests``; the
    functional difference is that the server challenge is read from the
    ``x-www-authenticate`` response header instead of ``www-authenticate``.

    All negotiation state (last nonce, nonce counter, parsed challenge, body
    position and retry counter) is kept in thread-local storage, so each
    thread using the same instance tracks its own challenge.
    """

    # Challenge "algorithm" value (upper-cased) -> hashlib constructor.
    # MD5-SESS hashes with MD5 too; its extra HA1 step is applied separately.
    _HASH_CONSTRUCTORS = {
        "MD5": hashlib.md5,
        "MD5-SESS": hashlib.md5,
        "SHA": hashlib.sha1,
        "SHA-256": hashlib.sha256,
        "SHA-512": hashlib.sha512,
    }

    def __init__(self, username, password):
        """Store the credentials and create the per-thread state holder.

        :param username: User name sent in the digest header.
        :param password: Password hashed into the digest response.
        """
        self.username = username
        self.password = password
        # Keep state in per-thread local storage
        self._thread_local = threading.local()

    def init_per_thread_state(self):
        """Initialise the negotiation state once for the calling thread."""
        # Ensure state is initialized just once per-thread
        if not hasattr(self._thread_local, "init"):
            self._thread_local.init = True
            self._thread_local.last_nonce = ""
            self._thread_local.nonce_count = 0
            self._thread_local.chal = {}
            self._thread_local.pos = None
            self._thread_local.num_401_calls = None

    def build_digest_header(self, method, url):
        """Build the ``Authorization`` header value for the stored challenge.

        :param method: HTTP method of the request being signed.
        :param url: Request URL; only its path and query string are signed.
        :returns: The ``Digest ...`` header value, or ``None`` when the
            challenge names an unsupported algorithm or a ``qop`` that does
            not include ``auth`` (e.g. only ``auth-int``).
        :raises KeyError: if the stored challenge lacks ``realm`` or ``nonce``.
        :rtype: str
        """
        chal = self._thread_local.chal
        realm = chal["realm"]
        nonce = chal["nonce"]
        qop = chal.get("qop")
        algorithm = chal.get("algorithm")
        opaque = chal.get("opaque")

        # A challenge that does not name an algorithm is treated as MD5.
        _algorithm = "MD5" if algorithm is None else algorithm.upper()
        constructor = self._HASH_CONSTRUCTORS.get(_algorithm)
        if constructor is None:
            return None

        def hash_utf8(x):
            # Accept both str and bytes; str is hashed as UTF-8.
            if isinstance(x, str):
                x = x.encode("utf-8")
            return constructor(x).hexdigest()

        def KD(s, d):
            return hash_utf8(f"{s}:{d}")

        # XXX not implemented yet
        entdig = None
        p_parsed = urlparse(url)
        #: path is request-uri defined in RFC 2616 which should not be empty
        path = p_parsed.path or "/"
        if p_parsed.query:
            path += f"?{p_parsed.query}"

        A1 = f"{self.username}:{realm}:{self.password}"
        A2 = f"{method}:{path}"

        HA1 = hash_utf8(A1)
        HA2 = hash_utf8(A2)

        # The nonce counter restarts whenever the server hands out a new nonce.
        if nonce == self._thread_local.last_nonce:
            self._thread_local.nonce_count += 1
        else:
            self._thread_local.nonce_count = 1
        ncvalue = f"{self._thread_local.nonce_count:08x}"

        # Client nonce: hash of counter, server nonce, current time and
        # random bytes, truncated to 16 hex characters.
        s = str(self._thread_local.nonce_count).encode("utf-8")
        s += nonce.encode("utf-8")
        s += time.ctime().encode("utf-8")
        s += os.urandom(8)
        cnonce = hashlib.sha1(s).hexdigest()[:16]

        if _algorithm == "MD5-SESS":
            HA1 = hash_utf8(f"{HA1}:{nonce}:{cnonce}")

        if not qop:
            respdig = KD(HA1, f"{nonce}:{HA2}")
        elif qop == "auth" or "auth" in qop.split(","):
            noncebit = f"{nonce}:{ncvalue}:{cnonce}:auth:{HA2}"
            respdig = KD(HA1, noncebit)
        else:
            # XXX handle auth-int.
            return None

        self._thread_local.last_nonce = nonce

        # XXX should the partial digests be encoded too?
        base = (
            f'username="{self.username}", realm="{realm}", nonce="{nonce}", '
            f'uri="{path}", response="{respdig}"'
        )
        if opaque:
            base += f', opaque="{opaque}"'
        if algorithm:
            base += f', algorithm="{algorithm}"'
        if entdig:
            base += f', digest="{entdig}"'
        if qop:
            base += f', qop="auth", nc={ncvalue}, cnonce="{cnonce}"'

        return f"Digest {base}"

    def handle_redirect(self, r, **kwargs):
        """Reset num_401_calls counter on redirects."""
        if r.is_redirect:
            self._thread_local.num_401_calls = 1

    def handle_401(self, r, **kwargs):
        """
        Takes the given response and tries digest-auth, if needed.

        The challenge is read from the ``x-www-authenticate`` header. When a
        digest challenge is present and no retry has been made yet, the
        request is re-sent once with an ``Authorization`` header. If no
        header can be built for the challenge, the original response is
        returned unchanged.

        :rtype: requests.Response
        """
        # If response is not 4xx, do not auth
        # See https://github.com/psf/requests/issues/3772
        if not 400 <= r.status_code < 500:
            self._thread_local.num_401_calls = 1
            return r

        if self._thread_local.pos is not None:
            # Rewind the file position indicator of the body to where
            # it was to resend the request.
            r.request.body.seek(self._thread_local.pos)

        s_auth = r.headers.get("x-www-authenticate", "")

        if "digest" in s_auth.lower() and self._thread_local.num_401_calls < 2:
            self._thread_local.num_401_calls += 1
            pat = re.compile(r"digest ", flags=re.IGNORECASE)
            self._thread_local.chal = parse_dict_header(pat.sub("", s_auth, count=1))

            prep = r.request.copy()
            authorization = self.build_digest_header(prep.method, prep.url)
            if authorization is None:
                # Unsupported algorithm or qop: a None header value cannot be
                # sent, so hand the 4xx response back instead of retrying.
                self._thread_local.num_401_calls = 1
                return r

            # Consume content and release the original connection
            # to allow our new request to reuse the same one.
            r.content
            r.close()
            extract_cookies_to_jar(prep._cookies, r.request, r.raw)
            prep.prepare_cookies(prep._cookies)

            prep.headers["Authorization"] = authorization
            _r = r.connection.send(prep, **kwargs)
            _r.history.append(r)
            _r.request = prep

            return _r

        self._thread_local.num_401_calls = 1
        return r

    def __call__(self, r):
        """Prepare request ``r`` for digest auth and return it.

        Pre-emptively adds the ``Authorization`` header when a nonce from an
        earlier challenge is available, remembers the body position so the
        body can be rewound for a retry, and registers the response hooks.
        """
        # Initialize per-thread state, if needed
        self.init_per_thread_state()
        # If we have a saved nonce, skip the 401
        if self._thread_local.last_nonce:
            authorization = self.build_digest_header(r.method, r.url)
            # The stored challenge may have been replaced by one we cannot
            # answer; in that case send the request without the header.
            if authorization is not None:
                r.headers["Authorization"] = authorization
        try:
            self._thread_local.pos = r.body.tell()
        except AttributeError:
            # In the case of HTTPDigestAuth being reused and the body of
            # the previous request was a file-like object, pos has the
            # file position of the previous body. Ensure it's set to
            # None.
            self._thread_local.pos = None
        r.register_hook("response", self.handle_401)
        r.register_hook("response", self.handle_redirect)
        self._thread_local.num_401_calls = 1

        return r

    def __eq__(self, other):
        """Two auth objects are equal when username and password both match."""
        return all(
            [
                self.username == getattr(other, "username", None),
                self.password == getattr(other, "password", None),
            ]
        )

    def __ne__(self, other):
        return not self == other
def main():
    """Fetch the Fronius load-management status and post it to Home Assistant.

    Requests ``/status/emrs/`` from the inverter using
    ``HTTPFroniusDigestAuth``, then, for each of ``pin1`` ..
    ``pin<MAX_GPIOS>``, posts that pin's ``reason`` value as the state of
    ``sensor.fronius_load_management_<pin>`` and prints the response body.

    :raises requests.HTTPError: if the inverter answers with an error status.
    :raises KeyError: if the status payload lacks an expected key or pin.
    """
    # other interesting path is /config/emrs/ which returns the load management config

    # Debug logging
    # NOTE: debuglevel = 1 makes http.client print request/response headers.
    http_client.HTTPConnection.debuglevel = 1
    logging.basicConfig()
    logging.getLogger().setLevel(logging.DEBUG)
    requests_log = logging.getLogger("requests.packages.urllib3")
    requests_log.setLevel(logging.DEBUG)
    requests_log.propagate = True

    # Without a timeout requests waits indefinitely on an unresponsive host.
    timeout_seconds = 10

    status = requests.get(
        "http://INVERTER_IP/status/emrs/",
        auth=HTTPFroniusDigestAuth('admin', 'INVERTER_ADMIN_PASSWORD'),
        timeout=timeout_seconds,
    )
    # Fail with a clear HTTP error (e.g. rejected credentials) rather than a
    # JSON decode error or KeyError further down.
    status.raise_for_status()
    gpios = status.json()['Body']['Data']['emrs']['gpios']

    # Identical for every pin, so build it once.
    headers = {
        "Authorization": "Bearer HOME_ASSISTANT_API_TOKEN",
        "Content-Type": "application/json",
    }

    for i in range(1, MAX_GPIOS + 1):
        pin_name = f'pin{i}'
        pin = gpios[pin_name]

        # reason = LoadStatusReason[pin['reason']]
        # state = "on" if pin['state'] else "off"
        # print(f'Pin {i} is {state} because {reason.value}')

        posted = requests.post(
            f'http://HOME_ASSISTANT_SERVER:8123/api/states/sensor.fronius_load_management_{pin_name}',
            headers=headers,
            json={
                "state": pin['reason']
            },
            timeout=timeout_seconds,
        )
        print(posted.text)


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment