Created
December 12, 2016 21:23
-
-
Save tzickel/ea4541b12c1298a8092c3ec5043ef77e to your computer and use it in GitHub Desktop.
A simple script to check python projects for bugs via cython's static analysis...
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from __future__ import print_function | |
import sys | |
import os | |
import subprocess | |
import shutil | |
import tempfile | |
import time | |
import hashlib | |
import argparse | |
# tqdm is optional; has_tqdm records whether it could be imported so that
# check_Files can decide whether to show a progress bar.
try:
    from tqdm import tqdm
except ImportError:
    has_tqdm = False
else:
    has_tqdm = True

# Run-time settings.  These are the defaults; the command-line entry point at
# the bottom of the file overwrites them from the parsed arguments.
cython_cmd = 'cython'              # executable started for every file
cython_concurrent = 4              # how many cython processes run at once
cython_verbose = False             # keep every non-empty stderr line
cython_print_as_they_come = False  # print errors immediately, not at the end
class SafeDirectoryCleanup(object):
    """Context manager that owns a freshly created temporary directory.

    The directory is created as soon as the object is constructed.  Entering
    the ``with`` block yields its path, and leaving the block removes the
    directory together with everything inside it.
    """

    def __init__(self):
        # mkdtemp() creates the directory immediately and returns its path.
        self.dirname = tempfile.mkdtemp()

    def __enter__(self):
        """Return the path of the temporary directory."""
        return self.dirname

    def __exit__(self, *excinfo):
        """Delete the directory tree; exceptions from the body still propagate."""
        # Nothing is returned, so an exception raised inside the ``with``
        # block is not suppressed.
        shutil.rmtree(self.dirname)
def parse_run(filename, options):
    """Start a cython process for *filename* and return it without waiting.

    The command line is ``[cython_cmd] + options + [filename]``.  When
    *options* begins with ``-o <directory>``, the directory argument is
    replaced by a file inside that directory named after the SHA-1 of
    *filename* (presumably so that sources sharing a basename do not
    overwrite each other's generated ``.c`` file).

    Args:
        filename: Path of the python source file to hand to cython.
        options: Sequence of extra command-line arguments; ``None`` or an
            empty sequence means no extra arguments.

    Returns:
        The running ``subprocess.Popen`` object with stdout and stderr piped.
    """
    options = list(options) if options else []
    cmd = [cython_cmd] + options + [filename]
    # Rewrite the output target only when '-o' really is followed by a
    # directory.  Indexing options unconditionally crashed on None, on an
    # empty list and on a lone '-o'.
    if len(options) >= 2 and options[0] == '-o':
        digest = hashlib.sha1(filename.encode('utf-8')).hexdigest()
        cmd[2] = os.path.join(options[1], digest + '.c')
    return subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
def parse_check(proc):
    """Collect the error lines reported by a cython process.

    Waits for the process, then scans its stderr.  A line is kept when it is
    non-empty and either ``cython_verbose`` is set, or the line is not
    indented with a space and its first word contains a colon (the shape of
    a ``file:line:col: message`` report).  Kept lines are printed right away
    when ``cython_print_as_they_come`` is set.

    Args:
        proc: A ``subprocess.Popen``-like object whose stderr is piped.

    Returns:
        A list of error strings.  It is empty when nothing was kept and the
        exit status is zero, and ``['Cython failed']`` when nothing was kept
        but the exit status is non-zero.
    """
    _, stderr = proc.communicate()
    # communicate() has already waited, so this returns the status at once.
    status = proc.wait()
    err = []
    for raw_line in (stderr or b'').splitlines():
        # 'replace' keeps undecodable compiler output from aborting the run.
        line = raw_line.decode('utf-8', 'replace')
        if not line:
            continue
        if cython_verbose:
            keep = True
        else:
            words = line.split()
            # words is empty for whitespace-only lines such as '\t';
            # indexing it unguarded raised IndexError.
            keep = line[0] != ' ' and bool(words) and ':' in words[0]
        if keep:
            if cython_print_as_they_come:
                print(line)
            err.append(line)
    # TODO: can this happen like this (sometimes cython crashes, just print
    # the output?)
    if not err and status:
        err = ['Cython failed']
        if cython_print_as_they_come:
            print(err[0])
    return err
def check_Files(files, options=None):
    """Run cython over *files*, several at a time, and gather the errors.

    Up to ``cython_concurrent`` processes (never fewer than one) are kept
    running.  Finished processes are detected by polling and handed to
    ``parse_check``.  When tqdm is available the iteration over *files* is
    wrapped in a progress bar.

    Args:
        files: Iterable of source file paths.
        options: Extra cython command-line arguments, passed on to
            ``parse_run`` for every file.

    Returns:
        A dict mapping each file name that produced errors to its list of
        error lines.  Files without errors are absent.
    """
    errors = {}
    running = []
    # The previous ``print = t.write`` binding only created an unused local
    # name and never affected printing in other functions, so it is dropped.
    pending = iter(tqdm(files)) if has_tqdm else iter(files)
    # A limit below one would never start a process and would spin forever.
    limit = max(1, cython_concurrent)
    has_files = True
    while True:
        # Top the pool up to the concurrency limit.
        while has_files and len(running) < limit:
            try:
                filename = next(pending)
            except StopIteration:
                has_files = False
                break
            running.append((filename, parse_run(filename, options)))
        # NOTE(review): output is only read after poll() reports an exit.  A
        # child that fills its pipe buffer first would block and never be
        # seen as finished; see the subprocess documentation on pipes.
        still_running = []
        for name, proc in running:
            if proc.poll() is None:
                still_running.append((name, proc))
                continue
            res = parse_check(proc)
            if res:
                errors[name] = res
        running = still_running
        if not has_files and not running:
            break
        # Short pause so the polling loop does not busy-wait.
        time.sleep(0.01)
    return errors
def _has_python_shebang(path):
    """Return True if *path* is an extension-less file with a python shebang."""
    if os.path.splitext(path)[1] or not os.path.isfile(path):
        return False
    try:
        # Binary mode: extension-less files are frequently not text, and
        # decoding them can raise UnicodeDecodeError.
        with open(path, 'rb') as f:
            first_line = f.readline()
    except (IOError, OSError):
        # Unreadable files cannot be checked; skip them.
        return False
    return first_line.startswith(b'#!') and b'python' in first_line


def get_all_pys(files):
    """Expand *files* into the list of python sources to check.

    Each entry that is an existing file is taken as is.  Every other entry
    is walked recursively, collecting paths that end in ``.py`` as well as
    extension-less files whose first line is a ``#!`` line mentioning
    ``python``.

    Args:
        files: Iterable of file or directory paths.

    Returns:
        A list of file paths, in the order they were discovered.
    """
    ret = []
    for rootpath in files:
        if os.path.isfile(rootpath):
            # Keep accumulating.  Returning here dropped everything found
            # so far and ignored the remaining arguments.
            ret.append(rootpath)
            continue
        for root, _dirs, names in os.walk(rootpath):
            for name in names:
                path = os.path.join(root, name)
                if path.endswith('.py') or _has_python_shebang(path):
                    ret.append(path)
    return ret
def print_errors(errors):
    """Print every collected error line, grouped by file.

    Args:
        errors: Mapping of file name to the list of error lines for it.
    """
    for messages in errors.values():
        for message in messages:
            print(message)
def main(files, cython_options=None):
    """Check all python sources found under *files* and report the outcome.

    Cython's generated output is sent to a temporary directory that is
    removed again before returning.

    Args:
        files: Iterable of file or directory paths to scan.
        cython_options: Optional list of extra arguments passed to cython.

    Returns:
        1 when at least one file produced errors, otherwise 0.
    """
    with SafeDirectoryCleanup() as cleanup_dir:
        # '-o <dir>' goes first; parse_run inspects options[0] to find it.
        options = ['-o', cleanup_dir]
        options += cython_options or []
        errors = check_Files(get_all_pys(files), options=options)
        if not errors:
            return 0
        # With immediate printing enabled the errors were already shown.
        if not cython_print_as_they_come:
            print_errors(errors)
        return 1
def split_list(lst, sep):
    """Split *lst* around the first occurrence of *sep*.

    Args:
        lst: The list to split.
        sep: The separator element to look for.

    Returns:
        A tuple ``(before, after)`` of the items before and after the first
        *sep*, or ``(lst, None)`` when *sep* does not occur in *lst*.
    """
    try:
        index = lst.index(sep)
    except ValueError:
        # list.index() reports "not found" with ValueError.  Catching only
        # that keeps unrelated errors (and KeyboardInterrupt) visible.
        return lst, None
    return lst[:index], lst[index + 1:]
if __name__ == "__main__":
    # Command-line entry point:
    #   <script> [options] PATH [PATH ...] [- CYTHON_OPTION ...]
    parser = argparse.ArgumentParser()
    parser.add_argument('--cython', default='cython', help='set the cython command to execute on each file')
    parser.add_argument('--concurrent', type=int, default=4, help='how many cython instances to run concurrently')
    parser.add_argument('--verbose', action='store_true', help='show verbose error output from cython')
    parser.add_argument('--nobuffer', action='store_true', help='print errors as they come')
    # No positional arguments are declared, so parse_known_args() leaves the
    # paths, the '-' separator and the cython options in ``unknown``.
    args, unknown = parser.parse_known_args()
    if args.concurrent < 1:
        # With a limit below one no cython process would ever be started.
        parser.error('--concurrent must be at least 1')
    files, cython_options = split_list(unknown, '-')
    if not files:
        parser.print_help()
        print()
        print('You can extract a package to inspect using pip to a temp dir using: pip install -I --root temp packages (or --no-deps just for that package)')
        print('You can pass options to cython (such as -3 to parse the source as python 3 code) by passing them after putting an - separator in the input')
        print('error: Please input a list of directories to scan for python source to check')
        sys.exit(1)
    # Publish the parsed settings through the module-level globals that the
    # functions above read.
    cython_cmd = args.cython
    cython_concurrent = args.concurrent
    cython_verbose = args.verbose
    cython_print_as_they_come = args.nobuffer
    sys.exit(main(files, cython_options))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment