|
#!/usr/bin/env python3 |
|
|
|
import csv |
|
import getpass |
|
import json |
|
import re |
|
import requests |
|
import sys |
|
import time |
|
import traceback |
|
|
|
from bs4 import BeautifulSoup |
|
from requests.adapters import HTTPAdapter |
|
from requests.packages.urllib3.util.retry import Retry |
|
|
|
__author__ = "David Zou" |
|
|
|
class EMU:
    """
    Client for the DET EMU web application

    Keeps one requests session that signs in through the DET SSO and then
    drives the EMU pages (school selection, student lookup, password reset)
    by submitting forms and scraping the returned HTML.
    """

    # Timeout in seconds, passed explicitly with every HTTP request
    HTTP_REQUEST_TIMEOUT = 30
    # Maximum number of automatic retries per request
    HTTP_REQUEST_RETRIES = 10
    # Backoff factor handed to urllib3's Retry
    HTTP_REQUEST_BACKOFF = 1
    # Response status codes handed to Retry as its status_forcelist
    HTTP_REQUEST_FORCELIST = (500, 502, 504)

    # Default headers sent with every request of the session
    HTTP_HEADERS = {
        "Accept-Language": "en-GB,en;q=0.5",
        "Accept-Encoding": "gzip, deflate",
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:72.0) Gecko/20100101 Firefox/72.0",
        "Connection": "keep-alive"
    }

    SSO_DOMAIN = "sso.det.nsw.edu.au"
    SSO_URL = "https://{domain}/sso/json/realms/root/authenticate".format(domain=SSO_DOMAIN)

    EMU_DOMAIN = "online.det.nsw.edu.au"
    EMU_BASE_URL = "https://{domain}/emu/".format(domain=EMU_DOMAIN)

    # Marker strings searched for in response bodies to detect outcomes
    SCRAPE_RESET_SUCCESS = "Password reset request has been submitted for the user"
    SCRAPE_UNAUTHORISED = "Unauthorized"
    SCRAPE_USER_NOT_FOUND = "No details found for this user"

    def __init__(self):
        """
        EMU Constructor

        Creates the HTTP session (default headers plus automatic retries)
        and starts with no school selected.
        """
        # Set up requests session to keep login session during execution
        self.session = requests.Session()
        self.session.headers.update(EMU.HTTP_HEADERS)

        # Set up auto retry. Retry's `read` and `connect` arguments are retry
        # counts, not timeouts, so only `total` is given (it caps the other
        # counts anyway). The timeout itself is passed with each request.
        retry = Retry(
            total=EMU.HTTP_REQUEST_RETRIES,
            backoff_factor=EMU.HTTP_REQUEST_BACKOFF,
            status_forcelist=EMU.HTTP_REQUEST_FORCELIST,
        )
        adapter = HTTPAdapter(max_retries=retry)
        self.session.mount('http://', adapter)
        self.session.mount('https://', adapter)

        self.current_school_code = ""

    def authenticate_sso(self, username, password):
        """
        SSO Authenticate

        Authenticate through the DET SSO to get access to EMU. A first POST
        obtains an "authId" token; a second POST sends that token together
        with the credentials.

        Parameters:
            username (str): DET Username
            password (str): DET Password

        Returns:
            bool: True if successful

        Raises:
            Exception: If the token response is not JSON, carries no token,
                or the login request does not answer with HTTP 200
        """
        sso_headers = dict(EMU.HTTP_HEADERS)
        sso_headers.update({
            "Accept": "application/json, text/javascript, */*; q=0.01",
            "Content-Type": "application/json",
            "X-Requested-With": "XMLHttpRequest",
        })

        token_key = "authId"

        # Get a token
        response = self.session.post(
            EMU.SSO_URL,
            data=None,
            headers=sso_headers,
            timeout=EMU.HTTP_REQUEST_TIMEOUT,
        )

        try:
            response_json = response.json()
        except ValueError as err:
            # requests reports an undecodable body as a ValueError subclass
            raise Exception("Failed to parse json to get token") from err

        if not isinstance(response_json, dict) or token_key not in response_json:
            raise Exception("Token not found in response json")

        login_data = {
            token_key: response_json[token_key],
            "template": "",
            "stage": "AD1",
            "header": "Sign in",
            "callbacks": [
                {
                    "type": "NameCallback",
                    "output": [{
                        "name": "prompt",
                        "value": "User Name:",
                    }],
                    "input": [{
                        "name": "IDToken1",
                        "value": username,
                    }],
                },
                {
                    "type": "PasswordCallback",
                    "output": [{
                        "name": "prompt",
                        "value": "Password:",
                    }],
                    "input": [{
                        "name": "IDToken2",
                        "value": password,
                    }],
                },
            ],
        }

        response = self.session.post(
            EMU.SSO_URL,
            data=json.dumps(login_data),
            headers=sso_headers,
            timeout=EMU.HTTP_REQUEST_TIMEOUT,
        )

        if response.status_code != 200:
            raise Exception("Failed to authenticate:\n{}".format(response.text))

        return True

    def get_schools(self):
        """
        Get Schools Available

        Get a dict of schools available to the currently authenticated EMU
        account, scraped from the "locationId" drop-down of the EMU base page.

        Returns:
            dict: Dictionary of schools where the key is the school code and value is the school name
        """
        response = self.session.get(
            EMU.EMU_BASE_URL,
            timeout=EMU.HTTP_REQUEST_TIMEOUT,
        )
        soup = BeautifulSoup(response.text, features="html5lib")

        return {
            option.get("value"): option.text.strip()
            for option in soup.select("select[name='locationId'] option")
        }

    def set_school(self, school_code):
        """
        Set Active School

        Sets the school being worked on. The code is remembered on the
        instance and used to build the URLs of later student requests.

        Parameters:
            school_code (str): 4-digit school code

        Returns:
            bool: True if successful

        Raises:
            Exception: If the response contains the "Unauthorized" marker
        """
        post_data = {
            "locationId": school_code,
            "select": "Select",
        }

        response = self.session.post(
            EMU.EMU_BASE_URL + "student/selectLocation",
            data=post_data,
            timeout=EMU.HTTP_REQUEST_TIMEOUT,
        )

        if EMU.SCRAPE_UNAUTHORISED in response.text:
            raise Exception("Failed to set school '{}'".format(school_code))

        self.current_school_code = school_code
        return True

    @staticmethod
    def _format_fdn(fdn, default_cn=""):
        """
        Convert a comma separated distinguished name into a path string

        The DC parts are joined with "." in their original order, followed by
        the OU parts (outermost first) and finally the CN, each separated by
        a backslash. Escaping backslashes inside values are removed.

        Parameters:
            fdn (str): Distinguished name, e.g. "CN=...,OU=...,DC=..."
            default_cn (str): CN to use when the name has no CN component

        Returns:
            str: The formatted path
        """
        dcs = []
        ous = []
        cn = default_cn

        # Split on commas that are not backslash-escaped and walk the
        # components from last to first so OUs come out outermost first
        for attr in reversed(re.split(r"(?<!\\),", fdn)):
            attribute = attr[:3]
            # Drop the escaping backslash in front of escaped characters
            value = re.sub(r"\\(.)", r"\1", attr[3:])

            if attribute == "DC=":
                # Prepending undoes the reversal, keeping the DC order
                dcs.insert(0, value)
            elif attribute == "OU=":
                ous.append(value)
            elif attribute == "CN=":
                cn = value
            else:
                print("Unexpected attribute", attribute)

        return "{DCs}\\{OUs}\\{CN}".format(
            DCs=".".join(dcs),
            OUs="\\".join(ous),
            CN=cn,
        )

    def get_userinfo(self, username):
        """
        Get User Details

        Get a dict of user details from EMU. Besides the label/value pairs
        scraped from the page, the result holds three parallel lists about
        the group memberships: "groups" (names), "FDNs" (raw distinguished
        names) and "formatted_FNDs" (the names rewritten as paths).

        Parameters:
            username (str): User's username

        Returns:
            dict: Dictionary of details, or None if the user was not found
        """
        response = self.session.get(
            EMU.EMU_BASE_URL + "{school_code}/students/{username}".format(
                school_code=self.current_school_code,
                username=username,
            ),
            timeout=EMU.HTTP_REQUEST_TIMEOUT,
        )

        if EMU.SCRAPE_USER_NOT_FOUND in response.text:
            # User not found
            return None

        soup = BeautifulSoup(response.text, features="html5lib")
        info = {}

        # Details
        for field_group in soup.select(".aui .field-group"):
            label = field_group.select_one("label")
            text = field_group.select_one("p.text")
            if label is None or text is None:
                # Without both a label and a value there is nothing to record
                continue
            info[label.text.strip().replace(" :", "")] = text.text.strip()

        fdns = []
        groups = []
        formatted_fdns = []

        # Group Memberships
        for row in soup.select("#tabs-group-memberships table tbody tr"):
            cols = row.select("td")
            if len(cols) < 2:
                # Rows lacking a name cell and an FDN cell cannot be parsed
                continue

            group_name = cols[0].text.strip()
            fdn = cols[1].text.strip()

            groups.append(group_name)
            fdns.append(fdn)
            formatted_fdns.append(EMU._format_fdn(fdn, group_name))

        info["FDNs"] = fdns
        info["groups"] = groups
        info["formatted_FNDs"] = formatted_fdns

        return info

    def reset_pwd(self, username, password):
        """
        Reset a user's password

        Reset a user's password with one provided. Success is detected by
        looking for the confirmation message in the response body.

        Parameters:
            username (str): User's username
            password (str): User's new password

        Returns:
            bool: True if successful, False if not
        """
        post_data = {
            "password": password,
            "confirmPwd": password,
            "reset": "Reset",
        }

        response = self.session.post(
            EMU.EMU_BASE_URL + "{school_code}/students/{username}/password".format(
                school_code=self.current_school_code,
                username=username,
            ),
            data=post_data,
            timeout=EMU.HTTP_REQUEST_TIMEOUT,
        )

        return EMU.SCRAPE_RESET_SUCCESS in response.text
|
|
|
|
|
|
|
# Example CSV Batch Reset Implementation
#
# Reads a CSV with USERNAME and PASSWORD columns, resets each account that is
# not already marked "Complete" in the EMU_RESET column, and writes the
# outcome of every attempt back into that column of the same file.
if __name__ == '__main__':
    # Sleep seconds between requests to prevent being blocked due to suspicious activity
    REQ_SLEEP_TIME = 1.0

    if len(sys.argv) != 2:
        print("Run this script with the CSV file as an argument,\nor drag and drop the CSV into this script.\n")
        input("Press {ENTER} to exit")
        sys.exit(1)

    CSV_FILE = sys.argv[1]

    data = []
    num_todo = 0

    # Load data (newline="" lets the csv module handle line endings itself)
    with open(CSV_FILE, "r", encoding="utf-8-sig", newline="") as csvfile:
        reader = csv.DictReader(csvfile)

        # Header columns in file order, without duplicates; reused when the
        # file is written back so overflow cells cannot break the save
        fieldnames = list(dict.fromkeys(reader.fieldnames or []))

        if "USERNAME" not in fieldnames or "PASSWORD" not in fieldnames:
            print("CSV must contain 'USERNAME' and 'PASSWORD' fields")
            input("Press {ENTER} to exit")
            sys.exit(1)

        if "EMU_RESET" not in fieldnames:
            fieldnames.append("EMU_RESET")

        for row in reader:
            row.setdefault("EMU_RESET", None)

            if row["EMU_RESET"] != "Complete":
                num_todo += 1

            data.append(row)

    if not data:
        print("No accounts to reset. Exiting")
        sys.exit(1)

    print("Loaded {} records, {} to process".format(len(data), num_todo))

    print()

    if not num_todo:
        # Every record is already complete, so do not ask for credentials
        print("No accounts to reset. Exiting")
        sys.exit(0)

    # Instantiate EMU
    emu = EMU()

    # Authenticate
    det_un = input("Enter DET username: ")
    det_pw = getpass.getpass("Enter DET password: ")
    emu.authenticate_sso(det_un, det_pw)

    # Get Schools
    print("Getting list of schools available")
    schools = emu.get_schools()

    if not schools:
        print("No schools availble")
        sys.exit(1)

    # Select school
    if len(schools) == 1:
        # Select only school available
        school_code = next(iter(schools))
    else:
        # Ask user to enter school code
        print("Schools available:")
        for code, name in schools.items():
            print("  ", code, name)
        print()

        while True:
            school_code = input("Enter a school code: ").strip()
            if school_code in schools:
                break
            print("Invalid school code:", school_code)

    school_name = schools[school_code]

    print("Selecting '{school_name}' ({school_code})".format(
        school_name=school_name,
        school_code=school_code
    ))

    # Set School
    emu.set_school(school_code)

    # Reset Passwords
    try:
        for item in data:
            if item["EMU_RESET"] == "Complete":
                continue

            print("Resetting {} ... ".format(item["USERNAME"]), end="", flush=True)

            if emu.reset_pwd(item["USERNAME"], item["PASSWORD"]):
                item["EMU_RESET"] = "Complete"
                print("OK")
            else:
                item["EMU_RESET"] = "Failed"
                print("Failed")

            time.sleep(REQ_SLEEP_TIME)
    except (Exception, KeyboardInterrupt):
        # Ctrl+C is caught on purpose: the progress made so far must still
        # be written back to the CSV below
        print("Something went wrong. Saving progress and exiting.")
        print(traceback.format_exc())

    # Save progress
    with open(CSV_FILE, "w", encoding="utf-8", newline="") as csvfile:
        writer = csv.DictWriter(
            csvfile,
            lineterminator="\r\n",
            fieldnames=fieldnames,
            # Cells beyond the header columns are dropped rather than
            # aborting the save with a ValueError
            extrasaction="ignore",
        )

        writer.writeheader()
        writer.writerows(data)