Created
July 26, 2019 00:33
-
-
Save xbrianh/6ff32d36870c8bf126f2f6783276940b to your computer and use it in GitHub Desktop.
Tar archive class useful for interfacing with S3
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
""" | |
Prototyped utilities for working with indexed, unterminated tar archives. | |
See "end-of-archive entry" in the following link for more information about "unterminated": | |
https://www.gnu.org/software/tar/manual/html_node/Standard.html | |
These can be used efficiently with s3 objects to extract individual files, | |
splice archives together, and stream. See examples at end of file. | |
""" | |
import io | |
import os | |
import json | |
import tarfile | |
from random import randint | |
from contextlib import AbstractContextManager | |
from collections import OrderedDict | |
import boto3 | |
from botocore.response import StreamingBody | |
# S3 bucket used by the examples at the end of this file.
# NOTE(review): the boto3 resource is created at import time and the bucket
# name is hard-coded.
bucket = (
    boto3
    .resource("s3")
    .Bucket("org-hca-dss-test")
)
class UnterminatedTarArchive(AbstractContextManager):
    """Tar archive stored without its end-of-archive marker.

    An instance is in one of three modes:

    * write mode -- created with :meth:`new`; members are added with
      :meth:`append_data`, and the archive bytes become available in
      ``self.data`` after :meth:`close` (or on leaving a ``with`` block);
    * in-memory read mode -- constructed with ``data`` (raw archive bytes);
    * blob read mode -- constructed with ``blob``, an object whose
      ``get(...)`` returns a dict with a readable ``"Body"`` (e.g. a boto3
      S3 ``Object``).

    ``manifest`` maps member name -> byte offset of that member's header in
    the archive, which lets :meth:`get` read one member without scanning the
    archive from the start.

    Because the end-of-archive blocks are omitted, two archives can be
    spliced by plain byte concatenation (see :meth:`__add__`).
    """

    def __init__(self, data=None, blob=None, manifest=None):
        """Open an existing archive for reading (or create a bare instance).

        :param data: raw archive bytes.
        :param blob: object exposing ``get(...)`` that returns ``{"Body": stream}``.
        :param manifest: mapping of member name -> header offset; required
            when ``data`` or ``blob`` is given.
        :raises ValueError: if both ``data`` and ``blob`` are given, or if a
            source is given without a manifest.
        """
        if data is not None and blob is not None:
            raise ValueError("pass at most one of `data` and `blob`")
        if (data is not None or blob is not None) and manifest is None:
            raise ValueError("a manifest is required when `data` or `blob` is given")
        self.blob = blob
        self.data = data
        self.manifest = manifest
        # Only archives created via `new()` get a writer, but every instance
        # needs the attribute so `close()` / `__exit__` work in read mode too.
        self._tarfile = None

    @classmethod
    def new(cls):
        """Return an empty archive in write mode."""
        archive = cls()
        archive._tarfile = tarfile.TarFile(fileobj=io.BytesIO(), mode="w")
        archive.manifest = OrderedDict()
        return archive

    def append_data(self, name, data):
        """Append a regular-file member called *name* whose contents are *data* (bytes).

        :raises ValueError: if the archive is not in write mode (not created
            with :meth:`new`, or already closed).
        """
        if self._tarfile is None:
            raise ValueError(f"{type(self)} is not in write mode")
        info = tarfile.TarInfo(name)
        info.size = len(data)
        # The member's header starts at the current end of written records.
        offset = self._tarfile.offset
        self._tarfile.addfile(info, io.BytesIO(data))
        self.manifest[name] = offset

    def close(self):
        """Finish writing and store the archive bytes in ``self.data``.

        ``TarFile.close`` appends end-of-archive blocks and padding; those are
        cut off so the stored archive stays unterminated. Calling ``close``
        again, or on a read-mode archive, does nothing.
        """
        if self._tarfile is None:
            return
        end_of_records = self._tarfile.offset
        self._tarfile.close()
        # The BytesIO was supplied by us, so TarFile.close() leaves it open
        # and readable.
        self.data = self._tarfile.fileobj.getvalue()[:end_of_records]
        self._tarfile = None

    def __exit__(self, *args, **kwargs):
        """Close the archive on leaving a ``with`` block; exceptions propagate."""
        self.close()

    def __add__(self, other):
        """Splice two in-memory archives: ``self``'s members, then ``other``'s.

        ``other``'s records now start where ``self``'s end, so its manifest
        offsets are shifted by ``len(self.data)``. A name present in both
        keeps its position in the manifest but points at ``other``'s copy.

        :raises ValueError: if either archive has no in-memory ``data``.
        """
        if not isinstance(other, UnterminatedTarArchive):
            return NotImplemented
        if self.data is None or other.data is None:
            raise ValueError("only archives with in-memory data can be concatenated")
        shift = len(self.data)
        new_manifest = self.manifest.copy()
        for name, offset in other.manifest.items():
            new_manifest[name] = offset + shift
        return type(self)(data=self.data + other.data, manifest=new_manifest)

    @property
    def iterate(self):
        """Iterator of ``(TarInfo, file object)`` pairs over all members, in order.

        Accessed as an attribute: ``for ti, fh in archive.iterate``. Members
        are read in tarfile stream mode, so read each file object before
        advancing to the next member.

        :raises ValueError: if the archive is still in write mode.
        """
        if self.data is not None:
            if not self.data:
                # tarfile raises ReadError on a zero-length stream; an archive
                # with no members simply has nothing to yield.
                return iter(())
            return iterate_tarfile(io.BytesIO(self.data))
        if self.blob is not None:
            return iterate_tarfile(self.blob.get()['Body'])
        raise ValueError(f"{type(self)} in write-only mode")

    def names(self):
        """Return member names in manifest order."""
        return list(self.manifest.keys())

    def get(self, name):
        """Return ``(TarInfo, bytes)`` for the member called *name*.

        Reading starts at the member's manifest offset. For blob-backed
        archives this is a ranged GET (``Range: bytes=<offset>-``).

        :raises KeyError: if *name* is not in the manifest.
        :raises ValueError: if the archive has neither ``data`` nor ``blob``.
        """
        offset = self.manifest[name]
        if self.data is not None:
            fileobj = io.BytesIO(self.data[offset:])
        elif self.blob is not None:
            fileobj = self.blob.get(Range=f"bytes={offset}-")['Body']
        else:
            raise ValueError(f"{type(self)} in write-only mode")
        try:
            # Only the first member at `offset` is needed.
            for ti, fh in iterate_tarfile(fileobj):
                return ti, fh.read()
        finally:
            # Release the (possibly network-backed) stream once done.
            fileobj.close()
def iterate_tarfile(fileobj):
    """Yield ``(TarInfo, file object)`` for each member of a tar stream.

    *fileobj* is read sequentially in tarfile stream mode (``"r|"``). It
    therefore need not be seekable, and the archive may be unterminated:
    iteration simply stops when the stream runs out. Read each file object
    before advancing, because stream mode cannot go back to earlier members.
    As with ``TarFile.extractfile``, the file object is ``None`` for members
    that are not regular files or links.

    The internal ``TarFile`` is closed when iteration ends or the generator
    is closed. *fileobj* itself is left open for the caller.
    """
    with tarfile.open(fileobj=fileobj, mode="r|") as tf:
        for ti in tf:
            yield ti, tf.extractfile(ti)
# ---------------------------------------------------------------------------
# Example / smoke test.
# NOTE(review): this runs at import time and uploads to the hard-coded S3
# bucket defined above; consider moving it under `if __name__ == "__main__":`.
# ---------------------------------------------------------------------------

# Two small archives with random payloads under distinct name prefixes.
files_1 = {f"tf1/subdir/{i + 1}.json": os.urandom(randint(1, 20))
           for i in range(10)}
with UnterminatedTarArchive.new() as tf1:
    for name, data in files_1.items():
        tf1.append_data(name, data)

files_2 = {f"tf2/subdir/{i + 1}.json": os.urandom(randint(1, 20))
           for i in range(10)}
with UnterminatedTarArchive.new() as tf2:
    for name, data in files_2.items():
        tf2.append_data(name, data)

all_files = files_1.copy()
all_files.update(files_2)

# Splice by byte concatenation: tf2's members first, then tf1's.
tf = tf2 + tf1

# Stream the spliced in-memory archive and check every member's contents.
streamed = set()
for ti, fh in tf.iterate:
    assert fh.read() == all_files[ti.name]
    streamed.add(ti.name)
assert streamed == set(all_files) == set(tf.names())

# Upload the spliced archive and reopen it as an S3-backed archive that
# reuses the in-memory manifest.
key = "bhannafi-tar-test/test.tar"
bucket.Object(key).upload_fileobj(io.BytesIO(tf.data))
tf = UnterminatedTarArchive(blob=bucket.Object(key), manifest=tf.manifest)

# Random access to a single member via a ranged GET.
name = tf.names()[3]
ti, data = tf.get(name)
assert data == all_files[name]

# Stream the whole S3 object.
for ti, fh in tf.iterate:
    assert fh.read() == all_files[ti.name]
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment