Created
April 22, 2014 15:18
-
-
Save NamPNQ/11183305 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def run_test_case(weekly_quiz_level, **kwargs):
    """Run submitted code against every test case of a weekly-quiz level.

    One asynchronous URL Fetch request is sent to the configured
    ``HACKEREARTH_RUN_URL`` per test case.  As each response arrives, a
    per-test-case message is pushed to the user's channel.  When every test
    case has been accounted for, a ``notification`` message is pushed and,
    for a submission, the result is stored and a summary message is pushed.

    Args:
        weekly_quiz_level: URL-safe ndb key string of the level to run.
        **kwargs: Required keys:
            user: object exposing ``id()``; used as the channel client id and
                stored on the run result (presumably an ndb key -- confirm
                against the caller).
            is_submit: truthy for a scored submission, falsy for a trial run.
            code: source code to execute.
            lang: language identifier forwarded to the runner.
            filename: file name stored alongside a submitted source.

    Raises:
        KeyError: if one of the required keyword arguments is missing.
    """
    import urllib
    import logging
    import json
    from google.appengine.ext import ndb
    from google.appengine.api import urlfetch
    from application.config import config
    from application.models import WeeklyQuiz, WeeklyQuizRunCodeResult, File, WeeklyQuizUser
    from google.appengine.api import channel

    weekly_quiz_level = ndb.Key(urlsafe=weekly_quiz_level).get()
    weekly_quiz = WeeklyQuiz.query(WeeklyQuiz.level_keys == weekly_quiz_level.key).get()
    user = kwargs['user']
    is_submit = kwargs['is_submit']
    code = kwargs['code']
    lang = kwargs['lang']
    filename = kwargs['filename']
    # Shared by all callbacks; one entry is appended per finished request.
    test_results = []

    def send(message):
        """Push a JSON-encoded message to the submitting user's channel."""
        channel.send_message(str(user.id()), json.dumps(message))

    def read_run_status(rpc):
        """Return the runner's non-empty ``run_status`` dict, or None."""
        try:
            response = rpc.get_result()
        except urlfetch.Error:
            # An exception escaping a callback would abort the wait loop and
            # leave the remaining test cases unreported.
            logging.exception("Code runner request failed")
            return None
        if not response or not response.content:
            return None
        try:
            payload = json.loads(response.content)
        except ValueError:
            logging.exception("Code runner returned a non-JSON response")
            return None
        logging.debug("Code runner response: %s", payload)
        if not isinstance(payload, dict):
            return None
        run_status = payload.get('run_status')
        if isinstance(run_status, dict) and run_status:
            return run_status
        return None

    def build_testcase(run_status, expected_output):
        """Turn a runner ``run_status`` into the per-test-case result dict."""
        if run_status is None:
            # Count the test case as failed so that completion is still
            # detected when a request yields nothing usable.
            return {
                'time_used': 0.0,
                'memory_used': 0.0,
                'error_html': 'No result received from the code runner',
                'result': False,
            }
        status = run_status.get('status')
        if status == 'RE':
            error_html = run_status.get('output_html')
        elif status == 'CE':
            error_html = run_status.get('status_detail')
        else:
            error_html = 'OK'
        actual_output = (run_status.get('output') or '').strip()
        return {
            'time_used': float(run_status.get('time_used') or 0),
            'memory_used': float(run_status.get('memory_used') or 0),
            'error_html': error_html,
            'result': status not in ('CE', 'RE') and actual_output == expected_output,
        }

    def save_submission():
        """Store the submission, update the user's progress and send a summary."""
        case_count = len(test_results)
        avg_time = sum(item['time_used'] for item in test_results) / case_count
        avg_memory = sum(item['memory_used'] for item in test_results) / case_count
        passed = all(item['result'] for item in test_results)

        score = weekly_quiz_level.score
        if (not passed or avg_memory > weekly_quiz_level.limit_memory
                or avg_time > weekly_quiz_level.limit_time):
            score = 0
        else:
            # NOTE(review): the penalty is not clamped, so a slow passing run
            # can produce a negative score -- confirm this is intended.
            score -= (avg_time * 10 + avg_memory / 100)

        code_file = File()
        code_file.content = code
        code_file.filename = filename
        code_file.put()

        run_code_result = WeeklyQuizRunCodeResult()
        run_code_result.level = weekly_quiz_level.key
        run_code_result.user = user
        run_code_result.code = code_file.key
        run_code_result.result = passed
        run_code_result.time_used = avg_time
        run_code_result.memory_used = avg_memory
        run_code_result.language = lang
        run_code_result.score = score
        run_code_result.put()

        weekly_quiz_user = WeeklyQuizUser.get_by_user_and_weekly_quiz(
            user=user, weekly_quiz=weekly_quiz.key)
        weekly_quiz_user.run_code_result.append(run_code_result.key)
        _next_level_key = None
        if passed:
            weekly_quiz_user.passed_level.append(weekly_quiz_level.key)
            _next_level_key = weekly_quiz.get_next_level(weekly_quiz_level.key)
            weekly_quiz_user.current_level = _next_level_key
        else:
            weekly_quiz_user.current_level = weekly_quiz_level.key

        def get_best_score_by_user_and_level(_user, _level):
            """Best stored score for a level, falling back to this run's score."""
            _best = WeeklyQuizRunCodeResult.get_best_score_by_user_and_level(
                user=_user, level=_level)
            if _best:
                return _best.score
            # Nothing returned by the lookup: use the result just written if
            # it belongs to this user and level.
            if _user == run_code_result.user and _level == run_code_result.level:
                return run_code_result.score
            return 0

        weekly_quiz_user.score = sum(
            get_best_score_by_user_and_level(user, level)
            for level in weekly_quiz.level_keys)
        logging.debug("Weekly quiz user score: %s", weekly_quiz_user.score)
        weekly_quiz_user.put()
        # TODO: Recalc rank of all user
        send({
            'type': 'submit_sumary_result',
            'result': passed,
            'time_used': avg_time,
            'memory_used': avg_memory,
            'score': score,
            'next_level_key': (_next_level_key.urlsafe() if _next_level_key
                               else 'You not passed current level'),
        })

    def handle_result(handle_result_rpc, **handle_result_kwargs):
        """Callback run when one test case's fetch completes."""
        testcase = build_testcase(
            read_run_status(handle_result_rpc), handle_result_kwargs['output'])
        test_results.append(testcase)
        send(dict(
            {'type': 'run_code_result' if not is_submit else 'submit_code_result'},
            **testcase))
        if len(test_results) != len(weekly_quiz_level.test_case):
            return
        # Every test case has reported: tell the client and finalize.
        send({
            'type': 'notification',
            'msg': 'compile done'
        })
        if is_submit:
            save_submission()
        logging.debug("Finshed run test case")

    def create_callback(create_callback_rpc, **create_callback_kwargs):
        """Bind the rpc and its expected output now, not at call time."""
        return lambda: handle_result(create_callback_rpc, **create_callback_kwargs)

    def to_bytes(value):
        """UTF-8 encode unicode text; Python 2 urlencode str()s each value."""
        try:
            text_type = unicode
        except NameError:
            # No separate unicode type on this interpreter; nothing to encode.
            return value
        if isinstance(value, text_type):
            return value.encode('utf-8')
        return value

    base_payload = {
        'source': to_bytes(code),
        'lang': to_bytes(lang),
        'client_secret': config.get("HACKEREARTH_CLIENT_SECRET"),
        'async': 0,
    }
    run_url = config.get("HACKEREARTH_RUN_URL")

    rpcs = []
    for test in weekly_quiz_level.test_case:
        rpc = urlfetch.create_rpc()
        rpc.callback = create_callback(rpc, output=test.output)
        # Fresh dict per request so no state is shared between iterations.
        form_data = urllib.urlencode(dict(base_payload, input=to_bytes(test.input)))
        urlfetch.make_fetch_call(rpc, url=run_url, payload=form_data,
                                 method=urlfetch.POST)
        rpcs.append(rpc)

    # Waiting on each rpc is what triggers its callback.
    for rpc in rpcs:
        rpc.wait()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment