Created
December 24, 2021 16:15
-
-
Save N0ciple/deade052fd1a87d5b213d2e5cde31bc2 to your computer and use it in GitHub Desktop.
Subscribe to KEF LS50 Wireless II event queue
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import requests | |
import uuid | |
# Change this value to the IP address of your speaker on the local network.
speaker_ip = "192.168.xxx.yyy"

# Queues are identified by a UUID, so generate a fresh one to create a new queue.
queue_uuid = uuid.uuid4()
print(queue_uuid)

# Write the generated UUID, as a string value, to the speaker's kefId setting.
payload = {
    "path": "settings:/kef/host/kefId",
    "roles": "value",
    "value": {
        "string_": str(queue_uuid),
        "type": "string_",
    },
}

# Create a queue. Note that the speaker later answers with a queue id of its
# own rather than the one generated here.
# The explicit timeout keeps the script from hanging forever when the speaker
# (or the placeholder address above) is unreachable.
with requests.post(
    "http://{}/api/setData".format(speaker_ip), json=payload, timeout=5
) as response:
    output = response.text

print(output)
# Build a payload that subscribes to as many event paths as possible.
# "queueId" seems to be optional, so it is left out of the request.
payload = {
    "subscribe": [
        {"path": "player:player/data/playTime", "type": "itemWithValue"},
        {"path": "settings:/mediaPlayer/playMode", "type": "itemWithValue"},
        {"path": "playlists:pq/getitems", "type": "rows"},
        {"path": "notifications:/display/queue", "type": "rows"},
        {"path": "settings:/kef/host/maximumVolume", "type": "itemWithValue"},
        {"path": "player:volume", "type": "itemWithValue"},
        {"path": "kef:fwupgrade/info", "type": "itemWithValue"},
        {"path": "settings:/kef/host/volumeStep", "type": "itemWithValue"},
        {"path": "settings:/kef/host/volumeLimit", "type": "itemWithValue"},
        {"path": "settings:/mediaPlayer/mute", "type": "itemWithValue"},
        {"path": "settings:/kef/host/speakerStatus", "type": "itemWithValue"},
        {"path": "settings:/kef/play/physicalSource", "type": "itemWithValue"},
        {"path": "player:player/data", "type": "itemWithValue"},
        {"path": "kef:speedTest/status", "type": "itemWithValue"},
        {"path": "network:info", "type": "itemWithValue"},
        {"path": "kef:eqProfile", "type": "itemWithValue"},
        {"path": "settings:/kef/host/modelName", "type": "itemWithValue"},
        {"path": "settings:/version", "type": "itemWithValue"},
        {"path": "settings:/deviceName", "type": "itemWithValue"},
    ],
    # Unsubscribe from nothing.
    "unsubscribe": [],
}

# Send the subscription request.
# The timeout prevents an indefinite hang on an unreachable speaker, and the
# status check stops an HTTP error body from being mistaken for a queue id.
with requests.post(
    "http://{}/api/event/modifyQueue".format(speaker_ip),
    json=payload,
    timeout=5,
) as response:
    response.raise_for_status()
    json_output = response.json()

# The speaker answers with a new queue id (strangely, not one we supplied).
print(json_output)
# Extract the queue id from the subscription reply by dropping its first and
# last character (presumably the surrounding braces, since braces are added
# back when the poll payload is built below).
out_queue = json_output[1:-1]

# Poll the queue.
# NOTE(review): "timeout" was originally described as "get events from the
# last x seconds"; the speaker may instead hold the request open for up to
# this many seconds while waiting for events -- confirm against the device.
poll_timeout = 5
payload = {
    "queueId": "{{{}}}".format(out_queue),
    "timeout": poll_timeout,
}

# The HTTP client timeout must be longer than the poll timeout sent to the
# speaker; otherwise a quiet queue can raise ReadTimeout before the speaker
# has had a chance to answer.
with requests.get(
    "http://{}/api/event/pollQueue".format(speaker_ip),
    params=payload,
    timeout=poll_timeout + 1,
) as response:
    response.raise_for_status()
    json_output = response.json()

# Keep only the most recent event for each path. Later events overwrite
# earlier ones, and events without an 'itemValue' are recorded as 'updated'.
events = {}
for event in json_output:
    events[event["path"]] = event.get("itemValue", "updated")

# Show the events.
print(events)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment