Last active
June 4, 2021 19:05
-
-
Save JamesTheAwesomeDude/3608ea5567800aace8188966f94a986e to your computer and use it in GitHub Desktop.
Zenity-based GNU dd GUI (Py2&Py3 wrapper)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# -*- coding: utf-8 -*- | |
from __future__ import division | |
import sys, os, stat, fcntl, subprocess, select, re | |
if sys.version_info < (3,): | |
from pipes import quote as shquote | |
else: | |
from shlex import quote as shquote | |
from functools import reduce, wraps | |
from itertools import repeat, starmap | |
from contextlib import closing | |
#TODO: rewrite this whole program as a sub-512-byte Perl script | |
#TODO: also copy dd's stderr to our stderr? (gotta handle it bilingually) | |
def _estimate_total(args):
    """Best-effort estimate of how many bytes dd will read, or None if unknown.

    The input file's size and the count*ibs limit are probed independently, so
    that an input whose size cannot be determined (pipe, character device, ...)
    does not throw away a perfectly usable count= limit.
    """
    total = None
    try:
        total = getsize(parseifname(args))
    except Exception:
        # we could not identify the file input size;
        # the parameters MAY be malformed,
        # but we'll leave it up to dd to be the judge of that.
        pass
    try:
        count = parsecount(args, default=None)
        if count is not None:
            limit = count * parseibs(args)
            total = min(total, limit) if total else limit
    except Exception:
        # Same reasoning: a block size we cannot parse is dd's problem to
        # report; we just keep whatever estimate we already have.
        pass
    return total


def main(args=sys.argv[1:]):
    """Run dd with the given operands behind a Zenity progress dialog.

    args -- list of dd operands (defaults to the command line arguments as
            they were when this module was loaded)

    dd is started with status=progress appended; its stderr is parsed for the
    running byte count, which is forwarded to Zenity as a percentage.
    Closing/aborting the Zenity dialog terminates dd.

    Returns dd's exit status.
    """
    import errno  # only needed in here; kept local so the file's import block is untouched

    def is_broken_pipe(err):
        # BrokenPipeError only exists on Python 3; checking errno works on both.
        return getattr(err, 'errno', None) == errno.EPIPE

    dcmd = ['dd'] + args + ['status=progress']
    zcmd = ['zenity', '--progress',
        '--time-remaining',
        '--title=dd',
        '--text=%s' % ' '.join(map(shquote, args)),
        '--cancel-label=Abort',
        '--ok-label=Done',
    ]

    calcPercent = genCalcPercent(_estimate_total(args))

    with _autoclose(select.epoll()) as E:
        # Launch Zenity
        zenity = subprocess.Popen(zcmd, stdin=subprocess.PIPE, universal_newlines=True)
        status = blocking_writer(zenity.stdin) # TODO: https://stackoverflow.com/q/66878224/1874170
        zenity_fd = zenity.stdin.fileno()
        E.register(zenity.stdin,
            select.EPOLLHUP
        )

        # Launch dd
        dd = subprocess.Popen(dcmd, stderr=subprocess.PIPE)
        modfl(dd.stderr, (+os.O_NONBLOCK,))
        dd_fd = dd.stderr.fileno()
        E.register(dd.stderr,
            select.EPOLLIN
            | select.EPOLLHUP
        )

        # Poll forever, flattening each batch of events into one stream so
        # that a single `break` leaves the whole loop.
        for fn, ev in ( event for events in repeatfunc(E.poll) for event in events ):
            if (fn == zenity_fd) and (ev & (select.EPOLLHUP | select.EPOLLERR)):
                #Zenity exited
                dd.terminate()
                break
            if (fn == dd_fd) and (ev & select.EPOLLIN):
                #dd sent some output
                # (a non-blocking read may yield None or b'' rather than data)
                chunk = dd.stderr.read()
                found = rBytes.findall(chunk) if chunk else ()
                if found:
                    # Several progress updates may arrive in one read;
                    # only the most recent one is worth displaying.
                    try:
                        status('%f\n' % calcPercent(int(found[-1])))
                    except (IOError, OSError) as err:
                        if not is_broken_pipe(err):
                            raise
                        #Zenity went away between poll() and our write
                        dd.terminate()
                        break
                continue
            if (fn == dd_fd) and (ev & (select.EPOLLHUP | select.EPOLLERR)):
                #dd exited
                break

    # wait for dd to finish being terminated, if necessary; drain any of its final output to avoid deadlocking
    # (switch the pipe back to blocking mode first: communicate() does a plain
    #  read(), which on a non-blocking pipe either fails with EAGAIN or returns
    #  early without actually draining anything)
    modfl(dd.stderr, (-os.O_NONBLOCK,))
    dd.communicate()
    assert dd.returncode is not None

    if dd.returncode == 0:
        # dd exited successfully;
        try:
            #set Zenity to 100% complete and set it free.
            status('100\n')
            zenity.stdin.close()
        except (IOError, OSError) as err:
            if not is_broken_pipe(err):
                raise
            #(Zenity was already exited for some reason; this is fine.)
    else:
        # dd exited unsuccessfully
        # (almost certainly user-cancelled via SIGINT
        # or died while trying to parse malformed arguments)
        # so just kill Zenity, too.
        zenity.terminate()

    return dd.returncode
## Definitions below here ##

# Recovers the running byte count from the progress text dd writes to stderr:
# basically just tries to invert the transformation applied by fprintf on
# lines 816-827 of dd.c, i.e. reconstitute w_bytes from the string written.
# The count is the run of digits that sits at the very start of the buffer or
# right after a carriage return, and is followed by a single space.
rBytes = re.compile(br'(?:\A|\r)(\d+) ', (re.MULTILINE)) # https://git.savannah.gnu.org/cgit/coreutils.git/tree/src/dd.c?h=v8.32#n813
def parseifname(args, default='/dev/stdin'):
    '''Given dd's argument list, attempts to return the name of that file which dd would use as its input file

    args    -- iterable of dd operands, e.g. ['if=/dev/sda', 'bs=1M']
    default -- value returned when no usable if= operand is present

    Plain string operations are used (rather than Pattern.fullmatch, which
    only exists on Python 3.4+) so that this works on Python 2 as well, and so
    that file names containing a newline are not rejected.
    '''
    prefix = 'if='
    ifname = default
    for arg in args:
        # An operand with an empty name ("if=") is ignored.
        if arg.startswith(prefix) and len(arg) > len(prefix):
            ifname = arg[len(prefix):]
            #Don't break; the last match is what's used
    return ifname
def parseibs(args, default=512):
    '''Given dd's argument list, attempts to return the input block size

    args    -- iterable of dd operands
    default -- block size assumed when neither bs= nor ibs= is given

    dd lets bs= override ibs= no matter which of the two comes later on the
    command line, so the two operands are tracked separately; within each, the
    last occurrence wins.

    Raises whatever _xstrtoi raises for a size it cannot parse.
    '''
    # default = 512: https://git.savannah.gnu.org/cgit/coreutils.git/tree/src/dd.c?h=v8.32#n92
    sizes = {'bs': None, 'ibs': None}
    for arg in args:
        key, sep, value = arg.partition('=')
        # Operands with an empty value ("bs=") are ignored.
        if sep and value and key in sizes:
            sizes[key] = _xstrtoi(value)
    if sizes['bs'] is not None:
        return sizes['bs']
    if sizes['ibs'] is not None:
        return sizes['ibs']
    return default
def parsecount(args, default=None):
    '''Given dd's argument list, attempts to return the number of blocks which dd would iterate for.

    args    -- iterable of dd operands
    default -- value returned when no plain-decimal count= operand is present

    Only a bare decimal count (optionally preceded by whitespace) is
    recognized; anything else leaves the previous value in place.
    '''
    # Pattern.match plus an explicit \Z anchor is used instead of
    # Pattern.fullmatch, which does not exist before Python 3.4.
    M = re.compile(r'count=(\s*\d+)\Z')
    count = default
    for arg in args:
        m = M.match(arg)
        if m:
            # no break: as with dd itself, the last occurrence wins
            count = int(m.group(1))
    return count
def getsize(fname):
    '''Attempts to find the length, in bytes, of the given file or block device

    fname -- path to a regular file or block device

    Returns the size as an int.
    Raises ValueError if the path is neither a regular file nor a block
    device (or if lsblk's output is not a number), OSError if it cannot be
    stat'ed, and subprocess.CalledProcessError if lsblk fails.
    '''
    s = os.stat(fname)
    m = s.st_mode
    if stat.S_ISREG(m):
        #Regular File
        return s.st_size
    if stat.S_ISBLK(m):
        #Block Device
        # Probably don't want to re-implement this ourselves: https://git.kernel.org/pub/scm/utils/util-linux/util-linux.git/tree/misc-utils/lsblk.c?h=stable/v2.36#n1198
        # lsblk's stderr is discarded rather than merged into stdout, so that
        # a stray diagnostic cannot end up inside the text we hand to int().
        with open(os.devnull, 'wb') as devnull:
            return int(subprocess.check_output([
                'lsblk', '-d', fname, # List the block device at fname:
                '-n', # Omit the header line;
                '-l', # Use the cleaner "list" output format;
                '-o', 'SIZE', # Output (only) the device size,
                '-b', # in bytes, rather than "human-readable" format
            ], stderr=devnull))
    raise ValueError("file type %i was not recognized" % m)
def genCalcPercent(n):
    '''Given n, returns a function which, given x, returns either x as a percentage of n, or some sane stand-in for such

    n -- expected total number of bytes, or any falsy value if unknown

    Either way the returned function never reaches 100: the size estimate can
    be too low (growing file, wrong guess), and reporting completion is left
    to the caller once the transfer has actually finished.
    '''
    ceiling = 99.99999
    if n:
        #Input file size was identified
        return lambda x: min(ceiling, 100.0 * x / n)
    else:
        #Input file size was unidentifiable, zero, or otherwise falsy
        #we'll at least try to visually show progress:
        # a continuous exponential approach to (100-1e-5)%
        # which passes 50% at 4GiB transferred
        return lambda x: ceiling * (1 - 0.5 ** (x / 2.0**32))
def modfl(fd, flags):
    '''Sets every non-negative flag in, and clears the magnitude of every negative flag from, the status flags of the given file descriptor

    fd    -- file descriptor (or object with fileno()) accepted by fcntl.fcntl
    flags -- iterable of signed flag values, applied in order

    e.g.: modfl(f, (+os.O_SYNC, -os.O_NONBLOCK))
    would make f synchronous and blocking

    Returns the result of the F_SETFL call.
    '''
    # Start from the descriptor's current status flags...
    field = fcntl.fcntl(fd, fcntl.F_GETFL)
    # ...fold each requested change into them...
    for flag in flags:
        if flag >= 0:
            field |= flag
        else:
            field &= ~(-flag)
    # ...and write the result back.
    return fcntl.fcntl(fd, fcntl.F_SETFL, field)
def blocking_writer(fd):
    '''Returns a write function for the given file object whose output is pushed out immediately

    fd -- writable file object

    On Python 3 the returned function writes, then flushes, and returns what
    write() returned; on other versions O_SYNC is set on the file and its own
    write method is returned.
    '''
    if sys.version_info.major != 3:
        modfl(fd, (+os.O_SYNC,))
        return fd.write

    # why doesn't O_SYNC work on Python 3??? ARGH
    @wraps(fd.write)
    def write_then_flush(*args, **kwargs):
        written = fd.write(*args, **kwargs)
        fd.flush()
        return written

    return write_then_flush
def repeatfunc(func, times=None, *args):
    """Call func(*args) over and over, lazily.

    With times=None the returned iterator never ends; otherwise it yields
    exactly `times` results.

    Example: repeatfunc(random.random)
    """
    arg_stream = repeat(args) if times is None else repeat(args, times)
    return starmap(func, arg_stream)
def _xstrtoi(s): | |
"""slimmed-down python port of xstrol for internal use in zendd.py | |
In particular, this implementation does not support non-base-ten numbers | |
due to dd only passing in that base (see line 1438 of dd.c version 8.32) | |
xstrol reference here: | |
http://git.savannah.gnu.org/cgit/gnulib.git/tree/lib/xstrtol.c?id=d279bc6d9f9323e19ad8c32b6d12ff96dfb0f5ba#n75 | |
xstrtol is an augmented version of strtol: | |
https://pubs.opengroup.org/onlinepubs/9699919799/functions/strtol.html""" | |
# Any amount of whitespace, | |
# followed by an optional '+' or '-', | |
# followed by one or more digits(*), | |
# followed by a suffix. | |
val, digits, suffix = re.match(r'\s*((?:\+|-)?(\d+)?)(.*)', s).groups() | |
if not digits: | |
if suffix: | |
# (*)The digits may be absent if there is a suffix; | |
# in this case, the value is taken to be 1. | |
val += '1' | |
else: | |
raise ValueError(s) | |
val = int(val) | |
if suffix: | |
base = 1024 | |
p, i, b = re.fullmatch(r'([bcEGkKMPTwYZ])(?:(?<=[EGkKMPTYZ])(i)?(B))?', suffix).groups() | |
if b and not i: | |
# this matches e.g MB - but not MiB | |
base = 1000 | |
if p == 'b': | |
val *= 512 | |
elif p == 'c': | |
pass | |
elif p == 'E': | |
val *= base ** 6 | |
elif p == 'G': | |
val *= base ** 3 | |
elif p == 'k' or p == 'K': | |
val *= base ** 1 | |
elif p == 'M': | |
val *= base ** 2 | |
elif p == 'P': | |
val *= base ** 5 | |
elif p == 'T': | |
val *= base ** 4 | |
elif p == 'w': | |
val *= 2 | |
elif p == 'Y': | |
val *= base ** 8 | |
elif p == 'Z': | |
val *= base ** 7 | |
else: | |
raise ValueError(s) | |
return val | |
def _autoclose(obj): | |
"""Smarter version of contextlib.closing that uses the object's native ContextManager where applicable""" | |
if not hasattr(obj, '__enter__'): | |
obj = closing(obj) | |
return obj | |
if __name__ == '__main__':
    # Run as a script: main()'s return value becomes the process exit status.
    raise SystemExit(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment