Skip to content

Instantly share code, notes, and snippets.

@N0ciple
Created December 24, 2021 16:15
Show Gist options
  • Save N0ciple/deade052fd1a87d5b213d2e5cde31bc2 to your computer and use it in GitHub Desktop.
Save N0ciple/deade052fd1a87d5b213d2e5cde31bc2 to your computer and use it in GitHub Desktop.
Subscribe to KEF LS50 Wireless II event queue
import requests
import uuid
# Change this value to
speaker_ip = "192.168.xxx.yyy"
# Queue are identify by an uuid, we can generate a uuid to create a new queue
queue_uuid = uuid.uuid4()
print(queue_uuid)
payload = {
"path":"settings:/kef/host/kefId",
"roles":"value",
"value":
{
"string_":"{}".format(queue_uuid),
"type":"string_"
}
}
# create a queue but speaker answer with new queue id
with requests.post("http://{}/api/setData".format(speaker_ip), json=payload) as response:
output = response.text
print(output)
# Create a payload subscribing to as much stuff as possible
payload = {
# queueId seams to be optional
# "queueId":"{{{}}}".format(new_queue),
"subscribe":[
{"path":"player:player/data/playTime","type":"itemWithValue"},
{"path":"settings:/mediaPlayer/playMode","type":"itemWithValue"},
{"path":"playlists:pq/getitems","type":"rows"},
{"path":"notifications:/display/queue","type":"rows"},
{"path":"settings:/kef/host/maximumVolume","type":"itemWithValue"},
{"path":"player:volume","type":"itemWithValue"},
{"path":"kef:fwupgrade/info","type":"itemWithValue"},
{"path":"settings:/kef/host/volumeStep","type":"itemWithValue"},
{"path":"settings:/kef/host/volumeLimit","type":"itemWithValue"},
{"path":"settings:/mediaPlayer/mute","type":"itemWithValue"},
{"path":"settings:/kef/host/speakerStatus","type":"itemWithValue"},
{"path":"settings:/kef/play/physicalSource","type":"itemWithValue"},
{"path":"player:player/data","type":"itemWithValue"},
{"path":"kef:speedTest/status","type":"itemWithValue"},
{"path":"network:info","type":"itemWithValue"},
{"path":"kef:eqProfile","type":"itemWithValue"},
{"path":"settings:/kef/host/modelName","type":"itemWithValue"},
{"path":"settings:/version","type":"itemWithValue"},
{"path":"settings:/deviceName","type":"itemWithValue"},
],
# Unsubscribe from nothing
"unsubscribe":[]
}
# send subscription
with requests.post( "http://{}/api/event/modifyQueue".format(speaker_ip), json=payload) as response :
json_output = response.json()
# Speaker answer with a new queue id ? strange
print(json_output)
#get
out_queue = json_output[1:-1]
# Poll queue
payload = {
"queueId":"{{{}}}".format(out_queue),
# "timeout" = get events from the last x seconds (here 5s)
"timeout":5
}
with requests.get( "http://{}/api/event/pollQueue".format(speaker_ip), params=payload, timeout=3) as response :
json_output = response.json()
# Create a dict to store all the new events
events = dict()
# fill events lists
for j in json_output:
if events.get(j['path'], False):
events[j['path']].append(j)
else:
events[j['path']] = [j]
# prune events lists (get last event by event type)
for k in events:
events[k] = events[k][-1].get('itemValue','updated')
# show events
print(events)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment