@bubthegreat
Created July 2, 2022 09:41
Gets Cigna payer transparency files
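The script pulls Cigna's July 2022 table-of-contents index for machine-readable files, extracts every file URL it lists, and downloads them concurrently (capped at 100 requests in flight), sorting results into files/good/ and files/bad/ by HTTP status. Note that the hard-coded index URL is a signed CloudFront link (the Expires and Signature query parameters), so it stops working once the signature lapses and has to be refreshed from Cigna's transparency-in-coverage listing.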
import asyncio
import gzip
import json
import os
import time

import aiofiles
import aiohttp

FILES_PATH = "files"
GOOD_FILES = os.path.join(FILES_PATH, "good")
BAD_FILES = os.path.join(FILES_PATH, "bad")
async def download_file(url, filename):
    # Bucket files by the reporting type embedded in the filename.
    ftype = filename.split('cost-transparency.mrf.')[-1].split('.reporting')[0]
    # exist_ok avoids a FileExistsError race when concurrent downloads
    # create the same type directory at the same time.
    os.makedirs(os.path.join(GOOD_FILES, ftype), exist_ok=True)
    os.makedirs(os.path.join(BAD_FILES, ftype), exist_ok=True)
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as r:
            if r.status != 200:
                fpath = os.path.join(BAD_FILES, ftype, filename)
                print(f"Error downloading file {filename} from url {url}: http response code {r.status}")
            else:
                fpath = os.path.join(GOOD_FILES, ftype, filename)
            # Save the body either way so failed responses can be inspected later.
            async with aiofiles.open(fpath, mode='wb') as f:
                await f.write(await r.read())
def get_file_urls(input_file):
    with open(input_file, "r") as f:
        data = json.load(f)
    for key in data:
        value = data[key]
        # Only the list-valued entries in the index hold file records.
        if isinstance(value, list):
            for fileinfo_dict in value:
                file_url = get_file_url(fileinfo_dict)
                # Skip empty locations and placeholder 'company_empty' entries.
                if file_url and 'company_empty' not in file_url:
                    yield file_url
def read_in_chunks(file_object, chunk_size=1024):
    """Lazy function (generator) to read a file piece by piece.

    Default chunk size: 1k. The caller owns the file handle and is
    responsible for closing it."""
    data = file_object.read(chunk_size)
    while data:
        yield data
        data = file_object.read(chunk_size)
def get_file_url(fileinfo_dict):
    # Read the 'location' key directly rather than string-parsing the dict repr.
    return fileinfo_dict.get('location', '')
def get_filename_from_url(file_url):
    # Drop the signed query string, then flatten the CloudFront path into a
    # filesystem-safe name.
    path = file_url.split('?')[0].split('cloudfront.net')[-1].strip('/')
    return path.replace('/', '.').replace('=', '-').replace('_', '-')
def print_file(filename):
    print(f"Downloaded file {filename}")
    with open(filename, "rb") as f:
        try:
            fdata = json.load(f)
            print(fdata)
        except UnicodeDecodeError:
            # Some of these come across as gzipped files, possibly based on file sizes.
            print("Could not parse this one in json:")
            try:
                f.seek(0)
                fdata = json.loads(gzip.decompress(f.read()))
                print(fdata)
            except (OSError, ValueError):
                # Neither plain nor gzipped JSON: dump the raw bytes for inspection.
                print("Some wonky shit here yo")
                f.seek(0)
                for chonk in read_in_chunks(f):
                    print(chonk)
async def download_file_from_url(url):
    filename = get_filename_from_url(url)
    await download_file(url, filename)
    return filename
async def gather_with_concurrency(n, *tasks):
    # Like asyncio.gather, but with at most n awaitables in flight at once.
    semaphore = asyncio.Semaphore(n)

    async def sem_task(task):
        async with semaphore:
            return await task

    return await asyncio.gather(*(sem_task(task) for task in tasks))
if __name__ == "__main__":
metadata_file = "2022-07-01_cigna-health-life-insurance-company_index.json"
metadata_file_url = "https://d25kgz5rikkq4n.cloudfront.net/cost_transparency/mrf/table-of-contents/reporting_month=2022-07/2022-07-01_cigna-health-life-insurance-company_index.json?Expires=1660438220&Policy=eyJTdGF0ZW1lbnQiOlt7IlJlc291cmNlIjoiaHR0cHM6Ly9kMjVrZ3o1cmlra3E0bi5jbG91ZGZyb250Lm5ldC9jb3N0X3RyYW5zcGFyZW5jeS9tcmYvdGFibGUtb2YtY29udGVudHMvcmVwb3J0aW5nX21vbnRoPTIwMjItMDcvMjAyMi0wNy0wMV9jaWduYS1oZWFsdGgtbGlmZS1pbnN1cmFuY2UtY29tcGFueV9pbmRleC5qc29uIiwiQ29uZGl0aW9uIjp7IkRhdGVMZXNzVGhhbiI6eyJBV1M6RXBvY2hUaW1lIjoxNjYwNDM4MjIwfX19XX0_&Signature=LMaexmKyE5pVwnkPZY-JbpIJdVBFO9UAuX1bxAi4lG7epYGNeagBhu-x5SzUZroH~Bo3MwfEOj0hYQWzJi4Ir3y5GRhcjgoLmBaK5eGAChd~QuVMG9kdAst529mCichuXGqkeJLybFSf~1GBdOP1qpoBhHAeDuQqLDRMfQ991WtpGPw-xYctFpZ6hcmf1BTnlcbkjb8YzmIVOUmEpdhoqmtegoSbCVRtyM-t~VRlSmrqsckrds9reneY6lojMwzPZn0vRMMif0ttC9l059HC8iP4~9douwNYLx2ONqljrfXxv5uXjcOkq4iK~jidt0GtLJCIOxd3p3dlUx~j8BL~-Q__&Key-Pair-Id=K1NVBEPVH9LWJP"
if not os.path.exists(FILES_PATH):
os.mkdir(FILES_PATH)
if not os.path.exists(GOOD_FILES):
os.mkdir(GOOD_FILES)
if not os.path.exists(BAD_FILES):
os.mkdir(BAD_FILES)
if not os.path.exists(metadata_file):
await download_file(metadata_file_url, metadata_file)
file_urls_list = list(get_file_urls(metadata_file))
file_futures = [download_file_from_url(url) for url in file_urls_list]
count = len(file_futures)
print(f"Starting download of {count} files.")
start = time.time()
result = await gather_with_concurrency(100, *file_futures)
finish = time.time()
total = finish - start
print(f"Finished download of {len(file_urls_list)} files in {total} seconds")