# bp_tar_stream
"""
goal: try to stream the http response from a Spectra Logic BlackPearl through python's tarfile.
TODO:
- compute md5 checksum while extracting each member of the tarball, compare against existing checksums and retry a download if they don't match.
"""
import concurrent.futures
import logging
import os
import pprint
import tarfile
import time
from typing import Dict, Set
from ds3 import ds3
from ds3.ds3Helpers import Blob
# bucket = "scenes-anteas-06"
# object_keys = [
# "WV02/2013/12/103001002A29AD00/WV02_20131230225228_103001002A29AD00_13DEC30225228-M1BS-500106813060_01_P001.tar",
# "WV02/2013/12/103001002A6D9A00/WV02_20131221031756_103001002A6D9A00_13DEC21031756-M1BS-500106808150_01_P001.tar",
# "WV02/2013/12/103001002A6D9A00/WV02_20131221031756_103001002A6D9A00_13DEC21031756-M1BS-500106808150_01_P002.tar",
# "WV02/2013/12/10300100296E9900/WV02_20131216012331_10300100296E9900_13DEC16012331-P1BS-500216153070_01_P010.tar",
# "WV02/2013/12/10300100296E9900/WV02_20131216012330_10300100296E9900_13DEC16012330-P1BS-500216153070_01_P009.tar",
# "WV02/2013/12/10300100296E9900/WV02_20131216012329_10300100296E9900_13DEC16012329-P1BS-500216153070_01_P008.tar",
# "WV02/2013/12/10300100296E9900/WV02_20131216012328_10300100296E9900_13DEC16012328-P1BS-500216153070_01_P007.tar",
# "WV02/2013/12/10300100296E9900/WV02_20131216012327_10300100296E9900_13DEC16012327-P1BS-500216153070_01_P006.tar",
# "WV02/2013/12/10300100296E9900/WV02_20131216012326_10300100296E9900_13DEC16012326-P1BS-500216153070_01_P005.tar",
# "WV02/2013/12/10300100296E9900/WV02_20131216012330_10300100296E9900_13DEC16012330-M1BS-500216153070_01_P009.tar",
# "WV02/2013/12/10300100296E9900/WV02_20131216012331_10300100296E9900_13DEC16012331-M1BS-500216153070_01_P010.tar",
# ]
# this object is > 64GB and will be split into multiple parts
bucket = "dems"
object_keys = [
    "setsm/scene/WV03/2017/01/WV03_20170120_1040010028905E00_1040010027193900_50cm_v040000.tar"
]


class SpectraObject(object):
    def __init__(self, bucket: str, name: str, initial_blob):
        self.bucket = bucket
        self.name = name
        self.blobs = [initial_blob]
        self.current_offset = 0
        r, w = os.pipe()
        self.pipe_reader = r
        self.pipe_writer = w

    def __eq__(self, other):
        return (self.bucket == other.bucket and self.name == other.name)

    def __hash__(self):
        return hash((self.name, self.bucket))

    def get_blobs(self):
        return self.blobs

    def add_blob(self, blob):
        self.blobs.append(blob)

    def set_offset(self, offset):
        self.current_offset = offset

    def get_pipe_reader(self):
        return self.pipe_reader

    def get_pipe_writer(self):
        return self.pipe_writer
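

# A minimal sketch of the pattern SpectraObject's pipe exists to support: one thread writes raw
# tar bytes into the write end of an os.pipe while tarfile consumes the read end in non-seekable
# stream mode ("r|"). The tar_path and dest below are hypothetical; in the real flow the writer
# side is fed by client.get_object() and the reader side by extract(). Not called by main().
def _pipe_streaming_demo(tar_path="example.tar", dest="demo_output"):
    import threading

    r, w = os.pipe()

    def feed():
        # write the tarball into the pipe, then close the writer so the reader sees EOF
        with open(tar_path, "rb") as src, open(w, "wb") as sink:
            while True:
                chunk = src.read(1024 * 1024)
                if not chunk:
                    break
                sink.write(chunk)

    writer_thread = threading.Thread(target=feed)
    writer_thread.start()
    with tarfile.open(fileobj=open(r, "rb"), mode="r|") as tarball:
        for member in tarball:
            tarball.extract(member, dest)
    writer_thread.join()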


def main():
    retry_delay = 60
    max_threads = 4
    output_dir = "output"
    logging.basicConfig(level=logging.DEBUG)
    logger = logging.getLogger("app")
    client = ds3.createClientFromEnv()
    objects = [ds3.Ds3GetObject(name=key) for key in object_keys]
    # Keeping chunks in order means each chunk spends less time sitting in cache. Not ideal
    # performance-wise, but we need to process blobs sequentially in order to stream the data to tarfile.
    req = ds3.GetBulkJobSpectraS3Request(bucket, objects, chunk_client_processing_order_guarantee="IN_ORDER", priority="URGENT")
    bulkGetResult = client.get_bulk_job_spectra_s3(req)
    result = bulkGetResult.result
    # pprint.pprint(result)
    # import sys
    # sys.exit(0)
    # Each item is a chunk; each chunk has a list of blobs (with offsets). Use this to track the state of the downloads.
    job_id = result['JobId']
    chunk_list = result['ObjectsList']
    s3object_map: Dict[str, SpectraObject] = dict()
    # used to keep track of remaining blobs
    blob_set: Set[Blob] = set()
    for chunk in chunk_list:
        blob_list = chunk['ObjectList']
        for blob in blob_list:
            name = blob['Name']
            length: int = int(blob['Length'])
            offset: int = int(blob['Offset'])
            cur_blob = Blob(name=name, length=length, offset=offset)
            if name in s3object_map:
                existing_object = s3object_map[name]
                existing_object.add_blob(cur_blob)
            else:
                s3object_map[name] = SpectraObject(bucket, name, cur_blob)
            blob_set.add(cur_blob)
    while len(blob_set) > 0:
        # check the job status: get the list of chunks that are ready for client processing
        req = ds3.GetJobChunksReadyForClientProcessingSpectraS3Request(job_id)
        available_chunks = client.get_job_chunks_ready_for_client_processing_spectra_s3(req)
        chunks = available_chunks.result['ObjectsList']
        if len(chunks) <= 0:
            logger.debug(f"No chunks available, the BP has not read anything from tape into cache yet, waiting {retry_delay} seconds before retrying")
            time.sleep(retry_delay)
            continue
        # Retrieve all available blobs concurrently. For multipart objects, retrieve the blobs serially.
        with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
            # For objects split into parts, we must be careful about the order in which the blobs are
            # downloaded: the blobs of a single object must be downloaded sequentially, not concurrently,
            # so that the data can be streamed to tar in order.
            for chunk in chunks:
                for blob in chunk['ObjectList']:
                    name: str = blob['Name']
                    length: int = int(blob['Length'])
                    offset: int = int(blob['Offset'])
                    cur_blob = Blob(name=name, length=length, offset=offset)
                    if cur_blob in blob_set:
                        # If the blob is part of a larger object, hold off on downloading it until all
                        # preceding blobs have finished downloading.
                        s3object = s3object_map[cur_blob.name]
                        if offset == s3object.current_offset:
                            logger.debug(f"offset matches the current offset of {s3object.current_offset}, proceeding to download")
                            # okay to download
                            executor.submit(stream_download, client, bucket, s3object, offset, length, job_id)
                            if offset == 0:
                                # start the extract task once per object; it keeps reading from the
                                # pipe until the tar stream ends
                                executor.submit(extract, s3object, output_dir)
                            blob_set.remove(cur_blob)
                        else:
                            logger.debug(f"blob for object {name} at offset {offset} is ready for download, but we must wait for the preceding blobs of the object to finish downloading first")
                            # should we wait longer here?
                            time.sleep(retry_delay)
                            continue
        executor.shutdown(wait=True)


def stream_download(client, bucket, s3object, offset, length, job_id):
    w = s3object.get_pipe_writer()
    # closefd=False keeps the underlying pipe fd open between blobs of the same object
    stream = open(w, 'wb', closefd=False)
    print(f"downloading {s3object.name} at offset {offset} with length {length} bytes")
    print(f"job id: {job_id}")
    client.get_object(ds3.GetObjectRequest(bucket_name=bucket,
                                           object_name=s3object.name,
                                           stream=stream,
                                           offset=offset,
                                           job=job_id,
                                           # PGC does not use versioning, we can ignore
                                           version_id=None))
    print(f"advancing offset from {offset} to {offset + length}")
    s3object.set_offset(offset + length)
    stream.close()
    # after the final blob of the object, close the pipe fd itself so the tarfile reader sees EOF
    if offset + length >= sum(blob.length for blob in s3object.get_blobs()):
        os.close(w)


def extract(s3object, output_dir):
    print(f"extracting members from tarball to {output_dir}")
    r = s3object.get_pipe_reader()
    tarball = tarfile.open(fileobj=open(r, 'rb'), mode="r|")
    for member in tarball:
        print(f"extracting {member}")
        tarball.extract(member, output_dir)
    print("done extracting")


if __name__ == "__main__":
    main()