Skip to content

Instantly share code, notes, and snippets.

@noamraph
Last active April 4, 2017 20:53
Show Gist options
  • Save noamraph/c933b32deb8304ac7ccd to your computer and use it in GitHub Desktop.
Save noamraph/c933b32deb8304ac7ccd to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
from __future__ import division, print_function
import sys
import os
from os.path import join, abspath, exists, expanduser
from contextlib import contextmanager
import hashlib
import cStringIO
import json
from collections import OrderedDict
from pylint.lint import PyLinter, Run
import pylint.utils
from pylint.checkers.imports import ImportsChecker
from pylint.interfaces import UNDEFINED
from astroid.bases import NodeNG, YES
from astroid.node_classes import EmptyNode
from astroid import MANAGER
# Directory holding the cache entries when CachedPyLinter is not given an
# explicit cache_dir keyword argument.
DEFAULT_CACHE_DIR = expanduser('~/.cache/pylint')
def hexdigest(s):
    """
    Return the hexadecimal SHA-1 digest of s.

    s may be a byte string or a text (unicode) string. hashlib only hashes
    bytes, so text is encoded as UTF-8 first; byte strings are hashed as-is.
    """
    if isinstance(s, type(u'')):
        # Text input: encode explicitly instead of relying on an implicit
        # (ASCII-only) conversion, which fails on non-ASCII characters.
        s = s.encode('utf-8')
    return hashlib.sha1(s).hexdigest()
def depyc(fn):
    """
    Return the source filename that corresponds to fn.

    If fn ends with '.pyc' or '.pyo' (byte-compiled module files), the last
    character is dropped so the name ends with '.py'. Any other name is
    returned unchanged.
    """
    return fn[:-1] if fn.endswith(('.pyc', '.pyo')) else fn
def touch(fn, times=None):
    """
    Create fn if it does not exist and set its access/modification times.

    times is passed straight to os.utime(); None means the current time.
    """
    # Opening in append mode creates a missing file without truncating an
    # existing one.
    handle = open(fn, 'a')
    try:
        os.utime(fn, times)
    finally:
        handle.close()
# The cache is a record of each module that passed cleanly.
# For each such module, we create a file in the cache directory
# (DEFAULT_CACHE_DIR unless another directory is given). Its name is the
# sha1 of '\0'.join([cache_stamp, modname, filename, sha1]).
# cache_stamp is a value that changes whenever the configuration changes.
# modname is the name of the module which passed.
# filename is the module filename, and sha1 is the hash of its content.
# The content of each such file is a JSON object which includes cache_stamp,
# modname, filename and sha1, and in addition, 'dependencies'. This is a list
# of (modname, filename, sha1) entries, one for each dependency.
# A cache entry's file is touched whenever that entry is hit, so older
# entries can be identified and removed.
def get_cache_stamp(linter):
    """
    Return a value that should change if the cache is invalidated.

    The stamp combines the interpreter path (sys.executable), the pylint
    version, the sha1 of this file's source and the sha1 of the text written
    by linter.generate_config().
    """
    config_buf = cStringIO.StringIO()
    linter.generate_config(config_buf)
    config_hash = hexdigest(config_buf.getvalue())
    # Hash our own source (not the byte-compiled file). Read the raw bytes
    # and close the file promptly instead of leaking the handle.
    with open(depyc(__file__), 'rb') as src_file:
        src_hash = hexdigest(src_file.read())
    return 'executable:{} pylint:{} cached_pylint:{} config:{}'.format(
        sys.executable, pylint.__version__, src_hash, config_hash)
def get_cache_entry_fn(cache_dir, cache_stamp, modname, fn, sha1):
    """
    Return the path of the cache entry file for one module.

    The file name is the sha1 of cache_stamp, modname, fn and sha1 joined
    with NUL characters; the file lives directly inside cache_dir.
    """
    key_fields = (cache_stamp, modname, fn, sha1)
    entry_name = hexdigest('\0'.join(key_fields))
    return join(cache_dir, entry_name)
def get_file_sha1(fn, sha1_cache={}):  # pylint: disable=dangerous-default-value
    """
    Get sha1 of the content of a file. Store in memory cache for future calls

    fn is made absolute before it is used as the cache key. sha1_cache is
    the memo dict; the shared mutable default is deliberate, so that all
    calls which do not pass their own dict share one cache.
    Raises IOError if the file cannot be read and is not already cached.
    """
    fn = abspath(fn)
    try:
        return sha1_cache[fn]
    except KeyError:
        pass
    # Hash the raw bytes, and close the file promptly instead of leaking
    # the handle.
    with open(fn, 'rb') as f:
        digest = hexdigest(f.read())
    sha1_cache[fn] = digest
    return digest
@contextmanager
def record_infer_depends(depends):
    """
    Temporarily replace NodeNG.infer with a wrapper that records dependencies.

    depends should be a dict. While the context is active, every inferred
    node (other than YES) adds the name of its root to
    depends[name of the inferring node's root], which is a set. Nodes that
    are EmptyNode instances are not recorded as dependants.
    The original NodeNG.infer is restored on exit, even on error.
    """
    original = NodeNG.infer
    # Sanity check: make sure we are wrapping the function from astroid.bases.
    assert original.im_func.func_globals['__name__'] == 'astroid.bases' # pylint: disable=no-member

    def infer(self, context=None, **kwargs):
        if isinstance(self, EmptyNode):
            owner = None
        else:
            owner = self.root().name
        for inferred in original(self, context, **kwargs):
            if owner is not None and inferred is not YES:
                depends.setdefault(owner, set()).add(inferred.root().name)
            yield inferred

    NodeNG.infer = infer
    try:
        yield
    finally:
        NodeNG.infer = original
def remove_init(modname):
    """
    Strip a trailing '.__init__' from a module name.

    Names that do not end with '.__init__' are returned unchanged.
    """
    suffix = '.__init__'
    if not modname.endswith(suffix):
        return modname
    return modname[:len(modname) - len(suffix)]
class CachedPyLinter(PyLinter):
    """
    A PyLinter that remembers which modules passed cleanly.

    After each check() run, a cache entry is written for every analyzed
    module that produced no displayed message. should_analyze_file() returns
    False for a module whose cache entry exists and whose recorded
    dependencies all still have the same content hash.
    """

    def __init__(self, *args, **kwargs):
        """
        Create the linter and make sure the cache directory exists.

        Accepts one extra keyword argument, cache_dir (defaults to
        DEFAULT_CACHE_DIR). Everything else is passed on to PyLinter.
        """
        self.cache_dir = kwargs.pop('cache_dir', DEFAULT_CACHE_DIR)
        if not exists(self.cache_dir):
            os.makedirs(self.cache_dir)
        PyLinter.__init__(self, *args, **kwargs)
        self._cache_stamp = None
        # Used for tracking check operations; (re)initialized by check().
        self.analyzed_modnames = None
        self.nopass_modnames = None
        self.dependencies = None

    def check(self, files_or_modules):
        """
        Main checking entry: check a list of files or modules from their
        name, then record the modules that passed in the cache.
        """
        self.analyzed_modnames = set()
        self.nopass_modnames = set()
        self.dependencies = {}
        with record_infer_depends(self.dependencies):
            PyLinter.check(self, files_or_modules)
        self.update_pass_cache()

    @property
    def cache_stamp(self):
        """The value of get_cache_stamp(self), computed on first access."""
        if self._cache_stamp is None:
            self._cache_stamp = get_cache_stamp(self)
        return self._cache_stamp

    def set_current_module(self, modname, filepath=None):
        """
        Record that modname is being analyzed, print its name, and delegate
        to PyLinter.set_current_module().
        """
        realmodname = remove_init(modname)
        self.analyzed_modnames.add(realmodname)
        print(realmodname)
        return PyLinter.set_current_module(self, modname, filepath)

    def add_message(self, msg_descr, line=None, node=None, args=None,
                    confidence=UNDEFINED):
        """
        Wrap PyLinter.add_message in order to update self.nopass_modnames.

        A module is recorded as "not passed" only when the message is
        enabled, i.e. when it would actually be displayed.
        """
        msgid = self.msgs_store.check_message_id(msg_descr).msgid
        if line is None and node is not None:
            line = node.fromlineno
        if self.is_message_enabled(msgid, line, confidence):
            # The same test is done in PyLinter.add_message - we want to know
            # if a message is displayed
            modname = (self.current_name if node is None
                       else pylint.utils.get_module_and_frameid(node)[0])
            if modname is not None:
                # analyzed_modnames holds names normalized by remove_init();
                # normalize here too so that the set difference computed in
                # update_pass_cache() matches them.
                modname = remove_init(modname)
            self.nopass_modnames.add(modname)
        PyLinter.add_message(self, msg_descr, line, node, args, confidence)

    def should_analyze_file(self, modname, path):
        """
        Return False if the module is known to pass and needs no analysis.

        That is the case when a cache entry exists for the current
        configuration, module name, path and file content, and every
        dependency recorded in it still has the recorded sha1. Otherwise the
        decision is left to PyLinter.should_analyze_file().
        """
        realmodname = remove_init(modname)
        path = abspath(path)
        sha1 = get_file_sha1(path)
        cache_entry_fn = get_cache_entry_fn(
            self.cache_dir, self.cache_stamp, realmodname, path, sha1)
        if exists(cache_entry_fn):
            try:
                with open(cache_entry_fn) as f:
                    entry = json.load(f)
                deps_unchanged = all(
                    get_file_sha1(dep_fn) == dep_sha1
                    for _modname, dep_fn, dep_sha1 in entry['dependencies'])
            except (IOError, OSError, ValueError, KeyError, TypeError):
                # An unreadable or malformed entry, or a dependency file
                # that can no longer be read, is simply a cache miss.
                deps_unchanged = False
            if deps_unchanged:
                # Mark the entry as recently used.
                touch(cache_entry_fn)
                return False
        return PyLinter.should_analyze_file(self, modname, path)

    def get_imports_checker(self):
        """Return the registered checker that is an ImportsChecker."""
        return next(checker for checker in self.get_checkers()
                    if isinstance(checker, ImportsChecker))

    def prepare_checkers(self):
        """
        Wrap PyLinter.prepare_checkers to make sure ImportsChecker is
        enabled, keeping the checkers sorted by descending priority.
        """
        neededcheckers = PyLinter.prepare_checkers(self)
        impchecker = self.get_imports_checker()
        if impchecker not in neededcheckers:
            neededcheckers.append(impchecker)
            neededcheckers.sort(key=lambda chk: chk.priority, reverse=True)
        return neededcheckers

    def update_dependencies_from_imports(self):
        """
        Add the dependencies recorded by ImportsChecker to
        self.dependencies.

        This is needed because if A imports something from B that B imported
        from C, record_infer_depends() will only record that A depends on C.
        """
        impchecker = self.get_imports_checker()
        for provider_mod, user_mods in impchecker.stats['dependencies'].iteritems():
            if provider_mod not in MANAGER.astroid_cache:
                # It seems that ImportsChecker also includes functions, eg.
                # "from tools import offset_slice_tuple"
                # So we just check if a module is in astroid_cache.
                continue
            for user_mod in user_mods:
                self.dependencies.setdefault(user_mod, set()).add(provider_mod)

    def _get_dependency_records(self, modname):
        """
        Return a list of (depname, filename, sha1) for the dependencies of
        modname that astroid knows about and that have a file.
        """
        records = []
        for depname in self.dependencies.get(modname, ()):
            if (depname == modname or depname == ''
                    or depname not in MANAGER.astroid_cache):
                # I have seen the non-existent 'str' module. Whatever.
                continue
            fn = MANAGER.astroid_cache[depname].file
            if fn is None:
                # for built-in modules
                continue
            fn = abspath(fn)
            records.append((depname, fn, get_file_sha1(fn)))
        return records

    def update_pass_cache(self):
        """
        Write a cache entry for every analyzed module that had no displayed
        message.

        Each entry is written to a temporary '.write' file and then renamed
        into place.
        """
        self.update_dependencies_from_imports()
        pass_modnames = self.analyzed_modnames - self.nopass_modnames
        for modname in pass_modnames:
            if modname not in MANAGER.astroid_cache:
                # Without a known module we cannot tell which file to
                # record, so no cache entry is written.
                continue
            module_file = MANAGER.astroid_cache[modname].file
            if module_file is None:
                continue
            d = OrderedDict()
            d['cache_stamp'] = self.cache_stamp
            d['modname'] = modname
            d['filename'] = filename = abspath(module_file)
            d['sha1'] = sha1 = get_file_sha1(filename)
            d['dependencies'] = self._get_dependency_records(modname)
            cache_entry_fn = get_cache_entry_fn(
                self.cache_dir, self.cache_stamp, modname, filename, sha1)
            write_fn = cache_entry_fn + '.write'
            with open(write_fn, 'w') as f:
                json.dump(d, f, indent=2)
            os.rename(write_fn, cache_entry_fn)
def main():
    """
    Run pylint on the command-line arguments using CachedPyLinter.

    Returns the linter's msg_status after the run.
    """
    class CachedRun(Run):
        # Same as Run, but with LinterClass pointing at the caching linter.
        LinterClass = CachedPyLinter

    finished_run = CachedRun(sys.argv[1:], exit=False)
    status = finished_run.linter.msg_status
    return status


if __name__ == '__main__':
    exit_code = main()
    sys.exit(exit_code)
@gundalow
Copy link

gundalow commented Apr 4, 2017

Just spotted this, wondering if it got any further?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment