Skip to content

Instantly share code, notes, and snippets.

@hhsprings
Last active April 26, 2021 11:15
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save hhsprings/7fd0e2f3b0a3f27a03ab61cfd3c81cc6 to your computer and use it in GitHub Desktop.
Save hhsprings/7fd0e2f3b0a3f27a03ab61cfd3c81cc6 to your computer and use it in GitHub Desktop.
An example for ctypes.windll.kernel32.GetCompressedFileSizeW, ctypes.windll.kernel32.GetDiskFreeSpaceExW and curses
#! py -3
# -*- coding: utf-8 -*-
import sys
import os
import time
import ctypes
import functools
from glob import glob
from fnmatch import fnmatch
from datetime import datetime
from collections import namedtuple
import curses
def _stat(fn):
try:
return os.stat(fn)
except Exception:
# lost file, maybe.
return namedtuple(
'dummy_stat',
['st_size', 'st_mtime']
)(-1, -1)
def _getsizes(fn):
hi = ctypes.c_ulong()
lo = ctypes.windll.kernel32.GetCompressedFileSizeW(fn, ctypes.byref(hi))
return _stat(fn).st_size, ctypes.c_ulong(lo).value + (hi.value << 32)
def _is_ntfs_compress_target(fn):
# FILE_ATTRIBUTE_ARCHIVE = 0x20
# FILE_ATTRIBUTE_COMPRESSED = 0x800
# FILE_ATTRIBUTE_DEVICE = 0x40
# FILE_ATTRIBUTE_DIRECTORY = 0x10
# FILE_ATTRIBUTE_ENCRYPTED = 0x4000
# FILE_ATTRIBUTE_HIDDEN = 0x2
# FILE_ATTRIBUTE_INTEGRITY_STREAM = 0x8000
# FILE_ATTRIBUTE_NORMAL = 0x80
# FILE_ATTRIBUTE_NOT_CONTENT_INDEXED = 0x2000
# FILE_ATTRIBUTE_NO_SCRUB_DATA = 0x20000
# FILE_ATTRIBUTE_OFFLINE = 0x1000
# FILE_ATTRIBUTE_READONLY = 0x1
# FILE_ATTRIBUTE_RECALL_ON_DATA_ACCESS = 0x400000
# FILE_ATTRIBUTE_RECALL_ON_OPEN = 0x40000
# FILE_ATTRIBUTE_REPARSE_POINT = 0x400
# FILE_ATTRIBUTE_SPARSE_FILE = 0x200
# FILE_ATTRIBUTE_SYSTEM = 0x4
# FILE_ATTRIBUTE_TEMPORARY = 0x100
f = ctypes.windll.kernel32.GetFileAttributesW
return f(fn) & 0x800
def _diskusage_free(p):
f = ctypes.windll.kernel32.GetDiskFreeSpaceExW
fba = ctypes.c_ulonglong()
tit = ctypes.c_ulonglong()
fre = ctypes.c_ulonglong()
f(p, ctypes.byref(fba), ctypes.byref(tit), ctypes.byref(fre))
return ("Free: {:.2f}".format(fre.value / 1024**3) + " G").rjust(18)
def _files(dir, pats=["*"], filtfun=lambda fn: fn):
allfiles = []
for pat in pats:
allfiles.extend(list(glob(os.path.join(dir, pat))))
files = list(
filter(
lambda fn: os.path.isfile(fn) and filtfun(fn),
allfiles))
return files
def _my_special_curses_wrapper(func, *args, **kwds): # wrapper(func, /, *args, **kwds):
def _wrap(stdscr, *args, **kwds):
stdscr.timeout(0)
curses.init_pair(2, curses.COLOR_BLACK, curses.COLOR_GREEN)
curses.init_pair(3, curses.COLOR_WHITE, curses.COLOR_BLUE)
curses.init_pair(4, curses.COLOR_WHITE, curses.COLOR_CYAN)
curses.init_pair(5, curses.COLOR_BLACK, curses.COLOR_YELLOW)
return func(stdscr, *args, **kwds)
return curses.wrapper(_wrap, *args, **kwds)
def _addstr(stdscr, row, col, s, maxwidth):
tmp = [] # [(cw, ch)]
for ch in s:
cw = 2 if len(ch.encode("utf-8")) > 1 else 1
tmp.append((cw, ch))
def _judge(cur):
return sum([cw for cw, _ in cur]) > ((maxwidth - 1) - col) - 1
if _judge(tmp):
r = 0
while _judge(tmp):
r = len(tmp) // 2
tmp.pop(r)
tmp.insert(r, (2, ".."))
for cw, ch in tmp:
stdscr.addstr(row, col, ch)
col += cw
class _TargetDirInfo(object):
def __init__(self, dir, pats=["*"], filtfun=lambda fn: fn):
self._dir = dir
self._files_getfun = functools.partial(_files, dir, pats, filtfun)
self.files = self._files_getfun()
self.sz = self.sz_ini = {fn: _getsizes(fn) for fn in self.files}
self.dt = self.dt_ini = datetime.now()
self.duf = self.duf_ini = _diskusage_free(self._dir)
def update(self):
if (datetime.now() - self.dt).total_seconds() > 30:
self.files = self._files_getfun()
self.sz = {fn: _getsizes(fn) for fn in self.files}
self.dt = datetime.now()
self.duf = _diskusage_free(self._dir)
def _main(args, stdscr):
if args.pattern:
patterns = args.pattern
else:
patterns = ["*"]
if args.exclude_pattern:
ffun = lambda fn: all(not fnmatch(fn, pat) for pat in args.exclude_pattern)
else:
ffun = lambda fn: fn
#
targ = _TargetDirInfo(args.dir, patterns, ffun)
height, width = stdscr.getmaxyx()
pagesize = min(height - 4, len(targ.files))
sl = slice(0, pagesize)
#
_nones = (float('nan'), float('nan'))
_sortfuns = [
lambda fn: (fn, targ.sz.get(fn, _nones)[0]),
lambda fn: (_stat(fn).st_mtime, fn),
lambda fn: (-_stat(fn).st_mtime, fn),
lambda fn: (targ.sz_ini.get(fn, _nones)[0], fn),
lambda fn: (-targ.sz_ini.get(fn, _nones)[0], fn),
lambda fn: (targ.sz.get(fn, _nones)[0], fn),
lambda fn: (-targ.sz.get(fn, _nones)[0], fn),
]
_sortfuns_idx = 0
#
while True:
kc = stdscr.getch()
if kc in (ord('q'), ord('Q')):
break
elif kc in (ord(' '), curses.KEY_NPAGE) or kc == curses.KEY_DOWN:
adv = pagesize if kc in (ord(' '), curses.KEY_NPAGE) else 1
sl = slice(
min(sl.start + adv, len(targ.files)),
min(len(targ.files), sl.start + pagesize + adv))
if sl.start == sl.stop:
sl = slice(0, pagesize)
elif kc in (ord('b'), curses.KEY_PPAGE) or kc == curses.KEY_UP:
dst = pagesize if kc in (ord('b'), curses.KEY_PPAGE) else 1
if sl.start > 0:
st = max(0, sl.start - dst)
sl = slice(st, st + pagesize)
else:
sl = slice(len(targ.files) - pagesize, len(targ.files))
elif kc == ord("s"):
_sortfuns_idx = (_sortfuns_idx + 1) % len(_sortfuns)
stdscr.clear()
targ.update()
#
col = 0
_addstr(stdscr, 0, col, args.dir, width)
stdscr.addstr(1, 0, targ.dt_ini.strftime("%m-%d %H:%M:%S").rjust(18))
stdscr.addstr(1, 18, targ.dt.strftime("%m-%d %H:%M:%S").rjust(18))
stdscr.addstr(2, 0, targ.duf_ini)
stdscr.addstr(2, 18, targ.duf)
for i, fn in enumerate(
list(sorted(
targ.files,
key=_sortfuns[_sortfuns_idx]))[sl]):
v_ini = (-1, -1)
ren = False
if fn in targ.sz_ini:
v_ini = targ.sz_ini[fn]
else:
bn, _ = os.path.splitext(fn)
for orig in targ.sz_ini.keys():
if os.path.splitext(orig)[0] == bn:
v_ini = targ.sz_ini[orig]
ren = True
break
v_now = targ.sz.get(fn, (-1, -1))
ca = _is_ntfs_compress_target(fn)
bn = os.path.basename(fn)
stdscr.addstr(i + 3, 0, "{:3,d}".format(v_ini[0]).rjust(18),
curses.color_pair(5 if ren else 1))
stdscr.addstr(i + 3, 18, "{:3,d}".format(v_now[0]).rjust(18),
curses.color_pair(1 if v_ini[0] != v_now[0] else 2))
csclr = 4 if ca else 1
if not (v_now[0] == v_now[1] or v_now[1] < 0):
csclr = 3
stdscr.addstr(i + 3, 36, "{:3,d}".format(v_now[1]).rjust(18),
curses.color_pair(csclr))
col = 56
_addstr(stdscr, i + 3, col, bn, width)
if sl.stop < len(targ.files):
stdscr.addstr(height - 1, 0, "-- more --")
stdscr.refresh()
time.sleep(0.5)
if __name__ == '__main__':
import argparse
ap = argparse.ArgumentParser()
ap.add_argument("dir")
ap.add_argument("--pattern", action="append")
ap.add_argument("--exclude_pattern", action="append")
args = ap.parse_args()
_my_special_curses_wrapper(functools.partial(_main, args))
@hhsprings
Copy link
Author

hhsprings commented Apr 20, 2021

rev1時点で「ctypes.windll.kernel32.GetCompressedFileSizeW」を使っているほう、圧縮後サイズが大きい場合(?)の値がおかしい。負として表示されてしまう結果になっている。原因究明&措置検討中...。あと、「<<16」が正しいのか自信がなくなってきてる。「<<32」では?

@hhsprings
Copy link
Author

rev1時点で「ctypes.windll.kernel32.GetCompressedFileSizeW」を使っているほう、圧縮後サイズが大きい場合(?)の値がおかしい。負として表示されてしまう結果になっている。原因究明&措置検討中...。あと、「<<16」が正しいのか自信がなくなってきてる。「<<32」では?

loword のほうは DWORD、つまり unsigned long のつもりの戻りなのに signed long として解釈していたということ、であれば、入れた措置は正しいのであろう。現実のファイル相手にした結果も問題なさそうにみえる。「<<16」だった部分は「<<32」とひとまずしておいたが、まだ問題となる現実のファイルを相手にしてなくて、確信が持ててない。シフトだけでマスクしなくて大丈夫だったっけ?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment