Skip to content

Instantly share code, notes, and snippets.

Last active September 27, 2022 17:19
Show Gist options
  • Save dannyim/1262640b009c495bee498909665f770c to your computer and use it in GitHub Desktop.
Save dannyim/1262640b009c495bee498909665f770c to your computer and use it in GitHub Desktop.
Goal: try to stream the HTTP response from a Spectra Logic BlackPearl through Python's tarfile module.
- Compute an MD5 checksum while extracting each member of the tarball, compare it against the existing checksums, and retry the download if they don't match.
import concurrent.futures
import logging
import os
import pprint
import tarfile
import time
from typing import Dict, Set
from ds3 import ds3
from ds3.ds3Helpers import Blob
# Earlier test data, kept for reference:
# bucket = "scenes-anteas-06"
# object_keys = [
#     "WV02/2013/12/103001002A29AD00/WV02_20131230225228_103001002A29AD00_13DEC30225228-M1BS-500106813060_01_P001.tar",
#     "WV02/2013/12/103001002A6D9A00/WV02_20131221031756_103001002A6D9A00_13DEC21031756-M1BS-500106808150_01_P001.tar",
#     "WV02/2013/12/103001002A6D9A00/WV02_20131221031756_103001002A6D9A00_13DEC21031756-M1BS-500106808150_01_P002.tar",
#     "WV02/2013/12/10300100296E9900/WV02_20131216012331_10300100296E9900_13DEC16012331-P1BS-500216153070_01_P010.tar",
#     "WV02/2013/12/10300100296E9900/WV02_20131216012330_10300100296E9900_13DEC16012330-P1BS-500216153070_01_P009.tar",
#     "WV02/2013/12/10300100296E9900/WV02_20131216012329_10300100296E9900_13DEC16012329-P1BS-500216153070_01_P008.tar",
#     "WV02/2013/12/10300100296E9900/WV02_20131216012328_10300100296E9900_13DEC16012328-P1BS-500216153070_01_P007.tar",
#     "WV02/2013/12/10300100296E9900/WV02_20131216012327_10300100296E9900_13DEC16012327-P1BS-500216153070_01_P006.tar",
#     "WV02/2013/12/10300100296E9900/WV02_20131216012326_10300100296E9900_13DEC16012326-P1BS-500216153070_01_P005.tar",
#     "WV02/2013/12/10300100296E9900/WV02_20131216012330_10300100296E9900_13DEC16012330-M1BS-500216153070_01_P009.tar",
#     "WV02/2013/12/10300100296E9900/WV02_20131216012331_10300100296E9900_13DEC16012331-M1BS-500216153070_01_P010.tar",
# ]

# this object is > 64GB and will be split into multiple parts
bucket = "dems"
object_keys = [
    # NOTE(review): no keys follow the opening bracket in this copy of the
    # script. Add the key(s) of the object(s) to download before running.
]
class SpectraObject:
    """Download state for one object of a BlackPearl bulk GET job.

    Holds the blobs the job split the object into, the offset of the next blob
    that may be written (blobs must be streamed in order), and an OS pipe whose
    write end receives downloaded bytes and whose read end feeds the extractor.
    Two instances are equal when they refer to the same bucket and object name.
    """

    def __init__(self, bucket: str, name: str, initial_blob):
        self.bucket = bucket
        self.name = name
        self.blobs = [initial_blob]
        # Offset of the next blob allowed to download; advanced via set_offset().
        self.current_offset = 0
        # Unidirectional pipe: bytes written to pipe_writer come out of pipe_reader.
        self.pipe_reader, self.pipe_writer = os.pipe()

    def __repr__(self):
        return f"{type(self).__name__}(bucket={self.bucket!r}, name={self.name!r})"

    def __eq__(self, other):
        if not isinstance(other, SpectraObject):
            return NotImplemented
        return self.bucket == other.bucket and self.name == other.name

    def __hash__(self):
        # Must stay consistent with __eq__: both use (name, bucket).
        return hash((self.name, self.bucket))

    def get_blobs(self):
        """Return the list of blobs registered for this object, in insertion order."""
        return self.blobs

    def add_blob(self, blob):
        """Register another blob of this object."""
        self.blobs.append(blob)

    def set_offset(self, offset):
        """Set the offset of the next blob that may be downloaded."""
        self.current_offset = offset

    def get_pipe_reader(self):
        """Return the file descriptor of the pipe's read end."""
        return self.pipe_reader

    def get_pipe_writer(self):
        """Return the file descriptor of the pipe's write end."""
        return self.pipe_writer
def _to_blob(entry):
    """Build a Blob from one entry of a chunk's 'ObjectList'."""
    return Blob(name=entry['Name'], length=int(entry['Length']), offset=int(entry['Offset']))


def _index_job_blobs(bucket, chunk_list):
    """Group every blob listed in a bulk job's chunk list by object name.

    Returns ``(s3object_map, blob_set)``: one SpectraObject per object name
    (holding all of that object's blobs in chunk order) and the set of all
    blobs that still have to be downloaded.
    """
    s3object_map: Dict[str, SpectraObject] = {}
    blob_set: Set[Blob] = set()
    for chunk in chunk_list:
        for entry in chunk['ObjectList']:
            cur_blob = _to_blob(entry)
            existing_object = s3object_map.get(cur_blob.name)
            if existing_object is None:
                s3object_map[cur_blob.name] = SpectraObject(bucket, cur_blob.name, cur_blob)
            else:
                existing_object.add_blob(cur_blob)
            blob_set.add(cur_blob)
    return s3object_map, blob_set


def main():
    """Run a bulk GET job for ``object_keys`` in ``bucket`` and extract each object as it streams.

    Polls the BlackPearl for job chunks that are ready for client processing.
    Each ready blob is downloaded once its object's preceding blobs have
    finished; blobs of different objects download concurrently. When an
    object's first blob starts, a dedicated extractor begins reading that
    object's pipe. Returns once every blob of the job has been downloaded and
    every started extraction has finished. Download and extraction errors are
    re-raised.

    NOTE(review): if a download fails mid-object, extractors of other
    partially-downloaded objects stay blocked on their pipes, so shutdown can hang.
    """
    retry_delay = 60
    max_threads = 4
    output_dir = "output"
    logger = logging.getLogger("app")

    client = ds3.createClientFromEnv()
    objects = [ds3.Ds3GetObject(name=key) for key in object_keys]
    # keeping chunks in order will make it such that the chunks hang out in cache for less time. Not ideal performance-wise but we need
    # to process blobs sequentially in order to stream the data to tarfile
    req = ds3.GetBulkJobSpectraS3Request(bucket, objects, chunk_client_processing_order_guarantee="IN_ORDER", priority="URGENT")
    result = client.get_bulk_job_spectra_s3(req).result

    # each item is a chunk. each chunk has a list of blobs (with offsets). Use this to track state of downloads
    job_id = result['JobId']
    s3object_map, blob_set = _index_job_blobs(bucket, result['ObjectsList'])

    extractions = []
    # Downloads and extractors use separate pools. An extractor blocks until its
    # object's final blob closes the pipe, and a download blocks while the pipe
    # is full. Sharing one bounded pool could leave a download waiting on a
    # reader that never gets a thread.
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as download_pool:
        with concurrent.futures.ThreadPoolExecutor(max_workers=max(1, len(s3object_map))) as extract_pool:
            while blob_set:
                req = ds3.GetJobChunksReadyForClientProcessingSpectraS3Request(job_id)
                available_chunks = client.get_job_chunks_ready_for_client_processing_spectra_s3(req)
                chunks = available_chunks.result['ObjectsList']
                if not chunks:
                    logger.debug(f"No chunks available, the BP has not read anything from tape into cache yet, waiting {retry_delay} seconds before retrying")
                    time.sleep(retry_delay)
                    continue

                # Blobs of one object must reach the pipe strictly in order, so at most
                # one blob per object is started per round: the next one only matches
                # current_offset after its predecessor's download has finished.
                downloads = []
                for chunk in chunks:
                    for entry in chunk['ObjectList']:
                        cur_blob = _to_blob(entry)
                        if cur_blob not in blob_set:
                            continue  # already downloaded in an earlier round
                        s3object = s3object_map[cur_blob.name]
                        if cur_blob.offset != s3object.current_offset:
                            logger.debug(f"blob for object {cur_blob.name} at offset {cur_blob.offset} is ready for download, but we must wait for the preceding blobs of the object to finish downloading first")
                            continue
                        logger.debug(f"offset matches the current offset of {s3object.current_offset}, proceeding to download")
                        if cur_blob.offset == 0:
                            # First blob of the object: start its single pipe reader.
                            extractions.append(extract_pool.submit(extract, s3object, output_dir))
                        downloads.append(download_pool.submit(
                            stream_download, client, bucket, s3object, cur_blob.offset, cur_blob.length, job_id))
                        blob_set.discard(cur_blob)

                if not downloads:
                    logger.debug(f"No ready blob can be downloaded yet, waiting {retry_delay} seconds before retrying")
                    time.sleep(retry_delay)
                    continue

                # Wait so the next round sees the advanced offsets; re-raises download errors.
                for future in concurrent.futures.as_completed(downloads):
                    future.result()

    # Surface any exception raised inside an extractor thread.
    for future in extractions:
        future.result()
def stream_download(client, bucket, s3object, offset, length, job_id):
    """Download one blob of ``s3object`` from bulk job ``job_id`` into the object's pipe.

    Blobs of an object must be passed in offset order. On success the object's
    current offset is advanced to ``offset + length`` so the caller can start
    the next blob. The pipe's write end stays open between blobs and is closed
    after the object's final blob (or on any error). That close is what
    signals end-of-file to the extractor reading the other end.

    :param client: ds3 client used to issue the GET.
    :param bucket: name of the bucket holding the object.
    :param s3object: the SpectraObject being downloaded.
    :param offset: byte offset of this blob within the object.
    :param length: length of this blob in bytes.
    :param job_id: id of the bulk GET job the blob belongs to.
    """
    w = s3object.get_pipe_writer()
    end = offset + length
    # End of the object, computed from the blobs the bulk job listed for it.
    total = max(blob.offset + blob.length for blob in s3object.get_blobs())
    print(f"downloading {s3object.name} at offset {offset} with length {length} bytes")
    print(f"job id: {job_id}")
    try:
        # closefd=False: leaving the ``with`` flushes buffered bytes into the pipe
        # without closing it, so the next blob of the object can continue the stream.
        with open(w, 'wb', closefd=False) as stream:
            # PGC does not use versioning, we can ignore
            client.get_object(ds3.GetObjectRequest(bucket, s3object.name, stream, offset=offset, job=job_id))
    except BaseException:
        # Close the write end so the extractor sees EOF instead of blocking forever.
        os.close(w)
        raise
    # The next blob starts where this one ends (not at ``length``).
    print(f"setting offset to {end}. Previous offset: {offset}")
    s3object.set_offset(end)
    if end >= total:
        # Final blob: closing the write end is the EOF that lets tarfile finish.
        os.close(w)
def extract(s3object, output_dir):
    """Extract each member of the tar stream arriving on ``s3object``'s pipe into ``output_dir``.

    The pipe's read end is consumed in tarfile's streaming mode (``"r|"``), so
    members are extracted as their bytes arrive and the input is never seeked.
    Returns when the end of the archive is reached. The read end of the pipe
    is closed on return.
    """
    print(f"extracting members from tarball to {output_dir}")
    r = s3object.get_pipe_reader()
    with open(r, 'rb') as stream, tarfile.open(fileobj=stream, mode="r|") as tarball:
        if hasattr(tarfile, "data_filter"):
            # Where supported, refuse members with absolute paths, ``..`` components
            # or links pointing outside output_dir (tarfile extraction filters).
            tarball.extraction_filter = tarfile.data_filter
        for member in tarball:
            print(f"extracting {member}")
            tarball.extract(member, output_dir)
    print("done extracting")
if __name__ == "__main__":
    # Script entry point: run the bulk download/extract job.
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment