Last active
April 26, 2021 11:15
-
-
Save hhsprings/7fd0e2f3b0a3f27a03ab61cfd3c81cc6 to your computer and use it in GitHub Desktop.
An example of using ctypes.windll.kernel32.GetCompressedFileSizeW and ctypes.windll.kernel32.GetDiskFreeSpaceExW together with curses
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#! py -3
# -*- coding: utf-8 -*-
import ctypes
import curses
import functools
import os
import sys
import time
from collections import namedtuple
from datetime import datetime
from fnmatch import fnmatch
from glob import glob
def _stat(fn):
    """Return ``os.stat(fn)``, or a placeholder if the file cannot be stat'ed.

    The placeholder is a small named tuple carrying only ``st_size`` and
    ``st_mtime``, both set to -1, so callers can keep reading those two
    fields for a file that has disappeared.

    Only failures that describe the *file* are absorbed; a wrong argument
    type (``TypeError``) is a programming error and propagates.
    """
    try:
        return os.stat(fn)
    except (OSError, ValueError):
        # OSError: lost or inaccessible file, maybe.
        # ValueError: a path os.stat() rejects outright (embedded NUL).
        dummy_stat = namedtuple('dummy_stat', ['st_size', 'st_mtime'])
        return dummy_stat(-1, -1)
def _getsizes(fn):
    """Return ``(logical_size, size_on_disk)`` for the file *fn*.

    ``logical_size`` is ``_stat(fn).st_size``.  ``size_on_disk`` is the value
    reported by the Win32 ``GetCompressedFileSizeW`` call, assembled from its
    low DWORD (the return value) and high DWORD (the out parameter).  When
    the call fails, ``size_on_disk`` is -1 instead of a bogus ~4 GiB figure.
    """
    invalid_file_size = 0xFFFFFFFF

    # A private handle with use_last_error=True lets ctypes capture the
    # thread's last-error code right after the call.
    kernel32 = ctypes.WinDLL("kernel32", use_last_error=True)
    get_compressed_size = kernel32.GetCompressedFileSizeW
    get_compressed_size.argtypes = [
        ctypes.c_wchar_p,
        ctypes.POINTER(ctypes.c_ulong),
    ]
    # Both halves are DWORDs; declaring the return type as unsigned keeps the
    # low half from being read back as a negative C int.
    get_compressed_size.restype = ctypes.c_ulong

    hi = ctypes.c_ulong(0)
    ctypes.set_last_error(0)
    lo = get_compressed_size(fn, ctypes.byref(hi))

    # 0xFFFFFFFF is also a legitimate low DWORD, so the sentinel only means
    # failure when the last-error code is non-zero as well.
    if lo == invalid_file_size and ctypes.get_last_error() != 0:
        on_disk = -1
    else:
        on_disk = (hi.value << 32) | lo
    return _stat(fn).st_size, on_disk
def _is_ntfs_compress_target(fn):
    """Return a truthy value when *fn* carries FILE_ATTRIBUTE_COMPRESSED.

    The result is ``0x800`` when the attribute bit is set and ``0``
    otherwise, including when the attributes cannot be read at all.
    """
    # FILE_ATTRIBUTE_ARCHIVE = 0x20
    # FILE_ATTRIBUTE_COMPRESSED = 0x800
    # FILE_ATTRIBUTE_DEVICE = 0x40
    # FILE_ATTRIBUTE_DIRECTORY = 0x10
    # FILE_ATTRIBUTE_ENCRYPTED = 0x4000
    # FILE_ATTRIBUTE_HIDDEN = 0x2
    # FILE_ATTRIBUTE_INTEGRITY_STREAM = 0x8000
    # FILE_ATTRIBUTE_NORMAL = 0x80
    # FILE_ATTRIBUTE_NOT_CONTENT_INDEXED = 0x2000
    # FILE_ATTRIBUTE_NO_SCRUB_DATA = 0x20000
    # FILE_ATTRIBUTE_OFFLINE = 0x1000
    # FILE_ATTRIBUTE_READONLY = 0x1
    # FILE_ATTRIBUTE_RECALL_ON_DATA_ACCESS = 0x400000
    # FILE_ATTRIBUTE_RECALL_ON_OPEN = 0x40000
    # FILE_ATTRIBUTE_REPARSE_POINT = 0x400
    # FILE_ATTRIBUTE_SPARSE_FILE = 0x200
    # FILE_ATTRIBUTE_SYSTEM = 0x4
    # FILE_ATTRIBUTE_TEMPORARY = 0x100
    file_attribute_compressed = 0x800
    invalid_file_attributes = 0xFFFFFFFF

    f = ctypes.windll.kernel32.GetFileAttributesW
    # ctypes hands the result back as a signed C int by default; mask it to
    # the unsigned 32-bit value the API actually returns.
    attrs = f(fn) & 0xFFFFFFFF
    if attrs == invalid_file_attributes:
        # The call failed (e.g. the file is gone).  Every bit is set in the
        # sentinel, so testing it would wrongly report "compressed".
        return 0
    return attrs & file_attribute_compressed
def _diskusage_free(p):
    """Return the free space of the volume holding *p* as display text.

    The text has the form ``"Free: <n.nn> G"`` (bytes divided by 1024**3)
    and is right-justified to 18 characters.
    """
    # Three 64-bit out parameters, in the order GetDiskFreeSpaceExW takes them.
    avail_to_caller = ctypes.c_ulonglong()
    total_bytes = ctypes.c_ulonglong()
    total_free = ctypes.c_ulonglong()
    ctypes.windll.kernel32.GetDiskFreeSpaceExW(
        p,
        ctypes.byref(avail_to_caller),
        ctypes.byref(total_bytes),
        ctypes.byref(total_free))
    # Only the third value is shown.
    gib = total_free.value / 1024**3
    label = "Free: {:.2f}".format(gib) + " G"
    return label.rjust(18)
def _files(dir, pats=("*",), filtfun=lambda fn: fn):
    """List the regular files in *dir* that match any of the glob patterns.

    Parameters:
        dir: directory to search (joined with each pattern).
        pats: iterable of glob patterns; defaults to everything (``"*"``).
        filtfun: predicate called with each path; falsy results are dropped.

    Returns a list of paths in glob order, pattern by pattern.  A file that
    matches more than one pattern is listed only once, at its first match.
    """
    seen = set()
    files = []
    for pat in pats:
        for fn in glob(os.path.join(dir, pat)):
            if fn in seen:
                # Already produced by an earlier pattern.
                continue
            seen.add(fn)
            if os.path.isfile(fn) and filtfun(fn):
                files.append(fn)
    return files
def _my_special_curses_wrapper(func, *args, **kwds):  # wrapper(func, /, *args, **kwds):
    """Run *func* under ``curses.wrapper`` after preparing the screen.

    Before *func* is called, the screen is switched to non-blocking input
    (``timeout(0)``) and colour pairs 2 to 5 are initialised.  *func*
    receives the screen followed by any extra positional/keyword arguments,
    and its return value is passed back through ``curses.wrapper``.
    """
    def _prepared(stdscr, *inner_args, **inner_kwds):
        stdscr.timeout(0)
        # (pair number, foreground, background)
        palette = (
            (2, curses.COLOR_BLACK, curses.COLOR_GREEN),
            (3, curses.COLOR_WHITE, curses.COLOR_BLUE),
            (4, curses.COLOR_WHITE, curses.COLOR_CYAN),
            (5, curses.COLOR_BLACK, curses.COLOR_YELLOW),
        )
        for pair_no, fg, bg in palette:
            curses.init_pair(pair_no, fg, bg)
        return func(stdscr, *inner_args, **inner_kwds)

    return curses.wrapper(_prepared, *args, **kwds)
def _addstr(stdscr, row, col, s, maxwidth):
    """Draw *s* at (*row*, *col*), shortening it in the middle if too wide.

    Each ASCII character is counted as one column and every other character
    as two.  If the text needs more than ``maxwidth - col - 2`` columns,
    characters are removed around the middle until it fits and a ``".."``
    marker is put in their place.  Characters are written one at a time so
    that the column advances by the counted width.

    When not even the marker fits (``col`` is at or beyond the usable width)
    nothing is drawn.
    """
    limit = ((maxwidth - 1) - col) - 1
    if limit < 0:
        # No room at all; without this guard the shrink loop below would
        # run the list empty and then pop from it.
        return

    # [(cw, ch)] -- ord() > 0x7F is exactly "more than one UTF-8 byte", and
    # unlike encode() it cannot fail on unpaired surrogates.
    cells = [(2 if ord(ch) > 0x7F else 1, ch) for ch in s]
    used = sum(cw for cw, _ in cells)

    if used > limit:
        mid = 0
        while used > limit:
            mid = len(cells) // 2
            used -= cells.pop(mid)[0]
        # The two columns held back in `limit` are what the marker occupies.
        cells.insert(mid, (2, ".."))

    for cw, ch in cells:
        stdscr.addstr(row, col, ch)
        col += cw
class _TargetDirInfo(object):
    """Snapshot of a directory's files, their sizes and the free disk space.

    Attributes:
        files: current list of matching file paths.
        sz / sz_ini: ``{path: _getsizes(path)}`` now / at construction.
        dt / dt_ini: time of the latest / the first scan.
        duf / duf_ini: free-space text from the latest / the first scan.
    """

    def __init__(self, dir, pats=["*"], filtfun=lambda fn: fn):
        self._dir = dir
        self._files_getfun = functools.partial(_files, dir, pats, filtfun)
        listing, sizes, stamp, free_text = self._scan()
        self.files = listing
        # The "_ini" values start out as the very same objects; update()
        # rebinds only the current ones.
        self.sz_ini = self.sz = sizes
        self.dt_ini = self.dt = stamp
        self.duf_ini = self.duf = free_text

    def _scan(self):
        """Collect a fresh (files, sizes, timestamp, free-space text) tuple."""
        listing = self._files_getfun()
        sizes = {path: _getsizes(path) for path in listing}
        stamp = datetime.now()
        free_text = _diskusage_free(self._dir)
        return listing, sizes, stamp, free_text

    def update(self):
        """Rescan the directory if the last scan is more than 30 seconds old."""
        age = (datetime.now() - self.dt).total_seconds()
        if age <= 30:
            return
        self.files, self.sz, self.dt, self.duf = self._scan()
def _main(args, stdscr):
    """Interactive loop: list the files of ``args.dir`` with their sizes.

    Parameters:
        args: parsed command line; ``dir``, ``pattern`` and
            ``exclude_pattern`` are read.
        stdscr: the curses screen to draw on.

    Keys:
        q / Q             quit
        space / PageDown  forward one page (wraps to the top past the end)
        Down              forward one line
        b / PageUp        back one page (from the top, jumps to the last page)
        Up                back one line
        s                 switch to the next sort order

    Each row shows, in 18-character columns, the size recorded at start-up,
    the current size and the current on-disk (compressed) size, followed by
    the file name.  The screen is redrawn roughly every half second.
    """
    if args.pattern:
        patterns = args.pattern
    else:
        patterns = ["*"]
    if args.exclude_pattern:
        # Keep a file only if it matches none of the exclude patterns.
        ffun = lambda fn: all(not fnmatch(fn, pat) for pat in args.exclude_pattern)
    else:
        ffun = lambda fn: fn
    #
    targ = _TargetDirInfo(args.dir, patterns, ffun)
    height, width = stdscr.getmaxyx()
    # Rows 0-2 are the header and the bottom row is for "-- more --".
    # NOTE(review): pagesize is fixed here and is not recomputed when the
    # file list changes later.
    pagesize = min(height - 4, len(targ.files))
    sl = slice(0, pagesize)  # window of the sorted file list being shown
    #
    # Stand-in size pair for files that have no entry in the size tables.
    _nones = (float('nan'), float('nan'))
    # Sort orders cycled with "s": name, mtime ascending/descending,
    # initial size ascending/descending, current size ascending/descending.
    _sortfuns = [
        lambda fn: (fn, targ.sz.get(fn, _nones)[0]),
        lambda fn: (_stat(fn).st_mtime, fn),
        lambda fn: (-_stat(fn).st_mtime, fn),
        lambda fn: (targ.sz_ini.get(fn, _nones)[0], fn),
        lambda fn: (-targ.sz_ini.get(fn, _nones)[0], fn),
        lambda fn: (targ.sz.get(fn, _nones)[0], fn),
        lambda fn: (-targ.sz.get(fn, _nones)[0], fn),
    ]
    _sortfuns_idx = 0
    #
    while True:
        # At most one key is handled per redraw cycle.
        kc = stdscr.getch()
        if kc in (ord('q'), ord('Q')):
            break
        elif kc in (ord(' '), curses.KEY_NPAGE) or kc == curses.KEY_DOWN:
            # Advance by a whole page or by a single line.
            adv = pagesize if kc in (ord(' '), curses.KEY_NPAGE) else 1
            sl = slice(
                min(sl.start + adv, len(targ.files)),
                min(len(targ.files), sl.start + pagesize + adv))
            if sl.start == sl.stop:
                # Ran off the end: wrap around to the first page.
                sl = slice(0, pagesize)
        elif kc in (ord('b'), curses.KEY_PPAGE) or kc == curses.KEY_UP:
            dst = pagesize if kc in (ord('b'), curses.KEY_PPAGE) else 1
            if sl.start > 0:
                st = max(0, sl.start - dst)
                sl = slice(st, st + pagesize)
            else:
                # Already at the top: jump to the last page.
                sl = slice(len(targ.files) - pagesize, len(targ.files))
        elif kc == ord("s"):
            _sortfuns_idx = (_sortfuns_idx + 1) % len(_sortfuns)
        stdscr.clear()
        targ.update()
        #
        # Header: directory, then first/latest scan time, then the free-space
        # text of the first/latest scan.
        col = 0
        _addstr(stdscr, 0, col, args.dir, width)
        stdscr.addstr(1, 0, targ.dt_ini.strftime("%m-%d %H:%M:%S").rjust(18))
        stdscr.addstr(1, 18, targ.dt.strftime("%m-%d %H:%M:%S").rjust(18))
        stdscr.addstr(2, 0, targ.duf_ini)
        stdscr.addstr(2, 18, targ.duf)
        for i, fn in enumerate(
                list(sorted(
                    targ.files,
                    key=_sortfuns[_sortfuns_idx]))[sl]):
            v_ini = (-1, -1)
            ren = False
            if fn in targ.sz_ini:
                v_ini = targ.sz_ini[fn]
            else:
                # Not known at start-up: if an initial file has the same path
                # apart from its extension, use that file's initial sizes and
                # flag the row.
                bn, _ = os.path.splitext(fn)
                for orig in targ.sz_ini.keys():
                    if os.path.splitext(orig)[0] == bn:
                        v_ini = targ.sz_ini[orig]
                        ren = True
                        break
            v_now = targ.sz.get(fn, (-1, -1))
            ca = _is_ntfs_compress_target(fn)
            bn = os.path.basename(fn)
            # Initial size; colour pair 5 marks a row matched by base name.
            stdscr.addstr(i + 3, 0, "{:3,d}".format(v_ini[0]).rjust(18),
                          curses.color_pair(5 if ren else 1))
            # Current size; colour pair 2 while it equals the initial size.
            stdscr.addstr(i + 3, 18, "{:3,d}".format(v_now[0]).rjust(18),
                          curses.color_pair(1 if v_ini[0] != v_now[0] else 2))
            # On-disk size; pair 4 if the compressed attribute is set, and
            # pair 3 once it is non-negative and differs from the current size.
            csclr = 4 if ca else 1
            if not (v_now[0] == v_now[1] or v_now[1] < 0):
                csclr = 3
            stdscr.addstr(i + 3, 36, "{:3,d}".format(v_now[1]).rjust(18),
                          curses.color_pair(csclr))
            col = 56
            _addstr(stdscr, i + 3, col, bn, width)
        if sl.stop < len(targ.files):
            stdscr.addstr(height - 1, 0, "-- more --")
        stdscr.refresh()
        time.sleep(0.5)
if __name__ == '__main__':
    import argparse

    # One positional directory plus two options that may each be repeated.
    parser = argparse.ArgumentParser()
    parser.add_argument("dir")
    for option in ("--pattern", "--exclude_pattern"):
        parser.add_argument(option, action="append")
    args = parser.parse_args()

    # curses passes the screen as the only remaining argument of _main.
    entry = functools.partial(_main, args)
    _my_special_curses_wrapper(entry)
rev1時点で「ctypes.windll.kernel32.GetCompressedFileSizeW」を使っているほう、圧縮後サイズが大きい場合(?)の値がおかしい。負として表示されてしまう結果になっている。原因究明&措置検討中...。あと、「<<16」が正しいのか自信がなくなってきてる。「<<32」では?
loword のほうは DWORD、つまり unsigned long のつもりの戻りなのに signed long として解釈していたということ、であれば、入れた措置は正しいのであろう。現実のファイル相手にした結果も問題なさそうにみえる。「<<16」だった部分は「<<32」とひとまずしておいたが、まだ問題となる現実のファイルを相手にしてなくて、確信が持ててない。シフトだけでマスクしなくて大丈夫だったっけ?
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
rev1時点で「ctypes.windll.kernel32.GetCompressedFileSizeW」を使っているほう、圧縮後サイズが大きい場合(?)の値がおかしい。負として表示されてしまう結果になっている。原因究明&措置検討中...。あと、「<<16」が正しいのか自信がなくなってきてる。「<<32」では?