Skip to content

Instantly share code, notes, and snippets.

@amywieliczka
Last active April 28, 2020 21:17
Show Gist options
  • Save amywieliczka/67004b2701c7e38b55130975bdb47be6 to your computer and use it in GitHub Desktop.
Save amywieliczka/67004b2701c7e38b55130975bdb47be6 to your computer and use it in GitHub Desktop.
async fetching with python
import asyncio
import aiohttp
import aioboto3
import boto3
import xmltodict
import json
import time
async def build_fetchRequest(oai, resumptionToken):
    """Build the URL for the next OAI-PMH ListRecords request.

    Args:
        oai: Mapping holding the feed ``url`` and, optionally,
            ``metadataPrefix`` and ``oai_set``.
        resumptionToken: One-element list. ``[True]`` marks the very first
            request; afterwards the element is the parsed ``resumptionToken``
            value: a mapping carrying the token under ``'#text'``, a plain
            string, or a falsy value once the harvest is complete.

    Returns:
        The request URL, or ``None`` when there is nothing left to fetch or
        the feed URL is missing.
    """
    feed_url = oai.get('url')
    if not feed_url:
        # dict.get never raises KeyError, so absence has to be tested
        # explicitly; otherwise the URL would start with the text "None".
        print('no oai feed url')
        return None
    base = f"{feed_url}?verb=ListRecords"

    token = resumptionToken[0]
    if token is True:
        # first request - url includes metadataPrefix and set
        url = base
        if oai.get('metadataPrefix'):
            url = f"{url}&metadataPrefix={oai.get('metadataPrefix')}"
        if oai.get('oai_set'):
            url = f"{url}&set={oai.get('oai_set')}"
        return url

    # next requests - url includes only resumptionToken
    if isinstance(token, dict):
        token = token.get('#text')
    if isinstance(token, str) and token:
        return f"{base}&resumptionToken={token}"

    # False / None / empty token: the previous page was the last one.
    return None
async def parse_httpResp(httpResp):
    """Convert an OAI-PMH ListRecords response into JSON lines plus its token.

    Args:
        httpResp: XML text of the response body.

    Returns:
        A ``(jsonl, resumptionToken)`` tuple. ``jsonl`` holds one JSON
        document per record, newline separated (an empty string when the
        response carries no records). ``resumptionToken`` is whatever
        xmltodict produced for the ``resumptionToken`` element, or ``False``
        when the element is absent.
    """
    data = xmltodict.parse(httpResp)
    # ``or {}`` covers elements that are present but empty, which xmltodict
    # represents as None rather than as a mapping.
    list_records = (data.get('OAI-PMH') or {}).get('ListRecords') or {}
    records = list_records.get('record') or []
    # xmltodict only builds a list for repeated elements; a page with a single
    # record arrives as one mapping and must not be iterated key by key.
    if not isinstance(records, list):
        records = [records]
    jsonl = "\n".join(json.dumps(record) for record in records)
    resumptionToken = list_records.get('resumptionToken', False)
    return jsonl, resumptionToken
async def oai_read_write(collection_id, page, resumptionToken, oai):
    """Harvest an OAI feed page by page and store every page in S3.

    Connects to S3 and HTTP, fetches each page of the feed, converts it to
    JSON lines and uploads it under
    ``<collection_id>/<YYYY-MM-DD>/<page>.jsonl``. Upload errors are printed
    and the loop carries on with the next page.

    Two side effects: ``page[0]`` is incremented and ``resumptionToken[0]``
    is replaced after every page, so the caller can pass both on to the next
    lambda.
    """
    bucket = 'amy-test-bucket'
    acl = 'bucket-owner-full-control'

    # https://pypi.org/project/aioboto3/
    # https://docs.aiohttp.org/en/stable/client_quickstart.html#make-a-request
    # Note: Don't create a session per request - one HTTP session serves
    # every page fetched below.
    async with aioboto3.client("s3") as s3_client, \
            aiohttp.ClientSession() as http_client:
        while True:
            request_url = await build_fetchRequest(oai, resumptionToken)
            if not request_url:
                break

            async with http_client.get(request_url) as response:
                body_text = await response.text()
            jsonl, next_token = await parse_httpResp(body_text)

            # upload records to s3
            key = f"{collection_id}/{time.strftime('%Y-%m-%d')}/{page[0]}.jsonl"
            try:
                # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Client.put_object
                await s3_client.put_object(
                    ACL=acl,
                    Body=jsonl,
                    Bucket=bucket,
                    Key=key)
            except Exception as e:
                print(e)

            # side effects, read back by the caller after a timeout
            page[0] += 1
            resumptionToken[0] = next_token
async def timer(payload):
    """Run the harvest under a time budget, handing off to a new lambda.

    Args:
        payload: Mapping with ``collection_id``, ``page``,
            ``resumptionToken`` and ``oai``. All four must be present and
            truthy, otherwise a message is printed and nothing else happens.

    Returns:
        None.

    The harvest gets 60 seconds. When that budget runs out, the
    ``async-fetch-test`` Lambda function is invoked asynchronously with the
    current ``page`` and ``resumptionToken`` so it can continue from there.
    Start and finish times are printed at the end.
    """
    collection_id, page, resumptionToken, oai = (
        payload.get(name)
        for name in ('collection_id', 'page', 'resumptionToken', 'oai'))
    if not all((collection_id, page, resumptionToken, oai)):
        print('required parameters not sent')
        return None

    # start timer
    startTime = time.strftime('%X')
    harvest = oai_read_write(collection_id, page, resumptionToken, oai)
    try:
        # https://docs.python.org/3/library/asyncio-task.html#timeouts
        # time in seconds - set to 15 for testing with this small collection
        await asyncio.wait_for(harvest, 60)
    except asyncio.TimeoutError:
        # time to spawn a new lambda
        # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/lambda.html
        next_payload = {
            'collection_id': collection_id,
            'page': page,
            'resumptionToken': resumptionToken,
            'oai': oai
        }
        lambda_client = boto3.client('lambda', region_name="us-west-2",)
        lambda_client.invoke(
            FunctionName="async-fetch-test",
            InvocationType="Event",  # invoke asynchronously
            Payload=json.dumps(next_payload).encode('utf-8')
        )
        print(f"NEW LAMBDA: resumptionToken: {resumptionToken}, page: {page}")

    endTime = time.strftime('%X')
    print(f"started at {startTime}")
    print(f"finished at {endTime}")
def lambda_handler(payload, context):
    """AWS Lambda entry point: run the timed harvest, then report success.

    Args:
        payload: Event mapping forwarded unchanged to ``timer``.
        context: Lambda context object; not used.

    Returns:
        A mapping with ``statusCode`` 200 and a JSON-encoded greeting as
        ``body``.
    """
    asyncio.run(timer(payload))
    response = {'statusCode': 200}
    response['body'] = json.dumps('Hello from Lambda!')
    return response
# Example local invocation:
# lambda_handler({
# 'collection_id': 27435,
# 'page': [0],
# 'resumptionToken': [True],
# 'oai': {
# 'url': "https://digicoll.lib.berkeley.edu/oai2d",
# 'metadataPrefix': "marcxml",
# 'oai_set': "sugoroku"
# }
# }, {})
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment