Skip to content

Instantly share code, notes, and snippets.

@a10y
Created September 16, 2023 01:35
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save a10y/b8a5ee757052985d4347e728fcb3fdcd to your computer and use it in GitHub Desktop.
Save a10y/b8a5ee757052985d4347e728fcb3fdcd to your computer and use it in GitHub Desktop.
# Read SAM.gov opportunity data from a JSON file, download every linked attachment, push each one to S3 storage
import subprocess
import json
import os
from pathlib import Path
from multiprocessing import Pool
# S3-compatible endpoint URL; main() hands it to do_upload(), which passes it
# to the AWS CLI as --endpoint-url.
WASABI_ENDPOINT = "https://s3.wasabisys.com"
# Worker
def do_upload(
    solicitation_number: str,
    idx: int,
    doc_url: str,
    folder_path: Path,
    bucket: str,
    endpoint: str,
):
    """Download one attachment with ``wget`` and copy it to S3 storage.

    The document is fetched into a temporary file in the current working
    directory named ``{solicitation_number}_{idx}``. Its content type is
    sniffed with the ``file`` utility to pick an extension for the remote
    key, and the file is then uploaded with ``aws s3 cp``. Download and
    upload failures are reported on stdout instead of being raised, and the
    temporary file is removed in every case.

    Args:
        solicitation_number: Identifier used as the file-name prefix.
        idx: Position of the link within the opportunity's link list.
        doc_url: URL of the attachment to download.
        folder_path: Key prefix inside the bucket; a leading root is dropped.
        bucket: Name of the destination bucket.
        endpoint: Value passed to the AWS CLI as ``--endpoint-url``.
    """
    # strip any leading slash on folder_path for uploads
    if folder_path.is_absolute():
        folder_path = folder_path.relative_to(folder_path.anchor)

    # localize the file based on solicitation number + idx in resourceLinks list
    local_path = f"{solicitation_number}_{idx}"

    # Substring of the `file` description -> extension for the remote key.
    known_types = (
        ("PDF", ".pdf"),
        ("Microsoft Word", ".docx"),
        ("Microsoft Excel", ".xlsx"),
        ("JPEG", ".jpeg"),
        ("PNG", ".png"),
    )

    try:
        try:
            subprocess.run(["wget", doc_url, "-O", local_path], check=True)
        except subprocess.CalledProcessError as e:
            # wget -O leaves an empty file behind on failure; never upload it.
            print(f"download failed for {doc_url}: {e}")
            return

        # -b omits the file name from the output, so markers such as "PDF"
        # appearing in the solicitation number cannot cause a false match.
        file_type = subprocess.run(
            ["file", "-b", local_path], stdout=subprocess.PIPE
        ).stdout.decode("utf-8", errors="replace")

        for marker, extension in known_types:
            if marker in file_type:
                remote_path = folder_path / f"{local_path}{extension}"
                break
        else:
            # Unknown: don't include a file extension
            print(f"unknown file type: {file_type}")
            remote_path = folder_path / local_path

        upload_command = [
            "aws",
            "s3",
            f"--endpoint-url={endpoint}",
            "cp",
            local_path,
            f"s3://{bucket}/{str(remote_path)}",
        ]
        try:
            subprocess.run(upload_command, check=True, stderr=subprocess.PIPE)
        except subprocess.CalledProcessError as e:
            print(f"s3 upload subprocess failed for {local_path}: {e}")
            # stderr was captured above; surface it so the cause is visible.
            if e.stderr:
                print(e.stderr.decode("utf-8", errors="replace").strip())
        else:
            print(f"uploaded {remote_path}")
    finally:
        # Delete the temp input file, whether or not the upload succeeded
        try:
            os.remove(local_path)
        except FileNotFoundError:
            pass
def worker_wrapper(args):
    """Invoke ``do_upload`` with its arguments supplied as one packed sequence.

    NOTE(review): nothing in the visible file calls this; presumably it is
    meant for pool APIs that pass a single argument per task -- confirm.
    """
    do_upload(*args)
def main():
    """Upload every attachment referenced by an opportunities JSON file.

    Command line: ``<script> opportunities.json bucket_name directory``.

    The JSON file must contain an ``opportunitiesData`` list. For each entry,
    every URL in its ``resourceLinks`` list is handed to ``do_upload`` on a
    pool of 16 worker processes; entries whose ``resourceLinks`` is missing
    or not a list are skipped.

    Raises:
        SystemExit: with the usage string when the argument count is wrong.
    """
    import sys

    # Exactly three user arguments are required. An explicit check (rather
    # than assert) still runs under `python -O` and yields a usage message
    # instead of an unpacking ValueError.
    if len(sys.argv) != 4:
        sys.exit(f"{__file__} opportunities.json bucket_name directory")
    fname, bucket, directory = sys.argv[1:]
    directory = Path(directory)

    with open(fname, "r") as f:
        opportunity_data = json.load(f)

    # One pool for the whole run: worker processes are reused instead of
    # being spawned and torn down for every opportunity.
    with Pool(16) as pool:
        for opportunity in opportunity_data["opportunitiesData"]:
            solicitation_number = opportunity["solicitationNumber"]
            links = opportunity.get("resourceLinks")
            if not isinstance(links, list):
                continue

            # Submit to the processing pool to download all files
            tasks = [
                (solicitation_number, idx, link, directory, bucket, WASABI_ENDPOINT)
                for (idx, link) in enumerate(links)
            ]
            results = [pool.apply_async(do_upload, task) for task in tasks]
            # Finish this opportunity before starting the next; get() also
            # re-raises any exception that escaped a worker.
            for res in results:
                res.get()
# Run the uploader only when executed as a script, not when imported
# (multiprocessing workers may re-import this module).
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment