Skip to content

Instantly share code, notes, and snippets.

@pabloem
Created July 9, 2021 11:59
Show Gist options
  • Save pabloem/9e9fb60225aba79c7d6a9b6477fb9b1a to your computer and use it in GitHub Desktop.
Save pabloem/9e9fb60225aba79c7d6a9b6477fb9b1a to your computer and use it in GitHub Desktop.
Extract tar and move to external storage
apache-beam[aws,gcp,test]
ipython
import tarfile
import apache_beam as beam
from apache_beam.io import fileio
from apache_beam.io.filesystems import FileSystems
# Glob pattern selecting the archives fed into the pipeline.
INPUT = '/home/pabloem/codes/test-tar/data/*'
# Prefix under which every extracted archive member is written.
OUTPUT_DIR = '/home/pabloem/codes/test-tar/data/scratchspace/'
# Chunk size, in bytes, used when copying member data (1 MiB).
BUFFER_SIZE = 1 << 20
def _print_and_return(x):
print(x)
return x
def _extract_tar_archive(archive):
    """Extract the regular files of a tar archive into ``OUTPUT_DIR``.

    Each regular member is streamed in ``BUFFER_SIZE`` chunks into a file
    created through ``FileSystems.create`` at
    ``FileSystems.join(OUTPUT_DIR, member_name)``. Non-regular members
    (directories, links, devices, ...) are skipped.

    Args:
        archive: Object whose ``open()`` returns a binary file-like object
            containing the tar data (in this file, the elements emitted by
            ``fileio.ReadMatches``).

    Yields:
        The destination path of each member, after it has been fully
        written and closed.

    NOTE(review): member names are joined to ``OUTPUT_DIR`` without any
    validation. For archives from untrusted sources, absolute names or
    ``..`` components may resolve outside ``OUTPUT_DIR`` on some
    filesystems -- consider validating ``tarinfo.name`` before writing.
    """
    archive_stream = archive.open()
    try:
        # tarfile does not close a caller-supplied fileobj, so the archive
        # stream is closed separately in the outer ``finally``.
        with tarfile.open(mode='r', fileobj=archive_stream) as tf:
            for tarinfo in tf:
                if not tarinfo.isreg():
                    continue
                remote_file_name = FileSystems.join(OUTPUT_DIR, tarinfo.name)
                remote_file = FileSystems.create(remote_file_name)
                try:
                    extracting_file = tf.extractfile(tarinfo)
                    try:
                        # Copy in bounded chunks so large members are never
                        # held in memory all at once.
                        while True:
                            data = extracting_file.read(BUFFER_SIZE)
                            if not data:
                                break
                            remote_file.write(data)
                    finally:
                        extracting_file.close()
                    remote_file.flush()
                finally:
                    # Release the destination handle even if the copy fails.
                    remote_file.close()
                yield remote_file_name
    finally:
        # Also runs when the generator is closed before exhaustion.
        archive_stream.close()
def run():
    """Build and run the pipeline that unpacks every archive matching INPUT.

    The pipeline expands the ``INPUT`` glob, opens each match as a readable
    file, and hands it to ``_extract_tar_archive``. The pipeline executes
    when the ``with`` block exits.
    """
    with beam.Pipeline() as pipeline:
        patterns = pipeline | beam.Create([INPUT])
        matched = patterns | fileio.MatchAll()
        readable = matched | fileio.ReadMatches()
        # The resulting collection of extracted paths is not consumed.
        _ = readable | beam.FlatMap(_extract_tar_archive)
# Run the pipeline only when executed as a script, not when imported.
if __name__ == '__main__':
    run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment