Skip to content

Instantly share code, notes, and snippets.

@imduffy15
Created February 10, 2023 08:39
Show Gist options
  • Star 3 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save imduffy15/22503384557dd59edccf9293b7694d34 to your computer and use it in GitHub Desktop.
Save imduffy15/22503384557dd59edccf9293b7694d34 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
import json
import requests
import urllib3
urllib3.disable_warnings()
cookie = open("cookie.txt").readline().strip()
csrf_token = [x.split("=") for x in cookie.split(";") if "csrf" in x][0][1]
headers = {"csrf": csrf_token, "Cookie": cookie}
# Get all existing groups...
groups = requests.get(
"https://alexa.amazon.co.uk/api/phoenix/group", headers=headers, verify=False
)
groups = groups.json()
# Delete all existing groups
for group in groups["applianceGroups"]:
print(f"Deleting group {group.get('name')}")
delete = requests.delete(
f"https://alexa.amazon.co.uk/api/phoenix/group/{group.get('groupId')}",
headers=headers,
verify=False,
)
delete.raise_for_status()
# Get home assistant entities and their areas
entity_registry_file = open("core.entity_registry")
entity_registry = json.load(entity_registry_file)
entity_registry = {
x["entity_id"]: {"area_id": x.get("area_id")}
for x in entity_registry["data"]["entities"]
if x.get("area_id")
}
# Get the areas so we have their pretty name format
area_registry_file = open("core.area_registry")
area_registry = json.load(area_registry_file)
areas = {
x.get("id"): x.get("name") for x in area_registry.get("data", []).get("areas", [])
}
# Get all devices on alexa
# This fails for some reason sometimes so there's terrible retry logic here.
raw_devices_json = None
while raw_devices_json is None:
try:
raw_devices = requests.get(
"https://alexa.amazon.co.uk/api/phoenix", headers=headers, verify=False
).json()
raw_devices_json = json.loads(raw_devices["networkDetail"])
except Exception:
time.sleep(1)
group_updates = {}
# Map the alexa devices to their home assistant device and pull the area
# id from home assistant
for skill, details in raw_devices_json["locationDetails"]["locationDetails"][
"Default_Location"
]["amazonBridgeDetails"]["amazonBridgeDetails"].items():
details = details["applianceDetails"]["applianceDetails"]
for device, value in details.items():
if value.get("manufacturerName", "") == "Royal Philips Electronics":
delete = requests.delete(
f"https://alexa.amazon.co.uk/api/phoenix/appliance/{value.get('applianceId', '')}",
headers=headers,
verify=False,
)
delete.raise_for_status()
elif value.get("friendlyDescription", "") == "Amazon smart device":
name = value.get("friendlyName", "").lower().replace(" ", "_")
longestMatch = 0
area_id = None
for area in areas:
if area in name:
if len(area) > longestMatch:
longestMatch = len(area)
area_id = area
group_name = areas.get(area_id)
updates = group_updates.get(group_name, [])
updates.append(value.get("applianceId"))
group_updates[group_name] = updates
elif value.get("manufacturerName", "") == "Home Assistant":
home_assistant_entity_id = (
value.get("friendlyDescription").split(" ")[0].strip()
)
group_name = areas.get(
entity_registry.get(home_assistant_entity_id, {}).get("area_id", ""), {}
)
if group_name:
updates = group_updates.get(group_name, [])
updates.append(value.get("applianceId"))
group_updates[group_name] = updates
# Create all the groups with their devices
for group_name, devices in group_updates.items():
body = {
"applianceIds": devices,
"name": group_name,
}
try:
pretty = {"group": group_name, "devices": [device.split("#")[1] if '#' in device else device for device in devices]}
print(json.dumps(pretty, indent=2))
update_group = requests.post(
f"https://alexa.amazon.co.uk/api/phoenix/group",
headers=headers,
verify=False,
json=body,
)
update_group.raise_for_status()
except:
print("Failed to process this payload:")
print(json.dumps(body, indent=2))
@craiggenner
Copy link

I took this and made a few teaks:

#!/usr/bin/env python
import json
import requests
import urllib3
import sys
urllib3.disable_warnings()

cookie = open("cookie.txt").readline().strip()

csrf_token = [x.split("=") for x in cookie.split(";") if "csrf" in x][0][1]

headers = {"csrf": csrf_token, "Cookie": cookie}

# Get all existing groups...
groups = requests.get(
    "https://alexa.amazon.co.uk/api/phoenix/group", headers=headers, verify=False
)

if groups.status_code == 401:
    print("Auth error, update cookies")
    sys.exit(1)

groups = groups.json()

# Delete all existing groups
for group in groups["applianceGroups"]:
    print(f"Deleting group {group.get('name')}")
    delete = requests.delete(
        f"https://alexa.amazon.co.uk/api/phoenix/group/{group.get('groupId')}",
        headers=headers,
        verify=False,
    )
    delete.raise_for_status()

# Get home assistant entities and their areas
entity_registry_file = open("core.entity_registry")
entity_registry = json.load(entity_registry_file)

device_registry_file = open("core.device_registry")
device_registry = json.load(device_registry_file)['data']['devices']

device_to_area = {
    x["id"]: {"area_id": x.get("area_id"), "name": x.get("name"), "manufacturer": x.get("manufacturer"), "identifiers": x.get("identifiers")}
    for x in device_registry
    if x.get("area_id")
}

new_entity_registry = {}

for x in entity_registry["data"]["entities"]:
    if x.get("area_id"):
        new_entity_registry[x["entity_id"]]= {"area_id": x.get("area_id"), "name": x.get("name")}
    elif x.get("device_id") and device_to_area.get(x["device_id"]):
        area_id = device_to_area[x["device_id"]]["area_id"]
        new_entity_registry[x["entity_id"]]= {"area_id": area_id, "name": x.get("name")}

entity_registry = new_entity_registry

# Get the areas so we have their pretty name format
area_registry_file = open("core.area_registry")
area_registry = json.load(area_registry_file)

areas = {
    x.get("id"): x.get("name") for x in area_registry.get("data", []).get("areas", [])
}

# Get all devices on alexa
# This fails for some reason sometimes so there's terrible retry logic here.
raw_devices_json = None
while raw_devices_json is None:
    try:
        raw_devices = requests.get(
            "https://alexa.amazon.co.uk/api/phoenix", headers=headers, verify=False
        ).json()
        raw_devices_json = json.loads(raw_devices["networkDetail"])
    except Exception:
        time.sleep(1)

group_updates = {}

# Map the alexa devices to their home assistant device and pull the area
# id from home assistant
for skill, details in raw_devices_json["locationDetails"]["locationDetails"][
    "Default_Location"
]["amazonBridgeDetails"]["amazonBridgeDetails"].items():
    details = details["applianceDetails"]["applianceDetails"]
    for device, value in details.items():
        if value.get("manufacturerName", "") == "Royal Philips Electronics":
            print("Found Royal Philips Electronics")
        elif value.get("friendlyDescription", "") == "Amazon smart device":
            for x in device_to_area:
                if value.get("friendlyName", "") == device_to_area[x]['name'] and device_to_area[x]['manufacturer'] == "Amazon" and "alexa_media" in device_to_area[x]['identifiers'][0]:
                    group_name = areas.get(device_to_area[x]['area_id'], "")
            updates = group_updates.get(group_name, [])
            updates.append(value.get("applianceId"))
            group_updates[group_name] = updates
        elif value.get("manufacturerName", "") == "Home Assistant":
            home_assistant_entity_id = (
                value.get("friendlyDescription").split(" ")[0].strip()
            )
            group_name = areas.get(
                entity_registry.get(home_assistant_entity_id, {}).get("area_id", ""), {}
            )
            if group_name:
                updates = group_updates.get(group_name, [])
                updates.append(value.get("applianceId"))
                group_updates[group_name] = updates

# Create all the groups with their devices
for group_name, devices in group_updates.items():
    body = {
        "applianceIds": devices,
        "name": group_name,
    }
    try:

        pretty = {"group": group_name, "devices": [device.split("#")[1] if '#' in device else device for device in devices]}
        #print(json.dumps(pretty, indent=2))
        print(f"Creating group {group_name}")
        update_group = requests.post(
            f"https://alexa.amazon.co.uk/api/phoenix/group",
            headers=headers,
            verify=False,
            json=body,
        )
        update_group.raise_for_status()
    except:
        print("Failed to process this payload:")
        print(json.dumps(body, indent=2))
  • Detect when the cookies has expired
  • Get devices with areas
  • Set the area from the entity or the parent device
  • Amazon device (from alexa_media_player custom integration) room selection, match the HA device to the Alexa device by name etc and put into a group
  • Print group created instead of the data structure

@imduffy15
Copy link
Author

Very nice @craiggenner ! Thanks for sharing.

I haven’t had the cookies expire yet. Is that something you experienced?

@craiggenner
Copy link

@imduffy15 Yes, a lot. Varies from 5 to 15 minutes.

@imduffy15
Copy link
Author

@craiggenner bizarre! I haven't updated mine since feb 6th and its going strong.

Screenshot 2023-02-14 at 19 37 40

@robsonfelix
Copy link

robsonfelix commented Mar 12, 2024

how do to you get the cookies file? and how do you actually run this? do I need to save in a particular folder?

@robsonfelix
Copy link

this is what I get when executing:

➜ python_scripts python alexa_sync.py
Traceback (most recent call last):
File "/homeassistant/python_scripts/alexa_sync.py", line 2, in
import requests
ModuleNotFoundError: No module named 'requests'

@robsonfelix
Copy link

Any ideas on what to do, @craiggenner and @imduffy15 ?

@craiggenner
Copy link

you need to install the requests module.

That said this no longer works for me after a change at Amazons end.

@robsonfelix
Copy link

@craiggenner what a shame :(. We need to find a way for this work. I will research and circle back to you guys.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment