Runs the codePost tests for the given assignments or submissions
(gist by @josephlou5, created March 8, 2023)

"""
Runs the codePost tests for the given assignments.

The requests in this script come from looking at the network calls on
the codePost webpage. As such, they are not part of any agreed-upon
API, and are subject to change at any time. Use at your own risk.

This script uses the `asyncio` module to make the requests for each
assignment in parallel, so running the tests for all the assignments
takes only about as long as the slowest assignment. However, since the
requests made by the `codepost` module are blocking rather than
coroutines, the script is still not as fast as it could be.

Last updated: 2023-03-08
"""
# =============================================================================
import argparse
import asyncio
import sys
import time
import codepost
from codepost.api_requestor import STATIC_REQUESTOR
# =============================================================================
CHECK_INTERVAL_DEFAULT = 15
# =============================================================================
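# Thin wrapper around codepost's internal API requestor; the autograder
# endpoints used below are not part of any public API (see the module
# docstring above).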
def _cp_request(**kwargs):
# pylint: disable=protected-access
return STATIC_REQUESTOR._request(**kwargs)
# =============================================================================
async def run_assignment_tests(prefix, assignment, check_interval):
"""Runs the tests for the given assignment.
Returns a tuple of whether the run was successful and how long it
took.
"""
start_time = time.time()
print(prefix, f"Running tests for assignment {assignment.name!r}")
def time_elapsed():
return time.time() - start_time
# get the assignment's environment id
environment_id = None
# try from the assignment's cached request data
assignment_data = getattr(assignment, "_data", None)
if assignment_data is not None:
environment_id = assignment_data.get("environment", None)
if environment_id is None:
response = _cp_request(
method="GET", endpoint=assignment.instance_endpoint
)
if response.status_code != 200:
print(
prefix,
"Error fetching assignment: status code",
response.status_code,
)
return False, time_elapsed()
environment_id = response.json.get("environment", None)
if environment_id is None:
print(prefix, "Error: could not get assignment environment id")
return False, time_elapsed()
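    # the total number of submissions is used below to report progress
    # as a percentage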
num_submissions = len(assignment.list_submissions())
# trigger the tests to run
response = _cp_request(
method="PATCH",
endpoint=f"/autograder/environments/{environment_id}/runAll/",
data={"id": environment_id, "sendEmail": False},
)
if response.status_code != 200:
print(
prefix,
"Error triggering autograder: status code",
response.status_code,
)
return False, time_elapsed()
task_id = response.json.get("task", None)
if task_id is None:
print(prefix, "Error: could not get autograder task id")
return False, time_elapsed()
times_checked = 0
while True:
# see if the tests are done
times_checked += 1
print(
prefix, f"Checking status (request #{times_checked})... ", end=""
)
response = _cp_request(
method="GET", endpoint=f"/autograder/tasks/{task_id}/"
)
if response.status_code != 200:
print() # newline for previous
print(
prefix,
"Error checking run status: status code",
response.status_code,
)
return False, time_elapsed()
run_status = response.json.get("status", None)
if run_status is None:
print() # newline for previous
print(prefix, "Error: could not get run status")
return False, time_elapsed()
# `run_status` is one of: "PENDING", "PROGRESS", "SUCCESS"
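        # (these are the observed values; any other status falls
        # through to the `else` branch below and the loop keeps
        # polling)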
if run_status == "PROGRESS":
# print the progress
test_results = response.json.get("result", None)
if test_results is None:
print(run_status)
else:
num_submissions_done = 0
# the keys of `test_results` are test case ids
for test_case_results in test_results.values():
# `test_case_results` has the keys:
# "passed", "failed", "error"
test_case_submissions = sum(test_case_results.values())
# the number of submissions that are done is the max
# number of submissions processed by any test case
num_submissions_done = max(
num_submissions_done, test_case_submissions
)
percentage_done = num_submissions_done / num_submissions
print(
f"{run_status}: {num_submissions_done}/{num_submissions} "
f"({percentage_done:.2%})"
)
else:
print(run_status)
if run_status == "SUCCESS":
# run is done!
break
# wait for a bit before next check
await asyncio.sleep(check_interval)
return True, time_elapsed()
# =============================================================================
def time_elapsed_str(seconds):
    """Formats an elapsed time, e.g. 3723.5 -> "1 hr, 2 min, 3.50 sec"."""
if seconds == 0:
return "0 sec"
minutes, seconds = divmod(seconds, 60)
hours, minutes = divmod(minutes, 60)
segments = []
include_rest = False
if hours > 0:
include_rest = True
segments.append(f"{hours:.0f} hr")
if include_rest or minutes > 0:
include_rest = True
segments.append(f"{minutes:.0f} min")
if include_rest or seconds > 0:
include_rest = True
segments.append(f"{seconds:.2f} sec")
return ", ".join(segments)
async def run_tests(assignments, check_interval):
"""Runs the tests for the given assignment objects."""
print(
"Running tests for the given assignments with a check interval of "
f"{check_interval} seconds"
)
num_assignments = len(assignments)
num_digits = len(str(num_assignments))
prefixes = []
max_assignment_name_len = 0
routines = []
for i, assignment in enumerate(assignments):
prefix = f"[{i+1: ^{num_digits}}]"
prefixes.append(prefix)
max_assignment_name_len = max(
max_assignment_name_len, len(assignment.name)
)
routines.append(
run_assignment_tests(prefix, assignment, check_interval)
)
results = await asyncio.gather(*routines)
print("=" * 30)
print("SUMMARY:")
for prefix, assignment, (success, time_elapsed) in zip(
prefixes, assignments, results
):
result = "SUCCESS" if success else "FAILURE"
print(
prefix,
f"{assignment.name: <{max_assignment_name_len}}",
"==>",
result,
f"({time_elapsed_str(time_elapsed)})",
)
# =============================================================================
def get_course(course_name, course_period):
    """Returns the course with the given name and period, or None."""
for course in codepost.course.list_available():
if course.name == course_name and course.period == course_period:
return course
return None
async def main():
parser = argparse.ArgumentParser(
description="Runs the codePost tests for the given assignments.",
add_help=True,
)
parser.add_argument("course_name")
parser.add_argument("course_period")
parser.add_argument("assignments", nargs=argparse.REMAINDER)
parser.add_argument(
"-i",
dest="interval",
action="store",
type=int,
default=CHECK_INTERVAL_DEFAULT,
help=(
"The number of seconds to wait before checking if the tests have "
"finished running. Must be at least 5. Defaults to "
f"{CHECK_INTERVAL_DEFAULT}."
),
)
args = parser.parse_args()
course_name = args.course_name
course_period = args.course_period
assignment_args = args.assignments
check_interval = args.interval
if check_interval < 5:
print("The check interval must be at least 5.")
sys.exit(1)
# remove duplicates while keeping order
assignments = list(
{assignment: True for assignment in assignment_args}.keys()
)
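    # (dicts preserve insertion order in Python 3.7+, so this keeps the
    # first occurrence of each assignment name)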
if len(assignments) == 0:
print("No assignments given")
sys.exit(1)
api_key = codepost.configure_api_key()
if not codepost.util.config.validate_api_key(api_key):
print("Invalid codePost API key")
sys.exit(1)
course = get_course(course_name, course_period)
if course is None:
print(
f"Course with name {course_name!r} and period {course_period!r} "
"not found"
)
sys.exit(1)
course_assignments = {
assignment.name: assignment for assignment in course.assignments
}
assignment_objs = []
not_found = False
for assignment_name in assignments:
if assignment_name not in course_assignments:
not_found = True
print(f"Assignment {assignment_name!r} not found in course")
else:
assignment_objs.append(course_assignments[assignment_name])
if not_found:
sys.exit(1)
await run_tests(assignment_objs, check_interval)
if __name__ == "__main__":
asyncio.run(main())
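
# =============================================================================
# Second script in this gist: runs the codePost tests for individual
# submissions rather than for entire assignments.
# =============================================================================
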
"""
Runs the codePost tests for the given submissions.

The requests in this script come from looking at the network calls on
the codePost webpage. As such, they are not part of any agreed-upon
API, and are subject to change at any time. Use at your own risk.

This script uses the `asyncio` module to make the requests for each
submission in parallel, so running the tests for all the submissions
takes only about as long as the slowest submission. However, since the
requests made by the `codepost` module are blocking rather than
coroutines, the script is still not as fast as it could be.

Last updated: 2023-03-08
"""
# =============================================================================
import argparse
import asyncio
import sys
import time
import codepost
import codepost.errors
from codepost.api_requestor import STATIC_REQUESTOR
# =============================================================================
# The number of seconds to wait before checking if the tests have
# finished running. Since individual submissions should not take long to
# run, this value cannot be customized on the command-line.
CHECK_INTERVAL = 2
# =============================================================================
def _cp_request(**kwargs):
# pylint: disable=protected-access
return STATIC_REQUESTOR._request(**kwargs)
# =============================================================================
async def run_submission_test(prefix, environment_id, submission_id):
"""Runs the tests for the given submission.
Assumes the given submission belongs to the assignment that the
environment id belongs to.
Returns a tuple of whether the run was successful and how long it
took.
"""
start_time = time.time()
print(prefix, f"Running tests for submission {submission_id}")
def time_elapsed():
return time.time() - start_time
# trigger the test to run
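    # (this is the single-submission counterpart of the `runAll`
    # endpoint used in the assignments script)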
response = _cp_request(
method="PATCH",
endpoint=f"/autograder/environments/{environment_id}/run/",
data={
"id": environment_id,
"simulate": False,
"submission": submission_id,
},
)
if response.status_code != 200:
print(
prefix,
"Error triggering autograder: status code",
response.status_code,
)
return False, time_elapsed()
task_id = response.json.get("task", None)
if task_id is None:
print(prefix, "Error: could not get autograder task id")
return False, time_elapsed()
times_checked = 0
while True:
# see if the tests are done
times_checked += 1
print(
prefix, f"Checking status (request #{times_checked})... ", end=""
)
response = _cp_request(
method="GET", endpoint=f"/autograder/tasks/{task_id}/"
)
if response.status_code != 200:
print() # newline for previous
print(
prefix,
"Error checking run status: status code",
response.status_code,
)
return False, time_elapsed()
run_status = response.json.get("status", None)
if run_status is None:
print() # newline for previous
print(prefix, "Error: could not get run status")
return False, time_elapsed()
# `run_status` is one of: "PENDING", "PROGRESS", "SUCCESS"
print(run_status, end="")
if run_status == "SUCCESS":
# run is done!
            # the results of the run are available here, if needed:
result = response.json.get("result", {})
submission_tests = result.get("submissionTests", None)
if submission_tests is not None:
num_passed = 0
num_total = 0
for test in submission_tests:
# each `test` object is a dictionary for a submission
# test object with the additional fields:
# "testCategory", "created" (date), "modified" (date)
# https://docs.codepost.io/reference/the-submission-test-object
num_total += 1
if test["passed"]:
num_passed += 1
print(f": {num_passed} out of {num_total} passed")
else:
print() # newline for previous
break
else:
print() # newline for previous
# wait for a bit before next check
await asyncio.sleep(CHECK_INTERVAL)
return True, time_elapsed()
# =============================================================================
def time_elapsed_str(seconds):
    """Formats an elapsed time, e.g. 3723.5 -> "1 hr, 2 min, 3.50 sec"."""
if seconds == 0:
return "0 sec"
minutes, seconds = divmod(seconds, 60)
hours, minutes = divmod(minutes, 60)
segments = []
include_rest = False
if hours > 0:
include_rest = True
segments.append(f"{hours:.0f} hr")
if include_rest or minutes > 0:
include_rest = True
segments.append(f"{minutes:.0f} min")
if include_rest or seconds > 0:
include_rest = True
segments.append(f"{seconds:.2f} sec")
return ", ".join(segments)
async def run_tests(submission_ids):
"""Runs the tests for the given submissions."""
print("Running tests for the given submissions")
# need to find out the environment ids, which means we need to find
# out the assignment ids, which means we'll need to fetch each
# submission
# maps: assignment id -> environment id
assignment_environments = {}
# maps: submission id -> environment id
submission_environment_ids = {}
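    # each assignment's environment id is fetched at most once and
    # cached in `assignment_environments` for all of that assignment's
    # submissions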
for submission_id in submission_ids:
if submission_id in submission_environment_ids:
# duplicate submission id
continue
try:
submission = codepost.submission.retrieve(submission_id)
except codepost.errors.NotFoundAPIError:
print(f"Error: Submission {submission_id} not found")
continue
except codepost.errors.AuthorizationAPIError:
print(
f"Error: You do not have access to submission {submission_id}"
)
continue
assignment_id = submission.assignment
if assignment_id not in assignment_environments:
# find environment id for this assignment
response = _cp_request(
method="GET",
endpoint=codepost.assignment.instance_endpoint_by_id(
assignment_id
),
)
if response.status_code != 200:
print(
f"Error fetching assignment {assignment_id} for "
f"submission {submission_id}: status code "
f"{response.status_code}"
)
continue
environment_id = response.json.get("environment", None)
if environment_id is None:
assignment_name = response.json.get("name", None)
if assignment_name is not None:
assignment_id = f"{assignment_name!r} ({assignment_id})"
print(
"Error: could not get environment id for assignment "
f"{assignment_id} for submission {submission_id}"
)
continue
assignment_environments[assignment_id] = environment_id
# save environment id for this submission
submission_environment_ids[submission_id] = assignment_environments[
assignment_id
]
if len(submission_environment_ids) == 0:
print("No submissions to process")
sys.exit(1)
    # pad each prefix to the width of the longest submission id so that
    # the output columns line up
    max_submission_id_len = max(
        len(str(submission_id))
        for submission_id in submission_environment_ids
    )
    prefixes = []
    routines = []
    for submission_id, environment_id in submission_environment_ids.items():
        prefix = f"[{submission_id: ^{max_submission_id_len}}]"
        prefixes.append(prefix)
        routines.append(
            run_submission_test(prefix, environment_id, submission_id)
        )
results = await asyncio.gather(*routines)
print("=" * 30)
print("SUMMARY:")
for prefix, (success, time_elapsed) in zip(prefixes, results):
result = "SUCCESS" if success else "FAILURE"
print(prefix, result, f"({time_elapsed_str(time_elapsed)})")
# =============================================================================
async def main():
parser = argparse.ArgumentParser(
description="Runs the codePost tests for the given submissions."
)
parser.add_argument("submission_ids", nargs="*", type=int)
args = parser.parse_args()
# duplicates are handled in the function call
submission_ids = args.submission_ids
if len(submission_ids) == 0:
print("No submissions given")
sys.exit(1)
await run_tests(submission_ids)
if __name__ == "__main__":
asyncio.run(main())