Last active
April 26, 2021 11:15
-
-
Save hhsprings/7fd0e2f3b0a3f27a03ab61cfd3c81cc6 to your computer and use it in GitHub Desktop.
An example for ctypes.windll.kernel32.GetCompressedFileSizeW, ctypes.windll.kernel32.GetDiskFreeSpaceExW and curses
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#! py -3 | |
# -*- coding: utf-8 -*- | |
import sys | |
import os | |
import time | |
import ctypes | |
import functools | |
from glob import glob | |
from fnmatch import fnmatch | |
from datetime import datetime | |
from collections import namedtuple | |
import curses | |
def _stat(fn): | |
try: | |
return os.stat(fn) | |
except Exception: | |
# lost file, maybe. | |
return namedtuple( | |
'dummy_stat', | |
['st_size', 'st_mtime'] | |
)(-1, -1) | |
def _getsizes(fn): | |
hi = ctypes.c_ulong() | |
lo = ctypes.windll.kernel32.GetCompressedFileSizeW(fn, ctypes.byref(hi)) | |
return _stat(fn).st_size, ctypes.c_ulong(lo).value + (hi.value << 32) | |
def _is_ntfs_compress_target(fn): | |
# FILE_ATTRIBUTE_ARCHIVE = 0x20 | |
# FILE_ATTRIBUTE_COMPRESSED = 0x800 | |
# FILE_ATTRIBUTE_DEVICE = 0x40 | |
# FILE_ATTRIBUTE_DIRECTORY = 0x10 | |
# FILE_ATTRIBUTE_ENCRYPTED = 0x4000 | |
# FILE_ATTRIBUTE_HIDDEN = 0x2 | |
# FILE_ATTRIBUTE_INTEGRITY_STREAM = 0x8000 | |
# FILE_ATTRIBUTE_NORMAL = 0x80 | |
# FILE_ATTRIBUTE_NOT_CONTENT_INDEXED = 0x2000 | |
# FILE_ATTRIBUTE_NO_SCRUB_DATA = 0x20000 | |
# FILE_ATTRIBUTE_OFFLINE = 0x1000 | |
# FILE_ATTRIBUTE_READONLY = 0x1 | |
# FILE_ATTRIBUTE_RECALL_ON_DATA_ACCESS = 0x400000 | |
# FILE_ATTRIBUTE_RECALL_ON_OPEN = 0x40000 | |
# FILE_ATTRIBUTE_REPARSE_POINT = 0x400 | |
# FILE_ATTRIBUTE_SPARSE_FILE = 0x200 | |
# FILE_ATTRIBUTE_SYSTEM = 0x4 | |
# FILE_ATTRIBUTE_TEMPORARY = 0x100 | |
f = ctypes.windll.kernel32.GetFileAttributesW | |
return f(fn) & 0x800 | |
def _diskusage_free(p): | |
f = ctypes.windll.kernel32.GetDiskFreeSpaceExW | |
fba = ctypes.c_ulonglong() | |
tit = ctypes.c_ulonglong() | |
fre = ctypes.c_ulonglong() | |
f(p, ctypes.byref(fba), ctypes.byref(tit), ctypes.byref(fre)) | |
return ("Free: {:.2f}".format(fre.value / 1024**3) + " G").rjust(18) | |
def _files(dir, pats=["*"], filtfun=lambda fn: fn): | |
allfiles = [] | |
for pat in pats: | |
allfiles.extend(list(glob(os.path.join(dir, pat)))) | |
files = list( | |
filter( | |
lambda fn: os.path.isfile(fn) and filtfun(fn), | |
allfiles)) | |
return files | |
def _my_special_curses_wrapper(func, *args, **kwds): # wrapper(func, /, *args, **kwds): | |
def _wrap(stdscr, *args, **kwds): | |
stdscr.timeout(0) | |
curses.init_pair(2, curses.COLOR_BLACK, curses.COLOR_GREEN) | |
curses.init_pair(3, curses.COLOR_WHITE, curses.COLOR_BLUE) | |
curses.init_pair(4, curses.COLOR_WHITE, curses.COLOR_CYAN) | |
curses.init_pair(5, curses.COLOR_BLACK, curses.COLOR_YELLOW) | |
return func(stdscr, *args, **kwds) | |
return curses.wrapper(_wrap, *args, **kwds) | |
def _addstr(stdscr, row, col, s, maxwidth): | |
tmp = [] # [(cw, ch)] | |
for ch in s: | |
cw = 2 if len(ch.encode("utf-8")) > 1 else 1 | |
tmp.append((cw, ch)) | |
def _judge(cur): | |
return sum([cw for cw, _ in cur]) > ((maxwidth - 1) - col) - 1 | |
if _judge(tmp): | |
r = 0 | |
while _judge(tmp): | |
r = len(tmp) // 2 | |
tmp.pop(r) | |
tmp.insert(r, (2, "..")) | |
for cw, ch in tmp: | |
stdscr.addstr(row, col, ch) | |
col += cw | |
class _TargetDirInfo(object): | |
def __init__(self, dir, pats=["*"], filtfun=lambda fn: fn): | |
self._dir = dir | |
self._files_getfun = functools.partial(_files, dir, pats, filtfun) | |
self.files = self._files_getfun() | |
self.sz = self.sz_ini = {fn: _getsizes(fn) for fn in self.files} | |
self.dt = self.dt_ini = datetime.now() | |
self.duf = self.duf_ini = _diskusage_free(self._dir) | |
def update(self): | |
if (datetime.now() - self.dt).total_seconds() > 30: | |
self.files = self._files_getfun() | |
self.sz = {fn: _getsizes(fn) for fn in self.files} | |
self.dt = datetime.now() | |
self.duf = _diskusage_free(self._dir) | |
def _main(args, stdscr): | |
if args.pattern: | |
patterns = args.pattern | |
else: | |
patterns = ["*"] | |
if args.exclude_pattern: | |
ffun = lambda fn: all(not fnmatch(fn, pat) for pat in args.exclude_pattern) | |
else: | |
ffun = lambda fn: fn | |
# | |
targ = _TargetDirInfo(args.dir, patterns, ffun) | |
height, width = stdscr.getmaxyx() | |
pagesize = min(height - 4, len(targ.files)) | |
sl = slice(0, pagesize) | |
# | |
_nones = (float('nan'), float('nan')) | |
_sortfuns = [ | |
lambda fn: (fn, targ.sz.get(fn, _nones)[0]), | |
lambda fn: (_stat(fn).st_mtime, fn), | |
lambda fn: (-_stat(fn).st_mtime, fn), | |
lambda fn: (targ.sz_ini.get(fn, _nones)[0], fn), | |
lambda fn: (-targ.sz_ini.get(fn, _nones)[0], fn), | |
lambda fn: (targ.sz.get(fn, _nones)[0], fn), | |
lambda fn: (-targ.sz.get(fn, _nones)[0], fn), | |
] | |
_sortfuns_idx = 0 | |
# | |
while True: | |
kc = stdscr.getch() | |
if kc in (ord('q'), ord('Q')): | |
break | |
elif kc in (ord(' '), curses.KEY_NPAGE) or kc == curses.KEY_DOWN: | |
adv = pagesize if kc in (ord(' '), curses.KEY_NPAGE) else 1 | |
sl = slice( | |
min(sl.start + adv, len(targ.files)), | |
min(len(targ.files), sl.start + pagesize + adv)) | |
if sl.start == sl.stop: | |
sl = slice(0, pagesize) | |
elif kc in (ord('b'), curses.KEY_PPAGE) or kc == curses.KEY_UP: | |
dst = pagesize if kc in (ord('b'), curses.KEY_PPAGE) else 1 | |
if sl.start > 0: | |
st = max(0, sl.start - dst) | |
sl = slice(st, st + pagesize) | |
else: | |
sl = slice(len(targ.files) - pagesize, len(targ.files)) | |
elif kc == ord("s"): | |
_sortfuns_idx = (_sortfuns_idx + 1) % len(_sortfuns) | |
stdscr.clear() | |
targ.update() | |
# | |
col = 0 | |
_addstr(stdscr, 0, col, args.dir, width) | |
stdscr.addstr(1, 0, targ.dt_ini.strftime("%m-%d %H:%M:%S").rjust(18)) | |
stdscr.addstr(1, 18, targ.dt.strftime("%m-%d %H:%M:%S").rjust(18)) | |
stdscr.addstr(2, 0, targ.duf_ini) | |
stdscr.addstr(2, 18, targ.duf) | |
for i, fn in enumerate( | |
list(sorted( | |
targ.files, | |
key=_sortfuns[_sortfuns_idx]))[sl]): | |
v_ini = (-1, -1) | |
ren = False | |
if fn in targ.sz_ini: | |
v_ini = targ.sz_ini[fn] | |
else: | |
bn, _ = os.path.splitext(fn) | |
for orig in targ.sz_ini.keys(): | |
if os.path.splitext(orig)[0] == bn: | |
v_ini = targ.sz_ini[orig] | |
ren = True | |
break | |
v_now = targ.sz.get(fn, (-1, -1)) | |
ca = _is_ntfs_compress_target(fn) | |
bn = os.path.basename(fn) | |
stdscr.addstr(i + 3, 0, "{:3,d}".format(v_ini[0]).rjust(18), | |
curses.color_pair(5 if ren else 1)) | |
stdscr.addstr(i + 3, 18, "{:3,d}".format(v_now[0]).rjust(18), | |
curses.color_pair(1 if v_ini[0] != v_now[0] else 2)) | |
csclr = 4 if ca else 1 | |
if not (v_now[0] == v_now[1] or v_now[1] < 0): | |
csclr = 3 | |
stdscr.addstr(i + 3, 36, "{:3,d}".format(v_now[1]).rjust(18), | |
curses.color_pair(csclr)) | |
col = 56 | |
_addstr(stdscr, i + 3, col, bn, width) | |
if sl.stop < len(targ.files): | |
stdscr.addstr(height - 1, 0, "-- more --") | |
stdscr.refresh() | |
time.sleep(0.5) | |
if __name__ == '__main__': | |
import argparse | |
ap = argparse.ArgumentParser() | |
ap.add_argument("dir") | |
ap.add_argument("--pattern", action="append") | |
ap.add_argument("--exclude_pattern", action="append") | |
args = ap.parse_args() | |
_my_special_curses_wrapper(functools.partial(_main, args)) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
loword のほうは DWORD、つまり unsigned long のつもりの戻りなのに signed long として解釈していたということ、であれば、入れた措置は正しいのであろう。現実のファイル相手にした結果も問題なさそうにみえる。「<<16」だった部分は「<<32」とひとまずしておいたが、まだ問題となる現実のファイルを相手にしてなくて、確信が持ててない。シフトだけでマスクしなくて大丈夫だったっけ?