Created
June 11, 2019 01:34
-
-
Save heinrichreimer/e31e8120583a2d50903abb8b2617946c to your computer and use it in GitHub Desktop.
Script to download a student's exercise solutions and task information from the exercise portal of the Institute for Computer Science at Martin Luther University Halle.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
import os | |
import re | |
import shutil | |
import tempfile | |
import typing | |
import zipfile | |
import pyquery | |
import requests | |
# --- Portal endpoints --------------------------------------------------------
PORTAL_URL = "https://uebungsportal.informatik.uni-halle.de/ostepu/"
PORTAL_GUI_URL = PORTAL_URL + "UI/"
PORTAL_GUI_LOGIN_URL = PORTAL_GUI_URL + "Login.php"
# The home page is the GUI root itself.
PORTAL_GUI_HOME_URL = PORTAL_GUI_URL
PORTAL_FILE_URL = PORTAL_URL + "FS/"
# Template with ``{address}`` and ``{display_name}`` placeholders (str.format).
PORTAL_FILE_DOWNLOAD_URL = PORTAL_FILE_URL + "FSBinder/{address}/{display_name}"

# --- Patterns applied to scraped page text -----------------------------------
# Error text looked for on the login response page.
LOGIN_ERROR_REGEX = re.compile("Die Anmeldung war fehlerhaft!", re.IGNORECASE)

# Suffixes that are stripped from a series title before its number is parsed.
SERIES_EXPIRED_ATTRIBUTE_REGEX = re.compile(" \\(abgelaufen\\)", re.IGNORECASE)
SERIES_TYPE_ATTRIBUTE_REGEX = re.compile(" - (theorie|praxis)", re.IGNORECASE)
# Alternation of the two suffix patterns above, wrapped in one group.
SERIES_ATTRIBUTE_REGEX = re.compile(
    "(" + "|".join((SERIES_EXPIRED_ATTRIBUTE_REGEX.pattern, SERIES_TYPE_ATTRIBUTE_REGEX.pattern)) + ")",
    re.IGNORECASE,
)

# Group 1: number written before the series word ("3. Übungsblatt").
# Group 2: number written after the series word ("Serie 3").
SERIES_NUMBER_REGEX = re.compile(
    "(?:([0-9]+)\\. ?)?"
    "(?:übungs)?(?:blatt|serie)"
    "(?: ?([0-9]+))?",
    re.IGNORECASE,
)
SERIES_BONUS_REGEX = re.compile("bonus(?:blatt|serie)?", re.IGNORECASE)
SERIES_PRACTICE_TESTS_REGEX = re.compile("praxistests", re.IGNORECASE)

# --- Patterns applied to names inside a downloaded archive --------------------
TASK_NUMBER_REGEX = re.compile("aufgabe_([0-9]+)", re.IGNORECASE)
# 40 hex digits, an underscore, then the captured remainder of the name.
TASK_FILE_NAME_REGEX = re.compile("[a-f0-9]{40}_(.+)", re.IGNORECASE)
# Same shape with a leading "K_" prefix.
TASK_FILE_NAME_CORRECTED_REGEX = re.compile("K_[a-f0-9]{40}_(.+)", re.IGNORECASE)

# Anything accepted by the ``os.path`` helpers used in this script.
Path = typing.Union[str, bytes, os.PathLike]
def login(username: str, password: str, session: typing.Optional[requests.Session] = None) -> requests.Session:
    """Log in to the exercise portal and return the authenticated session.

    The credentials are posted to ``PORTAL_GUI_LOGIN_URL`` and the response
    page is inspected for the portal's login error message.

    :param username: Portal user name.
    :param password: Portal password.
    :param session: Session to authenticate. A fresh ``requests.Session`` is
        created when omitted, so separate calls never share cookies through a
        default object created at function definition time.
    :return: The session that was used for the login request.
    :raises Exception: If the portal reports a failed login, or if no user
        name can be read from the response page.
    """
    if session is None:
        session = requests.Session()

    login_data = {
        "action": "login",
        "username": username,
        "password": password,
        "loginType": "default"
    }
    print("Logging in...")
    login_response = session.post(PORTAL_GUI_LOGIN_URL, data=login_data)
    login_doc = pyquery.PyQuery(login_response.content)

    login_error = login_doc.find("#body-wrapper > div.error").text()
    # ``search`` rather than ``match``: the error text must be detected even
    # when the element text carries leading whitespace or other content.
    if LOGIN_ERROR_REGEX.search(login_error):
        print("Error: {message}".format(message=login_error.strip()), flush=True)
        raise Exception("Invalid credentials.")

    full_name = login_doc.find("#header > div.course-info > div.footer-text").text().strip()
    # Raise explicitly instead of asserting: assertions are removed under -O.
    if not full_name:
        raise Exception("Could not read the user's name from the portal page after login.")
    print("Successfully logged in. Welcome {name}!".format(name=full_name))
    return session
def parse_lecture_urls(session: requests.Session) -> typing.List[str]:
    """Collect the absolute URLs of all lectures linked from the portal home page.

    :param session: Session used to fetch ``PORTAL_GUI_HOME_URL``.
    :return: One URL per course-list link, each prefixed with ``PORTAL_GUI_URL``.
    """
    home_response = session.get(PORTAL_GUI_HOME_URL)
    home_doc = pyquery.PyQuery(home_response.content)
    link_elements = home_doc.find("#content-wrapper > div > div > div > span > div.course-list > a")
    links = (pyquery.PyQuery(element).attr("href") for element in link_elements)
    # An anchor without an href yields None, which cannot be concatenated;
    # such anchors are skipped instead of aborting the whole run.
    return [PORTAL_GUI_URL + link for link in links if link is not None]
def download_lecture(session: requests.Session, url: str, out_dir: Path):
    """Fetch one lecture page and download every series listed on it.

    The lecture title read from the page header becomes the name of the
    sub-directory of ``out_dir`` that is handed to ``download_series``.

    :param session: Session used for the request.
    :param url: URL of the lecture page.
    :param out_dir: Directory under which the lecture directory is placed.
    """
    page = pyquery.PyQuery(session.get(url).content)

    title = page.find("#header > div.course-info > div.course-title").text()
    announcement = "Parsing lecture \"{lecture}\"..."
    print(announcement.format(lecture=title))

    target_dir = os.path.join(out_dir, title)
    for series_element in page.find("#body-wrapper > div > form > div.content-element"):
        wrapped = pyquery.PyQuery(series_element)
        download_series(wrapped, session, target_dir)
def replace_literal_numbers(text: str) -> str:
    """Replace German number words in ``text`` with digits.

    Ordinals become ``"<n>."`` and cardinals become ``"<n>"``. Matching is
    case-insensitive and not restricted to word boundaries.

    :param text: Text that may contain number words from one to twelve.
    :return: The text with the number words replaced.
    """
    # The rules are applied strictly in this order.
    replacements = (
        # Ordinals whose stem is not "<cardinal>te" need explicit rules; the
        # inflection suffix is optional ("erste", "erstes", "erster").
        (r"achte(?:[sr])?", "8."),
        (r"siebte(?:[sr])?", "7."),
        (r"dritte(?:[sr])?", "3."),
        (r"erste(?:[sr])?", "1."),
        # Cardinals.
        ("zwölf", "12"),
        ("elf", "11"),
        ("zehn", "10"),
        ("neun", "9"),
        ("acht", "8"),
        ("sieben", "7"),
        ("sechs", "6"),
        ("fünf", "5"),
        ("vier", "4"),
        ("drei", "3"),
        ("zwei", "2"),
        ("eins", "1"),
        # Regular ordinals are "<digits>te[s|r]" after the cardinal rules
        # above have run (e.g. "zweites" -> "2tes" -> "2.").
        (r"([0-9]+)te(?:[sr])?", r"\1."),
    )
    for pattern, replacement in replacements:
        text = re.sub(pattern, replacement, text, flags=re.IGNORECASE)
    return text
def remove_series_name_attributes(text: str) -> str:
    """Return ``text`` with every match of ``SERIES_ATTRIBUTE_REGEX`` removed."""
    cleaned = re.sub(SERIES_ATTRIBUTE_REGEX, "", text)
    return cleaned
def normalize_series_name(title: str) -> str:
    """Reduce a series title to a short, directory-friendly name.

    Number words are converted to digits and attribute suffixes are removed
    before the title is inspected.

    :param title: Series title as shown on the lecture page.
    :return: The zero-padded two-digit series number, ``"bonus"``, or
        ``"practice tests"``.
    :raises Exception: If the title matches none of the known forms.
    """
    title = replace_literal_numbers(title)
    title = remove_series_name_attributes(title)

    number_match = SERIES_NUMBER_REGEX.search(title)
    if number_match is not None:
        # The number may precede (group 1) or follow (group 2) the series
        # word; the leading position wins when both are present.
        for number in number_match.groups():
            if number is not None:
                return number.zfill(2)

    if SERIES_BONUS_REGEX.search(title) is not None:
        return "bonus"

    if SERIES_PRACTICE_TESTS_REGEX.search(title) is not None:
        return "practice tests"

    # No debug output here: inspecting the match object would itself fail
    # when the number pattern did not match at all.
    raise Exception("Could not parse series number from \"{title}\".".format(title=title))
def download_series(element: pyquery.PyQuery, session: requests.Session, out_dir: Path):
    """Download the archive attached to one series element, if there is one.

    The series title is normalized and used to name the
    ``exercise-<series>`` directory below ``out_dir``. When the element has
    no download link, nothing is fetched.

    :param element: Page element describing the series.
    :param session: Session used for the requests.
    :param out_dir: Directory of the lecture the series belongs to.
    """
    raw_title: str = element.find("div > .content-title").text()
    series_name = normalize_series_name(raw_title)
    print("Parsing series \"{series}\"".format(series=series_name))
    target_dir = os.path.join(out_dir, "exercise-{series}".format(series=series_name))

    relative_href = element.find("div > div > ol > li > a.download").attr("href")
    if relative_href is not None:
        # The link returns JSON describing the file to fetch.
        info_url: str = PORTAL_GUI_URL + relative_href
        info_response = session.get(info_url)
        info = DownloadInfo.from_json(info_response.content)
        download(info, target_dir, session=session)
class DownloadInfo:
    """Description of a downloadable file, as delivered by the portal in JSON."""

    display_name: str  # value of the "displayName" JSON key
    address: str  # value of the "address" JSON key
    file_size_bytes: int  # value of the "fileSize" JSON key, converted to int

    def __init__(self, display_name: str, address: str, file_size_bytes: int):
        """Store the three fields unchanged."""
        self.display_name, self.address, self.file_size_bytes = display_name, address, file_size_bytes

    @classmethod
    def from_json(cls, json_string: str):
        """Build an instance from a JSON document.

        :param json_string: JSON object with the keys ``displayName``,
            ``address`` and ``fileSize``.
        :return: A new instance of ``cls``.
        """
        payload = json.loads(json_string)
        name = payload["displayName"]
        location = payload["address"]
        size = int(payload["fileSize"])
        return cls(name, location, size)
def download(download_info: DownloadInfo, download_dir: Path, session: requests.Session):
    """Fetch the archive described by ``download_info`` and unpack it.

    :param download_info: Address and display name used to build the URL.
    :param download_dir: Directory that receives the unpacked files.
    :param session: Session used for the request.
    """
    archive_url = PORTAL_FILE_DOWNLOAD_URL.format(
        address=download_info.address,
        display_name=download_info.display_name,
    )
    notice = "Downloading ZIP \"{url}\""
    print(notice.format(url=archive_url))
    archive_bytes = session.get(archive_url).content
    unzip_download(archive_bytes, download_dir)
def unzip_download(content: bytes, out_dir: str):
    """Unpack a ZIP archive held in memory and copy its files to ``out_dir``.

    The bytes are written to a temporary file, extracted into a temporary
    directory, and handed to ``copy_downloaded_files``. Both temporaries are
    released when the function returns.

    :param content: Raw bytes of the ZIP archive.
    :param out_dir: Destination directory for the copied files.
    """
    with tempfile.TemporaryFile() as archive_file:
        archive_file.write(content)
        with zipfile.ZipFile(archive_file) as archive, tempfile.TemporaryDirectory() as extract_dir:
            archive.extractall(path=extract_dir)
            copy_downloaded_files(extract_dir, out_dir)
def copy_task_description_files(task_description_files: [str], in_dir: Path, out_dir: Path):
    """Copy each named file from ``in_dir`` to ``out_dir`` under the same name.

    :param task_description_files: File names relative to ``in_dir``.
    :param in_dir: Directory the files are read from.
    :param out_dir: Directory the files are written to.
    """
    log_line = "Copying task description \"{from_path}\" to \"{to_path}\"."
    for name in task_description_files:
        source = os.path.join(in_dir, name)
        destination = os.path.join(out_dir, name)
        # The log shows the bare file name as the source, not the full path.
        print(log_line.format(from_path=name, to_path=destination))
        copy_safely(source, destination)
def copy_task_dirs(task_dirs: [str], in_dir: Path, out_dir: Path):
    """Copy the files of each task directory into ``out_dir/task-<NN>``.

    ``<NN>`` is the number captured by ``TASK_NUMBER_REGEX`` from the
    directory name, zero-padded to two digits.

    :param task_dirs: Directory names relative to ``in_dir``.
    :param in_dir: Directory containing the task directories.
    :param out_dir: Directory that receives the ``task-<NN>`` directories.
    """
    log_line = "Copying task solutions \"{from_path}\" to \"{to_path}\"."
    for dir_name in task_dirs:
        padded_number = TASK_NUMBER_REGEX.search(dir_name).group(1).zfill(2)
        source_dir = os.path.join(in_dir, dir_name)
        destination_dir = os.path.join(out_dir, "task-{number}".format(number=padded_number))
        print(log_line.format(from_path=dir_name, to_path=destination_dir))
        copy_task_files(os.listdir(source_dir), source_dir, destination_dir)
def copy_task_files(task_files: typing.List[str], in_dir: Path, out_dir: Path):
    """Copy task files to ``out_dir`` under their original file names.

    The part of each name captured by ``TASK_FILE_NAME_REGEX`` is used as the
    destination name. Names matching ``TASK_FILE_NAME_CORRECTED_REGEX`` are
    additionally prefixed with ``correction-``.

    :param task_files: File names relative to ``in_dir``.
    :param in_dir: Directory the files are read from.
    :param out_dir: Directory the files are written to.
    """
    for task_file in task_files:
        name_match = TASK_FILE_NAME_REGEX.search(task_file)
        # A file whose name does not follow the expected pattern is copied
        # under its existing name rather than aborting the whole download.
        original_file_name = name_match.group(1) if name_match is not None else task_file

        if TASK_FILE_NAME_CORRECTED_REGEX.match(task_file) is not None:
            original_file_name = "correction-" + original_file_name

        task_in_file = os.path.join(in_dir, task_file)
        task_out_file = os.path.join(out_dir, original_file_name)
        print("Copying task solution \"{from_path}\" to \"{to_path}\"."
              .format(from_path=task_file, to_path=task_out_file))
        copy_safely(task_in_file, task_out_file)
def copy_safely(from_path: Path, to_path: Path):
    """Copy a file, creating the destination's parent directories if needed.

    :param from_path: Existing file to copy.
    :param to_path: Destination file path; it is overwritten if it exists.
    """
    target_dir = os.path.dirname(to_path)
    # A bare file name has an empty directory part, which makedirs rejects;
    # in that case the current directory is used as-is.
    if target_dir:
        # exist_ok avoids the race between checking for and creating the
        # directory.
        os.makedirs(target_dir, exist_ok=True)
    shutil.copyfile(from_path, to_path)
def copy_downloaded_files(temp_dir: Path, out_dir: Path):
    """Sort the entries of an extracted archive and copy them to ``out_dir``.

    Entries whose name matches ``TASK_NUMBER_REGEX`` at its start are treated
    as task directories; every other entry is treated as a task description
    file. Description files are copied first, then the task directories.

    :param temp_dir: Directory holding the extracted archive.
    :param out_dir: Destination directory.
    """
    task_dirs = []
    task_description_files: [str] = []
    for entry in os.listdir(temp_dir):
        bucket = task_dirs if TASK_NUMBER_REGEX.match(entry) else task_description_files
        bucket.append(entry)

    copy_task_description_files(task_description_files, temp_dir, out_dir)
    copy_task_dirs(task_dirs, temp_dir, out_dir)
def download_everything(session: requests.Session, out_dir: Path):
    """Download every lecture returned by ``parse_lecture_urls`` into ``out_dir``.

    :param session: Session used for all requests.
    :param out_dir: Root directory for the downloaded lectures.
    """
    lecture_urls = parse_lecture_urls(session)
    for lecture_url in lecture_urls:
        download_lecture(session, lecture_url, out_dir)
def main():
    """Ask for a download directory and credentials, then download everything."""
    # Imported locally so this block does not depend on the file's import list.
    import getpass

    print("This script helps you download all exercise solutions and task information "
          "from Martin Luther University Halle's Institute for Computer Science's exercise portal.")
    download_dir = input("Download directory: ")
    # makedirs also creates missing parent directories and tolerates an
    # already existing target.
    os.makedirs(download_dir, exist_ok=True)
    username = input("Username (e.g., abcde): ")
    # getpass keeps the password from being echoed to the terminal.
    password = getpass.getpass("Password: ")
    session = login(username, password)
    download_everything(session, download_dir)


# Run only when executed as a script, so importing the module has no side effects.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment