Instantly share code, notes, and snippets.

Embed
What would you like to do?
Freeipa client with kerberos authentication
#
# Copyright 2017-2017 Ghent University
#
# This file is part of vsc-freeipa,
# originally created by the HPC team of Ghent University (http://ugent.be/hpc/en),
# with support of Ghent University (http://ugent.be/hpc),
# the Flemish Supercomputer Centre (VSC) (https://www.vscentrum.be),
# the Flemish Research Foundation (FWO) (http://www.fwo.be/en)
# and the Department of Economy, Science and Innovation (EWI) (http://www.ewi-vlaanderen.be/en).
#
# https://github.ugent.be/hpcugent/vsc-freeipa
#
# vsc-freeipa is free software: you can redistribute it and/or modify
# it under the terms of the GNU Library General Public License as
# published by the Free Software Foundation, either version 2 of
# the License, or (at your option) any later version.
#
# vsc-freeipa is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Library General Public License for more details.
#
# You should have received a copy of the GNU Library General Public License
# along with vsc-freeipa. If not, see <http://www.gnu.org/licenses/>.
#
"""
A simple client for freeipa
Uses kerberos authentication
"""
import requests
import json
import logging
from requests_kerberos import HTTPKerberosAuth, REQUIRED
class FreeIPAClientException(Exception):
    """
    Base error of this module.

    Raised when the FreeIPA server answers a request with an error.
    """
class FreeIPAClientEmptyModlistException(FreeIPAClientException):
    """
    Specialised FreeIPA error for a request with an empty modification list.

    Raised when the server reports an error named ``EmptyModlist``.
    """
class FreeIPAClient(object):
    """
    A python client to the freeipa json api.

    Not a full client, methods get added on a per needed base.

    Set the KRB5_KTNAME environment variable to point to your keytab to make the kerberos authentication work.
    Authentication with kerberos should be done before trying to use this client.
    Provide a valid ca cert if you are using your own ca's.
    """

    def __init__(self, url='https://myipaserver.example.com/ipa', user='sync_user', realm='MYREALM.EXAMPLE.COM',
                 ca=None, dry_run=False):
        """
        Initialize the client.

        Supply a user with correct permissions (default sync_user).

        params:
            - url: base url of the ipa server; requests are posted to url + '/json'
            - user, realm: combined into the kerberos principal 'user@realm'
            - ca: handed unchanged to every request as its ``verify`` argument
            - dry_run: stored on the instance; no method of this class reads it
        """
        principal = '%s@%s' % (user, realm)
        # id sent along with each request; bumped after every successful post
        self.pid = 0
        self.url = url + '/json'
        self.version = '4.4.0'
        self.kerberos_auth = HTTPKerberosAuth(mutual_authentication=REQUIRED, principal=principal)
        self.session = requests.session()
        self.session.headers.update({
            'referer': url,
            'Content-Type': 'application/json',
            'Accept': 'application/json',
        })
        self.ca = ca
        self.dry_run = dry_run

    @staticmethod
    def _join_users(users):
        """Return a list/tuple/set of users as one comma separated string; anything else is returned untouched."""
        if isinstance(users, (list, tuple, set)):
            return ",".join([str(x) for x in users])
        return users

    def post(self, method, jsondata):
        """
        Post json data to a certain ipa method and return the 'result' entry of the server's result.

        method: method to post to
        jsondata: a list of a list of arguments and a dictionary of options.
            - example:
                method: user-show
                arguments: admin
                options: empty
              you need to specify jsondata:
                [ [ "admin" ], { "all": false, "no_members": false, "raw": false, "rights": false } ]

        raises:
            - FreeIPAClientEmptyModlistException when the server error is named 'EmptyModlist'
            - FreeIPAClientException on any other server error, or when the result is flagged as truncated

        caveat: doesn't work with truncated results yet
        """
        data = json.dumps({
            'method': method,
            'params': jsondata,
            'id': self.pid,
            'version': self.version,
        })
        logging.debug('performing post request to ipa: %s', data)
        response = self.session.post(self.url, auth=self.kerberos_auth, data=data, verify=self.ca)

        # Handle the error and warning messages in a response
        response = response.json()
        logging.debug(response)
        error = response.get('error', False)
        if error:
            logging.error(error['message'])
            if error['name'] == 'EmptyModlist':
                raise FreeIPAClientEmptyModlistException(error['message'])
            raise FreeIPAClientException(error['message'])

        result = response['result']
        if 'summary' in result:
            logging.info(result['summary'])
        if 'messages' in result:
            for message in result['messages']:
                if message['type'] == 'warning':
                    logging.warning(message['message'])
                else:
                    logging.info(message['message'])
                logging.debug(message)

        # TODO: repeat truncated results?
        # Raise explicitly instead of using assert: asserts are stripped under `python -O`,
        # which would silently hand a partial result to the caller.
        if 'truncated' in result and result['truncated']:
            raise FreeIPAClientException('truncated results returned by ipa method %s are not supported' % method)

        self.pid += 1
        return result['result']

    def user_find(self, username=""):
        """
        Find users; username is posted as the single search argument (default: empty string, to find all users)
        """
        return self.post('user_find', [[username], {}])

    def user_add(self, username, uid, givenname, surname, guidnumber, pubkey):
        """
        Add a user by given username
        params:
            - uid integer
            - givenname string
            - surname string
            - guidnumber integer (posted as 'gidnumber')
            - pubkey comma separated string of pubkeys
        """
        return self.post('user_add', [[username], {
            'uidnumber': uid,  # documentation would have you believe this is uid, but it expects uidnumber
            'givenname': givenname,
            'sn': surname,
            'gidnumber': guidnumber,
            'ipasshpubkey': pubkey,
        }])

    def user_mod(self, username, pubkey):
        """
        Update the pubkeys for a given user.

        Returns the result of the post, or None when the server reports that nothing was modified.
        """
        try:
            return self.post('user_mod', [[username], {
                'ipasshpubkey': pubkey,
            }])
        except FreeIPAClientEmptyModlistException:
            logging.info('user_mod: nothing modified for user %s', username)
            return None

    def user_del(self, username):
        """Delete user with given username"""
        return self.post('user_del', [[username], {}])

    def group_show(self, groupname):
        """Get info for a given group"""
        return self.post('group_show', [[groupname], {}])

    def group_add(self, groupname, gid):
        """Add a group with given groupname and gid to ipa"""
        return self.post('group_add', [[groupname], {'gidnumber': gid}])

    def group_add_member(self, groupname, users):
        """Add a member or a list (tuple, set) of members to the group with given groupname"""
        return self.post('group_add_member', [[groupname], {'user': self._join_users(users)}])

    def group_remove_member(self, groupname, users):
        """Remove a member or a list (tuple, set) of members from the group with given groupname"""
        return self.post('group_remove_member', [[groupname], {'user': self._join_users(users)}])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment