Skip to content

Instantly share code, notes, and snippets.

@tothandras
Last active August 29, 2015 14:17
Show Gist options
  • Save tothandras/d843995e999648d74517 to your computer and use it in GitHub Desktop.
Save tothandras/d843995e999648d74517 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
"""
Usage: get_wrong_mails.py -c COMPANY -o OUTFILE.json [-u]
@author: Andras Toth
"""
import argparse
import os
import json
import pyldap
import re
class EmptyResultError(Exception):
    """Error for an empty search result.

    Raised when the LDAP search returns an empty list.
    """
def parse_args(args=None):
    """Parse the command-line options of the script.

    Args:
        args: List of argument strings to parse. ``None`` is forwarded to
            ``ArgumentParser.parse_args`` unchanged.

    Returns:
        A ``(company, out_file, insert_uid_number)`` tuple taken from the
        ``-c``, ``-o`` and ``-u`` options respectively.
    """
    parser = argparse.ArgumentParser()

    # Both string options are mandatory and differ only in flag and help text.
    required_options = (
        ('-c', 'name of the company'),
        ('-o', 'output file in JSON format'),
    )
    for flag, description in required_options:
        parser.add_argument(flag, help=description, type=str, required=True)

    # Optional switch: present -> True, absent -> False.
    parser.add_argument(
        '-u',
        help='insert UID number in output',
        action='store_true',
        default=False,
    )

    namespace = parser.parse_args(args)
    return namespace.c, namespace.o, namespace.u
def ldap_connection(ldap_url, company):
    """Fetch every ``inetOrgPerson`` entry stored under a company's subtree.

    Binds to *ldap_url* as ``cn=Manager,dc=irf,dc=local`` with ``SIMPLE``
    credentials and searches ``ou=<company>,ou=Partners,dc=irf,dc=local``.

    Args:
        ldap_url: URL handed to ``pyldap.LDAPClient``.
        company: Company name, substituted verbatim into the search base DN.

    Returns:
        The non-empty search result. When pyldap raises ``ConnectionError``
        or ``InvalidDN``, or when the search finds nothing, an ``ERROR:``
        message is printed to stdout and ``None`` is returned.
    """
    search_base = 'ou={},ou=Partners,dc=irf,dc=local'.format(company)
    credentials = ('cn=Manager,dc=irf,dc=local', 'LaborImage')
    try:
        client = pyldap.LDAPClient(ldap_url)
        client.set_credentials('SIMPLE', credentials)
        # The connection is used as a context manager for the search.
        with client.connect() as connection:
            # NOTE(review): scope 1 is presumably "one level below the base";
            # confirm against pyldap's search-scope constants.
            found = connection.search(search_base, 1,
                                      '(&(objectClass=inetOrgPerson))')
            if found:
                return found
            # Nothing found: report it through the shared handler below.
            raise EmptyResultError
    except pyldap.ConnectionError:
        print("ERROR: Can't connect to LDAP server")
    except EmptyResultError:
        print("ERROR: Company doesn't exist")
    except pyldap.InvalidDN:
        print("ERROR: Invalid company name")
    return None
def write_json(out_file, persons, company, insert_uid_number):
    """Write each person's old and newly generated mail address to a JSON file.

    The new address is ``<uid>@<domain>.com``, where the domain is the
    lower-cased company name with every character other than word characters
    and ``-`` removed.

    Args:
        out_file: Destination path. Missing parent directories are created
            and ``.json`` is appended when the path does not already end
            with it.
        persons: Iterable of mapping-like entries supporting ``.get`` whose
            values are lists: ``uid`` (required), ``mail`` (optional) and
            ``uidNumber`` (required only when *insert_uid_number* is true).
        company: Company name used to derive the mail domain.
        insert_uid_number: When true, each record also carries the person's
            ``uidNumber``.

    A ``PermissionError`` raised while creating directories or opening the
    file is reported on stdout instead of being propagated.
    """
    # The domain depends only on the company, so derive it once up front.
    domain = re.sub(r'[^\w\d\-]', '', company.lower())

    # Build the complete payload before touching the file system, so that a
    # malformed entry cannot leave an empty or truncated output file behind.
    persons_arr = []
    for p in persons:
        uid = p.get('uid')[0]
        person = {
            'mail': {
                'new': '{}@{}.com'.format(uid, domain),
                # A missing or empty mail attribute is recorded as ''.
                'old': (p.get('mail') or [''])[0],
            },
            'uid': uid,
        }
        # Only read uidNumber when it is requested; entries without it are
        # otherwise still valid.
        if insert_uid_number:
            person['uidNumber'] = p.get('uidNumber')[0]
        persons_arr.append(person)

    try:
        # Create directories if they do not exist (race-free with exist_ok).
        out_dir = os.path.dirname(out_file)
        if out_dir:
            os.makedirs(out_dir, exist_ok=True)
        # Require .json extension
        if not out_file.endswith('.json'):
            out_file += '.json'
        with open(out_file, 'w') as f:
            json.dump(persons_arr, f, indent=4, sort_keys=True)
    except PermissionError:
        print("ERROR: You don't have permission to open {} file".format(out_file))
def main():
    """Run the tool: parse options, query LDAP, then write the JSON report.

    Nothing is written when the LDAP lookup yields no persons.
    """
    company, out_file, insert_uid_number = parse_args()

    # Query the LDAP server at the fixed local URL.
    persons = ldap_connection('ldap://localhost', company)
    if not persons:
        return

    write_json(out_file, persons, company, insert_uid_number)


# Only run when executed as a script, not when imported.
if __name__ == '__main__':
    main()
#!/usr/bin/env python3
"""
Usage: test.py
@author: Andras Toth
"""
import io
import json
import os
import sys
import tempfile
import unittest

from src import get_wrong_mails
class TestJsonWriter(unittest.TestCase):
    """Tests for the ``get_wrong_mails`` helpers.

    stdout is captured for the duration of every test so that the error
    messages printed by the helpers can be inspected.
    """

    def setUp(self):
        # Redirect stdout into an in-memory buffer.
        self.out = io.StringIO()
        self.saved_stdout = sys.stdout
        sys.stdout = self.out

    def tearDown(self):
        # Restore the real stdout first so sys.stdout never refers to a
        # closed buffer, then release the capture buffer.
        sys.stdout = self.saved_stdout
        self.out.close()

    def test_connection(self):
        """A failed LDAP lookup prints an error and returns nothing."""
        company = 'X-Corporation'
        result = get_wrong_mails.ldap_connection('ldap://notaserver', company)
        console = self.out.getvalue()
        self.assertIn('ERROR:', console)
        self.assertIsNone(result)

    def test_args(self):
        """Options are parsed into (company, out_file, insert_uid_number)."""
        # 1: required options only
        args = ['-o', 'o.json', '-c', 'X-Y']
        company, out_file, insert_uid_number = get_wrong_mails.parse_args(args)
        self.assertEqual(company, 'X-Y')
        self.assertEqual(out_file, 'o.json')
        self.assertEqual(insert_uid_number, False)

        # 2: with the UID number switch
        args = ['-o', 'o.json', '-c', 'X-Y', '-u']
        company, out_file, insert_uid_number = get_wrong_mails.parse_args(args)
        self.assertEqual(insert_uid_number, True)

        # 3: missing required output file
        args = ['-c', 'X-Y', '-u']
        self.assertRaises(SystemExit, get_wrong_mails.parse_args, args)

    def test_out(self):
        """Persons are written as JSON with old and new mail addresses."""
        company = 'X.Y.Z - Corp'
        persons = [
            {
                'uid': ['foo'],
                'uidNumber': ['1001'],
                'mail': ['foo@gmail.com']
            },
            {
                'uid': ['baz'],
                'uidNumber': ['1003'],
                'mail': ['']
            }
        ]

        # Write into a throwaway directory so the test leaves no artifacts in
        # the working tree; the nested 'tmp' component still exercises the
        # directory-creation path of write_json.
        with tempfile.TemporaryDirectory() as tmp_dir:
            out_file = os.path.join(tmp_dir, 'tmp', 'out.json')
            get_wrong_mails.write_json(out_file, persons, company, True)
            with open(out_file, 'r') as f:
                out = json.load(f)

        # Check the length before indexing so a short result fails clearly.
        self.assertEqual(len(out), 2)
        o1 = out[0]
        o2 = out[1]

        # 1
        self.assertEqual(o1['mail'], {
            'new': 'foo@xyz-corp.com',
            'old': 'foo@gmail.com'
        })
        self.assertEqual(o1['uid'], 'foo')
        self.assertEqual(o1['uidNumber'], '1001')

        # 2
        self.assertEqual(o2['mail'], {
            'new': 'baz@xyz-corp.com',
            'old': ''
        })
        self.assertEqual(o2['uid'], 'baz')
        self.assertEqual(o2['uidNumber'], '1003')


if __name__ == '__main__':
    unittest.main()
#!/usr/bin/env bash
# Manual smoke test for get_wrong_mails.py.
# The paths are relative: run this from the directory containing the script.

# Correct arguments
../src/get_wrong_mails.py -c "S.T.A.R. Labs" -o temp/out1.json
../src/get_wrong_mails.py -c "X-Corporation" -o temp/out2.json -u

# Company doesn't exist
../src/get_wrong_mails.py -c "XY Corp" -o temp/out3.json

# Invalid company name
../src/get_wrong_mails.py -c "X,Y Corp" -o temp/out4.json

# Permission denied
# Make sure the directory exists even if every run above failed, and restore
# write access on exit so the script can be re-run and temp/ can be removed.
mkdir -p temp
trap 'chmod u+w temp' EXIT
chmod -w temp
../src/get_wrong_mails.py -c "Advanced Idea Mechanics" -o temp/out5.json
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment