Skip to content

Instantly share code, notes, and snippets.

@azdle
Created January 13, 2021 15:34
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save azdle/cde9a7436c2688983459b41560e3205a to your computer and use it in GitHub Desktop.
Save azdle/cde9a7436c2688983459b41560e3205a to your computer and use it in GitHub Desktop.
Find all (?) Belkin Wemo Devices on LAN and Query API Description
local socket = require "socket"
local http = require "socket.http"
local ltn12 = require "ltn12"
--[[
local log = require "log"
local xml2lua = require "xml2lua"
local xml_handler = require "xmlhandler.tree"
--]]
-- find element of nested tables, returns nil if any part of the path can't be resolved
local function tablefind(t, path)
local pathelements = string.gmatch(path, "([^.]+)%.?")
local item = t
for element in pathelements do
if type(item) ~= "table" then item = nil; break end
item = item[element]
end
return item
end
-- record devices when found, so we only query once
local found_devices_by_ip = {}
local function callback(device)
print("found:", device.ip, "\n", device.id, "\n", device.raw.st)
if found_devices_by_ip[device.ip] then
print("already found another way, not querying again")
else
found_devices_by_ip[device.ip] = true
print("new device, querying:", device.raw.location)
-- Read details
-- Wemo responds with chunked encoding, must use ltn12 sink
local responsechunks = {}
local body,status,headers = http.request{
url = device.raw.location,
sink = ltn12.sink.table(responsechunks)
}
print(table.concat(responsechunks))
end
end
local looking_for_all = setmetatable({}, {__index = function() return true end})
local function Set(list)
local set = {}
for _,v in pairs(list) do set[v] = true end
return set
end
local function process_response(val)
local info = {}
val = string.gsub(val, "HTTP/1.1 200 OK\r\n", "", 1)
for k, v in string.gmatch(val, "([%g]+): ([%g ]*)\r\n") do
info[string.lower(k)] = v
end
return info
end
print("starting disco")
local searchterms = {
--[[
"urn:schemas-upnp-org:device:basic:1",
"urn:samsung.com:device:RemoteControlReceiver:1",
"urn:schemas-upnp-org:device:MediaRenderer:1",
--]]
"urn:Belkin:device:sensor:1",
"urn:Belkin:device:lightswitch:1",
"urn:Belkin:device:insight:1",
"urn:Belkin:device:controllee:1",
"urn:Belkin:device:**",
"urn:Belkin:device:*",
"urn:Belkin:device",
--[[
"urn:belkin:device:sensor:1",
"urn:belkin:device:lightswitch:1",
"urn:belkin:device:insight:1",
"urn:belkin:device:controllee:1",
"urn:belkin:device:**",
"ssdp:all",
--]]
}
for _,st in ipairs(searchterms) do
print("searching for:", st)
local looking_for = {}
local number_looking_for
local number_found = 0
if bulb_ids ~= nil then
number_looking_for = #bulb_ids
for _, id in ipairs(bulb_ids) do looking_for[id] = true end
else
looking_for = looking_for_all
number_looking_for = math.maxinteger
end
local s = socket.udp()
assert(s)
local listen_ip = interface or "0.0.0.0"
local listen_port = 0
local multicast_ip = "239.255.255.250"
local multicast_port = 1900
local multicast_msg = table.concat(
{
'M-SEARCH * HTTP/1.1',
'HOST: 239.255.255.250:1900',
'MAN: "ssdp:discover"', -- yes, there are really supposed to be quotes in this one
'MX: 2',
'ST: uuid:Insight-1_0-231707K1201DE2::urn:Belkin:device:insight:1',
'\r\n'
},
"\r\n"
)
-- Create bind local ip and port
-- Yeelight will unicast back to this ip and port
assert(s:setsockname(listen_ip, listen_port))
local timeouttime = socket.gettime() + 4
local ids_found = {} -- used to filter duplicates
print("sending:\n", multicast_msg)
assert(s:sendto(multicast_msg, multicast_ip, multicast_port))
while number_found < number_looking_for do
local time_remaining = math.max(0, timeouttime-socket.gettime())
s:settimeout(time_remaining)
local val, rip, rport = s:receivefrom()
if val then
--print(val)
local headers = process_response(val)
local ip, port = headers["location"]:match("http://([^,/]+):([^,]+)")
local location = headers["location"]
local id = headers["usn"]
local supported_commands = {}
if headers["support"] then
string.gsub(headers["support"], "([^ ]*) ?", function(match) table.insert(supported_commands, match) end)
end
if rip ~= ip then
print("recieved discovery response with reported & source IP mismatch, ignoring")
print(rip, "!=", ip)
elseif ip and port and id and looking_for[id] and not ids_found[id] then
ids_found[id] = true
number_found = number_found + 1
callback({id = id, ip = ip, port = port, raw = headers})
end
elseif rip == "timeout" then
break
else
error(string.format("error receving discovery replies: %s", rip))
end
end
print(string.format("finished, %s found", number_found))
end
print("all finished")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment