Created
February 4, 2021 12:25
-
-
Save William-HL1991/6b04109f0e57797c7d125910f4b88b94 to your computer and use it in GitHub Desktop.
code
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import contextlib
import copy
import itertools
import json
import logging
import multiprocessing as mp
import os

import pandas as pd
import pymysql
# Directory containing this script; the log file is written next to it.
PWD = os.path.dirname(os.path.realpath(__file__))
LOGPATH = os.path.join(PWD, './info.log')

# Send INFO-and-above records from the root logger to LOGPATH.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(filename)s[line:%(lineno)d] - %(levelname)s: %(message)s',
    filename=LOGPATH,
)
class FuncTactic:
    """Filter out test cases whose changed functions are all "common" ones.

    For one task id the class loads, per case, the list of changed functions
    (from a JSON dump on disk), the per-case call trees and the
    function -> covering-cases relation (both from MySQL).  A function is
    treated as common when many cases cover it; a case that only touches
    common functions is dropped as long as another case still covers at
    least one of those functions.

    NOTE(review): the database hosts/credentials and the dump directory are
    hard-coded below exactly as in the original script; they should come
    from configuration rather than source code.
    """

    # Directory that holds the per-task ``<task_id>.log`` JSON dumps.
    CASE_LOG_DIR = "/Users/xxxx/Downloads/"
    # A function covered by at least this many cases is common outright.
    COMMON_CASE_THRESHOLD = 40
    # Otherwise it is common when its call chains exceed this count.
    COMMON_CHAIN_THRESHOLD = 100

    def __init__(self, task_id):
        """Remember the id of the task to analyse."""
        self.task_id = task_id

    def tactic(self):
        """Run the whole filtering pass and print/log the outcome.

        Prints the retained cases, the filtered cases and the filtered
        ratio; the key sets of both dicts are also written to the log.

        Raises:
            ConnectionError: if either database cannot be reached.
            OSError: if the per-task JSON dump cannot be opened.
        """
        case_id_func = self._load_case_funcs()      # case id -> changed funcs
        trees = self._load_trees(case_id_func)      # case id -> call tree
        func_case_id = self._load_func_cases()      # func -> covering case ids

        common_funcs = self._find_common_funcs(func_case_id, trees)
        result_case_id_func, flitur_result = self._filter_cases(
            case_id_func, func_case_id, common_funcs)

        print(result_case_id_func)
        print(flitur_result)
        logging.info(str(result_case_id_func.keys()))
        logging.info(str(flitur_result.keys()))
        total = len(flitur_result) + len(result_case_id_func)
        # Guard the ratio: with no cases at all the original divided by zero.
        print(len(flitur_result) / total if total else 0.0)

    def _load_case_funcs(self):
        """Read the task's JSON dump into ``{str(case id): [changed funcs]}``."""
        case_id_path = os.path.join(self.CASE_LOG_DIR, "%s.log" % self.task_id)
        with open(case_id_path, 'r', encoding='utf-8') as content:
            result_list = json.load(content)
        return {
            str(item.get('c_id')): item.get('c_c_func_origin_list') or []
            for item in result_list
        }

    def _open_db(self, host, port, user, passwd, db):
        """Connect via ``connect_db`` and wrap the connection so ``with`` closes it."""
        conn = self.connect_db(host, port, user, passwd, db)
        if conn is None:
            raise ConnectionError(
                "cannot connect to database %s on %s:%s" % (db, host, port))
        return contextlib.closing(conn)

    def _load_trees(self, case_ids):
        """Fetch ``{int(case id): tree text}`` for the given case ids.

        Rows whose tree is NULL or empty are skipped.
        """
        case_ids = list(case_ids)
        if not case_ids:
            # ``IN ()`` is a syntax error in MySQL; nothing to fetch anyway.
            return {}
        placeholders = ", ".join(["%s"] * len(case_ids))
        sql = ("SELECT case_id, trees FROM android_code_tree "
               "where case_id in (%s)" % placeholders)
        with self._open_db("localhost", 3306, "root", "root1234",
                           "code_trees") as conn:
            # Values are bound by the driver instead of being spliced into SQL.
            covs_pd = pd.read_sql(sql, conn, params=case_ids)
        trees = {}
        for row in covs_pd.itertuples():
            tree = getattr(row, "trees")
            if tree is None or len(tree) < 1:
                continue
            trees[int(getattr(row, "case_id"))] = tree
        return trees

    def _load_func_cases(self):
        """Fetch ``{changed function: [covering case ids]}`` for the task.

        When several rows match, the last one wins (as in the original
        loop); when none match an empty mapping is returned.
        """
        sql = "SELECT relation FROM task_diff_case_relation where taskid = %s"
        with self._open_db("x.x.x.x", 3306, "test", "test",
                           "code_ing") as conn:
            relation = pd.read_sql(sql, conn, params=[self.task_id])
        functions = ""
        for row in relation.itertuples():
            functions = getattr(row, "relation")
        if not functions:
            logging.warning("no relation found for task %s", self.task_id)
            return {}
        return {
            item.get('diff_code'): item.get('caseid_list') or []
            for item in json.loads(functions)
        }

    def _find_common_funcs(self, func_case_id, trees):
        """Evaluate ``get_dict_common_func`` over chunks in a process pool."""
        workers = mp.cpu_count()
        chunks = [chunk for chunk in self.split_dict(func_case_id, workers)
                  if chunk]
        if not chunks:
            return []
        with mp.Pool(workers) as pool:
            # starmap blocks until every chunk is done, so leaving the
            # ``with`` (which terminates the pool) loses no results.
            results = pool.starmap(
                self.get_dict_common_func,
                [(chunk, trees) for chunk in chunks])
        return list(itertools.chain.from_iterable(results))

    @staticmethod
    def _filter_cases(case_id_func, func_case_id, common_funcs):
        """Split cases into ``(retained, filtered)`` dicts.

        A case is filtered when its functions form a proper subset of
        ``common_funcs`` and, after removing the case from each function's
        covering set, at least one of those functions is still covered by
        another case.  ``filtered`` maps the case id to those still-covered
        functions.  The inputs are not modified.

        NOTE(review): the proper-subset test (``<``) is kept from the
        original; a case touching exactly all common functions is therefore
        never filtered -- confirm whether ``<=`` was intended.
        """
        common = set(common_funcs)
        retained = copy.deepcopy(case_id_func)
        filtered = {}
        # Case ids are compared as strings because the keys of
        # ``case_id_func`` are strings while the relation lists may hold ints.
        covering = {
            fun: {str(case) for case in (cases or [])}
            for fun, cases in func_case_id.items()
        }
        for case_id, fun_list in case_id_func.items():
            funs = fun_list or []
            if not set(funs) < common:
                continue
            still_covered = []
            for fun in funs:
                others = covering.get(fun)
                if others is None:
                    continue
                others.discard(case_id)
                if others:
                    still_covered.append(fun)
            if still_covered:
                retained.pop(case_id, None)
                filtered[case_id] = still_covered
        return retained, filtered

    def connect_db(self, host, port, user, passwd, db):
        """Open a pymysql connection.

        Returns:
            The connection, or ``None`` when connecting fails (the error is
            logged).
        """
        try:
            return pymysql.connect(host=host, port=port, user=user,
                                   passwd=passwd, db=db)
        except Exception as e:  # best effort: callers check for None
            logging.error("connect db error : " + str(e))
            return None

    def get_dict_common_func(self, _dict, trees=None):
        """Return the functions of ``_dict`` that count as common.

        Args:
            _dict: mapping ``function -> list of covering case ids``.
            trees: optional mapping ``int(case id) -> tree text``; cases
                without a tree contribute nothing.

        A function is common when at least ``COMMON_CASE_THRESHOLD`` cases
        cover it, or when the call chains found for it across its cases'
        trees number more than ``COMMON_CHAIN_THRESHOLD``.

        NOTE(review): the original looked trees up in ``_dict`` itself
        (a KeyError) and compared the number of cases (< 40) with 100,
        which could never succeed; counting chains is the presumed intent.
        """
        trees = trees or {}
        common_funcs = []
        for fun, ids in _dict.items():
            ids = ids or []
            if len(ids) >= self.COMMON_CASE_THRESHOLD:
                common_funcs.append(fun)
                continue
            chain_count = 0
            for case_id in ids:
                try:
                    tree = trees.get(int(case_id))
                except (TypeError, ValueError):
                    tree = None
                if tree is None:
                    continue
                chains = self.get_funcs_lines_from_tree(tree, fun)
                if chains:
                    chain_count += len(chains)
            if chain_count > self.COMMON_CHAIN_THRESHOLD:
                common_funcs.append(fun)
        return common_funcs

    def get_funcs_lines_from_tree(self, tree_string, func):
        """Collect one chain of names per tree line that mentions ``func``.

        The tree is split on newlines.  For every line containing ``func``
        the text before the first ``"L"`` is taken as that line's indent,
        and the last space-separated token (leading ``"L"`` characters
        stripped) as its name.  Earlier lines are then scanned backwards for
        an indent one ``"| "`` shorter (shortening again after each hit),
        later lines forwards for an indent one ``"| "`` longer (lengthening
        after each hit).  The chain lists the backward hits in file order,
        then the matched line, then the forward hits.

        NOTE(review): matching is by substring, so once the searched indent
        becomes empty every remaining earlier line matches; kept as in the
        original -- confirm against the real tree format.

        Returns:
            A list of chains (lists of str), or ``None`` if no line
            mentions ``func``.
        """
        data = tree_string.split("\n")
        result_list = []
        # enumerate() gives each occurrence its own position; the original
        # ``data.index(line)`` mapped duplicate lines to the first one.
        for i, line in enumerate(data):
            if func not in line:
                continue
            indent = line.split("L")[0]
            list_in = [line.split(" ")[-1].lstrip("L")]

            # Walk backwards, looking for progressively shorter indents.
            prefix = self.rreplace(indent, "| ", "")
            for j in reversed(data[:i]):
                if prefix in j:
                    prefix = self.rreplace(prefix, "| ", "")
                    list_in.append(j.split(" ")[-1].lstrip("L"))
            list_in.reverse()

            # Walk forwards from the matched line, looking for deeper indents.
            prefix = self.rreplace(indent, "| ", "| | ")
            for j in data[i:]:
                if prefix in j:
                    prefix = self.rreplace(prefix, "| ", "| | ")
                    list_in.append(j.split(" ")[-1].lstrip("L"))

            result_list.append(list_in)
        return result_list if result_list else None

    def rreplace(self, s, old, new):
        """Replace the last occurrence of ``old`` in ``s`` with ``new``."""
        li = s.rsplit(old, 1)
        return new.join(li)

    def split_dict(self, x, chunks):
        """Distribute the items of ``x`` round-robin over ``chunks`` dicts.

        Returns:
            A list of exactly ``chunks`` dicts (some may be empty).

        Raises:
            ValueError: if ``chunks`` is smaller than 1.
        """
        if chunks < 1:
            raise ValueError("chunks must be at least 1")
        split = [dict() for _ in range(chunks)]
        slots = itertools.cycle(range(chunks))
        for k, v in x.items():
            split[next(slots)][k] = v
        return split
if __name__ == "__main__":
    import sys

    # The task id may be given as the first command-line argument;
    # without one the script analyses task 993, as before.
    task_id = int(sys.argv[1]) if len(sys.argv) > 1 else 993
    fun = FuncTactic(task_id)
    fun.tactic()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment