Skip to content

Instantly share code, notes, and snippets.

@sayuan

sayuan/Schema Secret

Created February 2, 2014 16:55
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save sayuan/9d8611179405989d502c to your computer and use it in GitHub Desktop.
Save sayuan/9d8611179405989d502c to your computer and use it in GitHub Desktop.
Android CTS Report Database
# -*- coding: utf-8 -*-
import libxml2
import datetime
import re
class Fail:
    """A single failed test: its full name plus start and end timestamps."""

    def __init__(self, test_name, starttime, endtime):
        """Store the test name and the two timestamps unchanged."""
        self.test_name, self.starttime, self.endtime = (
            test_name, starttime, endtime)

    def __str__(self):
        """Return the three fields as ``key = "value"`` pairs on one line."""
        parts = [
            'test_name = "{}"'.format(self.test_name),
            'starttime = "{}"'.format(self.starttime),
            'endtime = "{}"'.format(self.endtime),
        ]
        return ', '.join(parts)
class Result:
    """Metadata of one submitted report plus the list of its failed tests."""

    def __init__(self, project_name, ap_name, tester_name, cts_version, sdk_version):
        """Record the submitter-provided fields and start with no failures."""
        # Each instance gets its own list; Fail objects are appended later.
        self.fails = []
        self.project_name = project_name
        self.ap_name = ap_name
        self.tester_name = tester_name
        self.cts_version = cts_version
        self.sdk_version = sdk_version

    def __str__(self):
        """Return project, AP and tester names, one ``key = "value"`` per line."""
        lines = [
            'project_name = "{}"'.format(self.project_name),
            'ap_name = "{}"'.format(self.ap_name),
            'tester_name = "{}"'.format(self.tester_name),
        ]
        return '\n'.join(lines)
def get_full_name(node):
    """Return the dotted ancestor path of *node* followed by ``#`` and its name.

    Walks up from *node*'s parent, collecting each ancestor's ``name``
    property, and stops at the first ancestor whose element name is
    ``TestPackage`` (that element's own name is not included).  The collected
    names are joined outermost-first with ``.``.
    """
    ancestor_names = []
    ancestor = node.get_parent()
    while ancestor.get_name() != 'TestPackage':
        ancestor_names.append(ancestor.prop('name'))
        ancestor = ancestor.get_parent()
    # Names were gathered innermost-first; flip them for the dotted path.
    dotted_path = '.'.join(reversed(ancestor_names))
    return dotted_path + '#' + node.prop('name')
# Chinese weekday names mapped to the English abbreviations strptime's %a
# accepts.  Built once at import time instead of on every call.
_WEEKDAY_ABBR = {
    '星期一': 'Mon',
    '星期二': 'Tue',
    '星期三': 'Wed',
    '星期四': 'Thu',
    '星期五': 'Fri',
    '星期六': 'Sat',
    '星期日': 'Sun',
}

# Chinese month names mapped to the English abbreviations strptime's %b
# accepts.
_MONTH_ABBR = {
    '一月': 'Jan',
    '二月': 'Feb',
    '三月': 'Mar',
    '四月': 'Apr',
    '五月': 'May',
    '六月': 'Jun',
    '七月': 'Jul',
    '八月': 'Aug',
    '九月': 'Sep',
    '十月': 'Oct',
    '十一月': 'Nov',
    '十二月': 'Dec',
}


def parse_date(date):
    """Convert a report timestamp string into a naive ``datetime``.

    The input is expected to look like ``星期四 一月 30 14:20:31 CST 2014``.
    Chinese weekday and month names are replaced by their English
    abbreviations and the literal `` CST`` zone marker is dropped (so the
    result carries no timezone).  A string already using English
    abbreviations passes through the substitutions unchanged.

    Raises:
        KeyError: a ``星期…`` or ``…月`` token is not in the lookup tables.
        ValueError: the normalised string does not match the expected layout.
    """
    date = re.sub(r'(星期\S+)', lambda m: _WEEKDAY_ABBR[m.group(1)], date)
    date = re.sub(r'(\S+月)', lambda m: _MONTH_ABBR[m.group(1)], date)
    date = date.replace(' CST', '')
    # Explicit layout rather than '%c': '%c' follows the process locale, but
    # the names were just normalised to English, so the parse must not
    # depend on which locale happens to be active.
    return datetime.datetime.strptime(date, '%a %b %d %H:%M:%S %Y')
def parse(result, result_buf):
    """Fill *result* with the data found in a CTS result XML document.

    Args:
        result: object to populate.  The attributes ``test_plan``,
            ``starttime``, ``endtime``, ``build_fingerprint``,
            ``subscriber_id``, ``network``, ``os_version``, ``java_version``
            and ``cts_version`` are assigned (replacing any value already
            present), and one ``Fail`` per failed test is appended to
            ``result.fails``.
        result_buf: the XML text, handed to ``libxml2.parseDoc``.

    Raises:
        IndexError: a required element (``/TestResult``, ``DeviceInfo``,
            ``PhoneSubInfo``, ``BuildInfo``, ``HostInfo``, ``Os``, ``Java``,
            ``Cts``) is missing, since the first XPath match is always taken.
        Whatever ``libxml2.parseDoc`` raises for malformed XML, and whatever
        ``parse_date`` raises for an unparseable timestamp.
    """
    doc = libxml2.parseDoc(result_buf)
    try:
        ctxt = doc.xpathNewContext()
        try:
            node_test_result = ctxt.xpathEval('/TestResult')[0]
            node_device_info = node_test_result.xpathEval('./DeviceInfo')[0]
            node_phone_sub_info = node_device_info.xpathEval('./PhoneSubInfo')[0]
            node_build_info = node_device_info.xpathEval('./BuildInfo')[0]
            node_host_info = node_test_result.xpathEval('./HostInfo')[0]
            node_os = node_host_info.xpathEval('./Os')[0]
            node_java = node_host_info.xpathEval('./Java')[0]
            node_cts = node_host_info.xpathEval('./Cts')[0]
            nodes_fail = node_test_result.xpathEval(
                '//TestCase/Test[@result="fail"]')

            result.test_plan = node_test_result.prop('testPlan')
            result.starttime = parse_date(node_test_result.prop('starttime'))
            result.endtime = parse_date(node_test_result.prop('endtime'))
            result.build_fingerprint = node_build_info.prop('build_fingerprint')
            result.subscriber_id = node_phone_sub_info.prop('subscriberId')
            result.network = node_build_info.prop('network')
            result.os_version = node_os.prop('version')
            result.java_version = node_java.prop('version')
            result.cts_version = node_cts.prop('version')

            # Nodes are only usable while the document is alive, so every
            # value is read out before the cleanup below runs.
            for node_fail in nodes_fail:
                test_name = get_full_name(node_fail)
                starttime = parse_date(node_fail.prop('starttime'))
                endtime = parse_date(node_fail.prop('endtime'))
                result.fails.append(Fail(test_name, starttime, endtime))
        finally:
            # The libxml2 bindings do not release XPath contexts on their
            # own; without this each parsed report leaks native memory.
            ctxt.xpathFreeContext()
    finally:
        # Likewise the parsed document must be freed explicitly.
        doc.freeDoc()
-- Catalogue of test names, one row per distinct name (primary key).
-- "waive" defaults to 0 and "url" to NULL when a row is inserted with only
-- a test_name.
-- NOTE(review): "waive" looks like a 0/1 flag marking a test as waived and
-- "url" a reference link for it; confirm with the schema owner.
CREATE TABLE "test" (
"test_name" varchar(256) NOT NULL,
"waive" integer NOT NULL DEFAULT 0,
"url" varchar(256) DEFAULT NULL,
PRIMARY KEY ("test_name")
);
-- One row per failed test within a report.  The composite primary key means
-- a given test name can appear at most once per report_id.
-- No foreign keys are declared; report_id / test_name are matched to the
-- "report" and "test" tables by value only.
CREATE TABLE "fail" (
"report_id" char(40) NOT NULL,
"test_name" varchar(256) NOT NULL,
"starttime" timestamp NOT NULL ,
"endtime" timestamp NOT NULL ,
PRIMARY KEY ("report_id","test_name")
);
-- One row per uploaded report.  report_id is 40 characters wide, matching
-- the length of a SHA-1 hex digest.
-- "sdk_version" is included because the application's report INSERT writes
-- that column; without it the INSERT fails with an unknown-column error.
-- Databases created from the earlier schema need the column added with
-- ALTER TABLE before this definition matches them.
CREATE TABLE "report" (
    "report_id" char(40) NOT NULL,
    "product_name" varchar(40) NOT NULL,
    "ap_name" varchar(40) NOT NULL,
    "tester_name" varchar(40) NOT NULL,
    "test_plan" varchar(40) NOT NULL,
    "starttime" timestamp NOT NULL,
    "endtime" timestamp NOT NULL,
    "build_fingerprint" varchar(256) NOT NULL,
    "subscriber_id" varchar(40) NOT NULL,
    "network" varchar(40) NOT NULL,
    "os_version" varchar(128) NOT NULL,
    "java_version" varchar(128) NOT NULL,
    "cts_version" varchar(128) NOT NULL,
    "sdk_version" varchar(128) NOT NULL,
    PRIMARY KEY ("report_id")
);
-- Associates a tester name with a group name.  No primary key or uniqueness
-- constraint is declared, so the same tester may be listed more than once.
CREATE TABLE "tester" (
"tester_name" varchar(64) NOT NULL,
"group_name" varchar(64) NOT NULL
);
-- Secondary indexes on the start/end timestamps of reports and failures.
-- NOTE(review): presumably these support time-range queries; confirm against
-- the queries that read these tables.
CREATE INDEX "report_starttime" ON "report" ("starttime");
CREATE INDEX "report_endtime" ON "report" ("endtime");
CREATE INDEX "fail_starttime" ON "fail" ("starttime");
CREATE INDEX "fail_endtime" ON "fail" ("endtime");
-- Index on the "waive" column of the test catalogue.
CREATE INDEX "test_waive" ON "test" ("waive");
# -*- coding: utf-8 -*-
import hashlib
import django.db
from django import forms
from django.db import transaction
from django.http import HttpResponse
from django.http import HttpResponseRedirect
from django.core.files.storage import default_storage
from django.core.files.base import ContentFile
from django.template import Context, loader
from cts_db import common
from cts_db import result_parser
# Inserts one row into "report".  The statement names 14 columns, so it must
# carry exactly 14 placeholders; a fifteenth makes every execute() fail
# because the bound parameter tuple has only 14 values.
SQL_INSERT_REPORT = '''
INSERT INTO report (report_id, product_name, ap_name, tester_name,
test_plan, starttime, endtime, build_fingerprint, subscriber_id, network,
os_version, java_version, cts_version, sdk_version)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
'''
# Inserts one failed test of a report; the four placeholders are bound, in
# order, to report_id, test_name, starttime and endtime.
SQL_INSERT_FAIL = '''
INSERT INTO fail (report_id, test_name, starttime, endtime)
VALUES (%s, %s, %s, %s)
'''
# Adds a test name to the "test" table, leaving the other columns at their
# defaults.  NOTE(review): "INSERT OR IGNORE" looks like SQLite-specific
# syntax for skipping rows that already exist; confirm the target database
# before porting.
SQL_INSERT_TEST = '''
INSERT OR IGNORE INTO test (test_name)
VALUES (%s)
'''
def _select_field(label, options):
    """Build a drop-down field listing *options* after a blank '--' entry."""
    choices = [('', '--')]
    # Each option is used both as the submitted value and as the shown text.
    choices.extend((option, option) for option in options)
    return forms.ChoiceField(
        widget=forms.Select,
        label=label,
        choices=choices,
    )


class SubmitForm(forms.Form):
    """Upload form: five drop-downs describing the run plus the result file.

    The drop-down options come from the lists exposed by ``common``.
    """

    project = _select_field('專案', common.projects)
    wifi = _select_field('WIFI AP', common.wifi_aps)
    tester = _select_field('測試者', common.testers)
    cts_version = _select_field('CTS version', common.cts_versions)
    sdk_version = _select_field('SDK version', common.sdk_versions)
    report_file = forms.FileField(label='testResult.xml')
def page(request):
    """Serve the submit form, or process an uploaded report on POST.

    Non-POST requests get the rendered ``submit.html`` template with an
    empty form.  On POST the uploaded file is hashed with SHA-1; the hex
    digest is used both as the storage path (``<first 2 chars>/<rest>.xml``)
    and as the report id.  A file whose path already exists in storage is
    rejected as a duplicate; otherwise it is parsed, saved to storage and
    written to the database, and the client is redirected to ``.``.
    """
    if request.method != 'POST':
        template = loader.get_template('submit.html')
        context = Context({'form': SubmitForm()})
        return HttpResponse(template.render(context))

    form = SubmitForm(request.POST, request.FILES)
    if not form.is_valid():
        return HttpResponse('Upload fail.')  # TODO

    cleaned = form.cleaned_data
    submitted = (
        cleaned['project'],
        cleaned['wifi'],
        cleaned['tester'],
        cleaned['cts_version'],
        cleaned['sdk_version'],
    )

    data = request.FILES['report_file'].read()
    digest = hashlib.sha1(data).hexdigest()
    filename = '{0}/{1}.xml'.format(digest[:2], digest[2:])
    if default_storage.exists(filename):
        return HttpResponse('The file is already uploaded.')  # TODO

    result = result_parser.Result(*submitted)
    result_parser.parse(result, data)
    default_storage.save(filename, ContentFile(data))
    store(digest, result)
    return HttpResponseRedirect('.')  # TODO
def store(digest, result):
    """Write a parsed report and its failed tests to the default database.

    Inserts one ``report`` row keyed by *digest*, then for every entry in
    ``result.fails`` one ``fail`` row and one ``test`` row, and finally
    commits via ``transaction.commit_unless_managed()``.
    """
    cursor = django.db.connections['default'].cursor()

    report_row = (
        digest,
        result.project_name,
        result.ap_name,
        result.tester_name,
        result.test_plan,
        result.starttime,
        result.endtime,
        result.build_fingerprint,
        result.subscriber_id,
        result.network,
        result.os_version,
        result.java_version,
        result.cts_version,
        result.sdk_version,
    )
    cursor.execute(SQL_INSERT_REPORT, report_row)

    for failure in result.fails:
        fail_row = (
            digest,
            failure.test_name,
            failure.starttime,
            failure.endtime,
        )
        cursor.execute(SQL_INSERT_FAIL, fail_row)
        # Also register the test name in the test catalogue.
        cursor.execute(SQL_INSERT_TEST, (failure.test_name,))

    transaction.commit_unless_managed()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment