Skip to content

Instantly share code, notes, and snippets.

@jbaker10
Created October 14, 2019 14:09
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jbaker10/4d03616910b86a5f7e24bbc0dab37023 to your computer and use it in GitHub Desktop.
Save jbaker10/4d03616910b86a5f7e24bbc0dab37023 to your computer and use it in GitHub Desktop.
Code to help sync user 's from G Suite to Active Directory
# MIT License
# Copyright (c) 2019 Jeremy Baker
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import os
import re
import logging
from ldap3 import Server, ServerPool, Connection, ALL, MODIFY_REPLACE, ALL_ATTRIBUTES, SUBTREE, FIRST
from ldap3.core.exceptions import *
AD_USERNAME = ""
AD_PASSWORD = ""
AD_OU = "DN,DOMAIN,COM"
DOMAIN_CONTROLLERS = ["DC1.domain.com", "DC2.domain.com"]
log = logging.getLogger()
def create_username(self, email):
"""
Helper function that takes an email and returns an AD appropriate username
:param email: the email that should be concatenated to a username
:return: a username that is the string in front of the `@` symbol without special characters
"""
try:
username = email.split("@")[0]
return sub("['-]", "", unidecode(username))
except IndexError as e:
log.error("Unable to create username from email {}".format(email))
return ""
def generate_password(length=30, complex_password=True):
"""
Generates a secure password and ensures that the password contains numbers/letters/special symbols (if complex_password=True)
:param length: (int) the length of the password to be generated
:param complex: (bool) if True will include special chars in password
:return: (str) new password
"""
charset = "ABCDEFGHJKMNPQRSTUVWXYZabcdefghjkmnpqrstuvwxyz123456789"
if not complex_password:
for i in range(20):
password = "".join([secrets.choice(charset) for _ in range(0, length)])
if not search("[a-z]", password):
continue
elif not search("[0-9]", password):
continue
elif not search("[A-Z]", password):
continue
else:
return password
special_chars = "!@#$%^&*()"
for i in range(20):
password = "".join([secrets.choice(charset) for _ in range(0, length)]) + "".join(
[secrets.choice(special_chars) for _ in range(0, 2)]
)
if not search("[a-z]", password):
continue
elif not search("[0-9]", password):
continue
elif not search("[A-Z]", password):
continue
elif not search("[!@#$%^&*()]", password):
continue
else:
return password
raise Exception("Unable to create secure password after 20 attempts")
class LDAP:
def __init__(self):
pass
def bind(self):
"""
Function that can be called to bind to a list of domain controllers as a pool
"""
log.info("Trying to connect to domain controllers: {}".format(DOMAIN_CONTROLLERS))
server_pool_list = []
for dc in DOMAIN_CONTROLLERS:
server_pool_list.append(Server(host=dc, get_info=ALL, port=636, use_ssl=True, connect_timeout=120))
server_pool = ServerPool(server_pool_list, FIRST, active=True, exhaust=120)
try:
self.conn = Connection(
server_pool, auto_bind=True, user=AD_USERNAME, password=AD_PASSWORD, raise_exceptions=True
)
self.conn.bind()
except LDAPTimeLimitExceededResult as e:
raise Exception("Unable to establish a connection with the LDAP server, timed out after 60 seconds")
except Exception as e:
raise Exception(
"Unable to create an LDAP connection to provision users in Active Directory. The error was: {}".format(
str(e)
)
)
def list_users(self, ou, attributes=ALL_ATTRIBUTES):
"""
List all users in a given OU or CN
:param ou: The DN path where you want the listing to occur
:param attributes: a list of attributes that you would like returned in the query (must contain at least userPrincipalName), defaults to ALL
:return: a dictionary of returned objects from the specified search DN with the key set to the user's UPN and value their AD record
"""
if not attributes == ALL_ATTRIBUTES and not "userPrincipalName" in attributes:
raise Exception("The attribute 'userPrincipalName' must be included in the attributes list passed")
ad_users = {}
try:
users = self.conn.extend.standard.paged_search(
search_base=ou,
search_filter="(objectClass=User)",
search_scope=SUBTREE,
attributes=attributes,
paged_size=100,
generator=False,
)
except Exception as e:
raise Exception("Unable to list users in AD. The error was: {}".format(str(e)))
if not self.conn.result.get("description") == "success":
raise Exception("Unable to list users in AD. The error was: {}".format(self.conn.result))
elif len(users) == 0:
log.info("No users were found in Active Directory, but got a good response. Proceeding.")
return {}
for user in users:
try:
# we append each user to a dictionary and set the value to the user record
# we are primarily using this as a lookup reference to know if the user was already in AD or not
ad_users[user.get("attributes", {}).get("userPrincipalName", "")] = user
except Exception as e:
log.warning("Unable to add user {} to ad_users list. The error was: {}".format(user.get("dn", ""), e))
return ad_users
def create_user(self, ldap_user_attributes, ou):
"""
Create a new user in Active Directory with default attributes
(including a generated secure password, and account enablement)
:param ou: the path in AD for the user to be created
:param ldap_user_attributes: the JSON formatted user record
:return: bool status of whether the creation succeeded or not
"""
try:
username = ldap_user_attributes.get("cn", {})[0] # sAMAccountName is a list, we need the first entry
except IndexError as e:
log.error("Unable to retrieve the sAMAccountName for record {}".format(ldap_user_attributes))
return False
log.debug("Attempting to create AD user {}".format(username))
dn = "CN={},{}".format(username, ou)
log.debug("The DN will be set to {}".format(dn))
try:
resp = self.conn.add(dn, attributes=ldap_user_attributes)
except LDAPEntryAlreadyExistsResult as e:
log.warning("The user {} already exists in Active Directory, skipping".format(username))
## we return True here since the user does already exist in the domain
return True
except LDAPConstraintViolationResult as e:
log.error("Unable to update account {} due to a Contstraint Violation".format(username))
self.status["errors"].append(str(e))
return False
except Exception as e:
log.error(
"An unexpected error occurred while trying to create user {}. The error was: {}".format(
ldap_user_attributes.get("userPrincipalName", [])[0], str(e)
)
)
return False
if not resp:
log.error(
"Unable to create user {}. The error was: {}".format(
ldap_user_attributes.get("userPrincipalName", ""), self.conn.result
)
)
return False
return True
def main():
## create the LDAP object
ldap = LDAP()
## bind to the domain controllers
ldap.bind()
# get a list of all current LDAP users (specifically their UPN) in a given OU
current_ad_users = ldap.list_users(ou=AD_OU, attributes=["userPrincipalName"])
## This assumes a list of G Suite users to iterate through where each user is a JSON dict representation of the user response from G Suite
## For reference, see the G Suite directory API docs here: https://developers.google.com/admin-sdk/directory/v1/reference/users
for user in gsuite_users:
## since the current_ad_users is a dictionary, we can easy lookup to see if the user already exists in AD and bypass this step
## this assumes your G Suite domain and AD domain (used with the UPN are the same)
if current_ad_users.get(user.get("primaryEmail", "")):
log.warning("User {} already exists in AD, moving on".format(username))
continue
ldap_user_attributes = {}
## get a username that will be used as the UPN when creating the user in AD
username = create_username(user.get("primaryEmail", ""))
try:
ldap_user_attributes = {
"objectClass": ["top", "person", "organizationalPerson", "user"],
"cn": [username],
"sAMAccountName": [username[:20]], # we need to strip the sAMAccountName to 20 chars
"displayName": [username],
"mail": [user.get("primaryEmail", "")],
## this assumes your G Suite domain and AD domain (used with the UPN are the same)
"userPrincipalName": ["{}@{}".format(user.get("primaryEmail", ""))],
# mind the extra set of quotes in the uncodePWd value, this is required, do not change
"unicodePwd": '"{}"'.format(generate_password()).encode(
"utf-16-le" # this is very specific to MS AD, so this needs be the encoding
),
"userAccountControl": "66048", # userAccountControl mappings: https://vaportech.wordpress.com/2007/12/06/useraccountcontrol/
}
# employeeName is normally the 'First Last' name of a user
if user.get("employeeName", ""):
name_split = user.get("employeeName", "").split(" ")
ldap_user_attributes["givenName"] = [name_split[0]]
ldap_user_attributes["sn"] = [" ".join(name_split[1:])]
if user.get("jobTitle", ""):
ldap_user_attributes["title"] = [user.get("jobTitle", "")]
if user.get("department", ""):
ldap_user_attributes["department"] = [user.get("department", "")]
return ldap_user_attributes, True, ""
except Exception as e:
log.error("Unable to create the LDAP resource for user record {}. The error was: {}".format(user.get("primaryEmail", ""), e))
continue
## attempt to create the new user record in AD
try:
ldap.create_user(ldap_user_attributes=ldap_user_attributes, ou=AD_OU)
except Exception as e:
try:
log.error(
"An error occurred while trying to create {}. The error was: {}".format(
user.get("userPrincipalName", "")[0], e
)
)
except IndexError as e:
log.error("An error occurred while trying to create {}".format(user))
log.info("Successfully created {} in {}".format(username, AD_OU))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment