Skip to content

Instantly share code, notes, and snippets.

@alexey-milovidov
Created October 8, 2020 13:20
Show Gist options
  • Save alexey-milovidov/614794395556265afe72402615dd25d6 to your computer and use it in GitHub Desktop.
Save alexey-milovidov/614794395556265afe72402615dd25d6 to your computer and use it in GitHub Desktop.
# -*- coding: utf-8 -*-
import os
from sandbox import sdk2
import logging
import json
import sandbox.common.types.resource as ctr
from sandbox.projects.clickhouse.BaseOnCommitTask.simple_test_task import SimpleDockerBuildTestTask
from sandbox.projects.clickhouse.BaseOnCommitTask.base import NeedToRunDescription
from sandbox.projects.clickhouse.resources import CLICKHOUSE_REPO
from sandbox.projects.clickhouse.util.task_helper import decompress_fast
# Value passed to the check container as the LICENCE_NAME environment variable.
NAME = 'ClickHouse'
# Name of the sdk2.Vault entry whose data is passed to the container as LICENCE_KEY.
# NOTE(review): "VALUT" looks like a typo of "VAULT"; the identifier is left
# unchanged because other code in this file refers to it by this name.
KEY_VALUT = 'clickhouse-pvs-studio-key'
# Sub-folder of the result folder that is uploaded to S3 as the HTML report.
HTML_REPORT_FOLDER = 'pvs-studio-html-report'
# File name of the plain-text report that is read from the result folder.
TXT_REPORT_NAME = 'pvs-studio-task-report.txt'
class ClickhousePVSStudioCheck(SimpleDockerBuildTestTask):
    """PVS-Studio check task for ClickHouse commits.

    The task builds a ``docker run`` command for the
    ``yandex/clickhouse-pvs-test`` image, uploads the produced HTML report
    folder to S3, and compares the number of errors found in the text report
    with the number stored on the latest already-checked ``pr_number=0``
    CLICKHOUSE_REPO resource.

    NOTE(review): the indentation of this class was reconstructed from a
    copy whose leading whitespace had been stripped; the nesting in
    ``process_result`` is the only one that leaves ``status`` and
    ``description`` defined on every path.
    """

    @staticmethod
    def get_context_name():
        """Return the context name used for this check."""
        return "PVS check"

    @staticmethod
    def need_to_run(pr_info):
        """Decide whether the check has to run for the given pull request.

        PRs labelled ``pr-backport`` or ``release`` are not checked; every
        other PR is delegated to ``SimpleDockerBuildTestTask.need_to_run``.
        """
        skipped_labels = ('pr-backport', 'release')
        if any(label in pr_info.labels for label in skipped_labels):
            return NeedToRunDescription(False, 'Not ran for backport or release PRs', True)
        return SimpleDockerBuildTestTask.need_to_run(pr_info)

    @staticmethod
    def get_images_names():
        """Return the list of docker image names used by this task."""
        return ["yandex/clickhouse-pvs-test"]

    @classmethod
    def get_resources(cls, commit, repo, pull_request):
        """Find the READY CLICKHOUSE_REPO resource for a commit of a PR.

        The search is filtered by ``commit.sha`` and ``pull_request.number``
        and ordered by descending resource id, so the first hit is the one
        with the highest id. ``repo`` is not used.

        Returns the resource, or ``None`` when nothing matches.
        """
        logging.info("Searching for CLICKHOUSE_REPO at commit %s", commit.sha)
        bresources = sdk2.Resource.find(
            CLICKHOUSE_REPO,
            attrs=dict(
                commit=commit.sha,
                pr_number=pull_request.number,
            ),
            state=ctr.State.READY
        ).order(-CLICKHOUSE_REPO.id).limit(1)
        logging.info("Search finished")

        res = bresources.first()
        if not res:
            return None
        logging.info("Found resource %ld", res.id)
        return res

    def _save_resources(self, commit, repo, pull_request):
        """Fetch and unpack the repository resource for the commit.

        Stores the resource in ``self.repo_resource`` (it is updated later by
        ``process_result``), unpacks its data with ``decompress_fast`` and
        returns the ``ClickHouse`` directory under the task path.

        NOTE(review): ``get_resources`` may return ``None``; that value is
        passed to ``sdk2.ResourceData`` unchanged, as before.
        """
        logging.info("Downloading CLICKHOUSE_REPO resource")
        self.repo_resource = self.get_resources(commit, repo, pull_request)
        repo_data = sdk2.ResourceData(self.repo_resource)
        # The original comment here said "deb package".
        # NOTE(review): the resource is CLICKHOUSE_REPO, so this is presumably
        # a repository archive rather than a package -- confirm.
        repo_path = str(repo_data.path)
        logging.info("Download finished, repo path %s", repo_path)
        decompress_fast(repo_path)
        return os.path.join(str(self.path()), "ClickHouse")

    def _process_txt_report(self, path):
        """Split the text report into warnings and errors.

        Lines containing ``viva64`` are ignored. A line containing ``warn``
        is a warning; otherwise a line containing ``err`` is an error. Each
        entry is reduced to its first two tab-separated fields joined by
        ``:`` (presumably file and line number -- confirm against the report
        format).

        Returns a ``(warnings, errors)`` tuple of lists of strings.
        """
        def location(report_line):
            # First two tab-separated columns, joined with ':'.
            return ':'.join(report_line.split('\t')[0:2])

        warnings = []
        errors = []
        with open(path, 'r') as report_file:
            for line in report_file:
                if 'viva64' in line:
                    continue
                if 'warn' in line:
                    warnings.append(location(line))
                elif 'err' in line:
                    errors.append(location(line))
        return warnings, errors

    def get_parent_resource(self):
        """Return the baseline resource to compare error counts against.

        That is the READY CLICKHOUSE_REPO resource with the highest id whose
        attributes are ``pr_number=0`` and ``pvs_checked=True``, or whatever
        ``first()`` yields when there is none.
        """
        query = sdk2.Resource.find(
            CLICKHOUSE_REPO,
            attrs=dict(
                pr_number=0,
                pvs_checked=True,
            ),
            state=ctr.State.READY
        )
        return query.order(-CLICKHOUSE_REPO.id).limit(1).first()

    def get_run_cmd(self, repo_path, result_folder, server_log_folder, _, perfraw_path):
        """Build the ``docker run`` command line for the check.

        ``repo_path`` is mounted as ``/repo_folder`` and ``result_folder`` as
        ``/test_output``. The remaining parameters are not used here.

        NOTE(review): the licence key read from the vault is interpolated
        into the command line unquoted, so it may be visible wherever the
        command is logged or listed.
        """
        key = sdk2.Vault.data(KEY_VALUT)
        command_template = (
            "docker run --volume={}:/repo_folder --volume={}:/test_output "
            "-e LICENCE_NAME={} -e LICENCE_KEY={} -e CC=gcc-9 -e CXX=g++-9 {}"
        )
        return command_template.format(
            repo_path, result_folder, NAME, key, self.get_single_image_with_version())

    def process_result(self, result_folder, server_log_path, perfraw_path, commit, repo, pull_request):
        """Publish the report and compute the status of the check.

        Uploads the HTML report folder to S3, parses the text report, records
        the error count and error list on ``self.repo_resource`` and compares
        the count with the baseline from ``get_parent_resource``.

        Returns ``(status, description, test_results, [])`` where ``status``
        is ``'failure'`` when no ``index.html`` was uploaded or when the
        baseline has fewer errors than this run, and ``'success'`` otherwise
        (including when there is no baseline).
        """
        s3_path_prefix = "/".join([str(pull_request.number), commit.sha, "pvs_studio_report"])
        html_urls = self.s3_client.upload_test_folder_to_s3(
            os.path.join(result_folder, HTML_REPORT_FOLDER), s3_path_prefix)

        # The report is considered built only if its index page was uploaded.
        index_url = next((url for url in html_urls if 'index.html' in url), None)
        if index_url is None:
            return 'failure', 'PVS report failed to build', [], []
        index_html = '<a href="{}">HTML report</a>'.format(index_url)

        txt_report = os.path.join(result_folder, TXT_REPORT_NAME)
        warnings, errors = self._process_txt_report(txt_report)
        test_results = [(index_html, "Look at the report")]

        # Record this run on the repo resource so it can be found by
        # get_parent_resource (which filters on pvs_checked=True).
        self.repo_resource.pvs_errors = len(errors)
        self.repo_resource.pvs_checked = True
        self.repo_resource.pvs_errors_array = json.dumps(errors)

        parent_resource = self.get_parent_resource()
        # NOTE(review): assumes pvs_errors is read back as a number -- confirm.
        if parent_resource and parent_resource.pvs_errors < len(errors):
            logging.info("Prev errors %s, current errors %s", parent_resource.pvs_errors, len(errors))
            description = "Found {} new errors, total {} errors".format(
                len(errors) - parent_resource.pvs_errors, len(errors))
            status = "failure"
            if parent_resource.pvs_errors_array:
                old_errors = json.loads(parent_resource.pvs_errors_array)
                logging.info("Old errors %s", str(old_errors))
                # Build the lookup once instead of scanning the list per error.
                known_errors = set(old_errors)
                for error in errors:
                    if error not in known_errors:
                        logging.info("Error %s is new error", error)
                        test_results.append((error, "FAIL"))
        else:
            test_results.append(("No new errors found", "OK"))
            description = "New errors 0, total errors {}".format(len(errors))
            status = "success"

        return status, description, test_results, []
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment