Skip to content

Instantly share code, notes, and snippets.

@aroder
Last active January 8, 2021 00:16
Show Gist options
  • Save aroder/24463ba2878f6bfd452bd3853886a540 to your computer and use it in GitHub Desktop.
Save aroder/24463ba2878f6bfd452bd3853886a540 to your computer and use it in GitHub Desktop.
import base64
import json
import multiprocessing
import sys
import boto3
import botocore
# --- Script configuration -------------------------------------------------
# NOTE(review): the '____' values look like placeholders that must be filled
# in before the script is run -- confirm.

# Text file read by get_raw_bucket_object_keys(); one object key per line.
INPUT_FILE = 'all_raw_files.txt'
# Bucket name placed in every event built by run_event().
S3_RAW_BUCKET_NAME = '____'
# Suffix of the AWS profile name used for the session ('admin-<CLIENT_CODE>').
CLIENT_CODE = '____'
# Passed as FunctionName when invoking the Lambda in trigger_event().
LAMBDA_FUNCTION_ARN = '____'
# Client config shared by every boto3 client: 15-minute (60*15 s) connect and
# read timeouts, and retries capped via max_attempts=1.
# NOTE(review): the 15-minute timeouts presumably match Lambda's maximum
# execution time so synchronous invokes are not cut off early -- confirm.
BOTOCORE_CONFIG = botocore.config.Config(connect_timeout=60*15, read_timeout=60*15, retries={'max_attempts': 1})
def get_aws_service_client(service):
    """Build a boto3 client for ``service`` using the admin profile.

    A new session is opened on every call with the named profile
    ``admin-{CLIENT_CODE}``, and the client is configured with the
    module-level ``BOTOCORE_CONFIG``.

    Args:
        service: Service name handed to ``Session.client`` (the script uses
            ``'lambda'`` and ``'s3'``).

    Returns:
        The client object created by the session.
    """
    profile = f'admin-{CLIENT_CODE}'
    return boto3.session.Session(profile_name=profile).client(
        service,
        config=BOTOCORE_CONFIG,
    )
def get_raw_bucket_object_keys(input_file):
    """Read object keys from ``input_file``, one key per line.

    Trailing whitespace (including the line terminator) is stripped from
    each line. Lines that are empty after stripping are skipped, so a blank
    or trailing empty line does not become an empty key that would later be
    sent to the Lambda.

    Args:
        input_file: Path of the text file to read.

    Returns:
        A list of non-empty key strings in file order.

    Raises:
        OSError: If the file cannot be opened.
    """
    with open(input_file, 'r') as f:
        stripped_lines = (line.rstrip() for line in f)
        # Materialize inside the ``with`` so the file is still open while
        # the generator is consumed.
        return [key for key in stripped_lines if key]
def get_event(bucket, key):
    """Return an event dict holding one record for ``bucket`` and ``key``.

    The result has the layout
    ``{'Records': [{'s3': {'bucket': {'name': bucket},
    'object': {'key': key}}}]}``.

    NOTE(review): this presumably mirrors the shape of an S3 event
    notification so the Lambda can treat it like a real upload -- confirm
    against the Lambda handler.

    Args:
        bucket: Value stored under ``s3.bucket.name``.
        key: Value stored under ``s3.object.key``.

    Returns:
        A new dict with exactly one entry in its ``'Records'`` list.
    """
    s3_entry = {
        'bucket': {'name': bucket},
        'object': {'key': key},
    }
    record = {'s3': s3_entry}
    return {'Records': [record]}
def run_event(key):
    """Invoke the Lambda for a single object key in the raw bucket.

    Wraps ``key`` together with ``S3_RAW_BUCKET_NAME`` into an event via
    ``get_event`` and hands that event to ``trigger_event``. This is the
    function mapped over the keys by the worker pool.

    Args:
        key: Object key to place in the event.

    Returns:
        None.
    """
    trigger_event(get_event(S3_RAW_BUCKET_NAME, key))
def trigger_event(event):
    """Synchronously invoke the Lambda with ``event`` and print the outcome.

    A ``'lambda'`` client is obtained from ``get_aws_service_client`` and
    ``LAMBDA_FUNCTION_ARN`` is invoked with ``InvocationType='RequestResponse'``,
    so the call blocks until the function returns. The status code and the
    JSON-decoded response payload are printed.

    A synchronous invoke can come back with a success status code even when
    the function itself failed; the failure is then signalled by a
    ``FunctionError`` field in the response. That field is reported
    explicitly so failed keys are not mistaken for successes. The error is
    only printed, not raised, so one failing key does not abort the whole
    pool run.

    Args:
        event: Event dict as built by ``get_event``; the object key is read
            from ``event["Records"][0]["s3"]["object"]["key"]`` for logging.

    Returns:
        None.
    """
    lambda_service = get_aws_service_client('lambda')
    key = event["Records"][0]["s3"]["object"]["key"]
    print(f'Triggering preprocessor for {key}. Waiting for reponse...')
    res = lambda_service.invoke(
        FunctionName=LAMBDA_FUNCTION_ARN,
        InvocationType='RequestResponse',
        Payload=json.dumps(event),
    )
    code = res["StatusCode"]
    raw_payload = res["Payload"].read()
    # Guard against an empty body: json.loads would raise on b''.
    message = json.loads(raw_payload) if raw_payload else None
    function_error = res.get("FunctionError")
    if function_error:
        print(f'Lambda reported a function error ({function_error}) for {key}')
    print(f'Received HTTP {code} response with message: {message}')
    print()
if __name__ == '__main__':
    # Read every key listed in INPUT_FILE, then fan the Lambda invocations
    # out across a process pool (default size: one worker per CPU).
    raw_object_keys = get_raw_bucket_object_keys(INPUT_FILE)
    # The previously created (and never used) S3 client has been dropped.
    # The context manager tears the pool down once the blocking map() call
    # has returned, so worker processes are not left behind.
    with multiprocessing.Pool() as pool:
        pool.map(run_event, raw_object_keys)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment