Skip to content

Instantly share code, notes, and snippets.

@kalekseev
Last active February 16, 2021 22:28
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kalekseev/5199309df80cbee3e326ea2dd90e334b to your computer and use it in GitHub Desktop.
Save kalekseev/5199309df80cbee3e326ea2dd90e334b to your computer and use it in GitHub Desktop.
pip-tools upgrade-interactive POC
#!/usr/bin/env python3
import argparse
import curses
import json
import logging
import subprocess
import sys
import threading
import typing
def init_logging(level):
class Formatter(logging.Formatter):
def formatMessage(self, record):
s = super().formatMessage(record)
if record.levelno >= logging.ERROR:
return f"\x1b[31;21m{s}\x1b[0m"
return s
log_stream = logging.StreamHandler()
log_stream.setFormatter(Formatter(fmt="%(levelname)s: %(message)s"))
logging.basicConfig(handlers=[log_stream], level=level)
class CursesFormatter:
STYLE_DEFAULT = None
def __init__(self, text):
self.text = text
self.others = []
self.style = None
def __add__(self, other):
self.others.append(other)
return self
def print(self, stdscr, new_line=True):
stdscr.addstr(self.text, self.style or self.STYLE_DEFAULT)
for other in self.others:
if isinstance(other, str):
other = CursesFormatter(other)
other.print(stdscr, new_line=False)
if new_line:
stdscr.addstr("\n")
def ljust(self, num):
num -= len(self.text)
if num > 0:
return self + " " * num
def init_colors():
curses.start_color()
curses.use_default_colors()
colors = [
"red",
"green",
"yellow",
"blue",
"magenta",
"cyan",
"white",
"black",
]
def fn(style):
def f(self):
self.style = self.style | style if self.style else style
return self
return f
for i, color in enumerate(colors, 1):
curses.init_pair(i, getattr(curses, f"COLOR_{color.upper()}"), -1)
setattr(
CursesFormatter,
color,
property(fn(curses.color_pair(i))),
)
for i, color in enumerate(colors, 8):
curses.init_pair(i, i, -1)
setattr(
CursesFormatter,
f"{color}_bright",
property(fn(curses.color_pair(i))),
)
setattr(CursesFormatter, "bold", property(fn(curses.A_BOLD))) # noqa
setattr( # noqa
CursesFormatter,
"underline",
property(fn(curses.A_UNDERLINE)),
)
CursesFormatter.STYLE_DEFAULT = curses.A_NORMAL | curses.color_pair(0)
return CursesFormatter
def update(files, packages):
flags = [
"--no-annotate",
"--quiet",
]
def _update(file):
def f():
cmd = [
"pip-compile",
*flags,
*[a for p in packages for a in ["--upgrade-package", p]],
file.replace(".txt", ".in"),
]
subprocess.run(cmd)
return f
threads = [threading.Thread(target=_update(file)) for file in files]
for t in threads:
t.start()
for t in threads:
t.join()
def check_synced(files: typing.List[str]):
try:
subprocess.check_output(["pip-sync", "-n", *files])
except subprocess.CalledProcessError as e:
logging.debug(e)
logging.debug("Command output:\n" + e.output.decode())
return False
return True
# outdated = json.load(open("data.json"))
__version__ = "0.1.0"
class Package:
def __init__(self, name, version, latest_version):
self.name = name
self.version = version
self.latest_version = latest_version
self._s_version = version.split(".")
self._s_latest_version = latest_version.split(".")
@property
def is_patch(self):
return True
@property
def is_minor(self):
return self._s_version[1] != self._s_latest_version[1]
@property
def is_major(self):
return self._s_version[0] != self._s_latest_version[0]
class Table:
def __init__(self, packages: typing.Sequence[Package], max_rows, fmt):
self.fmt = fmt
self.columns = ("", "name", "from", "", "to")
self.widths = {}
self.packages = packages
# self.index = {r["name"]: i for i, r in enumerate(rows)}
self._lines = None
self.widths = {
"name": max(len(p.name) for p in packages),
"from": max(len(p.version) for p in packages),
"to": max(len(p.latest_version) for p in packages),
}
self._header = [
self.fmt(" ")
+ self.fmt("name").white_bright.underline.bold.ljust(
self.widths["name"] + 1
)
+ self.fmt("from").white_bright.underline.bold.ljust(
self.widths["from"] + 1
)
+ " "
+ self.fmt("to").white_bright.underline.bold.ljust(self.widths["to"] + 1)
]
@property
def _render_lines(self):
if self._lines is not None:
return self._lines
lines = {}
def get_style(package):
if package.is_major:
return "red"
if package.is_minor:
return "yellow"
if package.is_patch:
return "green"
for p in self.packages:
lines[p.name] = (
getattr(self.fmt(p.name), get_style(p)).ljust(self.widths["name"] + 1)
+ self.fmt(p.version).blue.ljust(self.widths["from"] + 1)
+ "❯ "
+ self.fmt(p.latest_version).green.ljust(self.widths["to"] + 1)
)
self._lines = lines
return self._lines
def get_view(self, into_view_name, selected, max_rows):
assert max_rows > 0, "The screen is too small"
result = []
packages = self.packages
if max_rows < len(packages):
raise NotImplementedError()
index = self.index[into_view_name]
center = max_rows // 2
if index > center:
offset = index - center
packages = packages[offset:max_rows]
if max_rows > 1:
max_rows -= 1
result = [*self._header]
for row in self.packages[:max_rows]:
name = row.name
result.append(
(
self.fmt("❯◉ " if name in selected else "❯◯ ").cyan
if name == into_view_name
else self.fmt(" ◉ " if name in selected else " ◯ ")
)
+ self._render_lines[name]
)
return result
class View:
def __init__(self, packages: typing.Sequence[Package]):
self.selected = set()
self.packages = packages
self.fmt = None
self.height = None
self.index = 0
@property
def current(self):
return self.packages[self.index].name
def next_package(self):
if self.index < len(self.packages) - 1:
self.index += 1
return True
def previous_package(self):
if self.index > 0:
self.index -= 1
return True
def set_height(self, height):
self.table = Table(self.packages, height, self.fmt)
self.height = height
self.has_header = height > (
len(self.packages) + len(self.table._header) + len(self._header)
)
@property
def _header(self):
header = [
self.fmt(f"pip-up {__version__}").bold,
self.fmt("info").blue + " Color legend :",
self.fmt(' "')
+ self.fmt("<red>").red
+ '" : Major Update backward-incompatible updates',
self.fmt(' "')
+ self.fmt("<yellow>").yellow
+ '" : Minor Update backward-compatible features',
self.fmt(' "')
+ self.fmt("<green>").green
+ '" : Patch Update backward-compatible bug fixes',
self.fmt("?").green + self.fmt(" Choose which packages to update.").bold,
]
return header
if self.table.height + len(header) > self.height:
return header
return []
def select_package(self):
if self.current in self.selected:
self.selected.remove(self.current)
else:
self.selected.add(self.current)
def render(self):
assert self.height is not None, "View height is not set."
return self._header + self.table.get_view(
self.current, self.selected, self.height
)
def render(view, stdscr):
# stdscr.clear()
view.fmt = init_colors()
curses.curs_set(0)
y, _ = stdscr.getmaxyx()
view.set_height(y)
# for i in range(0, 17):
# stdscr.addstr(str(i), curses.color_pair(i))
# stdscr.clear()
for v in view.render():
v.print(stdscr)
stdscr.refresh()
# stdscr.addch(view.current_line_number, 0, "❯")
while True:
ch = stdscr.getkey()
if ch in ("j", "k"):
move = view.next_package if ch == "j" else view.previous_package
if move():
stdscr.clear()
for v in view.render():
v.print(stdscr)
elif ch == " ":
view.select_package()
stdscr.clear()
for v in view.render():
v.print(stdscr)
# stdscr.refresh()
elif ch == "\n":
return
def create_view(files: typing.List[str]):
if not check_synced(files):
logging.error(
"Installed packages are not in sync with requirements files. Run pip-sync first."
)
sys.exit(1)
outdated = json.loads(
subprocess.check_output(["pip", "list", "-o", "--format", "json"])
)
packages = [
Package(
name=p["name"], version=p["version"], latest_version=p["latest_version"]
)
for p in outdated[:50]
]
view = View(packages)
return view
def main():
parser = argparse.ArgumentParser(
description="Interactive upgrade of pip-tools requirements files."
)
parser.add_argument(
"files",
metavar="SRC_FILE",
type=str,
default=[],
nargs="*",
help="requirements.txt files",
)
parser.add_argument(
"-v", "--verbose", action="store_true", help="enable verbose output"
)
args = parser.parse_args()
init_logging(level=logging.DEBUG if args.verbose else logging.ERROR)
try:
view = create_view(args.files)
curses.wrapper(lambda stdscr: render(view, stdscr))
update(args.files, view.selected)
except KeyboardInterrupt:
pass
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment