@xbrianh
Created July 26, 2019 00:33
Tar archive class useful for interfacing with S3
#!/usr/bin/env python
"""
Prototyped utilities for working with indexed, unterminated tar archives.
See "end-of-archive entry" in the following link for more information about "unterminated":
https://www.gnu.org/software/tar/manual/html_node/Standard.html
These can be used efficiently with s3 objects to extract individual files,
splice archives together, and stream. See examples at end of file.
"""
import io
import os
import json
import tarfile
from random import randint
from contextlib import AbstractContextManager
from collections import OrderedDict
import boto3
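# NOTE: this bucket is specific to the author's test environment; point it at a bucket you can write to.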
bucket = boto3.resource("s3").Bucket("org-hca-dss-test")
class UnterminatedTarArchive(AbstractContextManager):
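    """An indexed tar archive with no end-of-archive blocks, backed either by bytes in memory
    (`data`) or by an S3 object (`blob`), plus a manifest mapping member name -> byte offset."""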
    def __init__(self, data=None, blob=None, manifest=None):
        assert not (data and blob), "pass either in-memory data or an S3 blob, not both"
        if data or blob:
            assert manifest is not None, "a manifest of member offsets is required for reading"
        self.blob = blob
        self.data = data
        self.manifest = manifest
        self._tarfile = None  # only set to a TarFile while in write mode (see `new`)
@classmethod
    def new(cls):
        """Create an empty archive in write mode, buffered in an in-memory BytesIO."""
        tf = cls()
        tf._tarfile = tarfile.TarFile(fileobj=io.BytesIO(), mode="w")
        tf.manifest = OrderedDict()
        return tf
    def append_data(self, name, data):
        """Append a member and record its starting byte offset in the manifest."""
        ti = tarfile.TarInfo(name)
        ti.size = len(data)
        offset = self._tarfile.offset
        self._tarfile.addfile(ti, io.BytesIO(data))
        self.manifest[name] = offset
    def close(self):
        """Leave write mode, keeping only the bytes written before the end-of-archive blocks."""
        if self._tarfile is not None:
            end_of_records = self._tarfile.offset
            self._tarfile.close()
            self._tarfile.fileobj.seek(0)
            self.data = self._tarfile.fileobj.read()[:end_of_records]
            self._tarfile = None
def __exit__(self, *args, **kwargs):
self.close()
    def __add__(self, other):
        """Splice two closed archives: concatenate their bytes and re-base the second manifest."""
        new_data = self.data + other.data
        new_manifest = self.manifest.copy()
        for name, offset in other.manifest.items():
            # members of `other` now start after `self.data` in the combined byte stream
            new_manifest[name] = offset + len(self.data)
        return type(self)(data=new_data, manifest=new_manifest)
    @property
    def iterate(self):
        """Yield (TarInfo, extracted file object) pairs for every member."""
        if self.data:
            return iterate_tarfile(io.BytesIO(self.data))
        elif self.blob:
            return iterate_tarfile(self.blob.get()['Body'])
        else:
            raise ValueError(f"{type(self)} is in write-only mode")
def names(self):
return list(self.manifest.keys())
    def get(self, name):
        """Return (TarInfo, bytes) for a single member by seeking straight to its offset."""
        offset = self.manifest[name]
        if self.data:
            fileobj = io.BytesIO(self.data[offset:])
        elif self.blob:
            fileobj = self.blob.get(Range=f"bytes={offset}-")['Body']
        else:
            raise ValueError(f"{type(self)} is in write-only mode")
        for ti, fh in iterate_tarfile(fileobj):
            return ti, fh.read()
def iterate_tarfile(fileobj):
    """Stream (TarInfo, extracted file object) pairs from a (possibly non-seekable) file object."""
    tf = tarfile.open(fileobj=fileobj, mode="r|")
    while True:
        ti = tf.next()
        if ti:
            yield ti, tf.extractfile(ti)
        else:
            break
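# Example: build two small archives in memory from random payloads.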
files_1 = {f"tf1/subdir/{i + 1}.json": os.urandom(randint(1, 20))
for i in range(10)}
with UnterminatedTarArchive.new() as tf1:
for name, data in files_1.items():
tf1.append_data(name, data)
files_2 = {f"tf2/subdir/{i + 1}.json": os.urandom(randint(1, 20))
for i in range(10)}
with UnterminatedTarArchive.new() as tf2:
for name, data in files_2.items():
tf2.append_data(name, data)
all_files = files_1.copy()
all_files.update(files_2)
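# Splice the two archives together (plain byte concatenation plus a re-based manifest)
# and iterate once as a smoke test.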
tf = tf2 + tf1
for ti, fh in tf.iterate:
pass
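# Upload the spliced, unterminated archive to S3 and re-open it as a blob-backed archive.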
key = "bhannafi-tar-test/test.tar"
bucket.Object(key).upload_fileobj(io.BytesIO(tf.data))
tf = UnterminatedTarArchive(blob=bucket.Object(key), manifest=tf.manifest)
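# Random access: fetch a single member with a ranged GET and verify its contents.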
name = tf.names()[3]
ti, data = tf.get(name)
assert data == all_files[name]
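# Stream every member of the archive from S3 and verify contents.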
for ti, fh in tf.iterate:
assert fh.read() == all_files[ti.name]
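# A minimal sketch of persisting the manifest next to the archive so the blob-backed view
# can be reconstructed later. The "<key>.manifest.json" naming and this round-trip are
# illustrative assumptions, not part of the original gist.
manifest_key = f"{key}.manifest.json"
bucket.Object(manifest_key).upload_fileobj(io.BytesIO(json.dumps(tf.manifest).encode()))
loaded_manifest = OrderedDict(json.loads(bucket.Object(manifest_key).get()['Body'].read()))
tf_loaded = UnterminatedTarArchive(blob=bucket.Object(key), manifest=loaded_manifest)
assert tf_loaded.names() == tf.names()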