Skip to content

Instantly share code, notes, and snippets.

@RicterZ
Last active May 2, 2022 11:10
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save RicterZ/686e8e34e9e1c8f31da9d17baef5fe94 to your computer and use it in GitHub Desktop.
Save RicterZ/686e8e34e9e1c8f31da9d17baef5fe94 to your computer and use it in GitHub Desktop.
import dbus
import os
import glob
import copy
import json
import sys
from xml.dom import minidom
SIGNAL_DETAIL = {
'Arguments': ''
}
PROPERTY_DETAIL = {
'Privileged': False,
'ReadOnly': True,
'Type': ''
}
METHOD_DETAIL = {
'Arguments': '',
'Privileged': False
}
INTERFACE_DETAIL = {
'Methods': {},
'Properties': {},
'Signals': {}
}
PATH_DETAIL = {
'Interfaces': {},
'Ref': None
}
DBUS_DETAIL = {
'Paths': {},
'Policies': {}
}
DBUS_DICT = {
}
POLKIT_ACTIONS = {
}
SYSTEMD_PATH = [
'/usr/lib/systemd/system',
'/etc/systemd/system',
]
DBUS_PATH = [
'/usr/share/dbus-1/system-services',
'/usr/local/share/dbus-1/system-services',
'/lib/dbus-1/system-services',
]
POLKIT_PATH = [
'/usr/share/polkit-1/actions',
]
IGNORED_PATH = (
)
IGNORED_DBUS = (
)
IGNORED_INTERFACES = (
'org.freedesktop.DBus.Introspectable',
'org.freedesktop.DBus.Peer',
'org.freedesktop.DBus.Properties',
'org.freedesktop.DBus.ObjectManager',
)
def logger(msg):
if os.getenv('DEBUG') == '1':
print(msg)
def parse_dbus(paths, identifier='Name='):
for i in paths:
if not os.path.exists(i):
continue
logger('[*] Parsing files under directory \'{}\''.format(i))
for f in glob.glob('{}/**/*'.format(i), recursive=True):
if not os.path.isfile(f):
continue
with open(f, 'r') as f:
data = f.read()
for line in data.splitlines():
if line.startswith(identifier):
_, name = line.split('=', 1)
name = name.strip()
logger('[+] Got dbus name \'{}\''.format(name))
if name in IGNORED_DBUS:
continue
if name not in DBUS_DICT:
obj = copy.deepcopy(DBUS_DETAIL)
DBUS_DICT[name] = obj
def get_interfaces(dom):
ret = {}
interfaces = dom.getElementsByTagName('interface')
for iface in interfaces:
iface_name = iface.getAttribute('name')
if iface_name in IGNORED_INTERFACES:
continue
logger('[+] Found interface \'{}\''.format(iface_name))
iface_obj = copy.deepcopy(INTERFACE_DETAIL)
iface_obj['Methods'] = get_methods(iface)
iface_obj['Properties'] = get_properties(iface)
iface_obj['Signals'] = get_signals(iface)
ret[iface_name] = iface_obj
return ret
def get_methods(dom):
ret = {}
methods = dom.getElementsByTagName('method')
for method in methods:
method_obj = copy.deepcopy(METHOD_DETAIL)
method_name = method.getAttribute('name')
logger('[*] Found method \'{}\''.format(method_name))
args = method.getElementsByTagName('arg')
args_list = []
for arg in args:
arg_name = arg.getAttribute('name')
arg_type = arg.getAttribute('type')
arg_direction = arg.getAttribute('direction')
if arg_direction == 'in':
args_list.append('in {} {}'.format(arg_type, arg_name).strip())
annotations = method.getElementsByTagName('annotation')
for annotation in annotations:
if 'Privileged' in annotation.getAttribute('name'):
method_obj['Privileged'] = True
method_obj['Arguments'] = ', '.join(args_list)
ret[method_name] = method_obj
return ret
def get_properties(dom):
ret = {}
properties = dom.getElementsByTagName('property')
for p in properties:
property_name = p.getAttribute('name')
property_type = p.getAttribute('type')
property_access = p.getAttribute('access')
logger('[*] Found property \'{}\''.format(property_name))
property_obj = copy.deepcopy(PROPERTY_DETAIL)
property_obj['ReadOnly'] = property_access == 'read'
property_obj['Type'] = property_type
annotations = p.getElementsByTagName('annotation')
for annotation in annotations:
if 'Privileged' in annotation.getAttribute('name'):
property_obj['Privileged'] = True
ret[property_name] = property_obj
return ret
def get_signals(dom):
ret = {}
signals = dom.getElementsByTagName('signal')
for signal in signals:
name = signal.getAttribute('name')
logger('[*] Found signal \'{}\''.format(name))
args = signal.getElementsByTagName('arg')
args_list = []
for arg in args:
arg_name = arg.getAttribute('name')
arg_type = arg.getAttribute('type')
args_list.append('{} {}'.format(arg_type, arg_name).strip())
signal_obj = copy.deepcopy(SIGNAL_DETAIL)
signal_obj['Arguments'] = ', '.join(args_list)
ret[name] = signal_obj
return ret
def introspect_bus(name, path='/'):
logger('[*] Introspecting \'{}\' with path \'{}\''.format(name, path))
DBUS_DICT[name]['Policies'] = find_match_polkit_actions(name)
bus = dbus.SystemBus()
try:
obj = bus.get_object(name, path)
introspect = obj.Introspect(dbus_interface='org.freedesktop.DBus.Introspectable')
# logger(introspect)
except Exception as e:
logger('[-] Error while introspecting \'{}\': {}'.format(name, e))
return
path_obj = copy.deepcopy(PATH_DETAIL)
try:
logger(introspect)
dom = minidom.parseString(introspect)
except Exception as e:
logger('[-] Error while formatting xml: {}'.format(e))
logger(introspect)
return
nodes = dom.getElementsByTagName('node')
for node in nodes:
node_name = node.getAttribute('name')
if node_name == '' or node_name == path:
path_obj['Interfaces'] = get_interfaces(node)
continue
found_path = path.rstrip('/') + '/' + node_name
if found_path in IGNORED_PATH:
continue
logger('[+] Found path \'{}\''.format(found_path))
if found_path != path:
introspect_bus(name, found_path)
if path_obj['Interfaces']:
ret = merge_path(name, path, path_obj['Interfaces'])
if ret:
logger('[*] Path \'{}\' now is refer to \'{}\''.format(path, ret))
path_obj['Interfaces'] = {}
path_obj['Ref'] = ret
DBUS_DICT[name]['Paths'][path] = path_obj
def merge_path(dbus_name, path_name, interfaces):
path_name = path_name.rsplit('/', 1)[0]
keys = set(interfaces.keys())
paths = DBUS_DICT.get(dbus_name, {})['Paths']
for path, path_detail in paths.items():
if path.startswith(path_name) and path_detail['Ref'] is None:
keys2 = set(path_detail['Interfaces'].keys())
if not keys ^ keys2:
return path
def get_all_running_dbus():
bus = dbus.SystemBus()
obj = bus.get_object('org.freedesktop.DBus','/org/freedesktop/DBus')
interface = dbus.Interface(obj, dbus_interface='org.freedesktop.DBus')
m = interface.get_dbus_method('ListNames')
for i in m():
if i.startswith(':'):
continue
if i not in DBUS_DICT:
logger('[+] Found new dbus \'{}\''.format(i))
obj = copy.deepcopy(DBUS_DETAIL)
DBUS_DICT[i] = obj
def print_dbus(name_filter=''):
printed_ref = set()
for dbus_name, dbus_detail in DBUS_DICT.items():
print()
if name_filter and name_filter != dbus_name:
continue
print(dbus_name + ' {')
for k, v in dbus_detail['Policies'].items():
print(' % {}: {}'.format(k, v))
for path, path_detail in dbus_detail['Paths'].items():
if path_detail['Ref']:
print(' * {} -> {}'.format(path, path_detail['Ref']))
continue
print(' * ' + path)
for iface, iface_detail in path_detail['Interfaces'].items():
print(' - ' + iface)
for method, method_detail in iface_detail['Methods'].items():
if method_detail['Privileged']:
sign = '+ privileged'
else:
sign = '+'
print(' {} {}({})'.format(sign, method, method_detail['Arguments']))
for signal, signal_detail in iface_detail['Signals'].items():
print(' @ {}({})'.format(signal, signal_detail['Arguments']))
for p, p_detail in iface_detail['Properties'].items():
if p_detail['Privileged']:
sign = '$ privileged'
else:
sign = '$'
if p_detail['ReadOnly']:
readable = 'readonly'
else:
readable = 'readwrite'
print(' {} {}({}) {}'.format(sign, readable, p_detail['Type'], p))
print('}')
def load_polkit(paths):
for i in paths:
if not os.path.exists(i):
continue
for f in glob.glob('{}/*.policy'.format(i), recursive=True):
with open(f, 'r') as f:
data = f.read()
dom = minidom.parseString(data)
actions = dom.getElementsByTagName('action')
for action in actions:
action_name = action.getAttribute('id')
allow_any = action.getElementsByTagName('allow_any')
if allow_any:
allow = allow_any[0].firstChild.nodeValue
else:
allow = 'unknown'
POLKIT_ACTIONS[action.getAttribute('id')] = allow
def find_match_polkit_actions(name):
ret = {}
for k, v in POLKIT_ACTIONS.items():
prefix = k.lower().rsplit('.', 1)[0].replace('-', '')
if name.lower().startswith(prefix):
ret[k] = v
return ret
if __name__ == '__main__':
load_polkit(POLKIT_PATH)
name_filter = '' if len(sys.argv) < 2 else sys.argv[1]
if not name_filter:
parse_dbus(DBUS_PATH)
parse_dbus(SYSTEMD_PATH, 'BusName=')
get_all_running_dbus()
for key in DBUS_DICT.keys():
introspect_bus(key)
else:
DBUS_DICT[name_filter] = copy.deepcopy(DBUS_DETAIL)
introspect_bus(name_filter)
print_dbus(name_filter)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment