Skip to content

Instantly share code, notes, and snippets.

@seanmhanson
Last active December 15, 2016 19:48
Show Gist options
  • Save seanmhanson/71a36f4605bf35a4c62e to your computer and use it in GitHub Desktop.
Save seanmhanson/71a36f4605bf35a4c62e to your computer and use it in GitHub Desktop.
Spectra DS CMV.INI Cleanup
# CMV.INI Cleanup Script
# @author = Sean Hanson (github: seanmhanson)
#
# Script used to clean the "CMV.INI" initailization file used by
# MediaOcean Spectra DS to associate MediaOcean sessions with Active Directory Users
#
# INPUT: Flag and path as needed to the cmv.ini file. Default value if left blank is "/cmv.ini".
# See also parseArgs below.
# OUTPUT: None, writes files "newcmv_<datestring>.ini" and "removal_report_<datestring>.csv"
# where the date string is in the format YYYYMMDDHHMMSS
import argparse
import csv
import configparser
import getpass
import sys
from argparse import RawTextHelpFormatter
from collections import OrderedDict
from datetime import datetime
from sys import stdout
from ldap3 import Server, Connection, SEARCH_SCOPE_WHOLE_SUBTREE
from win32security import LogonUser, LOGON32_LOGON_NETWORK, LOGON32_PROVIDER_DEFAULT, error
# GLOBAL VARIABLES
####################
#
# CMV.INI Variables (READ)
INI_HEADER_KEYS = ["CCOVERRIDE", "GATEWAY", "SERVER", "PORT", "RESOURCENAME"] # Keys used in headers in INI
OPEN_PLACEHOLDER = "OPEN" # Placeholder for unused, available LUIDs
OTHER_OPEN_PLACEHOLDERS = ["EXLONLY"]
CLOSED_PLACEHOLDER = "DONOTUSE" # Placeholder for unused, unavailable LUIDs
OTHER_CLOSED_PLACEHOLDERS = []
PLACEHOLDER_KEYS = [OPEN_PLACEHOLDER] + OTHER_OPEN_PLACEHOLDERS + [CLOSED_PLACEHOLDER] + OTHER_CLOSED_PLACEHOLDERS
# LDAP Variables
DC_NAME = None # Domain Controller DNS Name (String required)
LDAP_PORT = 389 # SSL not enabled by default
SEARCH_BASE = None # LDAP Query Search Base (String required)
# CMV.INI Variables (Write)
#
# If you require a write function for your CMV to maintain structure and commenting, declare below.
# The write function is called with luid_list as a parameter (a list of LUID/Username Pairs, note the order)
# If "None", the default function uses the below values
CMV_WRITE_FUNCTION = None
CMV_CCOveride = None # CCOveride Value (Int wrapped in String required)
CMV_Gateway = None # MediaOcean Gateway (String required)
CMV_Server = None # MediaOcean Server (String required)
CMV_Port = None # MediaOcean Port (Int wrapped in String required)
CMV_ResourceName = None #MediaOcean ResourceName field (String required)
CMV_A_Header = None # Multiline string with new line characters to write as commented header for A section
# Extension of ConfigParser to allow for duplicate key values
#
# NEW BEHAVIOR:
# When a duplicate key is detected, the value is turned into a list of values
# and each value is appended to the list of values
class ConfigParserMultiples(configparser.RawConfigParser):
def __init__(self):
configparser.RawConfigParser.__init__(self, empty_lines_in_values=False, strict=False)
def _read(self, fp, fpname):
'''Identical to RawConfigParser except for additions
commented in multiline string quotes.
Modifications by @Praetorian on StackOverflow'''
elements_added = set()
cursect = None # None, or a dictionary
sectname = None
optname = None
lineno = 0
indent_level = 0
e = None # None, or an exception
for lineno, line in enumerate(fp, start=1):
comment_start = sys.maxsize
# strip inline comments
inline_prefixes = {p: -1 for p in self._inline_comment_prefixes}
while comment_start == sys.maxsize and inline_prefixes:
next_prefixes = {}
for prefix, index in inline_prefixes.items():
index = line.find(prefix, index+1)
if index == -1:
continue
next_prefixes[prefix] = index
if index == 0 or (index > 0 and line[index-1].isspace()):
comment_start = min(comment_start, index)
inline_prefixes = next_prefixes
# strip full line comments
for prefix in self._comment_prefixes:
if line.strip().startswith(prefix):
comment_start = 0
break
if comment_start == sys.maxsize:
comment_start = None
value = line[:comment_start].strip()
if not value:
if self._empty_lines_in_values:
# add empty line to the value, but only if there was no
# comment on the line
if (comment_start is None and
cursect is not None and
optname and
cursect[optname] is not None):
cursect[optname].append('') # newlines added at join
else:
# empty line marks end of value
indent_level = sys.maxsize
continue
# continuation line?
first_nonspace = self.NONSPACECRE.search(line)
cur_indent_level = first_nonspace.start() if first_nonspace else 0
if (cursect is not None and optname and
cur_indent_level > indent_level):
cursect[optname].append(value)
# a section header or option header?
else:
indent_level = cur_indent_level
# is it a section header?
mo = self.SECTCRE.match(value)
if mo:
sectname = mo.group('header')
if sectname in self._sections:
if self._strict and sectname in elements_added:
raise DuplicateSectionError(sectname, fpname,
lineno)
cursect = self._sections[sectname]
elements_added.add(sectname)
elif sectname == self.default_section:
cursect = self._defaults
else:
cursect = self._dict()
self._sections[sectname] = cursect
self._proxies[sectname] = configparser.SectionProxy(self, sectname)
elements_added.add(sectname)
# So sections can't start with a continuation line
optname = None
# no section header in the file?
elif cursect is None:
raise MissingSectionHeaderError(fpname, lineno, line)
# an option line?
else:
mo = self._optcre.match(value)
if mo:
optname, vi, optval = mo.group('option', 'vi', 'value')
if not optname:
e = self._handle_error(e, fpname, lineno, line)
optname = self.optionxform(optname.rstrip())
if (self._strict and
(sectname, optname) in elements_added):
raise DuplicateOptionError(sectname, optname,
fpname, lineno)
elements_added.add((sectname, optname))
# This check is fine because the OPTCRE cannot
# match if it would set optval to None
if optval is not None:
optval = optval.strip()
'''Modified: if optname exists, make optval a tuple if not
already one and append'''
if (optname in cursect) and (cursect[optname] is not None):
if not isinstance(cursect[optname], tuple):
cursect[optname] = tuple(cursect[optname])
cursect[optname] = cursect[optname] + tuple([optval])
else:
cursect[optname] = [optval]
else:
# valueless option handling
cursect[optname] = None
else:
# a non-fatal parsing error occurred. set up the
# exception but keep going. the exception will be
# raised at the end of the file and will contain a
# list of all bogus lines
e = self._handle_error(e, fpname, lineno, line)
# if any parsing errors occurred, raise an exception
if e:
raise e
self._join_multiline_values()
# Parse Command-line Arguments and provide help text
#
# INPUT: None (pulls from sys.argv)
# OUTPUT: List of (1) argument, containing the path of the cmv.ini file
def parseArgs():
parser = argparse.ArgumentParser(
description="Checks all usernames in cmv.ini file"
" and removes any not found in Active Directory\n"
"Outputs a CSV file of removed users and LUIDs, and"
" a modified INI file (newcmv.ini)",
formatter_class=RawTextHelpFormatter)
parser.add_argument('-i', '--input',
default="cmv.ini",
help="relative path of cmv.ini file\n"
"(default: cmv.ini)")
return parser.parse_args()
# MAIN FUNCTION
#
# Parse arguments, read the CMV.INI file, looks up values against Active Directory,
# then writes removal reports and updated CMV.INI
def main():
ARGS = parseArgs()
luid_list = read_cmv(ARGS.input)
credentials = get_credentials()
luid_list, removed_users = LDAP_lookups(luid_list, credentials)
write_removal_report(removed_users)
write_cmv(luid_list)
# Reads CMV.INI file and pulls username/LUID pairs
# For duplicate keys, add one pair for each distinct value
#
# NB: Ignores [General] section and parses [A] section
# Set to ignore header material in [A] and [GENERAL]
#
# INPUT: Path to CMV.INI file
# OUTPUT: List of (LUID, Username) pairs as duples
def read_cmv(path):
config = ConfigParserMultiples()
config.optionxform=str
config.read(path)
luid_list = []
for pair in config.items('A'):
if pair[0].upper().replace(" ","") not in INI_HEADER_KEYS:
if isinstance(pair[1], tuple):
for value in pair[1]:
luid_list.append([value, pair[0]])
else:
luid_list.append([pair[1], pair[0]])
return sorted(luid_list)
# Writes a CMV file from the updated list of LUIDs and Usernames
#
# INPUT: List of (LUID, Username) pairs as duples
# OUTPUT: None (writes newcmv_<datestring>.ini file by default)
def write_cmv(luid_list):
stdout.write('Writing CMV file...')
if CMV_WRITE_FUNCTION != None:
CMV_WRITE_FUNCTION(luid_list)
else:
config = configparser.ConfigParser(strict=False, allow_no_value=True, dict_type=OrderedDict)
config.optionxform=str
config.add_section("General")
config.set("General", "CCoverride", CMV_CCOveride)
config.add_section("A")
for pair in [("Gateway", CMV_Gateway),
("Server", CMV_Server),
("Port", CMV_Port),
("Resourcename", CMV_ResourceName)]:
config.set("A", pair[0], pair[1])
config.set('A', CMV_A_Header)
for pair in luid_list:
config.set("A", pair[1], pair[0])
now = datetime.now()
filename = "newcmv_{number}.ini".format(number=now.strftime("%Y%m%d%H%M%S"))
with open(filename, 'w') as configfile:
config.write(configfile)
stdout.write("written.\n")
# Prompts user for Active Directory credentials and verifies against
# Logon provider for Windows Machines
# No support for non-Windows, as the software in question is WIN86x/64x only
#
# INPUT: None
# OUTPUT: Triple of Domain, Username, and Password
# RAISES: Exception if user fails logon three times
def get_credentials():
tries = 0
while (tries < 3):
print("Please enter logon information:")
domain = input("Domain: ")
username = input("Username: ")
password = getpass.getpass ("Password: ")
try:
hUser = LogonUser(
username, domain, password, LOGON32_LOGON_NETWORK, LOGON32_PROVIDER_DEFAULT)
except error:
print("Invalid credentials\n")
tries += 1
else:
return (domain, username, password)
raise Exception
# Look up each username in LDAP environment and set to a unique open placeholder if not found
#
# INPUT: List of (LUID, Username) pairs as duples,
# Credential Triple (domain, username, password)
# OUTPUT: Updated list of (LUID, Username) pairs as duples)
# List of (LUID, Username) pairs that were removed
def LDAP_lookups(luid_list, credentials):
removed_users = []
s = Server(DC_NAME, port=LDAP_PORT)
c = Connection(s, user=credentials[0]+"\\"+credentials[1], password=credentials[2], auto_bind=True)
i = 0
for duple in luid_list:
if duple[1].upper().replace(" ","") not in PLACEHOLDER_KEYS:
ldap_filter = "(&(objectClass=User)(sAMAccountName={user}))".format(user=duple[1])
c.search(search_base=SEARCH_BASE, search_filter=ldap_filter, attributes="sAMAccountName",
search_scope=SEARCH_SCOPE_WHOLE_SUBTREE)
if len(c.response) <= 1:
removed_users.append((duple[1], duple[0]))
luid_list[i][1] = OPEN_PLACEHOLDER + "-" + duple[0]
elif duple[1].upper().replace(" ","") in OTHER_OPEN_PLACEHOLDERS + [OPEN_PLACEHOLDER]:
luid_list[i][1] = OPEN_PLACEHOLDER + "-" + duple[0]
else:
luid_list[i][1] = CLOSED_PLACEHOLDER + "-" + duple[0]
i += 1
stdout.write("\rProcessed %i users" % i)
stdout.flush()
stdout.write("\n")
return luid_list, removed_users
# Writes a CSV file of all removed users and their LUIDs
#
# INPUT: List of (LUID, Username) pairs
# OUTPUT: None (writes removed_report_<datestring>.csv)
def write_removal_report(removed_users):
print(str(len(removed_users)) + " users removed")
stdout.write("Writing removal report...")
now = datetime.now()
filename = "removal_report_{number}.csv".format(number=now.strftime("%Y%m%d%H%M%S"))
with open(filename, 'w', newline='') as f:
writer = csv.writer(f)
writer.writerow(["Removed User","Opened LUID"])
for duple in removed_users:
writer.writerow(duple)
stdout.write("written.\n")
# Calls main function if evoked from command line
if __name__=="__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment