Skip to content

Instantly share code, notes, and snippets.

@rom1504
Last active August 7, 2023 02:02
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save rom1504/4bc1711908a407a66c91e0260c1dfce9 to your computer and use it in GitHub Desktop.
Save rom1504/4bc1711908a407a66c91e0260c1dfce9 to your computer and use it in GitHub Desktop.
Count samples in tar files on S3. Generated by GPT-4
"""
I have a hundred thousand tar files on S3. I want to count the number of samples in each file and save the result in a JSON file. I would like this to be done in parallel with maximum speed. You can use fsspec and the tarfile lib in Python. Please send an implementation.
"""
import json
import concurrent.futures
import tarfile
import fsspec
from io import BytesIO
def count_samples_in_tar(tarfile_obj):
    """Return the number of entries in an already-open TarFile.

    Placeholder sample definition: one tar member == one sample.
    Swap in project-specific counting logic here if needed.
    """
    member_names = tarfile_obj.getnames()
    return len(member_names)
def process_tar_file(file_path, fs):
    """Download one tar archive from *fs* entirely into memory and count it.

    Returns a (file_path, sample_count) pair so the caller can match
    counts back to paths when futures complete out of order.
    """
    with fs.open(file_path, 'rb') as remote:
        in_memory = BytesIO(remote.read())
    with tarfile.open(fileobj=in_memory) as archive:
        return file_path, count_samples_in_tar(archive)
def count_samples_in_s3_bucket(bucket_name):
    """Count samples in every tar object under *bucket_name*, write results.json.

    The work is fanned out over a thread pool: downloading and counting is
    I/O-bound, so threads overlap the S3 waits. Results are collected as
    futures complete and dumped as a {path: count} JSON mapping.

    Fix over the original: a single corrupt or non-tar object used to raise
    inside future.result() and abort the whole run, losing every count
    already gathered. Failed paths are now reported and skipped instead.
    """
    fs = fsspec.filesystem('s3')
    # NOTE(review): fs.ls may include non-tar objects or prefixes; those now
    # fail per-file instead of killing the run. Filter here if extensions
    # are known (e.g. paths ending in '.tar').
    file_paths = fs.ls(bucket_name)
    results = {}
    with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
        future_to_path = {
            executor.submit(process_tar_file, file_path, fs): file_path
            for file_path in file_paths
        }
        for future in concurrent.futures.as_completed(future_to_path):
            path = future_to_path[future]
            try:
                file_path, sample_count = future.result()
            except Exception as exc:
                # One bad object must not discard the other 100k results.
                print("skipping %s: %s" % (path, exc))
                continue
            results[file_path] = sample_count
    with open('results.json', 'w') as f:
        json.dump(results, f)
if __name__ == "__main__":
    # Replace with the real bucket (or bucket/prefix) to scan.
    count_samples_in_s3_bucket('your_bucket_name')
"""
Can you improve it to avoid reading the whole tar file to count the number of samples?
"""
import json
import concurrent.futures
import tarfile
import fsspec
import io
def process_tar_file(file_path, fs):
    """Stream one tar archive from *fs* and count its members.

    mode='r|' reads the archive strictly sequentially, so only the current
    header/member needs to be in memory — the whole file is never buffered.
    Returns a (file_path, sample_count) pair.
    """
    with fs.open(file_path, 'rb') as stream:
        buffered = io.BufferedReader(stream)
        with tarfile.open(fileobj=buffered, mode='r|') as archive:
            total = 0
            for _member in archive:
                total += 1
    return file_path, total
def count_samples_in_s3_bucket(bucket_name):
    """Count samples in every tar object under *bucket_name*, write results.json.

    Each listed object is streamed and counted on a thread pool (I/O-bound,
    so threads overlap the S3 waits); counts are collected as futures
    complete and dumped as a {path: count} JSON mapping.

    Fix over the original: a single corrupt or non-tar object used to raise
    inside future.result() and abort the whole run, losing every count
    already gathered. Failed paths are now reported and skipped instead.
    """
    fs = fsspec.filesystem('s3')
    # NOTE(review): fs.ls may include non-tar objects or prefixes; those now
    # fail per-file instead of killing the run. Filter here if extensions
    # are known (e.g. paths ending in '.tar').
    file_paths = fs.ls(bucket_name)
    results = {}
    with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
        future_to_path = {
            executor.submit(process_tar_file, file_path, fs): file_path
            for file_path in file_paths
        }
        for future in concurrent.futures.as_completed(future_to_path):
            path = future_to_path[future]
            try:
                file_path, sample_count = future.result()
            except Exception as exc:
                # One bad object must not discard the other 100k results.
                print("skipping %s: %s" % (path, exc))
                continue
            results[file_path] = sample_count
    with open('results.json', 'w') as f:
        json.dump(results, f)
if __name__ == "__main__":
    # Replace with the real bucket (or bucket/prefix) to scan.
    count_samples_in_s3_bucket('your_bucket_name')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment