Last active
February 16, 2021 22:28
-
-
Save kalekseev/5199309df80cbee3e326ea2dd90e334b to your computer and use it in GitHub Desktop.
pip-tools upgrade-interactive POC
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import argparse | |
import curses | |
import json | |
import logging | |
import subprocess | |
import sys | |
import threading | |
import typing | |
def init_logging(level): | |
class Formatter(logging.Formatter): | |
def formatMessage(self, record): | |
s = super().formatMessage(record) | |
if record.levelno >= logging.ERROR: | |
return f"\x1b[31;21m{s}\x1b[0m" | |
return s | |
log_stream = logging.StreamHandler() | |
log_stream.setFormatter(Formatter(fmt="%(levelname)s: %(message)s")) | |
logging.basicConfig(handlers=[log_stream], level=level) | |
class CursesFormatter: | |
STYLE_DEFAULT = None | |
def __init__(self, text): | |
self.text = text | |
self.others = [] | |
self.style = None | |
def __add__(self, other): | |
self.others.append(other) | |
return self | |
def print(self, stdscr, new_line=True): | |
stdscr.addstr(self.text, self.style or self.STYLE_DEFAULT) | |
for other in self.others: | |
if isinstance(other, str): | |
other = CursesFormatter(other) | |
other.print(stdscr, new_line=False) | |
if new_line: | |
stdscr.addstr("\n") | |
def ljust(self, num): | |
num -= len(self.text) | |
if num > 0: | |
return self + " " * num | |
def init_colors(): | |
curses.start_color() | |
curses.use_default_colors() | |
colors = [ | |
"red", | |
"green", | |
"yellow", | |
"blue", | |
"magenta", | |
"cyan", | |
"white", | |
"black", | |
] | |
def fn(style): | |
def f(self): | |
self.style = self.style | style if self.style else style | |
return self | |
return f | |
for i, color in enumerate(colors, 1): | |
curses.init_pair(i, getattr(curses, f"COLOR_{color.upper()}"), -1) | |
setattr( | |
CursesFormatter, | |
color, | |
property(fn(curses.color_pair(i))), | |
) | |
for i, color in enumerate(colors, 8): | |
curses.init_pair(i, i, -1) | |
setattr( | |
CursesFormatter, | |
f"{color}_bright", | |
property(fn(curses.color_pair(i))), | |
) | |
setattr(CursesFormatter, "bold", property(fn(curses.A_BOLD))) # noqa | |
setattr( # noqa | |
CursesFormatter, | |
"underline", | |
property(fn(curses.A_UNDERLINE)), | |
) | |
CursesFormatter.STYLE_DEFAULT = curses.A_NORMAL | curses.color_pair(0) | |
return CursesFormatter | |
def update(files, packages): | |
flags = [ | |
"--no-annotate", | |
"--quiet", | |
] | |
def _update(file): | |
def f(): | |
cmd = [ | |
"pip-compile", | |
*flags, | |
*[a for p in packages for a in ["--upgrade-package", p]], | |
file.replace(".txt", ".in"), | |
] | |
subprocess.run(cmd) | |
return f | |
threads = [threading.Thread(target=_update(file)) for file in files] | |
for t in threads: | |
t.start() | |
for t in threads: | |
t.join() | |
def check_synced(files: typing.List[str]): | |
try: | |
subprocess.check_output(["pip-sync", "-n", *files]) | |
except subprocess.CalledProcessError as e: | |
logging.debug(e) | |
logging.debug("Command output:\n" + e.output.decode()) | |
return False | |
return True | |
# outdated = json.load(open("data.json")) | |
__version__ = "0.1.0" | |
class Package: | |
def __init__(self, name, version, latest_version): | |
self.name = name | |
self.version = version | |
self.latest_version = latest_version | |
self._s_version = version.split(".") | |
self._s_latest_version = latest_version.split(".") | |
@property | |
def is_patch(self): | |
return True | |
@property | |
def is_minor(self): | |
return self._s_version[1] != self._s_latest_version[1] | |
@property | |
def is_major(self): | |
return self._s_version[0] != self._s_latest_version[0] | |
class Table: | |
def __init__(self, packages: typing.Sequence[Package], max_rows, fmt): | |
self.fmt = fmt | |
self.columns = ("", "name", "from", "", "to") | |
self.widths = {} | |
self.packages = packages | |
# self.index = {r["name"]: i for i, r in enumerate(rows)} | |
self._lines = None | |
self.widths = { | |
"name": max(len(p.name) for p in packages), | |
"from": max(len(p.version) for p in packages), | |
"to": max(len(p.latest_version) for p in packages), | |
} | |
self._header = [ | |
self.fmt(" ") | |
+ self.fmt("name").white_bright.underline.bold.ljust( | |
self.widths["name"] + 1 | |
) | |
+ self.fmt("from").white_bright.underline.bold.ljust( | |
self.widths["from"] + 1 | |
) | |
+ " " | |
+ self.fmt("to").white_bright.underline.bold.ljust(self.widths["to"] + 1) | |
] | |
@property | |
def _render_lines(self): | |
if self._lines is not None: | |
return self._lines | |
lines = {} | |
def get_style(package): | |
if package.is_major: | |
return "red" | |
if package.is_minor: | |
return "yellow" | |
if package.is_patch: | |
return "green" | |
for p in self.packages: | |
lines[p.name] = ( | |
getattr(self.fmt(p.name), get_style(p)).ljust(self.widths["name"] + 1) | |
+ self.fmt(p.version).blue.ljust(self.widths["from"] + 1) | |
+ "❯ " | |
+ self.fmt(p.latest_version).green.ljust(self.widths["to"] + 1) | |
) | |
self._lines = lines | |
return self._lines | |
def get_view(self, into_view_name, selected, max_rows): | |
assert max_rows > 0, "The screen is too small" | |
result = [] | |
packages = self.packages | |
if max_rows < len(packages): | |
raise NotImplementedError() | |
index = self.index[into_view_name] | |
center = max_rows // 2 | |
if index > center: | |
offset = index - center | |
packages = packages[offset:max_rows] | |
if max_rows > 1: | |
max_rows -= 1 | |
result = [*self._header] | |
for row in self.packages[:max_rows]: | |
name = row.name | |
result.append( | |
( | |
self.fmt("❯◉ " if name in selected else "❯◯ ").cyan | |
if name == into_view_name | |
else self.fmt(" ◉ " if name in selected else " ◯ ") | |
) | |
+ self._render_lines[name] | |
) | |
return result | |
class View: | |
def __init__(self, packages: typing.Sequence[Package]): | |
self.selected = set() | |
self.packages = packages | |
self.fmt = None | |
self.height = None | |
self.index = 0 | |
@property | |
def current(self): | |
return self.packages[self.index].name | |
def next_package(self): | |
if self.index < len(self.packages) - 1: | |
self.index += 1 | |
return True | |
def previous_package(self): | |
if self.index > 0: | |
self.index -= 1 | |
return True | |
def set_height(self, height): | |
self.table = Table(self.packages, height, self.fmt) | |
self.height = height | |
self.has_header = height > ( | |
len(self.packages) + len(self.table._header) + len(self._header) | |
) | |
@property | |
def _header(self): | |
header = [ | |
self.fmt(f"pip-up {__version__}").bold, | |
self.fmt("info").blue + " Color legend :", | |
self.fmt(' "') | |
+ self.fmt("<red>").red | |
+ '" : Major Update backward-incompatible updates', | |
self.fmt(' "') | |
+ self.fmt("<yellow>").yellow | |
+ '" : Minor Update backward-compatible features', | |
self.fmt(' "') | |
+ self.fmt("<green>").green | |
+ '" : Patch Update backward-compatible bug fixes', | |
self.fmt("?").green + self.fmt(" Choose which packages to update.").bold, | |
] | |
return header | |
if self.table.height + len(header) > self.height: | |
return header | |
return [] | |
def select_package(self): | |
if self.current in self.selected: | |
self.selected.remove(self.current) | |
else: | |
self.selected.add(self.current) | |
def render(self): | |
assert self.height is not None, "View height is not set." | |
return self._header + self.table.get_view( | |
self.current, self.selected, self.height | |
) | |
def render(view, stdscr): | |
# stdscr.clear() | |
view.fmt = init_colors() | |
curses.curs_set(0) | |
y, _ = stdscr.getmaxyx() | |
view.set_height(y) | |
# for i in range(0, 17): | |
# stdscr.addstr(str(i), curses.color_pair(i)) | |
# stdscr.clear() | |
for v in view.render(): | |
v.print(stdscr) | |
stdscr.refresh() | |
# stdscr.addch(view.current_line_number, 0, "❯") | |
while True: | |
ch = stdscr.getkey() | |
if ch in ("j", "k"): | |
move = view.next_package if ch == "j" else view.previous_package | |
if move(): | |
stdscr.clear() | |
for v in view.render(): | |
v.print(stdscr) | |
elif ch == " ": | |
view.select_package() | |
stdscr.clear() | |
for v in view.render(): | |
v.print(stdscr) | |
# stdscr.refresh() | |
elif ch == "\n": | |
return | |
def create_view(files: typing.List[str]): | |
if not check_synced(files): | |
logging.error( | |
"Installed packages are not in sync with requirements files. Run pip-sync first." | |
) | |
sys.exit(1) | |
outdated = json.loads( | |
subprocess.check_output(["pip", "list", "-o", "--format", "json"]) | |
) | |
packages = [ | |
Package( | |
name=p["name"], version=p["version"], latest_version=p["latest_version"] | |
) | |
for p in outdated[:50] | |
] | |
view = View(packages) | |
return view | |
def main(): | |
parser = argparse.ArgumentParser( | |
description="Interactive upgrade of pip-tools requirements files." | |
) | |
parser.add_argument( | |
"files", | |
metavar="SRC_FILE", | |
type=str, | |
default=[], | |
nargs="*", | |
help="requirements.txt files", | |
) | |
parser.add_argument( | |
"-v", "--verbose", action="store_true", help="enable verbose output" | |
) | |
args = parser.parse_args() | |
init_logging(level=logging.DEBUG if args.verbose else logging.ERROR) | |
try: | |
view = create_view(args.files) | |
curses.wrapper(lambda stdscr: render(view, stdscr)) | |
update(args.files, view.selected) | |
except KeyboardInterrupt: | |
pass | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment