Skip to content

Instantly share code, notes, and snippets.

@Rudd-O
Created January 19, 2016 00:49
Show Gist options
  • Save Rudd-O/da8bc169e2cccb3a3707 to your computer and use it in GitHub Desktop.
Save Rudd-O/da8bc169e2cccb3a3707 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
import Queue
import collections
import cStringIO
import signal # Important.
import subprocess
import tarfile
import threading
import time
def which_files_to_compress():
    """Yield the absolute paths of the files that belong in the backup."""
    backup_set = [
        "/var/lib/qubes/appvms/somelargevm/private.img",
    ]
    for candidate in backup_set:
        yield candidate
def transform(path):
    """Map an absolute file path to its relative in-archive name."""
    start = 0
    while start < len(path) and path[start] == "/":
        start += 1
    return path[start:]
def chunked_reader(fobject):
    """Read and return the next chunk (up to 1 GiB) from *fobject*.

    The chunk size is the value used by the existing qubes backup code.
    Because the thread pool is only four deep, at most four tasks hold a
    chunk at once, so peak memory consumption is bounded by a small
    multiple of this value; lowering either the chunk size or the
    parallelism lowers that bound accordingly.
    """
    return fobject.read(1 << 30)  # 1 << 30 == 1024 * 1024 * 1024
class SerializedTarWriter(object):
    """Serialize concurrent writes of members to a single tarfile object.

    tarfile objects are not safe for concurrent use, so a mutex ensures
    only one worker thread appends a member at a time.
    """
    def __init__(self, tarfileobject):
        self.t = tarfileobject
        self.m = threading.Lock()
    def addfile(self, tarinfoobject, stringio):
        """Append one member (header + data) under the mutex."""
        with self.m:
            self.t.addfile(tarinfoobject, stringio)
    def gettarinfo(self, name):
        """Build a TarInfo from the on-disk metadata of *name*.

        NOTE(review): this delegation is not serialized by the mutex,
        mirroring the original code.
        """
        return self.t.gettarinfo(name)
class TaskArbitrator(object):
def __init__(self, numtasks):
self.sem = threading.Semaphore(numtasks)
self.exception = None
self.threads = []
self.lock = threading.RLock()
self.supervisor_thread = threading.Thread(target=self._supervisor)
self.supervisor_thread.setDaemon(True)
self.supervisor_thread.start()
def _supervisor(self):
while True:
with self.lock:
print len(self.threads)
time.sleep(1)
def add(self, kallable):
with self.lock:
for t in self.threads[:]:
t.join(0)
if t.isAlive():
continue
print "removing dead thread", t
self.threads.remove(t)
if self.exception:
self.end()
t = threading.Thread(target=self._run_thread, args=(kallable,))
self.sem.acquire()
t.start()
self.threads.append(t)
def _run_thread(self, kallable):
try:
kallable()
except BaseException, e:
with self.lock:
self.exception = e
self.sem.release()
def end(self):
print "ending"
with self.lock:
for t in self.threads[:]:
t.join()
if not t.isAlive():
self.threads.remove(t)
with self.lock:
if self.exception:
raise self.exception
def add_contents_to_tar(tarqueue, originalname, nameintar, data):
ti = tarqueue.gettarinfo(originalname)
ti.size = len(data)
ti.name = nameintar
import time; time.sleep(5) # Simulate slow write. Unnecessary.
tarqueue.addfile(ti, cStringIO.StringIO(data))
print "added contents of file", nameintar
def add_hmac_to_tar(tarqueue, originalname, nameintar, data):
p = subprocess.Popen(["openssl", "dgst",
"-SHA512", "-hmac", "password"],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE)
stdout, stderr = p.communicate(data)
ret = p.wait()
assert 0 == ret, ret
ti = tarqueue.gettarinfo(originalname)
ti.name = nameintar
ti.size = len(stdout)
import time; time.sleep(5) # Simulate slow write. Unnecessary.
tarqueue.addfile(ti, cStringIO.StringIO(stdout))
print "added hmac to", nameintar
def run_backup(compressor="cat"):
    """Compress each backup file, split it into 1 GiB chunks, and write
    each chunk plus a companion ".hmac" member to a tar archive, with
    the tar writes fanned out over a small thread pool.

    *compressor* is a program (argv list or name) that reads the file on
    stdin and writes the compressed stream to stdout; the default "cat"
    performs no compression.
    """
    tar = tarfile.open("/path/to/backup", "w")
    tarqueue = SerializedTarWriter(tar)
    tasker = TaskArbitrator(4)
    for path in which_files_to_compress():
        pathintar = transform(path)
        originalfobject = open(path, "rb")
        cpr = subprocess.Popen(compressor,
                               stdin=originalfobject,
                               stdout=subprocess.PIPE)
        # The child inherited its own descriptor; ours is no longer needed.
        originalfobject.close()
        for n in xrange(1000):
            red = chunked_reader(cpr.stdout)
            if len(red) == 0:
                break
            # BUGFIX: bind the loop variables as default arguments.  The
            # lambdas run later, in worker threads, concurrently with
            # this loop; a plain closure would see whatever chunk the
            # loop had most recently read, writing wrong or duplicate
            # data under each member name.
            tasker.add(lambda path=path, pathintar=pathintar, n=n, red=red:
                       add_contents_to_tar(tarqueue, path,
                                           "%s-%03d" % (pathintar, n), red))
            tasker.add(lambda path=path, pathintar=pathintar, n=n, red=red:
                       add_hmac_to_tar(tarqueue, path,
                                       "%s-%03d" % (pathintar, n) + ".hmac",
                                       red))
        else:
            # xrange exhausted without hitting EOF: the file needs more
            # chunks than we allow.  (The old "assert n < 1000" could
            # never fire, since xrange(1000) tops out at 999.)
            raise AssertionError("%s needs more than 1000 chunks" % path)
        ret = cpr.wait()
        assert ret == 0, ret
    tasker.end()
    tar.close()
# Script entry point: run the backup with the default "cat" (identity)
# compressor when executed directly.
if __name__ == "__main__":
    run_backup()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment