Created May 22, 2018 at 19:09.
Save ftfarias/e34a39a4e079e2412ceb7ab87b4228ea to your computer and use it in GitHub Desktop.
How to read files from Amazon AWS S3 line by line
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import argparse
import csv
from gzip import GzipFile
from io import TextIOWrapper

import boto3
import elasticsearch
# Prefix (and bucket) under which the gzipped CSV objects live.
fact_key = "/2018/05/15/mycsv_files"
BUCKET = 'csv_data'

print(f'Reading files at {fact_key}')

# Optional positional MFA token: when supplied, we assume an IAM role via
# STS and talk to S3 with the temporary credentials; otherwise we fall back
# to the current user's default credentials.
parser = argparse.ArgumentParser(description='S3 Reader')
parser.add_argument('token', type=str, help='6 digit mfa token', default='', nargs='?')
args = parser.parse_args()
token = args.token

if token:
    # If supplied, the MFA token authenticates through the STS API.
    sts_client = boto3.client('sts')
    print('Assuming Role...')
    # From the response that contains the assumed role, get the temporary
    # credentials that can be used to make subsequent API calls.
    assumed_role = sts_client.assume_role(
        RoleArn="arn:aws:iam::1234:role/developer-role",
        RoleSessionName="currentRoleSession",
        DurationSeconds=3600,
        SerialNumber="arn:aws:iam::1234:mfa/felipe.farias",
        TokenCode=token,
    )
    credentials = assumed_role['Credentials']
    # SECURITY: never echo the secret access key or session token to stdout.
    # Print only the non-sensitive fields so the session is still identifiable.
    print('Credentials:')
    print({'AccessKeyId': credentials['AccessKeyId'],
           'Expiration': credentials.get('Expiration')})
    # Use the temporary credentials that AssumeRole returns to make a
    # connection to Amazon S3.
    s3 = boto3.client(
        's3',
        aws_access_key_id=credentials['AccessKeyId'],
        aws_secret_access_key=credentials['SecretAccessKey'],
        aws_session_token=credentials['SessionToken'],
    )
else:
    # No token supplied, so run with the current user's credentials.
    s3 = boto3.client('s3')
def process_file(key, bucket=None):
    """Stream one gzip-compressed CSV object from S3 and count its lines.

    Args:
        key: S3 object key of a gzipped, ';'-delimited CSV file.
        bucket: bucket name; defaults to the module-level BUCKET when None.
            (Resolved at call time so the default tracks the module constant.)

    The object body is decompressed and decoded on the fly, so the whole
    file is never held in memory. The first 10 rows are echoed for
    inspection and a progress marker is printed every 1,000,000 rows.
    """
    if bucket is None:
        bucket = BUCKET
    print(f'processing key {key}')
    count = 0
    response = s3.get_object(Bucket=bucket, Key=key)
    # Layer the streaming body: gzip-decompress, then decode bytes -> text.
    # The encoding is made explicit instead of relying on the locale default.
    gzipped = GzipFile(None, 'rb', fileobj=response['Body'])
    with TextIOWrapper(gzipped, encoding='utf-8') as data:
        input_csv = csv.reader(data, delimiter=';', quotechar='"')
        for line in input_csv:
            if count % 1000000 == 0:
                # Progress heartbeat (also fires once at row 0).
                print(f'{count:,}')
            if count < 10:
                print(line)
            count += 1
    print(f'Processed {count:,} lines')
# Read all files under bucket/prefix. A list_objects_v2 paginator is used
# because plain list_objects silently truncates the listing at 1000 keys.
paginator = s3.get_paginator('list_objects_v2')
for page in paginator.paginate(Bucket=BUCKET, Prefix=fact_key):
    # 'Contents' is absent from a page when the prefix matches no objects.
    for row in page.get('Contents', []):
        process_file(row['Key'])
Sign up for free to join this conversation on GitHub.
Already have an account? Sign in to comment.