Last active
December 20, 2022 03:16
-
-
Save Gowee/e36a9398bc668cfd0375ad2b1d6a4caf to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3
# -*- coding: utf-8 -*- | |
import requests | |
from bs4 import BeautifulSoup | |
import urllib.parse | |
import json | |
from PIL import Image | |
import pytesseract | |
import time | |
import hashlib | |
import random | |
# Basic functions | |
def str_between(s, left, right):
    """Return the text between the first ``left`` and the next ``right``.

    The search for ``right`` starts immediately after ``left``, so an empty
    span (``left`` directly followed by ``right``) yields ``""`` instead of
    running on to a later occurrence of ``right``.

    Returns ``""`` when either delimiter cannot be found.
    """
    try:
        start = s.index(left) + len(left)
        # Search from ``start`` (not ``start + 1``): the closing delimiter may
        # begin right where the opening one ends.
        end = s.index(right, start)
    except ValueError:
        return ""
    return s[start:end]
def str_between_b(s, left, right): | |
try: | |
end = s.rindex(right) | |
start = s.rindex(left, 0, end) + len(left) | |
return s[start:end] | |
except ValueError: | |
return "" | |
class Erya:
    """Client for the Chaoxing MOOC web endpoints used by this script.

    All requests go through one ``requests`` session so that cookies set by
    ``login`` are reused by the later calls. Every request is sent with the
    shared ``mutual_headers``, the configured timeout and the configured
    proxies.
    """

    mooc_server_url = "http://mooc1.chaoxing.com"
    portal_url = "http://passport2.chaoxing.com"
    # Headers sent with every request.
    mutual_headers = {
        'User-Agent':
            "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/50.0.2661.102 Safari/537.36",
        'Referer': portal_url + "/login?refer=http%3A%2F%2Fi.mooc.chaoxing.com",
        'Origin': "https://passport2.chaoxing.com"
    }

    def __init__(self, timeout=15, proxies=None):
        """Create a fresh session.

        Args:
            timeout: Timeout passed to every request.
            proxies: Optional ``requests``-style proxies mapping; ``None``
                means no proxies. A new dict is created per instance so
                instances never share a mutable default.
        """
        self.session = requests.session()
        self.timeout = timeout
        self.username = ""
        self.password = ""
        self.fid = ""
        self.proxies = {} if proxies is None else proxies

    def _get(self, url, **kwargs):
        """GET ``url`` with the shared headers, timeout and proxies."""
        return self.session.get(url, headers=self.mutual_headers, timeout=self.timeout,
                                proxies=self.proxies, **kwargs)

    def _post(self, url, data=None, **kwargs):
        """POST ``data`` to ``url`` with the shared headers, timeout and proxies."""
        return self.session.post(url, data, headers=self.mutual_headers, timeout=self.timeout,
                                 proxies=self.proxies, **kwargs)

    def get_validation_code(self, extra=False):
        """Download a validation-code image from the portal and return its bytes.

        Args:
            extra: When true fetch ``/img/code``, otherwise ``/num/code``.
        """
        validation_code_url = "/img/code" if extra else "/num/code"
        return self._get(self.portal_url + validation_code_url).content

    def login(self, username, password, fid, validation_code="", extra_validation_code="",
              fid_name="西南大学"):
        """Submit the login form and print the status code and page error text.

        The credentials and ``fid`` are remembered on the instance; ``fid`` is
        later used by ``_get_video_duration``.

        Args:
            username: Sent as the ``uname`` form field.
            password: Sent as the ``password`` form field.
            fid: Sent as the ``fid`` form field.
            validation_code: Sent as ``numcode``.
            extra_validation_code: Sent as ``verCode``.
            fid_name: Sent as ``fidName``; defaults to the value that was
                previously hard-coded.
        """
        self.username = username
        self.password = password
        self.fid = fid
        login_url = self.portal_url + "/login?refer=http%3A%2F%2Fi.mooc.chaoxing.com"
        payload = {
            'refer_0x001': "http%3A%2F%2Fi.mooc.chaoxing.com",
            'pid': "-1",
            'pidName': "",
            'fid': fid,
            'fidName': fid_name,
            'allowJoin': "0",
            'isCheckNumCode': "1",
            'f': "0",
            'uname': username,
            'password': password,
            'numcode': validation_code,
            'verCode': extra_validation_code
        }
        response = self._post(login_url, payload, allow_redirects=False)
        # The login page reports failures inside this table cell.
        err_msg = str_between(response.text, '<td class="show_error" id="show_error">', '</td>')
        print(response.status_code, err_msg)

    def get_courses(self):
        """Return the courses listed on ``/visit/courses``.

        Returns:
            A list of ``{'url': ..., 'courseName': ...}`` dicts. When nothing
            is found the raw page is printed to help diagnose the problem.
        """
        response = self._get(self.mooc_server_url + "/visit/courses")
        courses = []
        for course in BeautifulSoup(response.text, "html.parser").find_all("div", {'class': "Mconright"}):
            link = course.find("a")
            courses.append({'url': urllib.parse.urljoin(self.mooc_server_url, link.get("href")),
                            'courseName': link.get_text()})
        if not courses:
            print(response.text)
        return courses

    @staticmethod
    def _classify_status(css_classes):
        """Map the CSS classes of a lesson's status icon to a status name."""
        if not css_classes:
            return "unknown"
        for marker, status in (("openlock", "passed"), ("orange", "incomplete"), ("blank", "notstarted")):
            if marker in css_classes:
                return status
        return "other"

    def get_lessons_by_course(self, course_url):
        """Return the lessons found on a course page.

        Status of one lesson: passed, incomplete, notstarted, other,
        unknown(may be locked).

        Returns:
            A list of ``{'url': ..., 'lessonName': ..., 'status': ...}`` dicts.
        """
        page = self._get(course_url).text
        lessons = []
        for lesson in BeautifulSoup(page, "html.parser").select("div > h3.clearfix"):
            hlink = lesson.find(class_="articlename").find("a")
            # A lesson without a status icon is reported as "unknown" rather
            # than aborting the whole listing.
            icon = lesson.find(class_="icon")
            marker = icon.find("em") if icon is not None else None
            css_classes = marker.get("class") if marker is not None else None
            lessons.append({'url': urllib.parse.urljoin(self.mooc_server_url, hlink.get("href")),
                            'lessonName': hlink.get("title"),
                            'status': self._classify_status(css_classes)})
        return lessons

    def request(self, url):
        """may do help to unlock the next lesson"""
        return self._get(url).content

    def get_tabs(self, lesson_url):
        """Return the tabs of a lesson as ``{'title': ..., 'num': ...}`` dicts.

        ``num`` is the zero-based tab index parsed from each tab's ``onclick``
        attribute. When the page lists no tabs a single placeholder tab with
        ``num`` 0 is returned.
        """
        tabs_url = "https://mooc1-1.chaoxing.com/mycourse/studentstudyAjax"
        lesson_metadata = urllib.parse.parse_qs(urllib.parse.urlparse(lesson_url).query)
        payload = {
            # parse_qs yields lists; send the scalar like the other fields.
            'courseId': lesson_metadata['courseId'][0],
            'clazzid': lesson_metadata['clazzid'][0],
            'chapterId': lesson_metadata['chapterId'][0]
        }
        data = self._post(tabs_url, data=payload).text
        tabs = []
        for span in BeautifulSoup(data, "html.parser").find_all("span"):
            print(str(span))
            span_id = span.get("id")
            # Spans without an id are not tabs; skip them instead of crashing.
            if span_id and span_id.startswith("dct"):
                tabs.append({
                    'title': span.get("title"),
                    'num': int(span.get('onclick').split("(")[1].split(",")[0]) - 1
                })
        if not tabs:
            tabs.append({
                'title': "This lesson has only one tab",
                'num': 0
            })
        return tabs

    def get_content(self, lesson_url, tab_num=0):
        """Fetch one tab of a lesson and return its embedded ``mArg`` JSON object.

        The raw response body is also written to ``r.html`` in the working
        directory. The JSON is taken from the ``mArg = ...;`` assignment inside
        the ``try{ ... }catch(e){`` section of the page's last ``<script>``.

        Raises:
            json.JSONDecodeError: If the expected script content is missing.
        """
        lesson_metadata = urllib.parse.parse_qs(urllib.parse.urlparse(lesson_url).query)
        query = urllib.parse.urlencode({'courseid': lesson_metadata['courseId'][0],
                                        'clazzid': lesson_metadata['clazzid'][0],
                                        'knowledgeid': lesson_metadata['chapterId'][0],
                                        'num': tab_num})
        data = self._get(self.mooc_server_url + "/knowledge/cards" + "?" + query)
        with open("r.html", "wb") as dump:
            dump.write(data.content)
        last_script = str_between_b(data.text, "<script>", "</script>")
        guarded = str_between(last_script, "try{", "}catch(e){")
        return json.loads(str_between(guarded, "mArg = ", ";"))

    def _get_video_duration(self, object_id):
        """Return the ``duration`` field reported by ``/ananas/status/<object_id>``."""
        duration_url = self.mooc_server_url + "/ananas/status/" + object_id + "?" + urllib.parse.urlencode(
            {'k': self.fid})
        return json.loads(self._get(duration_url).text)['duration']

    @staticmethod
    def _get_enc(basic_info, object_property, video_duration, current_second):
        """Return the MD5 hex digest used as the ``enc`` parameter of a log request.

        The digest covers the bracketed class id, user id, job id, object id,
        current position in milliseconds, a fixed salt, the duration in
        milliseconds and the clip time ``0_<duration>``.
        """
        clip_time = "0_" + str(video_duration)
        fields = [
            str(basic_info['clazzId']),
            str(basic_info['userid']),
            str(object_property['_jobid']),
            str(object_property['objectid']),
            str(current_second * 1000),
            "d_yHJ!$pdA~5",
            str(int(video_duration) * 1000),
            clip_time,
        ]
        data = "".join("[" + field + "]" for field in fields)
        enc = hashlib.md5(data.encode("utf-8")).hexdigest()
        print(data, enc)
        return enc

    def send_log(self, basic_info, object_info, current_second=-1, is_drag=3, video_duration=""):
        """Report a playback position for a video job and return the response text.

        Args:
            basic_info: The ``defaults`` dict of a lesson tab (``clazzId``,
                ``userid`` and optionally ``reportUrl`` are read).
            object_info: The attachment dict of the video job.
            current_second: Position to report; ``-1`` means one second
                before the end.
            is_drag: Value sent as ``isdrag``.
            video_duration: Duration of the video; fetched from the server
                when left empty.
        """
        if not video_duration:
            video_duration = self._get_video_duration(object_info['property']['objectid'])
        if current_second == -1:
            current_second = video_duration - 1
        # Prefer the report URL supplied by the page, if any.
        log_url = basic_info.get('reportUrl') or self.mooc_server_url + "/multimedia/log"
        payload = {
            'clazzId': basic_info['clazzId'],
            'duration': video_duration,
            'jobid': object_info['property']['_jobid'],
            'objectId': object_info['property']['objectid'],
            'otherInfo': object_info['otherInfo'],
            'rt': "0.9",
            'dtype': 'Video',
            'clipTime': "0_" + str(video_duration),
            'userid': basic_info['userid'],
            'enc': self._get_enc(basic_info, object_info['property'], video_duration, current_second),
            'view': "pc",
            'playingTime': current_second,
            'isdrag': is_drag,
        }
        data = self._get(log_url + "?" + urllib.parse.urlencode(payload))
        # NOTE(review): unreachable code used to follow this return; it fetched
        # /processVerifyPng.ac and ran OCR on it when the response lacked
        # "isPassed". It never executed and has been dropped.
        return data.text

    def get_visit_stat_url(self, course_url):
        """Return the visit-statistics script URL embedded in a course page.

        Returns:
            The ``src`` of the first ``<script>`` that points at the
            ``fystat-ans.chaoxing.com/log/setlog`` endpoint (http or https),
            or ``False`` when the page has none.
        """
        prefixes = ("http://fystat-ans.chaoxing.com/log/setlog",
                    "https://fystat-ans.chaoxing.com/log/setlog")
        data = self._get(course_url)
        for script_element in BeautifulSoup(data.text, "html.parser").find_all("script"):
            stat_url = script_element.get("src")
            if stat_url and stat_url.startswith(prefixes):
                return stat_url
        return False

    def do_visit(self, stat_url):
        """Request ``stat_url`` and return whether the body is exactly ``'success'``."""
        return self._get(stat_url).text == "'success'"

    @staticmethod
    def _work_query(basic_info, object_info, ut):
        """Build the query parameters of ``/api/work`` for the given ``ut`` value."""
        job_id = object_info['property']['jobid']
        return {
            'courseid': basic_info['courseid'],
            'workId': object_info['property']['workid'],
            'api': "1",
            'knowledgeid': basic_info['knowledgeid'],
            'clazzId': basic_info['clazzId'],
            'oldworkid': job_id.split("-", 1)[1],
            'jobid': job_id,
            'type': "",
            'enc': object_info['enc'],
            'needRedirect': True,
            'ut': ut
        }

    def _get_work_answer(self, basic_info, object_info):
        """Only available for multiple choice, for crawler.

        Requests the work sheet with ``ut=t`` and returns a dict that maps each
        question text to the text following the answer label on the page.
        """
        answer_sheet_url = self.mooc_server_url + "/api/work" + "?" + urllib.parse.urlencode(
            self._work_query(basic_info, object_info, "t"))
        soup = BeautifulSoup(self._get(answer_sheet_url).text, "html.parser")
        questions = {}
        for item in soup.find_all(class_="TiMu"):
            question = item.select(".Zy_TItle.clearfix > div")[0].get_text().strip()
            answer = item.select(".Py_answer.clearfix > span")[0].get_text().split("正确答案:", 1)[1].strip()
            questions[question] = answer
        return questions

    def do_work(self, basic_info, object_info, delay=0):
        """Fill in and submit a multiple-choice work sheet.

        The question sheet is fetched with ``ut=s``, the answers come from
        ``_get_work_answer``, and the combined form is posted to
        ``/work/addStudentWorkNewWeb``. Questions that are not multiple choice
        are reported and left out of the submission.

        Args:
            basic_info: The ``defaults`` dict of the lesson tab.
            object_info: The attachment dict of the work job.
            delay: Seconds to sleep before fetching answers and before submitting.

        Raises:
            LookupError: If a question on the sheet has no matching answer.
        """
        split_marker = '<div class="ZyBottom" id="ZyBottom">'
        question_sheet_url = self.mooc_server_url + "/api/work" + "?" + urllib.parse.urlencode(
            self._work_query(basic_info, object_info, "s"))
        data = self._get(question_sheet_url)
        work_form = BeautifulSoup(data.text, "html.parser").find(id="form1")
        sections = str(work_form).split(split_marker)

        # Everything after the marker holds the questions; the chunk before the
        # first question block is not a question.
        items = sections[1].split('<div class="TiMu">')[1:]
        questions = {}
        for raw_item in items:
            item = BeautifulSoup(raw_item, "html.parser")
            type_field = item.find("input", type="hidden")
            if str(type_field.get("value")) != "0":
                print("Question Error: Not multiple choice!", str(item))
                continue
            question = item.find(class_="Zy_TItle").find("div", class_="clearfix").get_text().strip()
            answer_id = type_field.get("name").split("answertype")[1]
            assert answer_id
            questions[question] = answer_id

        # Hidden inputs before the marker are echoed back unchanged.
        payload = {}
        for param in BeautifulSoup(sections[0], "html.parser").select("input[type=hidden]"):
            payload[param.get("name")] = param.get("value")

        time.sleep(delay)
        answers = self._get_work_answer(basic_info, object_info)
        answerwqbid = []
        for question, answer_id in questions.items():
            if question not in answers:
                print(questions, answers)
                raise LookupError("No matching answer found for question: {!r}".format(question))
            answerwqbid.append(str(answer_id))
            payload['answer' + str(answer_id)] = answers[question]
            payload['answertype' + str(answer_id)] = "0"
        payload['answerwqbid'] = ','.join(answerwqbid)

        work_submit_url = self.mooc_server_url + "/work/addStudentWorkNewWeb" + "?" + urllib.parse.urlencode(
            {'_classId': basic_info['clazzId'], 'courseid': basic_info['courseid']})
        time.sleep(delay)
        response = self._post(work_submit_url, data=payload)
        print(work_submit_url, payload, response.text)
def visit():
    """Log in and hit the visit-statistics URL of the first course once every 61 seconds.

    Relies on the module-level ``username``, ``password`` and ``school_id``
    being set before the call. Runs until interrupted.

    Raises:
        RuntimeError: If the course page contains no visit-statistics URL.
    """
    e = Erya()
    with open("./v.jpg", "wb") as image_file:
        image_file.write(e.get_validation_code())
    validation_code = pytesseract.image_to_string(Image.open("./v.jpg"))
    e.login(username, password, school_id, validation_code, validation_code)
    vsu = e.get_visit_stat_url(e.get_courses()[0]['url'])
    if not vsu:
        # get_visit_stat_url returns False when nothing matches; requesting
        # that value would only fail later with an unrelated error.
        raise RuntimeError("No visit statistics URL found on the course page")
    n = 0
    while True:
        n += 1
        print(str(e.do_visit(vsu)) + " {}".format(n))
        time.sleep(61)
def main():
    """Log in and work through the pending lessons of the first listed course.

    Relies on the module-level ``username``, ``password`` and ``school_id``
    being set before the call. The lesson list is re-fetched after every pass;
    the function returns once a pass finds no lesson whose status is
    ``incomplete`` or ``notstarted``.
    """
    delay = 5
    e = Erya()
    with open("./v.jpg", "wb") as image_file:
        image_file.write(e.get_validation_code())
    validation_code = pytesseract.image_to_string(Image.open("./v.jpg"))
    with open("./vv.jpg", "wb") as image_file:
        image_file.write(e.get_validation_code(True))
    start_time = time.time()
    extra_validation_code = input("CAPTCHA:\n")
    print("Validation code:\n", extra_validation_code,
          "\nAuto recognition finished in " + str(round(time.time() - start_time, 5)) + " s")
    time.sleep(delay)
    e.login(username, password, school_id, validation_code, extra_validation_code)
    course = e.get_courses()[0]
    time.sleep(delay)
    while True:
        print("[Starting][course]: ", course['courseName'])
        pending = 0
        for lesson in e.get_lessons_by_course(course['url']):
            print("[Fetched][lesson]: ", lesson)
            if lesson['status'] not in ("incomplete", "notstarted"):
                continue
            pending += 1
            print("[Starting][lesson]: ", lesson['lessonName'])
            time.sleep(delay)
            e.request(lesson['url'])
            time.sleep(delay)
            for tab in e.get_tabs(lesson['url']):
                time.sleep(delay)
                content = e.get_content(lesson['url'], tab['num'])
                time.sleep(delay)
                print("[Starting][tab]: ", content)
                for job in content['attachments']:
                    print("[Fetched][job]", job)
                    if job['type'] == "video" and not ('isPassed' in job and job['isPassed']):
                        print("[Starting][job][video]", job)
                        duration = e._get_video_duration(job['property']['objectid'])
                        time.sleep(delay)
                        e.send_log(content['defaults'], job, 0, 3)
                        time.sleep(delay)
                        # Report a position roughly every 75 seconds of video,
                        # jittered by a few seconds.
                        for i in range(0, int(duration / 75)):
                            e.send_log(content['defaults'], job, abs(i * 75 + random.randrange(-6, 6)), 0)
                            time.sleep(delay + random.randrange(-1, 2))
                        print(e.send_log(content['defaults'], job, duration - 1, 4))
                    elif job['type'] == "workid" and 'job' in job and job['job'] == True:
                        print("[Starting][job][work]", job)
                        e.do_work(content['defaults'], job, delay)
        if not pending:
            # Nothing was processed in this pass, so another pass would only
            # re-fetch the same lesson list without any delay.
            break
def test1():
    """Manual smoke test: log in, then print lessons, one tab's content, its video duration and tabs.

    Uses the first course, its second lesson and tab number 3. Relies on the
    module-level ``username``, ``password`` and ``school_id``.
    """
    client = Erya()

    with open("./v.jpg", "wb") as image_file:
        image_file.write(client.get_validation_code())
    started = time.time()
    validation_code = pytesseract.image_to_string(Image.open("./v.jpg"))
    print("Validation code by Tesseract:\n", validation_code,
          "\nAuto recognition finished in " + str(round(time.time() - started, 5)) + " s")

    with open("./vv.jpg", "wb") as image_file:
        image_file.write(client.get_validation_code(True))
    started = time.time()
    extra_validation_code = input("CAPTCHA:\n")
    print("Validation code(EXTRA) by Tesseract:\n", extra_validation_code,
          "\nAuto recognition finished in " + str(round(time.time() - started, 5)) + " s")

    client.login(username, password, school_id, validation_code, extra_validation_code)
    course_list = client.get_courses()
    lesson_list = client.get_lessons_by_course(course_list[0]['url'])
    print(lesson_list)
    print("####################")
    second_lesson_url = lesson_list[1]['url']
    tab_content = client.get_content(second_lesson_url, 3)
    print(tab_content)
    first_object_id = tab_content['attachments'][0]['property']['objectid']
    print(client._get_video_duration(first_object_id))
    print("********************8")
    print(client.get_tabs(second_lesson_url))
def test2():
    """Manual smoke test of the login step only.

    Saves both validation images, reads the first with Tesseract, asks the
    user for the second and then logs in. Relies on the module-level
    ``username``, ``password`` and ``school_id``.
    """
    client = Erya()

    with open("./v.jpg", "wb") as image_file:
        image_file.write(client.get_validation_code())
    validation_code = pytesseract.image_to_string(Image.open("./v.jpg"))

    with open("./vv.jpg", "wb") as image_file:
        image_file.write(client.get_validation_code(True))
    started = time.time()
    extra_validation_code = input("CAPTCHA:\n")
    print("Validation code(EXTRA) by Tesseract:\n", extra_validation_code,
          "\nAuto recognition finished in " + str(round(time.time() - started, 5)) + " s")

    client.login(username, password, school_id, validation_code, extra_validation_code)
if __name__ == '__main__':
    # Usage banner. Adjacent literals are concatenated, so every segment must
    # end with its own separator (newline or space).
    print("****************\nby Gowe\nJust work for me\nNo warranty\nUse it AT YOUR OWN RISK\n"
          "Share should be limited to be with your friends or mates\n"
          "DO NOT MAKE THIS CODE PUBLIC\nI just wanna use it the next term\nNO COMMERCIAL USE\n****************\n"
          "Courses: the first course in your MOOC space is default, change it in main e.get_courses()[0] if necessary\n"
          "If it prompts you with CAPTCHA, please just press enter or enter the code in ./vv.jpg if error occurs.\n"
          "If it trapped in loop, just restart it and then open your MOOC in the browser to see if there any courses \n"
          "is not unlocked automatically if the former does not help.\n"
          "Auto answer is only available for multiple choice. It will raise a exception when gets the other kinds, \n"
          "in which case you should do it by hand and then restart it after finishing the job.\n"
          "Dependencies you may need: bs4, pytesseract(comment usage in source code if you do not want auto recognition "
          "for validation code), requests.\n"
          "High frequency may cause error due to its anti-spider program, you can change the request delay in main().\n"
          "main() is for lessons. visit() is for visit count.(1 min per request)\n"
          "I do try to avoid being recognized by Chaoxing, but you still have the possibility to be caught and feel bad\n"
          "****************\nNOTICE: If you have question, plz read the text above or read the source code otherwise.\n"
          "****************")
    # Account settings read as globals by main(), visit() and the test helpers.
    username = "222011234567890"
    password = "000000"
    school_id = 1840
    main()  # for lesson
    # visit()  # for visit counter
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
超星慕课(尔雅/泛雅)刷题+刷课
使用方法: