Last active
April 28, 2020 21:17
-
-
Save amywieliczka/67004b2701c7e38b55130975bdb47be6 to your computer and use it in GitHub Desktop.
Async fetching with Python
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio
import json
import time
from urllib.parse import urlencode

import aioboto3
import aiohttp
import boto3
import xmltodict
def _token_text(token):
    """Return the resumption-token string from a parsed xmltodict node, or None.

    xmltodict produces a plain ``str`` for ``<resumptionToken>abc</resumptionToken>``,
    a ``dict`` with a ``'#text'`` key when the element also carries attributes
    (e.g. ``completeListSize``), and ``None`` for an empty element. Any node
    without token text -- including the ``False`` that parse_httpResp returns
    when the element is absent -- yields ``None``.
    """
    if isinstance(token, str):
        return token or None
    if isinstance(token, dict):
        return token.get('#text') or None
    return None


async def build_fetchRequest(oai, resumptionToken):
    """Build the next OAI-PMH ListRecords request URL, or None when finished.

    Args:
        oai: feed configuration dict; 'url' is required, 'metadataPrefix' and
            'oai_set' are optional and only sent on the first request.
        resumptionToken: one-element list used as a mutable cell. ``[True]``
            means "first request"; otherwise element 0 is the resumptionToken
            node returned by parse_httpResp (str, dict, None or False).

    Returns:
        The request URL, or None if no feed URL is configured or there is no
        further page to fetch.
    """
    base = oai.get('url')
    if not base:
        print('no oai feed url')
        return None

    state = resumptionToken[0]
    params = {'verb': 'ListRecords'}
    if state is True:
        # first request - url includes metadataPrefix and set
        if oai.get('metadataPrefix'):
            params['metadataPrefix'] = oai['metadataPrefix']
        if oai.get('oai_set'):
            params['set'] = oai['oai_set']
    else:
        # next requests - url includes only resumptionToken
        token = _token_text(state)
        if token is None:
            # no token text: the harvest is complete
            return None
        params['resumptionToken'] = token

    # urlencode escapes characters such as '&' and '+' that opaque tokens may contain
    return f"{base}?{urlencode(params)}"
async def parse_httpResp(httpResp):
    """Parse one OAI-PMH ListRecords response body.

    Args:
        httpResp: the XML response text.

    Returns:
        A ``(jsonl, resumptionToken)`` tuple. ``jsonl`` holds each <record>
        serialized as one JSON object per line (an empty string when the
        response has no records). ``resumptionToken`` is the parsed
        <resumptionToken> node as produced by xmltodict (str, dict or None),
        or False when the element is absent.
    """
    data = xmltodict.parse(httpResp) or {}
    # `or {}` also covers elements that are present but empty (xmltodict -> None)
    oai_pmh = data.get('OAI-PMH') or {}
    if oai_pmh.get('error'):
        # e.g. noRecordsMatch / badResumptionToken; surface it instead of
        # silently producing an empty page
        print(f"OAI-PMH error: {oai_pmh['error']}")
    list_records = oai_pmh.get('ListRecords') or {}

    records = list_records.get('record') or []
    if isinstance(records, dict):
        # xmltodict returns a bare dict, not a list, when there is a single <record>
        records = [records]

    jsonl = "\n".join(json.dumps(record) for record in records)
    resumptionToken = list_records.get('resumptionToken', False)
    return jsonl, resumptionToken
"""oai_read_write | |
connects to s3 & http to get data from oai, process it, and write to s3 | |
two side effects: increments page and resumptionToken to pass to next lambda | |
""" | |
async def oai_read_write(collection_id, page, resumptionToken, oai): | |
# https://pypi.org/project/aioboto3/ | |
async with aioboto3.client("s3") as s3_client: | |
# https://docs.aiohttp.org/en/stable/client_quickstart.html#make-a-request | |
# Note: Don't create a session per request | |
async with aiohttp.ClientSession() as http_client: | |
fetchRequest = await build_fetchRequest(oai, resumptionToken) | |
while(fetchRequest): | |
async with http_client.get(fetchRequest) as response: | |
httpResp = await response.text() | |
records, nextResumptionToken = await parse_httpResp(httpResp) | |
# upload records to s3 | |
bucket = 'amy-test-bucket' | |
key = f"{collection_id}/{time.strftime('%Y-%m-%d')}/{page[0]}.jsonl" | |
acl = 'bucket-owner-full-control' | |
try: | |
# https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Client.put_object | |
await s3_client.put_object( | |
ACL=acl, | |
Body=records, | |
Bucket=bucket, | |
Key=key) | |
except Exception as e: | |
print(e) | |
# side effects | |
page[0] += 1 | |
resumptionToken[0] = nextResumptionToken | |
# next step for this while loop | |
fetchRequest = await build_fetchRequest(oai, resumptionToken) | |
async def timer(payload, timeout=60):
    """Run the harvest under a time budget and hand off to a new Lambda on timeout.

    Args:
        payload: dict with 'collection_id', 'page' (one-element list),
            'resumptionToken' (one-element list; ``[True]`` starts a fresh
            harvest) and 'oai' (feed configuration).
        timeout: seconds to harvest before handing the remaining work to a
            new Lambda invocation. Presumably this should stay below the
            Lambda's own configured timeout -- confirm against the deployment.

    Returns:
        None. Prints a message and returns early if a required key is missing.
    """
    collection_id = payload.get('collection_id')
    page = payload.get('page')
    resumptionToken = payload.get('resumptionToken')
    oai = payload.get('oai')
    if not collection_id or not page or not resumptionToken or not oai:
        print('required parameters not sent')
        return None

    # start timer
    startTime = time.strftime('%X')
    start_page = page[0]
    try:
        # https://docs.python.org/3/library/asyncio-task.html#timeouts
        await asyncio.wait_for(
            oai_read_write(collection_id, page, resumptionToken, oai),
            timeout)
    except asyncio.TimeoutError:
        if page[0] == start_page:
            # Not a single page finished in this window; a new Lambda started
            # from the same state would time out the same way, forever.
            print(f"NO PROGRESS, not spawning a new lambda: resumptionToken: {resumptionToken}, page: {page}")
        else:
            # time to spawn a new lambda
            # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/lambda.html
            lambda_client = boto3.client('lambda', region_name="us-west-2")
            lambda_client.invoke(
                FunctionName="async-fetch-test",
                InvocationType="Event",  # invoke asynchronously
                Payload=json.dumps({
                    'collection_id': collection_id,
                    'page': page,
                    'resumptionToken': resumptionToken,
                    'oai': oai
                }).encode('utf-8')
            )
            print(f"NEW LAMBDA: resumptionToken: {resumptionToken}, page: {page}")
    endTime = time.strftime('%X')
    print(f"started at {startTime}")
    print(f"finished at {endTime}")
def lambda_handler(payload, context):
    """AWS Lambda entry point: run one time-boxed harvest step for ``payload``.

    The ``context`` argument is accepted to satisfy the Lambda handler
    signature but is not used. Always reports HTTP 200 with a fixed body.
    """
    asyncio.run(timer(payload))
    response_body = json.dumps('Hello from Lambda!')
    return dict(statusCode=200, body=response_body)
# Example local invocation (uncomment to run outside of Lambda):
# lambda_handler({
#     'collection_id': 27435,
#     'page': [0],
#     'resumptionToken': [True],
#     'oai': {
#         'url': "https://digicoll.lib.berkeley.edu/oai2d",
#         'metadataPrefix': "marcxml",
#         'oai_set': "sugoroku"
#     }
# }, {})
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment