Skip to content

Instantly share code, notes, and snippets.

@AdamGagorik
Created June 28, 2024 17:55
Show Gist options
  • Save AdamGagorik/e66782f46cc83aed654ea1673833f86b to your computer and use it in GitHub Desktop.
Save AdamGagorik/e66782f46cc83aed654ea1673833f86b to your computer and use it in GitHub Desktop.
A script to memorize common directories
#!/usr/bin/env python3
"""
Memorize key directories and cd to them.
examples:
cd $(harpoon)
cd $(harpoon key)
harpoon add key path
"""
import argparse
import contextlib
import dataclasses
import logging
import os
import shelve
import sys
from dataclasses import dataclass
from pathlib import Path
from subprocess import PIPE, CalledProcessError, Popen, run
from typing import Any, Generator
DATABASE = Path(__file__).parent.joinpath("harpoon.db")
@contextlib.contextmanager
def context() -> Generator[shelve.Shelf, None, None]:
try:
with shelve.open(str(DATABASE), writeback=True) as db:
yield db
finally:
pass
def yes(msg: str, default: str = "y") -> bool:
return (input(f"{msg} (yes/no) [default={default}]: ") or default).strip().lower() in {"y", "yes"}
def initialize_db():
with context():
pass
def insert_new_item(key: str, path: Path, file: bool):
path = path.resolve()
if not path.exists():
raise FileNotFoundError(path)
if not file and not path.is_dir():
raise NotADirectoryError(path)
with context() as db:
if key in db:
logging.warning("key already exists! %s", key)
logging.info("add %s", key)
db[key] = str(path)
def delete_existing_item(key: str, force: bool):
with context() as db:
if key in db:
if force or yes(f"Delete {key}?"):
logging.info("del %s", key)
del db[key]
else:
logging.error("operation canceled")
else:
logging.warning("key does not exist! %s", key)
def delete_missing_items(force: bool) -> None:
with context() as db:
for key, path in db.items():
if not Path(path).exists():
if force or yes(f"Delete {key}?"):
logging.info("del %s", key)
del db[key]
else:
logging.error("operation canceled")
def display_existing_item(key: str):
with context() as db:
if key in db:
print(db[key])
else:
logging.error("key does not exist! %s", key)
print("NOT_FOUND")
def show_all_items_in_storage():
with context() as db:
size = len(db)
if size == 0:
print(f"total: {size}")
else:
print("\n".join(make_databse_table()) + f" total: {size}")
def get_database_info() -> Generator[dict[str, Any], None, None]:
with context() as db:
for key, path in db.items():
path = Path(path)
yield {
"key": key,
"path": str(path),
"exists": "x" if path.exists() else "",
"is_dir": "x" if path.is_dir() else "",
"is_file": "x" if path.is_file() else "",
}
def make_databse_table() -> Generator[str, None, None]:
widths = {}
for row in get_database_info():
for col, value in row.items():
widths[col] = max(widths.get(col, len(col)), len(str(value)))
yield "| {} |".format(" | ".join(f"{k:<{w}}" for k, w in widths.items()))
yield "|{}|".format("|".join("-" * (w + 2) for k, w in widths.items()))
for row in get_database_info():
yield "| {} |".format(" | ".join(f"{value:<{widths[col]}}" for col, value in row.items()))
def remove_database(force: bool):
with context() as db:
count = len(db)
if force or yes(f"Delete {count} items?"):
for ext in {"bak", "dat", "dir"}:
if (path := DATABASE.with_suffix(f".db.{ext}")).exists():
logging.info("rm %s", path)
os.remove(path)
else:
logging.error("operation canceled")
@dataclass
class Selector:
items: tuple[Any, ...] = ()
headers: tuple[str, ...] = ()
delimiter: str = "\u00a0"
select: int | None = None
match: int | None = None
def __call__(self) -> str:
command = [
"fzf",
"--height=~100%",
"--delimiter",
self.delimiter,
*(("--nth", f"{self.match + 1}") if self.match is not None else ()),
"--header-lines",
"1",
]
inputs = b"\n".join(self.inputs)
process = Popen(command, stdin=PIPE, stdout=PIPE)
stdout, stderr = process.communicate(input=inputs)
if process.returncode != 0:
raise CalledProcessError(process.returncode, " ".join(command), stdout, stderr)
if self.select is not None:
return [token.strip() for token in stdout.decode("utf-8").strip().split(self.delimiter)][self.select]
else:
return stdout.decode("utf-8").strip()
@property
def inputs(self) -> Generator[bytes, None, None]:
widths = {}
for i, item in enumerate(self.items):
for j, value in enumerate(item):
widths[j] = max(widths.get(j, len(self.headers[j])), len(str(value)))
yield self.delimiter.join(f"{header:<{widths[i]}}" for i, header in enumerate(self.headers)).encode("utf-8")
for item in self.items:
yield self.delimiter.join(f"{value:<{widths[i]}}" for i, value in enumerate(item)).encode("utf-8")
def select_from_table(col: int = 0) -> str:
with context() as db:
if db:
return Selector(items=tuple(db.items()), headers=("KEY", "PATH"), match=0, select=col)()
else:
raise RuntimeError("no key to select, database is empty!")
def select_path_with_fzf(file: bool = False) -> Path:
if file:
p0 = Popen(["fd", "--type", "f"], stdout=PIPE, stderr=PIPE)
else:
p0 = Popen(["fd", "--type", "d"], stdout=PIPE, stderr=PIPE)
p1 = Popen(["fzf"], stdin=p0.stdout, stdout=PIPE, stderr=PIPE)
stdout, stderr = p1.communicate()
for p in {p1}:
if p.returncode != 0:
raise CalledProcessError(p.returncode, " ".join(map(str, p.args)))
path = Path(stdout.decode("utf-8").strip())
if not path.exists():
raise FileNotFoundError(path)
return path
def main():
logging.basicConfig(level=logging.INFO, stream=sys.stderr, format="%(levelname)-7s | %(message)s")
parent = argparse.ArgumentParser(add_help=False)
parent.add_argument("action", nargs="?")
parser = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter)
subparsers = parser.add_subparsers(dest="action")
parser_lut = {
"add": subparsers.add_parser("add", help="add a new row to the storage"),
"del": subparsers.add_parser("del", help="remove the row at the given key"),
"show": subparsers.add_parser("show", help="show the path given the key"),
"list": subparsers.add_parser("list", help="list all items in the database"),
"clear": subparsers.add_parser("clear", help="clear all items from the database"),
"prune": subparsers.add_parser("prune", help="clear items that do not exist from the database"),
}
for k in {"add", "del", "show"}:
kwargs = dict(help="The associated key of a path")
if k not in {"add"}:
kwargs.update(nargs="?", default=None)
parser_lut[k].add_argument("key", **kwargs)
for k in {"add"}:
parser_lut[k].add_argument("path", type=Path, nargs="?", default=None, help="The path to memorize")
parser_lut[k].add_argument("--file", action="store_true", help="allow files to be memorized")
for k in {"del", "prune", "clear"}:
parser_lut[k].add_argument("--yes", action="store_true", help="do not prompt for confirmations?")
opts, remaining = parent.parse_known_args()
if "--help" in sys.argv or "-h" in sys.argv:
parser.parse_args()
return 0
elif opts.action is None:
print(select_from_table(col=1))
return 0
elif opts.action in parser_lut:
opts = parser.parse_args()
else:
opts = parser.parse_args(args=("show", opts.action, *remaining))
if opts.action == "add":
opts.path = opts.path if opts.path is not None else select_path_with_fzf(file=opts.file)
insert_new_item(key=opts.key, path=opts.path, file=opts.file)
elif opts.action == "del":
opts.key = opts.key if opts.key is not None else select_from_table(col=0)
delete_existing_item(key=opts.key, force=opts.yes)
elif opts.action == "show":
opts.key = opts.key if opts.key is not None else select_from_table(col=0)
display_existing_item(opts.key)
elif opts.action == "list":
show_all_items_in_storage()
elif opts.action == "prune":
delete_missing_items(force=opts.yes)
elif opts.action == "clear":
remove_database(force=opts.yes)
else:
logging.error("unknown action: %s", opts.action)
return 1
return 0
if __name__ == "__main__":
raise SystemExit(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment