Skip to content

Instantly share code, notes, and snippets.

@om2c0de
Created September 29, 2020 14:26
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save om2c0de/96ec357af20de306385bb07bcc68c98b to your computer and use it in GitHub Desktop.
Save om2c0de/96ec357af20de306385bb07bcc68c98b to your computer and use it in GitHub Desktop.
AD Sync
import os
import mongoengine
from dataclasses import dataclass
from ldif import LDIFRecordList
from auth.plugins.ad_emulator.service import RemoteUser, RemoteResource
# MongoDB settings.
MONGODB_HOST = os.environ.get('VEGA_MONGODB_HOST', '127.0.0.1')
MONGODB_PORT = os.environ.get('VEGA_MONGODB_PORT', 27017)
MONGODB_DB_NAME = os.environ.get('VEGA_MONGODB_DB_NAME', 'vega')
MONGODB_AUTH_ENABLED = os.environ.get('VEGA_MONGODB_AUTH_ENABLED', False)
MONGODB_AUTH_SOURCE = os.environ.get('VEGA_MONGODB_AUTH_SOURCE', 'admin')
MONGODB_USERNAME = os.environ.get('VEGA_MONGODB_USERNAME', 'vega')
MONGODB_PASSWORD = os.environ.get('VEGA_MONGODB_PASSWORD', 'vega_gpn_passwd')
# MongoDB app settings
MONGODB_REMOTE_USER_COLLECTION = RemoteUser()
MONGODB_REMOTE_RESOURCE_COLLECTION = RemoteResource()
# LDIF settings.
LDIF_USER_LOGIN = os.environ.get('LDIF_USER_LOGIN', 'userPrincipalName')
LDIF_USER_FIRST_NAME = os.environ.get('LDIF_USER_FIRST_NAME', 'givenName')
LDIF_USER_LAST_NAME = os.environ.get('LDIF_USER_LAST_NAME', 'displayName')
LDIF_USER_ORGANISATION_UNIT = os.environ.get('LDIF_USER_ORGANISATION_UNIT', 'primaryGroupID')
# Parser settings
IGNORED_ATTR_TYPES = None
MAX_ENTRIES = 10000
@dataclass
class User:
login: str
first_name: str
last_name: str
organisation_unit: str
class LDIFParser:
def __init__(self, filename, ignored_attr_types=None, max_entries=0):
self._filename = filename
self._ignored_attr_types = ignored_attr_types
self._max_entries = max_entries
def __call__(self):
users = self._get_users_from_ldif_file()
print(users)
print(self._db_connection)
for user in RemoteUser.objects:
print(f'l - {user.login}')
def _get_users_from_ldif_file(self):
with open(self._filename, 'rb') as f:
parser = LDIFRecordList(f, ignored_attr_types=self._ignored_attr_types, max_entries=self._max_entries)
parser.parse()
users = []
for dn, entry in parser.all_records:
try:
entry_data = {'login': entry[LDIF_USER_LOGIN],
'first_name': entry[LDIF_USER_FIRST_NAME],
'last_name': entry[LDIF_USER_LAST_NAME],
'organisation_unit': entry[LDIF_USER_ORGANISATION_UNIT]}
user = User(**entry_data)
users.append(user)
except KeyError:
pass
return users
@property
def _db_connection(self):
config = {'db': MONGODB_DB_NAME,
'username': MONGODB_USERNAME,
'password': MONGODB_PASSWORD,
'host': MONGODB_HOST,
'port': MONGODB_PORT}
connection = mongoengine.connect(**config)
return connection
if __name__ == '__main__':
ldif_file = 'ldapsearch.ldif'
LDIFParser(ldif_file, ignored_attr_types=IGNORED_ATTR_TYPES, max_entries=MAX_ENTRIES)()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment