Skip to content

Instantly share code, notes, and snippets.

@ftfarias
Last active September 24, 2018 16:40
Show Gist options
  • Save ftfarias/2b2d5146e85aaa7b445a50f87a16e04c to your computer and use it in GitHub Desktop.
Save ftfarias/2b2d5146e85aaa7b445a50f87a16e04c to your computer and use it in GitHub Desktop.
Read a CSV file from S3 as a dictionary
import boto3
from io import TextIOWrapper, BytesIO
from gzip import GzipFile
import csv
import logging
import collections
from tqdm import tqdm_notebook as tqdm
# Apply the basic root-logger configuration with an INFO threshold.
logging.basicConfig(level=logging.INFO)
# Module-level logger, named after this module; read_dict logs through it.
logger = logging.getLogger(__name__)
def read_dict(s3, table_name, index_names, bucket_name=None):
    """Load a ';'-delimited CSV object from S3 into a dict keyed by index columns.

    The first line of the object is treated as the header row. Every later
    row is stored under a key built by joining, with '|', the values of the
    columns whose header appears in ``index_names``. The key parts follow
    the column order of the header row, not the order of ``index_names``.
    When two rows share a key, a warning is logged and the later row wins.

    Args:
        s3: Client object exposing ``get_object(Bucket=..., Key=...)``; the
            ``'Body'`` entry of its return value is read and then closed
            (presumably a boto3 S3 client -- confirm against the caller).
        table_name: Key of the object to read. Keys ending in ``.gzip`` or
            ``.gz`` are decompressed on the fly.
        index_names: Header names of the columns that form the row key.
        bucket_name: Bucket to read from. When ``None`` the module-level
            name ``bucket`` is used. NOTE(review): that global is not
            defined in the visible part of this file -- confirm it exists
            wherever this function is called without ``bucket_name``.

    Returns:
        A ``(headers, result)`` tuple: ``headers`` is the header row as a
        list of strings (empty when the object has no lines) and ``result``
        maps each key string to its row (a list of strings).

    Raises:
        IndexError: If a non-blank row has fewer columns than an index
            column position requires.
    """
    logger.info('Reading {}'.format(table_name))
    source_bucket = bucket if bucket_name is None else bucket_name
    response = s3.get_object(Bucket=source_bucket, Key=table_name)
    body = response['Body']

    headers = []
    index_headers = []
    result = {}
    rows_read = 0  # stays 0 for an empty object instead of being unbound
    try:
        if table_name.endswith(('.gzip', '.gz')):
            gzipped = GzipFile(None, 'rb', fileobj=body)
            # Decode explicitly as UTF-8 so both branches agree regardless of
            # the locale; newline='' leaves line-ending handling to csv.
            data = TextIOWrapper(gzipped, encoding='utf-8', newline='')
        else:
            data = (raw.decode('utf-8') for raw in body.iter_lines())

        input_csv = csv.reader(data, delimiter=';', quotechar="'")
        for line_num, line in enumerate(tqdm(input_csv)):
            if line_num == 0:
                headers = line
                for i, h in enumerate(headers):
                    if h in index_names:
                        index_headers.append(i)
                        print('Adding header {} -> {}'.format(h, i))
                if len(index_headers) != len(index_names):
                    # Best effort: report the mismatch and keep going with
                    # whatever index columns were found.
                    logger.error('Index not found: lookinf for {}, found {}. Headers: {}'.format(index_names, index_headers, headers))
                continue

            rows_read = line_num  # number of lines seen after the header
            if not line:
                # csv.reader yields [] for a blank line; there is nothing
                # to index, so skip it rather than fail on line[i].
                continue

            key_str = '|'.join(line[i] for i in index_headers)
            if key_str in result:
                logger.warning('Duplicated KEY: {} -> {}'.format(key_str, line))
            result[key_str] = line
    finally:
        # Release the underlying stream even if parsing fails part-way.
        body.close()

    logger.info(f'{rows_read:,} lines read')
    return headers, result
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment