Skip to content

Instantly share code, notes, and snippets.

@hackprime
Last active May 16, 2019 02:24
Show Gist options
  • Save hackprime/047320bbd5fef2408bd8b959010b1cae to your computer and use it in GitHub Desktop.
Save hackprime/047320bbd5fef2408bd8b959010b1cae to your computer and use it in GitHub Desktop.
Simple wrapper class over FreeIPA client implementation from https://github.com/nordnet/python-freeipa-json
"""
Simple wrapper class over FreeIPA client implementation from
https://github.com/nordnet/python-freeipa-json
Usage
* Create user:
>>> manager = LDAPClient('ldap.example.com', 'admin', 'qw3rty12')
>>> user = manager.create_user('user1', 'systemadmins', 'asd1234567')
>>> print(user['uid'])
'user1'
* Update user's password:
>>> manager.update_user_password('user1', '1234wfggggh')
* Delete User:
>>> manager.delete_user('user1')
"""
from json import JSONDecodeError
import logging
import ipahttp
class LDAPError(Exception):
    """Error reported by the server in the ``error`` member of a response.

    Attributes:
        message: value of the ``message`` key of *error_data* (``None`` when
            the key is absent).
        code: value of the ``code`` key of *error_data* (``0`` when absent).
        name: value of the ``name`` key of *error_data* (``None`` when
            absent).
    """

    def __init__(self, error_data):
        """Copy the interesting fields out of the *error_data* mapping."""
        lookup = error_data.get
        self.message, self.code, self.name = (
            lookup('message'),
            lookup('code', 0),
            lookup('name'),
        )

    def __str__(self):
        """Render the error as ``"<code>: <message>"``."""
        parts = (self.code, self.message)
        return '%d: %s' % parts
class LDAPAccessDenied(Exception):
    """Raised when the server refuses the supplied credentials."""

    # Fixed, human-readable text used as the string form of the exception.
    message = 'Unable to log in into LDAP server'

    def __str__(self):
        """Return the fixed access-denied message."""
        text = self.message
        return text
class LDAPClient:
    """Convenience wrapper around an ``ipahttp.ipa`` connection.

    The connection is created and logged in lazily on first use (see
    :attr:`connection`). Every remote call goes through :meth:`request`,
    which turns server-reported errors into :class:`LDAPError`.
    """

    # Lazily created, logged-in ``ipahttp.ipa`` instance (None until needed).
    _connection = None
    # Expiration timestamp written to ``krbpasswordexpiration`` so that
    # passwords set through this client effectively never expire.
    _never_expire_datetime = '20500101000000Z'

    # Server error codes this client handles specially.
    # Treated as "the requested entry does not exist".
    _ERR_NOT_FOUND = 4001
    # "no modifications to be performed"
    _ERR_EMPTY_MODLIST = 4202
    # "ipasshpubkey does not contain {value}"
    _ERR_ATTR_VALUE_NOT_FOUND = 4026
    # "invalid 'ipasshpubkey': No such attribute on this entry"
    _ERR_VALIDATION = 3009

    def __init__(self, host, username, password):
        """Remember the connection parameters; no network I/O happens here."""
        self._host = host
        self._username = username
        self._password = password

    @property
    def connection(self):
        """Return the cached connection, creating and logging it in if needed."""
        if self._connection is None:
            connection = ipahttp.ipa(self._host, sslverify=True)
            connection.login(self._username, self._password)
            # Cache only after login() has returned: if it raises, the next
            # access retries instead of reusing an unauthenticated connection.
            self._connection = connection
        return self._connection

    def request(self, method, *args, **kwargs):
        """Call *method* on the connection and unwrap its response.

        Args:
            method: name of the connection method to invoke.
            *args, **kwargs: forwarded to that method unchanged.

        Returns:
            The nested ``result['result']`` value of the response, or
            ``None`` when it is missing.

        Raises:
            LDAPAccessDenied: the response was not valid JSON and its text
                mentions unverifiable Kerberos credentials.
            JSONDecodeError: any other non-JSON response (its text is logged).
            LDAPError: the response carries a non-``None`` ``error`` member.
        """
        try:
            data = getattr(self.connection, method)(*args, **kwargs)
        except JSONDecodeError as e:
            if 'Unable to verify your Kerberos credentials' in e.doc:
                raise LDAPAccessDenied() from e
            logging.error(e.doc)
            raise
        error = data.get('error')
        if error is not None:
            raise LDAPError(error)
        # ``result`` may be absent or None; treat both as "no payload".
        return (data.get('result') or {}).get('result')

    def _exists(self, getter, name):
        """Return whether ``getter(name)`` succeeds, mapping not-found to False."""
        try:
            getter(name)
        except LDAPError as e:
            if e.code == self._ERR_NOT_FOUND:
                return False
            raise
        return True

    def _disable_password_expiration(self, user_name):
        """Push the user's password expiration to ``_never_expire_datetime``.

        Per the original author's note, the expiration cannot be set during
        user creation and the password would otherwise expire on first
        login, so it is updated with a separate ``user_mod`` request.

        Returns:
            The ``krbpasswordexpiration`` value reported by that request.
        """
        data = self.request(
            'user_mod',
            user_name,
            setattrs=[
                'krbpasswordexpiration=%s' % self._never_expire_datetime])
        return data.get('krbpasswordexpiration')

    def get_group(self, group_name):
        """Return the result of ``group_show`` for *group_name*."""
        return self.request('group_show', group_name)

    def get_user(self, user_name):
        """Return the result of ``user_show`` for *user_name*."""
        return self.request('user_show', user_name)

    def is_group_exist(self, group_name):
        """Return True if the group can be fetched, False if it is not found."""
        return self._exists(self.get_group, group_name)

    def is_user_exist(self, user_name):
        """Return True if the user can be fetched, False if it is not found."""
        return self._exists(self.get_user, user_name)

    def create_group(self, group_name):
        """Create *group_name* with a generated description; return the result."""
        return self.request(
            'group_add',
            group_name,
            description=(
                'Group "%s", created by XTREME-STARGATE API' % group_name))

    def create_user(self, user_name, group_name, password, public_keys=None):
        """Create a user, add it to *group_name* and return the user data.

        The group is created first when it does not exist yet, so that the
        membership request cannot fail after the user has been created.

        Args:
            user_name: login of the new user.
            group_name: group the user is added to.
            password: initial password.
            public_keys: optional value for the ``ipasshpubkey`` attribute.

        Returns:
            The ``user_add`` result, with *group_name* appended to
            ``memberof_group`` and ``krbpasswordexpiration`` refreshed.

        Raises:
            LDAPError: any of the underlying requests reported an error.
        """
        if not self.is_group_exist(group_name):
            self.create_group(group_name)
        opts = {
            'givenname': user_name,
            'sn': group_name,
            'cn': '%s %s' % (group_name, user_name),
            'userpassword': password,
            'mail': '%s@xtreme-stargate.com' % user_name,
            'homedirectory': '/home/%s/%s' % (group_name, user_name),
        }
        if public_keys is not None:
            opts['ipasshpubkey'] = public_keys
        user_data = self.request('user_add', user_name, opts=opts)
        self.request('group_add_member', group_name, user_name, 'user')
        # The user_add result predates the membership change; reflect it
        # locally, tolerating a result without a ``memberof_group`` key.
        user_data.setdefault('memberof_group', []).append(group_name)
        user_data['krbpasswordexpiration'] = (
            self._disable_password_expiration(user_name))
        return user_data

    def update_user_public_keys(self, user_name, public_keys):
        """Set the user's ``ipasshpubkey`` values to *public_keys*.

        Returns:
            The ``user_mod`` result, or ``{'error': <LDAPError>}`` when the
            server reports that there was nothing to modify.
        """
        try:
            return self.request(
                'user_mod',
                user_name,
                setattrs=['ipasshpubkey=%s' % key for key in public_keys])
        except LDAPError as e:
            # Skip non-critical error "no modifications to be performed"
            if e.code == self._ERR_EMPTY_MODLIST:
                return {'error': e}
            raise

    def delete_user_public_keys(self, user_name, public_keys):
        """Remove *public_keys* from the user's ``ipasshpubkey`` values.

        Returns:
            The ``user_mod`` result, or ``{'error': <LDAPError>}`` when the
            keys (or the attribute itself) were not present.
        """
        try:
            return self.request(
                'user_mod',
                user_name,
                delattrs=['ipasshpubkey=%s' % key for key in public_keys])
        except LDAPError as e:
            # Skip non-critical errors: value or attribute already absent.
            if e.code in (self._ERR_ATTR_VALUE_NOT_FOUND,
                          self._ERR_VALIDATION):
                return {'error': e}
            raise

    def update_user_password(self, user_name, password):
        """Set a new password and keep it from expiring.

        Returns:
            The ``user_mod`` result of the password change, with
            ``krbpasswordexpiration`` refreshed from the follow-up request.
        """
        user_data = self.request(
            'user_mod', user_name, setattrs=['userpassword=%s' % password])
        user_data['krbpasswordexpiration'] = (
            self._disable_password_expiration(user_name))
        # Success-path diagnostics belong at debug level, not error level.
        logging.debug(user_data)
        return user_data

    def delete_user(self, user_name):
        """Delete *user_name* and return the ``user_del`` result."""
        return self.request('user_del', user_name)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment