Last active
August 29, 2015 14:17
-
-
Save kakwa/cf1e36f7b8d32763d0ee to your computer and use it in GitHub Desktop.
python-samba-test
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
from samba.samdb import SamDB | |
from samba.auth import system_session | |
from samba import param | |
import ldb | |
from ldb import Dn | |
class SamDBOverride(SamDB): | |
"""The SAM database.""" | |
def _rmattr(self, entryfilter, attr, value): | |
res = self.search(base=self.domain_dn(), | |
scope=ldb.SCOPE_SUBTREE, | |
expression=entryfilter, attrs=[attr]) | |
try: | |
if len(res) == 0: | |
raise Exception('Unable to find user "%s"' % (username or search_filter)) | |
if len(res) > 1: | |
raise Exception('Matched %u multiple users with filter "%s"' % (len(res), search_filter)) | |
except: | |
self.transaction_cancel() | |
user_dn = res[0].dn | |
val = '' | |
if not value is None: | |
val = "%(attr)s: %(value)s" % { 'attr': attr, 'value': value } | |
mod = """ | |
dn: %(dn)s | |
changetype: modify | |
delete: %(attr)s | |
%(val)s | |
""" % { 'dn': user_dn, 'attr': attr, 'value': value, 'val': val } | |
self.modify_ldif(mod) | |
def _getattr(self, entryfilter, attrs=None): | |
"""Get a list of attributes for a specific entry | |
:param entry: DN of the entry | |
:param attr: List of attributes or None (all attributes) | |
:return: An hash containing the attributes, attributes is set to None | |
if it doesn't exists | |
""" | |
result = None | |
self.transaction_start() | |
try: | |
result = self.search(base=self.domain_dn(), | |
scope=ldb.SCOPE_SUBTREE, | |
expression=entryfilter, attrs=attrs) | |
if len(result) == 0: | |
raise Exception('Unable to find entry') | |
assert(len(result) == 1) | |
except: | |
self.transaction_cancel() | |
raise | |
else: | |
self.transaction_commit() | |
return result[0] | |
def _setattr(self, entryfilter, attr, value, multi=False): | |
res = self.search(base=self.domain_dn(), | |
scope=ldb.SCOPE_SUBTREE, | |
expression=entryfilter, attrs=[attr]) | |
try: | |
if len(res) == 0: | |
raise Exception('Unable to find user "%s"' % (username or search_filter)) | |
if len(res) > 1: | |
raise Exception('Matched %u multiple users with filter "%s"' % (len(res), search_filter)) | |
except: | |
self.transaction_cancel() | |
user_dn = res[0].dn | |
if attr in res[0] and not multi: | |
action = 'replace' | |
else: | |
action = 'add' | |
mod = """ | |
dn: %(dn)s | |
changetype: modify | |
%(action)s: %(attr)s | |
%(attr)s: %(value)s | |
""" % { 'dn': user_dn, 'attr': attr, 'value': value, 'action': action } | |
self.modify_ldif(mod) | |
def rmuserattr(self, username, attr, value=None): | |
""" Remove an attribute for a user | |
:param username: The User name | |
:param attr: the attribute name | |
:param value: the value of the attribute (needed for multivalued attributes) | |
""" | |
userfilter = "(&(sAMAccountName=%s)(objectCategory=%s,%s))" % ( | |
ldb.binary_encode(username), | |
"CN=Person,CN=Schema,CN=Configuration", | |
self.domain_dn()) | |
self._rmattr(userfilter, attr, value) | |
def rmgroupattr(self, groupname, attr, value=None): | |
""" Remove an attribute for a group | |
:param username: The Group name | |
:param attr: the attribute name | |
:param value: the value of the attribute (needed for multivalued attributes) | |
""" | |
groupfilter = "(&(sAMAccountName=%s)(objectCategory=%s,%s))" % ( | |
ldb.binary_encode(groupname), | |
"CN=Group,CN=Schema,CN=Configuration", | |
self.domain_dn()) | |
self._rmattr(groupfilter, attr, value) | |
def setuserattr(self, username, attr, value, multi=False): | |
""" Set an attribute for a user | |
:param username: The User name | |
:param attr: the attribute name | |
:param value: the value of the attribute | |
:param multi: wether or not it's a multivalued attribute | |
""" | |
userfilter = "(&(sAMAccountName=%s)(objectCategory=%s,%s))" % ( | |
ldb.binary_encode(username), | |
"CN=Person,CN=Schema,CN=Configuration", | |
self.domain_dn()) | |
self._setattr(userfilter, attr, value, multi) | |
def setgroupattr(self, groupname, attr, value, multi=False): | |
""" Set an attribute for a group | |
:param groupname: The Group name | |
:param attr: the attribute name | |
:param value: the value of the attribute | |
:param multi: wether or not it's a multivalued attribute | |
""" | |
groupfilter = "(&(sAMAccountName=%s)(objectCategory=%s,%s))" % ( | |
ldb.binary_encode(groupname), | |
"CN=Group,CN=Schema,CN=Configuration", | |
self.domain_dn()) | |
self._setattr(groupfilter, attr, value, multi) | |
def getuserattr(self, username, attrs=None): | |
""" Get a list of attributes of a user | |
:param username: The User name | |
:param attrs: List of attributes to get (default all attributes) | |
:return: an hash containing the entry attributes | |
""" | |
userfilter = "(&(sAMAccountName=%s)(objectCategory=%s,%s))" % ( | |
ldb.binary_encode(username), | |
"CN=Person,CN=Schema,CN=Configuration", | |
self.domain_dn()) | |
return self._getattr(userfilter, attrs) | |
try: | |
return self._getattr(userfilter, attrs) | |
except: | |
raise Exception('Unable to find user "%s', username) | |
def getgroupattr(self, groupname, attrs=None): | |
""" Get a list of attributes of a group | |
:param groupname: The Group name | |
:param attrs: List of attributes to get (default all attributes) | |
:return: an hash containing the entry attributes | |
""" | |
groupfilter = "(&(sAMAccountName=%s)(objectCategory=%s,%s))" % ( | |
ldb.binary_encode(groupname), | |
"CN=Group,CN=Schema,CN=Configuration", | |
self.domain_dn()) | |
return self._getattr(groupfilter, attrs) | |
try: | |
return self._getattr(groupfilter, attrs) | |
except: | |
raise Exception('Unable to find group "%s', groupname) | |
######################## Helper functions ######################## | |
def ldb2hash(ldb): | |
""" Convert an ldb to hash | |
:param ldb: any ldb result | |
:result: an hash containing the entry attributes. | |
key -> value if singled valued attribute | |
key -> list of values if multi-valued attributes | |
""" | |
ret = {} | |
for attr in ldb.keys(): | |
value = ldb.get(attr) | |
if isinstance(value, Dn): | |
ret[attr] = str(value) | |
else: | |
if len(value) == 1: | |
ret[attr] = value.get(0) | |
else: | |
tmplist = [] | |
for i in value: | |
tmplist.append(i) | |
ret[attr] = tmplist | |
return ret | |
from yaml import load, dump | |
try: | |
from yaml import CLoader as Loader, CDumper as Dumper | |
except ImportError: | |
from yaml import Loader, Dumper | |
H=None | |
creds=None | |
lp = None | |
lp = param.LoadParm() | |
samdb = SamDBOverride(url=H, session_info=system_session(), | |
credentials=creds, lp=lp) | |
data = ldb2hash(samdb.getgroupattr('domain users')) | |
print dump(data, default_flow_style=False) | |
data = ldb2hash(samdb.getgroupattr('domain users', attrs=['gidNumber', 'msSFU30NisDomain'])) | |
print dump(data, default_flow_style=False) | |
samdb.setgroupattr('domain users', 'gidNumber', 25042) | |
samdb.rmgroupattr('domain users', 'msSFU30NisDomain') | |
data = ldb2hash(samdb.getgroupattr('domain users', attrs=['gidNumber', 'msSFU30NisDomain'])) | |
print dump(data, default_flow_style=False) | |
samdb.setgroupattr('domain users', 'gidNumber', 10000) | |
samdb.setgroupattr('domain users', 'msSFU30NisDomain', 'dc') | |
data = ldb2hash(samdb.getuserattr('administrator', ['uidNumber', 'msSFU30NisDomain'])) | |
print dump(data, default_flow_style=False) | |
samdb.setuserattr('administrator', 'uidNumber', '10000') | |
data = ldb2hash(samdb.getuserattr('administrator', ['uidNumber'])) | |
print dump(data, default_flow_style=False) | |
samdb.setuserattr('administrator', 'uidNumber', '28643') | |
#data = ldb2hash(samdb.getuserattr('administrator')) | |
#print dump(data, default_flow_style=False) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment