Skip to content

Instantly share code, notes, and snippets.

@MagerValp
Created November 6, 2017 11:29
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save MagerValp/d6e2d04e3566b6a249d6f1fb4d9f34b6 to your computer and use it in GitHub Desktop.
Save MagerValp/d6e2d04e3566b6a249d6f1fb4d9f34b6 to your computer and use it in GitHub Desktop.
Helper class for directory service lookups
# -*- coding: utf-8 -*-
"""Directory Services helper class."""
from __future__ import unicode_literals
from __future__ import print_function
from __future__ import division
from OpenDirectory import ODSession, ODNode, ODQuery, kODRecordTypeUsers, kODAttributeTypeRecordName, kODAttributeTypeStandardOnly, kODMatchEqualTo, kODRecordTypeGroups, kODRecordTypeUsers
__all__ = ["DSHelper", "DSHelperError"]
class DSHelperError(BaseException):
pass
class DSHelper(object):
"""Wrapper for Directory Services."""
def __init__(self):
super(DSHelper, self).__init__()
self.odsession = ODSession.defaultSession()
def get_node(self, nodename):
node, error = ODNode.nodeWithSession_name_error_(self.odsession, nodename, None)
if node is None:
raise DSHelperError("Couldn't open {} node: {}".format(nodename,
error.localizedFailureReason()))
return node
def get_search_node(self):
return self.get_node("Search")
def find_groups_named(self, groupname, node=None):
"""Look up a group name and return an array of group records."""
if node is None:
node = self.get_search_node()
odquery, error = ODQuery.queryWithNode_forRecordTypes_attribute_matchType_queryValues_returnAttributes_maximumResults_error_( node,
kODRecordTypeGroups,
kODAttributeTypeRecordName,
kODMatchEqualTo,
groupname,
kODAttributeTypeStandardOnly,
0,
None)
if odquery is None:
raise DSHelperError("Couldn't query {}: {}".format(node.nodeName,
error.localizedFailureReason()))
result, error = odquery.resultsAllowingPartial_error_(False, None)
if result is None:
raise DSHelperError("Couldn't retrieve query results: {}".format(error.localizedFailureReason()))
return result
def find_users_named(self, username, node=None):
"""Look up a user name and return an array of user records."""
if node is None:
node = self.get_search_node()
odquery, error = ODQuery.queryWithNode_forRecordTypes_attribute_matchType_queryValues_returnAttributes_maximumResults_error_( node,
kODRecordTypeUsers,
kODAttributeTypeRecordName,
kODMatchEqualTo,
username,
kODAttributeTypeStandardOnly,
0,
None)
if odquery is None:
raise DSHelperError("Couldn't query {}: {}".format(node.nodeName,
error.localizedFailureReason()))
result, error = odquery.resultsAllowingPartial_error_(False, None)
if result is None:
raise DSHelperError("Couldn't retrieve query results: {}".format(error.localizedFailureReason()))
return result
def add_user_to_group(self, user, group):
result, error = group.addMemberRecord_error_(user, None)
if not result:
if error:
error_msg = ": " + error.localizedFailureReason()
else:
error_msg = ""
raise DSHelperError("Couldn't add {} to {}{}".format(user.recordName,
group.recordName,
error_msg))
def remove_user_from_group(self, user, group):
result, error = group.removeMemberRecord_error_(user, None)
if not result:
if error:
error_msg = ": " + error.localizedFailureReason()
else:
error_msg = ""
raise DSHelperError("Couldn't remove {} from {}{}" % (user.recordName,
group.recordName,
error_msg))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment