Skip to content

Instantly share code, notes, and snippets.

@sanghviharshit
Last active January 8, 2024 06:41
Show Gist options
  • Star 8 You must be signed in to star a gist
  • Fork 3 You must be signed in to fork a gist
  • Save sanghviharshit/913d14b225399e0fa4211b3e785671aa to your computer and use it in GitHub Desktop.
Save sanghviharshit/913d14b225399e0fa4211b3e785671aa to your computer and use it in GitHub Desktop.
Scratch pad for working with Milacares API for monitoring and controlling Mila air purifier. Read my blog post for more details - https://blog.sanghviharshit.com/reverse-engineering-private-api-ssl-pinning/
# Scratch pad for working with Milacares API for monitoring and controlling their air purifier devices.
# Based on the code from https://www.stefaanlippens.net/oauth-code-flow-pkce.html for PKCE code verifier and challenge.
import base64
import hashlib
import html
import json
import os
import re
import urllib.parse
import requests
import math
import time
LOGIN_PROVIDER = "https://id.milacares.com"
REDIRECT_URI = "milacares://anyurl.com/"
API_BASE = "https://api.milacares.com/mms"
CLIENT_ID = "prod-ui"
USERNAME = "email@xyz.com"
PASSWORD = "super-secret"
access_token = None
def get_new_access_token():
'''
Gets user access token using OAuth Code flow with PKCE
'''
# PKCE code verifier and challenge
'''
We need a code verifier, which is a long enough random alphanumeric string, only to be used "client side". We'll use a simple urandom/base64 trick to generate one:
'''
code_verifier = base64.urlsafe_b64encode(os.urandom(40)).decode('utf-8')
code_verifier = re.sub('[^a-zA-Z0-9]+', '', code_verifier)
code_verifier, len(code_verifier)
'''
To create the PKCE code challenge we hash the code verifier with SHA256 and encode the result in URL-safe base64 (without padding)
'''
code_challenge = hashlib.sha256(code_verifier.encode('utf-8')).digest()
code_challenge = base64.urlsafe_b64encode(code_challenge).decode('utf-8')
code_challenge = code_challenge.replace('=', '')
code_challenge, len(code_challenge)
# Create random Nonce and State for login request
nonce = base64.urlsafe_b64encode(os.urandom(40)).decode('utf-8')
state = base64.urlsafe_b64encode(os.urandom(40)).decode('utf-8')
# Request login page
'''
We now have all the pieces for the initial request, which will give us the login page of the authentication provider. Adding the code challenge signals to the OAuth provider that we are expecting the PKCE based flow.
'''
resp = requests.get(
url=LOGIN_PROVIDER + "/auth/realms/prod/protocol/openid-connect/auth",
params={
"response_type": "code",
"client_id": CLIENT_ID,
"nonce": nonce,
"scope": "openid,profile",
"redirect_uri": REDIRECT_URI,
"state": state,
"code_challenge": code_challenge,
"code_challenge_method": "S256",
},
allow_redirects=False
)
print("GET login page response status", resp.status_code)
# Parse login page (response)
'''
Get cookie data from response headers (requires a bit of manipulation).
'''
cookie = resp.headers['Set-Cookie']
cookie = '; '.join(c.split(';')[0] for c in cookie.split(', '))
# print(cookie)
'''
Extract the login URL to post to from the page HTML code. Because the the Keycloak login page is straightforward HTML we can get away with some simple regexes.
'''
page = resp.text
form_action = html.unescape(re.search('<form\s+.*?\s+action="(.*?)"', page, re.DOTALL).group(1))
# print(form_action)
# Do the login (aka authenticate)
'''
Now, we post the login form with the user we created earlier, passing it the extracted cookie as well.
'''
resp = requests.post(
url=form_action,
data={
"username": USERNAME,
"password": PASSWORD,
},
headers={"Cookie": cookie},
allow_redirects=False
)
print("POST login response status", resp.status_code)
'''
As expected we are forwarded, let's get the redirect URL.
'''
redirect = resp.headers['Location']
print("Redirect", redirect)
assert redirect.startswith(REDIRECT_URI)
# Extract authorization code from redirect
'''
The redirect URL contains the authentication code.
'''
query = urllib.parse.urlparse(redirect).query
redirect_params = urllib.parse.parse_qs(query)
# print(redirect_params)
auth_code = redirect_params['code'][0]
print("Auth Code: ", auth_code)
# Exchange authorization code for an access token
'''
We can now exchange the authorization code for an access token. In the normal OAuth authorization flow we should include a static secret here, but instead we provide the code verifier here which acts proof that the initial request was done by us.
'''
resp = requests.post(
url=LOGIN_PROVIDER + "/auth/realms/prod/protocol/openid-connect/token",
data={
"grant_type": "authorization_code",
"client_id": CLIENT_ID,
"redirect_uri": REDIRECT_URI,
"code": auth_code,
"code_verifier": code_verifier,
},
allow_redirects=False
)
print("GET Access Token response status", resp.status_code)
'''
In the response we get, among others, the access token
'''
result = resp.json()
print(result)
access_token = result['access_token']
print("Access Token:", access_token)
refresh_token = result['refresh_token']
print("Refresh Token:", refresh_token)
return access_token
def get_device_info():
"""
Get Profile and device information
"""
resp = requests.get(
url = API_BASE + "/profile",
headers = {
"Authorization": 'Bearer {}'.format(access_token)
}
)
result = resp.json()
# print("Profile Info:", result)
# Appliances Meta
resp = requests.get(
url = API_BASE + "/appliances/meta",
headers = {
"Authorization": 'Bearer {}'.format(access_token)
}
)
result = resp.json()
device_id_1 = result["data"][0]["id"]
appliance_code_1 = result["data"][0]["appliance_code"]
print("Device ID:", device_id_1)
print("Appliance Code", appliance_code_1)
return device_id_1, appliance_code_1
def get_modes(device_id):
"""
Get the modes for various settings
"""
resp = requests.get(
url = API_BASE + "/appliance/" + str(device_id) + "/config",
headers = {
"Authorization": 'Bearer {}'.format(access_token)
}
)
result = resp.json()
quiet_enabled = result["data"]["quiet_enabled"]
night_enabled = result["data"]["night_enabled"]
housekeeper_enabled = result["data"]["housekeeper_enabled"]
quarantine_enabled = result["data"]["quarantine_enabled"]
sleep_enabled = result["data"]["sleep_enabled"]
turndown_enabled = result["data"]["turndown_enabled"]
whitenoise_enabled = result["data"]["whitenoise_enabled"]
sounds_enabled = result["data"]["sounds_enabled"]
print(f"quiet_enabled {quiet_enabled}")
print(f"night_enabled {night_enabled}")
print(f"housekeeper_enabled {housekeeper_enabled}")
print(f"quarantine_enabled {quarantine_enabled}")
print(f"sleep_enabled {sleep_enabled}")
print(f"turndown_enabled {turndown_enabled}")
print(f"whitenoise_enabled {whitenoise_enabled}")
print(f"sounds_enabled {sounds_enabled}")
def get_sensors_data(appliance_code):
"""
Get all the available sensors' latest value
"""
SENSOR_TYPES = ["pm_1.0", "pm_2.5", "pm_10", "tvoc", "coppm", "eco2", "temperature", "humidity"]
response_data = {}
for sensor in SENSOR_TYPES:
response_data[sensor] = get_sensor_data(appliance_code=appliance_code, sensor_name=sensor)
return response_data
def get_sensor_data(appliance_code, sensor_name):
"""
Get a sensor's latest value
"""
resp = requests.get(
url = API_BASE + f"/sensor/appliance?deviceId={appliance_code}&metric={sensor_name}",
headers = {
"Authorization": 'Bearer {}'.format(access_token)
}
)
result = resp.json()
sensor_value = result["data"]["meta"]["latest_sensor_value"]["value"]
print(f"{sensor_name}: {sensor_value}")
def set_mode_manual(appliance_code, fan_speed_percentage):
"""
Set Fan Mode to Manual
"""
if fan_speed_percentage < 0 or fan_speed_percentage > 100:
return
# Fan Speed Percentage to RPM mapping:
# SPEED_PERCENT_TO_RPM_MAP = {
# 0: 0,
# 10: 600,
# 20: 740,
# 30: 880,
# 40: 1020,
# 50: 1160,
# 60: 1300,
# 70: 1440,
# 80: 1580,
# 90: 1720,
# 100: 2000,
# }
SPEED_PERCENT_TO_RPM_MAP = [0, 600, 740, 880, 1020, 1160, 1300, 1440, 1580, 1720, 2000]
fan_speed = SPEED_PERCENT_TO_RPM_MAP[math.floor(fan_speed_percentage / 10)]
if access_token == None:
return
target_aqi = "10"
print("Fan Speed %:", fan_speed_percentage, "Fan Speed:", fan_speed, "Target AQI:", target_aqi)
headers = {
"Authorization": 'Bearer {}'.format(access_token),
"Content-Type": 'application/json'
}
payload = {
"target_aqi_float": target_aqi,
"fan_rpm_int": fan_speed,
"enable_display_int": -1
}
resp = requests.post(
url = API_BASE + "/appliance/" + appliance_code + "/command/control-mode/manual",
headers = headers,
data = json.dumps(payload)
)
print("POST command manual response status", resp.status_code)
# print("POST command manual response", resp.json())
# It takes a while for the speed to update to the requested fan speed.
# The app makes multiple requests to /command/force-data until fan speed matches the requested speed.
cur_fan_speed = 5000
retry_count = 0
while abs(cur_fan_speed - fan_speed) >= 30 and retry_count < 5:
time.sleep(5)
resp = requests.post(
url = API_BASE + "/appliance/" + appliance_code + "/command/force-data",
headers = {
"Authorization": 'Bearer {}'.format(access_token)
},
)
print(retry_count, "POST force data status", resp.status_code)
cur_data = resp.json()
print("POST force data response", cur_data)
if resp.status_code == 200:
cur_fan_speed = cur_data["data"]["speed"]
print("Current fan speed", cur_fan_speed)
else:
print("Error fetching latest data")
retry_count += 1
def set_mode_auto(appliance_code):
"""
Set Fan Mode to Auto
"""
resp = requests.post(
url = API_BASE + "/appliance/" + appliance_code + "/command/control-mode/auto",
headers = {
"Authorization": 'Bearer {}'.format(access_token)
},
data={
"enable_display_int": -1
},
allow_redirects=False
)
print("POST command auto response status", resp.status_code)
# (Unused code) Decode the JWT tokens
'''
The access and id tokens are JWT tokens apparently. Let's decode the payload.
def _b64_decode(data):
data += '=' * (4 - len(data) % 4)
return base64.b64decode(data).decode('utf-8')
def jwt_payload_decode(jwt):
_, payload, _ = jwt.split('.')
return json.loads(_b64_decode(payload))
print(jwt_payload_decode(access_token))
'''
access_token = get_new_access_token()
device_id, appliance_code = get_device_info()
get_modes(device_id)
get_sensors_data(appliance_code)
set_mode_manual(appliance_code, 40)
set_mode_auto(appliance_code)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment