@georgeyjm
Created April 16, 2023 17:50
OXAM Crawler.py
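"""
OXAM Crawler: bulk-downloads past exam papers from the Oxford WebLearn OXAM
archive. You log in manually through a Selenium-driven Chrome window, search
for courses by fuzzy-matched name, then the script crawls and downloads the
selected courses' papers concurrently.
"""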
import re
import shutil
from pathlib import Path
from queue import Queue, Empty
from threading import Thread, Event
import requests
from bs4 import BeautifulSoup
from selenium import webdriver
from fuzzywuzzy import fuzz
from tqdm import tqdm
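
# Third-party dependencies (assumed install command):
#   pip install requests beautifulsoup4 selenium fuzzywuzzy tqdm
# fuzzywuzzy optionally uses python-Levenshtein for faster scoring.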
root_domain = 'https://weblearn.ox.ac.uk'
main_url = '/portal/site/:oxam/tool/8a98905b-a664-4618-9200-4ceb2118b0d6/advanced'
root_dest_path = Path('OXAM')
num_workers = 10
def get_authenticated_session() -> requests.Session:
    # Log in manually in the Selenium browser, then copy its cookies into a requests session
    driver = webdriver.Chrome()
    driver.get(root_domain + main_url)
    input('Please press Enter after logging in.')
    headers = {
        'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/112.0.0.0 Safari/537.36 Edg/112.0.1722.34'
    }
    sess = requests.Session()
    sess.headers.update(headers)
    for cookie in driver.get_cookies():
        sess.cookies.update({cookie['name']: cookie['value']})
    driver.quit()
    return sess

def get_all_courses() -> list:
    # Uses the module-level authenticated session `sess` (created below)
    resp = sess.get(root_domain + main_url)
    soup = BeautifulSoup(resp.text, features='html.parser')
    return soup.select('#exam > optgroup > option')

def match_courses(search_text: str, all_courses: list) -> list:
    # Preprocess search text
    search_text = search_text.replace('&', 'and')
    matched_courses = []
    scores = []
    name_extractor_re = re.compile(r'(.{4}) {4}(?:(.+)(?:(?: in )|(?: of )))?(.+)')
    for el in all_courses:
        value = el.get('value')
        name = el.text.replace('\xa0', ' ')
        course_id, course_type, course_name = name_extractor_re.match(name).groups()
        course_name = course_name.rstrip('.')
        assert course_id == value
        # Pad the name so partial_ratio can align the full search text even when the name is shorter
        score = fuzz.partial_ratio(search_text, course_name.lower().ljust(len(search_text)))
        if score >= 80:  # Alternative scorers: fuzz.token_sort_ratio, fuzz.token_set_ratio
            matched_courses.append({'id': course_id, 'type': course_type, 'name': course_name})
            scores.append(score)
    # Sort matches by descending score
    matched_courses = [c for _, c in sorted(zip(scores, matched_courses), key=lambda x: x[0], reverse=True)]
    return matched_courses

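# Illustrative note on the scorer used above: fuzz.partial_ratio scores the
# best-matching substring alignment, so a query that appears verbatim in a
# course name scores 100, e.g. fuzz.partial_ratio('linear algebra', 'linear algebra i') == 100.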
def get_course_files(url: str, course: dict):
    # Recursively paginate and yield all files of a given course URL
    resp = sess.get(url)
    soup = BeautifulSoup(resp.text, features='html.parser')
    for el in soup.select('.content > div > ul > li'):
        link_el = el.select_one('a')
        name_els = link_el.select('span')
        info_els = el.select(':scope > span')  # Top-level spans only
        paper_href = link_el.get('href')
        paper_id = name_els[0].text
        paper_title = name_els[1].text.rstrip('.')
        paper_year = info_els[0].text
        paper_term = info_els[1].text
        yield {
            'course': course,
            'paper_id': paper_id,
            'title': paper_title,
            'year': paper_year,
            'term': paper_term,
            'href': paper_href,
        }
    # Recurse into the next page, if any
    pagination_buttons = soup.select('.content > div > div > a')
    if not pagination_buttons or pagination_buttons[-1].text != 'Next':
        return
    next_url = root_domain + parent_url + pagination_buttons[-1].get('href')
    yield from get_course_files(next_url, course)

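# Each yielded record has the shape (values elided):
# {'course': {...}, 'paper_id': ..., 'title': ..., 'year': ..., 'term': ..., 'href': ...}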
def download_file(url: str, dest_path: Path, skip_exist: bool = True):
    if skip_exist and dest_path.exists():
        return
    # Ensure parent directory exists
    dest_path.parent.mkdir(parents=True, exist_ok=True)
    try:
        with sess.get(root_domain + url, stream=True) as resp:
            resp.raise_for_status()
            resp.raw.decode_content = True  # Decompress gzip/deflate content when reading from .raw
            with dest_path.open('wb') as f:
                shutil.copyfileobj(resp.raw, f)
    except Exception as e:
        print(e)
        print(root_domain + url)
        return

def worker_get_all_course_files():
    while not courses_done.is_set():
        try:
            course = courses_queue.get(block=True, timeout=2.0)
        except Empty:
            if courses_done.is_set():
                return
            continue
        url = f'{root_domain}{main_url}?exam={course["id"]}'
        for file in get_course_files(url, course):
            files_queue.put(file)
        pbar.update(1)
        courses_queue.task_done()

def worker_download_all_files():
    while not files_done.is_set():
        try:
            file = files_queue.get(block=True, timeout=2.0)
        except Empty:
            if files_done.is_set():
                return
            continue
        # TODO: custom file organization
        original_filename = file['href'].split('/')[-1]
        suffix = original_filename.split('.')[-1]
        dest_path = root_dest_path / '{}, {}'.format(file['course']['name'], file['course']['type']) / '{} {}'.format(file['paper_id'], file['title']) / '{} {}.{}'.format(file['year'], file['term'], suffix)
        download_file(file['href'], dest_path)
        pbar.update(1)
        files_queue.task_done()

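# Worker shutdown protocol: the main thread blocks on queue.join() until every
# task is marked done, then sets the matching Event; workers poll that Event
# between 2-second get() timeouts, so each exits within roughly one timeout.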
##### Get authenticated session through manual login using Selenium #####
sess = get_authenticated_session()
# TODO: check if indeed logged in
print('Login success.')
##### Getting the courses to crawl according to user keywords #####
all_courses = []
while True:
    # Prompt search and output search results
    search_text = input('\nSearch prompt: ').lower()
    print('Finding courses...')
    if not all_courses:
        all_courses = get_all_courses()
    matched_courses = match_courses(search_text, all_courses)
    if not matched_courses:
        print('No matched results. Try another prompt.')
        continue
    print('Results:')
    for i, course in enumerate(matched_courses):
        print(f'[{i + 1}] {course["type"]}, {course["name"]}')
    # Select which results to crawl
    reenter_flag = False
    while True:
        selected_course_indices = input('Please select course IDs (separated by spaces), or leave blank to re-enter the search prompt: ').strip().split()
        if not selected_course_indices:
            reenter_flag = True
            break
        try:
            selected_course_indices = list(map(int, selected_course_indices))
            for i in selected_course_indices:
                assert 1 <= i <= len(matched_courses)
            break
        except (ValueError, AssertionError):
            print('Invalid input. Try again.')
    if reenter_flag:
        continue
    break
selected_courses = [matched_courses[i - 1] for i in selected_course_indices]
##### Getting all files #####
courses_queue = Queue()
courses_done = Event()
files_queue = Queue()
files_done = Event()
print('\nFetching files for selected courses...')
parent_url = '/'.join(main_url.split('/')[:-1]) + '/'
for course in selected_courses:
    courses_queue.put(course)
pbar = tqdm(total=len(selected_courses))
workers = []
for _ in range(num_workers):
    worker = Thread(target=worker_get_all_course_files, daemon=True)
    workers.append(worker)
    worker.start()
courses_queue.join()
courses_done.set()
pbar.close()
print('Closing workers...')
for worker in workers:
    worker.join()
total_num_files = files_queue.qsize()
print(f'Done. Total files: {total_num_files}')
##### Download all files #####
print('\nDownloading all files...')
pbar = tqdm(total=total_num_files)
workers = []
for _ in range(num_workers):
    worker = Thread(target=worker_download_all_files, daemon=True)
    workers.append(worker)
    worker.start()
files_queue.join()
files_done.set()
pbar.close()
print('Closing workers...')
for worker in workers:
    worker.join()
print('All done.')
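
# Typical run (assumed invocation): python "OXAM Crawler.py", log in via the
# Chrome window that opens, press Enter, then search (e.g. "mathematics") and
# pick result numbers such as "1 3". Files land under the OXAM/ directory.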