Skip to content

Instantly share code, notes, and snippets.

@JamesTheAwesomeDude
Last active June 4, 2021 19:05
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save JamesTheAwesomeDude/3608ea5567800aace8188966f94a986e to your computer and use it in GitHub Desktop.
Save JamesTheAwesomeDude/3608ea5567800aace8188966f94a986e to your computer and use it in GitHub Desktop.
Zenity-based GNU dd GUI (Py2&Py3 wrapper)
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import division
import sys, os, stat, fcntl, subprocess, select, re
if sys.version_info < (3,):
from pipes import quote as shquote
else:
from shlex import quote as shquote
from functools import reduce, wraps
from itertools import repeat, starmap
from contextlib import closing
#TODO: rewrite this whole program as a sub-512-byte Perl script
#TODO: also copy dd's stderr to our stderr? (gotta handle it bilingually)
def main(args=sys.argv[1:]):
dcmd = ['dd'] + args + ['status=progress']
zcmd = ['zenity', '--progress',
'--time-remaining',
'--title=dd',
'--text=%s' % ' '.join(map(shquote, args)),
'--cancel-label=Abort',
'--ok-label=Done',
]
n = None
try:
fname = parseifname(args)
n = getsize(fname)
count = parsecount(args, default=None)
ibs = parseibs(args)
if count is not None:
if n:
n = min(n, count * ibs)
else:
n = count * ibs
except Exception:
# we could not identify the file input size;
# the parameters MAY be malformed,
# but we'll leave it up to dd to be the judge of that.
pass
calcPercent = genCalcPercent(n)
with _autoclose(select.epoll()) as E:
# Launch Zenity
zenity = subprocess.Popen(zcmd, stdin=subprocess.PIPE, universal_newlines=True)
status = blocking_writer(zenity.stdin) # TODO: https://stackoverflow.com/q/66878224/1874170
E.register(zenity.stdin,
select.EPOLLHUP
)
# Launch dd
dd = subprocess.Popen(dcmd, stderr=subprocess.PIPE)
modfl(dd.stderr, (+os.O_NONBLOCK,))
E.register(dd.stderr,
select.EPOLLIN
| select.EPOLLHUP
)
for fn, ev in ( event for events in repeatfunc(E.poll) for event in events ):
if (fn == zenity.stdin.fileno()) and (ev & (select.EPOLLHUP | select.EPOLLERR)):
#Zenity exited
dd.terminate()
break
if (fn == dd.stderr.fileno()) and (ev & select.EPOLLIN):
#dd sent some output
line = dd.stderr.read()
m = rBytes.search(line)
if m:
x = int(m.group(1))
status('%f\n' % calcPercent(x))
#else:
# print(line, file=sys.stderr)
continue
if (fn == dd.stderr.fileno()) and (ev & (select.EPOLLHUP | select.EPOLLERR)):
#dd exited
break
# wait for dd to finish being terminated, if necessary; drain any of its final output to avoid deadlocking
dd.communicate()
assert dd.returncode is not None
if dd.returncode == 0:
# dd exited successfully;
try:
#set Zenity to 100% complete and set it free.
status('100\n')
zenity.stdin.close()
except BrokenPipeError:
#(Zenity was already exited for some reason; this is fine.)
pass
else:
# dd exited unsuccessfully
# (almost certainly user-cancelled via SIGINT
# or died while trying to parse malofrmed arguments)
# so just kill Zenity, too.
zenity.terminate()
return dd.returncode
## Definitions below here ##
#basically just tries to invert the transformation applied by fprintf on lines 816–827 of dd.c
#i.e. reconstitute w_bytes from the string written to stderr
rBytes = re.compile(br'(?:\A|\r)(\d+) ', (re.MULTILINE)) # https://git.savannah.gnu.org/cgit/coreutils.git/tree/src/dd.c?h=v8.32#n813
def parseifname(args, default='/dev/stdin'):
'''Given dd's argument list, attempts to return the name of that file which dd would use as its input file'''
M = re.compile(r'^if=(.+)')
ifname = default
for arg in args:
m = M.fullmatch(arg)
if m:
ifname = m.group(1)
#Don't break; the last match is what's used
return ifname
def parseibs(args, default=512):
'''Given dd's argument list, attempts to return the input block size'''
# default = 512: https://git.savannah.gnu.org/cgit/coreutils.git/tree/src/dd.c?h=v8.32#n92
M = re.compile(r'^i?bs=(.+)')
bs = default
for arg in args:
m = M.fullmatch(arg)
if m:
bs = _xstrtoi(m.group(1))
return bs
def parsecount(args, default=None):
'''Given dd's argument list, attempts to return the number of blocks which dd would iterate for.'''
M = re.compile(r'^count=(\s*\d+)')
count = default
for arg in args:
m = M.fullmatch(arg)
if m:
count = int(m.group(1))
return count
def getsize(fname):
'''Attempts to find the length, in bytes, of the given file or block device'''
s = os.stat(fname)
m = s.st_mode
if stat.S_ISREG(m):
#Regular File
return s.st_size
if stat.S_ISBLK(m):
#Block Device
# Probably don't want to re-implement this ourselves: https://git.kernel.org/pub/scm/utils/util-linux/util-linux.git/tree/misc-utils/lsblk.c?h=stable/v2.36#n1198
return int(subprocess.check_output([
'lsblk', '-d', fname, # List the block device at fname:
'-n', # Omit the header line;
'-l', # Use the cleaner "list" output format;
'-o', 'SIZE', # Output (only) the device size,
'-b', # in bytes, rather than "human-readable" format
], stderr=subprocess.STDOUT))
raise ValueError("file type %i was not recognized" % m)
def genCalcPercent(n):
'''Given n, returns a function which, given x, returns either x as a percentage of n, or some sane stand-in for such'''
if n:
#Input file size was identified
return lambda x: 100 * x / n
else:
#Input file size was unidentifiable, zero, or otherwise falsy
#we'll at least try to visually show progress:
# a continuous exponential approach to (100-1e-5)%
# which passes 50% at 4GiB transferred
return lambda x: 99.99999 * (1 - 0.5 ** (x / 2**32))
def modfl(fd, flags):
'''Adds all given positive flags to, and removes all given negative flags from, the given file descriptor'''
# e.g.: modfl(f, (+os.O_SYNC, -os.O_NONBLOCK))
# would make f synchronous and blocking
#OR-in positive flags; NAND-out negative ones
def ins(fl, x=0):
if x >= 0:
return fl | x
else:
return fl &~ -x
# 1. Get the current flag field
cur = fcntl.fcntl(fd, fcntl.F_GETFL)
# 2. Calculate the new flag field
new = reduce(ins, flags, cur)
# 3. Apply the new flag field
return fcntl.fcntl(fd, fcntl.F_SETFL, new)
def blocking_writer(fd):
if sys.version_info.major == 3:
# why doesn't O_SYNC work on Python 3??? ARGH
@wraps(fd.write)
def write(*a, **k):
n = fd.write(*a, **k)
fd.flush()
return n
return write
else:
modfl(fd, (+os.O_SYNC,))
return fd.write
def repeatfunc(func, times=None, *args):
"""Repeat calls to func with specified arguments.
Example: repeatfunc(random.random)
"""
if times is None:
return starmap(func, repeat(args))
return starmap(func, repeat(args, times))
def _xstrtoi(s):
"""slimmed-down python port of xstrol for internal use in zendd.py
In particular, this implementation does not support non-base-ten numbers
due to dd only passing in that base (see line 1438 of dd.c version 8.32)
xstrol reference here:
http://git.savannah.gnu.org/cgit/gnulib.git/tree/lib/xstrtol.c?id=d279bc6d9f9323e19ad8c32b6d12ff96dfb0f5ba#n75
xstrtol is an augmented version of strtol:
https://pubs.opengroup.org/onlinepubs/9699919799/functions/strtol.html"""
# Any amount of whitespace,
# followed by an optional '+' or '-',
# followed by one or more digits(*),
# followed by a suffix.
val, digits, suffix = re.match(r'\s*((?:\+|-)?(\d+)?)(.*)', s).groups()
if not digits:
if suffix:
# (*)The digits may be absent if there is a suffix;
# in this case, the value is taken to be 1.
val += '1'
else:
raise ValueError(s)
val = int(val)
if suffix:
base = 1024
p, i, b = re.fullmatch(r'([bcEGkKMPTwYZ])(?:(?<=[EGkKMPTYZ])(i)?(B))?', suffix).groups()
if b and not i:
# this matches e.g MB - but not MiB
base = 1000
if p == 'b':
val *= 512
elif p == 'c':
pass
elif p == 'E':
val *= base ** 6
elif p == 'G':
val *= base ** 3
elif p == 'k' or p == 'K':
val *= base ** 1
elif p == 'M':
val *= base ** 2
elif p == 'P':
val *= base ** 5
elif p == 'T':
val *= base ** 4
elif p == 'w':
val *= 2
elif p == 'Y':
val *= base ** 8
elif p == 'Z':
val *= base ** 7
else:
raise ValueError(s)
return val
def _autoclose(obj):
"""Smarter version of contextlib.closing that uses the object's native ContextManager where applicable"""
if not hasattr(obj, '__enter__'):
obj = closing(obj)
return obj
if __name__ == '__main__':
sys.exit(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment