Skip to content

Instantly share code, notes, and snippets.

@georgeyjm
Created January 17, 2018 03:24
Show Gist options
  • Save georgeyjm/7eccd9f7edf956670c2dc03eb474e182 to your computer and use it in GitHub Desktop.
A web crawler for IGCSE past papers on PapaCambridge.
from bs4 import BeautifulSoup
from threading import Thread
import requests
import re
import shutil
import os
import time
# Maximum number of download threads started per batch.
MAX_THREAD = 30
# Local directory all downloaded papers are stored under.
ROOT_DIR_NAME = 'PapaCambridge'
# Base URL of the past-paper site; all crawled hrefs are relative to this.
ROOT_URL = 'http://pastpapers.papacambridge.com/'
# Numeric log level -> human-readable name.
LOG_LEVELS = {0: 'DEBUG', 1: 'INFO', 2: 'WARNING', 3: 'ERROR', 4: 'CRITICAL'}
# Level name -> ANSI color prefix used when printing the level tag.
LOG_COLORS = {'DEBUG': '', 'INFO': '\033[36m', 'WARNING': '\033[93m', 'ERROR': '\033[91m', 'CRITICAL': '\u001b[48;5;9m'}


def log(level, msg, criticalPause=True):
    """Print a timestamped, color-coded log line.

    level: an int key of LOG_LEVELS (0-4) or a level-name string; unknown
        ints/strings are printed as-is with no color.
    msg: the message text.
    criticalPause: when True (default), a CRITICAL message blocks on input()
        so the user acknowledges it before the program continues.
    """
    currentTime = time.strftime('%Y-%m-%d %H:%M:%S')
    level = LOG_LEVELS.get(level, level)
    logMsg = '{} {}\033[1m[{}]\033[0m {}'
    print(logMsg.format(currentTime, LOG_COLORS.get(level, ''), level, msg))
    # BUG FIX: honor criticalPause — previously the parameter was accepted
    # but ignored, so CRITICAL always paused regardless of the caller's wish.
    if level == 'CRITICAL' and criticalPause:
        input('A critical error occurred, the program has paused, press enter to continue.')
def getUrl(url, **options):
    """GET *url* and return the requests Response, or -1 on any error.

    Extra keyword options (timeout=, stream=, ...) are passed straight
    through to requests.get. All failures are logged and signalled with the
    sentinel -1, which every caller in this file checks for.
    """
    try:
        web = requests.get(url, **options)
    # BUG FIX: requests wraps urllib3's ReadTimeoutError in its own
    # requests.exceptions.Timeout (covers ConnectTimeout and ReadTimeout),
    # so catching the urllib3 internal directly left this branch dead and
    # timeouts fell through to the generic handler below.
    except requests.exceptions.Timeout:
        log(3, 'Timeout, url: {}'.format(url))
        return -1
    except requests.exceptions.ConnectionError:
        log(3, 'Connection error, url: {}'.format(url))
        return -1
    except Exception as e:
        # Last-resort catch so one bad URL never kills a crawl; logged loudly.
        log(3, 'Uncaught exception ({}): {} when requesting url: {}'.format(e.__class__.__name__, e, url))
        return -1
    else:
        return web
def getCourseUrls(*courses):
    """Yield (course title, course URL) for each requested course number.

    Scans the CIE IGCSE directory listing for entries whose 4-digit course
    code matches one of *courses*; warns about any codes never found.
    """
    remaining = list(courses)
    codePattern = re.compile(r'\d{4}')
    listingUrl = ROOT_URL + '?dir=Cambridge%20International%20Examinations%20%28CIE%29/IGCSE'
    web = getUrl(listingUrl)
    if web == -1:
        exit()
    listingSoup = BeautifulSoup(web.text, 'lxml')
    for titleTag in listingSoup.select('span.file-name'):
        title = titleTag.get_text()
        codes = codePattern.findall(title)
        # Skip entries without a course code, or with one we didn't ask for.
        if not codes or codes[0] not in remaining:
            continue
        remaining.remove(codes[0])
        yield title.strip(), ROOT_URL + titleTag.parent.parent.get('href')
    if remaining:
        log(2, 'Course number(s) not found: {}'.format(', '.join(remaining)))
def getPaperUrls(courseUrl, crawlFileTypes=('qp', 'ms', 'in', 'pre')):
    """Yield the direct PDF URL of every matching paper under a course page.

    courseUrl: a course listing URL as yielded by getCourseUrls.
    crawlFileTypes: file-type codes to keep in the filename filter
        (presumably qp=question paper, ms=mark scheme, in=insert,
        pre=pre-release — per CIE naming; TODO confirm).
    Stops early (generator return) if any page fetch fails.
    """
    # BUG FIX: raw string so '\.' reaches the regex engine as an escaped dot
    # rather than relying on Python's deprecated unknown-escape fallback
    # (SyntaxWarning on modern Pythons). Doubled braces survive str.format
    # and become literal regex quantifiers, e.g. '{{4}}' -> '{4}'.
    regex = re.compile(r'[0-9]{{4}}_[swm][0-9]{{1,2}}_({})_[0-9]{{1,2}}\.pdf'.format('|'.join(crawlFileTypes)))
    courseWeb = getUrl(courseUrl)
    if courseWeb == -1:
        return -1
    courseSoup = BeautifulSoup(courseWeb.text, 'lxml')
    for folder in courseSoup.select('span.file-name'):
        # '..' is the parent-directory entry of the listing — not a folder.
        if folder.get_text().strip() == '..':
            continue
        web = getUrl(ROOT_URL + folder.parent.parent.get('href'))
        if web == -1:
            return -1
        soup = BeautifulSoup(web.text, 'lxml')
        for file in soup.select('span.file-name'):
            if regex.findall(file.get_text()):
                # Strip the viewer wrapper to obtain the direct file URL.
                yield ROOT_URL + file.parent.parent.get('href').replace('view.php?id=', '')
def downloadFile(url, dirName):
    """Download *url* into ROOT_DIR_NAME/dirName/<basename of url>.

    Streams the response body to disk. Returns -1 on any failure
    (None on success). Runs as a Thread target, so it never raises.
    """
    name = url.split('/')[-1]
    req = getUrl(url, timeout=60, stream=True)
    if req == -1:
        return -1
    try:
        if req.status_code != 200:
            log(3, 'Error response [{}], url: {}'.format(req.status_code, url))
            return -1
        path = os.path.join(ROOT_DIR_NAME, dirName, name)
        try:
            with open(path, 'wb') as file:
                # Let urllib3 inflate gzip/deflate bodies before copying.
                req.raw.decode_content = True
                shutil.copyfileobj(req.raw, file)
        except Exception as e:
            log(3, 'Uncaught exception ({}): {} when copying file object: {}'.format(e.__class__.__name__, e, name))
            # BUG FIX: don't leave a truncated partial file behind.
            try:
                os.remove(path)
            except OSError:
                pass
            return -1
    finally:
        # BUG FIX: always release the streamed connection; the original
        # leaked it on every path, including non-200 responses.
        req.close()
# Entry point: prompt for course numbers, crawl each course's paper URLs,
# and download them in batches of MAX_THREAD concurrent threads.
try:
    courses = input('Input the course numbers, separate using spaces: ')
    log(1, 'Getting course URLs')
    if not os.path.isdir(ROOT_DIR_NAME):
        os.mkdir(ROOT_DIR_NAME)
    for courseName, courseUrl in getCourseUrls(*courses.split()):
        log(1, 'Crawling: \033[4m{}\033[0m'.format(courseName))
        courseDir = os.path.join(ROOT_DIR_NAME, courseName)
        if not os.path.isdir(courseDir):
            os.mkdir(courseDir)
        allPapers = list(getPaperUrls(courseUrl))
        log(1, 'Downloading {} files'.format(len(allPapers)))
        threadPool = []
        for fileUrl in allPapers:
            threadPool.append(Thread(target=downloadFile, args=(fileUrl, courseName)))
            if len(threadPool) == MAX_THREAD:
                # BUG FIX: start the whole batch before joining any thread.
                # The original did start() immediately followed by join() in
                # the same loop, which ran one download at a time and
                # defeated the thread pool entirely.
                for t in threadPool:
                    t.start()
                for t in threadPool:
                    t.join()
                threadPool = []
        # Drain the final partial batch the same way.
        for t in threadPool:
            t.start()
        for t in threadPool:
            t.join()
    log(1, 'Completed!')
except KeyboardInterrupt:
    log(1, 'Keyboard interrupted')
    exit()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment