Skip to content

Instantly share code, notes, and snippets.

@NamPNQ
Created April 22, 2014 15:18
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save NamPNQ/11183305 to your computer and use it in GitHub Desktop.
Save NamPNQ/11183305 to your computer and use it in GitHub Desktop.
def run_test_case(weekly_quiz_level, **kwargs):
    """Run submitted code against every test case of a weekly-quiz level.

    One asynchronous urlfetch POST per test case is sent to the URL configured
    as ``HACKEREARTH_RUN_URL``. Each response is graded in its RPC callback and
    pushed to the user's channel. Once every test case has reported, a
    ``notification`` message is sent; for a submission the averaged outcome is
    also stored and the user's quiz progress and total score are updated.

    Args:
        weekly_quiz_level: URL-safe string form of the level's ndb key.
        **kwargs: All of the following entries are required:
            user: object exposing ``id()`` (presumably an ndb key -- confirm
                against the caller); its id is the channel client id and the
                object itself is stored on the result entity.
            is_submit: truthy to persist the outcome, falsy for a trial run.
            code: source code to execute.
            lang: language identifier forwarded to the code runner.
            filename: name stored together with the submitted code.

    Raises:
        KeyError: if one of the required keyword arguments is missing.

    Errors raised inside an RPC callback (failed fetch, undecodable body) are
    not caught here.
    """
    import urllib
    import logging
    import json
    from google.appengine.ext import ndb
    from google.appengine.api import urlfetch
    from application.config import config
    from application.models import WeeklyQuiz, WeeklyQuizRunCodeResult, File, WeeklyQuizUser
    from google.appengine.api import channel

    weekly_quiz_level = ndb.Key(urlsafe=weekly_quiz_level).get()
    weekly_quiz = WeeklyQuiz.query(WeeklyQuiz.level_keys == weekly_quiz_level.key).get()
    user = kwargs['user']
    is_submit = kwargs['is_submit']
    code = kwargs['code']
    lang = kwargs['lang']
    filename = kwargs['filename']
    client_id = str(user.id())
    # Filled by the RPC callbacks, one entry per graded test case.
    test_results = []

    def grade_run_status(run_status, expected_output):
        """Turn the runner's ``run_status`` dict into a test-case result dict."""
        status = run_status.get('status')
        if status == 'RE':
            error_html = run_status['output_html']
        elif status == 'CE':
            error_html = run_status['status_detail']
        else:
            error_html = 'OK'
        # Both sides are stripped: stripping only the actual output would make
        # an expected output with surrounding whitespace impossible to match.
        passed = (
            bool(run_status)
            and status not in ('CE', 'RE')
            and expected_output is not None
            and run_status['output'].strip() == expected_output.strip()
        )
        return {
            'time_used': float(run_status.get('time_used', 0)),
            'memory_used': float(run_status.get('memory_used', 0)),
            'error_html': error_html,
            'result': passed,
        }

    def record_submission():
        """Persist the averaged outcome and update the user's quiz progress."""
        # Only called once every test case has reported, so count is > 0.
        count = len(test_results)
        avg_time = sum(entry['time_used'] for entry in test_results) / count
        avg_memory = sum(entry['memory_used'] for entry in test_results) / count
        passed = all(entry['result'] for entry in test_results)

        # Exceeding a limit zeroes the score but leaves `passed` untouched.
        if (not passed
                or avg_memory > weekly_quiz_level.limit_memory
                or avg_time > weekly_quiz_level.limit_time):
            score = 0
        else:
            score = weekly_quiz_level.score - (avg_time * 10 + avg_memory / 100)

        code_file = File()
        code_file.content = code
        code_file.filename = filename
        code_file.put()

        run_code_result = WeeklyQuizRunCodeResult()
        run_code_result.level = weekly_quiz_level.key
        run_code_result.user = user
        run_code_result.code = code_file.key
        run_code_result.result = passed
        run_code_result.time_used = avg_time
        run_code_result.memory_used = avg_memory
        run_code_result.language = lang
        run_code_result.score = score
        run_code_result.put()

        weekly_quiz_user = WeeklyQuizUser.get_by_user_and_weekly_quiz(
            user=user, weekly_quiz=weekly_quiz.key)
        weekly_quiz_user.run_code_result.append(run_code_result.key)
        next_level_key = None
        if passed:
            # Resubmitting an already-passed level must not add a duplicate.
            if weekly_quiz_level.key not in weekly_quiz_user.passed_level:
                weekly_quiz_user.passed_level.append(weekly_quiz_level.key)
            next_level_key = weekly_quiz.get_next_level(weekly_quiz_level.key)
            weekly_quiz_user.current_level = next_level_key
        else:
            weekly_quiz_user.current_level = weekly_quiz_level.key

        def best_score_for(level_key):
            """Return the user's best stored score for a level, or 0."""
            best = WeeklyQuizRunCodeResult.get_best_score_by_user_and_level(
                user=user, level=level_key)
            if best:
                return best.score
            # Fall back to the result written above when the lookup returns
            # nothing for the level just submitted (presumably the query may
            # not see the fresh entity yet -- confirm).
            if level_key == run_code_result.level:
                return run_code_result.score
            return 0

        weekly_quiz_user.score = sum(
            best_score_for(level_key) for level_key in weekly_quiz.level_keys)
        logging.debug("Total quiz score: %r", weekly_quiz_user.score)
        weekly_quiz_user.put()
        # TODO: recalculate the rank of all users.
        channel.send_message(client_id, json.dumps({
            'type': 'submit_sumary_result',
            'result': passed,
            'time_used': avg_time,
            'memory_used': avg_memory,
            'score': score,
            'next_level_key': (next_level_key.urlsafe() if next_level_key
                               else 'You not passed current level'),
        }))

    def handle_result(handle_result_rpc, **handle_result_kwargs):
        """Grade one finished fetch; finalize the run after the last one."""
        response = handle_result_rpc.get_result()
        if not (response and response.content):
            # The test case is skipped, so the completion branch below is
            # never reached for this run; leave a trace for diagnosis.
            logging.warning("Empty response from code runner; test case skipped")
            return

        compile_result = json.loads(response.content)
        logging.debug("Code runner response: %r", compile_result)
        testcase = grade_run_status(compile_result['run_status'] or {},
                                    handle_result_kwargs['output'])
        test_results.append(testcase)
        channel.send_message(client_id, json.dumps(dict(
            {'type': 'submit_code_result' if is_submit else 'run_code_result'},
            **testcase)))

        if len(test_results) != len(weekly_quiz_level.test_case):
            return  # other test cases are still outstanding

        channel.send_message(client_id, json.dumps({
            'type': 'notification',
            'msg': 'compile done',
        }))
        if is_submit:
            record_submission()
        logging.debug("Finshed run test case")

    def create_callback(create_callback_rpc, **create_callback_kwargs):
        """Bind an RPC and its expected output now, avoiding late binding."""
        return lambda: handle_result(create_callback_rpc, **create_callback_kwargs)

    # Python 2's urllib.urlencode calls str() on each value, which raises
    # UnicodeEncodeError for unicode text holding non-ASCII characters, so
    # such values are encoded to UTF-8 first. type(u'') is the text type.
    text_type = type(u'')

    def to_utf8(value):
        return value.encode('utf-8') if isinstance(value, text_type) else value

    base_payload = {
        'source': to_utf8(code),
        'lang': to_utf8(lang),
        'client_secret': config.get("HACKEREARTH_CLIENT_SECRET"),
        'async': 0,
    }
    run_url = config.get("HACKEREARTH_RUN_URL")

    rpcs = []
    for test in weekly_quiz_level.test_case:
        rpc = urlfetch.create_rpc()
        rpc.callback = create_callback(rpc, output=test.output)
        form_data = urllib.urlencode(dict(base_payload, input=to_utf8(test.input)))
        urlfetch.make_fetch_call(rpc, url=run_url, payload=form_data,
                                 method=urlfetch.POST)
        rpcs.append(rpc)

    # All fetches are in flight; waiting on each RPC runs its callback.
    for rpc in rpcs:
        rpc.wait()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment