Skip to content

Instantly share code, notes, and snippets.

@THS-on
Last active April 14, 2024 20:17
Show Gist options
  • Save THS-on/a8fe22891c1008454a948e86d25315f5 to your computer and use it in GitHub Desktop.
Save THS-on/a8fe22891c1008454a948e86d25315f5 to your computer and use it in GitHub Desktop.
WirePlumber script to automatically attach a filter to a device
#!/usr/bin/wpexec
local mic_name = "alsa_input.usb-C-Media_Electronics_Inc._USB_Audio_Device-00.mono-fallback"
local filter_name = "rnnoise_source"
local link_props = {
["link.output.port"] = nil,
["link.input.port"] = nil,
["link.output.node"] = nil,
["link.input.node"] = nil
}
local link = nil
mic_om = ObjectManager {
Interest {
type = "node",
Constraint {
"node.name", "=", mic_name
}
}
}
mic_port_om = ObjectManager {
Interest {
type = "port",
Constraint {
"port.direction", "=", "out"
}
}
}
filter_om = ObjectManager {
Interest {
type = "node",
Constraint {
"node.name", "=", filter_name
}
}
}
filter_port_om = ObjectManager {
Interest {
type = "port",
Constraint {
"port.direction", "=", "in"
}
}
}
link_om = ObjectManager {
Interest {
type = "link"
}
}
function find_port(node_id, port_om)
for port in port_om:iterate() do
if port.properties["node.id"] == node_id then
return port.properties["object.id"]
end
end
end
function add_node(node, direction, port_om)
local props = node.properties
local object_id = props["object.id"]
link_props["link." .. direction .. ".node"] = object_id
if link_props["link." .. direction .. ".port"] == nil then
link_props["link." .. direction .. ".port"] = find_port(object_id, port_om)
end
end
function try_activate_link()
if link ~= nil then
print("Link is active")
return
end
local count = 0
for k,v in pairs(link_props) do
count = count + 1
end
if count ~= 4 then
print("Not all properties are set")
return
end
print("Setup link")
for l in link_om:iterate {Constraint {"link.input.node","=", link_props["link.input.node"]}} do
print("Removed link that were attached to filter")
l:request_destroy()
end
link = Link("link-factory", link_props)
link:activate(1)
end
function deactivate_link()
if link == nil then
return
end
link:request_destroy()
link = nil
link_props["link.output.port"] = nil
link_props["link.output.node"] = nil
end
function add_port(port, direction)
if link_props["link." .. direction .. ".node"] == port.properties["node.id"] then
link_props["link." .. direction .. ".port"] = port.properties["object.id"]
end
end
mic_port_om:connect("object-added", function(om, port)
add_port(port, "output")
try_activate_link()
end)
filter_port_om:connect("object-added", function(om, port)
add_port(port, "input")
try_activate_link()
end)
filter_om:connect("object-added", function(om, node)
add_node(node, "input", filter_port_om)
try_activate_link()
end)
mic_om:connect("object-added", function(om, node)
add_node(node, "output", mic_port_om)
try_activate_link()
end)
mic_om:connect("object-removed", function (om, node)
deactivate_link()
end)
link_om:connect("object-removed", function (om, l)
if l == link then
print("Link was removed")
link = nil
print("Try to add the link again")
try_activate_link()
end
end)
mic_om:activate()
filter_om:activate()
link_om:activate()
mic_port_om:activate()
filter_port_om:activate()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment