# Scratch pad for working with Milacares API for monitoring and controlling their air purifier devices.
# The PKCE code verifier and challenge generation is based on existing example code (the source link is missing here).
import base64
import hashlib
import html
import json
import os
import re
import urllib.parse
import requests
import math
import time
# OAuth client settings used by the login flow.
REDIRECT_URI = "milacares://"
CLIENT_ID = "prod-ui"

# The functions below read LOGIN_PROVIDER, API_BASE and USERNAME, none of which
# were defined in this file, so every call ended in a NameError. They are taken
# from the environment here; no default values are known for them, so they must
# be supplied by the user before running the script.
LOGIN_PROVIDER = os.environ.get("MILA_LOGIN_PROVIDER", "")
API_BASE = os.environ.get("MILA_API_BASE", "")
USERNAME = os.environ.get("MILA_USERNAME", "")
# The previous hard-coded value is kept as the fallback for backward
# compatibility; prefer setting MILA_PASSWORD over editing this file.
PASSWORD = os.environ.get("MILA_PASSWORD", "super-secret")

# Bearer token shared by the API helpers; set by the script section at the
# bottom of the file from get_new_access_token().
access_token = None
def get_new_access_token():
    """Get a user access token using the OAuth authorization-code flow with PKCE.

    The flow is: build a PKCE verifier/challenge pair, request the login page,
    post the login form with USERNAME/PASSWORD, read the authorization code
    from the redirect, and exchange that code (plus the verifier) for tokens.

    Reads the module-level names LOGIN_PROVIDER, CLIENT_ID, REDIRECT_URI,
    USERNAME and PASSWORD.

    Returns:
        The ``access_token`` value from the token endpoint's JSON response.

    Raises:
        RuntimeError: if no login form can be found in the login page.
        AssertionError: if the login redirect does not start with REDIRECT_URI.
        KeyError: if an expected header or response field is missing.
    """
    # --- PKCE code verifier and challenge ---
    # The code verifier is a long random alphanumeric string that is only used
    # client side. It is generated with a simple urandom/base64 trick, then
    # stripped of everything that is not alphanumeric.
    code_verifier = base64.urlsafe_b64encode(os.urandom(40)).decode('utf-8')
    code_verifier = re.sub('[^a-zA-Z0-9]+', '', code_verifier)

    # The code challenge is the SHA256 hash of the verifier, encoded as
    # URL-safe base64 without padding.
    code_challenge = hashlib.sha256(code_verifier.encode('utf-8')).digest()
    code_challenge = base64.urlsafe_b64encode(code_challenge).decode('utf-8')
    code_challenge = code_challenge.replace('=', '')

    # --- Random nonce and state for the login request ---
    nonce = base64.urlsafe_b64encode(os.urandom(40)).decode('utf-8')
    state = base64.urlsafe_b64encode(os.urandom(40)).decode('utf-8')

    # --- Request the login page ---
    # Sending the code challenge signals to the OAuth provider that the
    # PKCE-based flow is expected.
    resp = requests.get(
        url=LOGIN_PROVIDER + "/auth/realms/prod/protocol/openid-connect/auth",
        params={
            "response_type": "code",
            "client_id": CLIENT_ID,
            "nonce": nonce,
            "scope": "openid,profile",
            "redirect_uri": REDIRECT_URI,
            "state": state,
            "code_challenge": code_challenge,
            "code_challenge_method": "S256",
        },
    )
    print("GET login page response status", resp.status_code)

    # --- Parse the login page ---
    # Reduce the Set-Cookie header to "name=value" pairs that can be sent back
    # in a Cookie header (attributes after the first ';' are dropped).
    cookie = resp.headers['Set-Cookie']
    cookie = '; '.join(c.split(';')[0] for c in cookie.split(', '))

    # The login page is straightforward HTML, so a simple regex is enough to
    # pull the form's action URL out of it.
    page = resp.text
    form_match = re.search(r'<form\s+.*?\s+action="(.*?)"', page, re.DOTALL)
    if form_match is None:
        raise RuntimeError("Could not find the login form in the login page")
    form_action = html.unescape(form_match.group(1))

    # --- Do the login (authenticate) ---
    # Redirects must not be followed: the Location header is read below, and
    # it points at the app's custom REDIRECT_URI scheme.
    resp = requests.post(
        url=form_action,
        data={
            "username": USERNAME,
            "password": PASSWORD,
        },
        headers={"Cookie": cookie},
        allow_redirects=False,
    )
    print("POST login response status", resp.status_code)

    # A successful login answers with a redirect; grab its target.
    redirect = resp.headers['Location']
    print("Redirect", redirect)
    assert redirect.startswith(REDIRECT_URI)

    # --- Extract the authorization code from the redirect ---
    query = urllib.parse.urlparse(redirect).query
    redirect_params = urllib.parse.parse_qs(query)
    auth_code = redirect_params['code'][0]
    print("Auth Code: ", auth_code)

    # --- Exchange the authorization code for an access token ---
    # Instead of a static client secret, the code verifier is sent as proof
    # that the initial request came from this client.
    resp = requests.post(
        url=LOGIN_PROVIDER + "/auth/realms/prod/protocol/openid-connect/token",
        data={
            "grant_type": "authorization_code",
            "client_id": CLIENT_ID,
            "redirect_uri": REDIRECT_URI,
            "code": auth_code,
            "code_verifier": code_verifier,
        },
    )
    print("GET Access Token response status", resp.status_code)

    # The response carries, among others, the access and refresh tokens.
    result = resp.json()
    access_token = result['access_token']
    print("Access Token:", access_token)
    refresh_token = result['refresh_token']
    print("Refresh Token:", refresh_token)
    return access_token
def get_device_info():
    """Get profile and device information.

    Requests ``/profile`` and ``/appliances/meta`` with the module-level
    ``access_token`` and prints the first appliance's identifiers.

    Returns:
        tuple: ``(device_id, appliance_code)`` taken from the first entry of
        the ``"data"`` list in the ``/appliances/meta`` response.
    """
    auth_headers = {"Authorization": f"Bearer {access_token}"}

    # Profile request: the response is parsed but not used any further.
    resp = requests.get(url=API_BASE + "/profile", headers=auth_headers)
    result = resp.json()
    # print("Profile Info:", result)

    # Appliances meta
    resp = requests.get(url=API_BASE + "/appliances/meta", headers=auth_headers)
    result = resp.json()
    first_appliance = result["data"][0]
    device_id_1 = first_appliance["id"]
    appliance_code_1 = first_appliance["appliance_code"]
    print("Device ID:", device_id_1)
    print("Appliance Code", appliance_code_1)
    return device_id_1, appliance_code_1
def get_modes(device_id):
    """Fetch an appliance's config and print the state of each mode flag.

    Args:
        device_id: Appliance ID; converted with ``str()`` to build the URL.

    Prints one ``"<flag> <value>"`` line per flag and returns nothing.
    """
    resp = requests.get(
        url=API_BASE + "/appliance/" + str(device_id) + "/config",
        headers={"Authorization": f"Bearer {access_token}"},
    )
    config = resp.json()["data"]

    # Flags are printed in this fixed order; a missing key raises KeyError.
    mode_flags = (
        "quiet_enabled",
        "night_enabled",
        "housekeeper_enabled",
        "quarantine_enabled",
        "sleep_enabled",
        "turndown_enabled",
        "whitenoise_enabled",
        "sounds_enabled",
    )
    # Read every flag before printing anything, so a missing key fails before
    # any partial output is produced.
    values = [config[flag] for flag in mode_flags]
    for flag, value in zip(mode_flags, values):
        print(f"{flag} {value}")
def get_sensors_data(appliance_code):
    """Get the latest value of every available sensor.

    Args:
        appliance_code: Appliance code passed through to ``get_sensor_data``.

    Returns:
        dict: maps each sensor name to whatever ``get_sensor_data`` returned
        for it, in the order the sensors are listed below.
    """
    SENSOR_TYPES = ["pm_1.0", "pm_2.5", "pm_10", "tvoc", "coppm", "eco2", "temperature", "humidity"]
    return {
        sensor: get_sensor_data(appliance_code=appliance_code, sensor_name=sensor)
        for sensor in SENSOR_TYPES
    }
def get_sensor_data(appliance_code, sensor_name):
    """Get a single sensor's latest value.

    Args:
        appliance_code: Sent as the ``deviceId`` query parameter.
        sensor_name: Sent as the ``metric`` query parameter.

    Returns:
        The value found at ``data.meta.latest_sensor_value.value`` in the
        JSON response. It is also printed as ``"<sensor_name>: <value>"``.
    """
    resp = requests.get(
        url=API_BASE + "/sensor/appliance",
        # Passing the query as params lets requests do the URL encoding.
        params={"deviceId": appliance_code, "metric": sensor_name},
        headers={"Authorization": f"Bearer {access_token}"},
    )
    result = resp.json()
    sensor_value = result["data"]["meta"]["latest_sensor_value"]["value"]
    print(f"{sensor_name}: {sensor_value}")
    # Returned so that get_sensors_data() collects real readings, not None.
    return sensor_value
def set_mode_manual(appliance_code, fan_speed_percentage):
    """Set the fan mode to manual at the given speed percentage.

    Posts the manual control-mode command, then polls ``/command/force-data``
    (up to 5 times) until the reported fan speed is within 30 RPM of the
    requested speed.

    Args:
        appliance_code: Appliance code string used to build the command URLs.
        fan_speed_percentage: Number from 0 to 100. It is rounded down to the
            nearest multiple of 10 before being mapped to an RPM value.

    Raises:
        ValueError: if ``fan_speed_percentage`` is outside 0..100.
        RuntimeError: if the module-level ``access_token`` is not set.
    """
    # NOTE(review): the body of this check was missing from the source;
    # rejecting the value is assumed, since an out-of-range percentage would
    # otherwise index outside (or wrap around) the RPM table below.
    if fan_speed_percentage < 0 or fan_speed_percentage > 100:
        raise ValueError(
            f"fan_speed_percentage must be between 0 and 100, got {fan_speed_percentage}"
        )

    # Fan speed percentage to RPM mapping, indexed by percentage // 10:
    #   0: 0,      10: 600,   20: 740,   30: 880,   40: 1020,  50: 1160,
    #   60: 1300,  70: 1440,  80: 1580,  90: 1720,  100: 2000
    SPEED_PERCENT_TO_RPM_MAP = [0, 600, 740, 880, 1020, 1160, 1300, 1440, 1580, 1720, 2000]
    fan_speed = SPEED_PERCENT_TO_RPM_MAP[math.floor(fan_speed_percentage / 10)]

    # NOTE(review): the body of this check was also missing from the source;
    # failing early is assumed rather than sending "Bearer None" to the API.
    if access_token is None:
        raise RuntimeError("No access token available; call get_new_access_token() first")

    target_aqi = "10"
    print("Fan Speed %:", fan_speed_percentage, "Fan Speed:", fan_speed, "Target AQI:", target_aqi)

    headers = {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": 'application/json',
    }
    payload = {
        "target_aqi_float": target_aqi,
        "fan_rpm_int": fan_speed,
        "enable_display_int": -1,
    }
    resp = requests.post(
        url=API_BASE + "/appliance/" + appliance_code + "/command/control-mode/manual",
        headers=headers,
        data=json.dumps(payload),
    )
    print("POST command manual response status", resp.status_code)
    # print("POST command manual response", resp.json())

    # It takes a while for the speed to update to the requested fan speed.
    # The app makes multiple requests to /command/force-data until the fan
    # speed matches the requested speed.
    # NOTE(review): there is no delay between polls here; confirm whether one
    # is wanted.
    cur_fan_speed = 5000  # far above every table entry, so the loop runs at least once
    retry_count = 0
    while abs(cur_fan_speed - fan_speed) >= 30 and retry_count < 5:
        resp = requests.post(
            url=API_BASE + "/appliance/" + appliance_code + "/command/force-data",
            headers={"Authorization": f"Bearer {access_token}"},
        )
        print(retry_count, "POST force data status", resp.status_code)
        cur_data = resp.json()
        print("POST force data response", cur_data)
        if resp.status_code == 200:
            cur_fan_speed = cur_data["data"]["speed"]
            print("Current fan speed", cur_fan_speed)
        else:
            print("Error fetching latest data")
        retry_count += 1
def set_mode_auto(appliance_code):
    """Set the fan mode to auto.

    Args:
        appliance_code: Appliance code string used to build the command URL.

    Prints the response status code and returns nothing.
    """
    # NOTE(review): how the payload was attached to the request was lost from
    # the source. It is sent as a JSON body here, the same way
    # set_mode_manual() sends its payload; confirm against the API.
    headers = {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json",
    }
    payload = {
        "enable_display_int": -1,
    }
    resp = requests.post(
        url=API_BASE + "/appliance/" + appliance_code + "/command/control-mode/auto",
        headers=headers,
        data=json.dumps(payload),
    )
    print("POST command auto response status", resp.status_code)
# (Unused code) Decode the JWT tokens
# The access and ID tokens are apparently JWTs; the helpers below decode the payload.
def _b64_decode(data):
    """Decode an unpadded base64url string and return it as UTF-8 text."""
    # JWT segments use the URL-safe alphabet ('-' and '_') with the '='
    # padding stripped. Restore exactly the padding that is missing: zero
    # characters when the length is already a multiple of four.
    data += '=' * (-len(data) % 4)
    return base64.urlsafe_b64decode(data).decode('utf-8')


def jwt_payload_decode(jwt):
    """Return the payload (middle segment) of a JWT as a parsed JSON object.

    Args:
        jwt: Token string of the form ``header.payload.signature``.

    Raises:
        ValueError: if the token does not have exactly three dot-separated
            segments, or if the payload is not valid JSON.

    The signature is NOT verified; this is for inspection only.
    """
    _, payload, _ = jwt.split('.')
    return json.loads(_b64_decode(payload))
# Script section: log in, look up the first appliance, then switch its fan to
# manual mode at 40 %.
access_token = get_new_access_token()
device_info = get_device_info()
device_id, appliance_code = device_info
set_mode_manual(appliance_code=appliance_code, fan_speed_percentage=40)
