#!/usr/bin/env python
import Queue
import collections
import cStringIO
import signal  # Important.
import subprocess
import tarfile
import threading
import time
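
# Prototype of a parallel, chunked backup pipeline in the style of the Qubes
# backup format: each source file is streamed through a compressor, split
# into large chunks, and every chunk is written into a single tar archive
# together with an HMAC of that chunk computed by openssl.
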
def which_files_to_compress():
    yield "/var/lib/qubes/appvms/somelargevm/private.img"

def transform(path):
    # Strip the leading "/" so the archive stores relative member names.
    return path.lstrip("/")

def chunked_reader(fobject):
    # This chunk size is the value already used by the qubes
    # backup code.  Given that the thread pool is only four
    # deep, so at most four tasks run at any one time, memory
    # consumption is bounded by roughly twice this value.  If
    # this value is lowered, or the parallelism is reduced from
    # four to two, memory consumption is bounded by the buffer
    # size read below.
    return fobject.read(1024*1024*1024)
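
# tarfile.TarFile is not designed to be written from several threads at once,
# so all addfile() calls are funnelled through a single lock.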
class SerializedTarWriter(object):
    def __init__(self, tarfileobject):
        self.t = tarfileobject
        self.m = threading.Lock()

    def addfile(self, tarinfoobject, stringio):
        with self.m:
            self.t.addfile(tarinfoobject, stringio)

    def gettarinfo(self, name):
        return self.t.gettarinfo(name)
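
# Runs callables on background threads, capping how many may be in flight at
# once with a counting semaphore and propagating a worker exception back to
# the caller.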
class TaskArbitrator(object):
    def __init__(self, numtasks):
        self.sem = threading.Semaphore(numtasks)
        self.exception = None
        self.threads = []
        self.lock = threading.RLock()
        self.supervisor_thread = threading.Thread(target=self._supervisor)
        self.supervisor_thread.setDaemon(True)
        self.supervisor_thread.start()

    def _supervisor(self):
        # Debugging aid: periodically report how many worker threads exist.
        while True:
            with self.lock:
                print len(self.threads)
            time.sleep(1)
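
    # add() blocks until one of the numtasks slots is free, then runs the
    # callable on a new thread.  Threads that have already finished are
    # reaped on every call, and a previously recorded worker exception
    # aborts the run via end().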
    def add(self, kallable):
        with self.lock:
            # Reap worker threads that have already finished.
            for t in self.threads[:]:
                t.join(0)
                if t.isAlive():
                    continue
                print "removing dead thread", t
                self.threads.remove(t)
        if self.exception:
            self.end()
        t = threading.Thread(target=self._run_thread, args=(kallable,))
        # Block, outside the lock, until one of the numtasks slots frees up.
        self.sem.acquire()
        t.start()
        self.threads.append(t)
    def _run_thread(self, kallable):
        try:
            kallable()
        except BaseException as e:
            # Record the failure so end() can re-raise it in the main thread.
            with self.lock:
                self.exception = e
        self.sem.release()

    def end(self):
        print "ending"
        with self.lock:
            pending = self.threads[:]
            self.threads = []
        # Join outside the lock so a worker that failed can still acquire the
        # lock to record its exception before it exits.
        for t in pending:
            t.join()
        with self.lock:
            if self.exception:
                raise self.exception
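
# Worker tasks: each chunk of compressed data becomes its own member in the
# tar archive, plus a companion ".hmac" member holding the digest that
# openssl computed over that chunk.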
def add_contents_to_tar(tarqueue, originalname, nameintar, data):
    ti = tarqueue.gettarinfo(originalname)
    ti.size = len(data)
    ti.name = nameintar
    time.sleep(5)  # Simulate slow write. Unnecessary.
    tarqueue.addfile(ti, cStringIO.StringIO(data))
    print "added contents of file", nameintar

def add_hmac_to_tar(tarqueue, originalname, nameintar, data):
    p = subprocess.Popen(["openssl", "dgst",
                          "-SHA512", "-hmac", "password"],
                         stdin=subprocess.PIPE,
                         stdout=subprocess.PIPE)
    stdout, stderr = p.communicate(data)
    ret = p.wait()
    assert 0 == ret, ret
    ti = tarqueue.gettarinfo(originalname)
    ti.name = nameintar
    ti.size = len(stdout)
    time.sleep(5)  # Simulate slow write. Unnecessary.
    tarqueue.addfile(ti, cStringIO.StringIO(stdout))
    print "added hmac to", nameintar
def run_backup(compressor="cat"):
    tar = tarfile.open("/path/to/backup", "w")
    tarqueue = SerializedTarWriter(tar)
    tasker = TaskArbitrator(4)
    for path in which_files_to_compress():
        pathintar = transform(path)
        originalfobject = open(path, "rb")
        cpr = subprocess.Popen(compressor,
                               stdin=originalfobject,
                               stdout=subprocess.PIPE)
        originalfobject.close()
        for n in xrange(1000):
            red = chunked_reader(cpr.stdout)
            if len(red) == 0:
                break
            # Bind the loop variables as default arguments; a bare closure
            # would see whatever values red, n, path and pathintar hold when
            # the worker thread finally runs.
            tasker.add(lambda path=path, pathintar=pathintar, n=n, red=red:
                       add_contents_to_tar(tarqueue, path,
                                           "%s-%03d" % (pathintar, n), red))
            tasker.add(lambda path=path, pathintar=pathintar, n=n, red=red:
                       add_hmac_to_tar(tarqueue, path,
                                       "%s-%03d.hmac" % (pathintar, n), red))
        ret = cpr.wait()
        assert ret == 0, ret
        # The break above, not exhaustion of xrange, must have ended the
        # loop: a non-empty final read means more than 1000 chunks.
        assert len(red) == 0, n
    tasker.end()
    tar.close()

if __name__ == "__main__":
    run_backup()
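    # To exercise a real compressor instead of "cat", any filter that reads
    # stdin and writes stdout should work here, e.g.:
    #   run_backup(compressor="gzip")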