Monkey patch autopep8.py for running within StaSh

#!/usr/bin/env python
# Copyright (C) 2010-2011 Hideo Hattori
# Copyright (C) 2011-2013 Hideo Hattori, Steven Myint
# Copyright (C) 2013-2015 Hideo Hattori, Steven Myint, Bill Wendling
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

"""Automatically formats Python code to conform to the PEP 8 style guide.

Fixes that only need be done once can be added by adding a function of the form
"fix_<code>(source)" to this module. They should return the fixed source code.
These fixes are picked up by apply_global_fixes().

Fixes that depend on pep8 should be added as methods to FixPEP8. See the class
documentation for more information.

"""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals

import codecs
import collections
import copy
import difflib
import fnmatch
import inspect
import io
import keyword
import locale
import os
import re
import signal
import sys
import textwrap
import token
import tokenize

import pep8

# StaSh monkey patch: force pep8 to be re-imported so the checks registered
# below replace any previously loaded copy. reload() is a builtin on
# Python 2 only, so fall back to importlib on Python 3.
try:
    reload  # Python 2 builtin
except NameError:
    from importlib import reload  # Python 3

reload(pep8)

try:
    unicode
except NameError:
    unicode = str


__version__ = '1.2.2a0'


CR = '\r'
LF = '\n'
CRLF = '\r\n'


PYTHON_SHEBANG_REGEX = re.compile(r'^#!.*\bpython[23]?\b\s*$')


# For generating line shortening candidates.
SHORTEN_OPERATOR_GROUPS = frozenset([
    frozenset([',']),
    frozenset(['%']),
    frozenset([',', '(', '[', '{']),
    frozenset(['%', '(', '[', '{']),
    frozenset([',', '(', '[', '{', '%', '+', '-', '*', '/', '//']),
    frozenset(['%', '+', '-', '*', '/', '//']),
])


DEFAULT_IGNORE = 'E24'
DEFAULT_INDENT_SIZE = 4


# W602 is handled separately due to the need to avoid "with_traceback".
CODE_TO_2TO3 = {
    'E231': ['ws_comma'],
    'E721': ['idioms'],
    'W601': ['has_key'],
    'W603': ['ne'],
    'W604': ['repr'],
    'W690': ['apply',
             'except',
             'exitfunc',
             'numliterals',
             'operator',
             'paren',
             'reduce',
             'renames',
             'standarderror',
             'sys_exc',
             'throw',
             'tuple_params',
             'xreadlines']}


if sys.platform == 'win32':  # pragma: no cover
    DEFAULT_CONFIG = os.path.expanduser(r'~\.pep8')
else:
    DEFAULT_CONFIG = os.path.join(os.getenv('XDG_CONFIG_HOME') or
                                  os.path.expanduser('~/.config'), 'pep8')
PROJECT_CONFIG = ('setup.cfg', 'tox.ini', '.pep8')


def open_with_encoding(filename, encoding=None, mode='r'):
    """Return opened file with a specific encoding."""
    if not encoding:
        encoding = detect_encoding(filename)

    return io.open(filename, mode=mode, encoding=encoding,
                   newline='')  # Preserve line endings


def detect_encoding(filename):
    """Return file encoding."""
    return 'utf-8'
    # try:
    #     with open(filename, 'rb') as input_file:
    #         from lib2to3.pgen2 import tokenize as lib2to3_tokenize
    #         encoding = lib2to3_tokenize.detect_encoding(input_file.readline)[0]
    #
    #         # Check for correctness of encoding
    #         with open_with_encoding(filename, encoding) as test_file:
    #             test_file.read()
    #
    #     return encoding
    # except (LookupError, SyntaxError, UnicodeDecodeError):
    #     return 'latin-1'
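
    # StaSh note: upstream autopep8 detects the encoding via lib2to3 and
    # falls back to 'latin-1' (the disabled block above); this monkey patch
    # assumes UTF-8 instead, so reading a non-UTF-8 file will raise
    # UnicodeDecodeError in open_with_encoding().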


def readlines_from_file(filename):
    """Return contents of file."""
    with open_with_encoding(filename) as input_file:
        return input_file.readlines()


def extended_blank_lines(logical_line,
                         blank_lines,
                         blank_before,
                         indent_level,
                         previous_logical):
    """Check for missing blank lines after class declaration."""
    if previous_logical.startswith('class '):
        if logical_line.startswith(('def ', 'class ', '@')):
            if indent_level and not blank_lines and not blank_before:
                yield (0, 'E309 expected 1 blank line after class declaration')
    elif previous_logical.startswith('def '):
        if blank_lines and pep8.DOCSTRING_REGEX.match(logical_line):
            yield (0, 'E303 too many blank lines ({0})'.format(blank_lines))
    elif pep8.DOCSTRING_REGEX.match(previous_logical):
        # Missing blank line between class docstring and method declaration.
        if (
            indent_level and
            not blank_lines and
            not blank_before and
            logical_line.startswith(('def ')) and
            '(self' in logical_line
        ):
            yield (0, 'E301 expected 1 blank line, found 0')
pep8.register_check(extended_blank_lines)
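
# Illustrative example (not from upstream): extended_blank_lines() yields
# E309 when a method definition immediately follows the class line:
#
#     class Foo(object):
#         def bar(self):  # E309 expected 1 blank line after class declaration
#             pass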


def continued_indentation(logical_line, tokens, indent_level, indent_char,
                          noqa):
    """Override pep8's function to provide indentation information."""
    first_row = tokens[0][2][0]
    nrows = 1 + tokens[-1][2][0] - first_row
    if noqa or nrows == 1:
        return

    # indent_next tells us whether the next block is indented. Assuming
    # that it is indented by 4 spaces, then we should not allow 4-space
    # indents on the final continuation line. In turn, some other
    # indents are allowed to have an extra 4 spaces.
    indent_next = logical_line.endswith(':')

    row = depth = 0
    valid_hangs = (
        (DEFAULT_INDENT_SIZE,)
        if indent_char != '\t' else (DEFAULT_INDENT_SIZE,
                                     2 * DEFAULT_INDENT_SIZE)
    )

    # Remember how many brackets were opened on each line.
    parens = [0] * nrows

    # Relative indents of physical lines.
    rel_indent = [0] * nrows

    # For each depth, collect a list of opening rows.
    open_rows = [[0]]

    # For each depth, memorize the hanging indentation.
    hangs = [None]

    # Visual indents.
    indent_chances = {}
    last_indent = tokens[0][2]
    indent = [last_indent[1]]

    last_token_multiline = None
    line = None
    last_line = ''
    last_line_begins_with_multiline = False
    for token_type, text, start, end, line in tokens:
        newline = row < start[0] - first_row
        if newline:
            row = start[0] - first_row
            newline = (not last_token_multiline and
                       token_type not in (tokenize.NL, tokenize.NEWLINE))
            last_line_begins_with_multiline = last_token_multiline

        if newline:
            # This is the beginning of a continuation line.
            last_indent = start

            # Record the initial indent.
            rel_indent[row] = pep8.expand_indent(line) - indent_level

            # Identify closing bracket.
            close_bracket = (token_type == tokenize.OP and text in ']})')

            # Is the indent relative to an opening bracket line?
            for open_row in reversed(open_rows[depth]):
                hang = rel_indent[row] - rel_indent[open_row]
                hanging_indent = hang in valid_hangs
                if hanging_indent:
                    break
            if hangs[depth]:
                hanging_indent = (hang == hangs[depth])

            visual_indent = (not close_bracket and hang > 0 and
                             indent_chances.get(start[1]))

            if close_bracket and indent[depth]:
                # Closing bracket for visual indent.
                if start[1] != indent[depth]:
                    yield (start, 'E124 {0}'.format(indent[depth]))
            elif close_bracket and not hang:
                pass
            elif indent[depth] and start[1] < indent[depth]:
                # Visual indent is broken.
                yield (start, 'E128 {0}'.format(indent[depth]))
            elif (hanging_indent or
                  (indent_next and
                   rel_indent[row] == 2 * DEFAULT_INDENT_SIZE)):
                # Hanging indent is verified.
                if close_bracket:
                    yield (start, 'E123 {0}'.format(indent_level +
                                                    rel_indent[open_row]))
                hangs[depth] = hang
            elif visual_indent is True:
                # Visual indent is verified.
                indent[depth] = start[1]
            elif visual_indent in (text, unicode):
                # Ignore token lined up with matching one from a previous line.
                pass
            else:
                one_indented = (indent_level + rel_indent[open_row] +
                                DEFAULT_INDENT_SIZE)
                # Indent is broken.
                if hang <= 0:
                    error = ('E122', one_indented)
                elif indent[depth]:
                    error = ('E127', indent[depth])
                elif hang > DEFAULT_INDENT_SIZE:
                    error = ('E126', one_indented)
                else:
                    hangs[depth] = hang
                    error = ('E121', one_indented)

                yield (start, '{0} {1}'.format(*error))

        # Look for visual indenting.
        if (
            parens[row] and
            token_type not in (tokenize.NL, tokenize.COMMENT) and
            not indent[depth]
        ):
            indent[depth] = start[1]
            indent_chances[start[1]] = True
        # Deal with implicit string concatenation.
        elif (token_type in (tokenize.STRING, tokenize.COMMENT) or
              text in ('u', 'ur', 'b', 'br')):
            indent_chances[start[1]] = unicode
        # Special case for the "if" statement because len("if (") is equal to
        # 4.
        elif not indent_chances and not row and not depth and text == 'if':
            indent_chances[end[1] + 1] = True
        elif text == ':' and line[end[1]:].isspace():
            open_rows[depth].append(row)

        # Keep track of bracket depth.
        if token_type == tokenize.OP:
            if text in '([{':
                depth += 1
                indent.append(0)
                hangs.append(None)
                if len(open_rows) == depth:
                    open_rows.append([])
                open_rows[depth].append(row)
                parens[row] += 1
            elif text in ')]}' and depth > 0:
                # Parent indents should not be more than this one.
                prev_indent = indent.pop() or last_indent[1]
                hangs.pop()
                for d in range(depth):
                    if indent[d] > prev_indent:
                        indent[d] = 0
                for ind in list(indent_chances):
                    if ind >= prev_indent:
                        del indent_chances[ind]
                del open_rows[depth + 1:]
                depth -= 1
                if depth:
                    indent_chances[indent[depth]] = True
                for idx in range(row, -1, -1):
                    if parens[idx]:
                        parens[idx] -= 1
                        break
            assert len(indent) == depth + 1
            if (
                start[1] not in indent_chances and
                # This is for purposes of speeding up E121 (GitHub #90).
                not last_line.rstrip().endswith(',')
            ):
                # Allow to line up tokens.
                indent_chances[start[1]] = text

        last_token_multiline = (start[0] != end[0])
        if last_token_multiline:
            rel_indent[end[0] - first_row] = rel_indent[row]

        last_line = line

    if (
        indent_next and
        not last_line_begins_with_multiline and
        pep8.expand_indent(line) == indent_level + DEFAULT_INDENT_SIZE
    ):
        pos = (start[0], indent[0] + 4)
        yield (pos, 'E125 {0}'.format(indent_level +
                                      2 * DEFAULT_INDENT_SIZE))
del pep8._checks['logical_line'][pep8.continued_indentation]
pep8.register_check(continued_indentation)
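
# The two lines above swap pep8's stock continued-indentation check for the
# copy defined here: the overridden messages embed the desired indent column
# (e.g. 'E128 8'), which _fix_reindent() later parses out of result['info'].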


class FixPEP8(object):

    """Fix invalid code.

    Fixer methods are prefixed "fix_". The _fix_source() method looks for these
    automatically.

    The fixer method can take either one or two arguments (in addition to
    self). The first argument is "result", which is the error information from
    pep8. The second argument, "logical", is required only for logical-line
    fixes.

    The fixer method can return the list of modified lines or None. An empty
    list would mean that no changes were made. None would mean that only the
    line reported in the pep8 error was modified. Note that the modified line
    numbers that are returned are indexed at 1. This typically would correspond
    with the line number reported in the pep8 error information.

    [fixed method list]
        - e121,e122,e123,e124,e125,e126,e127,e128,e129
        - e201,e202,e203
        - e211
        - e221,e222,e223,e224,e225
        - e231
        - e251
        - e261,e262
        - e271,e272,e273,e274
        - e301,e302,e303
        - e401
        - e502
        - e701,e702
        - e711
        - w291

    """

    def __init__(self, filename,
                 options,
                 contents=None,
                 long_line_ignore_cache=None):
        self.filename = filename
        if contents is None:
            self.source = readlines_from_file(filename)
        else:
            sio = io.StringIO(contents)
            self.source = sio.readlines()
        self.options = options
        self.indent_word = _get_indentword(''.join(self.source))

        self.long_line_ignore_cache = (
            set() if long_line_ignore_cache is None
            else long_line_ignore_cache)

        # Many fixers are the same even though pep8 categorizes them
        # differently.
        self.fix_e115 = self.fix_e112
        self.fix_e116 = self.fix_e113
        self.fix_e121 = self._fix_reindent
        self.fix_e122 = self._fix_reindent
        self.fix_e123 = self._fix_reindent
        self.fix_e124 = self._fix_reindent
        self.fix_e126 = self._fix_reindent
        self.fix_e127 = self._fix_reindent
        self.fix_e128 = self._fix_reindent
        self.fix_e129 = self._fix_reindent
        self.fix_e202 = self.fix_e201
        self.fix_e203 = self.fix_e201
        self.fix_e211 = self.fix_e201
        self.fix_e221 = self.fix_e271
        self.fix_e222 = self.fix_e271
        self.fix_e223 = self.fix_e271
        self.fix_e226 = self.fix_e225
        self.fix_e227 = self.fix_e225
        self.fix_e228 = self.fix_e225
        self.fix_e241 = self.fix_e271
        self.fix_e242 = self.fix_e224
        self.fix_e261 = self.fix_e262
        self.fix_e272 = self.fix_e271
        self.fix_e273 = self.fix_e271
        self.fix_e274 = self.fix_e271
        self.fix_e309 = self.fix_e301
        self.fix_e501 = (
            self.fix_long_line_logically if
            options and (options.aggressive >= 2 or options.experimental) else
            self.fix_long_line_physically)
        self.fix_e703 = self.fix_e702
        self.fix_w293 = self.fix_w291
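
        # The aliases above mean that, e.g., an E222 result is dispatched to
        # fix_e271() when _fix_source() looks up 'fix_' + result['id'].lower().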

    def _fix_source(self, results):
        try:
            (logical_start, logical_end) = _find_logical(self.source)
            logical_support = True
        except (SyntaxError, tokenize.TokenError):  # pragma: no cover
            logical_support = False

        completed_lines = set()
        for result in sorted(results, key=_priority_key):
            if result['line'] in completed_lines:
                continue

            fixed_methodname = 'fix_' + result['id'].lower()
            if hasattr(self, fixed_methodname):
                fix = getattr(self, fixed_methodname)

                line_index = result['line'] - 1
                original_line = self.source[line_index]

                is_logical_fix = len(_get_parameters(fix)) > 2
                if is_logical_fix:
                    logical = None
                    if logical_support:
                        logical = _get_logical(self.source,
                                               result,
                                               logical_start,
                                               logical_end)
                        if logical and set(range(
                                logical[0][0] + 1,
                                logical[1][0] + 1)).intersection(
                                    completed_lines):
                            continue

                    modified_lines = fix(result, logical)
                else:
                    modified_lines = fix(result)

                if modified_lines is None:
                    # Force logical fixes to report what they modified.
                    assert not is_logical_fix

                    if self.source[line_index] == original_line:
                        modified_lines = []

                if modified_lines:
                    completed_lines.update(modified_lines)
                elif modified_lines == []:  # Empty list means no fix
                    if self.options.verbose >= 2:
                        print(
                            '--->  Not fixing {f} on line {l}'.format(
                                f=result['id'], l=result['line']),
                            file=sys.stderr)
                else:  # We assume one-line fix when None.
                    completed_lines.add(result['line'])
            else:
                if self.options.verbose >= 3:
                    print(
                        "--->  '{0}' is not defined.".format(fixed_methodname),
                        file=sys.stderr)

                    info = result['info'].strip()
                    print('--->  {0}:{1}:{2}:{3}'.format(self.filename,
                                                         result['line'],
                                                         result['column'],
                                                         info),
                          file=sys.stderr)
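
        # For reference (illustrative; the exact shape comes from
        # _execute_pep8()): each 'result' handled above is a dict along the
        # lines of
        #     {'id': 'E225', 'line': 3, 'column': 7,
        #      'info': 'E225 missing whitespace around operator'}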

    def fix(self):
        """Return a version of the source code with PEP 8 violations fixed."""
        pep8_options = {
            'ignore': self.options.ignore,
            'select': self.options.select,
            'max_line_length': self.options.max_line_length,
        }
        results = _execute_pep8(pep8_options, self.source)

        if self.options.verbose:
            progress = {}
            for r in results:
                if r['id'] not in progress:
                    progress[r['id']] = set()
                progress[r['id']].add(r['line'])
            print('--->  {n} issue(s) to fix {progress}'.format(
                n=len(results), progress=progress), file=sys.stderr)

        if self.options.line_range:
            start, end = self.options.line_range
            results = [r for r in results
                       if start <= r['line'] <= end]

        self._fix_source(filter_results(source=''.join(self.source),
                                        results=results,
                                        aggressive=self.options.aggressive))

        if self.options.line_range:
            # If number of lines has changed then change line_range.
            count = sum(sline.count('\n')
                        for sline in self.source[start - 1:end])
            self.options.line_range[1] = start + count - 1

        return ''.join(self.source)
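
        # Note: fixes such as fix_e701() can add lines, so when a line range
        # is given, fix() recomputes the upper bound of options.line_range
        # from the newline count of the fixed region (see above).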

    def _fix_reindent(self, result):
        """Fix a badly indented line.

        This is done by adding or removing from its initial indent only.

        """
        num_indent_spaces = int(result['info'].split()[1])
        line_index = result['line'] - 1
        target = self.source[line_index]

        self.source[line_index] = ' ' * num_indent_spaces + target.lstrip()

    def fix_e112(self, result):
        """Fix under-indented comments."""
        line_index = result['line'] - 1
        target = self.source[line_index]

        if not target.lstrip().startswith('#'):
            # Don't screw with invalid syntax.
            return []

        self.source[line_index] = self.indent_word + target

    def fix_e113(self, result):
        """Fix over-indented comments."""
        line_index = result['line'] - 1
        target = self.source[line_index]

        indent = _get_indentation(target)
        stripped = target.lstrip()

        if not stripped.startswith('#'):
            # Don't screw with invalid syntax.
            return []

        self.source[line_index] = indent[1:] + stripped

    def fix_e125(self, result):
        """Fix indentation indistinguishable from the next logical line."""
        num_indent_spaces = int(result['info'].split()[1])
        line_index = result['line'] - 1
        target = self.source[line_index]

        spaces_to_add = num_indent_spaces - len(_get_indentation(target))
        indent = len(_get_indentation(target))
        modified_lines = []

        while len(_get_indentation(self.source[line_index])) >= indent:
            self.source[line_index] = (' ' * spaces_to_add +
                                       self.source[line_index])
            modified_lines.append(1 + line_index)  # Line indexed at 1.
            line_index -= 1

        return modified_lines

    def fix_e201(self, result):
        """Remove extraneous whitespace."""
        line_index = result['line'] - 1
        target = self.source[line_index]
        offset = result['column'] - 1

        if is_probably_part_of_multiline(target):
            return []

        fixed = fix_whitespace(target,
                               offset=offset,
                               replacement='')

        self.source[line_index] = fixed

    def fix_e224(self, result):
        """Remove extraneous whitespace around operator."""
        target = self.source[result['line'] - 1]
        offset = result['column'] - 1
        fixed = target[:offset] + target[offset:].replace('\t', ' ')
        self.source[result['line'] - 1] = fixed

    def fix_e225(self, result):
        """Fix missing whitespace around operator."""
        target = self.source[result['line'] - 1]
        offset = result['column'] - 1
        fixed = target[:offset] + ' ' + target[offset:]

        # Only proceed if non-whitespace characters match.
        # And make sure we don't break the indentation.
        if (
            fixed.replace(' ', '') == target.replace(' ', '') and
            _get_indentation(fixed) == _get_indentation(target)
        ):
            self.source[result['line'] - 1] = fixed
        else:
            return []

    def fix_e231(self, result):
        """Add missing whitespace."""
        line_index = result['line'] - 1
        target = self.source[line_index]
        offset = result['column']

        fixed = target[:offset].rstrip() + ' ' + target[offset:].lstrip()
        self.source[line_index] = fixed

    def fix_e251(self, result):
        """Remove whitespace around parameter '=' sign."""
        line_index = result['line'] - 1
        target = self.source[line_index]

        # This is necessary since pep8 sometimes reports columns that go
        # past the end of the physical line. This happens in cases like,
        # foo(bar\n=None)
        c = min(result['column'] - 1,
                len(target) - 1)

        if target[c].strip():
            fixed = target
        else:
            fixed = target[:c].rstrip() + target[c:].lstrip()

        # There could be an escaped newline
        #
        #     def foo(a=\
        #             1)
        if fixed.endswith(('=\\\n', '=\\\r\n', '=\\\r')):
            self.source[line_index] = fixed.rstrip('\n\r \t\\')
            self.source[line_index + 1] = self.source[line_index + 1].lstrip()
            return [line_index + 1, line_index + 2]  # Line indexed at 1

        self.source[result['line'] - 1] = fixed

    def fix_e262(self, result):
        """Fix spacing after comment hash."""
        target = self.source[result['line'] - 1]
        offset = result['column']

        code = target[:offset].rstrip(' \t#')
        comment = target[offset:].lstrip(' \t#')

        fixed = code + ('  # ' + comment if comment.strip() else '\n')

        self.source[result['line'] - 1] = fixed

    def fix_e271(self, result):
        """Fix extraneous whitespace around keywords."""
        line_index = result['line'] - 1
        target = self.source[line_index]
        offset = result['column'] - 1

        if is_probably_part_of_multiline(target):
            return []

        fixed = fix_whitespace(target,
                               offset=offset,
                               replacement=' ')

        if fixed == target:
            return []
        else:
            self.source[line_index] = fixed

    def fix_e301(self, result):
        """Add missing blank line."""
        cr = '\n'
        self.source[result['line'] - 1] = cr + self.source[result['line'] - 1]

    def fix_e302(self, result):
        """Add missing 2 blank lines."""
        add_linenum = 2 - int(result['info'].split()[-1])
        cr = '\n' * add_linenum
        self.source[result['line'] - 1] = cr + self.source[result['line'] - 1]

    def fix_e303(self, result):
        """Remove extra blank lines."""
        delete_linenum = int(result['info'].split('(')[1].split(')')[0]) - 2
        delete_linenum = max(1, delete_linenum)

        # We need to count because pep8 reports an offset line number if there
        # are comments.
        cnt = 0
        line = result['line'] - 2
        modified_lines = []
        while cnt < delete_linenum and line >= 0:
            if not self.source[line].strip():
                self.source[line] = ''
                modified_lines.append(1 + line)  # Line indexed at 1
                cnt += 1
            line -= 1

        return modified_lines
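
        # Illustrative example: for 'E303 too many blank lines (4)' the loop
        # above blanks 4 - 2 = 2 of the empty lines found while scanning
        # upward from the reported line.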

    def fix_e304(self, result):
        """Remove blank line following function decorator."""
        line = result['line'] - 2
        if not self.source[line].strip():
            self.source[line] = ''

    def fix_e401(self, result):
        """Put imports on separate lines."""
        line_index = result['line'] - 1
        target = self.source[line_index]
        offset = result['column'] - 1

        if not target.lstrip().startswith('import'):
            return []

        indentation = re.split(pattern=r'\bimport\b',
                               string=target, maxsplit=1)[0]
        fixed = (target[:offset].rstrip('\t ,') + '\n' +
                 indentation + 'import ' + target[offset:].lstrip('\t ,'))
        self.source[line_index] = fixed

    def fix_long_line_logically(self, result, logical):
        """Try to make lines fit within --max-line-length characters."""
        if (
            not logical or
            len(logical[2]) == 1 or
            self.source[result['line'] - 1].lstrip().startswith('#')
        ):
            return self.fix_long_line_physically(result)

        start_line_index = logical[0][0]
        end_line_index = logical[1][0]
        logical_lines = logical[2]

        previous_line = get_item(self.source, start_line_index - 1, default='')
        next_line = get_item(self.source, end_line_index + 1, default='')

        single_line = join_logical_line(''.join(logical_lines))

        try:
            fixed = self.fix_long_line(
                target=single_line,
                previous_line=previous_line,
                next_line=next_line,
                original=''.join(logical_lines))
        except (SyntaxError, tokenize.TokenError):
            return self.fix_long_line_physically(result)

        if fixed:
            for line_index in range(start_line_index, end_line_index + 1):
                self.source[line_index] = ''
            self.source[start_line_index] = fixed
            return range(start_line_index + 1, end_line_index + 1)
        else:
            return []

    def fix_long_line_physically(self, result):
        """Try to make lines fit within --max-line-length characters."""
        line_index = result['line'] - 1
        target = self.source[line_index]

        previous_line = get_item(self.source, line_index - 1, default='')
        next_line = get_item(self.source, line_index + 1, default='')

        try:
            fixed = self.fix_long_line(
                target=target,
                previous_line=previous_line,
                next_line=next_line,
                original=target)
        except (SyntaxError, tokenize.TokenError):
            return []

        if fixed:
            self.source[line_index] = fixed
            return [line_index + 1]
        else:
            return []

    def fix_long_line(self, target, previous_line,
                      next_line, original):
        cache_entry = (target, previous_line, next_line)
        if cache_entry in self.long_line_ignore_cache:
            return []

        if target.lstrip().startswith('#'):
            # Wrap commented lines.
            return shorten_comment(
                line=target,
                max_line_length=self.options.max_line_length,
                last_comment=not next_line.lstrip().startswith('#'))

        fixed = get_fixed_long_line(
            target=target,
            previous_line=previous_line,
            original=original,
            indent_word=self.indent_word,
            max_line_length=self.options.max_line_length,
            aggressive=self.options.aggressive,
            experimental=self.options.experimental,
            verbose=self.options.verbose)
        if fixed and not code_almost_equal(original, fixed):
            return fixed
        else:
            self.long_line_ignore_cache.add(cache_entry)
            return None

    def fix_e502(self, result):
        """Remove extraneous escape of newline."""
        (line_index, _, target) = get_index_offset_contents(result,
                                                            self.source)
        self.source[line_index] = target.rstrip('\n\r \t\\') + '\n'

    def fix_e701(self, result):
        """Put colon-separated compound statement on separate lines."""
        line_index = result['line'] - 1
        target = self.source[line_index]
        c = result['column']

        fixed_source = (target[:c] + '\n' +
                        _get_indentation(target) + self.indent_word +
                        target[c:].lstrip('\n\r \t\\'))
        self.source[result['line'] - 1] = fixed_source
        return [result['line'], result['line'] + 1]
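
        # Illustrative example: fix_e701() turns 'if x: y = 1' into
        #     if x:
        #         y = 1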

    def fix_e702(self, result, logical):
        """Put semicolon-separated compound statement on separate lines."""
        if not logical:
            return []  # pragma: no cover
        logical_lines = logical[2]

        line_index = result['line'] - 1
        target = self.source[line_index]

        if target.rstrip().endswith('\\'):
            # Normalize '1; \\\n2' into '1; 2'.
            self.source[line_index] = target.rstrip('\n \r\t\\')
            self.source[line_index + 1] = self.source[line_index + 1].lstrip()
            return [line_index + 1, line_index + 2]

        if target.rstrip().endswith(';'):
            self.source[line_index] = target.rstrip('\n \r\t;') + '\n'
            return [line_index + 1]

        offset = result['column'] - 1
        first = target[:offset].rstrip(';').rstrip()
        second = (_get_indentation(logical_lines[0]) +
                  target[offset:].lstrip(';').lstrip())

        # Find inline comment.
        inline_comment = None
        if target[offset:].lstrip(';').lstrip()[:2] == '# ':
            inline_comment = target[offset:].lstrip(';')

        if inline_comment:
            self.source[line_index] = first + inline_comment
        else:
            self.source[line_index] = first + '\n' + second
        return [line_index + 1]

    def fix_e711(self, result):
        """Fix comparison with None."""
        (line_index, offset, target) = get_index_offset_contents(result,
                                                                 self.source)

        right_offset = offset + 2
        if right_offset >= len(target):
            return []

        left = target[:offset].rstrip()
        center = target[offset:right_offset]
        right = target[right_offset:].lstrip()

        if not right.startswith('None'):
            return []

        if center.strip() == '==':
            new_center = 'is'
        elif center.strip() == '!=':
            new_center = 'is not'
        else:
            return []

        self.source[line_index] = ' '.join([left, new_center, right])
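
        # Illustrative examples: 'if x == None:' becomes 'if x is None:'
        # and 'if x != None:' becomes 'if x is not None:'.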

    def fix_e712(self, result):
        """Fix (trivial case of) comparison with boolean."""
        (line_index, offset, target) = get_index_offset_contents(result,
                                                                 self.source)

        # Handle very easy "not" special cases.
        if re.match(r'^\s*if [\w.]+ == False:$', target):
            self.source[line_index] = re.sub(r'if ([\w.]+) == False:',
                                             r'if not \1:', target, count=1)
        elif re.match(r'^\s*if [\w.]+ != True:$', target):
            self.source[line_index] = re.sub(r'if ([\w.]+) != True:',
                                             r'if not \1:', target, count=1)
        else:
            right_offset = offset + 2
            if right_offset >= len(target):
                return []

            left = target[:offset].rstrip()
            center = target[offset:right_offset]
            right = target[right_offset:].lstrip()

            # Handle simple cases only.
            new_right = None
            if center.strip() == '==':
                if re.match(r'\bTrue\b', right):
                    new_right = re.sub(r'\bTrue\b *', '', right, count=1)
            elif center.strip() == '!=':
                if re.match(r'\bFalse\b', right):
                    new_right = re.sub(r'\bFalse\b *', '', right, count=1)

            if new_right is None:
                return []

            if new_right[0].isalnum():
                new_right = ' ' + new_right

            self.source[line_index] = left + new_right

    def fix_e713(self, result):
        """Fix (trivial case of) non-membership check."""
        (line_index, _, target) = get_index_offset_contents(result,
                                                            self.source)

        # Handle very easy case only.
        if re.match(r'^\s*if not [\w.]+ in [\w.]+:$', target):
            self.source[line_index] = re.sub(r'if not ([\w.]+) in ([\w.]+):',
                                             r'if \1 not in \2:',
                                             target,
                                             count=1)

    def fix_w291(self, result):
        """Remove trailing whitespace."""
        fixed_line = self.source[result['line'] - 1].rstrip()
        self.source[result['line'] - 1] = fixed_line + '\n'

    def fix_w391(self, _):
        """Remove trailing blank lines."""
        blank_count = 0
        for line in reversed(self.source):
            line = line.rstrip()
            if line:
                break
            else:
                blank_count += 1

        original_length = len(self.source)
        self.source = self.source[:original_length - blank_count]
        return range(1, 1 + original_length)


def get_index_offset_contents(result, source):
    """Return (line_index, column_offset, line_contents)."""
    line_index = result['line'] - 1
    return (line_index,
            result['column'] - 1,
            source[line_index])


def get_fixed_long_line(target, previous_line, original,
                        indent_word='    ', max_line_length=79,
                        aggressive=False, experimental=False, verbose=False):
    """Break up long line and return result.

    Do this by generating multiple reformatted candidates and then
    ranking the candidates to heuristically select the best option.

    """
    indent = _get_indentation(target)
    source = target[len(indent):]
    assert source.lstrip() == source

    # Check for partial multiline.
    tokens = list(generate_tokens(source))

    candidates = shorten_line(
        tokens, source, indent,
        indent_word,
        max_line_length,
        aggressive=aggressive,
        experimental=experimental,
        previous_line=previous_line)

    # Also sort alphabetically as a tie breaker (for determinism).
    candidates = sorted(
        sorted(set(candidates).union([target, original])),
        key=lambda x: line_shortening_rank(
            x,
            indent_word,
            max_line_length,
            experimental=experimental))

    if verbose >= 4:
        print(('-' * 79 + '\n').join([''] + candidates + ['']),
              file=wrap_output(sys.stderr, 'utf-8'))

    if candidates:
        best_candidate = candidates[0]
        # Don't allow things to get longer.
        if longest_line_length(best_candidate) > longest_line_length(original):
            return None
        else:
            return best_candidate


def longest_line_length(code):
    """Return length of longest line."""
    return max(len(line) for line in code.splitlines())


def join_logical_line(logical_line):
    """Return single line based on logical line input."""
    indentation = _get_indentation(logical_line)

    return indentation + untokenize_without_newlines(
        generate_tokens(logical_line.lstrip())) + '\n'


def untokenize_without_newlines(tokens):
    """Return source code based on tokens."""
    text = ''
    last_row = 0
    last_column = -1

    for t in tokens:
        token_string = t[1]
        (start_row, start_column) = t[2]
        (end_row, end_column) = t[3]

        if start_row > last_row:
            last_column = 0
        if (
            (start_column > last_column or token_string == '\n') and
            not text.endswith(' ')
        ):
            text += ' '

        if token_string != '\n':
            text += token_string

        last_row = end_row
        last_column = end_column

    return text.rstrip()
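
# Illustrative example: untokenize_without_newlines() collapses the logical
# line 'x = (1,\n     2)\n' back into the single physical line 'x = (1, 2)'.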


def _find_logical(source_lines):
    # Make a variable which is the index of all the starts of lines.
    logical_start = []
    logical_end = []
    last_newline = True
    parens = 0
    for t in generate_tokens(''.join(source_lines)):
        if t[0] in [tokenize.COMMENT, tokenize.DEDENT,
                    tokenize.INDENT, tokenize.NL,
                    tokenize.ENDMARKER]:
            continue
        if not parens and t[0] in [tokenize.NEWLINE, tokenize.SEMI]:
            last_newline = True
            logical_end.append((t[3][0] - 1, t[2][1]))
            continue
        if last_newline and not parens:
            logical_start.append((t[2][0] - 1, t[2][1]))
            last_newline = False
        if t[0] == tokenize.OP:
            if t[1] in '([{':
                parens += 1
            elif t[1] in '}])':
                parens -= 1
    return (logical_start, logical_end)


def _get_logical(source_lines, result, logical_start, logical_end):
    """Return the logical line corresponding to the result.

    Assumes input is already E702-clean.

    """
    row = result['line'] - 1
    col = result['column'] - 1
    ls = None
    le = None
    for i in range(0, len(logical_start), 1):
        assert logical_end
        x = logical_end[i]
        if x[0] > row or (x[0] == row and x[1] > col):
            le = x
            ls = logical_start[i]
            break
    if ls is None:
        return None
    original = source_lines[ls[0]:le[0] + 1]
    return ls, le, original


def get_item(items, index, default=None):
    if 0 <= index < len(items):
        return items[index]
    else:
        return default


def reindent(source, indent_size):
    """Reindent all lines."""
    reindenter = Reindenter(source)
    return reindenter.run(indent_size)


def code_almost_equal(a, b):
    """Return True if code is similar.

    Ignore whitespace when comparing specific line.

    """
    split_a = split_and_strip_non_empty_lines(a)
    split_b = split_and_strip_non_empty_lines(b)

    if len(split_a) != len(split_b):
        return False

    for (index, _) in enumerate(split_a):
        if ''.join(split_a[index].split()) != ''.join(split_b[index].split()):
            return False

    return True
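
# Illustrative examples:
#     code_almost_equal('1 + 1\n', '1+1\n')  -> True (whitespace only)
#     code_almost_equal('x = [1,\n2]\n',
#                       'x = [1, 2]\n')      -> False (line counts differ)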


def split_and_strip_non_empty_lines(text):
    """Return lines split by newline.

    Ignore empty lines.

    """
    return [line.strip() for line in text.splitlines() if line.strip()]


def fix_e265(source, aggressive=False):  # pylint: disable=unused-argument
    """Format block comments."""
    if '#' not in source:
        # Optimization.
        return source

    ignored_line_numbers = multiline_string_lines(
        source,
        include_docstrings=True) | set(commented_out_code_lines(source))

    fixed_lines = []
    sio = io.StringIO(source)
    for (line_number, line) in enumerate(sio.readlines(), start=1):
        if (
            line.lstrip().startswith('#') and
            line_number not in ignored_line_numbers and
            not pep8.noqa(line)
        ):
            indentation = _get_indentation(line)
            line = line.lstrip()

            # Normalize beginning if not a shebang.
            if len(line) > 1:
                pos = next((index for index, c in enumerate(line)
                            if c != '#'))
                if (
                    # Leave multiple spaces like '#    ' alone.
                    (line[:pos].count('#') > 1 or line[1].isalnum()) and
                    # Leave stylistic outlined blocks alone.
                    not line.rstrip().endswith('#')
                ):
                    line = '# ' + line.lstrip('# \t')

            fixed_lines.append(indentation + line)
        else:
            fixed_lines.append(line)

    return ''.join(fixed_lines)
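
# Illustrative examples: fix_e265() rewrites '#comment' as '# comment' but
# leaves shebangs ('#!...') and banner lines that end in '#' untouched.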


def refactor(source, fixer_names, ignore=None, filename=''):
    """Return refactored code using lib2to3.

    Skip if ignore string is produced in the refactored code.

    """
    from lib2to3 import pgen2
    try:
        new_text = refactor_with_2to3(source,
                                      fixer_names=fixer_names,
                                      filename=filename)
    except (pgen2.parse.ParseError,
            SyntaxError,
            UnicodeDecodeError,
            UnicodeEncodeError):
        return source

    if ignore:
        if ignore in new_text and ignore not in source:
            return source

    return new_text


def code_to_2to3(select, ignore):
    fixes = set()
    for code, fix in CODE_TO_2TO3.items():
        if code_match(code, select=select, ignore=ignore):
            fixes |= set(fix)
    return fixes


def fix_2to3(source,
             aggressive=True, select=None, ignore=None, filename=''):
    """Fix various deprecated code (via lib2to3)."""
    if not aggressive:
        return source

    select = select or []
    ignore = ignore or []

    return refactor(source,
                    code_to_2to3(select=select,
                                 ignore=ignore),
                    filename=filename)


def fix_w602(source, aggressive=True):
    """Fix deprecated form of raising exception."""
    if not aggressive:
        return source

    return refactor(source, ['raise'],
                    ignore='with_traceback')


def find_newline(source):
    """Return type of newline used in source.

    Input is a list of lines.

    """
    assert not isinstance(source, unicode)

    counter = collections.defaultdict(int)
    for line in source:
        if line.endswith(CRLF):
            counter[CRLF] += 1
        elif line.endswith(CR):
            counter[CR] += 1
        elif line.endswith(LF):
            counter[LF] += 1

    return (sorted(counter, key=counter.get, reverse=True) or [LF])[0]
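
# Illustrative example: for ['a\r\n', 'b\r\n', 'c\n'] the most common ending
# wins, so find_newline() returns CRLF; an empty input defaults to LF.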


def _get_indentword(source):
    """Return indentation type."""
    indent_word = '    '  # Default in case source has no indentation
    try:
        for t in generate_tokens(source):
            if t[0] == token.INDENT:
                indent_word = t[1]
                break
    except (SyntaxError, tokenize.TokenError):
        pass
    return indent_word


def _get_indentation(line):
    """Return leading whitespace."""
    if line.strip():
        non_whitespace_index = len(line) - len(line.lstrip())
        return line[:non_whitespace_index]
    else:
        return ''


def get_diff_text(old, new, filename):
    """Return text of unified diff between old and new."""
    newline = '\n'
    diff = difflib.unified_diff(
        old, new,
        'original/' + filename,
        'fixed/' + filename,
        lineterm=newline)

    text = ''
    for line in diff:
        text += line

        # Work around missing newline (http://bugs.python.org/issue2142).
        if text and not line.endswith(newline):
            text += newline + r'\ No newline at end of file' + newline

    return text


def _priority_key(pep8_result):
    """Key for sorting PEP8 results.

    Global fixes should be done first. This is important for things like
    indentation.

    """
    priority = [
        # Fix multiline colon-based before semicolon based.
        'e701',
        # Break multiline statements early.
        'e702',
        # Things that make lines longer.
        'e225', 'e231',
        # Remove extraneous whitespace before breaking lines.
        'e201',
        # Shorten whitespace in comment before resorting to wrapping.
        'e262'
    ]
    middle_index = 10000
    lowest_priority = [
        # We need to shorten lines last since the logical fixer can get in a
        # loop, which causes us to exit early.
        'e501'
    ]
    key = pep8_result['id'].lower()
    try:
        return priority.index(key)
    except ValueError:
        try:
            return middle_index + lowest_priority.index(key) + 1
        except ValueError:
            return middle_index
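
# Illustrative sort keys: 'E701' maps to 0 (first), an unlisted code such as
# 'E128' maps to middle_index (10000), and 'E501' maps to 10001 (last).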


def shorten_line(tokens, source, indentation, indent_word, max_line_length,
                 aggressive=False, experimental=False, previous_line=''):
    """Separate line at OPERATOR.

    Multiple candidates will be yielded.

    """
    for candidate in _shorten_line(tokens=tokens,
                                   source=source,
                                   indentation=indentation,
                                   indent_word=indent_word,
                                   aggressive=aggressive,
                                   previous_line=previous_line):
        yield candidate

    if aggressive:
        for key_token_strings in SHORTEN_OPERATOR_GROUPS:
            shortened = _shorten_line_at_tokens(
                tokens=tokens,
                source=source,
                indentation=indentation,
                indent_word=indent_word,
                key_token_strings=key_token_strings,
                aggressive=aggressive)

            if shortened is not None and shortened != source:
                yield shortened

    if experimental:
        for shortened in _shorten_line_at_tokens_new(
                tokens=tokens,
                source=source,
                indentation=indentation,
                max_line_length=max_line_length):
            yield shortened


def _shorten_line(tokens, source, indentation, indent_word,
                  aggressive=False, previous_line=''):
    """Separate line at OPERATOR.

    The input is expected to be free of newlines except for inside multiline
    strings and at the end.

    Multiple candidates will be yielded.

    """
    for (token_type,
         token_string,
         start_offset,
         end_offset) in token_offsets(tokens):

        if (
            token_type == tokenize.COMMENT and
            not is_probably_part_of_multiline(previous_line) and
            not is_probably_part_of_multiline(source) and
            not source[start_offset + 1:].strip().lower().startswith(
                ('noqa', 'pragma:', 'pylint:'))
        ):
            # Move inline comments to previous line.
            first = source[:start_offset]
            second = source[start_offset:]
            yield (indentation + second.strip() + '\n' +
                   indentation + first.strip() + '\n')
        elif token_type == token.OP and token_string != '=':
            # Don't break on '=' after keyword as this violates PEP 8.

            assert token_type != token.INDENT

            first = source[:end_offset]

            second_indent = indentation
            if first.rstrip().endswith('('):
                second_indent += indent_word
            elif '(' in first:
                second_indent += ' ' * (1 + first.find('('))
            else:
                second_indent += indent_word

            second = (second_indent + source[end_offset:].lstrip())
            if (
                not second.strip() or
                second.lstrip().startswith('#')
            ):
                continue

            # Do not begin a line with a comma
            if second.lstrip().startswith(','):
                continue
            # Do not end a line with a dot
            if first.rstrip().endswith('.'):
                continue
            if token_string in '+-*/':
                fixed = first + ' \\' + '\n' + second
            else:
                fixed = first + '\n' + second

            # Only fix if syntax is okay.
            if check_syntax(normalize_multiline(fixed)
                            if aggressive else fixed):
                yield indentation + fixed


# A convenient way to handle tokens.
Token = collections.namedtuple('Token', ['token_type', 'token_string',
                                         'spos', 'epos', 'line'])


class ReformattedLines(object):

    """The reflowed lines of atoms.

    Each part of the line is represented as an "atom." They can be moved
    around when need be to get the optimal formatting.

    """

    ###########################################################################
    # Private Classes

    class _Indent(object):

        """Represent an indentation in the atom stream."""

        def __init__(self, indent_amt):
            self._indent_amt = indent_amt

        def emit(self):
            return ' ' * self._indent_amt

        @property
        def size(self):
            return self._indent_amt

    class _Space(object):

        """Represent a space in the atom stream."""

        def emit(self):
            return ' '

        @property
        def size(self):
            return 1

    class _LineBreak(object):

        """Represent a line break in the atom stream."""

        def emit(self):
            return '\n'

        @property
        def size(self):
            return 0

    def __init__(self, max_line_length):
        self._max_line_length = max_line_length
        self._lines = []
        self._bracket_depth = 0
        self._prev_item = None
        self._prev_prev_item = None

    def __repr__(self):
        return self.emit()

    ###########################################################################
    # Public Methods

    def add(self, obj, indent_amt, break_after_open_bracket):
        if isinstance(obj, Atom):
            self._add_item(obj, indent_amt)
            return

        self._add_container(obj, indent_amt, break_after_open_bracket)

    def add_comment(self, item):
        num_spaces = 2
        if len(self._lines) > 1:
            if isinstance(self._lines[-1], self._Space):
                num_spaces -= 1
            if len(self._lines) > 2:
                if isinstance(self._lines[-2], self._Space):
                    num_spaces -= 1

        while num_spaces > 0:
            self._lines.append(self._Space())
            num_spaces -= 1
        self._lines.append(item)

    def add_indent(self, indent_amt):
        self._lines.append(self._Indent(indent_amt))

    def add_line_break(self, indent):
        self._lines.append(self._LineBreak())
        self.add_indent(len(indent))

    def add_line_break_at(self, index, indent_amt):
        self._lines.insert(index, self._LineBreak())
        self._lines.insert(index + 1, self._Indent(indent_amt))

    def add_space_if_needed(self, curr_text, equal=False):
        if (
            not self._lines or isinstance(
                self._lines[-1], (self._LineBreak, self._Indent, self._Space))
        ):
            return

        prev_text = unicode(self._prev_item)
        prev_prev_text = (
            unicode(self._prev_prev_item) if self._prev_prev_item else '')

        if (
            # The previous item was a keyword or identifier and the current
            # item isn't an operator that doesn't require a space.
            ((self._prev_item.is_keyword or self._prev_item.is_string or
              self._prev_item.is_name or self._prev_item.is_number) and
             (curr_text[0] not in '([{.,:}])' or
              (curr_text[0] == '=' and equal))) or

            # Don't place spaces around a '.', unless it's in an 'import'
            # statement.
            ((prev_prev_text != 'from' and prev_text[-1] != '.' and
              curr_text != 'import') and

             # Don't place a space before a colon.
             curr_text[0] != ':' and

             # Don't split up ending brackets by spaces.
             ((prev_text[-1] in '}])' and curr_text[0] not in '.,}])') or

              # Put a space after a colon or comma.
              prev_text[-1] in ':,' or

              # Put space around '=' if asked to.
              (equal and prev_text == '=') or

              # Put spaces around non-unary arithmetic operators.
              ((self._prev_prev_item and
                (prev_text not in '+-' and
                 (self._prev_prev_item.is_name or
                  self._prev_prev_item.is_number or
                  self._prev_prev_item.is_string)) and
                prev_text in ('+', '-', '%', '*', '/', '//', '**', 'in')))))
        ):
            self._lines.append(self._Space())

    def previous_item(self):
        """Return the previous non-whitespace item."""
        return self._prev_item

    def fits_on_current_line(self, item_extent):
        return self.current_size() + item_extent <= self._max_line_length

    def current_size(self):
        """The size of the current line minus the indentation."""
        size = 0
        for item in reversed(self._lines):
            size += item.size
            if isinstance(item, self._LineBreak):
                break

        return size

    def line_empty(self):
        return (self._lines and
                isinstance(self._lines[-1],
                           (self._LineBreak, self._Indent)))

    def emit(self):
        string = ''
        for item in self._lines:
            if isinstance(item, self._LineBreak):
                string = string.rstrip()
            string += item.emit()

        return string.rstrip() + '\n'

    ###########################################################################
    # Private Methods

    def _add_item(self, item, indent_amt):
        """Add an item to the line.

        Reflow the line to get the best formatting after the item is
        inserted. The bracket depth indicates if the item is being
        inserted inside of a container or not.

        """
        if self._prev_item and self._prev_item.is_string and item.is_string:
            # Place consecutive string literals on separate lines.
            self._lines.append(self._LineBreak())
            self._lines.append(self._Indent(indent_amt))

        item_text = unicode(item)
        if self._lines and self._bracket_depth:
            # Adding the item into a container.
            self._prevent_default_initializer_splitting(item, indent_amt)

            if item_text in '.,)]}':
                self._split_after_delimiter(item, indent_amt)

        elif self._lines and not self.line_empty():
            # Adding the item outside of a container.
            if self.fits_on_current_line(len(item_text)):
                self._enforce_space(item)
            else:
                # Line break for the new item.
                self._lines.append(self._LineBreak())
                self._lines.append(self._Indent(indent_amt))

        self._lines.append(item)
        self._prev_item, self._prev_prev_item = item, self._prev_item

        if item_text in '([{':
            self._bracket_depth += 1
        elif item_text in '}])':
            self._bracket_depth -= 1
            assert self._bracket_depth >= 0

    def _add_container(self, container, indent_amt, break_after_open_bracket):
        actual_indent = indent_amt + 1

        if (
            unicode(self._prev_item) != '=' and
            not self.line_empty() and
            not self.fits_on_current_line(
                container.size + self._bracket_depth + 2)
        ):
            if unicode(container)[0] == '(' and self._prev_item.is_name:
                # Don't split before the opening bracket of a call.
                break_after_open_bracket = True
                actual_indent = indent_amt + 4
            elif (
                break_after_open_bracket or
                unicode(self._prev_item) not in '([{'
            ):
                # If the container doesn't fit on the current line and the
                # current line isn't empty, place the container on the next
                # line.
                self._lines.append(self._LineBreak())
                self._lines.append(self._Indent(indent_amt))
                break_after_open_bracket = False
        else:
            actual_indent = self.current_size() + 1
            break_after_open_bracket = False

        if isinstance(container, (ListComprehension, IfExpression)):
            actual_indent = indent_amt

        # Increase the continued indentation only if recursing on a
        # container.
        container.reflow(self, ' ' * actual_indent,
                         break_after_open_bracket=break_after_open_bracket)

    def _prevent_default_initializer_splitting(self, item, indent_amt):
        """Prevent splitting between a default initializer.

        When there is a default initializer, it's best to keep it all on
        the same line. It's nicer and more readable, even if it goes
        over the maximum allowable line length. This goes back along the
        current line to determine if we have a default initializer, and,
        if so, to remove extraneous whitespaces and add a line
        break/indent before it if needed.

        """
        if unicode(item) == '=':
            # This is the assignment in the initializer. Just remove spaces for
            # now.
            self._delete_whitespace()
            return

        if (not self._prev_item or not self._prev_prev_item or
                unicode(self._prev_item) != '='):
            return

        self._delete_whitespace()
        prev_prev_index = self._lines.index(self._prev_prev_item)

        if (
            isinstance(self._lines[prev_prev_index - 1], self._Indent) or
            self.fits_on_current_line(item.size + 1)
        ):
            # The default initializer is already the only item on this line.
            # Don't insert a newline here.
            return

        # Replace the space with a newline/indent combo.
        if isinstance(self._lines[prev_prev_index - 1], self._Space):
            del self._lines[prev_prev_index - 1]

        self.add_line_break_at(self._lines.index(self._prev_prev_item),
                               indent_amt)

    def _split_after_delimiter(self, item, indent_amt):
        """Split the line only after a delimiter."""
        self._delete_whitespace()

        if self.fits_on_current_line(item.size):
            return

        last_space = None
        for item in reversed(self._lines):
            if (
                last_space and
                (not isinstance(item, Atom) or not item.is_colon)
            ):
                break
            else:
                last_space = None
                if isinstance(item, self._Space):
                    last_space = item
                if isinstance(item, (self._LineBreak, self._Indent)):
                    return

        if not last_space:
            return

        self.add_line_break_at(self._lines.index(last_space), indent_amt)

    def _enforce_space(self, item):
        """Enforce a space in certain situations.

        There are cases where we will want a space where normally we
        wouldn't put one. This just enforces the addition of a space.

        """
        if isinstance(self._lines[-1],
                      (self._Space, self._LineBreak, self._Indent)):
            return

        if not self._prev_item:
            return

        item_text = unicode(item)
        prev_text = unicode(self._prev_item)

        # Prefer a space around a '.' in an import statement, and between the
        # 'import' and '('.
        if (
            (item_text == '.' and prev_text == 'from') or
            (item_text == 'import' and prev_text == '.') or
            (item_text == '(' and prev_text == 'import')
        ):
            self._lines.append(self._Space())

    def _delete_whitespace(self):
        """Delete all whitespace from the end of the line."""
        while isinstance(self._lines[-1], (self._Space, self._LineBreak,
                                           self._Indent)):
            del self._lines[-1]


class Atom(object):

    """The smallest unbreakable unit that can be reflowed."""

    def __init__(self, atom):
        self._atom = atom

    def __repr__(self):
        return self._atom.token_string

    def __len__(self):
        return self.size

    def reflow(
        self, reflowed_lines, continued_indent, extent,
        break_after_open_bracket=False,
        is_list_comp_or_if_expr=False,
        next_is_dot=False
    ):
        if self._atom.token_type == tokenize.COMMENT:
            reflowed_lines.add_comment(self)
            return

        total_size = extent if extent else self.size

        if self._atom.token_string not in ',:([{}])':
            # Some atoms will need an extra 1-sized space token after them.
            total_size += 1

        prev_item = reflowed_lines.previous_item()
        if (
            not is_list_comp_or_if_expr and
            not reflowed_lines.fits_on_current_line(total_size) and
            not (next_is_dot and
                 reflowed_lines.fits_on_current_line(self.size + 1)) and
            not reflowed_lines.line_empty() and
            not self.is_colon and
            not (prev_item and prev_item.is_name and
                 unicode(self) == '(')
        ):
            # Start a new line if there is already something on the line and
            # adding this atom would make it go over the max line length.
            reflowed_lines.add_line_break(continued_indent)
        else:
            reflowed_lines.add_space_if_needed(unicode(self))

        reflowed_lines.add(self, len(continued_indent),
                           break_after_open_bracket)

    def emit(self):
        return self.__repr__()

    @property
    def is_keyword(self):
        return keyword.iskeyword(self._atom.token_string)

    @property
    def is_string(self):
        return self._atom.token_type == tokenize.STRING

    @property
    def is_name(self):
        return self._atom.token_type == tokenize.NAME

    @property
    def is_number(self):
        return self._atom.token_type == tokenize.NUMBER

    @property
    def is_comma(self):
        return self._atom.token_string == ','

    @property
    def is_colon(self):
        return self._atom.token_string == ':'

    @property
    def size(self):
        return len(self._atom.token_string)
class Container(object): | |
"""Base class for all container types.""" | |
def __init__(self, items): | |
self._items = items | |
def __repr__(self): | |
string = '' | |
last_was_keyword = False | |
for item in self._items: | |
if item.is_comma: | |
string += ', ' | |
elif item.is_colon: | |
string += ': ' | |
else: | |
item_string = unicode(item) | |
if ( | |
string and | |
(last_was_keyword or | |
(not string.endswith(tuple('([{,.:}]) ')) and | |
not item_string.startswith(tuple('([{,.:}])')))) | |
): | |
string += ' ' | |
string += item_string | |
last_was_keyword = item.is_keyword | |
return string | |
def __iter__(self): | |
for element in self._items: | |
yield element | |
def __getitem__(self, idx): | |
return self._items[idx] | |
def reflow(self, reflowed_lines, continued_indent, | |
break_after_open_bracket=False): | |
last_was_container = False | |
for (index, item) in enumerate(self._items): | |
next_item = get_item(self._items, index + 1) | |
if isinstance(item, Atom): | |
is_list_comp_or_if_expr = ( | |
isinstance(self, (ListComprehension, IfExpression))) | |
item.reflow(reflowed_lines, continued_indent, | |
self._get_extent(index), | |
is_list_comp_or_if_expr=is_list_comp_or_if_expr, | |
next_is_dot=(next_item and | |
unicode(next_item) == '.')) | |
if last_was_container and item.is_comma: | |
reflowed_lines.add_line_break(continued_indent) | |
last_was_container = False | |
else: # isinstance(item, Container) | |
reflowed_lines.add(item, len(continued_indent), | |
break_after_open_bracket) | |
last_was_container = not isinstance(item, (ListComprehension, | |
IfExpression)) | |
if ( | |
break_after_open_bracket and index == 0 and | |
# Prefer to keep empty containers together instead of | |
# separating them. | |
unicode(item) == self.open_bracket and | |
(not next_item or unicode(next_item) != self.close_bracket) and | |
(len(self._items) != 3 or not isinstance(next_item, Atom)) | |
): | |
reflowed_lines.add_line_break(continued_indent) | |
break_after_open_bracket = False | |
else: | |
next_next_item = get_item(self._items, index + 2) | |
if ( | |
unicode(item) not in ['.', '%', 'in'] and | |
next_item and not isinstance(next_item, Container) and | |
unicode(next_item) != ':' and | |
next_next_item and (not isinstance(next_next_item, Atom) or | |
unicode(next_item) == 'not') and | |
not reflowed_lines.line_empty() and | |
not reflowed_lines.fits_on_current_line( | |
self._get_extent(index + 1) + 2) | |
): | |
reflowed_lines.add_line_break(continued_indent) | |
def _get_extent(self, index): | |
"""The extent of the full element. | |
E.g., the length of a function call or keyword. | |
""" | |
extent = 0 | |
prev_item = get_item(self._items, index - 1) | |
seen_dot = prev_item and unicode(prev_item) == '.' | |
while index < len(self._items): | |
item = get_item(self._items, index) | |
index += 1 | |
if isinstance(item, (ListComprehension, IfExpression)): | |
break | |
if isinstance(item, Container): | |
if prev_item and prev_item.is_name: | |
if seen_dot: | |
extent += 1 | |
else: | |
extent += item.size | |
prev_item = item | |
continue | |
elif (unicode(item) not in ['.', '=', ':', 'not'] and | |
not item.is_name and not item.is_string): | |
break | |
if unicode(item) == '.': | |
seen_dot = True | |
extent += item.size | |
prev_item = item | |
return extent | |
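# Editorial note: _get_extent walks forward from the given index over
# names, dots, strings, and their call arguments so the reflow pass can
# treat a dotted call such as "self.foo(arg)" as a single unit when
# choosing break points (a call reached through a dot is counted as only
# one extra character, per the seen_dot branch above).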
@property | |
def is_string(self): | |
return False | |
@property | |
def size(self): | |
return len(self.__repr__()) | |
@property | |
def is_keyword(self): | |
return False | |
@property | |
def is_name(self): | |
return False | |
@property | |
def is_comma(self): | |
return False | |
@property | |
def is_colon(self): | |
return False | |
@property | |
def open_bracket(self): | |
return None | |
@property | |
def close_bracket(self): | |
return None | |
class Tuple(Container): | |
"""A high-level representation of a tuple.""" | |
@property | |
def open_bracket(self): | |
return '(' | |
@property | |
def close_bracket(self): | |
return ')' | |
class List(Container): | |
"""A high-level representation of a list.""" | |
@property | |
def open_bracket(self): | |
return '[' | |
@property | |
def close_bracket(self): | |
return ']' | |
class DictOrSet(Container): | |
"""A high-level representation of a dictionary or set.""" | |
@property | |
def open_bracket(self): | |
return '{' | |
@property | |
def close_bracket(self): | |
return '}' | |
class ListComprehension(Container): | |
"""A high-level representation of a list comprehension.""" | |
@property | |
def size(self): | |
length = 0 | |
for item in self._items: | |
if isinstance(item, IfExpression): | |
break | |
length += item.size | |
return length | |
class IfExpression(Container): | |
"""A high-level representation of an if-expression.""" | |
def _parse_container(tokens, index, for_or_if=None): | |
"""Parse a high-level container, such as a list, tuple, etc.""" | |
# Store the opening bracket. | |
items = [Atom(Token(*tokens[index]))] | |
index += 1 | |
num_tokens = len(tokens) | |
while index < num_tokens: | |
tok = Token(*tokens[index]) | |
if tok.token_string in ',)]}': | |
# First check whether we're at the end of a list comprehension or
# if-expression. Don't add the ending token to the list comprehension
# or if-expression, because it isn't part of that construct.
if for_or_if == 'for': | |
return (ListComprehension(items), index - 1) | |
elif for_or_if == 'if': | |
return (IfExpression(items), index - 1) | |
# Otherwise the token belongs to this container.
items.append(Atom(tok))
# A closing bracket ends the container; a comma does not.
if tok.token_string == ')': | |
# The end of a tuple. | |
return (Tuple(items), index) | |
elif tok.token_string == ']': | |
# The end of a list. | |
return (List(items), index) | |
elif tok.token_string == '}': | |
# The end of a dictionary or set. | |
return (DictOrSet(items), index) | |
elif tok.token_string in '([{': | |
# A sub-container is being defined. | |
(container, index) = _parse_container(tokens, index) | |
items.append(container) | |
elif tok.token_string == 'for': | |
(container, index) = _parse_container(tokens, index, 'for') | |
items.append(container) | |
elif tok.token_string == 'if': | |
(container, index) = _parse_container(tokens, index, 'if') | |
items.append(container) | |
else: | |
items.append(Atom(tok)) | |
index += 1 | |
return (None, None) | |
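# Editorial sketch: for the tokens of "[1, (2, 3)]" this returns a List
# whose items are the Atoms '[', '1', ',', a nested Tuple for "(2, 3)",
# and ']'. A 'for' or 'if' token instead starts a ListComprehension or
# IfExpression, which ends just *before* the following ',', ')', ']' or
# '}' -- hence the "index - 1" in the returns above.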
def _parse_tokens(tokens): | |
"""Parse the tokens. | |
This converts the tokens into a form where we can manipulate them | |
more easily. | |
""" | |
index = 0 | |
parsed_tokens = [] | |
num_tokens = len(tokens) | |
while index < num_tokens: | |
tok = Token(*tokens[index]) | |
assert tok.token_type != token.INDENT | |
if tok.token_type == tokenize.NEWLINE: | |
# There's only one newline and it's at the end. | |
break | |
if tok.token_string in '([{': | |
(container, index) = _parse_container(tokens, index) | |
if not container: | |
return None | |
parsed_tokens.append(container) | |
else: | |
parsed_tokens.append(Atom(tok)) | |
index += 1 | |
return parsed_tokens | |
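# Editorial usage sketch (standard tokenize input assumed):
#
#     readline = io.StringIO(u'f(a, b)\n').readline
#     parsed = _parse_tokens(list(tokenize.generate_tokens(readline)))
#     # -> roughly [Atom 'f', Tuple of '(', 'a', ',', 'b', ')']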
def _reflow_lines(parsed_tokens, indentation, max_line_length, | |
start_on_prefix_line): | |
"""Reflow the lines so that it looks nice.""" | |
if unicode(parsed_tokens[0]) == 'def': | |
# A function definition gets indented a bit more. | |
continued_indent = indentation + ' ' * 2 * DEFAULT_INDENT_SIZE | |
else: | |
continued_indent = indentation + ' ' * DEFAULT_INDENT_SIZE | |
break_after_open_bracket = not start_on_prefix_line | |
lines = ReformattedLines(max_line_length) | |
lines.add_indent(len(indentation.lstrip('\r\n'))) | |
if not start_on_prefix_line: | |
# If splitting after the opening bracket will cause the first element | |
# to be aligned weirdly, don't try it. | |
first_token = get_item(parsed_tokens, 0) | |
second_token = get_item(parsed_tokens, 1) | |
if ( | |
first_token and second_token and | |
unicode(second_token)[0] == '(' and | |
len(indentation) + len(first_token) + 1 == len(continued_indent) | |
): | |
return None | |
for item in parsed_tokens: | |
lines.add_space_if_needed(unicode(item), equal=True) | |
save_continued_indent = continued_indent | |
if start_on_prefix_line and isinstance(item, Container): | |
start_on_prefix_line = False | |
continued_indent = ' ' * (lines.current_size() + 1) | |
item.reflow(lines, continued_indent, break_after_open_bracket) | |
continued_indent = save_continued_indent | |
return lines.emit() | |
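# Editorial note on the doubled "def" indent above: continuation lines of
# a function definition get 2 * DEFAULT_INDENT_SIZE extra spaces so the
# wrapped parameters cannot be confused with the body, e.g.
#
#     def frobnicate(
#             alpha, beta):      # 8-space continuation indent
#         return alpha + beta    # 4-space body indent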
def _shorten_line_at_tokens_new(tokens, source, indentation, | |
max_line_length): | |
"""Shorten the line taking its length into account. | |
The input is expected to be free of newlines except for inside | |
multiline strings and at the end. | |
""" | |
# Yield the original source first, so the caller can see whether it's a
# better choice than the shortened candidate lines we generate here.
yield indentation + source | |
parsed_tokens = _parse_tokens(tokens) | |
if parsed_tokens: | |
# Perform two reflows. The first one starts on the same line as the | |
# prefix. The second starts on the line after the prefix. | |
fixed = _reflow_lines(parsed_tokens, indentation, max_line_length, | |
start_on_prefix_line=True) | |
if fixed and check_syntax(normalize_multiline(fixed.lstrip())): | |
yield fixed | |
fixed = _reflow_lines(parsed_tokens, indentation, max_line_length, | |
start_on_prefix_line=False) | |
if fixed and check_syntax(normalize_multiline(fixed.lstrip())): | |
yield fixed | |
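# Editorial sketch: _shorten_line_at_tokens_new() is a generator of
# candidate rewrites (the unshortened original first); a caller can rank
# them and keep the best, e.g. with a hypothetical scoring helper:
#
#     candidates = _shorten_line_at_tokens_new(tokens, source,
#                                              indentation, 79)
#     best = min(candidates, key=rank_candidate)  # rank_candidate assumed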
def _shorten_line_at_tokens(tokens, source, indentation, indent_word, | |
key_token_strings, aggressive): | |
"""Separate line by breaking at tokens in key_token_strings. | |
The input is expected to be free of newlines except for inside | |
multiline strings and at the end. | |
""" | |
offsets = [] | |
for (index, _t) in enumerate(token_offsets(tokens)): | |
(token_type, | |
token_string, | |
start_offset, | |
end_offset) = _t | |
assert token_type != token.INDENT | |
if token_string in key_token_strings: | |
# Do not break inside containers that hold zero or one item.
unwanted_next_token = { | |
'(': ')', | |
'[': ']', | |
'{': '}'}.get(token_string) | |
if unwanted_next_token: | |
if ( | |
get_item(tokens, | |
index + 1, | |
default=[None, None])[1] == unwanted_next_token or | |
get_item(tokens, | |
index + 2, | |
default=[None, None])[1] == unwanted_next_token | |
): | |
continue | |
if ( | |
index > 2 and token_string == '(' and | |
tokens[index - 1][1] in ',(%[' | |
): | |
# Don't split after a tuple start, or before a tuple start if | |
# the tuple is in a list. | |
continue | |
if end_offset < len(source) - 1: | |
# Don't split right before newline. | |
offsets.append(end_offset) | |
else: | |
# Break at adjacent strings. These were probably meant to be on | |
# separate lines in the first place. | |
previous_token = get_item(tokens, index - 1) | |
if ( | |
token_type == tokenize.STRING and | |
previous_token and previous_token[0] == tokenize.STRING | |
): | |
offsets.append(start_offset) | |
current_indent = None | |
fixed = None | |
for line in split_at_offsets(source, offsets): | |
if fixed: | |
fixed += '\n' + current_indent + line | |
for symbol in '([{': | |
if line.endswith(symbol): | |
current_indent += indent_word | |
else: | |
# First line. | |
fixed = line | |
assert not current_indent | |
current_indent = indent_word | |
assert fixed is not None | |
if check_syntax(normalize_multiline(fixed) | |
if aggressive > 1 else fixed): | |
return indentation + fixed | |
else: | |
return None | |
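# Editorial example: with '(' in key_token_strings and an indent_word of
# four spaces, a too-long call like
#
#     result = some_call(argument_one, argument_two)
#
# is split right after the open bracket:
#
#     result = some_call(
#         argument_one, argument_two)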
def token_offsets(tokens): | |
"""Yield tokens and offsets.""" | |
end_offset = 0 | |
previous_end_row = 0 | |
previous_end_column = 0 | |
for t in tokens: | |
token_type = t[0] | |
token_string = t[1] | |
(start_row, start_column) = t[2] | |
(end_row, end_column) = t[3] | |
# Account for the whitespace between tokens. | |
end_offset += start_column | |
if previous_end_row == start_row: | |
end_offset -= previous_end_column | |
# Record the start offset of the token. | |
start_offset = end_offset | |
# Account for the length of the token itself. | |
end_offset += len(token_string) | |
yield (token_type, | |
token_string, | |
start_offset, | |
end_offset) | |
previous_end_row = end_row | |
previous_end_column = end_column | |
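# Editorial trace: for the tokens of "x = 1" the generator yields
# character offsets into the line, counting inter-token whitespace:
#
#     (NAME, 'x', 0, 1), (OP, '=', 2, 3), (NUMBER, '1', 4, 5), ...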
def normalize_multiline(line): | |
"""Normalize multiline-related code that will cause syntax error. | |
This is for purposes of checking syntax. | |
""" | |
if line.startswith('def ') and line.rstrip().endswith(':'): | |
return line + ' pass' | |
elif line.startswith('return '): | |
return 'def _(): ' + line | |
elif line.startswith('@'): | |
return line + 'def _(): pass' | |
elif line.startswith('class '): | |
return line + ' pass' | |
elif line.startswith(('if ', 'elif ', 'for ', 'while ')): | |
return line + ' pass' | |
else: | |
return line | |
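# Editorial examples of the padding performed above:
#
#     normalize_multiline('def foo():')   -> 'def foo(): pass'
#     normalize_multiline('return x + y') -> 'def _(): return x + y'
#     normalize_multiline('x = 1')        -> 'x = 1'  (unchanged)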
def fix_whitespace(line, offset, replacement): | |
"""Replace whitespace at offset and return fixed line.""" | |
# Replace escaped newlines too | |
left = line[:offset].rstrip('\n\r \t\\') | |
right = line[offset:].lstrip('\n\r \t\\') | |
if right.startswith('#'): | |
return line | |
else: | |
return left + replacement + right | |
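# Editorial examples (hypothetical inputs):
#
#     fix_whitespace('x  = 1', offset=1, replacement=' ')  -> 'x = 1'
#     fix_whitespace('x = 1  # note', offset=5, replacement=' ')
#         -> unchanged, since the text after the offset is a comment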
def _execute_pep8(pep8_options, source): | |
"""Execute pep8 via python method calls.""" | |
class QuietReport(pep8.BaseReport): | |
"""Version of checker that does not print.""" | |
def __init__(self, options): | |
super(QuietReport, self).__init__(options) | |
self.__full_error_results = [] | |
def error(self, line_number, offset, text, check): | |
"""Collect errors.""" | |
code = super(QuietReport, self).error(line_number, | |
offset, | |
text, | |
check) | |
if code: | |
self.__full_error_results.append( | |
{'id': code, | |
'line': line_number, | |
'column': offset + 1, | |
'info': text}) | |
def full_error_results(self): | |
"""Return error results in detail. | |
Results are in the form of a list of dictionaries. Each | |
dictionary contains 'id', 'line', 'column', and 'info'. | |
""" | |
return self.__full_error_results | |
checker = pep8.Checker('', lines=source, | |
reporter=QuietReport, **pep8_options) | |
checker.check_all() | |
return checker.report.full_error_results() | |
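# Editorial sketch of the return value (shape as documented in
# full_error_results() above; the options dict is assumed):
#
#     _execute_pep8(pep8_options, ['x=1\n'])
#     # -> [{'id': 'E225', 'line': 1, 'column': 2, 'info': '...'}]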
def _remove_leading_and_normalize(line): | |
return line.lstrip().rstrip(CR + LF) + '\n' | |
class Reindenter(object): | |
"""Reindents badly-indented code to uniformly use four-space indentation. | |
Released to the public domain, by Tim Peters, 03 October 2000. | |
""" | |
def __init__(self, input_text): | |
sio = io.StringIO(input_text) | |
source_lines = sio.readlines() | |
self.string_content_line_numbers = multiline_string_lines(input_text) | |
# File lines, rstripped & tab-expanded. A dummy entry at the start
# lets us use tokenize's 1-based line numbering directly.
# Note that a line is all-blank iff it is a newline.
self.lines = [] | |
for line_number, line in enumerate(source_lines, start=1): | |
# Do not modify if inside a multiline string. | |
if line_number in self.string_content_line_numbers: | |
self.lines.append(line) | |
else: | |
# Only expand leading tabs. | |
self.lines.append(_get_indentation(line).expandtabs() + | |
_remove_leading_and_normalize(line)) | |
self.lines.insert(0, None) | |
self.index = 1 # index into self.lines of next line | |
self.input_text = input_text | |
def run(self, indent_size=DEFAULT_INDENT_SIZE): | |
"""Fix indentation and return modified line numbers. | |
Line numbers are indexed at 1. | |
""" | |
if indent_size < 1: | |
return self.input_text | |
try: | |
stats = _reindent_stats(tokenize.generate_tokens(self.getline)) | |
except (SyntaxError, tokenize.TokenError): | |
return self.input_text | |
# Work directly on the tab-expanded lines built in __init__.
lines = self.lines | |
# Sentinel. | |
stats.append((len(lines), 0)) | |
# Map each observed count of leading spaces to the count we want.
have2want = {} | |
# Program after transformation. | |
after = [] | |
# Copy over initial empty lines -- there's nothing to do until | |
# we see a line with *something* on it. | |
i = stats[0][0] | |
after.extend(lines[1:i]) | |
for i in range(len(stats) - 1): | |
thisstmt, thislevel = stats[i] | |
nextstmt = stats[i + 1][0] | |
have = _leading_space_count(lines[thisstmt]) | |
want = thislevel * indent_size | |
if want < 0: | |
# A comment line. | |
if have: | |
# An indented comment line. If we saw the same | |
# indentation before, reuse what it most recently | |
# mapped to. | |
want = have2want.get(have, -1) | |
if want < 0: | |
# Then it probably belongs to the next real stmt. | |
for j in range(i + 1, len(stats) - 1): | |
jline, jlevel = stats[j] | |
if jlevel >= 0: | |
if have == _leading_space_count(lines[jline]): | |
want = jlevel * indent_size | |
break | |
if want < 0:
# Maybe it's a hanging comment (like this one), in which
# case we should shift it the same way its base line got
# shifted.
for j in range(i - 1, -1, -1): | |
jline, jlevel = stats[j] | |
if jlevel >= 0: | |
want = (have + _leading_space_count( | |
after[jline - 1]) - | |
_leading_space_count(lines[jline])) | |
break | |
if want < 0: | |
# Still no luck -- leave it alone. | |
want = have | |
else: | |
want = 0 | |
assert want >= 0 | |
have2want[have] = want | |
diff = want - have | |
if diff == 0 or have == 0: | |
after.extend(lines[thisstmt:nextstmt]) | |
else: | |
for line_number, line in enumerate(lines[thisstmt:nextstmt], | |
start=thisstmt): | |
if line_number in self.string_content_line_numbers: | |
after.append(line) | |
elif diff > 0: | |
if line == '\n': | |
after.append(line) | |
else: | |
after.append(' ' * diff + line) | |
else: | |
remove = min(_leading_space_count(line), -diff) | |
after.append(line[remove:]) | |
return ''.join(after) | |
def getline(self): | |
"""Line-getter for tokenize.""" | |
if self.index >= len(self.lines): | |
line = '' | |
else: | |
line = self.lines[self.index] | |
self.index += 1 | |
return line | |
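# Editorial usage sketch:
#
#     reindenter = Reindenter(u'if True:\n      x = 1\n')
#     reindenter.run()  # -> u'if True:\n    x = 1\n'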
def _reindent_stats(tokens): | |
"""Return list of (lineno, indentlevel) pairs. | |
One for each stmt and comment line. indentlevel is -1 for comment lines, as | |
a signal that tokenize doesn't know what to do about them; indeed, they're | |
our headache! | |
""" | |
find_stmt = 1 # Next token begins a fresh stmt? | |
level = 0 # Current indent level. | |
stats = [] | |
for t in tokens: | |
token_type = t[0] | |
sline = t[2][0] | |
line = t[4] | |
if token_type == tokenize.NEWLINE: | |
# A program statement, or ENDMARKER, will eventually follow, | |
# after some (possibly empty) run of tokens of the form | |
# (NL | COMMENT)* (INDENT | DEDENT+)? | |
find_stmt = 1 | |
elif token_type == tokenize.INDENT: | |
find_stmt = 1 | |
level += 1 | |
elif token_type == tokenize.DEDENT: | |
find_stmt = 1 | |
level -= 1 | |
elif token_type == tokenize.COMMENT: | |
if find_stmt: | |
stats.append((sline, -1)) | |
# But we're still looking for a new stmt, so leave | |
# find_stmt alone. | |
elif token_type == tokenize.NL: | |
pass | |
elif find_stmt: | |
# This is the first "real token" following a NEWLINE, so it | |
# must be the first token of the next program statement, or an | |
# ENDMARKER. | |
find_stmt = 0 | |
if line: # Not endmarker. | |
stats.append((sline, level)) | |
return stats | |
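# Editorial trace: for the source
#
#     if x:
#         # note
#         y = 1
#
# the stats are [(1, 0), (2, -1), (3, 1)]; the comment line's -1 level is
# re-anchored by the heuristics in Reindenter.run() above.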
def _leading_space_count(line): | |
"""Return number of leading spaces in line.""" | |
i = 0 | |
while i < len(line) and line[i] == ' ': | |
i += 1 | |
return i | |
def refactor_with_2to3(source_text, fixer_names, filename=''): | |
"""Use lib2to3 to refactor the source. | |
Return the refactored source code. | |
""" | |
from lib2to3.refactor import RefactoringTool | |
fixers = ['lib2to3.fixes.fix_' + name for name in fixer_names] | |
tool = RefactoringTool(fixer_names=fixers, explicit=fixers) | |
from lib2to3.pgen2 import tokenize as lib2to3_tokenize | |
try: | |
# The name parameter is particularly necessary for the "import" fixer.
return unicode(tool.refactor_string(source_text, name=filename)) | |
except lib2to3_tokenize.TokenError: | |
return source_text | |
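# Editorial example (lib2to3 fixer names, e.g. 'has_key' selects
# lib2to3.fixes.fix_has_key):
#
#     refactor_with_2to3(u'd.has_key(k)\n', ['has_key'])  # -> u'k in d\n'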
def check_syntax(code): | |
"""Return True if syntax is okay.""" | |
try: | |
return compile(code, '<string>', 'exec') | |
except (SyntaxError, TypeError, UnicodeDecodeError): | |
return False | |
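# Editorial examples: check_syntax('x = 1\n') returns the compiled code
# object (truthy), while check_syntax('x = = 1\n') returns False.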
def filter_results(source, results, aggressive): | |
"""Filter out spurious reports from pep8. | |
If aggressive is True, we allow possibly unsafe fixes (E711, E712). | |
""" | |
non_docstring_string_line_numbers = multiline_string_lines( | |
source, include_docstrings=False) | |
all_string_line_numbers = multiline_string_lines( | |
source, include_docstrings=True) | |
commented_out_code_line_numbers = commented_out_code_lines(source) | |
has_e901 = any(result['id'].lower() == 'e901' for result in |