Last active
December 7, 2022 13:24
-
-
Save Cybso/2d9b27589f657b057c34 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# syncldap.py - Sync user accounts from an LDAP or Active Directory to Redmine | |
# | |
# Usage: syncldap.py <database.yml> [environment] | |
# | |
# Where database.yml is the database configuration file for a redmine instance, | |
# and environment is the rails environment that should be used (defaults to | |
# 'production'). | |
# | |
# Author: | |
# Roland Tapken <roland.tapken@tasmiro.de> | |
# | |
# License: | |
# This program is free software: you can redistribute it and/or modify | |
# it under the terms of the GNU General Public License as published by | |
# the Free Software Foundation, either version 3 of the License, or | |
# (at your option) any later version. | |
# | |
# This program is distributed in the hope that it will be useful, | |
# but WITHOUT ANY WARRANTY; without even the implied warranty of | |
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
# GNU General Public License for more details. | |
# | |
# You should have received a copy of the GNU General Public License | |
# along with this program. If not, see <http://www.gnu.org/licenses/>. | |
# | |
# Description: | |
# For each configured LDAP authentication source, this program queries all
# entries that would pass the configured filter and creates or updates
# stub entries in the 'users' and 'email_addresses' tables.
# | |
# This allows project maintainers to work with new users that haven't logged
# in yet.
# | |
# All accounts that came from the same authentication source and are not | |
# returned by the LDAP search are disabled (not deleted). | |
# | |
# All database operations are done within the same transaction. This means
# that if ANYTHING goes wrong, no changes will have been made to the database
# (besides a potentially incremented ID counter for 'users' and
# 'email_addresses').
# | |
# Dependencies: | |
# - python-mysql.connector | |
# - python-ldap | |
# | |
# Changelog: | |
# 2017-02-13 Roland Tapken: Fixed some problems that could occur with
# other LDAP/AD servers (thanks to Svet)
import sys | |
import ldap | |
import mysql.connector | |
import yaml | |
# --- Command line handling and database configuration ----------------------
# Expects the path to Redmine's database.yml and, optionally, the name of the
# Rails environment whose section should be used (default: 'production').
if len(sys.argv) < 2 or len(sys.argv) > 3:
    print('Usage: %s <database.yml> [<environment=production>]' % (sys.argv[0]))
    sys.exit(1)

environment = sys.argv[2] if len(sys.argv) == 3 else 'production'

with open(sys.argv[1], 'r') as stream:
    # safe_load() only builds plain Python objects, which is all a database
    # configuration needs. Calling yaml.load() without an explicit Loader is
    # deprecated since PyYAML 5.1 and raises a TypeError since PyYAML 6.0.
    all_conf = yaml.safe_load(stream)

# Pick the section of the requested environment. An empty file, a document
# whose top level is not a mapping, or a missing/empty/non-mapping section
# are all reported as "failed to load" instead of crashing on .get().
conf = all_conf.get(environment, None) if isinstance(all_conf, dict) else None
if not conf or not isinstance(conf, dict):
    print('Failed to load database configuration from %s' % (sys.argv[1]))
    sys.exit(2)

# Only the MySQL adapters used by Redmine are supported by this script.
if conf.get('adapter', '') not in ['mysql', 'mysql2']:
    print('This program only supports mysql databases')
    sys.exit(3)
def get_first(iterable, default=None):
    """Return the first item of *iterable*, or *default* if there is none.

    Falsy input (``None``, ``''``, ``[]``, ...) and iterables that yield
    nothing produce *default*.

    A text or byte string is treated as a single value and returned
    unchanged. Callers in this script pass plain strings as fallbacks for
    missing LDAP attributes; iterating those would return only the first
    character (or, for bytes on Python 3, an integer).
    """
    if not iterable:
        return default
    # type(b'') / type(u'') cover str and unicode on Python 2 as well as
    # bytes and str on Python 3 without naming either version's types.
    if isinstance(iterable, (type(b''), type(u''))):
        return iterable
    for item in iterable:
        return item
    return default
# Open the MySQL connection described by the selected environment section.
# Settings missing from the configuration are passed as None; only the
# character set has a fallback ('utf8').
db = mysql.connector.connect(
    host=conf.get('host'),
    database=conf.get('database'),
    user=conf.get('username'),
    password=conf.get('password'),
    charset=conf.get('encoding', 'utf8'),
)
# 'login' is no key field, so we cannot simply use 'ON DUPLICATE KEY...';
# the matching row is looked up explicitly with this statement instead.
_SELECT_USER_SQL = 'SELECT id, firstname, lastname FROM users WHERE login = %s and auth_source_id = %s'

# Used when an authentication source has no filter configured. This is the
# same filter python-ldap's search_s() applies when none is given.
_DEFAULT_LDAP_FILTER = '(objectClass=*)'


def _ldap_attr_name(name):
    """Return attribute *name* in the form python-ldap accepts in an attrlist.

    On Python 2 the name is encoded to a byte string (as this script always
    did); on Python 3 python-ldap requires text, so the name is passed
    through unchanged.
    """
    if sys.version_info[0] >= 3:
        return name
    return name.encode('unicode-escape')


def _first_text(values, default=None):
    """Return the first of *values* as text, or *default* if there is none.

    *values* is the value list of one LDAP attribute (or None when the entry
    does not carry the attribute). On Python 3 python-ldap returns attribute
    values as bytes, which are decoded as UTF-8 here.
    """
    value = get_first(values, default)
    if sys.version_info[0] >= 3 and isinstance(value, bytes):
        value = value.decode('utf-8')
    return value


def _fetch_ldap_entries(host, port, tls, account, account_password, base_dn, ldap_filter, attr_names):
    """Return all (dn, attributes) results below *base_dn* matching the filter.

    Connects to ``ldaps://`` when *tls* is set and to ``ldap://`` otherwise,
    binds with the given account, searches the whole subtree and always
    unbinds again. Only the attributes in *attr_names* are requested; unset
    (None/empty) names are skipped.
    """
    connection = ldap.initialize('%s://%s:%d' % ('ldaps' if tls else 'ldap', host, port))
    try:
        # No start_tls_s() here: with 'tls' set the connection already uses
        # the ldaps:// scheme, and StartTLS cannot be issued on a connection
        # that is already TLS protected.
        # http://stackoverflow.com/questions/18793040/python-ldap-not-able-to-bind-successfully
        connection.set_option(ldap.OPT_REFERRALS, 0)
        connection.bind_s(account, account_password)
        attrs = [_ldap_attr_name(name) for name in attr_names if name]
        return connection.search_s(base_dn, ldap.SCOPE_SUBTREE, ldap_filter or _DEFAULT_LDAP_FILTER, attrs)
    finally:
        connection.unbind_s()


def _sync_email(user_cursor, user_id, email):
    """Make *email* the default address of *user_id* unless it is already stored.

    Nothing is changed when the address already exists in 'email_addresses'
    (for whichever user).
    """
    user_cursor.execute('SELECT id FROM email_addresses WHERE address = %s', [email])
    if user_cursor.fetchone():
        return
    # Make this the user's default address
    user_cursor.execute('UPDATE email_addresses SET is_default = 0 WHERE user_id = %s', [user_id])
    user_cursor.execute('INSERT INTO email_addresses (user_id, address, is_default, notify, created_on, updated_on) VALUES (%s, %s, 1, 1, NOW(), NOW())', (
        user_id,
        email
    ))


def _sync_user(user_cursor, auth_source_id, entry, attr_login, attr_firstname, attr_lastname, attr_mail):
    """Create or update (and enable) the Redmine user for one LDAP entry.

    *entry* is the attribute dictionary of the LDAP result. Entries without a
    login value are ignored. For an existing user, missing name attributes
    keep the stored values; for a new user they fall back to the login.
    """
    login = _first_text(entry.get(attr_login))
    if not login:
        return

    user_cursor.execute(_SELECT_USER_SQL, (login, auth_source_id))
    row = user_cursor.fetchone()
    if row:
        # Update and enable
        user_cursor.execute('UPDATE users SET firstname = %s, lastname = %s, status = 1 WHERE id = %s', (
            _first_text(entry.get(attr_firstname), row[1]),
            _first_text(entry.get(attr_lastname), row[2]),
            row[0]
        ))
    else:
        # Create
        user_cursor.execute("INSERT INTO users (login, firstname, lastname, type, auth_source_id, created_on, updated_on) VALUES (%s, %s, %s, 'User', %s, NOW(), NOW())", (
            login,
            _first_text(entry.get(attr_firstname), login),
            _first_text(entry.get(attr_lastname), login),
            auth_source_id
        ))
        # Fetch generated ID
        user_cursor.execute(_SELECT_USER_SQL, (login, auth_source_id))
        row = user_cursor.fetchone()

    # Update email address, if available
    email = _first_text(entry.get(attr_mail))
    if email:
        _sync_email(user_cursor, row[0], email)


# --- Main: sync every LDAP authentication source in one transaction --------
cursor = None
userCursor = None
try:
    db.start_transaction()
    userCursor = db.cursor()
    cursor = db.cursor()

    # Iterate all LDAP authentication sources. The rows are fetched completely
    # before the loop starts: the statements executed through userCursor share
    # this connection, and MySQL Connector refuses to run a new statement
    # while another cursor still has unread rows.
    cursor.execute("SELECT id, host, port, account, account_password, base_dn, attr_login, attr_firstname, attr_lastname, attr_mail, tls, filter FROM auth_sources WHERE type = 'AuthSourceLdap'")
    for (auth_source_id, host, port, account, account_password, base_dn, attr_login, attr_firstname, attr_lastname, attr_mail, tls, ldap_filter) in cursor.fetchall():
        # Query for all available accounts within this source
        entries = _fetch_ldap_entries(
            host, port, tls, account, account_password, base_dn, ldap_filter,
            [attr_login, attr_firstname, attr_lastname, attr_mail])

        # First, disable all users. Then, re-enable all users
        # returned from ldap. This works without side effect since we are in
        # a global transaction.
        userCursor.execute('UPDATE users SET status = 0 WHERE auth_source_id = %s', [auth_source_id])
        for (dn, entry) in entries:
            if dn is None:
                # Got some strange lines from another person's AD.
                # They had 'None' as cn and a list as entry.
                continue
            _sync_user(userCursor, auth_source_id, entry, attr_login, attr_firstname, attr_lastname, attr_mail)

    # Everything was fine.
    db.commit()
except BaseException:
    # Something went wrong (including Ctrl-C / sys.exit). Don't apply any
    # changes to the database, then let the error propagate.
    db.rollback()
    raise
finally:
    if userCursor is not None:
        userCursor.close()
    if cursor is not None:
        cursor.close()
    db.close()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment